aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--BUILD8
-rw-r--r--CMakeLists.txt3
-rw-r--r--Makefile40
-rw-r--r--binding.gyp1
-rw-r--r--build.yaml16
-rw-r--r--config.m41
-rw-r--r--gRPC-Core.podspec3
-rwxr-xr-xgrpc.gemspec2
-rw-r--r--package.xml2
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c169
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c2
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c4
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c241
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.h1
-rw-r--r--src/core/lib/iomgr/ev_posix.c1
-rw-r--r--src/core/lib/iomgr/wakeup_fd_cv.c118
-rw-r--r--src/core/lib/iomgr/wakeup_fd_cv.h80
-rw-r--r--src/core/lib/iomgr/wakeup_fd_pipe.c12
-rw-r--r--src/core/lib/iomgr/wakeup_fd_posix.c33
-rw-r--r--src/core/lib/iomgr/wakeup_fd_posix.h5
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py1
-rw-r--r--test/core/iomgr/wakeup_fd_cv_test.c240
-rw-r--r--tools/doxygen/Doxyfile.core.internal2
-rw-r--r--tools/run_tests/filter_pull_request_tests.py184
-rwxr-xr-xtools/run_tests/run_tests.py2
-rwxr-xr-xtools/run_tests/run_tests_matrix.py25
-rw-r--r--tools/run_tests/sources_and_headers.json20
-rw-r--r--tools/run_tests/tests.json19
-rw-r--r--vsprojects/vcxproj/grpc/grpc.vcxproj3
-rw-r--r--vsprojects/vcxproj/grpc/grpc.vcxproj.filters6
-rw-r--r--vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj3
-rw-r--r--vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters6
-rw-r--r--vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj3
-rw-r--r--vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters6
34 files changed, 1172 insertions, 90 deletions
diff --git a/BUILD b/BUILD
index 42a0cb3a95..b28c2285e1 100644
--- a/BUILD
+++ b/BUILD
@@ -217,6 +217,7 @@ cc_library(
"src/core/lib/iomgr/timer_heap.h",
"src/core/lib/iomgr/udp_server.h",
"src/core/lib/iomgr/unix_sockets_posix.h",
+ "src/core/lib/iomgr/wakeup_fd_cv.h",
"src/core/lib/iomgr/wakeup_fd_pipe.h",
"src/core/lib/iomgr/wakeup_fd_posix.h",
"src/core/lib/iomgr/workqueue.h",
@@ -378,6 +379,7 @@ cc_library(
"src/core/lib/iomgr/udp_server.c",
"src/core/lib/iomgr/unix_sockets_posix.c",
"src/core/lib/iomgr/unix_sockets_posix_noop.c",
+ "src/core/lib/iomgr/wakeup_fd_cv.c",
"src/core/lib/iomgr/wakeup_fd_eventfd.c",
"src/core/lib/iomgr/wakeup_fd_nospecial.c",
"src/core/lib/iomgr/wakeup_fd_pipe.c",
@@ -616,6 +618,7 @@ cc_library(
"src/core/lib/iomgr/timer_heap.h",
"src/core/lib/iomgr/udp_server.h",
"src/core/lib/iomgr/unix_sockets_posix.h",
+ "src/core/lib/iomgr/wakeup_fd_cv.h",
"src/core/lib/iomgr/wakeup_fd_pipe.h",
"src/core/lib/iomgr/wakeup_fd_posix.h",
"src/core/lib/iomgr/workqueue.h",
@@ -762,6 +765,7 @@ cc_library(
"src/core/lib/iomgr/udp_server.c",
"src/core/lib/iomgr/unix_sockets_posix.c",
"src/core/lib/iomgr/unix_sockets_posix_noop.c",
+ "src/core/lib/iomgr/wakeup_fd_cv.c",
"src/core/lib/iomgr/wakeup_fd_eventfd.c",
"src/core/lib/iomgr/wakeup_fd_nospecial.c",
"src/core/lib/iomgr/wakeup_fd_pipe.c",
@@ -970,6 +974,7 @@ cc_library(
"src/core/lib/iomgr/timer_heap.h",
"src/core/lib/iomgr/udp_server.h",
"src/core/lib/iomgr/unix_sockets_posix.h",
+ "src/core/lib/iomgr/wakeup_fd_cv.h",
"src/core/lib/iomgr/wakeup_fd_pipe.h",
"src/core/lib/iomgr/wakeup_fd_posix.h",
"src/core/lib/iomgr/workqueue.h",
@@ -1108,6 +1113,7 @@ cc_library(
"src/core/lib/iomgr/udp_server.c",
"src/core/lib/iomgr/unix_sockets_posix.c",
"src/core/lib/iomgr/unix_sockets_posix_noop.c",
+ "src/core/lib/iomgr/wakeup_fd_cv.c",
"src/core/lib/iomgr/wakeup_fd_eventfd.c",
"src/core/lib/iomgr/wakeup_fd_nospecial.c",
"src/core/lib/iomgr/wakeup_fd_pipe.c",
@@ -1870,6 +1876,7 @@ objc_library(
"src/core/lib/iomgr/udp_server.c",
"src/core/lib/iomgr/unix_sockets_posix.c",
"src/core/lib/iomgr/unix_sockets_posix_noop.c",
+ "src/core/lib/iomgr/wakeup_fd_cv.c",
"src/core/lib/iomgr/wakeup_fd_eventfd.c",
"src/core/lib/iomgr/wakeup_fd_nospecial.c",
"src/core/lib/iomgr/wakeup_fd_pipe.c",
@@ -2087,6 +2094,7 @@ objc_library(
"src/core/lib/iomgr/timer_heap.h",
"src/core/lib/iomgr/udp_server.h",
"src/core/lib/iomgr/unix_sockets_posix.h",
+ "src/core/lib/iomgr/wakeup_fd_cv.h",
"src/core/lib/iomgr/wakeup_fd_pipe.h",
"src/core/lib/iomgr/wakeup_fd_posix.h",
"src/core/lib/iomgr/workqueue.h",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index d82b677771..f4ece9a1f3 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -346,6 +346,7 @@ add_library(grpc
src/core/lib/iomgr/udp_server.c
src/core/lib/iomgr/unix_sockets_posix.c
src/core/lib/iomgr/unix_sockets_posix_noop.c
+ src/core/lib/iomgr/wakeup_fd_cv.c
src/core/lib/iomgr/wakeup_fd_eventfd.c
src/core/lib/iomgr/wakeup_fd_nospecial.c
src/core/lib/iomgr/wakeup_fd_pipe.c
@@ -604,6 +605,7 @@ add_library(grpc_cronet
src/core/lib/iomgr/udp_server.c
src/core/lib/iomgr/unix_sockets_posix.c
src/core/lib/iomgr/unix_sockets_posix_noop.c
+ src/core/lib/iomgr/wakeup_fd_cv.c
src/core/lib/iomgr/wakeup_fd_eventfd.c
src/core/lib/iomgr/wakeup_fd_nospecial.c
src/core/lib/iomgr/wakeup_fd_pipe.c
@@ -834,6 +836,7 @@ add_library(grpc_unsecure
src/core/lib/iomgr/udp_server.c
src/core/lib/iomgr/unix_sockets_posix.c
src/core/lib/iomgr/unix_sockets_posix_noop.c
+ src/core/lib/iomgr/wakeup_fd_cv.c
src/core/lib/iomgr/wakeup_fd_eventfd.c
src/core/lib/iomgr/wakeup_fd_nospecial.c
src/core/lib/iomgr/wakeup_fd_pipe.c
diff --git a/Makefile b/Makefile
index 496d1cee6b..6b3c3ed848 100644
--- a/Makefile
+++ b/Makefile
@@ -1025,6 +1025,7 @@ transport_security_test: $(BINDIR)/$(CONFIG)/transport_security_test
udp_server_test: $(BINDIR)/$(CONFIG)/udp_server_test
uri_fuzzer_test: $(BINDIR)/$(CONFIG)/uri_fuzzer_test
uri_parser_test: $(BINDIR)/$(CONFIG)/uri_parser_test
+wakeup_fd_cv_test: $(BINDIR)/$(CONFIG)/wakeup_fd_cv_test
alarm_cpp_test: $(BINDIR)/$(CONFIG)/alarm_cpp_test
async_end2end_test: $(BINDIR)/$(CONFIG)/async_end2end_test
auth_property_iterator_test: $(BINDIR)/$(CONFIG)/auth_property_iterator_test
@@ -1340,6 +1341,7 @@ buildtests_c: privatelibs_c \
$(BINDIR)/$(CONFIG)/transport_security_test \
$(BINDIR)/$(CONFIG)/udp_server_test \
$(BINDIR)/$(CONFIG)/uri_parser_test \
+ $(BINDIR)/$(CONFIG)/wakeup_fd_cv_test \
$(BINDIR)/$(CONFIG)/public_headers_must_be_c89 \
$(BINDIR)/$(CONFIG)/badreq_bad_client_test \
$(BINDIR)/$(CONFIG)/connection_prefix_bad_client_test \
@@ -1738,6 +1740,8 @@ test_c: buildtests_c
$(Q) $(BINDIR)/$(CONFIG)/udp_server_test || ( echo test udp_server_test failed ; exit 1 )
$(E) "[RUN] Testing uri_parser_test"
$(Q) $(BINDIR)/$(CONFIG)/uri_parser_test || ( echo test uri_parser_test failed ; exit 1 )
+ $(E) "[RUN] Testing wakeup_fd_cv_test"
+ $(Q) $(BINDIR)/$(CONFIG)/wakeup_fd_cv_test || ( echo test wakeup_fd_cv_test failed ; exit 1 )
$(E) "[RUN] Testing public_headers_must_be_c89"
$(Q) $(BINDIR)/$(CONFIG)/public_headers_must_be_c89 || ( echo test public_headers_must_be_c89 failed ; exit 1 )
$(E) "[RUN] Testing badreq_bad_client_test"
@@ -2590,6 +2594,7 @@ LIBGRPC_SRC = \
src/core/lib/iomgr/udp_server.c \
src/core/lib/iomgr/unix_sockets_posix.c \
src/core/lib/iomgr/unix_sockets_posix_noop.c \
+ src/core/lib/iomgr/wakeup_fd_cv.c \
src/core/lib/iomgr/wakeup_fd_eventfd.c \
src/core/lib/iomgr/wakeup_fd_nospecial.c \
src/core/lib/iomgr/wakeup_fd_pipe.c \
@@ -2866,6 +2871,7 @@ LIBGRPC_CRONET_SRC = \
src/core/lib/iomgr/udp_server.c \
src/core/lib/iomgr/unix_sockets_posix.c \
src/core/lib/iomgr/unix_sockets_posix_noop.c \
+ src/core/lib/iomgr/wakeup_fd_cv.c \
src/core/lib/iomgr/wakeup_fd_eventfd.c \
src/core/lib/iomgr/wakeup_fd_nospecial.c \
src/core/lib/iomgr/wakeup_fd_pipe.c \
@@ -3132,6 +3138,7 @@ LIBGRPC_TEST_UTIL_SRC = \
src/core/lib/iomgr/udp_server.c \
src/core/lib/iomgr/unix_sockets_posix.c \
src/core/lib/iomgr/unix_sockets_posix_noop.c \
+ src/core/lib/iomgr/wakeup_fd_cv.c \
src/core/lib/iomgr/wakeup_fd_eventfd.c \
src/core/lib/iomgr/wakeup_fd_nospecial.c \
src/core/lib/iomgr/wakeup_fd_pipe.c \
@@ -3325,6 +3332,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/iomgr/udp_server.c \
src/core/lib/iomgr/unix_sockets_posix.c \
src/core/lib/iomgr/unix_sockets_posix_noop.c \
+ src/core/lib/iomgr/wakeup_fd_cv.c \
src/core/lib/iomgr/wakeup_fd_eventfd.c \
src/core/lib/iomgr/wakeup_fd_nospecial.c \
src/core/lib/iomgr/wakeup_fd_pipe.c \
@@ -10753,6 +10761,38 @@ endif
endif
+WAKEUP_FD_CV_TEST_SRC = \
+ test/core/iomgr/wakeup_fd_cv_test.c \
+
+WAKEUP_FD_CV_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(WAKEUP_FD_CV_TEST_SRC))))
+ifeq ($(NO_SECURE),true)
+
+# You can't build secure targets if you don't have OpenSSL.
+
+$(BINDIR)/$(CONFIG)/wakeup_fd_cv_test: openssl_dep_error
+
+else
+
+
+
+$(BINDIR)/$(CONFIG)/wakeup_fd_cv_test: $(WAKEUP_FD_CV_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+ $(E) "[LD] Linking $@"
+ $(Q) mkdir -p `dirname $@`
+ $(Q) $(LD) $(LDFLAGS) $(WAKEUP_FD_CV_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/wakeup_fd_cv_test
+
+endif
+
+$(OBJDIR)/$(CONFIG)/test/core/iomgr/wakeup_fd_cv_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+
+deps_wakeup_fd_cv_test: $(WAKEUP_FD_CV_TEST_OBJS:.o=.dep)
+
+ifneq ($(NO_SECURE),true)
+ifneq ($(NO_DEPS),true)
+-include $(WAKEUP_FD_CV_TEST_OBJS:.o=.dep)
+endif
+endif
+
+
ALARM_CPP_TEST_SRC = \
test/cpp/common/alarm_cpp_test.cc \
diff --git a/binding.gyp b/binding.gyp
index dc215cf5ff..6b1c351681 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -621,6 +621,7 @@
'src/core/lib/iomgr/udp_server.c',
'src/core/lib/iomgr/unix_sockets_posix.c',
'src/core/lib/iomgr/unix_sockets_posix_noop.c',
+ 'src/core/lib/iomgr/wakeup_fd_cv.c',
'src/core/lib/iomgr/wakeup_fd_eventfd.c',
'src/core/lib/iomgr/wakeup_fd_nospecial.c',
'src/core/lib/iomgr/wakeup_fd_pipe.c',
diff --git a/build.yaml b/build.yaml
index fc1599ecf7..51159f7e07 100644
--- a/build.yaml
+++ b/build.yaml
@@ -221,6 +221,7 @@ filegroups:
- src/core/lib/iomgr/timer_heap.h
- src/core/lib/iomgr/udp_server.h
- src/core/lib/iomgr/unix_sockets_posix.h
+ - src/core/lib/iomgr/wakeup_fd_cv.h
- src/core/lib/iomgr/wakeup_fd_pipe.h
- src/core/lib/iomgr/wakeup_fd_posix.h
- src/core/lib/iomgr/workqueue.h
@@ -305,6 +306,7 @@ filegroups:
- src/core/lib/iomgr/udp_server.c
- src/core/lib/iomgr/unix_sockets_posix.c
- src/core/lib/iomgr/unix_sockets_posix_noop.c
+ - src/core/lib/iomgr/wakeup_fd_cv.c
- src/core/lib/iomgr/wakeup_fd_eventfd.c
- src/core/lib/iomgr/wakeup_fd_nospecial.c
- src/core/lib/iomgr/wakeup_fd_pipe.c
@@ -2590,6 +2592,20 @@ targets:
- grpc
- gpr_test_util
- gpr
+- name: wakeup_fd_cv_test
+ build: test
+ language: c
+ src:
+ - test/core/iomgr/wakeup_fd_cv_test.c
+ deps:
+ - grpc_test_util
+ - grpc
+ - gpr_test_util
+ - gpr
+ platforms:
+ - mac
+ - linux
+ - posix
- name: alarm_cpp_test
gtest: true
build: test
diff --git a/config.m4 b/config.m4
index 1c52caae48..30e7e2d0f3 100644
--- a/config.m4
+++ b/config.m4
@@ -140,6 +140,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/iomgr/udp_server.c \
src/core/lib/iomgr/unix_sockets_posix.c \
src/core/lib/iomgr/unix_sockets_posix_noop.c \
+ src/core/lib/iomgr/wakeup_fd_cv.c \
src/core/lib/iomgr/wakeup_fd_eventfd.c \
src/core/lib/iomgr/wakeup_fd_nospecial.c \
src/core/lib/iomgr/wakeup_fd_pipe.c \
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index 4d8186190a..f9932122e9 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -304,6 +304,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/timer_heap.h',
'src/core/lib/iomgr/udp_server.h',
'src/core/lib/iomgr/unix_sockets_posix.h',
+ 'src/core/lib/iomgr/wakeup_fd_cv.h',
'src/core/lib/iomgr/wakeup_fd_pipe.h',
'src/core/lib/iomgr/wakeup_fd_posix.h',
'src/core/lib/iomgr/workqueue.h',
@@ -469,6 +470,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/udp_server.c',
'src/core/lib/iomgr/unix_sockets_posix.c',
'src/core/lib/iomgr/unix_sockets_posix_noop.c',
+ 'src/core/lib/iomgr/wakeup_fd_cv.c',
'src/core/lib/iomgr/wakeup_fd_eventfd.c',
'src/core/lib/iomgr/wakeup_fd_nospecial.c',
'src/core/lib/iomgr/wakeup_fd_pipe.c',
@@ -675,6 +677,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/timer_heap.h',
'src/core/lib/iomgr/udp_server.h',
'src/core/lib/iomgr/unix_sockets_posix.h',
+ 'src/core/lib/iomgr/wakeup_fd_cv.h',
'src/core/lib/iomgr/wakeup_fd_pipe.h',
'src/core/lib/iomgr/wakeup_fd_posix.h',
'src/core/lib/iomgr/workqueue.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index d788c4b68e..ab75a04119 100755
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -224,6 +224,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/timer_heap.h )
s.files += %w( src/core/lib/iomgr/udp_server.h )
s.files += %w( src/core/lib/iomgr/unix_sockets_posix.h )
+ s.files += %w( src/core/lib/iomgr/wakeup_fd_cv.h )
s.files += %w( src/core/lib/iomgr/wakeup_fd_pipe.h )
s.files += %w( src/core/lib/iomgr/wakeup_fd_posix.h )
s.files += %w( src/core/lib/iomgr/workqueue.h )
@@ -389,6 +390,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/udp_server.c )
s.files += %w( src/core/lib/iomgr/unix_sockets_posix.c )
s.files += %w( src/core/lib/iomgr/unix_sockets_posix_noop.c )
+ s.files += %w( src/core/lib/iomgr/wakeup_fd_cv.c )
s.files += %w( src/core/lib/iomgr/wakeup_fd_eventfd.c )
s.files += %w( src/core/lib/iomgr/wakeup_fd_nospecial.c )
s.files += %w( src/core/lib/iomgr/wakeup_fd_pipe.c )
diff --git a/package.xml b/package.xml
index 3e54e1a599..37103ace45 100644
--- a/package.xml
+++ b/package.xml
@@ -231,6 +231,7 @@
<file baseinstalldir="/" name="src/core/lib/iomgr/timer_heap.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/udp_server.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/unix_sockets_posix.h" role="src" />
+ <file baseinstalldir="/" name="src/core/lib/iomgr/wakeup_fd_cv.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/wakeup_fd_pipe.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/wakeup_fd_posix.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/workqueue.h" role="src" />
@@ -396,6 +397,7 @@
<file baseinstalldir="/" name="src/core/lib/iomgr/udp_server.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/unix_sockets_posix.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/unix_sockets_posix_noop.c" role="src" />
+ <file baseinstalldir="/" name="src/core/lib/iomgr/wakeup_fd_cv.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/wakeup_fd_eventfd.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/wakeup_fd_nospecial.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/wakeup_fd_pipe.c" role="src" />
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index 04413ed655..9af92b787d 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -69,8 +69,8 @@
* possible scenarios:
*
* 1. This is the first server list received. There was no previous instance of
- * the Round Robin policy. \a rr_handover() will instantiate the RR policy
- * and perform all the pending operations over it.
+ * the Round Robin policy. \a rr_handover_locked() will instantiate the RR
+ * policy and perform all the pending operations over it.
* 2. There's already a RR policy instance active. We need to introduce the new
* one build from the new serverlist, but taking care not to disrupt the
* operations in progress over the old RR instance. This is done by
@@ -78,7 +78,7 @@
* references are held on the old RR policy, it'll be destroyed and \a
* glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
* state. At this point we can transition to a new RR instance safely, which
- * is done once again via \a rr_handover().
+ * is done once again via \a rr_handover_locked().
*
*
* Once a RR policy instance is in place (and getting updated as described),
@@ -86,8 +86,8 @@
* forwarding them to the RR instance. Any time there's no RR policy available
* (ie, right after the creation of the gRPCLB policy, if an empty serverlist
* is received, etc), pick/ping requests are added to a list of pending
- * picks/pings to be flushed and serviced as part of \a rr_handover() the moment
- * the RR policy instance becomes available.
+ * picks/pings to be flushed and serviced as part of \a rr_handover_locked() the
+ * moment the RR policy instance becomes available.
*
* \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
* high level design and details. */
@@ -134,6 +134,9 @@ static void initial_metadata_add_lb_token(
}
typedef struct wrapped_rr_closure_arg {
+ /* the closure instance using this struct as argument */
+ grpc_closure wrapper_closure;
+
/* the original closure. Usually a on_complete/notify cb for pick() and ping()
* calls against the internal RR instance, respectively. */
grpc_closure *wrapped_closure;
@@ -155,9 +158,8 @@ typedef struct wrapped_rr_closure_arg {
/* The RR instance related to the closure */
grpc_lb_policy *rr_policy;
- /* when not NULL, represents a pending_{pick,ping} node to be freed upon
- * closure execution */
- void *owning_pending_node; /* to be freed if not NULL */
+ /* heap memory to be freed upon closure execution. */
+ void *free_when_done;
} wrapped_rr_closure_arg;
/* The \a on_complete closure passed as part of the pick requires keeping a
@@ -183,10 +185,10 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
}
}
GPR_ASSERT(wc_arg->wrapped_closure != NULL);
-
grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error),
NULL);
- gpr_free(wc_arg->owning_pending_node);
+ GPR_ASSERT(wc_arg->free_when_done != NULL);
+ gpr_free(wc_arg->free_when_done);
}
/* Linked list of pending pick requests. It stores all information needed to
@@ -207,10 +209,6 @@ typedef struct pending_pick {
* upon error. */
grpc_connected_subchannel **target;
- /* a closure wrapping the original on_complete one to be invoked once the
- * pick() has completed (regardless of success) */
- grpc_closure wrapped_on_complete;
-
/* args for wrapped_on_complete */
wrapped_rr_closure_arg wrapped_on_complete_arg;
} pending_pick;
@@ -230,8 +228,9 @@ static void add_pending_pick(pending_pick **root,
pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
pp->wrapped_on_complete_arg.lb_token_mdelem_storage =
pick_args->lb_token_mdelem_storage;
- grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure,
- &pp->wrapped_on_complete_arg);
+ pp->wrapped_on_complete_arg.free_when_done = pp;
+ grpc_closure_init(&pp->wrapped_on_complete_arg.wrapper_closure,
+ wrapped_rr_closure, &pp->wrapped_on_complete_arg);
*root = pp;
}
@@ -239,10 +238,6 @@ static void add_pending_pick(pending_pick **root,
typedef struct pending_ping {
struct pending_ping *next;
- /* a closure wrapping the original on_complete one to be invoked once the
- * ping() has completed (regardless of success) */
- grpc_closure wrapped_notify;
-
/* args for wrapped_notify */
wrapped_rr_closure_arg wrapped_notify_arg;
} pending_ping;
@@ -251,10 +246,11 @@ static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
pending_ping *pping = gpr_malloc(sizeof(*pping));
memset(pping, 0, sizeof(pending_ping));
memset(&pping->wrapped_notify_arg, 0, sizeof(wrapped_rr_closure_arg));
- pping->next = *root;
- grpc_closure_init(&pping->wrapped_notify, wrapped_rr_closure,
- &pping->wrapped_notify_arg);
pping->wrapped_notify_arg.wrapped_closure = notify;
+ pping->wrapped_notify_arg.free_when_done = pping;
+ pping->next = *root;
+ grpc_closure_init(&pping->wrapped_notify_arg.wrapper_closure,
+ wrapped_rr_closure, &pping->wrapped_notify_arg);
*root = pping;
}
@@ -307,13 +303,6 @@ typedef struct glb_lb_policy {
/** for tracking of the RR connectivity */
rr_connectivity_data *rr_connectivity;
-
- /* a wrapped (see \a wrapped_rr_closure) on-complete closure for readily
- * available RR picks */
- grpc_closure wrapped_on_complete;
-
- /* arguments for the wrapped_on_complete closure */
- wrapped_rr_closure_arg wc_arg;
} glb_lb_policy;
/* Keeps track and reacts to changes in connectivity of the RR instance */
@@ -424,9 +413,43 @@ static void lb_token_destroy(void *token) {
if (token != NULL) GRPC_MDELEM_UNREF(token);
}
-static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
- const grpc_grpclb_serverlist *serverlist,
- glb_lb_policy *glb_policy) {
+/* perform a pick over \a rr_policy. Given that a pick can return immediately
+ * (ignoring its completion callback) we need to perform the cleanups this
+ * callback would be otherwise resposible for */
+static bool pick_from_internal_rr_locked(
+ grpc_exec_ctx *exec_ctx, grpc_lb_policy *rr_policy,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) {
+ GPR_ASSERT(rr_policy != NULL);
+ const bool pick_done =
+ grpc_lb_policy_pick(exec_ctx, rr_policy, pick_args, target,
+ (void **)&wc_arg->lb_token, &wc_arg->wrapper_closure);
+ if (pick_done) {
+ /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
+ if (grpc_lb_glb_trace) {
+ gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
+ (intptr_t)wc_arg->rr_policy);
+ }
+ GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick");
+
+ /* add the load reporting initial metadata */
+ initial_metadata_add_lb_token(pick_args->initial_metadata,
+ pick_args->lb_token_mdelem_storage,
+ GRPC_MDELEM_REF(wc_arg->lb_token));
+
+ gpr_free(wc_arg);
+ }
+ /* else, the pending pick will be registered and taken care of by the
+ * pending pick list inside the RR policy (glb_policy->rr_policy).
+ * Eventually, wrapped_on_complete will be called, which will -among other
+ * things- add the LB token to the call's initial metadata */
+
+ return pick_done;
+}
+
+static grpc_lb_policy *create_rr_locked(
+ grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist,
+ glb_lb_policy *glb_policy) {
GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
grpc_lb_policy_args args;
@@ -446,12 +469,12 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
return rr;
}
-static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
- grpc_error *error) {
+static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
+ glb_lb_policy *glb_policy, grpc_error *error) {
GPR_ASSERT(glb_policy->serverlist != NULL &&
glb_policy->serverlist->num_servers > 0);
glb_policy->rr_policy =
- create_rr(exec_ctx, glb_policy->serverlist, glb_policy);
+ create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy);
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO, "Created RR policy (0x%" PRIxPTR ")",
@@ -481,11 +504,9 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
(intptr_t)glb_policy->rr_policy);
}
- grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pp->pick_args,
- pp->target,
- (void **)&pp->wrapped_on_complete_arg.lb_token,
- &pp->wrapped_on_complete);
- pp->wrapped_on_complete_arg.owning_pending_node = pp;
+ pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy,
+ &pp->pick_args, pp->target,
+ &pp->wrapped_on_complete_arg);
}
pending_ping *pping;
@@ -498,8 +519,7 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
(intptr_t)glb_policy->rr_policy);
}
grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy,
- &pping->wrapped_notify);
- pping->wrapped_notify_arg.owning_pending_node = pping;
+ &pping->wrapped_notify_arg.wrapper_closure);
}
}
@@ -512,13 +532,16 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
if (glb_policy->serverlist != NULL) {
/* a RR policy is shutting down but there's a serverlist available ->
* perform a handover */
- rr_handover(exec_ctx, glb_policy, error);
+ gpr_mu_lock(&glb_policy->mu);
+ rr_handover_locked(exec_ctx, glb_policy, error);
+ gpr_mu_unlock(&glb_policy->mu);
} else {
/* shutting down and no new serverlist available. Bail out. */
gpr_free(rr_conn_data);
}
} else {
if (error == GRPC_ERROR_NONE) {
+ gpr_mu_lock(&glb_policy->mu);
/* RR not shutting down. Mimic the RR's policy state */
grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
rr_conn_data->state, GRPC_ERROR_REF(error),
@@ -527,6 +550,7 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
&rr_conn_data->state,
&rr_conn_data->on_change);
+ gpr_mu_unlock(&glb_policy->mu);
} else { /* error */
gpr_free(rr_conn_data);
}
@@ -651,15 +675,15 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, GRPC_ERROR_NONE,
- NULL);
+ grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
+ GRPC_ERROR_NONE, NULL);
pp = next;
}
while (pping != NULL) {
pending_ping *next = pping->next;
- grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify, GRPC_ERROR_NONE,
- NULL);
+ grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
+ GRPC_ERROR_NONE, NULL);
pping = next;
}
@@ -691,7 +715,7 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (pp->target == target) {
*target = NULL;
grpc_exec_ctx_sched(
- exec_ctx, &pp->wrapped_on_complete,
+ exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
} else {
pp->next = glb_policy->pending_picks;
@@ -721,7 +745,7 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
grpc_exec_ctx_sched(
- exec_ctx, &pp->wrapped_on_complete,
+ exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
} else {
pp->next = glb_policy->pending_picks;
@@ -774,37 +798,20 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
(intptr_t)glb_policy->rr_policy);
}
GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick");
- memset(&glb_policy->wc_arg, 0, sizeof(wrapped_rr_closure_arg));
- glb_policy->wc_arg.rr_policy = glb_policy->rr_policy;
- glb_policy->wc_arg.target = target;
- glb_policy->wc_arg.wrapped_closure = on_complete;
- glb_policy->wc_arg.lb_token_mdelem_storage =
- pick_args->lb_token_mdelem_storage;
- glb_policy->wc_arg.initial_metadata = pick_args->initial_metadata;
- glb_policy->wc_arg.owning_pending_node = NULL;
- grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure,
- &glb_policy->wc_arg);
-
- pick_done =
- grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target,
- (void **)&glb_policy->wc_arg.lb_token,
- &glb_policy->wrapped_on_complete);
- if (pick_done) {
- /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
- if (grpc_lb_glb_trace) {
- gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
- (intptr_t)glb_policy->wc_arg.rr_policy);
- }
- GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->wc_arg.rr_policy, "glb_pick");
- /* add the load reporting initial metadata */
- initial_metadata_add_lb_token(
- pick_args->initial_metadata, pick_args->lb_token_mdelem_storage,
- GRPC_MDELEM_REF(glb_policy->wc_arg.lb_token));
- }
+ wrapped_rr_closure_arg *wc_arg = gpr_malloc(sizeof(wrapped_rr_closure_arg));
+ memset(wc_arg, 0, sizeof(wrapped_rr_closure_arg));
+
+ grpc_closure_init(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg);
+ wc_arg->rr_policy = glb_policy->rr_policy;
+ wc_arg->target = target;
+ wc_arg->wrapped_closure = on_complete;
+ wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
+ wc_arg->initial_metadata = pick_args->initial_metadata;
+ wc_arg->free_when_done = wc_arg;
+ pick_done = pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy,
+ pick_args, target, wc_arg);
} else {
- /* else, the pending pick will be registered and taken care of by the
- * pending pick list inside the RR policy (glb_policy->rr_policy) */
add_pending_pick(&glb_policy->pending_picks, pick_args, target,
on_complete);
@@ -1073,6 +1080,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
/* update serverlist */
if (serverlist->num_servers > 0) {
+ gpr_mu_lock(&lb_client->glb_policy->mu);
if (grpc_grpclb_serverlist_equals(lb_client->glb_policy->serverlist,
serverlist)) {
if (grpc_lb_glb_trace) {
@@ -1090,7 +1098,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
if (lb_client->glb_policy->rr_policy == NULL) {
/* initial "handover", in this case from a null RR policy, meaning
* it'll just create the first RR policy instance */
- rr_handover(exec_ctx, lb_client->glb_policy, error);
+ rr_handover_locked(exec_ctx, lb_client->glb_policy, error);
} else {
/* unref the RR policy, eventually leading to its substitution with a
* new one constructed from the received serverlist (see
@@ -1098,6 +1106,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy,
"serverlist_received");
}
+ gpr_mu_unlock(&lb_client->glb_policy->mu);
} else {
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO,
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index fca4ba99b6..9bd3f9da24 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -396,7 +396,6 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_lock(&p->mu);
if ((selected = peek_next_connected_locked(p))) {
/* readily available, report right away */
- gpr_mu_unlock(&p->mu);
*target = grpc_subchannel_get_connected_subchannel(selected->subchannel);
if (user_data != NULL) {
@@ -409,6 +408,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
/* only advance the last picked pointer if the selection was used */
advance_last_picked_locked(p);
+ gpr_mu_unlock(&p->mu);
return 1;
} else {
/* no pick currently available. Save for later in list of pending picks */
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 8de42bb7a9..e5909d9380 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -2023,6 +2023,10 @@ const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
return NULL;
}
+ if (!grpc_has_wakeup_fd()) {
+ return NULL;
+ }
+
if (!is_epoll_available()) {
return NULL;
}
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index b84a56018f..351b069613 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -47,10 +47,12 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+#include <grpc/support/thd.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/iomgr_internal.h"
+#include "src/core/lib/iomgr/wakeup_fd_cv.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
@@ -246,6 +248,28 @@ struct grpc_pollset_set {
};
/*******************************************************************************
+ * condition variable polling definitions
+ */
+
+#define CV_POLL_PERIOD_MS 1000
+#define CV_DEFAULT_TABLE_SIZE 16
+
+typedef enum poll_status_t { INPROGRESS, COMPLETED, CANCELLED } poll_status_t;
+
+typedef struct poll_args {
+ gpr_refcount refcount;
+ gpr_cv *cv;
+ struct pollfd *fds;
+ nfds_t nfds;
+ int timeout;
+ int retval;
+ int err;
+ gpr_atm status;
+} poll_args;
+
+cv_fd_table g_cvfds;
+
+/*******************************************************************************
* fd_posix.c
*/
@@ -1262,10 +1286,211 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
}
/*******************************************************************************
+ * Condition Variable polling extensions
+ */
+
+static void decref_poll_args(poll_args *args) {
+ if (gpr_unref(&args->refcount)) {
+ gpr_free(args->fds);
+ gpr_cv_destroy(args->cv);
+ gpr_free(args->cv);
+ gpr_free(args);
+ }
+}
+
+// Poll in a background thread
+static void run_poll(void *arg) {
+ int timeout, retval;
+ poll_args *pargs = (poll_args *)arg;
+ while (gpr_atm_no_barrier_load(&pargs->status) == INPROGRESS) {
+ if (pargs->timeout < 0) {
+ timeout = CV_POLL_PERIOD_MS;
+ } else {
+ timeout = GPR_MIN(CV_POLL_PERIOD_MS, pargs->timeout);
+ pargs->timeout -= timeout;
+ }
+ retval = g_cvfds.poll(pargs->fds, pargs->nfds, timeout);
+ if (retval != 0 || pargs->timeout == 0) {
+ pargs->retval = retval;
+ pargs->err = errno;
+ break;
+ }
+ }
+ gpr_mu_lock(&g_cvfds.mu);
+ if (gpr_atm_no_barrier_load(&pargs->status) == INPROGRESS) {
+ // Signal main thread that the poll completed
+ gpr_atm_no_barrier_store(&pargs->status, COMPLETED);
+ gpr_cv_signal(pargs->cv);
+ }
+ decref_poll_args(pargs);
+ g_cvfds.pollcount--;
+ if (g_cvfds.shutdown && g_cvfds.pollcount == 0) {
+ gpr_cv_signal(&g_cvfds.shutdown_complete);
+ }
+ gpr_mu_unlock(&g_cvfds.mu);
+}
+
+// This function overrides poll() to handle condition variable wakeup fds
+static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
+ unsigned int i;
+ int res, idx;
+ gpr_cv *pollcv;
+ cv_node *cvn, *prev;
+ nfds_t nsockfds = 0;
+ gpr_thd_id t_id;
+ gpr_thd_options opt;
+ poll_args *pargs = NULL;
+ gpr_mu_lock(&g_cvfds.mu);
+ pollcv = gpr_malloc(sizeof(gpr_cv));
+ gpr_cv_init(pollcv);
+ for (i = 0; i < nfds; i++) {
+ fds[i].revents = 0;
+ if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
+ idx = FD_TO_IDX(fds[i].fd);
+ cvn = gpr_malloc(sizeof(cv_node));
+ cvn->cv = pollcv;
+ cvn->next = g_cvfds.cvfds[idx].cvs;
+ g_cvfds.cvfds[idx].cvs = cvn;
+ // We should return immediately if there are pending events,
+ // but we still need to call poll() to check for socket events
+ if (g_cvfds.cvfds[idx].is_set) {
+ timeout = 0;
+ }
+ } else if (fds[i].fd >= 0) {
+ nsockfds++;
+ }
+ }
+
+ if (nsockfds > 0) {
+ pargs = gpr_malloc(sizeof(struct poll_args));
+ // Both the main thread and calling thread get a reference
+ gpr_ref_init(&pargs->refcount, 2);
+ pargs->cv = pollcv;
+ pargs->fds = gpr_malloc(sizeof(struct pollfd) * nsockfds);
+ pargs->nfds = nsockfds;
+ pargs->timeout = timeout;
+ pargs->retval = 0;
+ pargs->err = 0;
+ gpr_atm_no_barrier_store(&pargs->status, INPROGRESS);
+ idx = 0;
+ for (i = 0; i < nfds; i++) {
+ if (fds[i].fd >= 0) {
+ pargs->fds[idx].fd = fds[i].fd;
+ pargs->fds[idx].events = fds[i].events;
+ pargs->fds[idx].revents = 0;
+ idx++;
+ }
+ }
+ g_cvfds.pollcount++;
+ opt = gpr_thd_options_default();
+ gpr_thd_options_set_detached(&opt);
+ gpr_thd_new(&t_id, &run_poll, pargs, &opt);
+ // We want the poll() thread to trigger the deadline, so wait forever here
+ gpr_cv_wait(pollcv, &g_cvfds.mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ if (gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) {
+ res = pargs->retval;
+ errno = pargs->err;
+ } else {
+ res = 0;
+ errno = 0;
+ gpr_atm_no_barrier_store(&pargs->status, CANCELLED);
+ }
+ } else {
+ gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME);
+ deadline =
+ gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN));
+ gpr_cv_wait(pollcv, &g_cvfds.mu, deadline);
+ res = 0;
+ }
+
+ idx = 0;
+ for (i = 0; i < nfds; i++) {
+ if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
+ cvn = g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs;
+ prev = NULL;
+ while (cvn->cv != pollcv) {
+ prev = cvn;
+ cvn = cvn->next;
+ GPR_ASSERT(cvn);
+ }
+ if (!prev) {
+ g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs = cvn->next;
+ } else {
+ prev->next = cvn->next;
+ }
+ gpr_free(cvn);
+
+ if (g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].is_set) {
+ fds[i].revents = POLLIN;
+ if (res >= 0) res++;
+ }
+ } else if (fds[i].fd >= 0 &&
+ gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) {
+ fds[i].revents = pargs->fds[idx].revents;
+ idx++;
+ }
+ }
+
+ if (pargs) {
+ decref_poll_args(pargs);
+ } else {
+ gpr_cv_destroy(pollcv);
+ gpr_free(pollcv);
+ }
+ gpr_mu_unlock(&g_cvfds.mu);
+
+ return res;
+}
+
+static void global_cv_fd_table_init() {
+ gpr_mu_init(&g_cvfds.mu);
+ gpr_mu_lock(&g_cvfds.mu);
+ gpr_cv_init(&g_cvfds.shutdown_complete);
+ g_cvfds.shutdown = 0;
+ g_cvfds.pollcount = 0;
+ g_cvfds.size = CV_DEFAULT_TABLE_SIZE;
+ g_cvfds.cvfds = gpr_malloc(sizeof(fd_node) * CV_DEFAULT_TABLE_SIZE);
+ g_cvfds.free_fds = NULL;
+ for (int i = 0; i < CV_DEFAULT_TABLE_SIZE; i++) {
+ g_cvfds.cvfds[i].is_set = 0;
+ g_cvfds.cvfds[i].cvs = NULL;
+ g_cvfds.cvfds[i].next_free = g_cvfds.free_fds;
+ g_cvfds.free_fds = &g_cvfds.cvfds[i];
+ }
+ // Override the poll function with one that supports cvfds
+ g_cvfds.poll = grpc_poll_function;
+ grpc_poll_function = &cvfd_poll;
+ gpr_mu_unlock(&g_cvfds.mu);
+}
+
+static void global_cv_fd_table_shutdown() {
+ gpr_mu_lock(&g_cvfds.mu);
+ g_cvfds.shutdown = 1;
+ // Attempt to wait for all abandoned poll() threads to terminate
+ // Not doing so will result in reported memory leaks
+ if (g_cvfds.pollcount > 0) {
+ int res = gpr_cv_wait(&g_cvfds.shutdown_complete, &g_cvfds.mu,
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_seconds(3, GPR_TIMESPAN)));
+ GPR_ASSERT(res == 0);
+ }
+ gpr_cv_destroy(&g_cvfds.shutdown_complete);
+ grpc_poll_function = g_cvfds.poll;
+ gpr_free(g_cvfds.cvfds);
+ gpr_mu_unlock(&g_cvfds.mu);
+ gpr_mu_destroy(&g_cvfds.mu);
+}
+
+/*******************************************************************************
* event engine binding
*/
-static void shutdown_engine(void) { pollset_global_shutdown(); }
+static void shutdown_engine(void) {
+ pollset_global_shutdown();
+ if (grpc_cv_wakeup_fds_enabled()) {
+ global_cv_fd_table_shutdown();
+ }
+}
static const grpc_event_engine_vtable vtable = {
.pollset_size = sizeof(grpc_pollset),
@@ -1307,7 +1532,21 @@ static const grpc_event_engine_vtable vtable = {
};
const grpc_event_engine_vtable *grpc_init_poll_posix(void) {
+ if (!grpc_has_wakeup_fd()) {
+ return NULL;
+ }
+ if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
+ return NULL;
+ }
+ return &vtable;
+}
+
+const grpc_event_engine_vtable *grpc_init_poll_cv_posix(void) {
+ global_cv_fd_table_init();
+ grpc_enable_cv_wakeup_fds(1);
if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
+ global_cv_fd_table_shutdown();
+ grpc_enable_cv_wakeup_fds(0);
return NULL;
}
return &vtable;
diff --git a/src/core/lib/iomgr/ev_poll_posix.h b/src/core/lib/iomgr/ev_poll_posix.h
index 291736a2db..202ffca14c 100644
--- a/src/core/lib/iomgr/ev_poll_posix.h
+++ b/src/core/lib/iomgr/ev_poll_posix.h
@@ -37,5 +37,6 @@
#include "src/core/lib/iomgr/ev_posix.h"
const grpc_event_engine_vtable *grpc_init_poll_posix(void);
+const grpc_event_engine_vtable *grpc_init_poll_cv_posix(void);
#endif /* GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H */
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index 26618f8d55..9857b0bce9 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -66,6 +66,7 @@ typedef struct {
static const event_engine_factory g_factories[] = {
{"epoll", grpc_init_epoll_linux},
{"poll", grpc_init_poll_posix},
+ {"poll-cv", grpc_init_poll_cv_posix},
{"legacy", grpc_init_poll_and_epoll_posix},
};
diff --git a/src/core/lib/iomgr/wakeup_fd_cv.c b/src/core/lib/iomgr/wakeup_fd_cv.c
new file mode 100644
index 0000000000..b4165208ed
--- /dev/null
+++ b/src/core/lib/iomgr/wakeup_fd_cv.c
@@ -0,0 +1,118 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_POSIX_WAKEUP_FD
+
+#include "src/core/lib/iomgr/wakeup_fd_cv.h"
+
+#include <errno.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/time.h>
+#include <grpc/support/useful.h>
+
+#define MAX_TABLE_RESIZE 256
+
+extern cv_fd_table g_cvfds;
+
+static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) {
+ unsigned int i, newsize;
+ int idx;
+ gpr_mu_lock(&g_cvfds.mu);
+ if (!g_cvfds.free_fds) {
+ newsize = GPR_MIN(g_cvfds.size * 2, g_cvfds.size + MAX_TABLE_RESIZE);
+ g_cvfds.cvfds = gpr_realloc(g_cvfds.cvfds, sizeof(fd_node) * newsize);
+ for (i = g_cvfds.size; i < newsize; i++) {
+ g_cvfds.cvfds[i].is_set = 0;
+ g_cvfds.cvfds[i].cvs = NULL;
+ g_cvfds.cvfds[i].next_free = g_cvfds.free_fds;
+ g_cvfds.free_fds = &g_cvfds.cvfds[i];
+ }
+ g_cvfds.size = newsize;
+ }
+
+ idx = (int)(g_cvfds.free_fds - g_cvfds.cvfds);
+ g_cvfds.free_fds = g_cvfds.free_fds->next_free;
+ g_cvfds.cvfds[idx].cvs = NULL;
+ g_cvfds.cvfds[idx].is_set = 0;
+ fd_info->read_fd = IDX_TO_FD(idx);
+ fd_info->write_fd = -1;
+ gpr_mu_unlock(&g_cvfds.mu);
+ return GRPC_ERROR_NONE;
+}
+
+static grpc_error* cv_fd_wakeup(grpc_wakeup_fd* fd_info) {
+ cv_node* cvn;
+ gpr_mu_lock(&g_cvfds.mu);
+ g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].is_set = 1;
+ cvn = g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].cvs;
+ while (cvn) {
+ gpr_cv_signal(cvn->cv);
+ cvn = cvn->next;
+ }
+ gpr_mu_unlock(&g_cvfds.mu);
+ return GRPC_ERROR_NONE;
+}
+
+static grpc_error* cv_fd_consume(grpc_wakeup_fd* fd_info) {
+ gpr_mu_lock(&g_cvfds.mu);
+ g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].is_set = 0;
+ gpr_mu_unlock(&g_cvfds.mu);
+ return GRPC_ERROR_NONE;
+}
+
+static void cv_fd_destroy(grpc_wakeup_fd* fd_info) {
+ if (fd_info->read_fd == 0) {
+ return;
+ }
+ gpr_mu_lock(&g_cvfds.mu);
+ // Assert that there are no active pollers
+ GPR_ASSERT(!g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].cvs);
+ g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].next_free = g_cvfds.free_fds;
+ g_cvfds.free_fds = &g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)];
+ gpr_mu_unlock(&g_cvfds.mu);
+}
+
+static int cv_check_availability(void) { return 1; }
+
+const grpc_wakeup_fd_vtable grpc_cv_wakeup_fd_vtable = {
+ cv_fd_init, cv_fd_consume, cv_fd_wakeup, cv_fd_destroy,
+ cv_check_availability};
+
+#endif /* GPR_POSIX_WAKUP_FD */
diff --git a/src/core/lib/iomgr/wakeup_fd_cv.h b/src/core/lib/iomgr/wakeup_fd_cv.h
new file mode 100644
index 0000000000..ac16be1750
--- /dev/null
+++ b/src/core/lib/iomgr/wakeup_fd_cv.h
@@ -0,0 +1,80 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/*
+ * wakeup_fd_cv uses condition variables to implement wakeup fds.
+ *
+ * It is intended for use only in cases when eventfd() and pipe() are not
+ * available. It can only be used with the "poll" engine.
+ *
+ * Implementation:
+ * A global table of cv wakeup fds is mantained. A cv wakeup fd is a negative
+ * file descriptor. poll() is then run in a background thread with only the
+ * real socket fds while we wait on a condition variable trigged by either the
+ * poll() completion or a wakeup_fd() call.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_WAKEUP_FD_CV_H
+#define GRPC_CORE_LIB_IOMGR_WAKEUP_FD_CV_H
+
+#include <grpc/support/sync.h>
+
+#include "src/core/lib/iomgr/ev_posix.h"
+
+#define FD_TO_IDX(fd) (-(fd)-1)
+#define IDX_TO_FD(idx) (-(idx)-1)
+
+typedef struct cv_node {
+ gpr_cv* cv;
+ struct cv_node* next;
+} cv_node;
+
+typedef struct fd_node {
+ int is_set;
+ cv_node* cvs;
+ struct fd_node* next_free;
+} fd_node;
+
+typedef struct cv_fd_table {
+ gpr_mu mu;
+ int pollcount;
+ int shutdown;
+ gpr_cv shutdown_complete;
+ fd_node* cvfds;
+ fd_node* free_fds;
+ unsigned int size;
+ grpc_poll_function_type poll;
+} cv_fd_table;
+
+#endif /* GRPC_CORE_LIB_IOMGR_WAKEUP_FD_CV_H */
diff --git a/src/core/lib/iomgr/wakeup_fd_pipe.c b/src/core/lib/iomgr/wakeup_fd_pipe.c
index 4e5dbdcb73..d0ea216aa0 100644
--- a/src/core/lib/iomgr/wakeup_fd_pipe.c
+++ b/src/core/lib/iomgr/wakeup_fd_pipe.c
@@ -47,11 +47,10 @@
static grpc_error* pipe_init(grpc_wakeup_fd* fd_info) {
int pipefd[2];
- /* TODO(klempner): Make this nonfatal */
int r = pipe(pipefd);
if (0 != r) {
gpr_log(GPR_ERROR, "pipe creation failed (%d): %s", errno, strerror(errno));
- abort();
+ return GRPC_OS_ERROR(errno, "pipe");
}
grpc_error* err;
err = grpc_set_socket_nonblocking(pipefd[0], 1);
@@ -95,8 +94,13 @@ static void pipe_destroy(grpc_wakeup_fd* fd_info) {
}
static int pipe_check_availability(void) {
- /* Assume that pipes are always available. */
- return 1;
+ grpc_wakeup_fd fd;
+ if (pipe_init(&fd) == GRPC_ERROR_NONE) {
+ pipe_destroy(&fd);
+ return 1;
+ } else {
+ return 0;
+ }
}
const grpc_wakeup_fd_vtable grpc_pipe_wakeup_fd_vtable = {
diff --git a/src/core/lib/iomgr/wakeup_fd_posix.c b/src/core/lib/iomgr/wakeup_fd_posix.c
index 046208abc8..5c894bef37 100644
--- a/src/core/lib/iomgr/wakeup_fd_posix.c
+++ b/src/core/lib/iomgr/wakeup_fd_posix.c
@@ -36,37 +36,66 @@
#ifdef GPR_POSIX_WAKEUP_FD
#include <stddef.h>
+#include "src/core/lib/iomgr/wakeup_fd_cv.h"
#include "src/core/lib/iomgr/wakeup_fd_pipe.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
+extern grpc_wakeup_fd_vtable grpc_cv_wakeup_fd_vtable;
static const grpc_wakeup_fd_vtable *wakeup_fd_vtable = NULL;
+
int grpc_allow_specialized_wakeup_fd = 1;
+int grpc_allow_pipe_wakeup_fd = 1;
+
+int has_real_wakeup_fd = 1;
+int cv_wakeup_fds_enabled = 0;
void grpc_wakeup_fd_global_init(void) {
if (grpc_allow_specialized_wakeup_fd &&
grpc_specialized_wakeup_fd_vtable.check_availability()) {
wakeup_fd_vtable = &grpc_specialized_wakeup_fd_vtable;
- } else {
+ } else if (grpc_allow_pipe_wakeup_fd &&
+ grpc_pipe_wakeup_fd_vtable.check_availability()) {
wakeup_fd_vtable = &grpc_pipe_wakeup_fd_vtable;
+ } else {
+ has_real_wakeup_fd = 0;
}
}
void grpc_wakeup_fd_global_destroy(void) { wakeup_fd_vtable = NULL; }
+int grpc_has_wakeup_fd(void) { return has_real_wakeup_fd; }
+
+int grpc_cv_wakeup_fds_enabled(void) { return cv_wakeup_fds_enabled; }
+
+void grpc_enable_cv_wakeup_fds(int enable) { cv_wakeup_fds_enabled = enable; }
+
grpc_error *grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info) {
+ if (cv_wakeup_fds_enabled) {
+ return grpc_cv_wakeup_fd_vtable.init(fd_info);
+ }
return wakeup_fd_vtable->init(fd_info);
}
grpc_error *grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd *fd_info) {
+ if (cv_wakeup_fds_enabled) {
+ return grpc_cv_wakeup_fd_vtable.consume(fd_info);
+ }
return wakeup_fd_vtable->consume(fd_info);
}
grpc_error *grpc_wakeup_fd_wakeup(grpc_wakeup_fd *fd_info) {
+ if (cv_wakeup_fds_enabled) {
+ return grpc_cv_wakeup_fd_vtable.wakeup(fd_info);
+ }
return wakeup_fd_vtable->wakeup(fd_info);
}
void grpc_wakeup_fd_destroy(grpc_wakeup_fd *fd_info) {
- wakeup_fd_vtable->destroy(fd_info);
+ if (cv_wakeup_fds_enabled) {
+ grpc_cv_wakeup_fd_vtable.destroy(fd_info);
+ } else {
+ wakeup_fd_vtable->destroy(fd_info);
+ }
}
#endif /* GPR_POSIX_WAKEUP_FD */
diff --git a/src/core/lib/iomgr/wakeup_fd_posix.h b/src/core/lib/iomgr/wakeup_fd_posix.h
index e269f242d8..71d32d97ba 100644
--- a/src/core/lib/iomgr/wakeup_fd_posix.h
+++ b/src/core/lib/iomgr/wakeup_fd_posix.h
@@ -71,6 +71,10 @@ void grpc_wakeup_fd_global_destroy(void);
* purposes only.*/
void grpc_wakeup_fd_global_init_force_fallback(void);
+int grpc_has_wakeup_fd(void);
+int grpc_cv_wakeup_fds_enabled(void);
+void grpc_enable_cv_wakeup_fds(int enable);
+
typedef struct grpc_wakeup_fd grpc_wakeup_fd;
typedef struct grpc_wakeup_fd_vtable {
@@ -88,6 +92,7 @@ struct grpc_wakeup_fd {
};
extern int grpc_allow_specialized_wakeup_fd;
+extern int grpc_allow_pipe_wakeup_fd;
#define GRPC_WAKEUP_FD_GET_READ_FD(fd_info) ((fd_info)->read_fd)
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 0c6a66e9c8..406f88a02d 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -134,6 +134,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/udp_server.c',
'src/core/lib/iomgr/unix_sockets_posix.c',
'src/core/lib/iomgr/unix_sockets_posix_noop.c',
+ 'src/core/lib/iomgr/wakeup_fd_cv.c',
'src/core/lib/iomgr/wakeup_fd_eventfd.c',
'src/core/lib/iomgr/wakeup_fd_nospecial.c',
'src/core/lib/iomgr/wakeup_fd_pipe.c',
diff --git a/test/core/iomgr/wakeup_fd_cv_test.c b/test/core/iomgr/wakeup_fd_cv_test.c
new file mode 100644
index 0000000000..952985ed7e
--- /dev/null
+++ b/test/core/iomgr/wakeup_fd_cv_test.c
@@ -0,0 +1,240 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <pthread.h>
+
+#include <grpc/support/log.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/time.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/lib/iomgr/ev_posix.h"
+#include "src/core/lib/iomgr/iomgr_posix.h"
+#include "src/core/lib/support/env.h"
+
+typedef struct poll_args {
+ struct pollfd *fds;
+ nfds_t nfds;
+ int timeout;
+ int result;
+} poll_args;
+
+gpr_cv poll_cv;
+gpr_mu poll_mu;
+static int socket_event = 0;
+
+// Trigger a "socket" POLLIN in mock_poll()
+void trigger_socket_event() {
+ gpr_mu_lock(&poll_mu);
+ socket_event = 1;
+ gpr_cv_broadcast(&poll_cv);
+ gpr_mu_unlock(&poll_mu);
+}
+
+void reset_socket_event() {
+ gpr_mu_lock(&poll_mu);
+ socket_event = 0;
+ gpr_mu_unlock(&poll_mu);
+}
+
+// Mocks posix poll() function
+int mock_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
+ int res = 0;
+ gpr_timespec poll_time;
+ gpr_mu_lock(&poll_mu);
+ GPR_ASSERT(nfds == 3);
+ GPR_ASSERT(fds[0].fd == 20);
+ GPR_ASSERT(fds[1].fd == 30);
+ GPR_ASSERT(fds[2].fd == 50);
+ GPR_ASSERT(fds[0].events == (POLLIN | POLLHUP));
+ GPR_ASSERT(fds[1].events == (POLLIN | POLLHUP));
+ GPR_ASSERT(fds[2].events == POLLIN);
+
+ if (timeout < 0) {
+ poll_time = gpr_inf_future(GPR_CLOCK_REALTIME);
+ } else {
+ poll_time = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(timeout, GPR_TIMESPAN));
+ }
+
+ if (socket_event || !gpr_cv_wait(&poll_cv, &poll_mu, poll_time)) {
+ fds[0].revents = POLLIN;
+ res = 1;
+ }
+ gpr_mu_unlock(&poll_mu);
+ return res;
+}
+
+void background_poll(void *args) {
+ poll_args *pargs = (poll_args *)args;
+ pargs->result = grpc_poll_function(pargs->fds, pargs->nfds, pargs->timeout);
+}
+
+void test_many_fds(void) {
+ int i;
+ grpc_wakeup_fd fd[1000];
+ for (i = 0; i < 1000; i++) {
+ GPR_ASSERT(grpc_wakeup_fd_init(&fd[i]) == GRPC_ERROR_NONE);
+ }
+ for (i = 0; i < 1000; i++) {
+ grpc_wakeup_fd_destroy(&fd[i]);
+ }
+}
+
+void test_poll_cv_trigger(void) {
+ grpc_wakeup_fd cvfd1, cvfd2, cvfd3;
+ struct pollfd pfds[6];
+ poll_args pargs;
+ gpr_thd_id t_id;
+ gpr_thd_options opt;
+
+ GPR_ASSERT(grpc_wakeup_fd_init(&cvfd1) == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_wakeup_fd_init(&cvfd2) == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_wakeup_fd_init(&cvfd3) == GRPC_ERROR_NONE);
+ GPR_ASSERT(cvfd1.read_fd < 0);
+ GPR_ASSERT(cvfd2.read_fd < 0);
+ GPR_ASSERT(cvfd3.read_fd < 0);
+ GPR_ASSERT(cvfd1.read_fd != cvfd2.read_fd);
+ GPR_ASSERT(cvfd2.read_fd != cvfd3.read_fd);
+ GPR_ASSERT(cvfd1.read_fd != cvfd3.read_fd);
+
+ pfds[0].fd = cvfd1.read_fd;
+ pfds[1].fd = cvfd2.read_fd;
+ pfds[2].fd = 20;
+ pfds[3].fd = 30;
+ pfds[4].fd = cvfd3.read_fd;
+ pfds[5].fd = 50;
+
+ pfds[0].events = 0;
+ pfds[1].events = POLLIN;
+ pfds[2].events = POLLIN | POLLHUP;
+ pfds[3].events = POLLIN | POLLHUP;
+ pfds[4].events = POLLIN;
+ pfds[5].events = POLLIN;
+
+ pargs.fds = pfds;
+ pargs.nfds = 6;
+ pargs.timeout = 1000;
+ pargs.result = -2;
+
+ opt = gpr_thd_options_default();
+ gpr_thd_options_set_joinable(&opt);
+ gpr_thd_new(&t_id, &background_poll, &pargs, &opt);
+
+ // Wakeup wakeup_fd not listening for events
+ GPR_ASSERT(grpc_wakeup_fd_wakeup(&cvfd1) == GRPC_ERROR_NONE);
+ gpr_thd_join(t_id);
+ GPR_ASSERT(pargs.result == 0);
+ GPR_ASSERT(pfds[0].revents == 0);
+ GPR_ASSERT(pfds[1].revents == 0);
+ GPR_ASSERT(pfds[2].revents == 0);
+ GPR_ASSERT(pfds[3].revents == 0);
+ GPR_ASSERT(pfds[4].revents == 0);
+ GPR_ASSERT(pfds[5].revents == 0);
+
+ // Pollin on socket fd
+ pargs.timeout = -1;
+ pargs.result = -2;
+ gpr_thd_new(&t_id, &background_poll, &pargs, &opt);
+ trigger_socket_event();
+ gpr_thd_join(t_id);
+ GPR_ASSERT(pargs.result == 1);
+ GPR_ASSERT(pfds[0].revents == 0);
+ GPR_ASSERT(pfds[1].revents == 0);
+ GPR_ASSERT(pfds[2].revents == POLLIN);
+ GPR_ASSERT(pfds[3].revents == 0);
+ GPR_ASSERT(pfds[4].revents == 0);
+ GPR_ASSERT(pfds[5].revents == 0);
+
+ // Pollin on wakeup fd
+ reset_socket_event();
+ pargs.result = -2;
+ gpr_thd_new(&t_id, &background_poll, &pargs, &opt);
+ GPR_ASSERT(grpc_wakeup_fd_wakeup(&cvfd2) == GRPC_ERROR_NONE);
+ gpr_thd_join(t_id);
+
+ GPR_ASSERT(pargs.result == 1);
+ GPR_ASSERT(pfds[0].revents == 0);
+ GPR_ASSERT(pfds[1].revents == POLLIN);
+ GPR_ASSERT(pfds[2].revents == 0);
+ GPR_ASSERT(pfds[3].revents == 0);
+ GPR_ASSERT(pfds[4].revents == 0);
+ GPR_ASSERT(pfds[5].revents == 0);
+
+ // Pollin on wakeup fd + socket fd
+ trigger_socket_event();
+ pargs.result = -2;
+ gpr_thd_new(&t_id, &background_poll, &pargs, &opt);
+ gpr_thd_join(t_id);
+
+ GPR_ASSERT(pargs.result == 2);
+ GPR_ASSERT(pfds[0].revents == 0);
+ GPR_ASSERT(pfds[1].revents == POLLIN);
+ GPR_ASSERT(pfds[2].revents == POLLIN);
+ GPR_ASSERT(pfds[3].revents == 0);
+ GPR_ASSERT(pfds[4].revents == 0);
+ GPR_ASSERT(pfds[5].revents == 0);
+
+ // No Events
+ pargs.result = -2;
+ pargs.timeout = 1000;
+ reset_socket_event();
+ GPR_ASSERT(grpc_wakeup_fd_consume_wakeup(&cvfd1) == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_wakeup_fd_consume_wakeup(&cvfd2) == GRPC_ERROR_NONE);
+ gpr_thd_new(&t_id, &background_poll, &pargs, &opt);
+ gpr_thd_join(t_id);
+
+ GPR_ASSERT(pargs.result == 0);
+ GPR_ASSERT(pfds[0].revents == 0);
+ GPR_ASSERT(pfds[1].revents == 0);
+ GPR_ASSERT(pfds[2].revents == 0);
+ GPR_ASSERT(pfds[3].revents == 0);
+ GPR_ASSERT(pfds[4].revents == 0);
+ GPR_ASSERT(pfds[5].revents == 0);
+}
+
+int main(int argc, char **argv) {
+ gpr_setenv("GRPC_POLL_STRATEGY", "poll-cv");
+ grpc_poll_function = &mock_poll;
+ gpr_mu_init(&poll_mu);
+ gpr_cv_init(&poll_cv);
+
+ grpc_iomgr_platform_init();
+ test_many_fds();
+ grpc_iomgr_platform_shutdown();
+
+ grpc_iomgr_platform_init();
+ test_poll_cv_trigger();
+ grpc_iomgr_platform_shutdown();
+ return 0;
+}
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index 2c63f36729..3848675add 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -841,6 +841,7 @@ src/core/lib/iomgr/timer.h \
src/core/lib/iomgr/timer_heap.h \
src/core/lib/iomgr/udp_server.h \
src/core/lib/iomgr/unix_sockets_posix.h \
+src/core/lib/iomgr/wakeup_fd_cv.h \
src/core/lib/iomgr/wakeup_fd_pipe.h \
src/core/lib/iomgr/wakeup_fd_posix.h \
src/core/lib/iomgr/workqueue.h \
@@ -1006,6 +1007,7 @@ src/core/lib/iomgr/timer_heap.c \
src/core/lib/iomgr/udp_server.c \
src/core/lib/iomgr/unix_sockets_posix.c \
src/core/lib/iomgr/unix_sockets_posix_noop.c \
+src/core/lib/iomgr/wakeup_fd_cv.c \
src/core/lib/iomgr/wakeup_fd_eventfd.c \
src/core/lib/iomgr/wakeup_fd_nospecial.c \
src/core/lib/iomgr/wakeup_fd_pipe.c \
diff --git a/tools/run_tests/filter_pull_request_tests.py b/tools/run_tests/filter_pull_request_tests.py
new file mode 100644
index 0000000000..55dab42f8a
--- /dev/null
+++ b/tools/run_tests/filter_pull_request_tests.py
@@ -0,0 +1,184 @@
+#!/usr/bin/env python2.7
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Filter out tests based on file differences compared to merge target branch"""
+
+import re
+from subprocess import call, check_output
+
+
+class TestSuite:
+ """
+ Contains tag to identify job as belonging to this test suite and
+ triggers to identify if changed files are relevant
+ """
+ def __init__(self, tags):
+ """
+ Build TestSuite to group tests by their tags
+ :param tag: string used to identify if a job belongs to this TestSuite
+ todo(mattkwong): Change the use of tag because do not want to depend on
+ job.shortname to identify what suite a test belongs to
+ """
+ self.triggers = []
+ self.tags = tags
+
+ def add_trigger(self, trigger):
+ """
+ Add a regex to list of triggers that determine if a changed file should run tests
+ :param trigger: regex matching file relevant to tests
+ """
+ self.triggers.append(trigger)
+
+# Create test suites
+_core_test_suite = TestSuite(['_c_'])
+_cpp_test_suite = TestSuite(['_c++_'])
+_csharp_test_suite = TestSuite(['_csharp_'])
+_node_test_suite = TestSuite(['_node_'])
+_objc_test_suite = TestSuite(['_objc_'])
+_php_test_suite = TestSuite(['_php_', '_php7_'])
+_python_test_suite = TestSuite(['_python_'])
+_ruby_test_suite = TestSuite(['_ruby'])
+_all_test_suites = [_core_test_suite, _cpp_test_suite, _csharp_test_suite,
+ _node_test_suite, _objc_test_suite, _php_test_suite,
+ _python_test_suite, _ruby_test_suite]
+
+# Dictionary of whitelistable files where the key is a regex matching changed files
+# and the value is a list of tests that should be run. An empty list means that
+# the changed files should not trigger any tests. Any changed file that does not
+# match any of these regexes will trigger all tests
+_WHITELIST_DICT = {
+ '^templates/.*': [],
+ '^doc/.*': [],
+ '^examples/.*': [],
+ '^summerofcode/.*': [],
+ '.*README.md$': [],
+ '.*LICENSE$': [],
+ '^src/cpp.*': [_cpp_test_suite],
+ '^src/csharp.*': [_csharp_test_suite],
+ '^src/node.*': [_node_test_suite],
+ '^src/objective-c.*': [_objc_test_suite],
+ '^src/php.*': [_php_test_suite],
+ '^src/python.*': [_python_test_suite],
+ '^src/ruby.*': [_ruby_test_suite],
+ '^test/core.*': [_core_test_suite],
+ '^test/cpp.*': [_cpp_test_suite],
+ '^test/distrib/cpp.*': [_cpp_test_suite],
+ '^test/distrib/csharp.*': [_csharp_test_suite],
+ '^test/distrib/node.*': [_node_test_suite],
+ '^test/distrib/php.*': [_php_test_suite],
+ '^test/distrib/python.*': [_python_test_suite],
+ '^test/distrib/ruby.*': [_ruby_test_suite]
+}
+# Add all triggers to their respective test suites
+for trigger, test_suites in _WHITELIST_DICT.iteritems():
+ for test_suite in test_suites:
+ test_suite.add_trigger(trigger)
+
+
+def _get_changed_files(base_branch):
+ """
+ Get list of changed files between current branch and base of target merge branch
+ """
+ # git fetch might need to be called on Jenkins slave
+ # todo(mattkwong): remove or uncomment below after seeing if Jenkins needs this
+ # call(['git', 'fetch'])
+
+ # Get file changes between branch and merge-base of specified branch
+ # Not combined to be Windows friendly
+ base_commit = check_output(["git", "merge-base", base_branch, "HEAD"]).rstrip()
+ return check_output(["git", "diff", base_commit, "--name-only"]).splitlines()
+
+
+def _can_skip_tests(file_names, triggers):
+ """
+ Determines if tests are skippable based on if all files do not match list of regexes
+ :param file_names: list of changed files generated by _get_changed_files()
+ :param triggers: list of regexes matching file name that indicates tests should be run
+ :return: safe to skip tests
+ """
+ for file_name in file_names:
+ if any(re.match(trigger, file_name) for trigger in triggers):
+ return False
+ return True
+
+
+def _remove_irrelevant_tests(tests, tag):
+ """
+ Filters out tests by config or language - will not remove sanitizer tests
+ :param tests: list of all tests generated by run_tests_matrix.py
+ :param tag: string representing language or config to filter - "_(language)_" or "_(config)"
+ :return: list of relevant tests
+ """
+ # todo(mattkwong): find a more reliable way to filter tests - don't use shortname
+ return [test for test in tests if tag not in test.shortname or
+ any(san_tag in test.shortname for san_tag in ['_asan', '_tsan', '_msan'])]
+
+
+def _remove_sanitizer_tests(tests):
+ """
+ Filters out sanitizer tests
+ :param tests: list of all tests generated by run_tests_matrix.py
+ :return: list of relevant tests
+ """
+ # todo(mattkwong): find a more reliable way to filter tests - don't use shortname
+ return [test for test in tests if
+ all(san_tag not in test.shortname for san_tag in ['_asan', '_tsan', '_msan'])]
+
+
+def filter_tests(tests, base_branch):
+ """
+ Filters out tests that are safe to ignore
+ :param tests: list of all tests generated by run_tests_matrix.py
+ :return: list of relevant tests
+ """
+ print("Finding file differences between %s repo and current branch...\n" % base_branch)
+ changed_files = _get_changed_files(base_branch)
+ for changed_file in changed_files:
+ print(changed_file)
+ print
+
+ # Regex that combines all keys in _WHITELIST_DICT
+ all_triggers = "(" + ")|(".join(_WHITELIST_DICT.keys()) + ")"
+ # Check if all tests have to be run
+ for changed_file in changed_files:
+ if not re.match(all_triggers, changed_file):
+ return(tests)
+ # Filter out tests by language
+ for test_suite in _all_test_suites:
+ if _can_skip_tests(changed_files, test_suite.triggers):
+ for tag in test_suite.tags:
+ print(" Filtering %s tests" % tag)
+ tests = _remove_irrelevant_tests(tests, tag)
+ # Sanitizer tests skipped if core and c++ are skipped
+ if _can_skip_tests(changed_files, _cpp_test_suite.triggers + _core_test_suite.triggers):
+ print(" Filtering Sanitizer tests")
+ tests = _remove_sanitizer_tests(tests)
+
+ return tests
diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py
index dd070b1fe0..e9ee5f538d 100755
--- a/tools/run_tests/run_tests.py
+++ b/tools/run_tests/run_tests.py
@@ -69,7 +69,7 @@ _FORCE_ENVIRON_FOR_WRAPPERS = {
_POLLING_STRATEGIES = {
- 'linux': ['epoll', 'poll', 'legacy']
+ 'linux': ['epoll', 'poll', 'poll-cv', 'legacy']
}
diff --git a/tools/run_tests/run_tests_matrix.py b/tools/run_tests/run_tests_matrix.py
index 60c21a1e21..550dd10ea5 100755
--- a/tools/run_tests/run_tests_matrix.py
+++ b/tools/run_tests/run_tests_matrix.py
@@ -36,13 +36,14 @@ import multiprocessing
import os
import report_utils
import sys
+from filter_pull_request_tests import filter_tests
_ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '../..'))
os.chdir(_ROOT)
# Set the timeout high to allow enough time for sanitizers and pre-building
# clang docker.
-_RUNTESTS_TIMEOUT = 2*60*60
+_RUNTESTS_TIMEOUT = 4*60*60
# Number of jobs assigned to each run_tests.py instance
_INNER_JOBS = 2
@@ -231,6 +232,15 @@ argp.add_argument('--dry_run',
action='store_const',
const=True,
help='Only print what would be run.')
+argp.add_argument('--filter_pr_tests',
+ default=False,
+ action='store_const',
+ const=True,
+ help='Filters out tests irrelavant to pull request changes.')
+argp.add_argument('--base_branch',
+ default='origin/master',
+ type=str,
+ help='Branch that pull request is requesting to merge into')
args = argp.parse_args()
extra_args = []
@@ -264,6 +274,19 @@ for job in jobs:
print ' %s' % job.shortname
print
+if args.filter_pr_tests:
+ print 'IMPORTANT: Test filtering is not active; this is only for testing.'
+ relevant_jobs = filter_tests(jobs, args.base_branch)
+ # todo(mattkwong): add skipped tests to report.xml
+ print
+ if len(relevant_jobs) == len(jobs):
+ print '(TESTING) No tests will be skipped.'
+ else:
+ print '(TESTING) These tests will be skipped:'
+ for job in list(set(jobs) - set(relevant_jobs)):
+ print ' %s' % job.shortname
+ print
+
if args.dry_run:
print '--dry_run was used, exiting'
sys.exit(1)
diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json
index cbed0fc036..aed184c600 100644
--- a/tools/run_tests/sources_and_headers.json
+++ b/tools/run_tests/sources_and_headers.json
@@ -2084,6 +2084,23 @@
"gpr",
"gpr_test_util",
"grpc",
+ "grpc_test_util"
+ ],
+ "headers": [],
+ "is_filegroup": false,
+ "language": "c",
+ "name": "wakeup_fd_cv_test",
+ "src": [
+ "test/core/iomgr/wakeup_fd_cv_test.c"
+ ],
+ "third_party": false,
+ "type": "target"
+ },
+ {
+ "deps": [
+ "gpr",
+ "gpr_test_util",
+ "grpc",
"grpc++",
"grpc++_test_util",
"grpc_test_util"
@@ -6405,6 +6422,7 @@
"src/core/lib/iomgr/timer_heap.h",
"src/core/lib/iomgr/udp_server.h",
"src/core/lib/iomgr/unix_sockets_posix.h",
+ "src/core/lib/iomgr/wakeup_fd_cv.h",
"src/core/lib/iomgr/wakeup_fd_pipe.h",
"src/core/lib/iomgr/wakeup_fd_posix.h",
"src/core/lib/iomgr/workqueue.h",
@@ -6555,6 +6573,8 @@
"src/core/lib/iomgr/unix_sockets_posix.c",
"src/core/lib/iomgr/unix_sockets_posix.h",
"src/core/lib/iomgr/unix_sockets_posix_noop.c",
+ "src/core/lib/iomgr/wakeup_fd_cv.c",
+ "src/core/lib/iomgr/wakeup_fd_cv.h",
"src/core/lib/iomgr/wakeup_fd_eventfd.c",
"src/core/lib/iomgr/wakeup_fd_nospecial.c",
"src/core/lib/iomgr/wakeup_fd_pipe.c",
diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json
index beaf81d4ba..3ebf67de95 100644
--- a/tools/run_tests/tests.json
+++ b/tools/run_tests/tests.json
@@ -2066,6 +2066,25 @@
"ci_platforms": [
"linux",
"mac",
+ "posix"
+ ],
+ "cpu_cost": 1.0,
+ "exclude_configs": [],
+ "flaky": false,
+ "gtest": false,
+ "language": "c",
+ "name": "wakeup_fd_cv_test",
+ "platforms": [
+ "linux",
+ "mac",
+ "posix"
+ ]
+ },
+ {
+ "args": [],
+ "ci_platforms": [
+ "linux",
+ "mac",
"posix",
"windows"
],
diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj
index 21561f0057..adfebda736 100644
--- a/vsprojects/vcxproj/grpc/grpc.vcxproj
+++ b/vsprojects/vcxproj/grpc/grpc.vcxproj
@@ -350,6 +350,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\timer_heap.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\udp_server.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix.h" />
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_pipe.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\workqueue.h" />
@@ -574,6 +575,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix_noop.c">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.c">
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_eventfd.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_nospecial.c">
diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters
index ea61c43859..ee5bf362f1 100644
--- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters
@@ -172,6 +172,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix_noop.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.c">
+ <Filter>src\core\lib\iomgr</Filter>
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_eventfd.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
@@ -830,6 +833,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.h">
+ <Filter>src\core\lib\iomgr</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_pipe.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj
index 43c28265ab..382494a839 100644
--- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj
+++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj
@@ -243,6 +243,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\timer_heap.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\udp_server.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix.h" />
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_pipe.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\workqueue.h" />
@@ -422,6 +423,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix_noop.c">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.c">
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_eventfd.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_nospecial.c">
diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters
index fded5c4e1b..2f7854d72b 100644
--- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters
@@ -226,6 +226,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix_noop.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.c">
+ <Filter>src\core\lib\iomgr</Filter>
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_eventfd.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
@@ -617,6 +620,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.h">
+ <Filter>src\core\lib\iomgr</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_pipe.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
index efe285640a..2b5613a021 100644
--- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
+++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
@@ -340,6 +340,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\timer_heap.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\udp_server.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix.h" />
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_pipe.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\workqueue.h" />
@@ -542,6 +543,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix_noop.c">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.c">
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_eventfd.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_nospecial.c">
diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters
index 911cf112cf..e52425a07b 100644
--- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters
@@ -175,6 +175,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix_noop.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.c">
+ <Filter>src\core\lib\iomgr</Filter>
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_eventfd.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
@@ -740,6 +743,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\unix_sockets_posix.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_cv.h">
+ <Filter>src\core\lib\iomgr</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\wakeup_fd_pipe.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>