diff options
Diffstat (limited to 'src/core/iomgr/pollset_posix.c')
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 84 |
1 files changed, 16 insertions, 68 deletions
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 6f1b3ced7d..2555322532 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -48,18 +48,6 @@ #include <grpc/support/thd.h> #include <grpc/support/useful.h> -/* kick pipes: we keep a sharded set of pipes to allow breaking from poll. - Ideally this would be 1:1 with pollsets, but we'd like to avoid associating - full kernel objects with each pollset to keep them lightweight, so instead - keep a sharded set and allow associating a pollset with one of the shards. - - TODO(ctiller): move this out from this file, and allow an eventfd - implementation on linux */ - -#define LOG2_KICK_SHARDS 6 -#define KICK_SHARDS (1 << LOG2_KICK_SHARDS) - -static int g_kick_pipes[KICK_SHARDS][2]; static grpc_pollset g_backup_pollset; static int g_shutdown_backup_poller; static gpr_event g_backup_poller_done; @@ -82,65 +70,22 @@ static void backup_poller(void *p) { gpr_event_set(&g_backup_poller_done, (void *)1); } -static size_t kick_shard(const grpc_pollset *info) { - size_t x = (size_t)info; - return ((x >> 4) ^ (x >> 9) ^ (x >> 14)) & (KICK_SHARDS - 1); -} - -int grpc_kick_read_fd(grpc_pollset *p) { - return g_kick_pipes[kick_shard(p)][0]; -} - -static int grpc_kick_write_fd(grpc_pollset *p) { - return g_kick_pipes[kick_shard(p)][1]; -} - -void grpc_pollset_force_kick(grpc_pollset *p) { - char c = 0; - while (write(grpc_kick_write_fd(p), &c, 1) != 1 && errno == EINTR) - ; -} - void grpc_pollset_kick(grpc_pollset *p) { if (!p->counter) return; - grpc_pollset_force_kick(p); + grpc_pollset_kick_kick(&p->kick_state); } -void grpc_kick_drain(grpc_pollset *p) { - int fd = grpc_kick_read_fd(p); - char buf[128]; - int r; - - for (;;) { - r = read(fd, buf, sizeof(buf)); - if (r > 0) continue; - if (r == 0) return; - switch (errno) { - case EAGAIN: - return; - case EINTR: - continue; - default: - gpr_log(GPR_ERROR, "error reading pipe: %s", strerror(errno)); - return; - } - } -} +void grpc_pollset_force_kick(grpc_pollset *p) { grpc_pollset_kick(p); } /* global state management */ grpc_pollset *grpc_backup_pollset(void) { return &g_backup_pollset; } void grpc_pollset_global_init(void) { - int i; gpr_thd_id id; - /* initialize the kick shards */ - for (i = 0; i < KICK_SHARDS; i++) { - GPR_ASSERT(0 == pipe(g_kick_pipes[i])); - GPR_ASSERT(grpc_set_socket_nonblocking(g_kick_pipes[i][0], 1)); - GPR_ASSERT(grpc_set_socket_nonblocking(g_kick_pipes[i][1], 1)); - } + /* Initialize kick fd state */ + grpc_pollset_kick_global_init(); /* initialize the backup pollset */ grpc_pollset_init(&g_backup_pollset); @@ -152,8 +97,6 @@ void grpc_pollset_global_init(void) { } void grpc_pollset_global_shutdown(void) { - int i; - /* terminate the backup poller thread */ gpr_mu_lock(&g_backup_pollset.mu); g_shutdown_backup_poller = 1; @@ -163,11 +106,8 @@ void grpc_pollset_global_shutdown(void) { /* destroy the backup pollset */ grpc_pollset_destroy(&g_backup_pollset); - /* destroy the kick shards */ - for (i = 0; i < KICK_SHARDS; i++) { - close(g_kick_pipes[i][0]); - close(g_kick_pipes[i][1]); - } + /* destroy the kick pipes */ + grpc_pollset_kick_global_destroy(); } /* main interface */ @@ -178,6 +118,7 @@ static void become_unary_pollset(grpc_pollset *pollset, grpc_fd *fd); void grpc_pollset_init(grpc_pollset *pollset) { gpr_mu_init(&pollset->mu); gpr_cv_init(&pollset->cv); + grpc_pollset_kick_init(&pollset->kick_state); become_empty_pollset(pollset); } @@ -213,6 +154,7 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { void grpc_pollset_destroy(grpc_pollset *pollset) { pollset->vtable->destroy(pollset); + grpc_pollset_kick_destroy(&pollset->kick_state); gpr_mu_destroy(&pollset->mu); gpr_cv_destroy(&pollset->cv); } @@ -290,7 +232,11 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset, return 1; } } - pfd[0].fd = grpc_kick_read_fd(pollset); + pfd[0].fd = grpc_pollset_kick_pre_poll(&pollset->kick_state); + if (pfd[0].fd < 0) { + /* Already kicked */ + return 1; + } pfd[0].events = POLLIN; pfd[0].revents = 0; pfd[1].fd = fd->fd; @@ -308,7 +254,7 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset, /* do nothing */ } else { if (pfd[0].revents & POLLIN) { - grpc_kick_drain(pollset); + grpc_pollset_kick_consume(&pollset->kick_state); } if (pfd[1].revents & POLLIN) { grpc_fd_become_readable(fd, allow_synchronous_callback); @@ -318,6 +264,8 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset, } } + grpc_pollset_kick_post_poll(&pollset->kick_state); + gpr_mu_lock(&pollset->mu); grpc_fd_end_poll(fd, pollset); pollset->counter = 0; |