aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Hongwei Wang <hongweiw@google.com>2015-08-05 12:46:17 -0700
committerGravatar Hongwei Wang <hongweiw@google.com>2015-08-05 12:46:17 -0700
commitbc7405c3205c09ba3ad1974e129d66b329ccf38a (patch)
tree540f7a792d72eba6771bba5bc73daf96bc8040a1 /src/core
parentabefb245e23b647f61e1bfd5538c5834eb51543a (diff)
parent4e407ba343fb96b780ebcd4f5540260bfcea079a (diff)
Merge branch 'master' of https://github.com/grpc/grpc into zookeeper
Diffstat (limited to 'src/core')
-rw-r--r--src/core/client_config/subchannel.c6
-rw-r--r--src/core/iomgr/fd_posix.c18
-rw-r--r--src/core/iomgr/fd_posix.h9
-rw-r--r--src/core/iomgr/pollset.h23
-rw-r--r--src/core/iomgr/pollset_kick_posix.c168
-rw-r--r--src/core/iomgr/pollset_kick_posix.h93
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c162
-rw-r--r--src/core/iomgr/pollset_multipoller_with_poll_posix.c123
-rw-r--r--src/core/iomgr/pollset_posix.c145
-rw-r--r--src/core/iomgr/pollset_posix.h33
-rw-r--r--src/core/iomgr/pollset_windows.c83
-rw-r--r--src/core/iomgr/pollset_windows.h12
-rw-r--r--src/core/iomgr/wakeup_fd_eventfd.c8
-rw-r--r--src/core/iomgr/wakeup_fd_pipe.c12
-rw-r--r--src/core/iomgr/wakeup_fd_posix.c10
-rw-r--r--src/core/iomgr/wakeup_fd_posix.h20
-rw-r--r--src/core/security/google_default_credentials.c6
-rw-r--r--src/core/surface/completion_queue.c70
-rw-r--r--src/core/surface/completion_queue.h2
-rw-r--r--src/core/surface/server.c73
-rw-r--r--src/core/transport/chttp2/internal.h6
-rw-r--r--src/core/transport/chttp2/stream_lists.c3
-rw-r--r--src/core/transport/chttp2/writing.c34
-rw-r--r--src/core/transport/chttp2_transport.c8
24 files changed, 522 insertions, 605 deletions
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 17773bd2f4..ca52c75beb 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -322,8 +322,8 @@ static void continue_connect(grpc_subchannel *c) {
static void start_connect(grpc_subchannel *c) {
c->backoff_delta = gpr_time_from_seconds(
GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN);
- c->next_attempt = gpr_time_add(
- gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta);
+ c->next_attempt =
+ gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta);
continue_connect(c);
}
@@ -511,8 +511,6 @@ static void publish_transport(grpc_subchannel *c) {
connection *destroy_connection = NULL;
grpc_channel_element *elem;
- gpr_log(GPR_DEBUG, "publish_transport: %p", c->master);
-
/* build final filter list */
num_filters = c->num_filters + c->connecting_result.num_filters + 1;
filters = gpr_malloc(sizeof(*filters) * num_filters);
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index cbc141a2d0..2d08a77a70 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -168,13 +168,19 @@ int grpc_fd_is_orphaned(grpc_fd *fd) {
return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}
+static void pollset_kick_locked(grpc_pollset *pollset) {
+ gpr_mu_lock(GRPC_POLLSET_MU(pollset));
+ grpc_pollset_kick(pollset, NULL);
+ gpr_mu_unlock(GRPC_POLLSET_MU(pollset));
+}
+
static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
- grpc_pollset_force_kick(fd->inactive_watcher_root.next->pollset);
+ pollset_kick_locked(fd->inactive_watcher_root.next->pollset);
} else if (fd->read_watcher) {
- grpc_pollset_force_kick(fd->read_watcher->pollset);
+ pollset_kick_locked(fd->read_watcher->pollset);
} else if (fd->write_watcher) {
- grpc_pollset_force_kick(fd->write_watcher->pollset);
+ pollset_kick_locked(fd->write_watcher->pollset);
}
}
@@ -188,13 +194,13 @@ static void wake_all_watchers_locked(grpc_fd *fd) {
grpc_fd_watcher *watcher;
for (watcher = fd->inactive_watcher_root.next;
watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
- grpc_pollset_force_kick(watcher->pollset);
+ pollset_kick_locked(watcher->pollset);
}
if (fd->read_watcher) {
- grpc_pollset_force_kick(fd->read_watcher->pollset);
+ pollset_kick_locked(fd->read_watcher->pollset);
}
if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
- grpc_pollset_force_kick(fd->write_watcher->pollset);
+ pollset_kick_locked(fd->write_watcher->pollset);
}
}
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index 4e8e267ffd..835e9b339a 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -109,7 +109,8 @@ grpc_fd *grpc_fd_create(int fd, const char *name);
on_done is called when the underlying file descriptor is definitely close()d.
If on_done is NULL, no callback will be made.
Requires: *fd initialized; no outstanding notify_on_read or
- notify_on_write. */
+ notify_on_write.
+ MUST NOT be called with a pollset lock taken */
void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,
const char *reason);
@@ -122,11 +123,13 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,
i.e. a combination of read_mask and write_mask determined by the fd's current
interest in said events.
Polling strategies that do not need to alter their behavior depending on the
- fd's current interest (such as epoll) do not need to call this function. */
+ fd's current interest (such as epoll) do not need to call this function.
+ MUST NOT be called with a pollset lock taken */
gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
gpr_uint32 read_mask, gpr_uint32 write_mask,
grpc_fd_watcher *rec);
-/* Complete polling previously started with grpc_fd_begin_poll */
+/* Complete polling previously started with grpc_fd_begin_poll
+ MUST NOT be called with a pollset lock taken */
void grpc_fd_end_poll(grpc_fd_watcher *rec, int got_read, int got_write);
/* Return 1 if this fd is orphaned, 0 otherwise */
diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h
index c40188b3c9..c474e4dbf1 100644
--- a/src/core/iomgr/pollset.h
+++ b/src/core/iomgr/pollset.h
@@ -37,6 +37,8 @@
#include <grpc/support/port_platform.h>
#include <grpc/support/time.h>
+#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1)
+
/* A grpc_pollset is a set of file descriptors that a higher level item is
interested in. For example:
- a server will typically keep a pollset containing all connected channels,
@@ -63,13 +65,24 @@ void grpc_pollset_destroy(grpc_pollset *pollset);
descriptors.
Requires GRPC_POLLSET_MU(pollset) locked.
May unlock GRPC_POLLSET_MU(pollset) during its execution.
-
+
+ worker is a (platform-specific) handle that can be used to wake up
+ from grpc_pollset_work before any events are received and before the timeout
+ has expired. It is both initialized and destroyed by grpc_pollset_work.
+ Initialization of worker is guaranteed to occur BEFORE the
+ GRPC_POLLSET_MU(pollset) is released for the first time by
+ grpc_pollset_work, and it is guaranteed that GRPC_POLLSET_MU(pollset) will
+ not be released by grpc_pollset_work AFTER worker has been destroyed.
+
Returns true if some work has been done, and false if the deadline
- got attained. */
-int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline);
+ expired. */
+int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
+ gpr_timespec deadline);
/* Break one polling thread out of polling work for this pollset.
- Requires GRPC_POLLSET_MU(pollset) locked. */
-void grpc_pollset_kick(grpc_pollset *pollset);
+ If specific_worker is GRPC_POLLSET_KICK_BROADCAST, kick ALL the workers.
+ Otherwise, if specific_worker is non-NULL, then kick that worker. */
+void grpc_pollset_kick(grpc_pollset *pollset,
+ grpc_pollset_worker *specific_worker);
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_H */
diff --git a/src/core/iomgr/pollset_kick_posix.c b/src/core/iomgr/pollset_kick_posix.c
deleted file mode 100644
index 51021784f2..0000000000
--- a/src/core/iomgr/pollset_kick_posix.c
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#ifdef GPR_POSIX_SOCKET
-#include "src/core/iomgr/pollset_kick_posix.h"
-
-#include <errno.h>
-#include <string.h>
-#include <unistd.h>
-
-#include "src/core/iomgr/socket_utils_posix.h"
-#include "src/core/iomgr/wakeup_fd_posix.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-
-/* This implementation is based on a freelist of wakeup fds, with extra logic to
- * handle kicks while there is no attached fd. */
-
-/* TODO(klempner): Autosize this, and consider providing a way to disable the
- * cap entirely on systems with large fd limits */
-#define GRPC_MAX_CACHED_WFDS 50
-
-static grpc_kick_fd_info *fd_freelist = NULL;
-static int fd_freelist_count = 0;
-static gpr_mu fd_freelist_mu;
-
-static grpc_kick_fd_info *allocate_wfd(void) {
- grpc_kick_fd_info *info = NULL;
- gpr_mu_lock(&fd_freelist_mu);
- if (fd_freelist != NULL) {
- info = fd_freelist;
- fd_freelist = fd_freelist->next;
- --fd_freelist_count;
- }
- gpr_mu_unlock(&fd_freelist_mu);
- if (info == NULL) {
- info = gpr_malloc(sizeof(*info));
- grpc_wakeup_fd_create(&info->wakeup_fd);
- info->next = NULL;
- }
- return info;
-}
-
-static void destroy_wfd(grpc_kick_fd_info *wfd) {
- grpc_wakeup_fd_destroy(&wfd->wakeup_fd);
- gpr_free(wfd);
-}
-
-static void free_wfd(grpc_kick_fd_info *fd_info) {
- gpr_mu_lock(&fd_freelist_mu);
- if (fd_freelist_count < GRPC_MAX_CACHED_WFDS) {
- fd_info->next = fd_freelist;
- fd_freelist = fd_info;
- fd_freelist_count++;
- fd_info = NULL;
- }
- gpr_mu_unlock(&fd_freelist_mu);
-
- if (fd_info) {
- destroy_wfd(fd_info);
- }
-}
-
-void grpc_pollset_kick_init(grpc_pollset_kick_state *kick_state) {
- gpr_mu_init(&kick_state->mu);
- kick_state->kicked = 0;
- kick_state->fd_list.next = kick_state->fd_list.prev = &kick_state->fd_list;
-}
-
-void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state) {
- gpr_mu_destroy(&kick_state->mu);
- GPR_ASSERT(kick_state->fd_list.next == &kick_state->fd_list);
-}
-
-grpc_kick_fd_info *grpc_pollset_kick_pre_poll(
- grpc_pollset_kick_state *kick_state) {
- grpc_kick_fd_info *fd_info;
- gpr_mu_lock(&kick_state->mu);
- if (kick_state->kicked) {
- kick_state->kicked = 0;
- gpr_mu_unlock(&kick_state->mu);
- return NULL;
- }
- fd_info = allocate_wfd();
- fd_info->next = &kick_state->fd_list;
- fd_info->prev = fd_info->next->prev;
- fd_info->next->prev = fd_info->prev->next = fd_info;
- gpr_mu_unlock(&kick_state->mu);
- return fd_info;
-}
-
-void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state,
- grpc_kick_fd_info *fd_info) {
- grpc_wakeup_fd_consume_wakeup(&fd_info->wakeup_fd);
-}
-
-void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state,
- grpc_kick_fd_info *fd_info) {
- gpr_mu_lock(&kick_state->mu);
- fd_info->next->prev = fd_info->prev;
- fd_info->prev->next = fd_info->next;
- free_wfd(fd_info);
- gpr_mu_unlock(&kick_state->mu);
-}
-
-void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state) {
- gpr_mu_lock(&kick_state->mu);
- if (kick_state->fd_list.next != &kick_state->fd_list) {
- grpc_wakeup_fd_wakeup(&kick_state->fd_list.next->wakeup_fd);
- } else {
- kick_state->kicked = 1;
- }
- gpr_mu_unlock(&kick_state->mu);
-}
-
-void grpc_pollset_kick_global_init_fallback_fd(void) {
- gpr_mu_init(&fd_freelist_mu);
- grpc_wakeup_fd_global_init_force_fallback();
-}
-
-void grpc_pollset_kick_global_init(void) {
- gpr_mu_init(&fd_freelist_mu);
- grpc_wakeup_fd_global_init();
-}
-
-void grpc_pollset_kick_global_destroy(void) {
- while (fd_freelist != NULL) {
- grpc_kick_fd_info *current = fd_freelist;
- fd_freelist = fd_freelist->next;
- destroy_wfd(current);
- }
- grpc_wakeup_fd_global_destroy();
- gpr_mu_destroy(&fd_freelist_mu);
-}
-
-#endif /* GPR_POSIX_SOCKET */
diff --git a/src/core/iomgr/pollset_kick_posix.h b/src/core/iomgr/pollset_kick_posix.h
deleted file mode 100644
index 77e32a8d51..0000000000
--- a/src/core/iomgr/pollset_kick_posix.h
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_KICK_POSIX_H
-#define GRPC_INTERNAL_CORE_IOMGR_POLLSET_KICK_POSIX_H
-
-#include "src/core/iomgr/wakeup_fd_posix.h"
-#include <grpc/support/sync.h>
-
-/* pollset kicking allows breaking a thread out of polling work for
- a given pollset.
- writing a byte to a pipe is used as a posix-ly portable base
- mechanism, and eventfds are utilized on Linux for better performance. */
-
-typedef struct grpc_kick_fd_info {
- grpc_wakeup_fd_info wakeup_fd;
- /* used for polling list and free list */
- struct grpc_kick_fd_info *next;
- /* only used when polling */
- struct grpc_kick_fd_info *prev;
-} grpc_kick_fd_info;
-
-typedef struct grpc_pollset_kick_state {
- gpr_mu mu;
- int kicked;
- struct grpc_kick_fd_info fd_list;
-} grpc_pollset_kick_state;
-
-#define GRPC_POLLSET_KICK_GET_FD(kick_fd_info) \
- GRPC_WAKEUP_FD_GET_READ_FD(&(kick_fd_info)->wakeup_fd)
-
-/* This is an abstraction around the typical pipe mechanism for waking up a
- thread sitting in a poll() style call. */
-
-void grpc_pollset_kick_global_init(void);
-void grpc_pollset_kick_global_destroy(void);
-
-void grpc_pollset_kick_init(grpc_pollset_kick_state *kick_state);
-void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state);
-
-/* Guarantees a pure posix implementation rather than a specialized one, if
- * applicable. Intended for testing. */
-void grpc_pollset_kick_global_init_fallback_fd(void);
-
-/* Must be called before entering poll(). If return value is NULL, this consumed
- an existing kick. Otherwise the return value is an FD to add to the poll set.
- */
-grpc_kick_fd_info *grpc_pollset_kick_pre_poll(
- grpc_pollset_kick_state *kick_state);
-
-/* Consume an existing kick. Must be called after poll returns that the fd was
- readable, and before calling kick_post_poll. */
-void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state,
- grpc_kick_fd_info *fd_info);
-
-/* Must be called after pre_poll, and after consume if applicable */
-void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state,
- grpc_kick_fd_info *fd_info);
-
-/* Actually kick */
-void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state);
-
-#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_KICK_POSIX_H */
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index d697b59e4c..1320c64579 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -36,6 +36,7 @@
#ifdef GPR_LINUX_MULTIPOLL_WITH_EPOLL
#include <errno.h>
+#include <poll.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>
@@ -44,23 +45,28 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+typedef struct wakeup_fd_hdl {
+ grpc_wakeup_fd wakeup_fd;
+ struct wakeup_fd_hdl *next;
+} wakeup_fd_hdl;
+
+typedef struct {
+ grpc_pollset *pollset;
+ grpc_fd *fd;
+ grpc_iomgr_closure closure;
+} delayed_add;
+
typedef struct {
int epoll_fd;
- grpc_wakeup_fd_info wakeup_fd;
+ wakeup_fd_hdl *free_wakeup_fds;
} pollset_hdr;
-static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
- grpc_fd *fd,
- int and_unlock_pollset) {
+static void finally_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
pollset_hdr *h = pollset->data.ptr;
struct epoll_event ev;
int err;
grpc_fd_watcher watcher;
- if (and_unlock_pollset) {
- gpr_mu_unlock(&pollset->mu);
- }
-
/* We pretend to be polling whilst adding an fd to keep the fd from being
closed during the add. This may result in a spurious wakeup being assigned
to this pollset whilst adding, but that should be benign. */
@@ -80,6 +86,52 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
grpc_fd_end_poll(&watcher, 0, 0);
}
+static void perform_delayed_add(void *arg, int iomgr_status) {
+ delayed_add *da = arg;
+ int do_shutdown_cb = 0;
+
+ if (!grpc_fd_is_orphaned(da->fd)) {
+ finally_add_fd(da->pollset, da->fd);
+ }
+
+ gpr_mu_lock(&da->pollset->mu);
+ da->pollset->in_flight_cbs--;
+ if (da->pollset->shutting_down) {
+ /* We don't care about this pollset anymore. */
+ if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) {
+ GPR_ASSERT(!grpc_pollset_has_workers(da->pollset));
+ da->pollset->called_shutdown = 1;
+ do_shutdown_cb = 1;
+ }
+ }
+ gpr_mu_unlock(&da->pollset->mu);
+
+ GRPC_FD_UNREF(da->fd, "delayed_add");
+
+ if (do_shutdown_cb) {
+ da->pollset->shutdown_done_cb(da->pollset->shutdown_done_arg);
+ }
+
+ gpr_free(da);
+}
+
+static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
+ grpc_fd *fd,
+ int and_unlock_pollset) {
+ if (and_unlock_pollset) {
+ gpr_mu_unlock(&pollset->mu);
+ finally_add_fd(pollset, fd);
+ } else {
+ delayed_add *da = gpr_malloc(sizeof(*da));
+ da->pollset = pollset;
+ da->fd = fd;
+ GRPC_FD_REF(fd, "delayed_add");
+ grpc_iomgr_closure_init(&da->closure, perform_delayed_add, da);
+ pollset->in_flight_cbs++;
+ grpc_iomgr_add_callback(&da->closure);
+ }
+}
+
static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset,
grpc_fd *fd,
int and_unlock_pollset) {
@@ -103,12 +155,14 @@ static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset,
#define GRPC_EPOLL_MAX_EVENTS 1000
static void multipoll_with_epoll_pollset_maybe_work(
- grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now,
- int allow_synchronous_callback) {
+ grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline,
+ gpr_timespec now, int allow_synchronous_callback) {
struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
int ep_rv;
+ int poll_rv;
pollset_hdr *h = pollset->data.ptr;
int timeout_ms;
+ struct pollfd pfds[2];
/* If you want to ignore epoll's ability to sanely handle parallel pollers,
* for a more apples-to-apples performance comparison with poll, add a
@@ -116,43 +170,58 @@ static void multipoll_with_epoll_pollset_maybe_work(
* here.
*/
- pollset->counter += 1;
gpr_mu_unlock(&pollset->mu);
timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now);
- do {
- ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms);
- if (ep_rv < 0) {
- if (errno != EINTR) {
- gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno));
- }
- } else {
- int i;
- for (i = 0; i < ep_rv; ++i) {
- if (ep_ev[i].data.ptr == 0) {
- grpc_wakeup_fd_consume_wakeup(&h->wakeup_fd);
- } else {
- grpc_fd *fd = ep_ev[i].data.ptr;
- /* TODO(klempner): We might want to consider making err and pri
- * separate events */
- int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
- int read = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
- int write = ep_ev[i].events & EPOLLOUT;
- if (read || cancel) {
- grpc_fd_become_readable(fd, allow_synchronous_callback);
+ pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
+ pfds[0].events = POLLIN;
+ pfds[0].revents = 0;
+ pfds[1].fd = h->epoll_fd;
+ pfds[1].events = POLLIN;
+ pfds[1].revents = 0;
+
+ poll_rv = poll(pfds, 2, timeout_ms);
+
+ if (poll_rv < 0) {
+ if (errno != EINTR) {
+ gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
+ }
+ } else if (poll_rv == 0) {
+ /* do nothing */
+ } else {
+ if (pfds[0].revents) {
+ grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
+ }
+ if (pfds[1].revents) {
+ do {
+ ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
+ if (ep_rv < 0) {
+ if (errno != EINTR) {
+ gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno));
}
- if (write || cancel) {
- grpc_fd_become_writable(fd, allow_synchronous_callback);
+ } else {
+ int i;
+ for (i = 0; i < ep_rv; ++i) {
+ grpc_fd *fd = ep_ev[i].data.ptr;
+ /* TODO(klempner): We might want to consider making err and pri
+ * separate events */
+ int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
+ int read = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
+ int write = ep_ev[i].events & EPOLLOUT;
+ if (read || cancel) {
+ grpc_fd_become_readable(fd, allow_synchronous_callback);
+ }
+ if (write || cancel) {
+ grpc_fd_become_writable(fd, allow_synchronous_callback);
+ }
}
}
- }
+ } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
}
- timeout_ms = 0;
- } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
+ }
gpr_mu_lock(&pollset->mu);
- pollset->counter -= 1;
}
static void multipoll_with_epoll_pollset_finish_shutdown(
@@ -160,21 +229,14 @@ static void multipoll_with_epoll_pollset_finish_shutdown(
static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) {
pollset_hdr *h = pollset->data.ptr;
- grpc_wakeup_fd_destroy(&h->wakeup_fd);
close(h->epoll_fd);
gpr_free(h);
}
-static void epoll_kick(grpc_pollset *pollset) {
- pollset_hdr *h = pollset->data.ptr;
- grpc_wakeup_fd_wakeup(&h->wakeup_fd);
-}
-
static const grpc_pollset_vtable multipoll_with_epoll_pollset = {
multipoll_with_epoll_pollset_add_fd,
multipoll_with_epoll_pollset_del_fd,
multipoll_with_epoll_pollset_maybe_work,
- epoll_kick,
multipoll_with_epoll_pollset_finish_shutdown,
multipoll_with_epoll_pollset_destroy};
@@ -182,8 +244,6 @@ static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
size_t nfds) {
size_t i;
pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr));
- struct epoll_event ev;
- int err;
pollset->vtable = &multipoll_with_epoll_pollset;
pollset->data.ptr = h;
@@ -196,16 +256,6 @@ static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
for (i = 0; i < nfds; i++) {
multipoll_with_epoll_pollset_add_fd(pollset, fds[i], 0);
}
-
- grpc_wakeup_fd_create(&h->wakeup_fd);
- ev.events = EPOLLIN;
- ev.data.ptr = 0;
- err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD,
- GRPC_WAKEUP_FD_GET_READ_FD(&h->wakeup_fd), &ev);
- if (err < 0) {
- gpr_log(GPR_ERROR, "Wakeup fd epoll_ctl failed: %s", strerror(errno));
- abort();
- }
}
grpc_platform_become_multipoller_type grpc_platform_become_multipoller =
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index 0084e83953..b5b2d7534d 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -53,12 +53,6 @@ typedef struct {
size_t fd_count;
size_t fd_capacity;
grpc_fd **fds;
- /* fds being polled by the current poller: parallel arrays of pollfd, and
- a grpc_fd_watcher */
- size_t pfd_count;
- size_t pfd_capacity;
- grpc_fd_watcher *watchers;
- struct pollfd *pfds;
/* fds that have been removed from the pollset explicitly */
size_t del_count;
size_t del_capacity;
@@ -102,80 +96,60 @@ static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset,
}
}
-static void end_polling(grpc_pollset *pollset) {
- size_t i;
- pollset_hdr *h;
- h = pollset->data.ptr;
- for (i = 1; i < h->pfd_count; i++) {
- grpc_fd_end_poll(&h->watchers[i], h->pfds[i].revents & POLLIN,
- h->pfds[i].revents & POLLOUT);
- }
-}
-
static void multipoll_with_poll_pollset_maybe_work(
- grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now,
- int allow_synchronous_callback) {
+ grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline,
+ gpr_timespec now, int allow_synchronous_callback) {
int timeout;
int r;
- size_t i, np, nf, nd;
+ size_t i, j, pfd_count, fd_count;
pollset_hdr *h;
- grpc_kick_fd_info *kfd;
+ /* TODO(ctiller): inline some elements to avoid an allocation */
+ grpc_fd_watcher *watchers;
+ struct pollfd *pfds;
h = pollset->data.ptr;
timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
- if (h->pfd_capacity < h->fd_count + 1) {
- h->pfd_capacity = GPR_MAX(h->pfd_capacity * 3 / 2, h->fd_count + 1);
- gpr_free(h->pfds);
- gpr_free(h->watchers);
- h->pfds = gpr_malloc(sizeof(struct pollfd) * h->pfd_capacity);
- h->watchers = gpr_malloc(sizeof(grpc_fd_watcher) * h->pfd_capacity);
- }
- nf = 0;
- np = 1;
- kfd = grpc_pollset_kick_pre_poll(&pollset->kick_state);
- if (kfd == NULL) {
- /* Already kicked */
- return;
- }
- h->pfds[0].fd = GRPC_POLLSET_KICK_GET_FD(kfd);
- h->pfds[0].events = POLLIN;
- h->pfds[0].revents = POLLOUT;
+ /* TODO(ctiller): perform just one malloc here if we exceed the inline case */
+ pfds = gpr_malloc(sizeof(*pfds) * (h->fd_count + 1));
+ watchers = gpr_malloc(sizeof(*watchers) * (h->fd_count + 1));
+ fd_count = 0;
+ pfd_count = 1;
+ pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
+ pfds[0].events = POLLIN;
+ pfds[0].revents = POLLOUT;
for (i = 0; i < h->fd_count; i++) {
int remove = grpc_fd_is_orphaned(h->fds[i]);
- for (nd = 0; nd < h->del_count; nd++) {
- if (h->fds[i] == h->dels[nd]) remove = 1;
+ for (j = 0; !remove && j < h->del_count; j++) {
+ if (h->fds[i] == h->dels[j]) remove = 1;
}
if (remove) {
GRPC_FD_UNREF(h->fds[i], "multipoller");
} else {
- h->fds[nf++] = h->fds[i];
- h->watchers[np].fd = h->fds[i];
- h->pfds[np].fd = h->fds[i]->fd;
- h->pfds[np].revents = 0;
- np++;
+ h->fds[fd_count++] = h->fds[i];
+ watchers[pfd_count].fd = h->fds[i];
+ pfds[pfd_count].fd = h->fds[i]->fd;
+ pfds[pfd_count].revents = 0;
+ pfd_count++;
}
}
- h->pfd_count = np;
- h->fd_count = nf;
- for (nd = 0; nd < h->del_count; nd++) {
- GRPC_FD_UNREF(h->dels[nd], "multipoller_del");
+ for (j = 0; j < h->del_count; j++) {
+ GRPC_FD_UNREF(h->dels[j], "multipoller_del");
}
h->del_count = 0;
- if (h->pfd_count == 0) {
- end_polling(pollset);
- return;
- }
- pollset->counter++;
+ h->fd_count = fd_count;
gpr_mu_unlock(&pollset->mu);
- for (i = 1; i < np; i++) {
- h->pfds[i].events = grpc_fd_begin_poll(h->watchers[i].fd, pollset, POLLIN,
- POLLOUT, &h->watchers[i]);
+ for (i = 1; i < pfd_count; i++) {
+ pfds[i].events = grpc_fd_begin_poll(watchers[i].fd, pollset, POLLIN,
+ POLLOUT, &watchers[i]);
}
- r = poll(h->pfds, h->pfd_count, timeout);
+ r = poll(pfds, pfd_count, timeout);
- end_polling(pollset);
+ for (i = 1; i < pfd_count; i++) {
+ grpc_fd_end_poll(&watchers[i], pfds[i].revents & POLLIN,
+ pfds[i].revents & POLLOUT);
+ }
if (r < 0) {
if (errno != EINTR) {
@@ -184,35 +158,31 @@ static void multipoll_with_poll_pollset_maybe_work(
} else if (r == 0) {
/* do nothing */
} else {
- if (h->pfds[0].revents & POLLIN) {
- grpc_pollset_kick_consume(&pollset->kick_state, kfd);
+ if (pfds[0].revents & POLLIN) {
+ grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
}
- for (i = 1; i < np; i++) {
- if (h->watchers[i].fd == NULL) {
+ for (i = 1; i < pfd_count; i++) {
+ if (watchers[i].fd == NULL) {
continue;
}
- if (h->pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) {
- grpc_fd_become_readable(h->watchers[i].fd, allow_synchronous_callback);
+ if (pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) {
+ grpc_fd_become_readable(watchers[i].fd, allow_synchronous_callback);
}
- if (h->pfds[i].revents & (POLLOUT | POLLHUP | POLLERR)) {
- grpc_fd_become_writable(h->watchers[i].fd, allow_synchronous_callback);
+ if (pfds[i].revents & (POLLOUT | POLLHUP | POLLERR)) {
+ grpc_fd_become_writable(watchers[i].fd, allow_synchronous_callback);
}
}
}
- grpc_pollset_kick_post_poll(&pollset->kick_state, kfd);
- gpr_mu_lock(&pollset->mu);
- pollset->counter--;
-}
+ gpr_free(pfds);
+ gpr_free(watchers);
-static void multipoll_with_poll_pollset_kick(grpc_pollset *p) {
- grpc_pollset_force_kick(p);
+ gpr_mu_lock(&pollset->mu);
}
static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) {
size_t i;
pollset_hdr *h = pollset->data.ptr;
- GPR_ASSERT(pollset->counter == 0);
for (i = 0; i < h->fd_count; i++) {
GRPC_FD_UNREF(h->fds[i], "multipoller");
}
@@ -226,8 +196,6 @@ static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) {
static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) {
pollset_hdr *h = pollset->data.ptr;
multipoll_with_poll_pollset_finish_shutdown(pollset);
- gpr_free(h->pfds);
- gpr_free(h->watchers);
gpr_free(h->fds);
gpr_free(h->dels);
gpr_free(h);
@@ -237,7 +205,6 @@ static const grpc_pollset_vtable multipoll_with_poll_pollset = {
multipoll_with_poll_pollset_add_fd,
multipoll_with_poll_pollset_del_fd,
multipoll_with_poll_pollset_maybe_work,
- multipoll_with_poll_pollset_kick,
multipoll_with_poll_pollset_finish_shutdown,
multipoll_with_poll_pollset_destroy};
@@ -250,10 +217,6 @@ void grpc_poll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
h->fd_count = nfds;
h->fd_capacity = nfds;
h->fds = gpr_malloc(nfds * sizeof(grpc_fd *));
- h->pfd_count = 0;
- h->pfd_capacity = 0;
- h->pfds = NULL;
- h->watchers = NULL;
h->del_count = 0;
h->del_capacity = 0;
h->dels = NULL;
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index c8646af615..d3a9193af1 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -55,22 +55,60 @@
#include <grpc/support/useful.h>
GPR_TLS_DECL(g_current_thread_poller);
+GPR_TLS_DECL(g_current_thread_worker);
-void grpc_pollset_kick(grpc_pollset *p) {
- if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p && p->counter) {
- p->vtable->kick(p);
- }
+static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->prev->next = worker->next;
+ worker->next->prev = worker->prev;
+}
+
+int grpc_pollset_has_workers(grpc_pollset *p) {
+ return p->root_worker.next != &p->root_worker;
}
-void grpc_pollset_force_kick(grpc_pollset *p) {
- if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) {
- grpc_pollset_kick_kick(&p->kick_state);
+static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
+ if (grpc_pollset_has_workers(p)) {
+ grpc_pollset_worker *w = p->root_worker.next;
+ remove_worker(p, w);
+ return w;
+ } else {
+ return NULL;
}
}
-static void kick_using_pollset_kick(grpc_pollset *p) {
- if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) {
- grpc_pollset_kick_kick(&p->kick_state);
+static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->next = &p->root_worker;
+ worker->prev = worker->next->prev;
+ worker->prev->next = worker->next->prev = worker;
+}
+
+static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->prev = &p->root_worker;
+ worker->next = worker->prev->next;
+ worker->prev->next = worker->next->prev = worker;
+}
+
+void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
+ if (specific_worker != NULL) {
+ if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
+ for (specific_worker = p->root_worker.next;
+ specific_worker != &p->root_worker;
+ specific_worker = specific_worker->next) {
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
+ }
+ p->kicked_without_pollers = 1;
+ } else if (gpr_tls_get(&g_current_thread_worker) !=
+ (gpr_intptr)specific_worker) {
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
+ }
+ } else if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) {
+ specific_worker = pop_front_worker(p);
+ if (specific_worker != NULL) {
+ push_back_worker(p, specific_worker);
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
+ } else {
+ p->kicked_without_pollers = 1;
+ }
}
}
@@ -78,16 +116,12 @@ static void kick_using_pollset_kick(grpc_pollset *p) {
void grpc_pollset_global_init(void) {
gpr_tls_init(&g_current_thread_poller);
-
- /* Initialize kick fd state */
- grpc_pollset_kick_global_init();
+ grpc_wakeup_fd_global_init();
}
void grpc_pollset_global_shutdown(void) {
- /* destroy the kick pipes */
- grpc_pollset_kick_global_destroy();
-
gpr_tls_destroy(&g_current_thread_poller);
+ grpc_wakeup_fd_global_destroy();
}
/* main interface */
@@ -96,7 +130,7 @@ static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null);
void grpc_pollset_init(grpc_pollset *pollset) {
gpr_mu_init(&pollset->mu);
- grpc_pollset_kick_init(&pollset->kick_state);
+ pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
pollset->in_flight_cbs = 0;
pollset->shutting_down = 0;
pollset->called_shutdown = 0;
@@ -134,27 +168,44 @@ static void finish_shutdown(grpc_pollset *pollset) {
pollset->shutdown_done_cb(pollset->shutdown_done_arg);
}
-int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
+int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
+ gpr_timespec deadline) {
/* pollset->mu already held */
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ int added_worker = 0;
if (gpr_time_cmp(now, deadline) > 0) {
return 0;
}
+ /* this must happen before we (potentially) drop pollset->mu */
+ worker->next = worker->prev = NULL;
+ /* TODO(ctiller): pool these */
+ grpc_wakeup_fd_init(&worker->wakeup_fd);
if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1)) {
- return 1;
+ goto done;
}
if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
- return 1;
+ goto done;
}
if (pollset->shutting_down) {
- return 1;
+ goto done;
+ }
+ if (!pollset->kicked_without_pollers) {
+ push_front_worker(pollset, worker);
+ added_worker = 1;
+ gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset);
+ pollset->vtable->maybe_work(pollset, worker, deadline, now, 1);
+ gpr_tls_set(&g_current_thread_poller, 0);
+ } else {
+ pollset->kicked_without_pollers = 0;
+ }
+done:
+ grpc_wakeup_fd_destroy(&worker->wakeup_fd);
+ if (added_worker) {
+ remove_worker(pollset, worker);
}
- gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset);
- pollset->vtable->maybe_work(pollset, deadline, now, 1);
- gpr_tls_set(&g_current_thread_poller, 0);
if (pollset->shutting_down) {
- if (pollset->counter > 0) {
- grpc_pollset_kick(pollset);
+ if (grpc_pollset_has_workers(pollset)) {
+ grpc_pollset_kick(pollset, NULL);
} else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) {
pollset->called_shutdown = 1;
gpr_mu_unlock(&pollset->mu);
@@ -177,15 +228,13 @@ void grpc_pollset_shutdown(grpc_pollset *pollset,
GPR_ASSERT(!pollset->shutting_down);
pollset->shutting_down = 1;
if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 &&
- pollset->counter == 0) {
+ !grpc_pollset_has_workers(pollset)) {
pollset->called_shutdown = 1;
call_shutdown = 1;
}
pollset->shutdown_done_cb = shutdown_done;
pollset->shutdown_done_arg = shutdown_done_arg;
- if (pollset->counter > 0) {
- grpc_pollset_kick(pollset);
- }
+ grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
gpr_mu_unlock(&pollset->mu);
if (call_shutdown) {
@@ -196,8 +245,8 @@ void grpc_pollset_shutdown(grpc_pollset *pollset,
void grpc_pollset_destroy(grpc_pollset *pollset) {
GPR_ASSERT(pollset->shutting_down);
GPR_ASSERT(pollset->in_flight_cbs == 0);
+ GPR_ASSERT(!grpc_pollset_has_workers(pollset));
pollset->vtable->destroy(pollset);
- grpc_pollset_kick_destroy(&pollset->kick_state);
gpr_mu_destroy(&pollset->mu);
}
@@ -248,8 +297,8 @@ static void basic_do_promote(void *args, int success) {
gpr_mu_lock(&pollset->mu);
/* First we need to ensure that nobody is polling concurrently */
- if (pollset->counter != 0) {
- grpc_pollset_kick(pollset);
+ if (grpc_pollset_has_workers(pollset)) {
+ grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
grpc_iomgr_add_callback(&up_args->promotion_closure);
gpr_mu_unlock(&pollset->mu);
return;
@@ -264,7 +313,8 @@ static void basic_do_promote(void *args, int success) {
pollset->in_flight_cbs--;
if (pollset->shutting_down) {
/* We don't care about this pollset anymore. */
- if (pollset->in_flight_cbs == 0 && pollset->counter == 0 && !pollset->called_shutdown) {
+ if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) {
+ GPR_ASSERT(!grpc_pollset_has_workers(pollset));
pollset->called_shutdown = 1;
do_shutdown_cb = 1;
}
@@ -307,7 +357,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
GPR_ASSERT(fd);
if (fd == pollset->data.ptr) goto exit;
- if (!pollset->counter) {
+ if (!grpc_pollset_has_workers(pollset)) {
/* Fast path -- no in flight cbs */
/* TODO(klempner): Comment this out and fix any test failures or establish
* they are due to timing issues */
@@ -343,7 +393,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
up_args->promotion_closure.cb_arg = up_args;
grpc_iomgr_add_callback(&up_args->promotion_closure);
- grpc_pollset_kick(pollset);
+ grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
exit:
if (and_unlock_pollset) {
@@ -365,12 +415,12 @@ static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd,
}
static void basic_pollset_maybe_work(grpc_pollset *pollset,
+ grpc_pollset_worker *worker,
gpr_timespec deadline, gpr_timespec now,
int allow_synchronous_callback) {
struct pollfd pfd[2];
grpc_fd *fd;
grpc_fd_watcher fd_watcher;
- grpc_kick_fd_info *kfd;
int timeout;
int r;
int nfds;
@@ -387,16 +437,10 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,
fd = pollset->data.ptr = NULL;
}
timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
- kfd = grpc_pollset_kick_pre_poll(&pollset->kick_state);
- if (kfd == NULL) {
- /* Already kicked */
- return;
- }
- pfd[0].fd = GRPC_POLLSET_KICK_GET_FD(kfd);
+ pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
pfd[0].events = POLLIN;
pfd[0].revents = 0;
nfds = 1;
- pollset->counter++;
if (fd) {
pfd[1].fd = fd->fd;
pfd[1].revents = 0;
@@ -428,7 +472,7 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,
/* do nothing */
} else {
if (pfd[0].revents & POLLIN) {
- grpc_pollset_kick_consume(&pollset->kick_state, kfd);
+ grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
}
if (nfds > 1) {
if (pfd[1].revents & (POLLIN | POLLHUP | POLLERR)) {
@@ -440,14 +484,10 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,
}
}
- grpc_pollset_kick_post_poll(&pollset->kick_state, kfd);
-
gpr_mu_lock(&pollset->mu);
- pollset->counter--;
}
static void basic_pollset_destroy(grpc_pollset *pollset) {
- GPR_ASSERT(pollset->counter == 0);
if (pollset->data.ptr != NULL) {
GRPC_FD_UNREF(pollset->data.ptr, "basicpoll");
pollset->data.ptr = NULL;
@@ -455,14 +495,13 @@ static void basic_pollset_destroy(grpc_pollset *pollset) {
}
static const grpc_pollset_vtable basic_pollset = {
- basic_pollset_add_fd, basic_pollset_del_fd, basic_pollset_maybe_work,
- kick_using_pollset_kick, basic_pollset_destroy, basic_pollset_destroy};
+ basic_pollset_add_fd, basic_pollset_del_fd, basic_pollset_maybe_work,
+ basic_pollset_destroy, basic_pollset_destroy};
static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) {
pollset->vtable = &basic_pollset;
- pollset->counter = 0;
pollset->data.ptr = fd_or_null;
- if (fd_or_null) {
+ if (fd_or_null != NULL) {
GRPC_FD_REF(fd_or_null, "basicpoll");
}
}
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index 37de1276d1..1c1b736193 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -35,8 +35,7 @@
#define GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H
#include <grpc/support/sync.h>
-
-#include "src/core/iomgr/pollset_kick_posix.h"
+#include "src/core/iomgr/wakeup_fd_posix.h"
typedef struct grpc_pollset_vtable grpc_pollset_vtable;
@@ -45,6 +44,12 @@ typedef struct grpc_pollset_vtable grpc_pollset_vtable;
use the struct tag */
struct grpc_fd;
+typedef struct grpc_pollset_worker {
+ grpc_wakeup_fd wakeup_fd;
+ struct grpc_pollset_worker *next;
+ struct grpc_pollset_worker *prev;
+} grpc_pollset_worker;
+
typedef struct grpc_pollset {
/* pollsets under posix can mutate representation as fds are added and
removed.
@@ -52,11 +57,11 @@ typedef struct grpc_pollset {
few fds, and an epoll() based implementation for many fds */
const grpc_pollset_vtable *vtable;
gpr_mu mu;
- grpc_pollset_kick_state kick_state;
- int counter;
+ grpc_pollset_worker root_worker;
int in_flight_cbs;
int shutting_down;
int called_shutdown;
+ int kicked_without_pollers;
void (*shutdown_done_cb)(void *arg);
void *shutdown_done_arg;
union {
@@ -70,9 +75,9 @@ struct grpc_pollset_vtable {
int and_unlock_pollset);
void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd,
int and_unlock_pollset);
- void (*maybe_work)(grpc_pollset *pollset, gpr_timespec deadline,
- gpr_timespec now, int allow_synchronous_callback);
- void (*kick)(grpc_pollset *pollset);
+ void (*maybe_work)(grpc_pollset *pollset, grpc_pollset_worker *worker,
+ gpr_timespec deadline, gpr_timespec now,
+ int allow_synchronous_callback);
void (*finish_shutdown)(grpc_pollset *pollset);
void (*destroy)(grpc_pollset *pollset);
};
@@ -85,22 +90,16 @@ void grpc_pollset_add_fd(grpc_pollset *pollset, struct grpc_fd *fd);
poll after an fd is orphaned) */
void grpc_pollset_del_fd(grpc_pollset *pollset, struct grpc_fd *fd);
-/* Force any current pollers to break polling: it's the callers responsibility
- to ensure that the pollset indeed needs to be kicked - no verification that
- the pollset is actually performing polling work is done. At worst this will
- result in spurious wakeups if performed at the wrong moment.
- Does not touch pollset->mu. */
-void grpc_pollset_force_kick(grpc_pollset *pollset);
/* Returns the fd to listen on for kicks */
int grpc_kick_read_fd(grpc_pollset *p);
/* Call after polling has been kicked to leave the kicked state */
void grpc_kick_drain(grpc_pollset *p);
/* Convert a timespec to milliseconds:
- - very small or negative poll times are clamped to zero to do a
+ - very small or negative poll times are clamped to zero to do a
non-blocking poll (which becomes spin polling)
- other small values are rounded up to one millisecond
- - longer than a millisecond polls are rounded up to the next nearest
+ - longer than a millisecond polls are rounded up to the next nearest
millisecond to avoid spinning
- infinite timeouts are converted to -1 */
int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now);
@@ -114,4 +113,8 @@ extern grpc_platform_become_multipoller_type grpc_platform_become_multipoller;
void grpc_poll_become_multipoller(grpc_pollset *pollset, struct grpc_fd **fds,
size_t fd_count);
+/* Return 1 if the pollset has active threads in grpc_pollset_work (pollset must
+ * be locked) */
+int grpc_pollset_has_workers(grpc_pollset *pollset);
+
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index a9c4739c7c..22dc5891c3 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -42,6 +42,38 @@
#include "src/core/iomgr/pollset.h"
#include "src/core/iomgr/pollset_windows.h"
+static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->prev->next = worker->next;
+ worker->next->prev = worker->prev;
+}
+
+static int has_workers(grpc_pollset *p) {
+ return p->root_worker.next != &p->root_worker;
+}
+
+static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
+ if (has_workers(p)) {
+ grpc_pollset_worker *w = p->root_worker.next;
+ remove_worker(p, w);
+ return w;
+ }
+ else {
+ return NULL;
+ }
+}
+
+static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->next = &p->root_worker;
+ worker->prev = worker->next->prev;
+ worker->prev->next = worker->next->prev = worker;
+}
+
+static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->prev = &p->root_worker;
+ worker->next = worker->prev->next;
+ worker->prev->next = worker->next->prev = worker;
+}
+
/* There isn't really any such thing as a pollset under Windows, due to the
nature of the IO completion ports. We're still going to provide a minimal
set of features for the sake of the rest of grpc. But grpc_pollset_work
@@ -50,7 +82,8 @@
void grpc_pollset_init(grpc_pollset *pollset) {
memset(pollset, 0, sizeof(*pollset));
gpr_mu_init(&pollset->mu);
- gpr_cv_init(&pollset->cv);
+ pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
+ pollset->kicked_without_pollers = 0;
}
void grpc_pollset_shutdown(grpc_pollset *pollset,
@@ -58,34 +91,66 @@ void grpc_pollset_shutdown(grpc_pollset *pollset,
void *shutdown_done_arg) {
gpr_mu_lock(&pollset->mu);
pollset->shutting_down = 1;
- gpr_cv_broadcast(&pollset->cv);
+ grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
gpr_mu_unlock(&pollset->mu);
shutdown_done(shutdown_done_arg);
}
void grpc_pollset_destroy(grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->mu);
- gpr_cv_destroy(&pollset->cv);
}
-int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
+int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline) {
gpr_timespec now;
+ int added_worker = 0;
now = gpr_now(GPR_CLOCK_MONOTONIC);
if (gpr_time_cmp(now, deadline) > 0) {
return 0 /* GPR_FALSE */;
}
+ worker->next = worker->prev = NULL;
+ gpr_cv_init(&worker->cv);
if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1 /* GPR_TRUE */)) {
- return 1 /* GPR_TRUE */;
+ goto done;
}
if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
- return 1 /* GPR_TRUE */;
+ goto done;
}
- if (!pollset->shutting_down) {
- gpr_cv_wait(&pollset->cv, &pollset->mu, deadline);
+ if (!pollset->kicked_without_pollers && !pollset->shutting_down) {
+ push_front_worker(pollset, worker);
+ added_worker = 1;
+ gpr_cv_wait(&worker->cv, &pollset->mu, deadline);
+ } else {
+ pollset->kicked_without_pollers = 0;
+ }
+done:
+ gpr_cv_destroy(&worker->cv);
+ if (added_worker) {
+ remove_worker(pollset, worker);
}
return 1 /* GPR_TRUE */;
}
-void grpc_pollset_kick(grpc_pollset *p) { gpr_cv_signal(&p->cv); }
+void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
+ if (specific_worker != NULL) {
+ if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
+ for (specific_worker = p->root_worker.next;
+ specific_worker != &p->root_worker;
+ specific_worker = specific_worker->next) {
+ gpr_cv_signal(&specific_worker->cv);
+ }
+ p->kicked_without_pollers = 1;
+ } else {
+ gpr_cv_signal(&specific_worker->cv);
+ }
+ } else {
+ specific_worker = pop_front_worker(p);
+ if (specific_worker != NULL) {
+ push_back_worker(p, specific_worker);
+ gpr_cv_signal(&specific_worker->cv);
+ } else {
+ p->kicked_without_pollers = 1;
+ }
+ }
+}
#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h
index c9b8d3f374..4efa5a1717 100644
--- a/src/core/iomgr/pollset_windows.h
+++ b/src/core/iomgr/pollset_windows.h
@@ -40,12 +40,20 @@
/* There isn't really any such thing as a pollset under Windows, due to the
nature of the IO completion ports. A Windows "pollset" is merely a mutex
- and a condition variable, used to synchronize with the IOCP. */
+ used to synchronize with the IOCP, and workers are condition variables
+ used to block threads until work is ready. */
+
+typedef struct grpc_pollset_worker {
+ gpr_cv cv;
+ struct grpc_pollset_worker *next;
+ struct grpc_pollset_worker *prev;
+} grpc_pollset_worker;
typedef struct grpc_pollset {
gpr_mu mu;
- gpr_cv cv;
int shutting_down;
+ int kicked_without_pollers;
+ grpc_pollset_worker root_worker;
} grpc_pollset;
#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu)
diff --git a/src/core/iomgr/wakeup_fd_eventfd.c b/src/core/iomgr/wakeup_fd_eventfd.c
index 99c32bb9db..52912235f8 100644
--- a/src/core/iomgr/wakeup_fd_eventfd.c
+++ b/src/core/iomgr/wakeup_fd_eventfd.c
@@ -42,7 +42,7 @@
#include "src/core/iomgr/wakeup_fd_posix.h"
#include <grpc/support/log.h>
-static void eventfd_create(grpc_wakeup_fd_info *fd_info) {
+static void eventfd_create(grpc_wakeup_fd *fd_info) {
int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
/* TODO(klempner): Handle failure more gracefully */
GPR_ASSERT(efd >= 0);
@@ -50,7 +50,7 @@ static void eventfd_create(grpc_wakeup_fd_info *fd_info) {
fd_info->write_fd = -1;
}
-static void eventfd_consume(grpc_wakeup_fd_info *fd_info) {
+static void eventfd_consume(grpc_wakeup_fd *fd_info) {
eventfd_t value;
int err;
do {
@@ -58,14 +58,14 @@ static void eventfd_consume(grpc_wakeup_fd_info *fd_info) {
} while (err < 0 && errno == EINTR);
}
-static void eventfd_wakeup(grpc_wakeup_fd_info *fd_info) {
+static void eventfd_wakeup(grpc_wakeup_fd *fd_info) {
int err;
do {
err = eventfd_write(fd_info->read_fd, 1);
} while (err < 0 && errno == EINTR);
}
-static void eventfd_destroy(grpc_wakeup_fd_info *fd_info) {
+static void eventfd_destroy(grpc_wakeup_fd *fd_info) {
close(fd_info->read_fd);
}
diff --git a/src/core/iomgr/wakeup_fd_pipe.c b/src/core/iomgr/wakeup_fd_pipe.c
index f895478990..9fc4ee2388 100644
--- a/src/core/iomgr/wakeup_fd_pipe.c
+++ b/src/core/iomgr/wakeup_fd_pipe.c
@@ -44,7 +44,7 @@
#include "src/core/iomgr/socket_utils_posix.h"
#include <grpc/support/log.h>
-static void pipe_create(grpc_wakeup_fd_info *fd_info) {
+static void pipe_init(grpc_wakeup_fd *fd_info) {
int pipefd[2];
/* TODO(klempner): Make this nonfatal */
GPR_ASSERT(0 == pipe(pipefd));
@@ -54,7 +54,7 @@ static void pipe_create(grpc_wakeup_fd_info *fd_info) {
fd_info->write_fd = pipefd[1];
}
-static void pipe_consume(grpc_wakeup_fd_info *fd_info) {
+static void pipe_consume(grpc_wakeup_fd *fd_info) {
char buf[128];
int r;
@@ -74,13 +74,13 @@ static void pipe_consume(grpc_wakeup_fd_info *fd_info) {
}
}
-static void pipe_wakeup(grpc_wakeup_fd_info *fd_info) {
+static void pipe_wakeup(grpc_wakeup_fd *fd_info) {
char c = 0;
while (write(fd_info->write_fd, &c, 1) != 1 && errno == EINTR)
;
}
-static void pipe_destroy(grpc_wakeup_fd_info *fd_info) {
+static void pipe_destroy(grpc_wakeup_fd *fd_info) {
close(fd_info->read_fd);
close(fd_info->write_fd);
}
@@ -91,7 +91,7 @@ static int pipe_check_availability(void) {
}
const grpc_wakeup_fd_vtable grpc_pipe_wakeup_fd_vtable = {
- pipe_create, pipe_consume, pipe_wakeup, pipe_destroy, pipe_check_availability
-};
+ pipe_init, pipe_consume, pipe_wakeup, pipe_destroy,
+ pipe_check_availability};
#endif /* GPR_POSIX_WAKUP_FD */
diff --git a/src/core/iomgr/wakeup_fd_posix.c b/src/core/iomgr/wakeup_fd_posix.c
index d3cc3ec570..e48f5223fa 100644
--- a/src/core/iomgr/wakeup_fd_posix.c
+++ b/src/core/iomgr/wakeup_fd_posix.c
@@ -57,19 +57,19 @@ void grpc_wakeup_fd_global_destroy(void) {
wakeup_fd_vtable = NULL;
}
-void grpc_wakeup_fd_create(grpc_wakeup_fd_info *fd_info) {
- wakeup_fd_vtable->create(fd_info);
+void grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info) {
+ wakeup_fd_vtable->init(fd_info);
}
-void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd_info *fd_info) {
+void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd *fd_info) {
wakeup_fd_vtable->consume(fd_info);
}
-void grpc_wakeup_fd_wakeup(grpc_wakeup_fd_info *fd_info) {
+void grpc_wakeup_fd_wakeup(grpc_wakeup_fd *fd_info) {
wakeup_fd_vtable->wakeup(fd_info);
}
-void grpc_wakeup_fd_destroy(grpc_wakeup_fd_info *fd_info) {
+void grpc_wakeup_fd_destroy(grpc_wakeup_fd *fd_info) {
wakeup_fd_vtable->destroy(fd_info);
}
diff --git a/src/core/iomgr/wakeup_fd_posix.h b/src/core/iomgr/wakeup_fd_posix.h
index 1b0ff70c7f..a4da4df51f 100644
--- a/src/core/iomgr/wakeup_fd_posix.h
+++ b/src/core/iomgr/wakeup_fd_posix.h
@@ -69,28 +69,28 @@ void grpc_wakeup_fd_global_destroy(void);
* purposes only.*/
void grpc_wakeup_fd_global_init_force_fallback(void);
-typedef struct grpc_wakeup_fd_info grpc_wakeup_fd_info;
+typedef struct grpc_wakeup_fd grpc_wakeup_fd;
typedef struct grpc_wakeup_fd_vtable {
- void (*create)(grpc_wakeup_fd_info *fd_info);
- void (*consume)(grpc_wakeup_fd_info *fd_info);
- void (*wakeup)(grpc_wakeup_fd_info *fd_info);
- void (*destroy)(grpc_wakeup_fd_info *fd_info);
+ void (*init)(grpc_wakeup_fd *fd_info);
+ void (*consume)(grpc_wakeup_fd *fd_info);
+ void (*wakeup)(grpc_wakeup_fd *fd_info);
+ void (*destroy)(grpc_wakeup_fd *fd_info);
/* Must be called before calling any other functions */
int (*check_availability)(void);
} grpc_wakeup_fd_vtable;
-struct grpc_wakeup_fd_info {
+struct grpc_wakeup_fd {
int read_fd;
int write_fd;
};
#define GRPC_WAKEUP_FD_GET_READ_FD(fd_info) ((fd_info)->read_fd)
-void grpc_wakeup_fd_create(grpc_wakeup_fd_info *fd_info);
-void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd_info *fd_info);
-void grpc_wakeup_fd_wakeup(grpc_wakeup_fd_info *fd_info);
-void grpc_wakeup_fd_destroy(grpc_wakeup_fd_info *fd_info);
+void grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info);
+void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd *fd_info);
+void grpc_wakeup_fd_wakeup(grpc_wakeup_fd *fd_info);
+void grpc_wakeup_fd_destroy(grpc_wakeup_fd *fd_info);
/* Defined in some specialized implementation's .c file, or by
* wakeup_fd_nospecial.c if no such implementation exists. */
diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c
index de1929fe76..f368819597 100644
--- a/src/core/security/google_default_credentials.c
+++ b/src/core/security/google_default_credentials.c
@@ -80,7 +80,7 @@ static void on_compute_engine_detection_http_response(
}
gpr_mu_lock(GRPC_POLLSET_MU(&detector->pollset));
detector->is_done = 1;
- grpc_pollset_kick(&detector->pollset);
+ grpc_pollset_kick(&detector->pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(&detector->pollset));
}
@@ -112,7 +112,9 @@ static int is_stack_running_on_compute_engine(void) {
called once for the lifetime of the process by the default credentials. */
gpr_mu_lock(GRPC_POLLSET_MU(&detector.pollset));
while (!detector.is_done) {
- grpc_pollset_work(&detector.pollset, gpr_inf_future(GPR_CLOCK_REALTIME));
+ grpc_pollset_worker worker;
+ grpc_pollset_work(&detector.pollset, &worker,
+ gpr_inf_future(GPR_CLOCK_REALTIME));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset));
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 3f60b0b0ba..00429fac19 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -45,6 +45,11 @@
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
+typedef struct {
+ grpc_pollset_worker *worker;
+ void *tag;
+} plucker;
+
/* Completion queue structure */
struct grpc_completion_queue {
/** completed events */
@@ -60,6 +65,8 @@ struct grpc_completion_queue {
int shutdown;
int shutdown_called;
int is_server_cq;
+ int num_pluckers;
+ plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
};
grpc_completion_queue *grpc_completion_queue_create(void) {
@@ -117,6 +124,8 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
void (*done)(void *done_arg, grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage) {
int shutdown;
+ int i;
+ grpc_pollset_worker *pluck_worker;
storage->tag = tag;
storage->done = done;
@@ -130,7 +139,14 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
cc->completed_tail->next =
((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
cc->completed_tail = storage;
- grpc_pollset_kick(&cc->pollset);
+ pluck_worker = NULL;
+ for (i = 0; i < cc->num_pluckers; i++) {
+ if (cc->pluckers[i].tag == tag) {
+ pluck_worker = cc->pluckers[i].worker;
+ break;
+ }
+ }
+ grpc_pollset_kick(&cc->pollset, pluck_worker);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
} else {
cc->completed_tail->next =
@@ -147,6 +163,7 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline) {
grpc_event ret;
+ grpc_pollset_worker worker;
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@@ -172,7 +189,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
- if (!grpc_pollset_work(&cc->pollset, deadline)) {
+ if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
@@ -184,11 +201,37 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
return ret;
}
+static int add_plucker(grpc_completion_queue *cc, void *tag,
+ grpc_pollset_worker *worker) {
+ if (cc->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
+ return 0;
+ }
+ cc->pluckers[cc->num_pluckers].tag = tag;
+ cc->pluckers[cc->num_pluckers].worker = worker;
+ cc->num_pluckers++;
+ return 1;
+}
+
+static void del_plucker(grpc_completion_queue *cc, void *tag,
+ grpc_pollset_worker *worker) {
+ int i;
+ for (i = 0; i < cc->num_pluckers; i++) {
+ if (cc->pluckers[i].tag == tag && cc->pluckers[i].worker == worker) {
+ cc->num_pluckers--;
+ GPR_SWAP(plucker, cc->pluckers[i], cc->pluckers[cc->num_pluckers]);
+ return;
+ }
+ }
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
+}
+
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_timespec deadline) {
grpc_event ret;
grpc_cq_completion *c;
grpc_cq_completion *prev;
+ grpc_pollset_worker worker;
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@@ -219,12 +262,24 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
- if (!grpc_pollset_work(&cc->pollset, deadline)) {
+ if (!add_plucker(cc, tag, &worker)) {
+ gpr_log(GPR_DEBUG,
+ "Too many outstanding grpc_completion_queue_pluck calls: maximum is %d",
+ GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
+ /* TODO(ctiller): should we use a different result here */
ret.type = GRPC_QUEUE_TIMEOUT;
break;
}
+ if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) {
+ del_plucker(cc, tag, &worker);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ memset(&ret, 0, sizeof(ret));
+ ret.type = GRPC_QUEUE_TIMEOUT;
+ break;
+ }
+ del_plucker(cc, tag, &worker);
}
done:
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
@@ -261,15 +316,6 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
return &cc->pollset;
}
-void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc) {
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
- grpc_pollset_kick(&cc->pollset);
- grpc_pollset_work(&cc->pollset,
- gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_millis(100, GPR_TIMESPAN)));
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
-}
-
void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }
int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; }
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index f944f48d8e..8de024aaea 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -77,8 +77,6 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
-void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc);
-
void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
int grpc_cq_is_server_cq(grpc_completion_queue *cc);
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index c370a9b8ab..29d893db71 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -327,6 +327,14 @@ static void request_matcher_zombify_all_pending_calls(
}
}
+static void request_matcher_kill_requests(grpc_server *server,
+ request_matcher *rm) {
+ int request_id;
+ while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) {
+ fail_call(server, &server->requested_calls[request_id]);
+ }
+}
+
/*
* server proper
*/
@@ -492,12 +500,25 @@ static int num_channels(grpc_server *server) {
return n;
}
+static void kill_pending_work_locked(grpc_server *server) {
+ registered_method *rm;
+ request_matcher_kill_requests(server, &server->unregistered_request_matcher);
+ request_matcher_zombify_all_pending_calls(
+ &server->unregistered_request_matcher);
+ for (rm = server->registered_methods; rm; rm = rm->next) {
+ request_matcher_kill_requests(server, &rm->request_matcher);
+ request_matcher_zombify_all_pending_calls(&rm->request_matcher);
+ }
+}
+
static void maybe_finish_shutdown(grpc_server *server) {
size_t i;
if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {
return;
}
+ kill_pending_work_locked(server);
+
if (server->root_channel_data.next != &server->root_channel_data ||
server->listeners_destroyed < num_listeners(server)) {
if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
@@ -947,52 +968,15 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
op.set_accept_stream_user_data = chand;
op.on_connectivity_state_change = &chand->channel_connectivity_changed;
op.connectivity_state = &chand->connectivity_state;
+ op.disconnect = gpr_atm_acq_load(&s->shutdown_flag);
grpc_transport_perform_op(transport, &op);
}
-typedef struct {
- requested_call **requests;
- size_t count;
- size_t capacity;
-} request_killer;
-
-static void request_killer_init(request_killer *rk) {
- memset(rk, 0, sizeof(*rk));
-}
-
-static void request_killer_add(request_killer *rk, requested_call *rc) {
- if (rk->capacity == rk->count) {
- rk->capacity = GPR_MAX(8, rk->capacity * 2);
- rk->requests =
- gpr_realloc(rk->requests, rk->capacity * sizeof(*rk->requests));
- }
- rk->requests[rk->count++] = rc;
-}
-
-static void request_killer_add_request_matcher(request_killer *rk,
- grpc_server *server,
- request_matcher *rm) {
- int request_id;
- while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) {
- request_killer_add(rk, &server->requested_calls[request_id]);
- }
-}
-
-static void request_killer_run(request_killer *rk, grpc_server *server) {
- size_t i;
- for (i = 0; i < rk->count; i++) {
- fail_call(server, rk->requests[i]);
- }
- gpr_free(rk->requests);
-}
-
void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag) {
listener *l;
- registered_method *rm;
shutdown_tag *sdt;
channel_broadcaster broadcaster;
- request_killer reqkill;
GRPC_SERVER_LOG_SHUTDOWN(GPR_INFO, server, cq, tag);
@@ -1013,27 +997,16 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
channel_broadcaster_init(server, &broadcaster);
- request_killer_init(&reqkill);
/* collect all unregistered then registered calls */
gpr_mu_lock(&server->mu_call);
- request_killer_add_request_matcher(&reqkill, server,
- &server->unregistered_request_matcher);
- request_matcher_zombify_all_pending_calls(
- &server->unregistered_request_matcher);
- for (rm = server->registered_methods; rm; rm = rm->next) {
- request_killer_add_request_matcher(&reqkill, server, &rm->request_matcher);
- request_matcher_zombify_all_pending_calls(&rm->request_matcher);
- }
+ kill_pending_work_locked(server);
gpr_mu_unlock(&server->mu_call);
gpr_atm_rel_store(&server->shutdown_flag, 1);
maybe_finish_shutdown(server);
gpr_mu_unlock(&server->mu_global);
- /* terminate all the requested calls */
- request_killer_run(&reqkill, server);
-
/* Shutdown listeners */
for (l = server->listeners; l; l = l->next) {
l->destroy(server, l->arg);
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index f0eeb6de50..cb428f8e3c 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -119,6 +119,10 @@ typedef enum {
GRPC_WRITE_STATE_SENT_CLOSE
} grpc_chttp2_write_state;
+/* flags that can be or'd into stream_global::writing_now */
+#define GRPC_CHTTP2_WRITING_DATA 1
+#define GRPC_CHTTP2_WRITING_WINDOW 2
+
typedef enum {
GRPC_DONT_SEND_CLOSED = 0,
GRPC_SEND_CLOSED,
@@ -382,7 +386,7 @@ typedef struct {
gpr_uint8 published_cancelled;
/** is this stream in the stream map? (boolean) */
gpr_uint8 in_stream_map;
- /** is this stream actively being written? */
+ /** bitmask of GRPC_CHTTP2_WRITING_xxx above */
gpr_uint8 writing_now;
/** stream state already published to the upper layer */
diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c
index 9e68c1e146..9c3ad7a777 100644
--- a/src/core/transport/chttp2/stream_lists.c
+++ b/src/core/transport/chttp2/stream_lists.c
@@ -164,9 +164,6 @@ void grpc_chttp2_list_add_first_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
GPR_ASSERT(stream_global->id != 0);
- gpr_log(GPR_DEBUG, "add:%d:%d:%d:%d", stream_global->id,
- stream_global->write_state, stream_global->in_stream_map,
- stream_global->read_closed);
stream_list_add_head(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_WRITABLE);
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index d39b0c42f7..b55e81fdca 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -77,7 +77,6 @@ int grpc_chttp2_unlocking_check_writes(
stream_writing->id = stream_global->id;
stream_writing->send_closed = GRPC_DONT_SEND_CLOSED;
- GPR_ASSERT(!stream_global->writing_now);
if (stream_global->outgoing_sopb) {
window_delta =
@@ -123,11 +122,13 @@ int grpc_chttp2_unlocking_check_writes(
stream_global->unannounced_incoming_window = 0;
grpc_chttp2_list_add_incoming_window_updated(transport_global,
stream_global);
- stream_global->writing_now = 1;
- grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
- } else if (stream_writing->sopb.nops > 0 ||
- stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
- stream_global->writing_now = 1;
+ stream_global->writing_now |= GRPC_CHTTP2_WRITING_WINDOW;
+ }
+ if (stream_writing->sopb.nops > 0 ||
+ stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
+ stream_global->writing_now |= GRPC_CHTTP2_WRITING_DATA;
+ }
+ if (stream_global->writing_now != 0) {
grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
}
}
@@ -183,6 +184,7 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) {
stream_writing->send_closed != GRPC_DONT_SEND_CLOSED,
stream_writing->id, &transport_writing->hpack_compressor,
&transport_writing->outbuf);
+ stream_writing->sopb.nops = 0;
}
if (stream_writing->announce_window > 0) {
gpr_slice_buffer_add(
@@ -191,7 +193,6 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) {
stream_writing->id, stream_writing->announce_window));
stream_writing->announce_window = 0;
}
- stream_writing->sopb.nops = 0;
if (stream_writing->send_closed == GRPC_SEND_CLOSED_WITH_RST_STREAM) {
gpr_slice_buffer_add(&transport_writing->outbuf,
grpc_chttp2_rst_stream_create(stream_writing->id,
@@ -215,20 +216,23 @@ void grpc_chttp2_cleanup_writing(
while (grpc_chttp2_list_pop_written_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
- GPR_ASSERT(stream_global->writing_now);
- stream_global->writing_now = 0;
- if (stream_global->outgoing_sopb != NULL &&
- stream_global->outgoing_sopb->nops == 0) {
- stream_global->outgoing_sopb = NULL;
- grpc_chttp2_schedule_closure(transport_global,
- stream_global->send_done_closure, 1);
- }
+ GPR_ASSERT(stream_global->writing_now != 0);
if (stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE;
if (!transport_global->is_client) {
stream_global->read_closed = 1;
}
}
+ if (stream_global->writing_now & GRPC_CHTTP2_WRITING_DATA) {
+ if (stream_global->outgoing_sopb != NULL &&
+ stream_global->outgoing_sopb->nops == 0) {
+ GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_QUEUED_CLOSE);
+ stream_global->outgoing_sopb = NULL;
+ grpc_chttp2_schedule_closure(transport_global,
+ stream_global->send_done_closure, 1);
+ }
+ }
+ stream_global->writing_now = 0;
grpc_chttp2_list_add_read_write_state_changed(transport_global,
stream_global);
}
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 1ea4a82c16..6ba144faa4 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -823,6 +823,12 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
stream_global);
} else {
stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE;
+ if (stream_global->outgoing_sopb != NULL) {
+ grpc_sopb_reset(stream_global->outgoing_sopb);
+ stream_global->outgoing_sopb = NULL;
+ grpc_chttp2_schedule_closure(transport_global,
+ stream_global->send_done_closure, 1);
+ }
stream_global->read_closed = 1;
if (!stream_global->published_cancelled) {
char buffer[GPR_LTOA_MIN_BUFSIZE];
@@ -849,7 +855,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
if (!stream_global->publish_sopb) {
continue;
}
- if (stream_global->writing_now) {
+ if (stream_global->writing_now != 0) {
continue;
}
/* FIXME(ctiller): we include in_stream_map in our computation of