diff options
author | Sree Kuchibhotla <sreek@google.com> | 2017-03-21 16:38:57 -0700 |
---|---|---|
committer | Sree Kuchibhotla <sreek@google.com> | 2017-03-21 16:38:57 -0700 |
commit | 313b895064053adbae6071c3bee5d03cd16a6e56 (patch) | |
tree | 89af0e6561e94ebade6a78f22e1f52f870500061 /src/core/ext/client_channel | |
parent | 36c370793ba250fc423dcd0947fc8a07759c4d08 (diff) | |
parent | 9f615de5ed5c622bd7abe0baaed418fcc560ba9b (diff) |
Merge branch 'master' into cq_create_api_changes
Diffstat (limited to 'src/core/ext/client_channel')
-rw-r--r-- | src/core/ext/client_channel/client_channel.c | 123 | ||||
-rw-r--r-- | src/core/ext/client_channel/client_channel_plugin.c | 3 | ||||
-rw-r--r-- | src/core/ext/client_channel/retry_throttle.c | 210 | ||||
-rw-r--r-- | src/core/ext/client_channel/retry_throttle.h | 65 |
4 files changed, 400 insertions, 1 deletions
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index 960d00e815..00c20913b0 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -47,6 +47,7 @@ #include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/proxy_mapper_registry.h" #include "src/core/ext/client_channel/resolver_registry.h" +#include "src/core/ext/client_channel/retry_throttle.h" #include "src/core/ext/client_channel/subchannel.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" @@ -189,6 +190,8 @@ typedef struct client_channel_channel_data { grpc_combiner *combiner; /** currently active load balancer */ grpc_lb_policy *lb_policy; + /** retry throttle data */ + grpc_server_retry_throttle_data *retry_throttle_data; /** maps method names to method_parameters structs */ grpc_slice_hash_table *method_params_table; /** incoming resolver result - set by resolver.next() */ @@ -284,6 +287,65 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, &w->on_changed); } +typedef struct { + char *server_name; + grpc_server_retry_throttle_data *retry_throttle_data; +} service_config_parsing_state; + +static void parse_retry_throttle_params(const grpc_json *field, void *arg) { + service_config_parsing_state *parsing_state = arg; + if (strcmp(field->key, "retryThrottling") == 0) { + if (parsing_state->retry_throttle_data != NULL) return; // Duplicate. + if (field->type != GRPC_JSON_OBJECT) return; + int max_milli_tokens = 0; + int milli_token_ratio = 0; + for (grpc_json *sub_field = field->child; sub_field != NULL; + sub_field = sub_field->next) { + if (sub_field->key == NULL) return; + if (strcmp(sub_field->key, "maxTokens") == 0) { + if (max_milli_tokens != 0) return; // Duplicate. + if (sub_field->type != GRPC_JSON_NUMBER) return; + max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value); + if (max_milli_tokens == -1) return; + max_milli_tokens *= 1000; + } else if (strcmp(sub_field->key, "tokenRatio") == 0) { + if (milli_token_ratio != 0) return; // Duplicate. + if (sub_field->type != GRPC_JSON_NUMBER) return; + // We support up to 3 decimal digits. + size_t whole_len = strlen(sub_field->value); + uint32_t multiplier = 1; + uint32_t decimal_value = 0; + const char *decimal_point = strchr(sub_field->value, '.'); + if (decimal_point != NULL) { + whole_len = (size_t)(decimal_point - sub_field->value); + multiplier = 1000; + size_t decimal_len = strlen(decimal_point + 1); + if (decimal_len > 3) decimal_len = 3; + if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len, + &decimal_value)) { + return; + } + uint32_t decimal_multiplier = 1; + for (size_t i = 0; i < (3 - decimal_len); ++i) { + decimal_multiplier *= 10; + } + decimal_value *= decimal_multiplier; + } + uint32_t whole_value; + if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len, + &whole_value)) { + return; + } + milli_token_ratio = (int)((whole_value * multiplier) + decimal_value); + if (milli_token_ratio <= 0) return; + } + } + parsing_state->retry_throttle_data = + grpc_retry_throttle_map_get_data_for_server( + parsing_state->server_name, max_milli_tokens, milli_token_ratio); + } +} + static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { channel_data *chand = arg; @@ -295,6 +357,8 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, bool exit_idle = false; grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy"); char *service_config_json = NULL; + service_config_parsing_state parsing_state; + memset(&parsing_state, 0, sizeof(parsing_state)); if (chand->resolver_result != NULL) { // Find LB policy name. @@ -355,6 +419,19 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_service_config *service_config = grpc_service_config_create(service_config_json); if (service_config != NULL) { + channel_arg = + grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI); + GPR_ASSERT(channel_arg != NULL); + GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); + grpc_uri *uri = + grpc_uri_parse(exec_ctx, channel_arg->value.string, true); + GPR_ASSERT(uri->path[0] != '\0'); + parsing_state.server_name = + uri->path[0] == '/' ? uri->path + 1 : uri->path; + grpc_service_config_parse_global_params( + service_config, parse_retry_throttle_params, &parsing_state); + parsing_state.server_name = NULL; + grpc_uri_destroy(uri); method_params_table = grpc_service_config_create_method_config_table( exec_ctx, service_config, method_parameters_create_from_json, &method_parameters_vtable); @@ -386,6 +463,11 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, chand->info_service_config_json = service_config_json; } gpr_mu_unlock(&chand->info_mu); + + if (chand->retry_throttle_data != NULL) { + grpc_server_retry_throttle_data_unref(chand->retry_throttle_data); + } + chand->retry_throttle_data = parsing_state.retry_throttle_data; if (chand->method_params_table != NULL) { grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table); } @@ -613,6 +695,9 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, } gpr_free(chand->info_lb_policy_name); gpr_free(chand->info_service_config_json); + if (chand->retry_throttle_data != NULL) { + grpc_server_retry_throttle_data_unref(chand->retry_throttle_data); + } if (chand->method_params_table != NULL) { grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table); } @@ -654,6 +739,7 @@ typedef struct client_channel_call_data { grpc_slice path; // Request path. gpr_timespec call_start_time; gpr_timespec deadline; + grpc_server_retry_throttle_data *retry_throttle_data; method_parameters *method_params; grpc_error *cancel_error; @@ -676,6 +762,9 @@ typedef struct client_channel_call_data { grpc_call_stack *owning_call; grpc_linked_mdelem lb_token_mdelem; + + grpc_closure on_complete; + grpc_closure *original_on_complete; } call_data; grpc_subchannel_call *grpc_client_channel_get_subchannel_call( @@ -728,7 +817,7 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { gpr_free(ops); } -// Sets calld->method_params. +// Sets calld->method_params and calld->retry_throttle_data. // If the method params specify a timeout, populates // *per_method_deadline and returns true. static bool set_call_method_params_from_service_config_locked( @@ -736,6 +825,10 @@ static bool set_call_method_params_from_service_config_locked( gpr_timespec *per_method_deadline) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; + if (chand->retry_throttle_data != NULL) { + calld->retry_throttle_data = + grpc_server_retry_throttle_data_ref(chand->retry_throttle_data); + } if (chand->method_params_table != NULL) { calld->method_params = grpc_method_config_table_get( exec_ctx, chand->method_params_table, calld->path); @@ -1056,6 +1149,26 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, add_waiting_locked(calld, op); } +static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + grpc_call_element *elem = arg; + call_data *calld = elem->call_data; + if (calld->retry_throttle_data != NULL) { + if (error == GRPC_ERROR_NONE) { + grpc_server_retry_throttle_data_record_success( + calld->retry_throttle_data); + } else { + // TODO(roth): In a subsequent PR, check the return value here and + // decide whether or not to retry. Note that we should only + // record failures whose statuses match the configured retryable + // or non-fatal status codes. + grpc_server_retry_throttle_data_record_failure( + calld->retry_throttle_data); + } + } + grpc_closure_run(exec_ctx, calld->original_on_complete, + GRPC_ERROR_REF(error)); +} + static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error_ignored) { GPR_TIMER_BEGIN("start_transport_stream_op_locked", 0); @@ -1064,6 +1177,14 @@ static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_call_element *elem = op->handler_private.args[0]; call_data *calld = elem->call_data; + if (op->recv_trailing_metadata != NULL) { + GPR_ASSERT(op->on_complete != NULL); + calld->original_on_complete = op->on_complete; + grpc_closure_init(&calld->on_complete, on_complete, elem, + grpc_schedule_on_exec_ctx); + op->on_complete = &calld->on_complete; + } + start_transport_stream_op_locked_inner(exec_ctx, op, elem); GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, diff --git a/src/core/ext/client_channel/client_channel_plugin.c b/src/core/ext/client_channel/client_channel_plugin.c index 28d3b63f99..f51277d0b2 100644 --- a/src/core/ext/client_channel/client_channel_plugin.c +++ b/src/core/ext/client_channel/client_channel_plugin.c @@ -43,6 +43,7 @@ #include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/proxy_mapper_registry.h" #include "src/core/ext/client_channel/resolver_registry.h" +#include "src/core/ext/client_channel/retry_throttle.h" #include "src/core/ext/client_channel/subchannel_index.h" #include "src/core/lib/surface/channel_init.h" @@ -82,6 +83,7 @@ static bool set_default_host_if_unset(grpc_exec_ctx *exec_ctx, void grpc_client_channel_init(void) { grpc_lb_policy_registry_init(); grpc_resolver_registry_init(); + grpc_retry_throttle_map_init(); grpc_proxy_mapper_registry_init(); grpc_register_http_proxy_mapper(); grpc_subchannel_index_init(); @@ -96,6 +98,7 @@ void grpc_client_channel_shutdown(void) { grpc_subchannel_index_shutdown(); grpc_channel_init_shutdown(); grpc_proxy_mapper_registry_shutdown(); + grpc_retry_throttle_map_shutdown(); grpc_resolver_registry_shutdown(); grpc_lb_policy_registry_shutdown(); } diff --git a/src/core/ext/client_channel/retry_throttle.c b/src/core/ext/client_channel/retry_throttle.c new file mode 100644 index 0000000000..8926c3d782 --- /dev/null +++ b/src/core/ext/client_channel/retry_throttle.c @@ -0,0 +1,210 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/client_channel/retry_throttle.h" + +#include <limits.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/atm.h> +#include <grpc/support/avl.h> +#include <grpc/support/string_util.h> +#include <grpc/support/sync.h> + +// +// server_retry_throttle_data +// + +struct grpc_server_retry_throttle_data { + gpr_refcount refs; + int max_milli_tokens; + int milli_token_ratio; + gpr_atm milli_tokens; + // A pointer to the replacement for this grpc_server_retry_throttle_data + // entry. If non-NULL, then this entry is stale and must not be used. + // We hold a reference to the replacement. + gpr_atm replacement; +}; + +static void get_replacement_throttle_data_if_needed( + grpc_server_retry_throttle_data** throttle_data) { + while (true) { + grpc_server_retry_throttle_data* new_throttle_data = + (grpc_server_retry_throttle_data*)gpr_atm_acq_load( + &(*throttle_data)->replacement); + if (new_throttle_data == NULL) return; + *throttle_data = new_throttle_data; + } +} + +bool grpc_server_retry_throttle_data_record_failure( + grpc_server_retry_throttle_data* throttle_data) { + // First, check if we are stale and need to be replaced. + get_replacement_throttle_data_if_needed(&throttle_data); + // We decrement milli_tokens by 1000 (1 token) for each failure. + const int new_value = (int)gpr_atm_no_barrier_clamped_add( + &throttle_data->milli_tokens, (gpr_atm)-1000, (gpr_atm)0, + (gpr_atm)throttle_data->max_milli_tokens); + // Retries are allowed as long as the new value is above the threshold + // (max_milli_tokens / 2). + return new_value > throttle_data->max_milli_tokens / 2; +} + +void grpc_server_retry_throttle_data_record_success( + grpc_server_retry_throttle_data* throttle_data) { + // First, check if we are stale and need to be replaced. + get_replacement_throttle_data_if_needed(&throttle_data); + // We increment milli_tokens by milli_token_ratio for each success. + gpr_atm_no_barrier_clamped_add( + &throttle_data->milli_tokens, (gpr_atm)throttle_data->milli_token_ratio, + (gpr_atm)0, (gpr_atm)throttle_data->max_milli_tokens); +} + +grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_ref( + grpc_server_retry_throttle_data* throttle_data) { + gpr_ref(&throttle_data->refs); + return throttle_data; +} + +void grpc_server_retry_throttle_data_unref( + grpc_server_retry_throttle_data* throttle_data) { + if (gpr_unref(&throttle_data->refs)) { + grpc_server_retry_throttle_data* replacement = + (grpc_server_retry_throttle_data*)gpr_atm_acq_load( + &throttle_data->replacement); + if (replacement != NULL) { + grpc_server_retry_throttle_data_unref(replacement); + } + gpr_free(throttle_data); + } +} + +static grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_create( + int max_milli_tokens, int milli_token_ratio, + grpc_server_retry_throttle_data* old_throttle_data) { + grpc_server_retry_throttle_data* throttle_data = + gpr_malloc(sizeof(*throttle_data)); + memset(throttle_data, 0, sizeof(*throttle_data)); + gpr_ref_init(&throttle_data->refs, 1); + throttle_data->max_milli_tokens = max_milli_tokens; + throttle_data->milli_token_ratio = milli_token_ratio; + int initial_milli_tokens = max_milli_tokens; + // If there was a pre-existing entry for this server name, initialize + // the token count by scaling proportionately to the old data. This + // ensures that if we're already throttling retries on the old scale, + // we will start out doing the same thing on the new one. + if (old_throttle_data != NULL) { + double token_fraction = + (int)gpr_atm_acq_load(&old_throttle_data->milli_tokens) / + (double)old_throttle_data->max_milli_tokens; + initial_milli_tokens = (int)(token_fraction * max_milli_tokens); + } + gpr_atm_rel_store(&throttle_data->milli_tokens, + (gpr_atm)initial_milli_tokens); + // If there was a pre-existing entry, mark it as stale and give it a + // pointer to the new entry, which is its replacement. + if (old_throttle_data != NULL) { + grpc_server_retry_throttle_data_ref(throttle_data); + gpr_atm_rel_store(&old_throttle_data->replacement, (gpr_atm)throttle_data); + } + return throttle_data; +} + +// +// avl vtable for string -> server_retry_throttle_data map +// + +static void* copy_server_name(void* key) { return gpr_strdup(key); } + +static long compare_server_name(void* key1, void* key2) { + return strcmp(key1, key2); +} + +static void destroy_server_retry_throttle_data(void* value) { + grpc_server_retry_throttle_data* throttle_data = value; + grpc_server_retry_throttle_data_unref(throttle_data); +} + +static void* copy_server_retry_throttle_data(void* value) { + grpc_server_retry_throttle_data* throttle_data = value; + return grpc_server_retry_throttle_data_ref(throttle_data); +} + +static const gpr_avl_vtable avl_vtable = { + gpr_free /* destroy_key */, copy_server_name, compare_server_name, + destroy_server_retry_throttle_data, copy_server_retry_throttle_data}; + +// +// server_retry_throttle_map +// + +static gpr_mu g_mu; +static gpr_avl g_avl; + +void grpc_retry_throttle_map_init() { + gpr_mu_init(&g_mu); + g_avl = gpr_avl_create(&avl_vtable); +} + +void grpc_retry_throttle_map_shutdown() { + gpr_mu_destroy(&g_mu); + gpr_avl_unref(g_avl); +} + +grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server( + const char* server_name, int max_milli_tokens, int milli_token_ratio) { + gpr_mu_lock(&g_mu); + grpc_server_retry_throttle_data* throttle_data = + gpr_avl_get(g_avl, (char*)server_name); + if (throttle_data == NULL) { + // Entry not found. Create a new one. + throttle_data = grpc_server_retry_throttle_data_create( + max_milli_tokens, milli_token_ratio, NULL); + g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data); + } else { + if (throttle_data->max_milli_tokens != max_milli_tokens || + throttle_data->milli_token_ratio != milli_token_ratio) { + // Entry found but with old parameters. Create a new one based on + // the original one. + throttle_data = grpc_server_retry_throttle_data_create( + max_milli_tokens, milli_token_ratio, throttle_data); + g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data); + } else { + // Entry found. Increase refcount. + grpc_server_retry_throttle_data_ref(throttle_data); + } + } + gpr_mu_unlock(&g_mu); + return throttle_data; +} diff --git a/src/core/ext/client_channel/retry_throttle.h b/src/core/ext/client_channel/retry_throttle.h new file mode 100644 index 0000000000..f9971faf65 --- /dev/null +++ b/src/core/ext/client_channel/retry_throttle.h @@ -0,0 +1,65 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_RETRY_THROTTLE_H +#define GRPC_CORE_EXT_CLIENT_CHANNEL_RETRY_THROTTLE_H + +#include <stdbool.h> + +/// Tracks retry throttling data for an individual server name. +typedef struct grpc_server_retry_throttle_data grpc_server_retry_throttle_data; + +/// Records a failure. Returns true if it's okay to send a retry. +bool grpc_server_retry_throttle_data_record_failure( + grpc_server_retry_throttle_data* throttle_data); +/// Records a success. +void grpc_server_retry_throttle_data_record_success( + grpc_server_retry_throttle_data* throttle_data); + +grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_ref( + grpc_server_retry_throttle_data* throttle_data); +void grpc_server_retry_throttle_data_unref( + grpc_server_retry_throttle_data* throttle_data); + +/// Initializes global map of failure data for each server name. +void grpc_retry_throttle_map_init(); +/// Shuts down global map of failure data for each server name. +void grpc_retry_throttle_map_shutdown(); + +/// Returns a reference to the failure data for \a server_name, creating +/// a new entry if needed. +/// Caller must eventually unref via \a grpc_server_retry_throttle_data_unref(). +grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server( + const char* server_name, int max_milli_tokens, int milli_token_ratio); + +#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RETRY_THROTTLE_H */ |