aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-07-30 14:49:12 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-07-30 14:49:12 -0700
commitc40060493d62f75c41dd9896ce68a3b2860f1180 (patch)
treea61342256fa1541ae58f1138b6a139cc7a72bedf /src/core/iomgr
parent1191e218f705055f76d7344ad3acbef241d2f378 (diff)
parente97c9b4d86206ae567ec1ef2c8f15fda2ca469f7 (diff)
Merge branch 'plucking-hell' into primary-goat-whisperer
Diffstat (limited to 'src/core/iomgr')
-rw-r--r--src/core/iomgr/fd_posix.c18
-rw-r--r--src/core/iomgr/fd_posix.h9
-rw-r--r--src/core/iomgr/pollset.h23
-rw-r--r--src/core/iomgr/pollset_kick_posix.c168
-rw-r--r--src/core/iomgr/pollset_kick_posix.h93
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c162
-rw-r--r--src/core/iomgr/pollset_multipoller_with_poll_posix.c123
-rw-r--r--src/core/iomgr/pollset_posix.c145
-rw-r--r--src/core/iomgr/pollset_posix.h33
-rw-r--r--src/core/iomgr/wakeup_fd_eventfd.c8
-rw-r--r--src/core/iomgr/wakeup_fd_pipe.c12
-rw-r--r--src/core/iomgr/wakeup_fd_posix.c10
-rw-r--r--src/core/iomgr/wakeup_fd_posix.h20
13 files changed, 320 insertions, 504 deletions
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index a2df838d4a..4fb6b46ea6 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -168,13 +168,19 @@ int grpc_fd_is_orphaned(grpc_fd *fd) {
return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}
+static void pollset_kick_locked(grpc_pollset *pollset) {
+ gpr_mu_lock(GRPC_POLLSET_MU(pollset));
+ grpc_pollset_kick(pollset, NULL);
+ gpr_mu_unlock(GRPC_POLLSET_MU(pollset));
+}
+
static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
- grpc_pollset_force_kick(fd->inactive_watcher_root.next->pollset);
+ pollset_kick_locked(fd->inactive_watcher_root.next->pollset);
} else if (fd->read_watcher) {
- grpc_pollset_force_kick(fd->read_watcher->pollset);
+ pollset_kick_locked(fd->read_watcher->pollset);
} else if (fd->write_watcher) {
- grpc_pollset_force_kick(fd->write_watcher->pollset);
+ pollset_kick_locked(fd->write_watcher->pollset);
}
}
@@ -188,13 +194,13 @@ static void wake_all_watchers_locked(grpc_fd *fd) {
grpc_fd_watcher *watcher;
for (watcher = fd->inactive_watcher_root.next;
watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
- grpc_pollset_force_kick(watcher->pollset);
+ pollset_kick_locked(watcher->pollset);
}
if (fd->read_watcher) {
- grpc_pollset_force_kick(fd->read_watcher->pollset);
+ pollset_kick_locked(fd->read_watcher->pollset);
}
if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
- grpc_pollset_force_kick(fd->write_watcher->pollset);
+ pollset_kick_locked(fd->write_watcher->pollset);
}
}
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index 4e8e267ffd..835e9b339a 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -109,7 +109,8 @@ grpc_fd *grpc_fd_create(int fd, const char *name);
on_done is called when the underlying file descriptor is definitely close()d.
If on_done is NULL, no callback will be made.
Requires: *fd initialized; no outstanding notify_on_read or
- notify_on_write. */
+ notify_on_write.
+ MUST NOT be called with a pollset lock taken */
void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,
const char *reason);
@@ -122,11 +123,13 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,
i.e. a combination of read_mask and write_mask determined by the fd's current
interest in said events.
Polling strategies that do not need to alter their behavior depending on the
- fd's current interest (such as epoll) do not need to call this function. */
+ fd's current interest (such as epoll) do not need to call this function.
+ MUST NOT be called with a pollset lock taken */
gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
gpr_uint32 read_mask, gpr_uint32 write_mask,
grpc_fd_watcher *rec);
-/* Complete polling previously started with grpc_fd_begin_poll */
+/* Complete polling previously started with grpc_fd_begin_poll
+ MUST NOT be called with a pollset lock taken */
void grpc_fd_end_poll(grpc_fd_watcher *rec, int got_read, int got_write);
/* Return 1 if this fd is orphaned, 0 otherwise */
diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h
index c40188b3c9..c474e4dbf1 100644
--- a/src/core/iomgr/pollset.h
+++ b/src/core/iomgr/pollset.h
@@ -37,6 +37,8 @@
#include <grpc/support/port_platform.h>
#include <grpc/support/time.h>
+#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1)
+
/* A grpc_pollset is a set of file descriptors that a higher level item is
interested in. For example:
- a server will typically keep a pollset containing all connected channels,
@@ -63,13 +65,24 @@ void grpc_pollset_destroy(grpc_pollset *pollset);
descriptors.
Requires GRPC_POLLSET_MU(pollset) locked.
May unlock GRPC_POLLSET_MU(pollset) during its execution.
-
+
+ worker is a (platform-specific) handle that can be used to wake up
+ from grpc_pollset_work before any events are received and before the timeout
+ has expired. It is both initialized and destroyed by grpc_pollset_work.
+ Initialization of worker is guaranteed to occur BEFORE the
+ GRPC_POLLSET_MU(pollset) is released for the first time by
+ grpc_pollset_work, and it is guaranteed that GRPC_POLLSET_MU(pollset) will
+ not be released by grpc_pollset_work AFTER worker has been destroyed.
+
Returns true if some work has been done, and false if the deadline
- got attained. */
-int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline);
+ expired. */
+int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
+ gpr_timespec deadline);
/* Break one polling thread out of polling work for this pollset.
- Requires GRPC_POLLSET_MU(pollset) locked. */
-void grpc_pollset_kick(grpc_pollset *pollset);
+ If specific_worker is GRPC_POLLSET_KICK_BROADCAST, kick ALL the workers.
+ Otherwise, if specific_worker is non-NULL, then kick that worker. */
+void grpc_pollset_kick(grpc_pollset *pollset,
+ grpc_pollset_worker *specific_worker);
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_H */
diff --git a/src/core/iomgr/pollset_kick_posix.c b/src/core/iomgr/pollset_kick_posix.c
deleted file mode 100644
index 51021784f2..0000000000
--- a/src/core/iomgr/pollset_kick_posix.c
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#ifdef GPR_POSIX_SOCKET
-#include "src/core/iomgr/pollset_kick_posix.h"
-
-#include <errno.h>
-#include <string.h>
-#include <unistd.h>
-
-#include "src/core/iomgr/socket_utils_posix.h"
-#include "src/core/iomgr/wakeup_fd_posix.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-
-/* This implementation is based on a freelist of wakeup fds, with extra logic to
- * handle kicks while there is no attached fd. */
-
-/* TODO(klempner): Autosize this, and consider providing a way to disable the
- * cap entirely on systems with large fd limits */
-#define GRPC_MAX_CACHED_WFDS 50
-
-static grpc_kick_fd_info *fd_freelist = NULL;
-static int fd_freelist_count = 0;
-static gpr_mu fd_freelist_mu;
-
-static grpc_kick_fd_info *allocate_wfd(void) {
- grpc_kick_fd_info *info = NULL;
- gpr_mu_lock(&fd_freelist_mu);
- if (fd_freelist != NULL) {
- info = fd_freelist;
- fd_freelist = fd_freelist->next;
- --fd_freelist_count;
- }
- gpr_mu_unlock(&fd_freelist_mu);
- if (info == NULL) {
- info = gpr_malloc(sizeof(*info));
- grpc_wakeup_fd_create(&info->wakeup_fd);
- info->next = NULL;
- }
- return info;
-}
-
-static void destroy_wfd(grpc_kick_fd_info *wfd) {
- grpc_wakeup_fd_destroy(&wfd->wakeup_fd);
- gpr_free(wfd);
-}
-
-static void free_wfd(grpc_kick_fd_info *fd_info) {
- gpr_mu_lock(&fd_freelist_mu);
- if (fd_freelist_count < GRPC_MAX_CACHED_WFDS) {
- fd_info->next = fd_freelist;
- fd_freelist = fd_info;
- fd_freelist_count++;
- fd_info = NULL;
- }
- gpr_mu_unlock(&fd_freelist_mu);
-
- if (fd_info) {
- destroy_wfd(fd_info);
- }
-}
-
-void grpc_pollset_kick_init(grpc_pollset_kick_state *kick_state) {
- gpr_mu_init(&kick_state->mu);
- kick_state->kicked = 0;
- kick_state->fd_list.next = kick_state->fd_list.prev = &kick_state->fd_list;
-}
-
-void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state) {
- gpr_mu_destroy(&kick_state->mu);
- GPR_ASSERT(kick_state->fd_list.next == &kick_state->fd_list);
-}
-
-grpc_kick_fd_info *grpc_pollset_kick_pre_poll(
- grpc_pollset_kick_state *kick_state) {
- grpc_kick_fd_info *fd_info;
- gpr_mu_lock(&kick_state->mu);
- if (kick_state->kicked) {
- kick_state->kicked = 0;
- gpr_mu_unlock(&kick_state->mu);
- return NULL;
- }
- fd_info = allocate_wfd();
- fd_info->next = &kick_state->fd_list;
- fd_info->prev = fd_info->next->prev;
- fd_info->next->prev = fd_info->prev->next = fd_info;
- gpr_mu_unlock(&kick_state->mu);
- return fd_info;
-}
-
-void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state,
- grpc_kick_fd_info *fd_info) {
- grpc_wakeup_fd_consume_wakeup(&fd_info->wakeup_fd);
-}
-
-void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state,
- grpc_kick_fd_info *fd_info) {
- gpr_mu_lock(&kick_state->mu);
- fd_info->next->prev = fd_info->prev;
- fd_info->prev->next = fd_info->next;
- free_wfd(fd_info);
- gpr_mu_unlock(&kick_state->mu);
-}
-
-void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state) {
- gpr_mu_lock(&kick_state->mu);
- if (kick_state->fd_list.next != &kick_state->fd_list) {
- grpc_wakeup_fd_wakeup(&kick_state->fd_list.next->wakeup_fd);
- } else {
- kick_state->kicked = 1;
- }
- gpr_mu_unlock(&kick_state->mu);
-}
-
-void grpc_pollset_kick_global_init_fallback_fd(void) {
- gpr_mu_init(&fd_freelist_mu);
- grpc_wakeup_fd_global_init_force_fallback();
-}
-
-void grpc_pollset_kick_global_init(void) {
- gpr_mu_init(&fd_freelist_mu);
- grpc_wakeup_fd_global_init();
-}
-
-void grpc_pollset_kick_global_destroy(void) {
- while (fd_freelist != NULL) {
- grpc_kick_fd_info *current = fd_freelist;
- fd_freelist = fd_freelist->next;
- destroy_wfd(current);
- }
- grpc_wakeup_fd_global_destroy();
- gpr_mu_destroy(&fd_freelist_mu);
-}
-
-#endif /* GPR_POSIX_SOCKET */
diff --git a/src/core/iomgr/pollset_kick_posix.h b/src/core/iomgr/pollset_kick_posix.h
deleted file mode 100644
index 77e32a8d51..0000000000
--- a/src/core/iomgr/pollset_kick_posix.h
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_KICK_POSIX_H
-#define GRPC_INTERNAL_CORE_IOMGR_POLLSET_KICK_POSIX_H
-
-#include "src/core/iomgr/wakeup_fd_posix.h"
-#include <grpc/support/sync.h>
-
-/* pollset kicking allows breaking a thread out of polling work for
- a given pollset.
- writing a byte to a pipe is used as a posix-ly portable base
- mechanism, and eventfds are utilized on Linux for better performance. */
-
-typedef struct grpc_kick_fd_info {
- grpc_wakeup_fd_info wakeup_fd;
- /* used for polling list and free list */
- struct grpc_kick_fd_info *next;
- /* only used when polling */
- struct grpc_kick_fd_info *prev;
-} grpc_kick_fd_info;
-
-typedef struct grpc_pollset_kick_state {
- gpr_mu mu;
- int kicked;
- struct grpc_kick_fd_info fd_list;
-} grpc_pollset_kick_state;
-
-#define GRPC_POLLSET_KICK_GET_FD(kick_fd_info) \
- GRPC_WAKEUP_FD_GET_READ_FD(&(kick_fd_info)->wakeup_fd)
-
-/* This is an abstraction around the typical pipe mechanism for waking up a
- thread sitting in a poll() style call. */
-
-void grpc_pollset_kick_global_init(void);
-void grpc_pollset_kick_global_destroy(void);
-
-void grpc_pollset_kick_init(grpc_pollset_kick_state *kick_state);
-void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state);
-
-/* Guarantees a pure posix implementation rather than a specialized one, if
- * applicable. Intended for testing. */
-void grpc_pollset_kick_global_init_fallback_fd(void);
-
-/* Must be called before entering poll(). If return value is NULL, this consumed
- an existing kick. Otherwise the return value is an FD to add to the poll set.
- */
-grpc_kick_fd_info *grpc_pollset_kick_pre_poll(
- grpc_pollset_kick_state *kick_state);
-
-/* Consume an existing kick. Must be called after poll returns that the fd was
- readable, and before calling kick_post_poll. */
-void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state,
- grpc_kick_fd_info *fd_info);
-
-/* Must be called after pre_poll, and after consume if applicable */
-void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state,
- grpc_kick_fd_info *fd_info);
-
-/* Actually kick */
-void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state);
-
-#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_KICK_POSIX_H */
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index d697b59e4c..1320c64579 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -36,6 +36,7 @@
#ifdef GPR_LINUX_MULTIPOLL_WITH_EPOLL
#include <errno.h>
+#include <poll.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>
@@ -44,23 +45,28 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+typedef struct wakeup_fd_hdl {
+ grpc_wakeup_fd wakeup_fd;
+ struct wakeup_fd_hdl *next;
+} wakeup_fd_hdl;
+
+typedef struct {
+ grpc_pollset *pollset;
+ grpc_fd *fd;
+ grpc_iomgr_closure closure;
+} delayed_add;
+
typedef struct {
int epoll_fd;
- grpc_wakeup_fd_info wakeup_fd;
+ wakeup_fd_hdl *free_wakeup_fds;
} pollset_hdr;
-static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
- grpc_fd *fd,
- int and_unlock_pollset) {
+static void finally_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
pollset_hdr *h = pollset->data.ptr;
struct epoll_event ev;
int err;
grpc_fd_watcher watcher;
- if (and_unlock_pollset) {
- gpr_mu_unlock(&pollset->mu);
- }
-
/* We pretend to be polling whilst adding an fd to keep the fd from being
closed during the add. This may result in a spurious wakeup being assigned
to this pollset whilst adding, but that should be benign. */
@@ -80,6 +86,52 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
grpc_fd_end_poll(&watcher, 0, 0);
}
+static void perform_delayed_add(void *arg, int iomgr_status) {
+ delayed_add *da = arg;
+ int do_shutdown_cb = 0;
+
+ if (!grpc_fd_is_orphaned(da->fd)) {
+ finally_add_fd(da->pollset, da->fd);
+ }
+
+ gpr_mu_lock(&da->pollset->mu);
+ da->pollset->in_flight_cbs--;
+ if (da->pollset->shutting_down) {
+ /* We don't care about this pollset anymore. */
+ if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) {
+ GPR_ASSERT(!grpc_pollset_has_workers(da->pollset));
+ da->pollset->called_shutdown = 1;
+ do_shutdown_cb = 1;
+ }
+ }
+ gpr_mu_unlock(&da->pollset->mu);
+
+ GRPC_FD_UNREF(da->fd, "delayed_add");
+
+ if (do_shutdown_cb) {
+ da->pollset->shutdown_done_cb(da->pollset->shutdown_done_arg);
+ }
+
+ gpr_free(da);
+}
+
+static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
+ grpc_fd *fd,
+ int and_unlock_pollset) {
+ if (and_unlock_pollset) {
+ gpr_mu_unlock(&pollset->mu);
+ finally_add_fd(pollset, fd);
+ } else {
+ delayed_add *da = gpr_malloc(sizeof(*da));
+ da->pollset = pollset;
+ da->fd = fd;
+ GRPC_FD_REF(fd, "delayed_add");
+ grpc_iomgr_closure_init(&da->closure, perform_delayed_add, da);
+ pollset->in_flight_cbs++;
+ grpc_iomgr_add_callback(&da->closure);
+ }
+}
+
static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset,
grpc_fd *fd,
int and_unlock_pollset) {
@@ -103,12 +155,14 @@ static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset,
#define GRPC_EPOLL_MAX_EVENTS 1000
static void multipoll_with_epoll_pollset_maybe_work(
- grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now,
- int allow_synchronous_callback) {
+ grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline,
+ gpr_timespec now, int allow_synchronous_callback) {
struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
int ep_rv;
+ int poll_rv;
pollset_hdr *h = pollset->data.ptr;
int timeout_ms;
+ struct pollfd pfds[2];
/* If you want to ignore epoll's ability to sanely handle parallel pollers,
* for a more apples-to-apples performance comparison with poll, add a
@@ -116,43 +170,58 @@ static void multipoll_with_epoll_pollset_maybe_work(
* here.
*/
- pollset->counter += 1;
gpr_mu_unlock(&pollset->mu);
timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now);
- do {
- ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms);
- if (ep_rv < 0) {
- if (errno != EINTR) {
- gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno));
- }
- } else {
- int i;
- for (i = 0; i < ep_rv; ++i) {
- if (ep_ev[i].data.ptr == 0) {
- grpc_wakeup_fd_consume_wakeup(&h->wakeup_fd);
- } else {
- grpc_fd *fd = ep_ev[i].data.ptr;
- /* TODO(klempner): We might want to consider making err and pri
- * separate events */
- int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
- int read = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
- int write = ep_ev[i].events & EPOLLOUT;
- if (read || cancel) {
- grpc_fd_become_readable(fd, allow_synchronous_callback);
+ pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
+ pfds[0].events = POLLIN;
+ pfds[0].revents = 0;
+ pfds[1].fd = h->epoll_fd;
+ pfds[1].events = POLLIN;
+ pfds[1].revents = 0;
+
+ poll_rv = poll(pfds, 2, timeout_ms);
+
+ if (poll_rv < 0) {
+ if (errno != EINTR) {
+ gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
+ }
+ } else if (poll_rv == 0) {
+ /* do nothing */
+ } else {
+ if (pfds[0].revents) {
+ grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
+ }
+ if (pfds[1].revents) {
+ do {
+ ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
+ if (ep_rv < 0) {
+ if (errno != EINTR) {
+ gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno));
}
- if (write || cancel) {
- grpc_fd_become_writable(fd, allow_synchronous_callback);
+ } else {
+ int i;
+ for (i = 0; i < ep_rv; ++i) {
+ grpc_fd *fd = ep_ev[i].data.ptr;
+ /* TODO(klempner): We might want to consider making err and pri
+ * separate events */
+ int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
+ int read = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
+ int write = ep_ev[i].events & EPOLLOUT;
+ if (read || cancel) {
+ grpc_fd_become_readable(fd, allow_synchronous_callback);
+ }
+ if (write || cancel) {
+ grpc_fd_become_writable(fd, allow_synchronous_callback);
+ }
}
}
- }
+ } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
}
- timeout_ms = 0;
- } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
+ }
gpr_mu_lock(&pollset->mu);
- pollset->counter -= 1;
}
static void multipoll_with_epoll_pollset_finish_shutdown(
@@ -160,21 +229,14 @@ static void multipoll_with_epoll_pollset_finish_shutdown(
static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) {
pollset_hdr *h = pollset->data.ptr;
- grpc_wakeup_fd_destroy(&h->wakeup_fd);
close(h->epoll_fd);
gpr_free(h);
}
-static void epoll_kick(grpc_pollset *pollset) {
- pollset_hdr *h = pollset->data.ptr;
- grpc_wakeup_fd_wakeup(&h->wakeup_fd);
-}
-
static const grpc_pollset_vtable multipoll_with_epoll_pollset = {
multipoll_with_epoll_pollset_add_fd,
multipoll_with_epoll_pollset_del_fd,
multipoll_with_epoll_pollset_maybe_work,
- epoll_kick,
multipoll_with_epoll_pollset_finish_shutdown,
multipoll_with_epoll_pollset_destroy};
@@ -182,8 +244,6 @@ static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
size_t nfds) {
size_t i;
pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr));
- struct epoll_event ev;
- int err;
pollset->vtable = &multipoll_with_epoll_pollset;
pollset->data.ptr = h;
@@ -196,16 +256,6 @@ static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
for (i = 0; i < nfds; i++) {
multipoll_with_epoll_pollset_add_fd(pollset, fds[i], 0);
}
-
- grpc_wakeup_fd_create(&h->wakeup_fd);
- ev.events = EPOLLIN;
- ev.data.ptr = 0;
- err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD,
- GRPC_WAKEUP_FD_GET_READ_FD(&h->wakeup_fd), &ev);
- if (err < 0) {
- gpr_log(GPR_ERROR, "Wakeup fd epoll_ctl failed: %s", strerror(errno));
- abort();
- }
}
grpc_platform_become_multipoller_type grpc_platform_become_multipoller =
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index 0084e83953..1249b1b64a 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -53,12 +53,6 @@ typedef struct {
size_t fd_count;
size_t fd_capacity;
grpc_fd **fds;
- /* fds being polled by the current poller: parallel arrays of pollfd, and
- a grpc_fd_watcher */
- size_t pfd_count;
- size_t pfd_capacity;
- grpc_fd_watcher *watchers;
- struct pollfd *pfds;
/* fds that have been removed from the pollset explicitly */
size_t del_count;
size_t del_capacity;
@@ -102,80 +96,60 @@ static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset,
}
}
-static void end_polling(grpc_pollset *pollset) {
- size_t i;
- pollset_hdr *h;
- h = pollset->data.ptr;
- for (i = 1; i < h->pfd_count; i++) {
- grpc_fd_end_poll(&h->watchers[i], h->pfds[i].revents & POLLIN,
- h->pfds[i].revents & POLLOUT);
- }
-}
-
static void multipoll_with_poll_pollset_maybe_work(
- grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now,
- int allow_synchronous_callback) {
+ grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline,
+ gpr_timespec now, int allow_synchronous_callback) {
int timeout;
int r;
- size_t i, np, nf, nd;
+ size_t i, j, pfd_count, fd_count;
pollset_hdr *h;
- grpc_kick_fd_info *kfd;
+ /* TODO(ctiller): inline some elements to avoid an allocation */
+ grpc_fd_watcher *watchers;
+ struct pollfd *pfds;
h = pollset->data.ptr;
timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
- if (h->pfd_capacity < h->fd_count + 1) {
- h->pfd_capacity = GPR_MAX(h->pfd_capacity * 3 / 2, h->fd_count + 1);
- gpr_free(h->pfds);
- gpr_free(h->watchers);
- h->pfds = gpr_malloc(sizeof(struct pollfd) * h->pfd_capacity);
- h->watchers = gpr_malloc(sizeof(grpc_fd_watcher) * h->pfd_capacity);
- }
- nf = 0;
- np = 1;
- kfd = grpc_pollset_kick_pre_poll(&pollset->kick_state);
- if (kfd == NULL) {
- /* Already kicked */
- return;
- }
- h->pfds[0].fd = GRPC_POLLSET_KICK_GET_FD(kfd);
- h->pfds[0].events = POLLIN;
- h->pfds[0].revents = POLLOUT;
+ /* TODO(ctiller): perform just one malloc here if we exceed the inline case */
+ pfds = gpr_malloc(sizeof(*pfds) * (h->fd_count + 1));
+ watchers = gpr_malloc(sizeof(*watchers) * (h->fd_count + 1));
+ fd_count = 0;
+ pfd_count = 1;
+ pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
+ pfds[0].events = POLLIN;
+ pfds[0].revents = POLLOUT;
for (i = 0; i < h->fd_count; i++) {
int remove = grpc_fd_is_orphaned(h->fds[i]);
- for (nd = 0; nd < h->del_count; nd++) {
- if (h->fds[i] == h->dels[nd]) remove = 1;
+ for (j = 0; j < h->del_count; j++) {
+ if (h->fds[i] == h->dels[j]) remove = 1;
}
if (remove) {
GRPC_FD_UNREF(h->fds[i], "multipoller");
} else {
- h->fds[nf++] = h->fds[i];
- h->watchers[np].fd = h->fds[i];
- h->pfds[np].fd = h->fds[i]->fd;
- h->pfds[np].revents = 0;
- np++;
+ h->fds[fd_count++] = h->fds[i];
+ watchers[pfd_count].fd = h->fds[i];
+ pfds[pfd_count].fd = h->fds[i]->fd;
+ pfds[pfd_count].revents = 0;
+ pfd_count++;
}
}
- h->pfd_count = np;
- h->fd_count = nf;
- for (nd = 0; nd < h->del_count; nd++) {
- GRPC_FD_UNREF(h->dels[nd], "multipoller_del");
+ for (j = 0; j < h->del_count; j++) {
+ GRPC_FD_UNREF(h->dels[j], "multipoller_del");
}
h->del_count = 0;
- if (h->pfd_count == 0) {
- end_polling(pollset);
- return;
- }
- pollset->counter++;
+ h->fd_count = fd_count;
gpr_mu_unlock(&pollset->mu);
- for (i = 1; i < np; i++) {
- h->pfds[i].events = grpc_fd_begin_poll(h->watchers[i].fd, pollset, POLLIN,
- POLLOUT, &h->watchers[i]);
+ for (i = 1; i < pfd_count; i++) {
+ pfds[i].events = grpc_fd_begin_poll(watchers[i].fd, pollset, POLLIN,
+ POLLOUT, &watchers[i]);
}
- r = poll(h->pfds, h->pfd_count, timeout);
+ r = poll(pfds, pfd_count, timeout);
- end_polling(pollset);
+ for (i = 1; i < pfd_count; i++) {
+ grpc_fd_end_poll(&watchers[i], pfds[i].revents & POLLIN,
+ pfds[i].revents & POLLOUT);
+ }
if (r < 0) {
if (errno != EINTR) {
@@ -184,35 +158,31 @@ static void multipoll_with_poll_pollset_maybe_work(
} else if (r == 0) {
/* do nothing */
} else {
- if (h->pfds[0].revents & POLLIN) {
- grpc_pollset_kick_consume(&pollset->kick_state, kfd);
+ if (pfds[0].revents & POLLIN) {
+ grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
}
- for (i = 1; i < np; i++) {
- if (h->watchers[i].fd == NULL) {
+ for (i = 1; i < pfd_count; i++) {
+ if (watchers[i].fd == NULL) {
continue;
}
- if (h->pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) {
- grpc_fd_become_readable(h->watchers[i].fd, allow_synchronous_callback);
+ if (pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) {
+ grpc_fd_become_readable(watchers[i].fd, allow_synchronous_callback);
}
- if (h->pfds[i].revents & (POLLOUT | POLLHUP | POLLERR)) {
- grpc_fd_become_writable(h->watchers[i].fd, allow_synchronous_callback);
+ if (pfds[i].revents & (POLLOUT | POLLHUP | POLLERR)) {
+ grpc_fd_become_writable(watchers[i].fd, allow_synchronous_callback);
}
}
}
- grpc_pollset_kick_post_poll(&pollset->kick_state, kfd);
- gpr_mu_lock(&pollset->mu);
- pollset->counter--;
-}
+ gpr_free(pfds);
+ gpr_free(watchers);
-static void multipoll_with_poll_pollset_kick(grpc_pollset *p) {
- grpc_pollset_force_kick(p);
+ gpr_mu_lock(&pollset->mu);
}
static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) {
size_t i;
pollset_hdr *h = pollset->data.ptr;
- GPR_ASSERT(pollset->counter == 0);
for (i = 0; i < h->fd_count; i++) {
GRPC_FD_UNREF(h->fds[i], "multipoller");
}
@@ -226,8 +196,6 @@ static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) {
static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) {
pollset_hdr *h = pollset->data.ptr;
multipoll_with_poll_pollset_finish_shutdown(pollset);
- gpr_free(h->pfds);
- gpr_free(h->watchers);
gpr_free(h->fds);
gpr_free(h->dels);
gpr_free(h);
@@ -237,7 +205,6 @@ static const grpc_pollset_vtable multipoll_with_poll_pollset = {
multipoll_with_poll_pollset_add_fd,
multipoll_with_poll_pollset_del_fd,
multipoll_with_poll_pollset_maybe_work,
- multipoll_with_poll_pollset_kick,
multipoll_with_poll_pollset_finish_shutdown,
multipoll_with_poll_pollset_destroy};
@@ -250,10 +217,6 @@ void grpc_poll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
h->fd_count = nfds;
h->fd_capacity = nfds;
h->fds = gpr_malloc(nfds * sizeof(grpc_fd *));
- h->pfd_count = 0;
- h->pfd_capacity = 0;
- h->pfds = NULL;
- h->watchers = NULL;
h->del_count = 0;
h->del_capacity = 0;
h->dels = NULL;
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index c8646af615..d3a9193af1 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -55,22 +55,60 @@
#include <grpc/support/useful.h>
GPR_TLS_DECL(g_current_thread_poller);
+GPR_TLS_DECL(g_current_thread_worker);
-void grpc_pollset_kick(grpc_pollset *p) {
- if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p && p->counter) {
- p->vtable->kick(p);
- }
+static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->prev->next = worker->next;
+ worker->next->prev = worker->prev;
+}
+
+int grpc_pollset_has_workers(grpc_pollset *p) {
+ return p->root_worker.next != &p->root_worker;
}
-void grpc_pollset_force_kick(grpc_pollset *p) {
- if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) {
- grpc_pollset_kick_kick(&p->kick_state);
+static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
+ if (grpc_pollset_has_workers(p)) {
+ grpc_pollset_worker *w = p->root_worker.next;
+ remove_worker(p, w);
+ return w;
+ } else {
+ return NULL;
}
}
-static void kick_using_pollset_kick(grpc_pollset *p) {
- if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) {
- grpc_pollset_kick_kick(&p->kick_state);
+static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->next = &p->root_worker;
+ worker->prev = worker->next->prev;
+ worker->prev->next = worker->next->prev = worker;
+}
+
+static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->prev = &p->root_worker;
+ worker->next = worker->prev->next;
+ worker->prev->next = worker->next->prev = worker;
+}
+
+void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
+ if (specific_worker != NULL) {
+ if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
+ for (specific_worker = p->root_worker.next;
+ specific_worker != &p->root_worker;
+ specific_worker = specific_worker->next) {
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
+ }
+ p->kicked_without_pollers = 1;
+ } else if (gpr_tls_get(&g_current_thread_worker) !=
+ (gpr_intptr)specific_worker) {
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
+ }
+ } else if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) {
+ specific_worker = pop_front_worker(p);
+ if (specific_worker != NULL) {
+ push_back_worker(p, specific_worker);
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
+ } else {
+ p->kicked_without_pollers = 1;
+ }
}
}
@@ -78,16 +116,12 @@ static void kick_using_pollset_kick(grpc_pollset *p) {
void grpc_pollset_global_init(void) {
gpr_tls_init(&g_current_thread_poller);
-
- /* Initialize kick fd state */
- grpc_pollset_kick_global_init();
+ grpc_wakeup_fd_global_init();
}
void grpc_pollset_global_shutdown(void) {
- /* destroy the kick pipes */
- grpc_pollset_kick_global_destroy();
-
gpr_tls_destroy(&g_current_thread_poller);
+ grpc_wakeup_fd_global_destroy();
}
/* main interface */
@@ -96,7 +130,7 @@ static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null);
void grpc_pollset_init(grpc_pollset *pollset) {
gpr_mu_init(&pollset->mu);
- grpc_pollset_kick_init(&pollset->kick_state);
+ pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
pollset->in_flight_cbs = 0;
pollset->shutting_down = 0;
pollset->called_shutdown = 0;
@@ -134,27 +168,44 @@ static void finish_shutdown(grpc_pollset *pollset) {
pollset->shutdown_done_cb(pollset->shutdown_done_arg);
}
-int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
+int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
+ gpr_timespec deadline) {
/* pollset->mu already held */
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ int added_worker = 0;
if (gpr_time_cmp(now, deadline) > 0) {
return 0;
}
+ /* this must happen before we (potentially) drop pollset->mu */
+ worker->next = worker->prev = NULL;
+ /* TODO(ctiller): pool these */
+ grpc_wakeup_fd_init(&worker->wakeup_fd);
if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1)) {
- return 1;
+ goto done;
}
if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
- return 1;
+ goto done;
}
if (pollset->shutting_down) {
- return 1;
+ goto done;
+ }
+ if (!pollset->kicked_without_pollers) {
+ push_front_worker(pollset, worker);
+ added_worker = 1;
+ gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset);
+ pollset->vtable->maybe_work(pollset, worker, deadline, now, 1);
+ gpr_tls_set(&g_current_thread_poller, 0);
+ } else {
+ pollset->kicked_without_pollers = 0;
+ }
+done:
+ grpc_wakeup_fd_destroy(&worker->wakeup_fd);
+ if (added_worker) {
+ remove_worker(pollset, worker);
}
- gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset);
- pollset->vtable->maybe_work(pollset, deadline, now, 1);
- gpr_tls_set(&g_current_thread_poller, 0);
if (pollset->shutting_down) {
- if (pollset->counter > 0) {
- grpc_pollset_kick(pollset);
+ if (grpc_pollset_has_workers(pollset)) {
+ grpc_pollset_kick(pollset, NULL);
} else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) {
pollset->called_shutdown = 1;
gpr_mu_unlock(&pollset->mu);
@@ -177,15 +228,13 @@ void grpc_pollset_shutdown(grpc_pollset *pollset,
GPR_ASSERT(!pollset->shutting_down);
pollset->shutting_down = 1;
if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 &&
- pollset->counter == 0) {
+ !grpc_pollset_has_workers(pollset)) {
pollset->called_shutdown = 1;
call_shutdown = 1;
}
pollset->shutdown_done_cb = shutdown_done;
pollset->shutdown_done_arg = shutdown_done_arg;
- if (pollset->counter > 0) {
- grpc_pollset_kick(pollset);
- }
+ grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
gpr_mu_unlock(&pollset->mu);
if (call_shutdown) {
@@ -196,8 +245,8 @@ void grpc_pollset_shutdown(grpc_pollset *pollset,
void grpc_pollset_destroy(grpc_pollset *pollset) {
GPR_ASSERT(pollset->shutting_down);
GPR_ASSERT(pollset->in_flight_cbs == 0);
+ GPR_ASSERT(!grpc_pollset_has_workers(pollset));
pollset->vtable->destroy(pollset);
- grpc_pollset_kick_destroy(&pollset->kick_state);
gpr_mu_destroy(&pollset->mu);
}
@@ -248,8 +297,8 @@ static void basic_do_promote(void *args, int success) {
gpr_mu_lock(&pollset->mu);
/* First we need to ensure that nobody is polling concurrently */
- if (pollset->counter != 0) {
- grpc_pollset_kick(pollset);
+ if (grpc_pollset_has_workers(pollset)) {
+ grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
grpc_iomgr_add_callback(&up_args->promotion_closure);
gpr_mu_unlock(&pollset->mu);
return;
@@ -264,7 +313,8 @@ static void basic_do_promote(void *args, int success) {
pollset->in_flight_cbs--;
if (pollset->shutting_down) {
/* We don't care about this pollset anymore. */
- if (pollset->in_flight_cbs == 0 && pollset->counter == 0 && !pollset->called_shutdown) {
+ if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) {
+ GPR_ASSERT(!grpc_pollset_has_workers(pollset));
pollset->called_shutdown = 1;
do_shutdown_cb = 1;
}
@@ -307,7 +357,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
GPR_ASSERT(fd);
if (fd == pollset->data.ptr) goto exit;
- if (!pollset->counter) {
+ if (!grpc_pollset_has_workers(pollset)) {
/* Fast path -- no in flight cbs */
/* TODO(klempner): Comment this out and fix any test failures or establish
* they are due to timing issues */
@@ -343,7 +393,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
up_args->promotion_closure.cb_arg = up_args;
grpc_iomgr_add_callback(&up_args->promotion_closure);
- grpc_pollset_kick(pollset);
+ grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
exit:
if (and_unlock_pollset) {
@@ -365,12 +415,12 @@ static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd,
}
static void basic_pollset_maybe_work(grpc_pollset *pollset,
+ grpc_pollset_worker *worker,
gpr_timespec deadline, gpr_timespec now,
int allow_synchronous_callback) {
struct pollfd pfd[2];
grpc_fd *fd;
grpc_fd_watcher fd_watcher;
- grpc_kick_fd_info *kfd;
int timeout;
int r;
int nfds;
@@ -387,16 +437,10 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,
fd = pollset->data.ptr = NULL;
}
timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
- kfd = grpc_pollset_kick_pre_poll(&pollset->kick_state);
- if (kfd == NULL) {
- /* Already kicked */
- return;
- }
- pfd[0].fd = GRPC_POLLSET_KICK_GET_FD(kfd);
+ pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
pfd[0].events = POLLIN;
pfd[0].revents = 0;
nfds = 1;
- pollset->counter++;
if (fd) {
pfd[1].fd = fd->fd;
pfd[1].revents = 0;
@@ -428,7 +472,7 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,
/* do nothing */
} else {
if (pfd[0].revents & POLLIN) {
- grpc_pollset_kick_consume(&pollset->kick_state, kfd);
+ grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
}
if (nfds > 1) {
if (pfd[1].revents & (POLLIN | POLLHUP | POLLERR)) {
@@ -440,14 +484,10 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,
}
}
- grpc_pollset_kick_post_poll(&pollset->kick_state, kfd);
-
gpr_mu_lock(&pollset->mu);
- pollset->counter--;
}
static void basic_pollset_destroy(grpc_pollset *pollset) {
- GPR_ASSERT(pollset->counter == 0);
if (pollset->data.ptr != NULL) {
GRPC_FD_UNREF(pollset->data.ptr, "basicpoll");
pollset->data.ptr = NULL;
@@ -455,14 +495,13 @@ static void basic_pollset_destroy(grpc_pollset *pollset) {
}
static const grpc_pollset_vtable basic_pollset = {
- basic_pollset_add_fd, basic_pollset_del_fd, basic_pollset_maybe_work,
- kick_using_pollset_kick, basic_pollset_destroy, basic_pollset_destroy};
+ basic_pollset_add_fd, basic_pollset_del_fd, basic_pollset_maybe_work,
+ basic_pollset_destroy, basic_pollset_destroy};
static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) {
pollset->vtable = &basic_pollset;
- pollset->counter = 0;
pollset->data.ptr = fd_or_null;
- if (fd_or_null) {
+ if (fd_or_null != NULL) {
GRPC_FD_REF(fd_or_null, "basicpoll");
}
}
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index 37de1276d1..1c1b736193 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -35,8 +35,7 @@
#define GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H
#include <grpc/support/sync.h>
-
-#include "src/core/iomgr/pollset_kick_posix.h"
+#include "src/core/iomgr/wakeup_fd_posix.h"
typedef struct grpc_pollset_vtable grpc_pollset_vtable;
@@ -45,6 +44,12 @@ typedef struct grpc_pollset_vtable grpc_pollset_vtable;
use the struct tag */
struct grpc_fd;
+typedef struct grpc_pollset_worker {
+ grpc_wakeup_fd wakeup_fd;
+ struct grpc_pollset_worker *next;
+ struct grpc_pollset_worker *prev;
+} grpc_pollset_worker;
+
typedef struct grpc_pollset {
/* pollsets under posix can mutate representation as fds are added and
removed.
@@ -52,11 +57,11 @@ typedef struct grpc_pollset {
few fds, and an epoll() based implementation for many fds */
const grpc_pollset_vtable *vtable;
gpr_mu mu;
- grpc_pollset_kick_state kick_state;
- int counter;
+ grpc_pollset_worker root_worker;
int in_flight_cbs;
int shutting_down;
int called_shutdown;
+ int kicked_without_pollers;
void (*shutdown_done_cb)(void *arg);
void *shutdown_done_arg;
union {
@@ -70,9 +75,9 @@ struct grpc_pollset_vtable {
int and_unlock_pollset);
void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd,
int and_unlock_pollset);
- void (*maybe_work)(grpc_pollset *pollset, gpr_timespec deadline,
- gpr_timespec now, int allow_synchronous_callback);
- void (*kick)(grpc_pollset *pollset);
+ void (*maybe_work)(grpc_pollset *pollset, grpc_pollset_worker *worker,
+ gpr_timespec deadline, gpr_timespec now,
+ int allow_synchronous_callback);
void (*finish_shutdown)(grpc_pollset *pollset);
void (*destroy)(grpc_pollset *pollset);
};
@@ -85,22 +90,16 @@ void grpc_pollset_add_fd(grpc_pollset *pollset, struct grpc_fd *fd);
poll after an fd is orphaned) */
void grpc_pollset_del_fd(grpc_pollset *pollset, struct grpc_fd *fd);
-/* Force any current pollers to break polling: it's the callers responsibility
- to ensure that the pollset indeed needs to be kicked - no verification that
- the pollset is actually performing polling work is done. At worst this will
- result in spurious wakeups if performed at the wrong moment.
- Does not touch pollset->mu. */
-void grpc_pollset_force_kick(grpc_pollset *pollset);
/* Returns the fd to listen on for kicks */
int grpc_kick_read_fd(grpc_pollset *p);
/* Call after polling has been kicked to leave the kicked state */
void grpc_kick_drain(grpc_pollset *p);
/* Convert a timespec to milliseconds:
- - very small or negative poll times are clamped to zero to do a
+ - very small or negative poll times are clamped to zero to do a
non-blocking poll (which becomes spin polling)
- other small values are rounded up to one millisecond
- - longer than a millisecond polls are rounded up to the next nearest
+ - longer than a millisecond polls are rounded up to the next nearest
millisecond to avoid spinning
- infinite timeouts are converted to -1 */
int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now);
@@ -114,4 +113,8 @@ extern grpc_platform_become_multipoller_type grpc_platform_become_multipoller;
void grpc_poll_become_multipoller(grpc_pollset *pollset, struct grpc_fd **fds,
size_t fd_count);
+/* Return 1 if the pollset has active threads in grpc_pollset_work (pollset must
+ * be locked) */
+int grpc_pollset_has_workers(grpc_pollset *pollset);
+
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */
diff --git a/src/core/iomgr/wakeup_fd_eventfd.c b/src/core/iomgr/wakeup_fd_eventfd.c
index 99c32bb9db..52912235f8 100644
--- a/src/core/iomgr/wakeup_fd_eventfd.c
+++ b/src/core/iomgr/wakeup_fd_eventfd.c
@@ -42,7 +42,7 @@
#include "src/core/iomgr/wakeup_fd_posix.h"
#include <grpc/support/log.h>
-static void eventfd_create(grpc_wakeup_fd_info *fd_info) {
+static void eventfd_create(grpc_wakeup_fd *fd_info) {
int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
/* TODO(klempner): Handle failure more gracefully */
GPR_ASSERT(efd >= 0);
@@ -50,7 +50,7 @@ static void eventfd_create(grpc_wakeup_fd_info *fd_info) {
fd_info->write_fd = -1;
}
-static void eventfd_consume(grpc_wakeup_fd_info *fd_info) {
+static void eventfd_consume(grpc_wakeup_fd *fd_info) {
eventfd_t value;
int err;
do {
@@ -58,14 +58,14 @@ static void eventfd_consume(grpc_wakeup_fd_info *fd_info) {
} while (err < 0 && errno == EINTR);
}
-static void eventfd_wakeup(grpc_wakeup_fd_info *fd_info) {
+static void eventfd_wakeup(grpc_wakeup_fd *fd_info) {
int err;
do {
err = eventfd_write(fd_info->read_fd, 1);
} while (err < 0 && errno == EINTR);
}
-static void eventfd_destroy(grpc_wakeup_fd_info *fd_info) {
+static void eventfd_destroy(grpc_wakeup_fd *fd_info) {
close(fd_info->read_fd);
}
diff --git a/src/core/iomgr/wakeup_fd_pipe.c b/src/core/iomgr/wakeup_fd_pipe.c
index f895478990..9fc4ee2388 100644
--- a/src/core/iomgr/wakeup_fd_pipe.c
+++ b/src/core/iomgr/wakeup_fd_pipe.c
@@ -44,7 +44,7 @@
#include "src/core/iomgr/socket_utils_posix.h"
#include <grpc/support/log.h>
-static void pipe_create(grpc_wakeup_fd_info *fd_info) {
+static void pipe_init(grpc_wakeup_fd *fd_info) {
int pipefd[2];
/* TODO(klempner): Make this nonfatal */
GPR_ASSERT(0 == pipe(pipefd));
@@ -54,7 +54,7 @@ static void pipe_create(grpc_wakeup_fd_info *fd_info) {
fd_info->write_fd = pipefd[1];
}
-static void pipe_consume(grpc_wakeup_fd_info *fd_info) {
+static void pipe_consume(grpc_wakeup_fd *fd_info) {
char buf[128];
int r;
@@ -74,13 +74,13 @@ static void pipe_consume(grpc_wakeup_fd_info *fd_info) {
}
}
-static void pipe_wakeup(grpc_wakeup_fd_info *fd_info) {
+static void pipe_wakeup(grpc_wakeup_fd *fd_info) {
char c = 0;
while (write(fd_info->write_fd, &c, 1) != 1 && errno == EINTR)
;
}
-static void pipe_destroy(grpc_wakeup_fd_info *fd_info) {
+static void pipe_destroy(grpc_wakeup_fd *fd_info) {
close(fd_info->read_fd);
close(fd_info->write_fd);
}
@@ -91,7 +91,7 @@ static int pipe_check_availability(void) {
}
const grpc_wakeup_fd_vtable grpc_pipe_wakeup_fd_vtable = {
- pipe_create, pipe_consume, pipe_wakeup, pipe_destroy, pipe_check_availability
-};
+ pipe_init, pipe_consume, pipe_wakeup, pipe_destroy,
+ pipe_check_availability};
#endif /* GPR_POSIX_WAKUP_FD */
diff --git a/src/core/iomgr/wakeup_fd_posix.c b/src/core/iomgr/wakeup_fd_posix.c
index d3cc3ec570..e48f5223fa 100644
--- a/src/core/iomgr/wakeup_fd_posix.c
+++ b/src/core/iomgr/wakeup_fd_posix.c
@@ -57,19 +57,19 @@ void grpc_wakeup_fd_global_destroy(void) {
wakeup_fd_vtable = NULL;
}
-void grpc_wakeup_fd_create(grpc_wakeup_fd_info *fd_info) {
- wakeup_fd_vtable->create(fd_info);
+void grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info) {
+ wakeup_fd_vtable->init(fd_info);
}
-void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd_info *fd_info) {
+void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd *fd_info) {
wakeup_fd_vtable->consume(fd_info);
}
-void grpc_wakeup_fd_wakeup(grpc_wakeup_fd_info *fd_info) {
+void grpc_wakeup_fd_wakeup(grpc_wakeup_fd *fd_info) {
wakeup_fd_vtable->wakeup(fd_info);
}
-void grpc_wakeup_fd_destroy(grpc_wakeup_fd_info *fd_info) {
+void grpc_wakeup_fd_destroy(grpc_wakeup_fd *fd_info) {
wakeup_fd_vtable->destroy(fd_info);
}
diff --git a/src/core/iomgr/wakeup_fd_posix.h b/src/core/iomgr/wakeup_fd_posix.h
index 1b0ff70c7f..a4da4df51f 100644
--- a/src/core/iomgr/wakeup_fd_posix.h
+++ b/src/core/iomgr/wakeup_fd_posix.h
@@ -69,28 +69,28 @@ void grpc_wakeup_fd_global_destroy(void);
* purposes only.*/
void grpc_wakeup_fd_global_init_force_fallback(void);
-typedef struct grpc_wakeup_fd_info grpc_wakeup_fd_info;
+typedef struct grpc_wakeup_fd grpc_wakeup_fd;
typedef struct grpc_wakeup_fd_vtable {
- void (*create)(grpc_wakeup_fd_info *fd_info);
- void (*consume)(grpc_wakeup_fd_info *fd_info);
- void (*wakeup)(grpc_wakeup_fd_info *fd_info);
- void (*destroy)(grpc_wakeup_fd_info *fd_info);
+ void (*init)(grpc_wakeup_fd *fd_info);
+ void (*consume)(grpc_wakeup_fd *fd_info);
+ void (*wakeup)(grpc_wakeup_fd *fd_info);
+ void (*destroy)(grpc_wakeup_fd *fd_info);
/* Must be called before calling any other functions */
int (*check_availability)(void);
} grpc_wakeup_fd_vtable;
-struct grpc_wakeup_fd_info {
+struct grpc_wakeup_fd {
int read_fd;
int write_fd;
};
#define GRPC_WAKEUP_FD_GET_READ_FD(fd_info) ((fd_info)->read_fd)
-void grpc_wakeup_fd_create(grpc_wakeup_fd_info *fd_info);
-void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd_info *fd_info);
-void grpc_wakeup_fd_wakeup(grpc_wakeup_fd_info *fd_info);
-void grpc_wakeup_fd_destroy(grpc_wakeup_fd_info *fd_info);
+void grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info);
+void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd *fd_info);
+void grpc_wakeup_fd_wakeup(grpc_wakeup_fd *fd_info);
+void grpc_wakeup_fd_destroy(grpc_wakeup_fd *fd_info);
/* Defined in some specialized implementation's .c file, or by
* wakeup_fd_nospecial.c if no such implementation exists. */