diff options
author | 2017-02-17 06:55:55 -0800 | |
---|---|---|
committer | 2017-02-17 06:55:55 -0800 | |
commit | 2bc4cc4237f57933d00a56064e859151604a7187 (patch) | |
tree | be8bdd224ea30bcb9f2dea68248f53dc6d318578 /src/core/ext | |
parent | a8fd05745bf94f83db750cc37c6411532f199683 (diff) | |
parent | 46ca4f7d4eca6cb69a6d80ce35f58f2325cf945d (diff) |
Merge github.com:grpc/grpc into bm_call_create
Diffstat (limited to 'src/core/ext')
-rw-r--r-- | src/core/ext/client_channel/client_channel.c | 512 | ||||
-rw-r--r-- | src/core/ext/client_channel/lb_policy.c | 2 | ||||
-rw-r--r-- | src/core/ext/client_channel/subchannel.c | 4 | ||||
-rw-r--r-- | src/core/ext/lb_policy/grpclb/grpclb.c | 9 | ||||
-rw-r--r-- | src/core/ext/lb_policy/pick_first/pick_first.c | 2 | ||||
-rw-r--r-- | src/core/ext/lb_policy/round_robin/round_robin.c | 2 | ||||
-rw-r--r-- | src/core/ext/load_reporting/load_reporting.c | 20 | ||||
-rw-r--r-- | src/core/ext/load_reporting/load_reporting.h | 17 | ||||
-rw-r--r-- | src/core/ext/load_reporting/load_reporting_filter.c | 18 | ||||
-rw-r--r-- | src/core/ext/resolver/dns/native/dns_resolver.c | 2 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.c | 10 |
11 files changed, 319 insertions, 279 deletions
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index f85f445c33..b1a4aa9a1c 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -51,6 +51,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/channel/deadline_filter.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/profiling/timers.h" @@ -160,13 +161,10 @@ typedef struct client_channel_channel_data { /** client channel factory */ grpc_client_channel_factory *client_channel_factory; - /** mutex protecting all variables below in this data structure */ - gpr_mu mu; + /** combiner protecting all variables below in this data structure */ + grpc_combiner *combiner; /** currently active load balancer */ - char *lb_policy_name; grpc_lb_policy *lb_policy; - /** service config in JSON form */ - char *service_config_json; /** maps method names to method_parameters structs */ grpc_slice_hash_table *method_params_table; /** incoming resolver result - set by resolver.next() */ @@ -183,6 +181,13 @@ typedef struct client_channel_channel_data { grpc_channel_stack *owning_stack; /** interested parties (owned) */ grpc_pollset_set *interested_parties; + + /* the following properties are guarded by a mutex since API's require them + to be instantaniously available */ + gpr_mu info_mu; + char *info_lb_policy_name; + /** service config in JSON form */ + char *info_service_config_json; } channel_data; /** We create one watcher for each new lb_policy that is returned from a @@ -218,32 +223,23 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, } static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx, - lb_policy_connectivity_watcher *w, - grpc_error *error) { + void *arg, grpc_error *error) { + lb_policy_connectivity_watcher *w = arg; grpc_connectivity_state publish_state = w->state; - /* check if the notification is for a stale policy */ - if (w->lb_policy != w->chand->lb_policy) return; - - if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) { - publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; - grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver); - GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel"); - w->chand->lb_policy = NULL; - } - set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state, - GRPC_ERROR_REF(error), "lb_changed"); - if (w->state != GRPC_CHANNEL_SHUTDOWN) { - watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state); + /* check if the notification is for the latest policy */ + if (w->lb_policy == w->chand->lb_policy) { + if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) { + publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; + grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver); + GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel"); + w->chand->lb_policy = NULL; + } + set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state, + GRPC_ERROR_REF(error), "lb_changed"); + if (w->state != GRPC_CHANNEL_SHUTDOWN) { + watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state); + } } -} - -static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - lb_policy_connectivity_watcher *w = arg; - - gpr_mu_lock(&w->chand->mu); - on_lb_policy_state_changed_locked(exec_ctx, w, error); - gpr_mu_unlock(&w->chand->mu); GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy"); gpr_free(w); @@ -256,16 +252,16 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); w->chand = chand; - grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w, - grpc_schedule_on_exec_ctx); + grpc_closure_init(&w->on_changed, on_lb_policy_state_changed_locked, w, + grpc_combiner_scheduler(chand->combiner, false)); w->state = current_state; w->lb_policy = lb_policy; grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state, &w->on_changed); } -static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, + void *arg, grpc_error *error) { channel_data *chand = arg; char *lb_policy_name = NULL; grpc_lb_policy *lb_policy = NULL; @@ -353,17 +349,18 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, chand->interested_parties); } - gpr_mu_lock(&chand->mu); + gpr_mu_lock(&chand->info_mu); if (lb_policy_name != NULL) { - gpr_free(chand->lb_policy_name); - chand->lb_policy_name = lb_policy_name; + gpr_free(chand->info_lb_policy_name); + chand->info_lb_policy_name = lb_policy_name; } old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; if (service_config_json != NULL) { - gpr_free(chand->service_config_json); - chand->service_config_json = service_config_json; + gpr_free(chand->info_service_config_json); + chand->info_service_config_json = service_config_json; } + gpr_mu_unlock(&chand->info_mu); if (chand->method_params_table != NULL) { grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table); } @@ -391,7 +388,6 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, &chand->on_resolver_result_changed); - gpr_mu_unlock(&chand->mu); } else { if (chand->resolver != NULL) { grpc_resolver_shutdown(exec_ctx, chand->resolver); @@ -404,7 +400,6 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_CREATE_REFERENCING("Got config after disconnection", refs, GPR_ARRAY_SIZE(refs)), "resolver_gone"); - gpr_mu_unlock(&chand->mu); } if (exit_idle) { @@ -426,20 +421,12 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_UNREF(state_error); } -static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_transport_op *op) { +static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error_ignored) { + grpc_transport_op *op = arg; + grpc_channel_element *elem = op->transport_private.args[0]; channel_data *chand = elem->channel_data; - grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); - - GPR_ASSERT(op->set_accept_stream == false); - if (op->bind_pollset != NULL) { - grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, - op->bind_pollset); - } - - gpr_mu_lock(&chand->mu); if (op->on_connectivity_state_change != NULL) { grpc_connectivity_state_notify_on_state_change( exec_ctx, &chand->state_tracker, op->connectivity_state, @@ -482,25 +469,48 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, } GRPC_ERROR_UNREF(op->disconnect_with_error); } - gpr_mu_unlock(&chand->mu); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "start_transport_op"); + + grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); +} + +static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_transport_op *op) { + channel_data *chand = elem->channel_data; + + GPR_ASSERT(op->set_accept_stream == false); + if (op->bind_pollset != NULL) { + grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, + op->bind_pollset); + } + + op->transport_private.args[0] = elem; + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op"); + grpc_closure_sched( + exec_ctx, grpc_closure_init( + &op->transport_private.closure, start_transport_op_locked, + op, grpc_combiner_scheduler(chand->combiner, false)), + GRPC_ERROR_NONE); } static void cc_get_channel_info(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, const grpc_channel_info *info) { channel_data *chand = elem->channel_data; - gpr_mu_lock(&chand->mu); + gpr_mu_lock(&chand->info_mu); if (info->lb_policy_name != NULL) { - *info->lb_policy_name = chand->lb_policy_name == NULL + *info->lb_policy_name = chand->info_lb_policy_name == NULL ? NULL - : gpr_strdup(chand->lb_policy_name); + : gpr_strdup(chand->info_lb_policy_name); } if (info->service_config_json != NULL) { - *info->service_config_json = chand->service_config_json == NULL - ? NULL - : gpr_strdup(chand->service_config_json); + *info->service_config_json = + chand->info_service_config_json == NULL + ? NULL + : gpr_strdup(chand->info_service_config_json); } - gpr_mu_unlock(&chand->mu); + gpr_mu_unlock(&chand->info_mu); } /* Constructor for channel_data */ @@ -512,11 +522,12 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, GPR_ASSERT(args->is_last); GPR_ASSERT(elem->filter == &grpc_client_channel_filter); // Initialize data members. - gpr_mu_init(&chand->mu); + chand->combiner = grpc_combiner_create(NULL); + gpr_mu_init(&chand->info_mu); chand->owning_stack = args->channel_stack; grpc_closure_init(&chand->on_resolver_result_changed, - on_resolver_result_changed, chand, - grpc_schedule_on_exec_ctx); + on_resolver_result_changed_locked, chand, + grpc_combiner_scheduler(chand->combiner, false)); chand->interested_parties = grpc_pollset_set_create(); grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); @@ -565,14 +576,15 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } - gpr_free(chand->lb_policy_name); - gpr_free(chand->service_config_json); + gpr_free(chand->info_lb_policy_name); + gpr_free(chand->info_service_config_json); if (chand->method_params_table != NULL) { grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table); } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); - grpc_pollset_set_destroy(chand->interested_parties); - gpr_mu_destroy(&chand->mu); + grpc_pollset_set_destroy(exec_ctx, chand->interested_parties); + GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel"); + gpr_mu_destroy(&chand->info_mu); } /************************************************************************* @@ -615,8 +627,6 @@ typedef struct client_channel_call_data { grpc_subchannel_call */ gpr_atm subchannel_call; - gpr_mu mu; - subchannel_creation_phase creation_phase; grpc_connected_subchannel *connected_subchannel; grpc_polling_entity *pollent; @@ -661,52 +671,32 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld, GRPC_ERROR_UNREF(error); } -typedef struct { - grpc_transport_stream_op **ops; - size_t nops; - grpc_subchannel_call *call; -} retry_ops_args; - -static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { - retry_ops_args *a = args; - size_t i; - for (i = 0; i < a->nops; i++) { - grpc_subchannel_call_process_op(exec_ctx, a->call, a->ops[i]); - } - GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops"); - gpr_free(a->ops); - gpr_free(a); -} - static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { if (calld->waiting_ops_count == 0) { return; } - retry_ops_args *a = gpr_malloc(sizeof(*a)); - a->ops = calld->waiting_ops; - a->nops = calld->waiting_ops_count; - a->call = GET_CALL(calld); - if (a->call == CANCELLED_CALL) { - gpr_free(a); + grpc_subchannel_call *call = GET_CALL(calld); + grpc_transport_stream_op **ops = calld->waiting_ops; + size_t nops = calld->waiting_ops_count; + if (call == CANCELLED_CALL) { fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED); return; } calld->waiting_ops = NULL; calld->waiting_ops_count = 0; calld->waiting_ops_capacity = 0; - GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops"); - grpc_closure_sched( - exec_ctx, grpc_closure_create(retry_ops, a, grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); + for (size_t i = 0; i < nops; i++) { + grpc_subchannel_call_process_op(exec_ctx, call, ops[i]); + } + gpr_free(ops); } -static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { grpc_call_element *elem = arg; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - gpr_mu_lock(&calld->mu); GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL); grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, @@ -742,7 +732,6 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, (gpr_atm)(uintptr_t)subchannel_call); retry_waiting_locked(exec_ctx, calld); } - gpr_mu_unlock(&calld->mu); GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); } @@ -768,37 +757,35 @@ typedef struct { /** Return true if subchannel is available immediately (in which case on_ready should not be called), or false otherwise (in which case on_ready should be called when the subchannel is available). */ -static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, - grpc_connected_subchannel **connected_subchannel, - grpc_closure *on_ready, grpc_error *error); - -static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static bool pick_subchannel_locked( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, + grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready, + grpc_error *error); + +static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { continue_picking_args *cpa = arg; if (cpa->connected_subchannel == NULL) { /* cancelled, do nothing */ } else if (error != GRPC_ERROR_NONE) { grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error)); } else { - call_data *calld = cpa->elem->call_data; - gpr_mu_lock(&calld->mu); - if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, - cpa->initial_metadata_flags, cpa->connected_subchannel, - cpa->on_ready, GRPC_ERROR_NONE)) { + if (pick_subchannel_locked(exec_ctx, cpa->elem, cpa->initial_metadata, + cpa->initial_metadata_flags, + cpa->connected_subchannel, cpa->on_ready, + GRPC_ERROR_NONE)) { grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE); } - gpr_mu_unlock(&calld->mu); } gpr_free(cpa); } -static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, - grpc_connected_subchannel **connected_subchannel, - grpc_closure *on_ready, grpc_error *error) { +static bool pick_subchannel_locked( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, + grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready, + grpc_error *error) { GPR_TIMER_BEGIN("pick_subchannel", 0); channel_data *chand = elem->channel_data; @@ -808,7 +795,6 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, GPR_ASSERT(connected_subchannel); - gpr_mu_lock(&chand->mu); if (initial_metadata == NULL) { if (chand->lb_policy != NULL) { grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, @@ -824,7 +810,6 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1)); } } - gpr_mu_unlock(&chand->mu); GPR_TIMER_END("pick_subchannel", 0); GRPC_ERROR_UNREF(error); return true; @@ -833,7 +818,6 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, if (chand->lb_policy != NULL) { grpc_lb_policy *lb_policy = chand->lb_policy; GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel"); - gpr_mu_unlock(&chand->mu); // If the application explicitly set wait_for_ready, use that. // Otherwise, if the service config specified a value for this // method, use that. @@ -872,88 +856,66 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, cpa->connected_subchannel = connected_subchannel; cpa->on_ready = on_ready; cpa->elem = elem; - grpc_closure_init(&cpa->closure, continue_picking, cpa, - grpc_schedule_on_exec_ctx); + grpc_closure_init(&cpa->closure, continue_picking_locked, cpa, + grpc_combiner_scheduler(chand->combiner, true)); grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure, GRPC_ERROR_NONE); } else { grpc_closure_sched(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected")); } - gpr_mu_unlock(&chand->mu); GPR_TIMER_END("pick_subchannel", 0); return false; } -// The logic here is fairly complicated, due to (a) the fact that we -// need to handle the case where we receive the send op before the -// initial metadata op, and (b) the need for efficiency, especially in -// the streaming case. -// TODO(ctiller): Explain this more thoroughly. -static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op) { - call_data *calld = elem->call_data; +static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, + grpc_transport_stream_op *op, + grpc_call_element *elem) { channel_data *chand = elem->channel_data; - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op); - /* try to (atomically) get the call */ - grpc_subchannel_call *call = GET_CALL(calld); - GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0); - if (call == CANCELLED_CALL) { - grpc_transport_stream_op_finish_with_failure( - exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); - GPR_TIMER_END("cc_start_transport_stream_op", 0); - return; - } - if (call != NULL) { - grpc_subchannel_call_process_op(exec_ctx, call, op); - GPR_TIMER_END("cc_start_transport_stream_op", 0); - return; - } - /* we failed; lock and figure out what to do */ - gpr_mu_lock(&calld->mu); -retry: + call_data *calld = elem->call_data; + grpc_subchannel_call *call; + /* need to recheck that another thread hasn't set the call */ call = GET_CALL(calld); if (call == CANCELLED_CALL) { - gpr_mu_unlock(&calld->mu); grpc_transport_stream_op_finish_with_failure( exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); - GPR_TIMER_END("cc_start_transport_stream_op", 0); + /* early out */ return; } if (call != NULL) { - gpr_mu_unlock(&calld->mu); grpc_subchannel_call_process_op(exec_ctx, call, op); - GPR_TIMER_END("cc_start_transport_stream_op", 0); + /* early out */ return; } /* if this is a cancellation, then we can raise our cancelled flag */ if (op->cancel_error != GRPC_ERROR_NONE) { if (!gpr_atm_rel_cas(&calld->subchannel_call, 0, (gpr_atm)(uintptr_t)CANCELLED_CALL)) { - goto retry; + /* recurse to retry */ + start_transport_stream_op_locked_inner(exec_ctx, op, elem); + /* early out */ + return; } else { - // Stash a copy of cancel_error in our call data, so that we can use - // it for subsequent operations. This ensures that if the call is - // cancelled before any ops are passed down (e.g., if the deadline - // is in the past when the call starts), we can return the right - // error to the caller when the first op does get passed down. + /* Stash a copy of cancel_error in our call data, so that we can use + it for subsequent operations. This ensures that if the call is + cancelled before any ops are passed down (e.g., if the deadline + is in the past when the call starts), we can return the right + error to the caller when the first op does get passed down. */ calld->cancel_error = GRPC_ERROR_REF(op->cancel_error); switch (calld->creation_phase) { case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING: fail_locked(exec_ctx, calld, GRPC_ERROR_REF(op->cancel_error)); break; case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL: - pick_subchannel(exec_ctx, elem, NULL, 0, &calld->connected_subchannel, - NULL, GRPC_ERROR_REF(op->cancel_error)); + pick_subchannel_locked(exec_ctx, elem, NULL, 0, + &calld->connected_subchannel, NULL, + GRPC_ERROR_REF(op->cancel_error)); break; } - gpr_mu_unlock(&calld->mu); grpc_transport_stream_op_finish_with_failure( exec_ctx, op, GRPC_ERROR_REF(op->cancel_error)); - GPR_TIMER_END("cc_start_transport_stream_op", 0); + /* early out */ return; } } @@ -962,16 +924,16 @@ retry: calld->connected_subchannel == NULL && op->send_initial_metadata != NULL) { calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; - grpc_closure_init(&calld->next_step, subchannel_ready, elem, - grpc_schedule_on_exec_ctx); + grpc_closure_init(&calld->next_step, subchannel_ready_locked, elem, + grpc_combiner_scheduler(chand->combiner, true)); GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); /* If a subchannel is not available immediately, the polling entity from call_data should be provided to channel_data's interested_parties, so that IO of the lb_policy and resolver could be done under it. */ - if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata, - op->send_initial_metadata_flags, - &calld->connected_subchannel, &calld->next_step, - GRPC_ERROR_NONE)) { + if (pick_subchannel_locked(exec_ctx, elem, op->send_initial_metadata, + op->send_initial_metadata_flags, + &calld->connected_subchannel, &calld->next_step, + GRPC_ERROR_NONE)) { calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); } else { @@ -994,31 +956,89 @@ retry: gpr_atm_rel_store(&calld->subchannel_call, (gpr_atm)(uintptr_t)subchannel_call); retry_waiting_locked(exec_ctx, calld); - goto retry; + /* recurse to retry */ + start_transport_stream_op_locked_inner(exec_ctx, op, elem); + /* early out */ + return; } /* nothing to be done but wait */ add_waiting_locked(calld, op); - gpr_mu_unlock(&calld->mu); +} + +static void cc_start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, + void *arg, + grpc_error *error_ignored) { + GPR_TIMER_BEGIN("cc_start_transport_stream_op_locked", 0); + + grpc_transport_stream_op *op = arg; + grpc_call_element *elem = op->handler_private.args[0]; + call_data *calld = elem->call_data; + + start_transport_stream_op_locked_inner(exec_ctx, op, elem); + + GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, + "start_transport_stream_op"); + GPR_TIMER_END("cc_start_transport_stream_op_locked", 0); +} + +/* The logic here is fairly complicated, due to (a) the fact that we + need to handle the case where we receive the send op before the + initial metadata op, and (b) the need for efficiency, especially in + the streaming case. + + We use double-checked locking to initially see if initialization has been + performed. If it has not, we acquire the combiner and perform initialization. + If it has, we proceed on the fast path. */ +static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op); + /* try to (atomically) get the call */ + grpc_subchannel_call *call = GET_CALL(calld); + GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0); + if (call == CANCELLED_CALL) { + grpc_transport_stream_op_finish_with_failure( + exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); + GPR_TIMER_END("cc_start_transport_stream_op", 0); + /* early out */ + return; + } + if (call != NULL) { + grpc_subchannel_call_process_op(exec_ctx, call, op); + GPR_TIMER_END("cc_start_transport_stream_op", 0); + /* early out */ + return; + } + /* we failed; lock and figure out what to do */ + GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op"); + op->handler_private.args[0] = elem; + grpc_closure_sched( + exec_ctx, + grpc_closure_init(&op->handler_private.closure, + cc_start_transport_stream_op_locked, op, + grpc_combiner_scheduler(chand->combiner, false)), + GRPC_ERROR_NONE); GPR_TIMER_END("cc_start_transport_stream_op", 0); } // Gets data from the service config. Invoked when the resolver returns // its initial result. -static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void read_service_config_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { grpc_call_element *elem = arg; channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; // If this is an error, there's no point in looking at the service config. if (error == GRPC_ERROR_NONE) { // Get the method config table from channel data. - gpr_mu_lock(&chand->mu); grpc_slice_hash_table *method_params_table = NULL; if (chand->method_params_table != NULL) { method_params_table = grpc_slice_hash_table_ref(chand->method_params_table); } - gpr_mu_unlock(&chand->mu); // If the method config table was present, use it. if (method_params_table != NULL) { const method_parameters *method_params = grpc_method_config_table_get( @@ -1028,7 +1048,6 @@ static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg, gpr_time_cmp(method_params->timeout, gpr_time_0(GPR_TIMESPAN)) != 0; if (have_method_timeout || method_params->wait_for_ready != WAIT_FOR_READY_UNSET) { - gpr_mu_lock(&calld->mu); if (have_method_timeout) { const gpr_timespec per_method_deadline = gpr_time_add(calld->call_start_time, method_params->timeout); @@ -1042,7 +1061,6 @@ static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg, calld->wait_for_ready_from_service_config = method_params->wait_for_ready; } - gpr_mu_unlock(&calld->mu); } } grpc_slice_hash_table_unref(exec_ctx, method_params_table); @@ -1051,43 +1069,25 @@ static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "read_service_config"); } -/* Constructor for call_data */ -static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - const grpc_call_element_args *args) { +static void initial_read_service_config_locked(grpc_exec_ctx *exec_ctx, + void *arg, + grpc_error *error_ignored) { + grpc_call_element *elem = arg; channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - // Initialize data members. - grpc_deadline_state_init(exec_ctx, elem, args->call_stack); - calld->path = grpc_slice_ref_internal(args->path); - calld->call_start_time = args->start_time; - calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC); - calld->wait_for_ready_from_service_config = WAIT_FOR_READY_UNSET; - calld->cancel_error = GRPC_ERROR_NONE; - gpr_atm_rel_store(&calld->subchannel_call, 0); - gpr_mu_init(&calld->mu); - calld->connected_subchannel = NULL; - calld->waiting_ops = NULL; - calld->waiting_ops_count = 0; - calld->waiting_ops_capacity = 0; - calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; - calld->owning_call = args->call_stack; - calld->pollent = NULL; // If the resolver has already returned results, then we can access // the service config parameters immediately. Otherwise, we need to // defer that work until the resolver returns an initial result. // TODO(roth): This code is almost but not quite identical to the code // in read_service_config() above. It would be nice to find a way to // combine them, to avoid having to maintain it twice. - gpr_mu_lock(&chand->mu); if (chand->lb_policy != NULL) { // We already have a resolver result, so check for service config. if (chand->method_params_table != NULL) { grpc_slice_hash_table *method_params_table = grpc_slice_hash_table_ref(chand->method_params_table); - gpr_mu_unlock(&chand->mu); method_parameters *method_params = grpc_method_config_table_get( - exec_ctx, method_params_table, args->path); + exec_ctx, method_params_table, calld->path); if (method_params != NULL) { if (gpr_time_cmp(method_params->timeout, gpr_time_0(GPR_CLOCK_MONOTONIC)) != 0) { @@ -1101,24 +1101,53 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, } } grpc_slice_hash_table_unref(exec_ctx, method_params_table); - } else { - gpr_mu_unlock(&chand->mu); } } else { // We don't yet have a resolver result, so register a callback to // get the service config data once the resolver returns. // Take a reference to the call stack to be owned by the callback. GRPC_CALL_STACK_REF(calld->owning_call, "read_service_config"); - grpc_closure_init(&calld->read_service_config, read_service_config, elem, - grpc_schedule_on_exec_ctx); + grpc_closure_init(&calld->read_service_config, read_service_config_locked, + elem, grpc_combiner_scheduler(chand->combiner, false)); grpc_closure_list_append(&chand->waiting_for_config_closures, &calld->read_service_config, GRPC_ERROR_NONE); - gpr_mu_unlock(&chand->mu); } // Start the deadline timer with the current deadline value. If we // do not yet have service config data, then the timer may be reset // later. grpc_deadline_state_start(exec_ctx, elem, calld->deadline); + GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, + "initial_read_service_config"); +} + +/* Constructor for call_data */ +static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + const grpc_call_element_args *args) { + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + // Initialize data members. + grpc_deadline_state_init(exec_ctx, elem, args->call_stack); + calld->path = grpc_slice_ref_internal(args->path); + calld->call_start_time = args->start_time; + calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC); + calld->wait_for_ready_from_service_config = WAIT_FOR_READY_UNSET; + calld->cancel_error = GRPC_ERROR_NONE; + gpr_atm_rel_store(&calld->subchannel_call, 0); + calld->connected_subchannel = NULL; + calld->waiting_ops = NULL; + calld->waiting_ops_count = 0; + calld->waiting_ops_capacity = 0; + calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + calld->owning_call = args->call_stack; + calld->pollent = NULL; + GRPC_CALL_STACK_REF(calld->owning_call, "initial_read_service_config"); + grpc_closure_sched( + exec_ctx, + grpc_closure_init(&calld->read_service_config, + initial_read_service_config_locked, elem, + grpc_combiner_scheduler(chand->combiner, false)), + GRPC_ERROR_NONE); return GRPC_ERROR_NONE; } @@ -1136,7 +1165,6 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call"); } GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING); - gpr_mu_destroy(&calld->mu); GPR_ASSERT(calld->waiting_ops_count == 0); if (calld->connected_subchannel != NULL) { GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel, @@ -1172,26 +1200,36 @@ const grpc_channel_filter grpc_client_channel_filter = { "client-channel", }; +static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error_ignored) { + channel_data *chand = arg; + if (chand->lb_policy != NULL) { + grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy); + } else { + chand->exit_idle_when_lb_policy_arrives = true; + if (!chand->started_resolving && chand->resolver != NULL) { + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); + chand->started_resolving = true; + grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, + &chand->on_resolver_result_changed); + } + } + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect"); +} + grpc_connectivity_state grpc_client_channel_check_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) { channel_data *chand = elem->channel_data; - grpc_connectivity_state out; - gpr_mu_lock(&chand->mu); - out = grpc_connectivity_state_check(&chand->state_tracker, NULL); + grpc_connectivity_state out = + grpc_connectivity_state_check(&chand->state_tracker); if (out == GRPC_CHANNEL_IDLE && try_to_connect) { - if (chand->lb_policy != NULL) { - grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy); - } else { - chand->exit_idle_when_lb_policy_arrives = true; - if (!chand->started_resolving && chand->resolver != NULL) { - GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - chand->started_resolving = true; - grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, - &chand->on_resolver_result_changed); - } - } + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect"); + grpc_closure_sched( + exec_ctx, + grpc_closure_create(try_to_connect_locked, chand, + grpc_combiner_scheduler(chand->combiner, false)), + GRPC_ERROR_NONE); } - gpr_mu_unlock(&chand->mu); return out; } @@ -1199,6 +1237,7 @@ typedef struct { channel_data *chand; grpc_pollset *pollset; grpc_closure *on_complete; + grpc_connectivity_state *state; grpc_closure my_closure; } external_connectivity_watcher; @@ -1211,7 +1250,16 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "external_connectivity_watcher"); gpr_free(w); - follow_up->cb(exec_ctx, follow_up->cb_arg, error); + grpc_closure_run(exec_ctx, follow_up, GRPC_ERROR_REF(error)); +} + +static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error_ignored) { + external_connectivity_watcher *w = arg; + grpc_closure_init(&w->my_closure, on_external_watch_complete, w, + grpc_schedule_on_exec_ctx); + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure); } void grpc_client_channel_watch_connectivity_state( @@ -1222,13 +1270,13 @@ void grpc_client_channel_watch_connectivity_state( w->chand = chand; w->pollset = pollset; w->on_complete = on_complete; + w->state = state; grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset); - grpc_closure_init(&w->my_closure, on_external_watch_complete, w, - grpc_schedule_on_exec_ctx); GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); - gpr_mu_lock(&chand->mu); - grpc_connectivity_state_notify_on_state_change( - exec_ctx, &chand->state_tracker, state, &w->my_closure); - gpr_mu_unlock(&chand->mu); + grpc_closure_sched( + exec_ctx, + grpc_closure_init(&w->my_closure, watch_connectivity_state_locked, w, + grpc_combiner_scheduler(chand->combiner, true)), + GRPC_ERROR_NONE); } diff --git a/src/core/ext/client_channel/lb_policy.c b/src/core/ext/client_channel/lb_policy.c index 45ee72e2f0..90401b586f 100644 --- a/src/core/ext/client_channel/lb_policy.c +++ b/src/core/ext/client_channel/lb_policy.c @@ -94,7 +94,7 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, gpr_atm old_val = ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF")); if (old_val == 1) { - grpc_pollset_set_destroy(policy->interested_parties); + grpc_pollset_set_destroy(exec_ctx, policy->interested_parties); policy->vtable->destroy(exec_ctx, policy); } } diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index aa036e883b..09c68a91dd 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -217,7 +217,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_slice_unref_internal(exec_ctx, c->initial_connect_string); grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker); grpc_connector_unref(exec_ctx, c->connector); - grpc_pollset_set_destroy(c->pollset_set); + grpc_pollset_set_destroy(exec_ctx, c->pollset_set); grpc_subchannel_key_destroy(exec_ctx, c->key); gpr_free(c); } @@ -419,7 +419,7 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c, grpc_error **error) { grpc_connectivity_state state; gpr_mu_lock(&c->mu); - state = grpc_connectivity_state_check(&c->state_tracker, error); + state = grpc_connectivity_state_get(&c->state_tracker, error); gpr_mu_unlock(&c->mu); return state; } diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index ab62e5ed6a..8a2af48328 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -492,9 +492,8 @@ static grpc_lb_addresses *process_serverlist_locked( static bool update_lb_connectivity_status_locked( grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, grpc_connectivity_state new_rr_state, grpc_error *new_rr_state_error) { - grpc_error *curr_state_error; - const grpc_connectivity_state curr_glb_state = grpc_connectivity_state_check( - &glb_policy->state_tracker, &curr_state_error); + const grpc_connectivity_state curr_glb_state = + grpc_connectivity_state_check(&glb_policy->state_tracker); /* The new connectivity status is a function of the previous one and the new * input coming from the status of the RR policy. @@ -1098,8 +1097,8 @@ static grpc_connectivity_state glb_check_connectivity( glb_lb_policy *glb_policy = (glb_lb_policy *)pol; grpc_connectivity_state st; gpr_mu_lock(&glb_policy->mu); - st = grpc_connectivity_state_check(&glb_policy->state_tracker, - connectivity_error); + st = grpc_connectivity_state_get(&glb_policy->state_tracker, + connectivity_error); gpr_mu_unlock(&glb_policy->mu); return st; } diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index 9f2aa461be..1b965183f6 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -398,7 +398,7 @@ static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p = (pick_first_lb_policy *)pol; grpc_connectivity_state st; gpr_mu_lock(&p->mu); - st = grpc_connectivity_state_check(&p->state_tracker, error); + st = grpc_connectivity_state_get(&p->state_tracker, error); gpr_mu_unlock(&p->mu); return st; } diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 3e060d189a..63e3d033ad 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -655,7 +655,7 @@ static grpc_connectivity_state rr_check_connectivity(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p = (round_robin_lb_policy *)pol; grpc_connectivity_state st; gpr_mu_lock(&p->mu); - st = grpc_connectivity_state_check(&p->state_tracker, error); + st = grpc_connectivity_state_get(&p->state_tracker, error); gpr_mu_unlock(&p->mu); return st; } diff --git a/src/core/ext/load_reporting/load_reporting.c b/src/core/ext/load_reporting/load_reporting.c index 37b06a737f..942aea4fd1 100644 --- a/src/core/ext/load_reporting/load_reporting.c +++ b/src/core/ext/load_reporting/load_reporting.c @@ -34,14 +34,34 @@ #include <limits.h> #include <string.h> +#include <grpc/load_reporting.h> #include <grpc/support/alloc.h> #include <grpc/support/sync.h> #include "src/core/ext/load_reporting/load_reporting.h" #include "src/core/ext/load_reporting/load_reporting_filter.h" #include "src/core/lib/channel/channel_stack_builder.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel_init.h" +static void destroy_lr_cost_context(void *c) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_load_reporting_cost_context *cost_ctx = c; + for (size_t i = 0; i < cost_ctx->values_count; ++i) { + grpc_slice_unref_internal(&exec_ctx, cost_ctx->values[i]); + } + grpc_exec_ctx_finish(&exec_ctx); + gpr_free(cost_ctx->values); + gpr_free(cost_ctx); +} + +void grpc_call_set_load_reporting_cost_context( + grpc_call *call, grpc_load_reporting_cost_context *ctx) { + grpc_call_context_set(call, GRPC_CONTEXT_LR_COST, ctx, + destroy_lr_cost_context); +} + static bool is_load_reporting_enabled(const grpc_channel_args *a) { if (a == NULL) return false; for (size_t i = 0; i < a->num_args; i++) { diff --git a/src/core/ext/load_reporting/load_reporting.h b/src/core/ext/load_reporting/load_reporting.h index a157844734..22859a599a 100644 --- a/src/core/ext/load_reporting/load_reporting.h +++ b/src/core/ext/load_reporting/load_reporting.h @@ -35,23 +35,8 @@ #define GRPC_CORE_EXT_LOAD_REPORTING_LOAD_REPORTING_H #include <grpc/impl/codegen/grpc_types.h> -#include "src/core/lib/channel/channel_stack.h" - -/** Metadata key for the gRPC LB load balancer token. - * - * The value corresponding to this key is an opaque token that is given to the - * frontend as part of each pick; the frontend sends this token to the backend - * in each request it sends when using that pick. The token is used by the - * backend to verify the request and to allow the backend to report load to the - * gRPC LB system. */ -#define GRPC_LB_TOKEN_MD_KEY "lb-token" -/** Metadata key for gRPC LB cost reporting. - * - * The value corresponding to this key is an opaque binary blob reported by the - * backend as part of its trailing metadata containing cost information for the - * call. */ -#define GRPC_LB_COST_MD_KEY "lb-cost-bin" +#include "src/core/lib/channel/channel_stack.h" /** Identifiers for the invocation point of the users LR callback */ typedef enum grpc_load_reporting_source { diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c index c1f72a101a..c6386a8942 100644 --- a/src/core/ext/load_reporting/load_reporting_filter.c +++ b/src/core/ext/load_reporting/load_reporting_filter.c @@ -31,11 +31,13 @@ * */ +#include <string.h> + +#include <grpc/load_reporting.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> #include <grpc/support/sync.h> -#include <string.h> #include "src/core/ext/load_reporting/load_reporting.h" #include "src/core/ext/load_reporting/load_reporting_filter.h" @@ -46,8 +48,6 @@ typedef struct call_data { intptr_t id; /**< an id unique to the call */ - bool have_trailing_md_string; - grpc_slice trailing_md_string; bool have_initial_md_string; grpc_slice initial_md_string; bool have_service_method; @@ -142,9 +142,6 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, if (calld->have_initial_md_string) { grpc_slice_unref_internal(exec_ctx, calld->initial_md_string); } - if (calld->have_trailing_md_string) { - grpc_slice_unref_internal(exec_ctx, calld->trailing_md_string); - } if (calld->have_service_method) { grpc_slice_unref_internal(exec_ctx, calld->service_method); } @@ -201,15 +198,6 @@ static void lr_start_transport_stream_op(grpc_exec_ctx *exec_ctx, /* substitute our callback for the higher callback */ calld->ops_recv_initial_metadata_ready = op->recv_initial_metadata_ready; op->recv_initial_metadata_ready = &calld->on_initial_md_ready; - } else if (op->send_trailing_metadata) { - if (op->send_trailing_metadata->idx.named.lb_cost_bin != NULL) { - calld->trailing_md_string = grpc_slice_ref_internal( - GRPC_MDVALUE(op->send_trailing_metadata->idx.named.lb_cost_bin->md)); - calld->have_trailing_md_string = true; - grpc_metadata_batch_remove( - exec_ctx, op->send_trailing_metadata, - op->send_trailing_metadata->idx.named.lb_cost_bin); - } } grpc_call_next_op(exec_ctx, elem, op); diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index 2c9623211b..c08b53ea04 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -244,7 +244,7 @@ static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { if (r->resolved_result != NULL) { grpc_channel_args_destroy(exec_ctx, r->resolved_result); } - grpc_pollset_set_destroy(r->interested_parties); + grpc_pollset_set_destroy(exec_ctx, r->interested_parties); gpr_free(r->name_to_resolve); gpr_free(r->default_port); grpc_channel_args_destroy(exec_ctx, r->channel_args); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index ddbc656b96..d1fab25478 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1041,8 +1041,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, GPR_TIMER_BEGIN("perform_stream_op_locked", 0); grpc_transport_stream_op *op = stream_op; - grpc_chttp2_transport *t = op->transport_private.args[0]; - grpc_chttp2_stream *s = op->transport_private.args[1]; + grpc_chttp2_transport *t = op->handler_private.args[0]; + grpc_chttp2_stream *s = op->handler_private.args[1]; if (grpc_http_trace) { char *str = grpc_transport_stream_op_string(op); @@ -1263,13 +1263,13 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, gpr_free(str); } - op->transport_private.args[0] = gt; - op->transport_private.args[1] = gs; + op->handler_private.args[0] = gt; + op->handler_private.args[1] = gs; GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op"); grpc_closure_sched( exec_ctx, grpc_closure_init( - &op->transport_private.closure, perform_stream_op_locked, op, + &op->handler_private.closure, perform_stream_op_locked, op, grpc_combiner_scheduler(t->combiner, op->covered_by_poller)), GRPC_ERROR_NONE); GPR_TIMER_END("perform_stream_op", 0); |