aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2016-06-08 12:56:56 -0700
committerGravatar Sree Kuchibhotla <sreek@google.com>2016-06-08 12:56:56 -0700
commit5855c478c69508f000baa4878f515d72b5f5a1e9 (patch)
tree2e63d6483736c6ac53ce47e6058ee1ef5296d390 /src/core/lib/iomgr
parent5dbbbb1912d08147709c0a53ced2bed081c2f80c (diff)
Use poll if not linux, add read notifier pollset support and some
groundwork for adding API that allows users to register custom kick signal number
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c72
-rw-r--r--src/core/lib/iomgr/ev_posix.c2
2 files changed, 53 insertions, 21 deletions
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index d5aac96fa4..69ab665e15 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -33,7 +33,7 @@
#include <grpc/support/port_platform.h>
-#ifdef GPR_POSIX_SOCKET
+#ifdef GPR_LINUX_EPOLL
#include "src/core/lib/iomgr/ev_epoll_linux.h"
@@ -60,6 +60,8 @@
struct polling_island;
+static int grpc_poller_kick_signum;
+
/*******************************************************************************
* Fd Declarations
*/
@@ -92,6 +94,9 @@ struct grpc_fd {
struct grpc_fd *freelist_next;
grpc_closure *on_done_closure;
+ /* The pollset that last noticed that the fd is readable */
+ grpc_pollset *read_notifier_pollset;
+
grpc_iomgr_object iomgr_object;
};
@@ -650,14 +655,15 @@ static grpc_fd *fd_create(int fd, const char *name) {
gpr_mu_lock(&new_fd->mu);
gpr_atm_rel_store(&new_fd->refst, 1);
+ new_fd->fd = fd;
new_fd->shutdown = false;
+ new_fd->orphaned = false;
new_fd->read_closure = CLOSURE_NOT_READY;
new_fd->write_closure = CLOSURE_NOT_READY;
- new_fd->fd = fd;
new_fd->polling_island = NULL;
new_fd->freelist_next = NULL;
new_fd->on_done_closure = NULL;
- new_fd->orphaned = false;
+ new_fd->read_notifier_pollset = NULL;
gpr_mu_unlock(&new_fd->mu);
@@ -765,6 +771,17 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
}
}
+static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_fd *fd) {
+ grpc_pollset *notifier = NULL;
+
+ gpr_mu_lock(&fd->mu);
+ notifier = fd->read_notifier_pollset;
+ gpr_mu_unlock(&fd->mu);
+
+ return notifier;
+}
+
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
gpr_mu_lock(&fd->mu);
GPR_ASSERT(!fd->shutdown);
@@ -801,16 +818,25 @@ static void sig_handler(int sig_num) {
#endif
}
+static void poller_kick_init() {
+ grpc_poller_kick_signum = SIGRTMIN + 2;
+ signal(grpc_poller_kick_signum, sig_handler);
+}
+
/* Global state management */
static void pollset_global_init(void) {
grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
- signal(SIGUSR1, sig_handler); /* TODO: sreek - Do not hardcode SIGUSR1 */
+ poller_kick_init();
}
static void pollset_global_shutdown(void) {
grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
}
+static void pollset_worker_kick(grpc_pollset_worker *worker) {
+ pthread_kill(worker->pt_id, grpc_poller_kick_signum);
+}
+
/* Return 1 if the pollset has active threads in pollset_work (pollset must
* be locked) */
static int pollset_has_workers(grpc_pollset *p) {
@@ -856,7 +882,7 @@ static void pollset_kick(grpc_pollset *p,
GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
for (worker = p->root_worker.next; worker != &p->root_worker;
worker = worker->next) {
- pthread_kill(worker->pt_id, SIGUSR1);
+ pollset_worker_kick(worker);
}
} else {
p->kicked_without_pollers = true;
@@ -864,7 +890,7 @@ static void pollset_kick(grpc_pollset *p,
GPR_TIMER_END("pollset_kick.broadcast", 0);
} else {
GPR_TIMER_MARK("kicked_specifically", 0);
- pthread_kill(worker->pt_id, SIGUSR1);
+ pollset_worker_kick(worker);
}
} else {
GPR_TIMER_MARK("kick_anonymous", 0);
@@ -872,7 +898,7 @@ static void pollset_kick(grpc_pollset *p,
if (worker != NULL) {
GPR_TIMER_MARK("finally_kick", 0);
push_back_worker(p, worker);
- pthread_kill(worker->pt_id, SIGUSR1);
+ pollset_worker_kick(worker);
} else {
GPR_TIMER_MARK("kicked_no_pollers", 0);
p->kicked_without_pollers = true;
@@ -924,20 +950,20 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}
-static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) {
- /* only one set_ready can be active at once (but there may be a racing
- notify_on) */
+static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_pollset *notifier) {
+ /* Need the fd->mu since we might be racing with fd_notify_on_read */
gpr_mu_lock(&fd->mu);
- set_ready_locked(exec_ctx, fd, st);
+ set_ready_locked(exec_ctx, fd, &fd->read_closure);
+ fd->read_notifier_pollset = notifier;
gpr_mu_unlock(&fd->mu);
}
-static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- set_ready(exec_ctx, fd, &fd->read_closure);
-}
-
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- set_ready(exec_ctx, fd, &fd->write_closure);
+ /* Need the fd->mu since we might be racing with fd_notify_on_write */
+ gpr_mu_lock(&fd->mu);
+ set_ready_locked(exec_ctx, fd, &fd->write_closure);
+ gpr_mu_unlock(&fd->mu);
}
#define GRPC_EPOLL_MAX_EVENTS 1000
@@ -1007,7 +1033,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
} else {
if (read_ev || cancel) {
- fd_become_readable(exec_ctx, fd);
+ fd_become_readable(exec_ctx, fd, pollset);
}
if (write_ev || cancel) {
fd_become_writable(exec_ctx, fd);
@@ -1109,9 +1135,9 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset->kicked_without_pollers = 0;
} else if (!pollset->shutting_down) {
sigemptyset(&new_mask);
- sigaddset(&new_mask, SIGUSR1);
+ sigaddset(&new_mask, grpc_poller_kick_signum);
pthread_sigmask(SIG_BLOCK, &new_mask, &orig_mask);
- sigdelset(&orig_mask, SIGUSR1);
+ sigdelset(&orig_mask, grpc_poller_kick_signum);
push_front_worker(pollset, &worker);
@@ -1350,6 +1376,7 @@ static const grpc_event_engine_vtable vtable = {
.fd_shutdown = fd_shutdown,
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
+ .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
@@ -1380,4 +1407,9 @@ const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
return &vtable;
}
-#endif
+#else /* defined(GPR_LINUX_EPOLL) */
+/* If GPR_LINUX_EPOLL is not defined, it means epoll is not available. Return
+ * NULL */
+const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
+
+#endif /* !defined(GPR_LINUX_EPOLL) */
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index e0c3558a51..2b15967adc 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -63,8 +63,8 @@ typedef struct {
} event_engine_factory;
static const event_engine_factory g_factories[] = {
- {"poll", grpc_init_poll_posix},
{"epoll", grpc_init_epoll_linux},
+ {"poll", grpc_init_poll_posix},
{"legacy", grpc_init_poll_and_epoll_posix},
};