From d6d192d00591ba278115e73d264f279fe248d544 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 23 Feb 2017 08:58:42 -0800 Subject: Retry throttling implementation. --- src/core/ext/client_channel/client_channel.c | 125 ++++++++++- .../ext/client_channel/client_channel_plugin.c | 3 + src/core/ext/client_channel/retry_throttle.c | 242 +++++++++++++++++++++ src/core/ext/client_channel/retry_throttle.h | 69 ++++++ src/core/lib/transport/service_config.c | 12 + src/core/lib/transport/service_config.h | 6 + src/python/grpcio/grpc_core_dependencies.py | 1 + 7 files changed, 452 insertions(+), 6 deletions(-) create mode 100644 src/core/ext/client_channel/retry_throttle.c create mode 100644 src/core/ext/client_channel/retry_throttle.h (limited to 'src') diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index 6cbc333b83..1cc2b9455f 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -47,6 +47,7 @@ #include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/proxy_mapper_registry.h" #include "src/core/ext/client_channel/resolver_registry.h" +#include "src/core/ext/client_channel/retry_throttle.h" #include "src/core/ext/client_channel/subchannel.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" @@ -165,6 +166,8 @@ typedef struct client_channel_channel_data { grpc_combiner *combiner; /** currently active load balancer */ grpc_lb_policy *lb_policy; + /** retry throttle data */ + grpc_server_retry_throttle_data *retry_throttle_data; /** maps method names to method_parameters structs */ grpc_slice_hash_table *method_params_table; /** incoming resolver result - set by resolver.next() */ @@ -260,6 +263,64 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, &w->on_changed); } +typedef struct { + char *server_name; + grpc_server_retry_throttle_data *retry_throttle_data; +} service_config_parsing_state; + +static void parse_retry_throttle_params(const grpc_json *field, void *arg) { + service_config_parsing_state *parsing_state = arg; + if (strcmp(field->key, "retryThrottling") == 0) { + if (parsing_state->retry_throttle_data != NULL) return; // Duplicate. + if (field->type != GRPC_JSON_OBJECT) return; + int max_milli_tokens = 0; + int milli_token_ratio = 0; + for (grpc_json *sub_field = field->child; sub_field != NULL; + sub_field = sub_field->next) { + if (sub_field->key == NULL) continue; + if (strcmp(sub_field->key, "maxTokens") == 0) { + if (max_milli_tokens != 0) return; // Duplicate. + if (sub_field->type != GRPC_JSON_NUMBER) return; + max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value); + if (max_milli_tokens == -1) return; + max_milli_tokens *= 1000; + } else if (strcmp(sub_field->key, "tokenRatio") == 0) { + if (milli_token_ratio != 0) return; // Duplicate. + if (sub_field->type != GRPC_JSON_NUMBER) return; + // We support up to 3 decimal digits. + size_t whole_len = strlen(sub_field->value); + uint32_t multiplier = 1; + uint32_t decimal_value = 0; + const char *decimal_point = strchr(sub_field->value, '.'); + if (decimal_point != NULL) { + whole_len = (size_t)(decimal_point - sub_field->value); + multiplier = 1000; + size_t decimal_len = strlen(decimal_point + 1); + if (decimal_len > 3) decimal_len = 3; + if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len, + &decimal_value)) { + return; + } + uint32_t decimal_multiplier = 1; + for (size_t i = 0; i < (3 - decimal_len); ++i) { + decimal_multiplier *= 10; + } + decimal_value *= decimal_multiplier; + } + uint32_t whole_value; + if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len, + &whole_value)) { + return; + } + milli_token_ratio = (int)((whole_value * multiplier) + decimal_value); + } + } + parsing_state->retry_throttle_data = + grpc_retry_throttle_map_get_data_for_server( + parsing_state->server_name, max_milli_tokens, milli_token_ratio); + } +} + static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { channel_data *chand = arg; @@ -271,6 +332,8 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, bool exit_idle = false; grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy"); char *service_config_json = NULL; + service_config_parsing_state parsing_state; + memset(&parsing_state, 0, sizeof(parsing_state)); if (chand->resolver_result != NULL) { // Find LB policy name. @@ -330,6 +393,18 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_service_config *service_config = grpc_service_config_create(service_config_json); if (service_config != NULL) { + channel_arg = + grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI); + GPR_ASSERT(channel_arg != NULL); + GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); + grpc_uri *uri = grpc_uri_parse(channel_arg->value.string, true); + GPR_ASSERT(uri->path[0] != '\0'); + parsing_state.server_name = + uri->path[0] == '/' ? uri->path + 1 : uri->path; + grpc_service_config_parse_global_params( + service_config, parse_retry_throttle_params, &parsing_state); + parsing_state.server_name = NULL; + grpc_uri_destroy(uri); method_params_table = grpc_service_config_create_method_config_table( exec_ctx, service_config, method_parameters_create_from_json, &method_parameters_vtable); @@ -361,6 +436,11 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, chand->info_service_config_json = service_config_json; } gpr_mu_unlock(&chand->info_mu); + + if (chand->retry_throttle_data != NULL) { + grpc_server_retry_throttle_data_unref(chand->retry_throttle_data); + } + chand->retry_throttle_data = parsing_state.retry_throttle_data; if (chand->method_params_table != NULL) { grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table); } @@ -589,6 +669,9 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, } gpr_free(chand->info_lb_policy_name); gpr_free(chand->info_service_config_json); + if (chand->retry_throttle_data != NULL) { + grpc_server_retry_throttle_data_unref(chand->retry_throttle_data); + } if (chand->method_params_table != NULL) { grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table); } @@ -651,6 +734,9 @@ typedef struct client_channel_call_data { grpc_call_stack *owning_call; grpc_linked_mdelem lb_token_mdelem; + + grpc_closure on_complete; + grpc_closure *original_on_complete; } call_data; grpc_subchannel_call *grpc_client_channel_get_subchannel_call( @@ -977,20 +1063,47 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, add_waiting_locked(calld, op); } -static void cc_start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, - void *arg, - grpc_error *error_ignored) { - GPR_TIMER_BEGIN("cc_start_transport_stream_op_locked", 0); +static void on_complete_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_call_element *elem = arg; + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + if (chand->retry_throttle_data != NULL) { + if (error == GRPC_ERROR_NONE) { + grpc_server_retry_throttle_data_record_success( + &chand->retry_throttle_data); + } else { + // TODO(roth): In a subsequent PR, check the return value here and + // decide whether or not to retry. + grpc_server_retry_throttle_data_record_failure( + &chand->retry_throttle_data); + } + } + grpc_closure_run(exec_ctx, calld->original_on_complete, error); +} + +static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error_ignored) { + GPR_TIMER_BEGIN("start_transport_stream_op_locked", 0); grpc_transport_stream_op *op = arg; grpc_call_element *elem = op->handler_private.args[0]; + channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; + if (op->recv_trailing_metadata != NULL) { + GPR_ASSERT(op->on_complete != NULL); + calld->original_on_complete = op->on_complete; + grpc_closure_init(&calld->on_complete, on_complete_locked, elem, + grpc_combiner_scheduler(chand->combiner, false)); + op->on_complete = &calld->on_complete; + } + start_transport_stream_op_locked_inner(exec_ctx, op, elem); GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "start_transport_stream_op"); - GPR_TIMER_END("cc_start_transport_stream_op_locked", 0); + GPR_TIMER_END("start_transport_stream_op_locked", 0); } /* The logic here is fairly complicated, due to (a) the fact that we @@ -1030,7 +1143,7 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, grpc_closure_sched( exec_ctx, grpc_closure_init(&op->handler_private.closure, - cc_start_transport_stream_op_locked, op, + start_transport_stream_op_locked, op, grpc_combiner_scheduler(chand->combiner, false)), GRPC_ERROR_NONE); GPR_TIMER_END("cc_start_transport_stream_op", 0); diff --git a/src/core/ext/client_channel/client_channel_plugin.c b/src/core/ext/client_channel/client_channel_plugin.c index 6f9df3e386..c8d2105b47 100644 --- a/src/core/ext/client_channel/client_channel_plugin.c +++ b/src/core/ext/client_channel/client_channel_plugin.c @@ -43,6 +43,7 @@ #include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/proxy_mapper_registry.h" #include "src/core/ext/client_channel/resolver_registry.h" +#include "src/core/ext/client_channel/retry_throttle.h" #include "src/core/ext/client_channel/subchannel_index.h" #include "src/core/lib/surface/channel_init.h" @@ -82,6 +83,7 @@ static bool set_default_host_if_unset(grpc_exec_ctx *exec_ctx, void grpc_client_channel_init(void) { grpc_lb_policy_registry_init(); grpc_resolver_registry_init(); + grpc_retry_throttle_map_init(); grpc_proxy_mapper_registry_init(); grpc_register_http_proxy_mapper(); grpc_subchannel_index_init(); @@ -96,6 +98,7 @@ void grpc_client_channel_shutdown(void) { grpc_subchannel_index_shutdown(); grpc_channel_init_shutdown(); grpc_proxy_mapper_registry_shutdown(); + grpc_retry_throttle_map_shutdown(); grpc_resolver_registry_shutdown(); grpc_lb_policy_registry_shutdown(); } diff --git a/src/core/ext/client_channel/retry_throttle.c b/src/core/ext/client_channel/retry_throttle.c new file mode 100644 index 0000000000..2aa52e4903 --- /dev/null +++ b/src/core/ext/client_channel/retry_throttle.c @@ -0,0 +1,242 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/client_channel/retry_throttle.h" + +#include +#include + +#include +#include +#include +#include +#include + +// +// server_retry_throttle_data +// + +struct grpc_server_retry_throttle_data { + gpr_refcount refs; + int max_milli_tokens; + int milli_token_ratio; + gpr_atm milli_tokens; + // A pointer to the replacement for this grpc_server_retry_throttle_data + // entry. If non-NULL, then this entry is stale and must not be used. + // We hold a reference to the replacement. + gpr_atm replacement; +}; + +static void get_replacement_throttle_data_if_needed( + grpc_server_retry_throttle_data** throttle_data) { + while (true) { + grpc_server_retry_throttle_data* new_throttle_data = + (grpc_server_retry_throttle_data*)gpr_atm_acq_load( + &(*throttle_data)->replacement); + if (new_throttle_data == NULL) return; + // Reset *throttle_data to its replacement, updating refcounts as + // appropriate. + // Note: It's safe to do this here, because the caller ensures that + // this will only be called with a given value of throttle_data from + // one thread at a time. + grpc_server_retry_throttle_data_ref(new_throttle_data); + grpc_server_retry_throttle_data* old_throttle_data = *throttle_data; + *throttle_data = new_throttle_data; + grpc_server_retry_throttle_data_unref(old_throttle_data); + } +} + +bool grpc_server_retry_throttle_data_record_failure( + grpc_server_retry_throttle_data** throttle_data) { + // First, check if we are stale and need to be replaced. + get_replacement_throttle_data_if_needed(throttle_data); + // We decrement milli_tokens by 1000 (1 token) for each failure. + const int delta = -1000; + const int old_value = (int)gpr_atm_full_fetch_add( + &(*throttle_data)->milli_tokens, (gpr_atm)delta); + // If the above change takes us below 0, then re-add the excess. Note + // that between these two atomic operations, the value will be + // artificially low by as much as 1000, but this window should be + // brief. + int new_value = old_value - 1000; + if (new_value < 0) { + const int excess_value = new_value - (old_value < 0 ? old_value : 0); + gpr_atm_full_fetch_add(&(*throttle_data)->milli_tokens, + (gpr_atm)-excess_value); + new_value = 0; + } + // Retries are allowed as long as the new value is above the threshold + // (max_milli_tokens / 2). + return new_value > (*throttle_data)->max_milli_tokens / 2; +} + +void grpc_server_retry_throttle_data_record_success( + grpc_server_retry_throttle_data** throttle_data) { + // First, check if we are stale and need to be replaced. + get_replacement_throttle_data_if_needed(throttle_data); + // We increment milli_tokens by milli_token_ratio for each success. + const int delta = (*throttle_data)->milli_token_ratio; + const int old_value = (int)gpr_atm_full_fetch_add( + &(*throttle_data)->milli_tokens, (gpr_atm)delta); + // If the above change takes us over max_milli_tokens, then subtract + // the excess. Note that between these two atomic operations, the + // value will be artificially high by as much as milli_token_ratio, + // but this window should be brief. + const int new_value = old_value + (*throttle_data)->milli_token_ratio; + if (new_value > (*throttle_data)->max_milli_tokens) { + const int excess_value = + new_value - (old_value > (*throttle_data)->max_milli_tokens + ? old_value + : (*throttle_data)->max_milli_tokens); + gpr_atm_full_fetch_add(&(*throttle_data)->milli_tokens, + (gpr_atm)-excess_value); + } +} + +void grpc_server_retry_throttle_data_ref( + grpc_server_retry_throttle_data* throttle_data) { + gpr_ref(&throttle_data->refs); +} + +void grpc_server_retry_throttle_data_unref( + grpc_server_retry_throttle_data* throttle_data) { + if (gpr_unref(&throttle_data->refs)) { + grpc_server_retry_throttle_data* replacement = + (grpc_server_retry_throttle_data*)gpr_atm_acq_load( + &throttle_data->replacement); + if (replacement != NULL) { + grpc_server_retry_throttle_data_unref(replacement); + } + gpr_free(throttle_data); + } +} + +static grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_create( + int max_milli_tokens, int milli_token_ratio, + grpc_server_retry_throttle_data* old_throttle_data) { + grpc_server_retry_throttle_data* throttle_data = + gpr_malloc(sizeof(*throttle_data)); + memset(throttle_data, 0, sizeof(*throttle_data)); + gpr_ref_init(&throttle_data->refs, 1); + throttle_data->max_milli_tokens = max_milli_tokens; + throttle_data->milli_token_ratio = milli_token_ratio; + int initial_milli_tokens = max_milli_tokens; + // If there was a pre-existing entry for this server name, initialize + // the token count by scaling proportionately to the old data. This + // ensures that if we're already throttling retries on the old scale, + // we will start out doing the same thing on the new one. + if (old_throttle_data != NULL) { + double token_fraction = + (int)gpr_atm_acq_load(&old_throttle_data->milli_tokens) / + (double)old_throttle_data->max_milli_tokens; + initial_milli_tokens = (int)(token_fraction * max_milli_tokens); + } + gpr_atm_rel_store(&throttle_data->milli_tokens, + (gpr_atm)initial_milli_tokens); + // If there was a pre-existing entry, mark it as stale and give it a + // pointer to the new entry, which is its replacement. + if (old_throttle_data != NULL) { + grpc_server_retry_throttle_data_ref(throttle_data); + gpr_atm_rel_store(&old_throttle_data->replacement, (gpr_atm)throttle_data); + } + return throttle_data; +} + +// +// avl vtable for string -> server_retry_throttle_data map +// + +static void* copy_server_name(void* key) { return gpr_strdup(key); } + +static long compare_server_name(void* key1, void* key2) { + return strcmp(key1, key2); +} + +static void destroy_server_retry_throttle_data(void* value) { + grpc_server_retry_throttle_data* throttle_data = value; + grpc_server_retry_throttle_data_unref(throttle_data); +} + +static void* copy_server_retry_throttle_data(void* value) { + grpc_server_retry_throttle_data* throttle_data = value; + grpc_server_retry_throttle_data_ref(throttle_data); + return value; +} + +static const gpr_avl_vtable avl_vtable = { + gpr_free /* destroy_key */, copy_server_name, compare_server_name, + destroy_server_retry_throttle_data, copy_server_retry_throttle_data}; + +// +// server_retry_throttle_map +// + +static gpr_mu g_mu; +static gpr_avl g_avl; + +void grpc_retry_throttle_map_init() { + gpr_mu_init(&g_mu); + g_avl = gpr_avl_create(&avl_vtable); +} + +void grpc_retry_throttle_map_shutdown() { + gpr_mu_destroy(&g_mu); + gpr_avl_unref(g_avl); +} + +grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server( + const char* server_name, int max_milli_tokens, int milli_token_ratio) { + gpr_mu_lock(&g_mu); + grpc_server_retry_throttle_data* throttle_data = + gpr_avl_get(g_avl, (char*)server_name); + if (throttle_data == NULL) { + // Entry not found. Create a new one. + throttle_data = grpc_server_retry_throttle_data_create( + max_milli_tokens, milli_token_ratio, NULL); + g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data); + } else { + if (throttle_data->max_milli_tokens != max_milli_tokens || + throttle_data->milli_token_ratio != milli_token_ratio) { + // Entry found but with old parameters. Create a new one based on + // the original one. + throttle_data = grpc_server_retry_throttle_data_create( + max_milli_tokens, milli_token_ratio, throttle_data); + g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data); + } else { + // Entry found. Increase refcount. + grpc_server_retry_throttle_data_ref(throttle_data); + } + } + gpr_mu_unlock(&g_mu); + return throttle_data; +} diff --git a/src/core/ext/client_channel/retry_throttle.h b/src/core/ext/client_channel/retry_throttle.h new file mode 100644 index 0000000000..4209bb7fb6 --- /dev/null +++ b/src/core/ext/client_channel/retry_throttle.h @@ -0,0 +1,69 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_RETRY_THROTTLE_H +#define GRPC_CORE_EXT_CLIENT_CHANNEL_RETRY_THROTTLE_H + +#include + +/// Tracks retry throttling data for an individual server name. +typedef struct grpc_server_retry_throttle_data grpc_server_retry_throttle_data; + +/// Records a failure. Returns true if it's okay to send a retry. +/// Updates \a throttle_data if the original value is stale and has been +/// replaced. Not thread safe; caller must synchronize. +bool grpc_server_retry_throttle_data_record_failure( + grpc_server_retry_throttle_data** throttle_data); +/// Records a success. +/// Updates \a throttle_data if the original value is stale and has been +/// replaced. Not thread safe; caller must synchronize. +void grpc_server_retry_throttle_data_record_success( + grpc_server_retry_throttle_data** throttle_data); + +void grpc_server_retry_throttle_data_ref( + grpc_server_retry_throttle_data* throttle_data); +void grpc_server_retry_throttle_data_unref( + grpc_server_retry_throttle_data* throttle_data); + +/// Initializes global map of failure data for each server name. +void grpc_retry_throttle_map_init(); +/// Shuts down global map of failure data for each server name. +void grpc_retry_throttle_map_shutdown(); + +/// Returns a reference to the failure data for \a server_name, creating +/// a new entry if needed. +/// Caller must eventually unref via \a grpc_server_retry_throttle_data_unref(). +grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server( + const char* server_name, int max_milli_tokens, int milli_token_ratio); + +#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RETRY_THROTTLE_H */ diff --git a/src/core/lib/transport/service_config.c b/src/core/lib/transport/service_config.c index 12da2a88fe..1195f75044 100644 --- a/src/core/lib/transport/service_config.c +++ b/src/core/lib/transport/service_config.c @@ -93,6 +93,18 @@ void grpc_service_config_destroy(grpc_service_config* service_config) { gpr_free(service_config); } +void grpc_service_config_parse_global_params( + const grpc_service_config* service_config, + void (*process_json)(const grpc_json* json, void* arg), void* arg) { + const grpc_json* json = service_config->json_tree; + if (json->type != GRPC_JSON_OBJECT || json->key != NULL) return; + for (grpc_json* field = json->child; field != NULL; field = field->next) { + if (field->key == NULL) return; + if (strcmp(field->key, "methodConfig") == 0) continue; + process_json(field, arg); + } +} + const char* grpc_service_config_get_lb_policy_name( const grpc_service_config* service_config) { const grpc_json* json = service_config->json_tree; diff --git a/src/core/lib/transport/service_config.h b/src/core/lib/transport/service_config.h index cd739a593c..ebfc59b534 100644 --- a/src/core/lib/transport/service_config.h +++ b/src/core/lib/transport/service_config.h @@ -42,6 +42,12 @@ typedef struct grpc_service_config grpc_service_config; grpc_service_config* grpc_service_config_create(const char* json_string); void grpc_service_config_destroy(grpc_service_config* service_config); +/// Invokes \a process_json() for each global parameter in the service +/// config. \a arg is passed as the second argument to \a process_json(). +void grpc_service_config_parse_global_params( + const grpc_service_config* service_config, + void (*process_json)(const grpc_json* json, void* arg), void* arg); + /// Gets the LB policy name from \a service_config. /// Returns NULL if no LB policy name was specified. /// Caller does NOT take ownership. diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index a9f20e6d2a..94d6e46cae 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -263,6 +263,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/client_channel/resolver.c', 'src/core/ext/client_channel/resolver_factory.c', 'src/core/ext/client_channel/resolver_registry.c', + 'src/core/ext/client_channel/retry_throttle.c', 'src/core/ext/client_channel/subchannel.c', 'src/core/ext/client_channel/subchannel_index.c', 'src/core/ext/client_channel/uri_parser.c', -- cgit v1.2.3 From b33225678a656c50df8f6437123355bd178e0e30 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 23 Feb 2017 14:38:02 -0800 Subject: Code review changes. --- src/core/ext/client_channel/client_channel.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index 1cc2b9455f..967709bf74 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -277,7 +277,7 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) { int milli_token_ratio = 0; for (grpc_json *sub_field = field->child; sub_field != NULL; sub_field = sub_field->next) { - if (sub_field->key == NULL) continue; + if (sub_field->key == NULL) return; if (strcmp(sub_field->key, "maxTokens") == 0) { if (max_milli_tokens != 0) return; // Duplicate. if (sub_field->type != GRPC_JSON_NUMBER) return; @@ -313,6 +313,7 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) { return; } milli_token_ratio = (int)((whole_value * multiplier) + decimal_value); + if (milli_token_ratio <= 0) return; } } parsing_state->retry_throttle_data = @@ -1074,7 +1075,9 @@ static void on_complete_locked(grpc_exec_ctx *exec_ctx, void *arg, &chand->retry_throttle_data); } else { // TODO(roth): In a subsequent PR, check the return value here and - // decide whether or not to retry. + // decide whether or not to retry. Note that we should only + // record failures whose statuses match the configured retryable + // or non-fatal status codes. grpc_server_retry_throttle_data_record_failure( &chand->retry_throttle_data); } -- cgit v1.2.3 From 95039b57dc812dff2c0edfd80f6c09179afabc97 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 24 Feb 2017 07:59:45 -0800 Subject: Fix refcounting bug. --- src/core/ext/client_channel/client_channel.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index 967709bf74..c298200465 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -1082,7 +1082,8 @@ static void on_complete_locked(grpc_exec_ctx *exec_ctx, void *arg, &chand->retry_throttle_data); } } - grpc_closure_run(exec_ctx, calld->original_on_complete, error); + grpc_closure_run(exec_ctx, calld->original_on_complete, + GRPC_ERROR_REF(error)); } static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg, -- cgit v1.2.3 From 77532e8bf3d4c93b680ac63a5e436cea92c708f5 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Mon, 27 Feb 2017 11:52:39 -0800 Subject: Destroy pointer args when destructing a ChannelArguments --- include/grpc++/support/channel_arguments.h | 2 +- src/cpp/common/channel_arguments.cc | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/include/grpc++/support/channel_arguments.h b/include/grpc++/support/channel_arguments.h index efdf7772ad..80a3bfda3d 100644 --- a/include/grpc++/support/channel_arguments.h +++ b/include/grpc++/support/channel_arguments.h @@ -54,7 +54,7 @@ class ResourceQuota; class ChannelArguments { public: ChannelArguments(); - ~ChannelArguments() {} + ~ChannelArguments(); ChannelArguments(const ChannelArguments& other); ChannelArguments& operator=(ChannelArguments other) { diff --git a/src/cpp/common/channel_arguments.cc b/src/cpp/common/channel_arguments.cc index 65f3277499..34fd4491d0 100644 --- a/src/cpp/common/channel_arguments.cc +++ b/src/cpp/common/channel_arguments.cc @@ -81,6 +81,16 @@ ChannelArguments::ChannelArguments(const ChannelArguments& other) } } +ChannelArguments::~ChannelArguments() { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + for (auto it = args_.begin(); it != args_.end(); ++it) { + if (it->type == GRPC_ARG_POINTER) { + it->value.pointer.vtable->destroy(&exec_ctx, it->value.pointer.p); + } + } + grpc_exec_ctx_finish(&exec_ctx); +} + void ChannelArguments::Swap(ChannelArguments& other) { args_.swap(other.args_); strings_.swap(other.strings_); -- cgit v1.2.3 From 7e43bfa1fa6a6b6099699baf9819f9572d9e84d2 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 10 Mar 2017 14:01:02 -0800 Subject: Fix fuzzing detected error --- .../ext/transport/chttp2/transport/hpack_parser.c | 10 ++++++++-- .../clusterfuzz-testcase-5298216461402112 | Bin 0 -> 172032 bytes tools/run_tests/generated/tests.json | 22 +++++++++++++++++++++ 3 files changed, 30 insertions(+), 2 deletions(-) create mode 100644 test/core/transport/chttp2/hpack_parser_corpus/clusterfuzz-testcase-5298216461402112 (limited to 'src') diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index 40f5120308..b2f364aa1a 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -1625,8 +1625,14 @@ grpc_error *grpc_chttp2_hpack_parser_parse(grpc_exec_ctx *exec_ctx, stack space usage when no tail call optimization is available */ p->current_slice_refcount = slice.refcount; - grpc_error *error = p->state(exec_ctx, p, GRPC_SLICE_START_PTR(slice), - GRPC_SLICE_END_PTR(slice)); + uint8_t *start = GRPC_SLICE_START_PTR(slice); + uint8_t *end = GRPC_SLICE_END_PTR(slice); + grpc_error *error = GRPC_ERROR_NONE; + while (start != end && error == GRPC_ERROR_NONE) { + uint8_t *target = start + GPR_MIN(1024, end - start); + error = p->state(exec_ctx, p, start, target); + start = target; + } p->current_slice_refcount = NULL; return error; } diff --git a/test/core/transport/chttp2/hpack_parser_corpus/clusterfuzz-testcase-5298216461402112 b/test/core/transport/chttp2/hpack_parser_corpus/clusterfuzz-testcase-5298216461402112 new file mode 100644 index 0000000000..04d48d6d76 Binary files /dev/null and b/test/core/transport/chttp2/hpack_parser_corpus/clusterfuzz-testcase-5298216461402112 differ diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index f9e2a19d0e..8feab6ba49 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -114587,6 +114587,28 @@ ], "uses_polling": false }, + { + "args": [ + "test/core/transport/chttp2/hpack_parser_corpus/clusterfuzz-testcase-5298216461402112" + ], + "ci_platforms": [ + "linux" + ], + "cpu_cost": 0.1, + "exclude_configs": [ + "tsan" + ], + "exclude_iomgrs": [ + "uv" + ], + "flaky": false, + "language": "c", + "name": "hpack_parser_fuzzer_test_one_entry", + "platforms": [ + "linux" + ], + "uses_polling": false + }, { "args": [ "test/core/transport/chttp2/hpack_parser_corpus/crash-5ac3e1ea7764cfb6383629574262f82dc7b3cada" -- cgit v1.2.3 From c04faa8c03f37f71cdc1ee65c3291a78a928341f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 10 Mar 2017 14:01:51 -0800 Subject: Update comment --- src/core/ext/transport/chttp2/transport/hpack_parser.c | 4 ---- 1 file changed, 4 deletions(-) (limited to 'src') diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index b2f364aa1a..e26ab87ca7 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -1620,10 +1620,6 @@ void grpc_chttp2_hpack_parser_destroy(grpc_exec_ctx *exec_ctx, grpc_error *grpc_chttp2_hpack_parser_parse(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_parser *p, grpc_slice slice) { - /* TODO(ctiller): limit the distance of end from beg, and perform multiple - steps in the event of a large chunk of data to limit - stack space usage when no tail call optimization is - available */ p->current_slice_refcount = slice.refcount; uint8_t *start = GRPC_SLICE_START_PTR(slice); uint8_t *end = GRPC_SLICE_END_PTR(slice); -- cgit v1.2.3 From e9c83c566c5b193c856a14c86e5a150b2925cc86 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 10 Mar 2017 14:12:07 -0800 Subject: readability --- src/core/ext/transport/chttp2/transport/hpack_parser.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index e26ab87ca7..d94f66001d 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -1620,12 +1620,15 @@ void grpc_chttp2_hpack_parser_destroy(grpc_exec_ctx *exec_ctx, grpc_error *grpc_chttp2_hpack_parser_parse(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_parser *p, grpc_slice slice) { + /* max number of bytes to parse at a time... limits call stack depth on + * compilers without TCO */ + const size_t max_parse_length = 1024; p->current_slice_refcount = slice.refcount; uint8_t *start = GRPC_SLICE_START_PTR(slice); uint8_t *end = GRPC_SLICE_END_PTR(slice); grpc_error *error = GRPC_ERROR_NONE; while (start != end && error == GRPC_ERROR_NONE) { - uint8_t *target = start + GPR_MIN(1024, end - start); + uint8_t *target = start + GPR_MIN(max_parse_length, end - start); error = p->state(exec_ctx, p, start, target); start = target; } -- cgit v1.2.3 From f134206509de8417f3f568e8b599290c1b955e1f Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Sun, 12 Mar 2017 22:39:11 -0700 Subject: Fix android build --- src/core/lib/iomgr/port.h | 4 ++++ src/core/lib/iomgr/tcp_server_posix.c | 9 ++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index f1897bb91f..94a454c0b7 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -39,6 +39,7 @@ #if defined(GRPC_UV) // Do nothing #elif defined(GPR_MANYLINUX1) +#define GRPC_HAVE_IFADDRS 1 #define GRPC_HAVE_IPV6_RECVPKTINFO 1 #define GRPC_HAVE_IP_PKTINFO 1 #define GRPC_HAVE_MSG_NOSIGNAL 1 @@ -65,6 +66,7 @@ #define GRPC_POSIX_WAKEUP_FD 1 #define GRPC_TIMER_USE_GENERIC 1 #elif defined(GPR_LINUX) +#define GRPC_HAVE_IFADDRS 1 #define GRPC_HAVE_IPV6_RECVPKTINFO 1 #define GRPC_HAVE_IP_PKTINFO 1 #define GRPC_HAVE_MSG_NOSIGNAL 1 @@ -90,6 +92,7 @@ #define GRPC_POSIX_SOCKETUTILS #endif #elif defined(GPR_APPLE) +#define GRPC_HAVE_IFADDRS 1 #define GRPC_HAVE_SO_NOSIGPIPE 1 #define GRPC_HAVE_UNIX_SOCKET 1 #define GRPC_MSG_IOVLEN_TYPE int @@ -100,6 +103,7 @@ #define GRPC_POSIX_WAKEUP_FD 1 #define GRPC_TIMER_USE_GENERIC 1 #elif defined(GPR_FREEBSD) +#define GRPC_HAVE_IFADDRS 1 #define GRPC_HAVE_IPV6_RECVPKTINFO 1 #define GRPC_HAVE_SO_NOSIGPIPE 1 #define GRPC_HAVE_UNIX_SOCKET 1 diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index 5f286a6723..7cc44fabe7 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -42,9 +42,12 @@ #include "src/core/lib/iomgr/tcp_server.h" +#ifdef GRPC_HAVE_IFADDRS +#include +#endif /* GRPC_HAVE_IFADDRS */ + #include #include -#include #include #include #include @@ -598,6 +601,7 @@ static grpc_error *add_all_local_addrs_to_server(grpc_tcp_server *s, unsigned port_index, int requested_port, int *out_port) { +#ifdef GRPC_HAVE_IFADDRS struct ifaddrs *ifa = NULL; struct ifaddrs *ifa_it; unsigned fd_index = 0; @@ -685,6 +689,9 @@ static grpc_error *add_all_local_addrs_to_server(grpc_tcp_server *s, *out_port = sp->port; return GRPC_ERROR_NONE; } +#else /* GRPC_HAVE_IFADDRS */ + return GRPC_ERROR_NONE; +#endif /* GRPC_HAVE_IFADDRS */ } /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ -- cgit v1.2.3 From d9dd625f6080577abf31c4f8fe4f869ba0d6a233 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 13 Mar 2017 08:31:35 -0700 Subject: Fix compile error --- src/core/ext/transport/chttp2/transport/hpack_parser.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index d94f66001d..1865b997b7 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -1620,15 +1620,15 @@ void grpc_chttp2_hpack_parser_destroy(grpc_exec_ctx *exec_ctx, grpc_error *grpc_chttp2_hpack_parser_parse(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_parser *p, grpc_slice slice) { - /* max number of bytes to parse at a time... limits call stack depth on - * compilers without TCO */ - const size_t max_parse_length = 1024; +/* max number of bytes to parse at a time... limits call stack depth on + * compilers without TCO */ +#define MAX_PARSE_LENGTH 1024 p->current_slice_refcount = slice.refcount; uint8_t *start = GRPC_SLICE_START_PTR(slice); uint8_t *end = GRPC_SLICE_END_PTR(slice); grpc_error *error = GRPC_ERROR_NONE; while (start != end && error == GRPC_ERROR_NONE) { - uint8_t *target = start + GPR_MIN(max_parse_length, end - start); + uint8_t *target = start + GPR_MIN(MAX_PARSE_LENGTH, end - start); error = p->state(exec_ctx, p, start, target); start = target; } -- cgit v1.2.3 From e7a1702fe99ef598efd957c1f7546904298b9dba Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 13 Mar 2017 10:20:38 -0700 Subject: Fixes --- src/core/lib/channel/channel_stack.c | 7 ++++--- src/core/lib/channel/channel_stack.h | 10 +++++----- src/core/lib/channel/compress_filter.c | 2 +- src/core/lib/channel/connected_channel.c | 4 ++-- src/core/lib/channel/deadline_filter.c | 2 +- src/core/lib/channel/http_client_filter.c | 2 +- src/core/lib/channel/http_server_filter.c | 2 +- src/core/lib/channel/message_size_filter.c | 2 +- src/core/lib/surface/call.c | 22 ++++++++++++++++++---- src/core/lib/surface/lame_client.c | 4 ++-- src/core/lib/surface/server.c | 2 +- src/core/lib/transport/transport.h | 3 ++- src/core/lib/transport/transport_impl.h | 3 ++- 13 files changed, 41 insertions(+), 24 deletions(-) (limited to 'src') diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index 3fb2a60ac7..187f2c8b34 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -241,15 +241,16 @@ void grpc_call_stack_ignore_set_pollset_or_pollset_set( void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack, const grpc_call_final_info *final_info, - void *and_free_memory) { + grpc_closure *then_schedule_closure) { grpc_call_element *elems = CALL_ELEMS_FROM_STACK(stack); size_t count = stack->count; size_t i; /* destroy per-filter data */ for (i = 0; i < count; i++) { - elems[i].filter->destroy_call_elem(exec_ctx, &elems[i], final_info, - i == count - 1 ? and_free_memory : NULL); + elems[i].filter->destroy_call_elem( + exec_ctx, &elems[i], final_info, + i == count - 1 ? then_schedule_closure : NULL); } } diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index 6d3340bcbf..1b02f02dd6 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -139,12 +139,12 @@ typedef struct { /* Destroy per call data. The filter does not need to do any chaining. The bottom filter of a stack will be passed a non-NULL pointer to - \a and_free_memory that should be passed to gpr_free when destruction - is complete. \a final_info contains data about the completed call, mainly - for reporting purposes. */ + \a then_schedule_closure that should be passed to grpc_closure_sched when + destruction is complete. \a final_info contains data about the completed + call, mainly for reporting purposes. */ void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory); + grpc_closure *then_schedule_closure); /* sizeof(per channel data) */ size_t sizeof_channel_data; @@ -271,7 +271,7 @@ void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, /* Destroy a call stack */ void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack, const grpc_call_final_info *final_info, - void *and_free_memory); + grpc_closure *then_schedule_closure); /* Ignore set pollset{_set} - used by filters if they don't care about pollsets * at all. Does nothing. */ diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index aa41014a21..02dc479f3a 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -292,7 +292,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices); diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c index 29796f7ca7..64cf95b5a5 100644 --- a/src/core/lib/channel/connected_channel.c +++ b/src/core/lib/channel/connected_channel.c @@ -105,12 +105,12 @@ static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory) { + grpc_closure *then_schedule_closure) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_transport_destroy_stream(exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), - and_free_memory); + then_schedule_closure); } /* Constructor for channel_data */ diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index 5a12d62f1d..34114bbebf 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -256,7 +256,7 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, // Destructor for call_data. Used for both client and server filters. static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_final_info* final_info, - void* and_free_memory) { + grpc_closure* ignored) { grpc_deadline_state_destroy(exec_ctx, elem); } diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index c031533dd8..f9d0d689ac 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -412,7 +412,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { call_data *calld = elem->call_data; grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices); } diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c index fb70de8e96..bebd3af335 100644 --- a/src/core/lib/channel/http_server_filter.c +++ b/src/core/lib/channel/http_server_filter.c @@ -358,7 +358,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { call_data *calld = elem->call_data; grpc_slice_buffer_destroy_internal(exec_ctx, &calld->read_slice_buffer); } diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c index b424c0d2ac..5ba13fe251 100644 --- a/src/core/lib/channel/message_size_filter.c +++ b/src/core/lib/channel/message_size_filter.c @@ -200,7 +200,7 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, // Destructor for call_data. static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_final_info* final_info, - void* ignored) {} + grpc_closure* ignored) {} // Constructor for channel_data. static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index c2547c5147..b57dd8bd8a 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -51,6 +51,7 @@ #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" +#include "src/core/lib/support/arena.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" @@ -138,6 +139,7 @@ typedef struct batch_control { } batch_control; struct grpc_call { + gpr_arena *arena; grpc_completion_queue *cq; grpc_polling_entity pollent; grpc_channel *channel; @@ -212,6 +214,8 @@ struct grpc_call { grpc_closure receiving_initial_metadata_ready; uint32_t test_only_last_message_flags; + grpc_closure release_call; + union { struct { grpc_status_code *status; @@ -273,7 +277,10 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, grpc_channel_get_channel_stack(args->channel); grpc_call *call; GPR_TIMER_BEGIN("grpc_call_create", 0); - call = gpr_zalloc(sizeof(grpc_call) + channel_stack->call_stack_size); + gpr_arena *arena = gpr_arena_create(8192); + call = gpr_arena_alloc(arena, + sizeof(grpc_call) + channel_stack->call_stack_size); + call->arena = arena; *out_call = call; gpr_mu_init(&call->mu); call->channel = args->channel; @@ -421,6 +428,13 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) { GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON); } +static void release_call(grpc_exec_ctx *exec_ctx, void *call, + grpc_error *error) { + grpc_call *c = call; + gpr_arena_destroy(c->arena); + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call"); +} + static void set_status_value_directly(grpc_status_code status, void *dest); static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { @@ -447,7 +461,6 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, if (c->cq) { GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); } - grpc_channel *channel = c->channel; get_final_status(call, set_status_value_directly, &c->final_info.final_status, NULL); @@ -459,8 +472,9 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, unpack_received_status(gpr_atm_no_barrier_load(&c->status[i])).error); } - grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, c); - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call"); + grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, + grpc_closure_init(&c->release_call, release_call, c, + grpc_schedule_on_exec_ctx)); GPR_TIMER_END("destroy_call", 0); } diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index 49bc4c114b..9ddb88bd11 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -130,8 +130,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory) { - gpr_free(and_free_memory); + grpc_closure *then_schedule_closure) { + grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); } static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index b360579553..1186a4af63 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -898,7 +898,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index cc1c277b35..11fd5d1697 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -246,7 +246,8 @@ void grpc_transport_set_pops(grpc_exec_ctx *exec_ctx, grpc_transport *transport, caller, but any child memory must be cleaned up) */ void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *transport, - grpc_stream *stream, void *and_free_memory); + grpc_stream *stream, + grpc_closure *then_schedule_closure); void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op, diff --git a/src/core/lib/transport/transport_impl.h b/src/core/lib/transport/transport_impl.h index 8553148c35..9b6095f666 100644 --- a/src/core/lib/transport/transport_impl.h +++ b/src/core/lib/transport/transport_impl.h @@ -67,7 +67,8 @@ typedef struct grpc_transport_vtable { /* implementation of grpc_transport_destroy_stream */ void (*destroy_stream)(grpc_exec_ctx *exec_ctx, grpc_transport *self, - grpc_stream *stream, void *and_free_memory); + grpc_stream *stream, + grpc_closure *then_schedule_closure); /* implementation of grpc_transport_destroy */ void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_transport *self); -- cgit v1.2.3 From d426caca811823f525334ec5952995e0b887f04f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 13 Mar 2017 12:30:45 -0700 Subject: Use an arena for call & subchannel_call allocation --- src/core/ext/census/grpc_filter.c | 4 +-- src/core/ext/client_channel/client_channel.c | 26 +++++++++++++---- src/core/ext/client_channel/subchannel.c | 33 ++++++++++++++++------ src/core/ext/client_channel/subchannel.h | 18 ++++++++++-- .../ext/load_reporting/load_reporting_filter.c | 2 +- .../transport/chttp2/transport/chttp2_transport.c | 7 +++-- src/core/ext/transport/chttp2/transport/internal.h | 2 +- src/core/lib/channel/channel_stack.c | 29 +++++++------------ src/core/lib/channel/channel_stack.h | 13 +++++---- .../lib/security/transport/client_auth_filter.c | 2 +- .../lib/security/transport/server_auth_filter.c | 2 +- src/core/lib/surface/call.c | 15 ++++++---- src/core/lib/transport/transport.c | 5 ++-- test/core/channel/channel_stack_test.c | 16 +++++++---- test/core/end2end/tests/filter_call_init_fails.c | 2 +- test/core/end2end/tests/filter_causes_close.c | 2 +- test/core/end2end/tests/filter_latency.c | 4 +-- 17 files changed, 116 insertions(+), 66 deletions(-) (limited to 'src') diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c index b80d831557..fc29dbd454 100644 --- a/src/core/ext/census/grpc_filter.c +++ b/src/core/ext/census/grpc_filter.c @@ -138,7 +138,7 @@ static grpc_error *client_init_call_elem(grpc_exec_ctx *exec_ctx, static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { call_data *d = elem->call_data; GPR_ASSERT(d != NULL); /* TODO(hongyu): record rpc client stats and census_rpc_end_op here */ @@ -160,7 +160,7 @@ static grpc_error *server_init_call_elem(grpc_exec_ctx *exec_ctx, static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { call_data *d = elem->call_data; GPR_ASSERT(d != NULL); /* TODO(hongyu): record rpc server stats and census_tracing_end_op here */ diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index bf64f84772..f6550292a0 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -660,6 +660,7 @@ typedef struct client_channel_call_data { /** either 0 for no call, 1 for cancelled, or a pointer to a grpc_subchannel_call */ gpr_atm subchannel_call; + gpr_arena *arena; subchannel_creation_phase creation_phase; grpc_connected_subchannel *connected_subchannel; @@ -754,9 +755,14 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, } else { /* Create call on subchannel. */ grpc_subchannel_call *subchannel_call = NULL; + grpc_connected_subchannel_call_args call_args = { + .pollent = calld->pollent, + .path = calld->path, + .start_time = calld->call_start_time, + .deadline = calld->deadline, + .arena = calld->arena}; grpc_error *new_error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, calld->pollent, calld->path, - calld->call_start_time, calld->deadline, &subchannel_call); + exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); if (new_error != GRPC_ERROR_NONE) { new_error = grpc_error_add_child(new_error, error); subchannel_call = CANCELLED_CALL; @@ -982,9 +988,14 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && calld->connected_subchannel != NULL) { grpc_subchannel_call *subchannel_call = NULL; + grpc_connected_subchannel_call_args call_args = { + .pollent = calld->pollent, + .path = calld->path, + .start_time = calld->call_start_time, + .deadline = calld->deadline, + .arena = calld->arena}; grpc_error *error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, calld->pollent, calld->path, - calld->call_start_time, calld->deadline, &subchannel_call); + exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); if (error != GRPC_ERROR_NONE) { subchannel_call = CANCELLED_CALL; fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); @@ -1161,6 +1172,7 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; calld->owning_call = args->call_stack; calld->pollent = NULL; + calld->arena = args->arena; GRPC_CALL_STACK_REF(calld->owning_call, "initial_read_service_config"); grpc_closure_sched( exec_ctx, @@ -1175,7 +1187,7 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory) { + grpc_closure *then_schedule_closure) { call_data *calld = elem->call_data; grpc_deadline_state_destroy(exec_ctx, elem); grpc_slice_unref_internal(exec_ctx, calld->path); @@ -1185,6 +1197,8 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(calld->cancel_error); grpc_subchannel_call *call = GET_CALL(calld); if (call != NULL && call != CANCELLED_CALL) { + grpc_subchannel_call_set_cleanup_closure(call, then_schedule_closure); + then_schedule_closure = NULL; GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call"); } GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING); @@ -1194,7 +1208,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, "picked"); } gpr_free(calld->waiting_ops); - gpr_free(and_free_memory); + grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); } static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index 5df0a9060d..e4b669c582 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -148,6 +148,7 @@ struct grpc_subchannel { struct grpc_subchannel_call { grpc_connected_subchannel *connection; + grpc_closure *schedule_closure_after_destroy; }; #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1)) @@ -719,13 +720,22 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { grpc_subchannel_call *c = call; + GPR_ASSERT(c->schedule_closure_after_destroy != NULL); GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); grpc_connected_subchannel *connection = c->connection; - grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), NULL, c); + grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), NULL, + c->schedule_closure_after_destroy); GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, connection, "subchannel_call"); GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); } +void grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call *call, + grpc_closure *closure) { + GPR_ASSERT(call->schedule_closure_after_destroy == NULL); + GPR_ASSERT(closure != NULL); + call->schedule_closure_after_destroy = closure; +} + void grpc_subchannel_call_ref( grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); @@ -761,15 +771,22 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, - grpc_polling_entity *pollent, grpc_slice path, gpr_timespec start_time, - gpr_timespec deadline, grpc_subchannel_call **call) { + const grpc_connected_subchannel_call_args *args, + grpc_subchannel_call **call) { grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); - *call = gpr_zalloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); + *call = gpr_arena_alloc( + args->arena, sizeof(grpc_subchannel_call) + chanstk->call_stack_size); grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); (*call)->connection = con; // Ref is added below. - grpc_error *error = - grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call, - NULL, NULL, path, start_time, deadline, callstk); + grpc_call_element_args call_args = {.call_stack = callstk, + .server_transport_data = NULL, + .context = NULL, + .path = args->path, + .start_time = args->start_time, + .deadline = args->deadline, + .arena = args->arena}; + grpc_error *error = grpc_call_stack_init( + exec_ctx, chanstk, 1, subchannel_call_destroy, *call, &call_args); if (error != GRPC_ERROR_NONE) { const char *error_string = grpc_error_string(error); gpr_log(GPR_ERROR, "error: %s", error_string); @@ -778,7 +795,7 @@ grpc_error *grpc_connected_subchannel_create_call( return error; } GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call"); - grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, pollent); + grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, args->pollent); return GRPC_ERROR_NONE; } diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h index 6a70a76467..3e64a2507c 100644 --- a/src/core/ext/client_channel/subchannel.h +++ b/src/core/ext/client_channel/subchannel.h @@ -37,6 +37,7 @@ #include "src/core/ext/client_channel/connector.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/polling_entity.h" +#include "src/core/lib/support/arena.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/metadata.h" @@ -112,10 +113,18 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, GRPC_SUBCHANNEL_REF_EXTRA_ARGS); /** construct a subchannel call */ +typedef struct { + grpc_polling_entity *pollent; + grpc_slice path; + gpr_timespec start_time; + gpr_timespec deadline; + gpr_arena *arena; +} grpc_connected_subchannel_call_args; + grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel, - grpc_polling_entity *pollent, grpc_slice path, gpr_timespec start_time, - gpr_timespec deadline, grpc_subchannel_call **subchannel_call); + const grpc_connected_subchannel_call_args *args, + grpc_subchannel_call **subchannel_call); /** process a transport level op */ void grpc_connected_subchannel_process_transport_op( @@ -154,6 +163,11 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *subchannel_call); +/** Must be called once per call. Sets the 'then_schedule_closure' argument for + call stack destruction. */ +void grpc_subchannel_call_set_cleanup_closure( + grpc_subchannel_call *subchannel_call, grpc_closure *closure); + grpc_call_stack *grpc_subchannel_call_get_call_stack( grpc_subchannel_call *subchannel_call); diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c index c2750634a5..4ed832671d 100644 --- a/src/core/ext/load_reporting/load_reporting_filter.c +++ b/src/core/ext/load_reporting/load_reporting_filter.c @@ -123,7 +123,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { call_data *calld = elem->call_data; /* TODO(dgq): do something with the data diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index da4c7dc7b2..8024dd4702 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -665,16 +665,17 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, GPR_TIMER_END("destroy_stream", 0); - gpr_free(s->destroy_stream_arg); + grpc_closure_sched(exec_ctx, s->destroy_stream_arg, GRPC_ERROR_NONE); } static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, void *and_free_memory) { + grpc_stream *gs, + grpc_closure *then_schedule_closure) { GPR_TIMER_BEGIN("destroy_stream", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; - s->destroy_stream_arg = and_free_memory; + s->destroy_stream_arg = then_schedule_closure; grpc_closure_sched( exec_ctx, grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s, grpc_combiner_scheduler(t->combiner, false)), diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index d26812ad6b..3c56c21599 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -425,7 +425,7 @@ struct grpc_chttp2_stream { grpc_stream_refcount *refcount; grpc_closure destroy_stream; - void *destroy_stream_arg; + grpc_closure *destroy_stream_arg; grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; uint8_t included[STREAM_LIST_COUNT]; diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index 187f2c8b34..6d53b0576e 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -166,41 +166,32 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx, } } -grpc_error *grpc_call_stack_init( - grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, - int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, - grpc_call_context_element *context, const void *transport_server_data, - grpc_slice path, gpr_timespec start_time, gpr_timespec deadline, - grpc_call_stack *call_stack) { +grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx, + grpc_channel_stack *channel_stack, + int initial_refs, grpc_iomgr_cb_func destroy, + void *destroy_arg, + const grpc_call_element_args *elem_args) { grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack); size_t count = channel_stack->count; grpc_call_element *call_elems; char *user_data; size_t i; - call_stack->count = count; - GRPC_STREAM_REF_INIT(&call_stack->refcount, initial_refs, destroy, + elem_args->call_stack->count = count; + GRPC_STREAM_REF_INIT(&elem_args->call_stack->refcount, initial_refs, destroy, destroy_arg, "CALL_STACK"); - call_elems = CALL_ELEMS_FROM_STACK(call_stack); + call_elems = CALL_ELEMS_FROM_STACK(elem_args->call_stack); user_data = ((char *)call_elems) + ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element)); /* init per-filter data */ grpc_error *first_error = GRPC_ERROR_NONE; - const grpc_call_element_args args = { - .start_time = start_time, - .call_stack = call_stack, - .server_transport_data = transport_server_data, - .context = context, - .path = path, - .deadline = deadline, - }; for (i = 0; i < count; i++) { call_elems[i].filter = channel_elems[i].filter; call_elems[i].channel_data = channel_elems[i].channel_data; call_elems[i].call_data = user_data; - grpc_error *error = - call_elems[i].filter->init_call_elem(exec_ctx, &call_elems[i], &args); + grpc_error *error = call_elems[i].filter->init_call_elem( + exec_ctx, &call_elems[i], elem_args); if (error != GRPC_ERROR_NONE) { if (first_error == GRPC_ERROR_NONE) { first_error = error; diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index 1b02f02dd6..80e3603e8d 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -56,6 +56,7 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/polling_entity.h" +#include "src/core/lib/support/arena.h" #include "src/core/lib/transport/transport.h" #ifdef __cplusplus @@ -84,6 +85,7 @@ typedef struct { grpc_slice path; gpr_timespec start_time; gpr_timespec deadline; + gpr_arena *arena; } grpc_call_element_args; typedef struct { @@ -236,12 +238,11 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx, /* Initialize a call stack given a channel stack. transport_server_data is expected to be NULL on a client, or an opaque transport owned pointer on the server. */ -grpc_error *grpc_call_stack_init( - grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, - int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, - grpc_call_context_element *context, const void *transport_server_data, - grpc_slice path, gpr_timespec start_time, gpr_timespec deadline, - grpc_call_stack *call_stack); +grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx, + grpc_channel_stack *channel_stack, + int initial_refs, grpc_iomgr_cb_func destroy, + void *destroy_arg, + const grpc_call_element_args *elem_args); /* Set a pollset or a pollset_set for a call stack: must occur before the first * op is started */ void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c index a23082a866..8dea1d98ff 100644 --- a/src/core/lib/security/transport/client_auth_filter.c +++ b/src/core/lib/security/transport/client_auth_filter.c @@ -318,7 +318,7 @@ static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { call_data *calld = elem->call_data; grpc_call_credentials_unref(exec_ctx, calld->creds); if (calld->have_host) { diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c index 14619d97ca..01cb473177 100644 --- a/src/core/lib/security/transport/server_auth_filter.c +++ b/src/core/lib/security/transport/server_auth_filter.c @@ -227,7 +227,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) {} + grpc_closure *ignored) {} /* Constructor for channel_data */ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index b57dd8bd8a..84c401db35 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -367,11 +367,16 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, GRPC_CHANNEL_INTERNAL_REF(args->channel, "call"); /* initial refcount dropped by grpc_call_destroy */ + grpc_call_element_args call_args = { + .call_stack = CALL_STACK_FROM_CALL(call), + .server_transport_data = args->server_transport_data, + .context = call->context, + .path = path, + .start_time = call->start_time, + .deadline = send_deadline, + .arena = call->arena}; add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1, - destroy_call, call, call->context, - args->server_transport_data, path, - call->start_time, send_deadline, - CALL_STACK_FROM_CALL(call))); + destroy_call, call, &call_args)); if (error != GRPC_ERROR_NONE) { cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error)); @@ -431,8 +436,8 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) { static void release_call(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { grpc_call *c = call; - gpr_arena_destroy(c->arena); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call"); + gpr_arena_destroy(c->arena); } static void set_status_value_directly(grpc_status_code status, void *dest); diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 165950e288..7635fc730d 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -197,9 +197,10 @@ void grpc_transport_set_pops(grpc_exec_ctx *exec_ctx, grpc_transport *transport, void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *transport, - grpc_stream *stream, void *and_free_memory) { + grpc_stream *stream, + grpc_closure *then_schedule_closure) { transport->vtable->destroy_stream(exec_ctx, transport, stream, - and_free_memory); + then_schedule_closure); } char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c index 76bb57346c..0c424c1ec4 100644 --- a/test/core/channel/channel_stack_test.c +++ b/test/core/channel/channel_stack_test.c @@ -68,7 +68,7 @@ static void channel_destroy_func(grpc_exec_ctx *exec_ctx, static void call_destroy_func(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { ++*(int *)(elem->channel_data); } @@ -139,10 +139,16 @@ static void test_create_channel_stack(void) { GPR_ASSERT(*channel_data == 0); call_stack = gpr_malloc(channel_stack->call_stack_size); - grpc_error *error = - grpc_call_stack_init(&exec_ctx, channel_stack, 1, free_call, call_stack, - NULL, NULL, path, gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC), call_stack); + grpc_call_element_args args = { + .call_stack = call_stack, + .server_transport_data = NULL, + .context = NULL, + .path = path, + .start_time = gpr_now(GPR_CLOCK_MONOTONIC), + .deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC), + .arena = NULL}; + grpc_error *error = grpc_call_stack_init(&exec_ctx, channel_stack, 1, + free_call, call_stack, &args); GPR_ASSERT(error == GRPC_ERROR_NONE); GPR_ASSERT(call_stack->count == 1); call_elem = grpc_call_stack_element(call_stack, 0); diff --git a/test/core/end2end/tests/filter_call_init_fails.c b/test/core/end2end/tests/filter_call_init_fails.c index d2d6e82d57..65216cf19d 100644 --- a/test/core/end2end/tests/filter_call_init_fails.c +++ b/test/core/end2end/tests/filter_call_init_fails.c @@ -213,7 +213,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory) {} + grpc_closure *ignored) {} static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, diff --git a/test/core/end2end/tests/filter_causes_close.c b/test/core/end2end/tests/filter_causes_close.c index 25e606556d..c968f30b3b 100644 --- a/test/core/end2end/tests/filter_causes_close.c +++ b/test/core/end2end/tests/filter_causes_close.c @@ -236,7 +236,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory) {} + grpc_closure *ignored) {} static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, diff --git a/test/core/end2end/tests/filter_latency.c b/test/core/end2end/tests/filter_latency.c index d05e9e79a1..2428c92a42 100644 --- a/test/core/end2end/tests/filter_latency.c +++ b/test/core/end2end/tests/filter_latency.c @@ -267,7 +267,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory) { + grpc_closure *ignored) { gpr_mu_lock(&g_mu); g_client_latency = final_info->stats.latency; gpr_mu_unlock(&g_mu); @@ -276,7 +276,7 @@ static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx, static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory) { + grpc_closure *ignored) { gpr_mu_lock(&g_mu); g_server_latency = final_info->stats.latency; gpr_mu_unlock(&g_mu); -- cgit v1.2.3 From 7d2c39827688026ca54b6a8a4d36c36d79aa282a Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 13 Mar 2017 12:58:29 -0700 Subject: Additionally use arena for incoming metadata allocation --- .../transport/chttp2/transport/chttp2_transport.c | 26 +++++---- .../transport/chttp2/transport/incoming_metadata.c | 65 ++++++++-------------- .../transport/chttp2/transport/incoming_metadata.h | 18 +++--- src/core/ext/transport/chttp2/transport/parsing.c | 18 +++++- src/core/lib/channel/connected_channel.c | 2 +- src/core/lib/transport/transport.c | 4 +- src/core/lib/transport/transport.h | 3 +- src/core/lib/transport/transport_impl.h | 2 +- 8 files changed, 67 insertions(+), 71 deletions(-) (limited to 'src') diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 8024dd4702..114b14e884 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -575,7 +575,7 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s) { static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_stream_refcount *refcount, - const void *server_data) { + const void *server_data, gpr_arena *arena) { GPR_TIMER_BEGIN("init_stream", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; @@ -588,8 +588,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, gpr_ref_init(&s->active_streams, 1); GRPC_CHTTP2_STREAM_REF(s, "chttp2"); - grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0]); - grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1]); + grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena); + grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1], arena); grpc_chttp2_data_parser_init(&s->data_parser); grpc_slice_buffer_init(&s->flow_controlled_buffer); s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); @@ -1630,15 +1630,19 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, s->recv_trailing_metadata_finished != NULL) { char status_string[GPR_LTOA_MIN_BUFSIZE]; gpr_ltoa(status, status_string); - grpc_chttp2_incoming_metadata_buffer_replace_or_add( - exec_ctx, &s->metadata_buffer[1], - grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_STATUS, - grpc_slice_from_copied_string(status_string))); + GRPC_LOG_IF_ERROR("add_status", + grpc_chttp2_incoming_metadata_buffer_replace_or_add( + exec_ctx, &s->metadata_buffer[1], + grpc_mdelem_from_slices( + exec_ctx, GRPC_MDSTR_GRPC_STATUS, + grpc_slice_from_copied_string(status_string)))); if (msg != NULL) { - grpc_chttp2_incoming_metadata_buffer_replace_or_add( - exec_ctx, &s->metadata_buffer[1], - grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, - grpc_slice_from_copied_string(msg))); + GRPC_LOG_IF_ERROR( + "add_status_message", + grpc_chttp2_incoming_metadata_buffer_replace_or_add( + exec_ctx, &s->metadata_buffer[1], + grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, + grpc_slice_from_copied_string(msg)))); } s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE; grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.c b/src/core/ext/transport/chttp2/transport/incoming_metadata.c index c91b019aa0..ab900fdff2 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.c +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.c @@ -41,69 +41,48 @@ #include void grpc_chttp2_incoming_metadata_buffer_init( - grpc_chttp2_incoming_metadata_buffer *buffer) { - buffer->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + grpc_chttp2_incoming_metadata_buffer *buffer, gpr_arena *arena) { + grpc_metadata_batch_init(&buffer->batch); + buffer->arena = arena; + buffer->batch.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); } void grpc_chttp2_incoming_metadata_buffer_destroy( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer) { - size_t i; - if (!buffer->published) { - for (i = 0; i < buffer->count; i++) { - GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md); - } - } - gpr_free(buffer->elems); + grpc_metadata_batch_destroy(exec_ctx, &buffer->batch); } -void grpc_chttp2_incoming_metadata_buffer_add( - grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem) { - GPR_ASSERT(!buffer->published); - if (buffer->capacity == buffer->count) { - buffer->capacity = GPR_MAX(8, 2 * buffer->capacity); - buffer->elems = - gpr_realloc(buffer->elems, sizeof(*buffer->elems) * buffer->capacity); - } - buffer->elems[buffer->count++].md = elem; +grpc_error *grpc_chttp2_incoming_metadata_buffer_add( + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, + grpc_mdelem elem) { buffer->size += GRPC_MDELEM_LENGTH(elem); + return grpc_metadata_batch_add_tail( + exec_ctx, &buffer->batch, + gpr_arena_alloc(buffer->arena, sizeof(grpc_linked_mdelem)), elem); } -void grpc_chttp2_incoming_metadata_buffer_replace_or_add( +grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem) { - for (size_t i = 0; i < buffer->count; i++) { - if (grpc_slice_eq(GRPC_MDKEY(buffer->elems[i].md), GRPC_MDKEY(elem))) { - GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md); - buffer->elems[i].md = elem; - return; + for (grpc_linked_mdelem *l = buffer->batch.list.head; l != NULL; + l = l->next) { + if (grpc_slice_eq(GRPC_MDKEY(l->md), GRPC_MDKEY(elem))) { + GRPC_MDELEM_UNREF(exec_ctx, l->md); + l->md = elem; + return GRPC_ERROR_NONE; } } - grpc_chttp2_incoming_metadata_buffer_add(buffer, elem); + return grpc_chttp2_incoming_metadata_buffer_add(exec_ctx, buffer, elem); } void grpc_chttp2_incoming_metadata_buffer_set_deadline( grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline) { - GPR_ASSERT(!buffer->published); - buffer->deadline = deadline; + buffer->batch.deadline = deadline; } void grpc_chttp2_incoming_metadata_buffer_publish( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, grpc_metadata_batch *batch) { - GPR_ASSERT(!buffer->published); - buffer->published = 1; - if (buffer->count > 0) { - size_t i; - for (i = 0; i < buffer->count; i++) { - /* TODO(ctiller): do something better here */ - if (!GRPC_LOG_IF_ERROR("grpc_chttp2_incoming_metadata_buffer_publish", - grpc_metadata_batch_link_tail( - exec_ctx, batch, &buffer->elems[i]))) { - GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md); - } - } - } else { - batch->list.head = batch->list.tail = NULL; - } - batch->deadline = buffer->deadline; + *batch = buffer->batch; + grpc_metadata_batch_init(&buffer->batch); } diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.h b/src/core/ext/transport/chttp2/transport/incoming_metadata.h index 1eac6fc150..288c917e65 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.h +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.h @@ -37,28 +37,26 @@ #include "src/core/lib/transport/transport.h" typedef struct { - grpc_linked_mdelem *elems; - size_t count; - size_t capacity; - gpr_timespec deadline; - int published; + gpr_arena *arena; + grpc_metadata_batch batch; size_t size; // total size of metadata } grpc_chttp2_incoming_metadata_buffer; /** assumes everything initially zeroed */ void grpc_chttp2_incoming_metadata_buffer_init( - grpc_chttp2_incoming_metadata_buffer *buffer); + grpc_chttp2_incoming_metadata_buffer *buffer, gpr_arena *arena); void grpc_chttp2_incoming_metadata_buffer_destroy( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer); void grpc_chttp2_incoming_metadata_buffer_publish( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, grpc_metadata_batch *batch); -void grpc_chttp2_incoming_metadata_buffer_add( - grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem); -void grpc_chttp2_incoming_metadata_buffer_replace_or_add( +grpc_error *grpc_chttp2_incoming_metadata_buffer_add( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, - grpc_mdelem elem); + grpc_mdelem elem) GRPC_MUST_USE_RESULT; +grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add( + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, + grpc_mdelem elem) GRPC_MUST_USE_RESULT; void grpc_chttp2_incoming_metadata_buffer_set_deadline( grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline); diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index e7f2597f89..7efc8c63c9 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -548,7 +548,14 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp, s->seen_error = true; GRPC_MDELEM_UNREF(exec_ctx, md); } else { - grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[0], md); + grpc_error *error = grpc_chttp2_incoming_metadata_buffer_add( + exec_ctx, &s->metadata_buffer[0], md); + if (error != GRPC_ERROR_NONE) { + grpc_chttp2_cancel_stream(exec_ctx, t, s, error); + grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); + s->seen_error = true; + GRPC_MDELEM_UNREF(exec_ctx, md); + } } } @@ -598,7 +605,14 @@ static void on_trailing_header(grpc_exec_ctx *exec_ctx, void *tp, s->seen_error = true; GRPC_MDELEM_UNREF(exec_ctx, md); } else { - grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[1], md); + grpc_error *error = grpc_chttp2_incoming_metadata_buffer_add( + exec_ctx, &s->metadata_buffer[1], md); + if (error != GRPC_ERROR_NONE) { + grpc_chttp2_cancel_stream(exec_ctx, t, s, error); + grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); + s->seen_error = true; + GRPC_MDELEM_UNREF(exec_ctx, md); + } } GPR_TIMER_END("on_trailing_header", 0); diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c index 64cf95b5a5..42ef7b7806 100644 --- a/src/core/lib/channel/connected_channel.c +++ b/src/core/lib/channel/connected_channel.c @@ -88,7 +88,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, channel_data *chand = elem->channel_data; int r = grpc_transport_init_stream( exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), - &args->call_stack->refcount, args->server_transport_data); + &args->call_stack->refcount, args->server_transport_data, args->arena); return r == 0 ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE("transport stream initialization failed"); } diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 7635fc730d..c1a364cbbf 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -162,9 +162,9 @@ void grpc_transport_destroy(grpc_exec_ctx *exec_ctx, int grpc_transport_init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *transport, grpc_stream *stream, grpc_stream_refcount *refcount, - const void *server_data) { + const void *server_data, gpr_arena *arena) { return transport->vtable->init_stream(exec_ctx, transport, stream, refcount, - server_data); + server_data, arena); } void grpc_transport_perform_stream_op(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 11fd5d1697..950b18aeda 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -41,6 +41,7 @@ #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset_set.h" +#include "src/core/lib/support/arena.h" #include "src/core/lib/transport/byte_stream.h" #include "src/core/lib/transport/metadata_batch.h" @@ -229,7 +230,7 @@ size_t grpc_transport_stream_size(grpc_transport *transport); int grpc_transport_init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *transport, grpc_stream *stream, grpc_stream_refcount *refcount, - const void *server_data); + const void *server_data, gpr_arena *arena); void grpc_transport_set_pops(grpc_exec_ctx *exec_ctx, grpc_transport *transport, grpc_stream *stream, grpc_polling_entity *pollent); diff --git a/src/core/lib/transport/transport_impl.h b/src/core/lib/transport/transport_impl.h index 9b6095f666..6f688bf8d2 100644 --- a/src/core/lib/transport/transport_impl.h +++ b/src/core/lib/transport/transport_impl.h @@ -47,7 +47,7 @@ typedef struct grpc_transport_vtable { /* implementation of grpc_transport_init_stream */ int (*init_stream)(grpc_exec_ctx *exec_ctx, grpc_transport *self, grpc_stream *stream, grpc_stream_refcount *refcount, - const void *server_data); + const void *server_data, gpr_arena *arena); /* implementation of grpc_transport_set_pollset */ void (*set_pollset)(grpc_exec_ctx *exec_ctx, grpc_transport *self, -- cgit v1.2.3 From ca294650ba9613044c89635b91d6d5ed603d14c5 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Mon, 13 Mar 2017 14:50:34 -0700 Subject: Change macros --- src/core/lib/iomgr/tcp_server_posix.c | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index 7cc44fabe7..61ffb02a71 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -601,7 +601,9 @@ static grpc_error *add_all_local_addrs_to_server(grpc_tcp_server *s, unsigned port_index, int requested_port, int *out_port) { -#ifdef GRPC_HAVE_IFADDRS +#ifndef GRPC_HAVE_IFADDRS + return GRPC_ERROR_CREATE("no ifaddrs available"); +#else /* ifndef GRPC_HAVE_IFADDRS */ struct ifaddrs *ifa = NULL; struct ifaddrs *ifa_it; unsigned fd_index = 0; @@ -689,9 +691,7 @@ static grpc_error *add_all_local_addrs_to_server(grpc_tcp_server *s, *out_port = sp->port; return GRPC_ERROR_NONE; } -#else /* GRPC_HAVE_IFADDRS */ - return GRPC_ERROR_NONE; -#endif /* GRPC_HAVE_IFADDRS */ +#endif /* ifndef GRPC_HAVE_IFADDRS */ } /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ @@ -708,10 +708,14 @@ static grpc_error *add_wildcard_addrs_to_server(grpc_tcp_server *s, grpc_error *v6_err = GRPC_ERROR_NONE; grpc_error *v4_err = GRPC_ERROR_NONE; *out_port = -1; + +#ifdef GRPC_HAVE_IFADDRS if (s->expand_wildcard_addrs) { return add_all_local_addrs_to_server(s, port_index, requested_port, out_port); } +#endif /* GRPC_HAVE_IFADDRS */ + grpc_sockaddr_make_wildcards(requested_port, &wild4, &wild6); /* Try listening on IPv6 first. */ if ((v6_err = add_addr_to_server(s, &wild6, port_index, fd_index, &dsmode, -- cgit v1.2.3 From 47155ed254e7d798cd42797eea1533d5264c565c Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Mon, 13 Mar 2017 17:18:07 -0700 Subject: Split ifaddr related functions out into a separate file --- BUILD | 3 + CMakeLists.txt | 10 ++ Makefile | 10 ++ binding.gyp | 2 + build.yaml | 3 + config.m4 | 2 + gRPC-Core.podspec | 4 + grpc.gemspec | 3 + package.xml | 3 + src/core/lib/iomgr/tcp_server_posix.c | 192 ++------------------- src/core/lib/iomgr/tcp_server_utils_posix.h | 89 ++++++++++ .../lib/iomgr/tcp_server_utils_posix_ifaddrs.c | 178 +++++++++++++++++++ .../lib/iomgr/tcp_server_utils_posix_noifaddrs.c | 49 ++++++ src/python/grpcio/grpc_core_dependencies.py | 2 + tools/doxygen/Doxyfile.core.internal | 3 + tools/run_tests/generated/sources_and_headers.json | 4 + vsprojects/vcxproj/grpc/grpc.vcxproj | 5 + vsprojects/vcxproj/grpc/grpc.vcxproj.filters | 9 + .../vcxproj/grpc_test_util/grpc_test_util.vcxproj | 5 + .../grpc_test_util/grpc_test_util.vcxproj.filters | 9 + .../vcxproj/grpc_unsecure/grpc_unsecure.vcxproj | 5 + .../grpc_unsecure/grpc_unsecure.vcxproj.filters | 9 + 22 files changed, 423 insertions(+), 176 deletions(-) create mode 100644 src/core/lib/iomgr/tcp_server_utils_posix.h create mode 100644 src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c create mode 100644 src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c (limited to 'src') diff --git a/BUILD b/BUILD index ca0a1c5607..dc6431484e 100644 --- a/BUILD +++ b/BUILD @@ -481,6 +481,8 @@ grpc_cc_library( "src/core/lib/iomgr/tcp_client_windows.c", "src/core/lib/iomgr/tcp_posix.c", "src/core/lib/iomgr/tcp_server_posix.c", + "src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c", + "src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c", "src/core/lib/iomgr/tcp_server_uv.c", "src/core/lib/iomgr/tcp_server_windows.c", "src/core/lib/iomgr/tcp_uv.c", @@ -599,6 +601,7 @@ grpc_cc_library( "src/core/lib/iomgr/tcp_client_posix.h", "src/core/lib/iomgr/tcp_posix.h", "src/core/lib/iomgr/tcp_server.h", + "src/core/lib/iomgr/tcp_server_utils_posix.h", "src/core/lib/iomgr/tcp_uv.h", "src/core/lib/iomgr/tcp_windows.h", "src/core/lib/iomgr/time_averaged_stats.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index d0a65b4493..cfc4555834 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -914,6 +914,8 @@ add_library(grpc src/core/lib/iomgr/tcp_client_windows.c src/core/lib/iomgr/tcp_posix.c src/core/lib/iomgr/tcp_server_posix.c + src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c + src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c src/core/lib/iomgr/tcp_server_uv.c src/core/lib/iomgr/tcp_server_windows.c src/core/lib/iomgr/tcp_uv.c @@ -1223,6 +1225,8 @@ add_library(grpc_cronet src/core/lib/iomgr/tcp_client_windows.c src/core/lib/iomgr/tcp_posix.c src/core/lib/iomgr/tcp_server_posix.c + src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c + src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c src/core/lib/iomgr/tcp_server_uv.c src/core/lib/iomgr/tcp_server_windows.c src/core/lib/iomgr/tcp_uv.c @@ -1523,6 +1527,8 @@ add_library(grpc_test_util src/core/lib/iomgr/tcp_client_windows.c src/core/lib/iomgr/tcp_posix.c src/core/lib/iomgr/tcp_server_posix.c + src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c + src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c src/core/lib/iomgr/tcp_server_uv.c src/core/lib/iomgr/tcp_server_windows.c src/core/lib/iomgr/tcp_uv.c @@ -1769,6 +1775,8 @@ add_library(grpc_unsecure src/core/lib/iomgr/tcp_client_windows.c src/core/lib/iomgr/tcp_posix.c src/core/lib/iomgr/tcp_server_posix.c + src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c + src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c src/core/lib/iomgr/tcp_server_uv.c src/core/lib/iomgr/tcp_server_windows.c src/core/lib/iomgr/tcp_uv.c @@ -2376,6 +2384,8 @@ add_library(grpc++_cronet src/core/lib/iomgr/tcp_client_windows.c src/core/lib/iomgr/tcp_posix.c src/core/lib/iomgr/tcp_server_posix.c + src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c + src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c src/core/lib/iomgr/tcp_server_uv.c src/core/lib/iomgr/tcp_server_windows.c src/core/lib/iomgr/tcp_uv.c diff --git a/Makefile b/Makefile index a9242dddbd..14c5a36c95 100644 --- a/Makefile +++ b/Makefile @@ -2799,6 +2799,8 @@ LIBGRPC_SRC = \ src/core/lib/iomgr/tcp_client_windows.c \ src/core/lib/iomgr/tcp_posix.c \ src/core/lib/iomgr/tcp_server_posix.c \ + src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c \ + src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c \ src/core/lib/iomgr/tcp_server_uv.c \ src/core/lib/iomgr/tcp_server_windows.c \ src/core/lib/iomgr/tcp_uv.c \ @@ -3111,6 +3113,8 @@ LIBGRPC_CRONET_SRC = \ src/core/lib/iomgr/tcp_client_windows.c \ src/core/lib/iomgr/tcp_posix.c \ src/core/lib/iomgr/tcp_server_posix.c \ + src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c \ + src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c \ src/core/lib/iomgr/tcp_server_uv.c \ src/core/lib/iomgr/tcp_server_windows.c \ src/core/lib/iomgr/tcp_uv.c \ @@ -3414,6 +3418,8 @@ LIBGRPC_TEST_UTIL_SRC = \ src/core/lib/iomgr/tcp_client_windows.c \ src/core/lib/iomgr/tcp_posix.c \ src/core/lib/iomgr/tcp_server_posix.c \ + src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c \ + src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c \ src/core/lib/iomgr/tcp_server_uv.c \ src/core/lib/iomgr/tcp_server_windows.c \ src/core/lib/iomgr/tcp_uv.c \ @@ -3640,6 +3646,8 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/iomgr/tcp_client_windows.c \ src/core/lib/iomgr/tcp_posix.c \ src/core/lib/iomgr/tcp_server_posix.c \ + src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c \ + src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c \ src/core/lib/iomgr/tcp_server_uv.c \ src/core/lib/iomgr/tcp_server_windows.c \ src/core/lib/iomgr/tcp_uv.c \ @@ -4249,6 +4257,8 @@ LIBGRPC++_CRONET_SRC = \ src/core/lib/iomgr/tcp_client_windows.c \ src/core/lib/iomgr/tcp_posix.c \ src/core/lib/iomgr/tcp_server_posix.c \ + src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c \ + src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c \ src/core/lib/iomgr/tcp_server_uv.c \ src/core/lib/iomgr/tcp_server_windows.c \ src/core/lib/iomgr/tcp_uv.c \ diff --git a/binding.gyp b/binding.gyp index c521a27c30..ec21a97806 100644 --- a/binding.gyp +++ b/binding.gyp @@ -667,6 +667,8 @@ 'src/core/lib/iomgr/tcp_client_windows.c', 'src/core/lib/iomgr/tcp_posix.c', 'src/core/lib/iomgr/tcp_server_posix.c', + 'src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c', + 'src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c', 'src/core/lib/iomgr/tcp_server_uv.c', 'src/core/lib/iomgr/tcp_server_windows.c', 'src/core/lib/iomgr/tcp_uv.c', diff --git a/build.yaml b/build.yaml index a32a8a06d3..36817a5506 100644 --- a/build.yaml +++ b/build.yaml @@ -228,6 +228,7 @@ filegroups: - src/core/lib/iomgr/tcp_client_posix.h - src/core/lib/iomgr/tcp_posix.h - src/core/lib/iomgr/tcp_server.h + - src/core/lib/iomgr/tcp_server_utils_posix.h - src/core/lib/iomgr/tcp_uv.h - src/core/lib/iomgr/tcp_windows.h - src/core/lib/iomgr/time_averaged_stats.h @@ -337,6 +338,8 @@ filegroups: - src/core/lib/iomgr/tcp_client_windows.c - src/core/lib/iomgr/tcp_posix.c - src/core/lib/iomgr/tcp_server_posix.c + - src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c + - src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c - src/core/lib/iomgr/tcp_server_uv.c - src/core/lib/iomgr/tcp_server_windows.c - src/core/lib/iomgr/tcp_uv.c diff --git a/config.m4 b/config.m4 index 90536e503e..181e44c52c 100644 --- a/config.m4 +++ b/config.m4 @@ -140,6 +140,8 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/iomgr/tcp_client_windows.c \ src/core/lib/iomgr/tcp_posix.c \ src/core/lib/iomgr/tcp_server_posix.c \ + src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c \ + src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c \ src/core/lib/iomgr/tcp_server_uv.c \ src/core/lib/iomgr/tcp_server_windows.c \ src/core/lib/iomgr/tcp_uv.c \ diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 027babcda4..1d002247d4 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -309,6 +309,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/tcp_client_posix.h', 'src/core/lib/iomgr/tcp_posix.h', 'src/core/lib/iomgr/tcp_server.h', + 'src/core/lib/iomgr/tcp_server_utils_posix.h', 'src/core/lib/iomgr/tcp_uv.h', 'src/core/lib/iomgr/tcp_windows.h', 'src/core/lib/iomgr/time_averaged_stats.h', @@ -508,6 +509,8 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/tcp_client_windows.c', 'src/core/lib/iomgr/tcp_posix.c', 'src/core/lib/iomgr/tcp_server_posix.c', + 'src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c', + 'src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c', 'src/core/lib/iomgr/tcp_server_uv.c', 'src/core/lib/iomgr/tcp_server_windows.c', 'src/core/lib/iomgr/tcp_uv.c', @@ -744,6 +747,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/tcp_client_posix.h', 'src/core/lib/iomgr/tcp_posix.h', 'src/core/lib/iomgr/tcp_server.h', + 'src/core/lib/iomgr/tcp_server_utils_posix.h', 'src/core/lib/iomgr/tcp_uv.h', 'src/core/lib/iomgr/tcp_windows.h', 'src/core/lib/iomgr/time_averaged_stats.h', diff --git a/grpc.gemspec b/grpc.gemspec index 8d5b7b2ab1..edc08cb9b9 100755 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -226,6 +226,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/iomgr/tcp_client_posix.h ) s.files += %w( src/core/lib/iomgr/tcp_posix.h ) s.files += %w( src/core/lib/iomgr/tcp_server.h ) + s.files += %w( src/core/lib/iomgr/tcp_server_utils_posix.h ) s.files += %w( src/core/lib/iomgr/tcp_uv.h ) s.files += %w( src/core/lib/iomgr/tcp_windows.h ) s.files += %w( src/core/lib/iomgr/time_averaged_stats.h ) @@ -425,6 +426,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/iomgr/tcp_client_windows.c ) s.files += %w( src/core/lib/iomgr/tcp_posix.c ) s.files += %w( src/core/lib/iomgr/tcp_server_posix.c ) + s.files += %w( src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c ) + s.files += %w( src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c ) s.files += %w( src/core/lib/iomgr/tcp_server_uv.c ) s.files += %w( src/core/lib/iomgr/tcp_server_windows.c ) s.files += %w( src/core/lib/iomgr/tcp_uv.c ) diff --git a/package.xml b/package.xml index 4167bef26e..21eeee4d3e 100644 --- a/package.xml +++ b/package.xml @@ -235,6 +235,7 @@ + @@ -434,6 +435,8 @@ + + diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index 61ffb02a71..1e9e854d6b 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -42,10 +42,6 @@ #include "src/core/lib/iomgr/tcp_server.h" -#ifdef GRPC_HAVE_IFADDRS -#include -#endif /* GRPC_HAVE_IFADDRS */ - #include #include #include @@ -70,6 +66,7 @@ #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/socket_utils_posix.h" #include "src/core/lib/iomgr/tcp_posix.h" +#include "src/core/lib/iomgr/tcp_server_utils_posix.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" #include "src/core/lib/support/string.h" @@ -78,29 +75,6 @@ static gpr_once s_init_max_accept_queue_size; static int s_max_accept_queue_size; -/* one listening port */ -typedef struct grpc_tcp_listener grpc_tcp_listener; -struct grpc_tcp_listener { - int fd; - grpc_fd *emfd; - grpc_tcp_server *server; - grpc_resolved_address addr; - int port; - unsigned port_index; - unsigned fd_index; - grpc_closure read_closure; - grpc_closure destroyed_closure; - struct grpc_tcp_listener *next; - /* sibling is a linked list of all listeners for a given port. add_port and - clone_port place all new listeners in the same sibling list. A member of - the 'sibling' list is also a member of the 'next' list. The head of each - sibling list has is_sibling==0, and subsequent members of sibling lists - have is_sibling==1. is_sibling allows separate sibling lists to be - identified while iterating through 'next'. */ - struct grpc_tcp_listener *sibling; - int is_sibling; -}; - /* the overall server */ struct grpc_tcp_server { gpr_refcount refs; @@ -526,11 +500,11 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd, /* If successful, add a listener to s for addr, set *dsmode for the socket, and return the *listener. */ -static grpc_error *add_addr_to_server(grpc_tcp_server *s, - const grpc_resolved_address *addr, - unsigned port_index, unsigned fd_index, - grpc_dualstack_mode *dsmode, - grpc_tcp_listener **listener) { +grpc_error *grpc_tcp_server_add_addr(grpc_tcp_server *s, + const grpc_resolved_address *addr, + unsigned port_index, unsigned fd_index, + grpc_dualstack_mode *dsmode, + grpc_tcp_listener **listener) { grpc_resolved_address addr4_copy; int fd; grpc_error *err = @@ -545,39 +519,9 @@ static grpc_error *add_addr_to_server(grpc_tcp_server *s, return add_socket_to_server(s, fd, addr, port_index, fd_index, listener); } -/* Bind to "::" to get a port number not used by any address. */ -static grpc_error *get_unused_port(int *port) { - grpc_resolved_address wild; - grpc_sockaddr_make_wildcard6(0, &wild); - grpc_dualstack_mode dsmode; - int fd; - grpc_error *err = - grpc_create_dualstack_socket(&wild, SOCK_STREAM, 0, &dsmode, &fd); - if (err != GRPC_ERROR_NONE) { - return err; - } - if (dsmode == GRPC_DSMODE_IPV4) { - grpc_sockaddr_make_wildcard4(0, &wild); - } - if (bind(fd, (const struct sockaddr *)wild.addr, (socklen_t)wild.len) != 0) { - err = GRPC_OS_ERROR(errno, "bind"); - close(fd); - return err; - } - if (getsockname(fd, (struct sockaddr *)wild.addr, (socklen_t *)&wild.len) != - 0) { - err = GRPC_OS_ERROR(errno, "getsockname"); - close(fd); - return err; - } - close(fd); - *port = grpc_sockaddr_get_port(&wild); - return *port <= 0 ? GRPC_ERROR_CREATE("Bad port") : GRPC_ERROR_NONE; -} - /* Return the listener in s with address addr or NULL. */ -static grpc_tcp_listener *find_listener_with_addr(grpc_tcp_server *s, - grpc_resolved_address *addr) { +grpc_tcp_listener *grpc_tcp_server_find_listener_with_addr( + grpc_tcp_server *s, grpc_resolved_address *addr) { grpc_tcp_listener *l; gpr_mu_lock(&s->mu); for (l = s->head; l != NULL; l = l->next) { @@ -592,108 +536,6 @@ static grpc_tcp_listener *find_listener_with_addr(grpc_tcp_server *s, return l; } -/* Get all addresses assigned to network interfaces on the machine and create a - listener for each. requested_port is the port to use for every listener, or 0 - to select one random port that will be used for every listener. Set *out_port - to the port selected. Return GRPC_ERROR_NONE only if all listeners were - added. */ -static grpc_error *add_all_local_addrs_to_server(grpc_tcp_server *s, - unsigned port_index, - int requested_port, - int *out_port) { -#ifndef GRPC_HAVE_IFADDRS - return GRPC_ERROR_CREATE("no ifaddrs available"); -#else /* ifndef GRPC_HAVE_IFADDRS */ - struct ifaddrs *ifa = NULL; - struct ifaddrs *ifa_it; - unsigned fd_index = 0; - grpc_tcp_listener *sp = NULL; - grpc_error *err = GRPC_ERROR_NONE; - if (requested_port == 0) { - /* Note: There could be a race where some local addrs can listen on the - selected port and some can't. The sane way to handle this would be to - retry by recreating the whole grpc_tcp_server. Backing out individual - listeners and orphaning the FDs looks like too much trouble. */ - if ((err = get_unused_port(&requested_port)) != GRPC_ERROR_NONE) { - return err; - } else if (requested_port <= 0) { - return GRPC_ERROR_CREATE("Bad get_unused_port()"); - } - gpr_log(GPR_DEBUG, "Picked unused port %d", requested_port); - } - if (getifaddrs(&ifa) != 0 || ifa == NULL) { - return GRPC_OS_ERROR(errno, "getifaddrs"); - } - for (ifa_it = ifa; ifa_it != NULL; ifa_it = ifa_it->ifa_next) { - grpc_resolved_address addr; - char *addr_str = NULL; - grpc_dualstack_mode dsmode; - grpc_tcp_listener *new_sp = NULL; - const char *ifa_name = (ifa_it->ifa_name ? ifa_it->ifa_name : ""); - if (ifa_it->ifa_addr == NULL) { - continue; - } else if (ifa_it->ifa_addr->sa_family == AF_INET) { - addr.len = sizeof(struct sockaddr_in); - } else if (ifa_it->ifa_addr->sa_family == AF_INET6) { - addr.len = sizeof(struct sockaddr_in6); - } else { - continue; - } - memcpy(addr.addr, ifa_it->ifa_addr, addr.len); - if (!grpc_sockaddr_set_port(&addr, requested_port)) { - /* Should never happen, because we check sa_family above. */ - err = GRPC_ERROR_CREATE("Failed to set port"); - break; - } - if (grpc_sockaddr_to_string(&addr_str, &addr, 0) < 0) { - addr_str = gpr_strdup(""); - } - gpr_log(GPR_DEBUG, - "Adding local addr from interface %s flags 0x%x to server: %s", - ifa_name, ifa_it->ifa_flags, addr_str); - /* We could have multiple interfaces with the same address (e.g., bonding), - so look for duplicates. */ - if (find_listener_with_addr(s, &addr) != NULL) { - gpr_log(GPR_DEBUG, "Skipping duplicate addr %s on interface %s", addr_str, - ifa_name); - gpr_free(addr_str); - continue; - } - if ((err = add_addr_to_server(s, &addr, port_index, fd_index, &dsmode, - &new_sp)) != GRPC_ERROR_NONE) { - char *err_str = NULL; - grpc_error *root_err; - if (gpr_asprintf(&err_str, "Failed to add listener: %s", addr_str) < 0) { - err_str = gpr_strdup("Failed to add listener"); - } - root_err = GRPC_ERROR_CREATE(err_str); - gpr_free(err_str); - gpr_free(addr_str); - err = grpc_error_add_child(root_err, err); - break; - } else { - GPR_ASSERT(requested_port == new_sp->port); - ++fd_index; - if (sp != NULL) { - new_sp->is_sibling = 1; - sp->sibling = new_sp; - } - sp = new_sp; - } - gpr_free(addr_str); - } - freeifaddrs(ifa); - if (err != GRPC_ERROR_NONE) { - return err; - } else if (sp == NULL) { - return GRPC_ERROR_CREATE("No local addresses"); - } else { - *out_port = sp->port; - return GRPC_ERROR_NONE; - } -#endif /* ifndef GRPC_HAVE_IFADDRS */ -} - /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ static grpc_error *add_wildcard_addrs_to_server(grpc_tcp_server *s, unsigned port_index, @@ -709,17 +551,15 @@ static grpc_error *add_wildcard_addrs_to_server(grpc_tcp_server *s, grpc_error *v4_err = GRPC_ERROR_NONE; *out_port = -1; -#ifdef GRPC_HAVE_IFADDRS - if (s->expand_wildcard_addrs) { - return add_all_local_addrs_to_server(s, port_index, requested_port, - out_port); + if (grpc_tcp_server_have_ifaddrs() && s->expand_wildcard_addrs) { + return grpc_tcp_server_add_all_local_addrs(s, port_index, requested_port, + out_port); } -#endif /* GRPC_HAVE_IFADDRS */ grpc_sockaddr_make_wildcards(requested_port, &wild4, &wild6); /* Try listening on IPv6 first. */ - if ((v6_err = add_addr_to_server(s, &wild6, port_index, fd_index, &dsmode, - &sp)) == GRPC_ERROR_NONE) { + if ((v6_err = grpc_tcp_server_add_addr(s, &wild6, port_index, fd_index, + &dsmode, &sp)) == GRPC_ERROR_NONE) { ++fd_index; requested_port = *out_port = sp->port; if (dsmode == GRPC_DSMODE_DUALSTACK || dsmode == GRPC_DSMODE_IPV4) { @@ -728,8 +568,8 @@ static grpc_error *add_wildcard_addrs_to_server(grpc_tcp_server *s, } /* If we got a v6-only socket or nothing, try adding 0.0.0.0. */ grpc_sockaddr_set_port(&wild4, requested_port); - if ((v4_err = add_addr_to_server(s, &wild4, port_index, fd_index, &dsmode, - &sp2)) == GRPC_ERROR_NONE) { + if ((v4_err = grpc_tcp_server_add_addr(s, &wild4, port_index, fd_index, + &dsmode, &sp2)) == GRPC_ERROR_NONE) { *out_port = sp2->port; if (sp != NULL) { sp2->is_sibling = 1; @@ -839,7 +679,7 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { addr = &addr6_v4mapped; } - if ((err = add_addr_to_server(s, addr, port_index, 0, &dsmode, &sp)) == + if ((err = grpc_tcp_server_add_addr(s, addr, port_index, 0, &dsmode, &sp)) == GRPC_ERROR_NONE) { *out_port = sp->port; } diff --git a/src/core/lib/iomgr/tcp_server_utils_posix.h b/src/core/lib/iomgr/tcp_server_utils_posix.h new file mode 100644 index 0000000000..d39f9085fd --- /dev/null +++ b/src/core/lib/iomgr/tcp_server_utils_posix.h @@ -0,0 +1,89 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H +#define GRPC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H + +#include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/resolve_address.h" +#include "src/core/lib/iomgr/socket_utils_posix.h" +#include "src/core/lib/iomgr/tcp_server.h" + +/* one listening port */ +typedef struct grpc_tcp_listener { + int fd; + grpc_fd *emfd; + grpc_tcp_server *server; + grpc_resolved_address addr; + int port; + unsigned port_index; + unsigned fd_index; + grpc_closure read_closure; + grpc_closure destroyed_closure; + struct grpc_tcp_listener *next; + /* sibling is a linked list of all listeners for a given port. add_port and + clone_port place all new listeners in the same sibling list. A member of + the 'sibling' list is also a member of the 'next' list. The head of each + sibling list has is_sibling==0, and subsequent members of sibling lists + have is_sibling==1. is_sibling allows separate sibling lists to be + identified while iterating through 'next'. */ + struct grpc_tcp_listener *sibling; + int is_sibling; +} grpc_tcp_listener; + +/* If successful, add a listener to \a s for \a addr, set \a dsmode for the + socket, and return the \a listener. */ +grpc_error *grpc_tcp_server_add_addr(grpc_tcp_server *s, + const grpc_resolved_address *addr, + unsigned port_index, unsigned fd_index, + grpc_dualstack_mode *dsmode, + grpc_tcp_listener **listener); + +/* Return the listener in \a s with address \a addr or NULL. */ +grpc_tcp_listener *grpc_tcp_server_find_listener_with_addr( + grpc_tcp_server *s, grpc_resolved_address *addr); + +/* Get all addresses assigned to network interfaces on the machine and create a + listener for each. requested_port is the port to use for every listener, or 0 + to select one random port that will be used for every listener. Set *out_port + to the port selected. Return GRPC_ERROR_NONE only if all listeners were + added. */ +grpc_error *grpc_tcp_server_add_all_local_addrs(grpc_tcp_server *s, + unsigned port_index, + int requested_port, + int *out_port); + +/* Ruturn true if the platform supports ifaddrs */ +bool grpc_tcp_server_have_ifaddrs(void); + +#endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H */ diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c b/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c new file mode 100644 index 0000000000..05eb96f73e --- /dev/null +++ b/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c @@ -0,0 +1,178 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_HAVE_IFADDRS + +#include "src/core/lib/iomgr/tcp_server_utils_posix.h" + +#include +#include +#include +#include + +#include +#include +#include + +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" + +/* Bind to "::" to get a port number not used by any address. */ +static grpc_error *get_unused_port(int *port) { + grpc_resolved_address wild; + grpc_sockaddr_make_wildcard6(0, &wild); + grpc_dualstack_mode dsmode; + int fd; + grpc_error *err = + grpc_create_dualstack_socket(&wild, SOCK_STREAM, 0, &dsmode, &fd); + if (err != GRPC_ERROR_NONE) { + return err; + } + if (dsmode == GRPC_DSMODE_IPV4) { + grpc_sockaddr_make_wildcard4(0, &wild); + } + if (bind(fd, (const struct sockaddr *)wild.addr, (socklen_t)wild.len) != 0) { + err = GRPC_OS_ERROR(errno, "bind"); + close(fd); + return err; + } + if (getsockname(fd, (struct sockaddr *)wild.addr, (socklen_t *)&wild.len) != + 0) { + err = GRPC_OS_ERROR(errno, "getsockname"); + close(fd); + return err; + } + close(fd); + *port = grpc_sockaddr_get_port(&wild); + return *port <= 0 ? GRPC_ERROR_CREATE("Bad port") : GRPC_ERROR_NONE; +} + +grpc_error *grpc_tcp_server_add_all_local_addrs(grpc_tcp_server *s, + unsigned port_index, + int requested_port, + int *out_port) { + struct ifaddrs *ifa = NULL; + struct ifaddrs *ifa_it; + unsigned fd_index = 0; + grpc_tcp_listener *sp = NULL; + grpc_error *err = GRPC_ERROR_NONE; + if (requested_port == 0) { + /* Note: There could be a race where some local addrs can listen on the + selected port and some can't. The sane way to handle this would be to + retry by recreating the whole grpc_tcp_server. Backing out individual + listeners and orphaning the FDs looks like too much trouble. */ + if ((err = get_unused_port(&requested_port)) != GRPC_ERROR_NONE) { + return err; + } else if (requested_port <= 0) { + return GRPC_ERROR_CREATE("Bad get_unused_port()"); + } + gpr_log(GPR_DEBUG, "Picked unused port %d", requested_port); + } + if (getifaddrs(&ifa) != 0 || ifa == NULL) { + return GRPC_OS_ERROR(errno, "getifaddrs"); + } + for (ifa_it = ifa; ifa_it != NULL; ifa_it = ifa_it->ifa_next) { + grpc_resolved_address addr; + char *addr_str = NULL; + grpc_dualstack_mode dsmode; + grpc_tcp_listener *new_sp = NULL; + const char *ifa_name = (ifa_it->ifa_name ? ifa_it->ifa_name : ""); + if (ifa_it->ifa_addr == NULL) { + continue; + } else if (ifa_it->ifa_addr->sa_family == AF_INET) { + addr.len = sizeof(struct sockaddr_in); + } else if (ifa_it->ifa_addr->sa_family == AF_INET6) { + addr.len = sizeof(struct sockaddr_in6); + } else { + continue; + } + memcpy(addr.addr, ifa_it->ifa_addr, addr.len); + if (!grpc_sockaddr_set_port(&addr, requested_port)) { + /* Should never happen, because we check sa_family above. */ + err = GRPC_ERROR_CREATE("Failed to set port"); + break; + } + if (grpc_sockaddr_to_string(&addr_str, &addr, 0) < 0) { + addr_str = gpr_strdup(""); + } + gpr_log(GPR_DEBUG, + "Adding local addr from interface %s flags 0x%x to server: %s", + ifa_name, ifa_it->ifa_flags, addr_str); + /* We could have multiple interfaces with the same address (e.g., bonding), + so look for duplicates. */ + if (grpc_tcp_server_find_listener_with_addr(s, &addr) != NULL) { + gpr_log(GPR_DEBUG, "Skipping duplicate addr %s on interface %s", addr_str, + ifa_name); + gpr_free(addr_str); + continue; + } + if ((err = grpc_tcp_server_add_addr(s, &addr, port_index, fd_index, &dsmode, + &new_sp)) != GRPC_ERROR_NONE) { + char *err_str = NULL; + grpc_error *root_err; + if (gpr_asprintf(&err_str, "Failed to add listener: %s", addr_str) < 0) { + err_str = gpr_strdup("Failed to add listener"); + } + root_err = GRPC_ERROR_CREATE(err_str); + gpr_free(err_str); + gpr_free(addr_str); + err = grpc_error_add_child(root_err, err); + break; + } else { + GPR_ASSERT(requested_port == new_sp->port); + ++fd_index; + if (sp != NULL) { + new_sp->is_sibling = 1; + sp->sibling = new_sp; + } + sp = new_sp; + } + gpr_free(addr_str); + } + freeifaddrs(ifa); + if (err != GRPC_ERROR_NONE) { + return err; + } else if (sp == NULL) { + return GRPC_ERROR_CREATE("No local addresses"); + } else { + *out_port = sp->port; + return GRPC_ERROR_NONE; + } +} + +bool grpc_tcp_server_have_ifaddrs(void) { return true; } + +#endif /* GRPC_HAVE_IFADDRS */ diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c b/src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c new file mode 100644 index 0000000000..fb21d62d05 --- /dev/null +++ b/src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c @@ -0,0 +1,49 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#ifndef GRPC_HAVE_IFADDRS + +#include "src/core/lib/iomgr/tcp_server_utils_posix.h" + +grpc_error *grpc_tcp_server_add_all_local_addrs(grpc_tcp_server *s, + unsigned port_index, + int requested_port, + int *out_port) { + return GRPC_ERROR_CREATE("no ifaddrs available"); +} + +bool grpc_tcp_server_have_ifaddrs(void) { return false; } + +#endif /* ifndef GRPC_HAVE_IFADDRS */ diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index a9f20e6d2a..00b2b8f902 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -134,6 +134,8 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/tcp_client_windows.c', 'src/core/lib/iomgr/tcp_posix.c', 'src/core/lib/iomgr/tcp_server_posix.c', + 'src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c', + 'src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c', 'src/core/lib/iomgr/tcp_server_uv.c', 'src/core/lib/iomgr/tcp_server_windows.c', 'src/core/lib/iomgr/tcp_uv.c', diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 131a013451..f9e4775b73 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1130,6 +1130,9 @@ src/core/lib/iomgr/tcp_posix.c \ src/core/lib/iomgr/tcp_posix.h \ src/core/lib/iomgr/tcp_server.h \ src/core/lib/iomgr/tcp_server_posix.c \ +src/core/lib/iomgr/tcp_server_utils_posix.h \ +src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c \ +src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c \ src/core/lib/iomgr/tcp_server_uv.c \ src/core/lib/iomgr/tcp_server_windows.c \ src/core/lib/iomgr/tcp_uv.c \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 767c2b2e36..38fe87ac46 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -7456,6 +7456,7 @@ "src/core/lib/iomgr/tcp_client_posix.h", "src/core/lib/iomgr/tcp_posix.h", "src/core/lib/iomgr/tcp_server.h", + "src/core/lib/iomgr/tcp_server_utils_posix.h", "src/core/lib/iomgr/tcp_uv.h", "src/core/lib/iomgr/tcp_windows.h", "src/core/lib/iomgr/time_averaged_stats.h", @@ -7636,6 +7637,9 @@ "src/core/lib/iomgr/tcp_posix.h", "src/core/lib/iomgr/tcp_server.h", "src/core/lib/iomgr/tcp_server_posix.c", + "src/core/lib/iomgr/tcp_server_utils_posix.h", + "src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c", + "src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c", "src/core/lib/iomgr/tcp_server_uv.c", "src/core/lib/iomgr/tcp_server_windows.c", "src/core/lib/iomgr/tcp_uv.c", diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj index fde60be3e2..7b48dfbf59 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj @@ -357,6 +357,7 @@ + @@ -618,6 +619,10 @@ + + + + diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters index 8edbbc22be..8ba8a4f8fb 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters @@ -181,6 +181,12 @@ src\core\lib\iomgr + + src\core\lib\iomgr + + + src\core\lib\iomgr + src\core\lib\iomgr @@ -944,6 +950,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj index e7c9fb71f3..d078db1c3c 100644 --- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj +++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj @@ -252,6 +252,7 @@ + @@ -461,6 +462,10 @@ + + + + diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters index 3d36948aae..9e2d817143 100644 --- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters @@ -238,6 +238,12 @@ src\core\lib\iomgr + + src\core\lib\iomgr + + + src\core\lib\iomgr + src\core\lib\iomgr @@ -728,6 +734,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj index 22f4740b8f..b6f10b3d7c 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj @@ -347,6 +347,7 @@ + @@ -585,6 +586,10 @@ + + + + diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters index 5021cb47d8..03662d4044 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters @@ -184,6 +184,12 @@ src\core\lib\iomgr + + src\core\lib\iomgr + + + src\core\lib\iomgr + src\core\lib\iomgr @@ -854,6 +860,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr -- cgit v1.2.3 From a6bec8fc3159b54fc447a029b6260b3d6c478e6b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 14 Mar 2017 08:26:04 -0700 Subject: Auto-call-arena-sizing --- src/core/lib/support/arena.c | 1 + src/core/lib/surface/call.c | 6 ++++-- src/core/lib/surface/channel.c | 32 ++++++++++++++++++++++++++++++++ src/core/lib/surface/channel.h | 3 +++ 4 files changed, 40 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/core/lib/support/arena.c b/src/core/lib/support/arena.c index 6610acd3a0..8ce29aaab3 100644 --- a/src/core/lib/support/arena.c +++ b/src/core/lib/support/arena.c @@ -53,6 +53,7 @@ struct gpr_arena { gpr_arena *gpr_arena_create(size_t initial_size) { initial_size = ROUND_UP_TO_ALIGNMENT_SIZE(initial_size); + gpr_log(GPR_DEBUG, "arena create: %" PRIdPTR, initial_size); gpr_arena *a = gpr_zalloc(sizeof(gpr_arena) + initial_size); a->initial_zone.size_end = initial_size; return a; diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 84c401db35..00719be9ab 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -277,7 +277,8 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, grpc_channel_get_channel_stack(args->channel); grpc_call *call; GPR_TIMER_BEGIN("grpc_call_create", 0); - gpr_arena *arena = gpr_arena_create(8192); + gpr_arena *arena = + gpr_arena_create(grpc_channel_get_call_size_estimate(args->channel)); call = gpr_arena_alloc(arena, sizeof(grpc_call) + channel_stack->call_stack_size); call->arena = arena; @@ -436,8 +437,9 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) { static void release_call(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { grpc_call *c = call; + grpc_channel_update_call_size_estimate(c->channel, + gpr_arena_destroy(c->arena)); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call"); - gpr_arena_destroy(c->arena); } static void set_status_value_directly(grpc_status_code status, void *dest); diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index d6acd392c1..56beb7d96e 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -68,6 +68,8 @@ struct grpc_channel { grpc_compression_options compression_options; grpc_mdelem default_authority; + gpr_atm call_size_estimate; + gpr_mu registered_call_mu; registered_call *registered_calls; @@ -115,6 +117,10 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, gpr_mu_init(&channel->registered_call_mu); channel->registered_calls = NULL; + gpr_atm_no_barrier_store( + &channel->call_size_estimate, + CHANNEL_STACK_FROM_CHANNEL(channel)->call_stack_size); + grpc_compression_options_init(&channel->compression_options); for (size_t i = 0; i < args->num_args; i++) { if (0 == strcmp(args->args[i].key, GRPC_ARG_DEFAULT_AUTHORITY)) { @@ -177,6 +183,32 @@ done: return channel; } +size_t grpc_channel_get_call_size_estimate(grpc_channel *channel) { +#define ROUND_UP_SIZE 256 + return ((size_t)gpr_atm_no_barrier_load(&channel->call_size_estimate) + + ROUND_UP_SIZE) & + ~(size_t)(ROUND_UP_SIZE - 1); +} + +void grpc_channel_update_call_size_estimate(grpc_channel *channel, + size_t size) { + size_t cur = (size_t)gpr_atm_no_barrier_load(&channel->call_size_estimate); + if (cur < size) { + /* size grew: update estimate */ + gpr_atm_no_barrier_cas(&channel->call_size_estimate, (gpr_atm)cur, + (gpr_atm)size); + /* if we lose: never mind, something else will likely update soon enough */ + } else if (cur == size) { + /* no change: holding pattern */ + } else if (cur > 0) { + /* size shrank: decrease estimate */ + gpr_atm_no_barrier_cas( + &channel->call_size_estimate, (gpr_atm)cur, + (gpr_atm)(GPR_MIN(cur - 1, (255 * cur + size) / 256))); + /* if we lose: never mind, something else will likely update soon enough */ + } +} + char *grpc_channel_get_target(grpc_channel *channel) { GRPC_API_TRACE("grpc_channel_get_target(channel=%p)", 1, (channel)); return gpr_strdup(channel->target); diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h index 3a441d7add..c4aebd8b9b 100644 --- a/src/core/lib/surface/channel.h +++ b/src/core/lib/surface/channel.h @@ -66,6 +66,9 @@ grpc_mdelem grpc_channel_get_reffed_status_elem(grpc_exec_ctx *exec_ctx, grpc_channel *channel, int status_code); +size_t grpc_channel_get_call_size_estimate(grpc_channel *channel); +void grpc_channel_update_call_size_estimate(grpc_channel *channel, size_t size); + #ifdef GRPC_STREAM_REFCOUNT_DEBUG void grpc_channel_internal_ref(grpc_channel *channel, const char *reason); void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx, grpc_channel *channel, -- cgit v1.2.3 From fb9d11204388b524b942950b47fef24116eab243 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 14 Mar 2017 09:26:27 -0700 Subject: Review comments --- src/core/ext/client_channel/client_channel.c | 4 ++-- src/core/ext/client_channel/subchannel.c | 14 +++++++------- .../ext/transport/chttp2/transport/incoming_metadata.c | 2 +- src/core/lib/support/arena.c | 1 - test/core/channel/channel_stack_test.c | 2 +- 5 files changed, 11 insertions(+), 12 deletions(-) (limited to 'src') diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index f6550292a0..ba842c7916 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -755,7 +755,7 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, } else { /* Create call on subchannel. */ grpc_subchannel_call *subchannel_call = NULL; - grpc_connected_subchannel_call_args call_args = { + const grpc_connected_subchannel_call_args call_args = { .pollent = calld->pollent, .path = calld->path, .start_time = calld->call_start_time, @@ -988,7 +988,7 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && calld->connected_subchannel != NULL) { grpc_subchannel_call *subchannel_call = NULL; - grpc_connected_subchannel_call_args call_args = { + const grpc_connected_subchannel_call_args call_args = { .pollent = calld->pollent, .path = calld->path, .start_time = calld->call_start_time, diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index e4b669c582..cef08a04ea 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -778,13 +778,13 @@ grpc_error *grpc_connected_subchannel_create_call( args->arena, sizeof(grpc_subchannel_call) + chanstk->call_stack_size); grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); (*call)->connection = con; // Ref is added below. - grpc_call_element_args call_args = {.call_stack = callstk, - .server_transport_data = NULL, - .context = NULL, - .path = args->path, - .start_time = args->start_time, - .deadline = args->deadline, - .arena = args->arena}; + const grpc_call_element_args call_args = {.call_stack = callstk, + .server_transport_data = NULL, + .context = NULL, + .path = args->path, + .start_time = args->start_time, + .deadline = args->deadline, + .arena = args->arena}; grpc_error *error = grpc_call_stack_init( exec_ctx, chanstk, 1, subchannel_call_destroy, *call, &call_args); if (error != GRPC_ERROR_NONE) { diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.c b/src/core/ext/transport/chttp2/transport/incoming_metadata.c index ab900fdff2..da0a34d32a 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.c +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.c @@ -42,8 +42,8 @@ void grpc_chttp2_incoming_metadata_buffer_init( grpc_chttp2_incoming_metadata_buffer *buffer, gpr_arena *arena) { - grpc_metadata_batch_init(&buffer->batch); buffer->arena = arena; + grpc_metadata_batch_init(&buffer->batch); buffer->batch.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); } diff --git a/src/core/lib/support/arena.c b/src/core/lib/support/arena.c index 8ce29aaab3..6610acd3a0 100644 --- a/src/core/lib/support/arena.c +++ b/src/core/lib/support/arena.c @@ -53,7 +53,6 @@ struct gpr_arena { gpr_arena *gpr_arena_create(size_t initial_size) { initial_size = ROUND_UP_TO_ALIGNMENT_SIZE(initial_size); - gpr_log(GPR_DEBUG, "arena create: %" PRIdPTR, initial_size); gpr_arena *a = gpr_zalloc(sizeof(gpr_arena) + initial_size); a->initial_zone.size_end = initial_size; return a; diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c index 0c424c1ec4..af551c4928 100644 --- a/test/core/channel/channel_stack_test.c +++ b/test/core/channel/channel_stack_test.c @@ -139,7 +139,7 @@ static void test_create_channel_stack(void) { GPR_ASSERT(*channel_data == 0); call_stack = gpr_malloc(channel_stack->call_stack_size); - grpc_call_element_args args = { + const grpc_call_element_args args = { .call_stack = call_stack, .server_transport_data = NULL, .context = NULL, -- cgit v1.2.3 From 4564b8cde72f74b88ec2ddf4943768e809dff94b Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Tue, 14 Mar 2017 01:21:57 -0700 Subject: Copy value at SetPointerWithVtable --- src/cpp/common/channel_arguments.cc | 4 +++- test/cpp/common/channel_arguments_test.cc | 7 ------- 2 files changed, 3 insertions(+), 8 deletions(-) (limited to 'src') diff --git a/src/cpp/common/channel_arguments.cc b/src/cpp/common/channel_arguments.cc index 34fd4491d0..eddcacc332 100644 --- a/src/cpp/common/channel_arguments.cc +++ b/src/cpp/common/channel_arguments.cc @@ -111,8 +111,10 @@ void ChannelArguments::SetSocketMutator(grpc_socket_mutator* mutator) { for (auto it = args_.begin(); it != args_.end(); ++it) { if (it->type == mutator_arg.type && grpc::string(it->key) == grpc::string(mutator_arg.key)) { + GPR_ASSERT(!replaced); it->value.pointer.vtable->destroy(&exec_ctx, it->value.pointer.p); it->value.pointer = mutator_arg.value.pointer; + replaced = true; } } grpc_exec_ctx_finish(&exec_ctx); @@ -195,7 +197,7 @@ void ChannelArguments::SetPointerWithVtable( arg.type = GRPC_ARG_POINTER; strings_.push_back(key); arg.key = const_cast(strings_.back().c_str()); - arg.value.pointer.p = value; + arg.value.pointer.p = vtable->copy(value); arg.value.pointer.vtable = vtable; args_.push_back(arg); } diff --git a/test/cpp/common/channel_arguments_test.cc b/test/cpp/common/channel_arguments_test.cc index 190d32ce06..9bcc9f99f6 100644 --- a/test/cpp/common/channel_arguments_test.cc +++ b/test/cpp/common/channel_arguments_test.cc @@ -230,13 +230,6 @@ TEST_F(ChannelArgumentsTest, SetSocketMutator) { EXPECT_TRUE(HasArg(arg1)); // arg0 is replaced by arg1 EXPECT_FALSE(HasArg(arg0)); - - // arg0 is destroyed by grpc_socket_mutator_to_arg(mutator1) - { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - arg1.value.pointer.vtable->destroy(&exec_ctx, arg1.value.pointer.p); - grpc_exec_ctx_finish(&exec_ctx); - } } TEST_F(ChannelArgumentsTest, SetUserAgentPrefix) { -- cgit v1.2.3 From 68745bbf8a0193a8a2c2691281365a1e9d488863 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Tue, 14 Mar 2017 17:51:07 -0700 Subject: Resolve dependency issues --- BUILD | 1 + CMakeLists.txt | 5 + Makefile | 5 + binding.gyp | 1 + build.yaml | 1 + config.m4 | 1 + gRPC-Core.podspec | 1 + grpc.gemspec | 1 + package.xml | 1 + src/core/lib/iomgr/tcp_server_posix.c | 229 +-------------------- src/core/lib/iomgr/tcp_server_utils_posix.h | 53 ++++- src/core/lib/iomgr/tcp_server_utils_posix_common.c | 220 ++++++++++++++++++++ .../lib/iomgr/tcp_server_utils_posix_ifaddrs.c | 19 +- .../lib/iomgr/tcp_server_utils_posix_noifaddrs.c | 4 +- src/python/grpcio/grpc_core_dependencies.py | 1 + tools/doxygen/Doxyfile.core.internal | 1 + tools/run_tests/generated/sources_and_headers.json | 1 + vsprojects/vcxproj/grpc/grpc.vcxproj | 2 + vsprojects/vcxproj/grpc/grpc.vcxproj.filters | 3 + .../vcxproj/grpc_test_util/grpc_test_util.vcxproj | 2 + .../grpc_test_util/grpc_test_util.vcxproj.filters | 3 + .../vcxproj/grpc_unsecure/grpc_unsecure.vcxproj | 2 + .../grpc_unsecure/grpc_unsecure.vcxproj.filters | 3 + 23 files changed, 325 insertions(+), 235 deletions(-) create mode 100644 src/core/lib/iomgr/tcp_server_utils_posix_common.c (limited to 'src') diff --git a/BUILD b/BUILD index dc6431484e..371ab200d2 100644 --- a/BUILD +++ b/BUILD @@ -481,6 +481,7 @@ grpc_cc_library( "src/core/lib/iomgr/tcp_client_windows.c", "src/core/lib/iomgr/tcp_posix.c", "src/core/lib/iomgr/tcp_server_posix.c", + "src/core/lib/iomgr/tcp_server_utils_posix_common.c", "src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c", "src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c", "src/core/lib/iomgr/tcp_server_uv.c", diff --git a/CMakeLists.txt b/CMakeLists.txt index cfc4555834..c7001aa721 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -914,6 +914,7 @@ add_library(grpc src/core/lib/iomgr/tcp_client_windows.c src/core/lib/iomgr/tcp_posix.c src/core/lib/iomgr/tcp_server_posix.c + src/core/lib/iomgr/tcp_server_utils_posix_common.c src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c src/core/lib/iomgr/tcp_server_uv.c @@ -1225,6 +1226,7 @@ add_library(grpc_cronet src/core/lib/iomgr/tcp_client_windows.c src/core/lib/iomgr/tcp_posix.c src/core/lib/iomgr/tcp_server_posix.c + src/core/lib/iomgr/tcp_server_utils_posix_common.c src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c src/core/lib/iomgr/tcp_server_uv.c @@ -1527,6 +1529,7 @@ add_library(grpc_test_util src/core/lib/iomgr/tcp_client_windows.c src/core/lib/iomgr/tcp_posix.c src/core/lib/iomgr/tcp_server_posix.c + src/core/lib/iomgr/tcp_server_utils_posix_common.c src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c src/core/lib/iomgr/tcp_server_uv.c @@ -1775,6 +1778,7 @@ add_library(grpc_unsecure src/core/lib/iomgr/tcp_client_windows.c src/core/lib/iomgr/tcp_posix.c src/core/lib/iomgr/tcp_server_posix.c + src/core/lib/iomgr/tcp_server_utils_posix_common.c src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c src/core/lib/iomgr/tcp_server_uv.c @@ -2384,6 +2388,7 @@ add_library(grpc++_cronet src/core/lib/iomgr/tcp_client_windows.c src/core/lib/iomgr/tcp_posix.c src/core/lib/iomgr/tcp_server_posix.c + src/core/lib/iomgr/tcp_server_utils_posix_common.c src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c src/core/lib/iomgr/tcp_server_uv.c diff --git a/Makefile b/Makefile index 14c5a36c95..e31b7514e1 100644 --- a/Makefile +++ b/Makefile @@ -2799,6 +2799,7 @@ LIBGRPC_SRC = \ src/core/lib/iomgr/tcp_client_windows.c \ src/core/lib/iomgr/tcp_posix.c \ src/core/lib/iomgr/tcp_server_posix.c \ + src/core/lib/iomgr/tcp_server_utils_posix_common.c \ src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c \ src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c \ src/core/lib/iomgr/tcp_server_uv.c \ @@ -3113,6 +3114,7 @@ LIBGRPC_CRONET_SRC = \ src/core/lib/iomgr/tcp_client_windows.c \ src/core/lib/iomgr/tcp_posix.c \ src/core/lib/iomgr/tcp_server_posix.c \ + src/core/lib/iomgr/tcp_server_utils_posix_common.c \ src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c \ src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c \ src/core/lib/iomgr/tcp_server_uv.c \ @@ -3418,6 +3420,7 @@ LIBGRPC_TEST_UTIL_SRC = \ src/core/lib/iomgr/tcp_client_windows.c \ src/core/lib/iomgr/tcp_posix.c \ src/core/lib/iomgr/tcp_server_posix.c \ + src/core/lib/iomgr/tcp_server_utils_posix_common.c \ src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c \ src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c \ src/core/lib/iomgr/tcp_server_uv.c \ @@ -3646,6 +3649,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/iomgr/tcp_client_windows.c \ src/core/lib/iomgr/tcp_posix.c \ src/core/lib/iomgr/tcp_server_posix.c \ + src/core/lib/iomgr/tcp_server_utils_posix_common.c \ src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c \ src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c \ src/core/lib/iomgr/tcp_server_uv.c \ @@ -4257,6 +4261,7 @@ LIBGRPC++_CRONET_SRC = \ src/core/lib/iomgr/tcp_client_windows.c \ src/core/lib/iomgr/tcp_posix.c \ src/core/lib/iomgr/tcp_server_posix.c \ + src/core/lib/iomgr/tcp_server_utils_posix_common.c \ src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c \ src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c \ src/core/lib/iomgr/tcp_server_uv.c \ diff --git a/binding.gyp b/binding.gyp index ec21a97806..96843c0d6b 100644 --- a/binding.gyp +++ b/binding.gyp @@ -667,6 +667,7 @@ 'src/core/lib/iomgr/tcp_client_windows.c', 'src/core/lib/iomgr/tcp_posix.c', 'src/core/lib/iomgr/tcp_server_posix.c', + 'src/core/lib/iomgr/tcp_server_utils_posix_common.c', 'src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c', 'src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c', 'src/core/lib/iomgr/tcp_server_uv.c', diff --git a/build.yaml b/build.yaml index 36817a5506..75bacbde63 100644 --- a/build.yaml +++ b/build.yaml @@ -338,6 +338,7 @@ filegroups: - src/core/lib/iomgr/tcp_client_windows.c - src/core/lib/iomgr/tcp_posix.c - src/core/lib/iomgr/tcp_server_posix.c + - src/core/lib/iomgr/tcp_server_utils_posix_common.c - src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c - src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c - src/core/lib/iomgr/tcp_server_uv.c diff --git a/config.m4 b/config.m4 index 181e44c52c..6d6f9419cd 100644 --- a/config.m4 +++ b/config.m4 @@ -140,6 +140,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/iomgr/tcp_client_windows.c \ src/core/lib/iomgr/tcp_posix.c \ src/core/lib/iomgr/tcp_server_posix.c \ + src/core/lib/iomgr/tcp_server_utils_posix_common.c \ src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c \ src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c \ src/core/lib/iomgr/tcp_server_uv.c \ diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 1d002247d4..16c17c2bf6 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -509,6 +509,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/tcp_client_windows.c', 'src/core/lib/iomgr/tcp_posix.c', 'src/core/lib/iomgr/tcp_server_posix.c', + 'src/core/lib/iomgr/tcp_server_utils_posix_common.c', 'src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c', 'src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c', 'src/core/lib/iomgr/tcp_server_uv.c', diff --git a/grpc.gemspec b/grpc.gemspec index edc08cb9b9..da9535ff46 100755 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -426,6 +426,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/iomgr/tcp_client_windows.c ) s.files += %w( src/core/lib/iomgr/tcp_posix.c ) s.files += %w( src/core/lib/iomgr/tcp_server_posix.c ) + s.files += %w( src/core/lib/iomgr/tcp_server_utils_posix_common.c ) s.files += %w( src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c ) s.files += %w( src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c ) s.files += %w( src/core/lib/iomgr/tcp_server_uv.c ) diff --git a/package.xml b/package.xml index 21eeee4d3e..aeeb0d5986 100644 --- a/package.xml +++ b/package.xml @@ -435,6 +435,7 @@ + diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index 1e9e854d6b..b3644518f5 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -44,10 +44,8 @@ #include #include -#include #include #include -#include #include #include #include @@ -70,56 +68,6 @@ #include "src/core/lib/iomgr/unix_sockets_posix.h" #include "src/core/lib/support/string.h" -#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100 - -static gpr_once s_init_max_accept_queue_size; -static int s_max_accept_queue_size; - -/* the overall server */ -struct grpc_tcp_server { - gpr_refcount refs; - /* Called whenever accept() succeeds on a server port. */ - grpc_tcp_server_cb on_accept_cb; - void *on_accept_cb_arg; - - gpr_mu mu; - - /* active port count: how many ports are actually still listening */ - size_t active_ports; - /* destroyed port count: how many ports are completely destroyed */ - size_t destroyed_ports; - - /* is this server shutting down? */ - bool shutdown; - /* have listeners been shutdown? */ - bool shutdown_listeners; - /* use SO_REUSEPORT */ - bool so_reuseport; - /* expand wildcard addresses to a list of all local addresses */ - bool expand_wildcard_addrs; - - /* linked list of server ports */ - grpc_tcp_listener *head; - grpc_tcp_listener *tail; - unsigned nports; - - /* List of closures passed to shutdown_starting_add(). */ - grpc_closure_list shutdown_starting; - - /* shutdown callback */ - grpc_closure *shutdown_complete; - - /* all pollsets interested in new connections */ - grpc_pollset **pollsets; - /* number of pollsets in the pollsets array */ - size_t pollset_count; - - /* next pollset to assign a channel to */ - gpr_atm next_pollset_to_assign; - - grpc_resource_quota *resource_quota; -}; - static gpr_once check_init = GPR_ONCE_INIT; static bool has_so_reuseport = false; @@ -278,99 +226,6 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { } } -/* get max listen queue size on linux */ -static void init_max_accept_queue_size(void) { - int n = SOMAXCONN; - char buf[64]; - FILE *fp = fopen("/proc/sys/net/core/somaxconn", "r"); - if (fp == NULL) { - /* 2.4 kernel. */ - s_max_accept_queue_size = SOMAXCONN; - return; - } - if (fgets(buf, sizeof buf, fp)) { - char *end; - long i = strtol(buf, &end, 10); - if (i > 0 && i <= INT_MAX && end && *end == 0) { - n = (int)i; - } - } - fclose(fp); - s_max_accept_queue_size = n; - - if (s_max_accept_queue_size < MIN_SAFE_ACCEPT_QUEUE_SIZE) { - gpr_log(GPR_INFO, - "Suspiciously small accept queue (%d) will probably lead to " - "connection drops", - s_max_accept_queue_size); - } -} - -static int get_max_accept_queue_size(void) { - gpr_once_init(&s_init_max_accept_queue_size, init_max_accept_queue_size); - return s_max_accept_queue_size; -} - -/* Prepare a recently-created socket for listening. */ -static grpc_error *prepare_socket(int fd, const grpc_resolved_address *addr, - bool so_reuseport, int *port) { - grpc_resolved_address sockname_temp; - grpc_error *err = GRPC_ERROR_NONE; - - GPR_ASSERT(fd >= 0); - - if (so_reuseport && !grpc_is_unix_socket(addr)) { - err = grpc_set_socket_reuse_port(fd, 1); - if (err != GRPC_ERROR_NONE) goto error; - } - - err = grpc_set_socket_nonblocking(fd, 1); - if (err != GRPC_ERROR_NONE) goto error; - err = grpc_set_socket_cloexec(fd, 1); - if (err != GRPC_ERROR_NONE) goto error; - if (!grpc_is_unix_socket(addr)) { - err = grpc_set_socket_low_latency(fd, 1); - if (err != GRPC_ERROR_NONE) goto error; - err = grpc_set_socket_reuse_addr(fd, 1); - if (err != GRPC_ERROR_NONE) goto error; - } - err = grpc_set_socket_no_sigpipe_if_possible(fd); - if (err != GRPC_ERROR_NONE) goto error; - - GPR_ASSERT(addr->len < ~(socklen_t)0); - if (bind(fd, (struct sockaddr *)addr->addr, (socklen_t)addr->len) < 0) { - err = GRPC_OS_ERROR(errno, "bind"); - goto error; - } - - if (listen(fd, get_max_accept_queue_size()) < 0) { - err = GRPC_OS_ERROR(errno, "listen"); - goto error; - } - - sockname_temp.len = sizeof(struct sockaddr_storage); - - if (getsockname(fd, (struct sockaddr *)sockname_temp.addr, - (socklen_t *)&sockname_temp.len) < 0) { - err = GRPC_OS_ERROR(errno, "getsockname"); - goto error; - } - - *port = grpc_sockaddr_get_port(&sockname_temp); - return GRPC_ERROR_NONE; - -error: - GPR_ASSERT(err != GRPC_ERROR_NONE); - if (fd >= 0) { - close(fd); - } - grpc_error *ret = grpc_error_set_int( - GRPC_ERROR_CREATE_REFERENCING("Unable to configure socket", &err, 1), - GRPC_ERROR_INT_FD, fd); - GRPC_ERROR_UNREF(err); - return ret; -} - /* event manager callback when reads are ready */ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { grpc_tcp_listener *sp = arg; @@ -454,88 +309,6 @@ error: } } -static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd, - const grpc_resolved_address *addr, - unsigned port_index, unsigned fd_index, - grpc_tcp_listener **listener) { - grpc_tcp_listener *sp = NULL; - int port = -1; - char *addr_str; - char *name; - - grpc_error *err = prepare_socket(fd, addr, s->so_reuseport, &port); - if (err == GRPC_ERROR_NONE) { - GPR_ASSERT(port > 0); - grpc_sockaddr_to_string(&addr_str, addr, 1); - gpr_asprintf(&name, "tcp-server-listener:%s", addr_str); - gpr_mu_lock(&s->mu); - s->nports++; - GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); - sp = gpr_malloc(sizeof(grpc_tcp_listener)); - sp->next = NULL; - if (s->head == NULL) { - s->head = sp; - } else { - s->tail->next = sp; - } - s->tail = sp; - sp->server = s; - sp->fd = fd; - sp->emfd = grpc_fd_create(fd, name); - memcpy(&sp->addr, addr, sizeof(grpc_resolved_address)); - sp->port = port; - sp->port_index = port_index; - sp->fd_index = fd_index; - sp->is_sibling = 0; - sp->sibling = NULL; - GPR_ASSERT(sp->emfd); - gpr_mu_unlock(&s->mu); - gpr_free(addr_str); - gpr_free(name); - } - - *listener = sp; - return err; -} - -/* If successful, add a listener to s for addr, set *dsmode for the socket, and - return the *listener. */ -grpc_error *grpc_tcp_server_add_addr(grpc_tcp_server *s, - const grpc_resolved_address *addr, - unsigned port_index, unsigned fd_index, - grpc_dualstack_mode *dsmode, - grpc_tcp_listener **listener) { - grpc_resolved_address addr4_copy; - int fd; - grpc_error *err = - grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, dsmode, &fd); - if (err != GRPC_ERROR_NONE) { - return err; - } - if (*dsmode == GRPC_DSMODE_IPV4 && - grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) { - addr = &addr4_copy; - } - return add_socket_to_server(s, fd, addr, port_index, fd_index, listener); -} - -/* Return the listener in s with address addr or NULL. */ -grpc_tcp_listener *grpc_tcp_server_find_listener_with_addr( - grpc_tcp_server *s, grpc_resolved_address *addr) { - grpc_tcp_listener *l; - gpr_mu_lock(&s->mu); - for (l = s->head; l != NULL; l = l->next) { - if (l->addr.len != addr->len) { - continue; - } - if (memcmp(l->addr.addr, addr->addr, addr->len) == 0) { - break; - } - } - gpr_mu_unlock(&s->mu); - return l; -} - /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ static grpc_error *add_wildcard_addrs_to_server(grpc_tcp_server *s, unsigned port_index, @@ -607,7 +380,7 @@ static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) { err = grpc_create_dualstack_socket(&listener->addr, SOCK_STREAM, 0, &dsmode, &fd); if (err != GRPC_ERROR_NONE) return err; - err = prepare_socket(fd, &listener->addr, true, &port); + err = grpc_tcp_server_prepare_socket(fd, &listener->addr, true, &port); if (err != GRPC_ERROR_NONE) return err; listener->server->nports++; grpc_sockaddr_to_string(&addr_str, &listener->addr, 1); diff --git a/src/core/lib/iomgr/tcp_server_utils_posix.h b/src/core/lib/iomgr/tcp_server_utils_posix.h index d39f9085fd..f5dc8532f9 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix.h +++ b/src/core/lib/iomgr/tcp_server_utils_posix.h @@ -61,6 +61,51 @@ typedef struct grpc_tcp_listener { int is_sibling; } grpc_tcp_listener; +/* the overall server */ +struct grpc_tcp_server { + gpr_refcount refs; + /* Called whenever accept() succeeds on a server port. */ + grpc_tcp_server_cb on_accept_cb; + void *on_accept_cb_arg; + + gpr_mu mu; + + /* active port count: how many ports are actually still listening */ + size_t active_ports; + /* destroyed port count: how many ports are completely destroyed */ + size_t destroyed_ports; + + /* is this server shutting down? */ + bool shutdown; + /* have listeners been shutdown? */ + bool shutdown_listeners; + /* use SO_REUSEPORT */ + bool so_reuseport; + /* expand wildcard addresses to a list of all local addresses */ + bool expand_wildcard_addrs; + + /* linked list of server ports */ + grpc_tcp_listener *head; + grpc_tcp_listener *tail; + unsigned nports; + + /* List of closures passed to shutdown_starting_add(). */ + grpc_closure_list shutdown_starting; + + /* shutdown callback */ + grpc_closure *shutdown_complete; + + /* all pollsets interested in new connections */ + grpc_pollset **pollsets; + /* number of pollsets in the pollsets array */ + size_t pollset_count; + + /* next pollset to assign a channel to */ + gpr_atm next_pollset_to_assign; + + grpc_resource_quota *resource_quota; +}; + /* If successful, add a listener to \a s for \a addr, set \a dsmode for the socket, and return the \a listener. */ grpc_error *grpc_tcp_server_add_addr(grpc_tcp_server *s, @@ -69,10 +114,6 @@ grpc_error *grpc_tcp_server_add_addr(grpc_tcp_server *s, grpc_dualstack_mode *dsmode, grpc_tcp_listener **listener); -/* Return the listener in \a s with address \a addr or NULL. */ -grpc_tcp_listener *grpc_tcp_server_find_listener_with_addr( - grpc_tcp_server *s, grpc_resolved_address *addr); - /* Get all addresses assigned to network interfaces on the machine and create a listener for each. requested_port is the port to use for every listener, or 0 to select one random port that will be used for every listener. Set *out_port @@ -83,6 +124,10 @@ grpc_error *grpc_tcp_server_add_all_local_addrs(grpc_tcp_server *s, int requested_port, int *out_port); +/* Prepare a recently-created socket for listening. */ +grpc_error *grpc_tcp_server_prepare_socket(int fd, + const grpc_resolved_address *addr, + bool so_reuseport, int *port); /* Ruturn true if the platform supports ifaddrs */ bool grpc_tcp_server_have_ifaddrs(void); diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.c b/src/core/lib/iomgr/tcp_server_utils_posix_common.c new file mode 100644 index 0000000000..e45e27d5ab --- /dev/null +++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.c @@ -0,0 +1,220 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_HAVE_IFADDRS + +#include "src/core/lib/iomgr/tcp_server_utils_posix.h" + +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/unix_sockets_posix.h" + +#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100 + +static gpr_once s_init_max_accept_queue_size; +static int s_max_accept_queue_size; + +/* get max listen queue size on linux */ +static void init_max_accept_queue_size(void) { + int n = SOMAXCONN; + char buf[64]; + FILE *fp = fopen("/proc/sys/net/core/somaxconn", "r"); + if (fp == NULL) { + /* 2.4 kernel. */ + s_max_accept_queue_size = SOMAXCONN; + return; + } + if (fgets(buf, sizeof buf, fp)) { + char *end; + long i = strtol(buf, &end, 10); + if (i > 0 && i <= INT_MAX && end && *end == 0) { + n = (int)i; + } + } + fclose(fp); + s_max_accept_queue_size = n; + + if (s_max_accept_queue_size < MIN_SAFE_ACCEPT_QUEUE_SIZE) { + gpr_log(GPR_INFO, + "Suspiciously small accept queue (%d) will probably lead to " + "connection drops", + s_max_accept_queue_size); + } +} + +static int get_max_accept_queue_size(void) { + gpr_once_init(&s_init_max_accept_queue_size, init_max_accept_queue_size); + return s_max_accept_queue_size; +} + +static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd, + const grpc_resolved_address *addr, + unsigned port_index, unsigned fd_index, + grpc_tcp_listener **listener) { + grpc_tcp_listener *sp = NULL; + int port = -1; + char *addr_str; + char *name; + + grpc_error *err = + grpc_tcp_server_prepare_socket(fd, addr, s->so_reuseport, &port); + if (err == GRPC_ERROR_NONE) { + GPR_ASSERT(port > 0); + grpc_sockaddr_to_string(&addr_str, addr, 1); + gpr_asprintf(&name, "tcp-server-listener:%s", addr_str); + gpr_mu_lock(&s->mu); + s->nports++; + GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); + sp = gpr_malloc(sizeof(grpc_tcp_listener)); + sp->next = NULL; + if (s->head == NULL) { + s->head = sp; + } else { + s->tail->next = sp; + } + s->tail = sp; + sp->server = s; + sp->fd = fd; + sp->emfd = grpc_fd_create(fd, name); + memcpy(&sp->addr, addr, sizeof(grpc_resolved_address)); + sp->port = port; + sp->port_index = port_index; + sp->fd_index = fd_index; + sp->is_sibling = 0; + sp->sibling = NULL; + GPR_ASSERT(sp->emfd); + gpr_mu_unlock(&s->mu); + gpr_free(addr_str); + gpr_free(name); + } + + *listener = sp; + return err; +} + +/* If successful, add a listener to s for addr, set *dsmode for the socket, and + return the *listener. */ +grpc_error *grpc_tcp_server_add_addr(grpc_tcp_server *s, + const grpc_resolved_address *addr, + unsigned port_index, unsigned fd_index, + grpc_dualstack_mode *dsmode, + grpc_tcp_listener **listener) { + grpc_resolved_address addr4_copy; + int fd; + grpc_error *err = + grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, dsmode, &fd); + if (err != GRPC_ERROR_NONE) { + return err; + } + if (*dsmode == GRPC_DSMODE_IPV4 && + grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) { + addr = &addr4_copy; + } + return add_socket_to_server(s, fd, addr, port_index, fd_index, listener); +} + +/* Prepare a recently-created socket for listening. */ +grpc_error *grpc_tcp_server_prepare_socket(int fd, + const grpc_resolved_address *addr, + bool so_reuseport, int *port) { + grpc_resolved_address sockname_temp; + grpc_error *err = GRPC_ERROR_NONE; + + GPR_ASSERT(fd >= 0); + + if (so_reuseport && !grpc_is_unix_socket(addr)) { + err = grpc_set_socket_reuse_port(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; + } + + err = grpc_set_socket_nonblocking(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; + err = grpc_set_socket_cloexec(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; + if (!grpc_is_unix_socket(addr)) { + err = grpc_set_socket_low_latency(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; + err = grpc_set_socket_reuse_addr(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; + } + err = grpc_set_socket_no_sigpipe_if_possible(fd); + if (err != GRPC_ERROR_NONE) goto error; + + GPR_ASSERT(addr->len < ~(socklen_t)0); + if (bind(fd, (struct sockaddr *)addr->addr, (socklen_t)addr->len) < 0) { + err = GRPC_OS_ERROR(errno, "bind"); + goto error; + } + + if (listen(fd, get_max_accept_queue_size()) < 0) { + err = GRPC_OS_ERROR(errno, "listen"); + goto error; + } + + sockname_temp.len = sizeof(struct sockaddr_storage); + + if (getsockname(fd, (struct sockaddr *)sockname_temp.addr, + (socklen_t *)&sockname_temp.len) < 0) { + err = GRPC_OS_ERROR(errno, "getsockname"); + goto error; + } + + *port = grpc_sockaddr_get_port(&sockname_temp); + return GRPC_ERROR_NONE; + +error: + GPR_ASSERT(err != GRPC_ERROR_NONE); + if (fd >= 0) { + close(fd); + } + grpc_error *ret = grpc_error_set_int( + GRPC_ERROR_CREATE_REFERENCING("Unable to configure socket", &err, 1), + GRPC_ERROR_INT_FD, fd); + GRPC_ERROR_UNREF(err); + return ret; +} + +#endif /* GRPC_HAVE_IFADDRS */ diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c b/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c index 05eb96f73e..6354a6bdc1 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c +++ b/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c @@ -50,6 +50,23 @@ #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" +/* Return the listener in s with address addr or NULL. */ +static grpc_tcp_listener *find_listener_with_addr(grpc_tcp_server *s, + grpc_resolved_address *addr) { + grpc_tcp_listener *l; + gpr_mu_lock(&s->mu); + for (l = s->head; l != NULL; l = l->next) { + if (l->addr.len != addr->len) { + continue; + } + if (memcmp(l->addr.addr, addr->addr, addr->len) == 0) { + break; + } + } + gpr_mu_unlock(&s->mu); + return l; +} + /* Bind to "::" to get a port number not used by any address. */ static grpc_error *get_unused_port(int *port) { grpc_resolved_address wild; @@ -133,7 +150,7 @@ grpc_error *grpc_tcp_server_add_all_local_addrs(grpc_tcp_server *s, ifa_name, ifa_it->ifa_flags, addr_str); /* We could have multiple interfaces with the same address (e.g., bonding), so look for duplicates. */ - if (grpc_tcp_server_find_listener_with_addr(s, &addr) != NULL) { + if (find_listener_with_addr(s, &addr) != NULL) { gpr_log(GPR_DEBUG, "Skipping duplicate addr %s on interface %s", addr_str, ifa_name); gpr_free(addr_str); diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c b/src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c index fb21d62d05..95c3198be6 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c +++ b/src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c @@ -33,7 +33,7 @@ #include "src/core/lib/iomgr/port.h" -#ifndef GRPC_HAVE_IFADDRS +#if defined(GRPC_POSIX_SOCKET) && !defined(GRPC_HAVE_IFADDRS) #include "src/core/lib/iomgr/tcp_server_utils_posix.h" @@ -46,4 +46,4 @@ grpc_error *grpc_tcp_server_add_all_local_addrs(grpc_tcp_server *s, bool grpc_tcp_server_have_ifaddrs(void) { return false; } -#endif /* ifndef GRPC_HAVE_IFADDRS */ +#endif /* defined(GRPC_POSIX_SOCKET) && !defined(GRPC_HAVE_IFADDRS) */ diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 00b2b8f902..ea3a51be13 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -134,6 +134,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/tcp_client_windows.c', 'src/core/lib/iomgr/tcp_posix.c', 'src/core/lib/iomgr/tcp_server_posix.c', + 'src/core/lib/iomgr/tcp_server_utils_posix_common.c', 'src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c', 'src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c', 'src/core/lib/iomgr/tcp_server_uv.c', diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index f9e4775b73..a1287cd894 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1131,6 +1131,7 @@ src/core/lib/iomgr/tcp_posix.h \ src/core/lib/iomgr/tcp_server.h \ src/core/lib/iomgr/tcp_server_posix.c \ src/core/lib/iomgr/tcp_server_utils_posix.h \ +src/core/lib/iomgr/tcp_server_utils_posix_common.c \ src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c \ src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c \ src/core/lib/iomgr/tcp_server_uv.c \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 38fe87ac46..9ab9836c4b 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -7638,6 +7638,7 @@ "src/core/lib/iomgr/tcp_server.h", "src/core/lib/iomgr/tcp_server_posix.c", "src/core/lib/iomgr/tcp_server_utils_posix.h", + "src/core/lib/iomgr/tcp_server_utils_posix_common.c", "src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c", "src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c", "src/core/lib/iomgr/tcp_server_uv.c", diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj index 7b48dfbf59..5e3b027663 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj @@ -619,6 +619,8 @@ + + diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters index 8ba8a4f8fb..d75ca766c0 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters @@ -181,6 +181,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj index d078db1c3c..62969e31ac 100644 --- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj +++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj @@ -462,6 +462,8 @@ + + diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters index 9e2d817143..30088101f5 100644 --- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters @@ -238,6 +238,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj index b6f10b3d7c..11ac8bd4b2 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj @@ -586,6 +586,8 @@ + + diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters index 03662d4044..414e2a50b8 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters @@ -184,6 +184,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr -- cgit v1.2.3 From 51006fea710ef7e4faefae653432f55947639b76 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 15 Mar 2017 08:07:02 -0700 Subject: Fix use-after-free --- src/core/lib/surface/call.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 00719be9ab..9a8870b86a 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -437,9 +437,10 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) { static void release_call(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { grpc_call *c = call; - grpc_channel_update_call_size_estimate(c->channel, + grpc_channel *channel = c->channel; + grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena)); - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call"); + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call"); } static void set_status_value_directly(grpc_status_code status, void *dest); -- cgit v1.2.3 From dbad370aa5a5480824388d77207d3d0b2aa94a1c Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 15 Mar 2017 08:21:19 -0700 Subject: Fix use-after-free, cronet compiles --- .../transport/cronet/transport/cronet_transport.c | 46 +++++++++++++--------- src/core/lib/surface/call.c | 3 +- 2 files changed, 29 insertions(+), 20 deletions(-) (limited to 'src') diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 01a03533da..e827423a2a 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -177,6 +177,7 @@ struct op_storage { }; struct stream_obj { + gpr_arena *arena; struct op_and_state *oas; grpc_transport_stream_op *curr_op; grpc_cronet_transport *curr_ct; @@ -451,15 +452,18 @@ static void on_response_headers_received( gpr_mu_lock(&s->mu); memset(&s->state.rs.initial_metadata, 0, sizeof(s->state.rs.initial_metadata)); - grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata); + grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata, + s->arena); for (size_t i = 0; i < headers->count; i++) { - grpc_chttp2_incoming_metadata_buffer_add( - &s->state.rs.initial_metadata, - grpc_mdelem_from_slices( - &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( - headers->headers[i].key)), - grpc_slice_intern( - grpc_slice_from_static_string(headers->headers[i].value)))); + GRPC_LOG_IF_ERROR( + "on_response_headers_received", + grpc_chttp2_incoming_metadata_buffer_add( + &exec_ctx, &s->state.rs.initial_metadata, + grpc_mdelem_from_slices( + &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( + headers->headers[i].key)), + grpc_slice_intern(grpc_slice_from_static_string( + headers->headers[i].value))))); } s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true; if (!(s->state.state_op_done[OP_CANCEL_ERROR] || @@ -549,17 +553,20 @@ static void on_response_trailers_received( memset(&s->state.rs.trailing_metadata, 0, sizeof(s->state.rs.trailing_metadata)); s->state.rs.trailing_metadata_valid = false; - grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata); + grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata, + s->arena); for (size_t i = 0; i < trailers->count; i++) { CRONET_LOG(GPR_DEBUG, "trailer key=%s, value=%s", trailers->headers[i].key, trailers->headers[i].value); - grpc_chttp2_incoming_metadata_buffer_add( - &s->state.rs.trailing_metadata, - grpc_mdelem_from_slices( - &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( - trailers->headers[i].key)), - grpc_slice_intern( - grpc_slice_from_static_string(trailers->headers[i].value)))); + GRPC_LOG_IF_ERROR( + "on_response_trailers_received", + grpc_chttp2_incoming_metadata_buffer_add( + &exec_ctx, &s->state.rs.trailing_metadata, + grpc_mdelem_from_slices( + &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( + trailers->headers[i].key)), + grpc_slice_intern(grpc_slice_from_static_string( + trailers->headers[i].value))))); s->state.rs.trailing_metadata_valid = true; if (0 == strcmp(trailers->headers[i].key, "grpc-status") && 0 != strcmp(trailers->headers[i].value, "0")) { @@ -1174,7 +1181,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_stream_refcount *refcount, - const void *server_data) { + const void *server_data, gpr_arena *arena) { stream_obj *s = (stream_obj *)gs; memset(&s->storage, 0, sizeof(s->storage)); s->storage.head = NULL; @@ -1194,6 +1201,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->curr_gs = gs; s->curr_ct = (grpc_cronet_transport *)gt; + s->arena = arena; gpr_mu_init(&s->mu); return 0; @@ -1238,9 +1246,11 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, } static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, void *and_free_memory) { + grpc_stream *gs, + grpc_closure *then_schedule_closure) { stream_obj *s = (stream_obj *)gs; GRPC_ERROR_UNREF(s->state.cancel_error); + grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); } static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {} diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 9a8870b86a..f1285b923d 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -438,8 +438,7 @@ static void release_call(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { grpc_call *c = call; grpc_channel *channel = c->channel; - grpc_channel_update_call_size_estimate(channel, - gpr_arena_destroy(c->arena)); + grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena)); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call"); } -- cgit v1.2.3 From 9ccbc4d5e51bcaa0adc3ca6837d93b7c162b3ca6 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 15 Mar 2017 08:30:04 -0700 Subject: Don't use combiner lock for on_complete callback. --- src/core/ext/client_channel/client_channel.c | 20 +++++++------ src/core/ext/client_channel/retry_throttle.c | 42 +++++++++++----------------- src/core/ext/client_channel/retry_throttle.h | 10 ++----- 3 files changed, 32 insertions(+), 40 deletions(-) (limited to 'src') diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index ef273669e8..e0b84ddd66 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -423,7 +423,8 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI); GPR_ASSERT(channel_arg != NULL); GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); - grpc_uri *uri = grpc_uri_parse(channel_arg->value.string, true); + grpc_uri *uri = + grpc_uri_parse(exec_ctx, channel_arg->value.string, true); GPR_ASSERT(uri->path[0] != '\0'); parsing_state.server_name = uri->path[0] == '/' ? uri->path + 1 : uri->path; @@ -738,6 +739,7 @@ typedef struct client_channel_call_data { grpc_slice path; // Request path. gpr_timespec call_start_time; gpr_timespec deadline; + grpc_server_retry_throttle_data *retry_throttle_data; method_parameters *method_params; grpc_error *cancel_error; @@ -814,7 +816,7 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { gpr_free(ops); } -// Sets calld->method_params. +// Sets calld->method_params and calld->retry_throttle_data. // If the method params specify a timeout, populates // *per_method_deadline and returns true. static bool set_call_method_params_from_service_config_locked( @@ -822,6 +824,10 @@ static bool set_call_method_params_from_service_config_locked( gpr_timespec *per_method_deadline) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; + if (chand->retry_throttle_data != NULL) { + calld->retry_throttle_data = + grpc_server_retry_throttle_data_ref(chand->retry_throttle_data); + } if (chand->method_params_table != NULL) { calld->method_params = grpc_method_config_table_get( exec_ctx, chand->method_params_table, calld->path); @@ -1135,19 +1141,18 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, static void on_complete_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_call_element *elem = arg; - channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - if (chand->retry_throttle_data != NULL) { + if (calld->retry_throttle_data != NULL) { if (error == GRPC_ERROR_NONE) { grpc_server_retry_throttle_data_record_success( - &chand->retry_throttle_data); + calld->retry_throttle_data); } else { // TODO(roth): In a subsequent PR, check the return value here and // decide whether or not to retry. Note that we should only // record failures whose statuses match the configured retryable // or non-fatal status codes. grpc_server_retry_throttle_data_record_failure( - &chand->retry_throttle_data); + calld->retry_throttle_data); } } grpc_closure_run(exec_ctx, calld->original_on_complete, @@ -1160,14 +1165,13 @@ static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_transport_stream_op *op = arg; grpc_call_element *elem = op->handler_private.args[0]; - channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; if (op->recv_trailing_metadata != NULL) { GPR_ASSERT(op->on_complete != NULL); calld->original_on_complete = op->on_complete; grpc_closure_init(&calld->on_complete, on_complete_locked, elem, - grpc_combiner_scheduler(chand->combiner, false)); + grpc_schedule_on_exec_ctx); op->on_complete = &calld->on_complete; } diff --git a/src/core/ext/client_channel/retry_throttle.c b/src/core/ext/client_channel/retry_throttle.c index 2aa52e4903..7b813c33df 100644 --- a/src/core/ext/client_channel/retry_throttle.c +++ b/src/core/ext/client_channel/retry_throttle.c @@ -64,26 +64,18 @@ static void get_replacement_throttle_data_if_needed( (grpc_server_retry_throttle_data*)gpr_atm_acq_load( &(*throttle_data)->replacement); if (new_throttle_data == NULL) return; - // Reset *throttle_data to its replacement, updating refcounts as - // appropriate. - // Note: It's safe to do this here, because the caller ensures that - // this will only be called with a given value of throttle_data from - // one thread at a time. - grpc_server_retry_throttle_data_ref(new_throttle_data); - grpc_server_retry_throttle_data* old_throttle_data = *throttle_data; *throttle_data = new_throttle_data; - grpc_server_retry_throttle_data_unref(old_throttle_data); } } bool grpc_server_retry_throttle_data_record_failure( - grpc_server_retry_throttle_data** throttle_data) { + grpc_server_retry_throttle_data* throttle_data) { // First, check if we are stale and need to be replaced. - get_replacement_throttle_data_if_needed(throttle_data); + get_replacement_throttle_data_if_needed(&throttle_data); // We decrement milli_tokens by 1000 (1 token) for each failure. const int delta = -1000; const int old_value = (int)gpr_atm_full_fetch_add( - &(*throttle_data)->milli_tokens, (gpr_atm)delta); + &throttle_data->milli_tokens, (gpr_atm)delta); // If the above change takes us below 0, then re-add the excess. Note // that between these two atomic operations, the value will be // artificially low by as much as 1000, but this window should be @@ -91,41 +83,42 @@ bool grpc_server_retry_throttle_data_record_failure( int new_value = old_value - 1000; if (new_value < 0) { const int excess_value = new_value - (old_value < 0 ? old_value : 0); - gpr_atm_full_fetch_add(&(*throttle_data)->milli_tokens, + gpr_atm_full_fetch_add(&throttle_data->milli_tokens, (gpr_atm)-excess_value); new_value = 0; } // Retries are allowed as long as the new value is above the threshold // (max_milli_tokens / 2). - return new_value > (*throttle_data)->max_milli_tokens / 2; + return new_value > throttle_data->max_milli_tokens / 2; } void grpc_server_retry_throttle_data_record_success( - grpc_server_retry_throttle_data** throttle_data) { + grpc_server_retry_throttle_data* throttle_data) { // First, check if we are stale and need to be replaced. - get_replacement_throttle_data_if_needed(throttle_data); + get_replacement_throttle_data_if_needed(&throttle_data); // We increment milli_tokens by milli_token_ratio for each success. - const int delta = (*throttle_data)->milli_token_ratio; + const int delta = throttle_data->milli_token_ratio; const int old_value = (int)gpr_atm_full_fetch_add( - &(*throttle_data)->milli_tokens, (gpr_atm)delta); + &throttle_data->milli_tokens, (gpr_atm)delta); // If the above change takes us over max_milli_tokens, then subtract // the excess. Note that between these two atomic operations, the // value will be artificially high by as much as milli_token_ratio, // but this window should be brief. - const int new_value = old_value + (*throttle_data)->milli_token_ratio; - if (new_value > (*throttle_data)->max_milli_tokens) { + const int new_value = old_value + throttle_data->milli_token_ratio; + if (new_value > throttle_data->max_milli_tokens) { const int excess_value = - new_value - (old_value > (*throttle_data)->max_milli_tokens + new_value - (old_value > throttle_data->max_milli_tokens ? old_value - : (*throttle_data)->max_milli_tokens); - gpr_atm_full_fetch_add(&(*throttle_data)->milli_tokens, + : throttle_data->max_milli_tokens); + gpr_atm_full_fetch_add(&throttle_data->milli_tokens, (gpr_atm)-excess_value); } } -void grpc_server_retry_throttle_data_ref( +grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_ref( grpc_server_retry_throttle_data* throttle_data) { gpr_ref(&throttle_data->refs); + return throttle_data; } void grpc_server_retry_throttle_data_unref( @@ -189,8 +182,7 @@ static void destroy_server_retry_throttle_data(void* value) { static void* copy_server_retry_throttle_data(void* value) { grpc_server_retry_throttle_data* throttle_data = value; - grpc_server_retry_throttle_data_ref(throttle_data); - return value; + return grpc_server_retry_throttle_data_ref(throttle_data); } static const gpr_avl_vtable avl_vtable = { diff --git a/src/core/ext/client_channel/retry_throttle.h b/src/core/ext/client_channel/retry_throttle.h index 4209bb7fb6..f9971faf65 100644 --- a/src/core/ext/client_channel/retry_throttle.h +++ b/src/core/ext/client_channel/retry_throttle.h @@ -40,17 +40,13 @@ typedef struct grpc_server_retry_throttle_data grpc_server_retry_throttle_data; /// Records a failure. Returns true if it's okay to send a retry. -/// Updates \a throttle_data if the original value is stale and has been -/// replaced. Not thread safe; caller must synchronize. bool grpc_server_retry_throttle_data_record_failure( - grpc_server_retry_throttle_data** throttle_data); + grpc_server_retry_throttle_data* throttle_data); /// Records a success. -/// Updates \a throttle_data if the original value is stale and has been -/// replaced. Not thread safe; caller must synchronize. void grpc_server_retry_throttle_data_record_success( - grpc_server_retry_throttle_data** throttle_data); + grpc_server_retry_throttle_data* throttle_data); -void grpc_server_retry_throttle_data_ref( +grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_ref( grpc_server_retry_throttle_data* throttle_data); void grpc_server_retry_throttle_data_unref( grpc_server_retry_throttle_data* throttle_data); -- cgit v1.2.3 From 527937031495287be5c301614055d1eb6a078241 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 15 Mar 2017 08:38:24 -0700 Subject: Fix compilation --- src/cpp/common/channel_filter.h | 3 ++- test/cpp/microbenchmarks/bm_call_create.cc | 26 ++++++++++++++++++++------ 2 files changed, 22 insertions(+), 7 deletions(-) (limited to 'src') diff --git a/src/cpp/common/channel_filter.h b/src/cpp/common/channel_filter.h index 79c4bab985..494d5d64d7 100644 --- a/src/cpp/common/channel_filter.h +++ b/src/cpp/common/channel_filter.h @@ -318,7 +318,8 @@ class ChannelFilter final { static void DestroyCallElement(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory) { + grpc_closure *then_call_closure) { + GPR_ASSERT(then_call_closure == NULL); reinterpret_cast(elem->call_data)->~CallDataType(); } diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc index 014e2b96b5..5ef40abb97 100644 --- a/test/cpp/microbenchmarks/bm_call_create.cc +++ b/test/cpp/microbenchmarks/bm_call_create.cc @@ -232,7 +232,7 @@ static void SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx, static void DestroyCallElem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory) {} + grpc_closure *then_sched_closure) {} grpc_error *InitChannelElem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_channel_element_args *args) { @@ -275,7 +275,7 @@ const char *name; /* implementation of grpc_transport_init_stream */ int InitStream(grpc_exec_ctx *exec_ctx, grpc_transport *self, grpc_stream *stream, grpc_stream_refcount *refcount, - const void *server_data) { + const void *server_data, gpr_arena *arena) { return 0; } @@ -299,7 +299,7 @@ void PerformOp(grpc_exec_ctx *exec_ctx, grpc_transport *self, /* implementation of grpc_transport_destroy_stream */ void DestroyStream(grpc_exec_ctx *exec_ctx, grpc_transport *self, - grpc_stream *stream, void *and_free_memory) {} + grpc_stream *stream, grpc_closure *then_sched_closure) {} /* implementation of grpc_transport_destroy */ void Destroy(grpc_exec_ctx *exec_ctx, grpc_transport *self) {} @@ -394,7 +394,7 @@ static void BM_IsolatedFilter(benchmark::State &state) { grpc_channel_stack *channel_stack = static_cast(gpr_zalloc(channel_size)); GPR_ASSERT(GRPC_LOG_IF_ERROR( - "call_stack_init", + "channel_stack_init", grpc_channel_stack_init(&exec_ctx, 1, FilterDestroy, channel_stack, &filters[0], filters.size(), &channel_args, fixture.flags & REQUIRES_TRANSPORT @@ -409,15 +409,29 @@ static void BM_IsolatedFilter(benchmark::State &state) { grpc_slice method = grpc_slice_from_static_string("/foo/bar"); grpc_call_final_info final_info; TestOp test_op_data; + grpc_call_element_args call_args; + call_args.call_stack = call_stack; + call_args.server_transport_data = NULL; + call_args.context = NULL; + call_args.path = method; + call_args.start_time = start_time; + call_args.deadline = deadline; + const int kArenaSize = 4096; + call_args.arena = gpr_arena_create(kArenaSize); while (state.KeepRunning()) { GRPC_ERROR_UNREF(grpc_call_stack_init(&exec_ctx, channel_stack, 1, - DoNothing, NULL, NULL, NULL, method, - start_time, deadline, call_stack)); + DoNothing, NULL, &call_args)); typename TestOp::Op op(&exec_ctx, &test_op_data, call_stack); grpc_call_stack_destroy(&exec_ctx, call_stack, &final_info, NULL); op.Finish(&exec_ctx); grpc_exec_ctx_flush(&exec_ctx); + // recreate arena every 64k iterations to avoid oom + if (0 == (state.iterations() & 0xffff)) { + gpr_arena_destroy(call_args.arena); + call_args.arena = gpr_arena_create(kArenaSize); + } } + gpr_arena_destroy(call_args.arena); grpc_channel_stack_destroy(&exec_ctx, channel_stack); grpc_exec_ctx_finish(&exec_ctx); gpr_free(channel_stack); -- cgit v1.2.3 From 2ac189e60626b6dbcec48feca8d79614db00baee Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 15 Mar 2017 10:02:57 -0700 Subject: Fix proxy mapper initialization to work with third-party plugins. --- src/core/ext/client_channel/proxy_mapper_registry.c | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/core/ext/client_channel/proxy_mapper_registry.c b/src/core/ext/client_channel/proxy_mapper_registry.c index 2c44b9d490..0935ddbdbd 100644 --- a/src/core/ext/client_channel/proxy_mapper_registry.c +++ b/src/core/ext/client_channel/proxy_mapper_registry.c @@ -94,6 +94,14 @@ static void grpc_proxy_mapper_list_destroy(grpc_proxy_mapper_list* list) { grpc_proxy_mapper_destroy(list->list[i]); } gpr_free(list->list); + // Clean up in case we re-initialze later. + // TODO(ctiller): This should ideally live in + // grpc_proxy_mapper_registry_init(). However, if we did this there, + // then we would do it AFTER we start registering proxy mappers from + // third-party plugins, so they'd never show up (and would leak memory). + // We probably need some sort of dependency system for plugins to fix + // this. + memset(list, 0, sizeof(*list)); } // @@ -102,9 +110,7 @@ static void grpc_proxy_mapper_list_destroy(grpc_proxy_mapper_list* list) { static grpc_proxy_mapper_list g_proxy_mapper_list; -void grpc_proxy_mapper_registry_init() { - memset(&g_proxy_mapper_list, 0, sizeof(g_proxy_mapper_list)); -} +void grpc_proxy_mapper_registry_init() {} void grpc_proxy_mapper_registry_shutdown() { grpc_proxy_mapper_list_destroy(&g_proxy_mapper_list); -- cgit v1.2.3 From de14410b43cfbed4c1e0b777aa18a764d92d56ad Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 15 Mar 2017 10:11:03 -0700 Subject: Rename on_complete_locked() to on_complete(). --- src/core/ext/client_channel/client_channel.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index e0b84ddd66..937ebfcb55 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -1138,8 +1138,7 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, add_waiting_locked(calld, op); } -static void on_complete_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_call_element *elem = arg; call_data *calld = elem->call_data; if (calld->retry_throttle_data != NULL) { @@ -1170,7 +1169,7 @@ static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg, if (op->recv_trailing_metadata != NULL) { GPR_ASSERT(op->on_complete != NULL); calld->original_on_complete = op->on_complete; - grpc_closure_init(&calld->on_complete, on_complete_locked, elem, + grpc_closure_init(&calld->on_complete, on_complete, elem, grpc_schedule_on_exec_ctx); op->on_complete = &calld->on_complete; } -- cgit v1.2.3 From 364fa33576bc09407b37b3ecca43f7b26d4f7c21 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 16 Mar 2017 08:00:36 -0700 Subject: Override subchannel address in channel args when proxy mapper is used. --- src/core/ext/client_channel/subchannel.c | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) (limited to 'src') diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index 5df0a9060d..2528941981 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -340,17 +340,15 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, GPR_ASSERT(new_address != NULL); gpr_free(addr); addr = new_address; - if (new_args != NULL) c->args = new_args; - } - if (c->args == NULL) { - static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS}; - grpc_arg new_arg = grpc_create_subchannel_address_arg(addr); - c->args = grpc_channel_args_copy_and_add_and_remove( - args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &new_arg, - 1); - gpr_free(new_arg.value.string); } + static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS}; + grpc_arg new_arg = grpc_create_subchannel_address_arg(addr); gpr_free(addr); + c->args = grpc_channel_args_copy_and_add_and_remove( + new_args != NULL ? new_args : args->args, keys_to_remove, + GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1); + gpr_free(new_arg.value.string); + if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args); c->root_external_state_watcher.next = c->root_external_state_watcher.prev = &c->root_external_state_watcher; grpc_closure_init(&c->connected, subchannel_connected, c, -- cgit v1.2.3 From 2ccd502ed4e3fffa2cccf7d3e0aa7103caf37bf0 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 16 Mar 2017 09:57:46 -0700 Subject: Add cast --- src/core/lib/surface/channel.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src') diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index 56beb7d96e..2b700b2f67 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -119,7 +119,7 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, gpr_atm_no_barrier_store( &channel->call_size_estimate, - CHANNEL_STACK_FROM_CHANNEL(channel)->call_stack_size); + (gpr_atm)CHANNEL_STACK_FROM_CHANNEL(channel)->call_stack_size); grpc_compression_options_init(&channel->compression_options); for (size_t i = 0; i < args->num_args; i++) { -- cgit v1.2.3 From fecba535d99ec2c819a0d26707047bf2f2f323fa Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 17 Mar 2017 09:50:48 -0700 Subject: Switch to using a CAS loop to update the token value. --- BUILD | 1 + CMakeLists.txt | 1 + Makefile | 1 + binding.gyp | 1 + build.yaml | 1 + config.m4 | 1 + gRPC-Core.podspec | 1 + grpc.gemspec | 1 + include/grpc/impl/codegen/atm.h | 5 +++ package.xml | 1 + src/core/ext/client_channel/retry_throttle.c | 36 +++-------------- src/core/lib/support/atm.c | 47 ++++++++++++++++++++++ src/python/grpcio/grpc_core_dependencies.py | 1 + tools/doxygen/Doxyfile.core.internal | 1 + tools/run_tests/generated/sources_and_headers.json | 1 + vsprojects/vcxproj/gpr/gpr.vcxproj | 2 + vsprojects/vcxproj/gpr/gpr.vcxproj.filters | 3 ++ 17 files changed, 75 insertions(+), 30 deletions(-) create mode 100644 src/core/lib/support/atm.c (limited to 'src') diff --git a/BUILD b/BUILD index 4e1f20c3b2..1fe72c02db 100644 --- a/BUILD +++ b/BUILD @@ -309,6 +309,7 @@ grpc_cc_library( "src/core/lib/profiling/basic_timers.c", "src/core/lib/profiling/stap_timers.c", "src/core/lib/support/alloc.c", + "src/core/lib/support/atm.c", "src/core/lib/support/avl.c", "src/core/lib/support/backoff.c", "src/core/lib/support/cmdline.c", diff --git a/CMakeLists.txt b/CMakeLists.txt index 8bc07255f1..9e99062f58 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -693,6 +693,7 @@ add_library(gpr src/core/lib/profiling/basic_timers.c src/core/lib/profiling/stap_timers.c src/core/lib/support/alloc.c + src/core/lib/support/atm.c src/core/lib/support/avl.c src/core/lib/support/backoff.c src/core/lib/support/cmdline.c diff --git a/Makefile b/Makefile index 11bac54c79..2f7120987a 100644 --- a/Makefile +++ b/Makefile @@ -2599,6 +2599,7 @@ LIBGPR_SRC = \ src/core/lib/profiling/basic_timers.c \ src/core/lib/profiling/stap_timers.c \ src/core/lib/support/alloc.c \ + src/core/lib/support/atm.c \ src/core/lib/support/avl.c \ src/core/lib/support/backoff.c \ src/core/lib/support/cmdline.c \ diff --git a/binding.gyp b/binding.gyp index f6a04b27f9..1107f31889 100644 --- a/binding.gyp +++ b/binding.gyp @@ -544,6 +544,7 @@ 'src/core/lib/profiling/basic_timers.c', 'src/core/lib/profiling/stap_timers.c', 'src/core/lib/support/alloc.c', + 'src/core/lib/support/atm.c', 'src/core/lib/support/avl.c', 'src/core/lib/support/backoff.c', 'src/core/lib/support/cmdline.c', diff --git a/build.yaml b/build.yaml index ae546cbb30..7b27968c61 100644 --- a/build.yaml +++ b/build.yaml @@ -101,6 +101,7 @@ filegroups: - src/core/lib/profiling/basic_timers.c - src/core/lib/profiling/stap_timers.c - src/core/lib/support/alloc.c + - src/core/lib/support/atm.c - src/core/lib/support/avl.c - src/core/lib/support/backoff.c - src/core/lib/support/cmdline.c diff --git a/config.m4 b/config.m4 index 5eaf161f09..010401f2fb 100644 --- a/config.m4 +++ b/config.m4 @@ -39,6 +39,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/profiling/basic_timers.c \ src/core/lib/profiling/stap_timers.c \ src/core/lib/support/alloc.c \ + src/core/lib/support/atm.c \ src/core/lib/support/avl.c \ src/core/lib/support/backoff.c \ src/core/lib/support/cmdline.c \ diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 2fb00a3afe..c78e4a7023 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -211,6 +211,7 @@ Pod::Spec.new do |s| 'src/core/lib/profiling/basic_timers.c', 'src/core/lib/profiling/stap_timers.c', 'src/core/lib/support/alloc.c', + 'src/core/lib/support/atm.c', 'src/core/lib/support/avl.c', 'src/core/lib/support/backoff.c', 'src/core/lib/support/cmdline.c', diff --git a/grpc.gemspec b/grpc.gemspec index 1ca2446e65..95aba00fd7 100755 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -97,6 +97,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/profiling/basic_timers.c ) s.files += %w( src/core/lib/profiling/stap_timers.c ) s.files += %w( src/core/lib/support/alloc.c ) + s.files += %w( src/core/lib/support/atm.c ) s.files += %w( src/core/lib/support/avl.c ) s.files += %w( src/core/lib/support/backoff.c ) s.files += %w( src/core/lib/support/cmdline.c ) diff --git a/include/grpc/impl/codegen/atm.h b/include/grpc/impl/codegen/atm.h index ae00fb0f16..4bd572d6d1 100644 --- a/include/grpc/impl/codegen/atm.h +++ b/include/grpc/impl/codegen/atm.h @@ -92,4 +92,9 @@ #error could not determine platform for atm #endif +/** Adds \a delta to \a *value, clamping the result to the range specified + by \a min and \a max. Returns the new value. */ +gpr_atm gpr_atm_no_barrier_clamped_add(gpr_atm *value, gpr_atm delta, + gpr_atm min, gpr_atm max); + #endif /* GRPC_IMPL_CODEGEN_ATM_H */ diff --git a/package.xml b/package.xml index e29f462d33..83ad2d2129 100644 --- a/package.xml +++ b/package.xml @@ -106,6 +106,7 @@ + diff --git a/src/core/ext/client_channel/retry_throttle.c b/src/core/ext/client_channel/retry_throttle.c index 7b813c33df..8926c3d782 100644 --- a/src/core/ext/client_channel/retry_throttle.c +++ b/src/core/ext/client_channel/retry_throttle.c @@ -73,20 +73,9 @@ bool grpc_server_retry_throttle_data_record_failure( // First, check if we are stale and need to be replaced. get_replacement_throttle_data_if_needed(&throttle_data); // We decrement milli_tokens by 1000 (1 token) for each failure. - const int delta = -1000; - const int old_value = (int)gpr_atm_full_fetch_add( - &throttle_data->milli_tokens, (gpr_atm)delta); - // If the above change takes us below 0, then re-add the excess. Note - // that between these two atomic operations, the value will be - // artificially low by as much as 1000, but this window should be - // brief. - int new_value = old_value - 1000; - if (new_value < 0) { - const int excess_value = new_value - (old_value < 0 ? old_value : 0); - gpr_atm_full_fetch_add(&throttle_data->milli_tokens, - (gpr_atm)-excess_value); - new_value = 0; - } + const int new_value = (int)gpr_atm_no_barrier_clamped_add( + &throttle_data->milli_tokens, (gpr_atm)-1000, (gpr_atm)0, + (gpr_atm)throttle_data->max_milli_tokens); // Retries are allowed as long as the new value is above the threshold // (max_milli_tokens / 2). return new_value > throttle_data->max_milli_tokens / 2; @@ -97,22 +86,9 @@ void grpc_server_retry_throttle_data_record_success( // First, check if we are stale and need to be replaced. get_replacement_throttle_data_if_needed(&throttle_data); // We increment milli_tokens by milli_token_ratio for each success. - const int delta = throttle_data->milli_token_ratio; - const int old_value = (int)gpr_atm_full_fetch_add( - &throttle_data->milli_tokens, (gpr_atm)delta); - // If the above change takes us over max_milli_tokens, then subtract - // the excess. Note that between these two atomic operations, the - // value will be artificially high by as much as milli_token_ratio, - // but this window should be brief. - const int new_value = old_value + throttle_data->milli_token_ratio; - if (new_value > throttle_data->max_milli_tokens) { - const int excess_value = - new_value - (old_value > throttle_data->max_milli_tokens - ? old_value - : throttle_data->max_milli_tokens); - gpr_atm_full_fetch_add(&throttle_data->milli_tokens, - (gpr_atm)-excess_value); - } + gpr_atm_no_barrier_clamped_add( + &throttle_data->milli_tokens, (gpr_atm)throttle_data->milli_token_ratio, + (gpr_atm)0, (gpr_atm)throttle_data->max_milli_tokens); } grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_ref( diff --git a/src/core/lib/support/atm.c b/src/core/lib/support/atm.c new file mode 100644 index 0000000000..06e8432caf --- /dev/null +++ b/src/core/lib/support/atm.c @@ -0,0 +1,47 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include +#include + +gpr_atm gpr_atm_no_barrier_clamped_add(gpr_atm *value, gpr_atm delta, + gpr_atm min, gpr_atm max) { + gpr_atm current; + gpr_atm new; + do { + current = gpr_atm_no_barrier_load(value); + new = GPR_CLAMP(current + delta, min, max); + if (new == current) break; + } while (!gpr_atm_no_barrier_cas(value, current, new)); + return new; +} diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 94d6e46cae..da0dba7dfe 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -33,6 +33,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/profiling/basic_timers.c', 'src/core/lib/profiling/stap_timers.c', 'src/core/lib/support/alloc.c', + 'src/core/lib/support/atm.c', 'src/core/lib/support/avl.c', 'src/core/lib/support/backoff.c', 'src/core/lib/support/cmdline.c', diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index fbe1f7f78e..7147d152ef 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1230,6 +1230,7 @@ src/core/lib/slice/slice_internal.h \ src/core/lib/slice/slice_string_helpers.c \ src/core/lib/slice/slice_string_helpers.h \ src/core/lib/support/alloc.c \ +src/core/lib/support/atm.c \ src/core/lib/support/avl.c \ src/core/lib/support/backoff.c \ src/core/lib/support/backoff.h \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index b2f9078c05..7a6295cf72 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -7311,6 +7311,7 @@ "src/core/lib/profiling/stap_timers.c", "src/core/lib/profiling/timers.h", "src/core/lib/support/alloc.c", + "src/core/lib/support/atm.c", "src/core/lib/support/avl.c", "src/core/lib/support/backoff.c", "src/core/lib/support/backoff.h", diff --git a/vsprojects/vcxproj/gpr/gpr.vcxproj b/vsprojects/vcxproj/gpr/gpr.vcxproj index 44c21ddeb3..67ac3b98c5 100644 --- a/vsprojects/vcxproj/gpr/gpr.vcxproj +++ b/vsprojects/vcxproj/gpr/gpr.vcxproj @@ -208,6 +208,8 @@ + + diff --git a/vsprojects/vcxproj/gpr/gpr.vcxproj.filters b/vsprojects/vcxproj/gpr/gpr.vcxproj.filters index a5924a624a..c49c87ed60 100644 --- a/vsprojects/vcxproj/gpr/gpr.vcxproj.filters +++ b/vsprojects/vcxproj/gpr/gpr.vcxproj.filters @@ -10,6 +10,9 @@ src\core\lib\support + + src\core\lib\support + src\core\lib\support -- cgit v1.2.3 From 3c3093cbed08efac9e73cd0bb1a062e75fab3e0a Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Fri, 17 Mar 2017 14:36:01 -0700 Subject: Fix that there is no :status header in the case of trailer-only failure reply --- .../transport/chttp2/transport/chttp2_transport.c | 24 ++++++++++++++++++++++ 1 file changed, 24 insertions(+) (limited to 'src') diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 082078c72f..89659e7464 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1765,6 +1765,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_error *error) { grpc_slice hdr; grpc_slice status_hdr; + grpc_slice http_status_hdr; grpc_slice message_pfx; uint8_t *p; uint32_t len = 0; @@ -1780,6 +1781,26 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, It's complicated by the fact that our send machinery would be dead by the time we got around to sending this, so instead we ignore HPACK compression and just write the uncompressed bytes onto the wire. */ + if (!s->sent_initial_metadata) { + http_status_hdr = grpc_slice_malloc(13); + p = GRPC_SLICE_START_PTR(http_status_hdr); + *p++ = 0x00; + *p++ = 7; + *p++ = ':'; + *p++ = 's'; + *p++ = 't'; + *p++ = 'a'; + *p++ = 't'; + *p++ = 'u'; + *p++ = 's'; + *p++ = 3; + *p++ = '2'; + *p++ = '0'; + *p++ = '0'; + GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr)); + len += (uint32_t)GRPC_SLICE_LENGTH(http_status_hdr); + } + status_hdr = grpc_slice_malloc(15 + (grpc_status >= 10)); p = GRPC_SLICE_START_PTR(status_hdr); *p++ = 0x00; /* literal header, not indexed */ @@ -1847,6 +1868,9 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr)); grpc_slice_buffer_add(&t->qbuf, hdr); + if (!s->sent_initial_metadata) { + grpc_slice_buffer_add(&t->qbuf, http_status_hdr); + } grpc_slice_buffer_add(&t->qbuf, status_hdr); if (msg != NULL) { grpc_slice_buffer_add(&t->qbuf, message_pfx); -- cgit v1.2.3 From cb2f3f6ba6f8097b6a15d311d6eed9709c655963 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Fri, 17 Mar 2017 14:30:09 -0700 Subject: Downgrade error severity to INFO when failing to add a listener --- src/core/lib/iomgr/tcp_server_posix.c | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index b3644518f5..e242631fc0 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -350,8 +350,20 @@ static grpc_error *add_wildcard_addrs_to_server(grpc_tcp_server *s, } } if (*out_port > 0) { - GRPC_LOG_IF_ERROR("Failed to add :: listener", v6_err); - GRPC_LOG_IF_ERROR("Failed to add 0.0.0.0 listener", v4_err); + if (v6_err != GRPC_ERROR_NONE) { + gpr_log(GPR_INFO, + "Failed to add :: listener, " + "the environment may not support IPv6: %s", + grpc_error_string(v6_err)); + GRPC_ERROR_UNREF(v6_err); + } + if (v4_err != GRPC_ERROR_NONE) { + gpr_log(GPR_INFO, + "Failed to add 0.0.0.0 listener, " + "the environment may not support IPv4: %s", + grpc_error_string(v4_err)); + GRPC_ERROR_UNREF(v4_err); + } return GRPC_ERROR_NONE; } else { grpc_error *root_err = -- cgit v1.2.3 From bb7484cee7943f82680f32267ffd6115a355111f Mon Sep 17 00:00:00 2001 From: Nathaniel Manista Date: Fri, 17 Mar 2017 23:42:39 +0000 Subject: Update tests to current gRPC code elements --- src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py | 8 ++++---- src/python/grpcio_tests/tests/interop/_secure_intraop_test.py | 8 ++++---- src/python/grpcio_tests/tests/interop/methods.py | 4 ++-- src/python/grpcio_tests/tests/interop/server.py | 5 +++-- src/python/grpcio_tests/tests/qps/qps_worker.py | 4 ++-- src/python/grpcio_tests/tests/qps/worker_server.py | 8 ++++---- .../grpcio_tests/tests/reflection/_reflection_servicer_test.py | 7 ++++--- src/python/grpcio_tests/tests/stress/client.py | 4 ++-- 8 files changed, 25 insertions(+), 23 deletions(-) (limited to 'src') diff --git a/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py b/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py index 58f3b364ba..3325d54375 100644 --- a/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py +++ b/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py @@ -32,7 +32,7 @@ from concurrent import futures import unittest import grpc -from src.proto.grpc.testing import test_pb2 +from src.proto.grpc.testing import test_pb2_grpc from tests.interop import _intraop_test_case from tests.interop import methods @@ -44,11 +44,11 @@ class InsecureIntraopTest(_intraop_test_case.IntraopTestCase, def setUp(self): self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) - test_pb2.add_TestServiceServicer_to_server(methods.TestService(), - self.server) + test_pb2_grpc.add_TestServiceServicer_to_server(methods.TestService(), + self.server) port = self.server.add_insecure_port('[::]:0') self.server.start() - self.stub = test_pb2.TestServiceStub( + self.stub = test_pb2_grpc.TestServiceStub( grpc.insecure_channel('localhost:{}'.format(port))) diff --git a/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py b/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py index 5fe929b99e..857e00efb3 100644 --- a/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py +++ b/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py @@ -32,7 +32,7 @@ from concurrent import futures import unittest import grpc -from src.proto.grpc.testing import test_pb2 +from src.proto.grpc.testing import test_pb2_grpc from tests.interop import _intraop_test_case from tests.interop import methods @@ -45,14 +45,14 @@ class SecureIntraopTest(_intraop_test_case.IntraopTestCase, unittest.TestCase): def setUp(self): self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) - test_pb2.add_TestServiceServicer_to_server(methods.TestService(), - self.server) + test_pb2_grpc.add_TestServiceServicer_to_server(methods.TestService(), + self.server) port = self.server.add_secure_port( '[::]:0', grpc.ssl_server_credentials( [(resources.private_key(), resources.certificate_chain())])) self.server.start() - self.stub = test_pb2.TestServiceStub( + self.stub = test_pb2_grpc.TestServiceStub( grpc.secure_channel('localhost:{}'.format(port), grpc.ssl_channel_credentials( resources.test_root_certificates()), ( diff --git a/src/python/grpcio_tests/tests/interop/methods.py b/src/python/grpcio_tests/tests/interop/methods.py index 662ea9ce57..e1016f7c0d 100644 --- a/src/python/grpcio_tests/tests/interop/methods.py +++ b/src/python/grpcio_tests/tests/interop/methods.py @@ -40,7 +40,7 @@ from grpc.beta import implementations from src.proto.grpc.testing import empty_pb2 from src.proto.grpc.testing import messages_pb2 -from src.proto.grpc.testing import test_pb2 +from src.proto.grpc.testing import test_pb2_grpc _INITIAL_METADATA_KEY = "x-grpc-test-echo-initial" _TRAILING_METADATA_KEY = "x-grpc-test-echo-trailing-bin" @@ -66,7 +66,7 @@ def _maybe_echo_status_and_message(request, servicer_context): servicer_context.set_details(request.response_status.message) -class TestService(test_pb2.TestServiceServicer): +class TestService(test_pb2_grpc.TestServiceServicer): def EmptyCall(self, request, context): _maybe_echo_metadata(context) diff --git a/src/python/grpcio_tests/tests/interop/server.py b/src/python/grpcio_tests/tests/interop/server.py index 65f1604eb8..0ae2c97b42 100644 --- a/src/python/grpcio_tests/tests/interop/server.py +++ b/src/python/grpcio_tests/tests/interop/server.py @@ -34,7 +34,7 @@ import logging import time import grpc -from src.proto.grpc.testing import test_pb2 +from src.proto.grpc.testing import test_pb2_grpc from tests.interop import methods from tests.interop import resources @@ -53,7 +53,8 @@ def serve(): args = parser.parse_args() server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) - test_pb2.add_TestServiceServicer_to_server(methods.TestService(), server) + test_pb2_grpc.add_TestServiceServicer_to_server(methods.TestService(), + server) if args.use_tls: private_key = resources.private_key() certificate_chain = resources.certificate_chain() diff --git a/src/python/grpcio_tests/tests/qps/qps_worker.py b/src/python/grpcio_tests/tests/qps/qps_worker.py index 025dfb9d4a..7cd53e7bd9 100644 --- a/src/python/grpcio_tests/tests/qps/qps_worker.py +++ b/src/python/grpcio_tests/tests/qps/qps_worker.py @@ -33,7 +33,7 @@ import time from concurrent import futures import grpc -from src.proto.grpc.testing import services_pb2 +from src.proto.grpc.testing import services_pb2_grpc from tests.qps import worker_server @@ -41,7 +41,7 @@ from tests.qps import worker_server def run_worker_server(port): server = grpc.server(futures.ThreadPoolExecutor(max_workers=5)) servicer = worker_server.WorkerServer() - services_pb2.add_WorkerServiceServicer_to_server(servicer, server) + services_pb2_grpc.add_WorkerServiceServicer_to_server(servicer, server) server.add_insecure_port('[::]:{}'.format(port)) server.start() servicer.wait_for_quit() diff --git a/src/python/grpcio_tests/tests/qps/worker_server.py b/src/python/grpcio_tests/tests/qps/worker_server.py index ca1a777611..de9535f46e 100644 --- a/src/python/grpcio_tests/tests/qps/worker_server.py +++ b/src/python/grpcio_tests/tests/qps/worker_server.py @@ -35,7 +35,7 @@ import time from concurrent import futures import grpc from src.proto.grpc.testing import control_pb2 -from src.proto.grpc.testing import services_pb2 +from src.proto.grpc.testing import services_pb2_grpc from src.proto.grpc.testing import stats_pb2 from tests.qps import benchmark_client @@ -45,7 +45,7 @@ from tests.qps import histogram from tests.unit import resources -class WorkerServer(services_pb2.WorkerServiceServicer): +class WorkerServer(services_pb2_grpc.WorkerServiceServicer): """Python Worker Server implementation.""" def __init__(self): @@ -87,8 +87,8 @@ class WorkerServer(services_pb2.WorkerServiceServicer): futures.ThreadPoolExecutor(max_workers=server_threads)) if config.server_type == control_pb2.ASYNC_SERVER: servicer = benchmark_server.BenchmarkServer() - services_pb2.add_BenchmarkServiceServicer_to_server(servicer, - server) + services_pb2_grpc.add_BenchmarkServiceServicer_to_server(servicer, + server) elif config.server_type == control_pb2.ASYNC_GENERIC_SERVER: resp_size = config.payload_config.bytebuf_params.resp_size servicer = benchmark_server.GenericBenchmarkServer(resp_size) diff --git a/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py b/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py index d06ff064e2..4d73be6204 100644 --- a/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py +++ b/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py @@ -34,6 +34,7 @@ import grpc from grpc.framework.foundation import logging_pool from grpc_reflection.v1alpha import reflection from grpc_reflection.v1alpha import reflection_pb2 +from grpc_reflection.v1alpha import reflection_pb2_grpc from google.protobuf import descriptor_pool from google.protobuf import descriptor_pb2 @@ -61,12 +62,12 @@ class ReflectionServicerTest(unittest.TestCase): server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) self._server = grpc.server(server_pool) port = self._server.add_insecure_port('[::]:0') - reflection_pb2.add_ServerReflectionServicer_to_server(servicer, - self._server) + reflection_pb2_grpc.add_ServerReflectionServicer_to_server(servicer, + self._server) self._server.start() channel = grpc.insecure_channel('localhost:%d' % port) - self._stub = reflection_pb2.ServerReflectionStub(channel) + self._stub = reflection_pb2_grpc.ServerReflectionStub(channel) def testFileByName(self): requests = (reflection_pb2.ServerReflectionRequest( diff --git a/src/python/grpcio_tests/tests/stress/client.py b/src/python/grpcio_tests/tests/stress/client.py index b9dbe61d44..b7eb12bff8 100644 --- a/src/python/grpcio_tests/tests/stress/client.py +++ b/src/python/grpcio_tests/tests/stress/client.py @@ -34,7 +34,7 @@ import threading import grpc from six.moves import queue -from src.proto.grpc.testing import metrics_pb2 +from src.proto.grpc.testing import metrics_pb2_grpc from src.proto.grpc.testing import test_pb2 from tests.interop import methods @@ -139,7 +139,7 @@ def run_test(args): runners = [] server = grpc.server(futures.ThreadPoolExecutor(max_workers=25)) - metrics_pb2.add_MetricsServiceServicer_to_server( + metrics_pb2_grpc.add_MetricsServiceServicer_to_server( metrics_server.MetricsServer(hist), server) server.add_insecure_port('[::]:{}'.format(args.metrics_port)) server.start() -- cgit v1.2.3