aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.c667
-rw-r--r--src/core/ext/filters/client_channel/client_channel_plugin.c3
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.c17
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.h7
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c3
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c45
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c38
-rw-r--r--src/core/ext/filters/client_channel/resolver.c35
-rw-r--r--src/core/ext/filters/client_channel/resolver.h14
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h2
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c2
-rw-r--r--src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c19
-rw-r--r--src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c4
-rw-r--r--src/core/ext/filters/client_channel/subchannel.c24
-rw-r--r--src/core/ext/filters/client_channel/subchannel.h2
-rw-r--r--src/core/ext/filters/http/message_compress/message_compress_filter.c35
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_plugin.c3
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c69
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.h4
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_rst_stream.c2
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_encoder.c14
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_encoder.h2
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h6
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c12
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c87
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c52
26 files changed, 649 insertions, 519 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c
index f29c5d55ed..de516ab4c9 100644
--- a/src/core/ext/filters/client_channel/client_channel.c
+++ b/src/core/ext/filters/client_channel/client_channel.c
@@ -178,8 +178,8 @@ typedef struct client_channel_channel_data {
grpc_slice_hash_table *method_params_table;
/** incoming resolver result - set by resolver.next() */
grpc_channel_args *resolver_result;
- /** a list of closures that are all waiting for config to come in */
- grpc_closure_list waiting_for_config_closures;
+ /** a list of closures that are all waiting for resolver result to come in */
+ grpc_closure_list waiting_for_resolver_result_closures;
/** resolver callback */
grpc_closure on_resolver_result_changed;
/** connectivity state being tracked */
@@ -342,49 +342,15 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
}
}
-// Wrap a closure associated with \a lb_policy. The associated callback (\a
-// wrapped_on_pick_closure_cb) is responsible for unref'ing \a lb_policy after
-// scheduling \a wrapped_closure.
-typedef struct wrapped_on_pick_closure_arg {
- /* the closure instance using this struct as argument */
- grpc_closure wrapper_closure;
-
- /* the original closure. Usually a on_complete/notify cb for pick() and ping()
- * calls against the internal RR instance, respectively. */
- grpc_closure *wrapped_closure;
-
- /* The policy instance related to the closure */
- grpc_lb_policy *lb_policy;
-} wrapped_on_pick_closure_arg;
-
-// Invoke \a arg->wrapped_closure, unref \a arg->lb_policy and free \a arg.
-static void wrapped_on_pick_closure_cb(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- wrapped_on_pick_closure_arg *wc_arg = arg;
- GPR_ASSERT(wc_arg != NULL);
- GPR_ASSERT(wc_arg->wrapped_closure != NULL);
- GPR_ASSERT(wc_arg->lb_policy != NULL);
- GRPC_CLOSURE_RUN(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
- GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->lb_policy, "pick_subchannel_wrapping");
- gpr_free(wc_arg);
-}
-
static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error) {
channel_data *chand = arg;
+ // Extract the following fields from the resolver result, if non-NULL.
char *lb_policy_name = NULL;
- grpc_lb_policy *lb_policy = NULL;
- grpc_lb_policy *old_lb_policy = NULL;
- grpc_slice_hash_table *method_params_table = NULL;
- grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
- bool exit_idle = false;
- grpc_error *state_error =
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
+ grpc_lb_policy *new_lb_policy = NULL;
char *service_config_json = NULL;
- service_config_parsing_state parsing_state;
- memset(&parsing_state, 0, sizeof(parsing_state));
-
- bool lb_policy_updated = false;
+ grpc_server_retry_throttle_data *retry_throttle_data = NULL;
+ grpc_slice_hash_table *method_params_table = NULL;
if (chand->resolver_result != NULL) {
// Find LB policy name.
const grpc_arg *channel_arg =
@@ -419,32 +385,29 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
// Use pick_first if nothing was specified and we didn't select grpclb
// above.
if (lb_policy_name == NULL) lb_policy_name = "pick_first";
- // Instantiate LB policy.
grpc_lb_policy_args lb_policy_args;
lb_policy_args.args = chand->resolver_result;
lb_policy_args.client_channel_factory = chand->client_channel_factory;
lb_policy_args.combiner = chand->combiner;
-
+ // Check to see if we're already using the right LB policy.
+ // Note: It's safe to use chand->info_lb_policy_name here without
+ // taking a lock on chand->info_mu, because this function is the
+ // only thing that modifies its value, and it can only be invoked
+ // once at any given time.
const bool lb_policy_type_changed =
- (chand->info_lb_policy_name == NULL) ||
- (strcmp(chand->info_lb_policy_name, lb_policy_name) != 0);
+ chand->info_lb_policy_name == NULL ||
+ strcmp(chand->info_lb_policy_name, lb_policy_name) != 0;
if (chand->lb_policy != NULL && !lb_policy_type_changed) {
- // update
- lb_policy_updated = true;
+ // Continue using the same LB policy. Update with new addresses.
grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args);
} else {
- lb_policy =
+ // Instantiate new LB policy.
+ new_lb_policy =
grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
- if (lb_policy != NULL) {
- GRPC_LB_POLICY_REF(lb_policy, "config_change");
- GRPC_ERROR_UNREF(state_error);
- state = grpc_lb_policy_check_connectivity_locked(exec_ctx, lb_policy,
- &state_error);
- old_lb_policy = chand->lb_policy;
- chand->lb_policy = lb_policy;
+ if (new_lb_policy == NULL) {
+ gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
}
}
-
// Find service config.
channel_arg =
grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
@@ -461,12 +424,14 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_uri *uri =
grpc_uri_parse(exec_ctx, channel_arg->value.string, true);
GPR_ASSERT(uri->path[0] != '\0');
+ service_config_parsing_state parsing_state;
+ memset(&parsing_state, 0, sizeof(parsing_state));
parsing_state.server_name =
uri->path[0] == '/' ? uri->path + 1 : uri->path;
grpc_service_config_parse_global_params(
service_config, parse_retry_throttle_params, &parsing_state);
- parsing_state.server_name = NULL;
grpc_uri_destroy(uri);
+ retry_throttle_data = parsing_state.retry_throttle_data;
method_params_table = grpc_service_config_create_method_config_table(
exec_ctx, service_config, method_parameters_create_from_json,
method_parameters_free);
@@ -480,12 +445,11 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_channel_args_destroy(exec_ctx, chand->resolver_result);
chand->resolver_result = NULL;
}
-
- if (lb_policy != NULL) {
- grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties,
- chand->interested_parties);
- }
-
+ // Now swap out fields in chand. Note that the new values may still
+ // be NULL if (e.g.) the resolver failed to return results or the
+ // results did not contain the necessary data.
+ //
+ // First, swap out the data used by cc_get_channel_info().
gpr_mu_lock(&chand->info_mu);
if (lb_policy_name != NULL) {
gpr_free(chand->info_lb_policy_name);
@@ -496,75 +460,77 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
chand->info_service_config_json = service_config_json;
}
gpr_mu_unlock(&chand->info_mu);
-
+ // Swap out the retry throttle data.
if (chand->retry_throttle_data != NULL) {
grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
}
- chand->retry_throttle_data = parsing_state.retry_throttle_data;
+ chand->retry_throttle_data = retry_throttle_data;
+ // Swap out the method params table.
if (chand->method_params_table != NULL) {
grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
}
chand->method_params_table = method_params_table;
- if (lb_policy != NULL) {
- GRPC_CLOSURE_LIST_SCHED(exec_ctx, &chand->waiting_for_config_closures);
- } else if (chand->resolver == NULL /* disconnected */) {
- grpc_closure_list_fail_all(&chand->waiting_for_config_closures,
- GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Channel disconnected", &error, 1));
- GRPC_CLOSURE_LIST_SCHED(exec_ctx, &chand->waiting_for_config_closures);
- }
- if (!lb_policy_updated && lb_policy != NULL &&
- chand->exit_idle_when_lb_policy_arrives) {
- GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
- exit_idle = true;
- chand->exit_idle_when_lb_policy_arrives = false;
- }
-
- if (error == GRPC_ERROR_NONE && chand->resolver) {
- if (!lb_policy_updated) {
- set_channel_connectivity_state_locked(exec_ctx, chand, state,
- GRPC_ERROR_REF(state_error),
- "new_lb+resolver");
- if (lb_policy != NULL) {
- watch_lb_policy_locked(exec_ctx, chand, lb_policy, state);
- }
+ // If we have a new LB policy or are shutting down (in which case
+ // new_lb_policy will be NULL), swap out the LB policy, unreffing the
+ // old one and removing its fds from chand->interested_parties.
+ // Note that we do NOT do this if either (a) we updated the existing
+ // LB policy above or (b) we failed to create the new LB policy (in
+ // which case we want to continue using the most recent one we had).
+ if (new_lb_policy != NULL || error != GRPC_ERROR_NONE ||
+ chand->resolver == NULL) {
+ if (chand->lb_policy != NULL) {
+ grpc_pollset_set_del_pollset_set(exec_ctx,
+ chand->lb_policy->interested_parties,
+ chand->interested_parties);
+ GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
}
- GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
- grpc_resolver_next_locked(exec_ctx, chand->resolver,
- &chand->resolver_result,
- &chand->on_resolver_result_changed);
- } else {
+ chand->lb_policy = new_lb_policy;
+ }
+ // Now that we've swapped out the relevant fields of chand, check for
+ // error or shutdown.
+ if (error != GRPC_ERROR_NONE || chand->resolver == NULL) {
if (chand->resolver != NULL) {
grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
}
- grpc_error *refs[] = {error, state_error};
set_channel_connectivity_state_locked(
exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Got config after disconnection", refs, GPR_ARRAY_SIZE(refs)),
+ "Got resolver result after disconnection", &error, 1),
"resolver_gone");
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver");
+ grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
+ GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Channel disconnected", &error, 1));
+ GRPC_CLOSURE_LIST_SCHED(exec_ctx,
+ &chand->waiting_for_resolver_result_closures);
+ } else { // Not shutting down.
+ grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
+ grpc_error *state_error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
+ if (new_lb_policy != NULL) {
+ GRPC_ERROR_UNREF(state_error);
+ state = grpc_lb_policy_check_connectivity_locked(exec_ctx, new_lb_policy,
+ &state_error);
+ grpc_pollset_set_add_pollset_set(exec_ctx,
+ new_lb_policy->interested_parties,
+ chand->interested_parties);
+ GRPC_CLOSURE_LIST_SCHED(exec_ctx,
+ &chand->waiting_for_resolver_result_closures);
+ if (chand->exit_idle_when_lb_policy_arrives) {
+ grpc_lb_policy_exit_idle_locked(exec_ctx, new_lb_policy);
+ chand->exit_idle_when_lb_policy_arrives = false;
+ }
+ watch_lb_policy_locked(exec_ctx, chand, new_lb_policy, state);
+ }
+ set_channel_connectivity_state_locked(
+ exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
+ grpc_resolver_next_locked(exec_ctx, chand->resolver,
+ &chand->resolver_result,
+ &chand->on_resolver_result_changed);
+ GRPC_ERROR_UNREF(state_error);
}
-
- if (!lb_policy_updated && lb_policy != NULL && exit_idle) {
- grpc_lb_policy_exit_idle_locked(exec_ctx, lb_policy);
- GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle");
- }
-
- if (old_lb_policy != NULL) {
- grpc_pollset_set_del_pollset_set(
- exec_ctx, old_lb_policy->interested_parties, chand->interested_parties);
- GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
- old_lb_policy = NULL;
- }
-
- if (!lb_policy_updated && lb_policy != NULL) {
- GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change");
- }
-
- GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver");
- GRPC_ERROR_UNREF(state_error);
}
static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
@@ -602,9 +568,10 @@ static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
if (!chand->started_resolving) {
- grpc_closure_list_fail_all(&chand->waiting_for_config_closures,
+ grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
GRPC_ERROR_REF(op->disconnect_with_error));
- GRPC_CLOSURE_LIST_SCHED(exec_ctx, &chand->waiting_for_config_closures);
+ GRPC_CLOSURE_LIST_SCHED(exec_ctx,
+ &chand->waiting_for_resolver_result_closures);
}
if (chand->lb_policy != NULL) {
grpc_pollset_set_del_pollset_set(exec_ctx,
@@ -770,6 +737,16 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
* PER-CALL FUNCTIONS
*/
+// Max number of batches that can be pending on a call at any given
+// time. This includes:
+// recv_initial_metadata
+// send_initial_metadata
+// recv_message
+// send_message
+// recv_trailing_metadata
+// send_trailing_metadata
+#define MAX_WAITING_BATCHES 6
+
/** Call data. Holds a pointer to grpc_subchannel_call and the
associated machinery to create such a pointer.
Handles queueing of stream ops until a call object is ready, waiting
@@ -800,11 +777,10 @@ typedef struct client_channel_call_data {
grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
grpc_polling_entity *pollent;
- grpc_transport_stream_op_batch **waiting_ops;
- size_t waiting_ops_count;
- size_t waiting_ops_capacity;
+ grpc_transport_stream_op_batch *waiting_for_pick_batches[MAX_WAITING_BATCHES];
+ size_t waiting_for_pick_batches_count;
- grpc_closure next_step;
+ grpc_transport_stream_op_batch_payload *initial_metadata_payload;
grpc_call_stack *owning_call;
@@ -853,57 +829,44 @@ grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
return get_call_or_error(call_elem->call_data).subchannel_call;
}
-static void add_waiting_locked(call_data *calld,
- grpc_transport_stream_op_batch *op) {
- GPR_TIMER_BEGIN("add_waiting_locked", 0);
- if (calld->waiting_ops_count == calld->waiting_ops_capacity) {
- calld->waiting_ops_capacity = GPR_MAX(3, 2 * calld->waiting_ops_capacity);
- calld->waiting_ops =
- gpr_realloc(calld->waiting_ops,
- calld->waiting_ops_capacity * sizeof(*calld->waiting_ops));
- }
- calld->waiting_ops[calld->waiting_ops_count++] = op;
- GPR_TIMER_END("add_waiting_locked", 0);
+static void waiting_for_pick_batches_add_locked(
+ call_data *calld, grpc_transport_stream_op_batch *batch) {
+ GPR_ASSERT(calld->waiting_for_pick_batches_count < MAX_WAITING_BATCHES);
+ calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count++] =
+ batch;
}
-static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld,
- grpc_error *error) {
- size_t i;
- for (i = 0; i < calld->waiting_ops_count; i++) {
+static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx,
+ call_data *calld,
+ grpc_error *error) {
+ for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, calld->waiting_ops[i], GRPC_ERROR_REF(error));
+ exec_ctx, calld->waiting_for_pick_batches[i], GRPC_ERROR_REF(error));
}
- calld->waiting_ops_count = 0;
+ calld->waiting_for_pick_batches_count = 0;
GRPC_ERROR_UNREF(error);
}
-static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
- if (calld->waiting_ops_count == 0) {
- return;
- }
-
- call_or_error call = get_call_or_error(calld);
- grpc_transport_stream_op_batch **ops = calld->waiting_ops;
- size_t nops = calld->waiting_ops_count;
- if (call.error != GRPC_ERROR_NONE) {
- fail_locked(exec_ctx, calld, GRPC_ERROR_REF(call.error));
+static void waiting_for_pick_batches_resume_locked(grpc_exec_ctx *exec_ctx,
+ call_data *calld) {
+ if (calld->waiting_for_pick_batches_count == 0) return;
+ call_or_error coe = get_call_or_error(calld);
+ if (coe.error != GRPC_ERROR_NONE) {
+ waiting_for_pick_batches_fail_locked(exec_ctx, calld,
+ GRPC_ERROR_REF(coe.error));
return;
}
- calld->waiting_ops = NULL;
- calld->waiting_ops_count = 0;
- calld->waiting_ops_capacity = 0;
- for (size_t i = 0; i < nops; i++) {
- grpc_subchannel_call_process_op(exec_ctx, call.subchannel_call, ops[i]);
+ for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
+ grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call,
+ calld->waiting_for_pick_batches[i]);
}
- gpr_free(ops);
+ calld->waiting_for_pick_batches_count = 0;
}
-// Sets calld->method_params and calld->retry_throttle_data.
-// If the method params specify a timeout, populates
-// *per_method_deadline and returns true.
-static bool set_call_method_params_from_service_config_locked(
- grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- gpr_timespec *per_method_deadline) {
+// Applies service config to the call. Must be invoked once we know
+// that the resolver has returned results to the channel.
+static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
if (chand->retry_throttle_data != NULL) {
@@ -915,39 +878,48 @@ static bool set_call_method_params_from_service_config_locked(
exec_ctx, chand->method_params_table, calld->path);
if (calld->method_params != NULL) {
method_parameters_ref(calld->method_params);
- if (gpr_time_cmp(calld->method_params->timeout,
+ // If the deadline from the service config is shorter than the one
+ // from the client API, reset the deadline timer.
+ if (chand->deadline_checking_enabled &&
+ gpr_time_cmp(calld->method_params->timeout,
gpr_time_0(GPR_TIMESPAN)) != 0) {
- *per_method_deadline =
+ const gpr_timespec per_method_deadline =
gpr_time_add(calld->call_start_time, calld->method_params->timeout);
- return true;
+ if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) {
+ calld->deadline = per_method_deadline;
+ grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
+ }
}
}
}
- return false;
}
-static void apply_final_configuration_locked(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
- /* apply service-config level configuration to the call (now that we're
- * certain it exists) */
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- gpr_timespec per_method_deadline;
- if (set_call_method_params_from_service_config_locked(exec_ctx, elem,
- &per_method_deadline)) {
- // If the deadline from the service config is shorter than the one
- // from the client API, reset the deadline timer.
- if (chand->deadline_checking_enabled &&
- gpr_time_cmp(per_method_deadline, calld->deadline) < 0) {
- calld->deadline = per_method_deadline;
- grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
- }
+static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx,
+ call_data *calld, grpc_error *error) {
+ grpc_subchannel_call *subchannel_call = NULL;
+ const grpc_connected_subchannel_call_args call_args = {
+ .pollent = calld->pollent,
+ .path = calld->path,
+ .start_time = calld->call_start_time,
+ .deadline = calld->deadline,
+ .arena = calld->arena,
+ .context = calld->subchannel_call_context};
+ grpc_error *new_error = grpc_connected_subchannel_create_call(
+ exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
+ GPR_ASSERT(set_call_or_error(
+ calld, (call_or_error){.subchannel_call = subchannel_call}));
+ if (new_error != GRPC_ERROR_NONE) {
+ new_error = grpc_error_add_child(new_error, error);
+ waiting_for_pick_batches_fail_locked(exec_ctx, calld, new_error);
+ } else {
+ waiting_for_pick_batches_resume_locked(exec_ctx, calld);
}
+ GRPC_ERROR_UNREF(error);
}
-static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
+static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
grpc_error *error) {
- grpc_call_element *elem = arg;
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GPR_ASSERT(calld->pick_pending);
@@ -956,6 +928,7 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
chand->interested_parties);
call_or_error coe = get_call_or_error(calld);
if (calld->connected_subchannel == NULL) {
+ // Failed to create subchannel.
grpc_error *failure =
error == GRPC_ERROR_NONE
? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
@@ -963,7 +936,7 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
: GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Failed to create subchannel", &error, 1);
set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(failure)});
- fail_locked(exec_ctx, calld, failure);
+ waiting_for_pick_batches_fail_locked(exec_ctx, calld, failure);
} else if (coe.error != GRPC_ERROR_NONE) {
/* already cancelled before subchannel became ready */
grpc_error *child_errors[] = {error, coe.error};
@@ -977,29 +950,13 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error_set_int(cancellation_error, GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_DEADLINE_EXCEEDED);
}
- fail_locked(exec_ctx, calld, cancellation_error);
+ waiting_for_pick_batches_fail_locked(exec_ctx, calld, cancellation_error);
} else {
/* Create call on subchannel. */
- grpc_subchannel_call *subchannel_call = NULL;
- const grpc_connected_subchannel_call_args call_args = {
- .pollent = calld->pollent,
- .path = calld->path,
- .start_time = calld->call_start_time,
- .deadline = calld->deadline,
- .arena = calld->arena,
- .context = calld->subchannel_call_context};
- grpc_error *new_error = grpc_connected_subchannel_create_call(
- exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
- GPR_ASSERT(set_call_or_error(
- calld, (call_or_error){.subchannel_call = subchannel_call}));
- if (new_error != GRPC_ERROR_NONE) {
- new_error = grpc_error_add_child(new_error, error);
- fail_locked(exec_ctx, calld, new_error);
- } else {
- retry_waiting_locked(exec_ctx, calld);
- }
+ create_subchannel_call_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
}
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
+ GRPC_ERROR_UNREF(error);
}
static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
@@ -1013,41 +970,32 @@ static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
}
}
+/** Return true if subchannel is available immediately (in which case
+ subchannel_ready_locked() should not be called), or false otherwise (in
+ which case subchannel_ready_locked() should be called when the subchannel
+ is available). */
+static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem);
+
typedef struct {
- grpc_metadata_batch *initial_metadata;
- uint32_t initial_metadata_flags;
- grpc_connected_subchannel **connected_subchannel;
- grpc_call_context_element *subchannel_call_context;
- grpc_closure *on_ready;
grpc_call_element *elem;
+ bool cancelled;
grpc_closure closure;
-} continue_picking_args;
+} pick_after_resolver_result_args;
-/** Return true if subchannel is available immediately (in which case on_ready
- should not be called), or false otherwise (in which case on_ready should be
- called when the subchannel is available). */
-static bool pick_subchannel_locked(
- grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags,
- grpc_connected_subchannel **connected_subchannel,
- grpc_call_context_element *subchannel_call_context, grpc_closure *on_ready);
-
-static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- continue_picking_args *cpa = arg;
- if (cpa->connected_subchannel == NULL) {
+static void continue_picking_after_resolver_result_locked(
+ grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ pick_after_resolver_result_args *args = arg;
+ if (args->cancelled) {
/* cancelled, do nothing */
} else if (error != GRPC_ERROR_NONE) {
- GRPC_CLOSURE_SCHED(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error));
+ subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error));
} else {
- if (pick_subchannel_locked(exec_ctx, cpa->elem, cpa->initial_metadata,
- cpa->initial_metadata_flags,
- cpa->connected_subchannel,
- cpa->subchannel_call_context, cpa->on_ready)) {
- GRPC_CLOSURE_SCHED(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE);
+ if (pick_subchannel_locked(exec_ctx, args->elem)) {
+ subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_NONE);
}
}
- gpr_free(cpa);
+ gpr_free(args);
}
static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
@@ -1059,39 +1007,85 @@ static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
&calld->connected_subchannel,
GRPC_ERROR_REF(error));
}
- for (grpc_closure *closure = chand->waiting_for_config_closures.head;
+ // If we don't yet have a resolver result, then a closure for
+ // continue_picking_after_resolver_result_locked() will have been added to
+ // chand->waiting_for_resolver_result_closures, and it may not be invoked
+ // until after this call has been destroyed. We mark the operation as
+ // cancelled, so that when continue_picking_after_resolver_result_locked()
+ // is called, it will be a no-op. We also immediately invoke
+ // subchannel_ready_locked() to propagate the error back to the caller.
+ for (grpc_closure *closure = chand->waiting_for_resolver_result_closures.head;
closure != NULL; closure = closure->next_data.next) {
- continue_picking_args *cpa = closure->cb_arg;
- if (cpa->connected_subchannel == &calld->connected_subchannel) {
- cpa->connected_subchannel = NULL;
- GRPC_CLOSURE_SCHED(exec_ctx, cpa->on_ready,
- GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Pick cancelled", &error, 1));
+ pick_after_resolver_result_args *args = closure->cb_arg;
+ if (!args->cancelled && args->elem == elem) {
+ args->cancelled = true;
+ subchannel_ready_locked(exec_ctx, elem,
+ GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Pick cancelled", &error, 1));
}
}
GRPC_ERROR_UNREF(error);
}
-static bool pick_subchannel_locked(
- grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags,
- grpc_connected_subchannel **connected_subchannel,
- grpc_call_context_element *subchannel_call_context,
- grpc_closure *on_ready) {
- GPR_TIMER_BEGIN("pick_subchannel", 0);
+// State for pick callback that holds a reference to the LB policy
+// from which the pick was requested.
+typedef struct {
+ grpc_lb_policy *lb_policy;
+ grpc_call_element *elem;
+ grpc_closure closure;
+} pick_callback_args;
+
+// Callback invoked by grpc_lb_policy_pick_locked() for async picks.
+// Unrefs the LB policy after invoking subchannel_ready_locked().
+static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ pick_callback_args *args = arg;
+ GPR_ASSERT(args != NULL);
+ GPR_ASSERT(args->lb_policy != NULL);
+ subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error));
+ GRPC_LB_POLICY_UNREF(exec_ctx, args->lb_policy, "pick_subchannel");
+ gpr_free(args);
+}
+// Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked().
+// If the pick was completed synchronously, unrefs the LB policy and
+// returns true.
+static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ const grpc_lb_policy_pick_args *inputs) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
+ pick_callback_args *pick_args = gpr_zalloc(sizeof(*pick_args));
+ GRPC_LB_POLICY_REF(chand->lb_policy, "pick_subchannel");
+ pick_args->lb_policy = chand->lb_policy;
+ pick_args->elem = elem;
+ GRPC_CLOSURE_INIT(&pick_args->closure, pick_callback_done_locked, pick_args,
+ grpc_combiner_scheduler(chand->combiner));
+ const bool pick_done = grpc_lb_policy_pick_locked(
+ exec_ctx, chand->lb_policy, inputs, &calld->connected_subchannel,
+ calld->subchannel_call_context, NULL, &pick_args->closure);
+ if (pick_done) {
+ /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
+ GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "pick_subchannel");
+ gpr_free(pick_args);
+ }
+ return pick_done;
+}
- GPR_ASSERT(connected_subchannel);
-
+static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ GPR_TIMER_BEGIN("pick_subchannel", 0);
+ channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ bool pick_done = false;
if (chand->lb_policy != NULL) {
- apply_final_configuration_locked(exec_ctx, elem);
- grpc_lb_policy *lb_policy = chand->lb_policy;
- GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel");
+ apply_service_config_to_call_locked(exec_ctx, elem);
// If the application explicitly set wait_for_ready, use that.
// Otherwise, if the service config specified a value for this
// method, use that.
+ uint32_t initial_metadata_flags =
+ calld->initial_metadata_payload->send_initial_metadata
+ .send_initial_metadata_flags;
const bool wait_for_ready_set_from_api =
initial_metadata_flags &
GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
@@ -1107,78 +1101,57 @@ static bool pick_subchannel_locked(
}
}
const grpc_lb_policy_pick_args inputs = {
- initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem};
-
- // Wrap the user-provided callback in order to hold a strong reference to
- // the LB policy for the duration of the pick.
- wrapped_on_pick_closure_arg *w_on_pick_arg =
- gpr_zalloc(sizeof(*w_on_pick_arg));
- GRPC_CLOSURE_INIT(&w_on_pick_arg->wrapper_closure,
- wrapped_on_pick_closure_cb, w_on_pick_arg,
- grpc_schedule_on_exec_ctx);
- w_on_pick_arg->wrapped_closure = on_ready;
- GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel_wrapping");
- w_on_pick_arg->lb_policy = lb_policy;
- const bool pick_done = grpc_lb_policy_pick_locked(
- exec_ctx, lb_policy, &inputs, connected_subchannel,
- subchannel_call_context, NULL, &w_on_pick_arg->wrapper_closure);
- if (pick_done) {
- /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
- GRPC_LB_POLICY_UNREF(exec_ctx, w_on_pick_arg->lb_policy,
- "pick_subchannel_wrapping");
- gpr_free(w_on_pick_arg);
+ calld->initial_metadata_payload->send_initial_metadata
+ .send_initial_metadata,
+ initial_metadata_flags, &calld->lb_token_mdelem};
+ pick_done = pick_callback_start_locked(exec_ctx, elem, &inputs);
+ } else if (chand->resolver != NULL) {
+ if (!chand->started_resolving) {
+ chand->started_resolving = true;
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
+ grpc_resolver_next_locked(exec_ctx, chand->resolver,
+ &chand->resolver_result,
+ &chand->on_resolver_result_changed);
}
- GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
- GPR_TIMER_END("pick_subchannel", 0);
- return pick_done;
- }
- if (chand->resolver != NULL && !chand->started_resolving) {
- chand->started_resolving = true;
- GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
- grpc_resolver_next_locked(exec_ctx, chand->resolver,
- &chand->resolver_result,
- &chand->on_resolver_result_changed);
- }
- if (chand->resolver != NULL) {
- continue_picking_args *cpa = gpr_malloc(sizeof(*cpa));
- cpa->initial_metadata = initial_metadata;
- cpa->initial_metadata_flags = initial_metadata_flags;
- cpa->connected_subchannel = connected_subchannel;
- cpa->subchannel_call_context = subchannel_call_context;
- cpa->on_ready = on_ready;
- cpa->elem = elem;
- GRPC_CLOSURE_INIT(&cpa->closure, continue_picking_locked, cpa,
+ pick_after_resolver_result_args *args =
+ (pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args));
+ args->elem = elem;
+ GRPC_CLOSURE_INIT(&args->closure,
+ continue_picking_after_resolver_result_locked, args,
grpc_combiner_scheduler(chand->combiner));
- grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure,
- GRPC_ERROR_NONE);
+ grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
+ &args->closure, GRPC_ERROR_NONE);
} else {
- GRPC_CLOSURE_SCHED(exec_ctx, on_ready,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
+ subchannel_ready_locked(
+ exec_ctx, elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
}
-
GPR_TIMER_END("pick_subchannel", 0);
- return false;
+ return pick_done;
}
-static void start_transport_stream_op_batch_locked_inner(
- grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op,
- grpc_call_element *elem) {
- channel_data *chand = elem->channel_data;
+static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx,
+ void *arg,
+ grpc_error *error_ignored) {
+ GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0);
+ grpc_transport_stream_op_batch *op = arg;
+ grpc_call_element *elem = op->handler_private.extra_arg;
call_data *calld = elem->call_data;
-
+ channel_data *chand = elem->channel_data;
/* need to recheck that another thread hasn't set the call */
call_or_error coe = get_call_or_error(calld);
if (coe.error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, op, GRPC_ERROR_REF(coe.error));
- /* early out */
- return;
+ goto done;
}
if (coe.subchannel_call != NULL) {
grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op);
- /* early out */
- return;
+ goto done;
}
+ // Add to waiting-for-pick list. If we succeed in getting a
+ // subchannel call below, we'll handle this batch (along with any
+ // other waiting batches) in waiting_for_pick_batches_resume_locked().
+ waiting_for_pick_batches_add_locked(calld, op);
/* if this is a cancellation, then we can raise our cancelled flag */
if (op->cancel_stream) {
grpc_error *error = op->payload->cancel_stream.cancel_error;
@@ -1190,30 +1163,22 @@ static void start_transport_stream_op_batch_locked_inner(
set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)});
if (calld->pick_pending) {
cancel_pick_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
- } else {
- fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
}
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op,
- GRPC_ERROR_REF(error));
- /* early out */
- return;
+ waiting_for_pick_batches_fail_locked(exec_ctx, calld,
+ GRPC_ERROR_REF(error));
+ goto done;
}
/* if we don't have a subchannel, try to get one */
if (!calld->pick_pending && calld->connected_subchannel == NULL &&
op->send_initial_metadata) {
+ calld->initial_metadata_payload = op->payload;
calld->pick_pending = true;
- GRPC_CLOSURE_INIT(&calld->next_step, subchannel_ready_locked, elem,
- grpc_combiner_scheduler(chand->combiner));
GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
/* If a subchannel is not available immediately, the polling entity from
call_data should be provided to channel_data's interested_parties, so
that IO of the lb_policy and resolver could be done under it. */
- if (pick_subchannel_locked(
- exec_ctx, elem,
- op->payload->send_initial_metadata.send_initial_metadata,
- op->payload->send_initial_metadata.send_initial_metadata_flags,
- &calld->connected_subchannel, calld->subchannel_call_context,
- &calld->next_step)) {
+ if (pick_subchannel_locked(exec_ctx, elem)) {
+ // Pick was returned synchronously.
calld->pick_pending = false;
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
if (calld->connected_subchannel == NULL) {
@@ -1221,42 +1186,20 @@ static void start_transport_stream_op_batch_locked_inner(
"Call dropped by load balancing policy");
set_call_or_error(calld,
(call_or_error){.error = GRPC_ERROR_REF(error)});
- fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
- return; // Early out.
+ waiting_for_pick_batches_fail_locked(exec_ctx, calld, error);
+ } else {
+ // Create subchannel call.
+ create_subchannel_call_locked(exec_ctx, calld, GRPC_ERROR_NONE);
}
} else {
grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
chand->interested_parties);
}
}
- /* if we've got a subchannel, then let's ask it to create a call */
- if (!calld->pick_pending && calld->connected_subchannel != NULL) {
- grpc_subchannel_call *subchannel_call = NULL;
- const grpc_connected_subchannel_call_args call_args = {
- .pollent = calld->pollent,
- .path = calld->path,
- .start_time = calld->call_start_time,
- .deadline = calld->deadline,
- .arena = calld->arena,
- .context = calld->subchannel_call_context};
- grpc_error *error = grpc_connected_subchannel_create_call(
- exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
- GPR_ASSERT(set_call_or_error(
- calld, (call_or_error){.subchannel_call = subchannel_call}));
- if (error != GRPC_ERROR_NONE) {
- fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
- } else {
- retry_waiting_locked(exec_ctx, calld);
- /* recurse to retry */
- start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
- }
- /* early out */
- return;
- }
- /* nothing to be done but wait */
- add_waiting_locked(calld, op);
+done:
+ GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
+ "start_transport_stream_op_batch");
+ GPR_TIMER_END("start_transport_stream_op_batch_locked", 0);
}
static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
@@ -1279,30 +1222,6 @@ static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
GRPC_ERROR_REF(error));
}
-static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx,
- void *arg,
- grpc_error *error_ignored) {
- GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0);
-
- grpc_transport_stream_op_batch *op = arg;
- grpc_call_element *elem = op->handler_private.extra_arg;
- call_data *calld = elem->call_data;
-
- if (op->recv_trailing_metadata) {
- GPR_ASSERT(op->on_complete != NULL);
- calld->original_on_complete = op->on_complete;
- GRPC_CLOSURE_INIT(&calld->on_complete, on_complete, elem,
- grpc_schedule_on_exec_ctx);
- op->on_complete = &calld->on_complete;
- }
-
- start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
-
- GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
- "start_transport_stream_op_batch");
- GPR_TIMER_END("start_transport_stream_op_batch_locked", 0);
-}
-
/* The logic here is fairly complicated, due to (a) the fact that we
need to handle the case where we receive the send op before the
initial metadata op, and (b) the need for efficiency, especially in
@@ -1321,6 +1240,15 @@ static void cc_start_transport_stream_op_batch(
grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem,
op);
}
+ // Intercept on_complete for recv_trailing_metadata so that we can
+ // check retry throttle status.
+ if (op->recv_trailing_metadata) {
+ GPR_ASSERT(op->on_complete != NULL);
+ calld->original_on_complete = op->on_complete;
+ GRPC_CLOSURE_INIT(&calld->on_complete, on_complete, elem,
+ grpc_schedule_on_exec_ctx);
+ op->on_complete = &calld->on_complete;
+ }
/* try to (atomically) get the call */
call_or_error coe = get_call_or_error(calld);
GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
@@ -1390,7 +1318,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
"client_channel_destroy_call");
}
GPR_ASSERT(!calld->pick_pending);
- GPR_ASSERT(calld->waiting_ops_count == 0);
+ GPR_ASSERT(calld->waiting_for_pick_batches_count == 0);
if (calld->connected_subchannel != NULL) {
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel,
"picked");
@@ -1401,7 +1329,6 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
calld->subchannel_call_context[i].value);
}
}
- gpr_free(calld->waiting_ops);
GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
}
diff --git a/src/core/ext/filters/client_channel/client_channel_plugin.c b/src/core/ext/filters/client_channel/client_channel_plugin.c
index 2c6af1d78e..60e77d6268 100644
--- a/src/core/ext/filters/client_channel/client_channel_plugin.c
+++ b/src/core/ext/filters/client_channel/client_channel_plugin.c
@@ -78,6 +78,9 @@ void grpc_client_channel_init(void) {
GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, append_filter,
(void *)&grpc_client_channel_filter);
grpc_http_connect_register_handshaker_factory();
+#ifndef NDEBUG
+ grpc_register_tracer("resolver_refcount", &grpc_trace_resolver_refcount);
+#endif
}
void grpc_client_channel_shutdown(void) {
diff --git a/src/core/ext/filters/client_channel/lb_policy.c b/src/core/ext/filters/client_channel/lb_policy.c
index 50f8faef8e..8d69ba6af5 100644
--- a/src/core/ext/filters/client_channel/lb_policy.c
+++ b/src/core/ext/filters/client_channel/lb_policy.c
@@ -21,6 +21,10 @@
#define WEAK_REF_BITS 16
+#ifndef NDEBUG
+grpc_tracer_flag grpc_trace_lb_policy_refcount = GRPC_TRACER_INITIALIZER(false);
+#endif
+
void grpc_lb_policy_init(grpc_lb_policy *policy,
const grpc_lb_policy_vtable *vtable,
grpc_combiner *combiner) {
@@ -30,7 +34,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
policy->combiner = GRPC_COMBINER_REF(combiner, "lb_policy");
}
-#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
+#ifndef NDEBUG
#define REF_FUNC_EXTRA_ARGS , const char *file, int line, const char *reason
#define REF_MUTATE_EXTRA_ARGS REF_FUNC_EXTRA_ARGS, const char *purpose
#define REF_FUNC_PASS_ARGS(new_reason) , file, line, new_reason
@@ -46,11 +50,12 @@ static gpr_atm ref_mutate(grpc_lb_policy *c, gpr_atm delta,
int barrier REF_MUTATE_EXTRA_ARGS) {
gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta)
: gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta);
-#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
- "LB_POLICY: 0x%" PRIxPTR " %12s 0x%" PRIxPTR " -> 0x%" PRIxPTR
- " [%s]",
- (intptr_t)c, purpose, old_val, old_val + delta, reason);
+#ifndef NDEBUG
+ if (GRPC_TRACER_ON(grpc_trace_lb_policy_refcount)) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "LB_POLICY: 0x%p %12s 0x%" PRIxPTR " -> 0x%" PRIxPTR " [%s]", c,
+ purpose, old_val, old_val + delta, reason);
+ }
#endif
return old_val;
}
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index 42503c37ca..645d51e138 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -29,6 +29,10 @@ typedef struct grpc_lb_policy grpc_lb_policy;
typedef struct grpc_lb_policy_vtable grpc_lb_policy_vtable;
typedef struct grpc_lb_policy_args grpc_lb_policy_args;
+#ifndef NDEBUG
+extern grpc_tracer_flag grpc_trace_lb_policy_refcount;
+#endif
+
struct grpc_lb_policy {
const grpc_lb_policy_vtable *vtable;
gpr_atm ref_pair;
@@ -96,8 +100,7 @@ struct grpc_lb_policy_vtable {
const grpc_lb_policy_args *args);
};
-//#define GRPC_LB_POLICY_REFCOUNT_DEBUG
-#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
+#ifndef NDEBUG
/* Strong references: the policy will shutdown when they reach zero */
#define GRPC_LB_POLICY_REF(p, r) \
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
index e80b1dcd80..5a5ff2902d 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -1880,6 +1880,9 @@ static bool maybe_add_client_load_reporting_filter(
void grpc_lb_policy_grpclb_init() {
grpc_register_lb_policy(grpc_glb_lb_factory_create());
grpc_register_tracer("glb", &grpc_lb_glb_trace);
+#ifndef NDEBUG
+ grpc_register_tracer("lb_policy_refcount", &grpc_trace_lb_policy_refcount);
+#endif
grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_client_load_reporting_filter,
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
index 307e3bad67..d0acd7a901 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
@@ -95,6 +95,9 @@ static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
gpr_free(p->subchannels);
gpr_free(p->new_subchannels);
gpr_free(p);
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_DEBUG, "Pick First %p destroyed.", (void *)p);
+ }
}
static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
@@ -268,11 +271,20 @@ static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx,
pick_first_lb_policy *p) {
if (p->num_subchannels > 0) {
GPR_ASSERT(p->selected == NULL);
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_DEBUG, "Pick First %p unsubscribing from subchannel %p",
+ (void *)p, (void *)p->subchannels[p->checking_subchannel]);
+ }
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
&p->connectivity_changed);
p->updating_subchannels = true;
} else if (p->selected != NULL) {
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_DEBUG,
+ "Pick First %p unsubscribing from selected subchannel %p",
+ (void *)p, (void *)p->selected);
+ }
grpc_connected_subchannel_notify_on_state_change(
exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
p->updating_selected = true;
@@ -451,12 +463,25 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_subchannel *selected_subchannel;
pending_pick *pp;
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(
+ GPR_DEBUG,
+ "Pick First %p connectivity changed. Updating selected: %d; Updating "
+ "subchannels: %d; Checking %lu index (%lu total); State: %d; ",
+ (void *)p, p->updating_selected, p->updating_subchannels,
+ (unsigned long)p->checking_subchannel,
+ (unsigned long)p->num_subchannels, p->checking_connectivity);
+ }
bool restart = false;
- if (p->updating_selected && error == GRPC_ERROR_CANCELLED) {
+ if (p->updating_selected && error != GRPC_ERROR_NONE) {
/* Captured the unsubscription for p->selected */
GPR_ASSERT(p->selected != NULL);
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected,
"pf_update_connectivity");
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_DEBUG, "Pick First %p unreffing selected subchannel %p",
+ (void *)p, (void *)p->selected);
+ }
p->updating_selected = false;
if (p->num_new_subchannels == 0) {
p->selected = NULL;
@@ -464,12 +489,16 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
restart = true;
}
- if (p->updating_subchannels && error == GRPC_ERROR_CANCELLED) {
+ if (p->updating_subchannels && error != GRPC_ERROR_NONE) {
/* Captured the unsubscription for the checking subchannel */
GPR_ASSERT(p->selected == NULL);
for (size_t i = 0; i < p->num_subchannels; i++) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i],
"pf_update_connectivity");
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_DEBUG, "Pick First %p unreffing subchannel %p", (void *)p,
+ (void *)p->subchannels[i]);
+ }
}
gpr_free(p->subchannels);
p->subchannels = NULL;
@@ -481,14 +510,12 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
if (restart) {
p->selected = NULL;
p->selected_key = NULL;
-
GPR_ASSERT(p->new_subchannels != NULL);
GPR_ASSERT(p->num_new_subchannels > 0);
p->num_subchannels = p->num_new_subchannels;
p->subchannels = p->new_subchannels;
p->num_new_subchannels = 0;
p->new_subchannels = NULL;
-
if (p->started_picking) {
/* If we were picking, continue to do so over the new subchannels,
* starting from the 0th index. */
@@ -542,7 +569,9 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
"picked_first");
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
- gpr_log(GPR_INFO, "Selected subchannel %p", (void *)p->selected);
+ gpr_log(GPR_INFO,
+ "Pick First %p selected subchannel %p (connected %p)",
+ (void *)p, (void *)selected_subchannel, (void *)p->selected);
}
p->selected_key = grpc_subchannel_get_key(selected_subchannel);
/* drop the pick list: we are connected now */
@@ -568,7 +597,8 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
p->checking_subchannel =
(p->checking_subchannel + 1) % p->num_subchannels;
if (p->checking_subchannel == 0) {
- /* only trigger transient failure when we've tried all alternatives */
+ /* only trigger transient failure when we've tried all alternatives
+ */
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "connecting_transient_failure");
@@ -652,6 +682,9 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_args *args) {
GPR_ASSERT(args->client_channel_factory != NULL);
pick_first_lb_policy *p = gpr_zalloc(sizeof(*p));
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_DEBUG, "Pick First %p created.", (void *)p);
+ }
pf_update_locked(exec_ctx, &p->base, args);
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner);
GRPC_CLOSURE_INIT(&p->connectivity_changed, pf_connectivity_changed_locked, p,
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index 3c8520cc1c..8e9d6b0f47 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -126,6 +126,8 @@ struct rr_subchannel_list {
size_t num_ready;
/** how many subchannels are in state TRANSIENT_FAILURE */
size_t num_transient_failures;
+ /** how many subchannels are in state SHUTDOWN */
+ size_t num_shutdown;
/** how many subchannels are in state IDLE */
size_t num_idle;
@@ -425,6 +427,9 @@ static void update_state_counters_locked(subchannel_data *sd) {
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
GPR_ASSERT(subchannel_list->num_transient_failures > 0);
--subchannel_list->num_transient_failures;
+ } else if (sd->prev_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
+ GPR_ASSERT(subchannel_list->num_shutdown > 0);
+ --subchannel_list->num_shutdown;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) {
GPR_ASSERT(subchannel_list->num_idle > 0);
--subchannel_list->num_idle;
@@ -433,6 +438,8 @@ static void update_state_counters_locked(subchannel_data *sd) {
++subchannel_list->num_ready;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
++subchannel_list->num_transient_failures;
+ } else if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
+ ++subchannel_list->num_shutdown;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) {
++subchannel_list->num_idle;
}
@@ -455,7 +462,8 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
* CHECK: sd->curr_connectivity_state == CONNECTING.
*
* 3) RULE: ALL subchannels are SHUTDOWN => policy is SHUTDOWN.
- * CHECK: p->subchannel_list->num_subchannels = 0.
+ * CHECK: p->subchannel_list->num_shutdown ==
+ * p->subchannel_list->num_subchannels.
*
* 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
* TRANSIENT_FAILURE.
@@ -464,37 +472,39 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
* 5) RULE: ALL subchannels are IDLE => policy is IDLE.
* CHECK: p->num_idle == p->subchannel_list->num_subchannels.
*/
+ grpc_connectivity_state new_state = sd->curr_connectivity_state;
rr_subchannel_list *subchannel_list = sd->subchannel_list;
round_robin_lb_policy *p = subchannel_list->policy;
if (subchannel_list->num_ready > 0) { /* 1) READY */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "rr_ready");
- return GRPC_CHANNEL_READY;
+ new_state = GRPC_CHANNEL_READY;
} else if (sd->curr_connectivity_state ==
GRPC_CHANNEL_CONNECTING) { /* 2) CONNECTING */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
"rr_connecting");
- return GRPC_CHANNEL_CONNECTING;
- } else if (p->subchannel_list->num_subchannels == 0) { /* 3) SHUTDOWN */
+ new_state = GRPC_CHANNEL_CONNECTING;
+ } else if (p->subchannel_list->num_shutdown ==
+ p->subchannel_list->num_subchannels) { /* 3) SHUTDOWN */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
"rr_shutdown");
- return GRPC_CHANNEL_SHUTDOWN;
+ new_state = GRPC_CHANNEL_SHUTDOWN;
} else if (subchannel_list->num_transient_failures ==
p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "rr_transient_failure");
- return GRPC_CHANNEL_TRANSIENT_FAILURE;
+ new_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
} else if (subchannel_list->num_idle ==
p->subchannel_list->num_subchannels) { /* 5) IDLE */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE, "rr_idle");
- return GRPC_CHANNEL_IDLE;
+ new_state = GRPC_CHANNEL_IDLE;
}
- /* no change */
- return sd->curr_connectivity_state;
+ GRPC_ERROR_UNREF(error);
+ return new_state;
}
static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
@@ -571,13 +581,15 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list);
GPR_ASSERT(!sd->subchannel_list->shutting_down);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ const unsigned long num_subchannels =
+ p->subchannel_list != NULL
+ ? (unsigned long)p->subchannel_list->num_subchannels
+ : 0;
gpr_log(GPR_DEBUG,
"[RR %p] phasing out subchannel list %p (size %lu) in favor "
"of %p (size %lu)",
- (void *)p, (void *)p->subchannel_list,
- (unsigned long)p->subchannel_list->num_subchannels,
- (void *)sd->subchannel_list,
- (unsigned long)sd->subchannel_list->num_subchannels);
+ (void *)p, (void *)p->subchannel_list, num_subchannels,
+ (void *)sd->subchannel_list, num_subchannels);
}
if (p->subchannel_list != NULL) {
// dispose of the current subchannel_list
diff --git a/src/core/ext/filters/client_channel/resolver.c b/src/core/ext/filters/client_channel/resolver.c
index 69b1c31e59..de9a8ce41b 100644
--- a/src/core/ext/filters/client_channel/resolver.c
+++ b/src/core/ext/filters/client_channel/resolver.c
@@ -19,6 +19,10 @@
#include "src/core/ext/filters/client_channel/resolver.h"
#include "src/core/lib/iomgr/combiner.h"
+#ifndef NDEBUG
+grpc_tracer_flag grpc_trace_resolver_refcount = GRPC_TRACER_INITIALIZER(false);
+#endif
+
void grpc_resolver_init(grpc_resolver *resolver,
const grpc_resolver_vtable *vtable,
grpc_combiner *combiner) {
@@ -27,25 +31,30 @@ void grpc_resolver_init(grpc_resolver *resolver,
gpr_ref_init(&resolver->refs, 1);
}
-#ifdef GRPC_RESOLVER_REFCOUNT_DEBUG
-void grpc_resolver_ref(grpc_resolver *resolver, grpc_closure_list *closure_list,
- const char *file, int line, const char *reason) {
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "RESOLVER:%p ref %d -> %d %s",
- resolver, (int)resolver->refs.count, (int)resolver->refs.count + 1,
- reason);
+#ifndef NDEBUG
+void grpc_resolver_ref(grpc_resolver *resolver, const char *file, int line,
+ const char *reason) {
+ if (GRPC_TRACER_ON(grpc_trace_resolver_refcount)) {
+ gpr_atm old_refs = gpr_atm_no_barrier_load(&resolver->refs.count);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "RESOLVER:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", resolver,
+ old_refs, old_refs + 1, reason);
+ }
#else
void grpc_resolver_ref(grpc_resolver *resolver) {
#endif
gpr_ref(&resolver->refs);
}
-#ifdef GRPC_RESOLVER_REFCOUNT_DEBUG
-void grpc_resolver_unref(grpc_resolver *resolver,
- grpc_closure_list *closure_list, const char *file,
- int line, const char *reason) {
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "RESOLVER:%p unref %d -> %d %s",
- resolver, (int)resolver->refs.count, (int)resolver->refs.count - 1,
- reason);
+#ifndef NDEBUG
+void grpc_resolver_unref(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
+ const char *file, int line, const char *reason) {
+ if (GRPC_TRACER_ON(grpc_trace_resolver_refcount)) {
+ gpr_atm old_refs = gpr_atm_no_barrier_load(&resolver->refs.count);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "RESOLVER:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", resolver,
+ old_refs, old_refs - 1, reason);
+ }
#else
void grpc_resolver_unref(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) {
#endif
diff --git a/src/core/ext/filters/client_channel/resolver.h b/src/core/ext/filters/client_channel/resolver.h
index c78bb316cb..ae9c8f66fe 100644
--- a/src/core/ext/filters/client_channel/resolver.h
+++ b/src/core/ext/filters/client_channel/resolver.h
@@ -25,6 +25,10 @@
typedef struct grpc_resolver grpc_resolver;
typedef struct grpc_resolver_vtable grpc_resolver_vtable;
+#ifndef NDEBUG
+extern grpc_tracer_flag grpc_trace_resolver_refcount;
+#endif
+
/** \a grpc_resolver provides \a grpc_channel_args objects to its caller */
struct grpc_resolver {
const grpc_resolver_vtable *vtable;
@@ -41,17 +45,17 @@ struct grpc_resolver_vtable {
grpc_channel_args **result, grpc_closure *on_complete);
};
-#ifdef GRPC_RESOLVER_REFCOUNT_DEBUG
+#ifndef NDEBUG
#define GRPC_RESOLVER_REF(p, r) grpc_resolver_ref((p), __FILE__, __LINE__, (r))
-#define GRPC_RESOLVER_UNREF(cl, p, r) \
- grpc_resolver_unref((cl), (p), __FILE__, __LINE__, (r))
+#define GRPC_RESOLVER_UNREF(e, p, r) \
+ grpc_resolver_unref((e), (p), __FILE__, __LINE__, (r))
void grpc_resolver_ref(grpc_resolver *policy, const char *file, int line,
const char *reason);
-void grpc_resolver_unref(grpc_resolver *policy, grpc_closure_list *closure_list,
+void grpc_resolver_unref(grpc_exec_ctx *exec_ctx, grpc_resolver *policy,
const char *file, int line, const char *reason);
#else
#define GRPC_RESOLVER_REF(p, r) grpc_resolver_ref((p))
-#define GRPC_RESOLVER_UNREF(cl, p, r) grpc_resolver_unref((cl), (p))
+#define GRPC_RESOLVER_UNREF(e, p, r) grpc_resolver_unref((e), (p))
void grpc_resolver_ref(grpc_resolver *policy);
void grpc_resolver_unref(grpc_exec_ctx *exec_ctx, grpc_resolver *policy);
#endif
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
index eb1a8ab011..386012d2ed 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
@@ -19,8 +19,6 @@
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H
-#include <ares.h>
-
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/pollset_set.h"
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c
index 4e79c44ba3..1ab8295e9e 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c
@@ -19,6 +19,8 @@
#include "src/core/lib/iomgr/port.h"
#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET)
+#include <ares.h>
+
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
#include <grpc/support/alloc.h>
diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
index 8e73d606aa..56ed4371a9 100644
--- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
@@ -56,6 +56,10 @@ typedef struct {
// grpc_resolver_next_locked()'s closure.
grpc_channel_args* next_results;
+ // Results to use for the pretended re-resolution in
+ // fake_resolver_channel_saw_error_locked().
+ grpc_channel_args* results_upon_error;
+
// pending next completion, or NULL
grpc_closure* next_completion;
// target result address for next completion
@@ -65,6 +69,7 @@ typedef struct {
static void fake_resolver_destroy(grpc_exec_ctx* exec_ctx, grpc_resolver* gr) {
fake_resolver* r = (fake_resolver*)gr;
grpc_channel_args_destroy(exec_ctx, r->next_results);
+ grpc_channel_args_destroy(exec_ctx, r->results_upon_error);
grpc_channel_args_destroy(exec_ctx, r->channel_args);
gpr_free(r);
}
@@ -74,7 +79,9 @@ static void fake_resolver_shutdown_locked(grpc_exec_ctx* exec_ctx,
fake_resolver* r = (fake_resolver*)resolver;
if (r->next_completion != NULL) {
*r->target_result = NULL;
- GRPC_CLOSURE_SCHED(exec_ctx, r->next_completion, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(
+ exec_ctx, r->next_completion,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resolver Shutdown"));
r->next_completion = NULL;
}
}
@@ -85,15 +92,19 @@ static void fake_resolver_maybe_finish_next_locked(grpc_exec_ctx* exec_ctx,
*r->target_result =
grpc_channel_args_union(r->next_results, r->channel_args);
grpc_channel_args_destroy(exec_ctx, r->next_results);
+ r->next_results = NULL;
GRPC_CLOSURE_SCHED(exec_ctx, r->next_completion, GRPC_ERROR_NONE);
r->next_completion = NULL;
- r->next_results = NULL;
}
}
static void fake_resolver_channel_saw_error_locked(grpc_exec_ctx* exec_ctx,
grpc_resolver* resolver) {
fake_resolver* r = (fake_resolver*)resolver;
+ if (r->next_results == NULL && r->results_upon_error != NULL) {
+ // Pretend we re-resolved.
+ r->next_results = grpc_channel_args_copy(r->results_upon_error);
+ }
fake_resolver_maybe_finish_next_locked(exec_ctx, r);
}
@@ -149,6 +160,10 @@ static void set_response_cb(grpc_exec_ctx* exec_ctx, void* arg,
grpc_channel_args_destroy(exec_ctx, r->next_results);
}
r->next_results = generator->next_response;
+ if (r->results_upon_error != NULL) {
+ grpc_channel_args_destroy(exec_ctx, r->results_upon_error);
+ }
+ r->results_upon_error = grpc_channel_args_copy(generator->next_response);
fake_resolver_maybe_finish_next_locked(exec_ctx, r);
}
diff --git a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c
index b5d2e5c92b..7b4fe38272 100644
--- a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c
@@ -73,7 +73,9 @@ static void sockaddr_shutdown_locked(grpc_exec_ctx *exec_ctx,
sockaddr_resolver *r = (sockaddr_resolver *)resolver;
if (r->next_completion != NULL) {
*r->target_result = NULL;
- GRPC_CLOSURE_SCHED(exec_ctx, r->next_completion, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(
+ exec_ctx, r->next_completion,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resolver Shutdown"));
r->next_completion = NULL;
}
}
diff --git a/src/core/ext/filters/client_channel/subchannel.c b/src/core/ext/filters/client_channel/subchannel.c
index 0877ef4e67..88157ed738 100644
--- a/src/core/ext/filters/client_channel/subchannel.c
+++ b/src/core/ext/filters/client_channel/subchannel.c
@@ -140,25 +140,13 @@ struct grpc_subchannel_call {
static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
grpc_error *error);
-#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+#ifndef NDEBUG
#define REF_REASON reason
-#define REF_LOG(name, p) \
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \
- (name), (p), (p)->refs.count, (p)->refs.count + 1, reason)
-#define UNREF_LOG(name, p) \
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", \
- (name), (p), (p)->refs.count, (p)->refs.count - 1, reason)
#define REF_MUTATE_EXTRA_ARGS \
GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char *purpose
#define REF_MUTATE_PURPOSE(x) , file, line, reason, x
#else
#define REF_REASON ""
-#define REF_LOG(name, p) \
- do { \
- } while (0)
-#define UNREF_LOG(name, p) \
- do { \
- } while (0)
#define REF_MUTATE_EXTRA_ARGS
#define REF_MUTATE_PURPOSE(x)
#endif
@@ -207,10 +195,12 @@ static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta,
int barrier REF_MUTATE_EXTRA_ARGS) {
gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta)
: gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta);
-#ifdef GRPC_STREAM_REFCOUNT_DEBUG
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
- "SUBCHANNEL: %p %s 0x%08" PRIxPTR " -> 0x%08" PRIxPTR " [%s]", c,
- purpose, old_val, old_val + delta, reason);
+#ifndef NDEBUG
+ if (GRPC_TRACER_ON(grpc_trace_stream_refcount)) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "SUBCHANNEL: %p %12s 0x%" PRIxPTR " -> 0x%" PRIxPTR " [%s]", c,
+ purpose, old_val, old_val + delta, reason);
+ }
#endif
return old_val;
}
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index f38bf42803..6d2abb04df 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -37,7 +37,7 @@ typedef struct grpc_subchannel_call grpc_subchannel_call;
typedef struct grpc_subchannel_args grpc_subchannel_args;
typedef struct grpc_subchannel_key grpc_subchannel_key;
-#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+#ifndef NDEBUG
#define GRPC_SUBCHANNEL_REF(p, r) \
grpc_subchannel_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) \
diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c
index 04cb1d94f8..71a8bc5bec 100644
--- a/src/core/ext/filters/http/message_compress/message_compress_filter.c
+++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c
@@ -255,6 +255,23 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx,
}
}
+static void handle_send_message_batch(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op_batch *op,
+ bool has_compression_algorithm) {
+ call_data *calld = elem->call_data;
+ if (!skip_compression(elem, op->payload->send_message.send_message->flags,
+ has_compression_algorithm)) {
+ calld->send_op = op;
+ calld->send_length = op->payload->send_message.send_message->length;
+ calld->send_flags = op->payload->send_message.send_message->flags;
+ continue_send_message(exec_ctx, elem);
+ } else {
+ /* pass control down the stack */
+ grpc_call_next_op(exec_ctx, elem, op);
+ }
+}
+
static void compress_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
@@ -307,8 +324,9 @@ static void compress_start_transport_stream_op_batch(
goto retry_send_im;
}
if (cur != INITIAL_METADATA_UNSEEN) {
- grpc_call_next_op(exec_ctx, elem,
- (grpc_transport_stream_op_batch *)cur);
+ handle_send_message_batch(exec_ctx, elem,
+ (grpc_transport_stream_op_batch *)cur,
+ has_compression_algorithm);
}
}
}
@@ -325,17 +343,8 @@ static void compress_start_transport_stream_op_batch(
break;
case HAS_COMPRESSION_ALGORITHM:
case NO_COMPRESSION_ALGORITHM:
- if (!skip_compression(elem,
- op->payload->send_message.send_message->flags,
- cur == HAS_COMPRESSION_ALGORITHM)) {
- calld->send_op = op;
- calld->send_length = op->payload->send_message.send_message->length;
- calld->send_flags = op->payload->send_message.send_message->flags;
- continue_send_message(exec_ctx, elem);
- } else {
- /* pass control down the stack */
- grpc_call_next_op(exec_ctx, elem, op);
- }
+ handle_send_message_batch(exec_ctx, elem, op,
+ cur == HAS_COMPRESSION_ALGORITHM);
break;
default:
if (cur & CANCELLED_BIT) {
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_plugin.c b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c
index b0ffdc0cf9..6a8c81445a 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_plugin.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c
@@ -23,6 +23,9 @@
void grpc_chttp2_plugin_init(void) {
grpc_register_tracer("http", &grpc_http_trace);
grpc_register_tracer("flowctl", &grpc_flowctl_trace);
+#ifndef NDEBUG
+ grpc_register_tracer("chttp2_refcount", &grpc_trace_chttp2_refcount);
+#endif
}
void grpc_chttp2_plugin_shutdown(void) {}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 0ad63d1af2..6e8eadf7a1 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -52,7 +52,7 @@
#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
#define MAX_WINDOW 0x7fffffffu
#define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
-#define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024)
+#define DEFAULT_MAX_HEADER_LIST_SIZE (8 * 1024)
#define DEFAULT_CLIENT_KEEPALIVE_TIME_MS INT_MAX
#define DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */
@@ -76,6 +76,10 @@ static bool g_default_keepalive_permit_without_calls =
grpc_tracer_flag grpc_http_trace = GRPC_TRACER_INITIALIZER(false);
grpc_tracer_flag grpc_flowctl_trace = GRPC_TRACER_INITIALIZER(false);
+#ifndef NDEBUG
+grpc_tracer_flag grpc_trace_chttp2_refcount = GRPC_TRACER_INITIALIZER(false);
+#endif
+
static const grpc_transport_vtable vtable;
/* forward declarations of various callbacks that we'll build closures around */
@@ -91,8 +95,9 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *t,
static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs,
grpc_error *error);
/** Set a transport level setting, and push it to our peer */
-static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- grpc_chttp2_setting_id id, uint32_t value);
+static void queue_setting_update(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_setting_id id, uint32_t value);
static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_error *error);
@@ -212,20 +217,26 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
gpr_free(t);
}
-#ifdef GRPC_CHTTP2_REFCOUNTING_DEBUG
+#ifndef NDEBUG
void grpc_chttp2_unref_transport(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, const char *reason,
const char *file, int line) {
- gpr_log(GPR_DEBUG, "chttp2:unref:%p %" PRIdPTR "->%" PRIdPTR " %s [%s:%d]", t,
- t->refs.count, t->refs.count - 1, reason, file, line);
+ if (GRPC_TRACER_ON(grpc_trace_chttp2_refcount)) {
+ gpr_atm val = gpr_atm_no_barrier_load(&t->refs.count);
+ gpr_log(GPR_DEBUG, "chttp2:unref:%p %" PRIdPTR "->%" PRIdPTR " %s [%s:%d]",
+ t, val, val - 1, reason, file, line);
+ }
if (!gpr_unref(&t->refs)) return;
destruct_transport(exec_ctx, t);
}
void grpc_chttp2_ref_transport(grpc_chttp2_transport *t, const char *reason,
const char *file, int line) {
- gpr_log(GPR_DEBUG, "chttp2: ref:%p %" PRIdPTR "->%" PRIdPTR " %s [%s:%d]", t,
- t->refs.count, t->refs.count + 1, reason, file, line);
+ if (GRPC_TRACER_ON(grpc_trace_chttp2_refcount)) {
+ gpr_atm val = gpr_atm_no_barrier_load(&t->refs.count);
+ gpr_log(GPR_DEBUG, "chttp2: ref:%p %" PRIdPTR "->%" PRIdPTR " %s [%s:%d]",
+ t, val, val + 1, reason, file, line);
+ }
gpr_ref(&t->refs);
}
#else
@@ -342,15 +353,16 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
/* configure http2 the way we like it */
if (is_client) {
- push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
- push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
+ queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
+ queue_setting_update(exec_ctx, t,
+ GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
}
- push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
- DEFAULT_WINDOW);
- push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
- DEFAULT_MAX_HEADER_LIST_SIZE);
- push_setting(exec_ctx, t,
- GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1);
+ queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
+ DEFAULT_WINDOW);
+ queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
+ DEFAULT_MAX_HEADER_LIST_SIZE);
+ queue_setting_update(exec_ctx, t,
+ GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1);
t->ping_policy = (grpc_chttp2_repeated_ping_policy){
.max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA,
@@ -517,8 +529,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
int value = grpc_channel_arg_get_integer(
&channel_args->args[i], settings_map[j].integer_options);
if (value >= 0) {
- push_setting(exec_ctx, t, settings_map[j].setting_id,
- (uint32_t)value);
+ queue_setting_update(exec_ctx, t, settings_map[j].setting_id,
+ (uint32_t)value);
}
}
break;
@@ -620,7 +632,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(error);
}
-#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+#ifndef NDEBUG
void grpc_chttp2_stream_ref(grpc_chttp2_stream *s, const char *reason) {
grpc_stream_ref(s->refcount, reason);
}
@@ -929,8 +941,11 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_END("terminate_writing_with_lock", 0);
}
-static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- grpc_chttp2_setting_id id, uint32_t value) {
+// Dirties an HTTP2 setting to be sent out next time a writing path occurs.
+// If the change needs to occur immediately, manually initiate a write.
+static void queue_setting_update(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_setting_id id, uint32_t value) {
const grpc_chttp2_setting_parameters *sp =
&grpc_chttp2_settings_parameters[id];
uint32_t use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
@@ -941,7 +956,6 @@ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (use_value != t->settings[GRPC_LOCAL_SETTINGS][id]) {
t->settings[GRPC_LOCAL_SETTINGS][id] = use_value;
t->dirtied_local_settings = 1;
- grpc_chttp2_initiate_write(exec_ctx, t, "push_setting");
}
}
@@ -1403,6 +1417,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
op_payload->recv_initial_metadata.recv_initial_metadata_ready;
s->recv_initial_metadata =
op_payload->recv_initial_metadata.recv_initial_metadata;
+ s->trailing_metadata_available =
+ op_payload->recv_initial_metadata.trailing_metadata_available;
grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s);
}
@@ -2107,8 +2123,8 @@ static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_log(GPR_DEBUG, "%s: update initial window size to %d", t->peer_string,
(int)bdp);
}
- push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
- (uint32_t)bdp);
+ queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
+ (uint32_t)bdp);
}
static void update_frame(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -2127,8 +2143,8 @@ static void update_frame(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_log(GPR_DEBUG, "%s: update max_frame size to %d", t->peer_string,
(int)frame_size);
}
- push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
- (uint32_t)frame_size);
+ queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
+ (uint32_t)frame_size);
}
static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
@@ -2726,6 +2742,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
gpr_ref_init(&incoming_byte_stream->refs, 2);
incoming_byte_stream->transport = t;
incoming_byte_stream->stream = s;
+ GRPC_ERROR_UNREF(s->byte_stream_error);
s->byte_stream_error = GRPC_ERROR_NONE;
return incoming_byte_stream;
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.h b/src/core/ext/transport/chttp2/transport/chttp2_transport.h
index 0a1fb4d772..0c4e2a91c0 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.h
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.h
@@ -26,6 +26,10 @@
extern grpc_tracer_flag grpc_http_trace;
extern grpc_tracer_flag grpc_flowctl_trace;
+#ifndef NDEBUG
+extern grpc_tracer_flag grpc_trace_chttp2_refcount;
+#endif
+
grpc_transport *grpc_create_chttp2_transport(
grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args,
grpc_endpoint *ep, int is_client);
diff --git a/src/core/ext/transport/chttp2/transport/frame_rst_stream.c b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c
index ccca0f1871..689dc8935c 100644
--- a/src/core/ext/transport/chttp2/transport/frame_rst_stream.c
+++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c
@@ -93,7 +93,7 @@ grpc_error *grpc_chttp2_rst_stream_parser_parse(grpc_exec_ctx *exec_ctx,
(((uint32_t)p->reason_bytes[2]) << 8) |
(((uint32_t)p->reason_bytes[3]));
grpc_error *error = GRPC_ERROR_NONE;
- if (reason != GRPC_HTTP2_NO_ERROR || s->header_frames_received < 2) {
+ if (reason != GRPC_HTTP2_NO_ERROR || s->metadata_buffer[1].size == 0) {
char *message;
gpr_asprintf(&message, "Received RST_STREAM with error code %d", reason);
error = grpc_error_set_int(
diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.c b/src/core/ext/transport/chttp2/transport/hpack_encoder.c
index 28c6632695..a0e748e7b1 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_encoder.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.c
@@ -608,15 +608,14 @@ void grpc_chttp2_hpack_compressor_set_max_table_size(
void grpc_chttp2_encode_header(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_compressor *c,
+ grpc_mdelem **extra_headers,
+ size_t extra_headers_size,
grpc_metadata_batch *metadata,
const grpc_encode_header_options *options,
grpc_slice_buffer *outbuf) {
- framer_state st;
- grpc_linked_mdelem *l;
- gpr_timespec deadline;
-
GPR_ASSERT(options->stream_id != 0);
+ framer_state st;
st.seen_regular_header = 0;
st.stream_id = options->stream_id;
st.output = outbuf;
@@ -633,11 +632,14 @@ void grpc_chttp2_encode_header(grpc_exec_ctx *exec_ctx,
if (c->advertise_table_size_change != 0) {
emit_advertise_table_size_change(c, &st);
}
+ for (size_t i = 0; i < extra_headers_size; ++i) {
+ hpack_enc(exec_ctx, c, *extra_headers[i], &st);
+ }
grpc_metadata_batch_assert_ok(metadata);
- for (l = metadata->list.head; l; l = l->next) {
+ for (grpc_linked_mdelem *l = metadata->list.head; l; l = l->next) {
hpack_enc(exec_ctx, c, l->md, &st);
}
- deadline = metadata->deadline;
+ gpr_timespec deadline = metadata->deadline;
if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) != 0) {
deadline_enc(exec_ctx, c, deadline, &st);
}
diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.h b/src/core/ext/transport/chttp2/transport/hpack_encoder.h
index 84ab6dde2c..271192f894 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_encoder.h
+++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.h
@@ -85,6 +85,8 @@ typedef struct {
void grpc_chttp2_encode_header(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_compressor *c,
+ grpc_mdelem **extra_headers,
+ size_t extra_headers_size,
grpc_metadata_batch *metadata,
const grpc_encode_header_options *options,
grpc_slice_buffer *outbuf);
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 4041b29fec..9fa72ddbdf 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -447,6 +447,7 @@ struct grpc_chttp2_stream {
grpc_metadata_batch *recv_initial_metadata;
grpc_closure *recv_initial_metadata_ready;
+ bool *trailing_metadata_available;
grpc_byte_stream **recv_message;
grpc_closure *recv_message_ready;
grpc_metadata_batch *recv_trailing_metadata;
@@ -748,7 +749,7 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_start_writing(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t);
-#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+#ifndef NDEBUG
#define GRPC_CHTTP2_STREAM_REF(stream, reason) \
grpc_chttp2_stream_ref(stream, reason)
#define GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream, reason) \
@@ -764,8 +765,7 @@ void grpc_chttp2_stream_ref(grpc_chttp2_stream *s);
void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s);
#endif
-//#define GRPC_CHTTP2_REFCOUNTING_DEBUG 1
-#ifdef GRPC_CHTTP2_REFCOUNTING_DEBUG
+#ifndef NDEBUG
#define GRPC_CHTTP2_REF_TRANSPORT(t, r) \
grpc_chttp2_ref_transport(t, r, __FILE__, __LINE__)
#define GRPC_CHTTP2_UNREF_TRANSPORT(cl, t, r) \
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index 941260be9a..3c8b470b4f 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -681,9 +681,19 @@ static grpc_error *init_header_frame_parser(grpc_exec_ctx *exec_ctx,
t->parser_data = &t->hpack_parser;
switch (s->header_frames_received) {
case 0:
- t->hpack_parser.on_header = on_initial_header;
+ if (t->is_client && t->header_eof) {
+ GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "parsing Trailers-Only"));
+ if (s->trailing_metadata_available != NULL) {
+ *s->trailing_metadata_available = true;
+ }
+ t->hpack_parser.on_header = on_trailing_header;
+ } else {
+ GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "parsing initial_metadata"));
+ t->hpack_parser.on_header = on_initial_header;
+ }
break;
case 1:
+ GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "parsing trailing_metadata"));
t->hpack_parser.on_header = on_trailing_header;
break;
case 2:
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index 4db0fbb098..315f2a67a2 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -162,6 +162,20 @@ static uint32_t target_write_size(grpc_chttp2_transport *t) {
return 1024 * 1024;
}
+// Returns true if initial_metadata contains only default headers.
+//
+// TODO(roth): The fact that we hard-code these particular headers here
+// is fairly ugly. Need some better way to know which headers are
+// default, maybe via a bit in the static metadata table?
+static bool is_default_initial_metadata(grpc_metadata_batch *initial_metadata) {
+ int num_default_fields =
+ (initial_metadata->idx.named.status != NULL) +
+ (initial_metadata->idx.named.content_type != NULL) +
+ (initial_metadata->idx.named.grpc_encoding != NULL) +
+ (initial_metadata->idx.named.grpc_accept_encoding != NULL);
+ return (size_t)num_default_fields == initial_metadata->list.count;
+}
+
grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
grpc_chttp2_stream *s;
@@ -218,31 +232,59 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
t->is_client ? "CLIENT" : "SERVER", s->id, sent_initial_metadata,
s->send_initial_metadata != NULL, s->announce_window));
+ grpc_mdelem *extra_headers_for_trailing_metadata[2];
+ size_t num_extra_headers_for_trailing_metadata = 0;
+
/* send initial metadata if it's available */
- if (!sent_initial_metadata && s->send_initial_metadata) {
- grpc_encode_header_options hopt = {
- .stream_id = s->id,
- .is_eof = false,
- .use_true_binary_metadata =
- t->settings
- [GRPC_PEER_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] != 0,
- .max_frame_size = t->settings[GRPC_PEER_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
- .stats = &s->stats.outgoing};
- grpc_chttp2_encode_header(exec_ctx, &t->hpack_compressor,
- s->send_initial_metadata, &hopt, &t->outbuf);
+ if (!sent_initial_metadata && s->send_initial_metadata != NULL) {
+ // We skip this on the server side if there is no custom initial
+ // metadata, there are no messages to send, and we are also sending
+ // trailing metadata. This results in a Trailers-Only response,
+ // which is required for retries, as per:
+ // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#when-retries-are-valid
+ if (t->is_client || s->fetching_send_message != NULL ||
+ s->flow_controlled_buffer.length != 0 ||
+ s->send_trailing_metadata == NULL ||
+ !is_default_initial_metadata(s->send_initial_metadata)) {
+ grpc_encode_header_options hopt = {
+ .stream_id = s->id,
+ .is_eof = false,
+ .use_true_binary_metadata =
+ t->settings
+ [GRPC_PEER_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] != 0,
+ .max_frame_size = t->settings[GRPC_PEER_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
+ .stats = &s->stats.outgoing};
+ grpc_chttp2_encode_header(exec_ctx, &t->hpack_compressor, NULL, 0,
+ s->send_initial_metadata, &hopt, &t->outbuf);
+ now_writing = true;
+ t->ping_state.pings_before_data_required =
+ t->ping_policy.max_pings_without_data;
+ if (!t->is_client) {
+ t->ping_recv_state.last_ping_recv_time =
+ gpr_inf_past(GPR_CLOCK_MONOTONIC);
+ t->ping_recv_state.ping_strikes = 0;
+ }
+ } else {
+ GRPC_CHTTP2_IF_TRACING(
+ gpr_log(GPR_INFO, "not sending initial_metadata (Trailers-Only)"));
+ // When sending Trailers-Only, we need to move the :status and
+ // content-type headers to the trailers.
+ if (s->send_initial_metadata->idx.named.status != NULL) {
+ extra_headers_for_trailing_metadata
+ [num_extra_headers_for_trailing_metadata++] =
+ &s->send_initial_metadata->idx.named.status->md;
+ }
+ if (s->send_initial_metadata->idx.named.content_type != NULL) {
+ extra_headers_for_trailing_metadata
+ [num_extra_headers_for_trailing_metadata++] =
+ &s->send_initial_metadata->idx.named.content_type->md;
+ }
+ }
s->send_initial_metadata = NULL;
s->sent_initial_metadata = true;
sent_initial_metadata = true;
- now_writing = true;
- t->ping_state.pings_before_data_required =
- t->ping_policy.max_pings_without_data;
- if (!t->is_client) {
- t->ping_recv_state.last_ping_recv_time =
- gpr_inf_past(GPR_CLOCK_MONOTONIC);
- t->ping_recv_state.ping_strikes = 0;
- }
}
/* send any window updates */
if (s->announce_window > 0) {
@@ -320,6 +362,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
if (s->send_trailing_metadata != NULL &&
s->fetching_send_message == NULL &&
s->flow_controlled_buffer.length == 0) {
+ GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata"));
if (grpc_metadata_batch_is_empty(s->send_trailing_metadata)) {
grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, 0, true,
&s->stats.outgoing, &t->outbuf);
@@ -337,6 +380,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
.stats = &s->stats.outgoing};
grpc_chttp2_encode_header(exec_ctx, &t->hpack_compressor,
+ extra_headers_for_trailing_metadata,
+ num_extra_headers_for_trailing_metadata,
s->send_trailing_metadata, &hopt,
&t->outbuf);
}
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index ce72fc3d08..29dfa885de 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -766,20 +766,50 @@ static bool op_can_be_run(grpc_transport_stream_op_batch *curr_op,
bool is_canceled_or_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
stream_state->state_callback_received[OP_FAILED];
if (is_canceled_or_failed) {
- if (op_id == OP_SEND_INITIAL_METADATA) result = false;
- if (op_id == OP_SEND_MESSAGE) result = false;
- if (op_id == OP_SEND_TRAILING_METADATA) result = false;
- if (op_id == OP_CANCEL_ERROR) result = false;
+ if (op_id == OP_SEND_INITIAL_METADATA) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
+ }
+ if (op_id == OP_SEND_MESSAGE) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
+ }
+ if (op_id == OP_SEND_TRAILING_METADATA) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
+ }
+ if (op_id == OP_CANCEL_ERROR) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
+ }
/* already executed */
if (op_id == OP_RECV_INITIAL_METADATA &&
- stream_state->state_op_done[OP_RECV_INITIAL_METADATA])
+ stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
result = false;
- if (op_id == OP_RECV_MESSAGE &&
- stream_state->state_op_done[OP_RECV_MESSAGE])
+ }
+ if (op_id == OP_RECV_MESSAGE && op_state->state_op_done[OP_RECV_MESSAGE]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
result = false;
+ }
if (op_id == OP_RECV_TRAILING_METADATA &&
- stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
+ stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
result = false;
+ }
+ /* ON_COMPLETE can be processed if one of the following conditions is met:
+ * 1. the stream failed
+ * 2. the stream is cancelled, and the callback is received
+ * 3. the stream succeeded before cancel is effective
+ * 4. the stream is cancelled, and the stream is never started */
+ if (op_id == OP_ON_COMPLETE &&
+ !(stream_state->state_callback_received[OP_FAILED] ||
+ stream_state->state_callback_received[OP_CANCELED] ||
+ stream_state->state_callback_received[OP_SUCCEEDED] ||
+ !stream_state->state_op_done[OP_SEND_INITIAL_METADATA])) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
+ }
} else if (op_id == OP_SEND_INITIAL_METADATA) {
/* already executed */
if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false;
@@ -868,7 +898,7 @@ static bool op_can_be_run(grpc_transport_stream_op_batch *curr_op,
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
} else if (curr_op->recv_message &&
- !stream_state->state_op_done[OP_RECV_MESSAGE]) {
+ !op_state->state_op_done[OP_RECV_MESSAGE]) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
} else if (curr_op->cancel_stream &&
@@ -1067,6 +1097,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
+ oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->state_callback_received[OP_FAILED]) {
CRONET_LOG(GPR_DEBUG, "Stream failed.");
@@ -1074,6 +1105,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
+ oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->rs.read_stream_closed == true) {
/* No more data will be received */
@@ -1214,8 +1246,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
} else if (stream_op->cancel_stream &&
op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas);
- CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
if (s->cbs) {
+ CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
bidirectional_stream_cancel(s->cbs);
result = ACTION_TAKEN_WITH_CALLBACK;
} else {