From d8a3c048e25f141a19a60e7f2439d699a21cdcc7 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 9 Sep 2016 12:42:37 -0700 Subject: Tie workqueue implementation to event engine --- src/core/lib/iomgr/ev_posix.c | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) (limited to 'src/core/lib/iomgr/ev_posix.c') diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 6536672685..26618f8d55 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -258,4 +258,27 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_error *grpc_kick_poller(void) { return g_event_engine->kick_poller(); } +#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, + int line, const char *reason) { + return g_event_engine->workqueue_ref(workqueue, file, line, reason); +} +void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + const char *file, int line, const char *reason) { + g_event_engine->workqueue_unref(exec_ctx, workqueue, file, line, reason); +} +#else +grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) { + return g_event_engine->workqueue_ref(workqueue); +} +void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { + g_event_engine->workqueue_unref(exec_ctx, workqueue); +} +#endif + +void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + grpc_closure *closure, grpc_error *error) { + g_event_engine->workqueue_enqueue(exec_ctx, workqueue, closure, error); +} + #endif // GPR_POSIX_SOCKET -- cgit v1.2.3 From e02c7ed37b1f98ed95b8f47e053d4a617fd7c530 Mon Sep 17 00:00:00 2001 From: Ken Payson Date: Thu, 29 Sep 2016 09:15:49 -0700 Subject: Broke cv polling into seperate engine --- BUILD | 12 + CMakeLists.txt | 5 + Makefile | 6 + binding.gyp | 1 + build.yaml | 2 + config.m4 | 1 + gRPC-Core.podspec | 3 + grpc.gemspec | 2 + package.xml | 2 + src/core/lib/iomgr/ev_poll_cv_posix.c | 282 +++++++++++++++++++++ src/core/lib/iomgr/ev_poll_cv_posix.h | 65 +++++ src/core/lib/iomgr/ev_posix.c | 2 + src/core/lib/iomgr/wakeup_fd_cv.c | 230 +---------------- src/core/lib/iomgr/wakeup_fd_cv.h | 5 +- src/core/lib/iomgr/wakeup_fd_posix.c | 9 +- src/core/lib/iomgr/wakeup_fd_posix.h | 1 + src/python/grpcio/grpc_core_dependencies.py | 1 + test/core/iomgr/wakeup_fd_cv_test.c | 19 +- tools/doxygen/Doxyfile.c++.internal | 2 + tools/doxygen/Doxyfile.core.internal | 2 + tools/run_tests/run_tests.py | 2 +- tools/run_tests/sources_and_headers.json | 3 + vsprojects/vcxproj/grpc++/grpc++.vcxproj | 3 + vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters | 6 + .../grpc++_unsecure/grpc++_unsecure.vcxproj | 3 + .../grpc++_unsecure.vcxproj.filters | 6 + vsprojects/vcxproj/grpc/grpc.vcxproj | 3 + vsprojects/vcxproj/grpc/grpc.vcxproj.filters | 6 + .../vcxproj/grpc_test_util/grpc_test_util.vcxproj | 3 + .../grpc_test_util/grpc_test_util.vcxproj.filters | 6 + .../vcxproj/grpc_unsecure/grpc_unsecure.vcxproj | 3 + .../grpc_unsecure/grpc_unsecure.vcxproj.filters | 6 + 32 files changed, 463 insertions(+), 239 deletions(-) create mode 100644 src/core/lib/iomgr/ev_poll_cv_posix.c create mode 100644 src/core/lib/iomgr/ev_poll_cv_posix.h (limited to 'src/core/lib/iomgr/ev_posix.c') diff --git a/BUILD b/BUILD index 67e481e0d2..a8d1b4a5b9 100644 --- a/BUILD +++ b/BUILD @@ -184,6 +184,7 @@ cc_library( "src/core/lib/iomgr/error.h", "src/core/lib/iomgr/ev_epoll_linux.h", "src/core/lib/iomgr/ev_poll_and_epoll_posix.h", + "src/core/lib/iomgr/ev_poll_cv_posix.h", "src/core/lib/iomgr/ev_poll_posix.h", "src/core/lib/iomgr/ev_posix.h", "src/core/lib/iomgr/exec_ctx.h", @@ -341,6 +342,7 @@ cc_library( "src/core/lib/iomgr/error.c", "src/core/lib/iomgr/ev_epoll_linux.c", "src/core/lib/iomgr/ev_poll_and_epoll_posix.c", + "src/core/lib/iomgr/ev_poll_cv_posix.c", "src/core/lib/iomgr/ev_poll_posix.c", "src/core/lib/iomgr/ev_posix.c", "src/core/lib/iomgr/exec_ctx.c", @@ -581,6 +583,7 @@ cc_library( "src/core/lib/iomgr/error.h", "src/core/lib/iomgr/ev_epoll_linux.h", "src/core/lib/iomgr/ev_poll_and_epoll_posix.h", + "src/core/lib/iomgr/ev_poll_cv_posix.h", "src/core/lib/iomgr/ev_poll_posix.h", "src/core/lib/iomgr/ev_posix.h", "src/core/lib/iomgr/exec_ctx.h", @@ -724,6 +727,7 @@ cc_library( "src/core/lib/iomgr/error.c", "src/core/lib/iomgr/ev_epoll_linux.c", "src/core/lib/iomgr/ev_poll_and_epoll_posix.c", + "src/core/lib/iomgr/ev_poll_cv_posix.c", "src/core/lib/iomgr/ev_poll_posix.c", "src/core/lib/iomgr/ev_posix.c", "src/core/lib/iomgr/exec_ctx.c", @@ -935,6 +939,7 @@ cc_library( "src/core/lib/iomgr/error.h", "src/core/lib/iomgr/ev_epoll_linux.h", "src/core/lib/iomgr/ev_poll_and_epoll_posix.h", + "src/core/lib/iomgr/ev_poll_cv_posix.h", "src/core/lib/iomgr/ev_poll_posix.h", "src/core/lib/iomgr/ev_posix.h", "src/core/lib/iomgr/exec_ctx.h", @@ -1069,6 +1074,7 @@ cc_library( "src/core/lib/iomgr/error.c", "src/core/lib/iomgr/ev_epoll_linux.c", "src/core/lib/iomgr/ev_poll_and_epoll_posix.c", + "src/core/lib/iomgr/ev_poll_cv_posix.c", "src/core/lib/iomgr/ev_poll_posix.c", "src/core/lib/iomgr/ev_posix.c", "src/core/lib/iomgr/exec_ctx.c", @@ -1284,6 +1290,7 @@ cc_library( "src/core/lib/iomgr/error.h", "src/core/lib/iomgr/ev_epoll_linux.h", "src/core/lib/iomgr/ev_poll_and_epoll_posix.h", + "src/core/lib/iomgr/ev_poll_cv_posix.h", "src/core/lib/iomgr/ev_poll_posix.h", "src/core/lib/iomgr/ev_posix.h", "src/core/lib/iomgr/exec_ctx.h", @@ -1398,6 +1405,7 @@ cc_library( "src/core/lib/iomgr/error.c", "src/core/lib/iomgr/ev_epoll_linux.c", "src/core/lib/iomgr/ev_poll_and_epoll_posix.c", + "src/core/lib/iomgr/ev_poll_cv_posix.c", "src/core/lib/iomgr/ev_poll_posix.c", "src/core/lib/iomgr/ev_posix.c", "src/core/lib/iomgr/exec_ctx.c", @@ -1696,6 +1704,7 @@ cc_library( "src/core/lib/iomgr/error.h", "src/core/lib/iomgr/ev_epoll_linux.h", "src/core/lib/iomgr/ev_poll_and_epoll_posix.h", + "src/core/lib/iomgr/ev_poll_cv_posix.h", "src/core/lib/iomgr/ev_poll_posix.h", "src/core/lib/iomgr/ev_posix.h", "src/core/lib/iomgr/exec_ctx.h", @@ -1805,6 +1814,7 @@ cc_library( "src/core/lib/iomgr/error.c", "src/core/lib/iomgr/ev_epoll_linux.c", "src/core/lib/iomgr/ev_poll_and_epoll_posix.c", + "src/core/lib/iomgr/ev_poll_cv_posix.c", "src/core/lib/iomgr/ev_poll_posix.c", "src/core/lib/iomgr/ev_posix.c", "src/core/lib/iomgr/exec_ctx.c", @@ -2196,6 +2206,7 @@ objc_library( "src/core/lib/iomgr/error.c", "src/core/lib/iomgr/ev_epoll_linux.c", "src/core/lib/iomgr/ev_poll_and_epoll_posix.c", + "src/core/lib/iomgr/ev_poll_cv_posix.c", "src/core/lib/iomgr/ev_poll_posix.c", "src/core/lib/iomgr/ev_posix.c", "src/core/lib/iomgr/exec_ctx.c", @@ -2415,6 +2426,7 @@ objc_library( "src/core/lib/iomgr/error.h", "src/core/lib/iomgr/ev_epoll_linux.h", "src/core/lib/iomgr/ev_poll_and_epoll_posix.h", + "src/core/lib/iomgr/ev_poll_cv_posix.h", "src/core/lib/iomgr/ev_poll_posix.h", "src/core/lib/iomgr/ev_posix.h", "src/core/lib/iomgr/exec_ctx.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index f07fa26e63..1f8c4ca0f0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -313,6 +313,7 @@ add_library(grpc src/core/lib/iomgr/error.c src/core/lib/iomgr/ev_epoll_linux.c src/core/lib/iomgr/ev_poll_and_epoll_posix.c + src/core/lib/iomgr/ev_poll_cv_posix.c src/core/lib/iomgr/ev_poll_posix.c src/core/lib/iomgr/ev_posix.c src/core/lib/iomgr/exec_ctx.c @@ -572,6 +573,7 @@ add_library(grpc_cronet src/core/lib/iomgr/error.c src/core/lib/iomgr/ev_epoll_linux.c src/core/lib/iomgr/ev_poll_and_epoll_posix.c + src/core/lib/iomgr/ev_poll_cv_posix.c src/core/lib/iomgr/ev_poll_posix.c src/core/lib/iomgr/ev_posix.c src/core/lib/iomgr/exec_ctx.c @@ -804,6 +806,7 @@ add_library(grpc_unsecure src/core/lib/iomgr/error.c src/core/lib/iomgr/ev_epoll_linux.c src/core/lib/iomgr/ev_poll_and_epoll_posix.c + src/core/lib/iomgr/ev_poll_cv_posix.c src/core/lib/iomgr/ev_poll_posix.c src/core/lib/iomgr/ev_posix.c src/core/lib/iomgr/exec_ctx.c @@ -1062,6 +1065,7 @@ add_library(grpc++ src/core/lib/iomgr/error.c src/core/lib/iomgr/ev_epoll_linux.c src/core/lib/iomgr/ev_poll_and_epoll_posix.c + src/core/lib/iomgr/ev_poll_cv_posix.c src/core/lib/iomgr/ev_poll_posix.c src/core/lib/iomgr/ev_posix.c src/core/lib/iomgr/exec_ctx.c @@ -1420,6 +1424,7 @@ add_library(grpc++_unsecure src/core/lib/iomgr/error.c src/core/lib/iomgr/ev_epoll_linux.c src/core/lib/iomgr/ev_poll_and_epoll_posix.c + src/core/lib/iomgr/ev_poll_cv_posix.c src/core/lib/iomgr/ev_poll_posix.c src/core/lib/iomgr/ev_posix.c src/core/lib/iomgr/exec_ctx.c diff --git a/Makefile b/Makefile index a23e12687d..0b23fc44bc 100644 --- a/Makefile +++ b/Makefile @@ -2538,6 +2538,7 @@ LIBGRPC_SRC = \ src/core/lib/iomgr/error.c \ src/core/lib/iomgr/ev_epoll_linux.c \ src/core/lib/iomgr/ev_poll_and_epoll_posix.c \ + src/core/lib/iomgr/ev_poll_cv_posix.c \ src/core/lib/iomgr/ev_poll_posix.c \ src/core/lib/iomgr/ev_posix.c \ src/core/lib/iomgr/exec_ctx.c \ @@ -2815,6 +2816,7 @@ LIBGRPC_CRONET_SRC = \ src/core/lib/iomgr/error.c \ src/core/lib/iomgr/ev_epoll_linux.c \ src/core/lib/iomgr/ev_poll_and_epoll_posix.c \ + src/core/lib/iomgr/ev_poll_cv_posix.c \ src/core/lib/iomgr/ev_poll_posix.c \ src/core/lib/iomgr/ev_posix.c \ src/core/lib/iomgr/exec_ctx.c \ @@ -3081,6 +3083,7 @@ LIBGRPC_TEST_UTIL_SRC = \ src/core/lib/iomgr/error.c \ src/core/lib/iomgr/ev_epoll_linux.c \ src/core/lib/iomgr/ev_poll_and_epoll_posix.c \ + src/core/lib/iomgr/ev_poll_cv_posix.c \ src/core/lib/iomgr/ev_poll_posix.c \ src/core/lib/iomgr/ev_posix.c \ src/core/lib/iomgr/exec_ctx.c \ @@ -3275,6 +3278,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/iomgr/error.c \ src/core/lib/iomgr/ev_epoll_linux.c \ src/core/lib/iomgr/ev_poll_and_epoll_posix.c \ + src/core/lib/iomgr/ev_poll_cv_posix.c \ src/core/lib/iomgr/ev_poll_posix.c \ src/core/lib/iomgr/ev_posix.c \ src/core/lib/iomgr/exec_ctx.c \ @@ -3616,6 +3620,7 @@ LIBGRPC++_SRC = \ src/core/lib/iomgr/error.c \ src/core/lib/iomgr/ev_epoll_linux.c \ src/core/lib/iomgr/ev_poll_and_epoll_posix.c \ + src/core/lib/iomgr/ev_poll_cv_posix.c \ src/core/lib/iomgr/ev_poll_posix.c \ src/core/lib/iomgr/ev_posix.c \ src/core/lib/iomgr/exec_ctx.c \ @@ -4252,6 +4257,7 @@ LIBGRPC++_UNSECURE_SRC = \ src/core/lib/iomgr/error.c \ src/core/lib/iomgr/ev_epoll_linux.c \ src/core/lib/iomgr/ev_poll_and_epoll_posix.c \ + src/core/lib/iomgr/ev_poll_cv_posix.c \ src/core/lib/iomgr/ev_poll_posix.c \ src/core/lib/iomgr/ev_posix.c \ src/core/lib/iomgr/exec_ctx.c \ diff --git a/binding.gyp b/binding.gyp index 1b68b63d22..d97b1410e1 100644 --- a/binding.gyp +++ b/binding.gyp @@ -585,6 +585,7 @@ 'src/core/lib/iomgr/error.c', 'src/core/lib/iomgr/ev_epoll_linux.c', 'src/core/lib/iomgr/ev_poll_and_epoll_posix.c', + 'src/core/lib/iomgr/ev_poll_cv_posix.c', 'src/core/lib/iomgr/ev_poll_posix.c', 'src/core/lib/iomgr/ev_posix.c', 'src/core/lib/iomgr/exec_ctx.c', diff --git a/build.yaml b/build.yaml index d4f731e85f..7693c911f9 100644 --- a/build.yaml +++ b/build.yaml @@ -186,6 +186,7 @@ filegroups: - src/core/lib/iomgr/error.h - src/core/lib/iomgr/ev_epoll_linux.h - src/core/lib/iomgr/ev_poll_and_epoll_posix.h + - src/core/lib/iomgr/ev_poll_cv_posix.h - src/core/lib/iomgr/ev_poll_posix.h - src/core/lib/iomgr/ev_posix.h - src/core/lib/iomgr/exec_ctx.h @@ -268,6 +269,7 @@ filegroups: - src/core/lib/iomgr/error.c - src/core/lib/iomgr/ev_epoll_linux.c - src/core/lib/iomgr/ev_poll_and_epoll_posix.c + - src/core/lib/iomgr/ev_poll_cv_posix.c - src/core/lib/iomgr/ev_poll_posix.c - src/core/lib/iomgr/ev_posix.c - src/core/lib/iomgr/exec_ctx.c diff --git a/config.m4 b/config.m4 index 6b605fbcd6..bebcab601a 100644 --- a/config.m4 +++ b/config.m4 @@ -104,6 +104,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/iomgr/error.c \ src/core/lib/iomgr/ev_epoll_linux.c \ src/core/lib/iomgr/ev_poll_and_epoll_posix.c \ + src/core/lib/iomgr/ev_poll_cv_posix.c \ src/core/lib/iomgr/ev_poll_posix.c \ src/core/lib/iomgr/ev_posix.c \ src/core/lib/iomgr/exec_ctx.c \ diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 92b3022cf7..73953cc8bd 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -275,6 +275,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/error.h', 'src/core/lib/iomgr/ev_epoll_linux.h', 'src/core/lib/iomgr/ev_poll_and_epoll_posix.h', + 'src/core/lib/iomgr/ev_poll_cv_posix.h', 'src/core/lib/iomgr/ev_poll_posix.h', 'src/core/lib/iomgr/ev_posix.h', 'src/core/lib/iomgr/exec_ctx.h', @@ -436,6 +437,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/error.c', 'src/core/lib/iomgr/ev_epoll_linux.c', 'src/core/lib/iomgr/ev_poll_and_epoll_posix.c', + 'src/core/lib/iomgr/ev_poll_cv_posix.c', 'src/core/lib/iomgr/ev_poll_posix.c', 'src/core/lib/iomgr/ev_posix.c', 'src/core/lib/iomgr/exec_ctx.c', @@ -639,6 +641,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/error.h', 'src/core/lib/iomgr/ev_epoll_linux.h', 'src/core/lib/iomgr/ev_poll_and_epoll_posix.h', + 'src/core/lib/iomgr/ev_poll_cv_posix.h', 'src/core/lib/iomgr/ev_poll_posix.h', 'src/core/lib/iomgr/ev_posix.h', 'src/core/lib/iomgr/exec_ctx.h', diff --git a/grpc.gemspec b/grpc.gemspec index d672bb1284..64441ead59 100755 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -195,6 +195,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/iomgr/error.h ) s.files += %w( src/core/lib/iomgr/ev_epoll_linux.h ) s.files += %w( src/core/lib/iomgr/ev_poll_and_epoll_posix.h ) + s.files += %w( src/core/lib/iomgr/ev_poll_cv_posix.h ) s.files += %w( src/core/lib/iomgr/ev_poll_posix.h ) s.files += %w( src/core/lib/iomgr/ev_posix.h ) s.files += %w( src/core/lib/iomgr/exec_ctx.h ) @@ -356,6 +357,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/iomgr/error.c ) s.files += %w( src/core/lib/iomgr/ev_epoll_linux.c ) s.files += %w( src/core/lib/iomgr/ev_poll_and_epoll_posix.c ) + s.files += %w( src/core/lib/iomgr/ev_poll_cv_posix.c ) s.files += %w( src/core/lib/iomgr/ev_poll_posix.c ) s.files += %w( src/core/lib/iomgr/ev_posix.c ) s.files += %w( src/core/lib/iomgr/exec_ctx.c ) diff --git a/package.xml b/package.xml index 02cc271fe5..3e2d9d278c 100644 --- a/package.xml +++ b/package.xml @@ -202,6 +202,7 @@ + @@ -363,6 +364,7 @@ + diff --git a/src/core/lib/iomgr/ev_poll_cv_posix.c b/src/core/lib/iomgr/ev_poll_cv_posix.c new file mode 100644 index 0000000000..596a3025eb --- /dev/null +++ b/src/core/lib/iomgr/ev_poll_cv_posix.c @@ -0,0 +1,282 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include + +#ifdef GPR_POSIX_SOCKET + +#include "src/core/lib/iomgr/ev_poll_cv_posix.h" + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "src/core/lib/iomgr/ev_poll_posix.h" +#include "src/core/lib/iomgr/wakeup_fd_posix.h" + +#define POLL_PERIOD_MS 1000 +#define DEFAULT_TABLE_SIZE 16 + +// poll_result is owned by parent thread +typedef struct poll_result { + struct pollfd* fds; + gpr_cv* cv; + int completed; + int res; + int err; +} poll_result; + +// poll_args is owned by spawned thread +typedef struct poll_args { + struct pollfd* fds; + nfds_t nfds; + int timeout; + poll_result* result; +} poll_args; + +cv_fd_table g_cvfds; + +// Poll in a background thread +static void run_poll(void* arg) { + int result, timeout; + poll_args* pargs = (poll_args*)arg; + gpr_mu_lock(&g_cvfds.mu); + if (pargs->result != NULL) { + while (pargs->result != NULL) { + if (pargs->timeout < 0) { + timeout = POLL_PERIOD_MS; + } else { + timeout = GPR_MIN(POLL_PERIOD_MS, pargs->timeout); + pargs->timeout -= timeout; + } + gpr_mu_unlock(&g_cvfds.mu); + result = g_cvfds.poll(pargs->fds, pargs->nfds, timeout); + gpr_mu_lock(&g_cvfds.mu); + if (pargs->result != NULL) { + if (result != 0 || pargs->timeout == 0) { + memcpy(pargs->result->fds, pargs->fds, + sizeof(struct pollfd) * pargs->nfds); + pargs->result->res = result; + pargs->result->err = errno; + pargs->result->completed = 1; + gpr_cv_signal(pargs->result->cv); + break; + } + } + } + } + gpr_free(pargs->fds); + gpr_free(pargs); + gpr_mu_unlock(&g_cvfds.mu); +} + +// This function overrides poll() to handle condition variable wakeup fds +static int cvfd_poll(struct pollfd* fds, nfds_t nfds, int timeout) { + unsigned int i; + int res, idx; + cv_node *cvn, *prev; + struct pollfd* sockfds; + nfds_t nsockfds = 0; + gpr_cv pollcv; + gpr_thd_id t_id; + gpr_thd_options opt; + poll_args* pargs; + poll_result* pres; + gpr_mu_lock(&g_cvfds.mu); + gpr_cv_init(&pollcv); + for (i = 0; i < nfds; i++) { + fds[i].revents = 0; + if (fds[i].fd < 0 && (fds[i].events & POLLIN)) { + idx = FD_TO_IDX(fds[i].fd); + cvn = gpr_malloc(sizeof(cv_node)); + cvn->cv = &pollcv; + cvn->next = g_cvfds.cvfds[idx].cvs; + g_cvfds.cvfds[idx].cvs = cvn; + // We should return immediately if there are pending events, + // but we still need to call poll() to check for socket events + if (g_cvfds.cvfds[idx].is_set) { + timeout = 0; + } + } else if (fds[i].fd >= 0) { + nsockfds++; + } + } + sockfds = gpr_malloc(sizeof(struct pollfd) * nsockfds); + idx = 0; + for (i = 0; i < nfds; i++) { + if (fds[i].fd >= 0) { + sockfds[idx].fd = fds[i].fd; + sockfds[idx].events = fds[i].events; + sockfds[idx].revents = 0; + idx++; + } + } + + errno = 0; + if (nsockfds > 0) { + pres = gpr_malloc(sizeof(struct poll_result)); + pargs = gpr_malloc(sizeof(struct poll_args)); + + pargs->fds = gpr_malloc(sizeof(struct pollfd) * nsockfds); + memcpy(pargs->fds, sockfds, sizeof(struct pollfd) * nsockfds); + pargs->nfds = nsockfds; + pargs->timeout = timeout; + pargs->result = pres; + + pres->fds = sockfds; + pres->cv = &pollcv; + pres->completed = 0; + pres->res = 0; + pres->err = 0; + + opt = gpr_thd_options_default(); + gpr_thd_options_set_detached(&opt); + gpr_thd_new(&t_id, &run_poll, pargs, &opt); + // We want the poll() thread to trigger the deadline, so wait forever here + gpr_cv_wait(&pollcv, &g_cvfds.mu, gpr_inf_future(GPR_CLOCK_MONOTONIC)); + if (!pres->completed) { + pargs->result = NULL; + } + res = pres->res; + errno = pres->err; + gpr_free(pres); + } else { + gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME); + deadline = + gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN)); + gpr_cv_wait(&pollcv, &g_cvfds.mu, deadline); + res = 0; + } + idx = 0; + for (i = 0; i < nfds; i++) { + if (fds[i].fd < 0 && (fds[i].events & POLLIN)) { + cvn = g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs; + prev = NULL; + while (cvn->cv != &pollcv) { + prev = cvn; + cvn = cvn->next; + GPR_ASSERT(cvn); + } + if (!prev) { + g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs = cvn->next; + } else { + prev->next = cvn->next; + } + gpr_free(cvn); + + if (g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].is_set) { + fds[i].revents = POLLIN; + if (res >= 0) res++; + } + } else if (fds[i].fd >= 0) { + fds[i].revents = sockfds[idx].revents; + idx++; + } + } + gpr_free(sockfds); + gpr_cv_destroy(&pollcv); + gpr_mu_unlock(&g_cvfds.mu); + + return res; +} + +static void grpc_global_cv_fd_table_init() { + gpr_mu_init(&g_cvfds.mu); + gpr_mu_lock(&g_cvfds.mu); + g_cvfds.size = DEFAULT_TABLE_SIZE; + g_cvfds.cvfds = gpr_malloc(sizeof(fd_node) * DEFAULT_TABLE_SIZE); + g_cvfds.free_fds = NULL; + for (int i = 0; i < DEFAULT_TABLE_SIZE; i++) { + g_cvfds.cvfds[i].is_set = 0; + g_cvfds.cvfds[i].cvs = NULL; + g_cvfds.cvfds[i].next_free = g_cvfds.free_fds; + g_cvfds.free_fds = &g_cvfds.cvfds[i]; + } + // Override the poll function with one that supports cvfds + g_cvfds.poll = grpc_poll_function; + grpc_poll_function = &cvfd_poll; + gpr_mu_unlock(&g_cvfds.mu); +} + +static void grpc_global_cv_fd_table_shutdown() { + gpr_mu_lock(&g_cvfds.mu); + grpc_poll_function = g_cvfds.poll; + gpr_free(g_cvfds.cvfds); + gpr_mu_unlock(&g_cvfds.mu); + gpr_mu_destroy(&g_cvfds.mu); +} + +/******************************************************************************* + * event engine binding + */ + +static const grpc_event_engine_vtable* ev_poll_vtable; +static grpc_event_engine_vtable vtable; + +static void shutdown_engine(void) { + ev_poll_vtable->shutdown_engine(); + grpc_global_cv_fd_table_shutdown(); +} + +const grpc_event_engine_vtable* grpc_init_poll_cv_posix(void) { + int has_wakeup_fd = grpc_has_wakeup_fd; + int allow_specialized_wakeup_fd = grpc_allow_specialized_wakeup_fd; + int allow_pipe_wakeup_fd = grpc_allow_pipe_wakeup_fd; + grpc_global_cv_fd_table_init(); + grpc_has_wakeup_fd = 1; + grpc_allow_specialized_wakeup_fd = 0; + grpc_allow_pipe_wakeup_fd = 0; + grpc_wakeup_fd_global_init(); + ev_poll_vtable = grpc_init_poll_posix(); + if (!ev_poll_vtable) { + grpc_global_cv_fd_table_shutdown(); + grpc_has_wakeup_fd = has_standard_wakeup_fd; + grpc_allow_specialized_wakeup_fd = allow_specialized_wakeup_fd; + grpc_allow_pipe_wakeup_fd = allow_pipe_wakeup_fd; + grpc_has_wakeup_fd = has_standard_wakeup_fd; + grpc_global_cv_fd_table_init(); + return NULL; + } + + vtable = *ev_poll_vtable; + vtable.shutdown_engine = shutdown_engine; + return &vtable; +} + +#endif /* GPR_POSIX_SOCKET */ diff --git a/src/core/lib/iomgr/ev_poll_cv_posix.h b/src/core/lib/iomgr/ev_poll_cv_posix.h new file mode 100644 index 0000000000..0fd3e5bbb4 --- /dev/null +++ b/src/core/lib/iomgr/ev_poll_cv_posix.h @@ -0,0 +1,65 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_EV_POLL_CV_POSIX_H +#define GRPC_CORE_LIB_IOMGR_EV_POLL_CV_POSIX_H + +#include + +#include "src/core/lib/iomgr/ev_posix.h" + +#define FD_TO_IDX(fd) (-(fd)-1) +#define IDX_TO_FD(idx) (-(idx)-1) + +typedef struct cv_node { + gpr_cv* cv; + struct cv_node* next; +} cv_node; + +typedef struct fd_node { + int is_set; + cv_node* cvs; + struct fd_node* next_free; +} fd_node; + +typedef struct cv_fd_table { + gpr_mu mu; + fd_node* cvfds; + fd_node* free_fds; + unsigned int size; + grpc_poll_function_type poll; +} cv_fd_table; + +const grpc_event_engine_vtable* grpc_init_poll_cv_posix(void); + +#endif /* GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H */ diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 6536672685..2fc8ccfa91 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -46,6 +46,7 @@ #include "src/core/lib/iomgr/ev_epoll_linux.h" #include "src/core/lib/iomgr/ev_poll_and_epoll_posix.h" +#include "src/core/lib/iomgr/ev_poll_cv_posix.h" #include "src/core/lib/iomgr/ev_poll_posix.h" #include "src/core/lib/support/env.h" @@ -66,6 +67,7 @@ typedef struct { static const event_engine_factory g_factories[] = { {"epoll", grpc_init_epoll_linux}, {"poll", grpc_init_poll_posix}, + {"poll-cv", grpc_init_poll_cv_posix}, {"legacy", grpc_init_poll_and_epoll_posix}, }; diff --git a/src/core/lib/iomgr/wakeup_fd_cv.c b/src/core/lib/iomgr/wakeup_fd_cv.c index bfdc2cb422..651e2f663d 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.c +++ b/src/core/lib/iomgr/wakeup_fd_cv.c @@ -35,8 +35,6 @@ #ifdef GPR_POSIX_WAKEUP_FD -#include "src/core/lib/iomgr/wakeup_fd_posix.h" - #include #include @@ -47,200 +45,16 @@ #include #include -#include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/ev_poll_cv_posix.h" #define MAX_TABLE_RESIZE 256 -#define DEFAULT_TABLE_SIZE 16 -#define POLL_PERIOD_MS 1000 - -#define FD_TO_IDX(fd) (-(fd)-1) -#define IDX_TO_FD(idx) (-(idx)-1) - -typedef struct cv_node { - gpr_cv* cv; - struct cv_node* next; -} cv_node; - -typedef struct fd_node { - int is_set; - cv_node* cvs; - struct fd_node* next_free; -} fd_node; - -typedef struct cv_fd_table { - fd_node* cvfds; - fd_node* free_fds; - unsigned int size; - grpc_poll_function_type poll; -} cv_fd_table; - -typedef struct poll_result { - struct pollfd* fds; - gpr_cv* cv; - int completed; - int res; - int err; -} poll_result; - -typedef struct poll_args { - struct pollfd* fds; - nfds_t nfds; - int timeout; - poll_result* result; -} poll_args; - -static gpr_mu g_mu = PTHREAD_MUTEX_INITIALIZER; -static cv_fd_table g_cvfds; - -// Some environments do not implement pthread_cancel(), so we run -// this poll in a detached thread, and wake up periodically and -// check if the calling thread is still waiting on a result -static void run_poll(void* arg) { - int result, timeout; - poll_args* pargs = (poll_args*)arg; - gpr_mu_lock(&g_mu); - if (pargs->result != NULL) { - while (pargs->result != NULL) { - if (pargs->timeout < 0) { - timeout = POLL_PERIOD_MS; - } else { - timeout = GPR_MIN(POLL_PERIOD_MS, pargs->timeout); - pargs->timeout -= timeout; - } - gpr_mu_unlock(&g_mu); - result = g_cvfds.poll(pargs->fds, pargs->nfds, timeout); - gpr_mu_lock(&g_mu); - if (pargs->result != NULL) { - if (result != 0 || pargs->timeout == 0) { - memcpy(pargs->result->fds, pargs->fds, - sizeof(struct pollfd) * pargs->nfds); - pargs->result->res = result; - pargs->result->err = errno; - pargs->result->completed = 1; - gpr_cv_signal(pargs->result->cv); - break; - } - } - } - } - gpr_free(pargs->fds); - gpr_free(pargs); - gpr_mu_unlock(&g_mu); -} - -int cvfd_poll(struct pollfd* fds, nfds_t nfds, int timeout) { - unsigned int i; - int res, idx; - cv_node *cvn, *prev; - struct pollfd* sockfds; - nfds_t nsockfds = 0; - gpr_cv pollcv; - gpr_thd_id t_id; - gpr_thd_options opt; - poll_args* pargs; - poll_result* pres; - gpr_mu_lock(&g_mu); - gpr_cv_init(&pollcv); - for (i = 0; i < nfds; i++) { - fds[i].revents = 0; - if (fds[i].fd < 0 && (fds[i].events & POLLIN)) { - idx = FD_TO_IDX(fds[i].fd); - cvn = gpr_malloc(sizeof(cv_node)); - cvn->cv = &pollcv; - cvn->next = g_cvfds.cvfds[idx].cvs; - g_cvfds.cvfds[idx].cvs = cvn; - // We should return immediately if there are pending events, - // but we still need to call poll() to check for socket events - if (g_cvfds.cvfds[idx].is_set) { - timeout = 0; - } - } else if (fds[i].fd >= 0) { - nsockfds++; - } - } - sockfds = gpr_malloc(sizeof(struct pollfd) * nsockfds); - idx = 0; - for (i = 0; i < nfds; i++) { - if (fds[i].fd >= 0) { - sockfds[idx].fd = fds[i].fd; - sockfds[idx].events = fds[i].events; - sockfds[idx].revents = 0; - idx++; - } - } - - errno = 0; - if (nsockfds > 0) { - pres = gpr_malloc(sizeof(struct poll_result)); - pargs = gpr_malloc(sizeof(struct poll_args)); - - pargs->fds = gpr_malloc(sizeof(struct pollfd) * nsockfds); - memcpy(pargs->fds, sockfds, sizeof(struct pollfd) * nsockfds); - pargs->nfds = nsockfds; - pargs->timeout = timeout; - pargs->result = pres; - - pres->fds = sockfds; - pres->cv = &pollcv; - pres->completed = 0; - pres->res = 0; - pres->err = 0; - - opt = gpr_thd_options_default(); - gpr_thd_options_set_detached(&opt); - gpr_thd_new(&t_id, &run_poll, pargs, &opt); - // We want the poll() thread to trigger the deadline, so wait forever here - gpr_cv_wait(&pollcv, &g_mu, gpr_inf_future(GPR_CLOCK_MONOTONIC)); - if (!pres->completed) { - pargs->result = NULL; - } - res = pres->res; - errno = pres->err; - gpr_free(pres); - } else { - gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME); - deadline = - gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN)); - gpr_cv_wait(&pollcv, &g_mu, deadline); - res = 0; - } - idx = 0; - for (i = 0; i < nfds; i++) { - if (fds[i].fd < 0 && (fds[i].events & POLLIN)) { - cvn = g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs; - prev = NULL; - while (cvn->cv != &pollcv) { - prev = cvn; - cvn = cvn->next; - GPR_ASSERT(cvn); - } - if (!prev) { - g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs = cvn->next; - } else { - prev->next = cvn->next; - } - gpr_free(cvn); - - if (g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].is_set) { - fds[i].revents = POLLIN; - if (res >= 0) res++; - } - } else if (fds[i].fd >= 0) { - fds[i].revents = sockfds[idx].revents; - idx++; - } - } - gpr_free(sockfds); - gpr_cv_destroy(&pollcv); - gpr_mu_unlock(&g_mu); - return res; -} +extern cv_fd_table g_cvfds; static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) { unsigned int i, newsize; int idx; - gpr_mu_lock(&g_mu); + gpr_mu_lock(&g_cvfds.mu); if (!g_cvfds.free_fds) { newsize = GPR_MIN(g_cvfds.size * 2, g_cvfds.size + MAX_TABLE_RESIZE); g_cvfds.cvfds = gpr_realloc(g_cvfds.cvfds, sizeof(fd_node) * newsize); @@ -259,51 +73,27 @@ static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) { g_cvfds.cvfds[idx].is_set = 0; fd_info->read_fd = IDX_TO_FD(idx); fd_info->write_fd = -1; - gpr_mu_unlock(&g_mu); + gpr_mu_unlock(&g_cvfds.mu); return GRPC_ERROR_NONE; } -void grpc_global_cv_fd_table_init() { - gpr_mu_lock(&g_mu); - g_cvfds.size = DEFAULT_TABLE_SIZE; - g_cvfds.cvfds = gpr_malloc(sizeof(fd_node) * DEFAULT_TABLE_SIZE); - g_cvfds.free_fds = NULL; - for (int i = 0; i < DEFAULT_TABLE_SIZE; i++) { - g_cvfds.cvfds[i].is_set = 0; - g_cvfds.cvfds[i].cvs = NULL; - g_cvfds.cvfds[i].next_free = g_cvfds.free_fds; - g_cvfds.free_fds = &g_cvfds.cvfds[i]; - } - // Override the poll function with one that supports cvfds - g_cvfds.poll = grpc_poll_function; - grpc_poll_function = &cvfd_poll; - gpr_mu_unlock(&g_mu); -} - -void grpc_global_cv_fd_table_shutdown() { - gpr_mu_lock(&g_mu); - grpc_poll_function = g_cvfds.poll; - gpr_free(g_cvfds.cvfds); - gpr_mu_unlock(&g_mu); -} - static grpc_error* cv_fd_wakeup(grpc_wakeup_fd* fd_info) { cv_node* cvn; - gpr_mu_lock(&g_mu); + gpr_mu_lock(&g_cvfds.mu); g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].is_set = 1; cvn = g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].cvs; while (cvn) { gpr_cv_signal(cvn->cv); cvn = cvn->next; } - gpr_mu_unlock(&g_mu); + gpr_mu_unlock(&g_cvfds.mu); return GRPC_ERROR_NONE; } static grpc_error* cv_fd_consume(grpc_wakeup_fd* fd_info) { - gpr_mu_lock(&g_mu); + gpr_mu_lock(&g_cvfds.mu); g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].is_set = 0; - gpr_mu_unlock(&g_mu); + gpr_mu_unlock(&g_cvfds.mu); return GRPC_ERROR_NONE; } @@ -311,12 +101,12 @@ static void cv_fd_destroy(grpc_wakeup_fd* fd_info) { if (fd_info->read_fd == 0) { return; } - gpr_mu_lock(&g_mu); + gpr_mu_lock(&g_cvfds.mu); // Assert that there are no active pollers GPR_ASSERT(!g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].cvs); g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].next_free = g_cvfds.free_fds; g_cvfds.free_fds = &g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)]; - gpr_mu_unlock(&g_mu); + gpr_mu_unlock(&g_cvfds.mu); } static int cv_check_availability(void) { return 1; } diff --git a/src/core/lib/iomgr/wakeup_fd_cv.h b/src/core/lib/iomgr/wakeup_fd_cv.h index 22ee6c0bbe..e57fc28363 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.h +++ b/src/core/lib/iomgr/wakeup_fd_cv.h @@ -41,7 +41,7 @@ * A global table of cv wakeup fds is mantained. A cv wakeup fd is a negative * file descriptor. poll() is then run in a background thread with only the * real socket fds while we wait on a condition variable trigged by either the - * poll() called or a wakeup_fd() call. + * poll() completion or a wakeup_fd() call. * */ @@ -50,9 +50,6 @@ #include "src/core/lib/iomgr/wakeup_fd_posix.h" -void grpc_global_cv_fd_table_init(); -void grpc_global_cv_fd_table_shutdown(); - extern grpc_wakeup_fd_vtable grpc_cv_wakeup_fd_vtable; #endif /* GRPC_CORE_LIB_IOMGR_WAKEUP_FD_CV_H */ diff --git a/src/core/lib/iomgr/wakeup_fd_posix.c b/src/core/lib/iomgr/wakeup_fd_posix.c index 564b836154..48ed92abe8 100644 --- a/src/core/lib/iomgr/wakeup_fd_posix.c +++ b/src/core/lib/iomgr/wakeup_fd_posix.c @@ -33,7 +33,7 @@ #include -#ifdef GPR_POSIX_WAKEUP_FD +#ifdef GPR_POSIX_SOCKET #include #include "src/core/lib/iomgr/wakeup_fd_cv.h" @@ -43,6 +43,8 @@ static const grpc_wakeup_fd_vtable *wakeup_fd_vtable = NULL; int grpc_allow_specialized_wakeup_fd = 1; int grpc_allow_pipe_wakeup_fd = 1; +int grpc_has_wakeup_fd = 1; + void grpc_wakeup_fd_global_init(void) { if (grpc_allow_specialized_wakeup_fd && @@ -52,15 +54,12 @@ void grpc_wakeup_fd_global_init(void) { grpc_pipe_wakeup_fd_vtable.check_availability()) { wakeup_fd_vtable = &grpc_pipe_wakeup_fd_vtable; } else { + grpc_has_wakeup_fd = 0; wakeup_fd_vtable = &grpc_cv_wakeup_fd_vtable; - grpc_global_cv_fd_table_init(); } } void grpc_wakeup_fd_global_destroy(void) { - if (wakeup_fd_vtable == &grpc_cv_wakeup_fd_vtable) { - grpc_global_cv_fd_table_shutdown(); - } wakeup_fd_vtable = NULL; } diff --git a/src/core/lib/iomgr/wakeup_fd_posix.h b/src/core/lib/iomgr/wakeup_fd_posix.h index a9f902bc9f..bd0fb46da1 100644 --- a/src/core/lib/iomgr/wakeup_fd_posix.h +++ b/src/core/lib/iomgr/wakeup_fd_posix.h @@ -89,6 +89,7 @@ struct grpc_wakeup_fd { extern int grpc_allow_specialized_wakeup_fd; extern int grpc_allow_pipe_wakeup_fd; +extern int grpc_has_wakeup_fd; #define GRPC_WAKEUP_FD_GET_READ_FD(fd_info) ((fd_info)->read_fd) diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index dc2ce46979..0d4c7d1c0e 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -98,6 +98,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/error.c', 'src/core/lib/iomgr/ev_epoll_linux.c', 'src/core/lib/iomgr/ev_poll_and_epoll_posix.c', + 'src/core/lib/iomgr/ev_poll_cv_posix.c', 'src/core/lib/iomgr/ev_poll_posix.c', 'src/core/lib/iomgr/ev_posix.c', 'src/core/lib/iomgr/exec_ctx.c', diff --git a/test/core/iomgr/wakeup_fd_cv_test.c b/test/core/iomgr/wakeup_fd_cv_test.c index 2cd777536d..8ac78a2f41 100644 --- a/test/core/iomgr/wakeup_fd_cv_test.c +++ b/test/core/iomgr/wakeup_fd_cv_test.c @@ -39,7 +39,7 @@ #include #include "src/core/lib/iomgr/ev_posix.h" -#include "src/core/lib/iomgr/wakeup_fd_posix.h" +#include "src/core/lib/iomgr/iomgr_posix.h" #include "src/core/lib/support/env.h" typedef struct poll_args { @@ -102,7 +102,6 @@ void background_poll(void *args) { void test_many_fds(void) { int i; - grpc_wakeup_fd_global_init(); grpc_wakeup_fd fd[1000]; for (i = 0; i < 1000; i++) { GPR_ASSERT(grpc_wakeup_fd_init(&fd[i]) == GRPC_ERROR_NONE); @@ -110,7 +109,6 @@ void test_many_fds(void) { for (i = 0; i < 1000; i++) { grpc_wakeup_fd_destroy(&fd[i]); } - grpc_wakeup_fd_global_destroy(); } void test_poll_cv_trigger(void) { @@ -119,8 +117,6 @@ void test_poll_cv_trigger(void) { poll_args pargs; gpr_thd_id t_id; gpr_thd_options opt; - grpc_poll_function = &mock_poll; - grpc_wakeup_fd_global_init(); GPR_ASSERT(grpc_wakeup_fd_init(&cvfd1) == GRPC_ERROR_NONE); GPR_ASSERT(grpc_wakeup_fd_init(&cvfd2) == GRPC_ERROR_NONE); @@ -226,17 +222,22 @@ void test_poll_cv_trigger(void) { GPR_ASSERT(pfds[4].revents == 0); GPR_ASSERT(pfds[5].revents == 0); - grpc_wakeup_fd_global_destroy(); } int main(int argc, char **argv) { - gpr_setenv("GRPC_POLL_STRATEGY", "poll"); - grpc_allow_specialized_wakeup_fd = 0; - grpc_allow_pipe_wakeup_fd = 0; + gpr_setenv("GRPC_POLL_STRATEGY", "poll-cv"); + grpc_poll_function = &mock_poll; gpr_mu_init(&poll_mu); gpr_cv_init(&poll_cv); + + grpc_iomgr_platform_init(); test_many_fds(); + grpc_iomgr_platform_shutdown(); + + grpc_iomgr_platform_init(); test_poll_cv_trigger(); + grpc_iomgr_platform_shutdown(); + // Make sure detached polling threads have chance // to exit and clean up memory. pthread_exit() causes tsan/msan // issues, so we just wait an ample amount of time diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 6e08a1977c..2ea042a4f4 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -894,6 +894,7 @@ src/core/lib/iomgr/endpoint_pair.h \ src/core/lib/iomgr/error.h \ src/core/lib/iomgr/ev_epoll_linux.h \ src/core/lib/iomgr/ev_poll_and_epoll_posix.h \ +src/core/lib/iomgr/ev_poll_cv_posix.h \ src/core/lib/iomgr/ev_poll_posix.h \ src/core/lib/iomgr/ev_posix.h \ src/core/lib/iomgr/exec_ctx.h \ @@ -1008,6 +1009,7 @@ src/core/lib/iomgr/endpoint_pair_windows.c \ src/core/lib/iomgr/error.c \ src/core/lib/iomgr/ev_epoll_linux.c \ src/core/lib/iomgr/ev_poll_and_epoll_posix.c \ +src/core/lib/iomgr/ev_poll_cv_posix.c \ src/core/lib/iomgr/ev_poll_posix.c \ src/core/lib/iomgr/ev_posix.c \ src/core/lib/iomgr/exec_ctx.c \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 2328194c3a..a59ff043b1 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -811,6 +811,7 @@ src/core/lib/iomgr/endpoint_pair.h \ src/core/lib/iomgr/error.h \ src/core/lib/iomgr/ev_epoll_linux.h \ src/core/lib/iomgr/ev_poll_and_epoll_posix.h \ +src/core/lib/iomgr/ev_poll_cv_posix.h \ src/core/lib/iomgr/ev_poll_posix.h \ src/core/lib/iomgr/ev_posix.h \ src/core/lib/iomgr/exec_ctx.h \ @@ -972,6 +973,7 @@ src/core/lib/iomgr/endpoint_pair_windows.c \ src/core/lib/iomgr/error.c \ src/core/lib/iomgr/ev_epoll_linux.c \ src/core/lib/iomgr/ev_poll_and_epoll_posix.c \ +src/core/lib/iomgr/ev_poll_cv_posix.c \ src/core/lib/iomgr/ev_poll_posix.c \ src/core/lib/iomgr/ev_posix.c \ src/core/lib/iomgr/exec_ctx.c \ diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index 146018ba3a..7d8a18d8fe 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -69,7 +69,7 @@ _FORCE_ENVIRON_FOR_WRAPPERS = { _POLLING_STRATEGIES = { - 'linux': ['epoll', 'poll', 'legacy'] + 'linux': ['epoll', 'poll', 'poll-cv', 'legacy'] } diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index 2ce0e3b05c..f6abb4641f 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -5926,6 +5926,7 @@ "src/core/lib/iomgr/error.h", "src/core/lib/iomgr/ev_epoll_linux.h", "src/core/lib/iomgr/ev_poll_and_epoll_posix.h", + "src/core/lib/iomgr/ev_poll_cv_posix.h", "src/core/lib/iomgr/ev_poll_posix.h", "src/core/lib/iomgr/ev_posix.h", "src/core/lib/iomgr/exec_ctx.h", @@ -6039,6 +6040,8 @@ "src/core/lib/iomgr/ev_epoll_linux.h", "src/core/lib/iomgr/ev_poll_and_epoll_posix.c", "src/core/lib/iomgr/ev_poll_and_epoll_posix.h", + "src/core/lib/iomgr/ev_poll_cv_posix.c", + "src/core/lib/iomgr/ev_poll_cv_posix.h", "src/core/lib/iomgr/ev_poll_posix.c", "src/core/lib/iomgr/ev_poll_posix.h", "src/core/lib/iomgr/ev_posix.c", diff --git a/vsprojects/vcxproj/grpc++/grpc++.vcxproj b/vsprojects/vcxproj/grpc++/grpc++.vcxproj index 2afda35d34..eb830eb45d 100644 --- a/vsprojects/vcxproj/grpc++/grpc++.vcxproj +++ b/vsprojects/vcxproj/grpc++/grpc++.vcxproj @@ -394,6 +394,7 @@ + @@ -564,6 +565,8 @@ + + diff --git a/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters b/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters index 1f88ae0e93..acee5ed3c9 100644 --- a/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters +++ b/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters @@ -163,6 +163,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr @@ -779,6 +782,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr diff --git a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj index be20aef6dc..fbdc6c3bdf 100644 --- a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj @@ -390,6 +390,7 @@ + @@ -550,6 +551,8 @@ + + diff --git a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters index 628173dbbe..600e6475a4 100644 --- a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters @@ -148,6 +148,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr @@ -752,6 +755,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj index ae24e8f066..3ae2ade070 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj @@ -320,6 +320,7 @@ + @@ -505,6 +506,8 @@ + + diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters index a849795dc9..2a401e1695 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters @@ -67,6 +67,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr @@ -731,6 +734,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj index 3c3fae370f..e54b369ec6 100644 --- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj +++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj @@ -211,6 +211,7 @@ + @@ -349,6 +350,8 @@ + + diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters index b81431cafa..f391f29729 100644 --- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters @@ -115,6 +115,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr @@ -512,6 +515,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj index 69d631e452..3eaae7fe18 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj @@ -310,6 +310,7 @@ + @@ -473,6 +474,8 @@ + + diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters index 36e80b4a13..31ba6e97bf 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters @@ -70,6 +70,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr @@ -641,6 +644,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr -- cgit v1.2.3 From bc544be002b864e808cbca54ea64fa9b68262302 Mon Sep 17 00:00:00 2001 From: Ken Payson Date: Thu, 6 Oct 2016 19:23:47 -0700 Subject: Fix platform detection --- src/core/lib/iomgr/ev_epoll_linux.c | 4 ++++ src/core/lib/iomgr/ev_poll_posix.c | 3 +++ src/core/lib/iomgr/ev_posix.c | 1 + src/core/lib/iomgr/wakeup_fd_pipe.c | 3 +-- 4 files changed, 9 insertions(+), 2 deletions(-) (limited to 'src/core/lib/iomgr/ev_posix.c') diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index ab77ebc78b..249bc98735 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -1892,6 +1892,10 @@ const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; } + if (!grpc_has_wakeup_fd) { + return NULL; + } + if (!is_epoll_available()) { return NULL; } diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 16a5e3083e..97e71d968e 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -1277,6 +1277,9 @@ static const grpc_event_engine_vtable vtable = { }; const grpc_event_engine_vtable *grpc_init_poll_posix(void) { + if (!grpc_has_wakeup_fd) { + return NULL; + } if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) { return NULL; } diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 2fc8ccfa91..a4102b429f 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -65,6 +65,7 @@ typedef struct { } event_engine_factory; static const event_engine_factory g_factories[] = { + {"poll-cv", grpc_init_poll_cv_posix}, {"epoll", grpc_init_epoll_linux}, {"poll", grpc_init_poll_posix}, {"poll-cv", grpc_init_poll_cv_posix}, diff --git a/src/core/lib/iomgr/wakeup_fd_pipe.c b/src/core/lib/iomgr/wakeup_fd_pipe.c index 3dc94c94ba..d0ea216aa0 100644 --- a/src/core/lib/iomgr/wakeup_fd_pipe.c +++ b/src/core/lib/iomgr/wakeup_fd_pipe.c @@ -47,11 +47,10 @@ static grpc_error* pipe_init(grpc_wakeup_fd* fd_info) { int pipefd[2]; - /* TODO(klempner): Make this nonfatal */ int r = pipe(pipefd); if (0 != r) { gpr_log(GPR_ERROR, "pipe creation failed (%d): %s", errno, strerror(errno)); - abort(); + return GRPC_OS_ERROR(errno, "pipe"); } grpc_error* err; err = grpc_set_socket_nonblocking(pipefd[0], 1); -- cgit v1.2.3 From 131a1065fb23d9fd4be60361dffb9f8a0042f0cc Mon Sep 17 00:00:00 2001 From: Ken Payson Date: Fri, 7 Oct 2016 14:03:30 -0700 Subject: Remove test-forcing hack --- src/core/lib/iomgr/ev_posix.c | 1 - src/core/lib/iomgr/wakeup_fd_posix.c | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) (limited to 'src/core/lib/iomgr/ev_posix.c') diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index a4102b429f..2fc8ccfa91 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -65,7 +65,6 @@ typedef struct { } event_engine_factory; static const event_engine_factory g_factories[] = { - {"poll-cv", grpc_init_poll_cv_posix}, {"epoll", grpc_init_epoll_linux}, {"poll", grpc_init_poll_posix}, {"poll-cv", grpc_init_poll_cv_posix}, diff --git a/src/core/lib/iomgr/wakeup_fd_posix.c b/src/core/lib/iomgr/wakeup_fd_posix.c index d8eafc4192..041c221de3 100644 --- a/src/core/lib/iomgr/wakeup_fd_posix.c +++ b/src/core/lib/iomgr/wakeup_fd_posix.c @@ -33,7 +33,7 @@ #include -#ifdef GPR_POSIX_SOCKET +#ifdef GPR_POSIX_WAKEUP_FD #include #include "src/core/lib/iomgr/wakeup_fd_cv.h" -- cgit v1.2.3 From 82e4ec741ba3e564d1223f0f20355a6e27e70136 Mon Sep 17 00:00:00 2001 From: Ken Payson Date: Thu, 13 Oct 2016 12:26:01 -0700 Subject: Moved cv polling into ev_poll_posix.c --- BUILD | 8 - CMakeLists.txt | 3 - Makefile | 4 - binding.gyp | 1 - build.yaml | 2 - config.m4 | 1 - gRPC-Core.podspec | 3 - grpc.gemspec | 2 - package.xml | 2 - src/core/lib/iomgr/ev_poll_cv_posix.c | 288 --------------------- src/core/lib/iomgr/ev_poll_cv_posix.h | 68 ----- src/core/lib/iomgr/ev_poll_posix.c | 237 ++++++++++++++++- src/core/lib/iomgr/ev_poll_posix.h | 1 + src/core/lib/iomgr/ev_posix.c | 1 - src/core/lib/iomgr/wakeup_fd_cv.c | 4 +- src/core/lib/iomgr/wakeup_fd_cv.h | 29 ++- src/core/lib/iomgr/wakeup_fd_posix.c | 26 +- src/core/lib/iomgr/wakeup_fd_posix.h | 2 +- src/python/grpcio/grpc_core_dependencies.py | 1 - tools/doxygen/Doxyfile.core.internal | 2 - tools/run_tests/sources_and_headers.json | 3 - vsprojects/vcxproj/grpc/grpc.vcxproj | 3 - vsprojects/vcxproj/grpc/grpc.vcxproj.filters | 6 - .../vcxproj/grpc_test_util/grpc_test_util.vcxproj | 3 - .../grpc_test_util/grpc_test_util.vcxproj.filters | 6 - .../vcxproj/grpc_unsecure/grpc_unsecure.vcxproj | 3 - .../grpc_unsecure/grpc_unsecure.vcxproj.filters | 6 - 27 files changed, 280 insertions(+), 435 deletions(-) delete mode 100644 src/core/lib/iomgr/ev_poll_cv_posix.c delete mode 100644 src/core/lib/iomgr/ev_poll_cv_posix.h (limited to 'src/core/lib/iomgr/ev_posix.c') diff --git a/BUILD b/BUILD index ba44367dbb..72cb046a60 100644 --- a/BUILD +++ b/BUILD @@ -186,7 +186,6 @@ cc_library( "src/core/lib/iomgr/error.h", "src/core/lib/iomgr/ev_epoll_linux.h", "src/core/lib/iomgr/ev_poll_and_epoll_posix.h", - "src/core/lib/iomgr/ev_poll_cv_posix.h", "src/core/lib/iomgr/ev_poll_posix.h", "src/core/lib/iomgr/ev_posix.h", "src/core/lib/iomgr/exec_ctx.h", @@ -349,7 +348,6 @@ cc_library( "src/core/lib/iomgr/error.c", "src/core/lib/iomgr/ev_epoll_linux.c", "src/core/lib/iomgr/ev_poll_and_epoll_posix.c", - "src/core/lib/iomgr/ev_poll_cv_posix.c", "src/core/lib/iomgr/ev_poll_posix.c", "src/core/lib/iomgr/ev_posix.c", "src/core/lib/iomgr/exec_ctx.c", @@ -591,7 +589,6 @@ cc_library( "src/core/lib/iomgr/error.h", "src/core/lib/iomgr/ev_epoll_linux.h", "src/core/lib/iomgr/ev_poll_and_epoll_posix.h", - "src/core/lib/iomgr/ev_poll_cv_posix.h", "src/core/lib/iomgr/ev_poll_posix.h", "src/core/lib/iomgr/ev_posix.h", "src/core/lib/iomgr/exec_ctx.h", @@ -739,7 +736,6 @@ cc_library( "src/core/lib/iomgr/error.c", "src/core/lib/iomgr/ev_epoll_linux.c", "src/core/lib/iomgr/ev_poll_and_epoll_posix.c", - "src/core/lib/iomgr/ev_poll_cv_posix.c", "src/core/lib/iomgr/ev_poll_posix.c", "src/core/lib/iomgr/ev_posix.c", "src/core/lib/iomgr/exec_ctx.c", @@ -951,7 +947,6 @@ cc_library( "src/core/lib/iomgr/error.h", "src/core/lib/iomgr/ev_epoll_linux.h", "src/core/lib/iomgr/ev_poll_and_epoll_posix.h", - "src/core/lib/iomgr/ev_poll_cv_posix.h", "src/core/lib/iomgr/ev_poll_posix.h", "src/core/lib/iomgr/ev_posix.h", "src/core/lib/iomgr/exec_ctx.h", @@ -1091,7 +1086,6 @@ cc_library( "src/core/lib/iomgr/error.c", "src/core/lib/iomgr/ev_epoll_linux.c", "src/core/lib/iomgr/ev_poll_and_epoll_posix.c", - "src/core/lib/iomgr/ev_poll_cv_posix.c", "src/core/lib/iomgr/ev_poll_posix.c", "src/core/lib/iomgr/ev_posix.c", "src/core/lib/iomgr/exec_ctx.c", @@ -1856,7 +1850,6 @@ objc_library( "src/core/lib/iomgr/error.c", "src/core/lib/iomgr/ev_epoll_linux.c", "src/core/lib/iomgr/ev_poll_and_epoll_posix.c", - "src/core/lib/iomgr/ev_poll_cv_posix.c", "src/core/lib/iomgr/ev_poll_posix.c", "src/core/lib/iomgr/ev_posix.c", "src/core/lib/iomgr/exec_ctx.c", @@ -2077,7 +2070,6 @@ objc_library( "src/core/lib/iomgr/error.h", "src/core/lib/iomgr/ev_epoll_linux.h", "src/core/lib/iomgr/ev_poll_and_epoll_posix.h", - "src/core/lib/iomgr/ev_poll_cv_posix.h", "src/core/lib/iomgr/ev_poll_posix.h", "src/core/lib/iomgr/ev_posix.h", "src/core/lib/iomgr/exec_ctx.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index 747d1fab4d..9ba5135593 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -314,7 +314,6 @@ add_library(grpc src/core/lib/iomgr/error.c src/core/lib/iomgr/ev_epoll_linux.c src/core/lib/iomgr/ev_poll_and_epoll_posix.c - src/core/lib/iomgr/ev_poll_cv_posix.c src/core/lib/iomgr/ev_poll_posix.c src/core/lib/iomgr/ev_posix.c src/core/lib/iomgr/exec_ctx.c @@ -575,7 +574,6 @@ add_library(grpc_cronet src/core/lib/iomgr/error.c src/core/lib/iomgr/ev_epoll_linux.c src/core/lib/iomgr/ev_poll_and_epoll_posix.c - src/core/lib/iomgr/ev_poll_cv_posix.c src/core/lib/iomgr/ev_poll_posix.c src/core/lib/iomgr/ev_posix.c src/core/lib/iomgr/exec_ctx.c @@ -808,7 +806,6 @@ add_library(grpc_unsecure src/core/lib/iomgr/error.c src/core/lib/iomgr/ev_epoll_linux.c src/core/lib/iomgr/ev_poll_and_epoll_posix.c - src/core/lib/iomgr/ev_poll_cv_posix.c src/core/lib/iomgr/ev_poll_posix.c src/core/lib/iomgr/ev_posix.c src/core/lib/iomgr/exec_ctx.c diff --git a/Makefile b/Makefile index 1f4c9b5c32..65d715c67c 100644 --- a/Makefile +++ b/Makefile @@ -2562,7 +2562,6 @@ LIBGRPC_SRC = \ src/core/lib/iomgr/error.c \ src/core/lib/iomgr/ev_epoll_linux.c \ src/core/lib/iomgr/ev_poll_and_epoll_posix.c \ - src/core/lib/iomgr/ev_poll_cv_posix.c \ src/core/lib/iomgr/ev_poll_posix.c \ src/core/lib/iomgr/ev_posix.c \ src/core/lib/iomgr/exec_ctx.c \ @@ -2841,7 +2840,6 @@ LIBGRPC_CRONET_SRC = \ src/core/lib/iomgr/error.c \ src/core/lib/iomgr/ev_epoll_linux.c \ src/core/lib/iomgr/ev_poll_and_epoll_posix.c \ - src/core/lib/iomgr/ev_poll_cv_posix.c \ src/core/lib/iomgr/ev_poll_posix.c \ src/core/lib/iomgr/ev_posix.c \ src/core/lib/iomgr/exec_ctx.c \ @@ -3110,7 +3108,6 @@ LIBGRPC_TEST_UTIL_SRC = \ src/core/lib/iomgr/error.c \ src/core/lib/iomgr/ev_epoll_linux.c \ src/core/lib/iomgr/ev_poll_and_epoll_posix.c \ - src/core/lib/iomgr/ev_poll_cv_posix.c \ src/core/lib/iomgr/ev_poll_posix.c \ src/core/lib/iomgr/ev_posix.c \ src/core/lib/iomgr/exec_ctx.c \ @@ -3306,7 +3303,6 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/iomgr/error.c \ src/core/lib/iomgr/ev_epoll_linux.c \ src/core/lib/iomgr/ev_poll_and_epoll_posix.c \ - src/core/lib/iomgr/ev_poll_cv_posix.c \ src/core/lib/iomgr/ev_poll_posix.c \ src/core/lib/iomgr/ev_posix.c \ src/core/lib/iomgr/exec_ctx.c \ diff --git a/binding.gyp b/binding.gyp index 85070e5cac..6dfae41860 100644 --- a/binding.gyp +++ b/binding.gyp @@ -589,7 +589,6 @@ 'src/core/lib/iomgr/error.c', 'src/core/lib/iomgr/ev_epoll_linux.c', 'src/core/lib/iomgr/ev_poll_and_epoll_posix.c', - 'src/core/lib/iomgr/ev_poll_cv_posix.c', 'src/core/lib/iomgr/ev_poll_posix.c', 'src/core/lib/iomgr/ev_posix.c', 'src/core/lib/iomgr/exec_ctx.c', diff --git a/build.yaml b/build.yaml index 3e64ce8f37..93ebf44ef8 100644 --- a/build.yaml +++ b/build.yaml @@ -190,7 +190,6 @@ filegroups: - src/core/lib/iomgr/error.h - src/core/lib/iomgr/ev_epoll_linux.h - src/core/lib/iomgr/ev_poll_and_epoll_posix.h - - src/core/lib/iomgr/ev_poll_cv_posix.h - src/core/lib/iomgr/ev_poll_posix.h - src/core/lib/iomgr/ev_posix.h - src/core/lib/iomgr/exec_ctx.h @@ -276,7 +275,6 @@ filegroups: - src/core/lib/iomgr/error.c - src/core/lib/iomgr/ev_epoll_linux.c - src/core/lib/iomgr/ev_poll_and_epoll_posix.c - - src/core/lib/iomgr/ev_poll_cv_posix.c - src/core/lib/iomgr/ev_poll_posix.c - src/core/lib/iomgr/ev_posix.c - src/core/lib/iomgr/exec_ctx.c diff --git a/config.m4 b/config.m4 index 1452798897..347686bd11 100644 --- a/config.m4 +++ b/config.m4 @@ -108,7 +108,6 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/iomgr/error.c \ src/core/lib/iomgr/ev_epoll_linux.c \ src/core/lib/iomgr/ev_poll_and_epoll_posix.c \ - src/core/lib/iomgr/ev_poll_cv_posix.c \ src/core/lib/iomgr/ev_poll_posix.c \ src/core/lib/iomgr/ev_posix.c \ src/core/lib/iomgr/exec_ctx.c \ diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 3ac0b841cf..6becf4c608 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -273,7 +273,6 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/error.h', 'src/core/lib/iomgr/ev_epoll_linux.h', 'src/core/lib/iomgr/ev_poll_and_epoll_posix.h', - 'src/core/lib/iomgr/ev_poll_cv_posix.h', 'src/core/lib/iomgr/ev_poll_posix.h', 'src/core/lib/iomgr/ev_posix.h', 'src/core/lib/iomgr/exec_ctx.h', @@ -440,7 +439,6 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/error.c', 'src/core/lib/iomgr/ev_epoll_linux.c', 'src/core/lib/iomgr/ev_poll_and_epoll_posix.c', - 'src/core/lib/iomgr/ev_poll_cv_posix.c', 'src/core/lib/iomgr/ev_poll_posix.c', 'src/core/lib/iomgr/ev_posix.c', 'src/core/lib/iomgr/exec_ctx.c', @@ -650,7 +648,6 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/error.h', 'src/core/lib/iomgr/ev_epoll_linux.h', 'src/core/lib/iomgr/ev_poll_and_epoll_posix.h', - 'src/core/lib/iomgr/ev_poll_cv_posix.h', 'src/core/lib/iomgr/ev_poll_posix.h', 'src/core/lib/iomgr/ev_posix.h', 'src/core/lib/iomgr/exec_ctx.h', diff --git a/grpc.gemspec b/grpc.gemspec index 85135a5327..6a898b85ff 100755 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -193,7 +193,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/iomgr/error.h ) s.files += %w( src/core/lib/iomgr/ev_epoll_linux.h ) s.files += %w( src/core/lib/iomgr/ev_poll_and_epoll_posix.h ) - s.files += %w( src/core/lib/iomgr/ev_poll_cv_posix.h ) s.files += %w( src/core/lib/iomgr/ev_poll_posix.h ) s.files += %w( src/core/lib/iomgr/ev_posix.h ) s.files += %w( src/core/lib/iomgr/exec_ctx.h ) @@ -360,7 +359,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/iomgr/error.c ) s.files += %w( src/core/lib/iomgr/ev_epoll_linux.c ) s.files += %w( src/core/lib/iomgr/ev_poll_and_epoll_posix.c ) - s.files += %w( src/core/lib/iomgr/ev_poll_cv_posix.c ) s.files += %w( src/core/lib/iomgr/ev_poll_posix.c ) s.files += %w( src/core/lib/iomgr/ev_posix.c ) s.files += %w( src/core/lib/iomgr/exec_ctx.c ) diff --git a/package.xml b/package.xml index 4631fc1c39..1f9846a516 100644 --- a/package.xml +++ b/package.xml @@ -200,7 +200,6 @@ - @@ -367,7 +366,6 @@ - diff --git a/src/core/lib/iomgr/ev_poll_cv_posix.c b/src/core/lib/iomgr/ev_poll_cv_posix.c deleted file mode 100644 index ed4c9e9143..0000000000 --- a/src/core/lib/iomgr/ev_poll_cv_posix.c +++ /dev/null @@ -1,288 +0,0 @@ -/* - * - * Copyright 2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include - -#ifdef GPR_POSIX_SOCKET - -#include "src/core/lib/iomgr/ev_poll_cv_posix.h" - -#include -#include - -#include -#include -#include -#include -#include -#include - -#include "src/core/lib/iomgr/ev_poll_posix.h" -#include "src/core/lib/iomgr/wakeup_fd_posix.h" - -#define POLL_PERIOD_MS 1000 -#define DEFAULT_TABLE_SIZE 16 - -typedef enum status_t { INPROGRESS, COMPLETED, CANCELLED } status_t; - -typedef struct poll_args { - gpr_refcount refcount; - gpr_cv* cv; - struct pollfd* fds; - nfds_t nfds; - int timeout; - int retval; - int err; - status_t status; -} poll_args; - -cv_fd_table g_cvfds; - -static void decref_poll_args(poll_args* args) { - if (gpr_unref(&args->refcount)) { - gpr_free(args->fds); - gpr_cv_destroy(args->cv); - gpr_free(args->cv); - gpr_free(args); - } -} - -// Poll in a background thread -static void run_poll(void* arg) { - int timeout, retval; - poll_args* pargs = (poll_args*)arg; - while (pargs->status == INPROGRESS) { - if (pargs->timeout < 0) { - timeout = POLL_PERIOD_MS; - } else { - timeout = GPR_MIN(POLL_PERIOD_MS, pargs->timeout); - pargs->timeout -= timeout; - } - retval = g_cvfds.poll(pargs->fds, pargs->nfds, timeout); - if (retval != 0 || pargs->timeout == 0) { - pargs->retval = retval; - pargs->err = errno; - break; - } - } - gpr_mu_lock(&g_cvfds.mu); - if (pargs->status == INPROGRESS) { - // Signal main thread that the poll completed - pargs->status = COMPLETED; - gpr_cv_signal(pargs->cv); - } - decref_poll_args(pargs); - g_cvfds.pollcount--; - if (g_cvfds.shutdown && g_cvfds.pollcount == 0) { - gpr_cv_signal(&g_cvfds.shutdown_complete); - } - gpr_mu_unlock(&g_cvfds.mu); -} - -// This function overrides poll() to handle condition variable wakeup fds -static int cvfd_poll(struct pollfd* fds, nfds_t nfds, int timeout) { - unsigned int i; - int res, idx; - gpr_cv* pollcv; - cv_node *cvn, *prev; - nfds_t nsockfds = 0; - gpr_thd_id t_id; - gpr_thd_options opt; - poll_args* pargs = NULL; - gpr_mu_lock(&g_cvfds.mu); - pollcv = gpr_malloc(sizeof(gpr_cv)); - gpr_cv_init(pollcv); - for (i = 0; i < nfds; i++) { - fds[i].revents = 0; - if (fds[i].fd < 0 && (fds[i].events & POLLIN)) { - idx = FD_TO_IDX(fds[i].fd); - cvn = gpr_malloc(sizeof(cv_node)); - cvn->cv = pollcv; - cvn->next = g_cvfds.cvfds[idx].cvs; - g_cvfds.cvfds[idx].cvs = cvn; - // We should return immediately if there are pending events, - // but we still need to call poll() to check for socket events - if (g_cvfds.cvfds[idx].is_set) { - timeout = 0; - } - } else if (fds[i].fd >= 0) { - nsockfds++; - } - } - - if (nsockfds > 0) { - pargs = gpr_malloc(sizeof(struct poll_args)); - // Both the main thread and calling thread get a reference - gpr_ref_init(&pargs->refcount, 2); - pargs->cv = pollcv; - pargs->fds = gpr_malloc(sizeof(struct pollfd) * nsockfds); - pargs->nfds = nsockfds; - pargs->timeout = timeout; - pargs->retval = 0; - pargs->err = 0; - pargs->status = INPROGRESS; - idx = 0; - for (i = 0; i < nfds; i++) { - if (fds[i].fd >= 0) { - pargs->fds[idx].fd = fds[i].fd; - pargs->fds[idx].events = fds[i].events; - pargs->fds[idx].revents = 0; - idx++; - } - } - g_cvfds.pollcount++; - opt = gpr_thd_options_default(); - gpr_thd_options_set_detached(&opt); - gpr_thd_new(&t_id, &run_poll, pargs, &opt); - // We want the poll() thread to trigger the deadline, so wait forever here - gpr_cv_wait(pollcv, &g_cvfds.mu, gpr_inf_future(GPR_CLOCK_MONOTONIC)); - if (pargs->status == COMPLETED) { - res = pargs->retval; - errno = pargs->err; - } else { - res = 0; - errno = 0; - pargs->status = CANCELLED; - } - } else { - gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME); - deadline = - gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN)); - gpr_cv_wait(pollcv, &g_cvfds.mu, deadline); - res = 0; - } - - idx = 0; - for (i = 0; i < nfds; i++) { - if (fds[i].fd < 0 && (fds[i].events & POLLIN)) { - cvn = g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs; - prev = NULL; - while (cvn->cv != pollcv) { - prev = cvn; - cvn = cvn->next; - GPR_ASSERT(cvn); - } - if (!prev) { - g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs = cvn->next; - } else { - prev->next = cvn->next; - } - gpr_free(cvn); - - if (g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].is_set) { - fds[i].revents = POLLIN; - if (res >= 0) res++; - } - } else if (fds[i].fd >= 0 && pargs->status == COMPLETED) { - fds[i].revents = pargs->fds[idx].revents; - idx++; - } - } - - if (pargs) { - decref_poll_args(pargs); - } else { - gpr_cv_destroy(pollcv); - gpr_free(pollcv); - } - gpr_mu_unlock(&g_cvfds.mu); - - return res; -} - -static void grpc_global_cv_fd_table_init() { - gpr_mu_init(&g_cvfds.mu); - gpr_mu_lock(&g_cvfds.mu); - gpr_cv_init(&g_cvfds.shutdown_complete); - g_cvfds.shutdown = 0; - g_cvfds.pollcount = 0; - g_cvfds.size = DEFAULT_TABLE_SIZE; - g_cvfds.cvfds = gpr_malloc(sizeof(fd_node) * DEFAULT_TABLE_SIZE); - g_cvfds.free_fds = NULL; - for (int i = 0; i < DEFAULT_TABLE_SIZE; i++) { - g_cvfds.cvfds[i].is_set = 0; - g_cvfds.cvfds[i].cvs = NULL; - g_cvfds.cvfds[i].next_free = g_cvfds.free_fds; - g_cvfds.free_fds = &g_cvfds.cvfds[i]; - } - // Override the poll function with one that supports cvfds - g_cvfds.poll = grpc_poll_function; - grpc_poll_function = &cvfd_poll; - gpr_mu_unlock(&g_cvfds.mu); -} - -static void grpc_global_cv_fd_table_shutdown() { - gpr_mu_lock(&g_cvfds.mu); - g_cvfds.shutdown = 1; - // Attempt to wait for all abandoned poll() threads to terminate - // Not doing so will result in reported memory leaks - if (g_cvfds.pollcount > 0) { - int res = gpr_cv_wait(&g_cvfds.shutdown_complete, &g_cvfds.mu, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_seconds(3, GPR_TIMESPAN))); - GPR_ASSERT(res == 0); - } - gpr_cv_destroy(&g_cvfds.shutdown_complete); - grpc_poll_function = g_cvfds.poll; - gpr_free(g_cvfds.cvfds); - gpr_mu_unlock(&g_cvfds.mu); - gpr_mu_destroy(&g_cvfds.mu); -} - -/******************************************************************************* - * event engine binding - */ - -static const grpc_event_engine_vtable* ev_poll_vtable; -static grpc_event_engine_vtable vtable; - -static void shutdown_engine(void) { - ev_poll_vtable->shutdown_engine(); - grpc_global_cv_fd_table_shutdown(); -} - -const grpc_event_engine_vtable* grpc_init_poll_cv_posix(void) { - grpc_global_cv_fd_table_init(); - grpc_enable_cv_wakeup_fds(1); - ev_poll_vtable = grpc_init_poll_posix(); - if (!ev_poll_vtable) { - grpc_global_cv_fd_table_shutdown(); - grpc_enable_cv_wakeup_fds(0); - return NULL; - } - vtable = *ev_poll_vtable; - vtable.shutdown_engine = shutdown_engine; - return &vtable; -} - -#endif /* GPR_POSIX_SOCKET */ diff --git a/src/core/lib/iomgr/ev_poll_cv_posix.h b/src/core/lib/iomgr/ev_poll_cv_posix.h deleted file mode 100644 index 885711d1c5..0000000000 --- a/src/core/lib/iomgr/ev_poll_cv_posix.h +++ /dev/null @@ -1,68 +0,0 @@ -/* - * - * Copyright 2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_CORE_LIB_IOMGR_EV_POLL_CV_POSIX_H -#define GRPC_CORE_LIB_IOMGR_EV_POLL_CV_POSIX_H - -#include - -#include "src/core/lib/iomgr/ev_posix.h" - -#define FD_TO_IDX(fd) (-(fd)-1) -#define IDX_TO_FD(idx) (-(idx)-1) - -typedef struct cv_node { - gpr_cv* cv; - struct cv_node* next; -} cv_node; - -typedef struct fd_node { - int is_set; - cv_node* cvs; - struct fd_node* next_free; -} fd_node; - -typedef struct cv_fd_table { - gpr_mu mu; - int pollcount; - int shutdown; - gpr_cv shutdown_complete; - fd_node* cvfds; - fd_node* free_fds; - unsigned int size; - grpc_poll_function_type poll; -} cv_fd_table; - -const grpc_event_engine_vtable* grpc_init_poll_cv_posix(void); - -#endif /* GRPC_CORE_LIB_IOMGR_EV_POLL_CV_POSIX_H */ diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 0dca51bc78..d9e5255ddc 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -47,10 +47,12 @@ #include #include #include +#include #include #include #include "src/core/lib/iomgr/iomgr_internal.h" +#include "src/core/lib/iomgr/wakeup_fd_cv.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" @@ -245,6 +247,28 @@ struct grpc_pollset_set { grpc_fd **fds; }; +/******************************************************************************* + * condition variable polling definitions + */ + +#define CV_POLL_PERIOD_MS 1000 +#define CV_DEFAULT_TABLE_SIZE 16 + +typedef enum status_t { INPROGRESS, COMPLETED, CANCELLED } status_t; + +typedef struct poll_args { + gpr_refcount refcount; + gpr_cv *cv; + struct pollfd *fds; + nfds_t nfds; + int timeout; + int retval; + int err; + status_t status; +} poll_args; + +cv_fd_table g_cvfds; + /******************************************************************************* * fd_posix.c */ @@ -1235,11 +1259,211 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&pollset_set->mu); } +/******************************************************************************* + * Condition Variable polling extensions + */ + +static void decref_poll_args(poll_args *args) { + if (gpr_unref(&args->refcount)) { + gpr_free(args->fds); + gpr_cv_destroy(args->cv); + gpr_free(args->cv); + gpr_free(args); + } +} + +// Poll in a background thread +static void run_poll(void *arg) { + int timeout, retval; + poll_args *pargs = (poll_args *)arg; + while (pargs->status == INPROGRESS) { + if (pargs->timeout < 0) { + timeout = CV_POLL_PERIOD_MS; + } else { + timeout = GPR_MIN(CV_POLL_PERIOD_MS, pargs->timeout); + pargs->timeout -= timeout; + } + retval = g_cvfds.poll(pargs->fds, pargs->nfds, timeout); + if (retval != 0 || pargs->timeout == 0) { + pargs->retval = retval; + pargs->err = errno; + break; + } + } + gpr_mu_lock(&g_cvfds.mu); + if (pargs->status == INPROGRESS) { + // Signal main thread that the poll completed + pargs->status = COMPLETED; + gpr_cv_signal(pargs->cv); + } + decref_poll_args(pargs); + g_cvfds.pollcount--; + if (g_cvfds.shutdown && g_cvfds.pollcount == 0) { + gpr_cv_signal(&g_cvfds.shutdown_complete); + } + gpr_mu_unlock(&g_cvfds.mu); +} + +// This function overrides poll() to handle condition variable wakeup fds +static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { + unsigned int i; + int res, idx; + gpr_cv *pollcv; + cv_node *cvn, *prev; + nfds_t nsockfds = 0; + gpr_thd_id t_id; + gpr_thd_options opt; + poll_args *pargs = NULL; + gpr_mu_lock(&g_cvfds.mu); + pollcv = gpr_malloc(sizeof(gpr_cv)); + gpr_cv_init(pollcv); + for (i = 0; i < nfds; i++) { + fds[i].revents = 0; + if (fds[i].fd < 0 && (fds[i].events & POLLIN)) { + idx = FD_TO_IDX(fds[i].fd); + cvn = gpr_malloc(sizeof(cv_node)); + cvn->cv = pollcv; + cvn->next = g_cvfds.cvfds[idx].cvs; + g_cvfds.cvfds[idx].cvs = cvn; + // We should return immediately if there are pending events, + // but we still need to call poll() to check for socket events + if (g_cvfds.cvfds[idx].is_set) { + timeout = 0; + } + } else if (fds[i].fd >= 0) { + nsockfds++; + } + } + + if (nsockfds > 0) { + pargs = gpr_malloc(sizeof(struct poll_args)); + // Both the main thread and calling thread get a reference + gpr_ref_init(&pargs->refcount, 2); + pargs->cv = pollcv; + pargs->fds = gpr_malloc(sizeof(struct pollfd) * nsockfds); + pargs->nfds = nsockfds; + pargs->timeout = timeout; + pargs->retval = 0; + pargs->err = 0; + pargs->status = INPROGRESS; + idx = 0; + for (i = 0; i < nfds; i++) { + if (fds[i].fd >= 0) { + pargs->fds[idx].fd = fds[i].fd; + pargs->fds[idx].events = fds[i].events; + pargs->fds[idx].revents = 0; + idx++; + } + } + g_cvfds.pollcount++; + opt = gpr_thd_options_default(); + gpr_thd_options_set_detached(&opt); + gpr_thd_new(&t_id, &run_poll, pargs, &opt); + // We want the poll() thread to trigger the deadline, so wait forever here + gpr_cv_wait(pollcv, &g_cvfds.mu, gpr_inf_future(GPR_CLOCK_MONOTONIC)); + if (pargs->status == COMPLETED) { + res = pargs->retval; + errno = pargs->err; + } else { + res = 0; + errno = 0; + pargs->status = CANCELLED; + } + } else { + gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME); + deadline = + gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN)); + gpr_cv_wait(pollcv, &g_cvfds.mu, deadline); + res = 0; + } + + idx = 0; + for (i = 0; i < nfds; i++) { + if (fds[i].fd < 0 && (fds[i].events & POLLIN)) { + cvn = g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs; + prev = NULL; + while (cvn->cv != pollcv) { + prev = cvn; + cvn = cvn->next; + GPR_ASSERT(cvn); + } + if (!prev) { + g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs = cvn->next; + } else { + prev->next = cvn->next; + } + gpr_free(cvn); + + if (g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].is_set) { + fds[i].revents = POLLIN; + if (res >= 0) res++; + } + } else if (fds[i].fd >= 0 && pargs->status == COMPLETED) { + fds[i].revents = pargs->fds[idx].revents; + idx++; + } + } + + if (pargs) { + decref_poll_args(pargs); + } else { + gpr_cv_destroy(pollcv); + gpr_free(pollcv); + } + gpr_mu_unlock(&g_cvfds.mu); + + return res; +} + +static void global_cv_fd_table_init() { + gpr_mu_init(&g_cvfds.mu); + gpr_mu_lock(&g_cvfds.mu); + gpr_cv_init(&g_cvfds.shutdown_complete); + g_cvfds.shutdown = 0; + g_cvfds.pollcount = 0; + g_cvfds.size = CV_DEFAULT_TABLE_SIZE; + g_cvfds.cvfds = gpr_malloc(sizeof(fd_node) * CV_DEFAULT_TABLE_SIZE); + g_cvfds.free_fds = NULL; + for (int i = 0; i < CV_DEFAULT_TABLE_SIZE; i++) { + g_cvfds.cvfds[i].is_set = 0; + g_cvfds.cvfds[i].cvs = NULL; + g_cvfds.cvfds[i].next_free = g_cvfds.free_fds; + g_cvfds.free_fds = &g_cvfds.cvfds[i]; + } + // Override the poll function with one that supports cvfds + g_cvfds.poll = grpc_poll_function; + grpc_poll_function = &cvfd_poll; + gpr_mu_unlock(&g_cvfds.mu); +} + +static void global_cv_fd_table_shutdown() { + gpr_mu_lock(&g_cvfds.mu); + g_cvfds.shutdown = 1; + // Attempt to wait for all abandoned poll() threads to terminate + // Not doing so will result in reported memory leaks + if (g_cvfds.pollcount > 0) { + int res = gpr_cv_wait(&g_cvfds.shutdown_complete, &g_cvfds.mu, + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_seconds(3, GPR_TIMESPAN))); + GPR_ASSERT(res == 0); + } + gpr_cv_destroy(&g_cvfds.shutdown_complete); + grpc_poll_function = g_cvfds.poll; + gpr_free(g_cvfds.cvfds); + gpr_mu_unlock(&g_cvfds.mu); + gpr_mu_destroy(&g_cvfds.mu); +} + /******************************************************************************* * event engine binding */ -static void shutdown_engine(void) { pollset_global_shutdown(); } +static void shutdown_engine(void) { + pollset_global_shutdown(); + if (grpc_cv_wakeup_fds_enabled()) { + global_cv_fd_table_shutdown(); + } +} static const grpc_event_engine_vtable vtable = { .pollset_size = sizeof(grpc_pollset), @@ -1286,4 +1510,15 @@ const grpc_event_engine_vtable *grpc_init_poll_posix(void) { return &vtable; } +const grpc_event_engine_vtable *grpc_init_poll_cv_posix(void) { + global_cv_fd_table_init(); + grpc_enable_cv_wakeup_fds(1); + if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) { + global_cv_fd_table_shutdown(); + grpc_enable_cv_wakeup_fds(0); + return NULL; + } + return &vtable; +} + #endif diff --git a/src/core/lib/iomgr/ev_poll_posix.h b/src/core/lib/iomgr/ev_poll_posix.h index 291736a2db..202ffca14c 100644 --- a/src/core/lib/iomgr/ev_poll_posix.h +++ b/src/core/lib/iomgr/ev_poll_posix.h @@ -37,5 +37,6 @@ #include "src/core/lib/iomgr/ev_posix.h" const grpc_event_engine_vtable *grpc_init_poll_posix(void); +const grpc_event_engine_vtable *grpc_init_poll_cv_posix(void); #endif /* GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H */ diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 2fc8ccfa91..0637f80421 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -46,7 +46,6 @@ #include "src/core/lib/iomgr/ev_epoll_linux.h" #include "src/core/lib/iomgr/ev_poll_and_epoll_posix.h" -#include "src/core/lib/iomgr/ev_poll_cv_posix.h" #include "src/core/lib/iomgr/ev_poll_posix.h" #include "src/core/lib/support/env.h" diff --git a/src/core/lib/iomgr/wakeup_fd_cv.c b/src/core/lib/iomgr/wakeup_fd_cv.c index 651e2f663d..b4165208ed 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.c +++ b/src/core/lib/iomgr/wakeup_fd_cv.c @@ -35,6 +35,8 @@ #ifdef GPR_POSIX_WAKEUP_FD +#include "src/core/lib/iomgr/wakeup_fd_cv.h" + #include #include @@ -45,8 +47,6 @@ #include #include -#include "src/core/lib/iomgr/ev_poll_cv_posix.h" - #define MAX_TABLE_RESIZE 256 extern cv_fd_table g_cvfds; diff --git a/src/core/lib/iomgr/wakeup_fd_cv.h b/src/core/lib/iomgr/wakeup_fd_cv.h index e57fc28363..ac16be1750 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.h +++ b/src/core/lib/iomgr/wakeup_fd_cv.h @@ -48,8 +48,33 @@ #ifndef GRPC_CORE_LIB_IOMGR_WAKEUP_FD_CV_H #define GRPC_CORE_LIB_IOMGR_WAKEUP_FD_CV_H -#include "src/core/lib/iomgr/wakeup_fd_posix.h" +#include -extern grpc_wakeup_fd_vtable grpc_cv_wakeup_fd_vtable; +#include "src/core/lib/iomgr/ev_posix.h" + +#define FD_TO_IDX(fd) (-(fd)-1) +#define IDX_TO_FD(idx) (-(idx)-1) + +typedef struct cv_node { + gpr_cv* cv; + struct cv_node* next; +} cv_node; + +typedef struct fd_node { + int is_set; + cv_node* cvs; + struct fd_node* next_free; +} fd_node; + +typedef struct cv_fd_table { + gpr_mu mu; + int pollcount; + int shutdown; + gpr_cv shutdown_complete; + fd_node* cvfds; + fd_node* free_fds; + unsigned int size; + grpc_poll_function_type poll; +} cv_fd_table; #endif /* GRPC_CORE_LIB_IOMGR_WAKEUP_FD_CV_H */ diff --git a/src/core/lib/iomgr/wakeup_fd_posix.c b/src/core/lib/iomgr/wakeup_fd_posix.c index f75ae78c22..5c894bef37 100644 --- a/src/core/lib/iomgr/wakeup_fd_posix.c +++ b/src/core/lib/iomgr/wakeup_fd_posix.c @@ -40,12 +40,14 @@ #include "src/core/lib/iomgr/wakeup_fd_pipe.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" +extern grpc_wakeup_fd_vtable grpc_cv_wakeup_fd_vtable; static const grpc_wakeup_fd_vtable *wakeup_fd_vtable = NULL; int grpc_allow_specialized_wakeup_fd = 1; int grpc_allow_pipe_wakeup_fd = 1; -int grpc_has_real_wakeup_fd = 1; -int grpc_cv_wakeup_fds_enabled = 0; + +int has_real_wakeup_fd = 1; +int cv_wakeup_fds_enabled = 0; void grpc_wakeup_fd_global_init(void) { if (grpc_allow_specialized_wakeup_fd && @@ -55,43 +57,41 @@ void grpc_wakeup_fd_global_init(void) { grpc_pipe_wakeup_fd_vtable.check_availability()) { wakeup_fd_vtable = &grpc_pipe_wakeup_fd_vtable; } else { - grpc_has_real_wakeup_fd = 0; + has_real_wakeup_fd = 0; } } void grpc_wakeup_fd_global_destroy(void) { wakeup_fd_vtable = NULL; } -int grpc_has_wakeup_fd(void) { - return grpc_has_real_wakeup_fd || grpc_cv_wakeup_fds_enabled; -} +int grpc_has_wakeup_fd(void) { return has_real_wakeup_fd; } -void grpc_enable_cv_wakeup_fds(int enable) { - grpc_cv_wakeup_fds_enabled = enable; -} +int grpc_cv_wakeup_fds_enabled(void) { return cv_wakeup_fds_enabled; } + +void grpc_enable_cv_wakeup_fds(int enable) { cv_wakeup_fds_enabled = enable; } grpc_error *grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info) { - if (grpc_cv_wakeup_fds_enabled) { + if (cv_wakeup_fds_enabled) { return grpc_cv_wakeup_fd_vtable.init(fd_info); } return wakeup_fd_vtable->init(fd_info); } grpc_error *grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd *fd_info) { - if (grpc_cv_wakeup_fds_enabled) { + if (cv_wakeup_fds_enabled) { return grpc_cv_wakeup_fd_vtable.consume(fd_info); } return wakeup_fd_vtable->consume(fd_info); } grpc_error *grpc_wakeup_fd_wakeup(grpc_wakeup_fd *fd_info) { - if (grpc_cv_wakeup_fds_enabled) { + if (cv_wakeup_fds_enabled) { return grpc_cv_wakeup_fd_vtable.wakeup(fd_info); } return wakeup_fd_vtable->wakeup(fd_info); } void grpc_wakeup_fd_destroy(grpc_wakeup_fd *fd_info) { - if (grpc_cv_wakeup_fds_enabled) { + if (cv_wakeup_fds_enabled) { grpc_cv_wakeup_fd_vtable.destroy(fd_info); } else { wakeup_fd_vtable->destroy(fd_info); diff --git a/src/core/lib/iomgr/wakeup_fd_posix.h b/src/core/lib/iomgr/wakeup_fd_posix.h index 243c452751..71d32d97ba 100644 --- a/src/core/lib/iomgr/wakeup_fd_posix.h +++ b/src/core/lib/iomgr/wakeup_fd_posix.h @@ -72,6 +72,7 @@ void grpc_wakeup_fd_global_destroy(void); void grpc_wakeup_fd_global_init_force_fallback(void); int grpc_has_wakeup_fd(void); +int grpc_cv_wakeup_fds_enabled(void); void grpc_enable_cv_wakeup_fds(int enable); typedef struct grpc_wakeup_fd grpc_wakeup_fd; @@ -92,7 +93,6 @@ struct grpc_wakeup_fd { extern int grpc_allow_specialized_wakeup_fd; extern int grpc_allow_pipe_wakeup_fd; -extern int grpc_has_real_wakeup_fd; #define GRPC_WAKEUP_FD_GET_READ_FD(fd_info) ((fd_info)->read_fd) diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 62d9729e13..98974878fc 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -102,7 +102,6 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/error.c', 'src/core/lib/iomgr/ev_epoll_linux.c', 'src/core/lib/iomgr/ev_poll_and_epoll_posix.c', - 'src/core/lib/iomgr/ev_poll_cv_posix.c', 'src/core/lib/iomgr/ev_poll_posix.c', 'src/core/lib/iomgr/ev_posix.c', 'src/core/lib/iomgr/exec_ctx.c', diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 1ec177cdf3..0c92e270c8 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -810,7 +810,6 @@ src/core/lib/iomgr/endpoint_pair.h \ src/core/lib/iomgr/error.h \ src/core/lib/iomgr/ev_epoll_linux.h \ src/core/lib/iomgr/ev_poll_and_epoll_posix.h \ -src/core/lib/iomgr/ev_poll_cv_posix.h \ src/core/lib/iomgr/ev_poll_posix.h \ src/core/lib/iomgr/ev_posix.h \ src/core/lib/iomgr/exec_ctx.h \ @@ -977,7 +976,6 @@ src/core/lib/iomgr/endpoint_pair_windows.c \ src/core/lib/iomgr/error.c \ src/core/lib/iomgr/ev_epoll_linux.c \ src/core/lib/iomgr/ev_poll_and_epoll_posix.c \ -src/core/lib/iomgr/ev_poll_cv_posix.c \ src/core/lib/iomgr/ev_poll_posix.c \ src/core/lib/iomgr/ev_posix.c \ src/core/lib/iomgr/exec_ctx.c \ diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index 35b21d4e20..c9ded1ff58 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -6391,7 +6391,6 @@ "src/core/lib/iomgr/error.h", "src/core/lib/iomgr/ev_epoll_linux.h", "src/core/lib/iomgr/ev_poll_and_epoll_posix.h", - "src/core/lib/iomgr/ev_poll_cv_posix.h", "src/core/lib/iomgr/ev_poll_posix.h", "src/core/lib/iomgr/ev_posix.h", "src/core/lib/iomgr/exec_ctx.h", @@ -6512,8 +6511,6 @@ "src/core/lib/iomgr/ev_epoll_linux.h", "src/core/lib/iomgr/ev_poll_and_epoll_posix.c", "src/core/lib/iomgr/ev_poll_and_epoll_posix.h", - "src/core/lib/iomgr/ev_poll_cv_posix.c", - "src/core/lib/iomgr/ev_poll_cv_posix.h", "src/core/lib/iomgr/ev_poll_posix.c", "src/core/lib/iomgr/ev_poll_posix.h", "src/core/lib/iomgr/ev_posix.c", diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj index 7d5e0ee56f..4a62af0276 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj @@ -319,7 +319,6 @@ - @@ -513,8 +512,6 @@ - - diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters index 72bf449c3e..dacdcd7608 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters @@ -76,9 +76,6 @@ src\core\lib\iomgr - - src\core\lib\iomgr - src\core\lib\iomgr @@ -746,9 +743,6 @@ src\core\lib\iomgr - - src\core\lib\iomgr - src\core\lib\iomgr diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj index 44cbe8bf0c..cb5963d0e8 100644 --- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj +++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj @@ -212,7 +212,6 @@ - @@ -361,8 +360,6 @@ - - diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters index 57466022f4..362284339f 100644 --- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters @@ -130,9 +130,6 @@ src\core\lib\iomgr - - src\core\lib\iomgr - src\core\lib\iomgr @@ -533,9 +530,6 @@ src\core\lib\iomgr - - src\core\lib\iomgr - src\core\lib\iomgr diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj index aa47cc800d..b7b72a05df 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj @@ -309,7 +309,6 @@ - @@ -481,8 +480,6 @@ - - diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters index 4706c90af2..ca32c0b4b4 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters @@ -79,9 +79,6 @@ src\core\lib\iomgr - - src\core\lib\iomgr - src\core\lib\iomgr @@ -656,9 +653,6 @@ src\core\lib\iomgr - - src\core\lib\iomgr - src\core\lib\iomgr -- cgit v1.2.3