diff options
Diffstat (limited to 'src/core')
23 files changed, 511 insertions, 174 deletions
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index 960d00e815..00c20913b0 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -47,6 +47,7 @@ #include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/proxy_mapper_registry.h" #include "src/core/ext/client_channel/resolver_registry.h" +#include "src/core/ext/client_channel/retry_throttle.h" #include "src/core/ext/client_channel/subchannel.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" @@ -189,6 +190,8 @@ typedef struct client_channel_channel_data { grpc_combiner *combiner; /** currently active load balancer */ grpc_lb_policy *lb_policy; + /** retry throttle data */ + grpc_server_retry_throttle_data *retry_throttle_data; /** maps method names to method_parameters structs */ grpc_slice_hash_table *method_params_table; /** incoming resolver result - set by resolver.next() */ @@ -284,6 +287,65 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, &w->on_changed); } +typedef struct { + char *server_name; + grpc_server_retry_throttle_data *retry_throttle_data; +} service_config_parsing_state; + +static void parse_retry_throttle_params(const grpc_json *field, void *arg) { + service_config_parsing_state *parsing_state = arg; + if (strcmp(field->key, "retryThrottling") == 0) { + if (parsing_state->retry_throttle_data != NULL) return; // Duplicate. + if (field->type != GRPC_JSON_OBJECT) return; + int max_milli_tokens = 0; + int milli_token_ratio = 0; + for (grpc_json *sub_field = field->child; sub_field != NULL; + sub_field = sub_field->next) { + if (sub_field->key == NULL) return; + if (strcmp(sub_field->key, "maxTokens") == 0) { + if (max_milli_tokens != 0) return; // Duplicate. + if (sub_field->type != GRPC_JSON_NUMBER) return; + max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value); + if (max_milli_tokens == -1) return; + max_milli_tokens *= 1000; + } else if (strcmp(sub_field->key, "tokenRatio") == 0) { + if (milli_token_ratio != 0) return; // Duplicate. + if (sub_field->type != GRPC_JSON_NUMBER) return; + // We support up to 3 decimal digits. + size_t whole_len = strlen(sub_field->value); + uint32_t multiplier = 1; + uint32_t decimal_value = 0; + const char *decimal_point = strchr(sub_field->value, '.'); + if (decimal_point != NULL) { + whole_len = (size_t)(decimal_point - sub_field->value); + multiplier = 1000; + size_t decimal_len = strlen(decimal_point + 1); + if (decimal_len > 3) decimal_len = 3; + if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len, + &decimal_value)) { + return; + } + uint32_t decimal_multiplier = 1; + for (size_t i = 0; i < (3 - decimal_len); ++i) { + decimal_multiplier *= 10; + } + decimal_value *= decimal_multiplier; + } + uint32_t whole_value; + if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len, + &whole_value)) { + return; + } + milli_token_ratio = (int)((whole_value * multiplier) + decimal_value); + if (milli_token_ratio <= 0) return; + } + } + parsing_state->retry_throttle_data = + grpc_retry_throttle_map_get_data_for_server( + parsing_state->server_name, max_milli_tokens, milli_token_ratio); + } +} + static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { channel_data *chand = arg; @@ -295,6 +357,8 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, bool exit_idle = false; grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy"); char *service_config_json = NULL; + service_config_parsing_state parsing_state; + memset(&parsing_state, 0, sizeof(parsing_state)); if (chand->resolver_result != NULL) { // Find LB policy name. @@ -355,6 +419,19 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_service_config *service_config = grpc_service_config_create(service_config_json); if (service_config != NULL) { + channel_arg = + grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI); + GPR_ASSERT(channel_arg != NULL); + GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); + grpc_uri *uri = + grpc_uri_parse(exec_ctx, channel_arg->value.string, true); + GPR_ASSERT(uri->path[0] != '\0'); + parsing_state.server_name = + uri->path[0] == '/' ? uri->path + 1 : uri->path; + grpc_service_config_parse_global_params( + service_config, parse_retry_throttle_params, &parsing_state); + parsing_state.server_name = NULL; + grpc_uri_destroy(uri); method_params_table = grpc_service_config_create_method_config_table( exec_ctx, service_config, method_parameters_create_from_json, &method_parameters_vtable); @@ -386,6 +463,11 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, chand->info_service_config_json = service_config_json; } gpr_mu_unlock(&chand->info_mu); + + if (chand->retry_throttle_data != NULL) { + grpc_server_retry_throttle_data_unref(chand->retry_throttle_data); + } + chand->retry_throttle_data = parsing_state.retry_throttle_data; if (chand->method_params_table != NULL) { grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table); } @@ -613,6 +695,9 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, } gpr_free(chand->info_lb_policy_name); gpr_free(chand->info_service_config_json); + if (chand->retry_throttle_data != NULL) { + grpc_server_retry_throttle_data_unref(chand->retry_throttle_data); + } if (chand->method_params_table != NULL) { grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table); } @@ -654,6 +739,7 @@ typedef struct client_channel_call_data { grpc_slice path; // Request path. gpr_timespec call_start_time; gpr_timespec deadline; + grpc_server_retry_throttle_data *retry_throttle_data; method_parameters *method_params; grpc_error *cancel_error; @@ -676,6 +762,9 @@ typedef struct client_channel_call_data { grpc_call_stack *owning_call; grpc_linked_mdelem lb_token_mdelem; + + grpc_closure on_complete; + grpc_closure *original_on_complete; } call_data; grpc_subchannel_call *grpc_client_channel_get_subchannel_call( @@ -728,7 +817,7 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { gpr_free(ops); } -// Sets calld->method_params. +// Sets calld->method_params and calld->retry_throttle_data. // If the method params specify a timeout, populates // *per_method_deadline and returns true. static bool set_call_method_params_from_service_config_locked( @@ -736,6 +825,10 @@ static bool set_call_method_params_from_service_config_locked( gpr_timespec *per_method_deadline) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; + if (chand->retry_throttle_data != NULL) { + calld->retry_throttle_data = + grpc_server_retry_throttle_data_ref(chand->retry_throttle_data); + } if (chand->method_params_table != NULL) { calld->method_params = grpc_method_config_table_get( exec_ctx, chand->method_params_table, calld->path); @@ -1056,6 +1149,26 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, add_waiting_locked(calld, op); } +static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + grpc_call_element *elem = arg; + call_data *calld = elem->call_data; + if (calld->retry_throttle_data != NULL) { + if (error == GRPC_ERROR_NONE) { + grpc_server_retry_throttle_data_record_success( + calld->retry_throttle_data); + } else { + // TODO(roth): In a subsequent PR, check the return value here and + // decide whether or not to retry. Note that we should only + // record failures whose statuses match the configured retryable + // or non-fatal status codes. + grpc_server_retry_throttle_data_record_failure( + calld->retry_throttle_data); + } + } + grpc_closure_run(exec_ctx, calld->original_on_complete, + GRPC_ERROR_REF(error)); +} + static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error_ignored) { GPR_TIMER_BEGIN("start_transport_stream_op_locked", 0); @@ -1064,6 +1177,14 @@ static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_call_element *elem = op->handler_private.args[0]; call_data *calld = elem->call_data; + if (op->recv_trailing_metadata != NULL) { + GPR_ASSERT(op->on_complete != NULL); + calld->original_on_complete = op->on_complete; + grpc_closure_init(&calld->on_complete, on_complete, elem, + grpc_schedule_on_exec_ctx); + op->on_complete = &calld->on_complete; + } + start_transport_stream_op_locked_inner(exec_ctx, op, elem); GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, diff --git a/src/core/ext/client_channel/client_channel_plugin.c b/src/core/ext/client_channel/client_channel_plugin.c index 28d3b63f99..f51277d0b2 100644 --- a/src/core/ext/client_channel/client_channel_plugin.c +++ b/src/core/ext/client_channel/client_channel_plugin.c @@ -43,6 +43,7 @@ #include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/proxy_mapper_registry.h" #include "src/core/ext/client_channel/resolver_registry.h" +#include "src/core/ext/client_channel/retry_throttle.h" #include "src/core/ext/client_channel/subchannel_index.h" #include "src/core/lib/surface/channel_init.h" @@ -82,6 +83,7 @@ static bool set_default_host_if_unset(grpc_exec_ctx *exec_ctx, void grpc_client_channel_init(void) { grpc_lb_policy_registry_init(); grpc_resolver_registry_init(); + grpc_retry_throttle_map_init(); grpc_proxy_mapper_registry_init(); grpc_register_http_proxy_mapper(); grpc_subchannel_index_init(); @@ -96,6 +98,7 @@ void grpc_client_channel_shutdown(void) { grpc_subchannel_index_shutdown(); grpc_channel_init_shutdown(); grpc_proxy_mapper_registry_shutdown(); + grpc_retry_throttle_map_shutdown(); grpc_resolver_registry_shutdown(); grpc_lb_policy_registry_shutdown(); } diff --git a/src/core/ext/client_channel/connector.h b/src/core/ext/client_channel/connector.h index 9bff41f003..94b5fb5c9e 100644 --- a/src/core/ext/client_channel/connector.h +++ b/src/core/ext/client_channel/connector.h @@ -48,8 +48,6 @@ struct grpc_connector { typedef struct { /** set of pollsets interested in this connection */ grpc_pollset_set *interested_parties; - /** initial connect string to send */ - grpc_slice initial_connect_string; /** deadline for connection */ gpr_timespec deadline; /** channel arguments (to be passed to transport) */ diff --git a/src/core/ext/client_channel/initial_connect_string.h b/src/core/ext/client_channel/initial_connect_string.h deleted file mode 100644 index 876abea40e..0000000000 --- a/src/core/ext/client_channel/initial_connect_string.h +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_INITIAL_CONNECT_STRING_H -#define GRPC_CORE_EXT_CLIENT_CHANNEL_INITIAL_CONNECT_STRING_H - -#include <grpc/slice.h> -#include "src/core/lib/iomgr/resolve_address.h" - -typedef void (*grpc_set_initial_connect_string_func)( - grpc_resolved_address **addr, grpc_slice *initial_str); - -void grpc_test_set_initial_connect_string_function( - grpc_set_initial_connect_string_func func); - -/** Set a string to be sent once connected. Optionally reset addr. */ -void grpc_set_initial_connect_string(grpc_resolved_address **addr, - grpc_slice *connect_string); - -#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_INITIAL_CONNECT_STRING_H */ diff --git a/src/core/ext/client_channel/retry_throttle.c b/src/core/ext/client_channel/retry_throttle.c new file mode 100644 index 0000000000..8926c3d782 --- /dev/null +++ b/src/core/ext/client_channel/retry_throttle.c @@ -0,0 +1,210 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/client_channel/retry_throttle.h" + +#include <limits.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/atm.h> +#include <grpc/support/avl.h> +#include <grpc/support/string_util.h> +#include <grpc/support/sync.h> + +// +// server_retry_throttle_data +// + +struct grpc_server_retry_throttle_data { + gpr_refcount refs; + int max_milli_tokens; + int milli_token_ratio; + gpr_atm milli_tokens; + // A pointer to the replacement for this grpc_server_retry_throttle_data + // entry. If non-NULL, then this entry is stale and must not be used. + // We hold a reference to the replacement. + gpr_atm replacement; +}; + +static void get_replacement_throttle_data_if_needed( + grpc_server_retry_throttle_data** throttle_data) { + while (true) { + grpc_server_retry_throttle_data* new_throttle_data = + (grpc_server_retry_throttle_data*)gpr_atm_acq_load( + &(*throttle_data)->replacement); + if (new_throttle_data == NULL) return; + *throttle_data = new_throttle_data; + } +} + +bool grpc_server_retry_throttle_data_record_failure( + grpc_server_retry_throttle_data* throttle_data) { + // First, check if we are stale and need to be replaced. + get_replacement_throttle_data_if_needed(&throttle_data); + // We decrement milli_tokens by 1000 (1 token) for each failure. + const int new_value = (int)gpr_atm_no_barrier_clamped_add( + &throttle_data->milli_tokens, (gpr_atm)-1000, (gpr_atm)0, + (gpr_atm)throttle_data->max_milli_tokens); + // Retries are allowed as long as the new value is above the threshold + // (max_milli_tokens / 2). + return new_value > throttle_data->max_milli_tokens / 2; +} + +void grpc_server_retry_throttle_data_record_success( + grpc_server_retry_throttle_data* throttle_data) { + // First, check if we are stale and need to be replaced. + get_replacement_throttle_data_if_needed(&throttle_data); + // We increment milli_tokens by milli_token_ratio for each success. + gpr_atm_no_barrier_clamped_add( + &throttle_data->milli_tokens, (gpr_atm)throttle_data->milli_token_ratio, + (gpr_atm)0, (gpr_atm)throttle_data->max_milli_tokens); +} + +grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_ref( + grpc_server_retry_throttle_data* throttle_data) { + gpr_ref(&throttle_data->refs); + return throttle_data; +} + +void grpc_server_retry_throttle_data_unref( + grpc_server_retry_throttle_data* throttle_data) { + if (gpr_unref(&throttle_data->refs)) { + grpc_server_retry_throttle_data* replacement = + (grpc_server_retry_throttle_data*)gpr_atm_acq_load( + &throttle_data->replacement); + if (replacement != NULL) { + grpc_server_retry_throttle_data_unref(replacement); + } + gpr_free(throttle_data); + } +} + +static grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_create( + int max_milli_tokens, int milli_token_ratio, + grpc_server_retry_throttle_data* old_throttle_data) { + grpc_server_retry_throttle_data* throttle_data = + gpr_malloc(sizeof(*throttle_data)); + memset(throttle_data, 0, sizeof(*throttle_data)); + gpr_ref_init(&throttle_data->refs, 1); + throttle_data->max_milli_tokens = max_milli_tokens; + throttle_data->milli_token_ratio = milli_token_ratio; + int initial_milli_tokens = max_milli_tokens; + // If there was a pre-existing entry for this server name, initialize + // the token count by scaling proportionately to the old data. This + // ensures that if we're already throttling retries on the old scale, + // we will start out doing the same thing on the new one. + if (old_throttle_data != NULL) { + double token_fraction = + (int)gpr_atm_acq_load(&old_throttle_data->milli_tokens) / + (double)old_throttle_data->max_milli_tokens; + initial_milli_tokens = (int)(token_fraction * max_milli_tokens); + } + gpr_atm_rel_store(&throttle_data->milli_tokens, + (gpr_atm)initial_milli_tokens); + // If there was a pre-existing entry, mark it as stale and give it a + // pointer to the new entry, which is its replacement. + if (old_throttle_data != NULL) { + grpc_server_retry_throttle_data_ref(throttle_data); + gpr_atm_rel_store(&old_throttle_data->replacement, (gpr_atm)throttle_data); + } + return throttle_data; +} + +// +// avl vtable for string -> server_retry_throttle_data map +// + +static void* copy_server_name(void* key) { return gpr_strdup(key); } + +static long compare_server_name(void* key1, void* key2) { + return strcmp(key1, key2); +} + +static void destroy_server_retry_throttle_data(void* value) { + grpc_server_retry_throttle_data* throttle_data = value; + grpc_server_retry_throttle_data_unref(throttle_data); +} + +static void* copy_server_retry_throttle_data(void* value) { + grpc_server_retry_throttle_data* throttle_data = value; + return grpc_server_retry_throttle_data_ref(throttle_data); +} + +static const gpr_avl_vtable avl_vtable = { + gpr_free /* destroy_key */, copy_server_name, compare_server_name, + destroy_server_retry_throttle_data, copy_server_retry_throttle_data}; + +// +// server_retry_throttle_map +// + +static gpr_mu g_mu; +static gpr_avl g_avl; + +void grpc_retry_throttle_map_init() { + gpr_mu_init(&g_mu); + g_avl = gpr_avl_create(&avl_vtable); +} + +void grpc_retry_throttle_map_shutdown() { + gpr_mu_destroy(&g_mu); + gpr_avl_unref(g_avl); +} + +grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server( + const char* server_name, int max_milli_tokens, int milli_token_ratio) { + gpr_mu_lock(&g_mu); + grpc_server_retry_throttle_data* throttle_data = + gpr_avl_get(g_avl, (char*)server_name); + if (throttle_data == NULL) { + // Entry not found. Create a new one. + throttle_data = grpc_server_retry_throttle_data_create( + max_milli_tokens, milli_token_ratio, NULL); + g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data); + } else { + if (throttle_data->max_milli_tokens != max_milli_tokens || + throttle_data->milli_token_ratio != milli_token_ratio) { + // Entry found but with old parameters. Create a new one based on + // the original one. + throttle_data = grpc_server_retry_throttle_data_create( + max_milli_tokens, milli_token_ratio, throttle_data); + g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data); + } else { + // Entry found. Increase refcount. + grpc_server_retry_throttle_data_ref(throttle_data); + } + } + gpr_mu_unlock(&g_mu); + return throttle_data; +} diff --git a/src/core/ext/client_channel/initial_connect_string.c b/src/core/ext/client_channel/retry_throttle.h index 8ebd06c458..f9971faf65 100644 --- a/src/core/ext/client_channel/initial_connect_string.c +++ b/src/core/ext/client_channel/retry_throttle.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2017, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -31,22 +31,35 @@ * */ -#include "src/core/ext/client_channel/initial_connect_string.h" +#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_RETRY_THROTTLE_H +#define GRPC_CORE_EXT_CLIENT_CHANNEL_RETRY_THROTTLE_H -#include <stddef.h> +#include <stdbool.h> -extern void grpc_set_default_initial_connect_string( - grpc_resolved_address **addr, grpc_slice *initial_str); +/// Tracks retry throttling data for an individual server name. +typedef struct grpc_server_retry_throttle_data grpc_server_retry_throttle_data; -static grpc_set_initial_connect_string_func g_set_initial_connect_string_func = - grpc_set_default_initial_connect_string; +/// Records a failure. Returns true if it's okay to send a retry. +bool grpc_server_retry_throttle_data_record_failure( + grpc_server_retry_throttle_data* throttle_data); +/// Records a success. +void grpc_server_retry_throttle_data_record_success( + grpc_server_retry_throttle_data* throttle_data); -void grpc_test_set_initial_connect_string_function( - grpc_set_initial_connect_string_func func) { - g_set_initial_connect_string_func = func; -} +grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_ref( + grpc_server_retry_throttle_data* throttle_data); +void grpc_server_retry_throttle_data_unref( + grpc_server_retry_throttle_data* throttle_data); -void grpc_set_initial_connect_string(grpc_resolved_address **addr, - grpc_slice *initial_str) { - g_set_initial_connect_string_func(addr, initial_str); -} +/// Initializes global map of failure data for each server name. +void grpc_retry_throttle_map_init(); +/// Shuts down global map of failure data for each server name. +void grpc_retry_throttle_map_shutdown(); + +/// Returns a reference to the failure data for \a server_name, creating +/// a new entry if needed. +/// Caller must eventually unref via \a grpc_server_retry_throttle_data_unref(). +grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server( + const char* server_name, int max_milli_tokens, int milli_token_ratio); + +#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RETRY_THROTTLE_H */ diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index ed5029ea9a..e886fbc0cc 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -41,7 +41,6 @@ #include <grpc/support/string_util.h> #include "src/core/ext/client_channel/client_channel.h" -#include "src/core/ext/client_channel/initial_connect_string.h" #include "src/core/ext/client_channel/parse_address.h" #include "src/core/ext/client_channel/proxy_mapper_registry.h" #include "src/core/ext/client_channel/subchannel_index.h" @@ -103,9 +102,6 @@ struct grpc_subchannel { grpc_subchannel_key *key; - /** initial string to send to peer */ - grpc_slice initial_connect_string; - /** set during connection */ grpc_connect_out_args connecting_result; @@ -215,7 +211,6 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_subchannel *c = arg; gpr_free((void *)c->filters); grpc_channel_args_destroy(exec_ctx, c->args); - grpc_slice_unref_internal(exec_ctx, c->initial_connect_string); grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker); grpc_connector_unref(exec_ctx, c->connector); grpc_pollset_set_destroy(exec_ctx, c->pollset_set); @@ -333,7 +328,6 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, c->pollset_set = grpc_pollset_set_create(); grpc_resolved_address *addr = gpr_malloc(sizeof(*addr)); grpc_get_subchannel_address_arg(exec_ctx, args->args, addr); - grpc_set_initial_connect_string(&addr, &c->initial_connect_string); grpc_resolved_address *new_address = NULL; grpc_channel_args *new_args = NULL; if (grpc_proxy_mappers_map_address(exec_ctx, addr, args->args, &new_address, @@ -404,7 +398,6 @@ static void continue_connect_locked(grpc_exec_ctx *exec_ctx, args.interested_parties = c->pollset_set; args.deadline = c->next_attempt; args.channel_args = c->args; - args.initial_connect_string = c->initial_connect_string; grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index e2a66d16bd..179dcd6eac 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -423,11 +423,13 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, "This LB policy doesn't support user data. It will be ignored"); } + static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS}; memset(&sc_args, 0, sizeof(grpc_subchannel_args)); grpc_arg addr_arg = grpc_create_subchannel_address_arg(&addresses->addresses[i].address); - grpc_channel_args *new_args = - grpc_channel_args_copy_and_add(args->args, &addr_arg, 1); + grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove( + args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, + 1); gpr_free(addr_arg.value.string); sc_args.args = new_args; grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index f2d1d46179..09562d30ed 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -709,11 +709,13 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, /* Skip balancer addresses, since we only know how to handle backends. */ if (addresses->addresses[i].is_balancer) continue; + static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS}; memset(&sc_args, 0, sizeof(grpc_subchannel_args)); grpc_arg addr_arg = grpc_create_subchannel_address_arg(&addresses->addresses[i].address); - grpc_channel_args *new_args = - grpc_channel_args_copy_and_add(args->args, &addr_arg, 1); + grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove( + args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, + 1); gpr_free(addr_arg.value.string); sc_args.args = new_args; grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c index eae0145ecc..d49c32b671 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.c +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c @@ -63,8 +63,6 @@ typedef struct { grpc_closure *notify; grpc_connect_in_args args; grpc_connect_out_args *result; - grpc_closure initial_string_sent; - grpc_slice_buffer initial_string_buffer; grpc_endpoint *endpoint; // Non-NULL until handshaking starts. @@ -82,7 +80,6 @@ static void chttp2_connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) { chttp2_connector *c = (chttp2_connector *)con; if (gpr_unref(&c->refs)) { - /* c->initial_string_buffer does not need to be destroyed */ gpr_mu_destroy(&c->mu); // If handshaking is not yet in progress, destroy the endpoint. // Otherwise, the handshaker will do this for us. @@ -160,28 +157,6 @@ static void start_handshake_locked(grpc_exec_ctx *exec_ctx, c->endpoint = NULL; // Endpoint handed off to handshake manager. } -static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - chttp2_connector *c = arg; - gpr_mu_lock(&c->mu); - if (error != GRPC_ERROR_NONE || c->shutdown) { - if (error == GRPC_ERROR_NONE) { - error = GRPC_ERROR_CREATE("connector shutdown"); - } else { - error = GRPC_ERROR_REF(error); - } - memset(c->result, 0, sizeof(*c->result)); - grpc_closure *notify = c->notify; - c->notify = NULL; - grpc_closure_sched(exec_ctx, notify, error); - gpr_mu_unlock(&c->mu); - chttp2_connector_unref(exec_ctx, arg); - } else { - start_handshake_locked(exec_ctx, c); - gpr_mu_unlock(&c->mu); - } -} - static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { chttp2_connector *c = arg; gpr_mu_lock(&c->mu); @@ -204,17 +179,7 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { chttp2_connector_unref(exec_ctx, arg); } else { GPR_ASSERT(c->endpoint != NULL); - if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) { - grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent, - c, grpc_schedule_on_exec_ctx); - grpc_slice_buffer_init(&c->initial_string_buffer); - grpc_slice_buffer_add(&c->initial_string_buffer, - c->args.initial_connect_string); - grpc_endpoint_write(exec_ctx, c->endpoint, &c->initial_string_buffer, - &c->initial_string_sent); - } else { - start_handshake_locked(exec_ctx, c); - } + start_handshake_locked(exec_ctx, c); gpr_mu_unlock(&c->mu); } } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 082078c72f..89659e7464 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1765,6 +1765,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_error *error) { grpc_slice hdr; grpc_slice status_hdr; + grpc_slice http_status_hdr; grpc_slice message_pfx; uint8_t *p; uint32_t len = 0; @@ -1780,6 +1781,26 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, It's complicated by the fact that our send machinery would be dead by the time we got around to sending this, so instead we ignore HPACK compression and just write the uncompressed bytes onto the wire. */ + if (!s->sent_initial_metadata) { + http_status_hdr = grpc_slice_malloc(13); + p = GRPC_SLICE_START_PTR(http_status_hdr); + *p++ = 0x00; + *p++ = 7; + *p++ = ':'; + *p++ = 's'; + *p++ = 't'; + *p++ = 'a'; + *p++ = 't'; + *p++ = 'u'; + *p++ = 's'; + *p++ = 3; + *p++ = '2'; + *p++ = '0'; + *p++ = '0'; + GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr)); + len += (uint32_t)GRPC_SLICE_LENGTH(http_status_hdr); + } + status_hdr = grpc_slice_malloc(15 + (grpc_status >= 10)); p = GRPC_SLICE_START_PTR(status_hdr); *p++ = 0x00; /* literal header, not indexed */ @@ -1847,6 +1868,9 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr)); grpc_slice_buffer_add(&t->qbuf, hdr); + if (!s->sent_initial_metadata) { + grpc_slice_buffer_add(&t->qbuf, http_status_hdr); + } grpc_slice_buffer_add(&t->qbuf, status_hdr); if (msg != NULL) { grpc_slice_buffer_add(&t->qbuf, message_pfx); diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index 40f5120308..1865b997b7 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -1620,13 +1620,18 @@ void grpc_chttp2_hpack_parser_destroy(grpc_exec_ctx *exec_ctx, grpc_error *grpc_chttp2_hpack_parser_parse(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_parser *p, grpc_slice slice) { - /* TODO(ctiller): limit the distance of end from beg, and perform multiple - steps in the event of a large chunk of data to limit - stack space usage when no tail call optimization is - available */ +/* max number of bytes to parse at a time... limits call stack depth on + * compilers without TCO */ +#define MAX_PARSE_LENGTH 1024 p->current_slice_refcount = slice.refcount; - grpc_error *error = p->state(exec_ctx, p, GRPC_SLICE_START_PTR(slice), - GRPC_SLICE_END_PTR(slice)); + uint8_t *start = GRPC_SLICE_START_PTR(slice); + uint8_t *end = GRPC_SLICE_END_PTR(slice); + grpc_error *error = GRPC_ERROR_NONE; + while (start != end && error == GRPC_ERROR_NONE) { + uint8_t *target = start + GPR_MIN(MAX_PARSE_LENGTH, end - start); + error = p->state(exec_ctx, p, start, target); + start = target; + } p->current_slice_refcount = NULL; return error; } diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 450d9ab23a..36bb67869c 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -128,6 +128,7 @@ struct read_state { int received_bytes; int remaining_bytes; int length_field; + bool compressed; char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES]; char *payload_field; bool read_stream_closed; @@ -484,6 +485,16 @@ static void on_response_headers_received( CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream, headers, negotiated_protocol); stream_obj *s = (stream_obj *)stream->annotation; + + /* Identify if this is a header or a trailer (in a trailer-only response case) + */ + for (size_t i = 0; i < headers->count; i++) { + if (0 == strcmp("grpc-status", headers->headers[i].key)) { + on_response_trailers_received(stream, headers); + return; + } + } + gpr_mu_lock(&s->mu); memset(&s->state.rs.initial_metadata, 0, sizeof(s->state.rs.initial_metadata)); @@ -507,6 +518,7 @@ static void on_response_headers_received( is closed */ GPR_ASSERT(s->state.rs.length_field_received == false); s->state.rs.read_buffer = s->state.rs.grpc_header_bytes; + s->state.rs.compressed = false; s->state.rs.received_bytes = 0; s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); @@ -641,7 +653,7 @@ static void on_response_trailers_received( */ static void create_grpc_frame(grpc_slice_buffer *write_slice_buffer, char **pp_write_buffer, - size_t *p_write_buffer_size) { + size_t *p_write_buffer_size, uint32_t flags) { grpc_slice slice = grpc_slice_buffer_take_first(write_slice_buffer); size_t length = GRPC_SLICE_LENGTH(slice); *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES; @@ -650,7 +662,9 @@ static void create_grpc_frame(grpc_slice_buffer *write_slice_buffer, *pp_write_buffer = write_buffer; uint8_t *p = (uint8_t *)write_buffer; /* Append 5 byte header */ - *p++ = 0; + /* Compressed flag */ + *p++ = (flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0; + /* Message length */ *p++ = (uint8_t)(length >> 24); *p++ = (uint8_t)(length >> 16); *p++ = (uint8_t)(length >> 8); @@ -728,14 +742,16 @@ static void convert_metadata_to_cronet_headers( *p_num_headers = (size_t)num_headers; } -static int parse_grpc_header(const uint8_t *data) { +static void parse_grpc_header(const uint8_t *data, int *length, + bool *compressed) { + const uint8_t c = *data; const uint8_t *p = data + 1; - int length = 0; - length |= ((uint8_t)*p++) << 24; - length |= ((uint8_t)*p++) << 16; - length |= ((uint8_t)*p++) << 8; - length |= ((uint8_t)*p++); - return length; + *compressed = ((c & 0x01) == 0x01); + *length = 0; + *length |= ((uint8_t)*p++) << 24; + *length |= ((uint8_t)*p++) << 16; + *length |= ((uint8_t)*p++) << 8; + *length |= ((uint8_t)*p++); } static bool header_has_authority(grpc_linked_mdelem *head) { @@ -788,7 +804,8 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) result = false; /* we haven't received headers yet. */ - else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA]) + else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] && + !stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) result = false; } else if (op_id == OP_SEND_MESSAGE) { /* already executed (note we're checking op specific state, not stream @@ -801,7 +818,8 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, /* already executed */ if (op_state->state_op_done[OP_RECV_MESSAGE]) result = false; /* we haven't received headers yet. */ - else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA]) + else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] && + !stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) result = false; } else if (op_id == OP_RECV_TRAILING_METADATA) { /* already executed */ @@ -955,12 +973,6 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_init(&write_slice_buffer); grpc_byte_stream_next(NULL, stream_op->send_message, &slice, stream_op->send_message->length, NULL); - /* Check that compression flag is OFF. We don't support compression yet. - */ - if (stream_op->send_message->flags != 0) { - gpr_log(GPR_ERROR, "Compression is not supported"); - GPR_ASSERT(stream_op->send_message->flags == 0); - } grpc_slice_buffer_add(&write_slice_buffer, slice); if (write_slice_buffer.count != 1) { /* Empty request not handled yet */ @@ -970,7 +982,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, if (write_slice_buffer.count > 0) { size_t write_buffer_size; create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer, - &write_buffer_size); + &write_buffer_size, stream_op->send_message->flags); CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs, stream_state->ws.write_buffer); stream_state->state_callback_received[OP_SEND_MESSAGE] = false; @@ -1022,6 +1034,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, } else if (stream_state->state_callback_received[OP_FAILED]) { grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, GRPC_ERROR_NONE); + } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) { + grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, + GRPC_ERROR_NONE); } else { grpc_chttp2_incoming_metadata_buffer_publish( exec_ctx, &oas->s->state.rs.initial_metadata, @@ -1066,8 +1081,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->rs.remaining_bytes == 0) { /* Start a read operation for data */ stream_state->rs.length_field_received = true; - stream_state->rs.length_field = stream_state->rs.remaining_bytes = - parse_grpc_header((const uint8_t *)stream_state->rs.read_buffer); + parse_grpc_header((const uint8_t *)stream_state->rs.read_buffer, + &stream_state->rs.length_field, + &stream_state->rs.compressed); CRONET_LOG(GPR_DEBUG, "length field = %d", stream_state->rs.length_field); if (stream_state->rs.length_field > 0) { @@ -1089,6 +1105,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer); grpc_slice_buffer_stream_init(&stream_state->rs.sbs, &stream_state->rs.read_slice_buffer, 0); + if (stream_state->rs.compressed) { + stream_state->rs.sbs.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS; + } *((grpc_byte_buffer **)stream_op->recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs; grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, @@ -1100,6 +1119,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes; stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; stream_state->rs.received_bytes = 0; + stream_state->rs.compressed = false; stream_state->rs.length_field_received = false; CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); stream_state->state_op_done[OP_READ_REQ_MADE] = @@ -1114,6 +1134,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes; stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; stream_state->rs.received_bytes = 0; + stream_state->rs.compressed = false; CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); stream_state->state_op_done[OP_READ_REQ_MADE] = true; /* Indicates that at least one read request has been made */ @@ -1137,6 +1158,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, read_data_slice); grpc_slice_buffer_stream_init(&stream_state->rs.sbs, &stream_state->rs.read_slice_buffer, 0); + if (stream_state->rs.compressed) { + stream_state->rs.sbs.base.flags = GRPC_WRITE_INTERNAL_COMPRESS; + } *((grpc_byte_buffer **)stream_op->recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs; grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, @@ -1146,6 +1170,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, /* Do an extra read to trigger on_succeeded() callback in case connection is closed */ stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes; + stream_state->rs.compressed = false; stream_state->rs.received_bytes = 0; stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; stream_state->rs.length_field_received = false; diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c index 509c1ff95d..6633fb68ec 100644 --- a/src/core/lib/iomgr/closure.c +++ b/src/core/lib/iomgr/closure.c @@ -33,6 +33,7 @@ #include "src/core/lib/iomgr/closure.h" +#include <assert.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -124,6 +125,7 @@ void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *c, grpc_error *error) { GPR_TIMER_BEGIN("grpc_closure_run", 0); if (c != NULL) { + assert(c->cb); c->scheduler->vtable->run(exec_ctx, c, error); } else { GRPC_ERROR_UNREF(error); @@ -135,6 +137,7 @@ void grpc_closure_sched(grpc_exec_ctx *exec_ctx, grpc_closure *c, grpc_error *error) { GPR_TIMER_BEGIN("grpc_closure_sched", 0); if (c != NULL) { + assert(c->cb); c->scheduler->vtable->sched(exec_ctx, c, error); } else { GRPC_ERROR_UNREF(error); @@ -146,6 +149,7 @@ void grpc_closure_list_sched(grpc_exec_ctx *exec_ctx, grpc_closure_list *list) { grpc_closure *c = list->head; while (c != NULL) { grpc_closure *next = c->next_data.next; + assert(c->cb); c->scheduler->vtable->sched(exec_ctx, c, c->error_data.error); c = next; } diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index fa9966c3a6..2bc476bbef 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -33,6 +33,7 @@ #include "src/core/lib/iomgr/combiner.h" +#include <assert.h> #include <string.h> #include <grpc/support/alloc.h> @@ -216,6 +217,7 @@ static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, GPR_DEBUG, "C:%p grpc_combiner_execute c=%p cov=%d last=%" PRIdPTR, lock, cl, covered_by_poller, last)); GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed + assert(cl->cb); cl->error_data.scratch = pack_error_data((error_data){error, covered_by_poller}); if (covered_by_poller) { diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index eb953947ae..0e7a7b0a1e 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -45,28 +45,9 @@ extern "C" { #endif /// Opaque representation of an error. -/// Errors are refcounted objects that represent the result of an operation. -/// Ownership laws: -/// if a grpc_error is returned by a function, the caller owns a ref to that -/// instance -/// if a grpc_error is passed to a grpc_closure callback function (functions -/// with the signature: -/// void (*f)(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error)) -/// then those functions do not own a ref to error (but are free to manually -/// take a reference). -/// if a grpc_error is passed to *ANY OTHER FUNCTION* then that function takes -/// ownership of the error -/// Errors have: -/// a set of ints, strings, and timestamps that describe the error -/// always present are: -/// GRPC_ERROR_STR_FILE, GRPC_ERROR_INT_FILE_LINE - source location the error -/// was generated -/// GRPC_ERROR_STR_DESCRIPTION - a human readable description of the error -/// GRPC_ERROR_TIME_CREATED - a timestamp indicating when the error happened -/// an error can also have children; these are other errors that are believed -/// to have contributed to this one. By accumulating children, we can begin -/// to root cause high level failures from low level failures, without having -/// to derive execution paths from log lines +/// See https://github.com/grpc/grpc/blob/master/doc/core/grpc-error.md for a +/// full write up of this object. + typedef struct grpc_error grpc_error; typedef enum { diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 5ddd5313e2..789ef22c55 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -1421,7 +1421,7 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { g_cvfds.pollcount++; opt = gpr_thd_options_default(); gpr_thd_options_set_detached(&opt); - gpr_thd_new(&t_id, &run_poll, pargs, &opt); + GPR_ASSERT(gpr_thd_new(&t_id, &run_poll, pargs, &opt)); // We want the poll() thread to trigger the deadline, so wait forever here gpr_cv_wait(pollcv, &g_cvfds.mu, gpr_inf_future(GPR_CLOCK_MONOTONIC)); if (gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) { diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c index a5b62aa888..ae3e2eabc3 100644 --- a/src/core/lib/iomgr/executor.c +++ b/src/core/lib/iomgr/executor.c @@ -115,8 +115,8 @@ static void maybe_spawn_locked() { /* All previous instances of the thread should have been joined at this point. * Spawn time! */ g_executor.busy = 1; - gpr_thd_new(&g_executor.tid, closure_exec_thread_func, NULL, - &g_executor.options); + GPR_ASSERT(gpr_thd_new(&g_executor.tid, closure_exec_thread_func, NULL, + &g_executor.options)); g_executor.pending_join = 1; } diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index b3644518f5..e242631fc0 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -350,8 +350,20 @@ static grpc_error *add_wildcard_addrs_to_server(grpc_tcp_server *s, } } if (*out_port > 0) { - GRPC_LOG_IF_ERROR("Failed to add :: listener", v6_err); - GRPC_LOG_IF_ERROR("Failed to add 0.0.0.0 listener", v4_err); + if (v6_err != GRPC_ERROR_NONE) { + gpr_log(GPR_INFO, + "Failed to add :: listener, " + "the environment may not support IPv6: %s", + grpc_error_string(v6_err)); + GRPC_ERROR_UNREF(v6_err); + } + if (v4_err != GRPC_ERROR_NONE) { + gpr_log(GPR_INFO, + "Failed to add 0.0.0.0 listener, " + "the environment may not support IPv4: %s", + grpc_error_string(v4_err)); + GRPC_ERROR_UNREF(v4_err); + } return GRPC_ERROR_NONE; } else { grpc_error *root_err = diff --git a/src/core/lib/profiling/basic_timers.c b/src/core/lib/profiling/basic_timers.c index 1f1987fb8e..bc8e27714c 100644 --- a/src/core/lib/profiling/basic_timers.c +++ b/src/core/lib/profiling/basic_timers.c @@ -218,7 +218,7 @@ void gpr_timers_set_log_filename(const char *filename) { static void init_output() { gpr_thd_options options = gpr_thd_options_default(); gpr_thd_options_set_joinable(&options); - gpr_thd_new(&g_writing_thread, writing_thread, NULL, &options); + GPR_ASSERT(gpr_thd_new(&g_writing_thread, writing_thread, NULL, &options)); atexit(finish_writing); } diff --git a/src/core/ext/client_channel/default_initial_connect_string.c b/src/core/lib/support/atm.c index 6db82d84ef..06e8432caf 100644 --- a/src/core/ext/client_channel/default_initial_connect_string.c +++ b/src/core/lib/support/atm.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2017, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -31,8 +31,17 @@ * */ -#include <grpc/slice.h> -#include "src/core/lib/iomgr/resolve_address.h" +#include <grpc/support/atm.h> +#include <grpc/support/useful.h> -void grpc_set_default_initial_connect_string(grpc_resolved_address **addr, - grpc_slice *initial_str) {} +gpr_atm gpr_atm_no_barrier_clamped_add(gpr_atm *value, gpr_atm delta, + gpr_atm min, gpr_atm max) { + gpr_atm current; + gpr_atm new; + do { + current = gpr_atm_no_barrier_load(value); + new = GPR_CLAMP(current + delta, min, max); + if (new == current) break; + } while (!gpr_atm_no_barrier_cas(value, current, new)); + return new; +} diff --git a/src/core/lib/transport/service_config.c b/src/core/lib/transport/service_config.c index 12da2a88fe..1195f75044 100644 --- a/src/core/lib/transport/service_config.c +++ b/src/core/lib/transport/service_config.c @@ -93,6 +93,18 @@ void grpc_service_config_destroy(grpc_service_config* service_config) { gpr_free(service_config); } +void grpc_service_config_parse_global_params( + const grpc_service_config* service_config, + void (*process_json)(const grpc_json* json, void* arg), void* arg) { + const grpc_json* json = service_config->json_tree; + if (json->type != GRPC_JSON_OBJECT || json->key != NULL) return; + for (grpc_json* field = json->child; field != NULL; field = field->next) { + if (field->key == NULL) return; + if (strcmp(field->key, "methodConfig") == 0) continue; + process_json(field, arg); + } +} + const char* grpc_service_config_get_lb_policy_name( const grpc_service_config* service_config) { const grpc_json* json = service_config->json_tree; diff --git a/src/core/lib/transport/service_config.h b/src/core/lib/transport/service_config.h index cd739a593c..ebfc59b534 100644 --- a/src/core/lib/transport/service_config.h +++ b/src/core/lib/transport/service_config.h @@ -42,6 +42,12 @@ typedef struct grpc_service_config grpc_service_config; grpc_service_config* grpc_service_config_create(const char* json_string); void grpc_service_config_destroy(grpc_service_config* service_config); +/// Invokes \a process_json() for each global parameter in the service +/// config. \a arg is passed as the second argument to \a process_json(). +void grpc_service_config_parse_global_params( + const grpc_service_config* service_config, + void (*process_json)(const grpc_json* json, void* arg), void* arg); + /// Gets the LB policy name from \a service_config. /// Returns NULL if no LB policy name was specified. /// Caller does NOT take ownership. |