aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr/pollset_posix.c
diff options
context:
space:
mode:
authorGravatar David Klempner <klempner@google.com>2015-01-16 15:35:56 -0800
committerGravatar David Klempner <klempner@google.com>2015-01-16 15:42:31 -0800
commit7f3ed1eda19eaf846df93b7c4d1ab1069ab0c130 (patch)
tree85d4602d24a6af6550dc5d41c3c294b642d85bb0 /src/core/iomgr/pollset_posix.c
parentbeddbdaef22989309e4adf996843efc25d1891bd (diff)
Factor out the pollset kicking mechanism and eliminate sharding
This change pulls out a separate pollset_kick module, which currently uses a freelist of pipes dynamically assigned to pollsets when they enter polling rather than the previous racy sharding mechanism. We ultimately may wish to eliminate the dynamic assignment for multipoll sets, but this should be sufficient for the moment.
Diffstat (limited to 'src/core/iomgr/pollset_posix.c')
-rw-r--r--src/core/iomgr/pollset_posix.c84
1 files changed, 16 insertions, 68 deletions
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 6f1b3ced7d..2555322532 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -48,18 +48,6 @@
#include <grpc/support/thd.h>
#include <grpc/support/useful.h>
-/* kick pipes: we keep a sharded set of pipes to allow breaking from poll.
- Ideally this would be 1:1 with pollsets, but we'd like to avoid associating
- full kernel objects with each pollset to keep them lightweight, so instead
- keep a sharded set and allow associating a pollset with one of the shards.
-
- TODO(ctiller): move this out from this file, and allow an eventfd
- implementation on linux */
-
-#define LOG2_KICK_SHARDS 6
-#define KICK_SHARDS (1 << LOG2_KICK_SHARDS)
-
-static int g_kick_pipes[KICK_SHARDS][2];
static grpc_pollset g_backup_pollset;
static int g_shutdown_backup_poller;
static gpr_event g_backup_poller_done;
@@ -82,65 +70,22 @@ static void backup_poller(void *p) {
gpr_event_set(&g_backup_poller_done, (void *)1);
}
-static size_t kick_shard(const grpc_pollset *info) {
- size_t x = (size_t)info;
- return ((x >> 4) ^ (x >> 9) ^ (x >> 14)) & (KICK_SHARDS - 1);
-}
-
-int grpc_kick_read_fd(grpc_pollset *p) {
- return g_kick_pipes[kick_shard(p)][0];
-}
-
-static int grpc_kick_write_fd(grpc_pollset *p) {
- return g_kick_pipes[kick_shard(p)][1];
-}
-
-void grpc_pollset_force_kick(grpc_pollset *p) {
- char c = 0;
- while (write(grpc_kick_write_fd(p), &c, 1) != 1 && errno == EINTR)
- ;
-}
-
void grpc_pollset_kick(grpc_pollset *p) {
if (!p->counter) return;
- grpc_pollset_force_kick(p);
+ grpc_pollset_kick_kick(&p->kick_state);
}
-void grpc_kick_drain(grpc_pollset *p) {
- int fd = grpc_kick_read_fd(p);
- char buf[128];
- int r;
-
- for (;;) {
- r = read(fd, buf, sizeof(buf));
- if (r > 0) continue;
- if (r == 0) return;
- switch (errno) {
- case EAGAIN:
- return;
- case EINTR:
- continue;
- default:
- gpr_log(GPR_ERROR, "error reading pipe: %s", strerror(errno));
- return;
- }
- }
-}
+void grpc_pollset_force_kick(grpc_pollset *p) { grpc_pollset_kick(p); }
/* global state management */
grpc_pollset *grpc_backup_pollset(void) { return &g_backup_pollset; }
void grpc_pollset_global_init(void) {
- int i;
gpr_thd_id id;
- /* initialize the kick shards */
- for (i = 0; i < KICK_SHARDS; i++) {
- GPR_ASSERT(0 == pipe(g_kick_pipes[i]));
- GPR_ASSERT(grpc_set_socket_nonblocking(g_kick_pipes[i][0], 1));
- GPR_ASSERT(grpc_set_socket_nonblocking(g_kick_pipes[i][1], 1));
- }
+ /* Initialize kick fd state */
+ grpc_pollset_kick_global_init();
/* initialize the backup pollset */
grpc_pollset_init(&g_backup_pollset);
@@ -152,8 +97,6 @@ void grpc_pollset_global_init(void) {
}
void grpc_pollset_global_shutdown(void) {
- int i;
-
/* terminate the backup poller thread */
gpr_mu_lock(&g_backup_pollset.mu);
g_shutdown_backup_poller = 1;
@@ -163,11 +106,8 @@ void grpc_pollset_global_shutdown(void) {
/* destroy the backup pollset */
grpc_pollset_destroy(&g_backup_pollset);
- /* destroy the kick shards */
- for (i = 0; i < KICK_SHARDS; i++) {
- close(g_kick_pipes[i][0]);
- close(g_kick_pipes[i][1]);
- }
+ /* destroy the kick pipes */
+ grpc_pollset_kick_global_destroy();
}
/* main interface */
@@ -178,6 +118,7 @@ static void become_unary_pollset(grpc_pollset *pollset, grpc_fd *fd);
void grpc_pollset_init(grpc_pollset *pollset) {
gpr_mu_init(&pollset->mu);
gpr_cv_init(&pollset->cv);
+ grpc_pollset_kick_init(&pollset->kick_state);
become_empty_pollset(pollset);
}
@@ -213,6 +154,7 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
void grpc_pollset_destroy(grpc_pollset *pollset) {
pollset->vtable->destroy(pollset);
+ grpc_pollset_kick_destroy(&pollset->kick_state);
gpr_mu_destroy(&pollset->mu);
gpr_cv_destroy(&pollset->cv);
}
@@ -290,7 +232,11 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset,
return 1;
}
}
- pfd[0].fd = grpc_kick_read_fd(pollset);
+ pfd[0].fd = grpc_pollset_kick_pre_poll(&pollset->kick_state);
+ if (pfd[0].fd < 0) {
+ /* Already kicked */
+ return 1;
+ }
pfd[0].events = POLLIN;
pfd[0].revents = 0;
pfd[1].fd = fd->fd;
@@ -308,7 +254,7 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset,
/* do nothing */
} else {
if (pfd[0].revents & POLLIN) {
- grpc_kick_drain(pollset);
+ grpc_pollset_kick_consume(&pollset->kick_state);
}
if (pfd[1].revents & POLLIN) {
grpc_fd_become_readable(fd, allow_synchronous_callback);
@@ -318,6 +264,8 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset,
}
}
+ grpc_pollset_kick_post_poll(&pollset->kick_state);
+
gpr_mu_lock(&pollset->mu);
grpc_fd_end_poll(fd, pollset);
pollset->counter = 0;