diff options
author | Yash Tibrewal <yashkt@google.com> | 2017-12-05 17:49:11 -0800 |
---|---|---|
committer | Yash Tibrewal <yashkt@google.com> | 2017-12-05 17:49:11 -0800 |
commit | 73bb67d054ecb952f10649cc42c998ab7ea8facd (patch) | |
tree | 181ee31adc7a144d8dcffed5e5de990ab4662cef | |
parent | 65c45fbb4d576d99bcd1c00a13f430c8994fee38 (diff) | |
parent | 05cd3102b7b59bf5d71f66dc012be1f4ecdaad88 (diff) |
Merge master into execctx
64 files changed, 1747 insertions, 950 deletions
diff --git a/.gitignore b/.gitignore index 151bbbde13..2b35c5fb9a 100644 --- a/.gitignore +++ b/.gitignore @@ -121,6 +121,7 @@ gdb.txt tags # perf data +memory_usage.csv perf.data perf.data.old diff --git a/CMakeLists.txt b/CMakeLists.txt index 297301bb39..22a1ab79af 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -678,6 +678,7 @@ add_dependencies(buildtests_cxx bm_pollset) endif() add_dependencies(buildtests_cxx channel_arguments_test) add_dependencies(buildtests_cxx channel_filter_test) +add_dependencies(buildtests_cxx chttp2_settings_timeout_test) add_dependencies(buildtests_cxx cli_call_test) add_dependencies(buildtests_cxx client_channel_stress_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) @@ -9863,6 +9864,44 @@ target_link_libraries(channel_filter_test endif (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS) +add_executable(chttp2_settings_timeout_test + test/core/transport/chttp2/settings_timeout_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + + +target_include_directories(chttp2_settings_timeout_test + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include + PRIVATE ${BORINGSSL_ROOT_DIR}/include + PRIVATE ${PROTOBUF_ROOT_DIR}/src + PRIVATE ${BENCHMARK_ROOT_DIR}/include + PRIVATE ${ZLIB_ROOT_DIR} + PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib + PRIVATE ${CARES_INCLUDE_DIR} + PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/cares/cares + PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include + PRIVATE third_party/googletest/googletest/include + PRIVATE third_party/googletest/googletest + PRIVATE third_party/googletest/googlemock/include + PRIVATE third_party/googletest/googlemock + PRIVATE ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(chttp2_settings_timeout_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util + grpc + gpr_test_util + gpr + ${_gRPC_GFLAGS_LIBRARIES} +) + +endif (gRPC_BUILD_TESTS) +if (gRPC_BUILD_TESTS) + add_executable(cli_call_test test/cpp/util/cli_call_test.cc third_party/googletest/googletest/src/gtest-all.cc @@ -1114,6 +1114,7 @@ bm_metadata: $(BINDIR)/$(CONFIG)/bm_metadata bm_pollset: $(BINDIR)/$(CONFIG)/bm_pollset channel_arguments_test: $(BINDIR)/$(CONFIG)/channel_arguments_test channel_filter_test: $(BINDIR)/$(CONFIG)/channel_filter_test +chttp2_settings_timeout_test: $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test cli_call_test: $(BINDIR)/$(CONFIG)/cli_call_test client_channel_stress_test: $(BINDIR)/$(CONFIG)/client_channel_stress_test client_crash_test: $(BINDIR)/$(CONFIG)/client_crash_test @@ -1557,6 +1558,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/bm_pollset \ $(BINDIR)/$(CONFIG)/channel_arguments_test \ $(BINDIR)/$(CONFIG)/channel_filter_test \ + $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test \ $(BINDIR)/$(CONFIG)/cli_call_test \ $(BINDIR)/$(CONFIG)/client_channel_stress_test \ $(BINDIR)/$(CONFIG)/client_crash_test \ @@ -1684,6 +1686,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/bm_pollset \ $(BINDIR)/$(CONFIG)/channel_arguments_test \ $(BINDIR)/$(CONFIG)/channel_filter_test \ + $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test \ $(BINDIR)/$(CONFIG)/cli_call_test \ $(BINDIR)/$(CONFIG)/client_channel_stress_test \ $(BINDIR)/$(CONFIG)/client_crash_test \ @@ -2069,6 +2072,8 @@ test_cxx: buildtests_cxx $(Q) $(BINDIR)/$(CONFIG)/channel_arguments_test || ( echo test channel_arguments_test failed ; exit 1 ) $(E) "[RUN] Testing channel_filter_test" $(Q) $(BINDIR)/$(CONFIG)/channel_filter_test || ( echo test channel_filter_test failed ; exit 1 ) + $(E) "[RUN] Testing chttp2_settings_timeout_test" + $(Q) $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test || ( echo test chttp2_settings_timeout_test failed ; exit 1 ) $(E) "[RUN] Testing cli_call_test" $(Q) $(BINDIR)/$(CONFIG)/cli_call_test || ( echo test cli_call_test failed ; exit 1 ) $(E) "[RUN] Testing client_channel_stress_test" @@ -14369,6 +14374,49 @@ endif endif +CHTTP2_SETTINGS_TIMEOUT_TEST_SRC = \ + test/core/transport/chttp2/settings_timeout_test.cc \ + +CHTTP2_SETTINGS_TIMEOUT_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(CHTTP2_SETTINGS_TIMEOUT_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test: openssl_dep_error + +else + + + + +ifeq ($(NO_PROTOBUF),true) + +# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+. + +$(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test: protobuf_dep_error + +else + +$(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test: $(PROTOBUF_DEP) $(CHTTP2_SETTINGS_TIMEOUT_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LDXX) $(LDFLAGS) $(CHTTP2_SETTINGS_TIMEOUT_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test + +endif + +endif + +$(OBJDIR)/$(CONFIG)/test/core/transport/chttp2/settings_timeout_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + +deps_chttp2_settings_timeout_test: $(CHTTP2_SETTINGS_TIMEOUT_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(CHTTP2_SETTINGS_TIMEOUT_TEST_OBJS:.o=.dep) +endif +endif + + CLI_CALL_TEST_SRC = \ test/cpp/util/cli_call_test.cc \ diff --git a/build.yaml b/build.yaml index 854a695acc..6158b466ea 100644 --- a/build.yaml +++ b/build.yaml @@ -3824,6 +3824,18 @@ targets: - grpc - gpr uses_polling: false +- name: chttp2_settings_timeout_test + gtest: true + build: test + language: c++ + src: + - test/core/transport/chttp2/settings_timeout_test.cc + deps: + - grpc_test_util + - grpc + - gpr_test_util + - gpr + uses_polling: true - name: cli_call_test gtest: true build: test diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index 9ab2fc8b73..fcbc8ac5a1 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -240,6 +240,9 @@ typedef struct { /** The time between the first and second connection attempts, in ms */ #define GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS \ "grpc.initial_reconnect_backoff_ms" +/** The timeout used on servers for finishing handshaking on an incoming + connection. Defaults to 120 seconds. */ +#define GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS "grpc.server_handshake_timeout_ms" /** This *should* be used for testing only. The caller of the secure_channel_create functions may override the target name used for SSL host name checking using this channel argument which is of diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 727210bbd4..ba82c88eb7 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -209,6 +209,14 @@ typedef struct client_channel_channel_data { char* info_service_config_json; } channel_data; +typedef struct { + channel_data* chand; + /** used as an identifier, don't dereference it because the LB policy may be + * non-existing when the callback is run */ + grpc_lb_policy* lb_policy; + grpc_closure closure; +} reresolution_request_args; + /** We create one watcher for each new lb_policy that is returned from a resolver, to watch for state changes from the lb_policy. When a state change is seen, we update the channel, and create a new watcher. */ @@ -254,21 +262,13 @@ static void set_channel_connectivity_state_locked(channel_data* chand, static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) { lb_policy_connectivity_watcher* w = (lb_policy_connectivity_watcher*)arg; - grpc_connectivity_state publish_state = w->state; /* check if the notification is for the latest policy */ if (w->lb_policy == w->chand->lb_policy) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand, w->lb_policy, grpc_connectivity_state_name(w->state)); } - if (publish_state == GRPC_CHANNEL_SHUTDOWN && - w->chand->resolver != nullptr) { - publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; - grpc_resolver_channel_saw_error_locked(w->chand->resolver); - GRPC_LB_POLICY_UNREF(w->chand->lb_policy, "channel"); - w->chand->lb_policy = nullptr; - } - set_channel_connectivity_state_locked(w->chand, publish_state, + set_channel_connectivity_state_locked(w->chand, w->state, GRPC_ERROR_REF(error), "lb_changed"); if (w->state != GRPC_CHANNEL_SHUTDOWN) { watch_lb_policy_locked(w->chand, w->lb_policy, w->state); @@ -364,6 +364,25 @@ static void parse_retry_throttle_params(const grpc_json* field, void* arg) { } } +static void request_reresolution_locked(void* arg, grpc_error* error) { + reresolution_request_args* args = (reresolution_request_args*)arg; + channel_data* chand = args->chand; + // If this invocation is for a stale LB policy, treat it as an LB shutdown + // signal. + if (args->lb_policy != chand->lb_policy || error != GRPC_ERROR_NONE || + chand->resolver == nullptr) { + GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "re-resolution"); + gpr_free(args); + return; + } + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, "chand=%p: started name re-resolving", chand); + } + grpc_resolver_channel_saw_error_locked(chand->resolver); + // Give back the closure to the LB policy. + grpc_lb_policy_set_reresolve_closure_locked(chand->lb_policy, &args->closure); +} + static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { channel_data* chand = (channel_data*)arg; if (grpc_client_channel_trace.enabled()) { @@ -379,98 +398,111 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { grpc_server_retry_throttle_data* retry_throttle_data = nullptr; grpc_slice_hash_table* method_params_table = nullptr; if (chand->resolver_result != nullptr) { - // Find LB policy name. - const char* lb_policy_name = nullptr; - const grpc_arg* channel_arg = - grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME); - if (channel_arg != nullptr) { - GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); - lb_policy_name = channel_arg->value.string; - } - // Special case: If at least one balancer address is present, we use - // the grpclb policy, regardless of what the resolver actually specified. - channel_arg = - grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES); - if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) { - grpc_lb_addresses* addresses = - (grpc_lb_addresses*)channel_arg->value.pointer.p; - bool found_balancer_address = false; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (addresses->addresses[i].is_balancer) { - found_balancer_address = true; - break; + if (chand->resolver != nullptr) { + // Find LB policy name. + const char* lb_policy_name = nullptr; + const grpc_arg* channel_arg = grpc_channel_args_find( + chand->resolver_result, GRPC_ARG_LB_POLICY_NAME); + if (channel_arg != nullptr) { + GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); + lb_policy_name = channel_arg->value.string; + } + // Special case: If at least one balancer address is present, we use + // the grpclb policy, regardless of what the resolver actually specified. + channel_arg = + grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES); + if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) { + grpc_lb_addresses* addresses = + (grpc_lb_addresses*)channel_arg->value.pointer.p; + bool found_balancer_address = false; + for (size_t i = 0; i < addresses->num_addresses; ++i) { + if (addresses->addresses[i].is_balancer) { + found_balancer_address = true; + break; + } + } + if (found_balancer_address) { + if (lb_policy_name != nullptr && + strcmp(lb_policy_name, "grpclb") != 0) { + gpr_log(GPR_INFO, + "resolver requested LB policy %s but provided at least one " + "balancer address -- forcing use of grpclb LB policy", + lb_policy_name); + } + lb_policy_name = "grpclb"; } } - if (found_balancer_address) { - if (lb_policy_name != nullptr && - strcmp(lb_policy_name, "grpclb") != 0) { - gpr_log(GPR_INFO, - "resolver requested LB policy %s but provided at least one " - "balancer address -- forcing use of grpclb LB policy", + // Use pick_first if nothing was specified and we didn't select grpclb + // above. + if (lb_policy_name == nullptr) lb_policy_name = "pick_first"; + grpc_lb_policy_args lb_policy_args; + lb_policy_args.args = chand->resolver_result; + lb_policy_args.client_channel_factory = chand->client_channel_factory; + lb_policy_args.combiner = chand->combiner; + // Check to see if we're already using the right LB policy. + // Note: It's safe to use chand->info_lb_policy_name here without + // taking a lock on chand->info_mu, because this function is the + // only thing that modifies its value, and it can only be invoked + // once at any given time. + lb_policy_name_changed = + chand->info_lb_policy_name == nullptr || + gpr_stricmp(chand->info_lb_policy_name, lb_policy_name) != 0; + if (chand->lb_policy != nullptr && !lb_policy_name_changed) { + // Continue using the same LB policy. Update with new addresses. + lb_policy_updated = true; + grpc_lb_policy_update_locked(chand->lb_policy, &lb_policy_args); + } else { + // Instantiate new LB policy. + new_lb_policy = grpc_lb_policy_create(lb_policy_name, &lb_policy_args); + if (new_lb_policy == nullptr) { + gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name); + } else { + reresolution_request_args* args = + (reresolution_request_args*)gpr_zalloc(sizeof(*args)); + args->chand = chand; + args->lb_policy = new_lb_policy; + GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args, + grpc_combiner_scheduler(chand->combiner)); + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution"); + grpc_lb_policy_set_reresolve_closure_locked(new_lb_policy, + &args->closure); } - lb_policy_name = "grpclb"; - } - } - // Use pick_first if nothing was specified and we didn't select grpclb - // above. - if (lb_policy_name == nullptr) lb_policy_name = "pick_first"; - grpc_lb_policy_args lb_policy_args; - lb_policy_args.args = chand->resolver_result; - lb_policy_args.client_channel_factory = chand->client_channel_factory; - lb_policy_args.combiner = chand->combiner; - // Check to see if we're already using the right LB policy. - // Note: It's safe to use chand->info_lb_policy_name here without - // taking a lock on chand->info_mu, because this function is the - // only thing that modifies its value, and it can only be invoked - // once at any given time. - lb_policy_name_changed = - chand->info_lb_policy_name == nullptr || - gpr_stricmp(chand->info_lb_policy_name, lb_policy_name) != 0; - if (chand->lb_policy != nullptr && !lb_policy_name_changed) { - // Continue using the same LB policy. Update with new addresses. - lb_policy_updated = true; - grpc_lb_policy_update_locked(chand->lb_policy, &lb_policy_args); - } else { - // Instantiate new LB policy. - new_lb_policy = grpc_lb_policy_create(lb_policy_name, &lb_policy_args); - if (new_lb_policy == nullptr) { - gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name); } - } - // Find service config. - channel_arg = - grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG); - if (channel_arg != nullptr) { - GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); - service_config_json = gpr_strdup(channel_arg->value.string); - grpc_service_config* service_config = - grpc_service_config_create(service_config_json); - if (service_config != nullptr) { - channel_arg = - grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI); - GPR_ASSERT(channel_arg != nullptr); + // Find service config. + channel_arg = grpc_channel_args_find(chand->resolver_result, + GRPC_ARG_SERVICE_CONFIG); + if (channel_arg != nullptr) { GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); - grpc_uri* uri = grpc_uri_parse(channel_arg->value.string, true); - GPR_ASSERT(uri->path[0] != '\0'); - service_config_parsing_state parsing_state; - memset(&parsing_state, 0, sizeof(parsing_state)); - parsing_state.server_name = - uri->path[0] == '/' ? uri->path + 1 : uri->path; - grpc_service_config_parse_global_params( - service_config, parse_retry_throttle_params, &parsing_state); - grpc_uri_destroy(uri); - retry_throttle_data = parsing_state.retry_throttle_data; - method_params_table = grpc_service_config_create_method_config_table( - service_config, method_parameters_create_from_json, - method_parameters_ref_wrapper, method_parameters_unref_wrapper); - grpc_service_config_destroy(service_config); + service_config_json = gpr_strdup(channel_arg->value.string); + grpc_service_config* service_config = + grpc_service_config_create(service_config_json); + if (service_config != nullptr) { + channel_arg = grpc_channel_args_find(chand->resolver_result, + GRPC_ARG_SERVER_URI); + GPR_ASSERT(channel_arg != nullptr); + GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); + grpc_uri* uri = grpc_uri_parse(channel_arg->value.string, true); + GPR_ASSERT(uri->path[0] != '\0'); + service_config_parsing_state parsing_state; + memset(&parsing_state, 0, sizeof(parsing_state)); + parsing_state.server_name = + uri->path[0] == '/' ? uri->path + 1 : uri->path; + grpc_service_config_parse_global_params( + service_config, parse_retry_throttle_params, &parsing_state); + grpc_uri_destroy(uri); + retry_throttle_data = parsing_state.retry_throttle_data; + method_params_table = grpc_service_config_create_method_config_table( + service_config, method_parameters_create_from_json, + method_parameters_ref_wrapper, method_parameters_unref_wrapper); + grpc_service_config_destroy(service_config); + } } + // Before we clean up, save a copy of lb_policy_name, since it might + // be pointing to data inside chand->resolver_result. + // The copy will be saved in chand->lb_policy_name below. + lb_policy_name_dup = gpr_strdup(lb_policy_name); } - // Before we clean up, save a copy of lb_policy_name, since it might - // be pointing to data inside chand->resolver_result. - // The copy will be saved in chand->lb_policy_name below. - lb_policy_name_dup = gpr_strdup(lb_policy_name); grpc_channel_args_destroy(chand->resolver_result); chand->resolver_result = nullptr; } @@ -507,11 +539,11 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { } chand->method_params_table = method_params_table; // If we have a new LB policy or are shutting down (in which case - // new_lb_policy will be NULL), swap out the LB policy, unreffing the - // old one and removing its fds from chand->interested_parties. - // Note that we do NOT do this if either (a) we updated the existing - // LB policy above or (b) we failed to create the new LB policy (in - // which case we want to continue using the most recent one we had). + // new_lb_policy will be NULL), swap out the LB policy, unreffing the old one + // and removing its fds from chand->interested_parties. Note that we do NOT do + // this if either (a) we updated the existing LB policy above or (b) we failed + // to create the new LB policy (in which case we want to continue using the + // most recent one we had). if (new_lb_policy != nullptr || error != GRPC_ERROR_NONE || chand->resolver == nullptr) { if (chand->lb_policy != nullptr) { diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc index 6e2559936b..b97aa319f7 100644 --- a/src/core/ext/filters/client_channel/lb_policy.cc +++ b/src/core/ext/filters/client_channel/lb_policy.cc @@ -147,3 +147,27 @@ void grpc_lb_policy_update_locked(grpc_lb_policy* policy, const grpc_lb_policy_args* lb_policy_args) { policy->vtable->update_locked(policy, lb_policy_args); } + +void grpc_lb_policy_set_reresolve_closure_locked( + grpc_lb_policy* policy, grpc_closure* request_reresolution) { + policy->vtable->set_reresolve_closure_locked(policy, request_reresolution); +} + +void grpc_lb_policy_try_reresolve(grpc_lb_policy* policy, + grpc_core::TraceFlag* grpc_lb_trace, + grpc_error* error) { + if (policy->request_reresolution != nullptr) { + GRPC_CLOSURE_SCHED(policy->request_reresolution, error); + policy->request_reresolution = nullptr; + if (grpc_lb_trace->enabled()) { + gpr_log(GPR_DEBUG, + "%s %p: scheduling re-resolution closure with error=%s.", + grpc_lb_trace->name(), policy, grpc_error_string(error)); + } + } else { + if (grpc_lb_trace->enabled() && error == GRPC_ERROR_NONE) { + gpr_log(GPR_DEBUG, "%s %p: re-resolution already in progress.", + grpc_lb_trace->name(), policy); + } + } +} diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 6c241e932a..fd28a2b623 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -38,6 +38,8 @@ struct grpc_lb_policy { grpc_pollset_set* interested_parties; /* combiner under which lb_policy actions take place */ grpc_combiner* combiner; + /* callback to force a re-resolution */ + grpc_closure* request_reresolution; }; /** Extra arguments for an LB pick */ @@ -93,6 +95,10 @@ struct grpc_lb_policy_vtable { void (*update_locked)(grpc_lb_policy* policy, const grpc_lb_policy_args* args); + + /** \see grpc_lb_policy_set_reresolve_closure */ + void (*set_reresolve_closure_locked)(grpc_lb_policy* policy, + grpc_closure* request_reresolution); }; #ifndef NDEBUG @@ -193,4 +199,14 @@ grpc_connectivity_state grpc_lb_policy_check_connectivity_locked( void grpc_lb_policy_update_locked(grpc_lb_policy* policy, const grpc_lb_policy_args* lb_policy_args); +/** Set the re-resolution closure to \a request_reresolution. */ +void grpc_lb_policy_set_reresolve_closure_locked( + grpc_lb_policy* policy, grpc_closure* request_reresolution); + +/** Try to request a re-resolution. It's NOT a public API; it's only for use by + the LB policy implementations. */ +void grpc_lb_policy_try_reresolve(grpc_lb_policy* policy, + grpc_core::TraceFlag* grpc_lb_trace, + grpc_error* error); + #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 9a52f297b3..eadeea0368 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -636,7 +636,7 @@ static void update_lb_connectivity_status_locked( /* Perform a pick over \a glb_policy->rr_policy. Given that a pick can return * immediately (ignoring its completion callback), we need to perform the - * cleanups this callback would otherwise be resposible for. + * cleanups this callback would otherwise be responsible for. * If \a force_async is true, then we will manually schedule the * completion callback even if the pick is available immediately. */ static bool pick_from_internal_rr_locked( @@ -761,6 +761,9 @@ static void create_rr_locked(glb_lb_policy* glb_policy, glb_policy->rr_policy); return; } + grpc_lb_policy_set_reresolve_closure_locked( + new_rr_policy, glb_policy->base.request_reresolution); + glb_policy->base.request_reresolution = nullptr; glb_policy->rr_policy = new_rr_policy; grpc_error* rr_state_error = nullptr; const grpc_connectivity_state rr_state = @@ -978,6 +981,7 @@ static void glb_destroy(grpc_lb_policy* pol) { static void glb_shutdown_locked(grpc_lb_policy* pol) { glb_lb_policy* glb_policy = (glb_lb_policy*)pol; + grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); glb_policy->shutting_down = true; /* We need a copy of the lb_call pointer because we can't cancell the call @@ -1008,6 +1012,8 @@ static void glb_shutdown_locked(grpc_lb_policy* pol) { glb_policy->pending_pings = nullptr; if (glb_policy->rr_policy != nullptr) { GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "glb_shutdown"); + } else { + grpc_lb_policy_try_reresolve(pol, &grpc_lb_glb_trace, GRPC_ERROR_CANCELLED); } // We destroy the LB channel here because // glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy @@ -1017,28 +1023,26 @@ static void glb_shutdown_locked(grpc_lb_policy* pol) { grpc_channel_destroy(glb_policy->lb_channel); glb_policy->lb_channel = nullptr; } - grpc_connectivity_state_set( - &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "glb_shutdown"); + grpc_connectivity_state_set(&glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, + GRPC_ERROR_REF(error), "glb_shutdown"); while (pp != nullptr) { pending_pick* next = pp->next; *pp->target = nullptr; - GRPC_CLOSURE_SCHED( - &pp->wrapped_on_complete_arg.wrapper_closure, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown")); + GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure, + GRPC_ERROR_REF(error)); gpr_free(pp); pp = next; } while (pping != nullptr) { pending_ping* next = pping->next; - GRPC_CLOSURE_SCHED( - &pping->wrapped_notify_arg.wrapper_closure, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown")); + GRPC_CLOSURE_SCHED(&pping->wrapped_notify_arg.wrapper_closure, + GRPC_ERROR_REF(error)); gpr_free(pping); pping = next; } + GRPC_ERROR_UNREF(error); } // Cancel a specific pending pick. @@ -1713,8 +1717,8 @@ static void fallback_update_locked(glb_lb_policy* glb_policy, grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses); glb_policy->fallback_backend_addresses = extract_backend_addresses_locked(addresses); - if (glb_policy->started_picking && glb_policy->lb_fallback_timeout_ms > 0 && - !glb_policy->fallback_timer_active) { + if (glb_policy->lb_fallback_timeout_ms > 0 && + glb_policy->rr_policy != nullptr) { rr_handover_locked(glb_policy); } } @@ -1811,7 +1815,7 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg, grpc_call_cancel(glb_policy->lb_call, nullptr); // lb_on_server_status_received() will pick up the cancel and reinit // lb_call. - } else if (glb_policy->started_picking && !glb_policy->shutting_down) { + } else if (glb_policy->started_picking) { if (glb_policy->retry_timer_active) { grpc_timer_cancel(&glb_policy->lb_call_retry_timer); glb_policy->retry_timer_active = false; @@ -1828,6 +1832,19 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg, } } +static void glb_set_reresolve_closure_locked( + grpc_lb_policy* policy, grpc_closure* request_reresolution) { + glb_lb_policy* glb_policy = (glb_lb_policy*)policy; + GPR_ASSERT(!glb_policy->shutting_down); + GPR_ASSERT(glb_policy->base.request_reresolution == nullptr); + if (glb_policy->rr_policy != nullptr) { + grpc_lb_policy_set_reresolve_closure_locked(glb_policy->rr_policy, + request_reresolution); + } else { + glb_policy->base.request_reresolution = request_reresolution; + } +} + /* Code wiring the policy with the rest of the core */ static const grpc_lb_policy_vtable glb_lb_policy_vtable = { glb_destroy, @@ -1839,7 +1856,8 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = { glb_exit_idle_locked, glb_check_connectivity_locked, glb_notify_on_state_change_locked, - glb_update_locked}; + glb_update_locked, + glb_set_reresolve_closure_locked}; static grpc_lb_policy* glb_create(grpc_lb_policy_factory* factory, grpc_lb_policy_args* args) { diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 9fc7f1a5d1..5e75b64843 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -70,7 +70,9 @@ static void pf_destroy(grpc_lb_policy* pol) { } } -static void shutdown_locked(pick_first_lb_policy* p, grpc_error* error) { +static void pf_shutdown_locked(grpc_lb_policy* pol) { + pick_first_lb_policy* p = (pick_first_lb_policy*)pol; + grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_DEBUG, "Pick First %p Shutting down", p); } @@ -94,14 +96,11 @@ static void shutdown_locked(pick_first_lb_policy* p, grpc_error* error) { p->latest_pending_subchannel_list, "pf_shutdown"); p->latest_pending_subchannel_list = nullptr; } + grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace, + GRPC_ERROR_CANCELLED); GRPC_ERROR_UNREF(error); } -static void pf_shutdown_locked(grpc_lb_policy* pol) { - shutdown_locked((pick_first_lb_policy*)pol, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown")); -} - static void pf_cancel_pick_locked(grpc_lb_policy* pol, grpc_connected_subchannel** target, grpc_error* error) { @@ -154,10 +153,15 @@ static void start_picking_locked(pick_first_lb_policy* p) { if (p->subchannel_list != nullptr && p->subchannel_list->num_subchannels > 0) { p->subchannel_list->checking_subchannel = 0; - grpc_lb_subchannel_list_ref_for_connectivity_watch( - p->subchannel_list, "connectivity_watch+start_picking"); - grpc_lb_subchannel_data_start_connectivity_watch( - &p->subchannel_list->subchannels[0]); + for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) { + if (p->subchannel_list->subchannels[i].subchannel != nullptr) { + grpc_lb_subchannel_list_ref_for_connectivity_watch( + p->subchannel_list, "connectivity_watch+start_picking"); + grpc_lb_subchannel_data_start_connectivity_watch( + &p->subchannel_list->subchannels[i]); + break; + } + } } } @@ -394,6 +398,9 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { if (sd->curr_connectivity_state != GRPC_CHANNEL_READY && p->latest_pending_subchannel_list != nullptr) { p->selected = nullptr; + grpc_lb_subchannel_data_stop_connectivity_watch(sd); + grpc_lb_subchannel_list_unref_for_connectivity_watch( + sd->subchannel_list, "selected_not_ready+switch_to_update"); grpc_lb_subchannel_list_shutdown_and_unref( p->subchannel_list, "selected_not_ready+switch_to_update"); p->subchannel_list = p->latest_pending_subchannel_list; @@ -402,21 +409,34 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update"); } else { - if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - /* if the selected channel goes bad, we're done */ - sd->curr_connectivity_state = GRPC_CHANNEL_SHUTDOWN; + // TODO(juanlishen): we re-resolve when the selected subchannel goes to + // TRANSIENT_FAILURE because we used to shut down in this case before + // re-resolution is introduced. But we need to investigate whether we + // really want to take any action instead of waiting for the selected + // subchannel reconnecting. + if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN || + sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + // If the selected channel goes bad, request a re-resolution. + grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE, + GRPC_ERROR_NONE, + "selected_changed+reresolve"); + p->started_picking = false; + grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace, + GRPC_ERROR_NONE); + } else { + grpc_connectivity_state_set(&p->state_tracker, + sd->curr_connectivity_state, + GRPC_ERROR_REF(error), "selected_changed"); } - grpc_connectivity_state_set(&p->state_tracker, - sd->curr_connectivity_state, - GRPC_ERROR_REF(error), "selected_changed"); if (sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN) { // Renew notification. grpc_lb_subchannel_data_start_connectivity_watch(sd); } else { + p->selected = nullptr; grpc_lb_subchannel_data_stop_connectivity_watch(sd); grpc_lb_subchannel_list_unref_for_connectivity_watch( sd->subchannel_list, "pf_selected_shutdown"); - shutdown_locked(p, GRPC_ERROR_REF(error)); + grpc_lb_subchannel_data_unref_subchannel(sd, "pf_selected_shutdown"); } } return; @@ -519,23 +539,36 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { } while (sd->subchannel == nullptr && sd != original_sd); if (sd == original_sd) { grpc_lb_subchannel_list_unref_for_connectivity_watch( - sd->subchannel_list, "pf_candidate_shutdown"); - shutdown_locked(p, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Pick first exhausted channels", &error, 1)); - break; - } - if (sd->subchannel_list == p->subchannel_list) { - grpc_connectivity_state_set(&p->state_tracker, - GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "subchannel_failed"); + sd->subchannel_list, "pf_exhausted_subchannels"); + if (sd->subchannel_list == p->subchannel_list) { + grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE, + GRPC_ERROR_NONE, + "exhausted_subchannels+reresolve"); + p->started_picking = false; + grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace, + GRPC_ERROR_NONE); + } + } else { + if (sd->subchannel_list == p->subchannel_list) { + grpc_connectivity_state_set( + &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_REF(error), "subchannel_failed"); + } + // Reuses the connectivity refs from the previous watch. + grpc_lb_subchannel_data_start_connectivity_watch(sd); } - // Reuses the connectivity refs from the previous watch. - grpc_lb_subchannel_data_start_connectivity_watch(sd); - break; } } } +static void pf_set_reresolve_closure_locked( + grpc_lb_policy* policy, grpc_closure* request_reresolution) { + pick_first_lb_policy* p = (pick_first_lb_policy*)policy; + GPR_ASSERT(!p->shutdown); + GPR_ASSERT(policy->request_reresolution == nullptr); + policy->request_reresolution = request_reresolution; +} + static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { pf_destroy, pf_shutdown_locked, @@ -546,7 +579,8 @@ static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { pf_exit_idle_locked, pf_check_connectivity_locked, pf_notify_on_state_change_locked, - pf_update_locked}; + pf_update_locked, + pf_set_reresolve_closure_locked}; static void pick_first_factory_ref(grpc_lb_policy_factory* factory) {} diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 2eadd6473d..6958b72693 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -20,9 +20,9 @@ * * Before every pick, the \a get_next_ready_subchannel_index_locked function * returns the p->subchannel_list->subchannels index for next subchannel, - * respecting the relative - * order of the addresses provided upon creation or updates. Note however that - * updates will start picking from the beginning of the updated list. */ + * respecting the relative order of the addresses provided upon creation or + * updates. Note however that updates will start picking from the beginning of + * the updated list. */ #include <string.h> @@ -167,7 +167,9 @@ static void rr_destroy(grpc_lb_policy* pol) { gpr_free(p); } -static void shutdown_locked(round_robin_lb_policy* p, grpc_error* error) { +static void rr_shutdown_locked(grpc_lb_policy* pol) { + round_robin_lb_policy* p = (round_robin_lb_policy*)pol; + grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Shutting down", p); } @@ -191,14 +193,11 @@ static void shutdown_locked(round_robin_lb_policy* p, grpc_error* error) { p->latest_pending_subchannel_list, "sl_shutdown_pending_rr_shutdown"); p->latest_pending_subchannel_list = nullptr; } + grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace, + GRPC_ERROR_CANCELLED); GRPC_ERROR_UNREF(error); } -static void rr_shutdown_locked(grpc_lb_policy* pol) { - round_robin_lb_policy* p = (round_robin_lb_policy*)pol; - shutdown_locked(p, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown")); -} - static void rr_cancel_pick_locked(grpc_lb_policy* pol, grpc_connected_subchannel** target, grpc_error* error) { @@ -250,10 +249,12 @@ static void rr_cancel_picks_locked(grpc_lb_policy* pol, static void start_picking_locked(round_robin_lb_policy* p) { p->started_picking = true; for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) { - grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list, - "connectivity_watch"); - grpc_lb_subchannel_data_start_connectivity_watch( - &p->subchannel_list->subchannels[i]); + if (p->subchannel_list->subchannels[i].subchannel != nullptr) { + grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list, + "connectivity_watch"); + grpc_lb_subchannel_data_start_connectivity_watch( + &p->subchannel_list->subchannels[i]); + } } } @@ -341,69 +342,69 @@ static void update_state_counters_locked(grpc_lb_subchannel_data* sd) { } /** Sets the policy's connectivity status based on that of the passed-in \a sd - * (the grpc_lb_subchannel_data associted with the updated subchannel) and the - * subchannel list \a sd belongs to (sd->subchannel_list). \a error will only be - * used upon policy transition to TRANSIENT_FAILURE or SHUTDOWN. Returns the - * connectivity status set. */ -static grpc_connectivity_state update_lb_connectivity_status_locked( - grpc_lb_subchannel_data* sd, grpc_error* error) { + * (the grpc_lb_subchannel_data associated with the updated subchannel) and the + * subchannel list \a sd belongs to (sd->subchannel_list). \a error will be used + * only if the policy transitions to state TRANSIENT_FAILURE. */ +static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd, + grpc_error* error) { /* In priority order. The first rule to match terminates the search (ie, if we * are on rule n, all previous rules were unfulfilled). * * 1) RULE: ANY subchannel is READY => policy is READY. - * CHECK: At least one subchannel is ready iff p->ready_list is NOT empty. + * CHECK: subchannel_list->num_ready > 0. * * 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING. * CHECK: sd->curr_connectivity_state == CONNECTING. * - * 3) RULE: ALL subchannels are SHUTDOWN => policy is SHUTDOWN. - * CHECK: p->subchannel_list->num_shutdown == - * p->subchannel_list->num_subchannels. + * 3) RULE: ALL subchannels are SHUTDOWN => policy is IDLE (and requests + * re-resolution). + * CHECK: subchannel_list->num_shutdown == + * subchannel_list->num_subchannels. * * 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is - * TRANSIENT_FAILURE. - * CHECK: p->num_transient_failures == p->subchannel_list->num_subchannels. + * TRANSIENT_FAILURE. + * CHECK: subchannel_list->num_transient_failures == + * subchannel_list->num_subchannels. * * 5) RULE: ALL subchannels are IDLE => policy is IDLE. - * CHECK: p->num_idle == p->subchannel_list->num_subchannels. + * CHECK: subchannel_list->num_idle == subchannel_list->num_subchannels. + * (Note that all the subchannels will transition from IDLE to CONNECTING + * in batch when we start trying to connect.) */ - grpc_connectivity_state new_state = sd->curr_connectivity_state; + // TODO(juanlishen): if the subchannel states are mixed by {SHUTDOWN, + // TRANSIENT_FAILURE}, we don't change the state. We may want to improve on + // this. grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; round_robin_lb_policy* p = (round_robin_lb_policy*)subchannel_list->policy; - if (subchannel_list->num_ready > 0) { /* 1) READY */ + if (subchannel_list->num_ready > 0) { + /* 1) READY */ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "rr_ready"); - new_state = GRPC_CHANNEL_READY; - } else if (sd->curr_connectivity_state == - GRPC_CHANNEL_CONNECTING) { /* 2) CONNECTING */ + } else if (sd->curr_connectivity_state == GRPC_CHANNEL_CONNECTING) { + /* 2) CONNECTING */ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, "rr_connecting"); - new_state = GRPC_CHANNEL_CONNECTING; - } else if (p->subchannel_list->num_shutdown == - p->subchannel_list->num_subchannels) { /* 3) SHUTDOWN */ - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_REF(error), "rr_shutdown"); - p->shutdown = true; - new_state = GRPC_CHANNEL_SHUTDOWN; - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_INFO, - "[RR %p] Shutting down: all subchannels have gone into shutdown", - (void*)p); - } + } else if (subchannel_list->num_shutdown == + subchannel_list->num_subchannels) { + /* 3) IDLE and re-resolve */ + grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE, + GRPC_ERROR_NONE, + "rr_exhausted_subchannels+reresolve"); + p->started_picking = false; + grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace, + GRPC_ERROR_NONE); } else if (subchannel_list->num_transient_failures == - p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */ + subchannel_list->num_subchannels) { + /* 4) TRANSIENT_FAILURE */ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "rr_transient_failure"); - new_state = GRPC_CHANNEL_TRANSIENT_FAILURE; - } else if (subchannel_list->num_idle == - p->subchannel_list->num_subchannels) { /* 5) IDLE */ + } else if (subchannel_list->num_idle == subchannel_list->num_subchannels) { + /* 5) IDLE */ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, "rr_idle"); - new_state = GRPC_CHANNEL_IDLE; } GRPC_ERROR_UNREF(error); - return new_state; } static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { @@ -446,20 +447,15 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { // state (which was set by the connectivity state watcher) to // curr_connectivity_state, which is what we use inside of the combiner. sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; - // Update state counters and determine new overall state. + // Update state counters and new overall state. update_state_counters_locked(sd); - const grpc_connectivity_state new_policy_connectivity_state = - update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error)); - // If the sd's new state is SHUTDOWN, unref the subchannel, and if the new - // policy's state is SHUTDOWN, clean up. + update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error)); + // If the sd's new state is SHUTDOWN, unref the subchannel. if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { grpc_lb_subchannel_data_stop_connectivity_watch(sd); grpc_lb_subchannel_data_unref_subchannel(sd, "rr_connectivity_shutdown"); grpc_lb_subchannel_list_unref_for_connectivity_watch( sd->subchannel_list, "rr_connectivity_shutdown"); - if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - shutdown_locked(p, GRPC_ERROR_REF(error)); - } } else { // sd not in SHUTDOWN if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { if (sd->connected_subchannel == nullptr) { @@ -495,7 +491,7 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { } /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in - * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ + * p->pending_picks. This preemptively replicates rr_pick()'s actions. */ const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels); grpc_lb_subchannel_data* selected = @@ -630,6 +626,14 @@ static void rr_update_locked(grpc_lb_policy* policy, } } +static void rr_set_reresolve_closure_locked( + grpc_lb_policy* policy, grpc_closure* request_reresolution) { + round_robin_lb_policy* p = (round_robin_lb_policy*)policy; + GPR_ASSERT(!p->shutdown); + GPR_ASSERT(policy->request_reresolution == nullptr); + policy->request_reresolution = request_reresolution; +} + static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { rr_destroy, rr_shutdown_locked, @@ -640,7 +644,8 @@ static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { rr_exit_idle_locked, rr_check_connectivity_locked, rr_notify_on_state_change_locked, - rr_update_locked}; + rr_update_locked, + rr_set_reresolve_closure_locked}; static void round_robin_factory_ref(grpc_lb_policy_factory* factory) {} diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.cc b/src/core/ext/transport/chttp2/client/chttp2_connector.cc index 9771bd636a..db5962e7fd 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.cc +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.cc @@ -114,10 +114,34 @@ static void on_handshake_done(void* arg, grpc_error* error) { grpc_endpoint_delete_from_pollset_set(args->endpoint, c->args.interested_parties); c->result->transport = - grpc_create_chttp2_transport(args->args, args->endpoint, 1); + grpc_create_chttp2_transport(args->args, args->endpoint, true); GPR_ASSERT(c->result->transport); - grpc_chttp2_transport_start_reading(c->result->transport, - args->read_buffer); + // TODO(roth): We ideally want to wait until we receive HTTP/2 + // settings from the server before we consider the connection + // established. If that doesn't happen before the connection + // timeout expires, then we should consider the connection attempt a + // failure and feed that information back into the backoff code. + // We could pass a notify_on_receive_settings callback to + // grpc_chttp2_transport_start_reading() to let us know when + // settings are received, but we would need to figure out how to use + // that information here. + // + // Unfortunately, we don't currently have a way to split apart the two + // effects of scheduling c->notify: we start sending RPCs immediately + // (which we want to do) and we consider the connection attempt successful + // (which we don't want to do until we get the notify_on_receive_settings + // callback from the transport). If we could split those things + // apart, then we could start sending RPCs but then wait for our + // timeout before deciding if the connection attempt is successful. + // If the attempt is not successful, then we would tear down the + // transport and feed the failure back into the backoff code. + // + // In addition, even if we did that, we would probably not want to do + // so until after transparent retries is implemented. Otherwise, any + // RPC that we attempt to send on the connection before the timeout + // would fail instead of being retried on a subsequent attempt. + grpc_chttp2_transport_start_reading(c->result->transport, args->read_buffer, + nullptr); c->result->channel_args = args->args; } grpc_closure* notify = c->notify; @@ -135,8 +159,9 @@ static void start_handshake_locked(chttp2_connector* c) { c->handshake_mgr); grpc_endpoint_add_to_pollset_set(c->endpoint, c->args.interested_parties); grpc_handshake_manager_do_handshake( - c->handshake_mgr, c->endpoint, c->args.channel_args, c->args.deadline, - nullptr /* acceptor */, on_handshake_done, c); + c->handshake_mgr, c->args.interested_parties, c->endpoint, + c->args.channel_args, c->args.deadline, nullptr /* acceptor */, + on_handshake_done, c); c->endpoint = nullptr; // Endpoint handed off to handshake manager. } diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc index 6236551e24..0cdea5a94e 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc @@ -53,12 +53,14 @@ grpc_channel* grpc_insecure_channel_create_from_fd( grpc_fd_create(fd, "client"), args, "fd-client"); grpc_transport* transport = - grpc_create_chttp2_transport(final_args, client, 1); + grpc_create_chttp2_transport(final_args, client, true); GPR_ASSERT(transport); grpc_channel* channel = grpc_channel_create( target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport); grpc_channel_args_destroy(final_args); - grpc_chttp2_transport_start_reading(transport, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); + + grpc_core::ExecCtx::Get()->Flush(); return channel != nullptr ? channel : grpc_lame_client_channel_create( diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.cc b/src/core/ext/transport/chttp2/server/chttp2_server.cc index 08d919144e..5669fa4090 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.cc +++ b/src/core/ext/transport/chttp2/server/chttp2_server.cc @@ -21,6 +21,7 @@ #include <grpc/grpc.h> #include <inttypes.h> +#include <limits.h> #include <string.h> #include <grpc/support/alloc.h> @@ -31,6 +32,7 @@ #include "src/core/ext/filters/http/server/http_server_filter.h" #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/handshaker.h" #include "src/core/lib/channel/handshaker_registry.h" @@ -53,12 +55,51 @@ typedef struct { } server_state; typedef struct { + gpr_refcount refs; server_state* svr_state; grpc_pollset* accepting_pollset; grpc_tcp_server_acceptor* acceptor; grpc_handshake_manager* handshake_mgr; + // State for enforcing handshake timeout on receiving HTTP/2 settings. + grpc_chttp2_transport* transport; + grpc_millis deadline; + grpc_timer timer; + grpc_closure on_timeout; + grpc_closure on_receive_settings; } server_connection_state; +static void server_connection_state_unref( + server_connection_state* connection_state) { + if (gpr_unref(&connection_state->refs)) { + if (connection_state->transport != nullptr) { + GRPC_CHTTP2_UNREF_TRANSPORT(connection_state->transport, + "receive settings timeout"); + } + gpr_free(connection_state); + } +} + +static void on_timeout(void* arg, grpc_error* error) { + server_connection_state* connection_state = (server_connection_state*)arg; + // Note that we may be called with GRPC_ERROR_NONE when the timer fires + // or with an error indicating that the timer system is being shut down. + if (error != GRPC_ERROR_CANCELLED) { + grpc_transport_op* op = grpc_make_transport_op(nullptr); + op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Did not receive HTTP/2 settings before handshake timeout"); + grpc_transport_perform_op(&connection_state->transport->base, op); + } + server_connection_state_unref(connection_state); +} + +static void on_receive_settings(void* arg, grpc_error* error) { + server_connection_state* connection_state = (server_connection_state*)arg; + if (error == GRPC_ERROR_NONE) { + grpc_timer_cancel(&connection_state->timer); + } + server_connection_state_unref(connection_state); +} + static void on_handshake_done(void* arg, grpc_error* error) { grpc_handshaker_args* args = (grpc_handshaker_args*)arg; server_connection_state* connection_state = @@ -67,7 +108,6 @@ static void on_handshake_done(void* arg, grpc_error* error) { if (error != GRPC_ERROR_NONE || connection_state->svr_state->shutdown) { const char* error_str = grpc_error_string(error); gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str); - if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) { // We were shut down after handshaking completed successfully, so // destroy the endpoint here. @@ -87,12 +127,27 @@ static void on_handshake_done(void* arg, grpc_error* error) { // code, so we can just clean up here without creating a transport. if (args->endpoint != nullptr) { grpc_transport* transport = - grpc_create_chttp2_transport(args->args, args->endpoint, 0); + grpc_create_chttp2_transport(args->args, args->endpoint, false); grpc_server_setup_transport( connection_state->svr_state->server, transport, connection_state->accepting_pollset, args->args); - grpc_chttp2_transport_start_reading(transport, args->read_buffer); + // Use notify_on_receive_settings callback to enforce the + // handshake deadline. + connection_state->transport = (grpc_chttp2_transport*)transport; + gpr_ref(&connection_state->refs); + GRPC_CLOSURE_INIT(&connection_state->on_receive_settings, + on_receive_settings, connection_state, + grpc_schedule_on_exec_ctx); + grpc_chttp2_transport_start_reading( + transport, args->read_buffer, &connection_state->on_receive_settings); grpc_channel_args_destroy(args->args); + gpr_ref(&connection_state->refs); + GRPC_CHTTP2_REF_TRANSPORT((grpc_chttp2_transport*)transport, + "receive settings timeout"); + GRPC_CLOSURE_INIT(&connection_state->on_timeout, on_timeout, + connection_state, grpc_schedule_on_exec_ctx); + grpc_timer_init(&connection_state->timer, connection_state->deadline, + &connection_state->on_timeout); } } grpc_handshake_manager_pending_list_remove( @@ -100,9 +155,9 @@ static void on_handshake_done(void* arg, grpc_error* error) { connection_state->handshake_mgr); gpr_mu_unlock(&connection_state->svr_state->mu); grpc_handshake_manager_destroy(connection_state->handshake_mgr); - grpc_tcp_server_unref(connection_state->svr_state->tcp_server); gpr_free(connection_state->acceptor); - gpr_free(connection_state); + grpc_tcp_server_unref(connection_state->svr_state->tcp_server); + server_connection_state_unref(connection_state); } static void on_accept(void* arg, grpc_endpoint* tcp, @@ -123,20 +178,24 @@ static void on_accept(void* arg, grpc_endpoint* tcp, gpr_mu_unlock(&state->mu); grpc_tcp_server_ref(state->tcp_server); server_connection_state* connection_state = - (server_connection_state*)gpr_malloc(sizeof(*connection_state)); + (server_connection_state*)gpr_zalloc(sizeof(*connection_state)); + gpr_ref_init(&connection_state->refs, 1); connection_state->svr_state = state; connection_state->accepting_pollset = accepting_pollset; connection_state->acceptor = acceptor; connection_state->handshake_mgr = handshake_mgr; grpc_handshakers_add(HANDSHAKER_SERVER, state->args, connection_state->handshake_mgr); - // TODO(roth): We should really get this timeout value from channel - // args instead of hard-coding it. - const grpc_millis deadline = - grpc_core::ExecCtx::Get()->Now() + 120 * GPR_MS_PER_SEC; - grpc_handshake_manager_do_handshake(connection_state->handshake_mgr, tcp, - state->args, deadline, acceptor, - on_handshake_done, connection_state); + const grpc_arg* timeout_arg = + grpc_channel_args_find(state->args, GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS); + connection_state->deadline = + grpc_core::ExecCtx::Get()->Now() + + grpc_channel_arg_get_integer(timeout_arg, + {120 * GPR_MS_PER_SEC, 1, INT_MAX}); + grpc_handshake_manager_do_handshake( + connection_state->handshake_mgr, nullptr /* interested_parties */, tcp, + state->args, connection_state->deadline, acceptor, on_handshake_done, + connection_state); } /* Server callback: start listening on our ports */ diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc index 210939e6af..dafd4af6ce 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc @@ -49,7 +49,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server, const grpc_channel_args* server_args = grpc_server_get_channel_args(server); grpc_transport* transport = grpc_create_chttp2_transport( - server_args, server_endpoint, 0 /* is_client */); + server_args, server_endpoint, false /* is_client */); grpc_pollset** pollsets; size_t num_pollsets = 0; @@ -60,7 +60,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server, } grpc_server_setup_transport(server, transport, nullptr, server_args); - grpc_chttp2_transport_start_reading(transport, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); } #else // !GPR_SUPPORT_CHANNELS_FROM_FD diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 1380b3756d..f537fb09c2 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -615,6 +615,10 @@ static void close_transport_locked(grpc_chttp2_transport* t, GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE); grpc_endpoint_shutdown(t->ep, GRPC_ERROR_REF(error)); } + if (t->notify_on_receive_settings != nullptr) { + GRPC_CLOSURE_SCHED(t->notify_on_receive_settings, GRPC_ERROR_CANCELLED); + t->notify_on_receive_settings = nullptr; + } GRPC_ERROR_UNREF(error); } @@ -1702,7 +1706,6 @@ static void perform_transport_op_locked(void* stream_op, grpc_transport_op* op = (grpc_transport_op*)stream_op; grpc_chttp2_transport* t = (grpc_chttp2_transport*)op->handler_private.extra_arg; - grpc_error* close_transport = op->disconnect_with_error; if (op->goaway_error) { send_goaway(t, op->goaway_error); @@ -1733,8 +1736,8 @@ static void perform_transport_op_locked(void* stream_op, op->on_connectivity_state_change); } - if (close_transport != GRPC_ERROR_NONE) { - close_transport_locked(t, close_transport); + if (op->disconnect_with_error != GRPC_ERROR_NONE) { + close_transport_locked(t, op->disconnect_with_error); } GRPC_CLOSURE_RUN(op->on_consumed, GRPC_ERROR_NONE); @@ -3079,15 +3082,16 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), static const grpc_transport_vtable* get_vtable(void) { return &vtable; } grpc_transport* grpc_create_chttp2_transport( - const grpc_channel_args* channel_args, grpc_endpoint* ep, int is_client) { + const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client) { grpc_chttp2_transport* t = (grpc_chttp2_transport*)gpr_zalloc(sizeof(grpc_chttp2_transport)); - init_transport(t, channel_args, ep, is_client != 0); + init_transport(t, channel_args, ep, is_client); return &t->base; } -void grpc_chttp2_transport_start_reading(grpc_transport* transport, - grpc_slice_buffer* read_buffer) { +void grpc_chttp2_transport_start_reading( + grpc_transport* transport, grpc_slice_buffer* read_buffer, + grpc_closure* notify_on_receive_settings) { grpc_chttp2_transport* t = (grpc_chttp2_transport*)transport; GRPC_CHTTP2_REF_TRANSPORT( t, "reading_action"); /* matches unref inside reading_action */ @@ -3095,5 +3099,6 @@ void grpc_chttp2_transport_start_reading(grpc_transport* transport, grpc_slice_buffer_move_into(read_buffer, &t->read_buffer); gpr_free(read_buffer); } + t->notify_on_receive_settings = notify_on_receive_settings; GRPC_CLOSURE_SCHED(&t->read_action_locked, GRPC_ERROR_NONE); } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.h b/src/core/ext/transport/chttp2/transport/chttp2_transport.h index 8fd10c803d..596ababb19 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.h +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.h @@ -28,11 +28,14 @@ extern grpc_core::TraceFlag grpc_trace_http2_stream_state; extern grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount; grpc_transport* grpc_create_chttp2_transport( - const grpc_channel_args* channel_args, grpc_endpoint* ep, int is_client); + const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client); /// Takes ownership of \a read_buffer, which (if non-NULL) contains /// leftover bytes previously read from the endpoint (e.g., by handshakers). -void grpc_chttp2_transport_start_reading(grpc_transport* transport, - grpc_slice_buffer* read_buffer); +/// If non-null, \a notify_on_receive_settings will be scheduled when +/// HTTP/2 settings are received from the peer. +void grpc_chttp2_transport_start_reading( + grpc_transport* transport, grpc_slice_buffer* read_buffer, + grpc_closure* notify_on_receive_settings); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CHTTP2_TRANSPORT_H */ diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.cc b/src/core/ext/transport/chttp2/transport/frame_settings.cc index d4f84f352d..c6c2a6c301 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.cc +++ b/src/core/ext/transport/chttp2/transport/frame_settings.cc @@ -130,6 +130,11 @@ grpc_error* grpc_chttp2_settings_parser_parse(void* p, grpc_chttp2_transport* t, memcpy(parser->target_settings, parser->incoming_settings, GRPC_CHTTP2_NUM_SETTINGS * sizeof(uint32_t)); grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create()); + if (t->notify_on_receive_settings != nullptr) { + GRPC_CLOSURE_SCHED(t->notify_on_receive_settings, + GRPC_ERROR_NONE); + t->notify_on_receive_settings = nullptr; + } } return GRPC_ERROR_NONE; } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 3653ec6f56..932f5ba83d 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -241,6 +241,8 @@ struct grpc_chttp2_transport { grpc_combiner* combiner; + grpc_closure* notify_on_receive_settings; + /** write execution state of the transport */ grpc_chttp2_write_state write_state; /** is this the first write in a series of writes? diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index d816a5d1a7..8dd0b7dce2 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -447,6 +447,14 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) { } else { err = GRPC_ERROR_REF(error); } + if (s->recv_initial_md_op->payload->recv_initial_metadata + .trailing_metadata_available != nullptr) { + // Set to true unconditionally, because we're failing the call, so even + // if we haven't actually seen the send_trailing_metadata op from the + // other side, we're going to return trailing metadata anyway. + *s->recv_initial_md_op->payload->recv_initial_metadata + .trailing_metadata_available = true; + } INPROC_LOG(GPR_DEBUG, "fail_helper %p scheduling initial-metadata-ready %p %p", s, error, err); @@ -655,6 +663,12 @@ static void op_state_machine(void* arg, grpc_error* error) { nullptr); s->recv_initial_md_op->payload->recv_initial_metadata .recv_initial_metadata->deadline = s->deadline; + if (s->recv_initial_md_op->payload->recv_initial_metadata + .trailing_metadata_available != nullptr) { + *s->recv_initial_md_op->payload->recv_initial_metadata + .trailing_metadata_available = + (other != nullptr && other->send_trailing_md_op != nullptr); + } grpc_metadata_batch_clear(&s->to_read_initial_md); s->to_read_initial_md_filled = false; INPROC_LOG(GPR_DEBUG, @@ -974,6 +988,15 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, if (error != GRPC_ERROR_NONE) { // Schedule op's closures that we didn't push to op state machine if (op->recv_initial_metadata) { + if (op->payload->recv_initial_metadata.trailing_metadata_available != + nullptr) { + // Set to true unconditionally, because we're failing the call, so + // even if we haven't actually seen the send_trailing_metadata op + // from the other side, we're going to return trailing metadata + // anyway. + *op->payload->recv_initial_metadata.trailing_metadata_available = + true; + } INPROC_LOG( GPR_DEBUG, "perform_stream_op error %p scheduling initial-metadata-ready %p", diff --git a/src/core/lib/channel/handshaker.cc b/src/core/lib/channel/handshaker.cc index 1f2426bc6c..dcb149c03e 100644 --- a/src/core/lib/channel/handshaker.cc +++ b/src/core/lib/channel/handshaker.cc @@ -219,18 +219,17 @@ static void on_timeout(void* arg, grpc_error* error) { grpc_handshake_manager_unref(mgr); } -void grpc_handshake_manager_do_handshake(grpc_handshake_manager* mgr, - grpc_endpoint* endpoint, - const grpc_channel_args* channel_args, - grpc_millis deadline, - grpc_tcp_server_acceptor* acceptor, - grpc_iomgr_cb_func on_handshake_done, - void* user_data) { +void grpc_handshake_manager_do_handshake( + grpc_handshake_manager* mgr, grpc_pollset_set* interested_parties, + grpc_endpoint* endpoint, const grpc_channel_args* channel_args, + grpc_millis deadline, grpc_tcp_server_acceptor* acceptor, + grpc_iomgr_cb_func on_handshake_done, void* user_data) { gpr_mu_lock(&mgr->mu); GPR_ASSERT(mgr->index == 0); GPR_ASSERT(!mgr->shutdown); // Construct handshaker args. These will be passed through all // handshakers and eventually be freed by the on_handshake_done callback. + mgr->args.interested_parties = interested_parties; mgr->args.endpoint = endpoint; mgr->args.args = grpc_channel_args_copy(channel_args); mgr->args.user_data = user_data; diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h index 50038e6983..68e5463123 100644 --- a/src/core/lib/channel/handshaker.h +++ b/src/core/lib/channel/handshaker.h @@ -54,6 +54,7 @@ typedef struct grpc_handshaker grpc_handshaker; /// For the on_handshake_done callback, all members are input arguments, /// which the callback takes ownership of. typedef struct { + grpc_pollset_set* interested_parties; grpc_endpoint* endpoint; grpc_channel_args* args; grpc_slice_buffer* read_buffer; @@ -125,24 +126,24 @@ void grpc_handshake_manager_shutdown(grpc_handshake_manager* mgr, grpc_error* why); /// Invokes handshakers in the order they were added. +/// \a interested_parties may be non-nullptr to provide a pollset_set that +/// may be used during handshaking. Ownership is not taken. /// Takes ownership of \a endpoint, and then passes that ownership to /// the \a on_handshake_done callback. /// Does NOT take ownership of \a channel_args. Instead, makes a copy before /// invoking the first handshaker. -/// \a acceptor will be NULL for client-side handshakers. +/// \a acceptor will be nullptr for client-side handshakers. /// /// When done, invokes \a on_handshake_done with a grpc_handshaker_args /// object as its argument. If the callback is invoked with error != /// GRPC_ERROR_NONE, then handshaking failed and the handshaker has done /// the necessary clean-up. Otherwise, the callback takes ownership of /// the arguments. -void grpc_handshake_manager_do_handshake(grpc_handshake_manager* mgr, - grpc_endpoint* endpoint, - const grpc_channel_args* channel_args, - grpc_millis deadline, - grpc_tcp_server_acceptor* acceptor, - grpc_iomgr_cb_func on_handshake_done, - void* user_data); +void grpc_handshake_manager_do_handshake( + grpc_handshake_manager* mgr, grpc_pollset_set* interested_parties, + grpc_endpoint* endpoint, const grpc_channel_args* channel_args, + grpc_millis deadline, grpc_tcp_server_acceptor* acceptor, + grpc_iomgr_cb_func on_handshake_done, void* user_data); /// Add \a mgr to the server side list of all pending handshake managers, the /// list starts with \a *head. diff --git a/src/core/lib/http/httpcli_security_connector.cc b/src/core/lib/http/httpcli_security_connector.cc index 26599bc50a..bfb536a921 100644 --- a/src/core/lib/http/httpcli_security_connector.cc +++ b/src/core/lib/http/httpcli_security_connector.cc @@ -184,8 +184,9 @@ static void ssl_handshake(void* arg, grpc_endpoint* tcp, const char* host, c->handshake_mgr = grpc_handshake_manager_create(); grpc_handshakers_add(HANDSHAKER_CLIENT, &args, c->handshake_mgr); grpc_handshake_manager_do_handshake( - c->handshake_mgr, tcp, nullptr /* channel_args */, deadline, - nullptr /* acceptor */, on_handshake_done, c /* user_data */); + c->handshake_mgr, nullptr /* interested_parties */, tcp, + nullptr /* channel_args */, deadline, nullptr /* acceptor */, + on_handshake_done, c /* user_data */); GRPC_SECURITY_CONNECTOR_UNREF(&sc->base, "httpcli"); } diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc index 72443cc29e..5139760634 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc +++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc @@ -55,7 +55,7 @@ static void init_max_accept_queue_size(void) { if (fgets(buf, sizeof buf, fp)) { char* end; long i = strtol(buf, &end, 10); - if (i > 0 && i <= INT_MAX && end && *end == 0) { + if (i > 0 && i <= INT_MAX && end && *end == '\n') { n = (int)i; } } diff --git a/src/core/lib/iomgr/tcp_server_uv.cc b/src/core/lib/iomgr/tcp_server_uv.cc index 55f16126b6..1ac49190fb 100644 --- a/src/core/lib/iomgr/tcp_server_uv.cc +++ b/src/core/lib/iomgr/tcp_server_uv.cc @@ -250,15 +250,36 @@ static void on_connect(uv_stream_t* server, int status) { } } -static grpc_error* add_socket_to_server(grpc_tcp_server* s, uv_tcp_t* handle, - const grpc_resolved_address* addr, - unsigned port_index, - grpc_tcp_listener** listener) { +static grpc_error* add_addr_to_server(grpc_tcp_server* s, + const grpc_resolved_address* addr, + unsigned port_index, + grpc_tcp_listener** listener) { grpc_tcp_listener* sp = NULL; int port = -1; int status; grpc_error* error; grpc_resolved_address sockname_temp; + uv_tcp_t* handle = (uv_tcp_t*)gpr_malloc(sizeof(uv_tcp_t)); + int family = grpc_sockaddr_get_family(addr); + + status = uv_tcp_init_ex(uv_default_loop(), handle, (unsigned int)family); +#if defined(GPR_LINUX) && defined(SO_REUSEPORT) + if (family == AF_INET || family == AF_INET6) { + int fd; + uv_fileno((uv_handle_t*)handle, &fd); + int enable = 1; + setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)); + } +#endif /* GPR_LINUX && SO_REUSEPORT */ + + if (status != 0) { + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Failed to initialize UV tcp handle"); + error = + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(uv_strerror(status))); + return error; + } // The last argument to uv_tcp_bind is flags status = uv_tcp_bind(handle, (struct sockaddr*)addr->addr, 0); @@ -315,20 +336,48 @@ static grpc_error* add_socket_to_server(grpc_tcp_server* s, uv_tcp_t* handle, return GRPC_ERROR_NONE; } +static grpc_error* add_wildcard_addrs_to_server(grpc_tcp_server* s, + unsigned port_index, + int requested_port, + grpc_tcp_listener** listener) { + grpc_resolved_address wild4; + grpc_resolved_address wild6; + grpc_tcp_listener* sp = nullptr; + grpc_tcp_listener* sp2 = nullptr; + grpc_error* v6_err = GRPC_ERROR_NONE; + grpc_error* v4_err = GRPC_ERROR_NONE; + + grpc_sockaddr_make_wildcards(requested_port, &wild4, &wild6); + /* Try listening on IPv6 first. */ + if ((v6_err = add_addr_to_server(s, &wild6, port_index, &sp)) == + GRPC_ERROR_NONE) { + *listener = sp; + return GRPC_ERROR_NONE; + } + + if ((v4_err = add_addr_to_server(s, &wild4, port_index, &sp2)) == + GRPC_ERROR_NONE) { + *listener = sp2; + return GRPC_ERROR_NONE; + } + + grpc_error* root_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Failed to add any wildcard listeners"); + root_err = grpc_error_add_child(root_err, v6_err); + root_err = grpc_error_add_child(root_err, v4_err); + return root_err; +} + grpc_error* grpc_tcp_server_add_port(grpc_tcp_server* s, const grpc_resolved_address* addr, int* port) { // This function is mostly copied from tcp_server_windows.c grpc_tcp_listener* sp = NULL; - uv_tcp_t* handle; grpc_resolved_address addr6_v4mapped; - grpc_resolved_address wildcard; grpc_resolved_address* allocated_addr = NULL; grpc_resolved_address sockname_temp; unsigned port_index = 0; - int status; grpc_error* error = GRPC_ERROR_NONE; - int family; GRPC_UV_ASSERT_SAME_THREAD(); @@ -357,38 +406,15 @@ grpc_error* grpc_tcp_server_add_port(grpc_tcp_server* s, } } - if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { - addr = &addr6_v4mapped; - } - /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ if (grpc_sockaddr_is_wildcard(addr, port)) { - grpc_sockaddr_make_wildcard6(*port, &wildcard); - - addr = &wildcard; - } - - handle = (uv_tcp_t*)gpr_malloc(sizeof(uv_tcp_t)); - - family = grpc_sockaddr_get_family(addr); - status = uv_tcp_init_ex(uv_default_loop(), handle, (unsigned int)family); -#if defined(GPR_LINUX) && defined(SO_REUSEPORT) - if (family == AF_INET || family == AF_INET6) { - int fd; - uv_fileno((uv_handle_t*)handle, &fd); - int enable = 1; - setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)); - } -#endif /* GPR_LINUX && SO_REUSEPORT */ - - if (status == 0) { - error = add_socket_to_server(s, handle, addr, port_index, &sp); + error = add_wildcard_addrs_to_server(s, port_index, *port, &sp); } else { - error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Failed to initialize UV tcp handle"); - error = - grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, - grpc_slice_from_static_string(uv_strerror(status))); + if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { + addr = &addr6_v4mapped; + } + + error = add_addr_to_server(s, addr, port_index, &sp); } gpr_free(allocated_addr); diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index d4de04a894..55e0b165ec 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -47,6 +47,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -71,14 +72,22 @@ struct grpc_udp_listener { grpc_udp_server_read_cb read_cb; grpc_udp_server_write_cb write_cb; grpc_udp_server_orphan_cb orphan_cb; + // To be scheduled on another thread to actually read/write. + grpc_closure do_read_closure; + grpc_closure do_write_closure; + grpc_closure notify_on_write_closure; // True if orphan_cb is trigered. bool orphan_notified; + // True if grpc_fd_notify_on_write() is called after on_write() call. + bool notify_on_write_armed; + // True if fd has been shutdown. + bool already_shutdown; struct grpc_udp_listener* next; }; struct shutdown_fd_args { - grpc_fd* fd; + grpc_udp_listener* sp; gpr_mu* server_mu; }; @@ -143,8 +152,17 @@ grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args) { static void shutdown_fd(void* args, grpc_error* error) { struct shutdown_fd_args* shutdown_args = (struct shutdown_fd_args*)args; + grpc_udp_listener* sp = shutdown_args->sp; + gpr_log(GPR_DEBUG, "shutdown fd %d", sp->fd); gpr_mu_lock(shutdown_args->server_mu); - grpc_fd_shutdown(shutdown_args->fd, GRPC_ERROR_REF(error)); + grpc_fd_shutdown(sp->emfd, GRPC_ERROR_REF(error)); + sp->already_shutdown = true; + if (!sp->notify_on_write_armed) { + // Re-arm write notification to notify listener with error. This is + // necessary to decrement active_ports. + sp->notify_on_write_armed = true; + grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); + } gpr_mu_unlock(shutdown_args->server_mu); gpr_free(shutdown_args); } @@ -160,6 +178,7 @@ static void finish_shutdown(grpc_udp_server* s) { gpr_mu_destroy(&s->mu); + gpr_log(GPR_DEBUG, "Destroy all listeners."); while (s->head) { grpc_udp_listener* sp = s->head; s->head = sp->next; @@ -205,9 +224,10 @@ static void deactivated_all_ports(grpc_udp_server* s) { /* Call the orphan_cb to signal that the FD is about to be closed and * should no longer be used. Because at this point, all listening ports * have been shutdown already, no need to shutdown again.*/ - GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, dummy_cb, sp->emfd, + GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, dummy_cb, sp, grpc_schedule_on_exec_ctx); GPR_ASSERT(sp->orphan_cb); + gpr_log(GPR_DEBUG, "Orphan fd %d", sp->fd); sp->orphan_cb(sp->emfd, &sp->orphan_fd_closure, sp->server->user_data); } grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr, @@ -229,13 +249,14 @@ void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) { s->shutdown_complete = on_done; + gpr_log(GPR_DEBUG, "start to destroy udp_server"); /* shutdown all fd's */ if (s->active_ports) { for (sp = s->head; sp; sp = sp->next) { GPR_ASSERT(sp->orphan_cb); struct shutdown_fd_args* args = (struct shutdown_fd_args*)gpr_malloc(sizeof(*args)); - args->fd = sp->emfd; + args->sp = sp; args->server_mu = &s->mu; GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, shutdown_fd, args, grpc_schedule_on_exec_ctx); @@ -324,6 +345,27 @@ error: return -1; } +static void do_read(void* arg, grpc_error* error) { + grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg); + GPR_ASSERT(sp->read_cb && error == GRPC_ERROR_NONE); + /* TODO: the reason we hold server->mu here is merely to prevent fd + * shutdown while we are reading. However, it blocks do_write(). Switch to + * read lock if available. */ + gpr_mu_lock(&sp->server->mu); + /* Tell the registered callback that data is available to read. */ + if (!sp->already_shutdown && sp->read_cb(sp->emfd, sp->server->user_data)) { + /* There maybe more packets to read. Schedule read_more_cb_ closure to run + * after finishing this event loop. */ + GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE); + } else { + /* Finish reading all the packets, re-arm the notification event so we can + * get another chance to read. Or fd already shutdown, re-arm to get a + * notification with shutdown error. */ + grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); + } + gpr_mu_unlock(&sp->server->mu); +} + /* event manager callback when reads are ready */ static void on_read(void* arg, grpc_error* error) { grpc_udp_listener* sp = (grpc_udp_listener*)arg; @@ -338,13 +380,49 @@ static void on_read(void* arg, grpc_error* error) { } return; } - - /* Tell the registered callback that data is available to read. */ + /* Read once. If there is more data to read, off load the work to another + * thread to finish. */ GPR_ASSERT(sp->read_cb); - sp->read_cb(sp->emfd, sp->server->user_data); + if (sp->read_cb(sp->emfd, sp->server->user_data)) { + /* There maybe more packets to read. Schedule read_more_cb_ closure to run + * after finishing this event loop. */ + GRPC_CLOSURE_INIT(&sp->do_read_closure, do_read, arg, + grpc_executor_scheduler(GRPC_EXECUTOR_LONG)); + GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE); + } else { + /* Finish reading all the packets, re-arm the notification event so we can + * get another chance to read. Or fd already shutdown, re-arm to get a + * notification with shutdown error. */ + grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); + } + gpr_mu_unlock(&sp->server->mu); +} + +// Wrapper of grpc_fd_notify_on_write() with a grpc_closure callback interface. +void fd_notify_on_write_wrapper(void* arg, grpc_error* error) { + grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg); + gpr_mu_lock(&sp->server->mu); + if (!sp->notify_on_write_armed) { + grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); + sp->notify_on_write_armed = true; + } + gpr_mu_unlock(&sp->server->mu); +} - /* Re-arm the notification event so we get another chance to read. */ - grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); +static void do_write(void* arg, grpc_error* error) { + grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg); + gpr_mu_lock(&(sp->server->mu)); + if (sp->already_shutdown) { + // If fd has been shutdown, don't write any more and re-arm notification. + grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); + } else { + sp->notify_on_write_armed = false; + /* Tell the registered callback that the socket is writeable. */ + GPR_ASSERT(sp->write_cb && error == GRPC_ERROR_NONE); + GRPC_CLOSURE_INIT(&sp->notify_on_write_closure, fd_notify_on_write_wrapper, + arg, grpc_schedule_on_exec_ctx); + sp->write_cb(sp->emfd, sp->server->user_data, &sp->notify_on_write_closure); + } gpr_mu_unlock(&sp->server->mu); } @@ -362,12 +440,11 @@ static void on_write(void* arg, grpc_error* error) { return; } - /* Tell the registered callback that the socket is writeable. */ - GPR_ASSERT(sp->write_cb); - sp->write_cb(sp->emfd, sp->server->user_data); + /* Schedule actual write in another thread. */ + GRPC_CLOSURE_INIT(&sp->do_write_closure, do_write, arg, + grpc_executor_scheduler(GRPC_EXECUTOR_LONG)); - /* Re-arm the notification event so we get another chance to write. */ - grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); + GRPC_CLOSURE_SCHED(&sp->do_write_closure, GRPC_ERROR_NONE); gpr_mu_unlock(&sp->server->mu); } @@ -404,6 +481,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd, sp->write_cb = write_cb; sp->orphan_cb = orphan_cb; sp->orphan_notified = false; + sp->already_shutdown = false; GPR_ASSERT(sp->emfd); gpr_mu_unlock(&s->mu); gpr_free(name); @@ -527,6 +605,7 @@ void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets, GRPC_CLOSURE_INIT(&sp->write_closure, on_write, sp, grpc_schedule_on_exec_ctx); + sp->notify_on_write_armed = true; grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); /* Registered for both read and write callbacks: increment active_ports diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index d9786be27c..02e3acb7f5 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -30,11 +30,14 @@ struct grpc_server; /* Forward decl of grpc_udp_server */ typedef struct grpc_udp_server grpc_udp_server; -/* Called when data is available to read from the socket. */ -typedef void (*grpc_udp_server_read_cb)(grpc_fd* emfd, void* user_data); +/* Called when data is available to read from the socket. + * Return true if there is more data to read from fd. */ +typedef bool (*grpc_udp_server_read_cb)(grpc_fd* emfd, void* user_data); -/* Called when the socket is writeable. */ -typedef void (*grpc_udp_server_write_cb)(grpc_fd* emfd, void* user_data); +/* Called when the socket is writeable. The given closure should be scheduled + * when the socket becomes blocked next time. */ +typedef void (*grpc_udp_server_write_cb)(grpc_fd* emfd, void* user_data, + grpc_closure* notify_on_write_closure); /* Called when the grpc_fd is about to be orphaned (and the FD closed). */ typedef void (*grpc_udp_server_orphan_cb)(grpc_fd* emfd, diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index ec57a47005..b03c0218dc 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -321,9 +321,6 @@ void grpc_transport_ping(grpc_transport* transport, grpc_closure* cb); void grpc_transport_goaway(grpc_transport* transport, grpc_status_code status, grpc_slice debug_data); -/* Close a transport. Aborts all open streams. */ -void grpc_transport_close(grpc_transport* transport); - /* Destroy the transport */ void grpc_transport_destroy(grpc_transport* transport); diff --git a/src/csharp/generate_proto_csharp.sh b/src/csharp/generate_proto_csharp.sh index 1a1adbbae5..299dc3f816 100755 --- a/src/csharp/generate_proto_csharp.sh +++ b/src/csharp/generate_proto_csharp.sh @@ -33,6 +33,11 @@ $PROTOC --plugin=$PLUGIN --csharp_out=$HEALTHCHECK_DIR --grpc_out=$HEALTHCHECK_D $PROTOC --plugin=$PLUGIN --csharp_out=$REFLECTION_DIR --grpc_out=$REFLECTION_DIR \ -I src/proto src/proto/grpc/reflection/v1alpha/reflection.proto +# Put grp/core/stats.proto in a subdirectory to avoid collision with grpc/testing/stats.proto +mkdir -p $TESTING_DIR/CoreStats +$PROTOC --plugin=$PLUGIN --csharp_out=$TESTING_DIR/CoreStats --grpc_out=$TESTING_DIR/CoreStats \ + -I src/proto src/proto/grpc/core/stats.proto + # TODO(jtattermusch): following .proto files are a bit broken and import paths # don't match the package names. Setting -I to the correct value src/proto # breaks the code generation. diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index 558ce42129..564772527e 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -348,26 +348,25 @@ class Call(six.with_metaclass(abc.ABCMeta, RpcContext)): class ChannelCredentials(object): """An encapsulation of the data required to create a secure Channel. - This class has no supported interface - it exists to define the type of its - instances and its instances exist to be passed to other functions. For - example, ssl_channel_credentials returns an instance, and secure_channel - consumes an instance of this class. - """ + This class has no supported interface - it exists to define the type of its + instances and its instances exist to be passed to other functions. For + example, ssl_channel_credentials returns an instance of this class and + secure_channel requires an instance of this class. + """ def __init__(self, credentials): self._credentials = credentials class CallCredentials(object): - """An encapsulation of the data required to assert an identity over a - channel. + """An encapsulation of the data required to assert an identity over a call. - A CallCredentials may be composed with ChannelCredentials to always assert - identity for every call over that Channel. + A CallCredentials may be composed with ChannelCredentials to always assert + identity for every call over that Channel. - This class has no supported interface - it exists to define the type of its - instances and its instances exist to be passed to other functions. - """ + This class has no supported interface - it exists to define the type of its + instances and its instances exist to be passed to other functions. + """ def __init__(self, credentials): self._credentials = credentials @@ -376,23 +375,22 @@ class CallCredentials(object): class AuthMetadataContext(six.with_metaclass(abc.ABCMeta)): """Provides information to call credentials metadata plugins. - Attributes: - service_url: A string URL of the service being called into. - method_name: A string of the fully qualified method name being called. - """ + Attributes: + service_url: A string URL of the service being called into. + method_name: A string of the fully qualified method name being called. + """ class AuthMetadataPluginCallback(six.with_metaclass(abc.ABCMeta)): """Callback object received by a metadata plugin.""" def __call__(self, metadata, error): - """Inform the gRPC runtime of the metadata to construct a - CallCredentials. + """Passes to the gRPC runtime authentication metadata for an RPC. - Args: - metadata: The :term:`metadata` used to construct the CallCredentials. - error: An Exception to indicate error or None to indicate success. - """ + Args: + metadata: The :term:`metadata` used to construct the CallCredentials. + error: An Exception to indicate error or None to indicate success. + """ raise NotImplementedError() @@ -402,14 +400,14 @@ class AuthMetadataPlugin(six.with_metaclass(abc.ABCMeta)): def __call__(self, context, callback): """Implements authentication by passing metadata to a callback. - Implementations of this method must not block. + Implementations of this method must not block. - Args: - context: An AuthMetadataContext providing information on the RPC that the - plugin is being called to authenticate. - callback: An AuthMetadataPluginCallback to be invoked either synchronously - or asynchronously. - """ + Args: + context: An AuthMetadataContext providing information on the RPC that + the plugin is being called to authenticate. + callback: An AuthMetadataPluginCallback to be invoked either + synchronously or asynchronously. + """ raise NotImplementedError() @@ -1138,99 +1136,86 @@ def ssl_channel_credentials(root_certificates=None, certificate_chain=None): """Creates a ChannelCredentials for use with an SSL-enabled Channel. - Args: - root_certificates: The PEM-encoded root certificates as a byte string, - or None to retrieve them from a default location chosen by gRPC runtime. - private_key: The PEM-encoded private key as a byte string, or None if no - private key should be used. - certificate_chain: The PEM-encoded certificate chain as a byte string - to use or or None if no certificate chain should be used. + Args: + root_certificates: The PEM-encoded root certificates as a byte string, + or None to retrieve them from a default location chosen by gRPC + runtime. + private_key: The PEM-encoded private key as a byte string, or None if no + private key should be used. + certificate_chain: The PEM-encoded certificate chain as a byte string + to use or or None if no certificate chain should be used. - Returns: - A ChannelCredentials for use with an SSL-enabled Channel. - """ - if private_key is not None or certificate_chain is not None: - pair = _cygrpc.SslPemKeyCertPair(private_key, certificate_chain) - else: - pair = None + Returns: + A ChannelCredentials for use with an SSL-enabled Channel. + """ return ChannelCredentials( - _cygrpc.channel_credentials_ssl(root_certificates, pair)) + _cygrpc.SSLChannelCredentials(root_certificates, private_key, + certificate_chain)) def metadata_call_credentials(metadata_plugin, name=None): """Construct CallCredentials from an AuthMetadataPlugin. - Args: - metadata_plugin: An AuthMetadataPlugin to use for authentication. - name: An optional name for the plugin. + Args: + metadata_plugin: An AuthMetadataPlugin to use for authentication. + name: An optional name for the plugin. - Returns: - A CallCredentials. - """ + Returns: + A CallCredentials. + """ from grpc import _plugin_wrapping # pylint: disable=cyclic-import - if name is None: - try: - effective_name = metadata_plugin.__name__ - except AttributeError: - effective_name = metadata_plugin.__class__.__name__ - else: - effective_name = name - return CallCredentials( - _plugin_wrapping.call_credentials_metadata_plugin(metadata_plugin, - effective_name)) + return _plugin_wrapping.metadata_plugin_call_credentials(metadata_plugin, + name) def access_token_call_credentials(access_token): """Construct CallCredentials from an access token. - Args: - access_token: A string to place directly in the http request - authorization header, for example - "authorization: Bearer <access_token>". + Args: + access_token: A string to place directly in the http request + authorization header, for example + "authorization: Bearer <access_token>". - Returns: - A CallCredentials. - """ + Returns: + A CallCredentials. + """ from grpc import _auth # pylint: disable=cyclic-import - return metadata_call_credentials( - _auth.AccessTokenCallCredentials(access_token)) + from grpc import _plugin_wrapping # pylint: disable=cyclic-import + return _plugin_wrapping.metadata_plugin_call_credentials( + _auth.AccessTokenAuthMetadataPlugin(access_token), None) def composite_call_credentials(*call_credentials): """Compose multiple CallCredentials to make a new CallCredentials. - Args: - *call_credentials: At least two CallCredentials objects. + Args: + *call_credentials: At least two CallCredentials objects. - Returns: - A CallCredentials object composed of the given CallCredentials objects. - """ - from grpc import _credential_composition # pylint: disable=cyclic-import - cygrpc_call_credentials = tuple( - single_call_credentials._credentials - for single_call_credentials in call_credentials) + Returns: + A CallCredentials object composed of the given CallCredentials objects. + """ return CallCredentials( - _credential_composition.call(cygrpc_call_credentials)) + _cygrpc.CompositeCallCredentials( + tuple(single_call_credentials._credentials + for single_call_credentials in call_credentials))) def composite_channel_credentials(channel_credentials, *call_credentials): """Compose a ChannelCredentials and one or more CallCredentials objects. - Args: - channel_credentials: A ChannelCredentials object. - *call_credentials: One or more CallCredentials objects. + Args: + channel_credentials: A ChannelCredentials object. + *call_credentials: One or more CallCredentials objects. - Returns: - A ChannelCredentials composed of the given ChannelCredentials and - CallCredentials objects. - """ - from grpc import _credential_composition # pylint: disable=cyclic-import - cygrpc_call_credentials = tuple( - single_call_credentials._credentials - for single_call_credentials in call_credentials) + Returns: + A ChannelCredentials composed of the given ChannelCredentials and + CallCredentials objects. + """ return ChannelCredentials( - _credential_composition.channel(channel_credentials._credentials, - cygrpc_call_credentials)) + _cygrpc.CompositeChannelCredentials( + tuple(single_call_credentials._credentials + for single_call_credentials in call_credentials), + channel_credentials._credentials)) def ssl_server_credentials(private_key_certificate_chain_pairs, diff --git a/src/python/grpcio/grpc/_auth.py b/src/python/grpcio/grpc/_auth.py index c6542d0135..9a339b5900 100644 --- a/src/python/grpcio/grpc/_auth.py +++ b/src/python/grpcio/grpc/_auth.py @@ -63,7 +63,7 @@ class GoogleCallCredentials(grpc.AuthMetadataPlugin): self._pool.shutdown(wait=False) -class AccessTokenCallCredentials(grpc.AuthMetadataPlugin): +class AccessTokenAuthMetadataPlugin(grpc.AuthMetadataPlugin): """Metadata wrapper for raw access token credentials.""" def __init__(self, access_token): diff --git a/src/python/grpcio/grpc/_credential_composition.py b/src/python/grpcio/grpc/_credential_composition.py deleted file mode 100644 index f652cc3ae7..0000000000 --- a/src/python/grpcio/grpc/_credential_composition.py +++ /dev/null @@ -1,33 +0,0 @@ -# Copyright 2016 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from grpc._cython import cygrpc - - -def _call(call_credentialses): - call_credentials_iterator = iter(call_credentialses) - composition = next(call_credentials_iterator) - for additional_call_credentials in call_credentials_iterator: - composition = cygrpc.call_credentials_composite( - composition, additional_call_credentials) - return composition - - -def call(call_credentialses): - return _call(call_credentialses) - - -def channel(channel_credentials, call_credentialses): - return cygrpc.channel_credentials_composite(channel_credentials, - _call(call_credentialses)) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi index 752fb330d0..6b3a276097 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi @@ -72,13 +72,12 @@ cdef class Call: result = grpc_call_cancel(self.c_call, NULL) return result - def set_credentials( - self, CallCredentials call_credentials not None): - cdef grpc_call_error result - with nogil: - result = grpc_call_set_credentials( - self.c_call, call_credentials.c_credentials) - return result + def set_credentials(self, CallCredentials call_credentials not None): + cdef grpc_call_credentials *c_call_credentials = call_credentials.c() + cdef grpc_call_error call_error = grpc_call_set_credentials( + self.c_call, c_call_credentials) + grpc_call_credentials_release(c_call_credentials) + return call_error def peer(self): cdef char *peer = NULL diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index aeabdba021..4c397f8f64 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -33,10 +33,10 @@ cdef class Channel: self.c_channel = grpc_insecure_channel_create(c_target, c_arguments, NULL) else: - with nogil: - self.c_channel = grpc_secure_channel_create( - channel_credentials.c_credentials, c_target, c_arguments, NULL) - self.references.append(channel_credentials) + c_channel_credentials = channel_credentials.c() + self.c_channel = grpc_secure_channel_create( + c_channel_credentials, c_target, c_arguments, NULL) + grpc_channel_credentials_release(c_channel_credentials) self.references.append(target) self.references.append(arguments) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi index bc0f185c77..7e9ea33ca0 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi @@ -12,20 +12,66 @@ # See the License for the specific language governing permissions and # limitations under the License. -cimport cpython + +cdef class CallCredentials: + + cdef grpc_call_credentials *c(self) + + # TODO(https://github.com/grpc/grpc/issues/12531): remove. + cdef grpc_call_credentials *c_credentials + + +cdef int _get_metadata( + void *state, grpc_auth_metadata_context context, + grpc_credentials_plugin_metadata_cb cb, void *user_data, + grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX], + size_t *num_creds_md, grpc_status_code *status, + const char **error_details) with gil + +cdef void _destroy(void *state) with gil + + +cdef class MetadataPluginCallCredentials(CallCredentials): + + cdef readonly object _metadata_plugin + cdef readonly bytes _name + + cdef grpc_call_credentials *c(self) + + +cdef grpc_call_credentials *_composition(call_credentialses) + + +cdef class CompositeCallCredentials(CallCredentials): + + cdef readonly tuple _call_credentialses + + cdef grpc_call_credentials *c(self) cdef class ChannelCredentials: + cdef grpc_channel_credentials *c(self) + + # TODO(https://github.com/grpc/grpc/issues/12531): remove. cdef grpc_channel_credentials *c_credentials - cdef grpc_ssl_pem_key_cert_pair c_ssl_pem_key_cert_pair - cdef list references -cdef class CallCredentials: +cdef class SSLChannelCredentials(ChannelCredentials): - cdef grpc_call_credentials *c_credentials - cdef list references + cdef readonly object _pem_root_certificates + cdef readonly object _private_key + cdef readonly object _certificate_chain + + cdef grpc_channel_credentials *c(self) + + +cdef class CompositeChannelCredentials(ChannelCredentials): + + cdef readonly tuple _call_credentialses + cdef readonly ChannelCredentials _channel_credentials + + cdef grpc_channel_credentials *c(self) cdef class ServerCertificateConfig: @@ -49,27 +95,3 @@ cdef class ServerCredentials: cdef object cert_config_fetcher # whether C-core has asked for the initial_cert_config cdef bint initial_cert_config_fetched - - -cdef class CredentialsMetadataPlugin: - - cdef object plugin_callback - cdef bytes plugin_name - - -cdef grpc_metadata_credentials_plugin _c_plugin(CredentialsMetadataPlugin plugin) - - -cdef class AuthMetadataContext: - - cdef grpc_auth_metadata_context context - - -cdef int plugin_get_metadata( - void *state, grpc_auth_metadata_context context, - grpc_credentials_plugin_metadata_cb cb, void *user_data, - grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX], - size_t *num_creds_md, grpc_status_code *status, - const char **error_details) with gil - -cdef void plugin_destroy_c_plugin_state(void *state) with gil diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi index ee1328d2e9..246a271893 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi @@ -16,47 +16,123 @@ cimport cpython import grpc import threading -import traceback -cdef class ChannelCredentials: +cdef class CallCredentials: - def __cinit__(self): - grpc_init() - self.c_credentials = NULL - self.c_ssl_pem_key_cert_pair.private_key = NULL - self.c_ssl_pem_key_cert_pair.certificate_chain = NULL - self.references = [] + cdef grpc_call_credentials *c(self): + raise NotImplementedError() - # The object *can* be invalid in Python if we fail to make the credentials - # (and the core thus returns NULL credentials). Used primarily for debugging. - @property - def is_valid(self): - return self.c_credentials != NULL - def __dealloc__(self): - if self.c_credentials != NULL: - grpc_channel_credentials_release(self.c_credentials) - grpc_shutdown() +cdef int _get_metadata( + void *state, grpc_auth_metadata_context context, + grpc_credentials_plugin_metadata_cb cb, void *user_data, + grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX], + size_t *num_creds_md, grpc_status_code *status, + const char **error_details) with gil: + def callback(Metadata metadata, grpc_status_code status, bytes error_details): + if status is StatusCode.ok: + cb(user_data, metadata.c_metadata, metadata.c_count, status, NULL) + else: + cb(user_data, NULL, 0, status, error_details) + args = context.service_url, context.method_name, callback, + threading.Thread(target=<object>state, args=args).start() + return 0 # Asynchronous return -cdef class CallCredentials: +cdef void _destroy(void *state) with gil: + cpython.Py_DECREF(<object>state) - def __cinit__(self): - grpc_init() - self.c_credentials = NULL - self.references = [] - # The object *can* be invalid in Python if we fail to make the credentials - # (and the core thus returns NULL credentials). Used primarily for debugging. - @property - def is_valid(self): - return self.c_credentials != NULL +cdef class MetadataPluginCallCredentials(CallCredentials): - def __dealloc__(self): - if self.c_credentials != NULL: - grpc_call_credentials_release(self.c_credentials) - grpc_shutdown() + def __cinit__(self, metadata_plugin, name): + self._metadata_plugin = metadata_plugin + self._name = name + + cdef grpc_call_credentials *c(self): + cdef grpc_metadata_credentials_plugin c_metadata_plugin + c_metadata_plugin.get_metadata = _get_metadata + c_metadata_plugin.destroy = _destroy + c_metadata_plugin.state = <void *>self._metadata_plugin + c_metadata_plugin.type = self._name + cpython.Py_INCREF(self._metadata_plugin) + return grpc_metadata_credentials_create_from_plugin(c_metadata_plugin, NULL) + + +cdef grpc_call_credentials *_composition(call_credentialses): + call_credentials_iterator = iter(call_credentialses) + cdef CallCredentials composition = next(call_credentials_iterator) + cdef grpc_call_credentials *c_composition = composition.c() + cdef CallCredentials additional_call_credentials + cdef grpc_call_credentials *c_additional_call_credentials + cdef grpc_call_credentials *c_next_composition + for additional_call_credentials in call_credentials_iterator: + c_additional_call_credentials = additional_call_credentials.c() + c_next_composition = grpc_composite_call_credentials_create( + c_composition, c_additional_call_credentials, NULL) + grpc_call_credentials_release(c_composition) + grpc_call_credentials_release(c_additional_call_credentials) + c_composition = c_next_composition + return c_composition + + +cdef class CompositeCallCredentials(CallCredentials): + + def __cinit__(self, call_credentialses): + self._call_credentialses = call_credentialses + + cdef grpc_call_credentials *c(self): + return _composition(self._call_credentialses) + + +cdef class ChannelCredentials: + + cdef grpc_channel_credentials *c(self): + raise NotImplementedError() + + +cdef class SSLChannelCredentials(ChannelCredentials): + + def __cinit__(self, pem_root_certificates, private_key, certificate_chain): + self._pem_root_certificates = pem_root_certificates + self._private_key = private_key + self._certificate_chain = certificate_chain + + cdef grpc_channel_credentials *c(self): + cdef const char *c_pem_root_certificates + cdef grpc_ssl_pem_key_cert_pair c_pem_key_certificate_pair + if self._pem_root_certificates is None: + c_pem_root_certificates = NULL + else: + c_pem_root_certificates = self._pem_root_certificates + if self._private_key is None and self._certificate_chain is None: + return grpc_ssl_credentials_create( + c_pem_root_certificates, NULL, NULL) + else: + c_pem_key_certificate_pair.private_key = self._private_key + c_pem_key_certificate_pair.certificate_chain = self._certificate_chain + return grpc_ssl_credentials_create( + c_pem_root_certificates, &c_pem_key_certificate_pair, NULL) + + +cdef class CompositeChannelCredentials(ChannelCredentials): + + def __cinit__(self, call_credentialses, channel_credentials): + self._call_credentialses = call_credentialses + self._channel_credentials = channel_credentials + + cdef grpc_channel_credentials *c(self): + cdef grpc_channel_credentials *c_channel_credentials + c_channel_credentials = self._channel_credentials.c() + cdef grpc_call_credentials *c_call_credentials_composition = _composition( + self._call_credentialses) + cdef grpc_channel_credentials *composition + c_composition = grpc_composite_channel_credentials_create( + c_channel_credentials, c_call_credentials_composition, NULL) + grpc_channel_credentials_release(c_channel_credentials) + grpc_call_credentials_release(c_call_credentials_composition) + return c_composition cdef class ServerCertificateConfig: @@ -89,190 +165,6 @@ cdef class ServerCredentials: grpc_server_credentials_release(self.c_credentials) grpc_shutdown() - -cdef class CredentialsMetadataPlugin: - - def __cinit__(self, object plugin_callback, bytes name): - """ - Args: - plugin_callback (callable): Callback accepting a service URL (str/bytes) - and callback object (accepting a MetadataArray, - grpc_status_code, and a str/bytes error message). This argument - when called should be non-blocking and eventually call the callback - object with the appropriate status code/details and metadata (if - successful). - name (bytes): Plugin name. - """ - grpc_init() - if not callable(plugin_callback): - raise ValueError('expected callable plugin_callback') - self.plugin_callback = plugin_callback - self.plugin_name = name - - def __dealloc__(self): - grpc_shutdown() - - -cdef grpc_metadata_credentials_plugin _c_plugin(CredentialsMetadataPlugin plugin): - cdef grpc_metadata_credentials_plugin c_plugin - c_plugin.get_metadata = plugin_get_metadata - c_plugin.destroy = plugin_destroy_c_plugin_state - c_plugin.state = <void *>plugin - c_plugin.type = plugin.plugin_name - cpython.Py_INCREF(plugin) - return c_plugin - - -cdef class AuthMetadataContext: - - def __cinit__(self): - grpc_init() - self.context.service_url = NULL - self.context.method_name = NULL - - @property - def service_url(self): - return self.context.service_url - - @property - def method_name(self): - return self.context.method_name - - def __dealloc__(self): - grpc_shutdown() - - -cdef int plugin_get_metadata( - void *state, grpc_auth_metadata_context context, - grpc_credentials_plugin_metadata_cb cb, void *user_data, - grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX], - size_t *num_creds_md, grpc_status_code *status, - const char **error_details) with gil: - called_flag = [False] - def python_callback( - Metadata metadata, grpc_status_code status, - bytes error_details): - cb(user_data, metadata.c_metadata, metadata.c_count, status, error_details) - called_flag[0] = True - cdef CredentialsMetadataPlugin self = <CredentialsMetadataPlugin>state - cdef AuthMetadataContext cy_context = AuthMetadataContext() - cy_context.context = context - def async_callback(): - try: - self.plugin_callback(cy_context, python_callback) - except Exception as error: - if not called_flag[0]: - cb(user_data, NULL, 0, StatusCode.unknown, - traceback.format_exc().encode()) - threading.Thread(group=None, target=async_callback).start() - return 0 # Asynchronous return - -cdef void plugin_destroy_c_plugin_state(void *state) with gil: - cpython.Py_DECREF(<CredentialsMetadataPlugin>state) - -def channel_credentials_google_default(): - cdef ChannelCredentials credentials = ChannelCredentials(); - with nogil: - credentials.c_credentials = grpc_google_default_credentials_create() - return credentials - -def channel_credentials_ssl(pem_root_certificates, - SslPemKeyCertPair ssl_pem_key_cert_pair): - pem_root_certificates = str_to_bytes(pem_root_certificates) - cdef ChannelCredentials credentials = ChannelCredentials() - cdef const char *c_pem_root_certificates = NULL - if pem_root_certificates is not None: - c_pem_root_certificates = pem_root_certificates - credentials.references.append(pem_root_certificates) - if ssl_pem_key_cert_pair is not None: - with nogil: - credentials.c_credentials = grpc_ssl_credentials_create( - c_pem_root_certificates, &ssl_pem_key_cert_pair.c_pair, NULL) - credentials.references.append(ssl_pem_key_cert_pair) - else: - with nogil: - credentials.c_credentials = grpc_ssl_credentials_create( - c_pem_root_certificates, NULL, NULL) - return credentials - -def channel_credentials_composite( - ChannelCredentials credentials_1 not None, - CallCredentials credentials_2 not None): - if not credentials_1.is_valid or not credentials_2.is_valid: - raise ValueError("passed credentials must both be valid") - cdef ChannelCredentials credentials = ChannelCredentials() - with nogil: - credentials.c_credentials = grpc_composite_channel_credentials_create( - credentials_1.c_credentials, credentials_2.c_credentials, NULL) - credentials.references.append(credentials_1) - credentials.references.append(credentials_2) - return credentials - -def call_credentials_composite( - CallCredentials credentials_1 not None, - CallCredentials credentials_2 not None): - if not credentials_1.is_valid or not credentials_2.is_valid: - raise ValueError("passed credentials must both be valid") - cdef CallCredentials credentials = CallCredentials() - with nogil: - credentials.c_credentials = grpc_composite_call_credentials_create( - credentials_1.c_credentials, credentials_2.c_credentials, NULL) - credentials.references.append(credentials_1) - credentials.references.append(credentials_2) - return credentials - -def call_credentials_google_compute_engine(): - cdef CallCredentials credentials = CallCredentials() - with nogil: - credentials.c_credentials = ( - grpc_google_compute_engine_credentials_create(NULL)) - return credentials - -def call_credentials_service_account_jwt_access( - json_key, Timespec token_lifetime not None): - json_key = str_to_bytes(json_key) - cdef CallCredentials credentials = CallCredentials() - cdef char *json_key_c_string = json_key - with nogil: - credentials.c_credentials = ( - grpc_service_account_jwt_access_credentials_create( - json_key_c_string, token_lifetime.c_time, NULL)) - credentials.references.append(json_key) - return credentials - -def call_credentials_google_refresh_token(json_refresh_token): - json_refresh_token = str_to_bytes(json_refresh_token) - cdef CallCredentials credentials = CallCredentials() - cdef char *json_refresh_token_c_string = json_refresh_token - with nogil: - credentials.c_credentials = grpc_google_refresh_token_credentials_create( - json_refresh_token_c_string, NULL) - credentials.references.append(json_refresh_token) - return credentials - -def call_credentials_google_iam(authorization_token, authority_selector): - authorization_token = str_to_bytes(authorization_token) - authority_selector = str_to_bytes(authority_selector) - cdef CallCredentials credentials = CallCredentials() - cdef char *authorization_token_c_string = authorization_token - cdef char *authority_selector_c_string = authority_selector - with nogil: - credentials.c_credentials = grpc_google_iam_credentials_create( - authorization_token_c_string, authority_selector_c_string, NULL) - credentials.references.append(authorization_token) - credentials.references.append(authority_selector) - return credentials - -def call_credentials_metadata_plugin(CredentialsMetadataPlugin plugin): - cdef CallCredentials credentials = CallCredentials() - cdef grpc_metadata_credentials_plugin c_plugin = _c_plugin(plugin) - with nogil: - credentials.c_credentials = ( - grpc_metadata_credentials_create_from_plugin(c_plugin, NULL)) - # TODO(atash): the following held reference is *probably* never necessary - credentials.references.append(plugin) - return credentials - cdef const char* _get_c_pem_root_certs(pem_root_certs): if pem_root_certs is None: return NULL diff --git a/src/python/grpcio/grpc/_plugin_wrapping.py b/src/python/grpcio/grpc/_plugin_wrapping.py index bea2c0f139..cd17f4a049 100644 --- a/src/python/grpcio/grpc/_plugin_wrapping.py +++ b/src/python/grpcio/grpc/_plugin_wrapping.py @@ -13,6 +13,7 @@ # limitations under the License. import collections +import logging import threading import grpc @@ -20,89 +21,79 @@ from grpc import _common from grpc._cython import cygrpc -class AuthMetadataContext( +class _AuthMetadataContext( collections.namedtuple('AuthMetadataContext', ( 'service_url', 'method_name',)), grpc.AuthMetadataContext): pass -class AuthMetadataPluginCallback(grpc.AuthMetadataContext): +class _CallbackState(object): - def __init__(self, callback): - self._callback = callback - - def __call__(self, metadata, error): - self._callback(metadata, error) + def __init__(self): + self.lock = threading.Lock() + self.called = False + self.exception = None -class _WrappedCygrpcCallback(object): +class _AuthMetadataPluginCallback(grpc.AuthMetadataPluginCallback): - def __init__(self, cygrpc_callback): - self.is_called = False - self.error = None - self.is_called_lock = threading.Lock() - self.cygrpc_callback = cygrpc_callback - - def _invoke_failure(self, error): - # TODO(atash) translate different Exception superclasses into different - # status codes. - self.cygrpc_callback(_common.EMPTY_METADATA, cygrpc.StatusCode.internal, - _common.encode(str(error))) - - def _invoke_success(self, metadata): - try: - cygrpc_metadata = _common.to_cygrpc_metadata(metadata) - except Exception as exception: # pylint: disable=broad-except - self._invoke_failure(exception) - return - self.cygrpc_callback(cygrpc_metadata, cygrpc.StatusCode.ok, b'') + def __init__(self, state, callback): + self._state = state + self._callback = callback def __call__(self, metadata, error): - with self.is_called_lock: - if self.is_called: - raise RuntimeError('callback should only ever be invoked once') - if self.error: - self._invoke_failure(self.error) - return - self.is_called = True + with self._state.lock: + if self._state.exception is None: + if self._state.called: + raise RuntimeError( + 'AuthMetadataPluginCallback invoked more than once!') + else: + self._state.called = True + else: + raise RuntimeError( + 'AuthMetadataPluginCallback raised exception "{}"!'.format( + self._state.exception)) if error is None: - self._invoke_success(metadata) + self._callback( + _common.to_cygrpc_metadata(metadata), cygrpc.StatusCode.ok, + None) else: - self._invoke_failure(error) - - def notify_failure(self, error): - with self.is_called_lock: - if not self.is_called: - self.error = error + self._callback(None, cygrpc.StatusCode.internal, + _common.encode(str(error))) -class _WrappedPlugin(object): +class _Plugin(object): - def __init__(self, plugin): - self.plugin = plugin + def __init__(self, metadata_plugin): + self._metadata_plugin = metadata_plugin - def __call__(self, context, cygrpc_callback): - wrapped_cygrpc_callback = _WrappedCygrpcCallback(cygrpc_callback) - wrapped_context = AuthMetadataContext( - _common.decode(context.service_url), - _common.decode(context.method_name)) + def __call__(self, service_url, method_name, callback): + context = _AuthMetadataContext( + _common.decode(service_url), _common.decode(method_name)) + callback_state = _CallbackState() + try: + self._metadata_plugin( + context, _AuthMetadataPluginCallback(callback_state, callback)) + except Exception as exception: # pylint: disable=broad-except + logging.exception( + 'AuthMetadataPluginCallback "%s" raised exception!', + self._metadata_plugin) + with callback_state.lock: + callback_state.exception = exception + if callback_state.called: + return + callback(None, cygrpc.StatusCode.internal, + _common.encode(str(exception))) + + +def metadata_plugin_call_credentials(metadata_plugin, name): + if name is None: try: - self.plugin(wrapped_context, - AuthMetadataPluginCallback(wrapped_cygrpc_callback)) - except Exception as error: - wrapped_cygrpc_callback.notify_failure(error) - raise - - -def call_credentials_metadata_plugin(plugin, name): - """ - Args: - plugin: A callable accepting a grpc.AuthMetadataContext - object and a callback (itself accepting a list of metadata key/value - 2-tuples and a None-able exception value). The callback must be eventually - called, but need not be called in plugin's invocation. - plugin's invocation must be non-blocking. - """ - return cygrpc.call_credentials_metadata_plugin( - cygrpc.CredentialsMetadataPlugin( - _WrappedPlugin(plugin), _common.encode(name))) + effective_name = metadata_plugin.__name__ + except AttributeError: + effective_name = metadata_plugin.__class__.__name__ + else: + effective_name = name + return grpc.CallCredentials( + cygrpc.MetadataPluginCallCredentials( + _Plugin(metadata_plugin), _common.encode(effective_name))) diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index cd59b07c04..5b4812bffe 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -374,10 +374,10 @@ def _call_behavior(rpc_event, state, behavior, argument, request_deserializer): context = _Context(rpc_event, state, request_deserializer) try: return behavior(argument, context), True - except Exception as e: # pylint: disable=broad-except + except Exception as exception: # pylint: disable=broad-except with state.condition: - if e not in state.rpc_errors: - details = 'Exception calling application: {}'.format(e) + if exception not in state.rpc_errors: + details = 'Exception calling application: {}'.format(exception) logging.exception(details) _abort(state, rpc_event.operation_call, cygrpc.StatusCode.unknown, _common.encode(details)) @@ -389,10 +389,10 @@ def _take_response_from_response_iterator(rpc_event, state, response_iterator): return next(response_iterator), True except StopIteration: return None, True - except Exception as e: # pylint: disable=broad-except + except Exception as exception: # pylint: disable=broad-except with state.condition: - if e not in state.rpc_errors: - details = 'Exception iterating responses: {}'.format(e) + if exception not in state.rpc_errors: + details = 'Exception iterating responses: {}'.format(exception) logging.exception(details) _abort(state, rpc_event.operation_call, cygrpc.StatusCode.unknown, _common.encode(details)) @@ -591,7 +591,13 @@ def _handle_call(rpc_event, generic_handlers, thread_pool, if not rpc_event.success: return None, None if rpc_event.request_call_details.method is not None: - method_handler = _find_method_handler(rpc_event, generic_handlers) + try: + method_handler = _find_method_handler(rpc_event, generic_handlers) + except Exception as exception: # pylint: disable=broad-except + details = 'Exception servicing handler: {}'.format(exception) + logging.exception(details) + return _reject_rpc(rpc_event, cygrpc.StatusCode.unknown, + b'Error in service handler!'), None if method_handler is None: return _reject_rpc(rpc_event, cygrpc.StatusCode.unimplemented, b'Method not found!'), None diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index e277a3ea1d..34cbade92c 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -22,7 +22,7 @@ "unit._api_test.ChannelConnectivityTest", "unit._api_test.ChannelTest", "unit._auth_context_test.AuthContextTest", - "unit._auth_test.AccessTokenCallCredentialsTest", + "unit._auth_test.AccessTokenAuthMetadataPluginTest", "unit._auth_test.GoogleCallCredentialsTest", "unit._channel_args_test.ChannelArgsTest", "unit._channel_connectivity_test.ChannelConnectivityTest", diff --git a/src/python/grpcio_tests/tests/unit/_auth_test.py b/src/python/grpcio_tests/tests/unit/_auth_test.py index f61951b80a..e2cb938936 100644 --- a/src/python/grpcio_tests/tests/unit/_auth_test.py +++ b/src/python/grpcio_tests/tests/unit/_auth_test.py @@ -61,7 +61,7 @@ class GoogleCallCredentialsTest(unittest.TestCase): self.assertTrue(callback_event.wait(1.0)) -class AccessTokenCallCredentialsTest(unittest.TestCase): +class AccessTokenAuthMetadataPluginTest(unittest.TestCase): def test_google_call_credentials_success(self): callback_event = threading.Event() @@ -71,8 +71,8 @@ class AccessTokenCallCredentialsTest(unittest.TestCase): self.assertIsNone(error) callback_event.set() - call_creds = _auth.AccessTokenCallCredentials('token') - call_creds(None, mock_callback) + metadata_plugin = _auth.AccessTokenAuthMetadataPlugin('token') + metadata_plugin(None, mock_callback) self.assertTrue(callback_event.wait(1.0)) diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py index 18d4a6df64..da94cf8028 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py @@ -28,7 +28,7 @@ _CALL_CREDENTIALS_METADATA_VALUE = 'call-creds-value' _EMPTY_FLAGS = 0 -def _metadata_plugin_callback(context, callback): +def _metadata_plugin(context, callback): callback( cygrpc.Metadata([ cygrpc.Metadatum(_CALL_CREDENTIALS_METADATA_KEY, @@ -105,17 +105,9 @@ class TypeSmokeTest(unittest.TestCase): channel = cygrpc.Channel(b'[::]:0', cygrpc.ChannelArgs([])) del channel - def testCredentialsMetadataPluginUpDown(self): - plugin = cygrpc.CredentialsMetadataPlugin( - lambda ignored_a, ignored_b: None, b'') - del plugin - - def testCallCredentialsFromPluginUpDown(self): - plugin = cygrpc.CredentialsMetadataPlugin(_metadata_plugin_callback, - b'') - call_credentials = cygrpc.call_credentials_metadata_plugin(plugin) - del plugin - del call_credentials + def test_metadata_plugin_call_credentials_up_down(self): + cygrpc.MetadataPluginCallCredentials(_metadata_plugin, + b'test plugin name!') def testServerStartNoExplicitShutdown(self): server = cygrpc.Server(cygrpc.ChannelArgs([])) @@ -205,7 +197,7 @@ class ServerClientMixin(object): return test_utilities.SimpleFuture(performer) - def testEcho(self): + def test_echo(self): DEADLINE = time.time() + 5 DEADLINE_TOLERANCE = 0.25 CLIENT_METADATA_ASCII_KEY = b'key' @@ -439,8 +431,8 @@ class SecureServerSecureClient(unittest.TestCase, ServerClientMixin): cygrpc.SslPemKeyCertPair(resources.private_key(), resources.certificate_chain()) ], False) - client_credentials = cygrpc.channel_credentials_ssl( - resources.test_root_certificates(), None) + client_credentials = cygrpc.SSLChannelCredentials( + resources.test_root_certificates(), None, None) self.setUpMixin(server_credentials, client_credentials, _SSL_HOST_OVERRIDE) diff --git a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py index 0a1e50c94c..2a1a49ce74 100644 --- a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py +++ b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py @@ -32,6 +32,7 @@ _UNARY_UNARY = '/test/UnaryUnary' _UNARY_STREAM = '/test/UnaryStream' _STREAM_UNARY = '/test/StreamUnary' _STREAM_STREAM = '/test/StreamStream' +_DEFECTIVE_GENERIC_RPC_HANDLER = '/test/DefectiveGenericRpcHandler' class _Callback(object): @@ -95,6 +96,9 @@ class _Handler(object): yield request self._control.control() + def defective_generic_rpc_handler(self): + raise test_control.Defect() + class _MethodHandler(grpc.RpcMethodHandler): @@ -132,6 +136,8 @@ class _GenericHandler(grpc.GenericRpcHandler): elif handler_call_details.method == _STREAM_STREAM: return _MethodHandler(True, True, None, None, None, None, None, self._handler.handle_stream_stream) + elif handler_call_details.method == _DEFECTIVE_GENERIC_RPC_HANDLER: + return self._handler.defective_generic_rpc_handler() else: return None @@ -176,6 +182,10 @@ def _stream_stream_multi_callable(channel): return channel.stream_stream(_STREAM_STREAM) +def _defective_handler_multi_callable(channel): + return channel.unary_unary(_DEFECTIVE_GENERIC_RPC_HANDLER) + + class InvocationDefectsTest(unittest.TestCase): def setUp(self): @@ -235,6 +245,18 @@ class InvocationDefectsTest(unittest.TestCase): for _ in range(test_constants.STREAM_LENGTH // 2 + 1): next(response_iterator) + def testDefectiveGenericRpcHandlerUnaryResponse(self): + request = b'\x07\x08' + multi_callable = _defective_handler_multi_callable(self._channel) + + with self.assertRaises(grpc.RpcError) as exception_context: + response = multi_callable( + request, + metadata=(('test', 'DefectiveGenericRpcHandlerUnary'),)) + + self.assertIs(grpc.StatusCode.UNKNOWN, + exception_context.exception.code()) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/test/core/bad_client/bad_client.cc b/test/core/bad_client/bad_client.cc index f63a32ea5b..ccf35444d0 100644 --- a/test/core/bad_client/bad_client.cc +++ b/test/core/bad_client/bad_client.cc @@ -114,9 +114,9 @@ void grpc_run_bad_client_test( GRPC_BAD_CLIENT_REGISTERED_HOST, GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER, 0); grpc_server_start(a.server); - transport = grpc_create_chttp2_transport(nullptr, sfd.server, 0); + transport = grpc_create_chttp2_transport(nullptr, sfd.server, false); server_setup_transport(&a, transport); - grpc_chttp2_transport_start_reading(transport, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); /* Bind everything into the same pollset */ grpc_endpoint_add_to_pollset(sfd.client, grpc_cq_pollset(a.cq)); diff --git a/test/core/end2end/fixtures/h2_sockpair+trace.cc b/test/core/end2end/fixtures/h2_sockpair+trace.cc index 447bb91d0c..9807e929af 100644 --- a/test/core/end2end/fixtures/h2_sockpair+trace.cc +++ b/test/core/end2end/fixtures/h2_sockpair+trace.cc @@ -93,10 +93,10 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f, sp_client_setup cs; cs.client_args = client_args; cs.f = f; - transport = grpc_create_chttp2_transport(client_args, sfd->client, 1); + transport = grpc_create_chttp2_transport(client_args, sfd->client, true); client_setup_transport(&cs, transport); GPR_ASSERT(f->client); - grpc_chttp2_transport_start_reading(transport, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); } static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f, @@ -108,9 +108,9 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f, f->server = grpc_server_create(server_args, nullptr); grpc_server_register_completion_queue(f->server, f->cq, nullptr); grpc_server_start(f->server); - transport = grpc_create_chttp2_transport(server_args, sfd->server, 0); + transport = grpc_create_chttp2_transport(server_args, sfd->server, false); server_setup_transport(f, transport); - grpc_chttp2_transport_start_reading(transport, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); } static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture* f) { diff --git a/test/core/end2end/fixtures/h2_sockpair.cc b/test/core/end2end/fixtures/h2_sockpair.cc index c1f8ee0b64..b68279fd71 100644 --- a/test/core/end2end/fixtures/h2_sockpair.cc +++ b/test/core/end2end/fixtures/h2_sockpair.cc @@ -87,10 +87,10 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f, sp_client_setup cs; cs.client_args = client_args; cs.f = f; - transport = grpc_create_chttp2_transport(client_args, sfd->client, 1); + transport = grpc_create_chttp2_transport(client_args, sfd->client, true); client_setup_transport(&cs, transport); GPR_ASSERT(f->client); - grpc_chttp2_transport_start_reading(transport, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); } static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f, @@ -102,9 +102,9 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f, f->server = grpc_server_create(server_args, nullptr); grpc_server_register_completion_queue(f->server, f->cq, nullptr); grpc_server_start(f->server); - transport = grpc_create_chttp2_transport(server_args, sfd->server, 0); + transport = grpc_create_chttp2_transport(server_args, sfd->server, false); server_setup_transport(f, transport); - grpc_chttp2_transport_start_reading(transport, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); } static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture* f) { diff --git a/test/core/end2end/fixtures/h2_sockpair_1byte.cc b/test/core/end2end/fixtures/h2_sockpair_1byte.cc index 55b847bdae..350be138ca 100644 --- a/test/core/end2end/fixtures/h2_sockpair_1byte.cc +++ b/test/core/end2end/fixtures/h2_sockpair_1byte.cc @@ -98,10 +98,10 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f, sp_client_setup cs; cs.client_args = client_args; cs.f = f; - transport = grpc_create_chttp2_transport(client_args, sfd->client, 1); + transport = grpc_create_chttp2_transport(client_args, sfd->client, true); client_setup_transport(&cs, transport); GPR_ASSERT(f->client); - grpc_chttp2_transport_start_reading(transport, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); } static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f, @@ -113,9 +113,9 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f, f->server = grpc_server_create(server_args, nullptr); grpc_server_register_completion_queue(f->server, f->cq, nullptr); grpc_server_start(f->server); - transport = grpc_create_chttp2_transport(server_args, sfd->server, 0); + transport = grpc_create_chttp2_transport(server_args, sfd->server, false); server_setup_transport(f, transport); - grpc_chttp2_transport_start_reading(transport, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); } static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture* f) { diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc index 539bbe4134..15c9dd4847 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.cc +++ b/test/core/end2end/fixtures/http_proxy_fixture.cc @@ -68,6 +68,9 @@ struct grpc_end2end_http_proxy { // Connection handling // +// proxy_connection structure is only accessed in the closures which are all +// scheduled under the same combiner lock. So there is is no need for a mutex to +// protect this structure. typedef struct proxy_connection { grpc_end2end_http_proxy* proxy; @@ -78,6 +81,8 @@ typedef struct proxy_connection { grpc_pollset_set* pollset_set; + // NOTE: All the closures execute under proxy->combiner lock. Which means + // there will not be any data-races between the closures grpc_closure on_read_request_done; grpc_closure on_server_connect_done; grpc_closure on_write_response_done; @@ -86,6 +91,13 @@ typedef struct proxy_connection { grpc_closure on_server_read_done; grpc_closure on_server_write_done; + bool client_read_failed : 1; + bool client_write_failed : 1; + bool client_shutdown : 1; + bool server_read_failed : 1; + bool server_write_failed : 1; + bool server_shutdown : 1; + grpc_slice_buffer client_read_buffer; grpc_slice_buffer client_deferred_write_buffer; bool client_is_writing; @@ -126,18 +138,50 @@ static void proxy_connection_unref(proxy_connection* conn, const char* reason) { } } +enum failure_type { + SETUP_FAILED, // To be used before we start proxying. + CLIENT_READ_FAILED, + CLIENT_WRITE_FAILED, + SERVER_READ_FAILED, + SERVER_WRITE_FAILED, +}; + // Helper function to shut down the proxy connection. -// Does NOT take ownership of a reference to error. -static void proxy_connection_failed(proxy_connection* conn, bool is_client, - const char* prefix, grpc_error* error) { - const char* msg = grpc_error_string(error); - gpr_log(GPR_INFO, "%s: %s", prefix, msg); - - grpc_endpoint_shutdown(conn->client_endpoint, GRPC_ERROR_REF(error)); - if (conn->server_endpoint != nullptr) { +static void proxy_connection_failed(proxy_connection* conn, + failure_type failure, const char* prefix, + grpc_error* error) { + gpr_log(GPR_INFO, "%s: %s", prefix, grpc_error_string(error)); + // Decide whether we should shut down the client and server. + bool shutdown_client = false; + bool shutdown_server = false; + if (failure == SETUP_FAILED) { + shutdown_client = true; + shutdown_server = true; + } else { + if ((failure == CLIENT_READ_FAILED && conn->client_write_failed) || + (failure == CLIENT_WRITE_FAILED && conn->client_read_failed) || + (failure == SERVER_READ_FAILED && !conn->client_is_writing)) { + shutdown_client = true; + } + if ((failure == SERVER_READ_FAILED && conn->server_write_failed) || + (failure == SERVER_WRITE_FAILED && conn->server_read_failed) || + (failure == CLIENT_READ_FAILED && !conn->server_is_writing)) { + shutdown_server = true; + } + } + // If we decided to shut down either one and have not yet done so, do so. + if (shutdown_client && !conn->client_shutdown) { + grpc_endpoint_shutdown(conn->client_endpoint, GRPC_ERROR_REF(error)); + conn->client_shutdown = true; + } + if (shutdown_server && !conn->server_shutdown && + (conn->server_endpoint != nullptr)) { grpc_endpoint_shutdown(conn->server_endpoint, GRPC_ERROR_REF(error)); + conn->server_shutdown = true; } + // Unref the connection. proxy_connection_unref(conn, "conn_failed"); + GRPC_ERROR_UNREF(error); } // Callback for writing proxy data to the client. @@ -145,8 +189,8 @@ static void on_client_write_done(void* arg, grpc_error* error) { proxy_connection* conn = (proxy_connection*)arg; conn->client_is_writing = false; if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(conn, true /* is_client */, - "HTTP proxy client write", error); + proxy_connection_failed(conn, CLIENT_WRITE_FAILED, + "HTTP proxy client write", GRPC_ERROR_REF(error)); return; } // Clear write buffer (the data we just wrote). @@ -170,8 +214,8 @@ static void on_server_write_done(void* arg, grpc_error* error) { proxy_connection* conn = (proxy_connection*)arg; conn->server_is_writing = false; if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(conn, false /* is_client */, - "HTTP proxy server write", error); + proxy_connection_failed(conn, SERVER_WRITE_FAILED, + "HTTP proxy server write", GRPC_ERROR_REF(error)); return; } // Clear write buffer (the data we just wrote). @@ -195,8 +239,8 @@ static void on_server_write_done(void* arg, grpc_error* error) { static void on_client_read_done(void* arg, grpc_error* error) { proxy_connection* conn = (proxy_connection*)arg; if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(conn, true /* is_client */, - "HTTP proxy client read", error); + proxy_connection_failed(conn, CLIENT_READ_FAILED, "HTTP proxy client read", + GRPC_ERROR_REF(error)); return; } // If there is already a pending write (i.e., server_write_buffer is @@ -226,8 +270,8 @@ static void on_client_read_done(void* arg, grpc_error* error) { static void on_server_read_done(void* arg, grpc_error* error) { proxy_connection* conn = (proxy_connection*)arg; if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(conn, false /* is_client */, - "HTTP proxy server read", error); + proxy_connection_failed(conn, SERVER_READ_FAILED, "HTTP proxy server read", + GRPC_ERROR_REF(error)); return; } // If there is already a pending write (i.e., client_write_buffer is @@ -257,8 +301,8 @@ static void on_write_response_done(void* arg, grpc_error* error) { proxy_connection* conn = (proxy_connection*)arg; conn->client_is_writing = false; if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(conn, true /* is_client */, - "HTTP proxy write response", error); + proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy write response", + GRPC_ERROR_REF(error)); return; } // Clear write buffer. @@ -285,8 +329,8 @@ static void on_server_connect_done(void* arg, grpc_error* error) { // connection failed. However, for the purposes of this test code, // it's fine to pretend this is a client-side error, which will // cause the client connection to be dropped. - proxy_connection_failed(conn, true /* is_client */, - "HTTP proxy server connect", error); + proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy server connect", + GRPC_ERROR_REF(error)); return; } // We've established a connection, so send back a 200 response code to @@ -331,8 +375,8 @@ static void on_read_request_done(void* arg, grpc_error* error) { gpr_log(GPR_DEBUG, "on_read_request_done: %p %s", conn, grpc_error_string(error)); if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(conn, true /* is_client */, - "HTTP proxy read request", error); + proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy read request", + GRPC_ERROR_REF(error)); return; } // Read request and feed it to the parser. @@ -341,8 +385,8 @@ static void on_read_request_done(void* arg, grpc_error* error) { error = grpc_http_parser_parse( &conn->http_parser, conn->client_read_buffer.slices[i], nullptr); if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(conn, true /* is_client */, - "HTTP proxy request parse", error); + proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy request parse", + GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); return; } @@ -362,8 +406,8 @@ static void on_read_request_done(void* arg, grpc_error* error) { conn->http_request.method); error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); - proxy_connection_failed(conn, true /* is_client */, - "HTTP proxy read request", error); + proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy read request", + GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); return; } @@ -382,8 +426,8 @@ static void on_read_request_done(void* arg, grpc_error* error) { if (!client_authenticated) { const char* msg = "HTTP Connect could not verify authentication"; error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(msg); - proxy_connection_failed(conn, true /* is_client */, - "HTTP proxy read request", error); + proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy read request", + GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); return; } @@ -393,8 +437,8 @@ static void on_read_request_done(void* arg, grpc_error* error) { error = grpc_blocking_resolve_address(conn->http_request.path, "80", &resolved_addresses); if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(conn, true /* is_client */, "HTTP proxy DNS lookup", - error); + proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy DNS lookup", + GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); return; } diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc index 76e716281a..967a6d560f 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.cc +++ b/test/core/end2end/fuzzers/api_fuzzer.cc @@ -468,9 +468,9 @@ static void do_connect(void* arg, grpc_error* error) { *fc->ep = client; grpc_transport* transport = - grpc_create_chttp2_transport(nullptr, server, 0); + grpc_create_chttp2_transport(nullptr, server, false); grpc_server_setup_transport(g_server, transport, nullptr, nullptr); - grpc_chttp2_transport_start_reading(transport, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); GRPC_CLOSURE_SCHED(fc->closure, GRPC_ERROR_NONE); } else { diff --git a/test/core/end2end/fuzzers/client_fuzzer.cc b/test/core/end2end/fuzzers/client_fuzzer.cc index 2d3fc7a074..c17d581d8b 100644 --- a/test/core/end2end/fuzzers/client_fuzzer.cc +++ b/test/core/end2end/fuzzers/client_fuzzer.cc @@ -55,8 +55,8 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr); grpc_transport* transport = - grpc_create_chttp2_transport(nullptr, mock_endpoint, 1); - grpc_chttp2_transport_start_reading(transport, nullptr); + grpc_create_chttp2_transport(nullptr, mock_endpoint, true); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); grpc_channel* channel = grpc_channel_create( "test-target", nullptr, GRPC_CLIENT_DIRECT_CHANNEL, transport); diff --git a/test/core/end2end/fuzzers/server_fuzzer.cc b/test/core/end2end/fuzzers/server_fuzzer.cc index db25160e2b..61c55e0afd 100644 --- a/test/core/end2end/fuzzers/server_fuzzer.cc +++ b/test/core/end2end/fuzzers/server_fuzzer.cc @@ -61,9 +61,9 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { // grpc_server_register_method(server, "/reg", NULL, 0); grpc_server_start(server); grpc_transport* transport = - grpc_create_chttp2_transport(nullptr, mock_endpoint, 0); + grpc_create_chttp2_transport(nullptr, mock_endpoint, false); grpc_server_setup_transport(server, transport, nullptr, nullptr); - grpc_chttp2_transport_start_reading(transport, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); grpc_call* call1 = nullptr; grpc_call_details call_details1; diff --git a/test/core/iomgr/udp_server_test.cc b/test/core/iomgr/udp_server_test.cc index 7f662f0683..0deb534abd 100644 --- a/test/core/iomgr/udp_server_test.cc +++ b/test/core/iomgr/udp_server_test.cc @@ -50,7 +50,7 @@ static int g_number_of_writes = 0; static int g_number_of_bytes_read = 0; static int g_number_of_orphan_calls = 0; -static void on_read(grpc_fd* emfd, void* user_data) { +static bool on_read(grpc_fd* emfd, void* user_data) { char read_buffer[512]; ssize_t byte_count; @@ -64,9 +64,11 @@ static void on_read(grpc_fd* emfd, void* user_data) { GPR_ASSERT( GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); gpr_mu_unlock(g_mu); + return false; } -static void on_write(grpc_fd* emfd, void* user_data) { +static void on_write(grpc_fd* emfd, void* user_data, + grpc_closure* notify_on_write_closure) { gpr_mu_lock(g_mu); g_number_of_writes++; @@ -79,6 +81,7 @@ static void on_fd_orphaned(grpc_fd* emfd, grpc_closure* closure, void* user_data) { gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d", grpc_fd_wrapped_fd(emfd)); + GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE); g_number_of_orphan_calls++; } @@ -222,7 +225,6 @@ static void test_receive(int number_of_clients) { int clifd, svrfd; grpc_udp_server* s = grpc_udp_server_create(nullptr); int i; - int number_of_reads_before; grpc_millis deadline; grpc_pollset* pollsets[1]; LOG_TEST("test_receive"); @@ -252,14 +254,14 @@ static void test_receive(int number_of_clients) { deadline = grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10)); - number_of_reads_before = g_number_of_reads; + int number_of_bytes_read_before = g_number_of_bytes_read; /* Create a socket, send a packet to the UDP server. */ clifd = socket(addr->ss_family, SOCK_DGRAM, 0); GPR_ASSERT(clifd >= 0); GPR_ASSERT(connect(clifd, (struct sockaddr*)addr, (socklen_t)resolved_addr.len) == 0); GPR_ASSERT(5 == write(clifd, "hello", 5)); - while (g_number_of_reads == number_of_reads_before && + while (g_number_of_bytes_read < (number_of_bytes_read_before + 5) && deadline > grpc_core::ExecCtx::Get()->Now()) { grpc_pollset_worker* worker = nullptr; GPR_ASSERT(GRPC_LOG_IF_ERROR( @@ -268,7 +270,6 @@ static void test_receive(int number_of_clients) { grpc_core::ExecCtx::Get()->Flush(); gpr_mu_lock(g_mu); } - GPR_ASSERT(g_number_of_reads == number_of_reads_before + 1); close(clifd); } GPR_ASSERT(g_number_of_bytes_read == 5 * number_of_clients); @@ -280,9 +281,6 @@ static void test_receive(int number_of_clients) { /* The server had a single FD, which is orphaned exactly once in * * grpc_udp_server_destroy. */ GPR_ASSERT(g_number_of_orphan_calls == 1); - - /* The write callback should have fired a few times. */ - GPR_ASSERT(g_number_of_writes > 0); } static void destroy_pollset(void* p, grpc_error* error) { diff --git a/test/core/security/ssl_server_fuzzer.cc b/test/core/security/ssl_server_fuzzer.cc index 7056cfe581..6e30698562 100644 --- a/test/core/security/ssl_server_fuzzer.cc +++ b/test/core/security/ssl_server_fuzzer.cc @@ -93,8 +93,9 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { grpc_handshake_manager* handshake_mgr = grpc_handshake_manager_create(); grpc_server_security_connector_add_handshakers(sc, handshake_mgr); grpc_handshake_manager_do_handshake( - handshake_mgr, mock_endpoint, nullptr /* channel_args */, deadline, - nullptr /* acceptor */, on_handshake_done, &state); + handshake_mgr, nullptr /* interested_parties */, mock_endpoint, + nullptr /* channel_args */, deadline, nullptr /* acceptor */, + on_handshake_done, &state); grpc_core::ExecCtx::Get()->Flush(); // If the given string happens to be part of the correct client hello, the diff --git a/test/core/transport/chttp2/BUILD b/test/core/transport/chttp2/BUILD index 1ea7d0341d..6eff716b01 100644 --- a/test/core/transport/chttp2/BUILD +++ b/test/core/transport/chttp2/BUILD @@ -115,6 +115,21 @@ grpc_cc_test( ) grpc_cc_test( + name = "settings_timeout_test", + srcs = ["settings_timeout_test.cc"], + language = "C++", + deps = [ + "//:gpr", + "//:grpc", + "//test/core/util:gpr_test_util", + "//test/core/util:grpc_test_util", + ], + external_deps = [ + "gtest", + ], +) + +grpc_cc_test( name = "varint_test", srcs = ["varint_test.cc"], language = "C++", diff --git a/test/core/transport/chttp2/settings_timeout_test.cc b/test/core/transport/chttp2/settings_timeout_test.cc new file mode 100644 index 0000000000..08473c72b6 --- /dev/null +++ b/test/core/transport/chttp2/settings_timeout_test.cc @@ -0,0 +1,253 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> + +#include <memory> +#include <thread> + +#include <gtest/gtest.h> + +#include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/pollset.h" +#include "src/core/lib/iomgr/pollset_set.h" +#include "src/core/lib/iomgr/resolve_address.h" +#include "src/core/lib/iomgr/tcp_client.h" +#include "src/core/lib/slice/slice_internal.h" + +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" + +namespace grpc_core { +namespace test { +namespace { + +// A gRPC server, running in its own thread. +class ServerThread { + public: + explicit ServerThread(const char* address) : address_(address) {} + + void Start() { + // Start server with 1-second handshake timeout. + grpc_arg arg; + arg.type = GRPC_ARG_INTEGER; + arg.key = const_cast<char*>(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS); + arg.value.integer = 1000; + grpc_channel_args args = {1, &arg}; + server_ = grpc_server_create(&args, nullptr); + ASSERT_TRUE(grpc_server_add_insecure_http2_port(server_, address_)); + cq_ = grpc_completion_queue_create_for_next(nullptr); + grpc_server_register_completion_queue(server_, cq_, nullptr); + grpc_server_start(server_); + thread_.reset(new std::thread(std::bind(&ServerThread::Serve, this))); + } + + void Shutdown() { + grpc_completion_queue* shutdown_cq = + grpc_completion_queue_create_for_pluck(nullptr); + grpc_server_shutdown_and_notify(server_, shutdown_cq, nullptr); + GPR_ASSERT(grpc_completion_queue_pluck(shutdown_cq, nullptr, + grpc_timeout_seconds_to_deadline(1), + nullptr) + .type == GRPC_OP_COMPLETE); + grpc_completion_queue_destroy(shutdown_cq); + grpc_server_destroy(server_); + grpc_completion_queue_destroy(cq_); + thread_->join(); + } + + private: + void Serve() { + // The completion queue should not return anything other than shutdown. + grpc_event ev = grpc_completion_queue_next( + cq_, gpr_inf_future(GPR_CLOCK_MONOTONIC), nullptr); + ASSERT_EQ(GRPC_QUEUE_SHUTDOWN, ev.type); + } + + const char* address_; // Do not own. + grpc_server* server_ = nullptr; + grpc_completion_queue* cq_ = nullptr; + std::unique_ptr<std::thread> thread_; +}; + +// A TCP client that connects to the server, reads data until the server +// closes, and then terminates. +class Client { + public: + explicit Client(const char* server_address) + : server_address_(server_address) {} + + void Connect() { + grpc_core::ExecCtx exec_ctx; + grpc_resolved_addresses* server_addresses = nullptr; + grpc_error* error = + grpc_blocking_resolve_address(server_address_, "80", &server_addresses); + ASSERT_EQ(GRPC_ERROR_NONE, error) << grpc_error_string(error); + ASSERT_GE(server_addresses->naddrs, 1UL); + pollset_ = (grpc_pollset*)gpr_zalloc(grpc_pollset_size()); + grpc_pollset_init(pollset_, &mu_); + grpc_pollset_set* pollset_set = grpc_pollset_set_create(); + grpc_pollset_set_add_pollset(pollset_set, pollset_); + EventState state; + grpc_tcp_client_connect(state.closure(), &endpoint_, pollset_set, + nullptr /* channel_args */, server_addresses->addrs, + 1000); + ASSERT_TRUE(PollUntilDone( + &state, + grpc_timespec_to_millis_round_up(gpr_inf_future(GPR_CLOCK_MONOTONIC)))); + ASSERT_EQ(GRPC_ERROR_NONE, state.error()); + grpc_pollset_set_destroy(pollset_set); + grpc_endpoint_add_to_pollset(endpoint_, pollset_); + grpc_resolved_addresses_destroy(server_addresses); + } + + // Reads until an error is returned. + // Returns true if an error was encountered before the deadline. + bool ReadUntilError() { + grpc_core::ExecCtx exec_ctx; + grpc_slice_buffer read_buffer; + grpc_slice_buffer_init(&read_buffer); + bool retval = true; + // Use a deadline of 3 seconds, which is a lot more than we should + // need for a 1-second timeout, but this helps avoid flakes. + grpc_millis deadline = grpc_core::ExecCtx::Get()->Now() + 3000; + while (true) { + EventState state; + grpc_endpoint_read(endpoint_, &read_buffer, state.closure()); + if (!PollUntilDone(&state, deadline)) { + retval = false; + break; + } + if (state.error() != GRPC_ERROR_NONE) break; + gpr_log(GPR_INFO, "client read %" PRIuPTR " bytes", read_buffer.length); + grpc_slice_buffer_reset_and_unref_internal(&read_buffer); + } + grpc_endpoint_shutdown(endpoint_, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown")); + grpc_slice_buffer_destroy_internal(&read_buffer); + return retval; + } + + void Shutdown() { + grpc_core::ExecCtx exec_ctx; + grpc_endpoint_destroy(endpoint_); + grpc_pollset_shutdown(pollset_, + GRPC_CLOSURE_CREATE(&Client::PollsetDestroy, pollset_, + grpc_schedule_on_exec_ctx)); + } + + private: + // State used to wait for an I/O event. + class EventState { + public: + EventState() { + GRPC_CLOSURE_INIT(&closure_, &EventState::OnEventDone, this, + grpc_schedule_on_exec_ctx); + } + + ~EventState() { GRPC_ERROR_UNREF(error_); } + + grpc_closure* closure() { return &closure_; } + + bool done() const { return done_; } + + // Caller does NOT take ownership of the error. + grpc_error* error() const { return error_; } + + private: + static void OnEventDone(void* arg, grpc_error* error) { + gpr_log(GPR_INFO, "OnEventDone(): %s", grpc_error_string(error)); + EventState* state = (EventState*)arg; + state->error_ = GRPC_ERROR_REF(error); + state->done_ = true; + } + + grpc_closure closure_; + bool done_ = false; + grpc_error* error_ = GRPC_ERROR_NONE; + }; + + // Returns true if done, or false if deadline exceeded. + bool PollUntilDone(EventState* state, grpc_millis deadline) { + while (true) { + grpc_pollset_worker* worker = nullptr; + gpr_mu_lock(mu_); + GRPC_LOG_IF_ERROR( + "grpc_pollset_work", + grpc_pollset_work(pollset_, &worker, + grpc_core::ExecCtx::Get()->Now() + 1000)); + gpr_mu_unlock(mu_); + if (state != nullptr && state->done()) return true; + if (grpc_core::ExecCtx::Get()->Now() >= deadline) return false; + } + } + + static void PollsetDestroy(void* arg, grpc_error* error) { + grpc_pollset* pollset = (grpc_pollset*)arg; + grpc_pollset_destroy(pollset); + gpr_free(pollset); + } + + const char* server_address_; // Do not own. + grpc_endpoint* endpoint_; + gpr_mu* mu_; + grpc_pollset* pollset_; +}; + +TEST(SettingsTimeout, Basic) { + // Construct server address string. + const int server_port = grpc_pick_unused_port_or_die(); + char* server_address_string; + gpr_asprintf(&server_address_string, "localhost:%d", server_port); + // Start server. + gpr_log(GPR_INFO, "starting server on %s", server_address_string); + ServerThread server_thread(server_address_string); + server_thread.Start(); + // Create client and connect to server. + gpr_log(GPR_INFO, "starting client connect"); + Client client(server_address_string); + client.Connect(); + // Client read. Should fail due to server dropping connection. + gpr_log(GPR_INFO, "starting client read"); + EXPECT_TRUE(client.ReadUntilError()); + // Shut down client. + gpr_log(GPR_INFO, "shutting down client"); + client.Shutdown(); + // Shut down server. + gpr_log(GPR_INFO, "shutting down server"); + server_thread.Shutdown(); + // Clean up. + gpr_free(server_address_string); +} + +} // namespace +} // namespace test +} // namespace grpc_core + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc_test_init(argc, argv); + grpc_init(); + int result = RUN_ALL_TESTS(); + grpc_shutdown(); + return result; +} diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index 48cdd433b2..d4ee6b429f 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -353,11 +353,6 @@ class GrpclbEnd2endTest : public ::testing::Test { "balancer", server_host_, balancers_.back().get())); } ResetStub(); - std::vector<AddressData> addresses; - for (size_t i = 0; i < balancer_servers_.size(); ++i) { - addresses.emplace_back(AddressData{balancer_servers_[i].port_, true, ""}); - } - SetNextResolution(addresses); } void TearDown() override { @@ -370,6 +365,14 @@ class GrpclbEnd2endTest : public ::testing::Test { grpc_fake_resolver_response_generator_unref(response_generator_); } + void SetNextResolutionAllBalancers() { + std::vector<AddressData> addresses; + for (size_t i = 0; i < balancer_servers_.size(); ++i) { + addresses.emplace_back(AddressData{balancer_servers_[i].port_, true, ""}); + } + SetNextResolution(addresses); + } + void ResetStub(int fallback_timeout = 0) { ChannelArguments args; args.SetGrpclbFallbackTimeout(fallback_timeout); @@ -580,6 +583,7 @@ class SingleBalancerTest : public GrpclbEnd2endTest { }; TEST_F(SingleBalancerTest, Vanilla) { + SetNextResolutionAllBalancers(); const size_t kNumRpcsPerAddress = 100; ScheduleResponseForBalancer( 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), @@ -607,6 +611,7 @@ TEST_F(SingleBalancerTest, Vanilla) { } TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { + SetNextResolutionAllBalancers(); const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor(); const int kCallDeadlineMs = 1000 * grpc_test_slowdown_factor(); @@ -644,6 +649,7 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { } TEST_F(SingleBalancerTest, Fallback) { + SetNextResolutionAllBalancers(); const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor(); const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor(); const size_t kNumBackendInResolution = backends_.size() / 2; @@ -710,6 +716,7 @@ TEST_F(SingleBalancerTest, Fallback) { } TEST_F(SingleBalancerTest, FallbackUpdate) { + SetNextResolutionAllBalancers(); const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor(); const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor(); const size_t kNumBackendInResolution = backends_.size() / 3; @@ -817,6 +824,7 @@ TEST_F(SingleBalancerTest, FallbackUpdate) { } TEST_F(SingleBalancerTest, BackendsRestart) { + SetNextResolutionAllBalancers(); const size_t kNumRpcsPerAddress = 100; ScheduleResponseForBalancer( 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), @@ -856,6 +864,7 @@ class UpdatesTest : public GrpclbEnd2endTest { }; TEST_F(UpdatesTest, UpdateBalancers) { + SetNextResolutionAllBalancers(); const std::vector<int> first_backend{GetBackendPorts()[0]}; const std::vector<int> second_backend{GetBackendPorts()[1]}; ScheduleResponseForBalancer( @@ -918,6 +927,7 @@ TEST_F(UpdatesTest, UpdateBalancers) { // verify that the LB channel inside grpclb keeps the initial connection (which // by definition is also present in the update). TEST_F(UpdatesTest, UpdateBalancersRepeated) { + SetNextResolutionAllBalancers(); const std::vector<int> first_backend{GetBackendPorts()[0]}; const std::vector<int> second_backend{GetBackendPorts()[0]}; @@ -988,6 +998,9 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) { } TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { + std::vector<AddressData> addresses; + addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""}); + SetNextResolution(addresses); const std::vector<int> first_backend{GetBackendPorts()[0]}; const std::vector<int> second_backend{GetBackendPorts()[1]}; @@ -1029,7 +1042,7 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { EXPECT_EQ(0U, balancer_servers_[2].service_->request_count()); EXPECT_EQ(0U, balancer_servers_[2].service_->response_count()); - std::vector<AddressData> addresses; + addresses.clear(); addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""}); gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 =========="); SetNextResolution(addresses); @@ -1054,8 +1067,14 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { balancers_[2]->NotifyDoneWithServerlists(); EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); - EXPECT_EQ(1U, balancer_servers_[1].service_->request_count()); - EXPECT_EQ(1U, balancer_servers_[1].service_->response_count()); + // The second balancer, published as part of the first update, may end up + // getting two requests (that is, 1 <= #req <= 2) if the LB call retry timer + // firing races with the arrival of the update containing the second + // balancer. + EXPECT_GE(balancer_servers_[1].service_->request_count(), 1U); + EXPECT_GE(balancer_servers_[1].service_->response_count(), 1U); + EXPECT_LE(balancer_servers_[1].service_->request_count(), 2U); + EXPECT_LE(balancer_servers_[1].service_->response_count(), 2U); EXPECT_EQ(0U, balancer_servers_[2].service_->request_count()); EXPECT_EQ(0U, balancer_servers_[2].service_->response_count()); // Check LB policy name for the channel. @@ -1063,6 +1082,7 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { } TEST_F(SingleBalancerTest, Drop) { + SetNextResolutionAllBalancers(); const size_t kNumRpcsPerAddress = 100; const int num_of_drop_by_rate_limiting_addresses = 1; const int num_of_drop_by_load_balancing_addresses = 2; @@ -1106,6 +1126,7 @@ TEST_F(SingleBalancerTest, Drop) { } TEST_F(SingleBalancerTest, DropAllFirst) { + SetNextResolutionAllBalancers(); // All registered addresses are marked as "drop". const int num_of_drop_by_rate_limiting_addresses = 1; const int num_of_drop_by_load_balancing_addresses = 1; @@ -1121,6 +1142,7 @@ TEST_F(SingleBalancerTest, DropAllFirst) { } TEST_F(SingleBalancerTest, DropAll) { + SetNextResolutionAllBalancers(); ScheduleResponseForBalancer( 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), 0); @@ -1151,6 +1173,7 @@ class SingleBalancerWithClientLoadReportingTest : public GrpclbEnd2endTest { }; TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) { + SetNextResolutionAllBalancers(); const size_t kNumRpcsPerAddress = 100; ScheduleResponseForBalancer( 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), @@ -1185,6 +1208,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) { } TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) { + SetNextResolutionAllBalancers(); const size_t kNumRpcsPerAddress = 3; const int num_of_drop_by_rate_limiting_addresses = 2; const int num_of_drop_by_load_balancing_addresses = 1; diff --git a/test/cpp/interop/interop_server.cc b/test/cpp/interop/interop_server.cc index a24cdc7d2d..30bd8bfef8 100644 --- a/test/cpp/interop/interop_server.cc +++ b/test/cpp/interop/interop_server.cc @@ -317,9 +317,15 @@ class TestServiceImpl : public TestService::Service { void grpc::testing::interop::RunServer( std::shared_ptr<ServerCredentials> creds) { - GPR_ASSERT(FLAGS_port != 0); + RunServer(creds, FLAGS_port, nullptr); +} + +void grpc::testing::interop::RunServer( + std::shared_ptr<ServerCredentials> creds, const int port, + ServerStartedCondition* server_started_condition) { + GPR_ASSERT(port != 0); std::ostringstream server_address; - server_address << "0.0.0.0:" << FLAGS_port; + server_address << "0.0.0.0:" << port; TestServiceImpl service; SimpleRequest request; @@ -333,6 +339,14 @@ void grpc::testing::interop::RunServer( } std::unique_ptr<Server> server(builder.BuildAndStart()); gpr_log(GPR_INFO, "Server listening on %s", server_address.str().c_str()); + + // Signal that the server has started. + if (server_started_condition) { + std::unique_lock<std::mutex> lock(server_started_condition->mutex); + server_started_condition->server_started = true; + server_started_condition->condition.notify_all(); + } + while (!gpr_atm_no_barrier_load(&g_got_sigint)) { gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(5, GPR_TIMESPAN))); diff --git a/test/cpp/interop/server_helper.h b/test/cpp/interop/server_helper.h index 6af003fe93..1bf7db1e14 100644 --- a/test/cpp/interop/server_helper.h +++ b/test/cpp/interop/server_helper.h @@ -19,6 +19,7 @@ #ifndef GRPC_TEST_CPP_INTEROP_SERVER_HELPER_H #define GRPC_TEST_CPP_INTEROP_SERVER_HELPER_H +#include <condition_variable> #include <memory> #include <grpc/compression.h> @@ -50,8 +51,27 @@ class InteropServerContextInspector { namespace interop { extern gpr_atm g_got_sigint; + +struct ServerStartedCondition { + std::mutex mutex; + std::condition_variable condition; + bool server_started = false; +}; + +/// Run gRPC interop server using port FLAGS_port. +/// +/// \param creds The credentials associated with the server. void RunServer(std::shared_ptr<ServerCredentials> creds); +/// Run gRPC interop server. +/// +/// \param creds The credentials associated with the server. +/// \param port Port to use for the server. +/// \param server_started_condition (optional) Struct holding mutex, condition +/// variable, and condition used to notify when the server has started. +void RunServer(std::shared_ptr<ServerCredentials> creds, int port, + ServerStartedCondition* server_started_condition); + } // namespace interop } // namespace testing } // namespace grpc diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc index 43bbbad880..fcb1677d09 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -132,7 +132,7 @@ class Fixture { grpc_channel_args c_args = args.c_channel_args(); ep_ = new DummyEndpoint; t_ = grpc_create_chttp2_transport(&c_args, ep_, client); - grpc_chttp2_transport_start_reading(t_, nullptr); + grpc_chttp2_transport_start_reading(t_, nullptr, nullptr); FlushExecCtx(); } diff --git a/test/cpp/microbenchmarks/fullstack_fixtures.h b/test/cpp/microbenchmarks/fullstack_fixtures.h index ab0a696e3d..d1ede755a5 100644 --- a/test/cpp/microbenchmarks/fullstack_fixtures.h +++ b/test/cpp/microbenchmarks/fullstack_fixtures.h @@ -174,7 +174,7 @@ class EndpointPairFixture : public BaseFixture { const grpc_channel_args* server_args = grpc_server_get_channel_args(server_->c_server()); server_transport_ = grpc_create_chttp2_transport( - server_args, endpoints.server, 0 /* is_client */); + server_args, endpoints.server, false /* is_client */); grpc_pollset** pollsets; size_t num_pollsets = 0; @@ -186,7 +186,7 @@ class EndpointPairFixture : public BaseFixture { grpc_server_setup_transport(server_->c_server(), server_transport_, nullptr, server_args); - grpc_chttp2_transport_start_reading(server_transport_, nullptr); + grpc_chttp2_transport_start_reading(server_transport_, nullptr, nullptr); } /* create channel */ @@ -197,11 +197,11 @@ class EndpointPairFixture : public BaseFixture { grpc_channel_args c_args = args.c_channel_args(); client_transport_ = - grpc_create_chttp2_transport(&c_args, endpoints.client, 1); + grpc_create_chttp2_transport(&c_args, endpoints.client, true); GPR_ASSERT(client_transport_); grpc_channel* channel = grpc_channel_create( "target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport_); - grpc_chttp2_transport_start_reading(client_transport_, nullptr); + grpc_chttp2_transport_start_reading(client_transport_, nullptr, nullptr); channel_ = CreateChannelInternal("", channel); } diff --git a/test/cpp/performance/writes_per_rpc_test.cc b/test/cpp/performance/writes_per_rpc_test.cc index 1711eef303..0b9dc83f2b 100644 --- a/test/cpp/performance/writes_per_rpc_test.cc +++ b/test/cpp/performance/writes_per_rpc_test.cc @@ -89,7 +89,7 @@ class EndpointPairFixture { const grpc_channel_args* server_args = grpc_server_get_channel_args(server_->c_server()); grpc_transport* transport = grpc_create_chttp2_transport( - server_args, endpoints.server, 0 /* is_client */); + server_args, endpoints.server, false /* is_client */); grpc_pollset** pollsets; size_t num_pollsets = 0; @@ -101,7 +101,7 @@ class EndpointPairFixture { grpc_server_setup_transport(server_->c_server(), transport, nullptr, server_args); - grpc_chttp2_transport_start_reading(transport, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); } /* create channel */ @@ -112,11 +112,11 @@ class EndpointPairFixture { grpc_channel_args c_args = args.c_channel_args(); grpc_transport* transport = - grpc_create_chttp2_transport(&c_args, endpoints.client, 1); + grpc_create_chttp2_transport(&c_args, endpoints.client, true); GPR_ASSERT(transport); grpc_channel* channel = grpc_channel_create( "target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, transport); - grpc_chttp2_transport_start_reading(transport, nullptr); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); channel_ = CreateChannelInternal("", channel); } diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 9f20b148eb..82a3f0042d 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -60,21 +60,20 @@ class SynchronousClient SetupLoadTest(config, num_threads_); } - virtual ~SynchronousClient(){}; + virtual ~SynchronousClient() {} - virtual void InitThreadFuncImpl(size_t thread_idx) = 0; + virtual bool InitThreadFuncImpl(size_t thread_idx) = 0; virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0; void ThreadFunc(size_t thread_idx, Thread* t) override { - InitThreadFuncImpl(thread_idx); + if (!InitThreadFuncImpl(thread_idx)) { + return; + } for (;;) { // run the loop body HistogramEntry entry; const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx); t->UpdateHistogram(&entry); - if (!thread_still_ok) { - gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); - } if (!thread_still_ok || ThreadCompleted()) { return; } @@ -109,9 +108,6 @@ class SynchronousClient size_t num_threads_; std::vector<SimpleResponse> responses_; - - private: - void DestroyMultithreading() override final { EndThreads(); } }; class SynchronousUnaryClient final : public SynchronousClient { @@ -122,7 +118,7 @@ class SynchronousUnaryClient final : public SynchronousClient { } ~SynchronousUnaryClient() {} - void InitThreadFuncImpl(size_t thread_idx) override {} + bool InitThreadFuncImpl(size_t thread_idx) override { return true; } bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { if (!WaitToIssue(thread_idx)) { @@ -140,6 +136,9 @@ class SynchronousUnaryClient final : public SynchronousClient { entry->set_status(s.error_code()); return true; } + + private: + void DestroyMultithreading() override final { EndThreads(); } }; template <class StreamType> @@ -149,31 +148,30 @@ class SynchronousStreamingClient : public SynchronousClient { : SynchronousClient(config), context_(num_threads_), stream_(num_threads_), + stream_mu_(num_threads_), + shutdown_(num_threads_), messages_per_stream_(config.messages_per_stream()), messages_issued_(num_threads_) { StartThreads(num_threads_); } virtual ~SynchronousStreamingClient() { - std::vector<std::thread> cleanup_threads; - for (size_t i = 0; i < num_threads_; i++) { - cleanup_threads.emplace_back([this, i]() { - auto stream = &stream_[i]; - if (*stream) { - // forcibly cancel the streams, then finish - context_[i].TryCancel(); - (*stream)->Finish().IgnoreError(); - // don't log any error message on !ok since this was canceled - } - }); - } - for (auto& th : cleanup_threads) { - th.join(); - } + CleanupAllStreams([this](size_t thread_idx) { + // Don't log any kind of error since we may have canceled this + stream_[thread_idx]->Finish().IgnoreError(); + }); } protected: std::vector<grpc::ClientContext> context_; std::vector<std::unique_ptr<StreamType>> stream_; + // stream_mu_ is only needed when changing an element of stream_ or context_ + std::vector<std::mutex> stream_mu_; + // use struct Bool rather than bool because vector<bool> is not concurrent + struct Bool { + bool val; + Bool() : val(false) {} + }; + std::vector<Bool> shutdown_; const int messages_per_stream_; std::vector<int> messages_issued_; @@ -182,27 +180,26 @@ class SynchronousStreamingClient : public SynchronousClient { // don't set the value since the stream is failed and shouldn't be timed entry->set_status(s.error_code()); if (!s.ok()) { - gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", thread_idx, - s.error_message().c_str()); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", + thread_idx, s.error_message().c_str()); + } } + // Lock the stream_mu_ now because the client context could change + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); context_[thread_idx].~ClientContext(); new (&context_[thread_idx]) ClientContext(); } -}; -class SynchronousStreamingPingPongClient final - : public SynchronousStreamingClient< - grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> { - public: - SynchronousStreamingPingPongClient(const ClientConfig& config) - : SynchronousStreamingClient(config) {} - ~SynchronousStreamingPingPongClient() { + void CleanupAllStreams(std::function<void(size_t)> cleaner) { std::vector<std::thread> cleanup_threads; for (size_t i = 0; i < num_threads_; i++) { - cleanup_threads.emplace_back([this, i]() { - auto stream = &stream_[i]; - if (*stream) { - (*stream)->WritesDone(); + cleanup_threads.emplace_back([this, i, cleaner] { + std::lock_guard<std::mutex> l(stream_mu_[i]); + shutdown_[i].val = true; + if (stream_[i]) { + cleaner(i); } }); } @@ -211,10 +208,36 @@ class SynchronousStreamingPingPongClient final } } - void InitThreadFuncImpl(size_t thread_idx) override { + private: + void DestroyMultithreading() override final { + CleanupAllStreams( + [this](size_t thread_idx) { context_[thread_idx].TryCancel(); }); + EndThreads(); + } +}; + +class SynchronousStreamingPingPongClient final + : public SynchronousStreamingClient< + grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> { + public: + SynchronousStreamingPingPongClient(const ClientConfig& config) + : SynchronousStreamingClient(config) {} + ~SynchronousStreamingPingPongClient() { + CleanupAllStreams( + [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); }); + } + + private: + bool InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); + } else { + return false; + } messages_issued_[thread_idx] = 0; + return true; } bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { @@ -239,7 +262,13 @@ class SynchronousStreamingPingPongClient final stream_[thread_idx]->WritesDone(); FinishStream(entry, thread_idx); auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); + } else { + stream_[thread_idx].reset(); + return false; + } messages_issued_[thread_idx] = 0; return true; } @@ -251,25 +280,24 @@ class SynchronousStreamingFromClientClient final SynchronousStreamingFromClientClient(const ClientConfig& config) : SynchronousStreamingClient(config), last_issue_(num_threads_) {} ~SynchronousStreamingFromClientClient() { - std::vector<std::thread> cleanup_threads; - for (size_t i = 0; i < num_threads_; i++) { - cleanup_threads.emplace_back([this, i]() { - auto stream = &stream_[i]; - if (*stream) { - (*stream)->WritesDone(); - } - }); - } - for (auto& th : cleanup_threads) { - th.join(); - } + CleanupAllStreams( + [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); }); } - void InitThreadFuncImpl(size_t thread_idx) override { + private: + std::vector<double> last_issue_; + + bool InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], - &responses_[thread_idx]); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], + &responses_[thread_idx]); + } else { + return false; + } last_issue_[thread_idx] = UsageTimer::Now(); + return true; } bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { @@ -287,13 +315,16 @@ class SynchronousStreamingFromClientClient final stream_[thread_idx]->WritesDone(); FinishStream(entry, thread_idx); auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], - &responses_[thread_idx]); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], + &responses_[thread_idx]); + } else { + stream_[thread_idx].reset(); + return false; + } return true; } - - private: - std::vector<double> last_issue_; }; class SynchronousStreamingFromServerClient final @@ -301,12 +332,24 @@ class SynchronousStreamingFromServerClient final public: SynchronousStreamingFromServerClient(const ClientConfig& config) : SynchronousStreamingClient(config), last_recv_(num_threads_) {} - void InitThreadFuncImpl(size_t thread_idx) override { + ~SynchronousStreamingFromServerClient() {} + + private: + std::vector<double> last_recv_; + + bool InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = - stub->StreamingFromServer(&context_[thread_idx], request_); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = + stub->StreamingFromServer(&context_[thread_idx], request_); + } else { + return false; + } last_recv_[thread_idx] = UsageTimer::Now(); + return true; } + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0); if (stream_[thread_idx]->Read(&responses_[thread_idx])) { @@ -317,13 +360,16 @@ class SynchronousStreamingFromServerClient final } FinishStream(entry, thread_idx); auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = - stub->StreamingFromServer(&context_[thread_idx], request_); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = + stub->StreamingFromServer(&context_[thread_idx], request_); + } else { + stream_[thread_idx].reset(); + return false; + } return true; } - - private: - std::vector<double> last_recv_; }; class SynchronousStreamingBothWaysClient final @@ -333,24 +379,22 @@ class SynchronousStreamingBothWaysClient final SynchronousStreamingBothWaysClient(const ClientConfig& config) : SynchronousStreamingClient(config) {} ~SynchronousStreamingBothWaysClient() { - std::vector<std::thread> cleanup_threads; - for (size_t i = 0; i < num_threads_; i++) { - cleanup_threads.emplace_back([this, i]() { - auto stream = &stream_[i]; - if (*stream) { - (*stream)->WritesDone(); - } - }); - } - for (auto& th : cleanup_threads) { - th.join(); - } + CleanupAllStreams( + [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); }); } - void InitThreadFuncImpl(size_t thread_idx) override { + private: + bool InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]); + } else { + return false; + } + return true; } + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { // TODO (vjpai): Do this return true; diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index ae7ddb2475..3958d8e9e4 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -2896,6 +2896,23 @@ "gpr", "gpr_test_util", "grpc", + "grpc_test_util" + ], + "headers": [], + "is_filegroup": false, + "language": "c++", + "name": "chttp2_settings_timeout_test", + "src": [ + "test/core/transport/chttp2/settings_timeout_test.cc" + ], + "third_party": false, + "type": "target" + }, + { + "deps": [ + "gpr", + "gpr_test_util", + "grpc", "grpc++", "grpc++_test_util", "grpc_cli_libs", diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index ce698cb709..6ffc7f412d 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -3376,6 +3376,30 @@ "flaky": false, "gtest": true, "language": "c++", + "name": "chttp2_settings_timeout_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": true + }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", "name": "cli_call_test", "platforms": [ "linux", diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index 766c1c0b7c..471f5d99e7 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -1231,7 +1231,7 @@ if not args.disable_auto_set_flakes: if test.flaky: flaky_tests.add(test.name) if test.cpu > 0: shortname_to_cpu[test.name] = test.cpu except: - print("Unexpected error getting flaky tests:", sys.exc_info()[0]) + print("Unexpected error getting flaky tests: %s" % traceback.format_exc()) if args.force_default_poller: _POLLING_STRATEGIES = {} |