diff options
author | 2017-03-27 09:18:21 -0700 | |
---|---|---|
committer | 2017-03-27 09:18:21 -0700 | |
commit | a0649dde0ec92624d56f943d3b0fae09bcb65e5b (patch) | |
tree | 8b0885a3e6d7584b79b5a31725932f4c606768a1 /src/core/ext/client_channel | |
parent | 3c7e460de68625c926b8e4581ed573e543a16a67 (diff) | |
parent | c4478a103b68100b86f506061cedcb0ce70016ba (diff) |
Merge branch 'master' of https://github.com/grpc/grpc into grpcz_client
Diffstat (limited to 'src/core/ext/client_channel')
-rw-r--r-- | src/core/ext/client_channel/channel_connectivity.c | 4 | ||||
-rw-r--r-- | src/core/ext/client_channel/client_channel.c | 188 | ||||
-rw-r--r-- | src/core/ext/client_channel/client_channel_plugin.c | 3 | ||||
-rw-r--r-- | src/core/ext/client_channel/connector.h | 2 | ||||
-rw-r--r-- | src/core/ext/client_channel/default_initial_connect_string.c | 38 | ||||
-rw-r--r-- | src/core/ext/client_channel/http_connect_handshaker.c | 4 | ||||
-rw-r--r-- | src/core/ext/client_channel/initial_connect_string.h | 50 | ||||
-rw-r--r-- | src/core/ext/client_channel/proxy_mapper_registry.c | 12 | ||||
-rw-r--r-- | src/core/ext/client_channel/resolver_registry.c | 1 | ||||
-rw-r--r-- | src/core/ext/client_channel/retry_throttle.c | 210 | ||||
-rw-r--r-- | src/core/ext/client_channel/retry_throttle.h (renamed from src/core/ext/client_channel/initial_connect_string.c) | 43 | ||||
-rw-r--r-- | src/core/ext/client_channel/subchannel.c | 70 | ||||
-rw-r--r-- | src/core/ext/client_channel/subchannel.h | 18 |
13 files changed, 474 insertions, 169 deletions
diff --git a/src/core/ext/client_channel/channel_connectivity.c b/src/core/ext/client_channel/channel_connectivity.c index dd70bc2c6c..f6cb3b9115 100644 --- a/src/core/ext/client_channel/channel_connectivity.c +++ b/src/core/ext/client_channel/channel_connectivity.c @@ -139,8 +139,8 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, error = GRPC_ERROR_NONE; } else { if (error == GRPC_ERROR_NONE) { - error = - GRPC_ERROR_CREATE("Timed out waiting for connection state change"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Timed out waiting for connection state change"); } else if (error == GRPC_ERROR_CANCELLED) { error = GRPC_ERROR_NONE; } diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index f2475cf6ae..435a3ab0fe 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -47,6 +47,7 @@ #include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/proxy_mapper_registry.h" #include "src/core/ext/client_channel/resolver_registry.h" +#include "src/core/ext/client_channel/retry_throttle.h" #include "src/core/ext/client_channel/subchannel.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" @@ -189,6 +190,8 @@ typedef struct client_channel_channel_data { grpc_combiner *combiner; /** currently active load balancer */ grpc_lb_policy *lb_policy; + /** retry throttle data */ + grpc_server_retry_throttle_data *retry_throttle_data; /** maps method names to method_parameters structs */ grpc_slice_hash_table *method_params_table; /** incoming resolver result - set by resolver.next() */ @@ -284,6 +287,65 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, &w->on_changed); } +typedef struct { + char *server_name; + grpc_server_retry_throttle_data *retry_throttle_data; +} service_config_parsing_state; + +static void parse_retry_throttle_params(const grpc_json *field, void *arg) { + service_config_parsing_state *parsing_state = arg; + if (strcmp(field->key, "retryThrottling") == 0) { + if (parsing_state->retry_throttle_data != NULL) return; // Duplicate. + if (field->type != GRPC_JSON_OBJECT) return; + int max_milli_tokens = 0; + int milli_token_ratio = 0; + for (grpc_json *sub_field = field->child; sub_field != NULL; + sub_field = sub_field->next) { + if (sub_field->key == NULL) return; + if (strcmp(sub_field->key, "maxTokens") == 0) { + if (max_milli_tokens != 0) return; // Duplicate. + if (sub_field->type != GRPC_JSON_NUMBER) return; + max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value); + if (max_milli_tokens == -1) return; + max_milli_tokens *= 1000; + } else if (strcmp(sub_field->key, "tokenRatio") == 0) { + if (milli_token_ratio != 0) return; // Duplicate. + if (sub_field->type != GRPC_JSON_NUMBER) return; + // We support up to 3 decimal digits. + size_t whole_len = strlen(sub_field->value); + uint32_t multiplier = 1; + uint32_t decimal_value = 0; + const char *decimal_point = strchr(sub_field->value, '.'); + if (decimal_point != NULL) { + whole_len = (size_t)(decimal_point - sub_field->value); + multiplier = 1000; + size_t decimal_len = strlen(decimal_point + 1); + if (decimal_len > 3) decimal_len = 3; + if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len, + &decimal_value)) { + return; + } + uint32_t decimal_multiplier = 1; + for (size_t i = 0; i < (3 - decimal_len); ++i) { + decimal_multiplier *= 10; + } + decimal_value *= decimal_multiplier; + } + uint32_t whole_value; + if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len, + &whole_value)) { + return; + } + milli_token_ratio = (int)((whole_value * multiplier) + decimal_value); + if (milli_token_ratio <= 0) return; + } + } + parsing_state->retry_throttle_data = + grpc_retry_throttle_map_get_data_for_server( + parsing_state->server_name, max_milli_tokens, milli_token_ratio); + } +} + static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { channel_data *chand = arg; @@ -293,8 +355,11 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_slice_hash_table *method_params_table = NULL; grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; bool exit_idle = false; - grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy"); + grpc_error *state_error = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy"); char *service_config_json = NULL; + service_config_parsing_state parsing_state; + memset(&parsing_state, 0, sizeof(parsing_state)); if (chand->resolver_result != NULL) { // Find LB policy name. @@ -355,6 +420,19 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_service_config *service_config = grpc_service_config_create(service_config_json); if (service_config != NULL) { + channel_arg = + grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI); + GPR_ASSERT(channel_arg != NULL); + GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); + grpc_uri *uri = + grpc_uri_parse(exec_ctx, channel_arg->value.string, true); + GPR_ASSERT(uri->path[0] != '\0'); + parsing_state.server_name = + uri->path[0] == '/' ? uri->path + 1 : uri->path; + grpc_service_config_parse_global_params( + service_config, parse_retry_throttle_params, &parsing_state); + parsing_state.server_name = NULL; + grpc_uri_destroy(uri); method_params_table = grpc_service_config_create_method_config_table( exec_ctx, service_config, method_parameters_create_from_json, &method_parameters_vtable); @@ -386,6 +464,11 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, chand->info_service_config_json = service_config_json; } gpr_mu_unlock(&chand->info_mu); + + if (chand->retry_throttle_data != NULL) { + grpc_server_retry_throttle_data_unref(chand->retry_throttle_data); + } + chand->retry_throttle_data = parsing_state.retry_throttle_data; if (chand->method_params_table != NULL) { grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table); } @@ -393,9 +476,9 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, if (lb_policy != NULL) { grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures); } else if (chand->resolver == NULL /* disconnected */) { - grpc_closure_list_fail_all( - &chand->waiting_for_config_closures, - GRPC_ERROR_CREATE_REFERENCING("Channel disconnected", &error, 1)); + grpc_closure_list_fail_all(&chand->waiting_for_config_closures, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Channel disconnected", &error, 1)); grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures); } if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) { @@ -423,8 +506,8 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_error *refs[] = {error, state_error}; set_channel_connectivity_state_locked( exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_CREATE_REFERENCING("Got config after disconnection", refs, - GPR_ARRAY_SIZE(refs)), + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Got config after disconnection", refs, GPR_ARRAY_SIZE(refs)), "resolver_gone"); } @@ -463,8 +546,9 @@ static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg, if (op->send_ping != NULL) { if (chand->lb_policy == NULL) { - grpc_closure_sched(exec_ctx, op->send_ping, - GRPC_ERROR_CREATE("Ping with no load balancing")); + grpc_closure_sched( + exec_ctx, op->send_ping, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing")); } else { grpc_lb_policy_ping_one_locked(exec_ctx, chand->lb_policy, op->send_ping); op->bind_pollset = NULL; @@ -579,7 +663,7 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, if (proxy_name != NULL) gpr_free(proxy_name); if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args); if (chand->resolver == NULL) { - return GRPC_ERROR_CREATE("resolver creation failed"); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed"); } return GRPC_ERROR_NONE; } @@ -613,6 +697,9 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, } gpr_free(chand->info_lb_policy_name); gpr_free(chand->info_service_config_json); + if (chand->retry_throttle_data != NULL) { + grpc_server_retry_throttle_data_unref(chand->retry_throttle_data); + } if (chand->method_params_table != NULL) { grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table); } @@ -654,6 +741,7 @@ typedef struct client_channel_call_data { grpc_slice path; // Request path. gpr_timespec call_start_time; gpr_timespec deadline; + grpc_server_retry_throttle_data *retry_throttle_data; method_parameters *method_params; grpc_error *cancel_error; @@ -661,6 +749,7 @@ typedef struct client_channel_call_data { /** either 0 for no call, 1 for cancelled, or a pointer to a grpc_subchannel_call */ gpr_atm subchannel_call; + gpr_arena *arena; subchannel_creation_phase creation_phase; grpc_connected_subchannel *connected_subchannel; @@ -675,6 +764,9 @@ typedef struct client_channel_call_data { grpc_call_stack *owning_call; grpc_linked_mdelem lb_token_mdelem; + + grpc_closure on_complete; + grpc_closure *original_on_complete; } call_data; grpc_subchannel_call *grpc_client_channel_get_subchannel_call( @@ -727,7 +819,7 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { gpr_free(ops); } -// Sets calld->method_params. +// Sets calld->method_params and calld->retry_throttle_data. // If the method params specify a timeout, populates // *per_method_deadline and returns true. static bool set_call_method_params_from_service_config_locked( @@ -735,6 +827,10 @@ static bool set_call_method_params_from_service_config_locked( gpr_timespec *per_method_deadline) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; + if (chand->retry_throttle_data != NULL) { + calld->retry_throttle_data = + grpc_server_retry_throttle_data_ref(chand->retry_throttle_data); + } if (chand->method_params_table != NULL) { calld->method_params = grpc_method_config_table_get( exec_ctx, chand->method_params_table, calld->path); @@ -780,12 +876,14 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; if (calld->connected_subchannel == NULL) { gpr_atm_no_barrier_store(&calld->subchannel_call, 1); - fail_locked(exec_ctx, calld, GRPC_ERROR_CREATE_REFERENCING( - "Failed to create subchannel", &error, 1)); + fail_locked(exec_ctx, calld, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Failed to create subchannel", &error, 1)); } else if (GET_CALL(calld) == CANCELLED_CALL) { /* already cancelled before subchannel became ready */ - grpc_error *cancellation_error = GRPC_ERROR_CREATE_REFERENCING( - "Cancelled before creating subchannel", &error, 1); + grpc_error *cancellation_error = + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Cancelled before creating subchannel", &error, 1); /* if due to deadline, attach the deadline exceeded status to the error */ if (gpr_time_cmp(calld->deadline, gpr_now(GPR_CLOCK_MONOTONIC)) < 0) { cancellation_error = @@ -796,9 +894,14 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, } else { /* Create call on subchannel. */ grpc_subchannel_call *subchannel_call = NULL; + const grpc_connected_subchannel_call_args call_args = { + .pollent = calld->pollent, + .path = calld->path, + .start_time = calld->call_start_time, + .deadline = calld->deadline, + .arena = calld->arena}; grpc_error *new_error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, calld->pollent, calld->path, - calld->call_start_time, calld->deadline, &subchannel_call); + exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); if (new_error != GRPC_ERROR_NONE) { new_error = grpc_error_add_child(new_error, error); subchannel_call = CANCELLED_CALL; @@ -882,9 +985,9 @@ static bool pick_subchannel_locked( cpa = closure->cb_arg; if (cpa->connected_subchannel == connected_subchannel) { cpa->connected_subchannel = NULL; - grpc_closure_sched( - exec_ctx, cpa->on_ready, - GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1)); + grpc_closure_sched(exec_ctx, cpa->on_ready, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Pick cancelled", &error, 1)); } } GPR_TIMER_END("pick_subchannel", 0); @@ -941,7 +1044,8 @@ static bool pick_subchannel_locked( grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure, GRPC_ERROR_NONE); } else { - grpc_closure_sched(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected")); + grpc_closure_sched(exec_ctx, on_ready, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); } GPR_TIMER_END("pick_subchannel", 0); @@ -1025,9 +1129,14 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && calld->connected_subchannel != NULL) { grpc_subchannel_call *subchannel_call = NULL; + const grpc_connected_subchannel_call_args call_args = { + .pollent = calld->pollent, + .path = calld->path, + .start_time = calld->call_start_time, + .deadline = calld->deadline, + .arena = calld->arena}; grpc_error *error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, calld->pollent, calld->path, - calld->call_start_time, calld->deadline, &subchannel_call); + exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); if (error != GRPC_ERROR_NONE) { subchannel_call = CANCELLED_CALL; fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); @@ -1045,6 +1154,26 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, add_waiting_locked(calld, op); } +static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + grpc_call_element *elem = arg; + call_data *calld = elem->call_data; + if (calld->retry_throttle_data != NULL) { + if (error == GRPC_ERROR_NONE) { + grpc_server_retry_throttle_data_record_success( + calld->retry_throttle_data); + } else { + // TODO(roth): In a subsequent PR, check the return value here and + // decide whether or not to retry. Note that we should only + // record failures whose statuses match the configured retryable + // or non-fatal status codes. + grpc_server_retry_throttle_data_record_failure( + calld->retry_throttle_data); + } + } + grpc_closure_run(exec_ctx, calld->original_on_complete, + GRPC_ERROR_REF(error)); +} + static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error_ignored) { GPR_TIMER_BEGIN("start_transport_stream_op_locked", 0); @@ -1053,6 +1182,14 @@ static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_call_element *elem = op->handler_private.args[0]; call_data *calld = elem->call_data; + if (op->recv_trailing_metadata != NULL) { + GPR_ASSERT(op->on_complete != NULL); + calld->original_on_complete = op->on_complete; + grpc_closure_init(&calld->on_complete, on_complete, elem, + grpc_schedule_on_exec_ctx); + op->on_complete = &calld->on_complete; + } + start_transport_stream_op_locked_inner(exec_ctx, op, elem); GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, @@ -1114,6 +1251,7 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, calld->call_start_time = args->start_time; calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC); calld->owning_call = args->call_stack; + calld->arena = args->arena; grpc_deadline_state_start(exec_ctx, elem, calld->deadline); return GRPC_ERROR_NONE; } @@ -1122,7 +1260,7 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory) { + grpc_closure *then_schedule_closure) { call_data *calld = elem->call_data; grpc_deadline_state_destroy(exec_ctx, elem); grpc_slice_unref_internal(exec_ctx, calld->path); @@ -1132,6 +1270,8 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(calld->cancel_error); grpc_subchannel_call *call = GET_CALL(calld); if (call != NULL && call != CANCELLED_CALL) { + grpc_subchannel_call_set_cleanup_closure(call, then_schedule_closure); + then_schedule_closure = NULL; GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call"); } GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING); @@ -1141,7 +1281,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, "picked"); } gpr_free(calld->waiting_ops); - gpr_free(and_free_memory); + grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); } static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, diff --git a/src/core/ext/client_channel/client_channel_plugin.c b/src/core/ext/client_channel/client_channel_plugin.c index 28d3b63f99..f51277d0b2 100644 --- a/src/core/ext/client_channel/client_channel_plugin.c +++ b/src/core/ext/client_channel/client_channel_plugin.c @@ -43,6 +43,7 @@ #include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/proxy_mapper_registry.h" #include "src/core/ext/client_channel/resolver_registry.h" +#include "src/core/ext/client_channel/retry_throttle.h" #include "src/core/ext/client_channel/subchannel_index.h" #include "src/core/lib/surface/channel_init.h" @@ -82,6 +83,7 @@ static bool set_default_host_if_unset(grpc_exec_ctx *exec_ctx, void grpc_client_channel_init(void) { grpc_lb_policy_registry_init(); grpc_resolver_registry_init(); + grpc_retry_throttle_map_init(); grpc_proxy_mapper_registry_init(); grpc_register_http_proxy_mapper(); grpc_subchannel_index_init(); @@ -96,6 +98,7 @@ void grpc_client_channel_shutdown(void) { grpc_subchannel_index_shutdown(); grpc_channel_init_shutdown(); grpc_proxy_mapper_registry_shutdown(); + grpc_retry_throttle_map_shutdown(); grpc_resolver_registry_shutdown(); grpc_lb_policy_registry_shutdown(); } diff --git a/src/core/ext/client_channel/connector.h b/src/core/ext/client_channel/connector.h index 9bff41f003..94b5fb5c9e 100644 --- a/src/core/ext/client_channel/connector.h +++ b/src/core/ext/client_channel/connector.h @@ -48,8 +48,6 @@ struct grpc_connector { typedef struct { /** set of pollsets interested in this connection */ grpc_pollset_set *interested_parties; - /** initial connect string to send */ - grpc_slice initial_connect_string; /** deadline for connection */ gpr_timespec deadline; /** channel arguments (to be passed to transport) */ diff --git a/src/core/ext/client_channel/default_initial_connect_string.c b/src/core/ext/client_channel/default_initial_connect_string.c deleted file mode 100644 index 6db82d84ef..0000000000 --- a/src/core/ext/client_channel/default_initial_connect_string.c +++ /dev/null @@ -1,38 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include <grpc/slice.h> -#include "src/core/lib/iomgr/resolve_address.h" - -void grpc_set_default_initial_connect_string(grpc_resolved_address **addr, - grpc_slice *initial_str) {} diff --git a/src/core/ext/client_channel/http_connect_handshaker.c b/src/core/ext/client_channel/http_connect_handshaker.c index 58ab233f1b..778d76c39f 100644 --- a/src/core/ext/client_channel/http_connect_handshaker.c +++ b/src/core/ext/client_channel/http_connect_handshaker.c @@ -116,7 +116,7 @@ static void handshake_failed_locked(grpc_exec_ctx* exec_ctx, // If we were shut down after an endpoint operation succeeded but // before the endpoint callback was invoked, we need to generate our // own error. - error = GRPC_ERROR_CREATE("Handshaker shutdown"); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Handshaker shutdown"); } if (!handshaker->shutdown) { // TODO(ctiller): It is currently necessary to shutdown endpoints @@ -226,7 +226,7 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, char* msg; gpr_asprintf(&msg, "HTTP proxy returned response code %d", handshaker->http_response.status); - error = GRPC_ERROR_CREATE(msg); + error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); handshake_failed_locked(exec_ctx, handshaker, error); goto done; diff --git a/src/core/ext/client_channel/initial_connect_string.h b/src/core/ext/client_channel/initial_connect_string.h deleted file mode 100644 index 876abea40e..0000000000 --- a/src/core/ext/client_channel/initial_connect_string.h +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_INITIAL_CONNECT_STRING_H -#define GRPC_CORE_EXT_CLIENT_CHANNEL_INITIAL_CONNECT_STRING_H - -#include <grpc/slice.h> -#include "src/core/lib/iomgr/resolve_address.h" - -typedef void (*grpc_set_initial_connect_string_func)( - grpc_resolved_address **addr, grpc_slice *initial_str); - -void grpc_test_set_initial_connect_string_function( - grpc_set_initial_connect_string_func func); - -/** Set a string to be sent once connected. Optionally reset addr. */ -void grpc_set_initial_connect_string(grpc_resolved_address **addr, - grpc_slice *connect_string); - -#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_INITIAL_CONNECT_STRING_H */ diff --git a/src/core/ext/client_channel/proxy_mapper_registry.c b/src/core/ext/client_channel/proxy_mapper_registry.c index 2c44b9d490..0935ddbdbd 100644 --- a/src/core/ext/client_channel/proxy_mapper_registry.c +++ b/src/core/ext/client_channel/proxy_mapper_registry.c @@ -94,6 +94,14 @@ static void grpc_proxy_mapper_list_destroy(grpc_proxy_mapper_list* list) { grpc_proxy_mapper_destroy(list->list[i]); } gpr_free(list->list); + // Clean up in case we re-initialze later. + // TODO(ctiller): This should ideally live in + // grpc_proxy_mapper_registry_init(). However, if we did this there, + // then we would do it AFTER we start registering proxy mappers from + // third-party plugins, so they'd never show up (and would leak memory). + // We probably need some sort of dependency system for plugins to fix + // this. + memset(list, 0, sizeof(*list)); } // @@ -102,9 +110,7 @@ static void grpc_proxy_mapper_list_destroy(grpc_proxy_mapper_list* list) { static grpc_proxy_mapper_list g_proxy_mapper_list; -void grpc_proxy_mapper_registry_init() { - memset(&g_proxy_mapper_list, 0, sizeof(g_proxy_mapper_list)); -} +void grpc_proxy_mapper_registry_init() {} void grpc_proxy_mapper_registry_shutdown() { grpc_proxy_mapper_list_destroy(&g_proxy_mapper_list); diff --git a/src/core/ext/client_channel/resolver_registry.c b/src/core/ext/client_channel/resolver_registry.c index 3c5a6fb3ff..0f074a3386 100644 --- a/src/core/ext/client_channel/resolver_registry.c +++ b/src/core/ext/client_channel/resolver_registry.c @@ -93,7 +93,6 @@ static grpc_resolver_factory *lookup_factory(const char *name) { return g_all_of_the_resolvers[i]; } } - return NULL; } diff --git a/src/core/ext/client_channel/retry_throttle.c b/src/core/ext/client_channel/retry_throttle.c new file mode 100644 index 0000000000..8926c3d782 --- /dev/null +++ b/src/core/ext/client_channel/retry_throttle.c @@ -0,0 +1,210 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/client_channel/retry_throttle.h" + +#include <limits.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/atm.h> +#include <grpc/support/avl.h> +#include <grpc/support/string_util.h> +#include <grpc/support/sync.h> + +// +// server_retry_throttle_data +// + +struct grpc_server_retry_throttle_data { + gpr_refcount refs; + int max_milli_tokens; + int milli_token_ratio; + gpr_atm milli_tokens; + // A pointer to the replacement for this grpc_server_retry_throttle_data + // entry. If non-NULL, then this entry is stale and must not be used. + // We hold a reference to the replacement. + gpr_atm replacement; +}; + +static void get_replacement_throttle_data_if_needed( + grpc_server_retry_throttle_data** throttle_data) { + while (true) { + grpc_server_retry_throttle_data* new_throttle_data = + (grpc_server_retry_throttle_data*)gpr_atm_acq_load( + &(*throttle_data)->replacement); + if (new_throttle_data == NULL) return; + *throttle_data = new_throttle_data; + } +} + +bool grpc_server_retry_throttle_data_record_failure( + grpc_server_retry_throttle_data* throttle_data) { + // First, check if we are stale and need to be replaced. + get_replacement_throttle_data_if_needed(&throttle_data); + // We decrement milli_tokens by 1000 (1 token) for each failure. + const int new_value = (int)gpr_atm_no_barrier_clamped_add( + &throttle_data->milli_tokens, (gpr_atm)-1000, (gpr_atm)0, + (gpr_atm)throttle_data->max_milli_tokens); + // Retries are allowed as long as the new value is above the threshold + // (max_milli_tokens / 2). + return new_value > throttle_data->max_milli_tokens / 2; +} + +void grpc_server_retry_throttle_data_record_success( + grpc_server_retry_throttle_data* throttle_data) { + // First, check if we are stale and need to be replaced. + get_replacement_throttle_data_if_needed(&throttle_data); + // We increment milli_tokens by milli_token_ratio for each success. + gpr_atm_no_barrier_clamped_add( + &throttle_data->milli_tokens, (gpr_atm)throttle_data->milli_token_ratio, + (gpr_atm)0, (gpr_atm)throttle_data->max_milli_tokens); +} + +grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_ref( + grpc_server_retry_throttle_data* throttle_data) { + gpr_ref(&throttle_data->refs); + return throttle_data; +} + +void grpc_server_retry_throttle_data_unref( + grpc_server_retry_throttle_data* throttle_data) { + if (gpr_unref(&throttle_data->refs)) { + grpc_server_retry_throttle_data* replacement = + (grpc_server_retry_throttle_data*)gpr_atm_acq_load( + &throttle_data->replacement); + if (replacement != NULL) { + grpc_server_retry_throttle_data_unref(replacement); + } + gpr_free(throttle_data); + } +} + +static grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_create( + int max_milli_tokens, int milli_token_ratio, + grpc_server_retry_throttle_data* old_throttle_data) { + grpc_server_retry_throttle_data* throttle_data = + gpr_malloc(sizeof(*throttle_data)); + memset(throttle_data, 0, sizeof(*throttle_data)); + gpr_ref_init(&throttle_data->refs, 1); + throttle_data->max_milli_tokens = max_milli_tokens; + throttle_data->milli_token_ratio = milli_token_ratio; + int initial_milli_tokens = max_milli_tokens; + // If there was a pre-existing entry for this server name, initialize + // the token count by scaling proportionately to the old data. This + // ensures that if we're already throttling retries on the old scale, + // we will start out doing the same thing on the new one. + if (old_throttle_data != NULL) { + double token_fraction = + (int)gpr_atm_acq_load(&old_throttle_data->milli_tokens) / + (double)old_throttle_data->max_milli_tokens; + initial_milli_tokens = (int)(token_fraction * max_milli_tokens); + } + gpr_atm_rel_store(&throttle_data->milli_tokens, + (gpr_atm)initial_milli_tokens); + // If there was a pre-existing entry, mark it as stale and give it a + // pointer to the new entry, which is its replacement. + if (old_throttle_data != NULL) { + grpc_server_retry_throttle_data_ref(throttle_data); + gpr_atm_rel_store(&old_throttle_data->replacement, (gpr_atm)throttle_data); + } + return throttle_data; +} + +// +// avl vtable for string -> server_retry_throttle_data map +// + +static void* copy_server_name(void* key) { return gpr_strdup(key); } + +static long compare_server_name(void* key1, void* key2) { + return strcmp(key1, key2); +} + +static void destroy_server_retry_throttle_data(void* value) { + grpc_server_retry_throttle_data* throttle_data = value; + grpc_server_retry_throttle_data_unref(throttle_data); +} + +static void* copy_server_retry_throttle_data(void* value) { + grpc_server_retry_throttle_data* throttle_data = value; + return grpc_server_retry_throttle_data_ref(throttle_data); +} + +static const gpr_avl_vtable avl_vtable = { + gpr_free /* destroy_key */, copy_server_name, compare_server_name, + destroy_server_retry_throttle_data, copy_server_retry_throttle_data}; + +// +// server_retry_throttle_map +// + +static gpr_mu g_mu; +static gpr_avl g_avl; + +void grpc_retry_throttle_map_init() { + gpr_mu_init(&g_mu); + g_avl = gpr_avl_create(&avl_vtable); +} + +void grpc_retry_throttle_map_shutdown() { + gpr_mu_destroy(&g_mu); + gpr_avl_unref(g_avl); +} + +grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server( + const char* server_name, int max_milli_tokens, int milli_token_ratio) { + gpr_mu_lock(&g_mu); + grpc_server_retry_throttle_data* throttle_data = + gpr_avl_get(g_avl, (char*)server_name); + if (throttle_data == NULL) { + // Entry not found. Create a new one. + throttle_data = grpc_server_retry_throttle_data_create( + max_milli_tokens, milli_token_ratio, NULL); + g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data); + } else { + if (throttle_data->max_milli_tokens != max_milli_tokens || + throttle_data->milli_token_ratio != milli_token_ratio) { + // Entry found but with old parameters. Create a new one based on + // the original one. + throttle_data = grpc_server_retry_throttle_data_create( + max_milli_tokens, milli_token_ratio, throttle_data); + g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data); + } else { + // Entry found. Increase refcount. + grpc_server_retry_throttle_data_ref(throttle_data); + } + } + gpr_mu_unlock(&g_mu); + return throttle_data; +} diff --git a/src/core/ext/client_channel/initial_connect_string.c b/src/core/ext/client_channel/retry_throttle.h index 8ebd06c458..f9971faf65 100644 --- a/src/core/ext/client_channel/initial_connect_string.c +++ b/src/core/ext/client_channel/retry_throttle.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2017, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -31,22 +31,35 @@ * */ -#include "src/core/ext/client_channel/initial_connect_string.h" +#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_RETRY_THROTTLE_H +#define GRPC_CORE_EXT_CLIENT_CHANNEL_RETRY_THROTTLE_H -#include <stddef.h> +#include <stdbool.h> -extern void grpc_set_default_initial_connect_string( - grpc_resolved_address **addr, grpc_slice *initial_str); +/// Tracks retry throttling data for an individual server name. +typedef struct grpc_server_retry_throttle_data grpc_server_retry_throttle_data; -static grpc_set_initial_connect_string_func g_set_initial_connect_string_func = - grpc_set_default_initial_connect_string; +/// Records a failure. Returns true if it's okay to send a retry. +bool grpc_server_retry_throttle_data_record_failure( + grpc_server_retry_throttle_data* throttle_data); +/// Records a success. +void grpc_server_retry_throttle_data_record_success( + grpc_server_retry_throttle_data* throttle_data); -void grpc_test_set_initial_connect_string_function( - grpc_set_initial_connect_string_func func) { - g_set_initial_connect_string_func = func; -} +grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_ref( + grpc_server_retry_throttle_data* throttle_data); +void grpc_server_retry_throttle_data_unref( + grpc_server_retry_throttle_data* throttle_data); -void grpc_set_initial_connect_string(grpc_resolved_address **addr, - grpc_slice *initial_str) { - g_set_initial_connect_string_func(addr, initial_str); -} +/// Initializes global map of failure data for each server name. +void grpc_retry_throttle_map_init(); +/// Shuts down global map of failure data for each server name. +void grpc_retry_throttle_map_shutdown(); + +/// Returns a reference to the failure data for \a server_name, creating +/// a new entry if needed. +/// Caller must eventually unref via \a grpc_server_retry_throttle_data_unref(). +grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server( + const char* server_name, int max_milli_tokens, int milli_token_ratio); + +#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RETRY_THROTTLE_H */ diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index 5df0a9060d..063c0badff 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -41,7 +41,6 @@ #include <grpc/support/string_util.h> #include "src/core/ext/client_channel/client_channel.h" -#include "src/core/ext/client_channel/initial_connect_string.h" #include "src/core/ext/client_channel/parse_address.h" #include "src/core/ext/client_channel/proxy_mapper_registry.h" #include "src/core/ext/client_channel/subchannel_index.h" @@ -103,9 +102,6 @@ struct grpc_subchannel { grpc_subchannel_key *key; - /** initial string to send to peer */ - grpc_slice initial_connect_string; - /** set during connection */ grpc_connect_out_args connecting_result; @@ -148,6 +144,7 @@ struct grpc_subchannel { struct grpc_subchannel_call { grpc_connected_subchannel *connection; + grpc_closure *schedule_closure_after_destroy; }; #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1)) @@ -214,7 +211,6 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_subchannel *c = arg; gpr_free((void *)c->filters); grpc_channel_args_destroy(exec_ctx, c->args); - grpc_slice_unref_internal(exec_ctx, c->initial_connect_string); grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker); grpc_connector_unref(exec_ctx, c->connector); grpc_pollset_set_destroy(exec_ctx, c->pollset_set); @@ -273,8 +269,9 @@ static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { gpr_mu_lock(&c->mu); GPR_ASSERT(!c->disconnected); c->disconnected = true; - grpc_connector_shutdown(exec_ctx, c->connector, - GRPC_ERROR_CREATE("Subchannel disconnected")); + grpc_connector_shutdown( + exec_ctx, c->connector, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Subchannel disconnected")); con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); if (con != NULL) { GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "connection"); @@ -332,7 +329,6 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, c->pollset_set = grpc_pollset_set_create(); grpc_resolved_address *addr = gpr_malloc(sizeof(*addr)); grpc_get_subchannel_address_arg(exec_ctx, args->args, addr); - grpc_set_initial_connect_string(&addr, &c->initial_connect_string); grpc_resolved_address *new_address = NULL; grpc_channel_args *new_args = NULL; if (grpc_proxy_mappers_map_address(exec_ctx, addr, args->args, &new_address, @@ -340,17 +336,15 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, GPR_ASSERT(new_address != NULL); gpr_free(addr); addr = new_address; - if (new_args != NULL) c->args = new_args; - } - if (c->args == NULL) { - static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS}; - grpc_arg new_arg = grpc_create_subchannel_address_arg(addr); - c->args = grpc_channel_args_copy_and_add_and_remove( - args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &new_arg, - 1); - gpr_free(new_arg.value.string); } + static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS}; + grpc_arg new_arg = grpc_create_subchannel_address_arg(addr); gpr_free(addr); + c->args = grpc_channel_args_copy_and_add_and_remove( + new_args != NULL ? new_args : args->args, keys_to_remove, + GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1); + gpr_free(new_arg.value.string); + if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args); c->root_external_state_watcher.next = c->root_external_state_watcher.prev = &c->root_external_state_watcher; grpc_closure_init(&c->connected, subchannel_connected, c, @@ -405,7 +399,6 @@ static void continue_connect_locked(grpc_exec_ctx *exec_ctx, args.interested_parties = c->pollset_set; args.deadline = c->next_attempt; args.channel_args = c->args; - args.initial_connect_string = c->initial_connect_string; grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, @@ -445,7 +438,8 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { gpr_mu_lock(&c->mu); c->have_alarm = false; if (c->disconnected) { - error = GRPC_ERROR_CREATE_REFERENCING("Disconnected", &error, 1); + error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected", + &error, 1); } else { GRPC_ERROR_REF(error); } @@ -696,9 +690,9 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, } else { grpc_connectivity_state_set( exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - grpc_error_set_int( - GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), + grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Connect Failed", &error, 1), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), "connect_failed"); const char *errmsg = grpc_error_string(error); @@ -719,13 +713,22 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { grpc_subchannel_call *c = call; + GPR_ASSERT(c->schedule_closure_after_destroy != NULL); GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); grpc_connected_subchannel *connection = c->connection; - grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), NULL, c); + grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), NULL, + c->schedule_closure_after_destroy); GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, connection, "subchannel_call"); GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); } +void grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call *call, + grpc_closure *closure) { + GPR_ASSERT(call->schedule_closure_after_destroy == NULL); + GPR_ASSERT(closure != NULL); + call->schedule_closure_after_destroy = closure; +} + void grpc_subchannel_call_ref( grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); @@ -761,15 +764,22 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, - grpc_polling_entity *pollent, grpc_slice path, gpr_timespec start_time, - gpr_timespec deadline, grpc_subchannel_call **call) { + const grpc_connected_subchannel_call_args *args, + grpc_subchannel_call **call) { grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); - *call = gpr_zalloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); + *call = gpr_arena_alloc( + args->arena, sizeof(grpc_subchannel_call) + chanstk->call_stack_size); grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); (*call)->connection = con; // Ref is added below. - grpc_error *error = - grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call, - NULL, NULL, path, start_time, deadline, callstk); + const grpc_call_element_args call_args = {.call_stack = callstk, + .server_transport_data = NULL, + .context = NULL, + .path = args->path, + .start_time = args->start_time, + .deadline = args->deadline, + .arena = args->arena}; + grpc_error *error = grpc_call_stack_init( + exec_ctx, chanstk, 1, subchannel_call_destroy, *call, &call_args); if (error != GRPC_ERROR_NONE) { const char *error_string = grpc_error_string(error); gpr_log(GPR_ERROR, "error: %s", error_string); @@ -778,7 +788,7 @@ grpc_error *grpc_connected_subchannel_create_call( return error; } GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call"); - grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, pollent); + grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, args->pollent); return GRPC_ERROR_NONE; } diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h index 6a70a76467..3e64a2507c 100644 --- a/src/core/ext/client_channel/subchannel.h +++ b/src/core/ext/client_channel/subchannel.h @@ -37,6 +37,7 @@ #include "src/core/ext/client_channel/connector.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/polling_entity.h" +#include "src/core/lib/support/arena.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/metadata.h" @@ -112,10 +113,18 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, GRPC_SUBCHANNEL_REF_EXTRA_ARGS); /** construct a subchannel call */ +typedef struct { + grpc_polling_entity *pollent; + grpc_slice path; + gpr_timespec start_time; + gpr_timespec deadline; + gpr_arena *arena; +} grpc_connected_subchannel_call_args; + grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel, - grpc_polling_entity *pollent, grpc_slice path, gpr_timespec start_time, - gpr_timespec deadline, grpc_subchannel_call **subchannel_call); + const grpc_connected_subchannel_call_args *args, + grpc_subchannel_call **subchannel_call); /** process a transport level op */ void grpc_connected_subchannel_process_transport_op( @@ -154,6 +163,11 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *subchannel_call); +/** Must be called once per call. Sets the 'then_schedule_closure' argument for + call stack destruction. */ +void grpc_subchannel_call_set_cleanup_closure( + grpc_subchannel_call *subchannel_call, grpc_closure *closure); + grpc_call_stack *grpc_subchannel_call_get_call_stack( grpc_subchannel_call *subchannel_call); |