From e48b1bc011e2b5783ce75f4122dfd5aba01f0d97 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Wed, 11 May 2016 15:17:22 -0700 Subject: Base changes. Create ev_epoll_posix.{c,h} files by making a copy of ev_poll_and_epoll.c file --- src/core/lib/iomgr/ev_posix.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/core/lib/iomgr/ev_posix.c') diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 95520b01d3..baa3b9856a 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -44,6 +44,7 @@ #include #include +#include "src/core/lib/iomgr/ev_epoll_posix.h" #include "src/core/lib/iomgr/ev_poll_posix.h" #include "src/core/lib/support/env.h" @@ -61,7 +62,7 @@ typedef struct { } event_engine_factory; static const event_engine_factory g_factories[] = { - {"poll", grpc_init_poll_posix}, + {"poll", grpc_init_poll_posix}, {"epoll", grpc_init_epoll_posix}, }; static void add(const char *beg, const char *end, char ***ss, size_t *ns) { -- cgit v1.2.3 From f448c34a6839f75476900a4a2b24b2160fe4d164 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Thu, 19 May 2016 10:51:24 -0700 Subject: Remove union { } data and epoll_hdr structures. Added ev_epoll_linux files --- BUILD | 6 + Makefile | 2 + binding.gyp | 1 + build.yaml | 2 + config.m4 | 1 + gRPC.podspec | 3 + grpc.gemspec | 2 + package.xml | 2 + src/core/lib/iomgr/ev_epoll_linux.c | 1335 ++++++++++++++++++++ src/core/lib/iomgr/ev_epoll_linux.h | 41 + src/core/lib/iomgr/ev_epoll_posix.c | 87 +- src/core/lib/iomgr/ev_posix.c | 7 +- src/python/grpcio/grpc_core_dependencies.py | 1 + tools/doxygen/Doxyfile.core.internal | 2 + tools/run_tests/sources_and_headers.json | 3 + vsprojects/vcxproj/grpc/grpc.vcxproj | 3 + vsprojects/vcxproj/grpc/grpc.vcxproj.filters | 6 + .../vcxproj/grpc_unsecure/grpc_unsecure.vcxproj | 3 + .../grpc_unsecure/grpc_unsecure.vcxproj.filters | 6 + 19 files changed, 1442 insertions(+), 71 deletions(-) create mode 100644 src/core/lib/iomgr/ev_epoll_linux.c create mode 100644 src/core/lib/iomgr/ev_epoll_linux.h (limited to 'src/core/lib/iomgr/ev_posix.c') diff --git a/BUILD b/BUILD index 0be8f27a01..a32352ebb3 100644 --- a/BUILD +++ b/BUILD @@ -178,6 +178,7 @@ cc_library( "src/core/lib/iomgr/closure.h", "src/core/lib/iomgr/endpoint.h", "src/core/lib/iomgr/endpoint_pair.h", + "src/core/lib/iomgr/ev_epoll_linux.h", "src/core/lib/iomgr/ev_epoll_posix.h", "src/core/lib/iomgr/ev_poll_posix.h", "src/core/lib/iomgr/ev_posix.h", @@ -322,6 +323,7 @@ cc_library( "src/core/lib/iomgr/endpoint.c", "src/core/lib/iomgr/endpoint_pair_posix.c", "src/core/lib/iomgr/endpoint_pair_windows.c", + "src/core/lib/iomgr/ev_epoll_linux.c", "src/core/lib/iomgr/ev_epoll_posix.c", "src/core/lib/iomgr/ev_poll_posix.c", "src/core/lib/iomgr/ev_posix.c", @@ -548,6 +550,7 @@ cc_library( "src/core/lib/iomgr/closure.h", "src/core/lib/iomgr/endpoint.h", "src/core/lib/iomgr/endpoint_pair.h", + "src/core/lib/iomgr/ev_epoll_linux.h", "src/core/lib/iomgr/ev_epoll_posix.h", "src/core/lib/iomgr/ev_poll_posix.h", "src/core/lib/iomgr/ev_posix.h", @@ -669,6 +672,7 @@ cc_library( "src/core/lib/iomgr/endpoint.c", "src/core/lib/iomgr/endpoint_pair_posix.c", "src/core/lib/iomgr/endpoint_pair_windows.c", + "src/core/lib/iomgr/ev_epoll_linux.c", "src/core/lib/iomgr/ev_epoll_posix.c", "src/core/lib/iomgr/ev_poll_posix.c", "src/core/lib/iomgr/ev_posix.c", @@ -1362,6 +1366,7 @@ objc_library( "src/core/lib/iomgr/endpoint.c", "src/core/lib/iomgr/endpoint_pair_posix.c", "src/core/lib/iomgr/endpoint_pair_windows.c", + "src/core/lib/iomgr/ev_epoll_linux.c", "src/core/lib/iomgr/ev_epoll_posix.c", "src/core/lib/iomgr/ev_poll_posix.c", "src/core/lib/iomgr/ev_posix.c", @@ -1567,6 +1572,7 @@ objc_library( "src/core/lib/iomgr/closure.h", "src/core/lib/iomgr/endpoint.h", "src/core/lib/iomgr/endpoint_pair.h", + "src/core/lib/iomgr/ev_epoll_linux.h", "src/core/lib/iomgr/ev_epoll_posix.h", "src/core/lib/iomgr/ev_poll_posix.h", "src/core/lib/iomgr/ev_posix.h", diff --git a/Makefile b/Makefile index 29ebc0e5ad..063698d943 100644 --- a/Makefile +++ b/Makefile @@ -2486,6 +2486,7 @@ LIBGRPC_SRC = \ src/core/lib/iomgr/endpoint.c \ src/core/lib/iomgr/endpoint_pair_posix.c \ src/core/lib/iomgr/endpoint_pair_windows.c \ + src/core/lib/iomgr/ev_epoll_linux.c \ src/core/lib/iomgr/ev_epoll_posix.c \ src/core/lib/iomgr/ev_poll_posix.c \ src/core/lib/iomgr/ev_posix.c \ @@ -2841,6 +2842,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/iomgr/endpoint.c \ src/core/lib/iomgr/endpoint_pair_posix.c \ src/core/lib/iomgr/endpoint_pair_windows.c \ + src/core/lib/iomgr/ev_epoll_linux.c \ src/core/lib/iomgr/ev_epoll_posix.c \ src/core/lib/iomgr/ev_poll_posix.c \ src/core/lib/iomgr/ev_posix.c \ diff --git a/binding.gyp b/binding.gyp index 89774ead4d..41e1b5bb41 100644 --- a/binding.gyp +++ b/binding.gyp @@ -581,6 +581,7 @@ 'src/core/lib/iomgr/endpoint.c', 'src/core/lib/iomgr/endpoint_pair_posix.c', 'src/core/lib/iomgr/endpoint_pair_windows.c', + 'src/core/lib/iomgr/ev_epoll_linux.c', 'src/core/lib/iomgr/ev_epoll_posix.c', 'src/core/lib/iomgr/ev_poll_posix.c', 'src/core/lib/iomgr/ev_posix.c', diff --git a/build.yaml b/build.yaml index 7ba6533297..2f3d07071d 100644 --- a/build.yaml +++ b/build.yaml @@ -165,6 +165,7 @@ filegroups: - src/core/lib/iomgr/closure.h - src/core/lib/iomgr/endpoint.h - src/core/lib/iomgr/endpoint_pair.h + - src/core/lib/iomgr/ev_epoll_linux.h - src/core/lib/iomgr/ev_epoll_posix.h - src/core/lib/iomgr/ev_poll_posix.h - src/core/lib/iomgr/ev_posix.h @@ -240,6 +241,7 @@ filegroups: - src/core/lib/iomgr/endpoint.c - src/core/lib/iomgr/endpoint_pair_posix.c - src/core/lib/iomgr/endpoint_pair_windows.c + - src/core/lib/iomgr/ev_epoll_linux.c - src/core/lib/iomgr/ev_epoll_posix.c - src/core/lib/iomgr/ev_poll_posix.c - src/core/lib/iomgr/ev_posix.c diff --git a/config.m4 b/config.m4 index 6987c74154..4308295afd 100644 --- a/config.m4 +++ b/config.m4 @@ -100,6 +100,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/iomgr/endpoint.c \ src/core/lib/iomgr/endpoint_pair_posix.c \ src/core/lib/iomgr/endpoint_pair_windows.c \ + src/core/lib/iomgr/ev_epoll_linux.c \ src/core/lib/iomgr/ev_epoll_posix.c \ src/core/lib/iomgr/ev_poll_posix.c \ src/core/lib/iomgr/ev_posix.c \ diff --git a/gRPC.podspec b/gRPC.podspec index 3b4dd52380..de55880125 100644 --- a/gRPC.podspec +++ b/gRPC.podspec @@ -181,6 +181,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/closure.h', 'src/core/lib/iomgr/endpoint.h', 'src/core/lib/iomgr/endpoint_pair.h', + 'src/core/lib/iomgr/ev_epoll_linux.h', 'src/core/lib/iomgr/ev_epoll_posix.h', 'src/core/lib/iomgr/ev_poll_posix.h', 'src/core/lib/iomgr/ev_posix.h', @@ -359,6 +360,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/endpoint.c', 'src/core/lib/iomgr/endpoint_pair_posix.c', 'src/core/lib/iomgr/endpoint_pair_windows.c', + 'src/core/lib/iomgr/ev_epoll_linux.c', 'src/core/lib/iomgr/ev_epoll_posix.c', 'src/core/lib/iomgr/ev_poll_posix.c', 'src/core/lib/iomgr/ev_posix.c', @@ -548,6 +550,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/closure.h', 'src/core/lib/iomgr/endpoint.h', 'src/core/lib/iomgr/endpoint_pair.h', + 'src/core/lib/iomgr/ev_epoll_linux.h', 'src/core/lib/iomgr/ev_epoll_posix.h', 'src/core/lib/iomgr/ev_poll_posix.h', 'src/core/lib/iomgr/ev_posix.h', diff --git a/grpc.gemspec b/grpc.gemspec index 71cccb6ca8..54ae2eb68d 100755 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -190,6 +190,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/iomgr/closure.h ) s.files += %w( src/core/lib/iomgr/endpoint.h ) s.files += %w( src/core/lib/iomgr/endpoint_pair.h ) + s.files += %w( src/core/lib/iomgr/ev_epoll_linux.h ) s.files += %w( src/core/lib/iomgr/ev_epoll_posix.h ) s.files += %w( src/core/lib/iomgr/ev_poll_posix.h ) s.files += %w( src/core/lib/iomgr/ev_posix.h ) @@ -338,6 +339,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/iomgr/endpoint.c ) s.files += %w( src/core/lib/iomgr/endpoint_pair_posix.c ) s.files += %w( src/core/lib/iomgr/endpoint_pair_windows.c ) + s.files += %w( src/core/lib/iomgr/ev_epoll_linux.c ) s.files += %w( src/core/lib/iomgr/ev_epoll_posix.c ) s.files += %w( src/core/lib/iomgr/ev_poll_posix.c ) s.files += %w( src/core/lib/iomgr/ev_posix.c ) diff --git a/package.xml b/package.xml index 0fc5d0dee4..d8e82a8bc3 100644 --- a/package.xml +++ b/package.xml @@ -197,6 +197,7 @@ + @@ -345,6 +346,7 @@ + diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c new file mode 100644 index 0000000000..f257ac8a1d --- /dev/null +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -0,0 +1,1335 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include + +#ifdef GPR_POSIX_SOCKET + +#include "src/core/lib/iomgr/ev_epoll_posix.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/iomgr_internal.h" +#include "src/core/lib/iomgr/wakeup_fd_posix.h" +#include "src/core/lib/profiling/timers.h" +#include "src/core/lib/support/block_annotate.h" + +struct polling_island; + +/******************************************************************************* + * FD declarations + */ + +struct grpc_fd { + int fd; + /* refst format: + bit0: 1=active/0=orphaned + bit1-n: refcount + meaning that mostly we ref by two to avoid altering the orphaned bit, + and just unref by 1 when we're ready to flag the object as orphaned */ + gpr_atm refst; + + gpr_mu mu; + int shutdown; + int closed; + int released; + + grpc_closure *read_closure; + grpc_closure *write_closure; + + /* Mutex protecting the 'polling_island' field */ + gpr_mu pi_mu; + + /* The polling island to which this fd belongs to. An fd belongs to exactly + one polling island */ + struct polling_island *polling_island; + + struct grpc_fd *freelist_next; + + grpc_closure *on_done_closure; + + grpc_iomgr_object iomgr_object; +}; + +/* Return 1 if this fd is orphaned, 0 otherwise */ +static bool fd_is_orphaned(grpc_fd *fd); + +/* Reference counting for fds */ +/*#define GRPC_FD_REF_COUNT_DEBUG*/ +#ifdef GRPC_FD_REF_COUNT_DEBUG +static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line); +static void fd_unref(grpc_fd *fd, const char *reason, const char *file, + int line); +#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__) +#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__) +#else +static void fd_ref(grpc_fd *fd); +static void fd_unref(grpc_fd *fd); +#define GRPC_FD_REF(fd, reason) fd_ref(fd) +#define GRPC_FD_UNREF(fd, reason) fd_unref(fd) +#endif + +static void fd_global_init(void); +static void fd_global_shutdown(void); + +#define CLOSURE_NOT_READY ((grpc_closure *)0) +#define CLOSURE_READY ((grpc_closure *)1) + +/******************************************************************************* + * Polling Island + */ +typedef struct polling_island { + gpr_mu mu; + int ref_cnt; + + /* Pointer to the polling_island this merged into. If this is not NULL, all + the remaining fields in this pollset (i.e all fields except mu and ref_cnt) + are considered invalid and must be ignored */ + struct polling_island *merged_to; + + /* The fd of the underlying epoll set */ + int epoll_fd; + + /* The file descriptors in the epoll set */ + size_t fd_cnt; + size_t fd_capacity; + grpc_fd **fds; + + /* Polling islands that are no longer needed are kept in a freelist so that + they can be reused. This field points to the next polling island in the + free list. Note that this is only used if the polling island is in the + free list */ + struct polling_island *next_free; +} polling_island; + +/* Polling island freelist */ +static gpr_mu g_pi_freelist_mu; +static polling_island *g_pi_freelist = NULL; + +/* TODO: sreek - Should we hold a lock on fd or add a ref to the fd ? */ +static void add_fd_to_polling_island_locked(polling_island *pi, grpc_fd *fd) { + int err; + struct epoll_event ev; + + ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET); + ev.data.ptr = fd; + err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev); + + if (err < 0 && errno != EEXIST) { + gpr_log(GPR_ERROR, "epoll_ctl add for fd: %d failed with error: %s", fd->fd, + strerror(errno)); + return; + } + + pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2); + pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity); + pi->fds[pi->fd_cnt++] = fd; +} + +static polling_island *polling_island_create(int initial_ref_cnt, + grpc_fd *initial_fd) { + polling_island *pi = NULL; + gpr_mu_lock(&g_pi_freelist_mu); + if (g_pi_freelist != NULL) { + pi = g_pi_freelist; + g_pi_freelist = g_pi_freelist->next_free; + pi->next_free = NULL; + } + gpr_mu_unlock(&g_pi_freelist_mu); + + /* Create new polling island if we could not get one from the free list */ + if (pi == NULL) { + pi = gpr_malloc(sizeof(*pi)); + gpr_mu_init(&pi->mu); + pi->fd_cnt = 0; + pi->fd_capacity = 0; + pi->fds = NULL; + + pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC); + if (pi->epoll_fd < 0) { + gpr_log(GPR_ERROR, "epoll_create1() failed with error: %s", + strerror(errno)); + } + GPR_ASSERT(pi->epoll_fd >= 0); + } + + pi->ref_cnt = initial_ref_cnt; + pi->merged_to = NULL; + pi->next_free = NULL; + + if (initial_fd != NULL) { + /* add_fd_to_polling_island_locked() expects the caller to hold a pi->mu + * lock. However, since this is a new polling island (and no one has a + * reference to it yet), it is okay to not acquire pi->mu here */ + add_fd_to_polling_island_locked(pi, initial_fd); + } + + return pi; +} + +static void polling_island_global_init() { + polling_island_create(0, NULL); /* TODO(sreek): Delete this line */ + gpr_mu_init(&g_pi_freelist_mu); + g_pi_freelist = NULL; +} + +/******************************************************************************* + * pollset declarations + */ + +typedef struct grpc_cached_wakeup_fd { + grpc_wakeup_fd fd; + struct grpc_cached_wakeup_fd *next; +} grpc_cached_wakeup_fd; + +struct grpc_pollset_worker { + grpc_cached_wakeup_fd *wakeup_fd; + int reevaluate_polling_on_wakeup; + int kicked_specifically; + pthread_t pt_id; + struct grpc_pollset_worker *next; + struct grpc_pollset_worker *prev; +}; + +struct grpc_pollset { + gpr_mu mu; + grpc_pollset_worker root_worker; + int shutting_down; + int called_shutdown; + int kicked_without_pollers; + grpc_closure *shutdown_done; + + int epoll_fd; + + /* Mutex protecting the 'polling_island' field */ + gpr_mu pi_mu; + + /* The polling island to which this fd belongs to. An fd belongs to exactly + one polling island */ + struct polling_island *polling_island; + + /* Local cache of eventfds for workers */ + grpc_cached_wakeup_fd *local_wakeup_cache; +}; + +/* Add an fd to a pollset */ +static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + struct grpc_fd *fd); + +static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set, grpc_fd *fd); + +/* Convert a timespec to milliseconds: + - very small or negative poll times are clamped to zero to do a + non-blocking poll (which becomes spin polling) + - other small values are rounded up to one millisecond + - longer than a millisecond polls are rounded up to the next nearest + millisecond to avoid spinning + - infinite timeouts are converted to -1 */ +static int poll_deadline_to_millis_timeout(gpr_timespec deadline, + gpr_timespec now); + +/* Allow kick to wakeup the currently polling worker */ +#define GRPC_POLLSET_CAN_KICK_SELF 1 +/* Force the wakee to repoll when awoken */ +#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2 +/* As per pollset_kick, with an extended set of flags (defined above) + -- mostly for fd_posix's use. */ +static void pollset_kick_ext(grpc_pollset *p, + grpc_pollset_worker *specific_worker, + uint32_t flags); + +/* turn a pollset into a multipoller: platform specific */ +typedef void (*platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + struct grpc_fd **fds, + size_t fd_count); + +/* Return 1 if the pollset has active threads in pollset_work (pollset must + * be locked) */ +static int pollset_has_workers(grpc_pollset *pollset); + +static void remove_fd_from_all_epoll_sets(int fd); + +/******************************************************************************* + * pollset_set definitions + */ + +struct grpc_pollset_set { + gpr_mu mu; + + size_t pollset_count; + size_t pollset_capacity; + grpc_pollset **pollsets; + + size_t pollset_set_count; + size_t pollset_set_capacity; + struct grpc_pollset_set **pollset_sets; + + size_t fd_count; + size_t fd_capacity; + grpc_fd **fds; +}; + +/******************************************************************************* + * fd_posix.c + */ + +/* We need to keep a freelist not because of any concerns of malloc performance + * but instead so that implementations with multiple threads in (for example) + * epoll_wait deal with the race between pollset removal and incoming poll + * notifications. + * + * The problem is that the poller ultimately holds a reference to this + * object, so it is very difficult to know when is safe to free it, at least + * without some expensive synchronization. + * + * If we keep the object freelisted, in the worst case losing this race just + * becomes a spurious read notification on a reused fd. + */ +/* TODO(klempner): We could use some form of polling generation count to know + * when these are safe to free. */ +/* TODO(klempner): Consider disabling freelisting if we don't have multiple + * threads in poll on the same fd */ +/* TODO(klempner): Batch these allocations to reduce fragmentation */ +static grpc_fd *fd_freelist = NULL; +static gpr_mu fd_freelist_mu; + +static void freelist_fd(grpc_fd *fd) { + gpr_mu_lock(&fd_freelist_mu); + fd->freelist_next = fd_freelist; + fd_freelist = fd; + grpc_iomgr_unregister_object(&fd->iomgr_object); + gpr_mu_unlock(&fd_freelist_mu); +} + +static grpc_fd *alloc_fd(int fd) { + grpc_fd *r = NULL; + + gpr_mu_lock(&fd_freelist_mu); + if (fd_freelist != NULL) { + r = fd_freelist; + fd_freelist = fd_freelist->freelist_next; + } + gpr_mu_unlock(&fd_freelist_mu); + + if (r == NULL) { + r = gpr_malloc(sizeof(grpc_fd)); + gpr_mu_init(&r->mu); + gpr_mu_init(&r->pi_mu); + } + + /* TODO: sreek - check with ctiller on why we need to acquire a lock here */ + gpr_mu_lock(&r->mu); + gpr_atm_rel_store(&r->refst, 1); + r->shutdown = 0; + r->read_closure = CLOSURE_NOT_READY; + r->write_closure = CLOSURE_NOT_READY; + r->fd = fd; + r->polling_island = NULL; + r->freelist_next = NULL; + r->on_done_closure = NULL; + r->closed = 0; + r->released = 0; + gpr_mu_unlock(&r->mu); + return r; +} + +static void destroy(grpc_fd *fd) { + gpr_mu_destroy(&fd->mu); + gpr_free(fd); +} + +#ifdef GRPC_FD_REF_COUNT_DEBUG +#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__) +#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__) +static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file, + int line) { + gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n, + gpr_atm_no_barrier_load(&fd->refst), + gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line); +#else +#define REF_BY(fd, n, reason) ref_by(fd, n) +#define UNREF_BY(fd, n, reason) unref_by(fd, n) +static void ref_by(grpc_fd *fd, int n) { +#endif + GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0); +} + +#ifdef GRPC_FD_REF_COUNT_DEBUG +static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file, + int line) { + gpr_atm old; + gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n, + gpr_atm_no_barrier_load(&fd->refst), + gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line); +#else +static void unref_by(grpc_fd *fd, int n) { + gpr_atm old; +#endif + old = gpr_atm_full_fetch_add(&fd->refst, -n); + if (old == n) { + freelist_fd(fd); + } else { + GPR_ASSERT(old > n); + } +} + +static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); } + +static void fd_global_shutdown(void) { + gpr_mu_lock(&fd_freelist_mu); + gpr_mu_unlock(&fd_freelist_mu); + while (fd_freelist != NULL) { + grpc_fd *fd = fd_freelist; + fd_freelist = fd_freelist->freelist_next; + destroy(fd); + } + gpr_mu_destroy(&fd_freelist_mu); +} + +static grpc_fd *fd_create(int fd, const char *name) { + grpc_fd *r = alloc_fd(fd); + char *name2; + gpr_asprintf(&name2, "%s fd=%d", name, fd); + grpc_iomgr_register_object(&r->iomgr_object, name2); + gpr_free(name2); +#ifdef GRPC_FD_REF_COUNT_DEBUG + gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, name); +#endif + return r; +} + +static bool fd_is_orphaned(grpc_fd *fd) { + return (gpr_atm_acq_load(&fd->refst) & 1) == 0; +} + +static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { + fd->closed = 1; + if (!fd->released) { + close(fd->fd); + } else { + remove_fd_from_all_epoll_sets(fd->fd); + } + grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL); +} + +static int fd_wrapped_fd(grpc_fd *fd) { + if (fd->released || fd->closed) { + return -1; + } else { + return fd->fd; + } +} + +/* TODO: sreek - do something here with the pollset island link */ +static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, + grpc_closure *on_done, int *release_fd, + const char *reason) { + fd->on_done_closure = on_done; + fd->released = release_fd != NULL; + if (!fd->released) { + shutdown(fd->fd, SHUT_RDWR); + } else { + *release_fd = fd->fd; + } + gpr_mu_lock(&fd->mu); + REF_BY(fd, 1, reason); /* remove active status, but keep referenced */ + close_fd_locked(exec_ctx, fd); + gpr_mu_unlock(&fd->mu); + UNREF_BY(fd, 2, reason); /* drop the reference */ +} + +/* increment refcount by two to avoid changing the orphan bit */ +#ifdef GRPC_FD_REF_COUNT_DEBUG +static void fd_ref(grpc_fd *fd, const char *reason, const char *file, + int line) { + ref_by(fd, 2, reason, file, line); +} + +static void fd_unref(grpc_fd *fd, const char *reason, const char *file, + int line) { + unref_by(fd, 2, reason, file, line); +} +#else +static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); } + +static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); } +#endif + +static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, + grpc_closure **st, grpc_closure *closure) { + if (*st == CLOSURE_NOT_READY) { + /* not ready ==> switch to a waiting state by setting the closure */ + *st = closure; + } else if (*st == CLOSURE_READY) { + /* already ready ==> queue the closure to run immediately */ + *st = CLOSURE_NOT_READY; + grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL); + } else { + /* upcallptr was set to a different closure. This is an error! */ + gpr_log(GPR_ERROR, + "User called a notify_on function with a previous callback still " + "pending"); + abort(); + } +} + +/* returns 1 if state becomes not ready */ +static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, + grpc_closure **st) { + if (*st == CLOSURE_READY) { + /* duplicate ready ==> ignore */ + return 0; + } else if (*st == CLOSURE_NOT_READY) { + /* not ready, and not waiting ==> flag ready */ + *st = CLOSURE_READY; + return 0; + } else { + /* waiting ==> queue closure */ + grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL); + *st = CLOSURE_NOT_READY; + return 1; + } +} + +/* Do something here with the pollset island link (?) */ +static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { + gpr_mu_lock(&fd->mu); + GPR_ASSERT(!fd->shutdown); + fd->shutdown = 1; + set_ready_locked(exec_ctx, fd, &fd->read_closure); + set_ready_locked(exec_ctx, fd, &fd->write_closure); + gpr_mu_unlock(&fd->mu); +} + +static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, + grpc_closure *closure) { + gpr_mu_lock(&fd->mu); + notify_on_locked(exec_ctx, fd, &fd->read_closure, closure); + gpr_mu_unlock(&fd->mu); +} + +static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, + grpc_closure *closure) { + gpr_mu_lock(&fd->mu); + notify_on_locked(exec_ctx, fd, &fd->write_closure, closure); + gpr_mu_unlock(&fd->mu); +} + +/******************************************************************************* + * pollset_posix.c + */ + +GPR_TLS_DECL(g_current_thread_poller); +GPR_TLS_DECL(g_current_thread_worker); + +/** The alarm system needs to be able to wakeup 'some poller' sometimes + * (specifically when a new alarm needs to be triggered earlier than the next + * alarm 'epoch'). + * This wakeup_fd gives us something to alert on when such a case occurs. */ +grpc_wakeup_fd grpc_global_wakeup_fd; + +static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) { + worker->prev->next = worker->next; + worker->next->prev = worker->prev; +} + +static int pollset_has_workers(grpc_pollset *p) { + return p->root_worker.next != &p->root_worker; +} + +static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) { + if (pollset_has_workers(p)) { + grpc_pollset_worker *w = p->root_worker.next; + remove_worker(p, w); + return w; + } else { + return NULL; + } +} + +static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) { + worker->next = &p->root_worker; + worker->prev = worker->next->prev; + worker->prev->next = worker->next->prev = worker; +} + +static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { + worker->prev = &p->root_worker; + worker->next = worker->prev->next; + worker->prev->next = worker->next->prev = worker; +} + +static void pollset_kick_ext(grpc_pollset *p, + grpc_pollset_worker *specific_worker, + uint32_t flags) { + GPR_TIMER_BEGIN("pollset_kick_ext", 0); + + /* pollset->mu already held */ + if (specific_worker != NULL) { + if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { + GPR_TIMER_BEGIN("pollset_kick_ext.broadcast", 0); + GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); + for (specific_worker = p->root_worker.next; + specific_worker != &p->root_worker; + specific_worker = specific_worker->next) { + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + } + p->kicked_without_pollers = 1; + GPR_TIMER_END("pollset_kick_ext.broadcast", 0); + } else if (gpr_tls_get(&g_current_thread_worker) != + (intptr_t)specific_worker) { + GPR_TIMER_MARK("different_thread_worker", 0); + if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { + specific_worker->reevaluate_polling_on_wakeup = 1; + } + specific_worker->kicked_specifically = 1; + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + /* TODO (sreek): Refactor this into a separate file*/ + pthread_kill(specific_worker->pt_id, SIGUSR1); + } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) { + GPR_TIMER_MARK("kick_yoself", 0); + if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { + specific_worker->reevaluate_polling_on_wakeup = 1; + } + specific_worker->kicked_specifically = 1; + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + } + } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) { + GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); + GPR_TIMER_MARK("kick_anonymous", 0); + specific_worker = pop_front_worker(p); + if (specific_worker != NULL) { + if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) { + GPR_TIMER_MARK("kick_anonymous_not_self", 0); + push_back_worker(p, specific_worker); + specific_worker = pop_front_worker(p); + if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 && + gpr_tls_get(&g_current_thread_worker) == + (intptr_t)specific_worker) { + push_back_worker(p, specific_worker); + specific_worker = NULL; + } + } + if (specific_worker != NULL) { + GPR_TIMER_MARK("finally_kick", 0); + push_back_worker(p, specific_worker); + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + } + } else { + GPR_TIMER_MARK("kicked_no_pollers", 0); + p->kicked_without_pollers = 1; + } + } + + GPR_TIMER_END("pollset_kick_ext", 0); +} + +static void pollset_kick(grpc_pollset *p, + grpc_pollset_worker *specific_worker) { + pollset_kick_ext(p, specific_worker, 0); +} + +/* global state management */ + +static void sig_handler(int sig_num) { + gpr_log(GPR_INFO, "Received signal %d", sig_num); +} + +static void pollset_global_init(void) { + gpr_tls_init(&g_current_thread_poller); + gpr_tls_init(&g_current_thread_worker); + grpc_wakeup_fd_init(&grpc_global_wakeup_fd); + signal(SIGUSR1, sig_handler); +} + +static void pollset_global_shutdown(void) { + grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd); + gpr_tls_destroy(&g_current_thread_poller); + gpr_tls_destroy(&g_current_thread_worker); +} + +static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); } + +/* TODO: sreek. Try to Remove this forward declaration*/ +static void multipoll_with_epoll_pollset_create_efd(grpc_pollset *pollset); + +/* main interface */ + +static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { + gpr_mu_init(&pollset->mu); + *mu = &pollset->mu; + pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; + gpr_mu_init(&pollset->pi_mu); + pollset->polling_island = NULL; + pollset->shutting_down = 0; + pollset->called_shutdown = 0; + pollset->kicked_without_pollers = 0; + pollset->local_wakeup_cache = NULL; + pollset->kicked_without_pollers = 0; + + multipoll_with_epoll_pollset_create_efd(pollset); +} + +/* TODO(sreek): Maybe merge multipoll_*_destroy() with pollset_destroy() + * function */ +static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset); + +static void pollset_destroy(grpc_pollset *pollset) { + GPR_ASSERT(!pollset_has_workers(pollset)); + + multipoll_with_epoll_pollset_destroy(pollset); + + while (pollset->local_wakeup_cache) { + grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next; + grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd); + gpr_free(pollset->local_wakeup_cache); + pollset->local_wakeup_cache = next; + } + gpr_mu_destroy(&pollset->pi_mu); + gpr_mu_destroy(&pollset->mu); +} + +/* TODO(sreek) - Do something with the pollset island link (??) */ +static void pollset_reset(grpc_pollset *pollset) { + GPR_ASSERT(pollset->shutting_down); + GPR_ASSERT(!pollset_has_workers(pollset)); + pollset->shutting_down = 0; + pollset->called_shutdown = 0; + pollset->kicked_without_pollers = 0; +} + +/* TODO (sreek): Remove multipoll_with_epoll_finish_shutdown() declaration */ +static void multipoll_with_epoll_pollset_finish_shutdown(grpc_pollset *pollset); + +static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { + multipoll_with_epoll_pollset_finish_shutdown(pollset); + grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL); +} + +/* TODO(sreek): Remove multipoll_with_epoll_*_maybe_work_and_unlock declaration + */ +static void multipoll_with_epoll_pollset_maybe_work_and_unlock( + grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, + gpr_timespec deadline, gpr_timespec now); + +static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker **worker_hdl, gpr_timespec now, + gpr_timespec deadline) { + grpc_pollset_worker worker; + *worker_hdl = &worker; + + /* pollset->mu already held */ + int added_worker = 0; + int locked = 1; + int queued_work = 0; + int keep_polling = 0; + GPR_TIMER_BEGIN("pollset_work", 0); + /* this must happen before we (potentially) drop pollset->mu */ + worker.next = worker.prev = NULL; + worker.reevaluate_polling_on_wakeup = 0; + if (pollset->local_wakeup_cache != NULL) { + worker.wakeup_fd = pollset->local_wakeup_cache; + pollset->local_wakeup_cache = worker.wakeup_fd->next; + } else { + worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd)); + grpc_wakeup_fd_init(&worker.wakeup_fd->fd); + } + worker.kicked_specifically = 0; + + /* TODO(sreek): Abstract this thread id stuff out into a separate file */ + worker.pt_id = pthread_self(); + /* If we're shutting down then we don't execute any extended work */ + if (pollset->shutting_down) { + GPR_TIMER_MARK("pollset_work.shutting_down", 0); + goto done; + } + /* Start polling, and keep doing so while we're being asked to + re-evaluate our pollers (this allows poll() based pollers to + ensure they don't miss wakeups) */ + keep_polling = 1; + while (keep_polling) { + keep_polling = 0; + if (!pollset->kicked_without_pollers) { + if (!added_worker) { + push_front_worker(pollset, &worker); + added_worker = 1; + gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); + } + gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset); + GPR_TIMER_BEGIN("maybe_work_and_unlock", 0); + + multipoll_with_epoll_pollset_maybe_work_and_unlock( + exec_ctx, pollset, &worker, deadline, now); + + GPR_TIMER_END("maybe_work_and_unlock", 0); + locked = 0; + gpr_tls_set(&g_current_thread_poller, 0); + } else { + GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0); + pollset->kicked_without_pollers = 0; + } + /* Finished execution - start cleaning up. + Note that we may arrive here from outside the enclosing while() loop. + In that case we won't loop though as we haven't added worker to the + worker list, which means nobody could ask us to re-evaluate polling). */ + done: + if (!locked) { + queued_work |= grpc_exec_ctx_flush(exec_ctx); + gpr_mu_lock(&pollset->mu); + locked = 1; + } + /* If we're forced to re-evaluate polling (via pollset_kick with + GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force + a loop */ + if (worker.reevaluate_polling_on_wakeup) { + worker.reevaluate_polling_on_wakeup = 0; + pollset->kicked_without_pollers = 0; + if (queued_work || worker.kicked_specifically) { + /* If there's queued work on the list, then set the deadline to be + immediate so we get back out of the polling loop quickly */ + deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC); + } + keep_polling = 1; + } + } + if (added_worker) { + remove_worker(pollset, &worker); + gpr_tls_set(&g_current_thread_worker, 0); + } + /* release wakeup fd to the local pool */ + worker.wakeup_fd->next = pollset->local_wakeup_cache; + pollset->local_wakeup_cache = worker.wakeup_fd; + /* check shutdown conditions */ + if (pollset->shutting_down) { + if (pollset_has_workers(pollset)) { + pollset_kick(pollset, NULL); + } else if (!pollset->called_shutdown) { + pollset->called_shutdown = 1; + gpr_mu_unlock(&pollset->mu); + finish_shutdown(exec_ctx, pollset); + grpc_exec_ctx_flush(exec_ctx); + /* Continuing to access pollset here is safe -- it is the caller's + * responsibility to not destroy when it has outstanding calls to + * pollset_work. + * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */ + gpr_mu_lock(&pollset->mu); + } + } + *worker_hdl = NULL; + GPR_TIMER_END("pollset_work", 0); +} + +/* TODO: (sreek) Do something with the pollset island link */ +static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_closure *closure) { + GPR_ASSERT(!pollset->shutting_down); + pollset->shutting_down = 1; + pollset->shutdown_done = closure; + pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); + + if (!pollset->called_shutdown && !pollset_has_workers(pollset)) { + pollset->called_shutdown = 1; + finish_shutdown(exec_ctx, pollset); + } +} + +static int poll_deadline_to_millis_timeout(gpr_timespec deadline, + gpr_timespec now) { + gpr_timespec timeout; + static const int64_t max_spin_polling_us = 10; + if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { + return -1; + } + if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros( + max_spin_polling_us, + GPR_TIMESPAN))) <= 0) { + return 0; + } + timeout = gpr_time_sub(deadline, now); + return gpr_time_to_millis(gpr_time_add( + timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); +} + +/******************************************************************************* + * pollset_multipoller_with_epoll_posix.c + */ + +static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) { + /* only one set_ready can be active at once (but there may be a racing + notify_on) */ + gpr_mu_lock(&fd->mu); + set_ready_locked(exec_ctx, fd, st); + gpr_mu_unlock(&fd->mu); +} + +static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { + set_ready(exec_ctx, fd, &fd->read_closure); +} + +static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { + set_ready(exec_ctx, fd, &fd->write_closure); +} + +/* TODO (sreek): Maybe this global list is not required. Double check*/ +struct epoll_fd_list { + int *epoll_fds; + size_t count; + size_t capacity; +}; + +static struct epoll_fd_list epoll_fd_global_list; +static gpr_once init_epoll_fd_list_mu = GPR_ONCE_INIT; +static gpr_mu epoll_fd_list_mu; + +static void init_mu(void) { gpr_mu_init(&epoll_fd_list_mu); } + +static void add_epoll_fd_to_global_list(int epoll_fd) { + gpr_once_init(&init_epoll_fd_list_mu, init_mu); + + gpr_mu_lock(&epoll_fd_list_mu); + if (epoll_fd_global_list.count == epoll_fd_global_list.capacity) { + epoll_fd_global_list.capacity = + GPR_MAX((size_t)8, epoll_fd_global_list.capacity * 2); + epoll_fd_global_list.epoll_fds = + gpr_realloc(epoll_fd_global_list.epoll_fds, + epoll_fd_global_list.capacity * sizeof(int)); + } + epoll_fd_global_list.epoll_fds[epoll_fd_global_list.count++] = epoll_fd; + gpr_mu_unlock(&epoll_fd_list_mu); +} + +static void remove_epoll_fd_from_global_list(int epoll_fd) { + gpr_mu_lock(&epoll_fd_list_mu); + GPR_ASSERT(epoll_fd_global_list.count > 0); + for (size_t i = 0; i < epoll_fd_global_list.count; i++) { + if (epoll_fd == epoll_fd_global_list.epoll_fds[i]) { + epoll_fd_global_list.epoll_fds[i] = + epoll_fd_global_list.epoll_fds[--(epoll_fd_global_list.count)]; + break; + } + } + gpr_mu_unlock(&epoll_fd_list_mu); +} + +static void remove_fd_from_all_epoll_sets(int fd) { + int err; + gpr_once_init(&init_epoll_fd_list_mu, init_mu); + gpr_mu_lock(&epoll_fd_list_mu); + if (epoll_fd_global_list.count == 0) { + gpr_mu_unlock(&epoll_fd_list_mu); + return; + } + for (size_t i = 0; i < epoll_fd_global_list.count; i++) { + err = epoll_ctl(epoll_fd_global_list.epoll_fds[i], EPOLL_CTL_DEL, fd, NULL); + if (err < 0 && errno != ENOENT) { + gpr_log(GPR_ERROR, "epoll_ctl del for %d failed: %s", fd, + strerror(errno)); + } + } + gpr_mu_unlock(&epoll_fd_list_mu); +} + +/* TODO: sreek - This function multipoll_with_epoll_pollset_add_fd() and + * finally_add_fd() in ev_poll_and_epoll_posix.c */ +static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_fd *fd) { + + /* TODO sreek - Check if we need to get a pollset->mu lock here */ + + struct epoll_event ev; + int err; + + /* Hold a ref to the fd to keep it from being closed during the add. This may + result in a spurious wakeup being assigned to this pollset whilst adding, + but that should be benign. */ + /* TODO: (sreek): Understand how a spurious wake up migh be assinged to this + * pollset..and how holding a reference will prevent the fd from being closed + * (and perhaps more importantly, see how can an fd be closed while being + * added to the epollset */ + GRPC_FD_REF(fd, "add fd"); + + gpr_mu_lock(&fd->mu); + if (fd->shutdown) { + gpr_mu_unlock(&fd->mu); + GRPC_FD_UNREF(fd, "add fd"); + return; + } + gpr_mu_unlock(&fd->mu); + + ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET); + ev.data.ptr = fd; + err = epoll_ctl(pollset->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev); + if (err < 0) { + /* FDs may be added to a pollset multiple times, so EEXIST is normal. */ + if (errno != EEXIST) { + gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd, + strerror(errno)); + } + } + + /* The fd might have been orphaned while we were adding it to the epoll set. + Close the fd in such a case (which will also take care of removing it from + the epoll set */ + gpr_mu_lock(&fd->mu); + if (fd_is_orphaned(fd) && !fd->closed) { + close_fd_locked(exec_ctx, fd); + } + gpr_mu_unlock(&fd->mu); + + GRPC_FD_UNREF(fd, "add fd"); +} + +/* Creates an epoll fd and initializes the pollset */ +/* TODO: This has to be called ONLY from pollset_init function. and hence it + * does not acquire any lock */ +static void multipoll_with_epoll_pollset_create_efd(grpc_pollset *pollset) { + struct epoll_event ev; + int err; + + pollset->epoll_fd = epoll_create1(EPOLL_CLOEXEC); + if (pollset->epoll_fd < 0) { + gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno)); + abort(); + } + add_epoll_fd_to_global_list(pollset->epoll_fd); + + ev.events = (uint32_t)(EPOLLIN | EPOLLET); + ev.data.ptr = NULL; + + err = epoll_ctl(pollset->epoll_fd, EPOLL_CTL_ADD, + GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev); + if (err < 0) { + gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", + GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), + strerror(errno)); + } +} + +/* TODO(klempner): We probably want to turn this down a bit */ +#define GRPC_EPOLL_MAX_EVENTS 1000 + +static void multipoll_with_epoll_pollset_maybe_work_and_unlock( + grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, + gpr_timespec deadline, gpr_timespec now) { + struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS]; + int epoll_fd = pollset->epoll_fd; + int ep_rv; + int poll_rv; + int timeout_ms; + struct pollfd pfds[2]; + + /* If you want to ignore epoll's ability to sanely handle parallel pollers, + * for a more apples-to-apples performance comparison with poll, add a + * if (pollset->counter != 0) { return 0; } + * here. + */ + + gpr_mu_unlock(&pollset->mu); + + timeout_ms = poll_deadline_to_millis_timeout(deadline, now); + + pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd); + pfds[0].events = POLLIN; + pfds[0].revents = 0; + pfds[1].fd = epoll_fd; + pfds[1].events = POLLIN; + pfds[1].revents = 0; + + /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid + even going into the blocking annotation if possible */ + GPR_TIMER_BEGIN("poll", 0); + GRPC_SCHEDULING_START_BLOCKING_REGION; + poll_rv = grpc_poll_function(pfds, 2, timeout_ms); + GRPC_SCHEDULING_END_BLOCKING_REGION; + GPR_TIMER_END("poll", 0); + + if (poll_rv < 0) { + if (errno != EINTR) { + gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); + } + } else if (poll_rv == 0) { + /* do nothing */ + } else { + if (pfds[0].revents) { + grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); + } + if (pfds[1].revents) { + do { + /* The following epoll_wait never blocks; it has a timeout of 0 */ + ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0); + if (ep_rv < 0) { + if (errno != EINTR) { + gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno)); + } + } else { + int i; + for (i = 0; i < ep_rv; ++i) { + grpc_fd *fd = ep_ev[i].data.ptr; + /* TODO(klempner): We might want to consider making err and pri + * separate events */ + int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP); + int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI); + int write_ev = ep_ev[i].events & EPOLLOUT; + if (fd == NULL) { + grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); + } else { + if (read_ev || cancel) { + fd_become_readable(exec_ctx, fd); + } + if (write_ev || cancel) { + fd_become_writable(exec_ctx, fd); + } + } + } + } + } while (ep_rv == GRPC_EPOLL_MAX_EVENTS); + } + } +} + +static void multipoll_with_epoll_pollset_finish_shutdown( + grpc_pollset *pollset) {} + +static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) { + close(pollset->epoll_fd); + remove_epoll_fd_from_global_list(pollset->epoll_fd); +} + +/******************************************************************************* + * pollset_set_posix.c + */ + +static grpc_pollset_set *pollset_set_create(void) { + grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set)); + memset(pollset_set, 0, sizeof(*pollset_set)); + gpr_mu_init(&pollset_set->mu); + return pollset_set; +} + +static void pollset_set_destroy(grpc_pollset_set *pollset_set) { + size_t i; + gpr_mu_destroy(&pollset_set->mu); + for (i = 0; i < pollset_set->fd_count; i++) { + GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set"); + } + gpr_free(pollset_set->pollsets); + gpr_free(pollset_set->pollset_sets); + gpr_free(pollset_set->fds); + gpr_free(pollset_set); +} + +static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set, + grpc_pollset *pollset) { + size_t i, j; + gpr_mu_lock(&pollset_set->mu); + if (pollset_set->pollset_count == pollset_set->pollset_capacity) { + pollset_set->pollset_capacity = + GPR_MAX(8, 2 * pollset_set->pollset_capacity); + pollset_set->pollsets = + gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity * + sizeof(*pollset_set->pollsets)); + } + pollset_set->pollsets[pollset_set->pollset_count++] = pollset; + for (i = 0, j = 0; i < pollset_set->fd_count; i++) { + if (fd_is_orphaned(pollset_set->fds[i])) { + GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set"); + } else { + pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]); + pollset_set->fds[j++] = pollset_set->fds[i]; + } + } + pollset_set->fd_count = j; + gpr_mu_unlock(&pollset_set->mu); +} + +static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set, + grpc_pollset *pollset) { + size_t i; + gpr_mu_lock(&pollset_set->mu); + for (i = 0; i < pollset_set->pollset_count; i++) { + if (pollset_set->pollsets[i] == pollset) { + pollset_set->pollset_count--; + GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i], + pollset_set->pollsets[pollset_set->pollset_count]); + break; + } + } + gpr_mu_unlock(&pollset_set->mu); +} + +static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item) { + size_t i, j; + gpr_mu_lock(&bag->mu); + if (bag->pollset_set_count == bag->pollset_set_capacity) { + bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity); + bag->pollset_sets = + gpr_realloc(bag->pollset_sets, + bag->pollset_set_capacity * sizeof(*bag->pollset_sets)); + } + bag->pollset_sets[bag->pollset_set_count++] = item; + for (i = 0, j = 0; i < bag->fd_count; i++) { + if (fd_is_orphaned(bag->fds[i])) { + GRPC_FD_UNREF(bag->fds[i], "pollset_set"); + } else { + pollset_set_add_fd(exec_ctx, item, bag->fds[i]); + bag->fds[j++] = bag->fds[i]; + } + } + bag->fd_count = j; + gpr_mu_unlock(&bag->mu); +} + +static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item) { + size_t i; + gpr_mu_lock(&bag->mu); + for (i = 0; i < bag->pollset_set_count; i++) { + if (bag->pollset_sets[i] == item) { + bag->pollset_set_count--; + GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i], + bag->pollset_sets[bag->pollset_set_count]); + break; + } + } + gpr_mu_unlock(&bag->mu); +} + +static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set, grpc_fd *fd) { + size_t i; + gpr_mu_lock(&pollset_set->mu); + if (pollset_set->fd_count == pollset_set->fd_capacity) { + pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity); + pollset_set->fds = gpr_realloc( + pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds)); + } + GRPC_FD_REF(fd, "pollset_set"); + pollset_set->fds[pollset_set->fd_count++] = fd; + for (i = 0; i < pollset_set->pollset_count; i++) { + pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd); + } + for (i = 0; i < pollset_set->pollset_set_count; i++) { + pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd); + } + gpr_mu_unlock(&pollset_set->mu); +} + +static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set, grpc_fd *fd) { + size_t i; + gpr_mu_lock(&pollset_set->mu); + for (i = 0; i < pollset_set->fd_count; i++) { + if (pollset_set->fds[i] == fd) { + pollset_set->fd_count--; + GPR_SWAP(grpc_fd *, pollset_set->fds[i], + pollset_set->fds[pollset_set->fd_count]); + GRPC_FD_UNREF(fd, "pollset_set"); + break; + } + } + for (i = 0; i < pollset_set->pollset_set_count; i++) { + pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd); + } + gpr_mu_unlock(&pollset_set->mu); +} + +/******************************************************************************* + * event engine binding + */ + +static void shutdown_engine(void) { + fd_global_shutdown(); + pollset_global_shutdown(); +} + +static const grpc_event_engine_vtable vtable = { + .pollset_size = sizeof(grpc_pollset), + + .fd_create = fd_create, + .fd_wrapped_fd = fd_wrapped_fd, + .fd_orphan = fd_orphan, + .fd_shutdown = fd_shutdown, + .fd_notify_on_read = fd_notify_on_read, + .fd_notify_on_write = fd_notify_on_write, + + .pollset_init = pollset_init, + .pollset_shutdown = pollset_shutdown, + .pollset_reset = pollset_reset, + .pollset_destroy = pollset_destroy, + .pollset_work = pollset_work, + .pollset_kick = pollset_kick, + .pollset_add_fd = pollset_add_fd, + + .pollset_set_create = pollset_set_create, + .pollset_set_destroy = pollset_set_destroy, + .pollset_set_add_pollset = pollset_set_add_pollset, + .pollset_set_del_pollset = pollset_set_del_pollset, + .pollset_set_add_pollset_set = pollset_set_add_pollset_set, + .pollset_set_del_pollset_set = pollset_set_del_pollset_set, + .pollset_set_add_fd = pollset_set_add_fd, + .pollset_set_del_fd = pollset_set_del_fd, + + .kick_poller = kick_poller, + + .shutdown_engine = shutdown_engine, +}; + +const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { + fd_global_init(); + pollset_global_init(); + polling_island_global_init(); + return &vtable; +} + +#endif diff --git a/src/core/lib/iomgr/ev_epoll_linux.h b/src/core/lib/iomgr/ev_epoll_linux.h new file mode 100644 index 0000000000..8c819975a4 --- /dev/null +++ b/src/core/lib/iomgr/ev_epoll_linux.h @@ -0,0 +1,41 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_EV_EPOLL_LINUX_H +#define GRPC_CORE_LIB_IOMGR_EV_EPOLL_LINUX_H + +#include "src/core/lib/iomgr/ev_posix.h" + +const grpc_event_engine_vtable *grpc_init_epoll_linux(void); + +#endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLL_LINUX_H */ diff --git a/src/core/lib/iomgr/ev_epoll_posix.c b/src/core/lib/iomgr/ev_epoll_posix.c index 4481bab438..5abd5b2a94 100644 --- a/src/core/lib/iomgr/ev_epoll_posix.c +++ b/src/core/lib/iomgr/ev_epoll_posix.c @@ -42,6 +42,7 @@ #include #include #include +#include #include #include @@ -51,11 +52,13 @@ #include #include +#include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" + /******************************************************************************* * FD declarations */ @@ -133,10 +136,9 @@ struct grpc_pollset { int called_shutdown; int kicked_without_pollers; grpc_closure *shutdown_done; - union { - int fd; - void *ptr; - } data; + + int epoll_fd; + /* Local cache of eventfds for workers */ grpc_cached_wakeup_fd *local_wakeup_cache; }; @@ -589,7 +591,6 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { pollset->local_wakeup_cache = NULL; pollset->kicked_without_pollers = 0; - pollset->data.ptr = NULL; multipoll_with_epoll_pollset_create_efd(pollset); } @@ -619,22 +620,6 @@ static void pollset_reset(grpc_pollset *pollset) { pollset->kicked_without_pollers = 0; } -/* TODO (sreek): Remove multipoll_with_epoll_add_fd declaration*/ -static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, - grpc_fd *fd); - -static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_fd *fd) { - /* TODO (sreek) - Does reading pollset->data.ptr need pollset->mu lock ? - * because finally_add_fd() also reads it but without the lock! */ - gpr_mu_lock(&pollset->mu); - GPR_ASSERT(pollset->data.ptr != NULL); - gpr_mu_unlock(&pollset->mu); - - multipoll_with_epoll_pollset_add_fd(exec_ctx, pollset, fd); -} - /* TODO (sreek): Remove multipoll_with_epoll_finish_shutdown() declaration */ static void multipoll_with_epoll_pollset_finish_shutdown(grpc_pollset *pollset); @@ -790,20 +775,6 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, * pollset_multipoller_with_epoll_posix.c */ -#include -#include -#include -#include -#include - -#include -#include -#include - -#include "src/core/lib/iomgr/ev_posix.h" -#include "src/core/lib/profiling/timers.h" -#include "src/core/lib/support/block_annotate.h" - static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) { /* only one set_ready can be active at once (but there may be a racing notify_on) */ @@ -879,13 +850,13 @@ static void remove_fd_from_all_epoll_sets(int fd) { gpr_mu_unlock(&epoll_fd_list_mu); } -typedef struct { int epoll_fd; } epoll_hdr; - -static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, +/* TODO: sreek - This function multipoll_with_epoll_pollset_add_fd() and + * finally_add_fd() in ev_poll_and_epoll_posix.c */ +static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) { - /*TODO: (sree) Shouldn't this read (pollset->data.ptr) be done under a - pollset lock - i.e pollset->mu ? */ - epoll_hdr *h = pollset->data.ptr; + + /* TODO sreek - Check if we need to get a pollset->mu lock here */ + struct epoll_event ev; int err; @@ -908,7 +879,7 @@ static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET); ev.data.ptr = fd; - err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev); + err = epoll_ctl(pollset->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev); if (err < 0) { /* FDs may be added to a pollset multiple times, so EEXIST is normal. */ if (errno != EEXIST) { @@ -933,26 +904,20 @@ static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, /* TODO: This has to be called ONLY from pollset_init function. and hence it * does not acquire any lock */ static void multipoll_with_epoll_pollset_create_efd(grpc_pollset *pollset) { - epoll_hdr *h = gpr_malloc(sizeof(epoll_hdr)); struct epoll_event ev; int err; - /* TODO (sreek). remove this assert. Currently added this just to ensure that - * we do not overwrite h->epoll_fd without freeing the older one*/ - GPR_ASSERT(pollset->data.ptr == NULL); - - pollset->data.ptr = h; - h->epoll_fd = epoll_create1(EPOLL_CLOEXEC); - if (h->epoll_fd < 0) { + pollset->epoll_fd = epoll_create1(EPOLL_CLOEXEC); + if (pollset->epoll_fd < 0) { gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno)); abort(); } - add_epoll_fd_to_global_list(h->epoll_fd); + add_epoll_fd_to_global_list(pollset->epoll_fd); ev.events = (uint32_t)(EPOLLIN | EPOLLET); ev.data.ptr = NULL; - err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, + err = epoll_ctl(pollset->epoll_fd, EPOLL_CTL_ADD, GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev); if (err < 0) { gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", @@ -961,12 +926,6 @@ static void multipoll_with_epoll_pollset_create_efd(grpc_pollset *pollset) { } } -static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, - grpc_fd *fd) { - finally_add_fd(exec_ctx, pollset, fd); -} - /* TODO(klempner): We probably want to turn this down a bit */ #define GRPC_EPOLL_MAX_EVENTS 1000 @@ -974,9 +933,9 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline, gpr_timespec now) { struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS]; + int epoll_fd = pollset->epoll_fd; int ep_rv; int poll_rv; - epoll_hdr *h = pollset->data.ptr; int timeout_ms; struct pollfd pfds[2]; @@ -993,7 +952,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd); pfds[0].events = POLLIN; pfds[0].revents = 0; - pfds[1].fd = h->epoll_fd; + pfds[1].fd = epoll_fd; pfds[1].events = POLLIN; pfds[1].revents = 0; @@ -1018,7 +977,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( if (pfds[1].revents) { do { /* The following epoll_wait never blocks; it has a timeout of 0 */ - ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0); + ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0); if (ep_rv < 0) { if (errno != EINTR) { gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno)); @@ -1053,10 +1012,8 @@ static void multipoll_with_epoll_pollset_finish_shutdown( grpc_pollset *pollset) {} static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) { - epoll_hdr *h = pollset->data.ptr; - close(h->epoll_fd); - remove_epoll_fd_from_global_list(h->epoll_fd); - gpr_free(h); + close(pollset->epoll_fd); + remove_epoll_fd_from_global_list(pollset->epoll_fd); } /******************************************************************************* diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index baa3b9856a..404ef2a64b 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -44,7 +44,7 @@ #include #include -#include "src/core/lib/iomgr/ev_epoll_posix.h" +#include "src/core/lib/iomgr/ev_epoll_linux.h" #include "src/core/lib/iomgr/ev_poll_posix.h" #include "src/core/lib/support/env.h" @@ -163,11 +163,6 @@ void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, g_event_engine->fd_notify_on_write(exec_ctx, fd, closure); } -grpc_pollset *grpc_fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, - grpc_fd *fd) { - return g_event_engine->fd_get_read_notifier_pollset(exec_ctx, fd); -} - size_t grpc_pollset_size(void) { return g_event_engine->pollset_size; } void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 49b4ddc457..13bc6888d6 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -94,6 +94,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/endpoint.c', 'src/core/lib/iomgr/endpoint_pair_posix.c', 'src/core/lib/iomgr/endpoint_pair_windows.c', + 'src/core/lib/iomgr/ev_epoll_linux.c', 'src/core/lib/iomgr/ev_epoll_posix.c', 'src/core/lib/iomgr/ev_poll_posix.c', 'src/core/lib/iomgr/ev_posix.c', diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 36b25cca16..d968278f2a 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -807,6 +807,7 @@ src/core/lib/http/parser.h \ src/core/lib/iomgr/closure.h \ src/core/lib/iomgr/endpoint.h \ src/core/lib/iomgr/endpoint_pair.h \ +src/core/lib/iomgr/ev_epoll_linux.h \ src/core/lib/iomgr/ev_epoll_posix.h \ src/core/lib/iomgr/ev_poll_posix.h \ src/core/lib/iomgr/ev_posix.h \ @@ -955,6 +956,7 @@ src/core/lib/iomgr/closure.c \ src/core/lib/iomgr/endpoint.c \ src/core/lib/iomgr/endpoint_pair_posix.c \ src/core/lib/iomgr/endpoint_pair_windows.c \ +src/core/lib/iomgr/ev_epoll_linux.c \ src/core/lib/iomgr/ev_epoll_posix.c \ src/core/lib/iomgr/ev_poll_posix.c \ src/core/lib/iomgr/ev_posix.c \ diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index 4394049586..97cc55db36 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -5530,6 +5530,7 @@ "src/core/lib/iomgr/closure.h", "src/core/lib/iomgr/endpoint.h", "src/core/lib/iomgr/endpoint_pair.h", + "src/core/lib/iomgr/ev_epoll_linux.h", "src/core/lib/iomgr/ev_epoll_posix.h", "src/core/lib/iomgr/ev_poll_posix.h", "src/core/lib/iomgr/ev_posix.h", @@ -5630,6 +5631,8 @@ "src/core/lib/iomgr/endpoint_pair.h", "src/core/lib/iomgr/endpoint_pair_posix.c", "src/core/lib/iomgr/endpoint_pair_windows.c", + "src/core/lib/iomgr/ev_epoll_linux.c", + "src/core/lib/iomgr/ev_epoll_linux.h", "src/core/lib/iomgr/ev_epoll_posix.c", "src/core/lib/iomgr/ev_epoll_posix.h", "src/core/lib/iomgr/ev_poll_posix.c", diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj index 55304af586..a67e4d16da 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj @@ -316,6 +316,7 @@ + @@ -484,6 +485,8 @@ + + diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters index 7d1c90fda7..bf9b7dc7dc 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters @@ -55,6 +55,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr @@ -677,6 +680,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj index 3d0cdfc668..afc9a2ca1b 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj @@ -304,6 +304,7 @@ + @@ -450,6 +451,8 @@ + + diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters index d2ff4c630f..b7507f9a96 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters @@ -58,6 +58,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr @@ -575,6 +578,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr -- cgit v1.2.3 From d7d6eed78822ed8de14fdf9b39255365946cf8c4 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Tue, 31 May 2016 09:44:08 -0700 Subject: Correct typo --- src/core/lib/iomgr/ev_posix.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/core/lib/iomgr/ev_posix.c') diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 404ef2a64b..96399ef837 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -44,7 +44,7 @@ #include #include -#include "src/core/lib/iomgr/ev_epoll_linux.h" +#include "src/core/lib/iomgr/ev_epoll_posix.h" #include "src/core/lib/iomgr/ev_poll_posix.h" #include "src/core/lib/support/env.h" -- cgit v1.2.3 From 73ef9154024290e86a3566a574c04992afc93d00 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Fri, 3 Jun 2016 15:25:05 -0700 Subject: epoll polling strategy now points to the new code --- src/core/lib/iomgr/ev_epoll_linux.c | 4 ---- src/core/lib/iomgr/ev_posix.c | 4 ++-- 2 files changed, 2 insertions(+), 6 deletions(-) (limited to 'src/core/lib/iomgr/ev_posix.c') diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index ce42a9e7ce..ab4224b2d5 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -788,16 +788,12 @@ static void sig_handler(int sig_num) { /* Global state management */ static void pollset_global_init(void) { - gpr_tls_init(&g_current_thread_poller); - gpr_tls_init(&g_current_thread_worker); grpc_wakeup_fd_init(&grpc_global_wakeup_fd); signal(SIGUSR1, sig_handler); } static void pollset_global_shutdown(void) { grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd); - gpr_tls_destroy(&g_current_thread_poller); - gpr_tls_destroy(&g_current_thread_worker); } /* Return 1 if the pollset has active threads in pollset_work (pollset must diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 96399ef837..b6b113aed3 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -44,7 +44,7 @@ #include #include -#include "src/core/lib/iomgr/ev_epoll_posix.h" +#include "src/core/lib/iomgr/ev_epoll_linux.h" #include "src/core/lib/iomgr/ev_poll_posix.h" #include "src/core/lib/support/env.h" @@ -62,7 +62,7 @@ typedef struct { } event_engine_factory; static const event_engine_factory g_factories[] = { - {"poll", grpc_init_poll_posix}, {"epoll", grpc_init_epoll_posix}, + {"poll", grpc_init_poll_posix}, {"epoll", grpc_init_epoll_linux}, }; static void add(const char *beg, const char *end, char ***ss, size_t *ns) { -- cgit v1.2.3 From 5855c478c69508f000baa4878f515d72b5f5a1e9 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Wed, 8 Jun 2016 12:56:56 -0700 Subject: Use poll if not linux, add read notifier pollset support and some groundwork for adding API that allows users to register custom kick signal number --- include/grpc/impl/codegen/port_platform.h | 1 + src/core/lib/iomgr/ev_epoll_linux.c | 72 ++++++++++++++++++++++--------- src/core/lib/iomgr/ev_posix.c | 2 +- 3 files changed, 54 insertions(+), 21 deletions(-) (limited to 'src/core/lib/iomgr/ev_posix.c') diff --git a/include/grpc/impl/codegen/port_platform.h b/include/grpc/impl/codegen/port_platform.h index be4215a54b..7a6ec53fb4 100644 --- a/include/grpc/impl/codegen/port_platform.h +++ b/include/grpc/impl/codegen/port_platform.h @@ -189,6 +189,7 @@ #define GPR_GCC_ATOMIC 1 #define GPR_GCC_TLS 1 #define GPR_LINUX 1 +#define GPR_LINUX_EPOLL 1 #define GPR_LINUX_LOG #define GPR_LINUX_MULTIPOLL_WITH_EPOLL 1 #define GPR_POSIX_WAKEUP_FD 1 diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index d5aac96fa4..69ab665e15 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -33,7 +33,7 @@ #include -#ifdef GPR_POSIX_SOCKET +#ifdef GPR_LINUX_EPOLL #include "src/core/lib/iomgr/ev_epoll_linux.h" @@ -60,6 +60,8 @@ struct polling_island; +static int grpc_poller_kick_signum; + /******************************************************************************* * Fd Declarations */ @@ -92,6 +94,9 @@ struct grpc_fd { struct grpc_fd *freelist_next; grpc_closure *on_done_closure; + /* The pollset that last noticed that the fd is readable */ + grpc_pollset *read_notifier_pollset; + grpc_iomgr_object iomgr_object; }; @@ -650,14 +655,15 @@ static grpc_fd *fd_create(int fd, const char *name) { gpr_mu_lock(&new_fd->mu); gpr_atm_rel_store(&new_fd->refst, 1); + new_fd->fd = fd; new_fd->shutdown = false; + new_fd->orphaned = false; new_fd->read_closure = CLOSURE_NOT_READY; new_fd->write_closure = CLOSURE_NOT_READY; - new_fd->fd = fd; new_fd->polling_island = NULL; new_fd->freelist_next = NULL; new_fd->on_done_closure = NULL; - new_fd->orphaned = false; + new_fd->read_notifier_pollset = NULL; gpr_mu_unlock(&new_fd->mu); @@ -765,6 +771,17 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } } +static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, + grpc_fd *fd) { + grpc_pollset *notifier = NULL; + + gpr_mu_lock(&fd->mu); + notifier = fd->read_notifier_pollset; + gpr_mu_unlock(&fd->mu); + + return notifier; +} + static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { gpr_mu_lock(&fd->mu); GPR_ASSERT(!fd->shutdown); @@ -801,16 +818,25 @@ static void sig_handler(int sig_num) { #endif } +static void poller_kick_init() { + grpc_poller_kick_signum = SIGRTMIN + 2; + signal(grpc_poller_kick_signum, sig_handler); +} + /* Global state management */ static void pollset_global_init(void) { grpc_wakeup_fd_init(&grpc_global_wakeup_fd); - signal(SIGUSR1, sig_handler); /* TODO: sreek - Do not hardcode SIGUSR1 */ + poller_kick_init(); } static void pollset_global_shutdown(void) { grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd); } +static void pollset_worker_kick(grpc_pollset_worker *worker) { + pthread_kill(worker->pt_id, grpc_poller_kick_signum); +} + /* Return 1 if the pollset has active threads in pollset_work (pollset must * be locked) */ static int pollset_has_workers(grpc_pollset *p) { @@ -856,7 +882,7 @@ static void pollset_kick(grpc_pollset *p, GPR_TIMER_BEGIN("pollset_kick.broadcast", 0); for (worker = p->root_worker.next; worker != &p->root_worker; worker = worker->next) { - pthread_kill(worker->pt_id, SIGUSR1); + pollset_worker_kick(worker); } } else { p->kicked_without_pollers = true; @@ -864,7 +890,7 @@ static void pollset_kick(grpc_pollset *p, GPR_TIMER_END("pollset_kick.broadcast", 0); } else { GPR_TIMER_MARK("kicked_specifically", 0); - pthread_kill(worker->pt_id, SIGUSR1); + pollset_worker_kick(worker); } } else { GPR_TIMER_MARK("kick_anonymous", 0); @@ -872,7 +898,7 @@ static void pollset_kick(grpc_pollset *p, if (worker != NULL) { GPR_TIMER_MARK("finally_kick", 0); push_back_worker(p, worker); - pthread_kill(worker->pt_id, SIGUSR1); + pollset_worker_kick(worker); } else { GPR_TIMER_MARK("kicked_no_pollers", 0); p->kicked_without_pollers = true; @@ -924,20 +950,20 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); } -static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) { - /* only one set_ready can be active at once (but there may be a racing - notify_on) */ +static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, + grpc_pollset *notifier) { + /* Need the fd->mu since we might be racing with fd_notify_on_read */ gpr_mu_lock(&fd->mu); - set_ready_locked(exec_ctx, fd, st); + set_ready_locked(exec_ctx, fd, &fd->read_closure); + fd->read_notifier_pollset = notifier; gpr_mu_unlock(&fd->mu); } -static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - set_ready(exec_ctx, fd, &fd->read_closure); -} - static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - set_ready(exec_ctx, fd, &fd->write_closure); + /* Need the fd->mu since we might be racing with fd_notify_on_write */ + gpr_mu_lock(&fd->mu); + set_ready_locked(exec_ctx, fd, &fd->write_closure); + gpr_mu_unlock(&fd->mu); } #define GRPC_EPOLL_MAX_EVENTS 1000 @@ -1007,7 +1033,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); } else { if (read_ev || cancel) { - fd_become_readable(exec_ctx, fd); + fd_become_readable(exec_ctx, fd, pollset); } if (write_ev || cancel) { fd_become_writable(exec_ctx, fd); @@ -1109,9 +1135,9 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset->kicked_without_pollers = 0; } else if (!pollset->shutting_down) { sigemptyset(&new_mask); - sigaddset(&new_mask, SIGUSR1); + sigaddset(&new_mask, grpc_poller_kick_signum); pthread_sigmask(SIG_BLOCK, &new_mask, &orig_mask); - sigdelset(&orig_mask, SIGUSR1); + sigdelset(&orig_mask, grpc_poller_kick_signum); push_front_worker(pollset, &worker); @@ -1350,6 +1376,7 @@ static const grpc_event_engine_vtable vtable = { .fd_shutdown = fd_shutdown, .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, + .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, @@ -1380,4 +1407,9 @@ const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return &vtable; } -#endif +#else /* defined(GPR_LINUX_EPOLL) */ +/* If GPR_LINUX_EPOLL is not defined, it means epoll is not available. Return + * NULL */ +const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; } + +#endif /* !defined(GPR_LINUX_EPOLL) */ diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index e0c3558a51..2b15967adc 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -63,8 +63,8 @@ typedef struct { } event_engine_factory; static const event_engine_factory g_factories[] = { - {"poll", grpc_init_poll_posix}, {"epoll", grpc_init_epoll_linux}, + {"poll", grpc_init_poll_posix}, {"legacy", grpc_init_poll_and_epoll_posix}, }; -- cgit v1.2.3 From 2e12db9c319bcbdbb2fa570149f88e4b496b558c Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Thu, 16 Jun 2016 16:53:59 -0700 Subject: Test polling island merges --- Makefile | 36 +++++ build.yaml | 12 ++ src/core/lib/iomgr/ev_epoll_linux.c | 51 ++++++- src/core/lib/iomgr/ev_epoll_linux.h | 6 + src/core/lib/iomgr/ev_posix.c | 7 + src/core/lib/iomgr/ev_posix.h | 3 + test/core/iomgr/ev_epoll_linux_test.c | 222 +++++++++++++++++++++++++++++++ tools/run_tests/sources_and_headers.json | 16 +++ tools/run_tests/tests.json | 15 +++ 9 files changed, 366 insertions(+), 2 deletions(-) create mode 100644 test/core/iomgr/ev_epoll_linux_test.c (limited to 'src/core/lib/iomgr/ev_posix.c') diff --git a/Makefile b/Makefile index e615704395..825684cc2d 100644 --- a/Makefile +++ b/Makefile @@ -905,6 +905,7 @@ dns_resolver_connectivity_test: $(BINDIR)/$(CONFIG)/dns_resolver_connectivity_te dns_resolver_test: $(BINDIR)/$(CONFIG)/dns_resolver_test dualstack_socket_test: $(BINDIR)/$(CONFIG)/dualstack_socket_test endpoint_pair_test: $(BINDIR)/$(CONFIG)/endpoint_pair_test +ev_epoll_linux_test: $(BINDIR)/$(CONFIG)/ev_epoll_linux_test fd_conservation_posix_test: $(BINDIR)/$(CONFIG)/fd_conservation_posix_test fd_posix_test: $(BINDIR)/$(CONFIG)/fd_posix_test fling_client: $(BINDIR)/$(CONFIG)/fling_client @@ -1242,6 +1243,7 @@ buildtests_c: privatelibs_c \ $(BINDIR)/$(CONFIG)/dns_resolver_test \ $(BINDIR)/$(CONFIG)/dualstack_socket_test \ $(BINDIR)/$(CONFIG)/endpoint_pair_test \ + $(BINDIR)/$(CONFIG)/ev_epoll_linux_test \ $(BINDIR)/$(CONFIG)/fd_conservation_posix_test \ $(BINDIR)/$(CONFIG)/fd_posix_test \ $(BINDIR)/$(CONFIG)/fling_client \ @@ -1512,6 +1514,8 @@ test_c: buildtests_c $(Q) $(BINDIR)/$(CONFIG)/dualstack_socket_test || ( echo test dualstack_socket_test failed ; exit 1 ) $(E) "[RUN] Testing endpoint_pair_test" $(Q) $(BINDIR)/$(CONFIG)/endpoint_pair_test || ( echo test endpoint_pair_test failed ; exit 1 ) + $(E) "[RUN] Testing ev_epoll_linux_test" + $(Q) $(BINDIR)/$(CONFIG)/ev_epoll_linux_test || ( echo test ev_epoll_linux_test failed ; exit 1 ) $(E) "[RUN] Testing fd_conservation_posix_test" $(Q) $(BINDIR)/$(CONFIG)/fd_conservation_posix_test || ( echo test fd_conservation_posix_test failed ; exit 1 ) $(E) "[RUN] Testing fd_posix_test" @@ -7130,6 +7134,38 @@ endif endif +EV_EPOLL_LINUX_TEST_SRC = \ + test/core/iomgr/ev_epoll_linux_test.c \ + +EV_EPOLL_LINUX_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(EV_EPOLL_LINUX_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/ev_epoll_linux_test: openssl_dep_error + +else + + + +$(BINDIR)/$(CONFIG)/ev_epoll_linux_test: $(EV_EPOLL_LINUX_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LD) $(LDFLAGS) $(EV_EPOLL_LINUX_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/ev_epoll_linux_test + +endif + +$(OBJDIR)/$(CONFIG)/test/core/iomgr/ev_epoll_linux_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + +deps_ev_epoll_linux_test: $(EV_EPOLL_LINUX_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(EV_EPOLL_LINUX_TEST_OBJS:.o=.dep) +endif +endif + + FD_CONSERVATION_POSIX_TEST_SRC = \ test/core/iomgr/fd_conservation_posix_test.c \ diff --git a/build.yaml b/build.yaml index 7790e0c517..84f4ea521b 100644 --- a/build.yaml +++ b/build.yaml @@ -1407,6 +1407,18 @@ targets: - grpc - gpr_test_util - gpr +- name: ev_epoll_linux_test + build: test + language: c + src: + - test/core/iomgr/ev_epoll_linux_test.c + deps: + - grpc_test_util + - grpc + - gpr_test_util + - gpr + platforms: + - linux - name: fd_conservation_posix_test build: test language: c diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 1fb5947464..ed2c494b78 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -317,8 +317,9 @@ static void polling_island_remove_all_fds_locked(polling_island *pi, if (err < 0 && errno != ENOENT) { /* TODO: sreek - We need a better way to bubble up this error instead of * just logging a message */ - gpr_log(GPR_ERROR, "epoll_ctl deleting fds[%zu]: %d failed with error: %s", - i, pi->fds[i]->fd, strerror(errno)); + gpr_log(GPR_ERROR, + "epoll_ctl deleting fds[%zu]: %d failed with error: %s", i, + pi->fds[i]->fd, strerror(errno)); } if (remove_fd_refs) { @@ -1458,6 +1459,52 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&bag->mu); } +/* Test helper functions + * */ +void *grpc_fd_get_polling_island(grpc_fd *fd) { + polling_island *pi; + + gpr_mu_lock(&fd->pi_mu); + pi = fd->polling_island; + gpr_mu_unlock(&fd->pi_mu); + + return pi; +} + +void *grpc_pollset_get_polling_island(grpc_pollset *ps) { + polling_island *pi; + + gpr_mu_lock(&ps->pi_mu); + pi = ps->polling_island; + gpr_mu_unlock(&ps->pi_mu); + + return pi; +} + +static polling_island *get_polling_island(polling_island *p) { + if (p == NULL) { + return NULL; + } + + polling_island *next; + gpr_mu_lock(&p->mu); + while (p->merged_to != NULL) { + next = p->merged_to; + gpr_mu_unlock(&p->mu); + p = next; + gpr_mu_lock(&p->mu); + } + gpr_mu_unlock(&p->mu); + + return p; +} + +bool grpc_are_polling_islands_equal(void *p, void *q) { + p = get_polling_island(p); + q = get_polling_island(q); + return p == q; +} + /******************************************************************************* * Event engine binding */ diff --git a/src/core/lib/iomgr/ev_epoll_linux.h b/src/core/lib/iomgr/ev_epoll_linux.h index 8c819975a4..7a494aba19 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.h +++ b/src/core/lib/iomgr/ev_epoll_linux.h @@ -38,4 +38,10 @@ const grpc_event_engine_vtable *grpc_init_epoll_linux(void); +#ifdef GPR_LINUX_EPOLL +void *grpc_fd_get_polling_island(grpc_fd *fd); +void *grpc_pollset_get_polling_island(grpc_pollset *ps); +bool grpc_are_polling_islands_equal(void *p, void *q); +#endif /* defined(GPR_LINUX_EPOLL) */ + #endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLL_LINUX_H */ diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 2b15967adc..5b20600a6f 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -54,6 +54,7 @@ grpc_poll_function_type grpc_poll_function = poll; static const grpc_event_engine_vtable *g_event_engine; +static const char* g_poll_strategy_name = NULL; typedef const grpc_event_engine_vtable *(*event_engine_factory_fn)(void); @@ -101,6 +102,7 @@ static void try_engine(const char *engine) { for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) { if (is(engine, g_factories[i].name)) { if ((g_event_engine = g_factories[i].factory())) { + g_poll_strategy_name = g_factories[i].name; gpr_log(GPR_DEBUG, "Using polling engine: %s", g_factories[i].name); return; } @@ -108,6 +110,11 @@ static void try_engine(const char *engine) { } } +/* Call this only after calling grpc_event_engine_init() */ +const char *grpc_get_poll_strategy_name() { + return g_poll_strategy_name; +} + void grpc_event_engine_init(void) { char *s = gpr_getenv("GRPC_POLL_STRATEGY"); if (s == NULL) { diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 344bf63438..3ed5a5f956 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -98,6 +98,9 @@ typedef struct grpc_event_engine_vtable { void grpc_event_engine_init(void); void grpc_event_engine_shutdown(void); +/* Return the name of the poll strategy */ +const char* grpc_get_poll_strategy_name(); + /* Create a wrapped file descriptor. Requires fd is a non-blocking file descriptor. This takes ownership of closing fd. */ diff --git a/test/core/iomgr/ev_epoll_linux_test.c b/test/core/iomgr/ev_epoll_linux_test.c new file mode 100644 index 0000000000..51da15faa7 --- /dev/null +++ b/test/core/iomgr/ev_epoll_linux_test.c @@ -0,0 +1,222 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/ev_epoll_linux.h" +#include "src/core/lib/iomgr/ev_posix.h" + +#include +#include +#include +#include + +#include +#include + +#include "src/core/lib/iomgr/iomgr.h" +#include "test/core/util/test_config.h" + +typedef struct test_pollset { + grpc_pollset *pollset; + gpr_mu *mu; +} test_pollset; + +typedef struct test_fd { + int inner_fd; + grpc_fd *fd; +} test_fd; + +static void test_fd_init(test_fd *fds, int num_fds) { + int i; + for (i = 0; i < num_fds; i++) { + fds[i].inner_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + fds[i].fd = grpc_fd_create(fds[i].inner_fd, "test_fd"); + } +} + +static void test_fd_cleanup(grpc_exec_ctx *exec_ctx, test_fd *fds, + int num_fds) { + int release_fd; + int i; + + for (i = 0; i < num_fds; i++) { + grpc_fd_shutdown(exec_ctx, fds[i].fd); + grpc_exec_ctx_flush(exec_ctx); + + grpc_fd_orphan(exec_ctx, fds[i].fd, NULL, &release_fd, "test_fd_cleanup"); + grpc_exec_ctx_flush(exec_ctx); + + GPR_ASSERT(release_fd == fds[i].inner_fd); + close(fds[i].inner_fd); + } +} + +static void test_pollset_init(test_pollset *pollsets, int num_pollsets) { + int i; + for (i = 0; i < num_pollsets; i++) { + pollsets[i].pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(pollsets[i].pollset, &pollsets[i].mu); + } +} + +static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool success) { + grpc_pollset_destroy(p); +} + +static void test_pollset_cleanup(grpc_exec_ctx *exec_ctx, + test_pollset *pollsets, int num_pollsets) { + grpc_closure destroyed; + int i; + + for (i = 0; i < num_pollsets; i++) { + grpc_closure_init(&destroyed, destroy_pollset, pollsets[i].pollset); + grpc_pollset_shutdown(exec_ctx, pollsets[i].pollset, &destroyed); + + grpc_exec_ctx_flush(exec_ctx); + gpr_free(pollsets[i].pollset); + } +} + +#define NUM_FDS 8 +#define NUM_POLLSETS 4 +/* + * Cases to test: + * case 1) Polling islands of both fd and pollset are NULL + * case 2) Polling island of fd is NULL but that of pollset is not-NULL + * case 3) Polling island of fd is not-NULL but that of pollset is NULL + * case 4) Polling islands of both fd and pollset are not-NULL and: + * case 4.1) Polling islands of fd and pollset are equal + * case 4.2) Polling islands of fd and pollset are NOT-equal (This results + * in a merge) + * */ +static void test_add_fd_to_pollset() { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + test_fd fds[NUM_FDS]; + test_pollset pollsets[NUM_POLLSETS]; + void *expected_pi = NULL; + int i; + + test_fd_init(fds, NUM_FDS); + test_pollset_init(pollsets, NUM_POLLSETS); + + /*Step 1. + * Create three polling islands (This will exercise test case 1 and 2) with + * the following configuration: + * polling island 0 = { fds:0,1,2, pollsets:0} + * polling island 1 = { fds:3,4, pollsets:1} + * polling island 2 = { fds:5,6,7 pollsets:2} + * + *Step 2. + * Add pollset 3 to polling island 0 (by adding fds 0 and 1 to pollset 3) + * (This will exercise test cases 3 and 4.1). The configuration becomes: + * polling island 0 = { fds:0,1,2, pollsets:0,3} <<< pollset 3 added here + * polling island 1 = { fds:3,4, pollsets:1} + * polling island 2 = { fds:5,6,7 pollsets:2} + * + *Step 3. + * Merge polling islands 0 and 1 by adding fd 0 to pollset 1 (This will + * exercise test case 4.2). The configuration becomes: + * polling island (merged) = {fds: 0,1,2,3,4, pollsets: 0,1,3} + * polling island 2 = {fds: 5,6,7 pollsets: 2} + * + *Step 4. + * Finally do one more merge by adding fd 3 to pollset 2. + * polling island (merged) = {fds: 0,1,2,3,4,5,6,7, pollsets: 0,1,2,3} + */ + + /* == Step 1 == */ + for (i = 0; i <= 2; i++) { + grpc_pollset_add_fd(&exec_ctx, pollsets[0].pollset, fds[i].fd); + grpc_exec_ctx_flush(&exec_ctx); + } + + for (i = 3; i <= 4; i++) { + grpc_pollset_add_fd(&exec_ctx, pollsets[1].pollset, fds[i].fd); + grpc_exec_ctx_flush(&exec_ctx); + } + + for (i = 5; i <= 7; i++) { + grpc_pollset_add_fd(&exec_ctx, pollsets[2].pollset, fds[i].fd); + grpc_exec_ctx_flush(&exec_ctx); + } + + /* == Step 2 == */ + for (i = 0; i <= 1; i++) { + grpc_pollset_add_fd(&exec_ctx, pollsets[3].pollset, fds[i].fd); + grpc_exec_ctx_flush(&exec_ctx); + } + + /* == Step 3 == */ + grpc_pollset_add_fd(&exec_ctx, pollsets[1].pollset, fds[0].fd); + grpc_exec_ctx_flush(&exec_ctx); + + /* == Step 4 == */ + grpc_pollset_add_fd(&exec_ctx, pollsets[2].pollset, fds[3].fd); + grpc_exec_ctx_flush(&exec_ctx); + + /* All polling islands are merged at this point */ + + /* Compare Fd:0's polling island with that of all other Fds */ + expected_pi = grpc_fd_get_polling_island(fds[0].fd); + for (i = 1; i < NUM_FDS; i++) { + GPR_ASSERT(grpc_are_polling_islands_equal( + expected_pi, grpc_fd_get_polling_island(fds[i].fd))); + } + + /* Compare Fd:0's polling island with that of all other pollsets */ + for (i = 0; i < NUM_POLLSETS; i++) { + GPR_ASSERT(grpc_are_polling_islands_equal( + expected_pi, grpc_pollset_get_polling_island(pollsets[i].pollset))); + } + + test_fd_cleanup(&exec_ctx, fds, NUM_FDS); + test_pollset_cleanup(&exec_ctx, pollsets, NUM_POLLSETS); + grpc_exec_ctx_finish(&exec_ctx); +} + +int main(int argc, char **argv) { + const char *poll_strategy = NULL; + grpc_test_init(argc, argv); + grpc_iomgr_init(); + + poll_strategy = grpc_get_poll_strategy_name(); + if (poll_strategy != NULL && strcmp(poll_strategy, "epoll") == 0) { + test_add_fd_to_pollset(); + } else { + gpr_log(GPR_INFO, + "Skipping the test. The test is only relevant for 'epoll' " + "strategy. and the current strategy is: '%s'", + poll_strategy); + } + grpc_iomgr_shutdown(); + return 0; +} diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index e8ff61dc3f..e9df72e43a 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -315,6 +315,22 @@ "third_party": false, "type": "target" }, + { + "deps": [ + "gpr", + "gpr_test_util", + "grpc", + "grpc_test_util" + ], + "headers": [], + "language": "c", + "name": "ev_epoll_linux_test", + "src": [ + "test/core/iomgr/ev_epoll_linux_test.c" + ], + "third_party": false, + "type": "target" + }, { "deps": [ "gpr", diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json index 5a84a41b63..ba661840da 100644 --- a/tools/run_tests/tests.json +++ b/tools/run_tests/tests.json @@ -377,6 +377,21 @@ "windows" ] }, + { + "args": [], + "ci_platforms": [ + "linux" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "flaky": false, + "gtest": false, + "language": "c", + "name": "ev_epoll_linux_test", + "platforms": [ + "linux" + ] + }, { "args": [], "ci_platforms": [ -- cgit v1.2.3 From 229533b1e68c4a4b8a67148f7fe25543584131f6 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Tue, 21 Jun 2016 20:42:52 -0700 Subject: Remove pollset->pi_mu since it is redundant. Also do not get polling island lock in the fast-path --- src/core/lib/iomgr/ev_epoll_linux.c | 82 ++++++++++++++++++++----------------- src/core/lib/iomgr/ev_posix.c | 6 +-- 2 files changed, 46 insertions(+), 42 deletions(-) (limited to 'src/core/lib/iomgr/ev_posix.c') diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 3a774a8876..6464d3ba34 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -207,12 +207,7 @@ struct grpc_pollset { bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */ grpc_closure *shutdown_done; /* Called after after shutdown is complete */ - /* The polling island to which this pollset belongs to and the mutex - protecting the field */ - /* TODO: sreek: This lock might actually be adding more overhead to the - critical path (i.e pollset_work() function). Consider removing this lock - and just using the overall pollset lock */ - gpr_mu pi_mu; + /* The polling island to which this pollset belongs to */ struct polling_island *polling_island; }; @@ -488,31 +483,47 @@ static void polling_island_delete(polling_island *pi) { gpr_mu_unlock(&g_pi_freelist_mu); } +/* Attempts to gets the last polling island in the linked list (liked by the + * 'merged_to' field). Since this does not lock the polling island, there are no + * guarantees that the island returned is the last island */ +static polling_island *polling_island_maybe_get_latest(polling_island *pi) { + polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to); + while (next != NULL) { + pi = next; + next = (polling_island *)gpr_atm_acq_load(&pi->merged_to); + } + + return pi; +} + /* Gets the lock on the *latest* polling island i.e the last polling island in - the linked list (linked by 'merged_to' link). Call gpr_mu_unlock on the + the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on the returned polling island's mu. Usage: To lock/unlock polling island "pi", do the following: polling_island *pi_latest = polling_island_lock(pi); ... ... critical section .. ... - gpr_mu_unlock(&pi_latest->mu); //NOTE: use pi_latest->mu. NOT pi->mu */ -polling_island *polling_island_lock(polling_island *pi) { + gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */ +static polling_island *polling_island_lock(polling_island *pi) { polling_island *next = NULL; + while (true) { next = (polling_island *)gpr_atm_acq_load(&pi->merged_to); if (next == NULL) { - /* pi is the last node in the linked list. Get the lock and check again - (under the pi->mu lock) that pi is still the last node (because a merge - may have happend after the (next == NULL) check above and before - getting the pi->mu lock. - If pi is the last node, we are done. If not, unlock and continue - traversing the list */ + /* Looks like 'pi' is the last node in the linked list but unless we check + this by holding the pi->mu lock, we cannot be sure (i.e without the + pi->mu lock, we don't prevent island merges). + To be absolutely sure, check once more by holding the pi->mu lock */ gpr_mu_lock(&pi->mu); next = (polling_island *)gpr_atm_acq_load(&pi->merged_to); if (next == NULL) { + /* pi is infact the last node and we have the pi->mu lock. we're done */ break; } + + /* pi->merged_to is not NULL i.e pi isn't the last node anymore. pi->mu + * isn't the lock we are interested in. Continue traversing the list */ gpr_mu_unlock(&pi->mu); } @@ -526,11 +537,11 @@ polling_island *polling_island_lock(polling_island *pi) { This function is needed because calling the following block of code to obtain locks on polling islands (*p and *q) is prone to deadlocks. { - polling_island_lock(*p); - polling_island_lock(*q); + polling_island_lock(*p, true); + polling_island_lock(*q, true); } - Usage/exmaple: + Usage/example: polling_island *p1; polling_island *p2; .. @@ -551,7 +562,7 @@ polling_island *polling_island_lock(polling_island *pi) { } */ -void polling_island_lock_pair(polling_island **p, polling_island **q) { +static void polling_island_lock_pair(polling_island **p, polling_island **q) { polling_island *pi_1 = *p; polling_island *pi_2 = *q; polling_island *next_1 = NULL; @@ -611,7 +622,8 @@ void polling_island_lock_pair(polling_island **p, polling_island **q) { *q = pi_2; } -polling_island *polling_island_merge(polling_island *p, polling_island *q) { +static polling_island *polling_island_merge(polling_island *p, + polling_island *q) { /* Get locks on both the polling islands */ polling_island_lock_pair(&p, &q); @@ -1124,7 +1136,6 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { pollset->finish_shutdown_called = false; pollset->shutdown_done = NULL; - gpr_mu_init(&pollset->pi_mu); pollset->polling_island = NULL; } @@ -1170,12 +1181,10 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { } static void pollset_release_polling_island(grpc_pollset *ps, char *reason) { - gpr_mu_lock(&ps->pi_mu); if (ps->polling_island != NULL) { PI_UNREF(ps->polling_island, reason); } ps->polling_island = NULL; - gpr_mu_unlock(&ps->pi_mu); } static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx, @@ -1215,7 +1224,6 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, * here */ static void pollset_destroy(grpc_pollset *pollset) { GPR_ASSERT(!pollset_has_workers(pollset)); - gpr_mu_destroy(&pollset->pi_mu); gpr_mu_destroy(&pollset->mu); } @@ -1250,22 +1258,25 @@ static grpc_error *pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, /* We need to get the epoll_fd to wait on. The epoll_fd is in inside the latest polling island pointed by pollset->polling_island. - Acquire the following locks: - - pollset->mu (which we already have) - - pollset->pi_mu - - pollset->polling_island lock */ - gpr_mu_lock(&pollset->pi_mu); + + Since epoll_fd is immutable, we can read it without obtaining the polling + island lock. There is however a possibility that the polling island (from + which we got the epoll_fd) got merged with another island while we are + in this function. This is still okay because in such a case, we will wakeup + right-away from epoll_wait() and pick up the latest polling_island the next + this function (i.e pollset_work_and_unlock()) is called. + */ if (pollset->polling_island == NULL) { pollset->polling_island = polling_island_create(NULL); PI_ADD_REF(pollset->polling_island, "ps"); } - pi = polling_island_lock(pollset->polling_island); + pi = polling_island_maybe_get_latest(pollset->polling_island); epoll_fd = pi->epoll_fd; /* Update the pollset->polling_island since the island being pointed by - pollset->polling_island may not be the latest (i.e pi) */ + pollset->polling_island maybe older than the one pointed by pi) */ if (pollset->polling_island != pi) { /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the polling island to be deleted */ @@ -1278,9 +1289,6 @@ static grpc_error *pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, the epoll_fd won't be closed) while we are are doing an epoll_wait() on the epoll_fd */ PI_ADD_REF(pi, "ps_work"); - - gpr_mu_unlock(&pi->mu); - gpr_mu_unlock(&pollset->pi_mu); gpr_mu_unlock(&pollset->mu); do { @@ -1413,7 +1421,6 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) { gpr_mu_lock(&pollset->mu); - gpr_mu_lock(&pollset->pi_mu); gpr_mu_lock(&fd->pi_mu); polling_island *pi_new = NULL; @@ -1465,7 +1472,6 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } gpr_mu_unlock(&fd->pi_mu); - gpr_mu_unlock(&pollset->pi_mu); gpr_mu_unlock(&pollset->mu); } @@ -1627,9 +1633,9 @@ void *grpc_fd_get_polling_island(grpc_fd *fd) { void *grpc_pollset_get_polling_island(grpc_pollset *ps) { polling_island *pi; - gpr_mu_lock(&ps->pi_mu); + gpr_mu_lock(&ps->mu); pi = ps->polling_island; - gpr_mu_unlock(&ps->pi_mu); + gpr_mu_unlock(&ps->mu); return pi; } diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 4cdd13bbdb..a3c1e9db9a 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -54,7 +54,7 @@ grpc_poll_function_type grpc_poll_function = poll; static const grpc_event_engine_vtable *g_event_engine; -static const char* g_poll_strategy_name = NULL; +static const char *g_poll_strategy_name = NULL; typedef const grpc_event_engine_vtable *(*event_engine_factory_fn)(void); @@ -111,9 +111,7 @@ static void try_engine(const char *engine) { } /* Call this only after calling grpc_event_engine_init() */ -const char *grpc_get_poll_strategy_name() { - return g_poll_strategy_name; -} +const char *grpc_get_poll_strategy_name() { return g_poll_strategy_name; } void grpc_event_engine_init(void) { char *s = gpr_getenv("GRPC_POLL_STRATEGY"); -- cgit v1.2.3