aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreecha@users.noreply.github.com>2017-08-28 14:10:25 -0700
committerGravatar GitHub <noreply@github.com>2017-08-28 14:10:25 -0700
commit2d060b6bd562b24f30fcf545ac0dac76fecd8b86 (patch)
tree6837e790c9cea5f805f9258f60ef168e49b765be
parentf769bf12737bcff22b98b7265c5d4cd6c7bebd4d (diff)
parente01940f2b3210b94485c80bab77392103d557648 (diff)
Merge pull request #12221 from sreecha/epoll1-work-dist
Epoll1 Work Distribution: Parallelize processing epoll events across multiple threads
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.c226
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.c2
2 files changed, 168 insertions, 60 deletions
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c
index 3ee646fa36..b940d48ba9 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.c
+++ b/src/core/lib/iomgr/ev_epoll1_linux.c
@@ -48,7 +48,60 @@
#include "src/core/lib/support/string.h"
static grpc_wakeup_fd global_wakeup_fd;
-static int g_epfd;
+
+/*******************************************************************************
+ * Singleton epoll set related fields
+ */
+
+#define MAX_EPOLL_EVENTS 100
+#define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1
+
+/* NOTE ON SYNCHRONIZATION:
+ * - Fields in this struct are only modified by the designated poller. Hence
+ * there is no need for any locks to protect the struct.
+ * - num_events and cursor fields have to be of atomic type to provide memory
+ * visibility guarantees only. i.e In case of multiple pollers, the designated
+ * polling thread keeps changing; the thread that wrote these values may be
+ * different from the thread reading the values
+ */
+typedef struct epoll_set {
+ int epfd;
+
+ /* The epoll_events after the last call to epoll_wait() */
+ struct epoll_event events[MAX_EPOLL_EVENTS];
+
+ /* The number of epoll_events after the last call to epoll_wait() */
+ gpr_atm num_events;
+
+ /* Index of the first event in epoll_events that has to be processed. This
+ * field is only valid if num_events > 0 */
+ gpr_atm cursor;
+} epoll_set;
+
+/* The global singleton epoll set */
+static epoll_set g_epoll_set;
+
+/* Must be called *only* once */
+static bool epoll_set_init() {
+ g_epoll_set.epfd = epoll_create1(EPOLL_CLOEXEC);
+ if (g_epoll_set.epfd < 0) {
+ gpr_log(GPR_ERROR, "epoll unavailable");
+ return false;
+ }
+
+ gpr_log(GPR_INFO, "grpc epoll fd: %d", g_epoll_set.epfd);
+ gpr_atm_no_barrier_store(&g_epoll_set.num_events, 0);
+ gpr_atm_no_barrier_store(&g_epoll_set.cursor, 0);
+ return true;
+}
+
+/* epoll_set_init() MUST be called before calling this. */
+static void epoll_set_shutdown() {
+ if (g_epoll_set.epfd >= 0) {
+ close(g_epoll_set.epfd);
+ g_epoll_set.epfd = -1;
+ }
+}
/*******************************************************************************
* Fd Declarations
@@ -122,7 +175,7 @@ struct grpc_pollset {
bool kicked_without_poller;
/* Set to true if the pollset is observed to have no workers available to
- * poll */
+ poll */
bool seen_inactive;
bool shutting_down; /* Is the pollset shutting down ? */
grpc_closure *shutdown_closure; /* Called after after shutdown is complete */
@@ -228,7 +281,7 @@ static grpc_fd *fd_create(int fd, const char *name) {
struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET),
.data.ptr = new_fd};
- if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
+ if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
}
@@ -326,7 +379,10 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);
+
+/* The designated poller */
static gpr_atm g_active_poller;
+
static pollset_neighbourhood *g_neighbourhoods;
static size_t g_num_neighbourhoods;
@@ -380,7 +436,8 @@ static grpc_error *pollset_global_init(void) {
if (err != GRPC_ERROR_NONE) return err;
struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET),
.data.ptr = &global_wakeup_fd};
- if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd, &ev) != 0) {
+ if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd,
+ &ev) != 0) {
return GRPC_OS_ERROR(errno, "epoll_ctl");
}
g_num_neighbourhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBOURHOODS);
@@ -497,8 +554,6 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
GPR_TIMER_END("pollset_shutdown", 0);
}
-#define MAX_EPOLL_EVENTS 100
-
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
gpr_timespec now) {
gpr_timespec timeout;
@@ -517,56 +572,89 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
return millis >= 1 ? millis : 1;
}
-static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- gpr_timespec now, gpr_timespec deadline) {
- struct epoll_event events[MAX_EPOLL_EVENTS];
- static const char *err_desc = "pollset_poll";
-
- GPR_TIMER_BEGIN("pollset_epoll", 0);
-
- int timeout = poll_deadline_to_millis_timeout(deadline, now);
-
- if (timeout != 0) {
- GRPC_SCHEDULING_START_BLOCKING_REGION;
- }
- int r;
- do {
- GPR_TIMER_BEGIN("epoll_wait", 0);
- r = epoll_wait(g_epfd, events, MAX_EPOLL_EVENTS, timeout);
- GPR_TIMER_END("epoll_wait", 0);
- } while (r < 0 && errno == EINTR);
- if (timeout != 0) {
- GRPC_SCHEDULING_END_BLOCKING_REGION;
- }
+/* Process the epoll events found by do_epoll_wait() function.
+ - g_epoll_set.cursor points to the index of the first event to be processed
+ - This function then processes up-to MAX_EPOLL_EVENTS_PER_ITERATION and
+ updates the g_epoll_set.cursor
+
+ NOTE ON SYNCRHONIZATION: Similar to do_epoll_wait(), this function is only
+ called by g_active_poller thread. So there is no need for synchronization
+ when accessing fields in g_epoll_set */
+static grpc_error *process_epoll_events(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset) {
+ static const char *err_desc = "process_events";
+ grpc_error *error = GRPC_ERROR_NONE;
- if (r < 0) {
- GPR_TIMER_END("pollset_epoll", 0);
- return GRPC_OS_ERROR(errno, "epoll_wait");
- }
+ GPR_TIMER_BEGIN("process_epoll_events", 0);
+ long num_events = gpr_atm_acq_load(&g_epoll_set.num_events);
+ long cursor = gpr_atm_acq_load(&g_epoll_set.cursor);
+ for (int idx = 0;
+ (idx < MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION) && cursor != num_events;
+ idx++) {
+ long c = cursor++;
+ struct epoll_event *ev = &g_epoll_set.events[c];
+ void *data_ptr = ev->data.ptr;
- grpc_error *error = GRPC_ERROR_NONE;
- for (int i = 0; i < r; i++) {
- void *data_ptr = events[i].data.ptr;
if (data_ptr == &global_wakeup_fd) {
append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
err_desc);
} else {
grpc_fd *fd = (grpc_fd *)(data_ptr);
- bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0;
- bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0;
- bool write_ev = (events[i].events & EPOLLOUT) != 0;
+ bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0;
+ bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
+ bool write_ev = (ev->events & EPOLLOUT) != 0;
+
if (read_ev || cancel) {
fd_become_readable(exec_ctx, fd, pollset);
}
+
if (write_ev || cancel) {
fd_become_writable(exec_ctx, fd);
}
}
}
- GPR_TIMER_END("pollset_epoll", 0);
+ gpr_atm_rel_store(&g_epoll_set.cursor, cursor);
+ GPR_TIMER_END("process_epoll_events", 0);
return error;
}
+/* Do epoll_wait and store the events in g_epoll_set.events field. This does not
+ "process" any of the events yet; that is done in process_epoll_events().
+ *See process_epoll_events() function for more details.
+
+ NOTE ON SYNCHRONIZATION: At any point of time, only the g_active_poller
+ (i.e the designated poller thread) will be calling this function. So there is
+ no need for any synchronization when accesing fields in g_epoll_set */
+static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
+ gpr_timespec now, gpr_timespec deadline) {
+ GPR_TIMER_BEGIN("do_epoll_wait", 0);
+
+ int r;
+ int timeout = poll_deadline_to_millis_timeout(deadline, now);
+ if (timeout != 0) {
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
+ }
+ do {
+ r = epoll_wait(g_epoll_set.epfd, g_epoll_set.events, MAX_EPOLL_EVENTS,
+ timeout);
+ } while (r < 0 && errno == EINTR);
+ if (timeout != 0) {
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
+ }
+
+ if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
+
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, "ps: %p poll got %d events", ps, r);
+ }
+
+ gpr_atm_rel_store(&g_epoll_set.num_events, r);
+ gpr_atm_rel_store(&g_epoll_set.cursor, 0);
+
+ GPR_TIMER_END("do_epoll_wait", 0);
+ return GRPC_ERROR_NONE;
+}
+
static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
grpc_pollset_worker **worker_hdl, gpr_timespec *now,
gpr_timespec deadline) {
@@ -827,32 +915,55 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
The function pollset_work() may temporarily release the lock (pollset->po.mu)
during the course of its execution but it will always re-acquire the lock and
ensure that it is held by the time the function returns */
-static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
grpc_pollset_worker **worker_hdl,
gpr_timespec now, gpr_timespec deadline) {
grpc_pollset_worker worker;
grpc_error *error = GRPC_ERROR_NONE;
static const char *err_desc = "pollset_work";
GPR_TIMER_BEGIN("pollset_work", 0);
- if (pollset->kicked_without_poller) {
- pollset->kicked_without_poller = false;
+ if (ps->kicked_without_poller) {
+ ps->kicked_without_poller = false;
GPR_TIMER_END("pollset_work", 0);
return GRPC_ERROR_NONE;
}
- if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) {
- gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
+
+ if (begin_worker(ps, &worker, worker_hdl, &now, deadline)) {
+ gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
- GPR_ASSERT(!pollset->shutting_down);
- GPR_ASSERT(!pollset->seen_inactive);
- gpr_mu_unlock(&pollset->mu);
- append_error(&error, pollset_epoll(exec_ctx, pollset, now, deadline),
- err_desc);
- gpr_mu_lock(&pollset->mu);
+ GPR_ASSERT(!ps->shutting_down);
+ GPR_ASSERT(!ps->seen_inactive);
+
+ gpr_mu_unlock(&ps->mu); /* unlock */
+ /* This is the designated polling thread at this point and should ideally do
+ polling. However, if there are unprocessed events left from a previous
+ call to do_epoll_wait(), skip calling epoll_wait() in this iteration and
+ process the pending epoll events.
+
+ The reason for decoupling do_epoll_wait and process_epoll_events is to
+ better distrubute the work (i.e handling epoll events) across multiple
+ threads
+
+ process_epoll_events() returns very quickly: It just queues the work on
+ exec_ctx but does not execute it (the actual exectution or more
+ accurately grpc_exec_ctx_flush() happens in end_worker() AFTER selecting
+ a designated poller). So we are not waiting long periods without a
+ designated poller */
+ if (gpr_atm_acq_load(&g_epoll_set.cursor) ==
+ gpr_atm_acq_load(&g_epoll_set.num_events)) {
+ append_error(&error, do_epoll_wait(exec_ctx, ps, now, deadline),
+ err_desc);
+ }
+ append_error(&error, process_epoll_events(exec_ctx, ps), err_desc);
+
+ gpr_mu_lock(&ps->mu); /* lock */
+
gpr_tls_set(&g_current_thread_worker, 0);
} else {
- gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
+ gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
}
- end_worker(exec_ctx, pollset, &worker, worker_hdl);
+ end_worker(exec_ctx, ps, &worker, worker_hdl);
+
gpr_tls_set(&g_current_thread_pollset, 0);
GPR_TIMER_END("pollset_work", 0);
return error;
@@ -1043,7 +1154,7 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
static void shutdown_engine(void) {
fd_global_shutdown();
pollset_global_shutdown();
- close(g_epfd);
+ epoll_set_shutdown();
}
static const grpc_event_engine_vtable vtable = {
@@ -1078,7 +1189,8 @@ static const grpc_event_engine_vtable vtable = {
};
/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
- * Create a dummy epoll_fd to make sure epoll support is available */
+ * Create epoll_fd (epoll_set_init() takes care of that) to make sure epoll
+ * support is available */
const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
if (!explicit_request) {
return NULL;
@@ -1088,22 +1200,18 @@ const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
return NULL;
}
- g_epfd = epoll_create1(EPOLL_CLOEXEC);
- if (g_epfd < 0) {
- gpr_log(GPR_ERROR, "epoll unavailable");
+ if (!epoll_set_init()) {
return NULL;
}
fd_global_init();
if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
- close(g_epfd);
fd_global_shutdown();
+ epoll_set_shutdown();
return NULL;
}
- gpr_log(GPR_ERROR, "grpc epoll fd: %d", g_epfd);
-
return &vtable;
}
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c
index 770d1fd0a9..1f4adea5d4 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.c
+++ b/src/core/lib/iomgr/ev_epollex_linux.c
@@ -49,7 +49,7 @@
#include "src/core/lib/support/spinlock.h"
/*******************************************************************************
- * Pollset-set sibling link
+ * Polling object
*/
typedef enum {