diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/ext/client_channel/client_channel.c | 505 | ||||
-rw-r--r-- | src/core/ext/client_channel/resolver.c | 18 | ||||
-rw-r--r-- | src/core/ext/client_channel/resolver.h | 31 | ||||
-rw-r--r-- | src/core/ext/client_channel/resolver_factory.h | 1 | ||||
-rw-r--r-- | src/core/ext/client_channel/resolver_registry.c | 4 | ||||
-rw-r--r-- | src/core/ext/client_channel/resolver_registry.h | 3 | ||||
-rw-r--r-- | src/core/ext/client_channel/subchannel.c | 2 | ||||
-rw-r--r-- | src/core/ext/lb_policy/grpclb/grpclb.c | 9 | ||||
-rw-r--r-- | src/core/ext/lb_policy/pick_first/pick_first.c | 2 | ||||
-rw-r--r-- | src/core/ext/lb_policy/round_robin/round_robin.c | 2 | ||||
-rw-r--r-- | src/core/ext/resolver/dns/native/dns_resolver.c | 61 | ||||
-rw-r--r-- | src/core/ext/resolver/sockaddr/sockaddr_resolver.c | 46 | ||||
-rw-r--r-- | src/core/lib/transport/connectivity_state.c | 45 | ||||
-rw-r--r-- | src/core/lib/transport/connectivity_state.h | 20 |
14 files changed, 412 insertions, 337 deletions
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index 06038bb5ba..758bdc0f17 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -51,6 +51,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/channel/deadline_filter.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/profiling/timers.h" @@ -160,13 +161,10 @@ typedef struct client_channel_channel_data { /** client channel factory */ grpc_client_channel_factory *client_channel_factory; - /** mutex protecting all variables below in this data structure */ - gpr_mu mu; + /** combiner protecting all variables below in this data structure */ + grpc_combiner *combiner; /** currently active load balancer */ - char *lb_policy_name; grpc_lb_policy *lb_policy; - /** service config in JSON form */ - char *service_config_json; /** maps method names to method_parameters structs */ grpc_slice_hash_table *method_params_table; /** incoming resolver result - set by resolver.next() */ @@ -183,6 +181,13 @@ typedef struct client_channel_channel_data { grpc_channel_stack *owning_stack; /** interested parties (owned) */ grpc_pollset_set *interested_parties; + + /* the following properties are guarded by a mutex since API's require them + to be instantaniously available */ + gpr_mu info_mu; + char *info_lb_policy_name; + /** service config in JSON form */ + char *info_service_config_json; } channel_data; /** We create one watcher for each new lb_policy that is returned from a @@ -218,32 +223,23 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, } static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx, - lb_policy_connectivity_watcher *w, - grpc_error *error) { + void *arg, grpc_error *error) { + lb_policy_connectivity_watcher *w = arg; grpc_connectivity_state publish_state = w->state; - /* check if the notification is for a stale policy */ - if (w->lb_policy != w->chand->lb_policy) return; - - if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) { - publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; - grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver); - GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel"); - w->chand->lb_policy = NULL; - } - set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state, - GRPC_ERROR_REF(error), "lb_changed"); - if (w->state != GRPC_CHANNEL_SHUTDOWN) { - watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state); + /* check if the notification is for the latest policy */ + if (w->lb_policy == w->chand->lb_policy) { + if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) { + publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; + grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver); + GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel"); + w->chand->lb_policy = NULL; + } + set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state, + GRPC_ERROR_REF(error), "lb_changed"); + if (w->state != GRPC_CHANNEL_SHUTDOWN) { + watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state); + } } -} - -static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - lb_policy_connectivity_watcher *w = arg; - - gpr_mu_lock(&w->chand->mu); - on_lb_policy_state_changed_locked(exec_ctx, w, error); - gpr_mu_unlock(&w->chand->mu); GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy"); gpr_free(w); @@ -256,16 +252,16 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); w->chand = chand; - grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w, - grpc_schedule_on_exec_ctx); + grpc_closure_init(&w->on_changed, on_lb_policy_state_changed_locked, w, + grpc_combiner_scheduler(chand->combiner, false)); w->state = current_state; w->lb_policy = lb_policy; grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state, &w->on_changed); } -static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, + void *arg, grpc_error *error) { channel_data *chand = arg; char *lb_policy_name = NULL; grpc_lb_policy *lb_policy = NULL; @@ -353,17 +349,18 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, chand->interested_parties); } - gpr_mu_lock(&chand->mu); + gpr_mu_lock(&chand->info_mu); if (lb_policy_name != NULL) { - gpr_free(chand->lb_policy_name); - chand->lb_policy_name = lb_policy_name; + gpr_free(chand->info_lb_policy_name); + chand->info_lb_policy_name = lb_policy_name; } old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; if (service_config_json != NULL) { - gpr_free(chand->service_config_json); - chand->service_config_json = service_config_json; + gpr_free(chand->info_service_config_json); + chand->info_service_config_json = service_config_json; } + gpr_mu_unlock(&chand->info_mu); if (chand->method_params_table != NULL) { grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table); } @@ -389,12 +386,12 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, watch_lb_policy(exec_ctx, chand, lb_policy, state); } GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, - &chand->on_resolver_result_changed); - gpr_mu_unlock(&chand->mu); + grpc_resolver_next_locked(exec_ctx, chand->resolver, + &chand->resolver_result, + &chand->on_resolver_result_changed); } else { if (chand->resolver != NULL) { - grpc_resolver_shutdown(exec_ctx, chand->resolver); + grpc_resolver_shutdown_locked(exec_ctx, chand->resolver); GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); chand->resolver = NULL; } @@ -404,7 +401,6 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_CREATE_REFERENCING("Got config after disconnection", refs, GPR_ARRAY_SIZE(refs)), "resolver_gone"); - gpr_mu_unlock(&chand->mu); } if (exit_idle) { @@ -426,20 +422,12 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_UNREF(state_error); } -static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_transport_op *op) { +static void cc_start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error_ignored) { + grpc_transport_op *op = arg; + grpc_channel_element *elem = op->transport_private.args[0]; channel_data *chand = elem->channel_data; - grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); - - GPR_ASSERT(op->set_accept_stream == false); - if (op->bind_pollset != NULL) { - grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, - op->bind_pollset); - } - - gpr_mu_lock(&chand->mu); if (op->on_connectivity_state_change != NULL) { grpc_connectivity_state_notify_on_state_change( exec_ctx, &chand->state_tracker, op->connectivity_state, @@ -464,7 +452,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, set_channel_connectivity_state_locked( exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(op->disconnect_with_error), "disconnect"); - grpc_resolver_shutdown(exec_ctx, chand->resolver); + grpc_resolver_shutdown_locked(exec_ctx, chand->resolver); GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); chand->resolver = NULL; if (!chand->started_resolving) { @@ -482,25 +470,49 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, } GRPC_ERROR_UNREF(op->disconnect_with_error); } - gpr_mu_unlock(&chand->mu); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "start_transport_op"); + + grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); +} + +static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_transport_op *op) { + channel_data *chand = elem->channel_data; + + GPR_ASSERT(op->set_accept_stream == false); + if (op->bind_pollset != NULL) { + grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, + op->bind_pollset); + } + + op->transport_private.args[0] = elem; + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op"); + grpc_closure_sched( + exec_ctx, + grpc_closure_init(&op->transport_private.closure, + cc_start_transport_op_locked, op, + grpc_combiner_scheduler(chand->combiner, false)), + GRPC_ERROR_NONE); } static void cc_get_channel_info(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, const grpc_channel_info *info) { channel_data *chand = elem->channel_data; - gpr_mu_lock(&chand->mu); + gpr_mu_lock(&chand->info_mu); if (info->lb_policy_name != NULL) { - *info->lb_policy_name = chand->lb_policy_name == NULL + *info->lb_policy_name = chand->info_lb_policy_name == NULL ? NULL - : gpr_strdup(chand->lb_policy_name); + : gpr_strdup(chand->info_lb_policy_name); } if (info->service_config_json != NULL) { - *info->service_config_json = chand->service_config_json == NULL - ? NULL - : gpr_strdup(chand->service_config_json); + *info->service_config_json = + chand->info_service_config_json == NULL + ? NULL + : gpr_strdup(chand->info_service_config_json); } - gpr_mu_unlock(&chand->mu); + gpr_mu_unlock(&chand->info_mu); } /* Constructor for channel_data */ @@ -512,11 +524,12 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, GPR_ASSERT(args->is_last); GPR_ASSERT(elem->filter == &grpc_client_channel_filter); // Initialize data members. - gpr_mu_init(&chand->mu); + chand->combiner = grpc_combiner_create(NULL); + gpr_mu_init(&chand->info_mu); chand->owning_stack = args->channel_stack; grpc_closure_init(&chand->on_resolver_result_changed, - on_resolver_result_changed, chand, - grpc_schedule_on_exec_ctx); + on_resolver_result_changed_locked, chand, + grpc_combiner_scheduler(chand->combiner, false)); chand->interested_parties = grpc_pollset_set_create(); grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); @@ -539,7 +552,7 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, chand->resolver = grpc_resolver_create( exec_ctx, proxy_name != NULL ? proxy_name : arg->value.string, new_args != NULL ? new_args : args->channel_args, - chand->interested_parties); + chand->interested_parties, chand->combiner); if (proxy_name != NULL) gpr_free(proxy_name); if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args); if (chand->resolver == NULL) { @@ -548,13 +561,23 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_NONE; } +static void shutdown_resolver_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_resolver *resolver = arg; + grpc_resolver_shutdown_locked(exec_ctx, resolver); + GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel"); +} + /* Destructor for channel_data */ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { channel_data *chand = elem->channel_data; if (chand->resolver != NULL) { - grpc_resolver_shutdown(exec_ctx, chand->resolver); - GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); + grpc_closure_sched( + exec_ctx, + grpc_closure_create(shutdown_resolver_locked, chand->resolver, + grpc_combiner_scheduler(chand->combiner, false)), + GRPC_ERROR_NONE); } if (chand->client_channel_factory != NULL) { grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory); @@ -565,14 +588,15 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } - gpr_free(chand->lb_policy_name); - gpr_free(chand->service_config_json); + gpr_free(chand->info_lb_policy_name); + gpr_free(chand->info_service_config_json); if (chand->method_params_table != NULL) { grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table); } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); grpc_pollset_set_destroy(chand->interested_parties); - gpr_mu_destroy(&chand->mu); + grpc_combiner_unref(exec_ctx, chand->combiner); + gpr_mu_destroy(&chand->info_mu); } /************************************************************************* @@ -615,8 +639,6 @@ typedef struct client_channel_call_data { grpc_subchannel_call */ gpr_atm subchannel_call; - gpr_mu mu; - subchannel_creation_phase creation_phase; grpc_connected_subchannel *connected_subchannel; grpc_polling_entity *pollent; @@ -661,52 +683,32 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld, GRPC_ERROR_UNREF(error); } -typedef struct { - grpc_transport_stream_op **ops; - size_t nops; - grpc_subchannel_call *call; -} retry_ops_args; - -static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { - retry_ops_args *a = args; - size_t i; - for (i = 0; i < a->nops; i++) { - grpc_subchannel_call_process_op(exec_ctx, a->call, a->ops[i]); - } - GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops"); - gpr_free(a->ops); - gpr_free(a); -} - static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { if (calld->waiting_ops_count == 0) { return; } - retry_ops_args *a = gpr_malloc(sizeof(*a)); - a->ops = calld->waiting_ops; - a->nops = calld->waiting_ops_count; - a->call = GET_CALL(calld); - if (a->call == CANCELLED_CALL) { - gpr_free(a); + grpc_subchannel_call *call = GET_CALL(calld); + grpc_transport_stream_op **ops = calld->waiting_ops; + size_t nops = calld->waiting_ops_count; + if (call == CANCELLED_CALL) { fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED); return; } calld->waiting_ops = NULL; calld->waiting_ops_count = 0; calld->waiting_ops_capacity = 0; - GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops"); - grpc_closure_sched( - exec_ctx, grpc_closure_create(retry_ops, a, grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); + for (size_t i = 0; i < nops; i++) { + grpc_subchannel_call_process_op(exec_ctx, call, ops[i]); + } + gpr_free(ops); } -static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { grpc_call_element *elem = arg; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - gpr_mu_lock(&calld->mu); GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL); grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, @@ -742,7 +744,6 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, (gpr_atm)(uintptr_t)subchannel_call); retry_waiting_locked(exec_ctx, calld); } - gpr_mu_unlock(&calld->mu); GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); } @@ -768,37 +769,35 @@ typedef struct { /** Return true if subchannel is available immediately (in which case on_ready should not be called), or false otherwise (in which case on_ready should be called when the subchannel is available). */ -static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, - grpc_connected_subchannel **connected_subchannel, - grpc_closure *on_ready, grpc_error *error); - -static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static bool pick_subchannel_locked( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, + grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready, + grpc_error *error); + +static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { continue_picking_args *cpa = arg; if (cpa->connected_subchannel == NULL) { /* cancelled, do nothing */ } else if (error != GRPC_ERROR_NONE) { grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error)); } else { - call_data *calld = cpa->elem->call_data; - gpr_mu_lock(&calld->mu); - if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, - cpa->initial_metadata_flags, cpa->connected_subchannel, - cpa->on_ready, GRPC_ERROR_NONE)) { + if (pick_subchannel_locked(exec_ctx, cpa->elem, cpa->initial_metadata, + cpa->initial_metadata_flags, + cpa->connected_subchannel, cpa->on_ready, + GRPC_ERROR_NONE)) { grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE); } - gpr_mu_unlock(&calld->mu); } gpr_free(cpa); } -static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, - grpc_connected_subchannel **connected_subchannel, - grpc_closure *on_ready, grpc_error *error) { +static bool pick_subchannel_locked( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, + grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready, + grpc_error *error) { GPR_TIMER_BEGIN("pick_subchannel", 0); channel_data *chand = elem->channel_data; @@ -808,7 +807,6 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, GPR_ASSERT(connected_subchannel); - gpr_mu_lock(&chand->mu); if (initial_metadata == NULL) { if (chand->lb_policy != NULL) { grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, @@ -824,7 +822,6 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1)); } } - gpr_mu_unlock(&chand->mu); GPR_TIMER_END("pick_subchannel", 0); GRPC_ERROR_UNREF(error); return true; @@ -833,7 +830,6 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, if (chand->lb_policy != NULL) { grpc_lb_policy *lb_policy = chand->lb_policy; GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel"); - gpr_mu_unlock(&chand->mu); // If the application explicitly set wait_for_ready, use that. // Otherwise, if the service config specified a value for this // method, use that. @@ -862,8 +858,9 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, if (chand->resolver != NULL && !chand->started_resolving) { chand->started_resolving = true; GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, - &chand->on_resolver_result_changed); + grpc_resolver_next_locked(exec_ctx, chand->resolver, + &chand->resolver_result, + &chand->on_resolver_result_changed); } if (chand->resolver != NULL) { cpa = gpr_malloc(sizeof(*cpa)); @@ -872,62 +869,38 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, cpa->connected_subchannel = connected_subchannel; cpa->on_ready = on_ready; cpa->elem = elem; - grpc_closure_init(&cpa->closure, continue_picking, cpa, - grpc_schedule_on_exec_ctx); + grpc_closure_init(&cpa->closure, continue_picking_locked, cpa, + grpc_combiner_scheduler(chand->combiner, true)); grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure, GRPC_ERROR_NONE); } else { grpc_closure_sched(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected")); } - gpr_mu_unlock(&chand->mu); GPR_TIMER_END("pick_subchannel", 0); return false; } -// The logic here is fairly complicated, due to (a) the fact that we -// need to handle the case where we receive the send op before the -// initial metadata op, and (b) the need for efficiency, especially in -// the streaming case. -// TODO(ctiller): Explain this more thoroughly. -static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op) { +static void cc_start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, + void *arg, + grpc_error *error_ignored) { + grpc_transport_stream_op *op = arg; + grpc_call_element *elem = op->transport_private.args[0]; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op); - /* try to (atomically) get the call */ - grpc_subchannel_call *call = GET_CALL(calld); - GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0); - if (call == CANCELLED_CALL) { - grpc_transport_stream_op_finish_with_failure( - exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); - GPR_TIMER_END("cc_start_transport_stream_op", 0); - return; - } - if (call != NULL) { - grpc_subchannel_call_process_op(exec_ctx, call, op); - GPR_TIMER_END("cc_start_transport_stream_op", 0); - return; - } - /* we failed; lock and figure out what to do */ - gpr_mu_lock(&calld->mu); + grpc_subchannel_call *call; + retry: /* need to recheck that another thread hasn't set the call */ call = GET_CALL(calld); if (call == CANCELLED_CALL) { - gpr_mu_unlock(&calld->mu); grpc_transport_stream_op_finish_with_failure( exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); - GPR_TIMER_END("cc_start_transport_stream_op", 0); - return; + goto done; } if (call != NULL) { - gpr_mu_unlock(&calld->mu); grpc_subchannel_call_process_op(exec_ctx, call, op); - GPR_TIMER_END("cc_start_transport_stream_op", 0); - return; + goto done; } /* if this is a cancellation, then we can raise our cancelled flag */ if (op->cancel_error != GRPC_ERROR_NONE) { @@ -946,15 +919,14 @@ retry: fail_locked(exec_ctx, calld, GRPC_ERROR_REF(op->cancel_error)); break; case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL: - pick_subchannel(exec_ctx, elem, NULL, 0, &calld->connected_subchannel, - NULL, GRPC_ERROR_REF(op->cancel_error)); + pick_subchannel_locked(exec_ctx, elem, NULL, 0, + &calld->connected_subchannel, NULL, + GRPC_ERROR_REF(op->cancel_error)); break; } - gpr_mu_unlock(&calld->mu); grpc_transport_stream_op_finish_with_failure( exec_ctx, op, GRPC_ERROR_REF(op->cancel_error)); - GPR_TIMER_END("cc_start_transport_stream_op", 0); - return; + goto done; } } /* if we don't have a subchannel, try to get one */ @@ -962,16 +934,16 @@ retry: calld->connected_subchannel == NULL && op->send_initial_metadata != NULL) { calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; - grpc_closure_init(&calld->next_step, subchannel_ready, elem, - grpc_schedule_on_exec_ctx); + grpc_closure_init(&calld->next_step, subchannel_ready_locked, elem, + grpc_combiner_scheduler(chand->combiner, true)); GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); /* If a subchannel is not available immediately, the polling entity from call_data should be provided to channel_data's interested_parties, so that IO of the lb_policy and resolver could be done under it. */ - if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata, - op->send_initial_metadata_flags, - &calld->connected_subchannel, &calld->next_step, - GRPC_ERROR_NONE)) { + if (pick_subchannel_locked(exec_ctx, elem, op->send_initial_metadata, + op->send_initial_metadata_flags, + &calld->connected_subchannel, &calld->next_step, + GRPC_ERROR_NONE)) { calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); } else { @@ -998,27 +970,64 @@ retry: } /* nothing to be done but wait */ add_waiting_locked(calld, op); - gpr_mu_unlock(&calld->mu); +done: + GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, + "start_transport_stream_op"); GPR_TIMER_END("cc_start_transport_stream_op", 0); } +// The logic here is fairly complicated, due to (a) the fact that we +// need to handle the case where we receive the send op before the +// initial metadata op, and (b) the need for efficiency, especially in +// the streaming case. +// TODO(ctiller): Explain this more thoroughly. +static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op); + /* try to (atomically) get the call */ + grpc_subchannel_call *call = GET_CALL(calld); + GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0); + if (call == CANCELLED_CALL) { + grpc_transport_stream_op_finish_with_failure( + exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); + GPR_TIMER_END("cc_start_transport_stream_op", 0); + return; + } + if (call != NULL) { + grpc_subchannel_call_process_op(exec_ctx, call, op); + GPR_TIMER_END("cc_start_transport_stream_op", 0); + return; + } + /* we failed; lock and figure out what to do */ + GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op"); + op->transport_private.args[0] = elem; + grpc_closure_sched( + exec_ctx, + grpc_closure_init(&op->transport_private.closure, + cc_start_transport_stream_op_locked, op, + grpc_combiner_scheduler(chand->combiner, false)), + GRPC_ERROR_NONE); +} + // Gets data from the service config. Invoked when the resolver returns // its initial result. -static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void read_service_config_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { grpc_call_element *elem = arg; channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; // If this is an error, there's no point in looking at the service config. if (error == GRPC_ERROR_NONE) { // Get the method config table from channel data. - gpr_mu_lock(&chand->mu); grpc_slice_hash_table *method_params_table = NULL; if (chand->method_params_table != NULL) { method_params_table = grpc_slice_hash_table_ref(chand->method_params_table); } - gpr_mu_unlock(&chand->mu); // If the method config table was present, use it. if (method_params_table != NULL) { const method_parameters *method_params = grpc_method_config_table_get( @@ -1028,7 +1037,6 @@ static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg, gpr_time_cmp(method_params->timeout, gpr_time_0(GPR_TIMESPAN)) != 0; if (have_method_timeout || method_params->wait_for_ready != WAIT_FOR_READY_UNSET) { - gpr_mu_lock(&calld->mu); if (have_method_timeout) { const gpr_timespec per_method_deadline = gpr_time_add(calld->call_start_time, method_params->timeout); @@ -1042,7 +1050,6 @@ static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg, calld->wait_for_ready_from_service_config = method_params->wait_for_ready; } - gpr_mu_unlock(&calld->mu); } } grpc_slice_hash_table_unref(exec_ctx, method_params_table); @@ -1051,43 +1058,25 @@ static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "read_service_config"); } -/* Constructor for call_data */ -static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_call_element_args *args) { +static void initial_read_service_config_locked(grpc_exec_ctx *exec_ctx, + void *arg, + grpc_error *error_ignored) { + grpc_call_element *elem = arg; channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - // Initialize data members. - grpc_deadline_state_init(exec_ctx, elem, args->call_stack); - calld->path = grpc_slice_ref_internal(args->path); - calld->call_start_time = args->start_time; - calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC); - calld->wait_for_ready_from_service_config = WAIT_FOR_READY_UNSET; - calld->cancel_error = GRPC_ERROR_NONE; - gpr_atm_rel_store(&calld->subchannel_call, 0); - gpr_mu_init(&calld->mu); - calld->connected_subchannel = NULL; - calld->waiting_ops = NULL; - calld->waiting_ops_count = 0; - calld->waiting_ops_capacity = 0; - calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; - calld->owning_call = args->call_stack; - calld->pollent = NULL; // If the resolver has already returned results, then we can access // the service config parameters immediately. Otherwise, we need to // defer that work until the resolver returns an initial result. // TODO(roth): This code is almost but not quite identical to the code // in read_service_config() above. It would be nice to find a way to // combine them, to avoid having to maintain it twice. - gpr_mu_lock(&chand->mu); if (chand->lb_policy != NULL) { // We already have a resolver result, so check for service config. if (chand->method_params_table != NULL) { grpc_slice_hash_table *method_params_table = grpc_slice_hash_table_ref(chand->method_params_table); - gpr_mu_unlock(&chand->mu); method_parameters *method_params = grpc_method_config_table_get( - exec_ctx, method_params_table, args->path); + exec_ctx, method_params_table, calld->path); if (method_params != NULL) { if (gpr_time_cmp(method_params->timeout, gpr_time_0(GPR_CLOCK_MONOTONIC)) != 0) { @@ -1101,24 +1090,53 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, } } grpc_slice_hash_table_unref(exec_ctx, method_params_table); - } else { - gpr_mu_unlock(&chand->mu); } } else { // We don't yet have a resolver result, so register a callback to // get the service config data once the resolver returns. // Take a reference to the call stack to be owned by the callback. GRPC_CALL_STACK_REF(calld->owning_call, "read_service_config"); - grpc_closure_init(&calld->read_service_config, read_service_config, elem, - grpc_schedule_on_exec_ctx); + grpc_closure_init(&calld->read_service_config, read_service_config_locked, + elem, grpc_combiner_scheduler(chand->combiner, false)); grpc_closure_list_append(&chand->waiting_for_config_closures, &calld->read_service_config, GRPC_ERROR_NONE); - gpr_mu_unlock(&chand->mu); } // Start the deadline timer with the current deadline value. If we // do not yet have service config data, then the timer may be reset // later. grpc_deadline_state_start(exec_ctx, elem, calld->deadline); + GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, + "initial_read_service_config"); +} + +/* Constructor for call_data */ +static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_call_element_args *args) { + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + // Initialize data members. + grpc_deadline_state_init(exec_ctx, elem, args->call_stack); + calld->path = grpc_slice_ref_internal(args->path); + calld->call_start_time = args->start_time; + calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC); + calld->wait_for_ready_from_service_config = WAIT_FOR_READY_UNSET; + calld->cancel_error = GRPC_ERROR_NONE; + gpr_atm_rel_store(&calld->subchannel_call, 0); + calld->connected_subchannel = NULL; + calld->waiting_ops = NULL; + calld->waiting_ops_count = 0; + calld->waiting_ops_capacity = 0; + calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + calld->owning_call = args->call_stack; + calld->pollent = NULL; + GRPC_CALL_STACK_REF(calld->owning_call, "initial_read_service_config"); + grpc_closure_sched( + exec_ctx, + grpc_closure_init(&calld->read_service_config, + initial_read_service_config_locked, elem, + grpc_combiner_scheduler(chand->combiner, false)), + GRPC_ERROR_NONE); return GRPC_ERROR_NONE; } @@ -1136,7 +1154,6 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call"); } GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING); - gpr_mu_destroy(&calld->mu); GPR_ASSERT(calld->waiting_ops_count == 0); if (calld->connected_subchannel != NULL) { GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel, @@ -1172,26 +1189,37 @@ const grpc_channel_filter grpc_client_channel_filter = { "client-channel", }; +static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error_ignored) { + channel_data *chand = arg; + if (chand->lb_policy != NULL) { + grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy); + } else { + chand->exit_idle_when_lb_policy_arrives = true; + if (!chand->started_resolving && chand->resolver != NULL) { + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); + chand->started_resolving = true; + grpc_resolver_next_locked(exec_ctx, chand->resolver, + &chand->resolver_result, + &chand->on_resolver_result_changed); + } + } + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect"); +} + grpc_connectivity_state grpc_client_channel_check_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) { channel_data *chand = elem->channel_data; grpc_connectivity_state out; - gpr_mu_lock(&chand->mu); - out = grpc_connectivity_state_check(&chand->state_tracker, NULL); + out = grpc_connectivity_state_check(&chand->state_tracker); if (out == GRPC_CHANNEL_IDLE && try_to_connect) { - if (chand->lb_policy != NULL) { - grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy); - } else { - chand->exit_idle_when_lb_policy_arrives = true; - if (!chand->started_resolving && chand->resolver != NULL) { - GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - chand->started_resolving = true; - grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, - &chand->on_resolver_result_changed); - } - } + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect"); + grpc_closure_sched( + exec_ctx, + grpc_closure_create(try_to_connect_locked, chand, + grpc_combiner_scheduler(chand->combiner, false)), + GRPC_ERROR_NONE); } - gpr_mu_unlock(&chand->mu); return out; } @@ -1199,6 +1227,7 @@ typedef struct { channel_data *chand; grpc_pollset *pollset; grpc_closure *on_complete; + grpc_connectivity_state *state; grpc_closure my_closure; } external_connectivity_watcher; @@ -1211,7 +1240,17 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "external_connectivity_watcher"); gpr_free(w); - follow_up->cb(exec_ctx, follow_up->cb_arg, error); + grpc_closure_run(exec_ctx, follow_up, GRPC_ERROR_REF(error)); +} + +static void cc_watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, + void *arg, + grpc_error *error_ignored) { + external_connectivity_watcher *w = arg; + grpc_closure_init(&w->my_closure, on_external_watch_complete, w, + grpc_schedule_on_exec_ctx); + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure); } void grpc_client_channel_watch_connectivity_state( @@ -1222,13 +1261,13 @@ void grpc_client_channel_watch_connectivity_state( w->chand = chand; w->pollset = pollset; w->on_complete = on_complete; + w->state = state; grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset); - grpc_closure_init(&w->my_closure, on_external_watch_complete, w, - grpc_schedule_on_exec_ctx); GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); - gpr_mu_lock(&chand->mu); - grpc_connectivity_state_notify_on_state_change( - exec_ctx, &chand->state_tracker, state, &w->my_closure); - gpr_mu_unlock(&chand->mu); + grpc_closure_sched( + exec_ctx, + grpc_closure_init(&w->my_closure, cc_watch_connectivity_state_locked, w, + grpc_combiner_scheduler(chand->combiner, true)), + GRPC_ERROR_NONE); } diff --git a/src/core/ext/client_channel/resolver.c b/src/core/ext/client_channel/resolver.c index 2ae4fe862e..22e6a9e525 100644 --- a/src/core/ext/client_channel/resolver.c +++ b/src/core/ext/client_channel/resolver.c @@ -66,16 +66,18 @@ void grpc_resolver_unref(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { } } -void grpc_resolver_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { - resolver->vtable->shutdown(exec_ctx, resolver); +void grpc_resolver_shutdown_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { + resolver->vtable->shutdown_locked(exec_ctx, resolver); } -void grpc_resolver_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver) { - resolver->vtable->channel_saw_error(exec_ctx, resolver); +void grpc_resolver_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { + resolver->vtable->channel_saw_error_locked(exec_ctx, resolver); } -void grpc_resolver_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, - grpc_channel_args **result, grpc_closure *on_complete) { - resolver->vtable->next(exec_ctx, resolver, result, on_complete); +void grpc_resolver_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_channel_args **result, + grpc_closure *on_complete) { + resolver->vtable->next_locked(exec_ctx, resolver, result, on_complete); } diff --git a/src/core/ext/client_channel/resolver.h b/src/core/ext/client_channel/resolver.h index 96ece92b9d..01fe9a4a46 100644 --- a/src/core/ext/client_channel/resolver.h +++ b/src/core/ext/client_channel/resolver.h @@ -48,10 +48,11 @@ struct grpc_resolver { struct grpc_resolver_vtable { void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); - void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); - void (*channel_saw_error)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); - void (*next)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, - grpc_channel_args **result, grpc_closure *on_complete); + void (*shutdown_locked)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); + void (*channel_saw_error_locked)(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver); + void (*next_locked)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_channel_args **result, grpc_closure *on_complete); }; #ifdef GRPC_RESOLVER_REFCOUNT_DEBUG @@ -72,19 +73,27 @@ void grpc_resolver_unref(grpc_exec_ctx *exec_ctx, grpc_resolver *policy); void grpc_resolver_init(grpc_resolver *resolver, const grpc_resolver_vtable *vtable); -void grpc_resolver_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); +void grpc_resolver_shutdown_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver); /** Notification that the channel has seen an error on some address. - Can be used as a hint that re-resolution is desirable soon. */ -void grpc_resolver_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver); + Can be used as a hint that re-resolution is desirable soon. + + Must be called from the combiner passed as a resolver_arg at construction + time.*/ +void grpc_resolver_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver); /** Get the next result from the resolver. Expected to set \a *result with new channel args and then schedule \a on_complete for execution. If resolution is fatally broken, set \a *result to NULL and - schedule \a on_complete. */ -void grpc_resolver_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, - grpc_channel_args **result, grpc_closure *on_complete); + schedule \a on_complete. + + Must be called from the combiner passed as a resolver_arg at construction + time.*/ +void grpc_resolver_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_channel_args **result, + grpc_closure *on_complete); #endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_H */ diff --git a/src/core/ext/client_channel/resolver_factory.h b/src/core/ext/client_channel/resolver_factory.h index 3792ddca18..e3cd99ec5a 100644 --- a/src/core/ext/client_channel/resolver_factory.h +++ b/src/core/ext/client_channel/resolver_factory.h @@ -50,6 +50,7 @@ typedef struct grpc_resolver_args { grpc_uri *uri; const grpc_channel_args *args; grpc_pollset_set *pollset_set; + grpc_combiner *combiner; } grpc_resolver_args; struct grpc_resolver_factory_vtable { diff --git a/src/core/ext/client_channel/resolver_registry.c b/src/core/ext/client_channel/resolver_registry.c index 5110a7cad9..f8e8bc9c39 100644 --- a/src/core/ext/client_channel/resolver_registry.c +++ b/src/core/ext/client_channel/resolver_registry.c @@ -133,7 +133,8 @@ static grpc_resolver_factory *resolve_factory(const char *target, grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target, const grpc_channel_args *args, - grpc_pollset_set *pollset_set) { + grpc_pollset_set *pollset_set, + grpc_combiner *combiner) { grpc_uri *uri = NULL; char *canonical_target = NULL; grpc_resolver_factory *factory = @@ -144,6 +145,7 @@ grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target, resolver_args.uri = uri; resolver_args.args = args; resolver_args.pollset_set = pollset_set; + resolver_args.combiner = combiner; resolver = grpc_resolver_factory_create_resolver(exec_ctx, factory, &resolver_args); grpc_uri_destroy(uri); diff --git a/src/core/ext/client_channel/resolver_registry.h b/src/core/ext/client_channel/resolver_registry.h index a4606463eb..e2c189cf0c 100644 --- a/src/core/ext/client_channel/resolver_registry.h +++ b/src/core/ext/client_channel/resolver_registry.h @@ -65,7 +65,8 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory); should not be NULL. */ grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target, const grpc_channel_args *args, - grpc_pollset_set *pollset_set); + grpc_pollset_set *pollset_set, + grpc_combiner *combiner); /** Find a resolver factory given a name and return an (owned-by-the-caller) * reference to it */ diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index aa036e883b..c37134fd5e 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -419,7 +419,7 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c, grpc_error **error) { grpc_connectivity_state state; gpr_mu_lock(&c->mu); - state = grpc_connectivity_state_check(&c->state_tracker, error); + state = grpc_connectivity_state_get(&c->state_tracker, error); gpr_mu_unlock(&c->mu); return state; } diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index ab62e5ed6a..8a2af48328 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -492,9 +492,8 @@ static grpc_lb_addresses *process_serverlist_locked( static bool update_lb_connectivity_status_locked( grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, grpc_connectivity_state new_rr_state, grpc_error *new_rr_state_error) { - grpc_error *curr_state_error; - const grpc_connectivity_state curr_glb_state = grpc_connectivity_state_check( - &glb_policy->state_tracker, &curr_state_error); + const grpc_connectivity_state curr_glb_state = + grpc_connectivity_state_check(&glb_policy->state_tracker); /* The new connectivity status is a function of the previous one and the new * input coming from the status of the RR policy. @@ -1098,8 +1097,8 @@ static grpc_connectivity_state glb_check_connectivity( glb_lb_policy *glb_policy = (glb_lb_policy *)pol; grpc_connectivity_state st; gpr_mu_lock(&glb_policy->mu); - st = grpc_connectivity_state_check(&glb_policy->state_tracker, - connectivity_error); + st = grpc_connectivity_state_get(&glb_policy->state_tracker, + connectivity_error); gpr_mu_unlock(&glb_policy->mu); return st; } diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index 9f2aa461be..1b965183f6 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -398,7 +398,7 @@ static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p = (pick_first_lb_policy *)pol; grpc_connectivity_state st; gpr_mu_lock(&p->mu); - st = grpc_connectivity_state_check(&p->state_tracker, error); + st = grpc_connectivity_state_get(&p->state_tracker, error); gpr_mu_unlock(&p->mu); return st; } diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 3e060d189a..63e3d033ad 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -655,7 +655,7 @@ static grpc_connectivity_state rr_check_connectivity(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p = (round_robin_lb_policy *)pol; grpc_connectivity_state st; gpr_mu_lock(&p->mu); - st = grpc_connectivity_state_check(&p->state_tracker, error); + st = grpc_connectivity_state_get(&p->state_tracker, error); gpr_mu_unlock(&p->mu); return st; } diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index 2c9623211b..7efe3be250 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -40,6 +40,7 @@ #include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/resolver_registry.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/support/backoff.h" @@ -62,9 +63,9 @@ typedef struct { grpc_channel_args *channel_args; /** pollset_set to drive the name resolution process */ grpc_pollset_set *interested_parties; + /** combiner (shared with client channel) */ + grpc_combiner *combiner; - /** mutex guarding the rest of the state */ - gpr_mu mu; /** are we currently resolving? */ bool resolving; /** which version of the result have we published? */ @@ -95,18 +96,20 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx, static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, dns_resolver *r); -static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r); -static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx, grpc_resolver *r); -static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r, - grpc_channel_args **target_result, - grpc_closure *on_complete); +static void dns_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r); +static void dns_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *r); +static void dns_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r, + grpc_channel_args **target_result, + grpc_closure *on_complete); static const grpc_resolver_vtable dns_resolver_vtable = { - dns_destroy, dns_shutdown, dns_channel_saw_error, dns_next}; + dns_destroy, dns_shutdown_locked, dns_channel_saw_error_locked, + dns_next_locked}; -static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { +static void dns_shutdown_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { dns_resolver *r = (dns_resolver *)resolver; - gpr_mu_lock(&r->mu); if (r->have_retry_timer) { grpc_timer_cancel(exec_ctx, &r->retry_timer); } @@ -116,25 +119,21 @@ static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { GRPC_ERROR_CREATE("Resolver Shutdown")); r->next_completion = NULL; } - gpr_mu_unlock(&r->mu); } -static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver) { +static void dns_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { dns_resolver *r = (dns_resolver *)resolver; - gpr_mu_lock(&r->mu); if (!r->resolving) { gpr_backoff_reset(&r->backoff_state); dns_start_resolving_locked(exec_ctx, r); } - gpr_mu_unlock(&r->mu); } -static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, - grpc_channel_args **target_result, - grpc_closure *on_complete) { +static void dns_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_channel_args **target_result, + grpc_closure *on_complete) { dns_resolver *r = (dns_resolver *)resolver; - gpr_mu_lock(&r->mu); GPR_ASSERT(!r->next_completion); r->next_completion = on_complete; r->target_result = target_result; @@ -144,30 +143,26 @@ static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, } else { dns_maybe_finish_next_locked(exec_ctx, r); } - gpr_mu_unlock(&r->mu); } -static void dns_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void dns_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { dns_resolver *r = arg; - gpr_mu_lock(&r->mu); r->have_retry_timer = false; if (error == GRPC_ERROR_NONE) { if (!r->resolving) { dns_start_resolving_locked(exec_ctx, r); } } - gpr_mu_unlock(&r->mu); GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "retry-timer"); } -static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void dns_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { dns_resolver *r = arg; grpc_channel_args *result = NULL; - gpr_mu_lock(&r->mu); GPR_ASSERT(r->resolving); r->resolving = false; if (r->addresses != NULL) { @@ -198,8 +193,8 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, } else { gpr_log(GPR_DEBUG, "retrying immediately"); } - grpc_closure_init(&r->on_retry, dns_on_retry_timer, r, - grpc_schedule_on_exec_ctx); + grpc_closure_init(&r->on_retry, dns_on_retry_timer_locked, r, + grpc_combiner_scheduler(r->combiner, false)); grpc_timer_init(exec_ctx, &r->retry_timer, next_try, &r->on_retry, now); } if (r->resolved_result != NULL) { @@ -208,7 +203,6 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, r->resolved_result = result; r->resolved_version++; dns_maybe_finish_next_locked(exec_ctx, r); - gpr_mu_unlock(&r->mu); GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "dns-resolving"); } @@ -221,7 +215,8 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx, r->addresses = NULL; grpc_resolve_address( exec_ctx, r->name_to_resolve, r->default_port, r->interested_parties, - grpc_closure_create(dns_on_resolved, r, grpc_schedule_on_exec_ctx), + grpc_closure_create(dns_on_resolved_locked, r, + grpc_combiner_scheduler(r->combiner, false)), &r->addresses); } @@ -240,7 +235,7 @@ static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { dns_resolver *r = (dns_resolver *)gr; - gpr_mu_destroy(&r->mu); + grpc_combiner_unref(exec_ctx, r->combiner); if (r->resolved_result != NULL) { grpc_channel_args_destroy(exec_ctx, r->resolved_result); } @@ -264,7 +259,7 @@ static grpc_resolver *dns_create(grpc_exec_ctx *exec_ctx, // Create resolver. dns_resolver *r = gpr_malloc(sizeof(dns_resolver)); memset(r, 0, sizeof(*r)); - gpr_mu_init(&r->mu); + r->combiner = args->combiner; grpc_resolver_init(&r->base, &dns_resolver_vtable); r->name_to_resolve = gpr_strdup(path); r->default_port = gpr_strdup(default_port); diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c index a1365f6465..2dd5259719 100644 --- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c @@ -45,6 +45,7 @@ #include "src/core/ext/client_channel/parse_address.h" #include "src/core/ext/client_channel/resolver_registry.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" #include "src/core/lib/slice/slice_internal.h" @@ -58,8 +59,8 @@ typedef struct { grpc_lb_addresses *addresses; /** channel args */ grpc_channel_args *channel_args; - /** mutex guarding the rest of the state */ - gpr_mu mu; + /** combiner guarding the rest of the state */ + grpc_combiner *combiner; /** have we published? */ bool published; /** pending next completion, or NULL */ @@ -73,48 +74,43 @@ static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r); static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, sockaddr_resolver *r); -static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r); -static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *r); -static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r, - grpc_channel_args **target_result, - grpc_closure *on_complete); +static void sockaddr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r); +static void sockaddr_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *r); +static void sockaddr_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r, + grpc_channel_args **target_result, + grpc_closure *on_complete); static const grpc_resolver_vtable sockaddr_resolver_vtable = { - sockaddr_destroy, sockaddr_shutdown, sockaddr_channel_saw_error, - sockaddr_next}; + sockaddr_destroy, sockaddr_shutdown_locked, + sockaddr_channel_saw_error_locked, sockaddr_next_locked}; -static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver) { +static void sockaddr_shutdown_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { sockaddr_resolver *r = (sockaddr_resolver *)resolver; - gpr_mu_lock(&r->mu); if (r->next_completion != NULL) { *r->target_result = NULL; grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE); r->next_completion = NULL; } - gpr_mu_unlock(&r->mu); } -static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver) { +static void sockaddr_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { sockaddr_resolver *r = (sockaddr_resolver *)resolver; - gpr_mu_lock(&r->mu); r->published = false; sockaddr_maybe_finish_next_locked(exec_ctx, r); - gpr_mu_unlock(&r->mu); } -static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, - grpc_channel_args **target_result, - grpc_closure *on_complete) { +static void sockaddr_next_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver, + grpc_channel_args **target_result, + grpc_closure *on_complete) { sockaddr_resolver *r = (sockaddr_resolver *)resolver; - gpr_mu_lock(&r->mu); GPR_ASSERT(!r->next_completion); r->next_completion = on_complete; r->target_result = target_result; sockaddr_maybe_finish_next_locked(exec_ctx, r); - gpr_mu_unlock(&r->mu); } static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, @@ -131,7 +127,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { sockaddr_resolver *r = (sockaddr_resolver *)gr; - gpr_mu_destroy(&r->mu); + grpc_combiner_unref(exec_ctx, r->combiner); grpc_lb_addresses_destroy(exec_ctx, r->addresses); grpc_channel_args_destroy(exec_ctx, r->channel_args); gpr_free(r); @@ -201,7 +197,7 @@ static grpc_resolver *sockaddr_create(grpc_exec_ctx *exec_ctx, memset(r, 0, sizeof(*r)); r->addresses = addresses; r->channel_args = grpc_channel_args_copy(args->args); - gpr_mu_init(&r->mu); + r->combiner = grpc_combiner_ref(args->combiner); grpc_resolver_init(&r->base, &sockaddr_resolver_vtable); return &r->base; } diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c index 8fc5bf3e9a..afe1f6164d 100644 --- a/src/core/lib/transport/connectivity_state.c +++ b/src/core/lib/transport/connectivity_state.c @@ -62,7 +62,7 @@ const char *grpc_connectivity_state_name(grpc_connectivity_state state) { void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state init_state, const char *name) { - tracker->current_state = init_state; + gpr_atm_no_barrier_store(&tracker->current_state_atm, init_state); tracker->current_error = GRPC_ERROR_NONE; tracker->watchers = NULL; tracker->name = gpr_strdup(name); @@ -89,15 +89,30 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx, } grpc_connectivity_state grpc_connectivity_state_check( + grpc_connectivity_state_tracker *tracker) { + grpc_connectivity_state cur = + (grpc_connectivity_state)gpr_atm_no_barrier_load( + &tracker->current_state_atm); + if (grpc_connectivity_state_trace) { + gpr_log(GPR_DEBUG, "CONWATCH: %p %s: get %s", tracker, tracker->name, + grpc_connectivity_state_name(cur)); + } + return cur; +} + +grpc_connectivity_state grpc_connectivity_state_get( grpc_connectivity_state_tracker *tracker, grpc_error **error) { + grpc_connectivity_state cur = + (grpc_connectivity_state)gpr_atm_no_barrier_load( + &tracker->current_state_atm); if (grpc_connectivity_state_trace) { gpr_log(GPR_DEBUG, "CONWATCH: %p %s: get %s", tracker, tracker->name, - grpc_connectivity_state_name(tracker->current_state)); + grpc_connectivity_state_name(cur)); } if (error != NULL) { *error = GRPC_ERROR_REF(tracker->current_error); } - return tracker->current_state; + return cur; } bool grpc_connectivity_state_has_watchers( @@ -108,6 +123,9 @@ bool grpc_connectivity_state_has_watchers( bool grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_closure *notify) { + grpc_connectivity_state cur = + (grpc_connectivity_state)gpr_atm_no_barrier_load( + &tracker->current_state_atm); if (grpc_connectivity_state_trace) { if (current == NULL) { gpr_log(GPR_DEBUG, "CONWATCH: %p %s: unsubscribe notify=%p", tracker, @@ -115,7 +133,7 @@ bool grpc_connectivity_state_notify_on_state_change( } else { gpr_log(GPR_DEBUG, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", tracker, tracker->name, grpc_connectivity_state_name(*current), - grpc_connectivity_state_name(tracker->current_state), notify); + grpc_connectivity_state_name(cur), notify); } } if (current == NULL) { @@ -138,8 +156,8 @@ bool grpc_connectivity_state_notify_on_state_change( } return false; } else { - if (tracker->current_state != *current) { - *current = tracker->current_state; + if (cur != *current) { + *current = cur; grpc_closure_sched(exec_ctx, notify, GRPC_ERROR_REF(tracker->current_error)); } else { @@ -149,7 +167,7 @@ bool grpc_connectivity_state_notify_on_state_change( w->next = tracker->watchers; tracker->watchers = w; } - return tracker->current_state == GRPC_CHANNEL_IDLE; + return cur == GRPC_CHANNEL_IDLE; } } @@ -157,11 +175,14 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state, grpc_error *error, const char *reason) { + grpc_connectivity_state cur = + (grpc_connectivity_state)gpr_atm_no_barrier_load( + &tracker->current_state_atm); grpc_connectivity_state_watcher *w; if (grpc_connectivity_state_trace) { const char *error_string = grpc_error_string(error); gpr_log(GPR_DEBUG, "SET: %p %s: %s --> %s [%s] error=%p %s", tracker, - tracker->name, grpc_connectivity_state_name(tracker->current_state), + tracker->name, grpc_connectivity_state_name(cur), grpc_connectivity_state_name(state), reason, error, error_string); } switch (state) { @@ -178,13 +199,13 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, } GRPC_ERROR_UNREF(tracker->current_error); tracker->current_error = error; - if (tracker->current_state == state) { + if (cur == state) { return; } - GPR_ASSERT(tracker->current_state != GRPC_CHANNEL_SHUTDOWN); - tracker->current_state = state; + GPR_ASSERT(cur != GRPC_CHANNEL_SHUTDOWN); + gpr_atm_no_barrier_store(&tracker->current_state_atm, state); while ((w = tracker->watchers) != NULL) { - *w->current = tracker->current_state; + *w->current = state; tracker->watchers = w->next; if (grpc_connectivity_state_trace) { gpr_log(GPR_DEBUG, "NOTIFY: %p %s: %p", tracker, tracker->name, diff --git a/src/core/lib/transport/connectivity_state.h b/src/core/lib/transport/connectivity_state.h index 769c675b79..c9604c34dd 100644 --- a/src/core/lib/transport/connectivity_state.h +++ b/src/core/lib/transport/connectivity_state.h @@ -47,8 +47,8 @@ typedef struct grpc_connectivity_state_watcher { } grpc_connectivity_state_watcher; typedef struct { - /** current connectivity state */ - grpc_connectivity_state current_state; + /** current grpc_connectivity_state */ + gpr_atm current_state_atm; /** error associated with state */ grpc_error *current_error; /** all our watchers */ @@ -59,6 +59,7 @@ typedef struct { extern int grpc_connectivity_state_trace; +/** enum --> string conversion */ const char *grpc_connectivity_state_name(grpc_connectivity_state state); void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, @@ -68,22 +69,31 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker); /** Set connectivity state; not thread safe; access must be serialized with an - * external lock */ + * external lock */ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state, grpc_error *associated_error, const char *reason); +/** Return true if this connectivity state has watchers. + Access must be serialized with an external lock. */ bool grpc_connectivity_state_has_watchers( grpc_connectivity_state_tracker *tracker); +/** Return the last seen connectivity state. No need to synchronize access. */ grpc_connectivity_state grpc_connectivity_state_check( - grpc_connectivity_state_tracker *tracker, grpc_error **current_error); + grpc_connectivity_state_tracker *tracker); + +/** Return the last seen connectivity state, and the associated error. + Access must be serialized with an external lock. */ +grpc_connectivity_state grpc_connectivity_state_get( + grpc_connectivity_state_tracker *tracker, grpc_error **error); /** Return 1 if the channel should start connecting, 0 otherwise. If current==NULL cancel notify if it is already queued (success==0 in that - case) */ + case). + Access must be serialized with an external lock. */ bool grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_closure *notify); |