diff options
Diffstat (limited to 'src/core/ext/client_config/client_channel.c')
-rw-r--r-- | src/core/ext/client_config/client_channel.c | 193 |
1 files changed, 174 insertions, 19 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index cbf79afa17..beaa9637c3 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -43,6 +43,7 @@ #include <grpc/support/useful.h> #include "src/core/ext/client_config/lb_policy_registry.h" +#include "src/core/ext/client_config/method_config.h" #include "src/core/ext/client_config/subchannel.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" @@ -53,6 +54,9 @@ #include "src/core/lib/support/string.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/transport/connectivity_state.h" +#include "src/core/lib/transport/metadata.h" +#include "src/core/lib/transport/metadata_batch.h" +#include "src/core/lib/transport/static_metadata.h" /* Client channel implementation */ @@ -68,12 +72,13 @@ typedef struct client_channel_channel_data { /** client channel factory */ grpc_client_channel_factory *client_channel_factory; - /** mutex protecting client configuration, including all - variables below in this data structure */ + /** mutex protecting all variables below in this data structure */ gpr_mu mu; - /** currently active load balancer - guarded by mu */ + /** currently active load balancer */ grpc_lb_policy *lb_policy; - /** incoming resolver result - set by resolver.next(), guarded by mu */ + /** method config table */ + grpc_method_config_table *method_config_table; + /** incoming resolver result - set by resolver.next() */ grpc_resolver_result *resolver_result; /** a list of closures that are all waiting for config to come in */ grpc_closure_list waiting_for_config_closures; @@ -172,6 +177,7 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, channel_data *chand = arg; grpc_lb_policy *lb_policy = NULL; grpc_lb_policy *old_lb_policy; + grpc_method_config_table *method_config_table = NULL; grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; bool exit_idle = false; grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy"); @@ -220,6 +226,13 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, state = grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error); } + const grpc_arg *channel_arg = grpc_channel_args_find( + lb_policy_args.additional_args, GRPC_ARG_SERVICE_CONFIG); + if (channel_arg != NULL) { + GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER); + method_config_table = grpc_method_config_table_ref( + (grpc_method_config_table *)channel_arg->value.pointer.p); + } grpc_resolver_result_unref(exec_ctx, chand->resolver_result); chand->resolver_result = NULL; } @@ -232,6 +245,10 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_lock(&chand->mu); old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; + if (chand->method_config_table != NULL) { + grpc_method_config_table_unref(chand->method_config_table); + } + chand->method_config_table = method_config_table; if (lb_policy != NULL) { grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures, NULL); @@ -392,6 +409,9 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } + if (chand->method_config_table != NULL) { + grpc_method_config_table_unref(chand->method_config_table); + } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); grpc_pollset_set_destroy(chand->interested_parties); gpr_mu_destroy(&chand->mu); @@ -424,7 +444,16 @@ typedef struct client_channel_call_data { // stack and each has its own mutex. If/when we have time, find a way // to avoid this without breaking the grpc_deadline_state abstraction. grpc_deadline_state deadline_state; + + grpc_mdstr *path; // Request path. + gpr_timespec call_start_time; gpr_timespec deadline; + enum { + WAIT_FOR_READY_UNSET, + WAIT_FOR_READY_FALSE, + WAIT_FOR_READY_TRUE + } wait_for_ready_from_service_config; + grpc_closure read_service_config; grpc_error *cancel_error; @@ -532,10 +561,11 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_CREATE_REFERENCING( "Cancelled before creating subchannel", &error, 1)); } else { + /* Create call on subchannel. */ grpc_subchannel_call *subchannel_call = NULL; grpc_error *new_error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline, - &subchannel_call); + exec_ctx, calld->connected_subchannel, calld->pollent, calld->path, + calld->deadline, &subchannel_call); if (new_error != GRPC_ERROR_NONE) { new_error = grpc_error_add_child(new_error, error); subchannel_call = CANCELLED_CALL; @@ -584,11 +614,15 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, /* cancelled, do nothing */ } else if (error != GRPC_ERROR_NONE) { grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL); - } else if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, - cpa->initial_metadata_flags, - cpa->connected_subchannel, cpa->on_ready, - GRPC_ERROR_NONE)) { - grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL); + } else { + call_data *calld = cpa->elem->call_data; + gpr_mu_lock(&calld->mu); + if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, + cpa->initial_metadata_flags, cpa->connected_subchannel, + cpa->on_ready, GRPC_ERROR_NONE)) { + grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL); + } + gpr_mu_unlock(&calld->mu); } gpr_free(cpa); } @@ -631,18 +665,33 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, GPR_ASSERT(error == GRPC_ERROR_NONE); if (chand->lb_policy != NULL) { grpc_lb_policy *lb_policy = chand->lb_policy; - int r; GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel"); gpr_mu_unlock(&chand->mu); + // If the application explicitly set wait_for_ready, use that. + // Otherwise, if the service config specified a value for this + // method, use that. + const bool wait_for_ready_set_from_api = + initial_metadata_flags & + GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET; + const bool wait_for_ready_set_from_service_config = + calld->wait_for_ready_from_service_config != WAIT_FOR_READY_UNSET; + if (!wait_for_ready_set_from_api && + wait_for_ready_set_from_service_config) { + if (calld->wait_for_ready_from_service_config == WAIT_FOR_READY_TRUE) { + initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY; + } else { + initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY; + } + } // TODO(dgq): make this deadline configurable somehow. const grpc_lb_policy_pick_args inputs = { initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem, gpr_inf_future(GPR_CLOCK_MONOTONIC)}; - r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel, - NULL, on_ready); + const bool result = grpc_lb_policy_pick( + exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, on_ready); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel"); GPR_TIMER_END("pick_subchannel", 0); - return r; + return result; } if (chand->resolver != NULL && !chand->started_resolving) { chand->started_resolving = true; @@ -768,8 +817,8 @@ retry: calld->connected_subchannel != NULL) { grpc_subchannel_call *subchannel_call = NULL; grpc_error *error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline, - &subchannel_call); + exec_ctx, calld->connected_subchannel, calld->pollent, calld->path, + calld->deadline, &subchannel_call); if (error != GRPC_ERROR_NONE) { subchannel_call = CANCELLED_CALL; fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); @@ -786,13 +835,69 @@ retry: GPR_TIMER_END("cc_start_transport_stream_op", 0); } +// Gets data from the service config. Invoked when the resolver returns +// its initial result. +static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_call_element *elem = arg; + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + // If this is an error, there's no point in looking at the service config. + if (error == GRPC_ERROR_NONE) { + // Get the method config table from channel data. + gpr_mu_lock(&chand->mu); + grpc_method_config_table *method_config_table = NULL; + if (chand->method_config_table != NULL) { + method_config_table = + grpc_method_config_table_ref(chand->method_config_table); + } + gpr_mu_unlock(&chand->mu); + // If the method config table was present, use it. + if (method_config_table != NULL) { + const grpc_method_config *method_config = + grpc_method_config_table_get_method_config(method_config_table, + calld->path); + if (method_config != NULL) { + const gpr_timespec *per_method_timeout = + grpc_method_config_get_timeout(method_config); + const bool *wait_for_ready = + grpc_method_config_get_wait_for_ready(method_config); + if (per_method_timeout != NULL || wait_for_ready != NULL) { + gpr_mu_lock(&calld->mu); + if (per_method_timeout != NULL) { + gpr_timespec per_method_deadline = + gpr_time_add(calld->call_start_time, *per_method_timeout); + if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) { + calld->deadline = per_method_deadline; + // Reset deadline timer. + grpc_deadline_state_reset(exec_ctx, elem, calld->deadline); + } + } + if (wait_for_ready != NULL) { + calld->wait_for_ready_from_service_config = + *wait_for_ready ? WAIT_FOR_READY_TRUE : WAIT_FOR_READY_FALSE; + } + gpr_mu_unlock(&calld->mu); + } + } + grpc_method_config_table_unref(method_config_table); + } + } + GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "read_service_config"); +} + /* Constructor for call_data */ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_call_element_args *args) { + channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - grpc_deadline_state_init(exec_ctx, elem, args); - calld->deadline = args->deadline; + // Initialize data members. + grpc_deadline_state_init(exec_ctx, elem, args->call_stack); + calld->path = GRPC_MDSTR_REF(args->path); + calld->call_start_time = args->start_time; + calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC); + calld->wait_for_ready_from_service_config = WAIT_FOR_READY_UNSET; calld->cancel_error = GRPC_ERROR_NONE; gpr_atm_rel_store(&calld->subchannel_call, 0); gpr_mu_init(&calld->mu); @@ -803,6 +908,55 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; calld->owning_call = args->call_stack; calld->pollent = NULL; + // If the resolver has already returned results, then we can access + // the service config parameters immediately. Otherwise, we need to + // defer that work until the resolver returns an initial result. + // TODO(roth): This code is almost but not quite identical to the code + // in read_service_config() above. It would be nice to find a way to + // combine them, to avoid having to maintain it twice. + gpr_mu_lock(&chand->mu); + if (chand->lb_policy != NULL) { + // We already have a resolver result, so check for service config. + if (chand->method_config_table != NULL) { + grpc_method_config_table *method_config_table = + grpc_method_config_table_ref(chand->method_config_table); + gpr_mu_unlock(&chand->mu); + grpc_method_config *method_config = + grpc_method_config_table_get_method_config(method_config_table, + args->path); + if (method_config != NULL) { + const gpr_timespec *per_method_timeout = + grpc_method_config_get_timeout(method_config); + if (per_method_timeout != NULL) { + gpr_timespec per_method_deadline = + gpr_time_add(calld->call_start_time, *per_method_timeout); + calld->deadline = gpr_time_min(calld->deadline, per_method_deadline); + } + const bool *wait_for_ready = + grpc_method_config_get_wait_for_ready(method_config); + if (wait_for_ready != NULL) { + calld->wait_for_ready_from_service_config = + *wait_for_ready ? WAIT_FOR_READY_TRUE : WAIT_FOR_READY_FALSE; + } + } + grpc_method_config_table_unref(method_config_table); + } else { + gpr_mu_unlock(&chand->mu); + } + } else { + // We don't yet have a resolver result, so register a callback to + // get the service config data once the resolver returns. + // Take a reference to the call stack to be owned by the callback. + GRPC_CALL_STACK_REF(calld->owning_call, "read_service_config"); + grpc_closure_init(&calld->read_service_config, read_service_config, elem); + grpc_closure_list_append(&chand->waiting_for_config_closures, + &calld->read_service_config, GRPC_ERROR_NONE); + gpr_mu_unlock(&chand->mu); + } + // Start the deadline timer with the current deadline value. If we + // do not yet have service config data, then the timer may be reset + // later. + grpc_deadline_state_start(exec_ctx, elem, calld->deadline); return GRPC_ERROR_NONE; } @@ -813,6 +967,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, void *and_free_memory) { call_data *calld = elem->call_data; grpc_deadline_state_destroy(exec_ctx, elem); + GRPC_MDSTR_UNREF(calld->path); GRPC_ERROR_UNREF(calld->cancel_error); grpc_subchannel_call *call = GET_CALL(calld); if (call != NULL && call != CANCELLED_CALL) { |