diff options
Diffstat (limited to 'src/core/iomgr')
-rw-r--r-- | src/core/iomgr/pollset_kick.h | 68 | ||||
-rw-r--r-- | src/core/iomgr/pollset_kick_posix.c | 161 | ||||
-rw-r--r-- | src/core/iomgr/pollset_kick_posix.h | 47 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_poll_posix.c | 9 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 84 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.h | 3 |
6 files changed, 302 insertions, 70 deletions
diff --git a/src/core/iomgr/pollset_kick.h b/src/core/iomgr/pollset_kick.h new file mode 100644 index 0000000000..f088818b9a --- /dev/null +++ b/src/core/iomgr/pollset_kick.h @@ -0,0 +1,68 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_POLLSET_KICK_H_ +#define __GRPC_INTERNAL_IOMGR_POLLSET_KICK_H_ + +#include <grpc/support/port_platform.h> + +/* This is an abstraction around the typical pipe mechanism for waking up a + thread sitting in a poll() style call. */ + +#ifdef GPR_POSIX_SOCKET +#include "src/core/iomgr/pollset_kick_posix.h" +#else +#error "No pollset kick support on platform" +#endif + +void grpc_pollset_kick_global_init(void); +void grpc_pollset_kick_global_destroy(void); + +void grpc_pollset_kick_init(grpc_pollset_kick_state *kick_state); +void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state); + +/* Must be called before entering poll(). If return value is -1, this consumed + an existing kick. Otherwise the return value is an FD to add to the poll set. + */ +int grpc_pollset_kick_pre_poll(grpc_pollset_kick_state *kick_state); + +/* Consume an existing kick. Must be called after poll returns that the fd was + readable, and before calling kick_post_poll. */ +void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state); + +/* Must be called after pre_poll, and after consume if applicable */ +void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state); + +void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state); + +#endif /* __GRPC_INTERNAL_IOMGR_POLLSET_KICK_H_ */ diff --git a/src/core/iomgr/pollset_kick_posix.c b/src/core/iomgr/pollset_kick_posix.c new file mode 100644 index 0000000000..d16e49e459 --- /dev/null +++ b/src/core/iomgr/pollset_kick_posix.c @@ -0,0 +1,161 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/pollset_kick_posix.h" + +#include <errno.h> +#include <string.h> +#include <unistd.h> + +#include "src/core/iomgr/socket_utils_posix.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +/* This implementation is based on a freelist of pipes. */ + +typedef struct grpc_kick_pipe_info { + int pipe_read_fd; + int pipe_write_fd; + struct grpc_kick_pipe_info *next; +} grpc_kick_pipe_info; + +static grpc_kick_pipe_info *pipe_freelist = NULL; +static gpr_mu pipe_freelist_mu; + +static grpc_kick_pipe_info *allocate_pipe() { + grpc_kick_pipe_info *info; + gpr_mu_lock(&pipe_freelist_mu); + if (pipe_freelist != NULL) { + info = pipe_freelist; + pipe_freelist = pipe_freelist->next; + } else { + int pipefd[2]; + /* TODO(klempner): Make this nonfatal */ + GPR_ASSERT(0 == pipe(pipefd)); + GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[0], 1)); + GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[1], 1)); + info = gpr_malloc(sizeof(*info)); + info->pipe_read_fd = pipefd[0]; + info->pipe_write_fd = pipefd[1]; + info->next = NULL; + } + gpr_mu_unlock(&pipe_freelist_mu); + return info; +} + +static void free_pipe(grpc_kick_pipe_info *pipe_info) { + /* TODO(klempner): Start closing pipes if the free list gets too large */ + gpr_mu_lock(&pipe_freelist_mu); + pipe_info->next = pipe_freelist; + pipe_freelist = pipe_info; + gpr_mu_unlock(&pipe_freelist_mu); +} + +void grpc_pollset_kick_global_init() { + pipe_freelist = NULL; + gpr_mu_init(&pipe_freelist_mu); +} + +void grpc_pollset_kick_global_destroy() { + while (pipe_freelist != NULL) { + grpc_kick_pipe_info *current = pipe_freelist; + pipe_freelist = pipe_freelist->next; + close(current->pipe_read_fd); + close(current->pipe_write_fd); + gpr_free(current); + } + gpr_mu_destroy(&pipe_freelist_mu); +} + +void grpc_pollset_kick_init(grpc_pollset_kick_state *kick_state) { + gpr_mu_init(&kick_state->mu); + kick_state->kicked = 0; + kick_state->pipe_info = NULL; +} + +void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state) { + gpr_mu_destroy(&kick_state->mu); + GPR_ASSERT(kick_state->pipe_info == NULL); +} + +int grpc_pollset_kick_pre_poll(grpc_pollset_kick_state *kick_state) { + gpr_mu_lock(&kick_state->mu); + if (kick_state->kicked) { + kick_state->kicked = 0; + gpr_mu_unlock(&kick_state->mu); + return -1; + } + kick_state->pipe_info = allocate_pipe(); + gpr_mu_unlock(&kick_state->mu); + return kick_state->pipe_info->pipe_read_fd; +} + +void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state) { + char buf[128]; + int r; + + for (;;) { + r = read(kick_state->pipe_info->pipe_read_fd, buf, sizeof(buf)); + if (r > 0) continue; + if (r == 0) return; + switch (errno) { + case EAGAIN: + return; + case EINTR: + continue; + default: + gpr_log(GPR_ERROR, "error reading pipe: %s", strerror(errno)); + return; + } + } +} + +void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state) { + gpr_mu_lock(&kick_state->mu); + free_pipe(kick_state->pipe_info); + kick_state->pipe_info = NULL; + gpr_mu_unlock(&kick_state->mu); +} + +void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state) { + gpr_mu_lock(&kick_state->mu); + if (kick_state->pipe_info != NULL) { + char c = 0; + while (write(kick_state->pipe_info->pipe_write_fd, &c, 1) != 1 && + errno == EINTR) + ; + } else { + kick_state->kicked = 1; + } + gpr_mu_unlock(&kick_state->mu); +} diff --git a/src/core/iomgr/pollset_kick_posix.h b/src/core/iomgr/pollset_kick_posix.h new file mode 100644 index 0000000000..bae3b5923a --- /dev/null +++ b/src/core/iomgr/pollset_kick_posix.h @@ -0,0 +1,47 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_POLLSET_KICK_POSIX_H_ +#define __GRPC_INTERNAL_IOMGR_POLLSET_KICK_POSIX_H_ + +#include <grpc/support/sync.h> + +struct grpc_kick_pipe_info; + +typedef struct grpc_pollset_kick_state { + gpr_mu mu; + int kicked; + struct grpc_kick_pipe_info *pipe_info; +} grpc_pollset_kick_state; + +#endif /* __GRPC_INTERNAL_IOMGR_POLLSET_KICK_POSIX_H_ */ diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index e482da94f7..7c9a9491cb 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -131,7 +131,11 @@ static int multipoll_with_poll_pollset_maybe_work( } nf = 0; np = 1; - h->pfds[0].fd = grpc_kick_read_fd(pollset); + h->pfds[0].fd = grpc_pollset_kick_pre_poll(&pollset->kick_state); + if (h->pfds[0].fd < 0) { + /* Already kicked */ + return 1; + } h->pfds[0].events = POLLIN; h->pfds[0].revents = POLLOUT; for (i = 0; i < h->fd_count; i++) { @@ -173,7 +177,7 @@ static int multipoll_with_poll_pollset_maybe_work( /* do nothing */ } else { if (h->pfds[0].revents & POLLIN) { - grpc_kick_drain(pollset); + grpc_pollset_kick_consume(&pollset->kick_state); } for (i = 1; i < np; i++) { if (h->pfds[i].revents & POLLIN) { @@ -184,6 +188,7 @@ static int multipoll_with_poll_pollset_maybe_work( } } } + grpc_pollset_kick_post_poll(&pollset->kick_state); end_polling(pollset); gpr_mu_lock(&pollset->mu); diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 6f1b3ced7d..2555322532 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -48,18 +48,6 @@ #include <grpc/support/thd.h> #include <grpc/support/useful.h> -/* kick pipes: we keep a sharded set of pipes to allow breaking from poll. - Ideally this would be 1:1 with pollsets, but we'd like to avoid associating - full kernel objects with each pollset to keep them lightweight, so instead - keep a sharded set and allow associating a pollset with one of the shards. - - TODO(ctiller): move this out from this file, and allow an eventfd - implementation on linux */ - -#define LOG2_KICK_SHARDS 6 -#define KICK_SHARDS (1 << LOG2_KICK_SHARDS) - -static int g_kick_pipes[KICK_SHARDS][2]; static grpc_pollset g_backup_pollset; static int g_shutdown_backup_poller; static gpr_event g_backup_poller_done; @@ -82,65 +70,22 @@ static void backup_poller(void *p) { gpr_event_set(&g_backup_poller_done, (void *)1); } -static size_t kick_shard(const grpc_pollset *info) { - size_t x = (size_t)info; - return ((x >> 4) ^ (x >> 9) ^ (x >> 14)) & (KICK_SHARDS - 1); -} - -int grpc_kick_read_fd(grpc_pollset *p) { - return g_kick_pipes[kick_shard(p)][0]; -} - -static int grpc_kick_write_fd(grpc_pollset *p) { - return g_kick_pipes[kick_shard(p)][1]; -} - -void grpc_pollset_force_kick(grpc_pollset *p) { - char c = 0; - while (write(grpc_kick_write_fd(p), &c, 1) != 1 && errno == EINTR) - ; -} - void grpc_pollset_kick(grpc_pollset *p) { if (!p->counter) return; - grpc_pollset_force_kick(p); + grpc_pollset_kick_kick(&p->kick_state); } -void grpc_kick_drain(grpc_pollset *p) { - int fd = grpc_kick_read_fd(p); - char buf[128]; - int r; - - for (;;) { - r = read(fd, buf, sizeof(buf)); - if (r > 0) continue; - if (r == 0) return; - switch (errno) { - case EAGAIN: - return; - case EINTR: - continue; - default: - gpr_log(GPR_ERROR, "error reading pipe: %s", strerror(errno)); - return; - } - } -} +void grpc_pollset_force_kick(grpc_pollset *p) { grpc_pollset_kick(p); } /* global state management */ grpc_pollset *grpc_backup_pollset(void) { return &g_backup_pollset; } void grpc_pollset_global_init(void) { - int i; gpr_thd_id id; - /* initialize the kick shards */ - for (i = 0; i < KICK_SHARDS; i++) { - GPR_ASSERT(0 == pipe(g_kick_pipes[i])); - GPR_ASSERT(grpc_set_socket_nonblocking(g_kick_pipes[i][0], 1)); - GPR_ASSERT(grpc_set_socket_nonblocking(g_kick_pipes[i][1], 1)); - } + /* Initialize kick fd state */ + grpc_pollset_kick_global_init(); /* initialize the backup pollset */ grpc_pollset_init(&g_backup_pollset); @@ -152,8 +97,6 @@ void grpc_pollset_global_init(void) { } void grpc_pollset_global_shutdown(void) { - int i; - /* terminate the backup poller thread */ gpr_mu_lock(&g_backup_pollset.mu); g_shutdown_backup_poller = 1; @@ -163,11 +106,8 @@ void grpc_pollset_global_shutdown(void) { /* destroy the backup pollset */ grpc_pollset_destroy(&g_backup_pollset); - /* destroy the kick shards */ - for (i = 0; i < KICK_SHARDS; i++) { - close(g_kick_pipes[i][0]); - close(g_kick_pipes[i][1]); - } + /* destroy the kick pipes */ + grpc_pollset_kick_global_destroy(); } /* main interface */ @@ -178,6 +118,7 @@ static void become_unary_pollset(grpc_pollset *pollset, grpc_fd *fd); void grpc_pollset_init(grpc_pollset *pollset) { gpr_mu_init(&pollset->mu); gpr_cv_init(&pollset->cv); + grpc_pollset_kick_init(&pollset->kick_state); become_empty_pollset(pollset); } @@ -213,6 +154,7 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { void grpc_pollset_destroy(grpc_pollset *pollset) { pollset->vtable->destroy(pollset); + grpc_pollset_kick_destroy(&pollset->kick_state); gpr_mu_destroy(&pollset->mu); gpr_cv_destroy(&pollset->cv); } @@ -290,7 +232,11 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset, return 1; } } - pfd[0].fd = grpc_kick_read_fd(pollset); + pfd[0].fd = grpc_pollset_kick_pre_poll(&pollset->kick_state); + if (pfd[0].fd < 0) { + /* Already kicked */ + return 1; + } pfd[0].events = POLLIN; pfd[0].revents = 0; pfd[1].fd = fd->fd; @@ -308,7 +254,7 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset, /* do nothing */ } else { if (pfd[0].revents & POLLIN) { - grpc_kick_drain(pollset); + grpc_pollset_kick_consume(&pollset->kick_state); } if (pfd[1].revents & POLLIN) { grpc_fd_become_readable(fd, allow_synchronous_callback); @@ -318,6 +264,8 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset, } } + grpc_pollset_kick_post_poll(&pollset->kick_state); + gpr_mu_lock(&pollset->mu); grpc_fd_end_poll(fd, pollset); pollset->counter = 0; diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index 32a8f533ae..f62433707e 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -36,6 +36,8 @@ #include <grpc/support/sync.h> +#include "src/core/iomgr/pollset_kick.h" + typedef struct grpc_pollset_vtable grpc_pollset_vtable; /* forward declare only in this file to avoid leaking impl details via @@ -51,6 +53,7 @@ typedef struct grpc_pollset { const grpc_pollset_vtable *vtable; gpr_mu mu; gpr_cv cv; + grpc_pollset_kick_state kick_state; int counter; union { int fd; |