path: root/src/core/lib/iomgr/ev_epollex_linux.c
author    Craig Tiller <ctiller@google.com>    2017-04-06 16:05:45 -0700
committer Craig Tiller <ctiller@google.com>    2017-04-06 16:05:45 -0700
commit    e24b24d3c646c40248ea4581f3c5b03597797544 (patch)
tree      ba0dec0ff160e90fe585c8549391b1f1c59ed00d /src/core/lib/iomgr/ev_epollex_linux.c
parent    ad08c8189a210fa28457e4da025bed4b2fd95c26 (diff)
Implement pollset for epollex
Diffstat (limited to 'src/core/lib/iomgr/ev_epollex_linux.c')
-rw-r--r--  src/core/lib/iomgr/ev_epollex_linux.c | 279
1 file changed, 247 insertions(+), 32 deletions(-)
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c
index 71e8b7e4fb..0985755a43 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.c
+++ b/src/core/lib/iomgr/ev_epollex_linux.c
@@ -62,8 +62,9 @@
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
-/* Uncomment the following to enable extra checks on poll_object operations */
-/* #define PO_DEBUG */
+#ifndef EPOLLEXCLUSIVE
+#define EPOLLEXCLUSIVE (1u << 28)
+#endif
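
[Note: EPOLLEXCLUSIVE was added in Linux 4.5 and can be missing from older system headers, so the patch defines it locally above; the value matches the kernel UAPI. The flag matters when several epoll instances watch the same file descriptor: without it, every instance is woken for each event. A minimal standalone sketch of that behaviour (not part of the patch; plain POSIX/Linux, hedged as "one or more" waiters may wake):

    /* Sketch: two epoll instances watching one eventfd with EPOLLEXCLUSIVE.
       The kernel wakes one or more of them per event instead of all.
       Build: cc -pthread demo.c */
    #include <pthread.h>
    #include <stdint.h>
    #include <stdio.h>
    #include <sys/epoll.h>
    #include <sys/eventfd.h>
    #include <unistd.h>

    #ifndef EPOLLEXCLUSIVE
    #define EPOLLEXCLUSIVE (1u << 28)
    #endif

    static void *waiter(void *arg) {
      int epfd = *(int *)arg;
      struct epoll_event ev;
      /* 500ms timeout so the demo terminates even if this instance is skipped */
      int n = epoll_wait(epfd, &ev, 1, 500);
      printf("epfd %d: epoll_wait returned %d\n", epfd, n);
      return NULL;
    }

    int main(void) {
      int evfd = eventfd(0, EFD_CLOEXEC);
      int epfd[2] = {epoll_create1(EPOLL_CLOEXEC), epoll_create1(EPOLL_CLOEXEC)};
      for (int i = 0; i < 2; i++) {
        struct epoll_event ev = {.events = EPOLLIN | EPOLLEXCLUSIVE,
                                 .data.ptr = NULL};
        if (epoll_ctl(epfd[i], EPOLL_CTL_ADD, evfd, &ev) != 0) {
          perror("epoll_ctl"); /* EINVAL here means the kernel predates 4.5 */
          return 1;
        }
      }
      pthread_t t[2];
      for (int i = 0; i < 2; i++) pthread_create(&t[i], NULL, waiter, &epfd[i]);
      usleep(100 * 1000); /* give both threads time to block in epoll_wait */
      uint64_t one = 1;
      if (write(evfd, &one, sizeof(one)) < 0) perror("write");
      for (int i = 0; i < 2; i++) pthread_join(t[i], NULL);
      return 0;
    }
]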
/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
* sure to wake up one polling thread (which can wake up other threads if
@@ -85,6 +86,8 @@ struct grpc_fd {
Ref/Unref by two to avoid altering the orphaned bit */
gpr_atm refst;
+ grpc_wakeup_fd workqueue_wakeup_fd;
+
/* The fd is either closed or we relinquished control of it. In either
cases, this indicates that the 'fd' on this structure is no longer
valid */
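
[Note: the new workqueue_wakeup_fd field added to grpc_fd above gives each fd a dedicated way to kick a poller when closures land on its workqueue. On Linux, grpc_wakeup_fd is typically backed by an eventfd; a rough standalone sketch of the wake/consume cycle (a hypothetical mirror, not the gRPC API itself):

    /* Sketch of an eventfd-backed wakeup fd: write() to wake any epoll/poll
       waiting on read_fd, read() to consume the wakeup. */
    #include <stdint.h>
    #include <sys/eventfd.h>
    #include <unistd.h>

    typedef struct { int read_fd; } wakeup_fd; /* hypothetical stand-in */

    static int wakeup_fd_init(wakeup_fd *w) {
      w->read_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
      return w->read_fd >= 0 ? 0 : -1;
    }

    static void wakeup_fd_wakeup(wakeup_fd *w) {
      uint64_t one = 1;
      ssize_t r = write(w->read_fd, &one, sizeof(one)); /* makes read_fd readable */
      (void)r;
    }

    static void wakeup_fd_consume(wakeup_fd *w) {
      uint64_t value;
      /* drains the counter so the fd stops being readable until the next wakeup */
      while (read(w->read_fd, &value, sizeof(value)) > 0) {
      }
    }

    static void wakeup_fd_destroy(wakeup_fd *w) { close(w->read_fd); }
]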
@@ -131,16 +134,22 @@ static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
* Pollset Declarations
*/
struct grpc_pollset_worker {
- /* Thread id of this worker */
- pthread_t pt_id;
-
- /* Used to prevent a worker from getting kicked multiple times */
- gpr_atm is_kicked;
- struct grpc_pollset_worker *next;
- struct grpc_pollset_worker *prev;
+ bool kicked;
+ bool initialized_cv;
+ gpr_cv cv;
+ grpc_pollset_worker *next;
+ grpc_pollset_worker *prev;
};
-struct grpc_pollset {};
+struct grpc_pollset {
+ gpr_mu mu;
+ int epfd;
+ int num_pollers;
+ gpr_atm shutdown_atm;
+ grpc_closure *shutdown_closure;
+ grpc_wakeup_fd pollset_wakeup;
+ grpc_pollset_worker *root_worker;
+};
/*******************************************************************************
* Pollset-set Declarations
@@ -151,6 +160,16 @@ struct grpc_pollset_set {};
* Common helpers
*/
+static bool append_error(grpc_error **composite, grpc_error *error,
+ const char *desc) {
+ if (error == GRPC_ERROR_NONE) return true;
+ if (*composite == GRPC_ERROR_NONE) {
+ *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
+ }
+ *composite = grpc_error_add_child(*composite, error);
+ return false;
+}
+
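
[Note: append_error collects several fallible steps under one composite error: the first failure creates the composite with the given description, later failures are attached as children, and the boolean return lets callers decide whether to continue. A hedged usage sketch, where step_one and step_two are hypothetical helpers rather than anything in the patch:

    /* Hypothetical helpers standing in for real initialization steps. */
    static grpc_error *step_one(void);
    static grpc_error *step_two(void);

    static grpc_error *init_everything(void) {
      grpc_error *composite = GRPC_ERROR_NONE;
      /* both steps run even if the first fails; their errors become children
         of a single "init_everything" composite */
      append_error(&composite, step_one(), "init_everything");
      append_error(&composite, step_two(), "init_everything");
      return composite; /* GRPC_ERROR_NONE when every step succeeded */
    }
]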
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
const char *file, int line,
@@ -400,13 +419,10 @@ static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { abort(); }
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);
-static void poller_kick_init() {}
-
/* Global state management */
static grpc_error *pollset_global_init(void) {
gpr_tls_init(&g_current_thread_pollset);
gpr_tls_init(&g_current_thread_worker);
- poller_kick_init();
return grpc_wakeup_fd_init(&global_wakeup_fd);
}
@@ -419,12 +435,41 @@ static void pollset_global_shutdown(void) {
/* p->mu must be held before calling this function */
static grpc_error *pollset_kick(grpc_pollset *p,
grpc_pollset_worker *specific_worker) {
- abort();
+ if (specific_worker == NULL) {
+ if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
+ return grpc_wakeup_fd_wakeup(&p->pollset_wakeup);
+ } else {
+ return GRPC_ERROR_NONE;
+ }
+ } else if (gpr_tls_get(&g_current_thread_worker) ==
+ (intptr_t)specific_worker) {
+ return GRPC_ERROR_NONE;
+ } else if (specific_worker == p->root_worker) {
+ return grpc_wakeup_fd_wakeup(&p->pollset_wakeup);
+ } else {
+ gpr_cv_signal(&specific_worker->cv);
+ return GRPC_ERROR_NONE;
+ }
}
-static grpc_error *kick_poller(void) { abort(); }
+static grpc_error *kick_poller(void) {
+ return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
+}
-static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {}
+static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
+ gpr_mu_init(&pollset->mu);
+ pollset->epfd = epoll_create1(EPOLL_CLOEXEC);
+ if (pollset->epfd < 0) {
+ GRPC_LOG_IF_ERROR("pollset_init", GRPC_OS_ERROR(errno, "epoll_create1"));
+ }
+ pollset->num_pollers = 0;
+ gpr_atm_no_barrier_store(&pollset->shutdown_atm, 0);
+ pollset->shutdown_closure = NULL;
+ GRPC_LOG_IF_ERROR("pollset_init",
+ grpc_wakeup_fd_init(&pollset->pollset_wakeup));
+ pollset->root_worker = NULL;
+ *mu = &pollset->mu;
+}
/* Convert a timespec to milliseconds:
- Very small or negative poll times are clamped to zero to do a non-blocking
@@ -469,33 +514,186 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
}
-static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
- grpc_pollset *pollset) {
- abort();
-}
-
/* pollset->po.mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_closure *closure) {}
+ grpc_closure *closure) {
+ GPR_ASSERT(pollset->shutdown_closure == NULL);
+ pollset->shutdown_closure = closure;
+ if (pollset->num_pollers > 0) {
+ struct epoll_event ev = {.events = EPOLLIN,
+ .data.ptr = &pollset->pollset_wakeup};
+ epoll_ctl(pollset->epfd, EPOLL_CTL_MOD, pollset->pollset_wakeup.read_fd,
+ &ev);
+ GRPC_LOG_IF_ERROR("pollset_shutdown",
+ grpc_wakeup_fd_wakeup(&pollset->pollset_wakeup));
+ } else {
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
+ }
+}
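
[Note: during shutdown the pollset's wakeup fd is re-armed with a plain level-triggered EPOLLIN registration (EPOLL_CTL_MOD cannot carry EPOLLEXCLUSIVE), and the comment in pollset_poll below explains that the wakeup is then never consumed; an unconsumed level-triggered fd keeps every remaining poller returning from epoll_wait until num_pollers drains to zero. A standalone sketch of that re-arm step, assuming the fd was previously added to epfd elsewhere with edge-triggered/exclusive flags:

    /* Sketch: drop edge-triggered/exclusive delivery on wakeup_read_fd so that
       every epoll_wait() caller sees it from now on. 'tag' is whatever pointer
       the poll loop expects back in data.ptr. */
    #include <sys/epoll.h>

    static int rearm_for_broadcast(int epfd, int wakeup_read_fd, void *tag) {
      struct epoll_event ev = {.events = EPOLLIN, .data.ptr = tag};
      /* level-triggered and non-exclusive: the fd is reported to every
         epoll_wait() caller for as long as it stays readable */
      return epoll_ctl(epfd, EPOLL_CTL_MOD, wakeup_read_fd, &ev);
    }
]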
+
+/* pollset_shutdown is guaranteed to be called before pollset_destroy. */
+static void pollset_destroy(grpc_pollset *pollset) {
+ gpr_mu_destroy(&pollset->mu);
+ if (pollset->epfd >= 0) close(pollset->epfd);
+ grpc_wakeup_fd_destroy(&pollset->pollset_wakeup);
+}
-/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
- * than destroying the mutexes, there is nothing special that needs to be done
- * here */
-static void pollset_destroy(grpc_pollset *pollset) {}
+#define MAX_EPOLL_EVENTS 100
-/* pollset->po.mu lock must be held by the caller before calling this.
- The function pollset_work() may temporarily release the lock (pollset->po.mu)
+static grpc_error *pollset_poll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ gpr_timespec now, gpr_timespec deadline) {
+ struct epoll_event events[MAX_EPOLL_EVENTS];
+ static const char *err_desc = "pollset_poll";
+
+ if (pollset->epfd < 0) {
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "epoll fd failed to initialize");
+ }
+
+ int r = epoll_wait(pollset->epfd, events, MAX_EPOLL_EVENTS,
+ poll_deadline_to_millis_timeout(deadline, now));
+ if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
+
+ grpc_error *error = GRPC_ERROR_NONE;
+ for (int i = 0; i < r; i++) {
+ void *data_ptr = events[i].data.ptr;
+ if (data_ptr == &global_wakeup_fd) {
+ grpc_timer_consume_kick();
+ append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
+ err_desc);
+ } else if (data_ptr == &pollset->pollset_wakeup) {
+ /* once we start shutting down we stop consuming the wakeup:
+ the fd is level triggered and non-exclusive, which should result in all
+ pollers waking */
+ if (gpr_atm_no_barrier_load(&pollset->shutdown_atm) == 0) {
+ append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
+ err_desc);
+ }
+ } else {
+ grpc_fd *fd = (grpc_fd *)(((intptr_t)data_ptr) & ~(intptr_t)1);
+ bool is_workqueue = (((intptr_t)data_ptr) & 1) != 0;
+ bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0;
+ bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0;
+ bool write_ev = (events[i].events & EPOLLOUT) != 0;
+ if (is_workqueue) {
+ append_error(&error,
+ grpc_wakeup_fd_consume_wakeup(&fd->workqueue_wakeup_fd),
+ err_desc);
+ fd_invoke_workqueue(exec_ctx, fd);
+ } else {
+ if (read_ev || cancel) {
+ fd_become_readable(exec_ctx, fd, pollset);
+ }
+ if (write_ev || cancel) {
+ fd_become_writable(exec_ctx, fd);
+ }
+ }
+ }
+ }
+
+ return error;
+}
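
[Note: pollset_poll distinguishes the two event sources that share a grpc_fd by a tag in the low bit of data.ptr: the fd itself is untagged, while an event for the fd's workqueue wakeup carries bit 0 set (the bit is free because the struct is more than 1-byte aligned). A small standalone sketch of the tagging scheme, with hypothetical names:

    /* Sketch: tag bit 0 of an aligned pointer to multiplex two event sources
       behind one epoll data.ptr. Struct alignment guarantees bit 0 is 0 in
       the untagged pointer. */
    #include <stdint.h>

    typedef struct fd_like { int fd; } fd_like; /* hypothetical stand-in for grpc_fd */

    static void *tag_workqueue(fd_like *fd) { return (void *)((intptr_t)fd | 1); }
    static void *tag_fd(fd_like *fd) { return (void *)fd; }

    static fd_like *untag(void *data_ptr) {
      return (fd_like *)(((intptr_t)data_ptr) & ~(intptr_t)1);
    }
    static int is_workqueue(void *data_ptr) {
      return ((intptr_t)data_ptr & 1) != 0;
    }
]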
+
+/* Return true if this thread should poll */
+static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
+ grpc_pollset_worker **worker_hdl,
+ gpr_timespec deadline) {
+ if (worker_hdl != NULL) {
+ *worker_hdl = worker;
+ worker->kicked = false;
+ if (pollset->root_worker == NULL) {
+ pollset->root_worker = worker;
+ worker->next = worker->prev = worker;
+ worker->initialized_cv = false;
+ } else {
+ worker->next = pollset->root_worker;
+ worker->prev = worker->next->prev;
+ worker->next->prev = worker->prev->next = worker;
+ worker->initialized_cv = true;
+ gpr_cv_init(&worker->cv);
+ while (pollset->root_worker != worker) {
+ if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline)) return false;
+ if (worker->kicked) return false;
+ }
+ }
+ }
+ return pollset->shutdown_closure == NULL;
+}
+
+static void end_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
+ grpc_pollset_worker **worker_hdl) {
+ if (worker_hdl != NULL) {
+ if (worker == pollset->root_worker) {
+ if (worker == worker->next) {
+ pollset->root_worker = NULL;
+ } else {
+ pollset->root_worker = worker->next;
+ worker->prev->next = worker->next;
+ worker->next->prev = worker->prev;
+ }
+ } else {
+ worker->prev->next = worker->next;
+ worker->next->prev = worker->prev;
+ }
+ if (worker->initialized_cv) {
+ gpr_cv_destroy(&worker->cv);
+ }
+ }
+}
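
[Note: begin_worker and end_worker keep the workers in an intrusive circular doubly-linked list rooted at root_worker; only the root worker polls, and each non-root worker parks on its own condition variable until it is promoted to root or kicked. A condensed standalone sketch of just the ring maintenance, using a hypothetical worker type:

    /* Sketch: intrusive circular doubly-linked worker ring, as maintained by
       begin_worker/end_worker. 'root' designates the single polling worker. */
    typedef struct worker {
      struct worker *next;
      struct worker *prev;
    } worker;

    static void ring_push(worker **root, worker *w) {
      if (*root == NULL) {
        *root = w;
        w->next = w->prev = w; /* a one-element ring points at itself */
      } else {
        w->next = *root; /* insert just before the root ... */
        w->prev = (*root)->prev;
        w->next->prev = w->prev->next = w; /* ... i.e. at the tail of the ring */
      }
    }

    static void ring_remove(worker **root, worker *w) {
      if (w == *root) {
        *root = (w->next == w) ? NULL : w->next; /* promote the next worker */
      }
      w->prev->next = w->next;
      w->next->prev = w->prev;
    }
]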
+
+/* pollset->mu lock must be held by the caller before calling this.
+ The function pollset_work() may temporarily release the lock (pollset->mu)
during the course of its execution but it will always re-acquire the lock and
ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl,
gpr_timespec now, gpr_timespec deadline) {
- abort();
+ grpc_pollset_worker worker;
+ grpc_error *error = GRPC_ERROR_NONE;
+ if (begin_worker(pollset, &worker, worker_hdl, deadline)) {
+ GPR_ASSERT(!pollset->shutdown_closure);
+ pollset->num_pollers++;
+ gpr_mu_unlock(&pollset->mu);
+ error = pollset_poll(exec_ctx, pollset, now, deadline);
+ grpc_exec_ctx_flush(exec_ctx);
+ gpr_mu_lock(&pollset->mu);
+ pollset->num_pollers--;
+ if (pollset->num_pollers == 0 && pollset->shutdown_closure != NULL) {
+ grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
+ }
+ }
+ end_worker(pollset, &worker, worker_hdl);
+ return error;
}
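
[Note: pollset_work bumps num_pollers around the blocking poll, and on the way out the last poller to leave a shutting-down pollset schedules the deferred shutdown closure (the case where num_pollers was already zero is handled in pollset_shutdown itself). A standalone sketch of that last-one-out pattern using plain pthreads, with hypothetical names:

    /* Sketch: "last poller out" completes a deferred shutdown. Fields are
       protected by mu; on_shutdown_done stands in for the gRPC closure. */
    #include <pthread.h>
    #include <stdbool.h>

    typedef struct {
      pthread_mutex_t mu;
      int num_pollers;
      bool shutting_down;
      void (*on_shutdown_done)(void); /* set by shutdown(), run exactly once */
    } pollset_like;

    static void poll_once(pollset_like *p, void (*do_poll)(void)) {
      pthread_mutex_lock(&p->mu);
      p->num_pollers++;
      pthread_mutex_unlock(&p->mu);
      do_poll(); /* block outside the lock, like pollset_poll() */
      void (*done)(void) = NULL;
      pthread_mutex_lock(&p->mu);
      p->num_pollers--;
      if (p->num_pollers == 0 && p->shutting_down) done = p->on_shutdown_done;
      pthread_mutex_unlock(&p->mu);
      if (done != NULL) done(); /* only the last poller reaches here */
    }
]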
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_fd *fd) {
- abort();
+ grpc_error *error = GRPC_ERROR_NONE;
+ static const char *err_desc = "pollset_add_fd";
+ struct epoll_event ev_fd = {
+ .events = EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE, .data.ptr = fd};
+ if (epoll_ctl(pollset->epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) {
+ switch (errno) {
+ case EEXIST: /* if this fd is already in the epoll set, the workqueue fd
+ must also be - just return */
+ return;
+ default:
+ append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), err_desc);
+ }
+ }
+ struct epoll_event ev_wq = {.events = EPOLLET | EPOLLIN | EPOLLEXCLUSIVE,
+ .data.ptr = fd};
+ if (epoll_ctl(pollset->epfd, EPOLL_CTL_ADD, fd->workqueue_wakeup_fd.read_fd,
+ &ev_wq) != 0) {
+ switch (errno) {
+ case EEXIST: /* if the workqueue fd is already in the epoll set we're ok -
+ no need to do anything special */
+ break;
+ default:
+ append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), err_desc);
+ }
+ }
+ GRPC_LOG_IF_ERROR("pollset_add_fd", error);
}
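
[Note: pollset_add_fd registers two fds in the pollset's epoll set, the fd itself (read/write interest) and its workqueue wakeup fd, both edge-triggered and exclusive, and treats EEXIST as success since the same fd may be offered to the same epoll set more than once. A standalone sketch of the EEXIST-tolerant add, with hypothetical names:

    /* Sketch: add an fd to an epoll set, treating "already present" as success.
       Returns 0 on success or an errno value otherwise. */
    #include <errno.h>
    #include <stdint.h>
    #include <sys/epoll.h>

    #ifndef EPOLLEXCLUSIVE
    #define EPOLLEXCLUSIVE (1u << 28)
    #endif

    static int epoll_add_idempotent(int epfd, int fd, uint32_t events, void *tag) {
      struct epoll_event ev = {.events = events, .data.ptr = tag};
      if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) == 0) return 0;
      return errno == EEXIST ? 0 : errno; /* EEXIST: already registered, fine */
    }

    /* e.g. epoll_add_idempotent(epfd, fd,
                                 EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE,
                                 fd); */
]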
/*******************************************************************************
@@ -593,15 +791,32 @@ static const grpc_event_engine_vtable vtable = {
/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
* Create a dummy epoll_fd to make sure epoll support is available */
-static bool is_epollex_available() {
+static bool is_epollex_available(void) {
int fd = epoll_create1(EPOLL_CLOEXEC);
if (fd < 0) {
gpr_log(
GPR_ERROR,
- "epoll_create1 failed with error: %d. Not using epoll polling engine",
+ "epoll_create1 failed with error: %d. Not using epollex polling engine",
fd);
return false;
}
+ grpc_wakeup_fd wakeup;
+ if (!GRPC_LOG_IF_ERROR("check_wakeupfd_for_epollex",
+ grpc_wakeup_fd_init(&wakeup))) {
+ return false;
+ }
+ struct epoll_event ev = {.events = EPOLLET | EPOLLIN | EPOLLEXCLUSIVE,
+ .data.ptr = NULL};
+ if (epoll_ctl(fd, EPOLL_CTL_ADD, wakeup.read_fd, &ev) != 0) {
+ gpr_log(GPR_ERROR,
+ "epoll_ctl with EPOLLEXCLUSIVE failed with error: %d. Not using "
+ "epollex polling engine",
+ fd);
+ close(fd);
+ grpc_wakeup_fd_destroy(&wakeup);
+ return false;
+ }
+ grpc_wakeup_fd_destroy(&wakeup);
close(fd);
return true;
}
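
[Note: EPOLLEXCLUSIVE is honoured from Linux 4.5 onward; on older kernels the EPOLL_CTL_ADD above fails with EINVAL, which is what this probe relies on to skip the epollex engine. A hedged standalone variant of the same probe that reports the failing errno explicitly:

    /* Sketch: probe EPOLLEXCLUSIVE support and report errno on failure.
       EINVAL from EPOLL_CTL_ADD indicates a pre-4.5 kernel. */
    #include <errno.h>
    #include <stdio.h>
    #include <string.h>
    #include <sys/epoll.h>
    #include <sys/eventfd.h>
    #include <unistd.h>

    #ifndef EPOLLEXCLUSIVE
    #define EPOLLEXCLUSIVE (1u << 28)
    #endif

    static int kernel_supports_epollexclusive(void) {
      int supported = 0;
      int epfd = epoll_create1(EPOLL_CLOEXEC);
      int evfd = eventfd(0, EFD_CLOEXEC);
      if (epfd >= 0 && evfd >= 0) {
        struct epoll_event ev = {.events = EPOLLET | EPOLLIN | EPOLLEXCLUSIVE,
                                 .data.ptr = NULL};
        if (epoll_ctl(epfd, EPOLL_CTL_ADD, evfd, &ev) == 0) {
          supported = 1;
        } else {
          fprintf(stderr, "EPOLLEXCLUSIVE probe failed: %s\n", strerror(errno));
        }
      }
      if (evfd >= 0) close(evfd);
      if (epfd >= 0) close(epfd);
      return supported;
    }
]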