diff options
author | Craig Tiller <ctiller@google.com> | 2016-10-17 14:31:44 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-10-17 14:31:44 -0700 |
commit | 654b242ce70afcf9fdab674cab9b71d8d3f02502 (patch) | |
tree | f463f15f28a23c0b94cd2c1e2a3250a416943ad1 | |
parent | a628c84e86c0a545c7550e865a356ca4144047f8 (diff) | |
parent | 14c836fcd10e4f96ae1f4ac22fe43d42e2e293ba (diff) |
Merge branch 'direct-calls' into buffer_pools_for_realsies
53 files changed, 1312 insertions, 195 deletions
@@ -219,6 +219,7 @@ cc_library( "src/core/lib/iomgr/timer_heap.h", "src/core/lib/iomgr/udp_server.h", "src/core/lib/iomgr/unix_sockets_posix.h", + "src/core/lib/iomgr/wakeup_fd_cv.h", "src/core/lib/iomgr/wakeup_fd_pipe.h", "src/core/lib/iomgr/wakeup_fd_posix.h", "src/core/lib/iomgr/workqueue.h", @@ -381,6 +382,7 @@ cc_library( "src/core/lib/iomgr/udp_server.c", "src/core/lib/iomgr/unix_sockets_posix.c", "src/core/lib/iomgr/unix_sockets_posix_noop.c", + "src/core/lib/iomgr/wakeup_fd_cv.c", "src/core/lib/iomgr/wakeup_fd_eventfd.c", "src/core/lib/iomgr/wakeup_fd_nospecial.c", "src/core/lib/iomgr/wakeup_fd_pipe.c", @@ -621,6 +623,7 @@ cc_library( "src/core/lib/iomgr/timer_heap.h", "src/core/lib/iomgr/udp_server.h", "src/core/lib/iomgr/unix_sockets_posix.h", + "src/core/lib/iomgr/wakeup_fd_cv.h", "src/core/lib/iomgr/wakeup_fd_pipe.h", "src/core/lib/iomgr/wakeup_fd_posix.h", "src/core/lib/iomgr/workqueue.h", @@ -768,6 +771,7 @@ cc_library( "src/core/lib/iomgr/udp_server.c", "src/core/lib/iomgr/unix_sockets_posix.c", "src/core/lib/iomgr/unix_sockets_posix_noop.c", + "src/core/lib/iomgr/wakeup_fd_cv.c", "src/core/lib/iomgr/wakeup_fd_eventfd.c", "src/core/lib/iomgr/wakeup_fd_nospecial.c", "src/core/lib/iomgr/wakeup_fd_pipe.c", @@ -978,6 +982,7 @@ cc_library( "src/core/lib/iomgr/timer_heap.h", "src/core/lib/iomgr/udp_server.h", "src/core/lib/iomgr/unix_sockets_posix.h", + "src/core/lib/iomgr/wakeup_fd_cv.h", "src/core/lib/iomgr/wakeup_fd_pipe.h", "src/core/lib/iomgr/wakeup_fd_posix.h", "src/core/lib/iomgr/workqueue.h", @@ -1117,6 +1122,7 @@ cc_library( "src/core/lib/iomgr/udp_server.c", "src/core/lib/iomgr/unix_sockets_posix.c", "src/core/lib/iomgr/unix_sockets_posix_noop.c", + "src/core/lib/iomgr/wakeup_fd_cv.c", "src/core/lib/iomgr/wakeup_fd_eventfd.c", "src/core/lib/iomgr/wakeup_fd_nospecial.c", "src/core/lib/iomgr/wakeup_fd_pipe.c", @@ -1884,6 +1890,7 @@ objc_library( "src/core/lib/iomgr/udp_server.c", "src/core/lib/iomgr/unix_sockets_posix.c", "src/core/lib/iomgr/unix_sockets_posix_noop.c", + "src/core/lib/iomgr/wakeup_fd_cv.c", "src/core/lib/iomgr/wakeup_fd_eventfd.c", "src/core/lib/iomgr/wakeup_fd_nospecial.c", "src/core/lib/iomgr/wakeup_fd_pipe.c", @@ -2103,6 +2110,7 @@ objc_library( "src/core/lib/iomgr/timer_heap.h", "src/core/lib/iomgr/udp_server.h", "src/core/lib/iomgr/unix_sockets_posix.h", + "src/core/lib/iomgr/wakeup_fd_cv.h", "src/core/lib/iomgr/wakeup_fd_pipe.h", "src/core/lib/iomgr/wakeup_fd_posix.h", "src/core/lib/iomgr/workqueue.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index 92dd243567..59b661101e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -347,6 +347,7 @@ add_library(grpc src/core/lib/iomgr/udp_server.c src/core/lib/iomgr/unix_sockets_posix.c src/core/lib/iomgr/unix_sockets_posix_noop.c + src/core/lib/iomgr/wakeup_fd_cv.c src/core/lib/iomgr/wakeup_fd_eventfd.c src/core/lib/iomgr/wakeup_fd_nospecial.c src/core/lib/iomgr/wakeup_fd_pipe.c @@ -606,6 +607,7 @@ add_library(grpc_cronet src/core/lib/iomgr/udp_server.c src/core/lib/iomgr/unix_sockets_posix.c src/core/lib/iomgr/unix_sockets_posix_noop.c + src/core/lib/iomgr/wakeup_fd_cv.c src/core/lib/iomgr/wakeup_fd_eventfd.c src/core/lib/iomgr/wakeup_fd_nospecial.c src/core/lib/iomgr/wakeup_fd_pipe.c @@ -837,6 +839,7 @@ add_library(grpc_unsecure src/core/lib/iomgr/udp_server.c src/core/lib/iomgr/unix_sockets_posix.c src/core/lib/iomgr/unix_sockets_posix_noop.c + src/core/lib/iomgr/wakeup_fd_cv.c src/core/lib/iomgr/wakeup_fd_eventfd.c src/core/lib/iomgr/wakeup_fd_nospecial.c src/core/lib/iomgr/wakeup_fd_pipe.c @@ -1026,6 +1026,7 @@ transport_security_test: $(BINDIR)/$(CONFIG)/transport_security_test udp_server_test: $(BINDIR)/$(CONFIG)/udp_server_test uri_fuzzer_test: $(BINDIR)/$(CONFIG)/uri_fuzzer_test uri_parser_test: $(BINDIR)/$(CONFIG)/uri_parser_test +wakeup_fd_cv_test: $(BINDIR)/$(CONFIG)/wakeup_fd_cv_test alarm_cpp_test: $(BINDIR)/$(CONFIG)/alarm_cpp_test async_end2end_test: $(BINDIR)/$(CONFIG)/async_end2end_test auth_property_iterator_test: $(BINDIR)/$(CONFIG)/auth_property_iterator_test @@ -1342,6 +1343,7 @@ buildtests_c: privatelibs_c \ $(BINDIR)/$(CONFIG)/transport_security_test \ $(BINDIR)/$(CONFIG)/udp_server_test \ $(BINDIR)/$(CONFIG)/uri_parser_test \ + $(BINDIR)/$(CONFIG)/wakeup_fd_cv_test \ $(BINDIR)/$(CONFIG)/public_headers_must_be_c89 \ $(BINDIR)/$(CONFIG)/badreq_bad_client_test \ $(BINDIR)/$(CONFIG)/connection_prefix_bad_client_test \ @@ -1742,6 +1744,8 @@ test_c: buildtests_c $(Q) $(BINDIR)/$(CONFIG)/udp_server_test || ( echo test udp_server_test failed ; exit 1 ) $(E) "[RUN] Testing uri_parser_test" $(Q) $(BINDIR)/$(CONFIG)/uri_parser_test || ( echo test uri_parser_test failed ; exit 1 ) + $(E) "[RUN] Testing wakeup_fd_cv_test" + $(Q) $(BINDIR)/$(CONFIG)/wakeup_fd_cv_test || ( echo test wakeup_fd_cv_test failed ; exit 1 ) $(E) "[RUN] Testing public_headers_must_be_c89" $(Q) $(BINDIR)/$(CONFIG)/public_headers_must_be_c89 || ( echo test public_headers_must_be_c89 failed ; exit 1 ) $(E) "[RUN] Testing badreq_bad_client_test" @@ -2595,6 +2599,7 @@ LIBGRPC_SRC = \ src/core/lib/iomgr/udp_server.c \ src/core/lib/iomgr/unix_sockets_posix.c \ src/core/lib/iomgr/unix_sockets_posix_noop.c \ + src/core/lib/iomgr/wakeup_fd_cv.c \ src/core/lib/iomgr/wakeup_fd_eventfd.c \ src/core/lib/iomgr/wakeup_fd_nospecial.c \ src/core/lib/iomgr/wakeup_fd_pipe.c \ @@ -2872,6 +2877,7 @@ LIBGRPC_CRONET_SRC = \ src/core/lib/iomgr/udp_server.c \ src/core/lib/iomgr/unix_sockets_posix.c \ src/core/lib/iomgr/unix_sockets_posix_noop.c \ + src/core/lib/iomgr/wakeup_fd_cv.c \ src/core/lib/iomgr/wakeup_fd_eventfd.c \ src/core/lib/iomgr/wakeup_fd_nospecial.c \ src/core/lib/iomgr/wakeup_fd_pipe.c \ @@ -3139,6 +3145,7 @@ LIBGRPC_TEST_UTIL_SRC = \ src/core/lib/iomgr/udp_server.c \ src/core/lib/iomgr/unix_sockets_posix.c \ src/core/lib/iomgr/unix_sockets_posix_noop.c \ + src/core/lib/iomgr/wakeup_fd_cv.c \ src/core/lib/iomgr/wakeup_fd_eventfd.c \ src/core/lib/iomgr/wakeup_fd_nospecial.c \ src/core/lib/iomgr/wakeup_fd_pipe.c \ @@ -3333,6 +3340,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/iomgr/udp_server.c \ src/core/lib/iomgr/unix_sockets_posix.c \ src/core/lib/iomgr/unix_sockets_posix_noop.c \ + src/core/lib/iomgr/wakeup_fd_cv.c \ src/core/lib/iomgr/wakeup_fd_eventfd.c \ src/core/lib/iomgr/wakeup_fd_nospecial.c \ src/core/lib/iomgr/wakeup_fd_pipe.c \ @@ -10799,6 +10807,38 @@ endif endif +WAKEUP_FD_CV_TEST_SRC = \ + test/core/iomgr/wakeup_fd_cv_test.c \ + +WAKEUP_FD_CV_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(WAKEUP_FD_CV_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/wakeup_fd_cv_test: openssl_dep_error + +else + + + +$(BINDIR)/$(CONFIG)/wakeup_fd_cv_test: $(WAKEUP_FD_CV_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LD) $(LDFLAGS) $(WAKEUP_FD_CV_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/wakeup_fd_cv_test + +endif + +$(OBJDIR)/$(CONFIG)/test/core/iomgr/wakeup_fd_cv_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + +deps_wakeup_fd_cv_test: $(WAKEUP_FD_CV_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(WAKEUP_FD_CV_TEST_OBJS:.o=.dep) +endif +endif + + ALARM_CPP_TEST_SRC = \ test/cpp/common/alarm_cpp_test.cc \ diff --git a/binding.gyp b/binding.gyp index 6f2a9eddf7..c767241fde 100644 --- a/binding.gyp +++ b/binding.gyp @@ -622,6 +622,7 @@ 'src/core/lib/iomgr/udp_server.c', 'src/core/lib/iomgr/unix_sockets_posix.c', 'src/core/lib/iomgr/unix_sockets_posix_noop.c', + 'src/core/lib/iomgr/wakeup_fd_cv.c', 'src/core/lib/iomgr/wakeup_fd_eventfd.c', 'src/core/lib/iomgr/wakeup_fd_nospecial.c', 'src/core/lib/iomgr/wakeup_fd_pipe.c', diff --git a/build.yaml b/build.yaml index c883dbf137..f76f925517 100644 --- a/build.yaml +++ b/build.yaml @@ -223,6 +223,7 @@ filegroups: - src/core/lib/iomgr/timer_heap.h - src/core/lib/iomgr/udp_server.h - src/core/lib/iomgr/unix_sockets_posix.h + - src/core/lib/iomgr/wakeup_fd_cv.h - src/core/lib/iomgr/wakeup_fd_pipe.h - src/core/lib/iomgr/wakeup_fd_posix.h - src/core/lib/iomgr/workqueue.h @@ -308,6 +309,7 @@ filegroups: - src/core/lib/iomgr/udp_server.c - src/core/lib/iomgr/unix_sockets_posix.c - src/core/lib/iomgr/unix_sockets_posix_noop.c + - src/core/lib/iomgr/wakeup_fd_cv.c - src/core/lib/iomgr/wakeup_fd_eventfd.c - src/core/lib/iomgr/wakeup_fd_nospecial.c - src/core/lib/iomgr/wakeup_fd_pipe.c @@ -2606,6 +2608,20 @@ targets: - grpc - gpr_test_util - gpr +- name: wakeup_fd_cv_test + build: test + language: c + src: + - test/core/iomgr/wakeup_fd_cv_test.c + deps: + - grpc_test_util + - grpc + - gpr_test_util + - gpr + platforms: + - mac + - linux + - posix - name: alarm_cpp_test gtest: true build: test @@ -141,6 +141,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/iomgr/udp_server.c \ src/core/lib/iomgr/unix_sockets_posix.c \ src/core/lib/iomgr/unix_sockets_posix_noop.c \ + src/core/lib/iomgr/wakeup_fd_cv.c \ src/core/lib/iomgr/wakeup_fd_eventfd.c \ src/core/lib/iomgr/wakeup_fd_nospecial.c \ src/core/lib/iomgr/wakeup_fd_pipe.c \ diff --git a/doc/interop-test-descriptions.md b/doc/interop-test-descriptions.md index 666af185b9..7c153fa3fa 100644 --- a/doc/interop-test-descriptions.md +++ b/doc/interop-test-descriptions.md @@ -779,25 +779,38 @@ Client asserts: ### unimplemented_method -Status: Ready for implementation. Blocking beta. - -This test verifies calling unimplemented RPC method returns the UNIMPLEMENTED status code. +This test verifies that calling an unimplemented RPC method returns the +UNIMPLEMENTED status code. Server features: N/A Procedure: -* Client calls `grpc.testing.UnimplementedService/UnimplementedCall` with an - empty request (defined as `grpc.testing.Empty`): +* Client calls `grpc.testing.TestService/UnimplementedMethod` with an empty + request (defined as `grpc.testing.Empty`): ``` { } ``` + +Client asserts: +* received status code is 12 (UNIMPLEMENTED) + +### unimplemented_service + +This test verifies calling an unimplemented server returns the UNIMPLEMENTED +status code. + +Server features: +N/A + +Procedure: +* Client calls `grpc.testing.UnimplementedService/UnimplementedCall` with an + empty request (defined as `grpc.testing.Empty`) Client asserts: * received status code is 12 (UNIMPLEMENTED) -* received status message is empty or null/unset ### cancel_after_begin diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 6a6a407025..19bd8ac42a 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -306,6 +306,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/timer_heap.h', 'src/core/lib/iomgr/udp_server.h', 'src/core/lib/iomgr/unix_sockets_posix.h', + 'src/core/lib/iomgr/wakeup_fd_cv.h', 'src/core/lib/iomgr/wakeup_fd_pipe.h', 'src/core/lib/iomgr/wakeup_fd_posix.h', 'src/core/lib/iomgr/workqueue.h', @@ -472,6 +473,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/udp_server.c', 'src/core/lib/iomgr/unix_sockets_posix.c', 'src/core/lib/iomgr/unix_sockets_posix_noop.c', + 'src/core/lib/iomgr/wakeup_fd_cv.c', 'src/core/lib/iomgr/wakeup_fd_eventfd.c', 'src/core/lib/iomgr/wakeup_fd_nospecial.c', 'src/core/lib/iomgr/wakeup_fd_pipe.c', @@ -680,6 +682,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/timer_heap.h', 'src/core/lib/iomgr/udp_server.h', 'src/core/lib/iomgr/unix_sockets_posix.h', + 'src/core/lib/iomgr/wakeup_fd_cv.h', 'src/core/lib/iomgr/wakeup_fd_pipe.h', 'src/core/lib/iomgr/wakeup_fd_posix.h', 'src/core/lib/iomgr/workqueue.h', diff --git a/grpc.gemspec b/grpc.gemspec index 7345e28381..004e937109 100755 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -226,6 +226,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/iomgr/timer_heap.h ) s.files += %w( src/core/lib/iomgr/udp_server.h ) s.files += %w( src/core/lib/iomgr/unix_sockets_posix.h ) + s.files += %w( src/core/lib/iomgr/wakeup_fd_cv.h ) s.files += %w( src/core/lib/iomgr/wakeup_fd_pipe.h ) s.files += %w( src/core/lib/iomgr/wakeup_fd_posix.h ) s.files += %w( src/core/lib/iomgr/workqueue.h ) @@ -392,6 +393,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/iomgr/udp_server.c ) s.files += %w( src/core/lib/iomgr/unix_sockets_posix.c ) s.files += %w( src/core/lib/iomgr/unix_sockets_posix_noop.c ) + s.files += %w( src/core/lib/iomgr/wakeup_fd_cv.c ) s.files += %w( src/core/lib/iomgr/wakeup_fd_eventfd.c ) s.files += %w( src/core/lib/iomgr/wakeup_fd_nospecial.c ) s.files += %w( src/core/lib/iomgr/wakeup_fd_pipe.c ) diff --git a/package.xml b/package.xml index 1a7266422b..19e23b9e74 100644 --- a/package.xml +++ b/package.xml @@ -233,6 +233,7 @@ <file baseinstalldir="/" name="src/core/lib/iomgr/timer_heap.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/udp_server.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/unix_sockets_posix.h" role="src" /> + <file baseinstalldir="/" name="src/core/lib/iomgr/wakeup_fd_cv.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/wakeup_fd_pipe.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/wakeup_fd_posix.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/workqueue.h" role="src" /> @@ -399,6 +400,7 @@ <file baseinstalldir="/" name="src/core/lib/iomgr/udp_server.c" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/unix_sockets_posix.c" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/unix_sockets_posix_noop.c" role="src" /> + <file baseinstalldir="/" name="src/core/lib/iomgr/wakeup_fd_cv.c" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/wakeup_fd_eventfd.c" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/wakeup_fd_nospecial.c" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/wakeup_fd_pipe.c" role="src" /> diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index a6056c3e8d..cbf79afa17 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -513,10 +513,14 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - call_data *calld = arg; + grpc_call_element *elem = arg; + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; gpr_mu_lock(&calld->mu); GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL); + grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, + chand->interested_parties); calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; if (calld->connected_subchannel == NULL) { gpr_atm_no_barrier_store(&calld->subchannel_call, 1); @@ -564,6 +568,9 @@ typedef struct { grpc_closure closure; } continue_picking_args; +/** Return true if subchannel is available immediately (in which case on_ready + should not be called), or false otherwise (in which case on_ready should be + called when the subchannel is available). */ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, @@ -629,8 +636,8 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, gpr_mu_unlock(&chand->mu); // TODO(dgq): make this deadline configurable somehow. const grpc_lb_policy_pick_args inputs = { - calld->pollent, initial_metadata, initial_metadata_flags, - &calld->lb_token_mdelem, gpr_inf_future(GPR_CLOCK_MONOTONIC)}; + initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem, + gpr_inf_future(GPR_CLOCK_MONOTONIC)}; r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, on_ready); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel"); @@ -672,6 +679,7 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op *op) { call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op); /* try to (atomically) get the call */ @@ -739,14 +747,20 @@ retry: calld->connected_subchannel == NULL && op->send_initial_metadata != NULL) { calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; - grpc_closure_init(&calld->next_step, subchannel_ready, calld); + grpc_closure_init(&calld->next_step, subchannel_ready, elem); GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); + /* If a subchannel is not available immediately, the polling entity from + call_data should be provided to channel_data's interested_parties, so + that IO of the lb_policy and resolver could be done under it. */ if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata, op->send_initial_metadata_flags, &calld->connected_subchannel, &calld->next_step, GRPC_ERROR_NONE)) { calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); + } else { + grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent, + chand->interested_parties); } } /* if we've got a subchannel, then let's ask it to create a call */ diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h index 110d08fcac..de424cd105 100644 --- a/src/core/ext/client_config/lb_policy.h +++ b/src/core/ext/client_config/lb_policy.h @@ -35,7 +35,6 @@ #define GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_H #include "src/core/ext/client_config/subchannel.h" -#include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/transport/connectivity_state.h" /** A load balancing policy: specified by a vtable and a struct (which @@ -55,8 +54,6 @@ struct grpc_lb_policy { /** Extra arguments for an LB pick */ typedef struct grpc_lb_policy_pick_args { - /** Parties interested in the pick's progress */ - grpc_polling_entity *pollent; /** Initial metadata associated with the picking call. */ grpc_metadata_batch *initial_metadata; /** Bitmask used for selective cancelling. See \a @@ -153,7 +150,8 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, once the pick is complete with its error argument set to indicate success or failure. - Any I/O should be done under \a pick_args->pollent. */ + Any IO should be done under the \a interested_parties \a grpc_pollset_set + in the \a grpc_lb_policy struct. */ int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_pick_args *pick_args, grpc_connected_subchannel **target, void **user_data, diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 626f285b90..9af92b787d 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -69,8 +69,8 @@ * possible scenarios: * * 1. This is the first server list received. There was no previous instance of - * the Round Robin policy. \a rr_handover() will instantiate the RR policy - * and perform all the pending operations over it. + * the Round Robin policy. \a rr_handover_locked() will instantiate the RR + * policy and perform all the pending operations over it. * 2. There's already a RR policy instance active. We need to introduce the new * one build from the new serverlist, but taking care not to disrupt the * operations in progress over the old RR instance. This is done by @@ -78,7 +78,7 @@ * references are held on the old RR policy, it'll be destroyed and \a * glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN * state. At this point we can transition to a new RR instance safely, which - * is done once again via \a rr_handover(). + * is done once again via \a rr_handover_locked(). * * * Once a RR policy instance is in place (and getting updated as described), @@ -86,8 +86,8 @@ * forwarding them to the RR instance. Any time there's no RR policy available * (ie, right after the creation of the gRPCLB policy, if an empty serverlist * is received, etc), pick/ping requests are added to a list of pending - * picks/pings to be flushed and serviced as part of \a rr_handover() the moment - * the RR policy instance becomes available. + * picks/pings to be flushed and serviced as part of \a rr_handover_locked() the + * moment the RR policy instance becomes available. * * \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the * high level design and details. */ @@ -134,6 +134,9 @@ static void initial_metadata_add_lb_token( } typedef struct wrapped_rr_closure_arg { + /* the closure instance using this struct as argument */ + grpc_closure wrapper_closure; + /* the original closure. Usually a on_complete/notify cb for pick() and ping() * calls against the internal RR instance, respectively. */ grpc_closure *wrapped_closure; @@ -155,9 +158,8 @@ typedef struct wrapped_rr_closure_arg { /* The RR instance related to the closure */ grpc_lb_policy *rr_policy; - /* when not NULL, represents a pending_{pick,ping} node to be freed upon - * closure execution */ - void *owning_pending_node; /* to be freed if not NULL */ + /* heap memory to be freed upon closure execution. */ + void *free_when_done; } wrapped_rr_closure_arg; /* The \a on_complete closure passed as part of the pick requires keeping a @@ -183,10 +185,10 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, } } GPR_ASSERT(wc_arg->wrapped_closure != NULL); - grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error), NULL); - gpr_free(wc_arg->owning_pending_node); + GPR_ASSERT(wc_arg->free_when_done != NULL); + gpr_free(wc_arg->free_when_done); } /* Linked list of pending pick requests. It stores all information needed to @@ -207,10 +209,6 @@ typedef struct pending_pick { * upon error. */ grpc_connected_subchannel **target; - /* a closure wrapping the original on_complete one to be invoked once the - * pick() has completed (regardless of success) */ - grpc_closure wrapped_on_complete; - /* args for wrapped_on_complete */ wrapped_rr_closure_arg wrapped_on_complete_arg; } pending_pick; @@ -230,8 +228,9 @@ static void add_pending_pick(pending_pick **root, pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata; pp->wrapped_on_complete_arg.lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; - grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure, - &pp->wrapped_on_complete_arg); + pp->wrapped_on_complete_arg.free_when_done = pp; + grpc_closure_init(&pp->wrapped_on_complete_arg.wrapper_closure, + wrapped_rr_closure, &pp->wrapped_on_complete_arg); *root = pp; } @@ -239,10 +238,6 @@ static void add_pending_pick(pending_pick **root, typedef struct pending_ping { struct pending_ping *next; - /* a closure wrapping the original on_complete one to be invoked once the - * ping() has completed (regardless of success) */ - grpc_closure wrapped_notify; - /* args for wrapped_notify */ wrapped_rr_closure_arg wrapped_notify_arg; } pending_ping; @@ -251,10 +246,11 @@ static void add_pending_ping(pending_ping **root, grpc_closure *notify) { pending_ping *pping = gpr_malloc(sizeof(*pping)); memset(pping, 0, sizeof(pending_ping)); memset(&pping->wrapped_notify_arg, 0, sizeof(wrapped_rr_closure_arg)); - pping->next = *root; - grpc_closure_init(&pping->wrapped_notify, wrapped_rr_closure, - &pping->wrapped_notify_arg); pping->wrapped_notify_arg.wrapped_closure = notify; + pping->wrapped_notify_arg.free_when_done = pping; + pping->next = *root; + grpc_closure_init(&pping->wrapped_notify_arg.wrapper_closure, + wrapped_rr_closure, &pping->wrapped_notify_arg); *root = pping; } @@ -307,13 +303,6 @@ typedef struct glb_lb_policy { /** for tracking of the RR connectivity */ rr_connectivity_data *rr_connectivity; - - /* a wrapped (see \a wrapped_rr_closure) on-complete closure for readily - * available RR picks */ - grpc_closure wrapped_on_complete; - - /* arguments for the wrapped_on_complete closure */ - wrapped_rr_closure_arg wc_arg; } glb_lb_policy; /* Keeps track and reacts to changes in connectivity of the RR instance */ @@ -399,14 +388,14 @@ static grpc_lb_addresses *process_serverlist( GPR_ARRAY_SIZE(server->load_balance_token) - 1; grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer( (uint8_t *)server->load_balance_token, lb_token_size); - user_data = grpc_mdelem_from_metadata_strings( - GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr); + user_data = grpc_mdelem_from_metadata_strings(GRPC_MDSTR_LB_TOKEN, + lb_token_mdstr); } else { gpr_log(GPR_ERROR, "Missing LB token for backend address '%s'. The empty token will " "be used instead", grpc_sockaddr_to_uri((struct sockaddr *)&addr.addr)); - user_data = GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY; + user_data = GRPC_MDELEM_LB_TOKEN_EMPTY; } grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len, @@ -424,9 +413,43 @@ static void lb_token_destroy(void *token) { if (token != NULL) GRPC_MDELEM_UNREF(token); } -static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, - const grpc_grpclb_serverlist *serverlist, - glb_lb_policy *glb_policy) { +/* perform a pick over \a rr_policy. Given that a pick can return immediately + * (ignoring its completion callback) we need to perform the cleanups this + * callback would be otherwise resposible for */ +static bool pick_from_internal_rr_locked( + grpc_exec_ctx *exec_ctx, grpc_lb_policy *rr_policy, + const grpc_lb_policy_pick_args *pick_args, + grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) { + GPR_ASSERT(rr_policy != NULL); + const bool pick_done = + grpc_lb_policy_pick(exec_ctx, rr_policy, pick_args, target, + (void **)&wc_arg->lb_token, &wc_arg->wrapper_closure); + if (pick_done) { + /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", + (intptr_t)wc_arg->rr_policy); + } + GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick"); + + /* add the load reporting initial metadata */ + initial_metadata_add_lb_token(pick_args->initial_metadata, + pick_args->lb_token_mdelem_storage, + GRPC_MDELEM_REF(wc_arg->lb_token)); + + gpr_free(wc_arg); + } + /* else, the pending pick will be registered and taken care of by the + * pending pick list inside the RR policy (glb_policy->rr_policy). + * Eventually, wrapped_on_complete will be called, which will -among other + * things- add the LB token to the call's initial metadata */ + + return pick_done; +} + +static grpc_lb_policy *create_rr_locked( + grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist, + glb_lb_policy *glb_policy) { GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0); grpc_lb_policy_args args; @@ -446,18 +469,21 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, return rr; } -static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, - grpc_error *error) { +static void rr_handover_locked(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy, grpc_error *error) { GPR_ASSERT(glb_policy->serverlist != NULL && glb_policy->serverlist->num_servers > 0); glb_policy->rr_policy = - create_rr(exec_ctx, glb_policy->serverlist, glb_policy); + create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy); if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Created RR policy (0x%" PRIxPTR ")", (intptr_t)glb_policy->rr_policy); } GPR_ASSERT(glb_policy->rr_policy != NULL); + grpc_pollset_set_add_pollset_set(exec_ctx, + glb_policy->rr_policy->interested_parties, + glb_policy->base.interested_parties); glb_policy->rr_connectivity->state = grpc_lb_policy_check_connectivity( exec_ctx, glb_policy->rr_policy, &error); grpc_lb_policy_notify_on_state_change( @@ -478,11 +504,9 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "", (intptr_t)glb_policy->rr_policy); } - grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pp->pick_args, - pp->target, - (void **)&pp->wrapped_on_complete_arg.lb_token, - &pp->wrapped_on_complete); - pp->wrapped_on_complete_arg.owning_pending_node = pp; + pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy, + &pp->pick_args, pp->target, + &pp->wrapped_on_complete_arg); } pending_ping *pping; @@ -495,8 +519,7 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, (intptr_t)glb_policy->rr_policy); } grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy, - &pping->wrapped_notify); - pping->wrapped_notify_arg.owning_pending_node = pping; + &pping->wrapped_notify_arg.wrapper_closure); } } @@ -509,13 +532,16 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, if (glb_policy->serverlist != NULL) { /* a RR policy is shutting down but there's a serverlist available -> * perform a handover */ - rr_handover(exec_ctx, glb_policy, error); + gpr_mu_lock(&glb_policy->mu); + rr_handover_locked(exec_ctx, glb_policy, error); + gpr_mu_unlock(&glb_policy->mu); } else { /* shutting down and no new serverlist available. Bail out. */ gpr_free(rr_conn_data); } } else { if (error == GRPC_ERROR_NONE) { + gpr_mu_lock(&glb_policy->mu); /* RR not shutting down. Mimic the RR's policy state */ grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, rr_conn_data->state, GRPC_ERROR_REF(error), @@ -524,6 +550,7 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, &rr_conn_data->state, &rr_conn_data->on_change); + gpr_mu_unlock(&glb_policy->mu); } else { /* error */ gpr_free(rr_conn_data); } @@ -648,15 +675,15 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; - grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, GRPC_ERROR_NONE, - NULL); + grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, + GRPC_ERROR_NONE, NULL); pp = next; } while (pping != NULL) { pending_ping *next = pping->next; - grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify, GRPC_ERROR_NONE, - NULL); + grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure, + GRPC_ERROR_NONE, NULL); pping = next; } @@ -686,11 +713,9 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_polling_entity_del_from_pollset_set( - exec_ctx, pp->pick_args.pollent, glb_policy->base.interested_parties); *target = NULL; grpc_exec_ctx_sched( - exec_ctx, &pp->wrapped_on_complete, + exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL); } else { pp->next = glb_policy->pending_picks; @@ -719,10 +744,8 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *next = pp->next; if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - grpc_polling_entity_del_from_pollset_set( - exec_ctx, pp->pick_args.pollent, glb_policy->base.interested_parties); grpc_exec_ctx_sched( - exec_ctx, &pp->wrapped_on_complete, + exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL); } else { pp->next = glb_policy->pending_picks; @@ -775,39 +798,20 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, (intptr_t)glb_policy->rr_policy); } GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick"); - memset(&glb_policy->wc_arg, 0, sizeof(wrapped_rr_closure_arg)); - glb_policy->wc_arg.rr_policy = glb_policy->rr_policy; - glb_policy->wc_arg.target = target; - glb_policy->wc_arg.wrapped_closure = on_complete; - glb_policy->wc_arg.lb_token_mdelem_storage = - pick_args->lb_token_mdelem_storage; - glb_policy->wc_arg.initial_metadata = pick_args->initial_metadata; - glb_policy->wc_arg.owning_pending_node = NULL; - grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure, - &glb_policy->wc_arg); - - pick_done = - grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target, - (void **)&glb_policy->wc_arg.lb_token, - &glb_policy->wrapped_on_complete); - if (pick_done) { - /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ - if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", - (intptr_t)glb_policy->wc_arg.rr_policy); - } - GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->wc_arg.rr_policy, "glb_pick"); - /* add the load reporting initial metadata */ - initial_metadata_add_lb_token( - pick_args->initial_metadata, pick_args->lb_token_mdelem_storage, - GRPC_MDELEM_REF(glb_policy->wc_arg.lb_token)); - } + wrapped_rr_closure_arg *wc_arg = gpr_malloc(sizeof(wrapped_rr_closure_arg)); + memset(wc_arg, 0, sizeof(wrapped_rr_closure_arg)); + + grpc_closure_init(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg); + wc_arg->rr_policy = glb_policy->rr_policy; + wc_arg->target = target; + wc_arg->wrapped_closure = on_complete; + wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; + wc_arg->initial_metadata = pick_args->initial_metadata; + wc_arg->free_when_done = wc_arg; + pick_done = pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy, + pick_args, target, wc_arg); } else { - /* else, the pending pick will be registered and taken care of by the - * pending pick list inside the RR policy (glb_policy->rr_policy) */ - grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent, - glb_policy->base.interested_parties); add_pending_pick(&glb_policy->pending_picks, pick_args, target, on_complete); @@ -931,7 +935,7 @@ static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) { /* Note the following LB call progresses every time there's activity in \a * glb_policy->base.interested_parties, which is comprised of the polling - * entities passed to glb_pick(). */ + * entities from \a client_channel. */ lb_client->lb_call = grpc_channel_create_pollset_set_call( glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS, glb_policy->base.interested_parties, @@ -1076,6 +1080,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { /* update serverlist */ if (serverlist->num_servers > 0) { + gpr_mu_lock(&lb_client->glb_policy->mu); if (grpc_grpclb_serverlist_equals(lb_client->glb_policy->serverlist, serverlist)) { if (grpc_lb_glb_trace) { @@ -1093,7 +1098,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { if (lb_client->glb_policy->rr_policy == NULL) { /* initial "handover", in this case from a null RR policy, meaning * it'll just create the first RR policy instance */ - rr_handover(exec_ctx, lb_client->glb_policy, error); + rr_handover_locked(exec_ctx, lb_client->glb_policy, error); } else { /* unref the RR policy, eventually leading to its substitution with a * new one constructed from the received serverlist (see @@ -1101,6 +1106,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy, "serverlist_received"); } + gpr_mu_unlock(&lb_client->glb_policy->mu); } else { if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index 961a0c9b19..6533327343 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -39,7 +39,6 @@ typedef struct pending_pick { struct pending_pick *next; - grpc_polling_entity *pollent; uint32_t initial_metadata_flags; grpc_connected_subchannel **target; grpc_closure *on_complete; @@ -119,8 +118,6 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; - grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, - p->base.interested_parties); grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); gpr_free(pp); pp = next; @@ -138,8 +135,6 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, - p->base.interested_parties); *target = NULL; grpc_exec_ctx_sched( exec_ctx, pp->on_complete, @@ -168,8 +163,6 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *next = pp->next; if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, - p->base.interested_parties); grpc_exec_ctx_sched( exec_ctx, pp->on_complete, GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL); @@ -229,11 +222,8 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent, - p->base.interested_parties); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; - pp->pollent = pick_args->pollent; pp->target = target; pp->initial_metadata_flags = pick_args->initial_metadata_flags; pp->on_complete = on_complete; @@ -319,8 +309,6 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = selected; - grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, - p->base.interested_parties); grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); gpr_free(pp); } diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 930fa86aca..9bd3f9da24 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -78,9 +78,6 @@ int grpc_lb_round_robin_trace = 0; typedef struct pending_pick { struct pending_pick *next; - /* polling entity for the pick()'s async notification */ - grpc_polling_entity *pollent; - /* output argument where to store the pick()ed user_data. It'll be NULL if no * such data is present or there's an error (the definite test for errors is * \a target being NULL). */ @@ -318,8 +315,6 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, - p->base.interested_parties); *target = NULL; grpc_exec_ctx_sched( exec_ctx, pp->on_complete, @@ -348,8 +343,6 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *next = pp->next; if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, - p->base.interested_parties); *pp->target = NULL; grpc_exec_ctx_sched( exec_ctx, pp->on_complete, @@ -403,7 +396,6 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, gpr_mu_lock(&p->mu); if ((selected = peek_next_connected_locked(p))) { /* readily available, report right away */ - gpr_mu_unlock(&p->mu); *target = grpc_subchannel_get_connected_subchannel(selected->subchannel); if (user_data != NULL) { @@ -416,17 +408,15 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } /* only advance the last picked pointer if the selection was used */ advance_last_picked_locked(p); + gpr_mu_unlock(&p->mu); return 1; } else { /* no pick currently available. Save for later in list of pending picks */ if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent, - p->base.interested_parties); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; - pp->pollent = pick_args->pollent; pp->target = target; pp->on_complete = on_complete; pp->initial_metadata_flags = pick_args->initial_metadata_flags; @@ -482,8 +472,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", (void *)selected->subchannel, (void *)selected); } - grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, - p->base.interested_parties); grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); gpr_free(pp); } diff --git a/src/core/ext/load_reporting/load_reporting.h b/src/core/ext/load_reporting/load_reporting.h index e37817d8c2..e13097654d 100644 --- a/src/core/ext/load_reporting/load_reporting.h +++ b/src/core/ext/load_reporting/load_reporting.h @@ -37,13 +37,21 @@ #include <grpc/impl/codegen/grpc_types.h> #include "src/core/lib/channel/channel_stack.h" -/** Metadata key for initial metadata coming from clients */ -/* TODO(dgq): change to the final value TBD */ -#define GRPC_LOAD_REPORTING_INITIAL_MD_KEY "load-reporting-initial" +/** Metadata key for the gRPC LB load balancer token. + * + * The value corresponding to this key is an opaque token that is given to the + * frontend as part of each pick; the frontend sends this token to the backend + * in each request it sends when using that pick. The token is used by the + * backend to verify the request and to allow the backend to report load to the + * gRPC LB system. */ +#define GRPC_LB_TOKEN_MD_KEY "lb-token" -/** Metadata key for trailing metadata from servers */ -/* TODO(dgq): change to the final value TBD */ -#define GRPC_LOAD_REPORTING_TRAILING_MD_KEY "load-reporting-trailing" +/** Metadata key for gRPC LB cost reporting. + * + * The value corresponding to this key is an opaque binary blob reported by the + * backend as part of its trailing metadata containing cost information for the + * call. */ +#define GRPC_LB_COST_MD_KEY "lb-cost" /** Identifiers for the invocation point of the users LR callback */ typedef enum grpc_load_reporting_source { diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c index 394f0cb832..22bf36367f 100644 --- a/src/core/ext/load_reporting/load_reporting_filter.c +++ b/src/core/ext/load_reporting/load_reporting_filter.c @@ -75,7 +75,7 @@ static grpc_mdelem *recv_md_filter(void *user_data, grpc_mdelem *md) { if (md->key == GRPC_MDSTR_PATH) { calld->service_method = grpc_mdstr_as_c_string(md->value); - } else if (md->key == GRPC_MDSTR_LOAD_REPORTING_INITIAL) { + } else if (md->key == GRPC_MDSTR_LB_TOKEN) { calld->initial_md_string = gpr_strdup(grpc_mdstr_as_c_string(md->value)); return NULL; } @@ -193,7 +193,7 @@ static grpc_mdelem *lr_trailing_md_filter(void *user_data, grpc_mdelem *md) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; - if (md->key == GRPC_MDSTR_LOAD_REPORTING_TRAILING) { + if (md->key == GRPC_MDSTR_LB_COST) { calld->trailing_md_string = gpr_strdup(grpc_mdstr_as_c_string(md->value)); return NULL; } diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 8de42bb7a9..e5909d9380 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -2023,6 +2023,10 @@ const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; } + if (!grpc_has_wakeup_fd()) { + return NULL; + } + if (!is_epoll_available()) { return NULL; } diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index b84a56018f..351b069613 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -47,10 +47,12 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> +#include <grpc/support/thd.h> #include <grpc/support/tls.h> #include <grpc/support/useful.h> #include "src/core/lib/iomgr/iomgr_internal.h" +#include "src/core/lib/iomgr/wakeup_fd_cv.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" @@ -246,6 +248,28 @@ struct grpc_pollset_set { }; /******************************************************************************* + * condition variable polling definitions + */ + +#define CV_POLL_PERIOD_MS 1000 +#define CV_DEFAULT_TABLE_SIZE 16 + +typedef enum poll_status_t { INPROGRESS, COMPLETED, CANCELLED } poll_status_t; + +typedef struct poll_args { + gpr_refcount refcount; + gpr_cv *cv; + struct pollfd *fds; + nfds_t nfds; + int timeout; + int retval; + int err; + gpr_atm status; +} poll_args; + +cv_fd_table g_cvfds; + +/******************************************************************************* * fd_posix.c */ @@ -1262,10 +1286,211 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, } /******************************************************************************* + * Condition Variable polling extensions + */ + +static void decref_poll_args(poll_args *args) { + if (gpr_unref(&args->refcount)) { + gpr_free(args->fds); + gpr_cv_destroy(args->cv); + gpr_free(args->cv); + gpr_free(args); + } +} + +// Poll in a background thread +static void run_poll(void *arg) { + int timeout, retval; + poll_args *pargs = (poll_args *)arg; + while (gpr_atm_no_barrier_load(&pargs->status) == INPROGRESS) { + if (pargs->timeout < 0) { + timeout = CV_POLL_PERIOD_MS; + } else { + timeout = GPR_MIN(CV_POLL_PERIOD_MS, pargs->timeout); + pargs->timeout -= timeout; + } + retval = g_cvfds.poll(pargs->fds, pargs->nfds, timeout); + if (retval != 0 || pargs->timeout == 0) { + pargs->retval = retval; + pargs->err = errno; + break; + } + } + gpr_mu_lock(&g_cvfds.mu); + if (gpr_atm_no_barrier_load(&pargs->status) == INPROGRESS) { + // Signal main thread that the poll completed + gpr_atm_no_barrier_store(&pargs->status, COMPLETED); + gpr_cv_signal(pargs->cv); + } + decref_poll_args(pargs); + g_cvfds.pollcount--; + if (g_cvfds.shutdown && g_cvfds.pollcount == 0) { + gpr_cv_signal(&g_cvfds.shutdown_complete); + } + gpr_mu_unlock(&g_cvfds.mu); +} + +// This function overrides poll() to handle condition variable wakeup fds +static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { + unsigned int i; + int res, idx; + gpr_cv *pollcv; + cv_node *cvn, *prev; + nfds_t nsockfds = 0; + gpr_thd_id t_id; + gpr_thd_options opt; + poll_args *pargs = NULL; + gpr_mu_lock(&g_cvfds.mu); + pollcv = gpr_malloc(sizeof(gpr_cv)); + gpr_cv_init(pollcv); + for (i = 0; i < nfds; i++) { + fds[i].revents = 0; + if (fds[i].fd < 0 && (fds[i].events & POLLIN)) { + idx = FD_TO_IDX(fds[i].fd); + cvn = gpr_malloc(sizeof(cv_node)); + cvn->cv = pollcv; + cvn->next = g_cvfds.cvfds[idx].cvs; + g_cvfds.cvfds[idx].cvs = cvn; + // We should return immediately if there are pending events, + // but we still need to call poll() to check for socket events + if (g_cvfds.cvfds[idx].is_set) { + timeout = 0; + } + } else if (fds[i].fd >= 0) { + nsockfds++; + } + } + + if (nsockfds > 0) { + pargs = gpr_malloc(sizeof(struct poll_args)); + // Both the main thread and calling thread get a reference + gpr_ref_init(&pargs->refcount, 2); + pargs->cv = pollcv; + pargs->fds = gpr_malloc(sizeof(struct pollfd) * nsockfds); + pargs->nfds = nsockfds; + pargs->timeout = timeout; + pargs->retval = 0; + pargs->err = 0; + gpr_atm_no_barrier_store(&pargs->status, INPROGRESS); + idx = 0; + for (i = 0; i < nfds; i++) { + if (fds[i].fd >= 0) { + pargs->fds[idx].fd = fds[i].fd; + pargs->fds[idx].events = fds[i].events; + pargs->fds[idx].revents = 0; + idx++; + } + } + g_cvfds.pollcount++; + opt = gpr_thd_options_default(); + gpr_thd_options_set_detached(&opt); + gpr_thd_new(&t_id, &run_poll, pargs, &opt); + // We want the poll() thread to trigger the deadline, so wait forever here + gpr_cv_wait(pollcv, &g_cvfds.mu, gpr_inf_future(GPR_CLOCK_MONOTONIC)); + if (gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) { + res = pargs->retval; + errno = pargs->err; + } else { + res = 0; + errno = 0; + gpr_atm_no_barrier_store(&pargs->status, CANCELLED); + } + } else { + gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME); + deadline = + gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN)); + gpr_cv_wait(pollcv, &g_cvfds.mu, deadline); + res = 0; + } + + idx = 0; + for (i = 0; i < nfds; i++) { + if (fds[i].fd < 0 && (fds[i].events & POLLIN)) { + cvn = g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs; + prev = NULL; + while (cvn->cv != pollcv) { + prev = cvn; + cvn = cvn->next; + GPR_ASSERT(cvn); + } + if (!prev) { + g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs = cvn->next; + } else { + prev->next = cvn->next; + } + gpr_free(cvn); + + if (g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].is_set) { + fds[i].revents = POLLIN; + if (res >= 0) res++; + } + } else if (fds[i].fd >= 0 && + gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) { + fds[i].revents = pargs->fds[idx].revents; + idx++; + } + } + + if (pargs) { + decref_poll_args(pargs); + } else { + gpr_cv_destroy(pollcv); + gpr_free(pollcv); + } + gpr_mu_unlock(&g_cvfds.mu); + + return res; +} + +static void global_cv_fd_table_init() { + gpr_mu_init(&g_cvfds.mu); + gpr_mu_lock(&g_cvfds.mu); + gpr_cv_init(&g_cvfds.shutdown_complete); + g_cvfds.shutdown = 0; + g_cvfds.pollcount = 0; + g_cvfds.size = CV_DEFAULT_TABLE_SIZE; + g_cvfds.cvfds = gpr_malloc(sizeof(fd_node) * CV_DEFAULT_TABLE_SIZE); + g_cvfds.free_fds = NULL; + for (int i = 0; i < CV_DEFAULT_TABLE_SIZE; i++) { + g_cvfds.cvfds[i].is_set = 0; + g_cvfds.cvfds[i].cvs = NULL; + g_cvfds.cvfds[i].next_free = g_cvfds.free_fds; + g_cvfds.free_fds = &g_cvfds.cvfds[i]; + } + // Override the poll function with one that supports cvfds + g_cvfds.poll = grpc_poll_function; + grpc_poll_function = &cvfd_poll; + gpr_mu_unlock(&g_cvfds.mu); +} + +static void global_cv_fd_table_shutdown() { + gpr_mu_lock(&g_cvfds.mu); + g_cvfds.shutdown = 1; + // Attempt to wait for all abandoned poll() threads to terminate + // Not doing so will result in reported memory leaks + if (g_cvfds.pollcount > 0) { + int res = gpr_cv_wait(&g_cvfds.shutdown_complete, &g_cvfds.mu, + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_seconds(3, GPR_TIMESPAN))); + GPR_ASSERT(res == 0); + } + gpr_cv_destroy(&g_cvfds.shutdown_complete); + grpc_poll_function = g_cvfds.poll; + gpr_free(g_cvfds.cvfds); + gpr_mu_unlock(&g_cvfds.mu); + gpr_mu_destroy(&g_cvfds.mu); +} + +/******************************************************************************* * event engine binding */ -static void shutdown_engine(void) { pollset_global_shutdown(); } +static void shutdown_engine(void) { + pollset_global_shutdown(); + if (grpc_cv_wakeup_fds_enabled()) { + global_cv_fd_table_shutdown(); + } +} static const grpc_event_engine_vtable vtable = { .pollset_size = sizeof(grpc_pollset), @@ -1307,7 +1532,21 @@ static const grpc_event_engine_vtable vtable = { }; const grpc_event_engine_vtable *grpc_init_poll_posix(void) { + if (!grpc_has_wakeup_fd()) { + return NULL; + } + if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) { + return NULL; + } + return &vtable; +} + +const grpc_event_engine_vtable *grpc_init_poll_cv_posix(void) { + global_cv_fd_table_init(); + grpc_enable_cv_wakeup_fds(1); if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) { + global_cv_fd_table_shutdown(); + grpc_enable_cv_wakeup_fds(0); return NULL; } return &vtable; diff --git a/src/core/lib/iomgr/ev_poll_posix.h b/src/core/lib/iomgr/ev_poll_posix.h index 291736a2db..202ffca14c 100644 --- a/src/core/lib/iomgr/ev_poll_posix.h +++ b/src/core/lib/iomgr/ev_poll_posix.h @@ -37,5 +37,6 @@ #include "src/core/lib/iomgr/ev_posix.h" const grpc_event_engine_vtable *grpc_init_poll_posix(void); +const grpc_event_engine_vtable *grpc_init_poll_cv_posix(void); #endif /* GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H */ diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 26618f8d55..9857b0bce9 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -66,6 +66,7 @@ typedef struct { static const event_engine_factory g_factories[] = { {"epoll", grpc_init_epoll_linux}, {"poll", grpc_init_poll_posix}, + {"poll-cv", grpc_init_poll_cv_posix}, {"legacy", grpc_init_poll_and_epoll_posix}, }; diff --git a/src/core/lib/iomgr/wakeup_fd_cv.c b/src/core/lib/iomgr/wakeup_fd_cv.c new file mode 100644 index 0000000000..b4165208ed --- /dev/null +++ b/src/core/lib/iomgr/wakeup_fd_cv.c @@ -0,0 +1,118 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/support/port_platform.h> + +#ifdef GPR_POSIX_WAKEUP_FD + +#include "src/core/lib/iomgr/wakeup_fd_cv.h" + +#include <errno.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/thd.h> +#include <grpc/support/time.h> +#include <grpc/support/useful.h> + +#define MAX_TABLE_RESIZE 256 + +extern cv_fd_table g_cvfds; + +static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) { + unsigned int i, newsize; + int idx; + gpr_mu_lock(&g_cvfds.mu); + if (!g_cvfds.free_fds) { + newsize = GPR_MIN(g_cvfds.size * 2, g_cvfds.size + MAX_TABLE_RESIZE); + g_cvfds.cvfds = gpr_realloc(g_cvfds.cvfds, sizeof(fd_node) * newsize); + for (i = g_cvfds.size; i < newsize; i++) { + g_cvfds.cvfds[i].is_set = 0; + g_cvfds.cvfds[i].cvs = NULL; + g_cvfds.cvfds[i].next_free = g_cvfds.free_fds; + g_cvfds.free_fds = &g_cvfds.cvfds[i]; + } + g_cvfds.size = newsize; + } + + idx = (int)(g_cvfds.free_fds - g_cvfds.cvfds); + g_cvfds.free_fds = g_cvfds.free_fds->next_free; + g_cvfds.cvfds[idx].cvs = NULL; + g_cvfds.cvfds[idx].is_set = 0; + fd_info->read_fd = IDX_TO_FD(idx); + fd_info->write_fd = -1; + gpr_mu_unlock(&g_cvfds.mu); + return GRPC_ERROR_NONE; +} + +static grpc_error* cv_fd_wakeup(grpc_wakeup_fd* fd_info) { + cv_node* cvn; + gpr_mu_lock(&g_cvfds.mu); + g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].is_set = 1; + cvn = g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].cvs; + while (cvn) { + gpr_cv_signal(cvn->cv); + cvn = cvn->next; + } + gpr_mu_unlock(&g_cvfds.mu); + return GRPC_ERROR_NONE; +} + +static grpc_error* cv_fd_consume(grpc_wakeup_fd* fd_info) { + gpr_mu_lock(&g_cvfds.mu); + g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].is_set = 0; + gpr_mu_unlock(&g_cvfds.mu); + return GRPC_ERROR_NONE; +} + +static void cv_fd_destroy(grpc_wakeup_fd* fd_info) { + if (fd_info->read_fd == 0) { + return; + } + gpr_mu_lock(&g_cvfds.mu); + // Assert that there are no active pollers + GPR_ASSERT(!g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].cvs); + g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].next_free = g_cvfds.free_fds; + g_cvfds.free_fds = &g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)]; + gpr_mu_unlock(&g_cvfds.mu); +} + +static int cv_check_availability(void) { return 1; } + +const grpc_wakeup_fd_vtable grpc_cv_wakeup_fd_vtable = { + cv_fd_init, cv_fd_consume, cv_fd_wakeup, cv_fd_destroy, + cv_check_availability}; + +#endif /* GPR_POSIX_WAKUP_FD */ diff --git a/src/core/lib/iomgr/wakeup_fd_cv.h b/src/core/lib/iomgr/wakeup_fd_cv.h new file mode 100644 index 0000000000..ac16be1750 --- /dev/null +++ b/src/core/lib/iomgr/wakeup_fd_cv.h @@ -0,0 +1,80 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* + * wakeup_fd_cv uses condition variables to implement wakeup fds. + * + * It is intended for use only in cases when eventfd() and pipe() are not + * available. It can only be used with the "poll" engine. + * + * Implementation: + * A global table of cv wakeup fds is mantained. A cv wakeup fd is a negative + * file descriptor. poll() is then run in a background thread with only the + * real socket fds while we wait on a condition variable trigged by either the + * poll() completion or a wakeup_fd() call. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_WAKEUP_FD_CV_H +#define GRPC_CORE_LIB_IOMGR_WAKEUP_FD_CV_H + +#include <grpc/support/sync.h> + +#include "src/core/lib/iomgr/ev_posix.h" + +#define FD_TO_IDX(fd) (-(fd)-1) +#define IDX_TO_FD(idx) (-(idx)-1) + +typedef struct cv_node { + gpr_cv* cv; + struct cv_node* next; +} cv_node; + +typedef struct fd_node { + int is_set; + cv_node* cvs; + struct fd_node* next_free; +} fd_node; + +typedef struct cv_fd_table { + gpr_mu mu; + int pollcount; + int shutdown; + gpr_cv shutdown_complete; + fd_node* cvfds; + fd_node* free_fds; + unsigned int size; + grpc_poll_function_type poll; +} cv_fd_table; + +#endif /* GRPC_CORE_LIB_IOMGR_WAKEUP_FD_CV_H */ diff --git a/src/core/lib/iomgr/wakeup_fd_pipe.c b/src/core/lib/iomgr/wakeup_fd_pipe.c index 4e5dbdcb73..d0ea216aa0 100644 --- a/src/core/lib/iomgr/wakeup_fd_pipe.c +++ b/src/core/lib/iomgr/wakeup_fd_pipe.c @@ -47,11 +47,10 @@ static grpc_error* pipe_init(grpc_wakeup_fd* fd_info) { int pipefd[2]; - /* TODO(klempner): Make this nonfatal */ int r = pipe(pipefd); if (0 != r) { gpr_log(GPR_ERROR, "pipe creation failed (%d): %s", errno, strerror(errno)); - abort(); + return GRPC_OS_ERROR(errno, "pipe"); } grpc_error* err; err = grpc_set_socket_nonblocking(pipefd[0], 1); @@ -95,8 +94,13 @@ static void pipe_destroy(grpc_wakeup_fd* fd_info) { } static int pipe_check_availability(void) { - /* Assume that pipes are always available. */ - return 1; + grpc_wakeup_fd fd; + if (pipe_init(&fd) == GRPC_ERROR_NONE) { + pipe_destroy(&fd); + return 1; + } else { + return 0; + } } const grpc_wakeup_fd_vtable grpc_pipe_wakeup_fd_vtable = { diff --git a/src/core/lib/iomgr/wakeup_fd_posix.c b/src/core/lib/iomgr/wakeup_fd_posix.c index 046208abc8..5c894bef37 100644 --- a/src/core/lib/iomgr/wakeup_fd_posix.c +++ b/src/core/lib/iomgr/wakeup_fd_posix.c @@ -36,37 +36,66 @@ #ifdef GPR_POSIX_WAKEUP_FD #include <stddef.h> +#include "src/core/lib/iomgr/wakeup_fd_cv.h" #include "src/core/lib/iomgr/wakeup_fd_pipe.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" +extern grpc_wakeup_fd_vtable grpc_cv_wakeup_fd_vtable; static const grpc_wakeup_fd_vtable *wakeup_fd_vtable = NULL; + int grpc_allow_specialized_wakeup_fd = 1; +int grpc_allow_pipe_wakeup_fd = 1; + +int has_real_wakeup_fd = 1; +int cv_wakeup_fds_enabled = 0; void grpc_wakeup_fd_global_init(void) { if (grpc_allow_specialized_wakeup_fd && grpc_specialized_wakeup_fd_vtable.check_availability()) { wakeup_fd_vtable = &grpc_specialized_wakeup_fd_vtable; - } else { + } else if (grpc_allow_pipe_wakeup_fd && + grpc_pipe_wakeup_fd_vtable.check_availability()) { wakeup_fd_vtable = &grpc_pipe_wakeup_fd_vtable; + } else { + has_real_wakeup_fd = 0; } } void grpc_wakeup_fd_global_destroy(void) { wakeup_fd_vtable = NULL; } +int grpc_has_wakeup_fd(void) { return has_real_wakeup_fd; } + +int grpc_cv_wakeup_fds_enabled(void) { return cv_wakeup_fds_enabled; } + +void grpc_enable_cv_wakeup_fds(int enable) { cv_wakeup_fds_enabled = enable; } + grpc_error *grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info) { + if (cv_wakeup_fds_enabled) { + return grpc_cv_wakeup_fd_vtable.init(fd_info); + } return wakeup_fd_vtable->init(fd_info); } grpc_error *grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd *fd_info) { + if (cv_wakeup_fds_enabled) { + return grpc_cv_wakeup_fd_vtable.consume(fd_info); + } return wakeup_fd_vtable->consume(fd_info); } grpc_error *grpc_wakeup_fd_wakeup(grpc_wakeup_fd *fd_info) { + if (cv_wakeup_fds_enabled) { + return grpc_cv_wakeup_fd_vtable.wakeup(fd_info); + } return wakeup_fd_vtable->wakeup(fd_info); } void grpc_wakeup_fd_destroy(grpc_wakeup_fd *fd_info) { - wakeup_fd_vtable->destroy(fd_info); + if (cv_wakeup_fds_enabled) { + grpc_cv_wakeup_fd_vtable.destroy(fd_info); + } else { + wakeup_fd_vtable->destroy(fd_info); + } } #endif /* GPR_POSIX_WAKEUP_FD */ diff --git a/src/core/lib/iomgr/wakeup_fd_posix.h b/src/core/lib/iomgr/wakeup_fd_posix.h index e269f242d8..71d32d97ba 100644 --- a/src/core/lib/iomgr/wakeup_fd_posix.h +++ b/src/core/lib/iomgr/wakeup_fd_posix.h @@ -71,6 +71,10 @@ void grpc_wakeup_fd_global_destroy(void); * purposes only.*/ void grpc_wakeup_fd_global_init_force_fallback(void); +int grpc_has_wakeup_fd(void); +int grpc_cv_wakeup_fds_enabled(void); +void grpc_enable_cv_wakeup_fds(int enable); + typedef struct grpc_wakeup_fd grpc_wakeup_fd; typedef struct grpc_wakeup_fd_vtable { @@ -88,6 +92,7 @@ struct grpc_wakeup_fd { }; extern int grpc_allow_specialized_wakeup_fd; +extern int grpc_allow_pipe_wakeup_fd; #define GRPC_WAKEUP_FD_GET_READ_FD(fd_info) ((fd_info)->read_fd) diff --git a/src/core/lib/transport/static_metadata.c b/src/core/lib/transport/static_metadata.c index 5e0352a467..f019ef156a 100644 --- a/src/core/lib/transport/static_metadata.c +++ b/src/core/lib/transport/static_metadata.c @@ -126,9 +126,9 @@ const char *const grpc_static_metadata_strings[GRPC_STATIC_MDSTR_COUNT] = { "if-range", "if-unmodified-since", "last-modified", + "lb-cost", + "lb-token", "link", - "load-reporting-initial", - "load-reporting-trailing", "location", "max-forwards", ":method", diff --git a/src/core/lib/transport/static_metadata.h b/src/core/lib/transport/static_metadata.h index 5b9ee1a60a..e0a8196419 100644 --- a/src/core/lib/transport/static_metadata.h +++ b/src/core/lib/transport/static_metadata.h @@ -175,12 +175,12 @@ extern grpc_mdstr grpc_static_mdstr_table[GRPC_STATIC_MDSTR_COUNT]; #define GRPC_MDSTR_IF_UNMODIFIED_SINCE (&grpc_static_mdstr_table[62]) /* "last-modified" */ #define GRPC_MDSTR_LAST_MODIFIED (&grpc_static_mdstr_table[63]) +/* "lb-cost" */ +#define GRPC_MDSTR_LB_COST (&grpc_static_mdstr_table[64]) +/* "lb-token" */ +#define GRPC_MDSTR_LB_TOKEN (&grpc_static_mdstr_table[65]) /* "link" */ -#define GRPC_MDSTR_LINK (&grpc_static_mdstr_table[64]) -/* "load-reporting-initial" */ -#define GRPC_MDSTR_LOAD_REPORTING_INITIAL (&grpc_static_mdstr_table[65]) -/* "load-reporting-trailing" */ -#define GRPC_MDSTR_LOAD_REPORTING_TRAILING (&grpc_static_mdstr_table[66]) +#define GRPC_MDSTR_LINK (&grpc_static_mdstr_table[66]) /* "location" */ #define GRPC_MDSTR_LOCATION (&grpc_static_mdstr_table[67]) /* "max-forwards" */ @@ -337,13 +337,12 @@ extern uintptr_t grpc_static_mdelem_user_data[GRPC_STATIC_MDELEM_COUNT]; #define GRPC_MDELEM_IF_UNMODIFIED_SINCE_EMPTY (&grpc_static_mdelem_table[44]) /* "last-modified": "" */ #define GRPC_MDELEM_LAST_MODIFIED_EMPTY (&grpc_static_mdelem_table[45]) +/* "lb-cost": "" */ +#define GRPC_MDELEM_LB_COST_EMPTY (&grpc_static_mdelem_table[46]) +/* "lb-token": "" */ +#define GRPC_MDELEM_LB_TOKEN_EMPTY (&grpc_static_mdelem_table[47]) /* "link": "" */ -#define GRPC_MDELEM_LINK_EMPTY (&grpc_static_mdelem_table[46]) -/* "load-reporting-initial": "" */ -#define GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY (&grpc_static_mdelem_table[47]) -/* "load-reporting-trailing": "" */ -#define GRPC_MDELEM_LOAD_REPORTING_TRAILING_EMPTY \ - (&grpc_static_mdelem_table[48]) +#define GRPC_MDELEM_LINK_EMPTY (&grpc_static_mdelem_table[48]) /* "location": "" */ #define GRPC_MDELEM_LOCATION_EMPTY (&grpc_static_mdelem_table[49]) /* "max-forwards": "" */ diff --git a/src/core/lib/transport/transport_op_string.c b/src/core/lib/transport/transport_op_string.c index f350e55b34..533ec52077 100644 --- a/src/core/lib/transport/transport_op_string.c +++ b/src/core/lib/transport/transport_op_string.c @@ -73,56 +73,51 @@ static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) { char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) { char *tmp; char *out; - bool first = true; gpr_strvec b; gpr_strvec_init(&b); + gpr_strvec_add( + &b, gpr_strdup(op->covered_by_poller ? "[COVERED]" : "[UNCOVERED]")); + if (op->send_initial_metadata != NULL) { - if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); - first = false; + gpr_strvec_add(&b, gpr_strdup(" ")); gpr_strvec_add(&b, gpr_strdup("SEND_INITIAL_METADATA{")); put_metadata_list(&b, *op->send_initial_metadata); gpr_strvec_add(&b, gpr_strdup("}")); } if (op->send_message != NULL) { - if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); - first = false; + gpr_strvec_add(&b, gpr_strdup(" ")); gpr_asprintf(&tmp, "SEND_MESSAGE:flags=0x%08x:len=%d", op->send_message->flags, op->send_message->length); gpr_strvec_add(&b, tmp); } if (op->send_trailing_metadata != NULL) { - if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); - first = false; + gpr_strvec_add(&b, gpr_strdup(" ")); gpr_strvec_add(&b, gpr_strdup("SEND_TRAILING_METADATA{")); put_metadata_list(&b, *op->send_trailing_metadata); gpr_strvec_add(&b, gpr_strdup("}")); } if (op->recv_initial_metadata != NULL) { - if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); - first = false; + gpr_strvec_add(&b, gpr_strdup(" ")); gpr_strvec_add(&b, gpr_strdup("RECV_INITIAL_METADATA")); } if (op->recv_message != NULL) { - if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); - first = false; + gpr_strvec_add(&b, gpr_strdup(" ")); gpr_strvec_add(&b, gpr_strdup("RECV_MESSAGE")); } if (op->recv_trailing_metadata != NULL) { - if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); - first = false; + gpr_strvec_add(&b, gpr_strdup(" ")); gpr_strvec_add(&b, gpr_strdup("RECV_TRAILING_METADATA")); } if (op->cancel_error != GRPC_ERROR_NONE) { - if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); - first = false; + gpr_strvec_add(&b, gpr_strdup(" ")); const char *msg = grpc_error_string(op->cancel_error); gpr_asprintf(&tmp, "CANCEL:%s", msg); grpc_error_free_string(msg); @@ -130,8 +125,7 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) { } if (op->close_error != GRPC_ERROR_NONE) { - if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); - first = false; + gpr_strvec_add(&b, gpr_strdup(" ")); const char *msg = grpc_error_string(op->close_error); gpr_asprintf(&tmp, "CLOSE:%s", msg); grpc_error_free_string(msg); diff --git a/src/proto/grpc/testing/test.proto b/src/proto/grpc/testing/test.proto index b52c4cbad6..d6ef58ab12 100644 --- a/src/proto/grpc/testing/test.proto +++ b/src/proto/grpc/testing/test.proto @@ -74,6 +74,10 @@ service TestService { // first request. rpc HalfDuplexCall(stream StreamingOutputCallRequest) returns (stream StreamingOutputCallResponse); + + // The test server will not implement this method. It will be used + // to test the behavior when clients call unimplemented methods. + rpc UnimplementedMethod(grpc.testing.Empty) returns (grpc.testing.Empty); } // A simple service NOT implemented at servers so clients can test for diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 5cf66312cc..eeb30135e2 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -135,6 +135,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/udp_server.c', 'src/core/lib/iomgr/unix_sockets_posix.c', 'src/core/lib/iomgr/unix_sockets_posix_noop.c', + 'src/core/lib/iomgr/wakeup_fd_cv.c', 'src/core/lib/iomgr/wakeup_fd_eventfd.c', 'src/core/lib/iomgr/wakeup_fd_nospecial.c', 'src/core/lib/iomgr/wakeup_fd_pipe.c', diff --git a/test/core/end2end/fuzzers/hpack.dictionary b/test/core/end2end/fuzzers/hpack.dictionary index a93bccfa0d..181bbe845e 100644 --- a/test/core/end2end/fuzzers/hpack.dictionary +++ b/test/core/end2end/fuzzers/hpack.dictionary @@ -63,9 +63,9 @@ "\x08if-range" "\x13if-unmodified-since" "\x0Dlast-modified" +"\x07lb-cost" +"\x08lb-token" "\x04link" -"\x16load-reporting-initial" -"\x17load-reporting-trailing" "\x08location" "\x0Cmax-forwards" "\x07:method" @@ -138,9 +138,9 @@ "\x00\x08if-range\x00" "\x00\x13if-unmodified-since\x00" "\x00\x0Dlast-modified\x00" +"\x00\x07lb-cost\x00" +"\x00\x08lb-token\x00" "\x00\x04link\x00" -"\x00\x16load-reporting-initial\x00" -"\x00\x17load-reporting-trailing\x00" "\x00\x08location\x00" "\x00\x0Cmax-forwards\x00" "\x00\x07:method\x03GET" diff --git a/test/core/end2end/tests/load_reporting_hook.c b/test/core/end2end/tests/load_reporting_hook.c index 59d054cf87..7f95dfd949 100644 --- a/test/core/end2end/tests/load_reporting_hook.c +++ b/test/core/end2end/tests/load_reporting_hook.c @@ -295,13 +295,13 @@ static void test_load_reporting_hook(grpc_end2end_test_config config) { grpc_metadata initial_lr_metadata; grpc_metadata trailing_lr_metadata; - initial_lr_metadata.key = GRPC_LOAD_REPORTING_INITIAL_MD_KEY; + initial_lr_metadata.key = GRPC_LB_TOKEN_MD_KEY; initial_lr_metadata.value = "client-token"; initial_lr_metadata.value_length = strlen(initial_lr_metadata.value); memset(&initial_lr_metadata.internal_data, 0, sizeof(initial_lr_metadata.internal_data)); - trailing_lr_metadata.key = GRPC_LOAD_REPORTING_TRAILING_MD_KEY; + trailing_lr_metadata.key = GRPC_LB_COST_MD_KEY; trailing_lr_metadata.value = "server-token"; trailing_lr_metadata.value_length = strlen(trailing_lr_metadata.value); memset(&trailing_lr_metadata.internal_data, 0, diff --git a/test/core/end2end/tests/payload.c b/test/core/end2end/tests/payload.c index ed1c719ef8..40696d088f 100644 --- a/test/core/end2end/tests/payload.c +++ b/test/core/end2end/tests/payload.c @@ -99,12 +99,16 @@ static void end_test(grpc_end2end_test_fixture *f) { static gpr_slice generate_random_slice() { size_t i; static const char chars[] = "abcdefghijklmnopqrstuvwxyz1234567890"; - char output[1024 * 1024]; - for (i = 0; i < GPR_ARRAY_SIZE(output) - 1; ++i) { + char *output; + const size_t output_size = 1024 * 1024; + output = gpr_malloc(output_size); + for (i = 0; i < output_size - 1; ++i) { output[i] = chars[rand() % (int)(sizeof(chars) - 1)]; } - output[GPR_ARRAY_SIZE(output) - 1] = '\0'; - return gpr_slice_from_copied_string(output); + output[output_size - 1] = '\0'; + gpr_slice out = gpr_slice_from_copied_string(output); + gpr_free(output); + return out; } static void request_response_with_payload(grpc_end2end_test_fixture f) { diff --git a/test/core/iomgr/wakeup_fd_cv_test.c b/test/core/iomgr/wakeup_fd_cv_test.c new file mode 100644 index 0000000000..952985ed7e --- /dev/null +++ b/test/core/iomgr/wakeup_fd_cv_test.c @@ -0,0 +1,240 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <pthread.h> + +#include <grpc/support/log.h> +#include <grpc/support/thd.h> +#include <grpc/support/time.h> +#include <grpc/support/useful.h> + +#include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/iomgr_posix.h" +#include "src/core/lib/support/env.h" + +typedef struct poll_args { + struct pollfd *fds; + nfds_t nfds; + int timeout; + int result; +} poll_args; + +gpr_cv poll_cv; +gpr_mu poll_mu; +static int socket_event = 0; + +// Trigger a "socket" POLLIN in mock_poll() +void trigger_socket_event() { + gpr_mu_lock(&poll_mu); + socket_event = 1; + gpr_cv_broadcast(&poll_cv); + gpr_mu_unlock(&poll_mu); +} + +void reset_socket_event() { + gpr_mu_lock(&poll_mu); + socket_event = 0; + gpr_mu_unlock(&poll_mu); +} + +// Mocks posix poll() function +int mock_poll(struct pollfd *fds, nfds_t nfds, int timeout) { + int res = 0; + gpr_timespec poll_time; + gpr_mu_lock(&poll_mu); + GPR_ASSERT(nfds == 3); + GPR_ASSERT(fds[0].fd == 20); + GPR_ASSERT(fds[1].fd == 30); + GPR_ASSERT(fds[2].fd == 50); + GPR_ASSERT(fds[0].events == (POLLIN | POLLHUP)); + GPR_ASSERT(fds[1].events == (POLLIN | POLLHUP)); + GPR_ASSERT(fds[2].events == POLLIN); + + if (timeout < 0) { + poll_time = gpr_inf_future(GPR_CLOCK_REALTIME); + } else { + poll_time = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(timeout, GPR_TIMESPAN)); + } + + if (socket_event || !gpr_cv_wait(&poll_cv, &poll_mu, poll_time)) { + fds[0].revents = POLLIN; + res = 1; + } + gpr_mu_unlock(&poll_mu); + return res; +} + +void background_poll(void *args) { + poll_args *pargs = (poll_args *)args; + pargs->result = grpc_poll_function(pargs->fds, pargs->nfds, pargs->timeout); +} + +void test_many_fds(void) { + int i; + grpc_wakeup_fd fd[1000]; + for (i = 0; i < 1000; i++) { + GPR_ASSERT(grpc_wakeup_fd_init(&fd[i]) == GRPC_ERROR_NONE); + } + for (i = 0; i < 1000; i++) { + grpc_wakeup_fd_destroy(&fd[i]); + } +} + +void test_poll_cv_trigger(void) { + grpc_wakeup_fd cvfd1, cvfd2, cvfd3; + struct pollfd pfds[6]; + poll_args pargs; + gpr_thd_id t_id; + gpr_thd_options opt; + + GPR_ASSERT(grpc_wakeup_fd_init(&cvfd1) == GRPC_ERROR_NONE); + GPR_ASSERT(grpc_wakeup_fd_init(&cvfd2) == GRPC_ERROR_NONE); + GPR_ASSERT(grpc_wakeup_fd_init(&cvfd3) == GRPC_ERROR_NONE); + GPR_ASSERT(cvfd1.read_fd < 0); + GPR_ASSERT(cvfd2.read_fd < 0); + GPR_ASSERT(cvfd3.read_fd < 0); + GPR_ASSERT(cvfd1.read_fd != cvfd2.read_fd); + GPR_ASSERT(cvfd2.read_fd != cvfd3.read_fd); + GPR_ASSERT(cvfd1.read_fd != cvfd3.read_fd); + + pfds[0].fd = cvfd1.read_fd; + pfds[1].fd = cvfd2.read_fd; + pfds[2].fd = 20; + pfds[3].fd = 30; + pfds[4].fd = cvfd3.read_fd; + pfds[5].fd = 50; + + pfds[0].events = 0; + pfds[1].events = POLLIN; + pfds[2].events = POLLIN | POLLHUP; + pfds[3].events = POLLIN | POLLHUP; + pfds[4].events = POLLIN; + pfds[5].events = POLLIN; + + pargs.fds = pfds; + pargs.nfds = 6; + pargs.timeout = 1000; + pargs.result = -2; + + opt = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&opt); + gpr_thd_new(&t_id, &background_poll, &pargs, &opt); + + // Wakeup wakeup_fd not listening for events + GPR_ASSERT(grpc_wakeup_fd_wakeup(&cvfd1) == GRPC_ERROR_NONE); + gpr_thd_join(t_id); + GPR_ASSERT(pargs.result == 0); + GPR_ASSERT(pfds[0].revents == 0); + GPR_ASSERT(pfds[1].revents == 0); + GPR_ASSERT(pfds[2].revents == 0); + GPR_ASSERT(pfds[3].revents == 0); + GPR_ASSERT(pfds[4].revents == 0); + GPR_ASSERT(pfds[5].revents == 0); + + // Pollin on socket fd + pargs.timeout = -1; + pargs.result = -2; + gpr_thd_new(&t_id, &background_poll, &pargs, &opt); + trigger_socket_event(); + gpr_thd_join(t_id); + GPR_ASSERT(pargs.result == 1); + GPR_ASSERT(pfds[0].revents == 0); + GPR_ASSERT(pfds[1].revents == 0); + GPR_ASSERT(pfds[2].revents == POLLIN); + GPR_ASSERT(pfds[3].revents == 0); + GPR_ASSERT(pfds[4].revents == 0); + GPR_ASSERT(pfds[5].revents == 0); + + // Pollin on wakeup fd + reset_socket_event(); + pargs.result = -2; + gpr_thd_new(&t_id, &background_poll, &pargs, &opt); + GPR_ASSERT(grpc_wakeup_fd_wakeup(&cvfd2) == GRPC_ERROR_NONE); + gpr_thd_join(t_id); + + GPR_ASSERT(pargs.result == 1); + GPR_ASSERT(pfds[0].revents == 0); + GPR_ASSERT(pfds[1].revents == POLLIN); + GPR_ASSERT(pfds[2].revents == 0); + GPR_ASSERT(pfds[3].revents == 0); + GPR_ASSERT(pfds[4].revents == 0); + GPR_ASSERT(pfds[5].revents == 0); + + // Pollin on wakeup fd + socket fd + trigger_socket_event(); + pargs.result = -2; + gpr_thd_new(&t_id, &background_poll, &pargs, &opt); + gpr_thd_join(t_id); + + GPR_ASSERT(pargs.result == 2); + GPR_ASSERT(pfds[0].revents == 0); + GPR_ASSERT(pfds[1].revents == POLLIN); + GPR_ASSERT(pfds[2].revents == POLLIN); + GPR_ASSERT(pfds[3].revents == 0); + GPR_ASSERT(pfds[4].revents == 0); + GPR_ASSERT(pfds[5].revents == 0); + + // No Events + pargs.result = -2; + pargs.timeout = 1000; + reset_socket_event(); + GPR_ASSERT(grpc_wakeup_fd_consume_wakeup(&cvfd1) == GRPC_ERROR_NONE); + GPR_ASSERT(grpc_wakeup_fd_consume_wakeup(&cvfd2) == GRPC_ERROR_NONE); + gpr_thd_new(&t_id, &background_poll, &pargs, &opt); + gpr_thd_join(t_id); + + GPR_ASSERT(pargs.result == 0); + GPR_ASSERT(pfds[0].revents == 0); + GPR_ASSERT(pfds[1].revents == 0); + GPR_ASSERT(pfds[2].revents == 0); + GPR_ASSERT(pfds[3].revents == 0); + GPR_ASSERT(pfds[4].revents == 0); + GPR_ASSERT(pfds[5].revents == 0); +} + +int main(int argc, char **argv) { + gpr_setenv("GRPC_POLL_STRATEGY", "poll-cv"); + grpc_poll_function = &mock_poll; + gpr_mu_init(&poll_mu); + gpr_cv_init(&poll_cv); + + grpc_iomgr_platform_init(); + test_many_fds(); + grpc_iomgr_platform_shutdown(); + + grpc_iomgr_platform_init(); + test_poll_cv_trigger(); + grpc_iomgr_platform_shutdown(); + return 0; +} diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc index 7666c4e60b..80f2fa4f4d 100644 --- a/test/cpp/grpclb/grpclb_test.cc +++ b/test/cpp/grpclb/grpclb_test.cc @@ -334,8 +334,7 @@ static void start_backend_server(server_fixture *sf) { // have a version for int but does have one for long long int. string expected_token = "token" + std::to_string((long long int)sf->port); expected_token.resize(64, '-'); - GPR_ASSERT(contains_metadata(&request_metadata_recv, - "load-reporting-initial", + GPR_ASSERT(contains_metadata(&request_metadata_recv, "lb-token", expected_token.c_str())); gpr_log(GPR_INFO, "Server[%s] after tag 100", sf->servers_hostport); diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc index 999be9d8a3..245e27b2bb 100644 --- a/test/cpp/interop/client.cc +++ b/test/cpp/interop/client.cc @@ -79,7 +79,8 @@ DEFINE_string(test_case, "large_unary", "slow_consumer : single request with response streaming with " "slow client consumer;\n" "status_code_and_message: verify status code & message;\n" - "timeout_on_sleeping_server: deadline exceeds on stream;\n"); + "timeout_on_sleeping_server: deadline exceeds on stream;\n" + "unimplemented_method: client calls an unimplemented_method;\n"); DEFINE_string(default_service_account, "", "Email of GCE default service account"); DEFINE_string(service_account_key_file, "", @@ -149,6 +150,8 @@ int main(int argc, char** argv) { client.DoStatusWithMessage(); } else if (FLAGS_test_case == "custom_metadata") { client.DoCustomMetadata(); + } else if (FLAGS_test_case == "unimplemented_method") { + client.DoUnimplementedMethod(); } else if (FLAGS_test_case == "cacheable_unary") { client.DoCacheableUnary(); } else if (FLAGS_test_case == "all") { @@ -168,6 +171,7 @@ int main(int argc, char** argv) { client.DoEmptyStream(); client.DoStatusWithMessage(); client.DoCustomMetadata(); + client.DoUnimplementedMethod(); client.DoCacheableUnary(); // service_account_creds and jwt_token_creds can only run with ssl. if (FLAGS_use_tls) { @@ -202,7 +206,8 @@ int main(int argc, char** argv) { "server_compressed_unary", "server_streaming", "status_code_and_message", - "timeout_on_sleeping_server"}; + "timeout_on_sleeping_server", + "unimplemented_method"}; char* joined_testcases = gpr_strjoin_sep(testcases, GPR_ARRAY_SIZE(testcases), "\n", NULL); diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index e9a804ccae..ffd19eb1d5 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -981,5 +981,23 @@ bool InteropClient::DoCustomMetadata() { return true; } +bool InteropClient::DoUnimplementedMethod() { + gpr_log(GPR_DEBUG, "Sending a request for an unimplemented rpc..."); + + Empty request = Empty::default_instance(); + Empty response = Empty::default_instance(); + ClientContext context; + + Status s = + serviceStub_.Get()->UnimplementedMethod(&context, request, &response); + + if (!AssertStatusCode(s, StatusCode::UNIMPLEMENTED)) { + return false; + } + + gpr_log(GPR_DEBUG, "unimplemented rpc done."); + return true; +} + } // namespace testing } // namespace grpc diff --git a/test/cpp/interop/interop_client.h b/test/cpp/interop/interop_client.h index 1e89f0987d..0a96e7734d 100644 --- a/test/cpp/interop/interop_client.h +++ b/test/cpp/interop/interop_client.h @@ -79,6 +79,7 @@ class InteropClient { bool DoEmptyStream(); bool DoStatusWithMessage(); bool DoCustomMetadata(); + bool DoUnimplementedMethod(); bool DoCacheableUnary(); // Auth tests. // username is a string containing the user email diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 1163b765e3..3d2ac78868 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -137,9 +137,7 @@ class AsyncQpsServerTest : public Server { std::lock_guard<std::mutex> lock((*ss)->mutex); (*ss)->shutdown = true; } - // TODO (vpai): Remove this deadline and allow Shutdown to finish properly - auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(3); - server_->Shutdown(deadline); + std::thread shutdown_thread(&AsyncQpsServerTest::ShutdownThreadFunc, this); for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) { (*cq)->Shutdown(); } @@ -152,9 +150,16 @@ class AsyncQpsServerTest : public Server { while ((*cq)->Next(&got_tag, &ok)) ; } + shutdown_thread.join(); } private: + void ShutdownThreadFunc() { + // TODO (vpai): Remove this deadline and allow Shutdown to finish properly + auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(3); + server_->Shutdown(deadline); + } + void ThreadFunc(int thread_idx) { // Wait until work is available or we are shutting down bool ok; diff --git a/tools/codegen/core/gen_static_metadata.py b/tools/codegen/core/gen_static_metadata.py index 6f400102e4..6f8cad279c 100755 --- a/tools/codegen/core/gen_static_metadata.py +++ b/tools/codegen/core/gen_static_metadata.py @@ -109,8 +109,8 @@ CONFIG = [ ('if-range', ''), ('if-unmodified-since', ''), ('last-modified', ''), - ('load-reporting-initial', ''), - ('load-reporting-trailing', ''), + ('lb-token', ''), + ('lb-cost', ''), ('link', ''), ('location', ''), ('max-forwards', ''), diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index cdc70f8bf1..71a33e67c7 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -843,6 +843,7 @@ src/core/lib/iomgr/timer.h \ src/core/lib/iomgr/timer_heap.h \ src/core/lib/iomgr/udp_server.h \ src/core/lib/iomgr/unix_sockets_posix.h \ +src/core/lib/iomgr/wakeup_fd_cv.h \ src/core/lib/iomgr/wakeup_fd_pipe.h \ src/core/lib/iomgr/wakeup_fd_posix.h \ src/core/lib/iomgr/workqueue.h \ @@ -1009,6 +1010,7 @@ src/core/lib/iomgr/timer_heap.c \ src/core/lib/iomgr/udp_server.c \ src/core/lib/iomgr/unix_sockets_posix.c \ src/core/lib/iomgr/unix_sockets_posix_noop.c \ +src/core/lib/iomgr/wakeup_fd_cv.c \ src/core/lib/iomgr/wakeup_fd_eventfd.c \ src/core/lib/iomgr/wakeup_fd_nospecial.c \ src/core/lib/iomgr/wakeup_fd_pipe.c \ diff --git a/tools/run_tests/filter_pull_request_tests.py b/tools/run_tests/filter_pull_request_tests.py new file mode 100644 index 0000000000..55dab42f8a --- /dev/null +++ b/tools/run_tests/filter_pull_request_tests.py @@ -0,0 +1,184 @@ +#!/usr/bin/env python2.7 +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Filter out tests based on file differences compared to merge target branch""" + +import re +from subprocess import call, check_output + + +class TestSuite: + """ + Contains tag to identify job as belonging to this test suite and + triggers to identify if changed files are relevant + """ + def __init__(self, tags): + """ + Build TestSuite to group tests by their tags + :param tag: string used to identify if a job belongs to this TestSuite + todo(mattkwong): Change the use of tag because do not want to depend on + job.shortname to identify what suite a test belongs to + """ + self.triggers = [] + self.tags = tags + + def add_trigger(self, trigger): + """ + Add a regex to list of triggers that determine if a changed file should run tests + :param trigger: regex matching file relevant to tests + """ + self.triggers.append(trigger) + +# Create test suites +_core_test_suite = TestSuite(['_c_']) +_cpp_test_suite = TestSuite(['_c++_']) +_csharp_test_suite = TestSuite(['_csharp_']) +_node_test_suite = TestSuite(['_node_']) +_objc_test_suite = TestSuite(['_objc_']) +_php_test_suite = TestSuite(['_php_', '_php7_']) +_python_test_suite = TestSuite(['_python_']) +_ruby_test_suite = TestSuite(['_ruby']) +_all_test_suites = [_core_test_suite, _cpp_test_suite, _csharp_test_suite, + _node_test_suite, _objc_test_suite, _php_test_suite, + _python_test_suite, _ruby_test_suite] + +# Dictionary of whitelistable files where the key is a regex matching changed files +# and the value is a list of tests that should be run. An empty list means that +# the changed files should not trigger any tests. Any changed file that does not +# match any of these regexes will trigger all tests +_WHITELIST_DICT = { + '^templates/.*': [], + '^doc/.*': [], + '^examples/.*': [], + '^summerofcode/.*': [], + '.*README.md$': [], + '.*LICENSE$': [], + '^src/cpp.*': [_cpp_test_suite], + '^src/csharp.*': [_csharp_test_suite], + '^src/node.*': [_node_test_suite], + '^src/objective-c.*': [_objc_test_suite], + '^src/php.*': [_php_test_suite], + '^src/python.*': [_python_test_suite], + '^src/ruby.*': [_ruby_test_suite], + '^test/core.*': [_core_test_suite], + '^test/cpp.*': [_cpp_test_suite], + '^test/distrib/cpp.*': [_cpp_test_suite], + '^test/distrib/csharp.*': [_csharp_test_suite], + '^test/distrib/node.*': [_node_test_suite], + '^test/distrib/php.*': [_php_test_suite], + '^test/distrib/python.*': [_python_test_suite], + '^test/distrib/ruby.*': [_ruby_test_suite] +} +# Add all triggers to their respective test suites +for trigger, test_suites in _WHITELIST_DICT.iteritems(): + for test_suite in test_suites: + test_suite.add_trigger(trigger) + + +def _get_changed_files(base_branch): + """ + Get list of changed files between current branch and base of target merge branch + """ + # git fetch might need to be called on Jenkins slave + # todo(mattkwong): remove or uncomment below after seeing if Jenkins needs this + # call(['git', 'fetch']) + + # Get file changes between branch and merge-base of specified branch + # Not combined to be Windows friendly + base_commit = check_output(["git", "merge-base", base_branch, "HEAD"]).rstrip() + return check_output(["git", "diff", base_commit, "--name-only"]).splitlines() + + +def _can_skip_tests(file_names, triggers): + """ + Determines if tests are skippable based on if all files do not match list of regexes + :param file_names: list of changed files generated by _get_changed_files() + :param triggers: list of regexes matching file name that indicates tests should be run + :return: safe to skip tests + """ + for file_name in file_names: + if any(re.match(trigger, file_name) for trigger in triggers): + return False + return True + + +def _remove_irrelevant_tests(tests, tag): + """ + Filters out tests by config or language - will not remove sanitizer tests + :param tests: list of all tests generated by run_tests_matrix.py + :param tag: string representing language or config to filter - "_(language)_" or "_(config)" + :return: list of relevant tests + """ + # todo(mattkwong): find a more reliable way to filter tests - don't use shortname + return [test for test in tests if tag not in test.shortname or + any(san_tag in test.shortname for san_tag in ['_asan', '_tsan', '_msan'])] + + +def _remove_sanitizer_tests(tests): + """ + Filters out sanitizer tests + :param tests: list of all tests generated by run_tests_matrix.py + :return: list of relevant tests + """ + # todo(mattkwong): find a more reliable way to filter tests - don't use shortname + return [test for test in tests if + all(san_tag not in test.shortname for san_tag in ['_asan', '_tsan', '_msan'])] + + +def filter_tests(tests, base_branch): + """ + Filters out tests that are safe to ignore + :param tests: list of all tests generated by run_tests_matrix.py + :return: list of relevant tests + """ + print("Finding file differences between %s repo and current branch...\n" % base_branch) + changed_files = _get_changed_files(base_branch) + for changed_file in changed_files: + print(changed_file) + print + + # Regex that combines all keys in _WHITELIST_DICT + all_triggers = "(" + ")|(".join(_WHITELIST_DICT.keys()) + ")" + # Check if all tests have to be run + for changed_file in changed_files: + if not re.match(all_triggers, changed_file): + return(tests) + # Filter out tests by language + for test_suite in _all_test_suites: + if _can_skip_tests(changed_files, test_suite.triggers): + for tag in test_suite.tags: + print(" Filtering %s tests" % tag) + tests = _remove_irrelevant_tests(tests, tag) + # Sanitizer tests skipped if core and c++ are skipped + if _can_skip_tests(changed_files, _cpp_test_suite.triggers + _core_test_suite.triggers): + print(" Filtering Sanitizer tests") + tests = _remove_sanitizer_tests(tests) + + return tests diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index dd070b1fe0..e9ee5f538d 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -69,7 +69,7 @@ _FORCE_ENVIRON_FOR_WRAPPERS = { _POLLING_STRATEGIES = { - 'linux': ['epoll', 'poll', 'legacy'] + 'linux': ['epoll', 'poll', 'poll-cv', 'legacy'] } diff --git a/tools/run_tests/run_tests_matrix.py b/tools/run_tests/run_tests_matrix.py index 60c21a1e21..550dd10ea5 100755 --- a/tools/run_tests/run_tests_matrix.py +++ b/tools/run_tests/run_tests_matrix.py @@ -36,13 +36,14 @@ import multiprocessing import os import report_utils import sys +from filter_pull_request_tests import filter_tests _ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '../..')) os.chdir(_ROOT) # Set the timeout high to allow enough time for sanitizers and pre-building # clang docker. -_RUNTESTS_TIMEOUT = 2*60*60 +_RUNTESTS_TIMEOUT = 4*60*60 # Number of jobs assigned to each run_tests.py instance _INNER_JOBS = 2 @@ -231,6 +232,15 @@ argp.add_argument('--dry_run', action='store_const', const=True, help='Only print what would be run.') +argp.add_argument('--filter_pr_tests', + default=False, + action='store_const', + const=True, + help='Filters out tests irrelavant to pull request changes.') +argp.add_argument('--base_branch', + default='origin/master', + type=str, + help='Branch that pull request is requesting to merge into') args = argp.parse_args() extra_args = [] @@ -264,6 +274,19 @@ for job in jobs: print ' %s' % job.shortname print +if args.filter_pr_tests: + print 'IMPORTANT: Test filtering is not active; this is only for testing.' + relevant_jobs = filter_tests(jobs, args.base_branch) + # todo(mattkwong): add skipped tests to report.xml + print + if len(relevant_jobs) == len(jobs): + print '(TESTING) No tests will be skipped.' + else: + print '(TESTING) These tests will be skipped:' + for job in list(set(jobs) - set(relevant_jobs)): + print ' %s' % job.shortname + print + if args.dry_run: print '--dry_run was used, exiting' sys.exit(1) diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index 42d29ebc9e..e84ee7eb5d 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -2101,6 +2101,23 @@ "gpr", "gpr_test_util", "grpc", + "grpc_test_util" + ], + "headers": [], + "is_filegroup": false, + "language": "c", + "name": "wakeup_fd_cv_test", + "src": [ + "test/core/iomgr/wakeup_fd_cv_test.c" + ], + "third_party": false, + "type": "target" + }, + { + "deps": [ + "gpr", + "gpr_test_util", + "grpc", "grpc++", "grpc++_test_util", "grpc_test_util" @@ -6426,6 +6443,7 @@ "src/core/lib/iomgr/timer_heap.h", "src/core/lib/iomgr/udp_server.h", "src/core/lib/iomgr/unix_sockets_posix.h", + "src/core/lib/iomgr/wakeup_fd_cv.h", "src/core/lib/iomgr/wakeup_fd_pipe.h", "src/core/lib/iomgr/wakeup_fd_posix.h", "src/core/lib/iomgr/workqueue.h", @@ -6579,6 +6597,8 @@ "src/core/lib/iomgr/unix_sockets_posix.c", "src/core/lib/iomgr/unix_sockets_posix.h", "src/core/lib/iomgr/unix_sockets_posix_noop.c", + "src/core/lib/iomgr/wakeup_fd_cv.c", + "src/core/lib/iomgr/wakeup_fd_cv.h", "src/core/lib/iomgr/wakeup_fd_eventfd.c", "src/core/lib/iomgr/wakeup_fd_nospecial.c", "src/core/lib/iomgr/wakeup_fd_pipe.c", diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json index ea013c4ceb..a8f4ca8269 100644 --- a/tools/run_tests/tests.json +++ b/tools/run_tests/tests.json @@ -2087,6 +2087,25 @@ "ci_platforms": [ "linux", "mac", + "posix" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "flaky": false, + "gtest": false, + "language": "c", + "name": "wakeup_fd_cv_test", + "platforms": [ + "linux", + "mac", + "posix" + ] + }, + { + "args": [], + "ci_platforms": [ + "linux", + "mac", "posix", "windows" ], diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj index c6ebfd0eae..31fdbe4f44 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj @@ -352,6 +352,7 @@ <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\timer_heap.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\udp_server.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix.h" /> + <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_pipe.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_posix.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\workqueue.h" /> @@ -578,6 +579,8 @@ </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix_noop.c"> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.c"> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_eventfd.c"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_nospecial.c"> diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters index de3202f53d..0dcb848cd9 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters @@ -175,6 +175,9 @@ <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix_noop.c"> <Filter>src\core\lib\iomgr</Filter> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.c"> + <Filter>src\core\lib\iomgr</Filter> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_eventfd.c"> <Filter>src\core\lib\iomgr</Filter> </ClCompile> @@ -839,6 +842,9 @@ <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix.h"> <Filter>src\core\lib\iomgr</Filter> </ClInclude> + <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.h"> + <Filter>src\core\lib\iomgr</Filter> + </ClInclude> <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_pipe.h"> <Filter>src\core\lib\iomgr</Filter> </ClInclude> diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj index 09103e92c7..ed74ef0f0b 100644 --- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj +++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj @@ -245,6 +245,7 @@ <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\timer_heap.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\udp_server.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix.h" /> + <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_pipe.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_posix.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\workqueue.h" /> @@ -426,6 +427,8 @@ </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix_noop.c"> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.c"> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_eventfd.c"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_nospecial.c"> diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters index d1230a26bb..9a6ad72611 100644 --- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters @@ -229,6 +229,9 @@ <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix_noop.c"> <Filter>src\core\lib\iomgr</Filter> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.c"> + <Filter>src\core\lib\iomgr</Filter> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_eventfd.c"> <Filter>src\core\lib\iomgr</Filter> </ClCompile> @@ -626,6 +629,9 @@ <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix.h"> <Filter>src\core\lib\iomgr</Filter> </ClInclude> + <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.h"> + <Filter>src\core\lib\iomgr</Filter> + </ClInclude> <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_pipe.h"> <Filter>src\core\lib\iomgr</Filter> </ClInclude> diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj index 887b12b0e4..8d007329b6 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj @@ -342,6 +342,7 @@ <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\timer_heap.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\udp_server.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix.h" /> + <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_pipe.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_posix.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\workqueue.h" /> @@ -546,6 +547,8 @@ </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix_noop.c"> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.c"> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_eventfd.c"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_nospecial.c"> diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters index fb5e80a16d..1a28d41bbf 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters @@ -178,6 +178,9 @@ <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix_noop.c"> <Filter>src\core\lib\iomgr</Filter> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.c"> + <Filter>src\core\lib\iomgr</Filter> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_eventfd.c"> <Filter>src\core\lib\iomgr</Filter> </ClCompile> @@ -749,6 +752,9 @@ <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix.h"> <Filter>src\core\lib\iomgr</Filter> </ClInclude> + <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.h"> + <Filter>src\core\lib\iomgr</Filter> + </ClInclude> <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_pipe.h"> <Filter>src\core\lib\iomgr</Filter> </ClInclude> |