aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2017-12-05 17:49:11 -0800
committerGravatar Yash Tibrewal <yashkt@google.com>2017-12-05 17:49:11 -0800
commit73bb67d054ecb952f10649cc42c998ab7ea8facd (patch)
tree181ee31adc7a144d8dcffed5e5de990ab4662cef
parent65c45fbb4d576d99bcd1c00a13f430c8994fee38 (diff)
parent05cd3102b7b59bf5d71f66dc012be1f4ecdaad88 (diff)
Merge master into execctx
-rw-r--r--.gitignore1
-rw-r--r--CMakeLists.txt39
-rw-r--r--Makefile48
-rw-r--r--build.yaml12
-rw-r--r--include/grpc/impl/codegen/grpc_types.h3
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc228
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.cc24
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.h16
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc46
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc94
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc123
-rw-r--r--src/core/ext/transport/chttp2/client/chttp2_connector.cc35
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc6
-rw-r--r--src/core/ext/transport/chttp2/server/chttp2_server.cc85
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc4
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.cc19
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.h9
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_settings.cc5
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h2
-rw-r--r--src/core/ext/transport/inproc/inproc_transport.cc23
-rw-r--r--src/core/lib/channel/handshaker.cc13
-rw-r--r--src/core/lib/channel/handshaker.h17
-rw-r--r--src/core/lib/http/httpcli_security_connector.cc5
-rw-r--r--src/core/lib/iomgr/tcp_server_utils_posix_common.cc2
-rw-r--r--src/core/lib/iomgr/tcp_server_uv.cc100
-rw-r--r--src/core/lib/iomgr/udp_server.cc107
-rw-r--r--src/core/lib/iomgr/udp_server.h11
-rw-r--r--src/core/lib/transport/transport.h3
-rwxr-xr-xsrc/csharp/generate_proto_csharp.sh5
-rw-r--r--src/python/grpcio/grpc/__init__.py169
-rw-r--r--src/python/grpcio/grpc/_auth.py2
-rw-r--r--src/python/grpcio/grpc/_credential_composition.py33
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi13
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi8
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi82
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi322
-rw-r--r--src/python/grpcio/grpc/_plugin_wrapping.py127
-rw-r--r--src/python/grpcio/grpc/_server.py20
-rw-r--r--src/python/grpcio_tests/tests/tests.json2
-rw-r--r--src/python/grpcio_tests/tests/unit/_auth_test.py6
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py22
-rw-r--r--src/python/grpcio_tests/tests/unit/_invocation_defects_test.py22
-rw-r--r--test/core/bad_client/bad_client.cc4
-rw-r--r--test/core/end2end/fixtures/h2_sockpair+trace.cc8
-rw-r--r--test/core/end2end/fixtures/h2_sockpair.cc8
-rw-r--r--test/core/end2end/fixtures/h2_sockpair_1byte.cc8
-rw-r--r--test/core/end2end/fixtures/http_proxy_fixture.cc104
-rw-r--r--test/core/end2end/fuzzers/api_fuzzer.cc4
-rw-r--r--test/core/end2end/fuzzers/client_fuzzer.cc4
-rw-r--r--test/core/end2end/fuzzers/server_fuzzer.cc4
-rw-r--r--test/core/iomgr/udp_server_test.cc16
-rw-r--r--test/core/security/ssl_server_fuzzer.cc5
-rw-r--r--test/core/transport/chttp2/BUILD15
-rw-r--r--test/core/transport/chttp2/settings_timeout_test.cc253
-rw-r--r--test/cpp/end2end/grpclb_end2end_test.cc40
-rw-r--r--test/cpp/interop/interop_server.cc18
-rw-r--r--test/cpp/interop/server_helper.h20
-rw-r--r--test/cpp/microbenchmarks/bm_chttp2_transport.cc2
-rw-r--r--test/cpp/microbenchmarks/fullstack_fixtures.h8
-rw-r--r--test/cpp/performance/writes_per_rpc_test.cc8
-rw-r--r--test/cpp/qps/client_sync.cc212
-rw-r--r--tools/run_tests/generated/sources_and_headers.json17
-rw-r--r--tools/run_tests/generated/tests.json24
-rwxr-xr-xtools/run_tests/run_tests.py2
64 files changed, 1747 insertions, 950 deletions
diff --git a/.gitignore b/.gitignore
index 151bbbde13..2b35c5fb9a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -121,6 +121,7 @@ gdb.txt
tags
# perf data
+memory_usage.csv
perf.data
perf.data.old
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 297301bb39..22a1ab79af 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -678,6 +678,7 @@ add_dependencies(buildtests_cxx bm_pollset)
endif()
add_dependencies(buildtests_cxx channel_arguments_test)
add_dependencies(buildtests_cxx channel_filter_test)
+add_dependencies(buildtests_cxx chttp2_settings_timeout_test)
add_dependencies(buildtests_cxx cli_call_test)
add_dependencies(buildtests_cxx client_channel_stress_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
@@ -9863,6 +9864,44 @@ target_link_libraries(channel_filter_test
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
+add_executable(chttp2_settings_timeout_test
+ test/core/transport/chttp2/settings_timeout_test.cc
+ third_party/googletest/googletest/src/gtest-all.cc
+ third_party/googletest/googlemock/src/gmock-all.cc
+)
+
+
+target_include_directories(chttp2_settings_timeout_test
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
+ PRIVATE ${BORINGSSL_ROOT_DIR}/include
+ PRIVATE ${PROTOBUF_ROOT_DIR}/src
+ PRIVATE ${BENCHMARK_ROOT_DIR}/include
+ PRIVATE ${ZLIB_ROOT_DIR}
+ PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib
+ PRIVATE ${CARES_INCLUDE_DIR}
+ PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/cares/cares
+ PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include
+ PRIVATE third_party/googletest/googletest/include
+ PRIVATE third_party/googletest/googletest
+ PRIVATE third_party/googletest/googlemock/include
+ PRIVATE third_party/googletest/googlemock
+ PRIVATE ${_gRPC_PROTO_GENS_DIR}
+)
+
+target_link_libraries(chttp2_settings_timeout_test
+ ${_gRPC_PROTOBUF_LIBRARIES}
+ ${_gRPC_ALLTARGETS_LIBRARIES}
+ grpc_test_util
+ grpc
+ gpr_test_util
+ gpr
+ ${_gRPC_GFLAGS_LIBRARIES}
+)
+
+endif (gRPC_BUILD_TESTS)
+if (gRPC_BUILD_TESTS)
+
add_executable(cli_call_test
test/cpp/util/cli_call_test.cc
third_party/googletest/googletest/src/gtest-all.cc
diff --git a/Makefile b/Makefile
index 0e52d5a325..9dfd2a8ae2 100644
--- a/Makefile
+++ b/Makefile
@@ -1114,6 +1114,7 @@ bm_metadata: $(BINDIR)/$(CONFIG)/bm_metadata
bm_pollset: $(BINDIR)/$(CONFIG)/bm_pollset
channel_arguments_test: $(BINDIR)/$(CONFIG)/channel_arguments_test
channel_filter_test: $(BINDIR)/$(CONFIG)/channel_filter_test
+chttp2_settings_timeout_test: $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test
cli_call_test: $(BINDIR)/$(CONFIG)/cli_call_test
client_channel_stress_test: $(BINDIR)/$(CONFIG)/client_channel_stress_test
client_crash_test: $(BINDIR)/$(CONFIG)/client_crash_test
@@ -1557,6 +1558,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/bm_pollset \
$(BINDIR)/$(CONFIG)/channel_arguments_test \
$(BINDIR)/$(CONFIG)/channel_filter_test \
+ $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test \
$(BINDIR)/$(CONFIG)/cli_call_test \
$(BINDIR)/$(CONFIG)/client_channel_stress_test \
$(BINDIR)/$(CONFIG)/client_crash_test \
@@ -1684,6 +1686,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/bm_pollset \
$(BINDIR)/$(CONFIG)/channel_arguments_test \
$(BINDIR)/$(CONFIG)/channel_filter_test \
+ $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test \
$(BINDIR)/$(CONFIG)/cli_call_test \
$(BINDIR)/$(CONFIG)/client_channel_stress_test \
$(BINDIR)/$(CONFIG)/client_crash_test \
@@ -2069,6 +2072,8 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/channel_arguments_test || ( echo test channel_arguments_test failed ; exit 1 )
$(E) "[RUN] Testing channel_filter_test"
$(Q) $(BINDIR)/$(CONFIG)/channel_filter_test || ( echo test channel_filter_test failed ; exit 1 )
+ $(E) "[RUN] Testing chttp2_settings_timeout_test"
+ $(Q) $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test || ( echo test chttp2_settings_timeout_test failed ; exit 1 )
$(E) "[RUN] Testing cli_call_test"
$(Q) $(BINDIR)/$(CONFIG)/cli_call_test || ( echo test cli_call_test failed ; exit 1 )
$(E) "[RUN] Testing client_channel_stress_test"
@@ -14369,6 +14374,49 @@ endif
endif
+CHTTP2_SETTINGS_TIMEOUT_TEST_SRC = \
+ test/core/transport/chttp2/settings_timeout_test.cc \
+
+CHTTP2_SETTINGS_TIMEOUT_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(CHTTP2_SETTINGS_TIMEOUT_TEST_SRC))))
+ifeq ($(NO_SECURE),true)
+
+# You can't build secure targets if you don't have OpenSSL.
+
+$(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test: openssl_dep_error
+
+else
+
+
+
+
+ifeq ($(NO_PROTOBUF),true)
+
+# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+.
+
+$(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test: protobuf_dep_error
+
+else
+
+$(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test: $(PROTOBUF_DEP) $(CHTTP2_SETTINGS_TIMEOUT_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+ $(E) "[LD] Linking $@"
+ $(Q) mkdir -p `dirname $@`
+ $(Q) $(LDXX) $(LDFLAGS) $(CHTTP2_SETTINGS_TIMEOUT_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test
+
+endif
+
+endif
+
+$(OBJDIR)/$(CONFIG)/test/core/transport/chttp2/settings_timeout_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+
+deps_chttp2_settings_timeout_test: $(CHTTP2_SETTINGS_TIMEOUT_TEST_OBJS:.o=.dep)
+
+ifneq ($(NO_SECURE),true)
+ifneq ($(NO_DEPS),true)
+-include $(CHTTP2_SETTINGS_TIMEOUT_TEST_OBJS:.o=.dep)
+endif
+endif
+
+
CLI_CALL_TEST_SRC = \
test/cpp/util/cli_call_test.cc \
diff --git a/build.yaml b/build.yaml
index 854a695acc..6158b466ea 100644
--- a/build.yaml
+++ b/build.yaml
@@ -3824,6 +3824,18 @@ targets:
- grpc
- gpr
uses_polling: false
+- name: chttp2_settings_timeout_test
+ gtest: true
+ build: test
+ language: c++
+ src:
+ - test/core/transport/chttp2/settings_timeout_test.cc
+ deps:
+ - grpc_test_util
+ - grpc
+ - gpr_test_util
+ - gpr
+ uses_polling: true
- name: cli_call_test
gtest: true
build: test
diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index 9ab2fc8b73..fcbc8ac5a1 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -240,6 +240,9 @@ typedef struct {
/** The time between the first and second connection attempts, in ms */
#define GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS \
"grpc.initial_reconnect_backoff_ms"
+/** The timeout used on servers for finishing handshaking on an incoming
+ connection. Defaults to 120 seconds. */
+#define GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS "grpc.server_handshake_timeout_ms"
/** This *should* be used for testing only.
The caller of the secure_channel_create functions may override the target
name used for SSL host name checking using this channel argument which is of
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 727210bbd4..ba82c88eb7 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -209,6 +209,14 @@ typedef struct client_channel_channel_data {
char* info_service_config_json;
} channel_data;
+typedef struct {
+ channel_data* chand;
+ /** used as an identifier, don't dereference it because the LB policy may be
+ * non-existing when the callback is run */
+ grpc_lb_policy* lb_policy;
+ grpc_closure closure;
+} reresolution_request_args;
+
/** We create one watcher for each new lb_policy that is returned from a
resolver, to watch for state changes from the lb_policy. When a state
change is seen, we update the channel, and create a new watcher. */
@@ -254,21 +262,13 @@ static void set_channel_connectivity_state_locked(channel_data* chand,
static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) {
lb_policy_connectivity_watcher* w = (lb_policy_connectivity_watcher*)arg;
- grpc_connectivity_state publish_state = w->state;
/* check if the notification is for the latest policy */
if (w->lb_policy == w->chand->lb_policy) {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand,
w->lb_policy, grpc_connectivity_state_name(w->state));
}
- if (publish_state == GRPC_CHANNEL_SHUTDOWN &&
- w->chand->resolver != nullptr) {
- publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
- grpc_resolver_channel_saw_error_locked(w->chand->resolver);
- GRPC_LB_POLICY_UNREF(w->chand->lb_policy, "channel");
- w->chand->lb_policy = nullptr;
- }
- set_channel_connectivity_state_locked(w->chand, publish_state,
+ set_channel_connectivity_state_locked(w->chand, w->state,
GRPC_ERROR_REF(error), "lb_changed");
if (w->state != GRPC_CHANNEL_SHUTDOWN) {
watch_lb_policy_locked(w->chand, w->lb_policy, w->state);
@@ -364,6 +364,25 @@ static void parse_retry_throttle_params(const grpc_json* field, void* arg) {
}
}
+static void request_reresolution_locked(void* arg, grpc_error* error) {
+ reresolution_request_args* args = (reresolution_request_args*)arg;
+ channel_data* chand = args->chand;
+ // If this invocation is for a stale LB policy, treat it as an LB shutdown
+ // signal.
+ if (args->lb_policy != chand->lb_policy || error != GRPC_ERROR_NONE ||
+ chand->resolver == nullptr) {
+ GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "re-resolution");
+ gpr_free(args);
+ return;
+ }
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "chand=%p: started name re-resolving", chand);
+ }
+ grpc_resolver_channel_saw_error_locked(chand->resolver);
+ // Give back the closure to the LB policy.
+ grpc_lb_policy_set_reresolve_closure_locked(chand->lb_policy, &args->closure);
+}
+
static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
channel_data* chand = (channel_data*)arg;
if (grpc_client_channel_trace.enabled()) {
@@ -379,98 +398,111 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
grpc_server_retry_throttle_data* retry_throttle_data = nullptr;
grpc_slice_hash_table* method_params_table = nullptr;
if (chand->resolver_result != nullptr) {
- // Find LB policy name.
- const char* lb_policy_name = nullptr;
- const grpc_arg* channel_arg =
- grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
- if (channel_arg != nullptr) {
- GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
- lb_policy_name = channel_arg->value.string;
- }
- // Special case: If at least one balancer address is present, we use
- // the grpclb policy, regardless of what the resolver actually specified.
- channel_arg =
- grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
- if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
- grpc_lb_addresses* addresses =
- (grpc_lb_addresses*)channel_arg->value.pointer.p;
- bool found_balancer_address = false;
- for (size_t i = 0; i < addresses->num_addresses; ++i) {
- if (addresses->addresses[i].is_balancer) {
- found_balancer_address = true;
- break;
+ if (chand->resolver != nullptr) {
+ // Find LB policy name.
+ const char* lb_policy_name = nullptr;
+ const grpc_arg* channel_arg = grpc_channel_args_find(
+ chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
+ if (channel_arg != nullptr) {
+ GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
+ lb_policy_name = channel_arg->value.string;
+ }
+ // Special case: If at least one balancer address is present, we use
+ // the grpclb policy, regardless of what the resolver actually specified.
+ channel_arg =
+ grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
+ if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
+ grpc_lb_addresses* addresses =
+ (grpc_lb_addresses*)channel_arg->value.pointer.p;
+ bool found_balancer_address = false;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (addresses->addresses[i].is_balancer) {
+ found_balancer_address = true;
+ break;
+ }
+ }
+ if (found_balancer_address) {
+ if (lb_policy_name != nullptr &&
+ strcmp(lb_policy_name, "grpclb") != 0) {
+ gpr_log(GPR_INFO,
+ "resolver requested LB policy %s but provided at least one "
+ "balancer address -- forcing use of grpclb LB policy",
+ lb_policy_name);
+ }
+ lb_policy_name = "grpclb";
}
}
- if (found_balancer_address) {
- if (lb_policy_name != nullptr &&
- strcmp(lb_policy_name, "grpclb") != 0) {
- gpr_log(GPR_INFO,
- "resolver requested LB policy %s but provided at least one "
- "balancer address -- forcing use of grpclb LB policy",
+ // Use pick_first if nothing was specified and we didn't select grpclb
+ // above.
+ if (lb_policy_name == nullptr) lb_policy_name = "pick_first";
+ grpc_lb_policy_args lb_policy_args;
+ lb_policy_args.args = chand->resolver_result;
+ lb_policy_args.client_channel_factory = chand->client_channel_factory;
+ lb_policy_args.combiner = chand->combiner;
+ // Check to see if we're already using the right LB policy.
+ // Note: It's safe to use chand->info_lb_policy_name here without
+ // taking a lock on chand->info_mu, because this function is the
+ // only thing that modifies its value, and it can only be invoked
+ // once at any given time.
+ lb_policy_name_changed =
+ chand->info_lb_policy_name == nullptr ||
+ gpr_stricmp(chand->info_lb_policy_name, lb_policy_name) != 0;
+ if (chand->lb_policy != nullptr && !lb_policy_name_changed) {
+ // Continue using the same LB policy. Update with new addresses.
+ lb_policy_updated = true;
+ grpc_lb_policy_update_locked(chand->lb_policy, &lb_policy_args);
+ } else {
+ // Instantiate new LB policy.
+ new_lb_policy = grpc_lb_policy_create(lb_policy_name, &lb_policy_args);
+ if (new_lb_policy == nullptr) {
+ gpr_log(GPR_ERROR, "could not create LB policy \"%s\"",
lb_policy_name);
+ } else {
+ reresolution_request_args* args =
+ (reresolution_request_args*)gpr_zalloc(sizeof(*args));
+ args->chand = chand;
+ args->lb_policy = new_lb_policy;
+ GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args,
+ grpc_combiner_scheduler(chand->combiner));
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution");
+ grpc_lb_policy_set_reresolve_closure_locked(new_lb_policy,
+ &args->closure);
}
- lb_policy_name = "grpclb";
- }
- }
- // Use pick_first if nothing was specified and we didn't select grpclb
- // above.
- if (lb_policy_name == nullptr) lb_policy_name = "pick_first";
- grpc_lb_policy_args lb_policy_args;
- lb_policy_args.args = chand->resolver_result;
- lb_policy_args.client_channel_factory = chand->client_channel_factory;
- lb_policy_args.combiner = chand->combiner;
- // Check to see if we're already using the right LB policy.
- // Note: It's safe to use chand->info_lb_policy_name here without
- // taking a lock on chand->info_mu, because this function is the
- // only thing that modifies its value, and it can only be invoked
- // once at any given time.
- lb_policy_name_changed =
- chand->info_lb_policy_name == nullptr ||
- gpr_stricmp(chand->info_lb_policy_name, lb_policy_name) != 0;
- if (chand->lb_policy != nullptr && !lb_policy_name_changed) {
- // Continue using the same LB policy. Update with new addresses.
- lb_policy_updated = true;
- grpc_lb_policy_update_locked(chand->lb_policy, &lb_policy_args);
- } else {
- // Instantiate new LB policy.
- new_lb_policy = grpc_lb_policy_create(lb_policy_name, &lb_policy_args);
- if (new_lb_policy == nullptr) {
- gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
}
- }
- // Find service config.
- channel_arg =
- grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
- if (channel_arg != nullptr) {
- GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
- service_config_json = gpr_strdup(channel_arg->value.string);
- grpc_service_config* service_config =
- grpc_service_config_create(service_config_json);
- if (service_config != nullptr) {
- channel_arg =
- grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI);
- GPR_ASSERT(channel_arg != nullptr);
+ // Find service config.
+ channel_arg = grpc_channel_args_find(chand->resolver_result,
+ GRPC_ARG_SERVICE_CONFIG);
+ if (channel_arg != nullptr) {
GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
- grpc_uri* uri = grpc_uri_parse(channel_arg->value.string, true);
- GPR_ASSERT(uri->path[0] != '\0');
- service_config_parsing_state parsing_state;
- memset(&parsing_state, 0, sizeof(parsing_state));
- parsing_state.server_name =
- uri->path[0] == '/' ? uri->path + 1 : uri->path;
- grpc_service_config_parse_global_params(
- service_config, parse_retry_throttle_params, &parsing_state);
- grpc_uri_destroy(uri);
- retry_throttle_data = parsing_state.retry_throttle_data;
- method_params_table = grpc_service_config_create_method_config_table(
- service_config, method_parameters_create_from_json,
- method_parameters_ref_wrapper, method_parameters_unref_wrapper);
- grpc_service_config_destroy(service_config);
+ service_config_json = gpr_strdup(channel_arg->value.string);
+ grpc_service_config* service_config =
+ grpc_service_config_create(service_config_json);
+ if (service_config != nullptr) {
+ channel_arg = grpc_channel_args_find(chand->resolver_result,
+ GRPC_ARG_SERVER_URI);
+ GPR_ASSERT(channel_arg != nullptr);
+ GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
+ grpc_uri* uri = grpc_uri_parse(channel_arg->value.string, true);
+ GPR_ASSERT(uri->path[0] != '\0');
+ service_config_parsing_state parsing_state;
+ memset(&parsing_state, 0, sizeof(parsing_state));
+ parsing_state.server_name =
+ uri->path[0] == '/' ? uri->path + 1 : uri->path;
+ grpc_service_config_parse_global_params(
+ service_config, parse_retry_throttle_params, &parsing_state);
+ grpc_uri_destroy(uri);
+ retry_throttle_data = parsing_state.retry_throttle_data;
+ method_params_table = grpc_service_config_create_method_config_table(
+ service_config, method_parameters_create_from_json,
+ method_parameters_ref_wrapper, method_parameters_unref_wrapper);
+ grpc_service_config_destroy(service_config);
+ }
}
+ // Before we clean up, save a copy of lb_policy_name, since it might
+ // be pointing to data inside chand->resolver_result.
+ // The copy will be saved in chand->lb_policy_name below.
+ lb_policy_name_dup = gpr_strdup(lb_policy_name);
}
- // Before we clean up, save a copy of lb_policy_name, since it might
- // be pointing to data inside chand->resolver_result.
- // The copy will be saved in chand->lb_policy_name below.
- lb_policy_name_dup = gpr_strdup(lb_policy_name);
grpc_channel_args_destroy(chand->resolver_result);
chand->resolver_result = nullptr;
}
@@ -507,11 +539,11 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
}
chand->method_params_table = method_params_table;
// If we have a new LB policy or are shutting down (in which case
- // new_lb_policy will be NULL), swap out the LB policy, unreffing the
- // old one and removing its fds from chand->interested_parties.
- // Note that we do NOT do this if either (a) we updated the existing
- // LB policy above or (b) we failed to create the new LB policy (in
- // which case we want to continue using the most recent one we had).
+ // new_lb_policy will be NULL), swap out the LB policy, unreffing the old one
+ // and removing its fds from chand->interested_parties. Note that we do NOT do
+ // this if either (a) we updated the existing LB policy above or (b) we failed
+ // to create the new LB policy (in which case we want to continue using the
+ // most recent one we had).
if (new_lb_policy != nullptr || error != GRPC_ERROR_NONE ||
chand->resolver == nullptr) {
if (chand->lb_policy != nullptr) {
diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc
index 6e2559936b..b97aa319f7 100644
--- a/src/core/ext/filters/client_channel/lb_policy.cc
+++ b/src/core/ext/filters/client_channel/lb_policy.cc
@@ -147,3 +147,27 @@ void grpc_lb_policy_update_locked(grpc_lb_policy* policy,
const grpc_lb_policy_args* lb_policy_args) {
policy->vtable->update_locked(policy, lb_policy_args);
}
+
+void grpc_lb_policy_set_reresolve_closure_locked(
+ grpc_lb_policy* policy, grpc_closure* request_reresolution) {
+ policy->vtable->set_reresolve_closure_locked(policy, request_reresolution);
+}
+
+void grpc_lb_policy_try_reresolve(grpc_lb_policy* policy,
+ grpc_core::TraceFlag* grpc_lb_trace,
+ grpc_error* error) {
+ if (policy->request_reresolution != nullptr) {
+ GRPC_CLOSURE_SCHED(policy->request_reresolution, error);
+ policy->request_reresolution = nullptr;
+ if (grpc_lb_trace->enabled()) {
+ gpr_log(GPR_DEBUG,
+ "%s %p: scheduling re-resolution closure with error=%s.",
+ grpc_lb_trace->name(), policy, grpc_error_string(error));
+ }
+ } else {
+ if (grpc_lb_trace->enabled() && error == GRPC_ERROR_NONE) {
+ gpr_log(GPR_DEBUG, "%s %p: re-resolution already in progress.",
+ grpc_lb_trace->name(), policy);
+ }
+ }
+}
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index 6c241e932a..fd28a2b623 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -38,6 +38,8 @@ struct grpc_lb_policy {
grpc_pollset_set* interested_parties;
/* combiner under which lb_policy actions take place */
grpc_combiner* combiner;
+ /* callback to force a re-resolution */
+ grpc_closure* request_reresolution;
};
/** Extra arguments for an LB pick */
@@ -93,6 +95,10 @@ struct grpc_lb_policy_vtable {
void (*update_locked)(grpc_lb_policy* policy,
const grpc_lb_policy_args* args);
+
+ /** \see grpc_lb_policy_set_reresolve_closure */
+ void (*set_reresolve_closure_locked)(grpc_lb_policy* policy,
+ grpc_closure* request_reresolution);
};
#ifndef NDEBUG
@@ -193,4 +199,14 @@ grpc_connectivity_state grpc_lb_policy_check_connectivity_locked(
void grpc_lb_policy_update_locked(grpc_lb_policy* policy,
const grpc_lb_policy_args* lb_policy_args);
+/** Set the re-resolution closure to \a request_reresolution. */
+void grpc_lb_policy_set_reresolve_closure_locked(
+ grpc_lb_policy* policy, grpc_closure* request_reresolution);
+
+/** Try to request a re-resolution. It's NOT a public API; it's only for use by
+ the LB policy implementations. */
+void grpc_lb_policy_try_reresolve(grpc_lb_policy* policy,
+ grpc_core::TraceFlag* grpc_lb_trace,
+ grpc_error* error);
+
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H */
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 9a52f297b3..eadeea0368 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -636,7 +636,7 @@ static void update_lb_connectivity_status_locked(
/* Perform a pick over \a glb_policy->rr_policy. Given that a pick can return
* immediately (ignoring its completion callback), we need to perform the
- * cleanups this callback would otherwise be resposible for.
+ * cleanups this callback would otherwise be responsible for.
* If \a force_async is true, then we will manually schedule the
* completion callback even if the pick is available immediately. */
static bool pick_from_internal_rr_locked(
@@ -761,6 +761,9 @@ static void create_rr_locked(glb_lb_policy* glb_policy,
glb_policy->rr_policy);
return;
}
+ grpc_lb_policy_set_reresolve_closure_locked(
+ new_rr_policy, glb_policy->base.request_reresolution);
+ glb_policy->base.request_reresolution = nullptr;
glb_policy->rr_policy = new_rr_policy;
grpc_error* rr_state_error = nullptr;
const grpc_connectivity_state rr_state =
@@ -978,6 +981,7 @@ static void glb_destroy(grpc_lb_policy* pol) {
static void glb_shutdown_locked(grpc_lb_policy* pol) {
glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
+ grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
glb_policy->shutting_down = true;
/* We need a copy of the lb_call pointer because we can't cancell the call
@@ -1008,6 +1012,8 @@ static void glb_shutdown_locked(grpc_lb_policy* pol) {
glb_policy->pending_pings = nullptr;
if (glb_policy->rr_policy != nullptr) {
GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "glb_shutdown");
+ } else {
+ grpc_lb_policy_try_reresolve(pol, &grpc_lb_glb_trace, GRPC_ERROR_CANCELLED);
}
// We destroy the LB channel here because
// glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy
@@ -1017,28 +1023,26 @@ static void glb_shutdown_locked(grpc_lb_policy* pol) {
grpc_channel_destroy(glb_policy->lb_channel);
glb_policy->lb_channel = nullptr;
}
- grpc_connectivity_state_set(
- &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "glb_shutdown");
+ grpc_connectivity_state_set(&glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_REF(error), "glb_shutdown");
while (pp != nullptr) {
pending_pick* next = pp->next;
*pp->target = nullptr;
- GRPC_CLOSURE_SCHED(
- &pp->wrapped_on_complete_arg.wrapper_closure,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
+ GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure,
+ GRPC_ERROR_REF(error));
gpr_free(pp);
pp = next;
}
while (pping != nullptr) {
pending_ping* next = pping->next;
- GRPC_CLOSURE_SCHED(
- &pping->wrapped_notify_arg.wrapper_closure,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
+ GRPC_CLOSURE_SCHED(&pping->wrapped_notify_arg.wrapper_closure,
+ GRPC_ERROR_REF(error));
gpr_free(pping);
pping = next;
}
+ GRPC_ERROR_UNREF(error);
}
// Cancel a specific pending pick.
@@ -1713,8 +1717,8 @@ static void fallback_update_locked(glb_lb_policy* glb_policy,
grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses);
glb_policy->fallback_backend_addresses =
extract_backend_addresses_locked(addresses);
- if (glb_policy->started_picking && glb_policy->lb_fallback_timeout_ms > 0 &&
- !glb_policy->fallback_timer_active) {
+ if (glb_policy->lb_fallback_timeout_ms > 0 &&
+ glb_policy->rr_policy != nullptr) {
rr_handover_locked(glb_policy);
}
}
@@ -1811,7 +1815,7 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg,
grpc_call_cancel(glb_policy->lb_call, nullptr);
// lb_on_server_status_received() will pick up the cancel and reinit
// lb_call.
- } else if (glb_policy->started_picking && !glb_policy->shutting_down) {
+ } else if (glb_policy->started_picking) {
if (glb_policy->retry_timer_active) {
grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
glb_policy->retry_timer_active = false;
@@ -1828,6 +1832,19 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg,
}
}
+static void glb_set_reresolve_closure_locked(
+ grpc_lb_policy* policy, grpc_closure* request_reresolution) {
+ glb_lb_policy* glb_policy = (glb_lb_policy*)policy;
+ GPR_ASSERT(!glb_policy->shutting_down);
+ GPR_ASSERT(glb_policy->base.request_reresolution == nullptr);
+ if (glb_policy->rr_policy != nullptr) {
+ grpc_lb_policy_set_reresolve_closure_locked(glb_policy->rr_policy,
+ request_reresolution);
+ } else {
+ glb_policy->base.request_reresolution = request_reresolution;
+ }
+}
+
/* Code wiring the policy with the rest of the core */
static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_destroy,
@@ -1839,7 +1856,8 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_exit_idle_locked,
glb_check_connectivity_locked,
glb_notify_on_state_change_locked,
- glb_update_locked};
+ glb_update_locked,
+ glb_set_reresolve_closure_locked};
static grpc_lb_policy* glb_create(grpc_lb_policy_factory* factory,
grpc_lb_policy_args* args) {
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 9fc7f1a5d1..5e75b64843 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -70,7 +70,9 @@ static void pf_destroy(grpc_lb_policy* pol) {
}
}
-static void shutdown_locked(pick_first_lb_policy* p, grpc_error* error) {
+static void pf_shutdown_locked(grpc_lb_policy* pol) {
+ pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
+ grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_DEBUG, "Pick First %p Shutting down", p);
}
@@ -94,14 +96,11 @@ static void shutdown_locked(pick_first_lb_policy* p, grpc_error* error) {
p->latest_pending_subchannel_list, "pf_shutdown");
p->latest_pending_subchannel_list = nullptr;
}
+ grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace,
+ GRPC_ERROR_CANCELLED);
GRPC_ERROR_UNREF(error);
}
-static void pf_shutdown_locked(grpc_lb_policy* pol) {
- shutdown_locked((pick_first_lb_policy*)pol,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"));
-}
-
static void pf_cancel_pick_locked(grpc_lb_policy* pol,
grpc_connected_subchannel** target,
grpc_error* error) {
@@ -154,10 +153,15 @@ static void start_picking_locked(pick_first_lb_policy* p) {
if (p->subchannel_list != nullptr &&
p->subchannel_list->num_subchannels > 0) {
p->subchannel_list->checking_subchannel = 0;
- grpc_lb_subchannel_list_ref_for_connectivity_watch(
- p->subchannel_list, "connectivity_watch+start_picking");
- grpc_lb_subchannel_data_start_connectivity_watch(
- &p->subchannel_list->subchannels[0]);
+ for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) {
+ if (p->subchannel_list->subchannels[i].subchannel != nullptr) {
+ grpc_lb_subchannel_list_ref_for_connectivity_watch(
+ p->subchannel_list, "connectivity_watch+start_picking");
+ grpc_lb_subchannel_data_start_connectivity_watch(
+ &p->subchannel_list->subchannels[i]);
+ break;
+ }
+ }
}
}
@@ -394,6 +398,9 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
if (sd->curr_connectivity_state != GRPC_CHANNEL_READY &&
p->latest_pending_subchannel_list != nullptr) {
p->selected = nullptr;
+ grpc_lb_subchannel_data_stop_connectivity_watch(sd);
+ grpc_lb_subchannel_list_unref_for_connectivity_watch(
+ sd->subchannel_list, "selected_not_ready+switch_to_update");
grpc_lb_subchannel_list_shutdown_and_unref(
p->subchannel_list, "selected_not_ready+switch_to_update");
p->subchannel_list = p->latest_pending_subchannel_list;
@@ -402,21 +409,34 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update");
} else {
- if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- /* if the selected channel goes bad, we're done */
- sd->curr_connectivity_state = GRPC_CHANNEL_SHUTDOWN;
+ // TODO(juanlishen): we re-resolve when the selected subchannel goes to
+ // TRANSIENT_FAILURE because we used to shut down in this case before
+ // re-resolution is introduced. But we need to investigate whether we
+ // really want to take any action instead of waiting for the selected
+ // subchannel reconnecting.
+ if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN ||
+ sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ // If the selected channel goes bad, request a re-resolution.
+ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
+ GRPC_ERROR_NONE,
+ "selected_changed+reresolve");
+ p->started_picking = false;
+ grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace,
+ GRPC_ERROR_NONE);
+ } else {
+ grpc_connectivity_state_set(&p->state_tracker,
+ sd->curr_connectivity_state,
+ GRPC_ERROR_REF(error), "selected_changed");
}
- grpc_connectivity_state_set(&p->state_tracker,
- sd->curr_connectivity_state,
- GRPC_ERROR_REF(error), "selected_changed");
if (sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
// Renew notification.
grpc_lb_subchannel_data_start_connectivity_watch(sd);
} else {
+ p->selected = nullptr;
grpc_lb_subchannel_data_stop_connectivity_watch(sd);
grpc_lb_subchannel_list_unref_for_connectivity_watch(
sd->subchannel_list, "pf_selected_shutdown");
- shutdown_locked(p, GRPC_ERROR_REF(error));
+ grpc_lb_subchannel_data_unref_subchannel(sd, "pf_selected_shutdown");
}
}
return;
@@ -519,23 +539,36 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
} while (sd->subchannel == nullptr && sd != original_sd);
if (sd == original_sd) {
grpc_lb_subchannel_list_unref_for_connectivity_watch(
- sd->subchannel_list, "pf_candidate_shutdown");
- shutdown_locked(p, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Pick first exhausted channels", &error, 1));
- break;
- }
- if (sd->subchannel_list == p->subchannel_list) {
- grpc_connectivity_state_set(&p->state_tracker,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_REF(error), "subchannel_failed");
+ sd->subchannel_list, "pf_exhausted_subchannels");
+ if (sd->subchannel_list == p->subchannel_list) {
+ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
+ GRPC_ERROR_NONE,
+ "exhausted_subchannels+reresolve");
+ p->started_picking = false;
+ grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace,
+ GRPC_ERROR_NONE);
+ }
+ } else {
+ if (sd->subchannel_list == p->subchannel_list) {
+ grpc_connectivity_state_set(
+ &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_REF(error), "subchannel_failed");
+ }
+ // Reuses the connectivity refs from the previous watch.
+ grpc_lb_subchannel_data_start_connectivity_watch(sd);
}
- // Reuses the connectivity refs from the previous watch.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
- break;
}
}
}
+static void pf_set_reresolve_closure_locked(
+ grpc_lb_policy* policy, grpc_closure* request_reresolution) {
+ pick_first_lb_policy* p = (pick_first_lb_policy*)policy;
+ GPR_ASSERT(!p->shutdown);
+ GPR_ASSERT(policy->request_reresolution == nullptr);
+ policy->request_reresolution = request_reresolution;
+}
+
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_destroy,
pf_shutdown_locked,
@@ -546,7 +579,8 @@ static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_exit_idle_locked,
pf_check_connectivity_locked,
pf_notify_on_state_change_locked,
- pf_update_locked};
+ pf_update_locked,
+ pf_set_reresolve_closure_locked};
static void pick_first_factory_ref(grpc_lb_policy_factory* factory) {}
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 2eadd6473d..6958b72693 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -20,9 +20,9 @@
*
* Before every pick, the \a get_next_ready_subchannel_index_locked function
* returns the p->subchannel_list->subchannels index for next subchannel,
- * respecting the relative
- * order of the addresses provided upon creation or updates. Note however that
- * updates will start picking from the beginning of the updated list. */
+ * respecting the relative order of the addresses provided upon creation or
+ * updates. Note however that updates will start picking from the beginning of
+ * the updated list. */
#include <string.h>
@@ -167,7 +167,9 @@ static void rr_destroy(grpc_lb_policy* pol) {
gpr_free(p);
}
-static void shutdown_locked(round_robin_lb_policy* p, grpc_error* error) {
+static void rr_shutdown_locked(grpc_lb_policy* pol) {
+ round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
+ grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG, "[RR %p] Shutting down", p);
}
@@ -191,14 +193,11 @@ static void shutdown_locked(round_robin_lb_policy* p, grpc_error* error) {
p->latest_pending_subchannel_list, "sl_shutdown_pending_rr_shutdown");
p->latest_pending_subchannel_list = nullptr;
}
+ grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace,
+ GRPC_ERROR_CANCELLED);
GRPC_ERROR_UNREF(error);
}
-static void rr_shutdown_locked(grpc_lb_policy* pol) {
- round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
- shutdown_locked(p, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
-}
-
static void rr_cancel_pick_locked(grpc_lb_policy* pol,
grpc_connected_subchannel** target,
grpc_error* error) {
@@ -250,10 +249,12 @@ static void rr_cancel_picks_locked(grpc_lb_policy* pol,
static void start_picking_locked(round_robin_lb_policy* p) {
p->started_picking = true;
for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) {
- grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list,
- "connectivity_watch");
- grpc_lb_subchannel_data_start_connectivity_watch(
- &p->subchannel_list->subchannels[i]);
+ if (p->subchannel_list->subchannels[i].subchannel != nullptr) {
+ grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list,
+ "connectivity_watch");
+ grpc_lb_subchannel_data_start_connectivity_watch(
+ &p->subchannel_list->subchannels[i]);
+ }
}
}
@@ -341,69 +342,69 @@ static void update_state_counters_locked(grpc_lb_subchannel_data* sd) {
}
/** Sets the policy's connectivity status based on that of the passed-in \a sd
- * (the grpc_lb_subchannel_data associted with the updated subchannel) and the
- * subchannel list \a sd belongs to (sd->subchannel_list). \a error will only be
- * used upon policy transition to TRANSIENT_FAILURE or SHUTDOWN. Returns the
- * connectivity status set. */
-static grpc_connectivity_state update_lb_connectivity_status_locked(
- grpc_lb_subchannel_data* sd, grpc_error* error) {
+ * (the grpc_lb_subchannel_data associated with the updated subchannel) and the
+ * subchannel list \a sd belongs to (sd->subchannel_list). \a error will be used
+ * only if the policy transitions to state TRANSIENT_FAILURE. */
+static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd,
+ grpc_error* error) {
/* In priority order. The first rule to match terminates the search (ie, if we
* are on rule n, all previous rules were unfulfilled).
*
* 1) RULE: ANY subchannel is READY => policy is READY.
- * CHECK: At least one subchannel is ready iff p->ready_list is NOT empty.
+ * CHECK: subchannel_list->num_ready > 0.
*
* 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING.
* CHECK: sd->curr_connectivity_state == CONNECTING.
*
- * 3) RULE: ALL subchannels are SHUTDOWN => policy is SHUTDOWN.
- * CHECK: p->subchannel_list->num_shutdown ==
- * p->subchannel_list->num_subchannels.
+ * 3) RULE: ALL subchannels are SHUTDOWN => policy is IDLE (and requests
+ * re-resolution).
+ * CHECK: subchannel_list->num_shutdown ==
+ * subchannel_list->num_subchannels.
*
* 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
- * TRANSIENT_FAILURE.
- * CHECK: p->num_transient_failures == p->subchannel_list->num_subchannels.
+ * TRANSIENT_FAILURE.
+ * CHECK: subchannel_list->num_transient_failures ==
+ * subchannel_list->num_subchannels.
*
* 5) RULE: ALL subchannels are IDLE => policy is IDLE.
- * CHECK: p->num_idle == p->subchannel_list->num_subchannels.
+ * CHECK: subchannel_list->num_idle == subchannel_list->num_subchannels.
+ * (Note that all the subchannels will transition from IDLE to CONNECTING
+ * in batch when we start trying to connect.)
*/
- grpc_connectivity_state new_state = sd->curr_connectivity_state;
+ // TODO(juanlishen): if the subchannel states are mixed by {SHUTDOWN,
+ // TRANSIENT_FAILURE}, we don't change the state. We may want to improve on
+ // this.
grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list;
round_robin_lb_policy* p = (round_robin_lb_policy*)subchannel_list->policy;
- if (subchannel_list->num_ready > 0) { /* 1) READY */
+ if (subchannel_list->num_ready > 0) {
+ /* 1) READY */
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "rr_ready");
- new_state = GRPC_CHANNEL_READY;
- } else if (sd->curr_connectivity_state ==
- GRPC_CHANNEL_CONNECTING) { /* 2) CONNECTING */
+ } else if (sd->curr_connectivity_state == GRPC_CHANNEL_CONNECTING) {
+ /* 2) CONNECTING */
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_NONE, "rr_connecting");
- new_state = GRPC_CHANNEL_CONNECTING;
- } else if (p->subchannel_list->num_shutdown ==
- p->subchannel_list->num_subchannels) { /* 3) SHUTDOWN */
- grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
- GRPC_ERROR_REF(error), "rr_shutdown");
- p->shutdown = true;
- new_state = GRPC_CHANNEL_SHUTDOWN;
- if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[RR %p] Shutting down: all subchannels have gone into shutdown",
- (void*)p);
- }
+ } else if (subchannel_list->num_shutdown ==
+ subchannel_list->num_subchannels) {
+ /* 3) IDLE and re-resolve */
+ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
+ GRPC_ERROR_NONE,
+ "rr_exhausted_subchannels+reresolve");
+ p->started_picking = false;
+ grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace,
+ GRPC_ERROR_NONE);
} else if (subchannel_list->num_transient_failures ==
- p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */
+ subchannel_list->num_subchannels) {
+ /* 4) TRANSIENT_FAILURE */
grpc_connectivity_state_set(&p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "rr_transient_failure");
- new_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
- } else if (subchannel_list->num_idle ==
- p->subchannel_list->num_subchannels) { /* 5) IDLE */
+ } else if (subchannel_list->num_idle == subchannel_list->num_subchannels) {
+ /* 5) IDLE */
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE, "rr_idle");
- new_state = GRPC_CHANNEL_IDLE;
}
GRPC_ERROR_UNREF(error);
- return new_state;
}
static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
@@ -446,20 +447,15 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
// state (which was set by the connectivity state watcher) to
// curr_connectivity_state, which is what we use inside of the combiner.
sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
- // Update state counters and determine new overall state.
+ // Update state counters and new overall state.
update_state_counters_locked(sd);
- const grpc_connectivity_state new_policy_connectivity_state =
- update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error));
- // If the sd's new state is SHUTDOWN, unref the subchannel, and if the new
- // policy's state is SHUTDOWN, clean up.
+ update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error));
+ // If the sd's new state is SHUTDOWN, unref the subchannel.
if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
grpc_lb_subchannel_data_stop_connectivity_watch(sd);
grpc_lb_subchannel_data_unref_subchannel(sd, "rr_connectivity_shutdown");
grpc_lb_subchannel_list_unref_for_connectivity_watch(
sd->subchannel_list, "rr_connectivity_shutdown");
- if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- shutdown_locked(p, GRPC_ERROR_REF(error));
- }
} else { // sd not in SHUTDOWN
if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
if (sd->connected_subchannel == nullptr) {
@@ -495,7 +491,7 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
}
/* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in
- * p->pending_picks. This preemtively replicates rr_pick()'s actions. */
+ * p->pending_picks. This preemptively replicates rr_pick()'s actions. */
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels);
grpc_lb_subchannel_data* selected =
@@ -630,6 +626,14 @@ static void rr_update_locked(grpc_lb_policy* policy,
}
}
+static void rr_set_reresolve_closure_locked(
+ grpc_lb_policy* policy, grpc_closure* request_reresolution) {
+ round_robin_lb_policy* p = (round_robin_lb_policy*)policy;
+ GPR_ASSERT(!p->shutdown);
+ GPR_ASSERT(policy->request_reresolution == nullptr);
+ policy->request_reresolution = request_reresolution;
+}
+
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
rr_destroy,
rr_shutdown_locked,
@@ -640,7 +644,8 @@ static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
rr_exit_idle_locked,
rr_check_connectivity_locked,
rr_notify_on_state_change_locked,
- rr_update_locked};
+ rr_update_locked,
+ rr_set_reresolve_closure_locked};
static void round_robin_factory_ref(grpc_lb_policy_factory* factory) {}
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.cc b/src/core/ext/transport/chttp2/client/chttp2_connector.cc
index 9771bd636a..db5962e7fd 100644
--- a/src/core/ext/transport/chttp2/client/chttp2_connector.cc
+++ b/src/core/ext/transport/chttp2/client/chttp2_connector.cc
@@ -114,10 +114,34 @@ static void on_handshake_done(void* arg, grpc_error* error) {
grpc_endpoint_delete_from_pollset_set(args->endpoint,
c->args.interested_parties);
c->result->transport =
- grpc_create_chttp2_transport(args->args, args->endpoint, 1);
+ grpc_create_chttp2_transport(args->args, args->endpoint, true);
GPR_ASSERT(c->result->transport);
- grpc_chttp2_transport_start_reading(c->result->transport,
- args->read_buffer);
+ // TODO(roth): We ideally want to wait until we receive HTTP/2
+ // settings from the server before we consider the connection
+ // established. If that doesn't happen before the connection
+ // timeout expires, then we should consider the connection attempt a
+ // failure and feed that information back into the backoff code.
+ // We could pass a notify_on_receive_settings callback to
+ // grpc_chttp2_transport_start_reading() to let us know when
+ // settings are received, but we would need to figure out how to use
+ // that information here.
+ //
+ // Unfortunately, we don't currently have a way to split apart the two
+ // effects of scheduling c->notify: we start sending RPCs immediately
+ // (which we want to do) and we consider the connection attempt successful
+ // (which we don't want to do until we get the notify_on_receive_settings
+ // callback from the transport). If we could split those things
+ // apart, then we could start sending RPCs but then wait for our
+ // timeout before deciding if the connection attempt is successful.
+ // If the attempt is not successful, then we would tear down the
+ // transport and feed the failure back into the backoff code.
+ //
+ // In addition, even if we did that, we would probably not want to do
+ // so until after transparent retries is implemented. Otherwise, any
+ // RPC that we attempt to send on the connection before the timeout
+ // would fail instead of being retried on a subsequent attempt.
+ grpc_chttp2_transport_start_reading(c->result->transport, args->read_buffer,
+ nullptr);
c->result->channel_args = args->args;
}
grpc_closure* notify = c->notify;
@@ -135,8 +159,9 @@ static void start_handshake_locked(chttp2_connector* c) {
c->handshake_mgr);
grpc_endpoint_add_to_pollset_set(c->endpoint, c->args.interested_parties);
grpc_handshake_manager_do_handshake(
- c->handshake_mgr, c->endpoint, c->args.channel_args, c->args.deadline,
- nullptr /* acceptor */, on_handshake_done, c);
+ c->handshake_mgr, c->args.interested_parties, c->endpoint,
+ c->args.channel_args, c->args.deadline, nullptr /* acceptor */,
+ on_handshake_done, c);
c->endpoint = nullptr; // Endpoint handed off to handshake manager.
}
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
index 6236551e24..0cdea5a94e 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
@@ -53,12 +53,14 @@ grpc_channel* grpc_insecure_channel_create_from_fd(
grpc_fd_create(fd, "client"), args, "fd-client");
grpc_transport* transport =
- grpc_create_chttp2_transport(final_args, client, 1);
+ grpc_create_chttp2_transport(final_args, client, true);
GPR_ASSERT(transport);
grpc_channel* channel = grpc_channel_create(
target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
grpc_channel_args_destroy(final_args);
- grpc_chttp2_transport_start_reading(transport, nullptr);
+ grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
+
+ grpc_core::ExecCtx::Get()->Flush();
return channel != nullptr ? channel
: grpc_lame_client_channel_create(
diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.cc b/src/core/ext/transport/chttp2/server/chttp2_server.cc
index 08d919144e..5669fa4090 100644
--- a/src/core/ext/transport/chttp2/server/chttp2_server.cc
+++ b/src/core/ext/transport/chttp2/server/chttp2_server.cc
@@ -21,6 +21,7 @@
#include <grpc/grpc.h>
#include <inttypes.h>
+#include <limits.h>
#include <string.h>
#include <grpc/support/alloc.h>
@@ -31,6 +32,7 @@
#include "src/core/ext/filters/http/server/http_server_filter.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
+#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/handshaker.h"
#include "src/core/lib/channel/handshaker_registry.h"
@@ -53,12 +55,51 @@ typedef struct {
} server_state;
typedef struct {
+ gpr_refcount refs;
server_state* svr_state;
grpc_pollset* accepting_pollset;
grpc_tcp_server_acceptor* acceptor;
grpc_handshake_manager* handshake_mgr;
+ // State for enforcing handshake timeout on receiving HTTP/2 settings.
+ grpc_chttp2_transport* transport;
+ grpc_millis deadline;
+ grpc_timer timer;
+ grpc_closure on_timeout;
+ grpc_closure on_receive_settings;
} server_connection_state;
+static void server_connection_state_unref(
+ server_connection_state* connection_state) {
+ if (gpr_unref(&connection_state->refs)) {
+ if (connection_state->transport != nullptr) {
+ GRPC_CHTTP2_UNREF_TRANSPORT(connection_state->transport,
+ "receive settings timeout");
+ }
+ gpr_free(connection_state);
+ }
+}
+
+static void on_timeout(void* arg, grpc_error* error) {
+ server_connection_state* connection_state = (server_connection_state*)arg;
+ // Note that we may be called with GRPC_ERROR_NONE when the timer fires
+ // or with an error indicating that the timer system is being shut down.
+ if (error != GRPC_ERROR_CANCELLED) {
+ grpc_transport_op* op = grpc_make_transport_op(nullptr);
+ op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Did not receive HTTP/2 settings before handshake timeout");
+ grpc_transport_perform_op(&connection_state->transport->base, op);
+ }
+ server_connection_state_unref(connection_state);
+}
+
+static void on_receive_settings(void* arg, grpc_error* error) {
+ server_connection_state* connection_state = (server_connection_state*)arg;
+ if (error == GRPC_ERROR_NONE) {
+ grpc_timer_cancel(&connection_state->timer);
+ }
+ server_connection_state_unref(connection_state);
+}
+
static void on_handshake_done(void* arg, grpc_error* error) {
grpc_handshaker_args* args = (grpc_handshaker_args*)arg;
server_connection_state* connection_state =
@@ -67,7 +108,6 @@ static void on_handshake_done(void* arg, grpc_error* error) {
if (error != GRPC_ERROR_NONE || connection_state->svr_state->shutdown) {
const char* error_str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str);
-
if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) {
// We were shut down after handshaking completed successfully, so
// destroy the endpoint here.
@@ -87,12 +127,27 @@ static void on_handshake_done(void* arg, grpc_error* error) {
// code, so we can just clean up here without creating a transport.
if (args->endpoint != nullptr) {
grpc_transport* transport =
- grpc_create_chttp2_transport(args->args, args->endpoint, 0);
+ grpc_create_chttp2_transport(args->args, args->endpoint, false);
grpc_server_setup_transport(
connection_state->svr_state->server, transport,
connection_state->accepting_pollset, args->args);
- grpc_chttp2_transport_start_reading(transport, args->read_buffer);
+ // Use notify_on_receive_settings callback to enforce the
+ // handshake deadline.
+ connection_state->transport = (grpc_chttp2_transport*)transport;
+ gpr_ref(&connection_state->refs);
+ GRPC_CLOSURE_INIT(&connection_state->on_receive_settings,
+ on_receive_settings, connection_state,
+ grpc_schedule_on_exec_ctx);
+ grpc_chttp2_transport_start_reading(
+ transport, args->read_buffer, &connection_state->on_receive_settings);
grpc_channel_args_destroy(args->args);
+ gpr_ref(&connection_state->refs);
+ GRPC_CHTTP2_REF_TRANSPORT((grpc_chttp2_transport*)transport,
+ "receive settings timeout");
+ GRPC_CLOSURE_INIT(&connection_state->on_timeout, on_timeout,
+ connection_state, grpc_schedule_on_exec_ctx);
+ grpc_timer_init(&connection_state->timer, connection_state->deadline,
+ &connection_state->on_timeout);
}
}
grpc_handshake_manager_pending_list_remove(
@@ -100,9 +155,9 @@ static void on_handshake_done(void* arg, grpc_error* error) {
connection_state->handshake_mgr);
gpr_mu_unlock(&connection_state->svr_state->mu);
grpc_handshake_manager_destroy(connection_state->handshake_mgr);
- grpc_tcp_server_unref(connection_state->svr_state->tcp_server);
gpr_free(connection_state->acceptor);
- gpr_free(connection_state);
+ grpc_tcp_server_unref(connection_state->svr_state->tcp_server);
+ server_connection_state_unref(connection_state);
}
static void on_accept(void* arg, grpc_endpoint* tcp,
@@ -123,20 +178,24 @@ static void on_accept(void* arg, grpc_endpoint* tcp,
gpr_mu_unlock(&state->mu);
grpc_tcp_server_ref(state->tcp_server);
server_connection_state* connection_state =
- (server_connection_state*)gpr_malloc(sizeof(*connection_state));
+ (server_connection_state*)gpr_zalloc(sizeof(*connection_state));
+ gpr_ref_init(&connection_state->refs, 1);
connection_state->svr_state = state;
connection_state->accepting_pollset = accepting_pollset;
connection_state->acceptor = acceptor;
connection_state->handshake_mgr = handshake_mgr;
grpc_handshakers_add(HANDSHAKER_SERVER, state->args,
connection_state->handshake_mgr);
- // TODO(roth): We should really get this timeout value from channel
- // args instead of hard-coding it.
- const grpc_millis deadline =
- grpc_core::ExecCtx::Get()->Now() + 120 * GPR_MS_PER_SEC;
- grpc_handshake_manager_do_handshake(connection_state->handshake_mgr, tcp,
- state->args, deadline, acceptor,
- on_handshake_done, connection_state);
+ const grpc_arg* timeout_arg =
+ grpc_channel_args_find(state->args, GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS);
+ connection_state->deadline =
+ grpc_core::ExecCtx::Get()->Now() +
+ grpc_channel_arg_get_integer(timeout_arg,
+ {120 * GPR_MS_PER_SEC, 1, INT_MAX});
+ grpc_handshake_manager_do_handshake(
+ connection_state->handshake_mgr, nullptr /* interested_parties */, tcp,
+ state->args, connection_state->deadline, acceptor, on_handshake_done,
+ connection_state);
}
/* Server callback: start listening on our ports */
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
index 210939e6af..dafd4af6ce 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
@@ -49,7 +49,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server,
const grpc_channel_args* server_args = grpc_server_get_channel_args(server);
grpc_transport* transport = grpc_create_chttp2_transport(
- server_args, server_endpoint, 0 /* is_client */);
+ server_args, server_endpoint, false /* is_client */);
grpc_pollset** pollsets;
size_t num_pollsets = 0;
@@ -60,7 +60,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server,
}
grpc_server_setup_transport(server, transport, nullptr, server_args);
- grpc_chttp2_transport_start_reading(transport, nullptr);
+ grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
}
#else // !GPR_SUPPORT_CHANNELS_FROM_FD
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index 1380b3756d..f537fb09c2 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -615,6 +615,10 @@ static void close_transport_locked(grpc_chttp2_transport* t,
GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
grpc_endpoint_shutdown(t->ep, GRPC_ERROR_REF(error));
}
+ if (t->notify_on_receive_settings != nullptr) {
+ GRPC_CLOSURE_SCHED(t->notify_on_receive_settings, GRPC_ERROR_CANCELLED);
+ t->notify_on_receive_settings = nullptr;
+ }
GRPC_ERROR_UNREF(error);
}
@@ -1702,7 +1706,6 @@ static void perform_transport_op_locked(void* stream_op,
grpc_transport_op* op = (grpc_transport_op*)stream_op;
grpc_chttp2_transport* t =
(grpc_chttp2_transport*)op->handler_private.extra_arg;
- grpc_error* close_transport = op->disconnect_with_error;
if (op->goaway_error) {
send_goaway(t, op->goaway_error);
@@ -1733,8 +1736,8 @@ static void perform_transport_op_locked(void* stream_op,
op->on_connectivity_state_change);
}
- if (close_transport != GRPC_ERROR_NONE) {
- close_transport_locked(t, close_transport);
+ if (op->disconnect_with_error != GRPC_ERROR_NONE) {
+ close_transport_locked(t, op->disconnect_with_error);
}
GRPC_CLOSURE_RUN(op->on_consumed, GRPC_ERROR_NONE);
@@ -3079,15 +3082,16 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
static const grpc_transport_vtable* get_vtable(void) { return &vtable; }
grpc_transport* grpc_create_chttp2_transport(
- const grpc_channel_args* channel_args, grpc_endpoint* ep, int is_client) {
+ const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client) {
grpc_chttp2_transport* t =
(grpc_chttp2_transport*)gpr_zalloc(sizeof(grpc_chttp2_transport));
- init_transport(t, channel_args, ep, is_client != 0);
+ init_transport(t, channel_args, ep, is_client);
return &t->base;
}
-void grpc_chttp2_transport_start_reading(grpc_transport* transport,
- grpc_slice_buffer* read_buffer) {
+void grpc_chttp2_transport_start_reading(
+ grpc_transport* transport, grpc_slice_buffer* read_buffer,
+ grpc_closure* notify_on_receive_settings) {
grpc_chttp2_transport* t = (grpc_chttp2_transport*)transport;
GRPC_CHTTP2_REF_TRANSPORT(
t, "reading_action"); /* matches unref inside reading_action */
@@ -3095,5 +3099,6 @@ void grpc_chttp2_transport_start_reading(grpc_transport* transport,
grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
gpr_free(read_buffer);
}
+ t->notify_on_receive_settings = notify_on_receive_settings;
GRPC_CLOSURE_SCHED(&t->read_action_locked, GRPC_ERROR_NONE);
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.h b/src/core/ext/transport/chttp2/transport/chttp2_transport.h
index 8fd10c803d..596ababb19 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.h
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.h
@@ -28,11 +28,14 @@ extern grpc_core::TraceFlag grpc_trace_http2_stream_state;
extern grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount;
grpc_transport* grpc_create_chttp2_transport(
- const grpc_channel_args* channel_args, grpc_endpoint* ep, int is_client);
+ const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client);
/// Takes ownership of \a read_buffer, which (if non-NULL) contains
/// leftover bytes previously read from the endpoint (e.g., by handshakers).
-void grpc_chttp2_transport_start_reading(grpc_transport* transport,
- grpc_slice_buffer* read_buffer);
+/// If non-null, \a notify_on_receive_settings will be scheduled when
+/// HTTP/2 settings are received from the peer.
+void grpc_chttp2_transport_start_reading(
+ grpc_transport* transport, grpc_slice_buffer* read_buffer,
+ grpc_closure* notify_on_receive_settings);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CHTTP2_TRANSPORT_H */
diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.cc b/src/core/ext/transport/chttp2/transport/frame_settings.cc
index d4f84f352d..c6c2a6c301 100644
--- a/src/core/ext/transport/chttp2/transport/frame_settings.cc
+++ b/src/core/ext/transport/chttp2/transport/frame_settings.cc
@@ -130,6 +130,11 @@ grpc_error* grpc_chttp2_settings_parser_parse(void* p, grpc_chttp2_transport* t,
memcpy(parser->target_settings, parser->incoming_settings,
GRPC_CHTTP2_NUM_SETTINGS * sizeof(uint32_t));
grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create());
+ if (t->notify_on_receive_settings != nullptr) {
+ GRPC_CLOSURE_SCHED(t->notify_on_receive_settings,
+ GRPC_ERROR_NONE);
+ t->notify_on_receive_settings = nullptr;
+ }
}
return GRPC_ERROR_NONE;
}
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 3653ec6f56..932f5ba83d 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -241,6 +241,8 @@ struct grpc_chttp2_transport {
grpc_combiner* combiner;
+ grpc_closure* notify_on_receive_settings;
+
/** write execution state of the transport */
grpc_chttp2_write_state write_state;
/** is this the first write in a series of writes?
diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc
index d816a5d1a7..8dd0b7dce2 100644
--- a/src/core/ext/transport/inproc/inproc_transport.cc
+++ b/src/core/ext/transport/inproc/inproc_transport.cc
@@ -447,6 +447,14 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) {
} else {
err = GRPC_ERROR_REF(error);
}
+ if (s->recv_initial_md_op->payload->recv_initial_metadata
+ .trailing_metadata_available != nullptr) {
+ // Set to true unconditionally, because we're failing the call, so even
+ // if we haven't actually seen the send_trailing_metadata op from the
+ // other side, we're going to return trailing metadata anyway.
+ *s->recv_initial_md_op->payload->recv_initial_metadata
+ .trailing_metadata_available = true;
+ }
INPROC_LOG(GPR_DEBUG,
"fail_helper %p scheduling initial-metadata-ready %p %p", s,
error, err);
@@ -655,6 +663,12 @@ static void op_state_machine(void* arg, grpc_error* error) {
nullptr);
s->recv_initial_md_op->payload->recv_initial_metadata
.recv_initial_metadata->deadline = s->deadline;
+ if (s->recv_initial_md_op->payload->recv_initial_metadata
+ .trailing_metadata_available != nullptr) {
+ *s->recv_initial_md_op->payload->recv_initial_metadata
+ .trailing_metadata_available =
+ (other != nullptr && other->send_trailing_md_op != nullptr);
+ }
grpc_metadata_batch_clear(&s->to_read_initial_md);
s->to_read_initial_md_filled = false;
INPROC_LOG(GPR_DEBUG,
@@ -974,6 +988,15 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
if (error != GRPC_ERROR_NONE) {
// Schedule op's closures that we didn't push to op state machine
if (op->recv_initial_metadata) {
+ if (op->payload->recv_initial_metadata.trailing_metadata_available !=
+ nullptr) {
+ // Set to true unconditionally, because we're failing the call, so
+ // even if we haven't actually seen the send_trailing_metadata op
+ // from the other side, we're going to return trailing metadata
+ // anyway.
+ *op->payload->recv_initial_metadata.trailing_metadata_available =
+ true;
+ }
INPROC_LOG(
GPR_DEBUG,
"perform_stream_op error %p scheduling initial-metadata-ready %p",
diff --git a/src/core/lib/channel/handshaker.cc b/src/core/lib/channel/handshaker.cc
index 1f2426bc6c..dcb149c03e 100644
--- a/src/core/lib/channel/handshaker.cc
+++ b/src/core/lib/channel/handshaker.cc
@@ -219,18 +219,17 @@ static void on_timeout(void* arg, grpc_error* error) {
grpc_handshake_manager_unref(mgr);
}
-void grpc_handshake_manager_do_handshake(grpc_handshake_manager* mgr,
- grpc_endpoint* endpoint,
- const grpc_channel_args* channel_args,
- grpc_millis deadline,
- grpc_tcp_server_acceptor* acceptor,
- grpc_iomgr_cb_func on_handshake_done,
- void* user_data) {
+void grpc_handshake_manager_do_handshake(
+ grpc_handshake_manager* mgr, grpc_pollset_set* interested_parties,
+ grpc_endpoint* endpoint, const grpc_channel_args* channel_args,
+ grpc_millis deadline, grpc_tcp_server_acceptor* acceptor,
+ grpc_iomgr_cb_func on_handshake_done, void* user_data) {
gpr_mu_lock(&mgr->mu);
GPR_ASSERT(mgr->index == 0);
GPR_ASSERT(!mgr->shutdown);
// Construct handshaker args. These will be passed through all
// handshakers and eventually be freed by the on_handshake_done callback.
+ mgr->args.interested_parties = interested_parties;
mgr->args.endpoint = endpoint;
mgr->args.args = grpc_channel_args_copy(channel_args);
mgr->args.user_data = user_data;
diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h
index 50038e6983..68e5463123 100644
--- a/src/core/lib/channel/handshaker.h
+++ b/src/core/lib/channel/handshaker.h
@@ -54,6 +54,7 @@ typedef struct grpc_handshaker grpc_handshaker;
/// For the on_handshake_done callback, all members are input arguments,
/// which the callback takes ownership of.
typedef struct {
+ grpc_pollset_set* interested_parties;
grpc_endpoint* endpoint;
grpc_channel_args* args;
grpc_slice_buffer* read_buffer;
@@ -125,24 +126,24 @@ void grpc_handshake_manager_shutdown(grpc_handshake_manager* mgr,
grpc_error* why);
/// Invokes handshakers in the order they were added.
+/// \a interested_parties may be non-nullptr to provide a pollset_set that
+/// may be used during handshaking. Ownership is not taken.
/// Takes ownership of \a endpoint, and then passes that ownership to
/// the \a on_handshake_done callback.
/// Does NOT take ownership of \a channel_args. Instead, makes a copy before
/// invoking the first handshaker.
-/// \a acceptor will be NULL for client-side handshakers.
+/// \a acceptor will be nullptr for client-side handshakers.
///
/// When done, invokes \a on_handshake_done with a grpc_handshaker_args
/// object as its argument. If the callback is invoked with error !=
/// GRPC_ERROR_NONE, then handshaking failed and the handshaker has done
/// the necessary clean-up. Otherwise, the callback takes ownership of
/// the arguments.
-void grpc_handshake_manager_do_handshake(grpc_handshake_manager* mgr,
- grpc_endpoint* endpoint,
- const grpc_channel_args* channel_args,
- grpc_millis deadline,
- grpc_tcp_server_acceptor* acceptor,
- grpc_iomgr_cb_func on_handshake_done,
- void* user_data);
+void grpc_handshake_manager_do_handshake(
+ grpc_handshake_manager* mgr, grpc_pollset_set* interested_parties,
+ grpc_endpoint* endpoint, const grpc_channel_args* channel_args,
+ grpc_millis deadline, grpc_tcp_server_acceptor* acceptor,
+ grpc_iomgr_cb_func on_handshake_done, void* user_data);
/// Add \a mgr to the server side list of all pending handshake managers, the
/// list starts with \a *head.
diff --git a/src/core/lib/http/httpcli_security_connector.cc b/src/core/lib/http/httpcli_security_connector.cc
index 26599bc50a..bfb536a921 100644
--- a/src/core/lib/http/httpcli_security_connector.cc
+++ b/src/core/lib/http/httpcli_security_connector.cc
@@ -184,8 +184,9 @@ static void ssl_handshake(void* arg, grpc_endpoint* tcp, const char* host,
c->handshake_mgr = grpc_handshake_manager_create();
grpc_handshakers_add(HANDSHAKER_CLIENT, &args, c->handshake_mgr);
grpc_handshake_manager_do_handshake(
- c->handshake_mgr, tcp, nullptr /* channel_args */, deadline,
- nullptr /* acceptor */, on_handshake_done, c /* user_data */);
+ c->handshake_mgr, nullptr /* interested_parties */, tcp,
+ nullptr /* channel_args */, deadline, nullptr /* acceptor */,
+ on_handshake_done, c /* user_data */);
GRPC_SECURITY_CONNECTOR_UNREF(&sc->base, "httpcli");
}
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
index 72443cc29e..5139760634 100644
--- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
+++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
@@ -55,7 +55,7 @@ static void init_max_accept_queue_size(void) {
if (fgets(buf, sizeof buf, fp)) {
char* end;
long i = strtol(buf, &end, 10);
- if (i > 0 && i <= INT_MAX && end && *end == 0) {
+ if (i > 0 && i <= INT_MAX && end && *end == '\n') {
n = (int)i;
}
}
diff --git a/src/core/lib/iomgr/tcp_server_uv.cc b/src/core/lib/iomgr/tcp_server_uv.cc
index 55f16126b6..1ac49190fb 100644
--- a/src/core/lib/iomgr/tcp_server_uv.cc
+++ b/src/core/lib/iomgr/tcp_server_uv.cc
@@ -250,15 +250,36 @@ static void on_connect(uv_stream_t* server, int status) {
}
}
-static grpc_error* add_socket_to_server(grpc_tcp_server* s, uv_tcp_t* handle,
- const grpc_resolved_address* addr,
- unsigned port_index,
- grpc_tcp_listener** listener) {
+static grpc_error* add_addr_to_server(grpc_tcp_server* s,
+ const grpc_resolved_address* addr,
+ unsigned port_index,
+ grpc_tcp_listener** listener) {
grpc_tcp_listener* sp = NULL;
int port = -1;
int status;
grpc_error* error;
grpc_resolved_address sockname_temp;
+ uv_tcp_t* handle = (uv_tcp_t*)gpr_malloc(sizeof(uv_tcp_t));
+ int family = grpc_sockaddr_get_family(addr);
+
+ status = uv_tcp_init_ex(uv_default_loop(), handle, (unsigned int)family);
+#if defined(GPR_LINUX) && defined(SO_REUSEPORT)
+ if (family == AF_INET || family == AF_INET6) {
+ int fd;
+ uv_fileno((uv_handle_t*)handle, &fd);
+ int enable = 1;
+ setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable));
+ }
+#endif /* GPR_LINUX && SO_REUSEPORT */
+
+ if (status != 0) {
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Failed to initialize UV tcp handle");
+ error =
+ grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
+ grpc_slice_from_static_string(uv_strerror(status)));
+ return error;
+ }
// The last argument to uv_tcp_bind is flags
status = uv_tcp_bind(handle, (struct sockaddr*)addr->addr, 0);
@@ -315,20 +336,48 @@ static grpc_error* add_socket_to_server(grpc_tcp_server* s, uv_tcp_t* handle,
return GRPC_ERROR_NONE;
}
+static grpc_error* add_wildcard_addrs_to_server(grpc_tcp_server* s,
+ unsigned port_index,
+ int requested_port,
+ grpc_tcp_listener** listener) {
+ grpc_resolved_address wild4;
+ grpc_resolved_address wild6;
+ grpc_tcp_listener* sp = nullptr;
+ grpc_tcp_listener* sp2 = nullptr;
+ grpc_error* v6_err = GRPC_ERROR_NONE;
+ grpc_error* v4_err = GRPC_ERROR_NONE;
+
+ grpc_sockaddr_make_wildcards(requested_port, &wild4, &wild6);
+ /* Try listening on IPv6 first. */
+ if ((v6_err = add_addr_to_server(s, &wild6, port_index, &sp)) ==
+ GRPC_ERROR_NONE) {
+ *listener = sp;
+ return GRPC_ERROR_NONE;
+ }
+
+ if ((v4_err = add_addr_to_server(s, &wild4, port_index, &sp2)) ==
+ GRPC_ERROR_NONE) {
+ *listener = sp2;
+ return GRPC_ERROR_NONE;
+ }
+
+ grpc_error* root_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Failed to add any wildcard listeners");
+ root_err = grpc_error_add_child(root_err, v6_err);
+ root_err = grpc_error_add_child(root_err, v4_err);
+ return root_err;
+}
+
grpc_error* grpc_tcp_server_add_port(grpc_tcp_server* s,
const grpc_resolved_address* addr,
int* port) {
// This function is mostly copied from tcp_server_windows.c
grpc_tcp_listener* sp = NULL;
- uv_tcp_t* handle;
grpc_resolved_address addr6_v4mapped;
- grpc_resolved_address wildcard;
grpc_resolved_address* allocated_addr = NULL;
grpc_resolved_address sockname_temp;
unsigned port_index = 0;
- int status;
grpc_error* error = GRPC_ERROR_NONE;
- int family;
GRPC_UV_ASSERT_SAME_THREAD();
@@ -357,38 +406,15 @@ grpc_error* grpc_tcp_server_add_port(grpc_tcp_server* s,
}
}
- if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
- addr = &addr6_v4mapped;
- }
-
/* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
if (grpc_sockaddr_is_wildcard(addr, port)) {
- grpc_sockaddr_make_wildcard6(*port, &wildcard);
-
- addr = &wildcard;
- }
-
- handle = (uv_tcp_t*)gpr_malloc(sizeof(uv_tcp_t));
-
- family = grpc_sockaddr_get_family(addr);
- status = uv_tcp_init_ex(uv_default_loop(), handle, (unsigned int)family);
-#if defined(GPR_LINUX) && defined(SO_REUSEPORT)
- if (family == AF_INET || family == AF_INET6) {
- int fd;
- uv_fileno((uv_handle_t*)handle, &fd);
- int enable = 1;
- setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable));
- }
-#endif /* GPR_LINUX && SO_REUSEPORT */
-
- if (status == 0) {
- error = add_socket_to_server(s, handle, addr, port_index, &sp);
+ error = add_wildcard_addrs_to_server(s, port_index, *port, &sp);
} else {
- error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Failed to initialize UV tcp handle");
- error =
- grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
- grpc_slice_from_static_string(uv_strerror(status)));
+ if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
+ addr = &addr6_v4mapped;
+ }
+
+ error = add_addr_to_server(s, addr, port_index, &sp);
}
gpr_free(allocated_addr);
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc
index d4de04a894..55e0b165ec 100644
--- a/src/core/lib/iomgr/udp_server.cc
+++ b/src/core/lib/iomgr/udp_server.cc
@@ -47,6 +47,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/ev_posix.h"
+#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@@ -71,14 +72,22 @@ struct grpc_udp_listener {
grpc_udp_server_read_cb read_cb;
grpc_udp_server_write_cb write_cb;
grpc_udp_server_orphan_cb orphan_cb;
+ // To be scheduled on another thread to actually read/write.
+ grpc_closure do_read_closure;
+ grpc_closure do_write_closure;
+ grpc_closure notify_on_write_closure;
// True if orphan_cb is trigered.
bool orphan_notified;
+ // True if grpc_fd_notify_on_write() is called after on_write() call.
+ bool notify_on_write_armed;
+ // True if fd has been shutdown.
+ bool already_shutdown;
struct grpc_udp_listener* next;
};
struct shutdown_fd_args {
- grpc_fd* fd;
+ grpc_udp_listener* sp;
gpr_mu* server_mu;
};
@@ -143,8 +152,17 @@ grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args) {
static void shutdown_fd(void* args, grpc_error* error) {
struct shutdown_fd_args* shutdown_args = (struct shutdown_fd_args*)args;
+ grpc_udp_listener* sp = shutdown_args->sp;
+ gpr_log(GPR_DEBUG, "shutdown fd %d", sp->fd);
gpr_mu_lock(shutdown_args->server_mu);
- grpc_fd_shutdown(shutdown_args->fd, GRPC_ERROR_REF(error));
+ grpc_fd_shutdown(sp->emfd, GRPC_ERROR_REF(error));
+ sp->already_shutdown = true;
+ if (!sp->notify_on_write_armed) {
+ // Re-arm write notification to notify listener with error. This is
+ // necessary to decrement active_ports.
+ sp->notify_on_write_armed = true;
+ grpc_fd_notify_on_write(sp->emfd, &sp->write_closure);
+ }
gpr_mu_unlock(shutdown_args->server_mu);
gpr_free(shutdown_args);
}
@@ -160,6 +178,7 @@ static void finish_shutdown(grpc_udp_server* s) {
gpr_mu_destroy(&s->mu);
+ gpr_log(GPR_DEBUG, "Destroy all listeners.");
while (s->head) {
grpc_udp_listener* sp = s->head;
s->head = sp->next;
@@ -205,9 +224,10 @@ static void deactivated_all_ports(grpc_udp_server* s) {
/* Call the orphan_cb to signal that the FD is about to be closed and
* should no longer be used. Because at this point, all listening ports
* have been shutdown already, no need to shutdown again.*/
- GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, dummy_cb, sp->emfd,
+ GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, dummy_cb, sp,
grpc_schedule_on_exec_ctx);
GPR_ASSERT(sp->orphan_cb);
+ gpr_log(GPR_DEBUG, "Orphan fd %d", sp->fd);
sp->orphan_cb(sp->emfd, &sp->orphan_fd_closure, sp->server->user_data);
}
grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr,
@@ -229,13 +249,14 @@ void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) {
s->shutdown_complete = on_done;
+ gpr_log(GPR_DEBUG, "start to destroy udp_server");
/* shutdown all fd's */
if (s->active_ports) {
for (sp = s->head; sp; sp = sp->next) {
GPR_ASSERT(sp->orphan_cb);
struct shutdown_fd_args* args =
(struct shutdown_fd_args*)gpr_malloc(sizeof(*args));
- args->fd = sp->emfd;
+ args->sp = sp;
args->server_mu = &s->mu;
GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, shutdown_fd, args,
grpc_schedule_on_exec_ctx);
@@ -324,6 +345,27 @@ error:
return -1;
}
+static void do_read(void* arg, grpc_error* error) {
+ grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
+ GPR_ASSERT(sp->read_cb && error == GRPC_ERROR_NONE);
+ /* TODO: the reason we hold server->mu here is merely to prevent fd
+ * shutdown while we are reading. However, it blocks do_write(). Switch to
+ * read lock if available. */
+ gpr_mu_lock(&sp->server->mu);
+ /* Tell the registered callback that data is available to read. */
+ if (!sp->already_shutdown && sp->read_cb(sp->emfd, sp->server->user_data)) {
+ /* There maybe more packets to read. Schedule read_more_cb_ closure to run
+ * after finishing this event loop. */
+ GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE);
+ } else {
+ /* Finish reading all the packets, re-arm the notification event so we can
+ * get another chance to read. Or fd already shutdown, re-arm to get a
+ * notification with shutdown error. */
+ grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
+ }
+ gpr_mu_unlock(&sp->server->mu);
+}
+
/* event manager callback when reads are ready */
static void on_read(void* arg, grpc_error* error) {
grpc_udp_listener* sp = (grpc_udp_listener*)arg;
@@ -338,13 +380,49 @@ static void on_read(void* arg, grpc_error* error) {
}
return;
}
-
- /* Tell the registered callback that data is available to read. */
+ /* Read once. If there is more data to read, off load the work to another
+ * thread to finish. */
GPR_ASSERT(sp->read_cb);
- sp->read_cb(sp->emfd, sp->server->user_data);
+ if (sp->read_cb(sp->emfd, sp->server->user_data)) {
+ /* There maybe more packets to read. Schedule read_more_cb_ closure to run
+ * after finishing this event loop. */
+ GRPC_CLOSURE_INIT(&sp->do_read_closure, do_read, arg,
+ grpc_executor_scheduler(GRPC_EXECUTOR_LONG));
+ GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE);
+ } else {
+ /* Finish reading all the packets, re-arm the notification event so we can
+ * get another chance to read. Or fd already shutdown, re-arm to get a
+ * notification with shutdown error. */
+ grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
+ }
+ gpr_mu_unlock(&sp->server->mu);
+}
+
+// Wrapper of grpc_fd_notify_on_write() with a grpc_closure callback interface.
+void fd_notify_on_write_wrapper(void* arg, grpc_error* error) {
+ grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
+ gpr_mu_lock(&sp->server->mu);
+ if (!sp->notify_on_write_armed) {
+ grpc_fd_notify_on_write(sp->emfd, &sp->write_closure);
+ sp->notify_on_write_armed = true;
+ }
+ gpr_mu_unlock(&sp->server->mu);
+}
- /* Re-arm the notification event so we get another chance to read. */
- grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
+static void do_write(void* arg, grpc_error* error) {
+ grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
+ gpr_mu_lock(&(sp->server->mu));
+ if (sp->already_shutdown) {
+ // If fd has been shutdown, don't write any more and re-arm notification.
+ grpc_fd_notify_on_write(sp->emfd, &sp->write_closure);
+ } else {
+ sp->notify_on_write_armed = false;
+ /* Tell the registered callback that the socket is writeable. */
+ GPR_ASSERT(sp->write_cb && error == GRPC_ERROR_NONE);
+ GRPC_CLOSURE_INIT(&sp->notify_on_write_closure, fd_notify_on_write_wrapper,
+ arg, grpc_schedule_on_exec_ctx);
+ sp->write_cb(sp->emfd, sp->server->user_data, &sp->notify_on_write_closure);
+ }
gpr_mu_unlock(&sp->server->mu);
}
@@ -362,12 +440,11 @@ static void on_write(void* arg, grpc_error* error) {
return;
}
- /* Tell the registered callback that the socket is writeable. */
- GPR_ASSERT(sp->write_cb);
- sp->write_cb(sp->emfd, sp->server->user_data);
+ /* Schedule actual write in another thread. */
+ GRPC_CLOSURE_INIT(&sp->do_write_closure, do_write, arg,
+ grpc_executor_scheduler(GRPC_EXECUTOR_LONG));
- /* Re-arm the notification event so we get another chance to write. */
- grpc_fd_notify_on_write(sp->emfd, &sp->write_closure);
+ GRPC_CLOSURE_SCHED(&sp->do_write_closure, GRPC_ERROR_NONE);
gpr_mu_unlock(&sp->server->mu);
}
@@ -404,6 +481,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd,
sp->write_cb = write_cb;
sp->orphan_cb = orphan_cb;
sp->orphan_notified = false;
+ sp->already_shutdown = false;
GPR_ASSERT(sp->emfd);
gpr_mu_unlock(&s->mu);
gpr_free(name);
@@ -527,6 +605,7 @@ void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets,
GRPC_CLOSURE_INIT(&sp->write_closure, on_write, sp,
grpc_schedule_on_exec_ctx);
+ sp->notify_on_write_armed = true;
grpc_fd_notify_on_write(sp->emfd, &sp->write_closure);
/* Registered for both read and write callbacks: increment active_ports
diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h
index d9786be27c..02e3acb7f5 100644
--- a/src/core/lib/iomgr/udp_server.h
+++ b/src/core/lib/iomgr/udp_server.h
@@ -30,11 +30,14 @@ struct grpc_server;
/* Forward decl of grpc_udp_server */
typedef struct grpc_udp_server grpc_udp_server;
-/* Called when data is available to read from the socket. */
-typedef void (*grpc_udp_server_read_cb)(grpc_fd* emfd, void* user_data);
+/* Called when data is available to read from the socket.
+ * Return true if there is more data to read from fd. */
+typedef bool (*grpc_udp_server_read_cb)(grpc_fd* emfd, void* user_data);
-/* Called when the socket is writeable. */
-typedef void (*grpc_udp_server_write_cb)(grpc_fd* emfd, void* user_data);
+/* Called when the socket is writeable. The given closure should be scheduled
+ * when the socket becomes blocked next time. */
+typedef void (*grpc_udp_server_write_cb)(grpc_fd* emfd, void* user_data,
+ grpc_closure* notify_on_write_closure);
/* Called when the grpc_fd is about to be orphaned (and the FD closed). */
typedef void (*grpc_udp_server_orphan_cb)(grpc_fd* emfd,
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index ec57a47005..b03c0218dc 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -321,9 +321,6 @@ void grpc_transport_ping(grpc_transport* transport, grpc_closure* cb);
void grpc_transport_goaway(grpc_transport* transport, grpc_status_code status,
grpc_slice debug_data);
-/* Close a transport. Aborts all open streams. */
-void grpc_transport_close(grpc_transport* transport);
-
/* Destroy the transport */
void grpc_transport_destroy(grpc_transport* transport);
diff --git a/src/csharp/generate_proto_csharp.sh b/src/csharp/generate_proto_csharp.sh
index 1a1adbbae5..299dc3f816 100755
--- a/src/csharp/generate_proto_csharp.sh
+++ b/src/csharp/generate_proto_csharp.sh
@@ -33,6 +33,11 @@ $PROTOC --plugin=$PLUGIN --csharp_out=$HEALTHCHECK_DIR --grpc_out=$HEALTHCHECK_D
$PROTOC --plugin=$PLUGIN --csharp_out=$REFLECTION_DIR --grpc_out=$REFLECTION_DIR \
-I src/proto src/proto/grpc/reflection/v1alpha/reflection.proto
+# Put grp/core/stats.proto in a subdirectory to avoid collision with grpc/testing/stats.proto
+mkdir -p $TESTING_DIR/CoreStats
+$PROTOC --plugin=$PLUGIN --csharp_out=$TESTING_DIR/CoreStats --grpc_out=$TESTING_DIR/CoreStats \
+ -I src/proto src/proto/grpc/core/stats.proto
+
# TODO(jtattermusch): following .proto files are a bit broken and import paths
# don't match the package names. Setting -I to the correct value src/proto
# breaks the code generation.
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py
index 558ce42129..564772527e 100644
--- a/src/python/grpcio/grpc/__init__.py
+++ b/src/python/grpcio/grpc/__init__.py
@@ -348,26 +348,25 @@ class Call(six.with_metaclass(abc.ABCMeta, RpcContext)):
class ChannelCredentials(object):
"""An encapsulation of the data required to create a secure Channel.
- This class has no supported interface - it exists to define the type of its
- instances and its instances exist to be passed to other functions. For
- example, ssl_channel_credentials returns an instance, and secure_channel
- consumes an instance of this class.
- """
+ This class has no supported interface - it exists to define the type of its
+ instances and its instances exist to be passed to other functions. For
+ example, ssl_channel_credentials returns an instance of this class and
+ secure_channel requires an instance of this class.
+ """
def __init__(self, credentials):
self._credentials = credentials
class CallCredentials(object):
- """An encapsulation of the data required to assert an identity over a
- channel.
+ """An encapsulation of the data required to assert an identity over a call.
- A CallCredentials may be composed with ChannelCredentials to always assert
- identity for every call over that Channel.
+ A CallCredentials may be composed with ChannelCredentials to always assert
+ identity for every call over that Channel.
- This class has no supported interface - it exists to define the type of its
- instances and its instances exist to be passed to other functions.
- """
+ This class has no supported interface - it exists to define the type of its
+ instances and its instances exist to be passed to other functions.
+ """
def __init__(self, credentials):
self._credentials = credentials
@@ -376,23 +375,22 @@ class CallCredentials(object):
class AuthMetadataContext(six.with_metaclass(abc.ABCMeta)):
"""Provides information to call credentials metadata plugins.
- Attributes:
- service_url: A string URL of the service being called into.
- method_name: A string of the fully qualified method name being called.
- """
+ Attributes:
+ service_url: A string URL of the service being called into.
+ method_name: A string of the fully qualified method name being called.
+ """
class AuthMetadataPluginCallback(six.with_metaclass(abc.ABCMeta)):
"""Callback object received by a metadata plugin."""
def __call__(self, metadata, error):
- """Inform the gRPC runtime of the metadata to construct a
- CallCredentials.
+ """Passes to the gRPC runtime authentication metadata for an RPC.
- Args:
- metadata: The :term:`metadata` used to construct the CallCredentials.
- error: An Exception to indicate error or None to indicate success.
- """
+ Args:
+ metadata: The :term:`metadata` used to construct the CallCredentials.
+ error: An Exception to indicate error or None to indicate success.
+ """
raise NotImplementedError()
@@ -402,14 +400,14 @@ class AuthMetadataPlugin(six.with_metaclass(abc.ABCMeta)):
def __call__(self, context, callback):
"""Implements authentication by passing metadata to a callback.
- Implementations of this method must not block.
+ Implementations of this method must not block.
- Args:
- context: An AuthMetadataContext providing information on the RPC that the
- plugin is being called to authenticate.
- callback: An AuthMetadataPluginCallback to be invoked either synchronously
- or asynchronously.
- """
+ Args:
+ context: An AuthMetadataContext providing information on the RPC that
+ the plugin is being called to authenticate.
+ callback: An AuthMetadataPluginCallback to be invoked either
+ synchronously or asynchronously.
+ """
raise NotImplementedError()
@@ -1138,99 +1136,86 @@ def ssl_channel_credentials(root_certificates=None,
certificate_chain=None):
"""Creates a ChannelCredentials for use with an SSL-enabled Channel.
- Args:
- root_certificates: The PEM-encoded root certificates as a byte string,
- or None to retrieve them from a default location chosen by gRPC runtime.
- private_key: The PEM-encoded private key as a byte string, or None if no
- private key should be used.
- certificate_chain: The PEM-encoded certificate chain as a byte string
- to use or or None if no certificate chain should be used.
+ Args:
+ root_certificates: The PEM-encoded root certificates as a byte string,
+ or None to retrieve them from a default location chosen by gRPC
+ runtime.
+ private_key: The PEM-encoded private key as a byte string, or None if no
+ private key should be used.
+ certificate_chain: The PEM-encoded certificate chain as a byte string
+ to use or or None if no certificate chain should be used.
- Returns:
- A ChannelCredentials for use with an SSL-enabled Channel.
- """
- if private_key is not None or certificate_chain is not None:
- pair = _cygrpc.SslPemKeyCertPair(private_key, certificate_chain)
- else:
- pair = None
+ Returns:
+ A ChannelCredentials for use with an SSL-enabled Channel.
+ """
return ChannelCredentials(
- _cygrpc.channel_credentials_ssl(root_certificates, pair))
+ _cygrpc.SSLChannelCredentials(root_certificates, private_key,
+ certificate_chain))
def metadata_call_credentials(metadata_plugin, name=None):
"""Construct CallCredentials from an AuthMetadataPlugin.
- Args:
- metadata_plugin: An AuthMetadataPlugin to use for authentication.
- name: An optional name for the plugin.
+ Args:
+ metadata_plugin: An AuthMetadataPlugin to use for authentication.
+ name: An optional name for the plugin.
- Returns:
- A CallCredentials.
- """
+ Returns:
+ A CallCredentials.
+ """
from grpc import _plugin_wrapping # pylint: disable=cyclic-import
- if name is None:
- try:
- effective_name = metadata_plugin.__name__
- except AttributeError:
- effective_name = metadata_plugin.__class__.__name__
- else:
- effective_name = name
- return CallCredentials(
- _plugin_wrapping.call_credentials_metadata_plugin(metadata_plugin,
- effective_name))
+ return _plugin_wrapping.metadata_plugin_call_credentials(metadata_plugin,
+ name)
def access_token_call_credentials(access_token):
"""Construct CallCredentials from an access token.
- Args:
- access_token: A string to place directly in the http request
- authorization header, for example
- "authorization: Bearer <access_token>".
+ Args:
+ access_token: A string to place directly in the http request
+ authorization header, for example
+ "authorization: Bearer <access_token>".
- Returns:
- A CallCredentials.
- """
+ Returns:
+ A CallCredentials.
+ """
from grpc import _auth # pylint: disable=cyclic-import
- return metadata_call_credentials(
- _auth.AccessTokenCallCredentials(access_token))
+ from grpc import _plugin_wrapping # pylint: disable=cyclic-import
+ return _plugin_wrapping.metadata_plugin_call_credentials(
+ _auth.AccessTokenAuthMetadataPlugin(access_token), None)
def composite_call_credentials(*call_credentials):
"""Compose multiple CallCredentials to make a new CallCredentials.
- Args:
- *call_credentials: At least two CallCredentials objects.
+ Args:
+ *call_credentials: At least two CallCredentials objects.
- Returns:
- A CallCredentials object composed of the given CallCredentials objects.
- """
- from grpc import _credential_composition # pylint: disable=cyclic-import
- cygrpc_call_credentials = tuple(
- single_call_credentials._credentials
- for single_call_credentials in call_credentials)
+ Returns:
+ A CallCredentials object composed of the given CallCredentials objects.
+ """
return CallCredentials(
- _credential_composition.call(cygrpc_call_credentials))
+ _cygrpc.CompositeCallCredentials(
+ tuple(single_call_credentials._credentials
+ for single_call_credentials in call_credentials)))
def composite_channel_credentials(channel_credentials, *call_credentials):
"""Compose a ChannelCredentials and one or more CallCredentials objects.
- Args:
- channel_credentials: A ChannelCredentials object.
- *call_credentials: One or more CallCredentials objects.
+ Args:
+ channel_credentials: A ChannelCredentials object.
+ *call_credentials: One or more CallCredentials objects.
- Returns:
- A ChannelCredentials composed of the given ChannelCredentials and
- CallCredentials objects.
- """
- from grpc import _credential_composition # pylint: disable=cyclic-import
- cygrpc_call_credentials = tuple(
- single_call_credentials._credentials
- for single_call_credentials in call_credentials)
+ Returns:
+ A ChannelCredentials composed of the given ChannelCredentials and
+ CallCredentials objects.
+ """
return ChannelCredentials(
- _credential_composition.channel(channel_credentials._credentials,
- cygrpc_call_credentials))
+ _cygrpc.CompositeChannelCredentials(
+ tuple(single_call_credentials._credentials
+ for single_call_credentials in call_credentials),
+ channel_credentials._credentials))
def ssl_server_credentials(private_key_certificate_chain_pairs,
diff --git a/src/python/grpcio/grpc/_auth.py b/src/python/grpcio/grpc/_auth.py
index c6542d0135..9a339b5900 100644
--- a/src/python/grpcio/grpc/_auth.py
+++ b/src/python/grpcio/grpc/_auth.py
@@ -63,7 +63,7 @@ class GoogleCallCredentials(grpc.AuthMetadataPlugin):
self._pool.shutdown(wait=False)
-class AccessTokenCallCredentials(grpc.AuthMetadataPlugin):
+class AccessTokenAuthMetadataPlugin(grpc.AuthMetadataPlugin):
"""Metadata wrapper for raw access token credentials."""
def __init__(self, access_token):
diff --git a/src/python/grpcio/grpc/_credential_composition.py b/src/python/grpcio/grpc/_credential_composition.py
deleted file mode 100644
index f652cc3ae7..0000000000
--- a/src/python/grpcio/grpc/_credential_composition.py
+++ /dev/null
@@ -1,33 +0,0 @@
-# Copyright 2016 gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from grpc._cython import cygrpc
-
-
-def _call(call_credentialses):
- call_credentials_iterator = iter(call_credentialses)
- composition = next(call_credentials_iterator)
- for additional_call_credentials in call_credentials_iterator:
- composition = cygrpc.call_credentials_composite(
- composition, additional_call_credentials)
- return composition
-
-
-def call(call_credentialses):
- return _call(call_credentialses)
-
-
-def channel(channel_credentials, call_credentialses):
- return cygrpc.channel_credentials_composite(channel_credentials,
- _call(call_credentialses))
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
index 752fb330d0..6b3a276097 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
@@ -72,13 +72,12 @@ cdef class Call:
result = grpc_call_cancel(self.c_call, NULL)
return result
- def set_credentials(
- self, CallCredentials call_credentials not None):
- cdef grpc_call_error result
- with nogil:
- result = grpc_call_set_credentials(
- self.c_call, call_credentials.c_credentials)
- return result
+ def set_credentials(self, CallCredentials call_credentials not None):
+ cdef grpc_call_credentials *c_call_credentials = call_credentials.c()
+ cdef grpc_call_error call_error = grpc_call_set_credentials(
+ self.c_call, c_call_credentials)
+ grpc_call_credentials_release(c_call_credentials)
+ return call_error
def peer(self):
cdef char *peer = NULL
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
index aeabdba021..4c397f8f64 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
@@ -33,10 +33,10 @@ cdef class Channel:
self.c_channel = grpc_insecure_channel_create(c_target, c_arguments,
NULL)
else:
- with nogil:
- self.c_channel = grpc_secure_channel_create(
- channel_credentials.c_credentials, c_target, c_arguments, NULL)
- self.references.append(channel_credentials)
+ c_channel_credentials = channel_credentials.c()
+ self.c_channel = grpc_secure_channel_create(
+ c_channel_credentials, c_target, c_arguments, NULL)
+ grpc_channel_credentials_release(c_channel_credentials)
self.references.append(target)
self.references.append(arguments)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi
index bc0f185c77..7e9ea33ca0 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi
@@ -12,20 +12,66 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-cimport cpython
+
+cdef class CallCredentials:
+
+ cdef grpc_call_credentials *c(self)
+
+ # TODO(https://github.com/grpc/grpc/issues/12531): remove.
+ cdef grpc_call_credentials *c_credentials
+
+
+cdef int _get_metadata(
+ void *state, grpc_auth_metadata_context context,
+ grpc_credentials_plugin_metadata_cb cb, void *user_data,
+ grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX],
+ size_t *num_creds_md, grpc_status_code *status,
+ const char **error_details) with gil
+
+cdef void _destroy(void *state) with gil
+
+
+cdef class MetadataPluginCallCredentials(CallCredentials):
+
+ cdef readonly object _metadata_plugin
+ cdef readonly bytes _name
+
+ cdef grpc_call_credentials *c(self)
+
+
+cdef grpc_call_credentials *_composition(call_credentialses)
+
+
+cdef class CompositeCallCredentials(CallCredentials):
+
+ cdef readonly tuple _call_credentialses
+
+ cdef grpc_call_credentials *c(self)
cdef class ChannelCredentials:
+ cdef grpc_channel_credentials *c(self)
+
+ # TODO(https://github.com/grpc/grpc/issues/12531): remove.
cdef grpc_channel_credentials *c_credentials
- cdef grpc_ssl_pem_key_cert_pair c_ssl_pem_key_cert_pair
- cdef list references
-cdef class CallCredentials:
+cdef class SSLChannelCredentials(ChannelCredentials):
- cdef grpc_call_credentials *c_credentials
- cdef list references
+ cdef readonly object _pem_root_certificates
+ cdef readonly object _private_key
+ cdef readonly object _certificate_chain
+
+ cdef grpc_channel_credentials *c(self)
+
+
+cdef class CompositeChannelCredentials(ChannelCredentials):
+
+ cdef readonly tuple _call_credentialses
+ cdef readonly ChannelCredentials _channel_credentials
+
+ cdef grpc_channel_credentials *c(self)
cdef class ServerCertificateConfig:
@@ -49,27 +95,3 @@ cdef class ServerCredentials:
cdef object cert_config_fetcher
# whether C-core has asked for the initial_cert_config
cdef bint initial_cert_config_fetched
-
-
-cdef class CredentialsMetadataPlugin:
-
- cdef object plugin_callback
- cdef bytes plugin_name
-
-
-cdef grpc_metadata_credentials_plugin _c_plugin(CredentialsMetadataPlugin plugin)
-
-
-cdef class AuthMetadataContext:
-
- cdef grpc_auth_metadata_context context
-
-
-cdef int plugin_get_metadata(
- void *state, grpc_auth_metadata_context context,
- grpc_credentials_plugin_metadata_cb cb, void *user_data,
- grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX],
- size_t *num_creds_md, grpc_status_code *status,
- const char **error_details) with gil
-
-cdef void plugin_destroy_c_plugin_state(void *state) with gil
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
index ee1328d2e9..246a271893 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
@@ -16,47 +16,123 @@ cimport cpython
import grpc
import threading
-import traceback
-cdef class ChannelCredentials:
+cdef class CallCredentials:
- def __cinit__(self):
- grpc_init()
- self.c_credentials = NULL
- self.c_ssl_pem_key_cert_pair.private_key = NULL
- self.c_ssl_pem_key_cert_pair.certificate_chain = NULL
- self.references = []
+ cdef grpc_call_credentials *c(self):
+ raise NotImplementedError()
- # The object *can* be invalid in Python if we fail to make the credentials
- # (and the core thus returns NULL credentials). Used primarily for debugging.
- @property
- def is_valid(self):
- return self.c_credentials != NULL
- def __dealloc__(self):
- if self.c_credentials != NULL:
- grpc_channel_credentials_release(self.c_credentials)
- grpc_shutdown()
+cdef int _get_metadata(
+ void *state, grpc_auth_metadata_context context,
+ grpc_credentials_plugin_metadata_cb cb, void *user_data,
+ grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX],
+ size_t *num_creds_md, grpc_status_code *status,
+ const char **error_details) with gil:
+ def callback(Metadata metadata, grpc_status_code status, bytes error_details):
+ if status is StatusCode.ok:
+ cb(user_data, metadata.c_metadata, metadata.c_count, status, NULL)
+ else:
+ cb(user_data, NULL, 0, status, error_details)
+ args = context.service_url, context.method_name, callback,
+ threading.Thread(target=<object>state, args=args).start()
+ return 0 # Asynchronous return
-cdef class CallCredentials:
+cdef void _destroy(void *state) with gil:
+ cpython.Py_DECREF(<object>state)
- def __cinit__(self):
- grpc_init()
- self.c_credentials = NULL
- self.references = []
- # The object *can* be invalid in Python if we fail to make the credentials
- # (and the core thus returns NULL credentials). Used primarily for debugging.
- @property
- def is_valid(self):
- return self.c_credentials != NULL
+cdef class MetadataPluginCallCredentials(CallCredentials):
- def __dealloc__(self):
- if self.c_credentials != NULL:
- grpc_call_credentials_release(self.c_credentials)
- grpc_shutdown()
+ def __cinit__(self, metadata_plugin, name):
+ self._metadata_plugin = metadata_plugin
+ self._name = name
+
+ cdef grpc_call_credentials *c(self):
+ cdef grpc_metadata_credentials_plugin c_metadata_plugin
+ c_metadata_plugin.get_metadata = _get_metadata
+ c_metadata_plugin.destroy = _destroy
+ c_metadata_plugin.state = <void *>self._metadata_plugin
+ c_metadata_plugin.type = self._name
+ cpython.Py_INCREF(self._metadata_plugin)
+ return grpc_metadata_credentials_create_from_plugin(c_metadata_plugin, NULL)
+
+
+cdef grpc_call_credentials *_composition(call_credentialses):
+ call_credentials_iterator = iter(call_credentialses)
+ cdef CallCredentials composition = next(call_credentials_iterator)
+ cdef grpc_call_credentials *c_composition = composition.c()
+ cdef CallCredentials additional_call_credentials
+ cdef grpc_call_credentials *c_additional_call_credentials
+ cdef grpc_call_credentials *c_next_composition
+ for additional_call_credentials in call_credentials_iterator:
+ c_additional_call_credentials = additional_call_credentials.c()
+ c_next_composition = grpc_composite_call_credentials_create(
+ c_composition, c_additional_call_credentials, NULL)
+ grpc_call_credentials_release(c_composition)
+ grpc_call_credentials_release(c_additional_call_credentials)
+ c_composition = c_next_composition
+ return c_composition
+
+
+cdef class CompositeCallCredentials(CallCredentials):
+
+ def __cinit__(self, call_credentialses):
+ self._call_credentialses = call_credentialses
+
+ cdef grpc_call_credentials *c(self):
+ return _composition(self._call_credentialses)
+
+
+cdef class ChannelCredentials:
+
+ cdef grpc_channel_credentials *c(self):
+ raise NotImplementedError()
+
+
+cdef class SSLChannelCredentials(ChannelCredentials):
+
+ def __cinit__(self, pem_root_certificates, private_key, certificate_chain):
+ self._pem_root_certificates = pem_root_certificates
+ self._private_key = private_key
+ self._certificate_chain = certificate_chain
+
+ cdef grpc_channel_credentials *c(self):
+ cdef const char *c_pem_root_certificates
+ cdef grpc_ssl_pem_key_cert_pair c_pem_key_certificate_pair
+ if self._pem_root_certificates is None:
+ c_pem_root_certificates = NULL
+ else:
+ c_pem_root_certificates = self._pem_root_certificates
+ if self._private_key is None and self._certificate_chain is None:
+ return grpc_ssl_credentials_create(
+ c_pem_root_certificates, NULL, NULL)
+ else:
+ c_pem_key_certificate_pair.private_key = self._private_key
+ c_pem_key_certificate_pair.certificate_chain = self._certificate_chain
+ return grpc_ssl_credentials_create(
+ c_pem_root_certificates, &c_pem_key_certificate_pair, NULL)
+
+
+cdef class CompositeChannelCredentials(ChannelCredentials):
+
+ def __cinit__(self, call_credentialses, channel_credentials):
+ self._call_credentialses = call_credentialses
+ self._channel_credentials = channel_credentials
+
+ cdef grpc_channel_credentials *c(self):
+ cdef grpc_channel_credentials *c_channel_credentials
+ c_channel_credentials = self._channel_credentials.c()
+ cdef grpc_call_credentials *c_call_credentials_composition = _composition(
+ self._call_credentialses)
+ cdef grpc_channel_credentials *composition
+ c_composition = grpc_composite_channel_credentials_create(
+ c_channel_credentials, c_call_credentials_composition, NULL)
+ grpc_channel_credentials_release(c_channel_credentials)
+ grpc_call_credentials_release(c_call_credentials_composition)
+ return c_composition
cdef class ServerCertificateConfig:
@@ -89,190 +165,6 @@ cdef class ServerCredentials:
grpc_server_credentials_release(self.c_credentials)
grpc_shutdown()
-
-cdef class CredentialsMetadataPlugin:
-
- def __cinit__(self, object plugin_callback, bytes name):
- """
- Args:
- plugin_callback (callable): Callback accepting a service URL (str/bytes)
- and callback object (accepting a MetadataArray,
- grpc_status_code, and a str/bytes error message). This argument
- when called should be non-blocking and eventually call the callback
- object with the appropriate status code/details and metadata (if
- successful).
- name (bytes): Plugin name.
- """
- grpc_init()
- if not callable(plugin_callback):
- raise ValueError('expected callable plugin_callback')
- self.plugin_callback = plugin_callback
- self.plugin_name = name
-
- def __dealloc__(self):
- grpc_shutdown()
-
-
-cdef grpc_metadata_credentials_plugin _c_plugin(CredentialsMetadataPlugin plugin):
- cdef grpc_metadata_credentials_plugin c_plugin
- c_plugin.get_metadata = plugin_get_metadata
- c_plugin.destroy = plugin_destroy_c_plugin_state
- c_plugin.state = <void *>plugin
- c_plugin.type = plugin.plugin_name
- cpython.Py_INCREF(plugin)
- return c_plugin
-
-
-cdef class AuthMetadataContext:
-
- def __cinit__(self):
- grpc_init()
- self.context.service_url = NULL
- self.context.method_name = NULL
-
- @property
- def service_url(self):
- return self.context.service_url
-
- @property
- def method_name(self):
- return self.context.method_name
-
- def __dealloc__(self):
- grpc_shutdown()
-
-
-cdef int plugin_get_metadata(
- void *state, grpc_auth_metadata_context context,
- grpc_credentials_plugin_metadata_cb cb, void *user_data,
- grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX],
- size_t *num_creds_md, grpc_status_code *status,
- const char **error_details) with gil:
- called_flag = [False]
- def python_callback(
- Metadata metadata, grpc_status_code status,
- bytes error_details):
- cb(user_data, metadata.c_metadata, metadata.c_count, status, error_details)
- called_flag[0] = True
- cdef CredentialsMetadataPlugin self = <CredentialsMetadataPlugin>state
- cdef AuthMetadataContext cy_context = AuthMetadataContext()
- cy_context.context = context
- def async_callback():
- try:
- self.plugin_callback(cy_context, python_callback)
- except Exception as error:
- if not called_flag[0]:
- cb(user_data, NULL, 0, StatusCode.unknown,
- traceback.format_exc().encode())
- threading.Thread(group=None, target=async_callback).start()
- return 0 # Asynchronous return
-
-cdef void plugin_destroy_c_plugin_state(void *state) with gil:
- cpython.Py_DECREF(<CredentialsMetadataPlugin>state)
-
-def channel_credentials_google_default():
- cdef ChannelCredentials credentials = ChannelCredentials();
- with nogil:
- credentials.c_credentials = grpc_google_default_credentials_create()
- return credentials
-
-def channel_credentials_ssl(pem_root_certificates,
- SslPemKeyCertPair ssl_pem_key_cert_pair):
- pem_root_certificates = str_to_bytes(pem_root_certificates)
- cdef ChannelCredentials credentials = ChannelCredentials()
- cdef const char *c_pem_root_certificates = NULL
- if pem_root_certificates is not None:
- c_pem_root_certificates = pem_root_certificates
- credentials.references.append(pem_root_certificates)
- if ssl_pem_key_cert_pair is not None:
- with nogil:
- credentials.c_credentials = grpc_ssl_credentials_create(
- c_pem_root_certificates, &ssl_pem_key_cert_pair.c_pair, NULL)
- credentials.references.append(ssl_pem_key_cert_pair)
- else:
- with nogil:
- credentials.c_credentials = grpc_ssl_credentials_create(
- c_pem_root_certificates, NULL, NULL)
- return credentials
-
-def channel_credentials_composite(
- ChannelCredentials credentials_1 not None,
- CallCredentials credentials_2 not None):
- if not credentials_1.is_valid or not credentials_2.is_valid:
- raise ValueError("passed credentials must both be valid")
- cdef ChannelCredentials credentials = ChannelCredentials()
- with nogil:
- credentials.c_credentials = grpc_composite_channel_credentials_create(
- credentials_1.c_credentials, credentials_2.c_credentials, NULL)
- credentials.references.append(credentials_1)
- credentials.references.append(credentials_2)
- return credentials
-
-def call_credentials_composite(
- CallCredentials credentials_1 not None,
- CallCredentials credentials_2 not None):
- if not credentials_1.is_valid or not credentials_2.is_valid:
- raise ValueError("passed credentials must both be valid")
- cdef CallCredentials credentials = CallCredentials()
- with nogil:
- credentials.c_credentials = grpc_composite_call_credentials_create(
- credentials_1.c_credentials, credentials_2.c_credentials, NULL)
- credentials.references.append(credentials_1)
- credentials.references.append(credentials_2)
- return credentials
-
-def call_credentials_google_compute_engine():
- cdef CallCredentials credentials = CallCredentials()
- with nogil:
- credentials.c_credentials = (
- grpc_google_compute_engine_credentials_create(NULL))
- return credentials
-
-def call_credentials_service_account_jwt_access(
- json_key, Timespec token_lifetime not None):
- json_key = str_to_bytes(json_key)
- cdef CallCredentials credentials = CallCredentials()
- cdef char *json_key_c_string = json_key
- with nogil:
- credentials.c_credentials = (
- grpc_service_account_jwt_access_credentials_create(
- json_key_c_string, token_lifetime.c_time, NULL))
- credentials.references.append(json_key)
- return credentials
-
-def call_credentials_google_refresh_token(json_refresh_token):
- json_refresh_token = str_to_bytes(json_refresh_token)
- cdef CallCredentials credentials = CallCredentials()
- cdef char *json_refresh_token_c_string = json_refresh_token
- with nogil:
- credentials.c_credentials = grpc_google_refresh_token_credentials_create(
- json_refresh_token_c_string, NULL)
- credentials.references.append(json_refresh_token)
- return credentials
-
-def call_credentials_google_iam(authorization_token, authority_selector):
- authorization_token = str_to_bytes(authorization_token)
- authority_selector = str_to_bytes(authority_selector)
- cdef CallCredentials credentials = CallCredentials()
- cdef char *authorization_token_c_string = authorization_token
- cdef char *authority_selector_c_string = authority_selector
- with nogil:
- credentials.c_credentials = grpc_google_iam_credentials_create(
- authorization_token_c_string, authority_selector_c_string, NULL)
- credentials.references.append(authorization_token)
- credentials.references.append(authority_selector)
- return credentials
-
-def call_credentials_metadata_plugin(CredentialsMetadataPlugin plugin):
- cdef CallCredentials credentials = CallCredentials()
- cdef grpc_metadata_credentials_plugin c_plugin = _c_plugin(plugin)
- with nogil:
- credentials.c_credentials = (
- grpc_metadata_credentials_create_from_plugin(c_plugin, NULL))
- # TODO(atash): the following held reference is *probably* never necessary
- credentials.references.append(plugin)
- return credentials
-
cdef const char* _get_c_pem_root_certs(pem_root_certs):
if pem_root_certs is None:
return NULL
diff --git a/src/python/grpcio/grpc/_plugin_wrapping.py b/src/python/grpcio/grpc/_plugin_wrapping.py
index bea2c0f139..cd17f4a049 100644
--- a/src/python/grpcio/grpc/_plugin_wrapping.py
+++ b/src/python/grpcio/grpc/_plugin_wrapping.py
@@ -13,6 +13,7 @@
# limitations under the License.
import collections
+import logging
import threading
import grpc
@@ -20,89 +21,79 @@ from grpc import _common
from grpc._cython import cygrpc
-class AuthMetadataContext(
+class _AuthMetadataContext(
collections.namedtuple('AuthMetadataContext', (
'service_url', 'method_name',)), grpc.AuthMetadataContext):
pass
-class AuthMetadataPluginCallback(grpc.AuthMetadataContext):
+class _CallbackState(object):
- def __init__(self, callback):
- self._callback = callback
-
- def __call__(self, metadata, error):
- self._callback(metadata, error)
+ def __init__(self):
+ self.lock = threading.Lock()
+ self.called = False
+ self.exception = None
-class _WrappedCygrpcCallback(object):
+class _AuthMetadataPluginCallback(grpc.AuthMetadataPluginCallback):
- def __init__(self, cygrpc_callback):
- self.is_called = False
- self.error = None
- self.is_called_lock = threading.Lock()
- self.cygrpc_callback = cygrpc_callback
-
- def _invoke_failure(self, error):
- # TODO(atash) translate different Exception superclasses into different
- # status codes.
- self.cygrpc_callback(_common.EMPTY_METADATA, cygrpc.StatusCode.internal,
- _common.encode(str(error)))
-
- def _invoke_success(self, metadata):
- try:
- cygrpc_metadata = _common.to_cygrpc_metadata(metadata)
- except Exception as exception: # pylint: disable=broad-except
- self._invoke_failure(exception)
- return
- self.cygrpc_callback(cygrpc_metadata, cygrpc.StatusCode.ok, b'')
+ def __init__(self, state, callback):
+ self._state = state
+ self._callback = callback
def __call__(self, metadata, error):
- with self.is_called_lock:
- if self.is_called:
- raise RuntimeError('callback should only ever be invoked once')
- if self.error:
- self._invoke_failure(self.error)
- return
- self.is_called = True
+ with self._state.lock:
+ if self._state.exception is None:
+ if self._state.called:
+ raise RuntimeError(
+ 'AuthMetadataPluginCallback invoked more than once!')
+ else:
+ self._state.called = True
+ else:
+ raise RuntimeError(
+ 'AuthMetadataPluginCallback raised exception "{}"!'.format(
+ self._state.exception))
if error is None:
- self._invoke_success(metadata)
+ self._callback(
+ _common.to_cygrpc_metadata(metadata), cygrpc.StatusCode.ok,
+ None)
else:
- self._invoke_failure(error)
-
- def notify_failure(self, error):
- with self.is_called_lock:
- if not self.is_called:
- self.error = error
+ self._callback(None, cygrpc.StatusCode.internal,
+ _common.encode(str(error)))
-class _WrappedPlugin(object):
+class _Plugin(object):
- def __init__(self, plugin):
- self.plugin = plugin
+ def __init__(self, metadata_plugin):
+ self._metadata_plugin = metadata_plugin
- def __call__(self, context, cygrpc_callback):
- wrapped_cygrpc_callback = _WrappedCygrpcCallback(cygrpc_callback)
- wrapped_context = AuthMetadataContext(
- _common.decode(context.service_url),
- _common.decode(context.method_name))
+ def __call__(self, service_url, method_name, callback):
+ context = _AuthMetadataContext(
+ _common.decode(service_url), _common.decode(method_name))
+ callback_state = _CallbackState()
+ try:
+ self._metadata_plugin(
+ context, _AuthMetadataPluginCallback(callback_state, callback))
+ except Exception as exception: # pylint: disable=broad-except
+ logging.exception(
+ 'AuthMetadataPluginCallback "%s" raised exception!',
+ self._metadata_plugin)
+ with callback_state.lock:
+ callback_state.exception = exception
+ if callback_state.called:
+ return
+ callback(None, cygrpc.StatusCode.internal,
+ _common.encode(str(exception)))
+
+
+def metadata_plugin_call_credentials(metadata_plugin, name):
+ if name is None:
try:
- self.plugin(wrapped_context,
- AuthMetadataPluginCallback(wrapped_cygrpc_callback))
- except Exception as error:
- wrapped_cygrpc_callback.notify_failure(error)
- raise
-
-
-def call_credentials_metadata_plugin(plugin, name):
- """
- Args:
- plugin: A callable accepting a grpc.AuthMetadataContext
- object and a callback (itself accepting a list of metadata key/value
- 2-tuples and a None-able exception value). The callback must be eventually
- called, but need not be called in plugin's invocation.
- plugin's invocation must be non-blocking.
- """
- return cygrpc.call_credentials_metadata_plugin(
- cygrpc.CredentialsMetadataPlugin(
- _WrappedPlugin(plugin), _common.encode(name)))
+ effective_name = metadata_plugin.__name__
+ except AttributeError:
+ effective_name = metadata_plugin.__class__.__name__
+ else:
+ effective_name = name
+ return grpc.CallCredentials(
+ cygrpc.MetadataPluginCallCredentials(
+ _Plugin(metadata_plugin), _common.encode(effective_name)))
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py
index cd59b07c04..5b4812bffe 100644
--- a/src/python/grpcio/grpc/_server.py
+++ b/src/python/grpcio/grpc/_server.py
@@ -374,10 +374,10 @@ def _call_behavior(rpc_event, state, behavior, argument, request_deserializer):
context = _Context(rpc_event, state, request_deserializer)
try:
return behavior(argument, context), True
- except Exception as e: # pylint: disable=broad-except
+ except Exception as exception: # pylint: disable=broad-except
with state.condition:
- if e not in state.rpc_errors:
- details = 'Exception calling application: {}'.format(e)
+ if exception not in state.rpc_errors:
+ details = 'Exception calling application: {}'.format(exception)
logging.exception(details)
_abort(state, rpc_event.operation_call,
cygrpc.StatusCode.unknown, _common.encode(details))
@@ -389,10 +389,10 @@ def _take_response_from_response_iterator(rpc_event, state, response_iterator):
return next(response_iterator), True
except StopIteration:
return None, True
- except Exception as e: # pylint: disable=broad-except
+ except Exception as exception: # pylint: disable=broad-except
with state.condition:
- if e not in state.rpc_errors:
- details = 'Exception iterating responses: {}'.format(e)
+ if exception not in state.rpc_errors:
+ details = 'Exception iterating responses: {}'.format(exception)
logging.exception(details)
_abort(state, rpc_event.operation_call,
cygrpc.StatusCode.unknown, _common.encode(details))
@@ -591,7 +591,13 @@ def _handle_call(rpc_event, generic_handlers, thread_pool,
if not rpc_event.success:
return None, None
if rpc_event.request_call_details.method is not None:
- method_handler = _find_method_handler(rpc_event, generic_handlers)
+ try:
+ method_handler = _find_method_handler(rpc_event, generic_handlers)
+ except Exception as exception: # pylint: disable=broad-except
+ details = 'Exception servicing handler: {}'.format(exception)
+ logging.exception(details)
+ return _reject_rpc(rpc_event, cygrpc.StatusCode.unknown,
+ b'Error in service handler!'), None
if method_handler is None:
return _reject_rpc(rpc_event, cygrpc.StatusCode.unimplemented,
b'Method not found!'), None
diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json
index e277a3ea1d..34cbade92c 100644
--- a/src/python/grpcio_tests/tests/tests.json
+++ b/src/python/grpcio_tests/tests/tests.json
@@ -22,7 +22,7 @@
"unit._api_test.ChannelConnectivityTest",
"unit._api_test.ChannelTest",
"unit._auth_context_test.AuthContextTest",
- "unit._auth_test.AccessTokenCallCredentialsTest",
+ "unit._auth_test.AccessTokenAuthMetadataPluginTest",
"unit._auth_test.GoogleCallCredentialsTest",
"unit._channel_args_test.ChannelArgsTest",
"unit._channel_connectivity_test.ChannelConnectivityTest",
diff --git a/src/python/grpcio_tests/tests/unit/_auth_test.py b/src/python/grpcio_tests/tests/unit/_auth_test.py
index f61951b80a..e2cb938936 100644
--- a/src/python/grpcio_tests/tests/unit/_auth_test.py
+++ b/src/python/grpcio_tests/tests/unit/_auth_test.py
@@ -61,7 +61,7 @@ class GoogleCallCredentialsTest(unittest.TestCase):
self.assertTrue(callback_event.wait(1.0))
-class AccessTokenCallCredentialsTest(unittest.TestCase):
+class AccessTokenAuthMetadataPluginTest(unittest.TestCase):
def test_google_call_credentials_success(self):
callback_event = threading.Event()
@@ -71,8 +71,8 @@ class AccessTokenCallCredentialsTest(unittest.TestCase):
self.assertIsNone(error)
callback_event.set()
- call_creds = _auth.AccessTokenCallCredentials('token')
- call_creds(None, mock_callback)
+ metadata_plugin = _auth.AccessTokenAuthMetadataPlugin('token')
+ metadata_plugin(None, mock_callback)
self.assertTrue(callback_event.wait(1.0))
diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
index 18d4a6df64..da94cf8028 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
@@ -28,7 +28,7 @@ _CALL_CREDENTIALS_METADATA_VALUE = 'call-creds-value'
_EMPTY_FLAGS = 0
-def _metadata_plugin_callback(context, callback):
+def _metadata_plugin(context, callback):
callback(
cygrpc.Metadata([
cygrpc.Metadatum(_CALL_CREDENTIALS_METADATA_KEY,
@@ -105,17 +105,9 @@ class TypeSmokeTest(unittest.TestCase):
channel = cygrpc.Channel(b'[::]:0', cygrpc.ChannelArgs([]))
del channel
- def testCredentialsMetadataPluginUpDown(self):
- plugin = cygrpc.CredentialsMetadataPlugin(
- lambda ignored_a, ignored_b: None, b'')
- del plugin
-
- def testCallCredentialsFromPluginUpDown(self):
- plugin = cygrpc.CredentialsMetadataPlugin(_metadata_plugin_callback,
- b'')
- call_credentials = cygrpc.call_credentials_metadata_plugin(plugin)
- del plugin
- del call_credentials
+ def test_metadata_plugin_call_credentials_up_down(self):
+ cygrpc.MetadataPluginCallCredentials(_metadata_plugin,
+ b'test plugin name!')
def testServerStartNoExplicitShutdown(self):
server = cygrpc.Server(cygrpc.ChannelArgs([]))
@@ -205,7 +197,7 @@ class ServerClientMixin(object):
return test_utilities.SimpleFuture(performer)
- def testEcho(self):
+ def test_echo(self):
DEADLINE = time.time() + 5
DEADLINE_TOLERANCE = 0.25
CLIENT_METADATA_ASCII_KEY = b'key'
@@ -439,8 +431,8 @@ class SecureServerSecureClient(unittest.TestCase, ServerClientMixin):
cygrpc.SslPemKeyCertPair(resources.private_key(),
resources.certificate_chain())
], False)
- client_credentials = cygrpc.channel_credentials_ssl(
- resources.test_root_certificates(), None)
+ client_credentials = cygrpc.SSLChannelCredentials(
+ resources.test_root_certificates(), None, None)
self.setUpMixin(server_credentials, client_credentials,
_SSL_HOST_OVERRIDE)
diff --git a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py
index 0a1e50c94c..2a1a49ce74 100644
--- a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py
+++ b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py
@@ -32,6 +32,7 @@ _UNARY_UNARY = '/test/UnaryUnary'
_UNARY_STREAM = '/test/UnaryStream'
_STREAM_UNARY = '/test/StreamUnary'
_STREAM_STREAM = '/test/StreamStream'
+_DEFECTIVE_GENERIC_RPC_HANDLER = '/test/DefectiveGenericRpcHandler'
class _Callback(object):
@@ -95,6 +96,9 @@ class _Handler(object):
yield request
self._control.control()
+ def defective_generic_rpc_handler(self):
+ raise test_control.Defect()
+
class _MethodHandler(grpc.RpcMethodHandler):
@@ -132,6 +136,8 @@ class _GenericHandler(grpc.GenericRpcHandler):
elif handler_call_details.method == _STREAM_STREAM:
return _MethodHandler(True, True, None, None, None, None, None,
self._handler.handle_stream_stream)
+ elif handler_call_details.method == _DEFECTIVE_GENERIC_RPC_HANDLER:
+ return self._handler.defective_generic_rpc_handler()
else:
return None
@@ -176,6 +182,10 @@ def _stream_stream_multi_callable(channel):
return channel.stream_stream(_STREAM_STREAM)
+def _defective_handler_multi_callable(channel):
+ return channel.unary_unary(_DEFECTIVE_GENERIC_RPC_HANDLER)
+
+
class InvocationDefectsTest(unittest.TestCase):
def setUp(self):
@@ -235,6 +245,18 @@ class InvocationDefectsTest(unittest.TestCase):
for _ in range(test_constants.STREAM_LENGTH // 2 + 1):
next(response_iterator)
+ def testDefectiveGenericRpcHandlerUnaryResponse(self):
+ request = b'\x07\x08'
+ multi_callable = _defective_handler_multi_callable(self._channel)
+
+ with self.assertRaises(grpc.RpcError) as exception_context:
+ response = multi_callable(
+ request,
+ metadata=(('test', 'DefectiveGenericRpcHandlerUnary'),))
+
+ self.assertIs(grpc.StatusCode.UNKNOWN,
+ exception_context.exception.code())
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/test/core/bad_client/bad_client.cc b/test/core/bad_client/bad_client.cc
index f63a32ea5b..ccf35444d0 100644
--- a/test/core/bad_client/bad_client.cc
+++ b/test/core/bad_client/bad_client.cc
@@ -114,9 +114,9 @@ void grpc_run_bad_client_test(
GRPC_BAD_CLIENT_REGISTERED_HOST,
GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER, 0);
grpc_server_start(a.server);
- transport = grpc_create_chttp2_transport(nullptr, sfd.server, 0);
+ transport = grpc_create_chttp2_transport(nullptr, sfd.server, false);
server_setup_transport(&a, transport);
- grpc_chttp2_transport_start_reading(transport, nullptr);
+ grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
/* Bind everything into the same pollset */
grpc_endpoint_add_to_pollset(sfd.client, grpc_cq_pollset(a.cq));
diff --git a/test/core/end2end/fixtures/h2_sockpair+trace.cc b/test/core/end2end/fixtures/h2_sockpair+trace.cc
index 447bb91d0c..9807e929af 100644
--- a/test/core/end2end/fixtures/h2_sockpair+trace.cc
+++ b/test/core/end2end/fixtures/h2_sockpair+trace.cc
@@ -93,10 +93,10 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f,
sp_client_setup cs;
cs.client_args = client_args;
cs.f = f;
- transport = grpc_create_chttp2_transport(client_args, sfd->client, 1);
+ transport = grpc_create_chttp2_transport(client_args, sfd->client, true);
client_setup_transport(&cs, transport);
GPR_ASSERT(f->client);
- grpc_chttp2_transport_start_reading(transport, nullptr);
+ grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
}
static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f,
@@ -108,9 +108,9 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f,
f->server = grpc_server_create(server_args, nullptr);
grpc_server_register_completion_queue(f->server, f->cq, nullptr);
grpc_server_start(f->server);
- transport = grpc_create_chttp2_transport(server_args, sfd->server, 0);
+ transport = grpc_create_chttp2_transport(server_args, sfd->server, false);
server_setup_transport(f, transport);
- grpc_chttp2_transport_start_reading(transport, nullptr);
+ grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
}
static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture* f) {
diff --git a/test/core/end2end/fixtures/h2_sockpair.cc b/test/core/end2end/fixtures/h2_sockpair.cc
index c1f8ee0b64..b68279fd71 100644
--- a/test/core/end2end/fixtures/h2_sockpair.cc
+++ b/test/core/end2end/fixtures/h2_sockpair.cc
@@ -87,10 +87,10 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f,
sp_client_setup cs;
cs.client_args = client_args;
cs.f = f;
- transport = grpc_create_chttp2_transport(client_args, sfd->client, 1);
+ transport = grpc_create_chttp2_transport(client_args, sfd->client, true);
client_setup_transport(&cs, transport);
GPR_ASSERT(f->client);
- grpc_chttp2_transport_start_reading(transport, nullptr);
+ grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
}
static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f,
@@ -102,9 +102,9 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f,
f->server = grpc_server_create(server_args, nullptr);
grpc_server_register_completion_queue(f->server, f->cq, nullptr);
grpc_server_start(f->server);
- transport = grpc_create_chttp2_transport(server_args, sfd->server, 0);
+ transport = grpc_create_chttp2_transport(server_args, sfd->server, false);
server_setup_transport(f, transport);
- grpc_chttp2_transport_start_reading(transport, nullptr);
+ grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
}
static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture* f) {
diff --git a/test/core/end2end/fixtures/h2_sockpair_1byte.cc b/test/core/end2end/fixtures/h2_sockpair_1byte.cc
index 55b847bdae..350be138ca 100644
--- a/test/core/end2end/fixtures/h2_sockpair_1byte.cc
+++ b/test/core/end2end/fixtures/h2_sockpair_1byte.cc
@@ -98,10 +98,10 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f,
sp_client_setup cs;
cs.client_args = client_args;
cs.f = f;
- transport = grpc_create_chttp2_transport(client_args, sfd->client, 1);
+ transport = grpc_create_chttp2_transport(client_args, sfd->client, true);
client_setup_transport(&cs, transport);
GPR_ASSERT(f->client);
- grpc_chttp2_transport_start_reading(transport, nullptr);
+ grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
}
static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f,
@@ -113,9 +113,9 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f,
f->server = grpc_server_create(server_args, nullptr);
grpc_server_register_completion_queue(f->server, f->cq, nullptr);
grpc_server_start(f->server);
- transport = grpc_create_chttp2_transport(server_args, sfd->server, 0);
+ transport = grpc_create_chttp2_transport(server_args, sfd->server, false);
server_setup_transport(f, transport);
- grpc_chttp2_transport_start_reading(transport, nullptr);
+ grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
}
static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture* f) {
diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc
index 539bbe4134..15c9dd4847 100644
--- a/test/core/end2end/fixtures/http_proxy_fixture.cc
+++ b/test/core/end2end/fixtures/http_proxy_fixture.cc
@@ -68,6 +68,9 @@ struct grpc_end2end_http_proxy {
// Connection handling
//
+// proxy_connection structure is only accessed in the closures which are all
+// scheduled under the same combiner lock. So there is is no need for a mutex to
+// protect this structure.
typedef struct proxy_connection {
grpc_end2end_http_proxy* proxy;
@@ -78,6 +81,8 @@ typedef struct proxy_connection {
grpc_pollset_set* pollset_set;
+ // NOTE: All the closures execute under proxy->combiner lock. Which means
+ // there will not be any data-races between the closures
grpc_closure on_read_request_done;
grpc_closure on_server_connect_done;
grpc_closure on_write_response_done;
@@ -86,6 +91,13 @@ typedef struct proxy_connection {
grpc_closure on_server_read_done;
grpc_closure on_server_write_done;
+ bool client_read_failed : 1;
+ bool client_write_failed : 1;
+ bool client_shutdown : 1;
+ bool server_read_failed : 1;
+ bool server_write_failed : 1;
+ bool server_shutdown : 1;
+
grpc_slice_buffer client_read_buffer;
grpc_slice_buffer client_deferred_write_buffer;
bool client_is_writing;
@@ -126,18 +138,50 @@ static void proxy_connection_unref(proxy_connection* conn, const char* reason) {
}
}
+enum failure_type {
+ SETUP_FAILED, // To be used before we start proxying.
+ CLIENT_READ_FAILED,
+ CLIENT_WRITE_FAILED,
+ SERVER_READ_FAILED,
+ SERVER_WRITE_FAILED,
+};
+
// Helper function to shut down the proxy connection.
-// Does NOT take ownership of a reference to error.
-static void proxy_connection_failed(proxy_connection* conn, bool is_client,
- const char* prefix, grpc_error* error) {
- const char* msg = grpc_error_string(error);
- gpr_log(GPR_INFO, "%s: %s", prefix, msg);
-
- grpc_endpoint_shutdown(conn->client_endpoint, GRPC_ERROR_REF(error));
- if (conn->server_endpoint != nullptr) {
+static void proxy_connection_failed(proxy_connection* conn,
+ failure_type failure, const char* prefix,
+ grpc_error* error) {
+ gpr_log(GPR_INFO, "%s: %s", prefix, grpc_error_string(error));
+ // Decide whether we should shut down the client and server.
+ bool shutdown_client = false;
+ bool shutdown_server = false;
+ if (failure == SETUP_FAILED) {
+ shutdown_client = true;
+ shutdown_server = true;
+ } else {
+ if ((failure == CLIENT_READ_FAILED && conn->client_write_failed) ||
+ (failure == CLIENT_WRITE_FAILED && conn->client_read_failed) ||
+ (failure == SERVER_READ_FAILED && !conn->client_is_writing)) {
+ shutdown_client = true;
+ }
+ if ((failure == SERVER_READ_FAILED && conn->server_write_failed) ||
+ (failure == SERVER_WRITE_FAILED && conn->server_read_failed) ||
+ (failure == CLIENT_READ_FAILED && !conn->server_is_writing)) {
+ shutdown_server = true;
+ }
+ }
+ // If we decided to shut down either one and have not yet done so, do so.
+ if (shutdown_client && !conn->client_shutdown) {
+ grpc_endpoint_shutdown(conn->client_endpoint, GRPC_ERROR_REF(error));
+ conn->client_shutdown = true;
+ }
+ if (shutdown_server && !conn->server_shutdown &&
+ (conn->server_endpoint != nullptr)) {
grpc_endpoint_shutdown(conn->server_endpoint, GRPC_ERROR_REF(error));
+ conn->server_shutdown = true;
}
+ // Unref the connection.
proxy_connection_unref(conn, "conn_failed");
+ GRPC_ERROR_UNREF(error);
}
// Callback for writing proxy data to the client.
@@ -145,8 +189,8 @@ static void on_client_write_done(void* arg, grpc_error* error) {
proxy_connection* conn = (proxy_connection*)arg;
conn->client_is_writing = false;
if (error != GRPC_ERROR_NONE) {
- proxy_connection_failed(conn, true /* is_client */,
- "HTTP proxy client write", error);
+ proxy_connection_failed(conn, CLIENT_WRITE_FAILED,
+ "HTTP proxy client write", GRPC_ERROR_REF(error));
return;
}
// Clear write buffer (the data we just wrote).
@@ -170,8 +214,8 @@ static void on_server_write_done(void* arg, grpc_error* error) {
proxy_connection* conn = (proxy_connection*)arg;
conn->server_is_writing = false;
if (error != GRPC_ERROR_NONE) {
- proxy_connection_failed(conn, false /* is_client */,
- "HTTP proxy server write", error);
+ proxy_connection_failed(conn, SERVER_WRITE_FAILED,
+ "HTTP proxy server write", GRPC_ERROR_REF(error));
return;
}
// Clear write buffer (the data we just wrote).
@@ -195,8 +239,8 @@ static void on_server_write_done(void* arg, grpc_error* error) {
static void on_client_read_done(void* arg, grpc_error* error) {
proxy_connection* conn = (proxy_connection*)arg;
if (error != GRPC_ERROR_NONE) {
- proxy_connection_failed(conn, true /* is_client */,
- "HTTP proxy client read", error);
+ proxy_connection_failed(conn, CLIENT_READ_FAILED, "HTTP proxy client read",
+ GRPC_ERROR_REF(error));
return;
}
// If there is already a pending write (i.e., server_write_buffer is
@@ -226,8 +270,8 @@ static void on_client_read_done(void* arg, grpc_error* error) {
static void on_server_read_done(void* arg, grpc_error* error) {
proxy_connection* conn = (proxy_connection*)arg;
if (error != GRPC_ERROR_NONE) {
- proxy_connection_failed(conn, false /* is_client */,
- "HTTP proxy server read", error);
+ proxy_connection_failed(conn, SERVER_READ_FAILED, "HTTP proxy server read",
+ GRPC_ERROR_REF(error));
return;
}
// If there is already a pending write (i.e., client_write_buffer is
@@ -257,8 +301,8 @@ static void on_write_response_done(void* arg, grpc_error* error) {
proxy_connection* conn = (proxy_connection*)arg;
conn->client_is_writing = false;
if (error != GRPC_ERROR_NONE) {
- proxy_connection_failed(conn, true /* is_client */,
- "HTTP proxy write response", error);
+ proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy write response",
+ GRPC_ERROR_REF(error));
return;
}
// Clear write buffer.
@@ -285,8 +329,8 @@ static void on_server_connect_done(void* arg, grpc_error* error) {
// connection failed. However, for the purposes of this test code,
// it's fine to pretend this is a client-side error, which will
// cause the client connection to be dropped.
- proxy_connection_failed(conn, true /* is_client */,
- "HTTP proxy server connect", error);
+ proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy server connect",
+ GRPC_ERROR_REF(error));
return;
}
// We've established a connection, so send back a 200 response code to
@@ -331,8 +375,8 @@ static void on_read_request_done(void* arg, grpc_error* error) {
gpr_log(GPR_DEBUG, "on_read_request_done: %p %s", conn,
grpc_error_string(error));
if (error != GRPC_ERROR_NONE) {
- proxy_connection_failed(conn, true /* is_client */,
- "HTTP proxy read request", error);
+ proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy read request",
+ GRPC_ERROR_REF(error));
return;
}
// Read request and feed it to the parser.
@@ -341,8 +385,8 @@ static void on_read_request_done(void* arg, grpc_error* error) {
error = grpc_http_parser_parse(
&conn->http_parser, conn->client_read_buffer.slices[i], nullptr);
if (error != GRPC_ERROR_NONE) {
- proxy_connection_failed(conn, true /* is_client */,
- "HTTP proxy request parse", error);
+ proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy request parse",
+ GRPC_ERROR_REF(error));
GRPC_ERROR_UNREF(error);
return;
}
@@ -362,8 +406,8 @@ static void on_read_request_done(void* arg, grpc_error* error) {
conn->http_request.method);
error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
gpr_free(msg);
- proxy_connection_failed(conn, true /* is_client */,
- "HTTP proxy read request", error);
+ proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy read request",
+ GRPC_ERROR_REF(error));
GRPC_ERROR_UNREF(error);
return;
}
@@ -382,8 +426,8 @@ static void on_read_request_done(void* arg, grpc_error* error) {
if (!client_authenticated) {
const char* msg = "HTTP Connect could not verify authentication";
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(msg);
- proxy_connection_failed(conn, true /* is_client */,
- "HTTP proxy read request", error);
+ proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy read request",
+ GRPC_ERROR_REF(error));
GRPC_ERROR_UNREF(error);
return;
}
@@ -393,8 +437,8 @@ static void on_read_request_done(void* arg, grpc_error* error) {
error = grpc_blocking_resolve_address(conn->http_request.path, "80",
&resolved_addresses);
if (error != GRPC_ERROR_NONE) {
- proxy_connection_failed(conn, true /* is_client */, "HTTP proxy DNS lookup",
- error);
+ proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy DNS lookup",
+ GRPC_ERROR_REF(error));
GRPC_ERROR_UNREF(error);
return;
}
diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc
index 76e716281a..967a6d560f 100644
--- a/test/core/end2end/fuzzers/api_fuzzer.cc
+++ b/test/core/end2end/fuzzers/api_fuzzer.cc
@@ -468,9 +468,9 @@ static void do_connect(void* arg, grpc_error* error) {
*fc->ep = client;
grpc_transport* transport =
- grpc_create_chttp2_transport(nullptr, server, 0);
+ grpc_create_chttp2_transport(nullptr, server, false);
grpc_server_setup_transport(g_server, transport, nullptr, nullptr);
- grpc_chttp2_transport_start_reading(transport, nullptr);
+ grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
GRPC_CLOSURE_SCHED(fc->closure, GRPC_ERROR_NONE);
} else {
diff --git a/test/core/end2end/fuzzers/client_fuzzer.cc b/test/core/end2end/fuzzers/client_fuzzer.cc
index 2d3fc7a074..c17d581d8b 100644
--- a/test/core/end2end/fuzzers/client_fuzzer.cc
+++ b/test/core/end2end/fuzzers/client_fuzzer.cc
@@ -55,8 +55,8 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
grpc_transport* transport =
- grpc_create_chttp2_transport(nullptr, mock_endpoint, 1);
- grpc_chttp2_transport_start_reading(transport, nullptr);
+ grpc_create_chttp2_transport(nullptr, mock_endpoint, true);
+ grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
grpc_channel* channel = grpc_channel_create(
"test-target", nullptr, GRPC_CLIENT_DIRECT_CHANNEL, transport);
diff --git a/test/core/end2end/fuzzers/server_fuzzer.cc b/test/core/end2end/fuzzers/server_fuzzer.cc
index db25160e2b..61c55e0afd 100644
--- a/test/core/end2end/fuzzers/server_fuzzer.cc
+++ b/test/core/end2end/fuzzers/server_fuzzer.cc
@@ -61,9 +61,9 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
// grpc_server_register_method(server, "/reg", NULL, 0);
grpc_server_start(server);
grpc_transport* transport =
- grpc_create_chttp2_transport(nullptr, mock_endpoint, 0);
+ grpc_create_chttp2_transport(nullptr, mock_endpoint, false);
grpc_server_setup_transport(server, transport, nullptr, nullptr);
- grpc_chttp2_transport_start_reading(transport, nullptr);
+ grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
grpc_call* call1 = nullptr;
grpc_call_details call_details1;
diff --git a/test/core/iomgr/udp_server_test.cc b/test/core/iomgr/udp_server_test.cc
index 7f662f0683..0deb534abd 100644
--- a/test/core/iomgr/udp_server_test.cc
+++ b/test/core/iomgr/udp_server_test.cc
@@ -50,7 +50,7 @@ static int g_number_of_writes = 0;
static int g_number_of_bytes_read = 0;
static int g_number_of_orphan_calls = 0;
-static void on_read(grpc_fd* emfd, void* user_data) {
+static bool on_read(grpc_fd* emfd, void* user_data) {
char read_buffer[512];
ssize_t byte_count;
@@ -64,9 +64,11 @@ static void on_read(grpc_fd* emfd, void* user_data) {
GPR_ASSERT(
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
gpr_mu_unlock(g_mu);
+ return false;
}
-static void on_write(grpc_fd* emfd, void* user_data) {
+static void on_write(grpc_fd* emfd, void* user_data,
+ grpc_closure* notify_on_write_closure) {
gpr_mu_lock(g_mu);
g_number_of_writes++;
@@ -79,6 +81,7 @@ static void on_fd_orphaned(grpc_fd* emfd, grpc_closure* closure,
void* user_data) {
gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d",
grpc_fd_wrapped_fd(emfd));
+ GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
g_number_of_orphan_calls++;
}
@@ -222,7 +225,6 @@ static void test_receive(int number_of_clients) {
int clifd, svrfd;
grpc_udp_server* s = grpc_udp_server_create(nullptr);
int i;
- int number_of_reads_before;
grpc_millis deadline;
grpc_pollset* pollsets[1];
LOG_TEST("test_receive");
@@ -252,14 +254,14 @@ static void test_receive(int number_of_clients) {
deadline =
grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10));
- number_of_reads_before = g_number_of_reads;
+ int number_of_bytes_read_before = g_number_of_bytes_read;
/* Create a socket, send a packet to the UDP server. */
clifd = socket(addr->ss_family, SOCK_DGRAM, 0);
GPR_ASSERT(clifd >= 0);
GPR_ASSERT(connect(clifd, (struct sockaddr*)addr,
(socklen_t)resolved_addr.len) == 0);
GPR_ASSERT(5 == write(clifd, "hello", 5));
- while (g_number_of_reads == number_of_reads_before &&
+ while (g_number_of_bytes_read < (number_of_bytes_read_before + 5) &&
deadline > grpc_core::ExecCtx::Get()->Now()) {
grpc_pollset_worker* worker = nullptr;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
@@ -268,7 +270,6 @@ static void test_receive(int number_of_clients) {
grpc_core::ExecCtx::Get()->Flush();
gpr_mu_lock(g_mu);
}
- GPR_ASSERT(g_number_of_reads == number_of_reads_before + 1);
close(clifd);
}
GPR_ASSERT(g_number_of_bytes_read == 5 * number_of_clients);
@@ -280,9 +281,6 @@ static void test_receive(int number_of_clients) {
/* The server had a single FD, which is orphaned exactly once in *
* grpc_udp_server_destroy. */
GPR_ASSERT(g_number_of_orphan_calls == 1);
-
- /* The write callback should have fired a few times. */
- GPR_ASSERT(g_number_of_writes > 0);
}
static void destroy_pollset(void* p, grpc_error* error) {
diff --git a/test/core/security/ssl_server_fuzzer.cc b/test/core/security/ssl_server_fuzzer.cc
index 7056cfe581..6e30698562 100644
--- a/test/core/security/ssl_server_fuzzer.cc
+++ b/test/core/security/ssl_server_fuzzer.cc
@@ -93,8 +93,9 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_handshake_manager* handshake_mgr = grpc_handshake_manager_create();
grpc_server_security_connector_add_handshakers(sc, handshake_mgr);
grpc_handshake_manager_do_handshake(
- handshake_mgr, mock_endpoint, nullptr /* channel_args */, deadline,
- nullptr /* acceptor */, on_handshake_done, &state);
+ handshake_mgr, nullptr /* interested_parties */, mock_endpoint,
+ nullptr /* channel_args */, deadline, nullptr /* acceptor */,
+ on_handshake_done, &state);
grpc_core::ExecCtx::Get()->Flush();
// If the given string happens to be part of the correct client hello, the
diff --git a/test/core/transport/chttp2/BUILD b/test/core/transport/chttp2/BUILD
index 1ea7d0341d..6eff716b01 100644
--- a/test/core/transport/chttp2/BUILD
+++ b/test/core/transport/chttp2/BUILD
@@ -115,6 +115,21 @@ grpc_cc_test(
)
grpc_cc_test(
+ name = "settings_timeout_test",
+ srcs = ["settings_timeout_test.cc"],
+ language = "C++",
+ deps = [
+ "//:gpr",
+ "//:grpc",
+ "//test/core/util:gpr_test_util",
+ "//test/core/util:grpc_test_util",
+ ],
+ external_deps = [
+ "gtest",
+ ],
+)
+
+grpc_cc_test(
name = "varint_test",
srcs = ["varint_test.cc"],
language = "C++",
diff --git a/test/core/transport/chttp2/settings_timeout_test.cc b/test/core/transport/chttp2/settings_timeout_test.cc
new file mode 100644
index 0000000000..08473c72b6
--- /dev/null
+++ b/test/core/transport/chttp2/settings_timeout_test.cc
@@ -0,0 +1,253 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
+#include <memory>
+#include <thread>
+
+#include <gtest/gtest.h>
+
+#include "src/core/lib/iomgr/endpoint.h"
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/pollset.h"
+#include "src/core/lib/iomgr/pollset_set.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/tcp_client.h"
+#include "src/core/lib/slice/slice_internal.h"
+
+#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
+
+namespace grpc_core {
+namespace test {
+namespace {
+
+// A gRPC server, running in its own thread.
+class ServerThread {
+ public:
+ explicit ServerThread(const char* address) : address_(address) {}
+
+ void Start() {
+ // Start server with 1-second handshake timeout.
+ grpc_arg arg;
+ arg.type = GRPC_ARG_INTEGER;
+ arg.key = const_cast<char*>(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS);
+ arg.value.integer = 1000;
+ grpc_channel_args args = {1, &arg};
+ server_ = grpc_server_create(&args, nullptr);
+ ASSERT_TRUE(grpc_server_add_insecure_http2_port(server_, address_));
+ cq_ = grpc_completion_queue_create_for_next(nullptr);
+ grpc_server_register_completion_queue(server_, cq_, nullptr);
+ grpc_server_start(server_);
+ thread_.reset(new std::thread(std::bind(&ServerThread::Serve, this)));
+ }
+
+ void Shutdown() {
+ grpc_completion_queue* shutdown_cq =
+ grpc_completion_queue_create_for_pluck(nullptr);
+ grpc_server_shutdown_and_notify(server_, shutdown_cq, nullptr);
+ GPR_ASSERT(grpc_completion_queue_pluck(shutdown_cq, nullptr,
+ grpc_timeout_seconds_to_deadline(1),
+ nullptr)
+ .type == GRPC_OP_COMPLETE);
+ grpc_completion_queue_destroy(shutdown_cq);
+ grpc_server_destroy(server_);
+ grpc_completion_queue_destroy(cq_);
+ thread_->join();
+ }
+
+ private:
+ void Serve() {
+ // The completion queue should not return anything other than shutdown.
+ grpc_event ev = grpc_completion_queue_next(
+ cq_, gpr_inf_future(GPR_CLOCK_MONOTONIC), nullptr);
+ ASSERT_EQ(GRPC_QUEUE_SHUTDOWN, ev.type);
+ }
+
+ const char* address_; // Do not own.
+ grpc_server* server_ = nullptr;
+ grpc_completion_queue* cq_ = nullptr;
+ std::unique_ptr<std::thread> thread_;
+};
+
+// A TCP client that connects to the server, reads data until the server
+// closes, and then terminates.
+class Client {
+ public:
+ explicit Client(const char* server_address)
+ : server_address_(server_address) {}
+
+ void Connect() {
+ grpc_core::ExecCtx exec_ctx;
+ grpc_resolved_addresses* server_addresses = nullptr;
+ grpc_error* error =
+ grpc_blocking_resolve_address(server_address_, "80", &server_addresses);
+ ASSERT_EQ(GRPC_ERROR_NONE, error) << grpc_error_string(error);
+ ASSERT_GE(server_addresses->naddrs, 1UL);
+ pollset_ = (grpc_pollset*)gpr_zalloc(grpc_pollset_size());
+ grpc_pollset_init(pollset_, &mu_);
+ grpc_pollset_set* pollset_set = grpc_pollset_set_create();
+ grpc_pollset_set_add_pollset(pollset_set, pollset_);
+ EventState state;
+ grpc_tcp_client_connect(state.closure(), &endpoint_, pollset_set,
+ nullptr /* channel_args */, server_addresses->addrs,
+ 1000);
+ ASSERT_TRUE(PollUntilDone(
+ &state,
+ grpc_timespec_to_millis_round_up(gpr_inf_future(GPR_CLOCK_MONOTONIC))));
+ ASSERT_EQ(GRPC_ERROR_NONE, state.error());
+ grpc_pollset_set_destroy(pollset_set);
+ grpc_endpoint_add_to_pollset(endpoint_, pollset_);
+ grpc_resolved_addresses_destroy(server_addresses);
+ }
+
+ // Reads until an error is returned.
+ // Returns true if an error was encountered before the deadline.
+ bool ReadUntilError() {
+ grpc_core::ExecCtx exec_ctx;
+ grpc_slice_buffer read_buffer;
+ grpc_slice_buffer_init(&read_buffer);
+ bool retval = true;
+ // Use a deadline of 3 seconds, which is a lot more than we should
+ // need for a 1-second timeout, but this helps avoid flakes.
+ grpc_millis deadline = grpc_core::ExecCtx::Get()->Now() + 3000;
+ while (true) {
+ EventState state;
+ grpc_endpoint_read(endpoint_, &read_buffer, state.closure());
+ if (!PollUntilDone(&state, deadline)) {
+ retval = false;
+ break;
+ }
+ if (state.error() != GRPC_ERROR_NONE) break;
+ gpr_log(GPR_INFO, "client read %" PRIuPTR " bytes", read_buffer.length);
+ grpc_slice_buffer_reset_and_unref_internal(&read_buffer);
+ }
+ grpc_endpoint_shutdown(endpoint_,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown"));
+ grpc_slice_buffer_destroy_internal(&read_buffer);
+ return retval;
+ }
+
+ void Shutdown() {
+ grpc_core::ExecCtx exec_ctx;
+ grpc_endpoint_destroy(endpoint_);
+ grpc_pollset_shutdown(pollset_,
+ GRPC_CLOSURE_CREATE(&Client::PollsetDestroy, pollset_,
+ grpc_schedule_on_exec_ctx));
+ }
+
+ private:
+ // State used to wait for an I/O event.
+ class EventState {
+ public:
+ EventState() {
+ GRPC_CLOSURE_INIT(&closure_, &EventState::OnEventDone, this,
+ grpc_schedule_on_exec_ctx);
+ }
+
+ ~EventState() { GRPC_ERROR_UNREF(error_); }
+
+ grpc_closure* closure() { return &closure_; }
+
+ bool done() const { return done_; }
+
+ // Caller does NOT take ownership of the error.
+ grpc_error* error() const { return error_; }
+
+ private:
+ static void OnEventDone(void* arg, grpc_error* error) {
+ gpr_log(GPR_INFO, "OnEventDone(): %s", grpc_error_string(error));
+ EventState* state = (EventState*)arg;
+ state->error_ = GRPC_ERROR_REF(error);
+ state->done_ = true;
+ }
+
+ grpc_closure closure_;
+ bool done_ = false;
+ grpc_error* error_ = GRPC_ERROR_NONE;
+ };
+
+ // Returns true if done, or false if deadline exceeded.
+ bool PollUntilDone(EventState* state, grpc_millis deadline) {
+ while (true) {
+ grpc_pollset_worker* worker = nullptr;
+ gpr_mu_lock(mu_);
+ GRPC_LOG_IF_ERROR(
+ "grpc_pollset_work",
+ grpc_pollset_work(pollset_, &worker,
+ grpc_core::ExecCtx::Get()->Now() + 1000));
+ gpr_mu_unlock(mu_);
+ if (state != nullptr && state->done()) return true;
+ if (grpc_core::ExecCtx::Get()->Now() >= deadline) return false;
+ }
+ }
+
+ static void PollsetDestroy(void* arg, grpc_error* error) {
+ grpc_pollset* pollset = (grpc_pollset*)arg;
+ grpc_pollset_destroy(pollset);
+ gpr_free(pollset);
+ }
+
+ const char* server_address_; // Do not own.
+ grpc_endpoint* endpoint_;
+ gpr_mu* mu_;
+ grpc_pollset* pollset_;
+};
+
+TEST(SettingsTimeout, Basic) {
+ // Construct server address string.
+ const int server_port = grpc_pick_unused_port_or_die();
+ char* server_address_string;
+ gpr_asprintf(&server_address_string, "localhost:%d", server_port);
+ // Start server.
+ gpr_log(GPR_INFO, "starting server on %s", server_address_string);
+ ServerThread server_thread(server_address_string);
+ server_thread.Start();
+ // Create client and connect to server.
+ gpr_log(GPR_INFO, "starting client connect");
+ Client client(server_address_string);
+ client.Connect();
+ // Client read. Should fail due to server dropping connection.
+ gpr_log(GPR_INFO, "starting client read");
+ EXPECT_TRUE(client.ReadUntilError());
+ // Shut down client.
+ gpr_log(GPR_INFO, "shutting down client");
+ client.Shutdown();
+ // Shut down server.
+ gpr_log(GPR_INFO, "shutting down server");
+ server_thread.Shutdown();
+ // Clean up.
+ gpr_free(server_address_string);
+}
+
+} // namespace
+} // namespace test
+} // namespace grpc_core
+
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ grpc_test_init(argc, argv);
+ grpc_init();
+ int result = RUN_ALL_TESTS();
+ grpc_shutdown();
+ return result;
+}
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
index 48cdd433b2..d4ee6b429f 100644
--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -353,11 +353,6 @@ class GrpclbEnd2endTest : public ::testing::Test {
"balancer", server_host_, balancers_.back().get()));
}
ResetStub();
- std::vector<AddressData> addresses;
- for (size_t i = 0; i < balancer_servers_.size(); ++i) {
- addresses.emplace_back(AddressData{balancer_servers_[i].port_, true, ""});
- }
- SetNextResolution(addresses);
}
void TearDown() override {
@@ -370,6 +365,14 @@ class GrpclbEnd2endTest : public ::testing::Test {
grpc_fake_resolver_response_generator_unref(response_generator_);
}
+ void SetNextResolutionAllBalancers() {
+ std::vector<AddressData> addresses;
+ for (size_t i = 0; i < balancer_servers_.size(); ++i) {
+ addresses.emplace_back(AddressData{balancer_servers_[i].port_, true, ""});
+ }
+ SetNextResolution(addresses);
+ }
+
void ResetStub(int fallback_timeout = 0) {
ChannelArguments args;
args.SetGrpclbFallbackTimeout(fallback_timeout);
@@ -580,6 +583,7 @@ class SingleBalancerTest : public GrpclbEnd2endTest {
};
TEST_F(SingleBalancerTest, Vanilla) {
+ SetNextResolutionAllBalancers();
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
@@ -607,6 +611,7 @@ TEST_F(SingleBalancerTest, Vanilla) {
}
TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
+ SetNextResolutionAllBalancers();
const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
const int kCallDeadlineMs = 1000 * grpc_test_slowdown_factor();
@@ -644,6 +649,7 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
}
TEST_F(SingleBalancerTest, Fallback) {
+ SetNextResolutionAllBalancers();
const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
const size_t kNumBackendInResolution = backends_.size() / 2;
@@ -710,6 +716,7 @@ TEST_F(SingleBalancerTest, Fallback) {
}
TEST_F(SingleBalancerTest, FallbackUpdate) {
+ SetNextResolutionAllBalancers();
const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
const size_t kNumBackendInResolution = backends_.size() / 3;
@@ -817,6 +824,7 @@ TEST_F(SingleBalancerTest, FallbackUpdate) {
}
TEST_F(SingleBalancerTest, BackendsRestart) {
+ SetNextResolutionAllBalancers();
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
@@ -856,6 +864,7 @@ class UpdatesTest : public GrpclbEnd2endTest {
};
TEST_F(UpdatesTest, UpdateBalancers) {
+ SetNextResolutionAllBalancers();
const std::vector<int> first_backend{GetBackendPorts()[0]};
const std::vector<int> second_backend{GetBackendPorts()[1]};
ScheduleResponseForBalancer(
@@ -918,6 +927,7 @@ TEST_F(UpdatesTest, UpdateBalancers) {
// verify that the LB channel inside grpclb keeps the initial connection (which
// by definition is also present in the update).
TEST_F(UpdatesTest, UpdateBalancersRepeated) {
+ SetNextResolutionAllBalancers();
const std::vector<int> first_backend{GetBackendPorts()[0]};
const std::vector<int> second_backend{GetBackendPorts()[0]};
@@ -988,6 +998,9 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) {
}
TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
+ std::vector<AddressData> addresses;
+ addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
+ SetNextResolution(addresses);
const std::vector<int> first_backend{GetBackendPorts()[0]};
const std::vector<int> second_backend{GetBackendPorts()[1]};
@@ -1029,7 +1042,7 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
- std::vector<AddressData> addresses;
+ addresses.clear();
addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""});
gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 ==========");
SetNextResolution(addresses);
@@ -1054,8 +1067,14 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
balancers_[2]->NotifyDoneWithServerlists();
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
- EXPECT_EQ(1U, balancer_servers_[1].service_->request_count());
- EXPECT_EQ(1U, balancer_servers_[1].service_->response_count());
+ // The second balancer, published as part of the first update, may end up
+ // getting two requests (that is, 1 <= #req <= 2) if the LB call retry timer
+ // firing races with the arrival of the update containing the second
+ // balancer.
+ EXPECT_GE(balancer_servers_[1].service_->request_count(), 1U);
+ EXPECT_GE(balancer_servers_[1].service_->response_count(), 1U);
+ EXPECT_LE(balancer_servers_[1].service_->request_count(), 2U);
+ EXPECT_LE(balancer_servers_[1].service_->response_count(), 2U);
EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
// Check LB policy name for the channel.
@@ -1063,6 +1082,7 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
}
TEST_F(SingleBalancerTest, Drop) {
+ SetNextResolutionAllBalancers();
const size_t kNumRpcsPerAddress = 100;
const int num_of_drop_by_rate_limiting_addresses = 1;
const int num_of_drop_by_load_balancing_addresses = 2;
@@ -1106,6 +1126,7 @@ TEST_F(SingleBalancerTest, Drop) {
}
TEST_F(SingleBalancerTest, DropAllFirst) {
+ SetNextResolutionAllBalancers();
// All registered addresses are marked as "drop".
const int num_of_drop_by_rate_limiting_addresses = 1;
const int num_of_drop_by_load_balancing_addresses = 1;
@@ -1121,6 +1142,7 @@ TEST_F(SingleBalancerTest, DropAllFirst) {
}
TEST_F(SingleBalancerTest, DropAll) {
+ SetNextResolutionAllBalancers();
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
0);
@@ -1151,6 +1173,7 @@ class SingleBalancerWithClientLoadReportingTest : public GrpclbEnd2endTest {
};
TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
+ SetNextResolutionAllBalancers();
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
@@ -1185,6 +1208,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
}
TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
+ SetNextResolutionAllBalancers();
const size_t kNumRpcsPerAddress = 3;
const int num_of_drop_by_rate_limiting_addresses = 2;
const int num_of_drop_by_load_balancing_addresses = 1;
diff --git a/test/cpp/interop/interop_server.cc b/test/cpp/interop/interop_server.cc
index a24cdc7d2d..30bd8bfef8 100644
--- a/test/cpp/interop/interop_server.cc
+++ b/test/cpp/interop/interop_server.cc
@@ -317,9 +317,15 @@ class TestServiceImpl : public TestService::Service {
void grpc::testing::interop::RunServer(
std::shared_ptr<ServerCredentials> creds) {
- GPR_ASSERT(FLAGS_port != 0);
+ RunServer(creds, FLAGS_port, nullptr);
+}
+
+void grpc::testing::interop::RunServer(
+ std::shared_ptr<ServerCredentials> creds, const int port,
+ ServerStartedCondition* server_started_condition) {
+ GPR_ASSERT(port != 0);
std::ostringstream server_address;
- server_address << "0.0.0.0:" << FLAGS_port;
+ server_address << "0.0.0.0:" << port;
TestServiceImpl service;
SimpleRequest request;
@@ -333,6 +339,14 @@ void grpc::testing::interop::RunServer(
}
std::unique_ptr<Server> server(builder.BuildAndStart());
gpr_log(GPR_INFO, "Server listening on %s", server_address.str().c_str());
+
+ // Signal that the server has started.
+ if (server_started_condition) {
+ std::unique_lock<std::mutex> lock(server_started_condition->mutex);
+ server_started_condition->server_started = true;
+ server_started_condition->condition.notify_all();
+ }
+
while (!gpr_atm_no_barrier_load(&g_got_sigint)) {
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_seconds(5, GPR_TIMESPAN)));
diff --git a/test/cpp/interop/server_helper.h b/test/cpp/interop/server_helper.h
index 6af003fe93..1bf7db1e14 100644
--- a/test/cpp/interop/server_helper.h
+++ b/test/cpp/interop/server_helper.h
@@ -19,6 +19,7 @@
#ifndef GRPC_TEST_CPP_INTEROP_SERVER_HELPER_H
#define GRPC_TEST_CPP_INTEROP_SERVER_HELPER_H
+#include <condition_variable>
#include <memory>
#include <grpc/compression.h>
@@ -50,8 +51,27 @@ class InteropServerContextInspector {
namespace interop {
extern gpr_atm g_got_sigint;
+
+struct ServerStartedCondition {
+ std::mutex mutex;
+ std::condition_variable condition;
+ bool server_started = false;
+};
+
+/// Run gRPC interop server using port FLAGS_port.
+///
+/// \param creds The credentials associated with the server.
void RunServer(std::shared_ptr<ServerCredentials> creds);
+/// Run gRPC interop server.
+///
+/// \param creds The credentials associated with the server.
+/// \param port Port to use for the server.
+/// \param server_started_condition (optional) Struct holding mutex, condition
+/// variable, and condition used to notify when the server has started.
+void RunServer(std::shared_ptr<ServerCredentials> creds, int port,
+ ServerStartedCondition* server_started_condition);
+
} // namespace interop
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
index 43bbbad880..fcb1677d09 100644
--- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc
+++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
@@ -132,7 +132,7 @@ class Fixture {
grpc_channel_args c_args = args.c_channel_args();
ep_ = new DummyEndpoint;
t_ = grpc_create_chttp2_transport(&c_args, ep_, client);
- grpc_chttp2_transport_start_reading(t_, nullptr);
+ grpc_chttp2_transport_start_reading(t_, nullptr, nullptr);
FlushExecCtx();
}
diff --git a/test/cpp/microbenchmarks/fullstack_fixtures.h b/test/cpp/microbenchmarks/fullstack_fixtures.h
index ab0a696e3d..d1ede755a5 100644
--- a/test/cpp/microbenchmarks/fullstack_fixtures.h
+++ b/test/cpp/microbenchmarks/fullstack_fixtures.h
@@ -174,7 +174,7 @@ class EndpointPairFixture : public BaseFixture {
const grpc_channel_args* server_args =
grpc_server_get_channel_args(server_->c_server());
server_transport_ = grpc_create_chttp2_transport(
- server_args, endpoints.server, 0 /* is_client */);
+ server_args, endpoints.server, false /* is_client */);
grpc_pollset** pollsets;
size_t num_pollsets = 0;
@@ -186,7 +186,7 @@ class EndpointPairFixture : public BaseFixture {
grpc_server_setup_transport(server_->c_server(), server_transport_,
nullptr, server_args);
- grpc_chttp2_transport_start_reading(server_transport_, nullptr);
+ grpc_chttp2_transport_start_reading(server_transport_, nullptr, nullptr);
}
/* create channel */
@@ -197,11 +197,11 @@ class EndpointPairFixture : public BaseFixture {
grpc_channel_args c_args = args.c_channel_args();
client_transport_ =
- grpc_create_chttp2_transport(&c_args, endpoints.client, 1);
+ grpc_create_chttp2_transport(&c_args, endpoints.client, true);
GPR_ASSERT(client_transport_);
grpc_channel* channel = grpc_channel_create(
"target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport_);
- grpc_chttp2_transport_start_reading(client_transport_, nullptr);
+ grpc_chttp2_transport_start_reading(client_transport_, nullptr, nullptr);
channel_ = CreateChannelInternal("", channel);
}
diff --git a/test/cpp/performance/writes_per_rpc_test.cc b/test/cpp/performance/writes_per_rpc_test.cc
index 1711eef303..0b9dc83f2b 100644
--- a/test/cpp/performance/writes_per_rpc_test.cc
+++ b/test/cpp/performance/writes_per_rpc_test.cc
@@ -89,7 +89,7 @@ class EndpointPairFixture {
const grpc_channel_args* server_args =
grpc_server_get_channel_args(server_->c_server());
grpc_transport* transport = grpc_create_chttp2_transport(
- server_args, endpoints.server, 0 /* is_client */);
+ server_args, endpoints.server, false /* is_client */);
grpc_pollset** pollsets;
size_t num_pollsets = 0;
@@ -101,7 +101,7 @@ class EndpointPairFixture {
grpc_server_setup_transport(server_->c_server(), transport, nullptr,
server_args);
- grpc_chttp2_transport_start_reading(transport, nullptr);
+ grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
}
/* create channel */
@@ -112,11 +112,11 @@ class EndpointPairFixture {
grpc_channel_args c_args = args.c_channel_args();
grpc_transport* transport =
- grpc_create_chttp2_transport(&c_args, endpoints.client, 1);
+ grpc_create_chttp2_transport(&c_args, endpoints.client, true);
GPR_ASSERT(transport);
grpc_channel* channel = grpc_channel_create(
"target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
- grpc_chttp2_transport_start_reading(transport, nullptr);
+ grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
channel_ = CreateChannelInternal("", channel);
}
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 9f20b148eb..82a3f0042d 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -60,21 +60,20 @@ class SynchronousClient
SetupLoadTest(config, num_threads_);
}
- virtual ~SynchronousClient(){};
+ virtual ~SynchronousClient() {}
- virtual void InitThreadFuncImpl(size_t thread_idx) = 0;
+ virtual bool InitThreadFuncImpl(size_t thread_idx) = 0;
virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0;
void ThreadFunc(size_t thread_idx, Thread* t) override {
- InitThreadFuncImpl(thread_idx);
+ if (!InitThreadFuncImpl(thread_idx)) {
+ return;
+ }
for (;;) {
// run the loop body
HistogramEntry entry;
const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx);
t->UpdateHistogram(&entry);
- if (!thread_still_ok) {
- gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
- }
if (!thread_still_ok || ThreadCompleted()) {
return;
}
@@ -109,9 +108,6 @@ class SynchronousClient
size_t num_threads_;
std::vector<SimpleResponse> responses_;
-
- private:
- void DestroyMultithreading() override final { EndThreads(); }
};
class SynchronousUnaryClient final : public SynchronousClient {
@@ -122,7 +118,7 @@ class SynchronousUnaryClient final : public SynchronousClient {
}
~SynchronousUnaryClient() {}
- void InitThreadFuncImpl(size_t thread_idx) override {}
+ bool InitThreadFuncImpl(size_t thread_idx) override { return true; }
bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
if (!WaitToIssue(thread_idx)) {
@@ -140,6 +136,9 @@ class SynchronousUnaryClient final : public SynchronousClient {
entry->set_status(s.error_code());
return true;
}
+
+ private:
+ void DestroyMultithreading() override final { EndThreads(); }
};
template <class StreamType>
@@ -149,31 +148,30 @@ class SynchronousStreamingClient : public SynchronousClient {
: SynchronousClient(config),
context_(num_threads_),
stream_(num_threads_),
+ stream_mu_(num_threads_),
+ shutdown_(num_threads_),
messages_per_stream_(config.messages_per_stream()),
messages_issued_(num_threads_) {
StartThreads(num_threads_);
}
virtual ~SynchronousStreamingClient() {
- std::vector<std::thread> cleanup_threads;
- for (size_t i = 0; i < num_threads_; i++) {
- cleanup_threads.emplace_back([this, i]() {
- auto stream = &stream_[i];
- if (*stream) {
- // forcibly cancel the streams, then finish
- context_[i].TryCancel();
- (*stream)->Finish().IgnoreError();
- // don't log any error message on !ok since this was canceled
- }
- });
- }
- for (auto& th : cleanup_threads) {
- th.join();
- }
+ CleanupAllStreams([this](size_t thread_idx) {
+ // Don't log any kind of error since we may have canceled this
+ stream_[thread_idx]->Finish().IgnoreError();
+ });
}
protected:
std::vector<grpc::ClientContext> context_;
std::vector<std::unique_ptr<StreamType>> stream_;
+ // stream_mu_ is only needed when changing an element of stream_ or context_
+ std::vector<std::mutex> stream_mu_;
+ // use struct Bool rather than bool because vector<bool> is not concurrent
+ struct Bool {
+ bool val;
+ Bool() : val(false) {}
+ };
+ std::vector<Bool> shutdown_;
const int messages_per_stream_;
std::vector<int> messages_issued_;
@@ -182,27 +180,26 @@ class SynchronousStreamingClient : public SynchronousClient {
// don't set the value since the stream is failed and shouldn't be timed
entry->set_status(s.error_code());
if (!s.ok()) {
- gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", thread_idx,
- s.error_message().c_str());
+ std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+ if (!shutdown_[thread_idx].val) {
+ gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s",
+ thread_idx, s.error_message().c_str());
+ }
}
+ // Lock the stream_mu_ now because the client context could change
+ std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
context_[thread_idx].~ClientContext();
new (&context_[thread_idx]) ClientContext();
}
-};
-class SynchronousStreamingPingPongClient final
- : public SynchronousStreamingClient<
- grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
- public:
- SynchronousStreamingPingPongClient(const ClientConfig& config)
- : SynchronousStreamingClient(config) {}
- ~SynchronousStreamingPingPongClient() {
+ void CleanupAllStreams(std::function<void(size_t)> cleaner) {
std::vector<std::thread> cleanup_threads;
for (size_t i = 0; i < num_threads_; i++) {
- cleanup_threads.emplace_back([this, i]() {
- auto stream = &stream_[i];
- if (*stream) {
- (*stream)->WritesDone();
+ cleanup_threads.emplace_back([this, i, cleaner] {
+ std::lock_guard<std::mutex> l(stream_mu_[i]);
+ shutdown_[i].val = true;
+ if (stream_[i]) {
+ cleaner(i);
}
});
}
@@ -211,10 +208,36 @@ class SynchronousStreamingPingPongClient final
}
}
- void InitThreadFuncImpl(size_t thread_idx) override {
+ private:
+ void DestroyMultithreading() override final {
+ CleanupAllStreams(
+ [this](size_t thread_idx) { context_[thread_idx].TryCancel(); });
+ EndThreads();
+ }
+};
+
+class SynchronousStreamingPingPongClient final
+ : public SynchronousStreamingClient<
+ grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
+ public:
+ SynchronousStreamingPingPongClient(const ClientConfig& config)
+ : SynchronousStreamingClient(config) {}
+ ~SynchronousStreamingPingPongClient() {
+ CleanupAllStreams(
+ [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
+ }
+
+ private:
+ bool InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
+ std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+ if (!shutdown_[thread_idx].val) {
+ stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
+ } else {
+ return false;
+ }
messages_issued_[thread_idx] = 0;
+ return true;
}
bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
@@ -239,7 +262,13 @@ class SynchronousStreamingPingPongClient final
stream_[thread_idx]->WritesDone();
FinishStream(entry, thread_idx);
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
+ std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+ if (!shutdown_[thread_idx].val) {
+ stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
+ } else {
+ stream_[thread_idx].reset();
+ return false;
+ }
messages_issued_[thread_idx] = 0;
return true;
}
@@ -251,25 +280,24 @@ class SynchronousStreamingFromClientClient final
SynchronousStreamingFromClientClient(const ClientConfig& config)
: SynchronousStreamingClient(config), last_issue_(num_threads_) {}
~SynchronousStreamingFromClientClient() {
- std::vector<std::thread> cleanup_threads;
- for (size_t i = 0; i < num_threads_; i++) {
- cleanup_threads.emplace_back([this, i]() {
- auto stream = &stream_[i];
- if (*stream) {
- (*stream)->WritesDone();
- }
- });
- }
- for (auto& th : cleanup_threads) {
- th.join();
- }
+ CleanupAllStreams(
+ [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
}
- void InitThreadFuncImpl(size_t thread_idx) override {
+ private:
+ std::vector<double> last_issue_;
+
+ bool InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
- &responses_[thread_idx]);
+ std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+ if (!shutdown_[thread_idx].val) {
+ stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
+ &responses_[thread_idx]);
+ } else {
+ return false;
+ }
last_issue_[thread_idx] = UsageTimer::Now();
+ return true;
}
bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
@@ -287,13 +315,16 @@ class SynchronousStreamingFromClientClient final
stream_[thread_idx]->WritesDone();
FinishStream(entry, thread_idx);
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
- &responses_[thread_idx]);
+ std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+ if (!shutdown_[thread_idx].val) {
+ stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
+ &responses_[thread_idx]);
+ } else {
+ stream_[thread_idx].reset();
+ return false;
+ }
return true;
}
-
- private:
- std::vector<double> last_issue_;
};
class SynchronousStreamingFromServerClient final
@@ -301,12 +332,24 @@ class SynchronousStreamingFromServerClient final
public:
SynchronousStreamingFromServerClient(const ClientConfig& config)
: SynchronousStreamingClient(config), last_recv_(num_threads_) {}
- void InitThreadFuncImpl(size_t thread_idx) override {
+ ~SynchronousStreamingFromServerClient() {}
+
+ private:
+ std::vector<double> last_recv_;
+
+ bool InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] =
- stub->StreamingFromServer(&context_[thread_idx], request_);
+ std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+ if (!shutdown_[thread_idx].val) {
+ stream_[thread_idx] =
+ stub->StreamingFromServer(&context_[thread_idx], request_);
+ } else {
+ return false;
+ }
last_recv_[thread_idx] = UsageTimer::Now();
+ return true;
}
+
bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0);
if (stream_[thread_idx]->Read(&responses_[thread_idx])) {
@@ -317,13 +360,16 @@ class SynchronousStreamingFromServerClient final
}
FinishStream(entry, thread_idx);
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] =
- stub->StreamingFromServer(&context_[thread_idx], request_);
+ std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+ if (!shutdown_[thread_idx].val) {
+ stream_[thread_idx] =
+ stub->StreamingFromServer(&context_[thread_idx], request_);
+ } else {
+ stream_[thread_idx].reset();
+ return false;
+ }
return true;
}
-
- private:
- std::vector<double> last_recv_;
};
class SynchronousStreamingBothWaysClient final
@@ -333,24 +379,22 @@ class SynchronousStreamingBothWaysClient final
SynchronousStreamingBothWaysClient(const ClientConfig& config)
: SynchronousStreamingClient(config) {}
~SynchronousStreamingBothWaysClient() {
- std::vector<std::thread> cleanup_threads;
- for (size_t i = 0; i < num_threads_; i++) {
- cleanup_threads.emplace_back([this, i]() {
- auto stream = &stream_[i];
- if (*stream) {
- (*stream)->WritesDone();
- }
- });
- }
- for (auto& th : cleanup_threads) {
- th.join();
- }
+ CleanupAllStreams(
+ [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
}
- void InitThreadFuncImpl(size_t thread_idx) override {
+ private:
+ bool InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
+ std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+ if (!shutdown_[thread_idx].val) {
+ stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
+ } else {
+ return false;
+ }
+ return true;
}
+
bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
// TODO (vjpai): Do this
return true;
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index ae7ddb2475..3958d8e9e4 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -2896,6 +2896,23 @@
"gpr",
"gpr_test_util",
"grpc",
+ "grpc_test_util"
+ ],
+ "headers": [],
+ "is_filegroup": false,
+ "language": "c++",
+ "name": "chttp2_settings_timeout_test",
+ "src": [
+ "test/core/transport/chttp2/settings_timeout_test.cc"
+ ],
+ "third_party": false,
+ "type": "target"
+ },
+ {
+ "deps": [
+ "gpr",
+ "gpr_test_util",
+ "grpc",
"grpc++",
"grpc++_test_util",
"grpc_cli_libs",
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index ce698cb709..6ffc7f412d 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -3376,6 +3376,30 @@
"flaky": false,
"gtest": true,
"language": "c++",
+ "name": "chttp2_settings_timeout_test",
+ "platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
+ "uses_polling": true
+ },
+ {
+ "args": [],
+ "benchmark": false,
+ "ci_platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
+ "cpu_cost": 1.0,
+ "exclude_configs": [],
+ "exclude_iomgrs": [],
+ "flaky": false,
+ "gtest": true,
+ "language": "c++",
"name": "cli_call_test",
"platforms": [
"linux",
diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py
index 766c1c0b7c..471f5d99e7 100755
--- a/tools/run_tests/run_tests.py
+++ b/tools/run_tests/run_tests.py
@@ -1231,7 +1231,7 @@ if not args.disable_auto_set_flakes:
if test.flaky: flaky_tests.add(test.name)
if test.cpu > 0: shortname_to_cpu[test.name] = test.cpu
except:
- print("Unexpected error getting flaky tests:", sys.exc_info()[0])
+ print("Unexpected error getting flaky tests: %s" % traceback.format_exc())
if args.force_default_poller:
_POLLING_STRATEGIES = {}