diff options
Diffstat (limited to 'src/core/ext/client_config/client_channel.c')
-rw-r--r-- | src/core/ext/client_config/client_channel.c | 537 |
1 files changed, 391 insertions, 146 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index 2c0c4abffc..61e012578e 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -33,6 +33,7 @@ #include "src/core/ext/client_config/client_channel.h" +#include <stdbool.h> #include <stdio.h> #include <string.h> @@ -41,10 +42,11 @@ #include <grpc/support/sync.h> #include <grpc/support/useful.h> -#include "src/core/ext/client_config/subchannel_call_holder.h" +#include "src/core/ext/client_config/subchannel.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/iomgr/iomgr.h" +#include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/channel.h" @@ -52,30 +54,31 @@ /* Client channel implementation */ -typedef grpc_subchannel_call_holder call_data; +/************************************************************************* + * CHANNEL-WIDE FUNCTIONS + */ typedef struct client_channel_channel_data { /** resolver for this channel */ grpc_resolver *resolver; /** have we started resolving this channel */ - int started_resolving; + bool started_resolving; /** mutex protecting client configuration, including all variables below in this data structure */ - gpr_mu mu_config; - /** currently active load balancer - guarded by mu_config */ + gpr_mu mu; + /** currently active load balancer - guarded by mu */ grpc_lb_policy *lb_policy; - /** incoming configuration - set by resolver.next - guarded by mu_config */ - grpc_client_config *incoming_configuration; + /** incoming resolver result - set by resolver.next(), guarded by mu */ + grpc_resolver_result *resolver_result; /** a list of closures that are all waiting for config to come in */ grpc_closure_list waiting_for_config_closures; /** resolver callback */ - grpc_closure on_config_changed; + grpc_closure on_resolver_result_changed; /** connectivity state being tracked */ grpc_connectivity_state_tracker state_tracker; /** when an lb_policy arrives, should we try to exit idle */ - int exit_idle_when_lb_policy_arrives; + bool exit_idle_when_lb_policy_arrives; /** owning stack */ grpc_channel_stack *owning_stack; /** interested parties (owned) */ @@ -83,10 +86,8 @@ typedef struct client_channel_channel_data { } channel_data; /** We create one watcher for each new lb_policy that is returned from a - resolver, - to watch for state changes from the lb_policy. When a state change is seen, - we - update the channel, and create a new watcher */ + resolver, to watch for state changes from the lb_policy. When a state + change is seen, we update the channel, and create a new watcher. */ typedef struct { channel_data *chand; grpc_closure on_changed; @@ -94,22 +95,6 @@ typedef struct { grpc_lb_policy *lb_policy; } lb_policy_connectivity_watcher; -typedef struct { - grpc_closure closure; - grpc_call_element *elem; -} waiting_call; - -static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data); -} - -static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op) { - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - grpc_subchannel_call_holder_perform_op(exec_ctx, elem->call_data, op); -} - static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state); @@ -156,9 +141,9 @@ static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { lb_policy_connectivity_watcher *w = arg; - gpr_mu_lock(&w->chand->mu_config); + gpr_mu_lock(&w->chand->mu); on_lb_policy_state_changed_locked(exec_ctx, w, error); - gpr_mu_unlock(&w->chand->mu_config); + gpr_mu_unlock(&w->chand->mu); GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy"); gpr_free(w); @@ -178,17 +163,17 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, &w->on_changed); } -static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { channel_data *chand = arg; grpc_lb_policy *lb_policy = NULL; grpc_lb_policy *old_lb_policy; grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; - int exit_idle = 0; + bool exit_idle = false; grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy"); - if (chand->incoming_configuration != NULL) { - lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration); + if (chand->resolver_result != NULL) { + lb_policy = grpc_resolver_result_get_lb_policy(chand->resolver_result); if (lb_policy != NULL) { GRPC_LB_POLICY_REF(lb_policy, "channel"); GRPC_LB_POLICY_REF(lb_policy, "config_change"); @@ -197,17 +182,17 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error); } - grpc_client_config_unref(exec_ctx, chand->incoming_configuration); + grpc_resolver_result_unref(exec_ctx, chand->resolver_result); } - chand->incoming_configuration = NULL; + chand->resolver_result = NULL; if (lb_policy != NULL) { grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties, chand->interested_parties); } - gpr_mu_lock(&chand->mu_config); + gpr_mu_lock(&chand->mu); old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; if (lb_policy != NULL) { @@ -222,8 +207,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, } if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) { GRPC_LB_POLICY_REF(lb_policy, "exit_idle"); - exit_idle = 1; - chand->exit_idle_when_lb_policy_arrives = 0; + exit_idle = true; + chand->exit_idle_when_lb_policy_arrives = false; } if (error == GRPC_ERROR_NONE && chand->resolver) { @@ -233,10 +218,9 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, watch_lb_policy(exec_ctx, chand, lb_policy, state); } GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next(exec_ctx, chand->resolver, - &chand->incoming_configuration, - &chand->on_config_changed); - gpr_mu_unlock(&chand->mu_config); + grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, + &chand->on_resolver_result_changed); + gpr_mu_unlock(&chand->mu); } else { if (chand->resolver != NULL) { grpc_resolver_shutdown(exec_ctx, chand->resolver); @@ -249,7 +233,7 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_CREATE_REFERENCING("Got config after disconnection", refs, GPR_ARRAY_SIZE(refs)), "resolver_gone"); - gpr_mu_unlock(&chand->mu_config); + gpr_mu_unlock(&chand->mu); } if (exit_idle) { @@ -284,7 +268,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, op->bind_pollset); } - gpr_mu_lock(&chand->mu_config); + gpr_mu_lock(&chand->mu); if (op->on_connectivity_state_change != NULL) { grpc_connectivity_state_notify_on_state_change( exec_ctx, &chand->state_tracker, op->connectivity_state, @@ -329,7 +313,189 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, } GRPC_ERROR_UNREF(op->disconnect_with_error); } - gpr_mu_unlock(&chand->mu_config); + gpr_mu_unlock(&chand->mu); +} + +/* Constructor for channel_data */ +static void cc_init_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_channel_element_args *args) { + channel_data *chand = elem->channel_data; + + memset(chand, 0, sizeof(*chand)); + + GPR_ASSERT(args->is_last); + GPR_ASSERT(elem->filter == &grpc_client_channel_filter); + + gpr_mu_init(&chand->mu); + grpc_closure_init(&chand->on_resolver_result_changed, + on_resolver_result_changed, chand); + chand->owning_stack = args->channel_stack; + + grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, + "client_channel"); + chand->interested_parties = grpc_pollset_set_create(); +} + +/* Destructor for channel_data */ +static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem) { + channel_data *chand = elem->channel_data; + + if (chand->resolver != NULL) { + grpc_resolver_shutdown(exec_ctx, chand->resolver); + GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); + } + if (chand->lb_policy != NULL) { + grpc_pollset_set_del_pollset_set(exec_ctx, + chand->lb_policy->interested_parties, + chand->interested_parties); + GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); + } + grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); + grpc_pollset_set_destroy(chand->interested_parties); + gpr_mu_destroy(&chand->mu); +} + +/************************************************************************* + * PER-CALL FUNCTIONS + */ + +#define GET_CALL(call_data) \ + ((grpc_subchannel_call *)(gpr_atm_acq_load(&(call_data)->subchannel_call))) + +#define CANCELLED_CALL ((grpc_subchannel_call *)1) + +typedef enum { + GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING, + GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL +} subchannel_creation_phase; + +/** Call data. Holds a pointer to grpc_subchannel_call and the + associated machinery to create such a pointer. + Handles queueing of stream ops until a call object is ready, waiting + for initial metadata before trying to create a call object, + and handling cancellation gracefully. */ +typedef struct client_channel_call_data { + /** either 0 for no call, 1 for cancelled, or a pointer to a + grpc_subchannel_call */ + gpr_atm subchannel_call; + + gpr_mu mu; + + subchannel_creation_phase creation_phase; + grpc_connected_subchannel *connected_subchannel; + grpc_polling_entity *pollent; + + grpc_transport_stream_op *waiting_ops; + size_t waiting_ops_count; + size_t waiting_ops_capacity; + + grpc_closure next_step; + + grpc_call_stack *owning_call; +} call_data; + +static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) { + GPR_TIMER_BEGIN("add_waiting_locked", 0); + if (calld->waiting_ops_count == calld->waiting_ops_capacity) { + calld->waiting_ops_capacity = GPR_MAX(3, 2 * calld->waiting_ops_capacity); + calld->waiting_ops = + gpr_realloc(calld->waiting_ops, + calld->waiting_ops_capacity * sizeof(*calld->waiting_ops)); + } + calld->waiting_ops[calld->waiting_ops_count++] = *op; + GPR_TIMER_END("add_waiting_locked", 0); +} + +static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld, + grpc_error *error) { + size_t i; + for (i = 0; i < calld->waiting_ops_count; i++) { + grpc_transport_stream_op_finish_with_failure( + exec_ctx, &calld->waiting_ops[i], GRPC_ERROR_REF(error)); + } + calld->waiting_ops_count = 0; + GRPC_ERROR_UNREF(error); +} + +typedef struct { + grpc_transport_stream_op *ops; + size_t nops; + grpc_subchannel_call *call; +} retry_ops_args; + +static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { + retry_ops_args *a = args; + size_t i; + for (i = 0; i < a->nops; i++) { + grpc_subchannel_call_process_op(exec_ctx, a->call, &a->ops[i]); + } + GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops"); + gpr_free(a->ops); + gpr_free(a); +} + +static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { + retry_ops_args *a = gpr_malloc(sizeof(*a)); + a->ops = calld->waiting_ops; + a->nops = calld->waiting_ops_count; + a->call = GET_CALL(calld); + if (a->call == CANCELLED_CALL) { + gpr_free(a); + fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED); + return; + } + calld->waiting_ops = NULL; + calld->waiting_ops_count = 0; + calld->waiting_ops_capacity = 0; + GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops"); + grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(retry_ops, a), + GRPC_ERROR_NONE, NULL); +} + +static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + call_data *calld = arg; + gpr_mu_lock(&calld->mu); + GPR_ASSERT(calld->creation_phase == + GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL); + calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + if (calld->connected_subchannel == NULL) { + gpr_atm_no_barrier_store(&calld->subchannel_call, 1); + fail_locked(exec_ctx, calld, GRPC_ERROR_CREATE_REFERENCING( + "Failed to create subchannel", &error, 1)); + } else if (1 == gpr_atm_acq_load(&calld->subchannel_call)) { + /* already cancelled before subchannel became ready */ + fail_locked(exec_ctx, calld, + GRPC_ERROR_CREATE_REFERENCING( + "Cancelled before creating subchannel", &error, 1)); + } else { + grpc_subchannel_call *subchannel_call = NULL; + grpc_error *new_error = grpc_connected_subchannel_create_call( + exec_ctx, calld->connected_subchannel, calld->pollent, + &subchannel_call); + if (new_error != GRPC_ERROR_NONE) { + new_error = grpc_error_add_child(new_error, error); + subchannel_call = CANCELLED_CALL; + fail_locked(exec_ctx, calld, new_error); + } + gpr_atm_rel_store(&calld->subchannel_call, + (gpr_atm)(uintptr_t)subchannel_call); + retry_waiting_locked(exec_ctx, calld); + } + gpr_mu_unlock(&calld->mu); + GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); +} + +static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { + call_data *calld = elem->call_data; + grpc_subchannel_call *subchannel_call = GET_CALL(calld); + if (subchannel_call == NULL || subchannel_call == CANCELLED_CALL) { + return NULL; + } else { + return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call); + } } typedef struct { @@ -341,11 +507,11 @@ typedef struct { grpc_closure closure; } continue_picking_args; -static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, - grpc_connected_subchannel **connected_subchannel, - grpc_closure *on_ready); +static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_metadata_batch *initial_metadata, + uint32_t initial_metadata_flags, + grpc_connected_subchannel **connected_subchannel, + grpc_closure *on_ready); static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -354,22 +520,21 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, /* cancelled, do nothing */ } else if (error != GRPC_ERROR_NONE) { grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL); - } else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, - cpa->initial_metadata_flags, - cpa->connected_subchannel, cpa->on_ready)) { + } else if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, + cpa->initial_metadata_flags, + cpa->connected_subchannel, cpa->on_ready)) { grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL); } gpr_free(cpa); } -static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, - grpc_connected_subchannel **connected_subchannel, - grpc_closure *on_ready) { - GPR_TIMER_BEGIN("cc_pick_subchannel", 0); +static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_metadata_batch *initial_metadata, + uint32_t initial_metadata_flags, + grpc_connected_subchannel **connected_subchannel, + grpc_closure *on_ready) { + GPR_TIMER_BEGIN("pick_subchannel", 0); - grpc_call_element *elem = elemp; channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; continue_picking_args *cpa; @@ -377,7 +542,7 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, GPR_ASSERT(connected_subchannel); - gpr_mu_lock(&chand->mu_config); + gpr_mu_lock(&chand->mu); if (initial_metadata == NULL) { if (chand->lb_policy != NULL) { grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, @@ -392,28 +557,27 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, GRPC_ERROR_CREATE("Pick cancelled"), NULL); } } - gpr_mu_unlock(&chand->mu_config); - GPR_TIMER_END("cc_pick_subchannel", 0); - return 1; + gpr_mu_unlock(&chand->mu); + GPR_TIMER_END("pick_subchannel", 0); + return true; } if (chand->lb_policy != NULL) { grpc_lb_policy *lb_policy = chand->lb_policy; int r; - GRPC_LB_POLICY_REF(lb_policy, "cc_pick_subchannel"); - gpr_mu_unlock(&chand->mu_config); + GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel"); + gpr_mu_unlock(&chand->mu); r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollent, initial_metadata, initial_metadata_flags, connected_subchannel, on_ready); - GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "cc_pick_subchannel"); - GPR_TIMER_END("cc_pick_subchannel", 0); + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel"); + GPR_TIMER_END("pick_subchannel", 0); return r; } if (chand->resolver != NULL && !chand->started_resolving) { - chand->started_resolving = 1; + chand->started_resolving = true; GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next(exec_ctx, chand->resolver, - &chand->incoming_configuration, - &chand->on_config_changed); + grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, + &chand->on_resolver_result_changed); } if (chand->resolver != NULL) { cpa = gpr_malloc(sizeof(*cpa)); @@ -429,67 +593,145 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, grpc_exec_ctx_sched(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected"), NULL); } - gpr_mu_unlock(&chand->mu_config); + gpr_mu_unlock(&chand->mu); - GPR_TIMER_END("cc_pick_subchannel", 0); - return 0; + GPR_TIMER_END("pick_subchannel", 0); + return false; +} + +// The logic here is fairly complicated, due to (a) the fact that we +// need to handle the case where we receive the send op before the +// initial metadata op, and (b) the need for efficiency, especially in +// the streaming case. +// TODO(ctiller): Explain this more thoroughly. +static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { + call_data *calld = elem->call_data; + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + /* try to (atomically) get the call */ + grpc_subchannel_call *call = GET_CALL(calld); + GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0); + if (call == CANCELLED_CALL) { + grpc_transport_stream_op_finish_with_failure(exec_ctx, op, + GRPC_ERROR_CANCELLED); + GPR_TIMER_END("cc_start_transport_stream_op", 0); + return; + } + if (call != NULL) { + grpc_subchannel_call_process_op(exec_ctx, call, op); + GPR_TIMER_END("cc_start_transport_stream_op", 0); + return; + } + /* we failed; lock and figure out what to do */ + gpr_mu_lock(&calld->mu); +retry: + /* need to recheck that another thread hasn't set the call */ + call = GET_CALL(calld); + if (call == CANCELLED_CALL) { + gpr_mu_unlock(&calld->mu); + grpc_transport_stream_op_finish_with_failure(exec_ctx, op, + GRPC_ERROR_CANCELLED); + GPR_TIMER_END("cc_start_transport_stream_op", 0); + return; + } + if (call != NULL) { + gpr_mu_unlock(&calld->mu); + grpc_subchannel_call_process_op(exec_ctx, call, op); + GPR_TIMER_END("cc_start_transport_stream_op", 0); + return; + } + /* if this is a cancellation, then we can raise our cancelled flag */ + if (op->cancel_error != GRPC_ERROR_NONE) { + if (!gpr_atm_rel_cas(&calld->subchannel_call, 0, + (gpr_atm)(uintptr_t)CANCELLED_CALL)) { + goto retry; + } else { + switch (calld->creation_phase) { + case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING: + fail_locked(exec_ctx, calld, GRPC_ERROR_REF(op->cancel_error)); + break; + case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL: + pick_subchannel(exec_ctx, elem, NULL, 0, &calld->connected_subchannel, + NULL); + break; + } + gpr_mu_unlock(&calld->mu); + grpc_transport_stream_op_finish_with_failure(exec_ctx, op, + GRPC_ERROR_CANCELLED); + GPR_TIMER_END("cc_start_transport_stream_op", 0); + return; + } + } + /* if we don't have a subchannel, try to get one */ + if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && + calld->connected_subchannel == NULL && + op->send_initial_metadata != NULL) { + calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; + grpc_closure_init(&calld->next_step, subchannel_ready, calld); + GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); + if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata, + op->send_initial_metadata_flags, + &calld->connected_subchannel, &calld->next_step)) { + calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); + } + } + /* if we've got a subchannel, then let's ask it to create a call */ + if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && + calld->connected_subchannel != NULL) { + grpc_subchannel_call *subchannel_call = NULL; + grpc_error *error = grpc_connected_subchannel_create_call( + exec_ctx, calld->connected_subchannel, calld->pollent, + &subchannel_call); + if (error != GRPC_ERROR_NONE) { + subchannel_call = CANCELLED_CALL; + fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); + grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error); + } + gpr_atm_rel_store(&calld->subchannel_call, + (gpr_atm)(uintptr_t)subchannel_call); + retry_waiting_locked(exec_ctx, calld); + goto retry; + } + /* nothing to be done but wait */ + add_waiting_locked(calld, op); + gpr_mu_unlock(&calld->mu); + GPR_TIMER_END("cc_start_transport_stream_op", 0); } /* Constructor for call_data */ -static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_call_element_args *args) { - grpc_subchannel_call_holder_init(elem->call_data, cc_pick_subchannel, elem, - args->call_stack); +static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_call_element_args *args) { + call_data *calld = elem->call_data; + gpr_atm_rel_store(&calld->subchannel_call, 0); + gpr_mu_init(&calld->mu); + calld->connected_subchannel = NULL; + calld->waiting_ops = NULL; + calld->waiting_ops_count = 0; + calld->waiting_ops_capacity = 0; + calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + calld->owning_call = args->call_stack; + calld->pollent = NULL; return GRPC_ERROR_NONE; } /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - const grpc_call_final_info *final_info, - void *and_free_memory) { - grpc_subchannel_call_holder_destroy(exec_ctx, elem->call_data); - gpr_free(and_free_memory); -} - -/* Constructor for channel_data */ -static void init_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_channel_element_args *args) { - channel_data *chand = elem->channel_data; - - memset(chand, 0, sizeof(*chand)); - - GPR_ASSERT(args->is_last); - GPR_ASSERT(elem->filter == &grpc_client_channel_filter); - - gpr_mu_init(&chand->mu_config); - grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand); - chand->owning_stack = args->channel_stack; - - grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, - "client_channel"); - chand->interested_parties = grpc_pollset_set_create(); -} - -/* Destructor for channel_data */ -static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem) { - channel_data *chand = elem->channel_data; - - if (chand->resolver != NULL) { - grpc_resolver_shutdown(exec_ctx, chand->resolver); - GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); - } - if (chand->lb_policy != NULL) { - grpc_pollset_set_del_pollset_set(exec_ctx, - chand->lb_policy->interested_parties, - chand->interested_parties); - GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); +static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + const grpc_call_final_info *final_info, + void *and_free_memory) { + call_data *calld = elem->call_data; + grpc_subchannel_call *call = GET_CALL(calld); + if (call != NULL && call != CANCELLED_CALL) { + GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call"); } - grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); - grpc_pollset_set_destroy(chand->interested_parties); - gpr_mu_destroy(&chand->mu_config); + GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING); + gpr_mu_destroy(&calld->mu); + GPR_ASSERT(calld->waiting_ops_count == 0); + gpr_free(calld->waiting_ops); + gpr_free(and_free_memory); } static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, @@ -499,16 +741,20 @@ static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, calld->pollent = pollent; } +/************************************************************************* + * EXPORTED SYMBOLS + */ + const grpc_channel_filter grpc_client_channel_filter = { cc_start_transport_stream_op, cc_start_transport_op, sizeof(call_data), - init_call_elem, + cc_init_call_elem, cc_set_pollset_or_pollset_set, - destroy_call_elem, + cc_destroy_call_elem, sizeof(channel_data), - init_channel_elem, - destroy_channel_elem, + cc_init_channel_elem, + cc_destroy_channel_elem, cc_get_peer, "client-channel", }; @@ -519,41 +765,40 @@ void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, /* post construction initialization: set the transport setup pointer */ grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack); channel_data *chand = elem->channel_data; - gpr_mu_lock(&chand->mu_config); + gpr_mu_lock(&chand->mu); GPR_ASSERT(!chand->resolver); chand->resolver = resolver; GRPC_RESOLVER_REF(resolver, "channel"); if (!grpc_closure_list_empty(chand->waiting_for_config_closures) || chand->exit_idle_when_lb_policy_arrives) { - chand->started_resolving = 1; + chand->started_resolving = true; GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration, - &chand->on_config_changed); + grpc_resolver_next(exec_ctx, resolver, &chand->resolver_result, + &chand->on_resolver_result_changed); } - gpr_mu_unlock(&chand->mu_config); + gpr_mu_unlock(&chand->mu); } grpc_connectivity_state grpc_client_channel_check_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) { channel_data *chand = elem->channel_data; grpc_connectivity_state out; - gpr_mu_lock(&chand->mu_config); + gpr_mu_lock(&chand->mu); out = grpc_connectivity_state_check(&chand->state_tracker, NULL); if (out == GRPC_CHANNEL_IDLE && try_to_connect) { if (chand->lb_policy != NULL) { grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy); } else { - chand->exit_idle_when_lb_policy_arrives = 1; + chand->exit_idle_when_lb_policy_arrives = true; if (!chand->started_resolving && chand->resolver != NULL) { GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - chand->started_resolving = 1; - grpc_resolver_next(exec_ctx, chand->resolver, - &chand->incoming_configuration, - &chand->on_config_changed); + chand->started_resolving = true; + grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, + &chand->on_resolver_result_changed); } } } - gpr_mu_unlock(&chand->mu_config); + gpr_mu_unlock(&chand->mu); return out; } @@ -588,8 +833,8 @@ void grpc_client_channel_watch_connectivity_state( grpc_closure_init(&w->my_closure, on_external_watch_complete, w); GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); - gpr_mu_lock(&chand->mu_config); + gpr_mu_lock(&chand->mu); grpc_connectivity_state_notify_on_state_change( exec_ctx, &chand->state_tracker, state, &w->my_closure); - gpr_mu_unlock(&chand->mu_config); + gpr_mu_unlock(&chand->mu); } |