diff options
Diffstat (limited to 'src/core/ext/client_config/client_channel.c')
-rw-r--r-- | src/core/ext/client_config/client_channel.c | 123 |
1 files changed, 89 insertions, 34 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index fed3ae0450..c693c95a03 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -42,9 +42,11 @@ #include <grpc/support/sync.h> #include <grpc/support/useful.h> +#include "src/core/ext/client_config/lb_policy_registry.h" #include "src/core/ext/client_config/subchannel.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" +#include "src/core/lib/channel/deadline_filter.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/profiling/timers.h" @@ -63,6 +65,8 @@ typedef struct client_channel_channel_data { grpc_resolver *resolver; /** have we started resolving this channel */ bool started_resolving; + /** client channel factory */ + grpc_client_channel_factory *client_channel_factory; /** mutex protecting client configuration, including all variables below in this data structure */ @@ -111,7 +115,7 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy_cancel_picks( exec_ctx, chand->lb_policy, /* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY, - /* check= */ 0); + /* check= */ 0, GRPC_ERROR_REF(error)); } grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error, reason); @@ -173,20 +177,28 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy"); if (chand->resolver_result != NULL) { - lb_policy = grpc_resolver_result_get_lb_policy(chand->resolver_result); + grpc_lb_policy_args lb_policy_args; + lb_policy_args.server_name = + grpc_resolver_result_get_server_name(chand->resolver_result); + lb_policy_args.addresses = + grpc_resolver_result_get_addresses(chand->resolver_result); + lb_policy_args.additional_args = + grpc_resolver_result_get_lb_policy_args(chand->resolver_result); + lb_policy_args.client_channel_factory = chand->client_channel_factory; + lb_policy = grpc_lb_policy_create( + exec_ctx, + grpc_resolver_result_get_lb_policy_name(chand->resolver_result), + &lb_policy_args); if (lb_policy != NULL) { - GRPC_LB_POLICY_REF(lb_policy, "channel"); GRPC_LB_POLICY_REF(lb_policy, "config_change"); GRPC_ERROR_UNREF(state_error); state = grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error); } - grpc_resolver_result_unref(exec_ctx, chand->resolver_result); + chand->resolver_result = NULL; } - chand->resolver_result = NULL; - if (lb_policy != NULL) { grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties, chand->interested_parties); @@ -346,6 +358,9 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_resolver_shutdown(exec_ctx, chand->resolver); GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); } + if (chand->client_channel_factory != NULL) { + grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory); + } if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, chand->lb_policy->interested_parties, @@ -377,6 +392,17 @@ typedef enum { for initial metadata before trying to create a call object, and handling cancellation gracefully. */ typedef struct client_channel_call_data { + // State for handling deadlines. + // The code in deadline_filter.c requires this to be the first field. + // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state + // and this struct both independently store a pointer to the call + // stack and each has its own mutex. If/when we have time, find a way + // to avoid this without breaking the grpc_deadline_state abstraction. + grpc_deadline_state deadline_state; + gpr_timespec deadline; + + grpc_error *cancel_error; + /** either 0 for no call, 1 for cancelled, or a pointer to a grpc_subchannel_call */ gpr_atm subchannel_call; @@ -387,13 +413,15 @@ typedef struct client_channel_call_data { grpc_connected_subchannel *connected_subchannel; grpc_polling_entity *pollent; - grpc_transport_stream_op *waiting_ops; + grpc_transport_stream_op **waiting_ops; size_t waiting_ops_count; size_t waiting_ops_capacity; grpc_closure next_step; grpc_call_stack *owning_call; + + grpc_linked_mdelem lb_token_mdelem; } call_data; static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) { @@ -404,7 +432,7 @@ static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) { gpr_realloc(calld->waiting_ops, calld->waiting_ops_capacity * sizeof(*calld->waiting_ops)); } - calld->waiting_ops[calld->waiting_ops_count++] = *op; + calld->waiting_ops[calld->waiting_ops_count++] = op; GPR_TIMER_END("add_waiting_locked", 0); } @@ -413,14 +441,14 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld, size_t i; for (i = 0; i < calld->waiting_ops_count; i++) { grpc_transport_stream_op_finish_with_failure( - exec_ctx, &calld->waiting_ops[i], GRPC_ERROR_REF(error)); + exec_ctx, calld->waiting_ops[i], GRPC_ERROR_REF(error)); } calld->waiting_ops_count = 0; GRPC_ERROR_UNREF(error); } typedef struct { - grpc_transport_stream_op *ops; + grpc_transport_stream_op **ops; size_t nops; grpc_subchannel_call *call; } retry_ops_args; @@ -429,7 +457,7 @@ static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { retry_ops_args *a = args; size_t i; for (i = 0; i < a->nops; i++) { - grpc_subchannel_call_process_op(exec_ctx, a->call, &a->ops[i]); + grpc_subchannel_call_process_op(exec_ctx, a->call, a->ops[i]); } GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops"); gpr_free(a->ops); @@ -437,6 +465,10 @@ static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { } static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { + if (calld->waiting_ops_count == 0) { + return; + } + retry_ops_args *a = gpr_malloc(sizeof(*a)); a->ops = calld->waiting_ops; a->nops = calld->waiting_ops_count; @@ -465,7 +497,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, gpr_atm_no_barrier_store(&calld->subchannel_call, 1); fail_locked(exec_ctx, calld, GRPC_ERROR_CREATE_REFERENCING( "Failed to create subchannel", &error, 1)); - } else if (1 == gpr_atm_acq_load(&calld->subchannel_call)) { + } else if (GET_CALL(calld) == CANCELLED_CALL) { /* already cancelled before subchannel became ready */ fail_locked(exec_ctx, calld, GRPC_ERROR_CREATE_REFERENCING( @@ -473,7 +505,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, } else { grpc_subchannel_call *subchannel_call = NULL; grpc_error *new_error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, calld->pollent, + exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline, &subchannel_call); if (new_error != GRPC_ERROR_NONE) { new_error = grpc_error_add_child(new_error, error); @@ -511,7 +543,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, grpc_connected_subchannel **connected_subchannel, - grpc_closure *on_ready); + grpc_closure *on_ready, grpc_error *error); static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -522,7 +554,8 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL); } else if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, cpa->initial_metadata_flags, - cpa->connected_subchannel, cpa->on_ready)) { + cpa->connected_subchannel, cpa->on_ready, + GRPC_ERROR_NONE)) { grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL); } gpr_free(cpa); @@ -532,7 +565,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, grpc_connected_subchannel **connected_subchannel, - grpc_closure *on_ready) { + grpc_closure *on_ready, grpc_error *error) { GPR_TIMER_BEGIN("pick_subchannel", 0); channel_data *chand = elem->channel_data; @@ -546,29 +579,34 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, if (initial_metadata == NULL) { if (chand->lb_policy != NULL) { grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, - connected_subchannel); + connected_subchannel, GRPC_ERROR_REF(error)); } for (closure = chand->waiting_for_config_closures.head; closure != NULL; closure = closure->next_data.next) { cpa = closure->cb_arg; if (cpa->connected_subchannel == connected_subchannel) { cpa->connected_subchannel = NULL; - grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, - GRPC_ERROR_CREATE("Pick cancelled"), NULL); + grpc_exec_ctx_sched( + exec_ctx, cpa->on_ready, + GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL); } } gpr_mu_unlock(&chand->mu); GPR_TIMER_END("pick_subchannel", 0); + GRPC_ERROR_UNREF(error); return true; } + GPR_ASSERT(error == GRPC_ERROR_NONE); if (chand->lb_policy != NULL) { grpc_lb_policy *lb_policy = chand->lb_policy; int r; GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel"); gpr_mu_unlock(&chand->mu); - r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollent, - initial_metadata, initial_metadata_flags, - connected_subchannel, on_ready); + const grpc_lb_policy_pick_args inputs = {calld->pollent, initial_metadata, + initial_metadata_flags, + &calld->lb_token_mdelem}; + r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel, + NULL, on_ready); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel"); GPR_TIMER_END("pick_subchannel", 0); return r; @@ -610,12 +648,13 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op) { call_data *calld = elem->call_data; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op); /* try to (atomically) get the call */ grpc_subchannel_call *call = GET_CALL(calld); GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0); if (call == CANCELLED_CALL) { - grpc_transport_stream_op_finish_with_failure(exec_ctx, op, - GRPC_ERROR_CANCELLED); + grpc_transport_stream_op_finish_with_failure( + exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); GPR_TIMER_END("cc_start_transport_stream_op", 0); return; } @@ -631,8 +670,8 @@ retry: call = GET_CALL(calld); if (call == CANCELLED_CALL) { gpr_mu_unlock(&calld->mu); - grpc_transport_stream_op_finish_with_failure(exec_ctx, op, - GRPC_ERROR_CANCELLED); + grpc_transport_stream_op_finish_with_failure( + exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); GPR_TIMER_END("cc_start_transport_stream_op", 0); return; } @@ -648,18 +687,24 @@ retry: (gpr_atm)(uintptr_t)CANCELLED_CALL)) { goto retry; } else { + // Stash a copy of cancel_error in our call data, so that we can use + // it for subsequent operations. This ensures that if the call is + // cancelled before any ops are passed down (e.g., if the deadline + // is in the past when the call starts), we can return the right + // error to the caller when the first op does get passed down. + calld->cancel_error = GRPC_ERROR_REF(op->cancel_error); switch (calld->creation_phase) { case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING: fail_locked(exec_ctx, calld, GRPC_ERROR_REF(op->cancel_error)); break; case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL: pick_subchannel(exec_ctx, elem, NULL, 0, &calld->connected_subchannel, - NULL); + NULL, GRPC_ERROR_REF(op->cancel_error)); break; } gpr_mu_unlock(&calld->mu); - grpc_transport_stream_op_finish_with_failure(exec_ctx, op, - GRPC_ERROR_CANCELLED); + grpc_transport_stream_op_finish_with_failure( + exec_ctx, op, GRPC_ERROR_REF(op->cancel_error)); GPR_TIMER_END("cc_start_transport_stream_op", 0); return; } @@ -673,7 +718,8 @@ retry: GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata, op->send_initial_metadata_flags, - &calld->connected_subchannel, &calld->next_step)) { + &calld->connected_subchannel, &calld->next_step, + GRPC_ERROR_NONE)) { calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); } @@ -683,7 +729,7 @@ retry: calld->connected_subchannel != NULL) { grpc_subchannel_call *subchannel_call = NULL; grpc_error *error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, calld->pollent, + exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline, &subchannel_call); if (error != GRPC_ERROR_NONE) { subchannel_call = CANCELLED_CALL; @@ -706,6 +752,9 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_call_element_args *args) { call_data *calld = elem->call_data; + grpc_deadline_state_init(exec_ctx, elem, args); + calld->deadline = args->deadline; + calld->cancel_error = GRPC_ERROR_NONE; gpr_atm_rel_store(&calld->subchannel_call, 0); gpr_mu_init(&calld->mu); calld->connected_subchannel = NULL; @@ -724,6 +773,8 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, const grpc_call_final_info *final_info, void *and_free_memory) { call_data *calld = elem->call_data; + grpc_deadline_state_destroy(exec_ctx, elem); + GRPC_ERROR_UNREF(calld->cancel_error); grpc_subchannel_call *call = GET_CALL(calld); if (call != NULL && call != CANCELLED_CALL) { GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call"); @@ -760,10 +811,12 @@ const grpc_channel_filter grpc_client_channel_filter = { "client-channel", }; -void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, - grpc_channel_stack *channel_stack, - grpc_resolver *resolver) { +void grpc_client_channel_finish_initialization( + grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, + grpc_resolver *resolver, + grpc_client_channel_factory *client_channel_factory) { /* post construction initialization: set the transport setup pointer */ + GPR_ASSERT(client_channel_factory != NULL); grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack); channel_data *chand = elem->channel_data; gpr_mu_lock(&chand->mu); @@ -778,6 +831,8 @@ void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, // grpc_resolver_next(exec_ctx, resolver, &chand->resolver_result, // &chand->on_resolver_result_changed); // } + chand->client_channel_factory = client_channel_factory; + grpc_client_channel_factory_ref(client_channel_factory); gpr_mu_unlock(&chand->mu); } |