diff options
-rw-r--r-- | BUILD | 2 | ||||
-rw-r--r-- | CMakeLists.txt | 4 | ||||
-rw-r--r-- | Makefile | 4 | ||||
-rw-r--r-- | binding.gyp | 1 | ||||
-rw-r--r-- | build.yaml | 2 | ||||
-rw-r--r-- | config.m4 | 1 | ||||
-rw-r--r-- | gRPC-Core.podspec | 3 | ||||
-rwxr-xr-x | grpc.gemspec | 2 | ||||
-rw-r--r-- | package.xml | 2 | ||||
-rw-r--r-- | src/core/ext/client_channel/client_channel.c | 118 | ||||
-rw-r--r-- | src/core/ext/client_channel/client_channel_plugin.c | 3 | ||||
-rw-r--r-- | src/core/ext/client_channel/retry_throttle.c | 242 | ||||
-rw-r--r-- | src/core/ext/client_channel/retry_throttle.h | 69 | ||||
-rw-r--r-- | src/core/lib/transport/service_config.c | 12 | ||||
-rw-r--r-- | src/core/lib/transport/service_config.h | 6 | ||||
-rw-r--r-- | src/python/grpcio/grpc_core_dependencies.py | 1 | ||||
-rw-r--r-- | tools/doxygen/Doxyfile.core.internal | 2 | ||||
-rw-r--r-- | tools/run_tests/generated/sources_and_headers.json | 3 | ||||
-rw-r--r-- | vsprojects/vcxproj/grpc/grpc.vcxproj | 3 | ||||
-rw-r--r-- | vsprojects/vcxproj/grpc/grpc.vcxproj.filters | 6 | ||||
-rw-r--r-- | vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj | 3 | ||||
-rw-r--r-- | vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters | 6 |
22 files changed, 495 insertions, 0 deletions
@@ -692,6 +692,7 @@ grpc_cc_library( "src/core/ext/client_channel/resolver.c", "src/core/ext/client_channel/resolver_factory.c", "src/core/ext/client_channel/resolver_registry.c", + "src/core/ext/client_channel/retry_throttle.c", "src/core/ext/client_channel/subchannel.c", "src/core/ext/client_channel/subchannel_index.c", "src/core/ext/client_channel/uri_parser.c", @@ -712,6 +713,7 @@ grpc_cc_library( "src/core/ext/client_channel/resolver.h", "src/core/ext/client_channel/resolver_factory.h", "src/core/ext/client_channel/resolver_registry.h", + "src/core/ext/client_channel/retry_throttle.h", "src/core/ext/client_channel/subchannel.h", "src/core/ext/client_channel/subchannel_index.h", "src/core/ext/client_channel/uri_parser.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index 65df5d7f7c..8bc07255f1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1046,6 +1046,7 @@ add_library(grpc src/core/ext/client_channel/resolver.c src/core/ext/client_channel/resolver_factory.c src/core/ext/client_channel/resolver_registry.c + src/core/ext/client_channel/retry_throttle.c src/core/ext/client_channel/subchannel.c src/core/ext/client_channel/subchannel_index.c src/core/ext/client_channel/uri_parser.c @@ -1328,6 +1329,7 @@ add_library(grpc_cronet src/core/ext/client_channel/resolver.c src/core/ext/client_channel/resolver_factory.c src/core/ext/client_channel/resolver_registry.c + src/core/ext/client_channel/retry_throttle.c src/core/ext/client_channel/subchannel.c src/core/ext/client_channel/subchannel_index.c src/core/ext/client_channel/uri_parser.c @@ -1876,6 +1878,7 @@ add_library(grpc_unsecure src/core/ext/client_channel/resolver.c src/core/ext/client_channel/resolver_factory.c src/core/ext/client_channel/resolver_registry.c + src/core/ext/client_channel/retry_throttle.c src/core/ext/client_channel/subchannel.c src/core/ext/client_channel/subchannel_index.c src/core/ext/client_channel/uri_parser.c @@ -2457,6 +2460,7 @@ add_library(grpc++_cronet src/core/ext/client_channel/resolver.c src/core/ext/client_channel/resolver_factory.c src/core/ext/client_channel/resolver_registry.c + src/core/ext/client_channel/retry_throttle.c src/core/ext/client_channel/subchannel.c src/core/ext/client_channel/subchannel_index.c src/core/ext/client_channel/uri_parser.c @@ -2933,6 +2933,7 @@ LIBGRPC_SRC = \ src/core/ext/client_channel/resolver.c \ src/core/ext/client_channel/resolver_factory.c \ src/core/ext/client_channel/resolver_registry.c \ + src/core/ext/client_channel/retry_throttle.c \ src/core/ext/client_channel/subchannel.c \ src/core/ext/client_channel/subchannel_index.c \ src/core/ext/client_channel/uri_parser.c \ @@ -3218,6 +3219,7 @@ LIBGRPC_CRONET_SRC = \ src/core/ext/client_channel/resolver.c \ src/core/ext/client_channel/resolver_factory.c \ src/core/ext/client_channel/resolver_registry.c \ + src/core/ext/client_channel/retry_throttle.c \ src/core/ext/client_channel/subchannel.c \ src/core/ext/client_channel/subchannel_index.c \ src/core/ext/client_channel/uri_parser.c \ @@ -3749,6 +3751,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/ext/client_channel/resolver.c \ src/core/ext/client_channel/resolver_factory.c \ src/core/ext/client_channel/resolver_registry.c \ + src/core/ext/client_channel/retry_throttle.c \ src/core/ext/client_channel/subchannel.c \ src/core/ext/client_channel/subchannel_index.c \ src/core/ext/client_channel/uri_parser.c \ @@ -4332,6 +4335,7 @@ LIBGRPC++_CRONET_SRC = \ src/core/ext/client_channel/resolver.c \ src/core/ext/client_channel/resolver_factory.c \ src/core/ext/client_channel/resolver_registry.c \ + src/core/ext/client_channel/retry_throttle.c \ src/core/ext/client_channel/subchannel.c \ src/core/ext/client_channel/subchannel_index.c \ src/core/ext/client_channel/uri_parser.c \ diff --git a/binding.gyp b/binding.gyp index c521a27c30..f6a04b27f9 100644 --- a/binding.gyp +++ b/binding.gyp @@ -796,6 +796,7 @@ 'src/core/ext/client_channel/resolver.c', 'src/core/ext/client_channel/resolver_factory.c', 'src/core/ext/client_channel/resolver_registry.c', + 'src/core/ext/client_channel/retry_throttle.c', 'src/core/ext/client_channel/subchannel.c', 'src/core/ext/client_channel/subchannel_index.c', 'src/core/ext/client_channel/uri_parser.c', diff --git a/build.yaml b/build.yaml index 7c43df3250..ae546cbb30 100644 --- a/build.yaml +++ b/build.yaml @@ -417,6 +417,7 @@ filegroups: - src/core/ext/client_channel/resolver.h - src/core/ext/client_channel/resolver_factory.h - src/core/ext/client_channel/resolver_registry.h + - src/core/ext/client_channel/retry_throttle.h - src/core/ext/client_channel/subchannel.h - src/core/ext/client_channel/subchannel_index.h - src/core/ext/client_channel/uri_parser.h @@ -439,6 +440,7 @@ filegroups: - src/core/ext/client_channel/resolver.c - src/core/ext/client_channel/resolver_factory.c - src/core/ext/client_channel/resolver_registry.c + - src/core/ext/client_channel/retry_throttle.c - src/core/ext/client_channel/subchannel.c - src/core/ext/client_channel/subchannel_index.c - src/core/ext/client_channel/uri_parser.c @@ -269,6 +269,7 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/client_channel/resolver.c \ src/core/ext/client_channel/resolver_factory.c \ src/core/ext/client_channel/resolver_registry.c \ + src/core/ext/client_channel/retry_throttle.c \ src/core/ext/client_channel/subchannel.c \ src/core/ext/client_channel/subchannel_index.c \ src/core/ext/client_channel/uri_parser.c \ diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 027babcda4..2fb00a3afe 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -418,6 +418,7 @@ Pod::Spec.new do |s| 'src/core/ext/client_channel/resolver.h', 'src/core/ext/client_channel/resolver_factory.h', 'src/core/ext/client_channel/resolver_registry.h', + 'src/core/ext/client_channel/retry_throttle.h', 'src/core/ext/client_channel/subchannel.h', 'src/core/ext/client_channel/subchannel_index.h', 'src/core/ext/client_channel/uri_parser.h', @@ -637,6 +638,7 @@ Pod::Spec.new do |s| 'src/core/ext/client_channel/resolver.c', 'src/core/ext/client_channel/resolver_factory.c', 'src/core/ext/client_channel/resolver_registry.c', + 'src/core/ext/client_channel/retry_throttle.c', 'src/core/ext/client_channel/subchannel.c', 'src/core/ext/client_channel/subchannel_index.c', 'src/core/ext/client_channel/uri_parser.c', @@ -853,6 +855,7 @@ Pod::Spec.new do |s| 'src/core/ext/client_channel/resolver.h', 'src/core/ext/client_channel/resolver_factory.h', 'src/core/ext/client_channel/resolver_registry.h', + 'src/core/ext/client_channel/retry_throttle.h', 'src/core/ext/client_channel/subchannel.h', 'src/core/ext/client_channel/subchannel_index.h', 'src/core/ext/client_channel/uri_parser.h', diff --git a/grpc.gemspec b/grpc.gemspec index 8d5b7b2ab1..1ca2446e65 100755 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -335,6 +335,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/client_channel/resolver.h ) s.files += %w( src/core/ext/client_channel/resolver_factory.h ) s.files += %w( src/core/ext/client_channel/resolver_registry.h ) + s.files += %w( src/core/ext/client_channel/retry_throttle.h ) s.files += %w( src/core/ext/client_channel/subchannel.h ) s.files += %w( src/core/ext/client_channel/subchannel_index.h ) s.files += %w( src/core/ext/client_channel/uri_parser.h ) @@ -554,6 +555,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/client_channel/resolver.c ) s.files += %w( src/core/ext/client_channel/resolver_factory.c ) s.files += %w( src/core/ext/client_channel/resolver_registry.c ) + s.files += %w( src/core/ext/client_channel/retry_throttle.c ) s.files += %w( src/core/ext/client_channel/subchannel.c ) s.files += %w( src/core/ext/client_channel/subchannel_index.c ) s.files += %w( src/core/ext/client_channel/uri_parser.c ) diff --git a/package.xml b/package.xml index 4167bef26e..e29f462d33 100644 --- a/package.xml +++ b/package.xml @@ -344,6 +344,7 @@ <file baseinstalldir="/" name="src/core/ext/client_channel/resolver.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/client_channel/resolver_factory.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/client_channel/resolver_registry.h" role="src" /> + <file baseinstalldir="/" name="src/core/ext/client_channel/retry_throttle.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/client_channel/subchannel.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/client_channel/subchannel_index.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/client_channel/uri_parser.h" role="src" /> @@ -563,6 +564,7 @@ <file baseinstalldir="/" name="src/core/ext/client_channel/resolver.c" role="src" /> <file baseinstalldir="/" name="src/core/ext/client_channel/resolver_factory.c" role="src" /> <file baseinstalldir="/" name="src/core/ext/client_channel/resolver_registry.c" role="src" /> + <file baseinstalldir="/" name="src/core/ext/client_channel/retry_throttle.c" role="src" /> <file baseinstalldir="/" name="src/core/ext/client_channel/subchannel.c" role="src" /> <file baseinstalldir="/" name="src/core/ext/client_channel/subchannel_index.c" role="src" /> <file baseinstalldir="/" name="src/core/ext/client_channel/uri_parser.c" role="src" /> diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index f2475cf6ae..ef273669e8 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -47,6 +47,7 @@ #include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/proxy_mapper_registry.h" #include "src/core/ext/client_channel/resolver_registry.h" +#include "src/core/ext/client_channel/retry_throttle.h" #include "src/core/ext/client_channel/subchannel.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" @@ -189,6 +190,8 @@ typedef struct client_channel_channel_data { grpc_combiner *combiner; /** currently active load balancer */ grpc_lb_policy *lb_policy; + /** retry throttle data */ + grpc_server_retry_throttle_data *retry_throttle_data; /** maps method names to method_parameters structs */ grpc_slice_hash_table *method_params_table; /** incoming resolver result - set by resolver.next() */ @@ -284,6 +287,65 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, &w->on_changed); } +typedef struct { + char *server_name; + grpc_server_retry_throttle_data *retry_throttle_data; +} service_config_parsing_state; + +static void parse_retry_throttle_params(const grpc_json *field, void *arg) { + service_config_parsing_state *parsing_state = arg; + if (strcmp(field->key, "retryThrottling") == 0) { + if (parsing_state->retry_throttle_data != NULL) return; // Duplicate. + if (field->type != GRPC_JSON_OBJECT) return; + int max_milli_tokens = 0; + int milli_token_ratio = 0; + for (grpc_json *sub_field = field->child; sub_field != NULL; + sub_field = sub_field->next) { + if (sub_field->key == NULL) return; + if (strcmp(sub_field->key, "maxTokens") == 0) { + if (max_milli_tokens != 0) return; // Duplicate. + if (sub_field->type != GRPC_JSON_NUMBER) return; + max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value); + if (max_milli_tokens == -1) return; + max_milli_tokens *= 1000; + } else if (strcmp(sub_field->key, "tokenRatio") == 0) { + if (milli_token_ratio != 0) return; // Duplicate. + if (sub_field->type != GRPC_JSON_NUMBER) return; + // We support up to 3 decimal digits. + size_t whole_len = strlen(sub_field->value); + uint32_t multiplier = 1; + uint32_t decimal_value = 0; + const char *decimal_point = strchr(sub_field->value, '.'); + if (decimal_point != NULL) { + whole_len = (size_t)(decimal_point - sub_field->value); + multiplier = 1000; + size_t decimal_len = strlen(decimal_point + 1); + if (decimal_len > 3) decimal_len = 3; + if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len, + &decimal_value)) { + return; + } + uint32_t decimal_multiplier = 1; + for (size_t i = 0; i < (3 - decimal_len); ++i) { + decimal_multiplier *= 10; + } + decimal_value *= decimal_multiplier; + } + uint32_t whole_value; + if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len, + &whole_value)) { + return; + } + milli_token_ratio = (int)((whole_value * multiplier) + decimal_value); + if (milli_token_ratio <= 0) return; + } + } + parsing_state->retry_throttle_data = + grpc_retry_throttle_map_get_data_for_server( + parsing_state->server_name, max_milli_tokens, milli_token_ratio); + } +} + static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { channel_data *chand = arg; @@ -295,6 +357,8 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, bool exit_idle = false; grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy"); char *service_config_json = NULL; + service_config_parsing_state parsing_state; + memset(&parsing_state, 0, sizeof(parsing_state)); if (chand->resolver_result != NULL) { // Find LB policy name. @@ -355,6 +419,18 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_service_config *service_config = grpc_service_config_create(service_config_json); if (service_config != NULL) { + channel_arg = + grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI); + GPR_ASSERT(channel_arg != NULL); + GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); + grpc_uri *uri = grpc_uri_parse(channel_arg->value.string, true); + GPR_ASSERT(uri->path[0] != '\0'); + parsing_state.server_name = + uri->path[0] == '/' ? uri->path + 1 : uri->path; + grpc_service_config_parse_global_params( + service_config, parse_retry_throttle_params, &parsing_state); + parsing_state.server_name = NULL; + grpc_uri_destroy(uri); method_params_table = grpc_service_config_create_method_config_table( exec_ctx, service_config, method_parameters_create_from_json, &method_parameters_vtable); @@ -386,6 +462,11 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, chand->info_service_config_json = service_config_json; } gpr_mu_unlock(&chand->info_mu); + + if (chand->retry_throttle_data != NULL) { + grpc_server_retry_throttle_data_unref(chand->retry_throttle_data); + } + chand->retry_throttle_data = parsing_state.retry_throttle_data; if (chand->method_params_table != NULL) { grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table); } @@ -613,6 +694,9 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, } gpr_free(chand->info_lb_policy_name); gpr_free(chand->info_service_config_json); + if (chand->retry_throttle_data != NULL) { + grpc_server_retry_throttle_data_unref(chand->retry_throttle_data); + } if (chand->method_params_table != NULL) { grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table); } @@ -675,6 +759,9 @@ typedef struct client_channel_call_data { grpc_call_stack *owning_call; grpc_linked_mdelem lb_token_mdelem; + + grpc_closure on_complete; + grpc_closure *original_on_complete; } call_data; grpc_subchannel_call *grpc_client_channel_get_subchannel_call( @@ -1045,14 +1132,45 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, add_waiting_locked(calld, op); } +static void on_complete_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_call_element *elem = arg; + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + if (chand->retry_throttle_data != NULL) { + if (error == GRPC_ERROR_NONE) { + grpc_server_retry_throttle_data_record_success( + &chand->retry_throttle_data); + } else { + // TODO(roth): In a subsequent PR, check the return value here and + // decide whether or not to retry. Note that we should only + // record failures whose statuses match the configured retryable + // or non-fatal status codes. + grpc_server_retry_throttle_data_record_failure( + &chand->retry_throttle_data); + } + } + grpc_closure_run(exec_ctx, calld->original_on_complete, + GRPC_ERROR_REF(error)); +} + static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error_ignored) { GPR_TIMER_BEGIN("start_transport_stream_op_locked", 0); grpc_transport_stream_op *op = arg; grpc_call_element *elem = op->handler_private.args[0]; + channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; + if (op->recv_trailing_metadata != NULL) { + GPR_ASSERT(op->on_complete != NULL); + calld->original_on_complete = op->on_complete; + grpc_closure_init(&calld->on_complete, on_complete_locked, elem, + grpc_combiner_scheduler(chand->combiner, false)); + op->on_complete = &calld->on_complete; + } + start_transport_stream_op_locked_inner(exec_ctx, op, elem); GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, diff --git a/src/core/ext/client_channel/client_channel_plugin.c b/src/core/ext/client_channel/client_channel_plugin.c index 28d3b63f99..f51277d0b2 100644 --- a/src/core/ext/client_channel/client_channel_plugin.c +++ b/src/core/ext/client_channel/client_channel_plugin.c @@ -43,6 +43,7 @@ #include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/proxy_mapper_registry.h" #include "src/core/ext/client_channel/resolver_registry.h" +#include "src/core/ext/client_channel/retry_throttle.h" #include "src/core/ext/client_channel/subchannel_index.h" #include "src/core/lib/surface/channel_init.h" @@ -82,6 +83,7 @@ static bool set_default_host_if_unset(grpc_exec_ctx *exec_ctx, void grpc_client_channel_init(void) { grpc_lb_policy_registry_init(); grpc_resolver_registry_init(); + grpc_retry_throttle_map_init(); grpc_proxy_mapper_registry_init(); grpc_register_http_proxy_mapper(); grpc_subchannel_index_init(); @@ -96,6 +98,7 @@ void grpc_client_channel_shutdown(void) { grpc_subchannel_index_shutdown(); grpc_channel_init_shutdown(); grpc_proxy_mapper_registry_shutdown(); + grpc_retry_throttle_map_shutdown(); grpc_resolver_registry_shutdown(); grpc_lb_policy_registry_shutdown(); } diff --git a/src/core/ext/client_channel/retry_throttle.c b/src/core/ext/client_channel/retry_throttle.c new file mode 100644 index 0000000000..2aa52e4903 --- /dev/null +++ b/src/core/ext/client_channel/retry_throttle.c @@ -0,0 +1,242 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/client_channel/retry_throttle.h" + +#include <limits.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/atm.h> +#include <grpc/support/avl.h> +#include <grpc/support/string_util.h> +#include <grpc/support/sync.h> + +// +// server_retry_throttle_data +// + +struct grpc_server_retry_throttle_data { + gpr_refcount refs; + int max_milli_tokens; + int milli_token_ratio; + gpr_atm milli_tokens; + // A pointer to the replacement for this grpc_server_retry_throttle_data + // entry. If non-NULL, then this entry is stale and must not be used. + // We hold a reference to the replacement. + gpr_atm replacement; +}; + +static void get_replacement_throttle_data_if_needed( + grpc_server_retry_throttle_data** throttle_data) { + while (true) { + grpc_server_retry_throttle_data* new_throttle_data = + (grpc_server_retry_throttle_data*)gpr_atm_acq_load( + &(*throttle_data)->replacement); + if (new_throttle_data == NULL) return; + // Reset *throttle_data to its replacement, updating refcounts as + // appropriate. + // Note: It's safe to do this here, because the caller ensures that + // this will only be called with a given value of throttle_data from + // one thread at a time. + grpc_server_retry_throttle_data_ref(new_throttle_data); + grpc_server_retry_throttle_data* old_throttle_data = *throttle_data; + *throttle_data = new_throttle_data; + grpc_server_retry_throttle_data_unref(old_throttle_data); + } +} + +bool grpc_server_retry_throttle_data_record_failure( + grpc_server_retry_throttle_data** throttle_data) { + // First, check if we are stale and need to be replaced. + get_replacement_throttle_data_if_needed(throttle_data); + // We decrement milli_tokens by 1000 (1 token) for each failure. + const int delta = -1000; + const int old_value = (int)gpr_atm_full_fetch_add( + &(*throttle_data)->milli_tokens, (gpr_atm)delta); + // If the above change takes us below 0, then re-add the excess. Note + // that between these two atomic operations, the value will be + // artificially low by as much as 1000, but this window should be + // brief. + int new_value = old_value - 1000; + if (new_value < 0) { + const int excess_value = new_value - (old_value < 0 ? old_value : 0); + gpr_atm_full_fetch_add(&(*throttle_data)->milli_tokens, + (gpr_atm)-excess_value); + new_value = 0; + } + // Retries are allowed as long as the new value is above the threshold + // (max_milli_tokens / 2). + return new_value > (*throttle_data)->max_milli_tokens / 2; +} + +void grpc_server_retry_throttle_data_record_success( + grpc_server_retry_throttle_data** throttle_data) { + // First, check if we are stale and need to be replaced. + get_replacement_throttle_data_if_needed(throttle_data); + // We increment milli_tokens by milli_token_ratio for each success. + const int delta = (*throttle_data)->milli_token_ratio; + const int old_value = (int)gpr_atm_full_fetch_add( + &(*throttle_data)->milli_tokens, (gpr_atm)delta); + // If the above change takes us over max_milli_tokens, then subtract + // the excess. Note that between these two atomic operations, the + // value will be artificially high by as much as milli_token_ratio, + // but this window should be brief. + const int new_value = old_value + (*throttle_data)->milli_token_ratio; + if (new_value > (*throttle_data)->max_milli_tokens) { + const int excess_value = + new_value - (old_value > (*throttle_data)->max_milli_tokens + ? old_value + : (*throttle_data)->max_milli_tokens); + gpr_atm_full_fetch_add(&(*throttle_data)->milli_tokens, + (gpr_atm)-excess_value); + } +} + +void grpc_server_retry_throttle_data_ref( + grpc_server_retry_throttle_data* throttle_data) { + gpr_ref(&throttle_data->refs); +} + +void grpc_server_retry_throttle_data_unref( + grpc_server_retry_throttle_data* throttle_data) { + if (gpr_unref(&throttle_data->refs)) { + grpc_server_retry_throttle_data* replacement = + (grpc_server_retry_throttle_data*)gpr_atm_acq_load( + &throttle_data->replacement); + if (replacement != NULL) { + grpc_server_retry_throttle_data_unref(replacement); + } + gpr_free(throttle_data); + } +} + +static grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_create( + int max_milli_tokens, int milli_token_ratio, + grpc_server_retry_throttle_data* old_throttle_data) { + grpc_server_retry_throttle_data* throttle_data = + gpr_malloc(sizeof(*throttle_data)); + memset(throttle_data, 0, sizeof(*throttle_data)); + gpr_ref_init(&throttle_data->refs, 1); + throttle_data->max_milli_tokens = max_milli_tokens; + throttle_data->milli_token_ratio = milli_token_ratio; + int initial_milli_tokens = max_milli_tokens; + // If there was a pre-existing entry for this server name, initialize + // the token count by scaling proportionately to the old data. This + // ensures that if we're already throttling retries on the old scale, + // we will start out doing the same thing on the new one. + if (old_throttle_data != NULL) { + double token_fraction = + (int)gpr_atm_acq_load(&old_throttle_data->milli_tokens) / + (double)old_throttle_data->max_milli_tokens; + initial_milli_tokens = (int)(token_fraction * max_milli_tokens); + } + gpr_atm_rel_store(&throttle_data->milli_tokens, + (gpr_atm)initial_milli_tokens); + // If there was a pre-existing entry, mark it as stale and give it a + // pointer to the new entry, which is its replacement. + if (old_throttle_data != NULL) { + grpc_server_retry_throttle_data_ref(throttle_data); + gpr_atm_rel_store(&old_throttle_data->replacement, (gpr_atm)throttle_data); + } + return throttle_data; +} + +// +// avl vtable for string -> server_retry_throttle_data map +// + +static void* copy_server_name(void* key) { return gpr_strdup(key); } + +static long compare_server_name(void* key1, void* key2) { + return strcmp(key1, key2); +} + +static void destroy_server_retry_throttle_data(void* value) { + grpc_server_retry_throttle_data* throttle_data = value; + grpc_server_retry_throttle_data_unref(throttle_data); +} + +static void* copy_server_retry_throttle_data(void* value) { + grpc_server_retry_throttle_data* throttle_data = value; + grpc_server_retry_throttle_data_ref(throttle_data); + return value; +} + +static const gpr_avl_vtable avl_vtable = { + gpr_free /* destroy_key */, copy_server_name, compare_server_name, + destroy_server_retry_throttle_data, copy_server_retry_throttle_data}; + +// +// server_retry_throttle_map +// + +static gpr_mu g_mu; +static gpr_avl g_avl; + +void grpc_retry_throttle_map_init() { + gpr_mu_init(&g_mu); + g_avl = gpr_avl_create(&avl_vtable); +} + +void grpc_retry_throttle_map_shutdown() { + gpr_mu_destroy(&g_mu); + gpr_avl_unref(g_avl); +} + +grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server( + const char* server_name, int max_milli_tokens, int milli_token_ratio) { + gpr_mu_lock(&g_mu); + grpc_server_retry_throttle_data* throttle_data = + gpr_avl_get(g_avl, (char*)server_name); + if (throttle_data == NULL) { + // Entry not found. Create a new one. + throttle_data = grpc_server_retry_throttle_data_create( + max_milli_tokens, milli_token_ratio, NULL); + g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data); + } else { + if (throttle_data->max_milli_tokens != max_milli_tokens || + throttle_data->milli_token_ratio != milli_token_ratio) { + // Entry found but with old parameters. Create a new one based on + // the original one. + throttle_data = grpc_server_retry_throttle_data_create( + max_milli_tokens, milli_token_ratio, throttle_data); + g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data); + } else { + // Entry found. Increase refcount. + grpc_server_retry_throttle_data_ref(throttle_data); + } + } + gpr_mu_unlock(&g_mu); + return throttle_data; +} diff --git a/src/core/ext/client_channel/retry_throttle.h b/src/core/ext/client_channel/retry_throttle.h new file mode 100644 index 0000000000..4209bb7fb6 --- /dev/null +++ b/src/core/ext/client_channel/retry_throttle.h @@ -0,0 +1,69 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_RETRY_THROTTLE_H +#define GRPC_CORE_EXT_CLIENT_CHANNEL_RETRY_THROTTLE_H + +#include <stdbool.h> + +/// Tracks retry throttling data for an individual server name. +typedef struct grpc_server_retry_throttle_data grpc_server_retry_throttle_data; + +/// Records a failure. Returns true if it's okay to send a retry. +/// Updates \a throttle_data if the original value is stale and has been +/// replaced. Not thread safe; caller must synchronize. +bool grpc_server_retry_throttle_data_record_failure( + grpc_server_retry_throttle_data** throttle_data); +/// Records a success. +/// Updates \a throttle_data if the original value is stale and has been +/// replaced. Not thread safe; caller must synchronize. +void grpc_server_retry_throttle_data_record_success( + grpc_server_retry_throttle_data** throttle_data); + +void grpc_server_retry_throttle_data_ref( + grpc_server_retry_throttle_data* throttle_data); +void grpc_server_retry_throttle_data_unref( + grpc_server_retry_throttle_data* throttle_data); + +/// Initializes global map of failure data for each server name. +void grpc_retry_throttle_map_init(); +/// Shuts down global map of failure data for each server name. +void grpc_retry_throttle_map_shutdown(); + +/// Returns a reference to the failure data for \a server_name, creating +/// a new entry if needed. +/// Caller must eventually unref via \a grpc_server_retry_throttle_data_unref(). +grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server( + const char* server_name, int max_milli_tokens, int milli_token_ratio); + +#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RETRY_THROTTLE_H */ diff --git a/src/core/lib/transport/service_config.c b/src/core/lib/transport/service_config.c index 12da2a88fe..1195f75044 100644 --- a/src/core/lib/transport/service_config.c +++ b/src/core/lib/transport/service_config.c @@ -93,6 +93,18 @@ void grpc_service_config_destroy(grpc_service_config* service_config) { gpr_free(service_config); } +void grpc_service_config_parse_global_params( + const grpc_service_config* service_config, + void (*process_json)(const grpc_json* json, void* arg), void* arg) { + const grpc_json* json = service_config->json_tree; + if (json->type != GRPC_JSON_OBJECT || json->key != NULL) return; + for (grpc_json* field = json->child; field != NULL; field = field->next) { + if (field->key == NULL) return; + if (strcmp(field->key, "methodConfig") == 0) continue; + process_json(field, arg); + } +} + const char* grpc_service_config_get_lb_policy_name( const grpc_service_config* service_config) { const grpc_json* json = service_config->json_tree; diff --git a/src/core/lib/transport/service_config.h b/src/core/lib/transport/service_config.h index cd739a593c..ebfc59b534 100644 --- a/src/core/lib/transport/service_config.h +++ b/src/core/lib/transport/service_config.h @@ -42,6 +42,12 @@ typedef struct grpc_service_config grpc_service_config; grpc_service_config* grpc_service_config_create(const char* json_string); void grpc_service_config_destroy(grpc_service_config* service_config); +/// Invokes \a process_json() for each global parameter in the service +/// config. \a arg is passed as the second argument to \a process_json(). +void grpc_service_config_parse_global_params( + const grpc_service_config* service_config, + void (*process_json)(const grpc_json* json, void* arg), void* arg); + /// Gets the LB policy name from \a service_config. /// Returns NULL if no LB policy name was specified. /// Caller does NOT take ownership. diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index a9f20e6d2a..94d6e46cae 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -263,6 +263,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/client_channel/resolver.c', 'src/core/ext/client_channel/resolver_factory.c', 'src/core/ext/client_channel/resolver_registry.c', + 'src/core/ext/client_channel/retry_throttle.c', 'src/core/ext/client_channel/subchannel.c', 'src/core/ext/client_channel/subchannel_index.c', 'src/core/ext/client_channel/uri_parser.c', diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 131a013451..fbe1f7f78e 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -928,6 +928,8 @@ src/core/ext/client_channel/resolver_factory.c \ src/core/ext/client_channel/resolver_factory.h \ src/core/ext/client_channel/resolver_registry.c \ src/core/ext/client_channel/resolver_registry.h \ +src/core/ext/client_channel/retry_throttle.c \ +src/core/ext/client_channel/retry_throttle.h \ src/core/ext/client_channel/subchannel.c \ src/core/ext/client_channel/subchannel.h \ src/core/ext/client_channel/subchannel_index.c \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index c0cd6c0496..b2f9078c05 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -7790,6 +7790,7 @@ "src/core/ext/client_channel/resolver.h", "src/core/ext/client_channel/resolver_factory.h", "src/core/ext/client_channel/resolver_registry.h", + "src/core/ext/client_channel/retry_throttle.h", "src/core/ext/client_channel/subchannel.h", "src/core/ext/client_channel/subchannel_index.h", "src/core/ext/client_channel/uri_parser.h" @@ -7831,6 +7832,8 @@ "src/core/ext/client_channel/resolver_factory.h", "src/core/ext/client_channel/resolver_registry.c", "src/core/ext/client_channel/resolver_registry.h", + "src/core/ext/client_channel/retry_throttle.c", + "src/core/ext/client_channel/retry_throttle.h", "src/core/ext/client_channel/subchannel.c", "src/core/ext/client_channel/subchannel.h", "src/core/ext/client_channel/subchannel_index.c", diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj index fde60be3e2..695524913d 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj @@ -466,6 +466,7 @@ <ClInclude Include="$(SolutionDir)\..\src\core\ext\client_channel\resolver.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\client_channel\resolver_factory.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\client_channel\resolver_registry.h" /> + <ClInclude Include="$(SolutionDir)\..\src\core\ext\client_channel\retry_throttle.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\client_channel\subchannel.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\client_channel\subchannel_index.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\client_channel\uri_parser.h" /> @@ -876,6 +877,8 @@ </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\client_channel\resolver_registry.c"> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\ext\client_channel\retry_throttle.c"> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\client_channel\subchannel.c"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\client_channel\subchannel_index.c"> diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters index 8edbbc22be..2fc34a8525 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters @@ -568,6 +568,9 @@ <ClCompile Include="$(SolutionDir)\..\src\core\ext\client_channel\resolver_registry.c"> <Filter>src\core\ext\client_channel</Filter> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\ext\client_channel\retry_throttle.c"> + <Filter>src\core\ext\client_channel</Filter> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\client_channel\subchannel.c"> <Filter>src\core\ext\client_channel</Filter> </ClCompile> @@ -1271,6 +1274,9 @@ <ClInclude Include="$(SolutionDir)\..\src\core\ext\client_channel\resolver_registry.h"> <Filter>src\core\ext\client_channel</Filter> </ClInclude> + <ClInclude Include="$(SolutionDir)\..\src\core\ext\client_channel\retry_throttle.h"> + <Filter>src\core\ext\client_channel</Filter> + </ClInclude> <ClInclude Include="$(SolutionDir)\..\src\core\ext\client_channel\subchannel.h"> <Filter>src\core\ext\client_channel</Filter> </ClInclude> diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj index 22f4740b8f..d15c6924e0 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj @@ -432,6 +432,7 @@ <ClInclude Include="$(SolutionDir)\..\src\core\ext\client_channel\resolver.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\client_channel\resolver_factory.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\client_channel\resolver_registry.h" /> + <ClInclude Include="$(SolutionDir)\..\src\core\ext\client_channel\retry_throttle.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\client_channel\subchannel.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\client_channel\subchannel_index.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\client_channel\uri_parser.h" /> @@ -793,6 +794,8 @@ </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\client_channel\resolver_registry.c"> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\ext\client_channel\retry_throttle.c"> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\client_channel\subchannel.c"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\client_channel\subchannel_index.c"> diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters index 5021cb47d8..8e4835ee14 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters @@ -496,6 +496,9 @@ <ClCompile Include="$(SolutionDir)\..\src\core\ext\client_channel\resolver_registry.c"> <Filter>src\core\ext\client_channel</Filter> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\ext\client_channel\retry_throttle.c"> + <Filter>src\core\ext\client_channel</Filter> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\client_channel\subchannel.c"> <Filter>src\core\ext\client_channel</Filter> </ClCompile> @@ -1109,6 +1112,9 @@ <ClInclude Include="$(SolutionDir)\..\src\core\ext\client_channel\resolver_registry.h"> <Filter>src\core\ext\client_channel</Filter> </ClInclude> + <ClInclude Include="$(SolutionDir)\..\src\core\ext\client_channel\retry_throttle.h"> + <Filter>src\core\ext\client_channel</Filter> + </ClInclude> <ClInclude Include="$(SolutionDir)\..\src\core\ext\client_channel\subchannel.h"> <Filter>src\core\ext\client_channel</Filter> </ClInclude> |