Diffstat (limited to 'src/core/lib/iomgr/ev_epoll1_linux.c')
-rw-r--r--  src/core/lib/iomgr/ev_epoll1_linux.c | 281
1 file changed, 212 insertions(+), 69 deletions(-)
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c
index 90e0ce36cd..b940d48ba9 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.c
+++ b/src/core/lib/iomgr/ev_epoll1_linux.c
@@ -48,7 +48,60 @@
 #include "src/core/lib/support/string.h"
 
 static grpc_wakeup_fd global_wakeup_fd;
-static int g_epfd;
+
+/*******************************************************************************
+ * Singleton epoll set related fields
+ */
+
+#define MAX_EPOLL_EVENTS 100
+#define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1
+
+/* NOTE ON SYNCHRONIZATION:
+ * - Fields in this struct are only modified by the designated poller. Hence
+ *   there is no need for any locks to protect the struct.
+ * - num_events and cursor fields have to be of atomic type to provide memory
+ *   visibility guarantees only, i.e. in case of multiple pollers, the
+ *   designated polling thread keeps changing; the thread that wrote these
+ *   values may be different from the thread reading them.
+ */
+typedef struct epoll_set {
+  int epfd;
+
+  /* The epoll_events after the last call to epoll_wait() */
+  struct epoll_event events[MAX_EPOLL_EVENTS];
+
+  /* The number of epoll_events after the last call to epoll_wait() */
+  gpr_atm num_events;
+
+  /* Index of the first event in epoll_events that has to be processed. This
+   * field is only valid if num_events > 0 */
+  gpr_atm cursor;
+} epoll_set;
+
+/* The global singleton epoll set */
+static epoll_set g_epoll_set;
+
+/* Must be called *only* once */
+static bool epoll_set_init() {
+  g_epoll_set.epfd = epoll_create1(EPOLL_CLOEXEC);
+  if (g_epoll_set.epfd < 0) {
+    gpr_log(GPR_ERROR, "epoll unavailable");
+    return false;
+  }
+
+  gpr_log(GPR_INFO, "grpc epoll fd: %d", g_epoll_set.epfd);
+  gpr_atm_no_barrier_store(&g_epoll_set.num_events, 0);
+  gpr_atm_no_barrier_store(&g_epoll_set.cursor, 0);
+  return true;
+}
+
+/* epoll_set_init() MUST be called before calling this. */
+static void epoll_set_shutdown() {
+  if (g_epoll_set.epfd >= 0) {
+    close(g_epoll_set.epfd);
+    g_epoll_set.epfd = -1;
+  }
+}
 
 /*******************************************************************************
  * Fd Declarations
@@ -122,7 +175,7 @@ struct grpc_pollset {
   bool kicked_without_poller;
 
   /* Set to true if the pollset is observed to have no workers available to
-   * poll */
+     poll */
   bool seen_inactive;
   bool shutting_down;             /* Is the pollset shutting down? */
   grpc_closure *shutdown_closure; /* Called after shutdown is complete */
@@ -228,7 +281,7 @@ static grpc_fd *fd_create(int fd, const char *name) {
 
   struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET),
                            .data.ptr = new_fd};
-  if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
+  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
     gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
   }
@@ -326,7 +379,10 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
 
 GPR_TLS_DECL(g_current_thread_pollset);
 GPR_TLS_DECL(g_current_thread_worker);
+
+/* The designated poller */
 static gpr_atm g_active_poller;
+
 static pollset_neighbourhood *g_neighbourhoods;
 static size_t g_num_neighbourhoods;
@@ -380,7 +436,8 @@ static grpc_error *pollset_global_init(void) {
   if (err != GRPC_ERROR_NONE) return err;
   struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET),
                            .data.ptr = &global_wakeup_fd};
-  if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd, &ev) != 0) {
+  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd,
+                &ev) != 0) {
     return GRPC_OS_ERROR(errno, "epoll_ctl");
   }
   g_num_neighbourhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBOURHOODS);
@@ -406,7 +463,14 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
   gpr_mu_init(&pollset->mu);
   *mu = &pollset->mu;
   pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()];
+  pollset->reassigning_neighbourhood = false;
+  pollset->root_worker = NULL;
+  pollset->kicked_without_poller = false;
   pollset->seen_inactive = true;
+  pollset->shutting_down = false;
+  pollset->shutdown_closure = NULL;
+  pollset->begin_refs = 0;
+  pollset->next = pollset->prev = NULL;
 }
 
 static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
@@ -438,6 +502,7 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
 }
 
 static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
+  GPR_TIMER_BEGIN("pollset_kick_all", 0);
   grpc_error *error = GRPC_ERROR_NONE;
   if (pollset->root_worker != NULL) {
     grpc_pollset_worker *worker = pollset->root_worker;
@@ -463,7 +528,7 @@
   }
   // TODO: sreek. Check if we need to set 'kicked_without_poller' to true here
  // in the else case
-
+  GPR_TIMER_END("pollset_kick_all", 0);
   return error;
 }
@@ -471,6 +536,7 @@ static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
                                           grpc_pollset *pollset) {
   if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL &&
       pollset->begin_refs == 0) {
+    GPR_TIMER_MARK("pollset_finish_shutdown", 0);
     GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
     pollset->shutdown_closure = NULL;
   }
@@ -478,16 +544,16 @@
 static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                              grpc_closure *closure) {
+  GPR_TIMER_BEGIN("pollset_shutdown", 0);
   GPR_ASSERT(pollset->shutdown_closure == NULL);
   GPR_ASSERT(!pollset->shutting_down);
   pollset->shutdown_closure = closure;
   pollset->shutting_down = true;
   GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
   pollset_maybe_finish_shutdown(exec_ctx, pollset);
+  GPR_TIMER_END("pollset_shutdown", 0);
 }
 
-#define MAX_EPOLL_EVENTS 100
-
 static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                            gpr_timespec now) {
   gpr_timespec timeout;
@@ -506,52 +572,93 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
   return millis >= 1 ? millis : 1;
 }
 
-static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
-                                 gpr_timespec now, gpr_timespec deadline) {
-  struct epoll_event events[MAX_EPOLL_EVENTS];
-  static const char *err_desc = "pollset_poll";
-
-  int timeout = poll_deadline_to_millis_timeout(deadline, now);
-
-  if (timeout != 0) {
-    GRPC_SCHEDULING_START_BLOCKING_REGION;
-  }
-  int r;
-  do {
-    r = epoll_wait(g_epfd, events, MAX_EPOLL_EVENTS, timeout);
-  } while (r < 0 && errno == EINTR);
-  if (timeout != 0) {
-    GRPC_SCHEDULING_END_BLOCKING_REGION;
-  }
+/* Process the epoll events found by do_epoll_wait():
+   - g_epoll_set.cursor points to the index of the first event to be processed
+   - This function processes up to MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION of
+     them and updates g_epoll_set.cursor
+
+   NOTE ON SYNCHRONIZATION: Similar to do_epoll_wait(), this function is only
+   called by the g_active_poller thread. So there is no need for
+   synchronization when accessing fields in g_epoll_set */
+static grpc_error *process_epoll_events(grpc_exec_ctx *exec_ctx,
+                                        grpc_pollset *pollset) {
+  static const char *err_desc = "process_events";
+  grpc_error *error = GRPC_ERROR_NONE;
 
-  if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
+  GPR_TIMER_BEGIN("process_epoll_events", 0);
+  long num_events = gpr_atm_acq_load(&g_epoll_set.num_events);
+  long cursor = gpr_atm_acq_load(&g_epoll_set.cursor);
+  for (int idx = 0;
+       (idx < MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION) && cursor != num_events;
+       idx++) {
+    long c = cursor++;
+    struct epoll_event *ev = &g_epoll_set.events[c];
+    void *data_ptr = ev->data.ptr;
 
-  grpc_error *error = GRPC_ERROR_NONE;
-  for (int i = 0; i < r; i++) {
-    void *data_ptr = events[i].data.ptr;
     if (data_ptr == &global_wakeup_fd) {
       append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
                    err_desc);
     } else {
       grpc_fd *fd = (grpc_fd *)(data_ptr);
-      bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0;
-      bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0;
-      bool write_ev = (events[i].events & EPOLLOUT) != 0;
+      bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0;
+      bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
+      bool write_ev = (ev->events & EPOLLOUT) != 0;
+
       if (read_ev || cancel) {
         fd_become_readable(exec_ctx, fd, pollset);
       }
+
       if (write_ev || cancel) {
         fd_become_writable(exec_ctx, fd);
       }
     }
   }
-
+  gpr_atm_rel_store(&g_epoll_set.cursor, cursor);
+  GPR_TIMER_END("process_epoll_events", 0);
   return error;
 }
 
+/* Do epoll_wait and store the events in g_epoll_set.events. This does not
+   "process" any of the events yet; that is done in process_epoll_events();
+   see that function for more details.
+
+   NOTE ON SYNCHRONIZATION: At any point in time, only the g_active_poller
+   (i.e. the designated poller thread) will be calling this function. So there
+   is no need for any synchronization when accessing fields in g_epoll_set */
+static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
+                                 gpr_timespec now, gpr_timespec deadline) {
+  GPR_TIMER_BEGIN("do_epoll_wait", 0);
+
+  int r;
+  int timeout = poll_deadline_to_millis_timeout(deadline, now);
+  if (timeout != 0) {
+    GRPC_SCHEDULING_START_BLOCKING_REGION;
+  }
+  do {
+    r = epoll_wait(g_epoll_set.epfd, g_epoll_set.events, MAX_EPOLL_EVENTS,
+                   timeout);
+  } while (r < 0 && errno == EINTR);
+  if (timeout != 0) {
+    GRPC_SCHEDULING_END_BLOCKING_REGION;
+  }
+
+  if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
+
+  if (GRPC_TRACER_ON(grpc_polling_trace)) {
+    gpr_log(GPR_DEBUG, "ps: %p poll got %d events", ps, r);
+  }
+
+  gpr_atm_rel_store(&g_epoll_set.num_events, r);
+  gpr_atm_rel_store(&g_epoll_set.cursor, 0);
+
+  GPR_TIMER_END("do_epoll_wait", 0);
+  return GRPC_ERROR_NONE;
+}
+
 static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
                          grpc_pollset_worker **worker_hdl, gpr_timespec *now,
                          gpr_timespec deadline) {
+  GPR_TIMER_BEGIN("begin_worker", 0);
   if (worker_hdl != NULL) *worker_hdl = worker;
   worker->initialized_cv = false;
   SET_KICK_STATE(worker, UNKICKED);
@@ -656,14 +763,17 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
 
   if (pollset->kicked_without_poller) {
     pollset->kicked_without_poller = false;
+    GPR_TIMER_END("begin_worker", 0);
     return false;
   }
 
+  GPR_TIMER_END("begin_worker", 0);
   return worker->kick_state == DESIGNATED_POLLER && !pollset->shutting_down;
 }
 
 static bool check_neighbourhood_for_available_poller(
     pollset_neighbourhood *neighbourhood) {
+  GPR_TIMER_BEGIN("check_neighbourhood_for_available_poller", 0);
   bool found_worker = false;
   do {
     grpc_pollset *inspect = neighbourhood->active_root;
@@ -685,6 +795,7 @@ static bool check_neighbourhood_for_available_poller(
         }
         SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
         if (inspect_worker->initialized_cv) {
+          GPR_TIMER_MARK("signal worker", 0);
           gpr_cv_signal(&inspect_worker->cv);
         }
       } else {
@@ -720,12 +831,14 @@ static bool check_neighbourhood_for_available_poller(
     }
     gpr_mu_unlock(&inspect->mu);
   } while (!found_worker);
+  GPR_TIMER_END("check_neighbourhood_for_available_poller", 0);
   return found_worker;
 }
 
 static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                        grpc_pollset_worker *worker,
                        grpc_pollset_worker **worker_hdl) {
+  GPR_TIMER_BEGIN("end_worker", 0);
   if (GRPC_TRACER_ON(grpc_polling_trace)) {
     gpr_log(GPR_DEBUG, "PS:%p END_WORKER:%p", pollset, worker);
   }
@@ -795,42 +908,71 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
     pollset_maybe_finish_shutdown(exec_ctx, pollset);
   }
   GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
+  GPR_TIMER_END("end_worker", 0);
 }
 
 /* pollset->po.mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->po.mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns */
-static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
                                 grpc_pollset_worker **worker_hdl,
                                 gpr_timespec now, gpr_timespec deadline) {
   grpc_pollset_worker worker;
   grpc_error *error = GRPC_ERROR_NONE;
   static const char *err_desc = "pollset_work";
-  if (pollset->kicked_without_poller) {
-    pollset->kicked_without_poller = false;
+  GPR_TIMER_BEGIN("pollset_work", 0);
+  if (ps->kicked_without_poller) {
+    ps->kicked_without_poller = false;
+    GPR_TIMER_END("pollset_work", 0);
     return GRPC_ERROR_NONE;
   }
-  if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) {
-    gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
+
+  if (begin_worker(ps, &worker, worker_hdl, &now, deadline)) {
+    gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
     gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
-    GPR_ASSERT(!pollset->shutting_down);
-    GPR_ASSERT(!pollset->seen_inactive);
-    gpr_mu_unlock(&pollset->mu);
-    append_error(&error, pollset_epoll(exec_ctx, pollset, now, deadline),
-                 err_desc);
-    gpr_mu_lock(&pollset->mu);
+    GPR_ASSERT(!ps->shutting_down);
+    GPR_ASSERT(!ps->seen_inactive);
+
+    gpr_mu_unlock(&ps->mu); /* unlock */
+    /* This is the designated polling thread at this point and should ideally
+       do polling. However, if there are unprocessed events left from a
+       previous call to do_epoll_wait(), skip calling epoll_wait() in this
+       iteration and process the pending epoll events.
+
+       The reason for decoupling do_epoll_wait and process_epoll_events is to
+       better distribute the work (i.e. handling epoll events) across multiple
+       threads.
+
+       process_epoll_events() returns very quickly: it just queues the work on
+       exec_ctx but does not execute it (the actual execution, or more
+       accurately grpc_exec_ctx_flush(), happens in end_worker() AFTER
+       selecting a designated poller). So we are not waiting long periods
+       without a designated poller */
+    if (gpr_atm_acq_load(&g_epoll_set.cursor) ==
+        gpr_atm_acq_load(&g_epoll_set.num_events)) {
+      append_error(&error, do_epoll_wait(exec_ctx, ps, now, deadline),
+                   err_desc);
+    }
+    append_error(&error, process_epoll_events(exec_ctx, ps), err_desc);
+
+    gpr_mu_lock(&ps->mu); /* lock */
+
+    gpr_tls_set(&g_current_thread_worker, 0);
   } else {
-    gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
+    gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
   }
-  end_worker(exec_ctx, pollset, &worker, worker_hdl);
+  end_worker(exec_ctx, ps, &worker, worker_hdl);
+
+  gpr_tls_set(&g_current_thread_pollset, 0);
+  GPR_TIMER_END("pollset_work", 0);
   return error;
 }
 
 static grpc_error *pollset_kick(grpc_pollset *pollset,
                                 grpc_pollset_worker *specific_worker) {
+  GPR_TIMER_BEGIN("pollset_kick", 0);
+  grpc_error *ret_err = GRPC_ERROR_NONE;
   if (GRPC_TRACER_ON(grpc_polling_trace)) {
     gpr_strvec log;
     gpr_strvec_init(&log);
@@ -865,7 +1007,7 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
       if (GRPC_TRACER_ON(grpc_polling_trace)) {
         gpr_log(GPR_ERROR, " .. kicked_without_poller");
       }
-      return GRPC_ERROR_NONE;
+      goto done;
     }
     grpc_pollset_worker *next_worker = root_worker->next;
     if (root_worker->kick_state == KICKED) {
@@ -873,13 +1015,13 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
       if (GRPC_TRACER_ON(grpc_polling_trace)) {
         gpr_log(GPR_ERROR, " .. already kicked %p", root_worker);
       }
       SET_KICK_STATE(root_worker, KICKED);
-      return GRPC_ERROR_NONE;
+      goto done;
     } else if (next_worker->kick_state == KICKED) {
       if (GRPC_TRACER_ON(grpc_polling_trace)) {
         gpr_log(GPR_ERROR, " .. already kicked %p", next_worker);
       }
       SET_KICK_STATE(next_worker, KICKED);
-      return GRPC_ERROR_NONE;
+      goto done;
     } else if (root_worker == next_worker &&  // only try and wake up a poller
                                               // if there is no next worker
@@ -889,7 +1031,8 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
         gpr_log(GPR_ERROR, " .. kicked %p", root_worker);
       }
       SET_KICK_STATE(root_worker, KICKED);
-      return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
+      ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
+      goto done;
     } else if (next_worker->kick_state == UNKICKED) {
       if (GRPC_TRACER_ON(grpc_polling_trace)) {
         gpr_log(GPR_ERROR, " .. kicked %p", next_worker);
@@ -897,7 +1040,7 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
       GPR_ASSERT(next_worker->initialized_cv);
       SET_KICK_STATE(next_worker, KICKED);
       gpr_cv_signal(&next_worker->cv);
-      return GRPC_ERROR_NONE;
+      goto done;
     } else if (next_worker->kick_state == DESIGNATED_POLLER) {
       if (root_worker->kick_state != DESIGNATED_POLLER) {
         if (GRPC_TRACER_ON(grpc_polling_trace)) {
@@ -910,59 +1053,64 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
         if (root_worker->initialized_cv) {
           gpr_cv_signal(&root_worker->cv);
         }
-        return GRPC_ERROR_NONE;
+        goto done;
       } else {
         if (GRPC_TRACER_ON(grpc_polling_trace)) {
           gpr_log(GPR_ERROR, " .. non-root poller %p (root=%p)", next_worker,
                   root_worker);
         }
         SET_KICK_STATE(next_worker, KICKED);
-        return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
+        ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
+        goto done;
       }
     } else {
       GPR_ASSERT(next_worker->kick_state == KICKED);
       SET_KICK_STATE(next_worker, KICKED);
-      return GRPC_ERROR_NONE;
+      goto done;
     }
   } else {
     if (GRPC_TRACER_ON(grpc_polling_trace)) {
       gpr_log(GPR_ERROR, " .. kicked while waking up");
    }
-    return GRPC_ERROR_NONE;
+    goto done;
   }
  } else if (specific_worker->kick_state == KICKED) {
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, " .. specific worker already kicked");
    }
-    return GRPC_ERROR_NONE;
+    goto done;
  } else if (gpr_tls_get(&g_current_thread_worker) ==
             (intptr_t)specific_worker) {
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, " .. mark %p kicked", specific_worker);
    }
    SET_KICK_STATE(specific_worker, KICKED);
-    return GRPC_ERROR_NONE;
+    goto done;
  } else if (specific_worker ==
             (grpc_pollset_worker *)gpr_atm_no_barrier_load(&g_active_poller)) {
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, " .. kick active poller");
    }
    SET_KICK_STATE(specific_worker, KICKED);
-    return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
+    ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
+    goto done;
  } else if (specific_worker->initialized_cv) {
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, " .. kick waiting worker");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    gpr_cv_signal(&specific_worker->cv);
-    return GRPC_ERROR_NONE;
+    goto done;
  } else {
    if (GRPC_TRACER_ON(grpc_polling_trace)) {
      gpr_log(GPR_ERROR, " .. kick non-waiting worker");
    }
    SET_KICK_STATE(specific_worker, KICKED);
-    return GRPC_ERROR_NONE;
+    goto done;
  }
+done:
+  GPR_TIMER_END("pollset_kick", 0);
+  return ret_err;
 }
 
 static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@@ -1006,7 +1154,7 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
 
 static void shutdown_engine(void) {
   fd_global_shutdown();
   pollset_global_shutdown();
-  close(g_epfd);
+  epoll_set_shutdown();
 }
 
 static const grpc_event_engine_vtable vtable = {
@@ -1041,10 +1189,9 @@ static const grpc_event_engine_vtable vtable = {
 };
 
 /* It is possible that GLIBC has epoll but the underlying kernel doesn't.
- * Create a dummy epoll_fd to make sure epoll support is available */
+ * Create epoll_fd (epoll_set_init() takes care of that) to make sure epoll
+ * support is available */
 const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
-  /* TODO(sreek): Temporarily disable this poller unless explicitly requested
-   * via GRPC_POLL_STRATEGY */
   if (!explicit_request) {
     return NULL;
   }
@@ -1053,22 +1200,18 @@ const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
     return NULL;
   }
 
-  g_epfd = epoll_create1(EPOLL_CLOEXEC);
-  if (g_epfd < 0) {
-    gpr_log(GPR_ERROR, "epoll unavailable");
+  if (!epoll_set_init()) {
    return NULL;
  }
 
  fd_global_init();
 
  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
-    close(g_epfd);
    fd_global_shutdown();
+    epoll_set_shutdown();
    return NULL;
  }
 
-  gpr_log(GPR_ERROR, "grpc epoll fd: %d", g_epfd);
-
  return &vtable;
 }
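
A note on the "NOTE ON SYNCHRONIZATION" comments in the first hunk: only the current designated poller ever mutates g_epoll_set, so the atomics are there purely for cross-thread visibility when the poller role migrates between threads. The standalone sketch below restates that pattern; it is illustrative only and not part of the change (C11 stdatomic.h stands in for gpr_atm, and all names in it are made up for the example):

/* Illustrative sketch (assumed names; C11 atomics instead of gpr_atm).
 * publish() runs on the thread that just called epoll_wait(); consume_one()
 * may run later on a different thread that inherited the poller role. The
 * release stores paired with the acquire loads make the plain writes to
 * events[] visible without a lock, exactly because at most one thread at a
 * time owns the designated-poller role. */
#include <stdatomic.h>

#define MAX_EVENTS 100

struct event_set {
  int events[MAX_EVENTS]; /* plain, non-atomic payload */
  atomic_long num_events; /* number of events stored by the last wait */
  atomic_long cursor;     /* index of the next event to hand out */
};

static void publish(struct event_set *s, const int *src, long n) {
  for (long i = 0; i < n; i++) s->events[i] = src[i]; /* plain writes */
  atomic_store_explicit(&s->cursor, 0, memory_order_release);
  atomic_store_explicit(&s->num_events, n, memory_order_release);
}

static int consume_one(struct event_set *s, int *out) {
  long n = atomic_load_explicit(&s->num_events, memory_order_acquire);
  long c = atomic_load_explicit(&s->cursor, memory_order_acquire);
  if (c == n) return 0; /* batch exhausted */
  *out = s->events[c];  /* ordered after the acquire loads above */
  atomic_store_explicit(&s->cursor, c + 1, memory_order_release);
  return 1;
}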
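For readers skimming the diff, the heart of the change is the designated-poller branch of pollset_work(), which now boils down to the following (condensed from the hunks above, error handling aside): a thread re-polls only once the previous batch has been fully handed out, and otherwise just takes the next slice of stored events.

    /* Condensed from pollset_work() above. With
       MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION == 1, each designated poller
       processes a single stored event, then end_worker() promotes another
       worker, which drains the next event from the same batch. */
    if (gpr_atm_acq_load(&g_epoll_set.cursor) ==
        gpr_atm_acq_load(&g_epoll_set.num_events)) {
      /* batch exhausted: fetch a fresh set of events */
      append_error(&error, do_epoll_wait(exec_ctx, ps, now, deadline),
                   err_desc);
    }
    /* hand out at most one event, then give up the poller role */
    append_error(&error, process_epoll_events(exec_ctx, ps), err_desc);

This is why the commit can afford a batch size of 1: process_epoll_events() only queues closures on exec_ctx, and the expensive flush happens in end_worker() after a new designated poller has already been chosen.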
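Finally, the mechanical return-to-goto rewrites in pollset_kick() exist so that the new GPR_TIMER_BEGIN/GPR_TIMER_END pair brackets every exit path. A minimal sketch of the shape, with a hypothetical already_kicked flag standing in for the real branch conditions (this is not the actual function):

static grpc_error *kick_sketch(bool already_kicked) {
  GPR_TIMER_BEGIN("pollset_kick", 0);
  grpc_error *ret_err = GRPC_ERROR_NONE;
  if (already_kicked) goto done;                      /* was: return GRPC_ERROR_NONE; */
  ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd); /* was: return ...; */
done:
  GPR_TIMER_END("pollset_kick", 0); /* now runs on every exit path */
  return ret_err;
}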