From 5efc913193876f57487a385c60da0a0323b91ddb Mon Sep 17 00:00:00 2001
From: Sree Kuchibhotla
Date: Thu, 17 Aug 2017 14:10:38 -0700
Subject: Epoll1 Work Distribution: Parallelize processing epoll events across
 multiple threads

---
 src/core/lib/iomgr/ev_epoll1_linux.c | 204 ++++++++++++++++++++++++++---------
 1 file changed, 153 insertions(+), 51 deletions(-)

diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c
index 90e0ce36cd..6fc3f46cf4 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.c
+++ b/src/core/lib/iomgr/ev_epoll1_linux.c
@@ -48,7 +48,54 @@
 #include "src/core/lib/support/string.h"
 
 static grpc_wakeup_fd global_wakeup_fd;
-static int g_epfd;
+
+/*******************************************************************************
+ * Singleton epoll set related fields
+ */
+
+#define MAX_EPOLL_EVENTS 100
+#define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 5
+
+/* Note: Since fields in this struct are only modified by the designated
+   poller, we do not need any locks to protect the struct */
+typedef struct epoll_set {
+  int epfd;
+
+  /* The epoll_events after the last call to epoll_wait() */
+  struct epoll_event events[MAX_EPOLL_EVENTS];
+
+  /* The number of epoll_events after the last call to epoll_wait() */
+  int num_events;
+
+  /* Index of the first event in epoll_events that has to be processed. This
+     field is only valid if num_events > 0 */
+  int cursor;
+} epoll_set;
+
+/* The global singleton epoll set */
+static epoll_set g_epoll_set;
+
+/* Must be called *only* once */
+static bool epoll_set_init() {
+  g_epoll_set.epfd = epoll_create1(EPOLL_CLOEXEC);
+  if (g_epoll_set.epfd < 0) {
+    gpr_log(GPR_ERROR, "epoll unavailable");
+    return false;
+  }
+
+  gpr_log(GPR_ERROR, "grpc epoll fd: %d", g_epoll_set.epfd);
+  g_epoll_set.num_events = 0;
+  g_epoll_set.cursor = 0;
+  return true;
+}
+
+/* epoll_set_init() MUST be called before calling this. */
+static void epoll_set_shutdown() {
+  if (g_epoll_set.epfd >= 0) {
+    close(g_epoll_set.epfd);
+    g_epoll_set.epfd = -1;
+  }
+}
 
 /*******************************************************************************
  * Fd Declarations
@@ -122,7 +169,7 @@ struct grpc_pollset {
   bool kicked_without_poller;
 
   /* Set to true if the pollset is observed to have no workers available to
-   * poll */
+     poll */
   bool seen_inactive;
   bool shutting_down;          /* Is the pollset shutting down? */
   grpc_closure *shutdown_closure; /* Called after shutdown is complete */
@@ -228,7 +275,7 @@ static grpc_fd *fd_create(int fd, const char *name) {
 
   struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET),
                            .data.ptr = new_fd};
-  if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
+  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
     gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
   }
 
@@ -326,7 +373,10 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
 
 GPR_TLS_DECL(g_current_thread_pollset);
 GPR_TLS_DECL(g_current_thread_worker);
+
+/* The designated poller */
 static gpr_atm g_active_poller;
+
 static pollset_neighbourhood *g_neighbourhoods;
 static size_t g_num_neighbourhoods;
 
@@ -380,7 +430,8 @@ static grpc_error *pollset_global_init(void) {
   if (err != GRPC_ERROR_NONE) return err;
   struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET),
                            .data.ptr = &global_wakeup_fd};
-  if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd, &ev) != 0) {
+  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd,
+                &ev) != 0) {
     return GRPC_OS_ERROR(errno, "epoll_ctl");
   }
   g_num_neighbourhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBOURHOODS);
@@ -486,8 +537,6 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
   pollset_maybe_finish_shutdown(exec_ctx, pollset);
 }
 
-#define MAX_EPOLL_EVENTS 100
-
 static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                            gpr_timespec now) {
   gpr_timespec timeout;
@@ -506,40 +555,39 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
   return millis >= 1 ? millis : 1;
 }
 
-static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
-                                 gpr_timespec now, gpr_timespec deadline) {
-  struct epoll_event events[MAX_EPOLL_EVENTS];
-  static const char *err_desc = "pollset_poll";
+/* Process the epoll events found by do_epoll_wait().
+   - g_epoll_set.cursor points to the index of the first event to be processed
+   - This function then processes up to MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION
+     events and updates g_epoll_set.cursor
+
+   NOTE ON SYNCHRONIZATION: Similar to do_epoll_wait(), this function is only
+   called by the g_active_poller thread.
+   So there is no need for synchronization when accessing fields in
+   g_epoll_set */
+static grpc_error *process_epoll_events(grpc_exec_ctx *exec_ctx,
+                                        grpc_pollset *pollset) {
+  static const char *err_desc = "process_events";
+  grpc_error *error = GRPC_ERROR_NONE;
 
-  int timeout = poll_deadline_to_millis_timeout(deadline, now);
+  for (int idx = 0; (idx < MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION) &&
+                    g_epoll_set.cursor != g_epoll_set.num_events;
+       idx++) {
+    int c = g_epoll_set.cursor++;
+    struct epoll_event *ev = &g_epoll_set.events[c];
+    void *data_ptr = ev->data.ptr;
 
-  if (timeout != 0) {
-    GRPC_SCHEDULING_START_BLOCKING_REGION;
-  }
-  int r;
-  do {
-    r = epoll_wait(g_epfd, events, MAX_EPOLL_EVENTS, timeout);
-  } while (r < 0 && errno == EINTR);
-  if (timeout != 0) {
-    GRPC_SCHEDULING_END_BLOCKING_REGION;
-  }
-
-  if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
-
-  grpc_error *error = GRPC_ERROR_NONE;
-  for (int i = 0; i < r; i++) {
-    void *data_ptr = events[i].data.ptr;
     if (data_ptr == &global_wakeup_fd) {
       append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
                    err_desc);
     } else {
       grpc_fd *fd = (grpc_fd *)(data_ptr);
-      bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0;
-      bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0;
-      bool write_ev = (events[i].events & EPOLLOUT) != 0;
+      bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0;
+      bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
+      bool write_ev = (ev->events & EPOLLOUT) != 0;
+
       if (read_ev || cancel) {
         fd_become_readable(exec_ctx, fd, pollset);
       }
+
       if (write_ev || cancel) {
         fd_become_writable(exec_ctx, fd);
       }
@@ -549,6 +597,40 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
   return error;
 }
 
+/* Do epoll_wait and store the events in the g_epoll_set.events field. This
+   does not "process" any of the events yet; that is done in
+   process_epoll_events(). See process_epoll_events() for more details.
+
+   NOTE ON SYNCHRONIZATION: At any point in time, only the g_active_poller
+   (i.e. the designated poller thread) will be calling this function.
So there is + no need for any synchronization when accesing fields in g_epoll_set */ +static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, + gpr_timespec now, gpr_timespec deadline) { + int r; + int timeout = poll_deadline_to_millis_timeout(deadline, now); + if (timeout != 0) { + GRPC_SCHEDULING_START_BLOCKING_REGION; + } + do { + r = epoll_wait(g_epoll_set.epfd, g_epoll_set.events, MAX_EPOLL_EVENTS, + timeout); + } while (r < 0 && errno == EINTR); + if (timeout != 0) { + GRPC_SCHEDULING_END_BLOCKING_REGION; + } + + if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait"); + + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "ps: %p poll got %d events", ps, r); + } + + g_epoll_set.num_events = r; + g_epoll_set.cursor = 0; + + return GRPC_ERROR_NONE; +} + static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, grpc_pollset_worker **worker_hdl, gpr_timespec *now, gpr_timespec deadline) { @@ -801,30 +883,53 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, The function pollset_work() may temporarily release the lock (pollset->po.mu) during the course of its execution but it will always re-acquire the lock and ensure that it is held by the time the function returns */ -static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, +static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, grpc_pollset_worker **worker_hdl, gpr_timespec now, gpr_timespec deadline) { grpc_pollset_worker worker; grpc_error *error = GRPC_ERROR_NONE; static const char *err_desc = "pollset_work"; - if (pollset->kicked_without_poller) { - pollset->kicked_without_poller = false; + if (ps->kicked_without_poller) { + ps->kicked_without_poller = false; return GRPC_ERROR_NONE; } - if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) { - gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); + + if (begin_worker(ps, &worker, worker_hdl, &now, deadline)) { + gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps); gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); - GPR_ASSERT(!pollset->shutting_down); - GPR_ASSERT(!pollset->seen_inactive); - gpr_mu_unlock(&pollset->mu); - append_error(&error, pollset_epoll(exec_ctx, pollset, now, deadline), - err_desc); - gpr_mu_lock(&pollset->mu); + GPR_ASSERT(!ps->shutting_down); + GPR_ASSERT(!ps->seen_inactive); + + gpr_mu_unlock(&ps->mu); /* unlock */ + + /* This is the designated polling thread at this point and should ideally do + polling. However, if there are unprocessed events left from a previous + call to do_epoll_wait(), skip calling epoll_wait() in this iteration and + process the pending epoll events. + + The reason for decoupling do_epoll_wait and process_epoll_events is to + better distrubute the work (i.e handling epoll events) across multiple + threads + + process_epoll_events() returns very quickly: It just queues the work on + exec_ctx but does not execute it (the actual exectution or more + accurately grpc_exec_ctx_flush() happens in end_worker() AFTER selecting + a designated poller). 
+       So we are not waiting long periods without a designated poller */
+    if (g_epoll_set.cursor == g_epoll_set.num_events) {
+      append_error(&error, do_epoll_wait(exec_ctx, ps, now, deadline),
+                   err_desc);
+    }
+    append_error(&error, process_epoll_events(exec_ctx, ps), err_desc);
+
+    gpr_mu_lock(&ps->mu); /* lock */
+
     gpr_tls_set(&g_current_thread_worker, 0);
   } else {
-    gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
+    gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
   }
-  end_worker(exec_ctx, pollset, &worker, worker_hdl);
+  end_worker(exec_ctx, ps, &worker, worker_hdl);
+
   gpr_tls_set(&g_current_thread_pollset, 0);
   return error;
 }
@@ -1006,7 +1111,7 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
 static void shutdown_engine(void) {
   fd_global_shutdown();
   pollset_global_shutdown();
-  close(g_epfd);
+  epoll_set_shutdown();
 }
 
 static const grpc_event_engine_vtable vtable = {
@@ -1041,7 +1146,8 @@ static const grpc_event_engine_vtable vtable = {
 };
 
 /* It is possible that GLIBC has epoll but the underlying kernel doesn't.
- * Create a dummy epoll_fd to make sure epoll support is available */
+ * Create epoll_fd (epoll_set_init() takes care of that) to make sure epoll
+ * support is available */
 const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
   /* TODO(sreek): Temporarily disable this poller unless explicitly requested
    * via GRPC_POLL_STRATEGY */
@@ -1053,22 +1159,18 @@ const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
     return NULL;
   }
 
-  g_epfd = epoll_create1(EPOLL_CLOEXEC);
-  if (g_epfd < 0) {
-    gpr_log(GPR_ERROR, "epoll unavailable");
+  if (!epoll_set_init()) {
     return NULL;
   }
 
   fd_global_init();
 
   if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
-    close(g_epfd);
     fd_global_shutdown();
+    epoll_set_shutdown();
    return NULL;
   }
 
-  gpr_log(GPR_ERROR, "grpc epoll fd: %d", g_epfd);
-
   return &vtable;
 }
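
To see the cursor mechanics of this patch in isolation, the following is a
minimal standalone sketch of the same pattern: one call fills a shared buffer
via epoll_wait(), and repeated passes drain it in bounded batches. All demo_*
names and sizes are invented for illustration, and plain read() calls stand in
for gRPC's closure scheduling; this is a sketch of the technique, not gRPC
code.

/* demo.c: cursor-based batched draining of an epoll event buffer.
   Illustrative sketch only (Linux). */
#include <errno.h>
#include <stdio.h>
#include <sys/epoll.h>
#include <unistd.h>

#define DEMO_MAX_EVENTS 100
#define DEMO_EVENTS_PER_PASS 5

typedef struct {
  int epfd;
  struct epoll_event events[DEMO_MAX_EVENTS]; /* filled by demo_wait() */
  int num_events; /* number of valid entries in events[] */
  int cursor;     /* index of the next event to process */
} demo_epoll_set;

/* Block in epoll_wait() and store the results; do not process them yet. */
static int demo_wait(demo_epoll_set *s, int timeout_ms) {
  int r;
  do {
    r = epoll_wait(s->epfd, s->events, DEMO_MAX_EVENTS, timeout_ms);
  } while (r < 0 && errno == EINTR);
  if (r < 0) return -1;
  s->num_events = r;
  s->cursor = 0;
  return 0;
}

/* Process at most DEMO_EVENTS_PER_PASS pending events, advance the cursor,
   and return how many events are still left. The bounded batch is what lets
   the caller hand off the poller role between passes. */
static int demo_process(demo_epoll_set *s) {
  for (int i = 0; i < DEMO_EVENTS_PER_PASS && s->cursor != s->num_events;
       i++) {
    struct epoll_event *ev = &s->events[s->cursor++];
    if (ev->events & EPOLLIN) {
      char buf[64];
      ssize_t n = read(ev->data.fd, buf, sizeof(buf));
      if (n > 0) printf("fd %d readable: %zd bytes\n", ev->data.fd, n);
    }
  }
  return s->num_events - s->cursor;
}

int main(void) {
  demo_epoll_set s = {.epfd = epoll_create1(EPOLL_CLOEXEC)};
  int pipefd[2];
  if (s.epfd < 0 || pipe(pipefd) != 0) return 1;

  struct epoll_event ev = {.events = EPOLLIN, .data.fd = pipefd[0]};
  if (epoll_ctl(s.epfd, EPOLL_CTL_ADD, pipefd[0], &ev) != 0) return 1;
  (void)write(pipefd[1], "x", 1); /* make the read end ready */

  /* Same skip logic as pollset_work(): only re-poll once the buffer from the
     previous wait has been fully drained. */
  if (s.cursor == s.num_events && demo_wait(&s, 1000) != 0) return 1;
  while (demo_process(&s) > 0) {
    /* in gRPC, another worker could take over as designated poller here */
  }

  close(pipefd[0]);
  close(pipefd[1]);
  close(s.epfd);
  return 0;
}

Capping each pass at a handful of events is the point of the patch: the
designated poller queues only a bounded amount of work before re-acquiring the
pollset lock, so the poller role can be handed to another worker quickly
instead of one thread processing all outstanding events while the rest wait.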