diff options
author | Craig Tiller <ctiller@google.com> | 2017-02-16 16:36:15 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-02-16 16:36:15 -0800 |
commit | 9596d6b1120ab3a53cbbecfbb15b8551dff5902b (patch) | |
tree | f94ec5c4b682fb3518dba81eccebb0fc58d03eb6 /src | |
parent | 69c0437e7558795a2badb6b7ca5222820d2f098f (diff) | |
parent | 9e5ac1bf115a182c0418f21d60d15245af6307bf (diff) |
Merge pull request #9660 from ctiller/c3
Convert client_channel.c to use a combiner lock
Diffstat (limited to 'src')
24 files changed, 419 insertions, 295 deletions
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index 06038bb5ba..62fb061bcf 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -51,6 +51,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/channel/deadline_filter.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/profiling/timers.h" @@ -160,13 +161,10 @@ typedef struct client_channel_channel_data { /** client channel factory */ grpc_client_channel_factory *client_channel_factory; - /** mutex protecting all variables below in this data structure */ - gpr_mu mu; + /** combiner protecting all variables below in this data structure */ + grpc_combiner *combiner; /** currently active load balancer */ - char *lb_policy_name; grpc_lb_policy *lb_policy; - /** service config in JSON form */ - char *service_config_json; /** maps method names to method_parameters structs */ grpc_slice_hash_table *method_params_table; /** incoming resolver result - set by resolver.next() */ @@ -183,6 +181,13 @@ typedef struct client_channel_channel_data { grpc_channel_stack *owning_stack; /** interested parties (owned) */ grpc_pollset_set *interested_parties; + + /* the following properties are guarded by a mutex since API's require them + to be instantaniously available */ + gpr_mu info_mu; + char *info_lb_policy_name; + /** service config in JSON form */ + char *info_service_config_json; } channel_data; /** We create one watcher for each new lb_policy that is returned from a @@ -218,32 +223,23 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, } static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx, - lb_policy_connectivity_watcher *w, - grpc_error *error) { + void *arg, grpc_error *error) { + lb_policy_connectivity_watcher *w = arg; grpc_connectivity_state publish_state = w->state; - /* check if the notification is for a stale policy */ - if (w->lb_policy != w->chand->lb_policy) return; - - if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) { - publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; - grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver); - GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel"); - w->chand->lb_policy = NULL; - } - set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state, - GRPC_ERROR_REF(error), "lb_changed"); - if (w->state != GRPC_CHANNEL_SHUTDOWN) { - watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state); + /* check if the notification is for the latest policy */ + if (w->lb_policy == w->chand->lb_policy) { + if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) { + publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; + grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver); + GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel"); + w->chand->lb_policy = NULL; + } + set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state, + GRPC_ERROR_REF(error), "lb_changed"); + if (w->state != GRPC_CHANNEL_SHUTDOWN) { + watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state); + } } -} - -static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - lb_policy_connectivity_watcher *w = arg; - - gpr_mu_lock(&w->chand->mu); - on_lb_policy_state_changed_locked(exec_ctx, w, error); - gpr_mu_unlock(&w->chand->mu); GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy"); gpr_free(w); @@ -256,16 +252,16 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); w->chand = chand; - grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w, - grpc_schedule_on_exec_ctx); + grpc_closure_init(&w->on_changed, on_lb_policy_state_changed_locked, w, + grpc_combiner_scheduler(chand->combiner, false)); w->state = current_state; w->lb_policy = lb_policy; grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state, &w->on_changed); } -static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, + void *arg, grpc_error *error) { channel_data *chand = arg; char *lb_policy_name = NULL; grpc_lb_policy *lb_policy = NULL; @@ -353,17 +349,18 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, chand->interested_parties); } - gpr_mu_lock(&chand->mu); + gpr_mu_lock(&chand->info_mu); if (lb_policy_name != NULL) { - gpr_free(chand->lb_policy_name); - chand->lb_policy_name = lb_policy_name; + gpr_free(chand->info_lb_policy_name); + chand->info_lb_policy_name = lb_policy_name; } old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; if (service_config_json != NULL) { - gpr_free(chand->service_config_json); - chand->service_config_json = service_config_json; + gpr_free(chand->info_service_config_json); + chand->info_service_config_json = service_config_json; } + gpr_mu_unlock(&chand->info_mu); if (chand->method_params_table != NULL) { grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table); } @@ -391,7 +388,6 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, &chand->on_resolver_result_changed); - gpr_mu_unlock(&chand->mu); } else { if (chand->resolver != NULL) { grpc_resolver_shutdown(exec_ctx, chand->resolver); @@ -404,7 +400,6 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_CREATE_REFERENCING("Got config after disconnection", refs, GPR_ARRAY_SIZE(refs)), "resolver_gone"); - gpr_mu_unlock(&chand->mu); } if (exit_idle) { @@ -426,20 +421,12 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_UNREF(state_error); } -static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_transport_op *op) { +static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error_ignored) { + grpc_transport_op *op = arg; + grpc_channel_element *elem = op->transport_private.args[0]; channel_data *chand = elem->channel_data; - grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); - - GPR_ASSERT(op->set_accept_stream == false); - if (op->bind_pollset != NULL) { - grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, - op->bind_pollset); - } - - gpr_mu_lock(&chand->mu); if (op->on_connectivity_state_change != NULL) { grpc_connectivity_state_notify_on_state_change( exec_ctx, &chand->state_tracker, op->connectivity_state, @@ -482,25 +469,48 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, } GRPC_ERROR_UNREF(op->disconnect_with_error); } - gpr_mu_unlock(&chand->mu); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "start_transport_op"); + + grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); +} + +static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_transport_op *op) { + channel_data *chand = elem->channel_data; + + GPR_ASSERT(op->set_accept_stream == false); + if (op->bind_pollset != NULL) { + grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, + op->bind_pollset); + } + + op->transport_private.args[0] = elem; + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op"); + grpc_closure_sched( + exec_ctx, grpc_closure_init( + &op->transport_private.closure, start_transport_op_locked, + op, grpc_combiner_scheduler(chand->combiner, false)), + GRPC_ERROR_NONE); } static void cc_get_channel_info(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, const grpc_channel_info *info) { channel_data *chand = elem->channel_data; - gpr_mu_lock(&chand->mu); + gpr_mu_lock(&chand->info_mu); if (info->lb_policy_name != NULL) { - *info->lb_policy_name = chand->lb_policy_name == NULL + *info->lb_policy_name = chand->info_lb_policy_name == NULL ? NULL - : gpr_strdup(chand->lb_policy_name); + : gpr_strdup(chand->info_lb_policy_name); } if (info->service_config_json != NULL) { - *info->service_config_json = chand->service_config_json == NULL - ? NULL - : gpr_strdup(chand->service_config_json); + *info->service_config_json = + chand->info_service_config_json == NULL + ? NULL + : gpr_strdup(chand->info_service_config_json); } - gpr_mu_unlock(&chand->mu); + gpr_mu_unlock(&chand->info_mu); } /* Constructor for channel_data */ @@ -512,11 +522,12 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, GPR_ASSERT(args->is_last); GPR_ASSERT(elem->filter == &grpc_client_channel_filter); // Initialize data members. - gpr_mu_init(&chand->mu); + chand->combiner = grpc_combiner_create(NULL); + gpr_mu_init(&chand->info_mu); chand->owning_stack = args->channel_stack; grpc_closure_init(&chand->on_resolver_result_changed, - on_resolver_result_changed, chand, - grpc_schedule_on_exec_ctx); + on_resolver_result_changed_locked, chand, + grpc_combiner_scheduler(chand->combiner, false)); chand->interested_parties = grpc_pollset_set_create(); grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); @@ -565,14 +576,15 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } - gpr_free(chand->lb_policy_name); - gpr_free(chand->service_config_json); + gpr_free(chand->info_lb_policy_name); + gpr_free(chand->info_service_config_json); if (chand->method_params_table != NULL) { grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table); } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); - grpc_pollset_set_destroy(chand->interested_parties); - gpr_mu_destroy(&chand->mu); + grpc_pollset_set_destroy(exec_ctx, chand->interested_parties); + GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel"); + gpr_mu_destroy(&chand->info_mu); } /************************************************************************* @@ -615,8 +627,6 @@ typedef struct client_channel_call_data { grpc_subchannel_call */ gpr_atm subchannel_call; - gpr_mu mu; - subchannel_creation_phase creation_phase; grpc_connected_subchannel *connected_subchannel; grpc_polling_entity *pollent; @@ -661,52 +671,32 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld, GRPC_ERROR_UNREF(error); } -typedef struct { - grpc_transport_stream_op **ops; - size_t nops; - grpc_subchannel_call *call; -} retry_ops_args; - -static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { - retry_ops_args *a = args; - size_t i; - for (i = 0; i < a->nops; i++) { - grpc_subchannel_call_process_op(exec_ctx, a->call, a->ops[i]); - } - GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops"); - gpr_free(a->ops); - gpr_free(a); -} - static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { if (calld->waiting_ops_count == 0) { return; } - retry_ops_args *a = gpr_malloc(sizeof(*a)); - a->ops = calld->waiting_ops; - a->nops = calld->waiting_ops_count; - a->call = GET_CALL(calld); - if (a->call == CANCELLED_CALL) { - gpr_free(a); + grpc_subchannel_call *call = GET_CALL(calld); + grpc_transport_stream_op **ops = calld->waiting_ops; + size_t nops = calld->waiting_ops_count; + if (call == CANCELLED_CALL) { fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED); return; } calld->waiting_ops = NULL; calld->waiting_ops_count = 0; calld->waiting_ops_capacity = 0; - GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops"); - grpc_closure_sched( - exec_ctx, grpc_closure_create(retry_ops, a, grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); + for (size_t i = 0; i < nops; i++) { + grpc_subchannel_call_process_op(exec_ctx, call, ops[i]); + } + gpr_free(ops); } -static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { grpc_call_element *elem = arg; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - gpr_mu_lock(&calld->mu); GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL); grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, @@ -742,7 +732,6 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, (gpr_atm)(uintptr_t)subchannel_call); retry_waiting_locked(exec_ctx, calld); } - gpr_mu_unlock(&calld->mu); GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); } @@ -768,37 +757,35 @@ typedef struct { /** Return true if subchannel is available immediately (in which case on_ready should not be called), or false otherwise (in which case on_ready should be called when the subchannel is available). */ -static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, - grpc_connected_subchannel **connected_subchannel, - grpc_closure *on_ready, grpc_error *error); - -static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static bool pick_subchannel_locked( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, + grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready, + grpc_error *error); + +static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { continue_picking_args *cpa = arg; if (cpa->connected_subchannel == NULL) { /* cancelled, do nothing */ } else if (error != GRPC_ERROR_NONE) { grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error)); } else { - call_data *calld = cpa->elem->call_data; - gpr_mu_lock(&calld->mu); - if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, - cpa->initial_metadata_flags, cpa->connected_subchannel, - cpa->on_ready, GRPC_ERROR_NONE)) { + if (pick_subchannel_locked(exec_ctx, cpa->elem, cpa->initial_metadata, + cpa->initial_metadata_flags, + cpa->connected_subchannel, cpa->on_ready, + GRPC_ERROR_NONE)) { grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE); } - gpr_mu_unlock(&calld->mu); } gpr_free(cpa); } -static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, - grpc_connected_subchannel **connected_subchannel, - grpc_closure *on_ready, grpc_error *error) { +static bool pick_subchannel_locked( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, + grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready, + grpc_error *error) { GPR_TIMER_BEGIN("pick_subchannel", 0); channel_data *chand = elem->channel_data; @@ -808,7 +795,6 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, GPR_ASSERT(connected_subchannel); - gpr_mu_lock(&chand->mu); if (initial_metadata == NULL) { if (chand->lb_policy != NULL) { grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, @@ -824,7 +810,6 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1)); } } - gpr_mu_unlock(&chand->mu); GPR_TIMER_END("pick_subchannel", 0); GRPC_ERROR_UNREF(error); return true; @@ -833,7 +818,6 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, if (chand->lb_policy != NULL) { grpc_lb_policy *lb_policy = chand->lb_policy; GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel"); - gpr_mu_unlock(&chand->mu); // If the application explicitly set wait_for_ready, use that. // Otherwise, if the service config specified a value for this // method, use that. @@ -872,88 +856,66 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, cpa->connected_subchannel = connected_subchannel; cpa->on_ready = on_ready; cpa->elem = elem; - grpc_closure_init(&cpa->closure, continue_picking, cpa, - grpc_schedule_on_exec_ctx); + grpc_closure_init(&cpa->closure, continue_picking_locked, cpa, + grpc_combiner_scheduler(chand->combiner, true)); grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure, GRPC_ERROR_NONE); } else { grpc_closure_sched(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected")); } - gpr_mu_unlock(&chand->mu); GPR_TIMER_END("pick_subchannel", 0); return false; } -// The logic here is fairly complicated, due to (a) the fact that we -// need to handle the case where we receive the send op before the -// initial metadata op, and (b) the need for efficiency, especially in -// the streaming case. -// TODO(ctiller): Explain this more thoroughly. -static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op) { - call_data *calld = elem->call_data; +static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, + grpc_transport_stream_op *op, + grpc_call_element *elem) { channel_data *chand = elem->channel_data; - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op); - /* try to (atomically) get the call */ - grpc_subchannel_call *call = GET_CALL(calld); - GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0); - if (call == CANCELLED_CALL) { - grpc_transport_stream_op_finish_with_failure( - exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); - GPR_TIMER_END("cc_start_transport_stream_op", 0); - return; - } - if (call != NULL) { - grpc_subchannel_call_process_op(exec_ctx, call, op); - GPR_TIMER_END("cc_start_transport_stream_op", 0); - return; - } - /* we failed; lock and figure out what to do */ - gpr_mu_lock(&calld->mu); -retry: + call_data *calld = elem->call_data; + grpc_subchannel_call *call; + /* need to recheck that another thread hasn't set the call */ call = GET_CALL(calld); if (call == CANCELLED_CALL) { - gpr_mu_unlock(&calld->mu); grpc_transport_stream_op_finish_with_failure( exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); - GPR_TIMER_END("cc_start_transport_stream_op", 0); + /* early out */ return; } if (call != NULL) { - gpr_mu_unlock(&calld->mu); grpc_subchannel_call_process_op(exec_ctx, call, op); - GPR_TIMER_END("cc_start_transport_stream_op", 0); + /* early out */ return; } /* if this is a cancellation, then we can raise our cancelled flag */ if (op->cancel_error != GRPC_ERROR_NONE) { if (!gpr_atm_rel_cas(&calld->subchannel_call, 0, (gpr_atm)(uintptr_t)CANCELLED_CALL)) { - goto retry; + /* recurse to retry */ + start_transport_stream_op_locked_inner(exec_ctx, op, elem); + /* early out */ + return; } else { - // Stash a copy of cancel_error in our call data, so that we can use - // it for subsequent operations. This ensures that if the call is - // cancelled before any ops are passed down (e.g., if the deadline - // is in the past when the call starts), we can return the right - // error to the caller when the first op does get passed down. + /* Stash a copy of cancel_error in our call data, so that we can use + it for subsequent operations. This ensures that if the call is + cancelled before any ops are passed down (e.g., if the deadline + is in the past when the call starts), we can return the right + error to the caller when the first op does get passed down. */ calld->cancel_error = GRPC_ERROR_REF(op->cancel_error); switch (calld->creation_phase) { case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING: fail_locked(exec_ctx, calld, GRPC_ERROR_REF(op->cancel_error)); break; case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL: - pick_subchannel(exec_ctx, elem, NULL, 0, &calld->connected_subchannel, - NULL, GRPC_ERROR_REF(op->cancel_error)); + pick_subchannel_locked(exec_ctx, elem, NULL, 0, + &calld->connected_subchannel, NULL, + GRPC_ERROR_REF(op->cancel_error)); break; } - gpr_mu_unlock(&calld->mu); grpc_transport_stream_op_finish_with_failure( exec_ctx, op, GRPC_ERROR_REF(op->cancel_error)); - GPR_TIMER_END("cc_start_transport_stream_op", 0); + /* early out */ return; } } @@ -962,16 +924,16 @@ retry: calld->connected_subchannel == NULL && op->send_initial_metadata != NULL) { calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; - grpc_closure_init(&calld->next_step, subchannel_ready, elem, - grpc_schedule_on_exec_ctx); + grpc_closure_init(&calld->next_step, subchannel_ready_locked, elem, + grpc_combiner_scheduler(chand->combiner, true)); GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); /* If a subchannel is not available immediately, the polling entity from call_data should be provided to channel_data's interested_parties, so that IO of the lb_policy and resolver could be done under it. */ - if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata, - op->send_initial_metadata_flags, - &calld->connected_subchannel, &calld->next_step, - GRPC_ERROR_NONE)) { + if (pick_subchannel_locked(exec_ctx, elem, op->send_initial_metadata, + op->send_initial_metadata_flags, + &calld->connected_subchannel, &calld->next_step, + GRPC_ERROR_NONE)) { calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); } else { @@ -994,31 +956,89 @@ retry: gpr_atm_rel_store(&calld->subchannel_call, (gpr_atm)(uintptr_t)subchannel_call); retry_waiting_locked(exec_ctx, calld); - goto retry; + /* recurse to retry */ + start_transport_stream_op_locked_inner(exec_ctx, op, elem); + /* early out */ + return; } /* nothing to be done but wait */ add_waiting_locked(calld, op); - gpr_mu_unlock(&calld->mu); +} + +static void cc_start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, + void *arg, + grpc_error *error_ignored) { + GPR_TIMER_BEGIN("cc_start_transport_stream_op_locked", 0); + + grpc_transport_stream_op *op = arg; + grpc_call_element *elem = op->handler_private.args[0]; + call_data *calld = elem->call_data; + + start_transport_stream_op_locked_inner(exec_ctx, op, elem); + + GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, + "start_transport_stream_op"); + GPR_TIMER_END("cc_start_transport_stream_op_locked", 0); +} + +/* The logic here is fairly complicated, due to (a) the fact that we + need to handle the case where we receive the send op before the + initial metadata op, and (b) the need for efficiency, especially in + the streaming case. + + We use double-checked locking to initially see if initialization has been + performed. If it has not, we acquire the combiner and perform initialization. + If it has, we proceed on the fast path. */ +static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op); + /* try to (atomically) get the call */ + grpc_subchannel_call *call = GET_CALL(calld); + GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0); + if (call == CANCELLED_CALL) { + grpc_transport_stream_op_finish_with_failure( + exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); + GPR_TIMER_END("cc_start_transport_stream_op", 0); + /* early out */ + return; + } + if (call != NULL) { + grpc_subchannel_call_process_op(exec_ctx, call, op); + GPR_TIMER_END("cc_start_transport_stream_op", 0); + /* early out */ + return; + } + /* we failed; lock and figure out what to do */ + GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op"); + op->handler_private.args[0] = elem; + grpc_closure_sched( + exec_ctx, + grpc_closure_init(&op->handler_private.closure, + cc_start_transport_stream_op_locked, op, + grpc_combiner_scheduler(chand->combiner, false)), + GRPC_ERROR_NONE); GPR_TIMER_END("cc_start_transport_stream_op", 0); } // Gets data from the service config. Invoked when the resolver returns // its initial result. -static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void read_service_config_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { grpc_call_element *elem = arg; channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; // If this is an error, there's no point in looking at the service config. if (error == GRPC_ERROR_NONE) { // Get the method config table from channel data. - gpr_mu_lock(&chand->mu); grpc_slice_hash_table *method_params_table = NULL; if (chand->method_params_table != NULL) { method_params_table = grpc_slice_hash_table_ref(chand->method_params_table); } - gpr_mu_unlock(&chand->mu); // If the method config table was present, use it. if (method_params_table != NULL) { const method_parameters *method_params = grpc_method_config_table_get( @@ -1028,7 +1048,6 @@ static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg, gpr_time_cmp(method_params->timeout, gpr_time_0(GPR_TIMESPAN)) != 0; if (have_method_timeout || method_params->wait_for_ready != WAIT_FOR_READY_UNSET) { - gpr_mu_lock(&calld->mu); if (have_method_timeout) { const gpr_timespec per_method_deadline = gpr_time_add(calld->call_start_time, method_params->timeout); @@ -1042,7 +1061,6 @@ static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg, calld->wait_for_ready_from_service_config = method_params->wait_for_ready; } - gpr_mu_unlock(&calld->mu); } } grpc_slice_hash_table_unref(exec_ctx, method_params_table); @@ -1051,43 +1069,25 @@ static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "read_service_config"); } -/* Constructor for call_data */ -static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_call_element_args *args) { +static void initial_read_service_config_locked(grpc_exec_ctx *exec_ctx, + void *arg, + grpc_error *error_ignored) { + grpc_call_element *elem = arg; channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - // Initialize data members. - grpc_deadline_state_init(exec_ctx, elem, args->call_stack); - calld->path = grpc_slice_ref_internal(args->path); - calld->call_start_time = args->start_time; - calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC); - calld->wait_for_ready_from_service_config = WAIT_FOR_READY_UNSET; - calld->cancel_error = GRPC_ERROR_NONE; - gpr_atm_rel_store(&calld->subchannel_call, 0); - gpr_mu_init(&calld->mu); - calld->connected_subchannel = NULL; - calld->waiting_ops = NULL; - calld->waiting_ops_count = 0; - calld->waiting_ops_capacity = 0; - calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; - calld->owning_call = args->call_stack; - calld->pollent = NULL; // If the resolver has already returned results, then we can access // the service config parameters immediately. Otherwise, we need to // defer that work until the resolver returns an initial result. // TODO(roth): This code is almost but not quite identical to the code // in read_service_config() above. It would be nice to find a way to // combine them, to avoid having to maintain it twice. - gpr_mu_lock(&chand->mu); if (chand->lb_policy != NULL) { // We already have a resolver result, so check for service config. if (chand->method_params_table != NULL) { grpc_slice_hash_table *method_params_table = grpc_slice_hash_table_ref(chand->method_params_table); - gpr_mu_unlock(&chand->mu); method_parameters *method_params = grpc_method_config_table_get( - exec_ctx, method_params_table, args->path); + exec_ctx, method_params_table, calld->path); if (method_params != NULL) { if (gpr_time_cmp(method_params->timeout, gpr_time_0(GPR_CLOCK_MONOTONIC)) != 0) { @@ -1101,24 +1101,53 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, } } grpc_slice_hash_table_unref(exec_ctx, method_params_table); - } else { - gpr_mu_unlock(&chand->mu); } } else { // We don't yet have a resolver result, so register a callback to // get the service config data once the resolver returns. // Take a reference to the call stack to be owned by the callback. GRPC_CALL_STACK_REF(calld->owning_call, "read_service_config"); - grpc_closure_init(&calld->read_service_config, read_service_config, elem, - grpc_schedule_on_exec_ctx); + grpc_closure_init(&calld->read_service_config, read_service_config_locked, + elem, grpc_combiner_scheduler(chand->combiner, false)); grpc_closure_list_append(&chand->waiting_for_config_closures, &calld->read_service_config, GRPC_ERROR_NONE); - gpr_mu_unlock(&chand->mu); } // Start the deadline timer with the current deadline value. If we // do not yet have service config data, then the timer may be reset // later. grpc_deadline_state_start(exec_ctx, elem, calld->deadline); + GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, + "initial_read_service_config"); +} + +/* Constructor for call_data */ +static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_call_element_args *args) { + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + // Initialize data members. + grpc_deadline_state_init(exec_ctx, elem, args->call_stack); + calld->path = grpc_slice_ref_internal(args->path); + calld->call_start_time = args->start_time; + calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC); + calld->wait_for_ready_from_service_config = WAIT_FOR_READY_UNSET; + calld->cancel_error = GRPC_ERROR_NONE; + gpr_atm_rel_store(&calld->subchannel_call, 0); + calld->connected_subchannel = NULL; + calld->waiting_ops = NULL; + calld->waiting_ops_count = 0; + calld->waiting_ops_capacity = 0; + calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + calld->owning_call = args->call_stack; + calld->pollent = NULL; + GRPC_CALL_STACK_REF(calld->owning_call, "initial_read_service_config"); + grpc_closure_sched( + exec_ctx, + grpc_closure_init(&calld->read_service_config, + initial_read_service_config_locked, elem, + grpc_combiner_scheduler(chand->combiner, false)), + GRPC_ERROR_NONE); return GRPC_ERROR_NONE; } @@ -1136,7 +1165,6 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call"); } GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING); - gpr_mu_destroy(&calld->mu); GPR_ASSERT(calld->waiting_ops_count == 0); if (calld->connected_subchannel != NULL) { GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel, @@ -1172,26 +1200,36 @@ const grpc_channel_filter grpc_client_channel_filter = { "client-channel", }; +static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error_ignored) { + channel_data *chand = arg; + if (chand->lb_policy != NULL) { + grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy); + } else { + chand->exit_idle_when_lb_policy_arrives = true; + if (!chand->started_resolving && chand->resolver != NULL) { + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); + chand->started_resolving = true; + grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, + &chand->on_resolver_result_changed); + } + } + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect"); +} + grpc_connectivity_state grpc_client_channel_check_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) { channel_data *chand = elem->channel_data; - grpc_connectivity_state out; - gpr_mu_lock(&chand->mu); - out = grpc_connectivity_state_check(&chand->state_tracker, NULL); + grpc_connectivity_state out = + grpc_connectivity_state_check(&chand->state_tracker); if (out == GRPC_CHANNEL_IDLE && try_to_connect) { - if (chand->lb_policy != NULL) { - grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy); - } else { - chand->exit_idle_when_lb_policy_arrives = true; - if (!chand->started_resolving && chand->resolver != NULL) { - GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - chand->started_resolving = true; - grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, - &chand->on_resolver_result_changed); - } - } + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect"); + grpc_closure_sched( + exec_ctx, + grpc_closure_create(try_to_connect_locked, chand, + grpc_combiner_scheduler(chand->combiner, false)), + GRPC_ERROR_NONE); } - gpr_mu_unlock(&chand->mu); return out; } @@ -1199,6 +1237,7 @@ typedef struct { channel_data *chand; grpc_pollset *pollset; grpc_closure *on_complete; + grpc_connectivity_state *state; grpc_closure my_closure; } external_connectivity_watcher; @@ -1211,7 +1250,16 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "external_connectivity_watcher"); gpr_free(w); - follow_up->cb(exec_ctx, follow_up->cb_arg, error); + grpc_closure_run(exec_ctx, follow_up, GRPC_ERROR_REF(error)); +} + +static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error_ignored) { + external_connectivity_watcher *w = arg; + grpc_closure_init(&w->my_closure, on_external_watch_complete, w, + grpc_schedule_on_exec_ctx); + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure); } void grpc_client_channel_watch_connectivity_state( @@ -1222,13 +1270,13 @@ void grpc_client_channel_watch_connectivity_state( w->chand = chand; w->pollset = pollset; w->on_complete = on_complete; + w->state = state; grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset); - grpc_closure_init(&w->my_closure, on_external_watch_complete, w, - grpc_schedule_on_exec_ctx); GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); - gpr_mu_lock(&chand->mu); - grpc_connectivity_state_notify_on_state_change( - exec_ctx, &chand->state_tracker, state, &w->my_closure); - gpr_mu_unlock(&chand->mu); + grpc_closure_sched( + exec_ctx, + grpc_closure_init(&w->my_closure, watch_connectivity_state_locked, w, + grpc_combiner_scheduler(chand->combiner, true)), + GRPC_ERROR_NONE); } diff --git a/src/core/ext/client_channel/lb_policy.c b/src/core/ext/client_channel/lb_policy.c index 45ee72e2f0..90401b586f 100644 --- a/src/core/ext/client_channel/lb_policy.c +++ b/src/core/ext/client_channel/lb_policy.c @@ -94,7 +94,7 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, gpr_atm old_val = ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF")); if (old_val == 1) { - grpc_pollset_set_destroy(policy->interested_parties); + grpc_pollset_set_destroy(exec_ctx, policy->interested_parties); policy->vtable->destroy(exec_ctx, policy); } } diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index aa036e883b..09c68a91dd 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -217,7 +217,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_slice_unref_internal(exec_ctx, c->initial_connect_string); grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker); grpc_connector_unref(exec_ctx, c->connector); - grpc_pollset_set_destroy(c->pollset_set); + grpc_pollset_set_destroy(exec_ctx, c->pollset_set); grpc_subchannel_key_destroy(exec_ctx, c->key); gpr_free(c); } @@ -419,7 +419,7 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c, grpc_error **error) { grpc_connectivity_state state; gpr_mu_lock(&c->mu); - state = grpc_connectivity_state_check(&c->state_tracker, error); + state = grpc_connectivity_state_get(&c->state_tracker, error); gpr_mu_unlock(&c->mu); return state; } diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index ab62e5ed6a..8a2af48328 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -492,9 +492,8 @@ static grpc_lb_addresses *process_serverlist_locked( static bool update_lb_connectivity_status_locked( grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, grpc_connectivity_state new_rr_state, grpc_error *new_rr_state_error) { - grpc_error *curr_state_error; - const grpc_connectivity_state curr_glb_state = grpc_connectivity_state_check( - &glb_policy->state_tracker, &curr_state_error); + const grpc_connectivity_state curr_glb_state = + grpc_connectivity_state_check(&glb_policy->state_tracker); /* The new connectivity status is a function of the previous one and the new * input coming from the status of the RR policy. @@ -1098,8 +1097,8 @@ static grpc_connectivity_state glb_check_connectivity( glb_lb_policy *glb_policy = (glb_lb_policy *)pol; grpc_connectivity_state st; gpr_mu_lock(&glb_policy->mu); - st = grpc_connectivity_state_check(&glb_policy->state_tracker, - connectivity_error); + st = grpc_connectivity_state_get(&glb_policy->state_tracker, + connectivity_error); gpr_mu_unlock(&glb_policy->mu); return st; } diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index 9f2aa461be..1b965183f6 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -398,7 +398,7 @@ static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p = (pick_first_lb_policy *)pol; grpc_connectivity_state st; gpr_mu_lock(&p->mu); - st = grpc_connectivity_state_check(&p->state_tracker, error); + st = grpc_connectivity_state_get(&p->state_tracker, error); gpr_mu_unlock(&p->mu); return st; } diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 3e060d189a..63e3d033ad 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -655,7 +655,7 @@ static grpc_connectivity_state rr_check_connectivity(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p = (round_robin_lb_policy *)pol; grpc_connectivity_state st; gpr_mu_lock(&p->mu); - st = grpc_connectivity_state_check(&p->state_tracker, error); + st = grpc_connectivity_state_get(&p->state_tracker, error); gpr_mu_unlock(&p->mu); return st; } diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index 2c9623211b..c08b53ea04 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -244,7 +244,7 @@ static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { if (r->resolved_result != NULL) { grpc_channel_args_destroy(exec_ctx, r->resolved_result); } - grpc_pollset_set_destroy(r->interested_parties); + grpc_pollset_set_destroy(exec_ctx, r->interested_parties); gpr_free(r->name_to_resolve); gpr_free(r->default_port); grpc_channel_args_destroy(exec_ctx, r->channel_args); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index ddbc656b96..d1fab25478 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1041,8 +1041,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, GPR_TIMER_BEGIN("perform_stream_op_locked", 0); grpc_transport_stream_op *op = stream_op; - grpc_chttp2_transport *t = op->transport_private.args[0]; - grpc_chttp2_stream *s = op->transport_private.args[1]; + grpc_chttp2_transport *t = op->handler_private.args[0]; + grpc_chttp2_stream *s = op->handler_private.args[1]; if (grpc_http_trace) { char *str = grpc_transport_stream_op_string(op); @@ -1263,13 +1263,13 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, gpr_free(str); } - op->transport_private.args[0] = gt; - op->transport_private.args[1] = gs; + op->handler_private.args[0] = gt; + op->handler_private.args[1] = gs; GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op"); grpc_closure_sched( exec_ctx, grpc_closure_init( - &op->transport_private.closure, perform_stream_op_locked, op, + &op->handler_private.closure, perform_stream_op_locked, op, grpc_combiner_scheduler(t->combiner, op->covered_by_poller)), GRPC_ERROR_NONE); GPR_TIMER_END("perform_stream_op", 0); diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index fb2108987b..6d7aa43b81 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -93,8 +93,9 @@ void grpc_httpcli_context_init(grpc_httpcli_context *context) { context->pollset_set = grpc_pollset_set_create(); } -void grpc_httpcli_context_destroy(grpc_httpcli_context *context) { - grpc_pollset_set_destroy(context->pollset_set); +void grpc_httpcli_context_destroy(grpc_exec_ctx *exec_ctx, + grpc_httpcli_context *context) { + grpc_pollset_set_destroy(exec_ctx, context->pollset_set); } static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req, diff --git a/src/core/lib/http/httpcli.h b/src/core/lib/http/httpcli.h index 11e03b44df..8ae03ee78f 100644 --- a/src/core/lib/http/httpcli.h +++ b/src/core/lib/http/httpcli.h @@ -83,7 +83,8 @@ typedef struct grpc_httpcli_request { typedef struct grpc_http_response grpc_httpcli_response; void grpc_httpcli_context_init(grpc_httpcli_context *context); -void grpc_httpcli_context_destroy(grpc_httpcli_context *context); +void grpc_httpcli_context_destroy(grpc_exec_ctx *exec_ctx, + grpc_httpcli_context *context); /* Asynchronously perform a HTTP GET. 'context' specifies the http context under which to do the get diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index fc56843128..fac3705142 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -1842,13 +1842,12 @@ static grpc_pollset_set *pollset_set_create(void) { return pss; } -static void pollset_set_destroy(grpc_pollset_set *pss) { +static void pollset_set_destroy(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pss) { gpr_mu_destroy(&pss->po.mu); if (pss->po.pi != NULL) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - PI_UNREF(&exec_ctx, pss->po.pi, "pss_destroy"); - grpc_exec_ctx_finish(&exec_ctx); + PI_UNREF(exec_ctx, pss->po.pi, "pss_destroy"); } gpr_free(pss); diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 21eb62753e..c03fadaebb 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -149,7 +149,7 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec, static bool fd_is_orphaned(grpc_fd *fd); /* Reference counting for fds */ -/*#define GRPC_FD_REF_COUNT_DEBUG*/ +//#define GRPC_FD_REF_COUNT_DEBUG #ifdef GRPC_FD_REF_COUNT_DEBUG static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line); static void fd_unref(grpc_fd *fd, const char *reason, const char *file, @@ -191,6 +191,7 @@ struct grpc_pollset { int kicked_without_pollers; grpc_closure *shutdown_done; grpc_closure_list idle_jobs; + int pollset_set_count; /* all polled fds */ size_t fd_count; size_t fd_capacity; @@ -228,7 +229,7 @@ static grpc_error *pollset_kick_ext(grpc_pollset *p, /* Return 1 if the pollset has active threads in pollset_work (pollset must * be locked) */ -static int pollset_has_workers(grpc_pollset *pollset); +static bool pollset_has_workers(grpc_pollset *pollset); /******************************************************************************* * pollset_set definitions @@ -282,8 +283,8 @@ cv_fd_table g_cvfds; static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file, int line) { gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n, - gpr_atm_no_barrier_load(&fd->refst), - gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line); + (int)gpr_atm_no_barrier_load(&fd->refst), + (int)gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line); #else #define REF_BY(fd, n, reason) ref_by(fd, n) #define UNREF_BY(fd, n, reason) unref_by(fd, n) @@ -297,8 +298,8 @@ static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file, int line) { gpr_atm old; gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n, - gpr_atm_no_barrier_load(&fd->refst), - gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line); + (int)gpr_atm_no_barrier_load(&fd->refst), + (int)gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line); #else static void unref_by(grpc_fd *fd, int n) { gpr_atm old; @@ -658,10 +659,18 @@ static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) { worker->next->prev = worker->prev; } -static int pollset_has_workers(grpc_pollset *p) { +static bool pollset_has_workers(grpc_pollset *p) { return p->root_worker.next != &p->root_worker; } +static bool pollset_in_pollset_sets(grpc_pollset *p) { + return p->pollset_set_count; +} + +static bool pollset_has_observers(grpc_pollset *p) { + return pollset_has_workers(p) || pollset_in_pollset_sets(p); +} + static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) { if (pollset_has_workers(p)) { grpc_pollset_worker *w = p->root_worker.next; @@ -800,6 +809,7 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { pollset->fd_count = 0; pollset->fd_capacity = 0; pollset->fds = NULL; + pollset->pollset_set_count = 0; } static void pollset_destroy(grpc_pollset *pollset) { @@ -1061,7 +1071,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (pollset->shutting_down) { if (pollset_has_workers(pollset)) { pollset_kick(pollset, NULL); - } else if (!pollset->called_shutdown) { + } else if (!pollset->called_shutdown && !pollset_has_observers(pollset)) { pollset->called_shutdown = 1; gpr_mu_unlock(&pollset->mu); finish_shutdown(exec_ctx, pollset); @@ -1093,7 +1103,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (!pollset_has_workers(pollset)) { grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs); } - if (!pollset->called_shutdown && !pollset_has_workers(pollset)) { + if (!pollset->called_shutdown && !pollset_has_observers(pollset)) { pollset->called_shutdown = 1; finish_shutdown(exec_ctx, pollset); } @@ -1127,12 +1137,27 @@ static grpc_pollset_set *pollset_set_create(void) { return pollset_set; } -static void pollset_set_destroy(grpc_pollset_set *pollset_set) { +static void pollset_set_destroy(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set) { size_t i; gpr_mu_destroy(&pollset_set->mu); for (i = 0; i < pollset_set->fd_count; i++) { GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set"); } + for (i = 0; i < pollset_set->pollset_count; i++) { + grpc_pollset *pollset = pollset_set->pollsets[i]; + gpr_mu_lock(&pollset->mu); + pollset->pollset_set_count--; + /* check shutdown */ + if (pollset->shutting_down && !pollset->called_shutdown && + !pollset_has_observers(pollset)) { + pollset->called_shutdown = 1; + gpr_mu_unlock(&pollset->mu); + finish_shutdown(exec_ctx, pollset); + } else { + gpr_mu_unlock(&pollset->mu); + } + } gpr_free(pollset_set->pollsets); gpr_free(pollset_set->pollset_sets); gpr_free(pollset_set->fds); @@ -1143,6 +1168,9 @@ static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, grpc_pollset *pollset) { size_t i, j; + gpr_mu_lock(&pollset->mu); + pollset->pollset_set_count++; + gpr_mu_unlock(&pollset->mu); gpr_mu_lock(&pollset_set->mu); if (pollset_set->pollset_count == pollset_set->pollset_capacity) { pollset_set->pollset_capacity = @@ -1178,6 +1206,17 @@ static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, } } gpr_mu_unlock(&pollset_set->mu); + gpr_mu_lock(&pollset->mu); + pollset->pollset_set_count--; + /* check shutdown */ + if (pollset->shutting_down && !pollset->called_shutdown && + !pollset_has_observers(pollset)) { + pollset->called_shutdown = 1; + gpr_mu_unlock(&pollset->mu); + finish_shutdown(exec_ctx, pollset); + } else { + gpr_mu_unlock(&pollset->mu); + } } static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 95b1d99d75..b5be5504b9 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -215,8 +215,9 @@ grpc_pollset_set *grpc_pollset_set_create(void) { return g_event_engine->pollset_set_create(); } -void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { - g_event_engine->pollset_set_destroy(pollset_set); +void grpc_pollset_set_destroy(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set) { + g_event_engine->pollset_set_destroy(exec_ctx, pollset_set); } void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 1aea7d61f3..1a9e5c115a 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -74,7 +74,8 @@ typedef struct grpc_event_engine_vtable { struct grpc_fd *fd); grpc_pollset_set *(*pollset_set_create)(void); - void (*pollset_set_destroy)(grpc_pollset_set *pollset_set); + void (*pollset_set_destroy)(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set); void (*pollset_set_add_pollset)(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, grpc_pollset *pollset); diff --git a/src/core/lib/iomgr/pollset_set.h b/src/core/lib/iomgr/pollset_set.h index 34bb728c41..d11801d63b 100644 --- a/src/core/lib/iomgr/pollset_set.h +++ b/src/core/lib/iomgr/pollset_set.h @@ -44,7 +44,8 @@ typedef struct grpc_pollset_set grpc_pollset_set; grpc_pollset_set *grpc_pollset_set_create(void); -void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set); +void grpc_pollset_set_destroy(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set); void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, grpc_pollset *pollset); diff --git a/src/core/lib/iomgr/pollset_set_uv.c b/src/core/lib/iomgr/pollset_set_uv.c index e5ef8b29e0..836cfee4ef 100644 --- a/src/core/lib/iomgr/pollset_set_uv.c +++ b/src/core/lib/iomgr/pollset_set_uv.c @@ -41,7 +41,8 @@ grpc_pollset_set* grpc_pollset_set_create(void) { return (grpc_pollset_set*)((intptr_t)0xdeafbeef); } -void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set) {} +void grpc_pollset_set_destroy(grpc_exec_ctx* exec_ctx, + grpc_pollset_set* pollset_set) {} void grpc_pollset_set_add_pollset(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pollset_set, diff --git a/src/core/lib/iomgr/pollset_set_windows.c b/src/core/lib/iomgr/pollset_set_windows.c index 645650db9b..ae18c8a3ce 100644 --- a/src/core/lib/iomgr/pollset_set_windows.c +++ b/src/core/lib/iomgr/pollset_set_windows.c @@ -42,7 +42,8 @@ grpc_pollset_set* grpc_pollset_set_create(void) { return (grpc_pollset_set*)((intptr_t)0xdeafbeef); } -void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set) {} +void grpc_pollset_set_destroy(grpc_exec_ctx* exec_ctx, + grpc_pollset_set* pollset_set) {} void grpc_pollset_set_add_pollset(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pollset_set, diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.c b/src/core/lib/security/credentials/google_default/google_default_credentials.c index a098741b70..ecd26de9fa 100644 --- a/src/core/lib/security/credentials/google_default/google_default_credentials.c +++ b/src/core/lib/security/credentials/google_default/google_default_credentials.c @@ -154,7 +154,7 @@ static int is_stack_running_on_compute_engine(grpc_exec_ctx *exec_ctx) { } gpr_mu_unlock(g_polling_mu); - grpc_httpcli_context_destroy(&context); + grpc_httpcli_context_destroy(exec_ctx, &context); grpc_closure_init(&destroy_closure, destroy_pollset, grpc_polling_entity_pollset(&detector.pollent), grpc_schedule_on_exec_ctx); diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.c b/src/core/lib/security/credentials/jwt/jwt_verifier.c index 2270be8f44..f128177e8c 100644 --- a/src/core/lib/security/credentials/jwt/jwt_verifier.c +++ b/src/core/lib/security/credentials/jwt/jwt_verifier.c @@ -898,10 +898,10 @@ grpc_jwt_verifier *grpc_jwt_verifier_create( return v; } -void grpc_jwt_verifier_destroy(grpc_jwt_verifier *v) { +void grpc_jwt_verifier_destroy(grpc_exec_ctx *exec_ctx, grpc_jwt_verifier *v) { size_t i; if (v == NULL) return; - grpc_httpcli_context_destroy(&v->http_ctx); + grpc_httpcli_context_destroy(exec_ctx, &v->http_ctx); if (v->mappings != NULL) { for (i = 0; i < v->num_mappings; i++) { gpr_free(v->mappings[i].email_domain); diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.h b/src/core/lib/security/credentials/jwt/jwt_verifier.h index 4fa320a415..5c3d2a7788 100644 --- a/src/core/lib/security/credentials/jwt/jwt_verifier.h +++ b/src/core/lib/security/credentials/jwt/jwt_verifier.h @@ -109,7 +109,8 @@ grpc_jwt_verifier *grpc_jwt_verifier_create( size_t num_mappings); /*The verifier must not be destroyed if there are still outstanding callbacks.*/ -void grpc_jwt_verifier_destroy(grpc_jwt_verifier *verifier); +void grpc_jwt_verifier_destroy(grpc_exec_ctx *exec_ctx, + grpc_jwt_verifier *verifier); /* User provided callback that will be called when the verification of the JWT is done (maybe in another thread). diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c index 1b0e43a1e4..c0f260f938 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c @@ -124,7 +124,7 @@ static void oauth2_token_fetcher_destruct(grpc_exec_ctx *exec_ctx, (grpc_oauth2_token_fetcher_credentials *)creds; grpc_credentials_md_store_unref(exec_ctx, c->access_token_md); gpr_mu_destroy(&c->mu); - grpc_httpcli_context_destroy(&c->httpcli_context); + grpc_httpcli_context_destroy(exec_ctx, &c->httpcli_context); } grpc_credentials_status diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c index 8fc5bf3e9a..afe1f6164d 100644 --- a/src/core/lib/transport/connectivity_state.c +++ b/src/core/lib/transport/connectivity_state.c @@ -62,7 +62,7 @@ const char *grpc_connectivity_state_name(grpc_connectivity_state state) { void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state init_state, const char *name) { - tracker->current_state = init_state; + gpr_atm_no_barrier_store(&tracker->current_state_atm, init_state); tracker->current_error = GRPC_ERROR_NONE; tracker->watchers = NULL; tracker->name = gpr_strdup(name); @@ -89,15 +89,30 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx, } grpc_connectivity_state grpc_connectivity_state_check( + grpc_connectivity_state_tracker *tracker) { + grpc_connectivity_state cur = + (grpc_connectivity_state)gpr_atm_no_barrier_load( + &tracker->current_state_atm); + if (grpc_connectivity_state_trace) { + gpr_log(GPR_DEBUG, "CONWATCH: %p %s: get %s", tracker, tracker->name, + grpc_connectivity_state_name(cur)); + } + return cur; +} + +grpc_connectivity_state grpc_connectivity_state_get( grpc_connectivity_state_tracker *tracker, grpc_error **error) { + grpc_connectivity_state cur = + (grpc_connectivity_state)gpr_atm_no_barrier_load( + &tracker->current_state_atm); if (grpc_connectivity_state_trace) { gpr_log(GPR_DEBUG, "CONWATCH: %p %s: get %s", tracker, tracker->name, - grpc_connectivity_state_name(tracker->current_state)); + grpc_connectivity_state_name(cur)); } if (error != NULL) { *error = GRPC_ERROR_REF(tracker->current_error); } - return tracker->current_state; + return cur; } bool grpc_connectivity_state_has_watchers( @@ -108,6 +123,9 @@ bool grpc_connectivity_state_has_watchers( bool grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_closure *notify) { + grpc_connectivity_state cur = + (grpc_connectivity_state)gpr_atm_no_barrier_load( + &tracker->current_state_atm); if (grpc_connectivity_state_trace) { if (current == NULL) { gpr_log(GPR_DEBUG, "CONWATCH: %p %s: unsubscribe notify=%p", tracker, @@ -115,7 +133,7 @@ bool grpc_connectivity_state_notify_on_state_change( } else { gpr_log(GPR_DEBUG, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", tracker, tracker->name, grpc_connectivity_state_name(*current), - grpc_connectivity_state_name(tracker->current_state), notify); + grpc_connectivity_state_name(cur), notify); } } if (current == NULL) { @@ -138,8 +156,8 @@ bool grpc_connectivity_state_notify_on_state_change( } return false; } else { - if (tracker->current_state != *current) { - *current = tracker->current_state; + if (cur != *current) { + *current = cur; grpc_closure_sched(exec_ctx, notify, GRPC_ERROR_REF(tracker->current_error)); } else { @@ -149,7 +167,7 @@ bool grpc_connectivity_state_notify_on_state_change( w->next = tracker->watchers; tracker->watchers = w; } - return tracker->current_state == GRPC_CHANNEL_IDLE; + return cur == GRPC_CHANNEL_IDLE; } } @@ -157,11 +175,14 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state, grpc_error *error, const char *reason) { + grpc_connectivity_state cur = + (grpc_connectivity_state)gpr_atm_no_barrier_load( + &tracker->current_state_atm); grpc_connectivity_state_watcher *w; if (grpc_connectivity_state_trace) { const char *error_string = grpc_error_string(error); gpr_log(GPR_DEBUG, "SET: %p %s: %s --> %s [%s] error=%p %s", tracker, - tracker->name, grpc_connectivity_state_name(tracker->current_state), + tracker->name, grpc_connectivity_state_name(cur), grpc_connectivity_state_name(state), reason, error, error_string); } switch (state) { @@ -178,13 +199,13 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, } GRPC_ERROR_UNREF(tracker->current_error); tracker->current_error = error; - if (tracker->current_state == state) { + if (cur == state) { return; } - GPR_ASSERT(tracker->current_state != GRPC_CHANNEL_SHUTDOWN); - tracker->current_state = state; + GPR_ASSERT(cur != GRPC_CHANNEL_SHUTDOWN); + gpr_atm_no_barrier_store(&tracker->current_state_atm, state); while ((w = tracker->watchers) != NULL) { - *w->current = tracker->current_state; + *w->current = state; tracker->watchers = w->next; if (grpc_connectivity_state_trace) { gpr_log(GPR_DEBUG, "NOTIFY: %p %s: %p", tracker, tracker->name, diff --git a/src/core/lib/transport/connectivity_state.h b/src/core/lib/transport/connectivity_state.h index 769c675b79..c9604c34dd 100644 --- a/src/core/lib/transport/connectivity_state.h +++ b/src/core/lib/transport/connectivity_state.h @@ -47,8 +47,8 @@ typedef struct grpc_connectivity_state_watcher { } grpc_connectivity_state_watcher; typedef struct { - /** current connectivity state */ - grpc_connectivity_state current_state; + /** current grpc_connectivity_state */ + gpr_atm current_state_atm; /** error associated with state */ grpc_error *current_error; /** all our watchers */ @@ -59,6 +59,7 @@ typedef struct { extern int grpc_connectivity_state_trace; +/** enum --> string conversion */ const char *grpc_connectivity_state_name(grpc_connectivity_state state); void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, @@ -68,22 +69,31 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker); /** Set connectivity state; not thread safe; access must be serialized with an - * external lock */ + * external lock */ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state, grpc_error *associated_error, const char *reason); +/** Return true if this connectivity state has watchers. + Access must be serialized with an external lock. */ bool grpc_connectivity_state_has_watchers( grpc_connectivity_state_tracker *tracker); +/** Return the last seen connectivity state. No need to synchronize access. */ grpc_connectivity_state grpc_connectivity_state_check( - grpc_connectivity_state_tracker *tracker, grpc_error **current_error); + grpc_connectivity_state_tracker *tracker); + +/** Return the last seen connectivity state, and the associated error. + Access must be serialized with an external lock. */ +grpc_connectivity_state grpc_connectivity_state_get( + grpc_connectivity_state_tracker *tracker, grpc_error **error); /** Return 1 if the channel should start connecting, 0 otherwise. If current==NULL cancel notify if it is already queued (success==0 in that - case) */ + case). + Access must be serialized with an external lock. */ bool grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_closure *notify); diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 9a0abe1ca4..e56bf2780a 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -167,9 +167,9 @@ typedef struct grpc_transport_stream_op { /*************************************************************************** * remaining fields are initialized and used at the discretion of the - * transport implementation */ + * current handler of the op */ - grpc_transport_private_op_data transport_private; + grpc_transport_private_op_data handler_private; } grpc_transport_stream_op; /** Transport op: a set of operations to perform on a transport as a whole */ |