aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-03-21 15:16:14 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-03-21 15:16:14 -0700
commit31041c9e1247e26d5b9acfa9231bbb4689071bda (patch)
treec02880f9c1db48ab3fa74c8c788a0475bd51a4cc /src
parentb7f35a658b00ae57d0341261d0944c26066dda04 (diff)
parent9f615de5ed5c622bd7abe0baaed418fcc560ba9b (diff)
Merge github.com:grpc/grpc into call_cases
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/census/grpc_filter.c4
-rw-r--r--src/core/ext/client_channel/client_channel.c149
-rw-r--r--src/core/ext/client_channel/client_channel_plugin.c3
-rw-r--r--src/core/ext/client_channel/proxy_mapper_registry.c12
-rw-r--r--src/core/ext/client_channel/retry_throttle.c210
-rw-r--r--src/core/ext/client_channel/retry_throttle.h65
-rw-r--r--src/core/ext/client_channel/subchannel.c49
-rw-r--r--src/core/ext/client_channel/subchannel.h18
-rw-r--r--src/core/ext/load_reporting/load_reporting_filter.c2
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c57
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_parser.c17
-rw-r--r--src/core/ext/transport/chttp2/transport/incoming_metadata.c65
-rw-r--r--src/core/ext/transport/chttp2/transport/incoming_metadata.h18
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h2
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c18
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c46
-rw-r--r--src/core/lib/channel/channel_stack.c36
-rw-r--r--src/core/lib/channel/channel_stack.h23
-rw-r--r--src/core/lib/channel/compress_filter.c2
-rw-r--r--src/core/lib/channel/connected_channel.c6
-rw-r--r--src/core/lib/channel/deadline_filter.c2
-rw-r--r--src/core/lib/channel/http_client_filter.c2
-rw-r--r--src/core/lib/channel/http_server_filter.c2
-rw-r--r--src/core/lib/channel/message_size_filter.c2
-rw-r--r--src/core/lib/iomgr/port.h4
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c416
-rw-r--r--src/core/lib/iomgr/tcp_server_utils_posix.h134
-rw-r--r--src/core/lib/iomgr/tcp_server_utils_posix_common.c220
-rw-r--r--src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c195
-rw-r--r--src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c49
-rw-r--r--src/core/lib/security/transport/client_auth_filter.c2
-rw-r--r--src/core/lib/security/transport/server_auth_filter.c2
-rw-r--r--src/core/lib/support/atm.c47
-rw-r--r--src/core/lib/surface/call.c37
-rw-r--r--src/core/lib/surface/channel.c32
-rw-r--r--src/core/lib/surface/channel.h3
-rw-r--r--src/core/lib/surface/lame_client.c4
-rw-r--r--src/core/lib/surface/server.c2
-rw-r--r--src/core/lib/transport/service_config.c12
-rw-r--r--src/core/lib/transport/service_config.h6
-rw-r--r--src/core/lib/transport/transport.c9
-rw-r--r--src/core/lib/transport/transport.h6
-rw-r--r--src/core/lib/transport/transport_impl.h5
-rw-r--r--src/cpp/common/channel_arguments.cc14
-rw-r--r--src/cpp/common/channel_filter.h3
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py5
-rw-r--r--src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py8
-rw-r--r--src/python/grpcio_tests/tests/interop/_secure_intraop_test.py8
-rw-r--r--src/python/grpcio_tests/tests/interop/methods.py4
-rw-r--r--src/python/grpcio_tests/tests/interop/server.py5
-rw-r--r--src/python/grpcio_tests/tests/qps/qps_worker.py4
-rw-r--r--src/python/grpcio_tests/tests/qps/worker_server.py8
-rw-r--r--src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py7
-rw-r--r--src/python/grpcio_tests/tests/stress/client.py4
54 files changed, 1462 insertions, 603 deletions
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c
index b80d831557..fc29dbd454 100644
--- a/src/core/ext/census/grpc_filter.c
+++ b/src/core/ext/census/grpc_filter.c
@@ -138,7 +138,7 @@ static grpc_error *client_init_call_elem(grpc_exec_ctx *exec_ctx,
static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_final_info *final_info,
- void *ignored) {
+ grpc_closure *ignored) {
call_data *d = elem->call_data;
GPR_ASSERT(d != NULL);
/* TODO(hongyu): record rpc client stats and census_rpc_end_op here */
@@ -160,7 +160,7 @@ static grpc_error *server_init_call_elem(grpc_exec_ctx *exec_ctx,
static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_final_info *final_info,
- void *ignored) {
+ grpc_closure *ignored) {
call_data *d = elem->call_data;
GPR_ASSERT(d != NULL);
/* TODO(hongyu): record rpc server stats and census_tracing_end_op here */
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c
index f2475cf6ae..00c20913b0 100644
--- a/src/core/ext/client_channel/client_channel.c
+++ b/src/core/ext/client_channel/client_channel.c
@@ -47,6 +47,7 @@
#include "src/core/ext/client_channel/lb_policy_registry.h"
#include "src/core/ext/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/client_channel/resolver_registry.h"
+#include "src/core/ext/client_channel/retry_throttle.h"
#include "src/core/ext/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
@@ -189,6 +190,8 @@ typedef struct client_channel_channel_data {
grpc_combiner *combiner;
/** currently active load balancer */
grpc_lb_policy *lb_policy;
+ /** retry throttle data */
+ grpc_server_retry_throttle_data *retry_throttle_data;
/** maps method names to method_parameters structs */
grpc_slice_hash_table *method_params_table;
/** incoming resolver result - set by resolver.next() */
@@ -284,6 +287,65 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
&w->on_changed);
}
+typedef struct {
+ char *server_name;
+ grpc_server_retry_throttle_data *retry_throttle_data;
+} service_config_parsing_state;
+
+static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
+ service_config_parsing_state *parsing_state = arg;
+ if (strcmp(field->key, "retryThrottling") == 0) {
+ if (parsing_state->retry_throttle_data != NULL) return; // Duplicate.
+ if (field->type != GRPC_JSON_OBJECT) return;
+ int max_milli_tokens = 0;
+ int milli_token_ratio = 0;
+ for (grpc_json *sub_field = field->child; sub_field != NULL;
+ sub_field = sub_field->next) {
+ if (sub_field->key == NULL) return;
+ if (strcmp(sub_field->key, "maxTokens") == 0) {
+ if (max_milli_tokens != 0) return; // Duplicate.
+ if (sub_field->type != GRPC_JSON_NUMBER) return;
+ max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value);
+ if (max_milli_tokens == -1) return;
+ max_milli_tokens *= 1000;
+ } else if (strcmp(sub_field->key, "tokenRatio") == 0) {
+ if (milli_token_ratio != 0) return; // Duplicate.
+ if (sub_field->type != GRPC_JSON_NUMBER) return;
+ // We support up to 3 decimal digits.
+ size_t whole_len = strlen(sub_field->value);
+ uint32_t multiplier = 1;
+ uint32_t decimal_value = 0;
+ const char *decimal_point = strchr(sub_field->value, '.');
+ if (decimal_point != NULL) {
+ whole_len = (size_t)(decimal_point - sub_field->value);
+ multiplier = 1000;
+ size_t decimal_len = strlen(decimal_point + 1);
+ if (decimal_len > 3) decimal_len = 3;
+ if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len,
+ &decimal_value)) {
+ return;
+ }
+ uint32_t decimal_multiplier = 1;
+ for (size_t i = 0; i < (3 - decimal_len); ++i) {
+ decimal_multiplier *= 10;
+ }
+ decimal_value *= decimal_multiplier;
+ }
+ uint32_t whole_value;
+ if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len,
+ &whole_value)) {
+ return;
+ }
+ milli_token_ratio = (int)((whole_value * multiplier) + decimal_value);
+ if (milli_token_ratio <= 0) return;
+ }
+ }
+ parsing_state->retry_throttle_data =
+ grpc_retry_throttle_map_get_data_for_server(
+ parsing_state->server_name, max_milli_tokens, milli_token_ratio);
+ }
+}
+
static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error) {
channel_data *chand = arg;
@@ -295,6 +357,8 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
bool exit_idle = false;
grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy");
char *service_config_json = NULL;
+ service_config_parsing_state parsing_state;
+ memset(&parsing_state, 0, sizeof(parsing_state));
if (chand->resolver_result != NULL) {
// Find LB policy name.
@@ -355,6 +419,19 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_service_config *service_config =
grpc_service_config_create(service_config_json);
if (service_config != NULL) {
+ channel_arg =
+ grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI);
+ GPR_ASSERT(channel_arg != NULL);
+ GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
+ grpc_uri *uri =
+ grpc_uri_parse(exec_ctx, channel_arg->value.string, true);
+ GPR_ASSERT(uri->path[0] != '\0');
+ parsing_state.server_name =
+ uri->path[0] == '/' ? uri->path + 1 : uri->path;
+ grpc_service_config_parse_global_params(
+ service_config, parse_retry_throttle_params, &parsing_state);
+ parsing_state.server_name = NULL;
+ grpc_uri_destroy(uri);
method_params_table = grpc_service_config_create_method_config_table(
exec_ctx, service_config, method_parameters_create_from_json,
&method_parameters_vtable);
@@ -386,6 +463,11 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
chand->info_service_config_json = service_config_json;
}
gpr_mu_unlock(&chand->info_mu);
+
+ if (chand->retry_throttle_data != NULL) {
+ grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
+ }
+ chand->retry_throttle_data = parsing_state.retry_throttle_data;
if (chand->method_params_table != NULL) {
grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
}
@@ -613,6 +695,9 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
}
gpr_free(chand->info_lb_policy_name);
gpr_free(chand->info_service_config_json);
+ if (chand->retry_throttle_data != NULL) {
+ grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
+ }
if (chand->method_params_table != NULL) {
grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
}
@@ -654,6 +739,7 @@ typedef struct client_channel_call_data {
grpc_slice path; // Request path.
gpr_timespec call_start_time;
gpr_timespec deadline;
+ grpc_server_retry_throttle_data *retry_throttle_data;
method_parameters *method_params;
grpc_error *cancel_error;
@@ -661,6 +747,7 @@ typedef struct client_channel_call_data {
/** either 0 for no call, 1 for cancelled, or a pointer to a
grpc_subchannel_call */
gpr_atm subchannel_call;
+ gpr_arena *arena;
subchannel_creation_phase creation_phase;
grpc_connected_subchannel *connected_subchannel;
@@ -675,6 +762,9 @@ typedef struct client_channel_call_data {
grpc_call_stack *owning_call;
grpc_linked_mdelem lb_token_mdelem;
+
+ grpc_closure on_complete;
+ grpc_closure *original_on_complete;
} call_data;
grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
@@ -727,7 +817,7 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
gpr_free(ops);
}
-// Sets calld->method_params.
+// Sets calld->method_params and calld->retry_throttle_data.
// If the method params specify a timeout, populates
// *per_method_deadline and returns true.
static bool set_call_method_params_from_service_config_locked(
@@ -735,6 +825,10 @@ static bool set_call_method_params_from_service_config_locked(
gpr_timespec *per_method_deadline) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
+ if (chand->retry_throttle_data != NULL) {
+ calld->retry_throttle_data =
+ grpc_server_retry_throttle_data_ref(chand->retry_throttle_data);
+ }
if (chand->method_params_table != NULL) {
calld->method_params = grpc_method_config_table_get(
exec_ctx, chand->method_params_table, calld->path);
@@ -796,9 +890,14 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
} else {
/* Create call on subchannel. */
grpc_subchannel_call *subchannel_call = NULL;
+ const grpc_connected_subchannel_call_args call_args = {
+ .pollent = calld->pollent,
+ .path = calld->path,
+ .start_time = calld->call_start_time,
+ .deadline = calld->deadline,
+ .arena = calld->arena};
grpc_error *new_error = grpc_connected_subchannel_create_call(
- exec_ctx, calld->connected_subchannel, calld->pollent, calld->path,
- calld->call_start_time, calld->deadline, &subchannel_call);
+ exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
if (new_error != GRPC_ERROR_NONE) {
new_error = grpc_error_add_child(new_error, error);
subchannel_call = CANCELLED_CALL;
@@ -1025,9 +1124,14 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
calld->connected_subchannel != NULL) {
grpc_subchannel_call *subchannel_call = NULL;
+ const grpc_connected_subchannel_call_args call_args = {
+ .pollent = calld->pollent,
+ .path = calld->path,
+ .start_time = calld->call_start_time,
+ .deadline = calld->deadline,
+ .arena = calld->arena};
grpc_error *error = grpc_connected_subchannel_create_call(
- exec_ctx, calld->connected_subchannel, calld->pollent, calld->path,
- calld->call_start_time, calld->deadline, &subchannel_call);
+ exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
if (error != GRPC_ERROR_NONE) {
subchannel_call = CANCELLED_CALL;
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
@@ -1045,6 +1149,26 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
add_waiting_locked(calld, op);
}
+static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ grpc_call_element *elem = arg;
+ call_data *calld = elem->call_data;
+ if (calld->retry_throttle_data != NULL) {
+ if (error == GRPC_ERROR_NONE) {
+ grpc_server_retry_throttle_data_record_success(
+ calld->retry_throttle_data);
+ } else {
+ // TODO(roth): In a subsequent PR, check the return value here and
+ // decide whether or not to retry. Note that we should only
+ // record failures whose statuses match the configured retryable
+ // or non-fatal status codes.
+ grpc_server_retry_throttle_data_record_failure(
+ calld->retry_throttle_data);
+ }
+ }
+ grpc_closure_run(exec_ctx, calld->original_on_complete,
+ GRPC_ERROR_REF(error));
+}
+
static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error_ignored) {
GPR_TIMER_BEGIN("start_transport_stream_op_locked", 0);
@@ -1053,6 +1177,14 @@ static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_call_element *elem = op->handler_private.args[0];
call_data *calld = elem->call_data;
+ if (op->recv_trailing_metadata != NULL) {
+ GPR_ASSERT(op->on_complete != NULL);
+ calld->original_on_complete = op->on_complete;
+ grpc_closure_init(&calld->on_complete, on_complete, elem,
+ grpc_schedule_on_exec_ctx);
+ op->on_complete = &calld->on_complete;
+ }
+
start_transport_stream_op_locked_inner(exec_ctx, op, elem);
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
@@ -1114,6 +1246,7 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
calld->call_start_time = args->start_time;
calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC);
calld->owning_call = args->call_stack;
+ calld->arena = args->arena;
grpc_deadline_state_start(exec_ctx, elem, calld->deadline);
return GRPC_ERROR_NONE;
}
@@ -1122,7 +1255,7 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_final_info *final_info,
- void *and_free_memory) {
+ grpc_closure *then_schedule_closure) {
call_data *calld = elem->call_data;
grpc_deadline_state_destroy(exec_ctx, elem);
grpc_slice_unref_internal(exec_ctx, calld->path);
@@ -1132,6 +1265,8 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(calld->cancel_error);
grpc_subchannel_call *call = GET_CALL(calld);
if (call != NULL && call != CANCELLED_CALL) {
+ grpc_subchannel_call_set_cleanup_closure(call, then_schedule_closure);
+ then_schedule_closure = NULL;
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call");
}
GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING);
@@ -1141,7 +1276,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
"picked");
}
gpr_free(calld->waiting_ops);
- gpr_free(and_free_memory);
+ grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
}
static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/ext/client_channel/client_channel_plugin.c b/src/core/ext/client_channel/client_channel_plugin.c
index 28d3b63f99..f51277d0b2 100644
--- a/src/core/ext/client_channel/client_channel_plugin.c
+++ b/src/core/ext/client_channel/client_channel_plugin.c
@@ -43,6 +43,7 @@
#include "src/core/ext/client_channel/lb_policy_registry.h"
#include "src/core/ext/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/client_channel/resolver_registry.h"
+#include "src/core/ext/client_channel/retry_throttle.h"
#include "src/core/ext/client_channel/subchannel_index.h"
#include "src/core/lib/surface/channel_init.h"
@@ -82,6 +83,7 @@ static bool set_default_host_if_unset(grpc_exec_ctx *exec_ctx,
void grpc_client_channel_init(void) {
grpc_lb_policy_registry_init();
grpc_resolver_registry_init();
+ grpc_retry_throttle_map_init();
grpc_proxy_mapper_registry_init();
grpc_register_http_proxy_mapper();
grpc_subchannel_index_init();
@@ -96,6 +98,7 @@ void grpc_client_channel_shutdown(void) {
grpc_subchannel_index_shutdown();
grpc_channel_init_shutdown();
grpc_proxy_mapper_registry_shutdown();
+ grpc_retry_throttle_map_shutdown();
grpc_resolver_registry_shutdown();
grpc_lb_policy_registry_shutdown();
}
diff --git a/src/core/ext/client_channel/proxy_mapper_registry.c b/src/core/ext/client_channel/proxy_mapper_registry.c
index 2c44b9d490..0935ddbdbd 100644
--- a/src/core/ext/client_channel/proxy_mapper_registry.c
+++ b/src/core/ext/client_channel/proxy_mapper_registry.c
@@ -94,6 +94,14 @@ static void grpc_proxy_mapper_list_destroy(grpc_proxy_mapper_list* list) {
grpc_proxy_mapper_destroy(list->list[i]);
}
gpr_free(list->list);
+ // Clean up in case we re-initialze later.
+ // TODO(ctiller): This should ideally live in
+ // grpc_proxy_mapper_registry_init(). However, if we did this there,
+ // then we would do it AFTER we start registering proxy mappers from
+ // third-party plugins, so they'd never show up (and would leak memory).
+ // We probably need some sort of dependency system for plugins to fix
+ // this.
+ memset(list, 0, sizeof(*list));
}
//
@@ -102,9 +110,7 @@ static void grpc_proxy_mapper_list_destroy(grpc_proxy_mapper_list* list) {
static grpc_proxy_mapper_list g_proxy_mapper_list;
-void grpc_proxy_mapper_registry_init() {
- memset(&g_proxy_mapper_list, 0, sizeof(g_proxy_mapper_list));
-}
+void grpc_proxy_mapper_registry_init() {}
void grpc_proxy_mapper_registry_shutdown() {
grpc_proxy_mapper_list_destroy(&g_proxy_mapper_list);
diff --git a/src/core/ext/client_channel/retry_throttle.c b/src/core/ext/client_channel/retry_throttle.c
new file mode 100644
index 0000000000..8926c3d782
--- /dev/null
+++ b/src/core/ext/client_channel/retry_throttle.c
@@ -0,0 +1,210 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/client_channel/retry_throttle.h"
+
+#include <limits.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/atm.h>
+#include <grpc/support/avl.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/sync.h>
+
+//
+// server_retry_throttle_data
+//
+
+struct grpc_server_retry_throttle_data {
+ gpr_refcount refs;
+ int max_milli_tokens;
+ int milli_token_ratio;
+ gpr_atm milli_tokens;
+ // A pointer to the replacement for this grpc_server_retry_throttle_data
+ // entry. If non-NULL, then this entry is stale and must not be used.
+ // We hold a reference to the replacement.
+ gpr_atm replacement;
+};
+
+static void get_replacement_throttle_data_if_needed(
+ grpc_server_retry_throttle_data** throttle_data) {
+ while (true) {
+ grpc_server_retry_throttle_data* new_throttle_data =
+ (grpc_server_retry_throttle_data*)gpr_atm_acq_load(
+ &(*throttle_data)->replacement);
+ if (new_throttle_data == NULL) return;
+ *throttle_data = new_throttle_data;
+ }
+}
+
+bool grpc_server_retry_throttle_data_record_failure(
+ grpc_server_retry_throttle_data* throttle_data) {
+ // First, check if we are stale and need to be replaced.
+ get_replacement_throttle_data_if_needed(&throttle_data);
+ // We decrement milli_tokens by 1000 (1 token) for each failure.
+ const int new_value = (int)gpr_atm_no_barrier_clamped_add(
+ &throttle_data->milli_tokens, (gpr_atm)-1000, (gpr_atm)0,
+ (gpr_atm)throttle_data->max_milli_tokens);
+ // Retries are allowed as long as the new value is above the threshold
+ // (max_milli_tokens / 2).
+ return new_value > throttle_data->max_milli_tokens / 2;
+}
+
+void grpc_server_retry_throttle_data_record_success(
+ grpc_server_retry_throttle_data* throttle_data) {
+ // First, check if we are stale and need to be replaced.
+ get_replacement_throttle_data_if_needed(&throttle_data);
+ // We increment milli_tokens by milli_token_ratio for each success.
+ gpr_atm_no_barrier_clamped_add(
+ &throttle_data->milli_tokens, (gpr_atm)throttle_data->milli_token_ratio,
+ (gpr_atm)0, (gpr_atm)throttle_data->max_milli_tokens);
+}
+
+grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_ref(
+ grpc_server_retry_throttle_data* throttle_data) {
+ gpr_ref(&throttle_data->refs);
+ return throttle_data;
+}
+
+void grpc_server_retry_throttle_data_unref(
+ grpc_server_retry_throttle_data* throttle_data) {
+ if (gpr_unref(&throttle_data->refs)) {
+ grpc_server_retry_throttle_data* replacement =
+ (grpc_server_retry_throttle_data*)gpr_atm_acq_load(
+ &throttle_data->replacement);
+ if (replacement != NULL) {
+ grpc_server_retry_throttle_data_unref(replacement);
+ }
+ gpr_free(throttle_data);
+ }
+}
+
+static grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_create(
+ int max_milli_tokens, int milli_token_ratio,
+ grpc_server_retry_throttle_data* old_throttle_data) {
+ grpc_server_retry_throttle_data* throttle_data =
+ gpr_malloc(sizeof(*throttle_data));
+ memset(throttle_data, 0, sizeof(*throttle_data));
+ gpr_ref_init(&throttle_data->refs, 1);
+ throttle_data->max_milli_tokens = max_milli_tokens;
+ throttle_data->milli_token_ratio = milli_token_ratio;
+ int initial_milli_tokens = max_milli_tokens;
+ // If there was a pre-existing entry for this server name, initialize
+ // the token count by scaling proportionately to the old data. This
+ // ensures that if we're already throttling retries on the old scale,
+ // we will start out doing the same thing on the new one.
+ if (old_throttle_data != NULL) {
+ double token_fraction =
+ (int)gpr_atm_acq_load(&old_throttle_data->milli_tokens) /
+ (double)old_throttle_data->max_milli_tokens;
+ initial_milli_tokens = (int)(token_fraction * max_milli_tokens);
+ }
+ gpr_atm_rel_store(&throttle_data->milli_tokens,
+ (gpr_atm)initial_milli_tokens);
+ // If there was a pre-existing entry, mark it as stale and give it a
+ // pointer to the new entry, which is its replacement.
+ if (old_throttle_data != NULL) {
+ grpc_server_retry_throttle_data_ref(throttle_data);
+ gpr_atm_rel_store(&old_throttle_data->replacement, (gpr_atm)throttle_data);
+ }
+ return throttle_data;
+}
+
+//
+// avl vtable for string -> server_retry_throttle_data map
+//
+
+static void* copy_server_name(void* key) { return gpr_strdup(key); }
+
+static long compare_server_name(void* key1, void* key2) {
+ return strcmp(key1, key2);
+}
+
+static void destroy_server_retry_throttle_data(void* value) {
+ grpc_server_retry_throttle_data* throttle_data = value;
+ grpc_server_retry_throttle_data_unref(throttle_data);
+}
+
+static void* copy_server_retry_throttle_data(void* value) {
+ grpc_server_retry_throttle_data* throttle_data = value;
+ return grpc_server_retry_throttle_data_ref(throttle_data);
+}
+
+static const gpr_avl_vtable avl_vtable = {
+ gpr_free /* destroy_key */, copy_server_name, compare_server_name,
+ destroy_server_retry_throttle_data, copy_server_retry_throttle_data};
+
+//
+// server_retry_throttle_map
+//
+
+static gpr_mu g_mu;
+static gpr_avl g_avl;
+
+void grpc_retry_throttle_map_init() {
+ gpr_mu_init(&g_mu);
+ g_avl = gpr_avl_create(&avl_vtable);
+}
+
+void grpc_retry_throttle_map_shutdown() {
+ gpr_mu_destroy(&g_mu);
+ gpr_avl_unref(g_avl);
+}
+
+grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server(
+ const char* server_name, int max_milli_tokens, int milli_token_ratio) {
+ gpr_mu_lock(&g_mu);
+ grpc_server_retry_throttle_data* throttle_data =
+ gpr_avl_get(g_avl, (char*)server_name);
+ if (throttle_data == NULL) {
+ // Entry not found. Create a new one.
+ throttle_data = grpc_server_retry_throttle_data_create(
+ max_milli_tokens, milli_token_ratio, NULL);
+ g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data);
+ } else {
+ if (throttle_data->max_milli_tokens != max_milli_tokens ||
+ throttle_data->milli_token_ratio != milli_token_ratio) {
+ // Entry found but with old parameters. Create a new one based on
+ // the original one.
+ throttle_data = grpc_server_retry_throttle_data_create(
+ max_milli_tokens, milli_token_ratio, throttle_data);
+ g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data);
+ } else {
+ // Entry found. Increase refcount.
+ grpc_server_retry_throttle_data_ref(throttle_data);
+ }
+ }
+ gpr_mu_unlock(&g_mu);
+ return throttle_data;
+}
diff --git a/src/core/ext/client_channel/retry_throttle.h b/src/core/ext/client_channel/retry_throttle.h
new file mode 100644
index 0000000000..f9971faf65
--- /dev/null
+++ b/src/core/ext/client_channel/retry_throttle.h
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_RETRY_THROTTLE_H
+#define GRPC_CORE_EXT_CLIENT_CHANNEL_RETRY_THROTTLE_H
+
+#include <stdbool.h>
+
+/// Tracks retry throttling data for an individual server name.
+typedef struct grpc_server_retry_throttle_data grpc_server_retry_throttle_data;
+
+/// Records a failure. Returns true if it's okay to send a retry.
+bool grpc_server_retry_throttle_data_record_failure(
+ grpc_server_retry_throttle_data* throttle_data);
+/// Records a success.
+void grpc_server_retry_throttle_data_record_success(
+ grpc_server_retry_throttle_data* throttle_data);
+
+grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_ref(
+ grpc_server_retry_throttle_data* throttle_data);
+void grpc_server_retry_throttle_data_unref(
+ grpc_server_retry_throttle_data* throttle_data);
+
+/// Initializes global map of failure data for each server name.
+void grpc_retry_throttle_map_init();
+/// Shuts down global map of failure data for each server name.
+void grpc_retry_throttle_map_shutdown();
+
+/// Returns a reference to the failure data for \a server_name, creating
+/// a new entry if needed.
+/// Caller must eventually unref via \a grpc_server_retry_throttle_data_unref().
+grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server(
+ const char* server_name, int max_milli_tokens, int milli_token_ratio);
+
+#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RETRY_THROTTLE_H */
diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c
index 5df0a9060d..ed5029ea9a 100644
--- a/src/core/ext/client_channel/subchannel.c
+++ b/src/core/ext/client_channel/subchannel.c
@@ -148,6 +148,7 @@ struct grpc_subchannel {
struct grpc_subchannel_call {
grpc_connected_subchannel *connection;
+ grpc_closure *schedule_closure_after_destroy;
};
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
@@ -340,17 +341,15 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(new_address != NULL);
gpr_free(addr);
addr = new_address;
- if (new_args != NULL) c->args = new_args;
- }
- if (c->args == NULL) {
- static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS};
- grpc_arg new_arg = grpc_create_subchannel_address_arg(addr);
- c->args = grpc_channel_args_copy_and_add_and_remove(
- args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &new_arg,
- 1);
- gpr_free(new_arg.value.string);
}
+ static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS};
+ grpc_arg new_arg = grpc_create_subchannel_address_arg(addr);
gpr_free(addr);
+ c->args = grpc_channel_args_copy_and_add_and_remove(
+ new_args != NULL ? new_args : args->args, keys_to_remove,
+ GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1);
+ gpr_free(new_arg.value.string);
+ if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args);
c->root_external_state_watcher.next = c->root_external_state_watcher.prev =
&c->root_external_state_watcher;
grpc_closure_init(&c->connected, subchannel_connected, c,
@@ -719,13 +718,22 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call,
grpc_error *error) {
grpc_subchannel_call *c = call;
+ GPR_ASSERT(c->schedule_closure_after_destroy != NULL);
GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
grpc_connected_subchannel *connection = c->connection;
- grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), NULL, c);
+ grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), NULL,
+ c->schedule_closure_after_destroy);
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, connection, "subchannel_call");
GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
}
+void grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call *call,
+ grpc_closure *closure) {
+ GPR_ASSERT(call->schedule_closure_after_destroy == NULL);
+ GPR_ASSERT(closure != NULL);
+ call->schedule_closure_after_destroy = closure;
+}
+
void grpc_subchannel_call_ref(
grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
@@ -761,15 +769,22 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
- grpc_polling_entity *pollent, grpc_slice path, gpr_timespec start_time,
- gpr_timespec deadline, grpc_subchannel_call **call) {
+ const grpc_connected_subchannel_call_args *args,
+ grpc_subchannel_call **call) {
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
- *call = gpr_zalloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
+ *call = gpr_arena_alloc(
+ args->arena, sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
(*call)->connection = con; // Ref is added below.
- grpc_error *error =
- grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call,
- NULL, NULL, path, start_time, deadline, callstk);
+ const grpc_call_element_args call_args = {.call_stack = callstk,
+ .server_transport_data = NULL,
+ .context = NULL,
+ .path = args->path,
+ .start_time = args->start_time,
+ .deadline = args->deadline,
+ .arena = args->arena};
+ grpc_error *error = grpc_call_stack_init(
+ exec_ctx, chanstk, 1, subchannel_call_destroy, *call, &call_args);
if (error != GRPC_ERROR_NONE) {
const char *error_string = grpc_error_string(error);
gpr_log(GPR_ERROR, "error: %s", error_string);
@@ -778,7 +793,7 @@ grpc_error *grpc_connected_subchannel_create_call(
return error;
}
GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call");
- grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, pollent);
+ grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, args->pollent);
return GRPC_ERROR_NONE;
}
diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h
index 6a70a76467..3e64a2507c 100644
--- a/src/core/ext/client_channel/subchannel.h
+++ b/src/core/ext/client_channel/subchannel.h
@@ -37,6 +37,7 @@
#include "src/core/ext/client_channel/connector.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/polling_entity.h"
+#include "src/core/lib/support/arena.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata.h"
@@ -112,10 +113,18 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
/** construct a subchannel call */
+typedef struct {
+ grpc_polling_entity *pollent;
+ grpc_slice path;
+ gpr_timespec start_time;
+ gpr_timespec deadline;
+ gpr_arena *arena;
+} grpc_connected_subchannel_call_args;
+
grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel,
- grpc_polling_entity *pollent, grpc_slice path, gpr_timespec start_time,
- gpr_timespec deadline, grpc_subchannel_call **subchannel_call);
+ const grpc_connected_subchannel_call_args *args,
+ grpc_subchannel_call **subchannel_call);
/** process a transport level op */
void grpc_connected_subchannel_process_transport_op(
@@ -154,6 +163,11 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *subchannel_call);
+/** Must be called once per call. Sets the 'then_schedule_closure' argument for
+ call stack destruction. */
+void grpc_subchannel_call_set_cleanup_closure(
+ grpc_subchannel_call *subchannel_call, grpc_closure *closure);
+
grpc_call_stack *grpc_subchannel_call_get_call_stack(
grpc_subchannel_call *subchannel_call);
diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c
index c2750634a5..4ed832671d 100644
--- a/src/core/ext/load_reporting/load_reporting_filter.c
+++ b/src/core/ext/load_reporting/load_reporting_filter.c
@@ -123,7 +123,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
- void *ignored) {
+ grpc_closure *ignored) {
call_data *calld = elem->call_data;
/* TODO(dgq): do something with the data
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index a3684535ff..89659e7464 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -575,7 +575,7 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s) {
static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_stream_refcount *refcount,
- const void *server_data) {
+ const void *server_data, gpr_arena *arena) {
GPR_TIMER_BEGIN("init_stream", 0);
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
@@ -588,8 +588,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
gpr_ref_init(&s->active_streams, 1);
GRPC_CHTTP2_STREAM_REF(s, "chttp2");
- grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0]);
- grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1]);
+ grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena);
+ grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1], arena);
grpc_chttp2_data_parser_init(&s->data_parser);
grpc_slice_buffer_init(&s->flow_controlled_buffer);
s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
@@ -665,16 +665,17 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GPR_TIMER_END("destroy_stream", 0);
- gpr_free(s->destroy_stream_arg);
+ grpc_closure_sched(exec_ctx, s->destroy_stream_arg, GRPC_ERROR_NONE);
}
static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, void *and_free_memory) {
+ grpc_stream *gs,
+ grpc_closure *then_schedule_closure) {
GPR_TIMER_BEGIN("destroy_stream", 0);
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
- s->destroy_stream_arg = and_free_memory;
+ s->destroy_stream_arg = then_schedule_closure;
grpc_closure_sched(
exec_ctx, grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s,
grpc_combiner_scheduler(t->combiner, false)),
@@ -1629,15 +1630,19 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
s->recv_trailing_metadata_finished != NULL) {
char status_string[GPR_LTOA_MIN_BUFSIZE];
gpr_ltoa(status, status_string);
- grpc_chttp2_incoming_metadata_buffer_replace_or_add(
- exec_ctx, &s->metadata_buffer[1],
- grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_STATUS,
- grpc_slice_from_copied_string(status_string)));
+ GRPC_LOG_IF_ERROR("add_status",
+ grpc_chttp2_incoming_metadata_buffer_replace_or_add(
+ exec_ctx, &s->metadata_buffer[1],
+ grpc_mdelem_from_slices(
+ exec_ctx, GRPC_MDSTR_GRPC_STATUS,
+ grpc_slice_from_copied_string(status_string))));
if (msg != NULL) {
- grpc_chttp2_incoming_metadata_buffer_replace_or_add(
- exec_ctx, &s->metadata_buffer[1],
- grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_MESSAGE,
- grpc_slice_from_copied_string(msg)));
+ GRPC_LOG_IF_ERROR(
+ "add_status_message",
+ grpc_chttp2_incoming_metadata_buffer_replace_or_add(
+ exec_ctx, &s->metadata_buffer[1],
+ grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_MESSAGE,
+ grpc_slice_from_copied_string(msg))));
}
s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE;
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
@@ -1760,6 +1765,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_error *error) {
grpc_slice hdr;
grpc_slice status_hdr;
+ grpc_slice http_status_hdr;
grpc_slice message_pfx;
uint8_t *p;
uint32_t len = 0;
@@ -1775,6 +1781,26 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
It's complicated by the fact that our send machinery would be dead by
the time we got around to sending this, so instead we ignore HPACK
compression and just write the uncompressed bytes onto the wire. */
+ if (!s->sent_initial_metadata) {
+ http_status_hdr = grpc_slice_malloc(13);
+ p = GRPC_SLICE_START_PTR(http_status_hdr);
+ *p++ = 0x00;
+ *p++ = 7;
+ *p++ = ':';
+ *p++ = 's';
+ *p++ = 't';
+ *p++ = 'a';
+ *p++ = 't';
+ *p++ = 'u';
+ *p++ = 's';
+ *p++ = 3;
+ *p++ = '2';
+ *p++ = '0';
+ *p++ = '0';
+ GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr));
+ len += (uint32_t)GRPC_SLICE_LENGTH(http_status_hdr);
+ }
+
status_hdr = grpc_slice_malloc(15 + (grpc_status >= 10));
p = GRPC_SLICE_START_PTR(status_hdr);
*p++ = 0x00; /* literal header, not indexed */
@@ -1842,6 +1868,9 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr));
grpc_slice_buffer_add(&t->qbuf, hdr);
+ if (!s->sent_initial_metadata) {
+ grpc_slice_buffer_add(&t->qbuf, http_status_hdr);
+ }
grpc_slice_buffer_add(&t->qbuf, status_hdr);
if (msg != NULL) {
grpc_slice_buffer_add(&t->qbuf, message_pfx);
diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c
index 40f5120308..1865b997b7 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_parser.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c
@@ -1620,13 +1620,18 @@ void grpc_chttp2_hpack_parser_destroy(grpc_exec_ctx *exec_ctx,
grpc_error *grpc_chttp2_hpack_parser_parse(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_parser *p,
grpc_slice slice) {
- /* TODO(ctiller): limit the distance of end from beg, and perform multiple
- steps in the event of a large chunk of data to limit
- stack space usage when no tail call optimization is
- available */
+/* max number of bytes to parse at a time... limits call stack depth on
+ * compilers without TCO */
+#define MAX_PARSE_LENGTH 1024
p->current_slice_refcount = slice.refcount;
- grpc_error *error = p->state(exec_ctx, p, GRPC_SLICE_START_PTR(slice),
- GRPC_SLICE_END_PTR(slice));
+ uint8_t *start = GRPC_SLICE_START_PTR(slice);
+ uint8_t *end = GRPC_SLICE_END_PTR(slice);
+ grpc_error *error = GRPC_ERROR_NONE;
+ while (start != end && error == GRPC_ERROR_NONE) {
+ uint8_t *target = start + GPR_MIN(MAX_PARSE_LENGTH, end - start);
+ error = p->state(exec_ctx, p, start, target);
+ start = target;
+ }
p->current_slice_refcount = NULL;
return error;
}
diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.c b/src/core/ext/transport/chttp2/transport/incoming_metadata.c
index c91b019aa0..da0a34d32a 100644
--- a/src/core/ext/transport/chttp2/transport/incoming_metadata.c
+++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.c
@@ -41,69 +41,48 @@
#include <grpc/support/log.h>
void grpc_chttp2_incoming_metadata_buffer_init(
- grpc_chttp2_incoming_metadata_buffer *buffer) {
- buffer->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
+ grpc_chttp2_incoming_metadata_buffer *buffer, gpr_arena *arena) {
+ buffer->arena = arena;
+ grpc_metadata_batch_init(&buffer->batch);
+ buffer->batch.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
}
void grpc_chttp2_incoming_metadata_buffer_destroy(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer) {
- size_t i;
- if (!buffer->published) {
- for (i = 0; i < buffer->count; i++) {
- GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md);
- }
- }
- gpr_free(buffer->elems);
+ grpc_metadata_batch_destroy(exec_ctx, &buffer->batch);
}
-void grpc_chttp2_incoming_metadata_buffer_add(
- grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem) {
- GPR_ASSERT(!buffer->published);
- if (buffer->capacity == buffer->count) {
- buffer->capacity = GPR_MAX(8, 2 * buffer->capacity);
- buffer->elems =
- gpr_realloc(buffer->elems, sizeof(*buffer->elems) * buffer->capacity);
- }
- buffer->elems[buffer->count++].md = elem;
+grpc_error *grpc_chttp2_incoming_metadata_buffer_add(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
+ grpc_mdelem elem) {
buffer->size += GRPC_MDELEM_LENGTH(elem);
+ return grpc_metadata_batch_add_tail(
+ exec_ctx, &buffer->batch,
+ gpr_arena_alloc(buffer->arena, sizeof(grpc_linked_mdelem)), elem);
}
-void grpc_chttp2_incoming_metadata_buffer_replace_or_add(
+grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
grpc_mdelem elem) {
- for (size_t i = 0; i < buffer->count; i++) {
- if (grpc_slice_eq(GRPC_MDKEY(buffer->elems[i].md), GRPC_MDKEY(elem))) {
- GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md);
- buffer->elems[i].md = elem;
- return;
+ for (grpc_linked_mdelem *l = buffer->batch.list.head; l != NULL;
+ l = l->next) {
+ if (grpc_slice_eq(GRPC_MDKEY(l->md), GRPC_MDKEY(elem))) {
+ GRPC_MDELEM_UNREF(exec_ctx, l->md);
+ l->md = elem;
+ return GRPC_ERROR_NONE;
}
}
- grpc_chttp2_incoming_metadata_buffer_add(buffer, elem);
+ return grpc_chttp2_incoming_metadata_buffer_add(exec_ctx, buffer, elem);
}
void grpc_chttp2_incoming_metadata_buffer_set_deadline(
grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline) {
- GPR_ASSERT(!buffer->published);
- buffer->deadline = deadline;
+ buffer->batch.deadline = deadline;
}
void grpc_chttp2_incoming_metadata_buffer_publish(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
grpc_metadata_batch *batch) {
- GPR_ASSERT(!buffer->published);
- buffer->published = 1;
- if (buffer->count > 0) {
- size_t i;
- for (i = 0; i < buffer->count; i++) {
- /* TODO(ctiller): do something better here */
- if (!GRPC_LOG_IF_ERROR("grpc_chttp2_incoming_metadata_buffer_publish",
- grpc_metadata_batch_link_tail(
- exec_ctx, batch, &buffer->elems[i]))) {
- GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md);
- }
- }
- } else {
- batch->list.head = batch->list.tail = NULL;
- }
- batch->deadline = buffer->deadline;
+ *batch = buffer->batch;
+ grpc_metadata_batch_init(&buffer->batch);
}
diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.h b/src/core/ext/transport/chttp2/transport/incoming_metadata.h
index 1eac6fc150..288c917e65 100644
--- a/src/core/ext/transport/chttp2/transport/incoming_metadata.h
+++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.h
@@ -37,28 +37,26 @@
#include "src/core/lib/transport/transport.h"
typedef struct {
- grpc_linked_mdelem *elems;
- size_t count;
- size_t capacity;
- gpr_timespec deadline;
- int published;
+ gpr_arena *arena;
+ grpc_metadata_batch batch;
size_t size; // total size of metadata
} grpc_chttp2_incoming_metadata_buffer;
/** assumes everything initially zeroed */
void grpc_chttp2_incoming_metadata_buffer_init(
- grpc_chttp2_incoming_metadata_buffer *buffer);
+ grpc_chttp2_incoming_metadata_buffer *buffer, gpr_arena *arena);
void grpc_chttp2_incoming_metadata_buffer_destroy(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer);
void grpc_chttp2_incoming_metadata_buffer_publish(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
grpc_metadata_batch *batch);
-void grpc_chttp2_incoming_metadata_buffer_add(
- grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem);
-void grpc_chttp2_incoming_metadata_buffer_replace_or_add(
+grpc_error *grpc_chttp2_incoming_metadata_buffer_add(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
- grpc_mdelem elem);
+ grpc_mdelem elem) GRPC_MUST_USE_RESULT;
+grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
+ grpc_mdelem elem) GRPC_MUST_USE_RESULT;
void grpc_chttp2_incoming_metadata_buffer_set_deadline(
grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline);
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index d26812ad6b..3c56c21599 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -425,7 +425,7 @@ struct grpc_chttp2_stream {
grpc_stream_refcount *refcount;
grpc_closure destroy_stream;
- void *destroy_stream_arg;
+ grpc_closure *destroy_stream_arg;
grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
uint8_t included[STREAM_LIST_COUNT];
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index e7f2597f89..7efc8c63c9 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -548,7 +548,14 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp,
s->seen_error = true;
GRPC_MDELEM_UNREF(exec_ctx, md);
} else {
- grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[0], md);
+ grpc_error *error = grpc_chttp2_incoming_metadata_buffer_add(
+ exec_ctx, &s->metadata_buffer[0], md);
+ if (error != GRPC_ERROR_NONE) {
+ grpc_chttp2_cancel_stream(exec_ctx, t, s, error);
+ grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
+ s->seen_error = true;
+ GRPC_MDELEM_UNREF(exec_ctx, md);
+ }
}
}
@@ -598,7 +605,14 @@ static void on_trailing_header(grpc_exec_ctx *exec_ctx, void *tp,
s->seen_error = true;
GRPC_MDELEM_UNREF(exec_ctx, md);
} else {
- grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[1], md);
+ grpc_error *error = grpc_chttp2_incoming_metadata_buffer_add(
+ exec_ctx, &s->metadata_buffer[1], md);
+ if (error != GRPC_ERROR_NONE) {
+ grpc_chttp2_cancel_stream(exec_ctx, t, s, error);
+ grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
+ s->seen_error = true;
+ GRPC_MDELEM_UNREF(exec_ctx, md);
+ }
}
GPR_TIMER_END("on_trailing_header", 0);
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index e187360330..450d9ab23a 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -184,6 +184,7 @@ struct op_storage {
};
struct stream_obj {
+ gpr_arena *arena;
struct op_and_state *oas;
grpc_transport_stream_op *curr_op;
grpc_cronet_transport *curr_ct;
@@ -486,15 +487,18 @@ static void on_response_headers_received(
gpr_mu_lock(&s->mu);
memset(&s->state.rs.initial_metadata, 0,
sizeof(s->state.rs.initial_metadata));
- grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata);
+ grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata,
+ s->arena);
for (size_t i = 0; i < headers->count; i++) {
- grpc_chttp2_incoming_metadata_buffer_add(
- &s->state.rs.initial_metadata,
- grpc_mdelem_from_slices(
- &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string(
- headers->headers[i].key)),
- grpc_slice_intern(
- grpc_slice_from_static_string(headers->headers[i].value))));
+ GRPC_LOG_IF_ERROR(
+ "on_response_headers_received",
+ grpc_chttp2_incoming_metadata_buffer_add(
+ &exec_ctx, &s->state.rs.initial_metadata,
+ grpc_mdelem_from_slices(
+ &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string(
+ headers->headers[i].key)),
+ grpc_slice_intern(grpc_slice_from_static_string(
+ headers->headers[i].value)))));
}
s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
if (!(s->state.state_op_done[OP_CANCEL_ERROR] ||
@@ -586,17 +590,20 @@ static void on_response_trailers_received(
memset(&s->state.rs.trailing_metadata, 0,
sizeof(s->state.rs.trailing_metadata));
s->state.rs.trailing_metadata_valid = false;
- grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata);
+ grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata,
+ s->arena);
for (size_t i = 0; i < trailers->count; i++) {
CRONET_LOG(GPR_DEBUG, "trailer key=%s, value=%s", trailers->headers[i].key,
trailers->headers[i].value);
- grpc_chttp2_incoming_metadata_buffer_add(
- &s->state.rs.trailing_metadata,
- grpc_mdelem_from_slices(
- &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string(
- trailers->headers[i].key)),
- grpc_slice_intern(
- grpc_slice_from_static_string(trailers->headers[i].value))));
+ GRPC_LOG_IF_ERROR(
+ "on_response_trailers_received",
+ grpc_chttp2_incoming_metadata_buffer_add(
+ &exec_ctx, &s->state.rs.trailing_metadata,
+ grpc_mdelem_from_slices(
+ &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string(
+ trailers->headers[i].key)),
+ grpc_slice_intern(grpc_slice_from_static_string(
+ trailers->headers[i].value)))));
s->state.rs.trailing_metadata_valid = true;
if (0 == strcmp(trailers->headers[i].key, "grpc-status") &&
0 != strcmp(trailers->headers[i].value, "0")) {
@@ -1215,7 +1222,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_stream_refcount *refcount,
- const void *server_data) {
+ const void *server_data, gpr_arena *arena) {
stream_obj *s = (stream_obj *)gs;
memset(&s->storage, 0, sizeof(s->storage));
s->storage.head = NULL;
@@ -1237,6 +1244,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->curr_gs = gs;
s->curr_ct = (grpc_cronet_transport *)gt;
+ s->arena = arena;
gpr_mu_init(&s->mu);
return 0;
@@ -1273,10 +1281,12 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
}
static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, void *and_free_memory) {
+ grpc_stream *gs,
+ grpc_closure *then_schedule_closure) {
stream_obj *s = (stream_obj *)gs;
null_and_maybe_free_read_buffer(s);
GRPC_ERROR_UNREF(s->state.cancel_error);
+ grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
}
static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {}
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index 3fb2a60ac7..6d53b0576e 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -166,41 +166,32 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
}
}
-grpc_error *grpc_call_stack_init(
- grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
- int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg,
- grpc_call_context_element *context, const void *transport_server_data,
- grpc_slice path, gpr_timespec start_time, gpr_timespec deadline,
- grpc_call_stack *call_stack) {
+grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack *channel_stack,
+ int initial_refs, grpc_iomgr_cb_func destroy,
+ void *destroy_arg,
+ const grpc_call_element_args *elem_args) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
size_t count = channel_stack->count;
grpc_call_element *call_elems;
char *user_data;
size_t i;
- call_stack->count = count;
- GRPC_STREAM_REF_INIT(&call_stack->refcount, initial_refs, destroy,
+ elem_args->call_stack->count = count;
+ GRPC_STREAM_REF_INIT(&elem_args->call_stack->refcount, initial_refs, destroy,
destroy_arg, "CALL_STACK");
- call_elems = CALL_ELEMS_FROM_STACK(call_stack);
+ call_elems = CALL_ELEMS_FROM_STACK(elem_args->call_stack);
user_data = ((char *)call_elems) +
ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element));
/* init per-filter data */
grpc_error *first_error = GRPC_ERROR_NONE;
- const grpc_call_element_args args = {
- .start_time = start_time,
- .call_stack = call_stack,
- .server_transport_data = transport_server_data,
- .context = context,
- .path = path,
- .deadline = deadline,
- };
for (i = 0; i < count; i++) {
call_elems[i].filter = channel_elems[i].filter;
call_elems[i].channel_data = channel_elems[i].channel_data;
call_elems[i].call_data = user_data;
- grpc_error *error =
- call_elems[i].filter->init_call_elem(exec_ctx, &call_elems[i], &args);
+ grpc_error *error = call_elems[i].filter->init_call_elem(
+ exec_ctx, &call_elems[i], elem_args);
if (error != GRPC_ERROR_NONE) {
if (first_error == GRPC_ERROR_NONE) {
first_error = error;
@@ -241,15 +232,16 @@ void grpc_call_stack_ignore_set_pollset_or_pollset_set(
void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
const grpc_call_final_info *final_info,
- void *and_free_memory) {
+ grpc_closure *then_schedule_closure) {
grpc_call_element *elems = CALL_ELEMS_FROM_STACK(stack);
size_t count = stack->count;
size_t i;
/* destroy per-filter data */
for (i = 0; i < count; i++) {
- elems[i].filter->destroy_call_elem(exec_ctx, &elems[i], final_info,
- i == count - 1 ? and_free_memory : NULL);
+ elems[i].filter->destroy_call_elem(
+ exec_ctx, &elems[i], final_info,
+ i == count - 1 ? then_schedule_closure : NULL);
}
}
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index 6d3340bcbf..80e3603e8d 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -56,6 +56,7 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/polling_entity.h"
+#include "src/core/lib/support/arena.h"
#include "src/core/lib/transport/transport.h"
#ifdef __cplusplus
@@ -84,6 +85,7 @@ typedef struct {
grpc_slice path;
gpr_timespec start_time;
gpr_timespec deadline;
+ gpr_arena *arena;
} grpc_call_element_args;
typedef struct {
@@ -139,12 +141,12 @@ typedef struct {
/* Destroy per call data.
The filter does not need to do any chaining.
The bottom filter of a stack will be passed a non-NULL pointer to
- \a and_free_memory that should be passed to gpr_free when destruction
- is complete. \a final_info contains data about the completed call, mainly
- for reporting purposes. */
+ \a then_schedule_closure that should be passed to grpc_closure_sched when
+ destruction is complete. \a final_info contains data about the completed
+ call, mainly for reporting purposes. */
void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
- void *and_free_memory);
+ grpc_closure *then_schedule_closure);
/* sizeof(per channel data) */
size_t sizeof_channel_data;
@@ -236,12 +238,11 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
/* Initialize a call stack given a channel stack. transport_server_data is
expected to be NULL on a client, or an opaque transport owned pointer on the
server. */
-grpc_error *grpc_call_stack_init(
- grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
- int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg,
- grpc_call_context_element *context, const void *transport_server_data,
- grpc_slice path, gpr_timespec start_time, gpr_timespec deadline,
- grpc_call_stack *call_stack);
+grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack *channel_stack,
+ int initial_refs, grpc_iomgr_cb_func destroy,
+ void *destroy_arg,
+ const grpc_call_element_args *elem_args);
/* Set a pollset or a pollset_set for a call stack: must occur before the first
* op is started */
void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
@@ -271,7 +272,7 @@ void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
/* Destroy a call stack */
void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
const grpc_call_final_info *final_info,
- void *and_free_memory);
+ grpc_closure *then_schedule_closure);
/* Ignore set pollset{_set} - used by filters if they don't care about pollsets
* at all. Does nothing. */
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index aa41014a21..02dc479f3a 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -292,7 +292,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
- void *ignored) {
+ grpc_closure *ignored) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices);
diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c
index 29796f7ca7..42ef7b7806 100644
--- a/src/core/lib/channel/connected_channel.c
+++ b/src/core/lib/channel/connected_channel.c
@@ -88,7 +88,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
channel_data *chand = elem->channel_data;
int r = grpc_transport_init_stream(
exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- &args->call_stack->refcount, args->server_transport_data);
+ &args->call_stack->refcount, args->server_transport_data, args->arena);
return r == 0 ? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE("transport stream initialization failed");
}
@@ -105,12 +105,12 @@ static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
- void *and_free_memory) {
+ grpc_closure *then_schedule_closure) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_transport_destroy_stream(exec_ctx, chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- and_free_memory);
+ then_schedule_closure);
}
/* Constructor for channel_data */
diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c
index 5a12d62f1d..34114bbebf 100644
--- a/src/core/lib/channel/deadline_filter.c
+++ b/src/core/lib/channel/deadline_filter.c
@@ -256,7 +256,7 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
// Destructor for call_data. Used for both client and server filters.
static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
const grpc_call_final_info* final_info,
- void* and_free_memory) {
+ grpc_closure* ignored) {
grpc_deadline_state_destroy(exec_ctx, elem);
}
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index c031533dd8..f9d0d689ac 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -412,7 +412,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
- void *ignored) {
+ grpc_closure *ignored) {
call_data *calld = elem->call_data;
grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices);
}
diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c
index fb70de8e96..bebd3af335 100644
--- a/src/core/lib/channel/http_server_filter.c
+++ b/src/core/lib/channel/http_server_filter.c
@@ -358,7 +358,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
- void *ignored) {
+ grpc_closure *ignored) {
call_data *calld = elem->call_data;
grpc_slice_buffer_destroy_internal(exec_ctx, &calld->read_slice_buffer);
}
diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c
index b424c0d2ac..5ba13fe251 100644
--- a/src/core/lib/channel/message_size_filter.c
+++ b/src/core/lib/channel/message_size_filter.c
@@ -200,7 +200,7 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
// Destructor for call_data.
static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
const grpc_call_final_info* final_info,
- void* ignored) {}
+ grpc_closure* ignored) {}
// Constructor for channel_data.
static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h
index f1897bb91f..94a454c0b7 100644
--- a/src/core/lib/iomgr/port.h
+++ b/src/core/lib/iomgr/port.h
@@ -39,6 +39,7 @@
#if defined(GRPC_UV)
// Do nothing
#elif defined(GPR_MANYLINUX1)
+#define GRPC_HAVE_IFADDRS 1
#define GRPC_HAVE_IPV6_RECVPKTINFO 1
#define GRPC_HAVE_IP_PKTINFO 1
#define GRPC_HAVE_MSG_NOSIGNAL 1
@@ -65,6 +66,7 @@
#define GRPC_POSIX_WAKEUP_FD 1
#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_LINUX)
+#define GRPC_HAVE_IFADDRS 1
#define GRPC_HAVE_IPV6_RECVPKTINFO 1
#define GRPC_HAVE_IP_PKTINFO 1
#define GRPC_HAVE_MSG_NOSIGNAL 1
@@ -90,6 +92,7 @@
#define GRPC_POSIX_SOCKETUTILS
#endif
#elif defined(GPR_APPLE)
+#define GRPC_HAVE_IFADDRS 1
#define GRPC_HAVE_SO_NOSIGPIPE 1
#define GRPC_HAVE_UNIX_SOCKET 1
#define GRPC_MSG_IOVLEN_TYPE int
@@ -100,6 +103,7 @@
#define GRPC_POSIX_WAKEUP_FD 1
#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_FREEBSD)
+#define GRPC_HAVE_IFADDRS 1
#define GRPC_HAVE_IPV6_RECVPKTINFO 1
#define GRPC_HAVE_SO_NOSIGPIPE 1
#define GRPC_HAVE_UNIX_SOCKET 1
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index 5f286a6723..e242631fc0 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -44,11 +44,8 @@
#include <errno.h>
#include <fcntl.h>
-#include <ifaddrs.h>
-#include <limits.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
-#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/stat.h>
@@ -67,82 +64,10 @@
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/iomgr/tcp_posix.h"
+#include "src/core/lib/iomgr/tcp_server_utils_posix.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
#include "src/core/lib/support/string.h"
-#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
-
-static gpr_once s_init_max_accept_queue_size;
-static int s_max_accept_queue_size;
-
-/* one listening port */
-typedef struct grpc_tcp_listener grpc_tcp_listener;
-struct grpc_tcp_listener {
- int fd;
- grpc_fd *emfd;
- grpc_tcp_server *server;
- grpc_resolved_address addr;
- int port;
- unsigned port_index;
- unsigned fd_index;
- grpc_closure read_closure;
- grpc_closure destroyed_closure;
- struct grpc_tcp_listener *next;
- /* sibling is a linked list of all listeners for a given port. add_port and
- clone_port place all new listeners in the same sibling list. A member of
- the 'sibling' list is also a member of the 'next' list. The head of each
- sibling list has is_sibling==0, and subsequent members of sibling lists
- have is_sibling==1. is_sibling allows separate sibling lists to be
- identified while iterating through 'next'. */
- struct grpc_tcp_listener *sibling;
- int is_sibling;
-};
-
-/* the overall server */
-struct grpc_tcp_server {
- gpr_refcount refs;
- /* Called whenever accept() succeeds on a server port. */
- grpc_tcp_server_cb on_accept_cb;
- void *on_accept_cb_arg;
-
- gpr_mu mu;
-
- /* active port count: how many ports are actually still listening */
- size_t active_ports;
- /* destroyed port count: how many ports are completely destroyed */
- size_t destroyed_ports;
-
- /* is this server shutting down? */
- bool shutdown;
- /* have listeners been shutdown? */
- bool shutdown_listeners;
- /* use SO_REUSEPORT */
- bool so_reuseport;
- /* expand wildcard addresses to a list of all local addresses */
- bool expand_wildcard_addrs;
-
- /* linked list of server ports */
- grpc_tcp_listener *head;
- grpc_tcp_listener *tail;
- unsigned nports;
-
- /* List of closures passed to shutdown_starting_add(). */
- grpc_closure_list shutdown_starting;
-
- /* shutdown callback */
- grpc_closure *shutdown_complete;
-
- /* all pollsets interested in new connections */
- grpc_pollset **pollsets;
- /* number of pollsets in the pollsets array */
- size_t pollset_count;
-
- /* next pollset to assign a channel to */
- gpr_atm next_pollset_to_assign;
-
- grpc_resource_quota *resource_quota;
-};
-
static gpr_once check_init = GPR_ONCE_INIT;
static bool has_so_reuseport = false;
@@ -301,99 +226,6 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
}
}
-/* get max listen queue size on linux */
-static void init_max_accept_queue_size(void) {
- int n = SOMAXCONN;
- char buf[64];
- FILE *fp = fopen("/proc/sys/net/core/somaxconn", "r");
- if (fp == NULL) {
- /* 2.4 kernel. */
- s_max_accept_queue_size = SOMAXCONN;
- return;
- }
- if (fgets(buf, sizeof buf, fp)) {
- char *end;
- long i = strtol(buf, &end, 10);
- if (i > 0 && i <= INT_MAX && end && *end == 0) {
- n = (int)i;
- }
- }
- fclose(fp);
- s_max_accept_queue_size = n;
-
- if (s_max_accept_queue_size < MIN_SAFE_ACCEPT_QUEUE_SIZE) {
- gpr_log(GPR_INFO,
- "Suspiciously small accept queue (%d) will probably lead to "
- "connection drops",
- s_max_accept_queue_size);
- }
-}
-
-static int get_max_accept_queue_size(void) {
- gpr_once_init(&s_init_max_accept_queue_size, init_max_accept_queue_size);
- return s_max_accept_queue_size;
-}
-
-/* Prepare a recently-created socket for listening. */
-static grpc_error *prepare_socket(int fd, const grpc_resolved_address *addr,
- bool so_reuseport, int *port) {
- grpc_resolved_address sockname_temp;
- grpc_error *err = GRPC_ERROR_NONE;
-
- GPR_ASSERT(fd >= 0);
-
- if (so_reuseport && !grpc_is_unix_socket(addr)) {
- err = grpc_set_socket_reuse_port(fd, 1);
- if (err != GRPC_ERROR_NONE) goto error;
- }
-
- err = grpc_set_socket_nonblocking(fd, 1);
- if (err != GRPC_ERROR_NONE) goto error;
- err = grpc_set_socket_cloexec(fd, 1);
- if (err != GRPC_ERROR_NONE) goto error;
- if (!grpc_is_unix_socket(addr)) {
- err = grpc_set_socket_low_latency(fd, 1);
- if (err != GRPC_ERROR_NONE) goto error;
- err = grpc_set_socket_reuse_addr(fd, 1);
- if (err != GRPC_ERROR_NONE) goto error;
- }
- err = grpc_set_socket_no_sigpipe_if_possible(fd);
- if (err != GRPC_ERROR_NONE) goto error;
-
- GPR_ASSERT(addr->len < ~(socklen_t)0);
- if (bind(fd, (struct sockaddr *)addr->addr, (socklen_t)addr->len) < 0) {
- err = GRPC_OS_ERROR(errno, "bind");
- goto error;
- }
-
- if (listen(fd, get_max_accept_queue_size()) < 0) {
- err = GRPC_OS_ERROR(errno, "listen");
- goto error;
- }
-
- sockname_temp.len = sizeof(struct sockaddr_storage);
-
- if (getsockname(fd, (struct sockaddr *)sockname_temp.addr,
- (socklen_t *)&sockname_temp.len) < 0) {
- err = GRPC_OS_ERROR(errno, "getsockname");
- goto error;
- }
-
- *port = grpc_sockaddr_get_port(&sockname_temp);
- return GRPC_ERROR_NONE;
-
-error:
- GPR_ASSERT(err != GRPC_ERROR_NONE);
- if (fd >= 0) {
- close(fd);
- }
- grpc_error *ret = grpc_error_set_int(
- GRPC_ERROR_CREATE_REFERENCING("Unable to configure socket", &err, 1),
- GRPC_ERROR_INT_FD, fd);
- GRPC_ERROR_UNREF(err);
- return ret;
-}
-
/* event manager callback when reads are ready */
static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
grpc_tcp_listener *sp = arg;
@@ -477,216 +309,6 @@ error:
}
}
-static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd,
- const grpc_resolved_address *addr,
- unsigned port_index, unsigned fd_index,
- grpc_tcp_listener **listener) {
- grpc_tcp_listener *sp = NULL;
- int port = -1;
- char *addr_str;
- char *name;
-
- grpc_error *err = prepare_socket(fd, addr, s->so_reuseport, &port);
- if (err == GRPC_ERROR_NONE) {
- GPR_ASSERT(port > 0);
- grpc_sockaddr_to_string(&addr_str, addr, 1);
- gpr_asprintf(&name, "tcp-server-listener:%s", addr_str);
- gpr_mu_lock(&s->mu);
- s->nports++;
- GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
- sp = gpr_malloc(sizeof(grpc_tcp_listener));
- sp->next = NULL;
- if (s->head == NULL) {
- s->head = sp;
- } else {
- s->tail->next = sp;
- }
- s->tail = sp;
- sp->server = s;
- sp->fd = fd;
- sp->emfd = grpc_fd_create(fd, name);
- memcpy(&sp->addr, addr, sizeof(grpc_resolved_address));
- sp->port = port;
- sp->port_index = port_index;
- sp->fd_index = fd_index;
- sp->is_sibling = 0;
- sp->sibling = NULL;
- GPR_ASSERT(sp->emfd);
- gpr_mu_unlock(&s->mu);
- gpr_free(addr_str);
- gpr_free(name);
- }
-
- *listener = sp;
- return err;
-}
-
-/* If successful, add a listener to s for addr, set *dsmode for the socket, and
- return the *listener. */
-static grpc_error *add_addr_to_server(grpc_tcp_server *s,
- const grpc_resolved_address *addr,
- unsigned port_index, unsigned fd_index,
- grpc_dualstack_mode *dsmode,
- grpc_tcp_listener **listener) {
- grpc_resolved_address addr4_copy;
- int fd;
- grpc_error *err =
- grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, dsmode, &fd);
- if (err != GRPC_ERROR_NONE) {
- return err;
- }
- if (*dsmode == GRPC_DSMODE_IPV4 &&
- grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) {
- addr = &addr4_copy;
- }
- return add_socket_to_server(s, fd, addr, port_index, fd_index, listener);
-}
-
-/* Bind to "::" to get a port number not used by any address. */
-static grpc_error *get_unused_port(int *port) {
- grpc_resolved_address wild;
- grpc_sockaddr_make_wildcard6(0, &wild);
- grpc_dualstack_mode dsmode;
- int fd;
- grpc_error *err =
- grpc_create_dualstack_socket(&wild, SOCK_STREAM, 0, &dsmode, &fd);
- if (err != GRPC_ERROR_NONE) {
- return err;
- }
- if (dsmode == GRPC_DSMODE_IPV4) {
- grpc_sockaddr_make_wildcard4(0, &wild);
- }
- if (bind(fd, (const struct sockaddr *)wild.addr, (socklen_t)wild.len) != 0) {
- err = GRPC_OS_ERROR(errno, "bind");
- close(fd);
- return err;
- }
- if (getsockname(fd, (struct sockaddr *)wild.addr, (socklen_t *)&wild.len) !=
- 0) {
- err = GRPC_OS_ERROR(errno, "getsockname");
- close(fd);
- return err;
- }
- close(fd);
- *port = grpc_sockaddr_get_port(&wild);
- return *port <= 0 ? GRPC_ERROR_CREATE("Bad port") : GRPC_ERROR_NONE;
-}
-
-/* Return the listener in s with address addr or NULL. */
-static grpc_tcp_listener *find_listener_with_addr(grpc_tcp_server *s,
- grpc_resolved_address *addr) {
- grpc_tcp_listener *l;
- gpr_mu_lock(&s->mu);
- for (l = s->head; l != NULL; l = l->next) {
- if (l->addr.len != addr->len) {
- continue;
- }
- if (memcmp(l->addr.addr, addr->addr, addr->len) == 0) {
- break;
- }
- }
- gpr_mu_unlock(&s->mu);
- return l;
-}
-
-/* Get all addresses assigned to network interfaces on the machine and create a
- listener for each. requested_port is the port to use for every listener, or 0
- to select one random port that will be used for every listener. Set *out_port
- to the port selected. Return GRPC_ERROR_NONE only if all listeners were
- added. */
-static grpc_error *add_all_local_addrs_to_server(grpc_tcp_server *s,
- unsigned port_index,
- int requested_port,
- int *out_port) {
- struct ifaddrs *ifa = NULL;
- struct ifaddrs *ifa_it;
- unsigned fd_index = 0;
- grpc_tcp_listener *sp = NULL;
- grpc_error *err = GRPC_ERROR_NONE;
- if (requested_port == 0) {
- /* Note: There could be a race where some local addrs can listen on the
- selected port and some can't. The sane way to handle this would be to
- retry by recreating the whole grpc_tcp_server. Backing out individual
- listeners and orphaning the FDs looks like too much trouble. */
- if ((err = get_unused_port(&requested_port)) != GRPC_ERROR_NONE) {
- return err;
- } else if (requested_port <= 0) {
- return GRPC_ERROR_CREATE("Bad get_unused_port()");
- }
- gpr_log(GPR_DEBUG, "Picked unused port %d", requested_port);
- }
- if (getifaddrs(&ifa) != 0 || ifa == NULL) {
- return GRPC_OS_ERROR(errno, "getifaddrs");
- }
- for (ifa_it = ifa; ifa_it != NULL; ifa_it = ifa_it->ifa_next) {
- grpc_resolved_address addr;
- char *addr_str = NULL;
- grpc_dualstack_mode dsmode;
- grpc_tcp_listener *new_sp = NULL;
- const char *ifa_name = (ifa_it->ifa_name ? ifa_it->ifa_name : "<unknown>");
- if (ifa_it->ifa_addr == NULL) {
- continue;
- } else if (ifa_it->ifa_addr->sa_family == AF_INET) {
- addr.len = sizeof(struct sockaddr_in);
- } else if (ifa_it->ifa_addr->sa_family == AF_INET6) {
- addr.len = sizeof(struct sockaddr_in6);
- } else {
- continue;
- }
- memcpy(addr.addr, ifa_it->ifa_addr, addr.len);
- if (!grpc_sockaddr_set_port(&addr, requested_port)) {
- /* Should never happen, because we check sa_family above. */
- err = GRPC_ERROR_CREATE("Failed to set port");
- break;
- }
- if (grpc_sockaddr_to_string(&addr_str, &addr, 0) < 0) {
- addr_str = gpr_strdup("<error>");
- }
- gpr_log(GPR_DEBUG,
- "Adding local addr from interface %s flags 0x%x to server: %s",
- ifa_name, ifa_it->ifa_flags, addr_str);
- /* We could have multiple interfaces with the same address (e.g., bonding),
- so look for duplicates. */
- if (find_listener_with_addr(s, &addr) != NULL) {
- gpr_log(GPR_DEBUG, "Skipping duplicate addr %s on interface %s", addr_str,
- ifa_name);
- gpr_free(addr_str);
- continue;
- }
- if ((err = add_addr_to_server(s, &addr, port_index, fd_index, &dsmode,
- &new_sp)) != GRPC_ERROR_NONE) {
- char *err_str = NULL;
- grpc_error *root_err;
- if (gpr_asprintf(&err_str, "Failed to add listener: %s", addr_str) < 0) {
- err_str = gpr_strdup("Failed to add listener");
- }
- root_err = GRPC_ERROR_CREATE(err_str);
- gpr_free(err_str);
- gpr_free(addr_str);
- err = grpc_error_add_child(root_err, err);
- break;
- } else {
- GPR_ASSERT(requested_port == new_sp->port);
- ++fd_index;
- if (sp != NULL) {
- new_sp->is_sibling = 1;
- sp->sibling = new_sp;
- }
- sp = new_sp;
- }
- gpr_free(addr_str);
- }
- freeifaddrs(ifa);
- if (err != GRPC_ERROR_NONE) {
- return err;
- } else if (sp == NULL) {
- return GRPC_ERROR_CREATE("No local addresses");
- } else {
- *out_port = sp->port;
- return GRPC_ERROR_NONE;
- }
-}
-
/* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
static grpc_error *add_wildcard_addrs_to_server(grpc_tcp_server *s,
unsigned port_index,
@@ -701,14 +323,16 @@ static grpc_error *add_wildcard_addrs_to_server(grpc_tcp_server *s,
grpc_error *v6_err = GRPC_ERROR_NONE;
grpc_error *v4_err = GRPC_ERROR_NONE;
*out_port = -1;
- if (s->expand_wildcard_addrs) {
- return add_all_local_addrs_to_server(s, port_index, requested_port,
- out_port);
+
+ if (grpc_tcp_server_have_ifaddrs() && s->expand_wildcard_addrs) {
+ return grpc_tcp_server_add_all_local_addrs(s, port_index, requested_port,
+ out_port);
}
+
grpc_sockaddr_make_wildcards(requested_port, &wild4, &wild6);
/* Try listening on IPv6 first. */
- if ((v6_err = add_addr_to_server(s, &wild6, port_index, fd_index, &dsmode,
- &sp)) == GRPC_ERROR_NONE) {
+ if ((v6_err = grpc_tcp_server_add_addr(s, &wild6, port_index, fd_index,
+ &dsmode, &sp)) == GRPC_ERROR_NONE) {
++fd_index;
requested_port = *out_port = sp->port;
if (dsmode == GRPC_DSMODE_DUALSTACK || dsmode == GRPC_DSMODE_IPV4) {
@@ -717,8 +341,8 @@ static grpc_error *add_wildcard_addrs_to_server(grpc_tcp_server *s,
}
/* If we got a v6-only socket or nothing, try adding 0.0.0.0. */
grpc_sockaddr_set_port(&wild4, requested_port);
- if ((v4_err = add_addr_to_server(s, &wild4, port_index, fd_index, &dsmode,
- &sp2)) == GRPC_ERROR_NONE) {
+ if ((v4_err = grpc_tcp_server_add_addr(s, &wild4, port_index, fd_index,
+ &dsmode, &sp2)) == GRPC_ERROR_NONE) {
*out_port = sp2->port;
if (sp != NULL) {
sp2->is_sibling = 1;
@@ -726,8 +350,20 @@ static grpc_error *add_wildcard_addrs_to_server(grpc_tcp_server *s,
}
}
if (*out_port > 0) {
- GRPC_LOG_IF_ERROR("Failed to add :: listener", v6_err);
- GRPC_LOG_IF_ERROR("Failed to add 0.0.0.0 listener", v4_err);
+ if (v6_err != GRPC_ERROR_NONE) {
+ gpr_log(GPR_INFO,
+ "Failed to add :: listener, "
+ "the environment may not support IPv6: %s",
+ grpc_error_string(v6_err));
+ GRPC_ERROR_UNREF(v6_err);
+ }
+ if (v4_err != GRPC_ERROR_NONE) {
+ gpr_log(GPR_INFO,
+ "Failed to add 0.0.0.0 listener, "
+ "the environment may not support IPv4: %s",
+ grpc_error_string(v4_err));
+ GRPC_ERROR_UNREF(v4_err);
+ }
return GRPC_ERROR_NONE;
} else {
grpc_error *root_err =
@@ -756,7 +392,7 @@ static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) {
err = grpc_create_dualstack_socket(&listener->addr, SOCK_STREAM, 0, &dsmode,
&fd);
if (err != GRPC_ERROR_NONE) return err;
- err = prepare_socket(fd, &listener->addr, true, &port);
+ err = grpc_tcp_server_prepare_socket(fd, &listener->addr, true, &port);
if (err != GRPC_ERROR_NONE) return err;
listener->server->nports++;
grpc_sockaddr_to_string(&addr_str, &listener->addr, 1);
@@ -828,7 +464,7 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s,
if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
addr = &addr6_v4mapped;
}
- if ((err = add_addr_to_server(s, addr, port_index, 0, &dsmode, &sp)) ==
+ if ((err = grpc_tcp_server_add_addr(s, addr, port_index, 0, &dsmode, &sp)) ==
GRPC_ERROR_NONE) {
*out_port = sp->port;
}
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix.h b/src/core/lib/iomgr/tcp_server_utils_posix.h
new file mode 100644
index 0000000000..f5dc8532f9
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_server_utils_posix.h
@@ -0,0 +1,134 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H
+#define GRPC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H
+
+#include "src/core/lib/iomgr/ev_posix.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/socket_utils_posix.h"
+#include "src/core/lib/iomgr/tcp_server.h"
+
+/* one listening port */
+typedef struct grpc_tcp_listener {
+ int fd;
+ grpc_fd *emfd;
+ grpc_tcp_server *server;
+ grpc_resolved_address addr;
+ int port;
+ unsigned port_index;
+ unsigned fd_index;
+ grpc_closure read_closure;
+ grpc_closure destroyed_closure;
+ struct grpc_tcp_listener *next;
+ /* sibling is a linked list of all listeners for a given port. add_port and
+ clone_port place all new listeners in the same sibling list. A member of
+ the 'sibling' list is also a member of the 'next' list. The head of each
+ sibling list has is_sibling==0, and subsequent members of sibling lists
+ have is_sibling==1. is_sibling allows separate sibling lists to be
+ identified while iterating through 'next'. */
+ struct grpc_tcp_listener *sibling;
+ int is_sibling;
+} grpc_tcp_listener;
+
+/* the overall server */
+struct grpc_tcp_server {
+ gpr_refcount refs;
+ /* Called whenever accept() succeeds on a server port. */
+ grpc_tcp_server_cb on_accept_cb;
+ void *on_accept_cb_arg;
+
+ gpr_mu mu;
+
+ /* active port count: how many ports are actually still listening */
+ size_t active_ports;
+ /* destroyed port count: how many ports are completely destroyed */
+ size_t destroyed_ports;
+
+ /* is this server shutting down? */
+ bool shutdown;
+ /* have listeners been shutdown? */
+ bool shutdown_listeners;
+ /* use SO_REUSEPORT */
+ bool so_reuseport;
+ /* expand wildcard addresses to a list of all local addresses */
+ bool expand_wildcard_addrs;
+
+ /* linked list of server ports */
+ grpc_tcp_listener *head;
+ grpc_tcp_listener *tail;
+ unsigned nports;
+
+ /* List of closures passed to shutdown_starting_add(). */
+ grpc_closure_list shutdown_starting;
+
+ /* shutdown callback */
+ grpc_closure *shutdown_complete;
+
+ /* all pollsets interested in new connections */
+ grpc_pollset **pollsets;
+ /* number of pollsets in the pollsets array */
+ size_t pollset_count;
+
+ /* next pollset to assign a channel to */
+ gpr_atm next_pollset_to_assign;
+
+ grpc_resource_quota *resource_quota;
+};
+
+/* If successful, add a listener to \a s for \a addr, set \a dsmode for the
+ socket, and return the \a listener. */
+grpc_error *grpc_tcp_server_add_addr(grpc_tcp_server *s,
+ const grpc_resolved_address *addr,
+ unsigned port_index, unsigned fd_index,
+ grpc_dualstack_mode *dsmode,
+ grpc_tcp_listener **listener);
+
+/* Get all addresses assigned to network interfaces on the machine and create a
+ listener for each. requested_port is the port to use for every listener, or 0
+ to select one random port that will be used for every listener. Set *out_port
+ to the port selected. Return GRPC_ERROR_NONE only if all listeners were
+ added. */
+grpc_error *grpc_tcp_server_add_all_local_addrs(grpc_tcp_server *s,
+ unsigned port_index,
+ int requested_port,
+ int *out_port);
+
+/* Prepare a recently-created socket for listening. */
+grpc_error *grpc_tcp_server_prepare_socket(int fd,
+ const grpc_resolved_address *addr,
+ bool so_reuseport, int *port);
+/* Ruturn true if the platform supports ifaddrs */
+bool grpc_tcp_server_have_ifaddrs(void);
+
+#endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H */
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.c b/src/core/lib/iomgr/tcp_server_utils_posix_common.c
new file mode 100644
index 0000000000..e45e27d5ab
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.c
@@ -0,0 +1,220 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_HAVE_IFADDRS
+
+#include "src/core/lib/iomgr/tcp_server_utils_posix.h"
+
+#include <errno.h>
+#include <limits.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/sync.h>
+
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/iomgr/unix_sockets_posix.h"
+
+#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
+
+static gpr_once s_init_max_accept_queue_size;
+static int s_max_accept_queue_size;
+
+/* get max listen queue size on linux */
+static void init_max_accept_queue_size(void) {
+ int n = SOMAXCONN;
+ char buf[64];
+ FILE *fp = fopen("/proc/sys/net/core/somaxconn", "r");
+ if (fp == NULL) {
+ /* 2.4 kernel. */
+ s_max_accept_queue_size = SOMAXCONN;
+ return;
+ }
+ if (fgets(buf, sizeof buf, fp)) {
+ char *end;
+ long i = strtol(buf, &end, 10);
+ if (i > 0 && i <= INT_MAX && end && *end == 0) {
+ n = (int)i;
+ }
+ }
+ fclose(fp);
+ s_max_accept_queue_size = n;
+
+ if (s_max_accept_queue_size < MIN_SAFE_ACCEPT_QUEUE_SIZE) {
+ gpr_log(GPR_INFO,
+ "Suspiciously small accept queue (%d) will probably lead to "
+ "connection drops",
+ s_max_accept_queue_size);
+ }
+}
+
+static int get_max_accept_queue_size(void) {
+ gpr_once_init(&s_init_max_accept_queue_size, init_max_accept_queue_size);
+ return s_max_accept_queue_size;
+}
+
+static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd,
+ const grpc_resolved_address *addr,
+ unsigned port_index, unsigned fd_index,
+ grpc_tcp_listener **listener) {
+ grpc_tcp_listener *sp = NULL;
+ int port = -1;
+ char *addr_str;
+ char *name;
+
+ grpc_error *err =
+ grpc_tcp_server_prepare_socket(fd, addr, s->so_reuseport, &port);
+ if (err == GRPC_ERROR_NONE) {
+ GPR_ASSERT(port > 0);
+ grpc_sockaddr_to_string(&addr_str, addr, 1);
+ gpr_asprintf(&name, "tcp-server-listener:%s", addr_str);
+ gpr_mu_lock(&s->mu);
+ s->nports++;
+ GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
+ sp = gpr_malloc(sizeof(grpc_tcp_listener));
+ sp->next = NULL;
+ if (s->head == NULL) {
+ s->head = sp;
+ } else {
+ s->tail->next = sp;
+ }
+ s->tail = sp;
+ sp->server = s;
+ sp->fd = fd;
+ sp->emfd = grpc_fd_create(fd, name);
+ memcpy(&sp->addr, addr, sizeof(grpc_resolved_address));
+ sp->port = port;
+ sp->port_index = port_index;
+ sp->fd_index = fd_index;
+ sp->is_sibling = 0;
+ sp->sibling = NULL;
+ GPR_ASSERT(sp->emfd);
+ gpr_mu_unlock(&s->mu);
+ gpr_free(addr_str);
+ gpr_free(name);
+ }
+
+ *listener = sp;
+ return err;
+}
+
+/* If successful, add a listener to s for addr, set *dsmode for the socket, and
+ return the *listener. */
+grpc_error *grpc_tcp_server_add_addr(grpc_tcp_server *s,
+ const grpc_resolved_address *addr,
+ unsigned port_index, unsigned fd_index,
+ grpc_dualstack_mode *dsmode,
+ grpc_tcp_listener **listener) {
+ grpc_resolved_address addr4_copy;
+ int fd;
+ grpc_error *err =
+ grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, dsmode, &fd);
+ if (err != GRPC_ERROR_NONE) {
+ return err;
+ }
+ if (*dsmode == GRPC_DSMODE_IPV4 &&
+ grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) {
+ addr = &addr4_copy;
+ }
+ return add_socket_to_server(s, fd, addr, port_index, fd_index, listener);
+}
+
+/* Prepare a recently-created socket for listening. */
+grpc_error *grpc_tcp_server_prepare_socket(int fd,
+ const grpc_resolved_address *addr,
+ bool so_reuseport, int *port) {
+ grpc_resolved_address sockname_temp;
+ grpc_error *err = GRPC_ERROR_NONE;
+
+ GPR_ASSERT(fd >= 0);
+
+ if (so_reuseport && !grpc_is_unix_socket(addr)) {
+ err = grpc_set_socket_reuse_port(fd, 1);
+ if (err != GRPC_ERROR_NONE) goto error;
+ }
+
+ err = grpc_set_socket_nonblocking(fd, 1);
+ if (err != GRPC_ERROR_NONE) goto error;
+ err = grpc_set_socket_cloexec(fd, 1);
+ if (err != GRPC_ERROR_NONE) goto error;
+ if (!grpc_is_unix_socket(addr)) {
+ err = grpc_set_socket_low_latency(fd, 1);
+ if (err != GRPC_ERROR_NONE) goto error;
+ err = grpc_set_socket_reuse_addr(fd, 1);
+ if (err != GRPC_ERROR_NONE) goto error;
+ }
+ err = grpc_set_socket_no_sigpipe_if_possible(fd);
+ if (err != GRPC_ERROR_NONE) goto error;
+
+ GPR_ASSERT(addr->len < ~(socklen_t)0);
+ if (bind(fd, (struct sockaddr *)addr->addr, (socklen_t)addr->len) < 0) {
+ err = GRPC_OS_ERROR(errno, "bind");
+ goto error;
+ }
+
+ if (listen(fd, get_max_accept_queue_size()) < 0) {
+ err = GRPC_OS_ERROR(errno, "listen");
+ goto error;
+ }
+
+ sockname_temp.len = sizeof(struct sockaddr_storage);
+
+ if (getsockname(fd, (struct sockaddr *)sockname_temp.addr,
+ (socklen_t *)&sockname_temp.len) < 0) {
+ err = GRPC_OS_ERROR(errno, "getsockname");
+ goto error;
+ }
+
+ *port = grpc_sockaddr_get_port(&sockname_temp);
+ return GRPC_ERROR_NONE;
+
+error:
+ GPR_ASSERT(err != GRPC_ERROR_NONE);
+ if (fd >= 0) {
+ close(fd);
+ }
+ grpc_error *ret = grpc_error_set_int(
+ GRPC_ERROR_CREATE_REFERENCING("Unable to configure socket", &err, 1),
+ GRPC_ERROR_INT_FD, fd);
+ GRPC_ERROR_UNREF(err);
+ return ret;
+}
+
+#endif /* GRPC_HAVE_IFADDRS */
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c b/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c
new file mode 100644
index 0000000000..6354a6bdc1
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c
@@ -0,0 +1,195 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_HAVE_IFADDRS
+
+#include "src/core/lib/iomgr/tcp_server_utils_posix.h"
+
+#include <errno.h>
+#include <ifaddrs.h>
+#include <stddef.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+
+/* Return the listener in s with address addr or NULL. */
+static grpc_tcp_listener *find_listener_with_addr(grpc_tcp_server *s,
+ grpc_resolved_address *addr) {
+ grpc_tcp_listener *l;
+ gpr_mu_lock(&s->mu);
+ for (l = s->head; l != NULL; l = l->next) {
+ if (l->addr.len != addr->len) {
+ continue;
+ }
+ if (memcmp(l->addr.addr, addr->addr, addr->len) == 0) {
+ break;
+ }
+ }
+ gpr_mu_unlock(&s->mu);
+ return l;
+}
+
+/* Bind to "::" to get a port number not used by any address. */
+static grpc_error *get_unused_port(int *port) {
+ grpc_resolved_address wild;
+ grpc_sockaddr_make_wildcard6(0, &wild);
+ grpc_dualstack_mode dsmode;
+ int fd;
+ grpc_error *err =
+ grpc_create_dualstack_socket(&wild, SOCK_STREAM, 0, &dsmode, &fd);
+ if (err != GRPC_ERROR_NONE) {
+ return err;
+ }
+ if (dsmode == GRPC_DSMODE_IPV4) {
+ grpc_sockaddr_make_wildcard4(0, &wild);
+ }
+ if (bind(fd, (const struct sockaddr *)wild.addr, (socklen_t)wild.len) != 0) {
+ err = GRPC_OS_ERROR(errno, "bind");
+ close(fd);
+ return err;
+ }
+ if (getsockname(fd, (struct sockaddr *)wild.addr, (socklen_t *)&wild.len) !=
+ 0) {
+ err = GRPC_OS_ERROR(errno, "getsockname");
+ close(fd);
+ return err;
+ }
+ close(fd);
+ *port = grpc_sockaddr_get_port(&wild);
+ return *port <= 0 ? GRPC_ERROR_CREATE("Bad port") : GRPC_ERROR_NONE;
+}
+
+grpc_error *grpc_tcp_server_add_all_local_addrs(grpc_tcp_server *s,
+ unsigned port_index,
+ int requested_port,
+ int *out_port) {
+ struct ifaddrs *ifa = NULL;
+ struct ifaddrs *ifa_it;
+ unsigned fd_index = 0;
+ grpc_tcp_listener *sp = NULL;
+ grpc_error *err = GRPC_ERROR_NONE;
+ if (requested_port == 0) {
+ /* Note: There could be a race where some local addrs can listen on the
+ selected port and some can't. The sane way to handle this would be to
+ retry by recreating the whole grpc_tcp_server. Backing out individual
+ listeners and orphaning the FDs looks like too much trouble. */
+ if ((err = get_unused_port(&requested_port)) != GRPC_ERROR_NONE) {
+ return err;
+ } else if (requested_port <= 0) {
+ return GRPC_ERROR_CREATE("Bad get_unused_port()");
+ }
+ gpr_log(GPR_DEBUG, "Picked unused port %d", requested_port);
+ }
+ if (getifaddrs(&ifa) != 0 || ifa == NULL) {
+ return GRPC_OS_ERROR(errno, "getifaddrs");
+ }
+ for (ifa_it = ifa; ifa_it != NULL; ifa_it = ifa_it->ifa_next) {
+ grpc_resolved_address addr;
+ char *addr_str = NULL;
+ grpc_dualstack_mode dsmode;
+ grpc_tcp_listener *new_sp = NULL;
+ const char *ifa_name = (ifa_it->ifa_name ? ifa_it->ifa_name : "<unknown>");
+ if (ifa_it->ifa_addr == NULL) {
+ continue;
+ } else if (ifa_it->ifa_addr->sa_family == AF_INET) {
+ addr.len = sizeof(struct sockaddr_in);
+ } else if (ifa_it->ifa_addr->sa_family == AF_INET6) {
+ addr.len = sizeof(struct sockaddr_in6);
+ } else {
+ continue;
+ }
+ memcpy(addr.addr, ifa_it->ifa_addr, addr.len);
+ if (!grpc_sockaddr_set_port(&addr, requested_port)) {
+ /* Should never happen, because we check sa_family above. */
+ err = GRPC_ERROR_CREATE("Failed to set port");
+ break;
+ }
+ if (grpc_sockaddr_to_string(&addr_str, &addr, 0) < 0) {
+ addr_str = gpr_strdup("<error>");
+ }
+ gpr_log(GPR_DEBUG,
+ "Adding local addr from interface %s flags 0x%x to server: %s",
+ ifa_name, ifa_it->ifa_flags, addr_str);
+ /* We could have multiple interfaces with the same address (e.g., bonding),
+ so look for duplicates. */
+ if (find_listener_with_addr(s, &addr) != NULL) {
+ gpr_log(GPR_DEBUG, "Skipping duplicate addr %s on interface %s", addr_str,
+ ifa_name);
+ gpr_free(addr_str);
+ continue;
+ }
+ if ((err = grpc_tcp_server_add_addr(s, &addr, port_index, fd_index, &dsmode,
+ &new_sp)) != GRPC_ERROR_NONE) {
+ char *err_str = NULL;
+ grpc_error *root_err;
+ if (gpr_asprintf(&err_str, "Failed to add listener: %s", addr_str) < 0) {
+ err_str = gpr_strdup("Failed to add listener");
+ }
+ root_err = GRPC_ERROR_CREATE(err_str);
+ gpr_free(err_str);
+ gpr_free(addr_str);
+ err = grpc_error_add_child(root_err, err);
+ break;
+ } else {
+ GPR_ASSERT(requested_port == new_sp->port);
+ ++fd_index;
+ if (sp != NULL) {
+ new_sp->is_sibling = 1;
+ sp->sibling = new_sp;
+ }
+ sp = new_sp;
+ }
+ gpr_free(addr_str);
+ }
+ freeifaddrs(ifa);
+ if (err != GRPC_ERROR_NONE) {
+ return err;
+ } else if (sp == NULL) {
+ return GRPC_ERROR_CREATE("No local addresses");
+ } else {
+ *out_port = sp->port;
+ return GRPC_ERROR_NONE;
+ }
+}
+
+bool grpc_tcp_server_have_ifaddrs(void) { return true; }
+
+#endif /* GRPC_HAVE_IFADDRS */
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c b/src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c
new file mode 100644
index 0000000000..95c3198be6
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#if defined(GRPC_POSIX_SOCKET) && !defined(GRPC_HAVE_IFADDRS)
+
+#include "src/core/lib/iomgr/tcp_server_utils_posix.h"
+
+grpc_error *grpc_tcp_server_add_all_local_addrs(grpc_tcp_server *s,
+ unsigned port_index,
+ int requested_port,
+ int *out_port) {
+ return GRPC_ERROR_CREATE("no ifaddrs available");
+}
+
+bool grpc_tcp_server_have_ifaddrs(void) { return false; }
+
+#endif /* defined(GRPC_POSIX_SOCKET) && !defined(GRPC_HAVE_IFADDRS) */
diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c
index a23082a866..8dea1d98ff 100644
--- a/src/core/lib/security/transport/client_auth_filter.c
+++ b/src/core/lib/security/transport/client_auth_filter.c
@@ -318,7 +318,7 @@ static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
- void *ignored) {
+ grpc_closure *ignored) {
call_data *calld = elem->call_data;
grpc_call_credentials_unref(exec_ctx, calld->creds);
if (calld->have_host) {
diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c
index 14619d97ca..01cb473177 100644
--- a/src/core/lib/security/transport/server_auth_filter.c
+++ b/src/core/lib/security/transport/server_auth_filter.c
@@ -227,7 +227,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
- void *ignored) {}
+ grpc_closure *ignored) {}
/* Constructor for channel_data */
static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/support/atm.c b/src/core/lib/support/atm.c
new file mode 100644
index 0000000000..06e8432caf
--- /dev/null
+++ b/src/core/lib/support/atm.c
@@ -0,0 +1,47 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/support/atm.h>
+#include <grpc/support/useful.h>
+
+gpr_atm gpr_atm_no_barrier_clamped_add(gpr_atm *value, gpr_atm delta,
+ gpr_atm min, gpr_atm max) {
+ gpr_atm current;
+ gpr_atm new;
+ do {
+ current = gpr_atm_no_barrier_load(value);
+ new = GPR_CLAMP(current + delta, min, max);
+ if (new == current) break;
+ } while (!gpr_atm_no_barrier_cas(value, current, new));
+ return new;
+}
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 47f36be5fd..2c5d8c0ff3 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -51,6 +51,7 @@
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
+#include "src/core/lib/support/arena.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call.h"
@@ -138,6 +139,7 @@ typedef struct batch_control {
} batch_control;
struct grpc_call {
+ gpr_arena *arena;
grpc_completion_queue *cq;
grpc_polling_entity pollent;
grpc_channel *channel;
@@ -212,6 +214,8 @@ struct grpc_call {
grpc_closure receiving_initial_metadata_ready;
uint32_t test_only_last_message_flags;
+ grpc_closure release_call;
+
union {
struct {
grpc_status_code *status;
@@ -273,7 +277,11 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
grpc_channel_get_channel_stack(args->channel);
grpc_call *call;
GPR_TIMER_BEGIN("grpc_call_create", 0);
- call = gpr_zalloc(sizeof(grpc_call) + channel_stack->call_stack_size);
+ gpr_arena *arena =
+ gpr_arena_create(grpc_channel_get_call_size_estimate(args->channel));
+ call = gpr_arena_alloc(arena,
+ sizeof(grpc_call) + channel_stack->call_stack_size);
+ call->arena = arena;
*out_call = call;
gpr_mu_init(&call->child_list_mu);
call->channel = args->channel;
@@ -364,11 +372,16 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
/* initial refcount dropped by grpc_call_destroy */
+ grpc_call_element_args call_args = {
+ .call_stack = CALL_STACK_FROM_CALL(call),
+ .server_transport_data = args->server_transport_data,
+ .context = call->context,
+ .path = path,
+ .start_time = call->start_time,
+ .deadline = send_deadline,
+ .arena = call->arena};
add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1,
- destroy_call, call, call->context,
- args->server_transport_data, path,
- call->start_time, send_deadline,
- CALL_STACK_FROM_CALL(call)));
+ destroy_call, call, &call_args));
if (error != GRPC_ERROR_NONE) {
cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
GRPC_ERROR_REF(error));
@@ -425,6 +438,14 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) {
GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON);
}
+static void release_call(grpc_exec_ctx *exec_ctx, void *call,
+ grpc_error *error) {
+ grpc_call *c = call;
+ grpc_channel *channel = c->channel;
+ grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena));
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call");
+}
+
static void set_status_value_directly(grpc_status_code status, void *dest);
static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
grpc_error *error) {
@@ -451,7 +472,6 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
if (c->cq) {
GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
}
- grpc_channel *channel = c->channel;
get_final_status(call, set_status_value_directly, &c->final_info.final_status,
NULL);
@@ -463,8 +483,9 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
unpack_received_status(gpr_atm_acq_load(&c->status[i])).error);
}
- grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, c);
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call");
+ grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info,
+ grpc_closure_init(&c->release_call, release_call, c,
+ grpc_schedule_on_exec_ctx));
GPR_TIMER_END("destroy_call", 0);
}
diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c
index a00572c007..16208a5ca9 100644
--- a/src/core/lib/surface/channel.c
+++ b/src/core/lib/surface/channel.c
@@ -68,6 +68,8 @@ struct grpc_channel {
grpc_compression_options compression_options;
grpc_mdelem default_authority;
+ gpr_atm call_size_estimate;
+
gpr_mu registered_call_mu;
registered_call *registered_calls;
@@ -107,6 +109,10 @@ grpc_channel *grpc_channel_create_with_builder(
gpr_mu_init(&channel->registered_call_mu);
channel->registered_calls = NULL;
+ gpr_atm_no_barrier_store(
+ &channel->call_size_estimate,
+ (gpr_atm)CHANNEL_STACK_FROM_CHANNEL(channel)->call_stack_size);
+
grpc_compression_options_init(&channel->compression_options);
for (size_t i = 0; i < args->num_args; i++) {
if (0 == strcmp(args->args[i].key, GRPC_ARG_DEFAULT_AUTHORITY)) {
@@ -186,6 +192,32 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target,
channel_stack_type);
}
+size_t grpc_channel_get_call_size_estimate(grpc_channel *channel) {
+#define ROUND_UP_SIZE 256
+ return ((size_t)gpr_atm_no_barrier_load(&channel->call_size_estimate) +
+ ROUND_UP_SIZE) &
+ ~(size_t)(ROUND_UP_SIZE - 1);
+}
+
+void grpc_channel_update_call_size_estimate(grpc_channel *channel,
+ size_t size) {
+ size_t cur = (size_t)gpr_atm_no_barrier_load(&channel->call_size_estimate);
+ if (cur < size) {
+ /* size grew: update estimate */
+ gpr_atm_no_barrier_cas(&channel->call_size_estimate, (gpr_atm)cur,
+ (gpr_atm)size);
+ /* if we lose: never mind, something else will likely update soon enough */
+ } else if (cur == size) {
+ /* no change: holding pattern */
+ } else if (cur > 0) {
+ /* size shrank: decrease estimate */
+ gpr_atm_no_barrier_cas(
+ &channel->call_size_estimate, (gpr_atm)cur,
+ (gpr_atm)(GPR_MIN(cur - 1, (255 * cur + size) / 256)));
+ /* if we lose: never mind, something else will likely update soon enough */
+ }
+}
+
char *grpc_channel_get_target(grpc_channel *channel) {
GRPC_API_TRACE("grpc_channel_get_target(channel=%p)", 1, (channel));
return gpr_strdup(channel->target);
diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h
index 609c9357e0..0f203a3e59 100644
--- a/src/core/lib/surface/channel.h
+++ b/src/core/lib/surface/channel.h
@@ -71,6 +71,9 @@ grpc_mdelem grpc_channel_get_reffed_status_elem(grpc_exec_ctx *exec_ctx,
grpc_channel *channel,
int status_code);
+size_t grpc_channel_get_call_size_estimate(grpc_channel *channel);
+void grpc_channel_update_call_size_estimate(grpc_channel *channel, size_t size);
+
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
void grpc_channel_internal_ref(grpc_channel *channel, const char *reason);
void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx, grpc_channel *channel,
diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c
index 49bc4c114b..9ddb88bd11 100644
--- a/src/core/lib/surface/lame_client.c
+++ b/src/core/lib/surface/lame_client.c
@@ -130,8 +130,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
- void *and_free_memory) {
- gpr_free(and_free_memory);
+ grpc_closure *then_schedule_closure) {
+ grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
}
static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index b360579553..1186a4af63 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -898,7 +898,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
- void *ignored) {
+ grpc_closure *ignored) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
diff --git a/src/core/lib/transport/service_config.c b/src/core/lib/transport/service_config.c
index 12da2a88fe..1195f75044 100644
--- a/src/core/lib/transport/service_config.c
+++ b/src/core/lib/transport/service_config.c
@@ -93,6 +93,18 @@ void grpc_service_config_destroy(grpc_service_config* service_config) {
gpr_free(service_config);
}
+void grpc_service_config_parse_global_params(
+ const grpc_service_config* service_config,
+ void (*process_json)(const grpc_json* json, void* arg), void* arg) {
+ const grpc_json* json = service_config->json_tree;
+ if (json->type != GRPC_JSON_OBJECT || json->key != NULL) return;
+ for (grpc_json* field = json->child; field != NULL; field = field->next) {
+ if (field->key == NULL) return;
+ if (strcmp(field->key, "methodConfig") == 0) continue;
+ process_json(field, arg);
+ }
+}
+
const char* grpc_service_config_get_lb_policy_name(
const grpc_service_config* service_config) {
const grpc_json* json = service_config->json_tree;
diff --git a/src/core/lib/transport/service_config.h b/src/core/lib/transport/service_config.h
index cd739a593c..ebfc59b534 100644
--- a/src/core/lib/transport/service_config.h
+++ b/src/core/lib/transport/service_config.h
@@ -42,6 +42,12 @@ typedef struct grpc_service_config grpc_service_config;
grpc_service_config* grpc_service_config_create(const char* json_string);
void grpc_service_config_destroy(grpc_service_config* service_config);
+/// Invokes \a process_json() for each global parameter in the service
+/// config. \a arg is passed as the second argument to \a process_json().
+void grpc_service_config_parse_global_params(
+ const grpc_service_config* service_config,
+ void (*process_json)(const grpc_json* json, void* arg), void* arg);
+
/// Gets the LB policy name from \a service_config.
/// Returns NULL if no LB policy name was specified.
/// Caller does NOT take ownership.
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 3024f2ae78..d56cb31ee0 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -162,9 +162,9 @@ void grpc_transport_destroy(grpc_exec_ctx *exec_ctx,
int grpc_transport_init_stream(grpc_exec_ctx *exec_ctx,
grpc_transport *transport, grpc_stream *stream,
grpc_stream_refcount *refcount,
- const void *server_data) {
+ const void *server_data, gpr_arena *arena) {
return transport->vtable->init_stream(exec_ctx, transport, stream, refcount,
- server_data);
+ server_data, arena);
}
void grpc_transport_perform_stream_op(grpc_exec_ctx *exec_ctx,
@@ -197,9 +197,10 @@ void grpc_transport_set_pops(grpc_exec_ctx *exec_ctx, grpc_transport *transport,
void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
grpc_transport *transport,
- grpc_stream *stream, void *and_free_memory) {
+ grpc_stream *stream,
+ grpc_closure *then_schedule_closure) {
transport->vtable->destroy_stream(exec_ctx, transport, stream,
- and_free_memory);
+ then_schedule_closure);
}
char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index cc1c277b35..950b18aeda 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -41,6 +41,7 @@
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_set.h"
+#include "src/core/lib/support/arena.h"
#include "src/core/lib/transport/byte_stream.h"
#include "src/core/lib/transport/metadata_batch.h"
@@ -229,7 +230,7 @@ size_t grpc_transport_stream_size(grpc_transport *transport);
int grpc_transport_init_stream(grpc_exec_ctx *exec_ctx,
grpc_transport *transport, grpc_stream *stream,
grpc_stream_refcount *refcount,
- const void *server_data);
+ const void *server_data, gpr_arena *arena);
void grpc_transport_set_pops(grpc_exec_ctx *exec_ctx, grpc_transport *transport,
grpc_stream *stream, grpc_polling_entity *pollent);
@@ -246,7 +247,8 @@ void grpc_transport_set_pops(grpc_exec_ctx *exec_ctx, grpc_transport *transport,
caller, but any child memory must be cleaned up) */
void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
grpc_transport *transport,
- grpc_stream *stream, void *and_free_memory);
+ grpc_stream *stream,
+ grpc_closure *then_schedule_closure);
void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *op,
diff --git a/src/core/lib/transport/transport_impl.h b/src/core/lib/transport/transport_impl.h
index 8553148c35..6f688bf8d2 100644
--- a/src/core/lib/transport/transport_impl.h
+++ b/src/core/lib/transport/transport_impl.h
@@ -47,7 +47,7 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_init_stream */
int (*init_stream)(grpc_exec_ctx *exec_ctx, grpc_transport *self,
grpc_stream *stream, grpc_stream_refcount *refcount,
- const void *server_data);
+ const void *server_data, gpr_arena *arena);
/* implementation of grpc_transport_set_pollset */
void (*set_pollset)(grpc_exec_ctx *exec_ctx, grpc_transport *self,
@@ -67,7 +67,8 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_destroy_stream */
void (*destroy_stream)(grpc_exec_ctx *exec_ctx, grpc_transport *self,
- grpc_stream *stream, void *and_free_memory);
+ grpc_stream *stream,
+ grpc_closure *then_schedule_closure);
/* implementation of grpc_transport_destroy */
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_transport *self);
diff --git a/src/cpp/common/channel_arguments.cc b/src/cpp/common/channel_arguments.cc
index 65f3277499..eddcacc332 100644
--- a/src/cpp/common/channel_arguments.cc
+++ b/src/cpp/common/channel_arguments.cc
@@ -81,6 +81,16 @@ ChannelArguments::ChannelArguments(const ChannelArguments& other)
}
}
+ChannelArguments::~ChannelArguments() {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ for (auto it = args_.begin(); it != args_.end(); ++it) {
+ if (it->type == GRPC_ARG_POINTER) {
+ it->value.pointer.vtable->destroy(&exec_ctx, it->value.pointer.p);
+ }
+ }
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
void ChannelArguments::Swap(ChannelArguments& other) {
args_.swap(other.args_);
strings_.swap(other.strings_);
@@ -101,8 +111,10 @@ void ChannelArguments::SetSocketMutator(grpc_socket_mutator* mutator) {
for (auto it = args_.begin(); it != args_.end(); ++it) {
if (it->type == mutator_arg.type &&
grpc::string(it->key) == grpc::string(mutator_arg.key)) {
+ GPR_ASSERT(!replaced);
it->value.pointer.vtable->destroy(&exec_ctx, it->value.pointer.p);
it->value.pointer = mutator_arg.value.pointer;
+ replaced = true;
}
}
grpc_exec_ctx_finish(&exec_ctx);
@@ -185,7 +197,7 @@ void ChannelArguments::SetPointerWithVtable(
arg.type = GRPC_ARG_POINTER;
strings_.push_back(key);
arg.key = const_cast<char*>(strings_.back().c_str());
- arg.value.pointer.p = value;
+ arg.value.pointer.p = vtable->copy(value);
arg.value.pointer.vtable = vtable;
args_.push_back(arg);
}
diff --git a/src/cpp/common/channel_filter.h b/src/cpp/common/channel_filter.h
index 79c4bab985..494d5d64d7 100644
--- a/src/cpp/common/channel_filter.h
+++ b/src/cpp/common/channel_filter.h
@@ -318,7 +318,8 @@ class ChannelFilter final {
static void DestroyCallElement(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_final_info *final_info,
- void *and_free_memory) {
+ grpc_closure *then_call_closure) {
+ GPR_ASSERT(then_call_closure == NULL);
reinterpret_cast<CallDataType *>(elem->call_data)->~CallDataType();
}
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 52d3ffd2c4..8edab1b759 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -34,6 +34,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/profiling/stap_timers.c',
'src/core/lib/support/alloc.c',
'src/core/lib/support/arena.c',
+ 'src/core/lib/support/atm.c',
'src/core/lib/support/avl.c',
'src/core/lib/support/backoff.c',
'src/core/lib/support/cmdline.c',
@@ -135,6 +136,9 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/tcp_client_windows.c',
'src/core/lib/iomgr/tcp_posix.c',
'src/core/lib/iomgr/tcp_server_posix.c',
+ 'src/core/lib/iomgr/tcp_server_utils_posix_common.c',
+ 'src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c',
+ 'src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c',
'src/core/lib/iomgr/tcp_server_uv.c',
'src/core/lib/iomgr/tcp_server_windows.c',
'src/core/lib/iomgr/tcp_uv.c',
@@ -264,6 +268,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/client_channel/resolver.c',
'src/core/ext/client_channel/resolver_factory.c',
'src/core/ext/client_channel/resolver_registry.c',
+ 'src/core/ext/client_channel/retry_throttle.c',
'src/core/ext/client_channel/subchannel.c',
'src/core/ext/client_channel/subchannel_index.c',
'src/core/ext/client_channel/uri_parser.c',
diff --git a/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py b/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py
index 58f3b364ba..3325d54375 100644
--- a/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py
+++ b/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py
@@ -32,7 +32,7 @@ from concurrent import futures
import unittest
import grpc
-from src.proto.grpc.testing import test_pb2
+from src.proto.grpc.testing import test_pb2_grpc
from tests.interop import _intraop_test_case
from tests.interop import methods
@@ -44,11 +44,11 @@ class InsecureIntraopTest(_intraop_test_case.IntraopTestCase,
def setUp(self):
self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
- test_pb2.add_TestServiceServicer_to_server(methods.TestService(),
- self.server)
+ test_pb2_grpc.add_TestServiceServicer_to_server(methods.TestService(),
+ self.server)
port = self.server.add_insecure_port('[::]:0')
self.server.start()
- self.stub = test_pb2.TestServiceStub(
+ self.stub = test_pb2_grpc.TestServiceStub(
grpc.insecure_channel('localhost:{}'.format(port)))
diff --git a/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py b/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py
index 5fe929b99e..857e00efb3 100644
--- a/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py
+++ b/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py
@@ -32,7 +32,7 @@ from concurrent import futures
import unittest
import grpc
-from src.proto.grpc.testing import test_pb2
+from src.proto.grpc.testing import test_pb2_grpc
from tests.interop import _intraop_test_case
from tests.interop import methods
@@ -45,14 +45,14 @@ class SecureIntraopTest(_intraop_test_case.IntraopTestCase, unittest.TestCase):
def setUp(self):
self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
- test_pb2.add_TestServiceServicer_to_server(methods.TestService(),
- self.server)
+ test_pb2_grpc.add_TestServiceServicer_to_server(methods.TestService(),
+ self.server)
port = self.server.add_secure_port(
'[::]:0',
grpc.ssl_server_credentials(
[(resources.private_key(), resources.certificate_chain())]))
self.server.start()
- self.stub = test_pb2.TestServiceStub(
+ self.stub = test_pb2_grpc.TestServiceStub(
grpc.secure_channel('localhost:{}'.format(port),
grpc.ssl_channel_credentials(
resources.test_root_certificates()), (
diff --git a/src/python/grpcio_tests/tests/interop/methods.py b/src/python/grpcio_tests/tests/interop/methods.py
index 662ea9ce57..e1016f7c0d 100644
--- a/src/python/grpcio_tests/tests/interop/methods.py
+++ b/src/python/grpcio_tests/tests/interop/methods.py
@@ -40,7 +40,7 @@ from grpc.beta import implementations
from src.proto.grpc.testing import empty_pb2
from src.proto.grpc.testing import messages_pb2
-from src.proto.grpc.testing import test_pb2
+from src.proto.grpc.testing import test_pb2_grpc
_INITIAL_METADATA_KEY = "x-grpc-test-echo-initial"
_TRAILING_METADATA_KEY = "x-grpc-test-echo-trailing-bin"
@@ -66,7 +66,7 @@ def _maybe_echo_status_and_message(request, servicer_context):
servicer_context.set_details(request.response_status.message)
-class TestService(test_pb2.TestServiceServicer):
+class TestService(test_pb2_grpc.TestServiceServicer):
def EmptyCall(self, request, context):
_maybe_echo_metadata(context)
diff --git a/src/python/grpcio_tests/tests/interop/server.py b/src/python/grpcio_tests/tests/interop/server.py
index 65f1604eb8..0ae2c97b42 100644
--- a/src/python/grpcio_tests/tests/interop/server.py
+++ b/src/python/grpcio_tests/tests/interop/server.py
@@ -34,7 +34,7 @@ import logging
import time
import grpc
-from src.proto.grpc.testing import test_pb2
+from src.proto.grpc.testing import test_pb2_grpc
from tests.interop import methods
from tests.interop import resources
@@ -53,7 +53,8 @@ def serve():
args = parser.parse_args()
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
- test_pb2.add_TestServiceServicer_to_server(methods.TestService(), server)
+ test_pb2_grpc.add_TestServiceServicer_to_server(methods.TestService(),
+ server)
if args.use_tls:
private_key = resources.private_key()
certificate_chain = resources.certificate_chain()
diff --git a/src/python/grpcio_tests/tests/qps/qps_worker.py b/src/python/grpcio_tests/tests/qps/qps_worker.py
index 025dfb9d4a..7cd53e7bd9 100644
--- a/src/python/grpcio_tests/tests/qps/qps_worker.py
+++ b/src/python/grpcio_tests/tests/qps/qps_worker.py
@@ -33,7 +33,7 @@ import time
from concurrent import futures
import grpc
-from src.proto.grpc.testing import services_pb2
+from src.proto.grpc.testing import services_pb2_grpc
from tests.qps import worker_server
@@ -41,7 +41,7 @@ from tests.qps import worker_server
def run_worker_server(port):
server = grpc.server(futures.ThreadPoolExecutor(max_workers=5))
servicer = worker_server.WorkerServer()
- services_pb2.add_WorkerServiceServicer_to_server(servicer, server)
+ services_pb2_grpc.add_WorkerServiceServicer_to_server(servicer, server)
server.add_insecure_port('[::]:{}'.format(port))
server.start()
servicer.wait_for_quit()
diff --git a/src/python/grpcio_tests/tests/qps/worker_server.py b/src/python/grpcio_tests/tests/qps/worker_server.py
index ca1a777611..de9535f46e 100644
--- a/src/python/grpcio_tests/tests/qps/worker_server.py
+++ b/src/python/grpcio_tests/tests/qps/worker_server.py
@@ -35,7 +35,7 @@ import time
from concurrent import futures
import grpc
from src.proto.grpc.testing import control_pb2
-from src.proto.grpc.testing import services_pb2
+from src.proto.grpc.testing import services_pb2_grpc
from src.proto.grpc.testing import stats_pb2
from tests.qps import benchmark_client
@@ -45,7 +45,7 @@ from tests.qps import histogram
from tests.unit import resources
-class WorkerServer(services_pb2.WorkerServiceServicer):
+class WorkerServer(services_pb2_grpc.WorkerServiceServicer):
"""Python Worker Server implementation."""
def __init__(self):
@@ -87,8 +87,8 @@ class WorkerServer(services_pb2.WorkerServiceServicer):
futures.ThreadPoolExecutor(max_workers=server_threads))
if config.server_type == control_pb2.ASYNC_SERVER:
servicer = benchmark_server.BenchmarkServer()
- services_pb2.add_BenchmarkServiceServicer_to_server(servicer,
- server)
+ services_pb2_grpc.add_BenchmarkServiceServicer_to_server(servicer,
+ server)
elif config.server_type == control_pb2.ASYNC_GENERIC_SERVER:
resp_size = config.payload_config.bytebuf_params.resp_size
servicer = benchmark_server.GenericBenchmarkServer(resp_size)
diff --git a/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py b/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py
index d06ff064e2..4d73be6204 100644
--- a/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py
+++ b/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py
@@ -34,6 +34,7 @@ import grpc
from grpc.framework.foundation import logging_pool
from grpc_reflection.v1alpha import reflection
from grpc_reflection.v1alpha import reflection_pb2
+from grpc_reflection.v1alpha import reflection_pb2_grpc
from google.protobuf import descriptor_pool
from google.protobuf import descriptor_pb2
@@ -61,12 +62,12 @@ class ReflectionServicerTest(unittest.TestCase):
server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
self._server = grpc.server(server_pool)
port = self._server.add_insecure_port('[::]:0')
- reflection_pb2.add_ServerReflectionServicer_to_server(servicer,
- self._server)
+ reflection_pb2_grpc.add_ServerReflectionServicer_to_server(servicer,
+ self._server)
self._server.start()
channel = grpc.insecure_channel('localhost:%d' % port)
- self._stub = reflection_pb2.ServerReflectionStub(channel)
+ self._stub = reflection_pb2_grpc.ServerReflectionStub(channel)
def testFileByName(self):
requests = (reflection_pb2.ServerReflectionRequest(
diff --git a/src/python/grpcio_tests/tests/stress/client.py b/src/python/grpcio_tests/tests/stress/client.py
index b9dbe61d44..b7eb12bff8 100644
--- a/src/python/grpcio_tests/tests/stress/client.py
+++ b/src/python/grpcio_tests/tests/stress/client.py
@@ -34,7 +34,7 @@ import threading
import grpc
from six.moves import queue
-from src.proto.grpc.testing import metrics_pb2
+from src.proto.grpc.testing import metrics_pb2_grpc
from src.proto.grpc.testing import test_pb2
from tests.interop import methods
@@ -139,7 +139,7 @@ def run_test(args):
runners = []
server = grpc.server(futures.ThreadPoolExecutor(max_workers=25))
- metrics_pb2.add_MetricsServiceServicer_to_server(
+ metrics_pb2_grpc.add_MetricsServiceServicer_to_server(
metrics_server.MetricsServer(hist), server)
server.add_insecure_port('[::]:{}'.format(args.metrics_port))
server.start()