diff options
Diffstat (limited to 'src/core/ext/client_channel/client_channel.c')
-rw-r--r-- | src/core/ext/client_channel/client_channel.c | 220 |
1 files changed, 115 insertions, 105 deletions
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index 6cbc333b83..651e62b96d 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -76,24 +76,82 @@ typedef enum { WAIT_FOR_READY_TRUE } wait_for_ready_value; -typedef struct method_parameters { +typedef struct { + gpr_refcount refs; gpr_timespec timeout; wait_for_ready_value wait_for_ready; } method_parameters; +static method_parameters* method_parameters_ref( + method_parameters *method_params) { + gpr_ref(&method_params->refs); + return method_params; +} + +static void method_parameters_unref(method_parameters *method_params) { + if (gpr_unref(&method_params->refs)) { + gpr_free(method_params); + } +} + static void *method_parameters_copy(void *value) { - void *new_value = gpr_malloc(sizeof(method_parameters)); - memcpy(new_value, value, sizeof(method_parameters)); - return new_value; + return method_parameters_ref(value); } -static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *p) { - gpr_free(p); +static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *value) { + method_parameters_unref(value); } static const grpc_slice_hash_table_vtable method_parameters_vtable = { method_parameters_free, method_parameters_copy}; +static bool parse_wait_for_ready(grpc_json *field, + wait_for_ready_value *wait_for_ready) { + if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) { + return false; + } + *wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE + : WAIT_FOR_READY_FALSE; + return true; +} + +static bool parse_timeout(grpc_json *field, gpr_timespec* timeout) { + if (field->type != GRPC_JSON_STRING) return false; + size_t len = strlen(field->value); + if (field->value[len - 1] != 's') return false; + char *buf = gpr_strdup(field->value); + buf[len - 1] = '\0'; // Remove trailing 's'. + char *decimal_point = strchr(buf, '.'); + if (decimal_point != NULL) { + *decimal_point = '\0'; + timeout->tv_nsec = gpr_parse_nonnegative_int(decimal_point + 1); + if (timeout->tv_nsec == -1) { + gpr_free(buf); + return false; + } + // There should always be exactly 3, 6, or 9 fractional digits. + int multiplier = 1; + switch (strlen(decimal_point + 1)) { + case 9: + break; + case 6: + multiplier *= 1000; + break; + case 3: + multiplier *= 1000000; + break; + default: // Unsupported number of digits. + gpr_free(buf); + return false; + } + timeout->tv_nsec *= multiplier; + } + timeout->tv_sec = gpr_parse_nonnegative_int(buf); + gpr_free(buf); + if (timeout->tv_sec == -1) return false; + return true; +} + static void *method_parameters_create_from_json(const grpc_json *json) { wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET; gpr_timespec timeout = {0, 0, GPR_TIMESPAN}; @@ -101,49 +159,14 @@ static void *method_parameters_create_from_json(const grpc_json *json) { if (field->key == NULL) continue; if (strcmp(field->key, "waitForReady") == 0) { if (wait_for_ready != WAIT_FOR_READY_UNSET) return NULL; // Duplicate. - if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) { - return NULL; - } - wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE - : WAIT_FOR_READY_FALSE; + if (!parse_wait_for_ready(field, &wait_for_ready)) return NULL; } else if (strcmp(field->key, "timeout") == 0) { if (timeout.tv_sec > 0 || timeout.tv_nsec > 0) return NULL; // Duplicate. - if (field->type != GRPC_JSON_STRING) return NULL; - size_t len = strlen(field->value); - if (field->value[len - 1] != 's') return NULL; - char *buf = gpr_strdup(field->value); - buf[len - 1] = '\0'; // Remove trailing 's'. - char *decimal_point = strchr(buf, '.'); - if (decimal_point != NULL) { - *decimal_point = '\0'; - timeout.tv_nsec = gpr_parse_nonnegative_int(decimal_point + 1); - if (timeout.tv_nsec == -1) { - gpr_free(buf); - return NULL; - } - // There should always be exactly 3, 6, or 9 fractional digits. - int multiplier = 1; - switch (strlen(decimal_point + 1)) { - case 9: - break; - case 6: - multiplier *= 1000; - break; - case 3: - multiplier *= 1000000; - break; - default: // Unsupported number of digits. - gpr_free(buf); - return NULL; - } - timeout.tv_nsec *= multiplier; - } - timeout.tv_sec = gpr_parse_nonnegative_int(buf); - if (timeout.tv_sec == -1) return NULL; - gpr_free(buf); + if (!parse_timeout(field, &timeout)) return NULL; } } method_parameters *value = gpr_malloc(sizeof(method_parameters)); + gpr_ref_init(&value->refs, 1); value->timeout = timeout; value->wait_for_ready = wait_for_ready; return value; @@ -629,7 +652,7 @@ typedef struct client_channel_call_data { grpc_slice path; // Request path. gpr_timespec call_start_time; gpr_timespec deadline; - wait_for_ready_value wait_for_ready_from_service_config; + method_parameters *method_params; grpc_closure read_service_config; grpc_error *cancel_error; @@ -836,10 +859,11 @@ static bool pick_subchannel_locked( initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET; const bool wait_for_ready_set_from_service_config = - calld->wait_for_ready_from_service_config != WAIT_FOR_READY_UNSET; + calld->method_params != NULL && + calld->method_params->wait_for_ready != WAIT_FOR_READY_UNSET; if (!wait_for_ready_set_from_api && wait_for_ready_set_from_service_config) { - if (calld->wait_for_ready_from_service_config == WAIT_FOR_READY_TRUE) { + if (calld->method_params->wait_for_ready == WAIT_FOR_READY_TRUE) { initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY; } else { initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY; @@ -977,10 +1001,9 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, add_waiting_locked(calld, op); } -static void cc_start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, - void *arg, +static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error_ignored) { - GPR_TIMER_BEGIN("cc_start_transport_stream_op_locked", 0); + GPR_TIMER_BEGIN("start_transport_stream_op_locked", 0); grpc_transport_stream_op *op = arg; grpc_call_element *elem = op->handler_private.args[0]; @@ -990,7 +1013,7 @@ static void cc_start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "start_transport_stream_op"); - GPR_TIMER_END("cc_start_transport_stream_op_locked", 0); + GPR_TIMER_END("start_transport_stream_op_locked", 0); } /* The logic here is fairly complicated, due to (a) the fact that we @@ -1030,52 +1053,53 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, grpc_closure_sched( exec_ctx, grpc_closure_init(&op->handler_private.closure, - cc_start_transport_stream_op_locked, op, + start_transport_stream_op_locked, op, grpc_combiner_scheduler(chand->combiner, false)), GRPC_ERROR_NONE); GPR_TIMER_END("cc_start_transport_stream_op", 0); } +// Sets calld->method_params. +// If the method params specify a timeout, populates +// *per_method_deadline and returns true. +static bool set_call_method_params_from_service_config_locked( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + gpr_timespec *per_method_deadline) { + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + if (chand->method_params_table != NULL) { + calld->method_params = grpc_method_config_table_get( + exec_ctx, chand->method_params_table, calld->path); + if (calld->method_params != NULL) { + method_parameters_ref(calld->method_params); + if (gpr_time_cmp(calld->method_params->timeout, + gpr_time_0(GPR_TIMESPAN)) != 0) { + *per_method_deadline = gpr_time_add( + calld->call_start_time, calld->method_params->timeout); + return true; + } + } + } + return false; +} + // Gets data from the service config. Invoked when the resolver returns // its initial result. static void read_service_config_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_call_element *elem = arg; - channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; // If this is an error, there's no point in looking at the service config. if (error == GRPC_ERROR_NONE) { - // Get the method config table from channel data. - grpc_slice_hash_table *method_params_table = NULL; - if (chand->method_params_table != NULL) { - method_params_table = - grpc_slice_hash_table_ref(chand->method_params_table); - } - // If the method config table was present, use it. - if (method_params_table != NULL) { - const method_parameters *method_params = grpc_method_config_table_get( - exec_ctx, method_params_table, calld->path); - if (method_params != NULL) { - const bool have_method_timeout = - gpr_time_cmp(method_params->timeout, gpr_time_0(GPR_TIMESPAN)) != 0; - if (have_method_timeout || - method_params->wait_for_ready != WAIT_FOR_READY_UNSET) { - if (have_method_timeout) { - const gpr_timespec per_method_deadline = - gpr_time_add(calld->call_start_time, method_params->timeout); - if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) { - calld->deadline = per_method_deadline; - // Reset deadline timer. - grpc_deadline_state_reset(exec_ctx, elem, calld->deadline); - } - } - if (method_params->wait_for_ready != WAIT_FOR_READY_UNSET) { - calld->wait_for_ready_from_service_config = - method_params->wait_for_ready; - } - } + gpr_timespec per_method_deadline; + if (set_call_method_params_from_service_config_locked( + exec_ctx, elem, &per_method_deadline)) { + // If the deadline from the service config is shorter than the one + // from the client API, reset the deadline timer. + if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) { + calld->deadline = per_method_deadline; + grpc_deadline_state_reset(exec_ctx, elem, calld->deadline); } - grpc_slice_hash_table_unref(exec_ctx, method_params_table); } } GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "read_service_config"); @@ -1090,29 +1114,12 @@ static void initial_read_service_config_locked(grpc_exec_ctx *exec_ctx, // If the resolver has already returned results, then we can access // the service config parameters immediately. Otherwise, we need to // defer that work until the resolver returns an initial result. - // TODO(roth): This code is almost but not quite identical to the code - // in read_service_config() above. It would be nice to find a way to - // combine them, to avoid having to maintain it twice. if (chand->lb_policy != NULL) { // We already have a resolver result, so check for service config. - if (chand->method_params_table != NULL) { - grpc_slice_hash_table *method_params_table = - grpc_slice_hash_table_ref(chand->method_params_table); - method_parameters *method_params = grpc_method_config_table_get( - exec_ctx, method_params_table, calld->path); - if (method_params != NULL) { - if (gpr_time_cmp(method_params->timeout, - gpr_time_0(GPR_CLOCK_MONOTONIC)) != 0) { - gpr_timespec per_method_deadline = - gpr_time_add(calld->call_start_time, method_params->timeout); - calld->deadline = gpr_time_min(calld->deadline, per_method_deadline); - } - if (method_params->wait_for_ready != WAIT_FOR_READY_UNSET) { - calld->wait_for_ready_from_service_config = - method_params->wait_for_ready; - } - } - grpc_slice_hash_table_unref(exec_ctx, method_params_table); + gpr_timespec per_method_deadline; + if (set_call_method_params_from_service_config_locked( + exec_ctx, elem, &per_method_deadline)) { + calld->deadline = gpr_time_min(calld->deadline, per_method_deadline); } } else { // We don't yet have a resolver result, so register a callback to @@ -1143,7 +1150,7 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, calld->path = grpc_slice_ref_internal(args->path); calld->call_start_time = args->start_time; calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC); - calld->wait_for_ready_from_service_config = WAIT_FOR_READY_UNSET; + calld->method_params = NULL; calld->cancel_error = GRPC_ERROR_NONE; gpr_atm_rel_store(&calld->subchannel_call, 0); calld->connected_subchannel = NULL; @@ -1171,6 +1178,9 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, call_data *calld = elem->call_data; grpc_deadline_state_destroy(exec_ctx, elem); grpc_slice_unref_internal(exec_ctx, calld->path); + if (calld->method_params != NULL) { + method_parameters_unref(calld->method_params); + } GRPC_ERROR_UNREF(calld->cancel_error); grpc_subchannel_call *call = GET_CALL(calld); if (call != NULL && call != CANCELLED_CALL) { |