diff options
-rw-r--r-- | src/core/lib/iomgr/ev_epollex_linux.cc | 59 |
1 files changed, 36 insertions, 23 deletions
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index feda517b49..bee464cab1 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -70,6 +70,14 @@ typedef enum { PO_MULTI, PO_FD, PO_EMPTY } pollable_type; typedef struct pollable pollable; +/// A pollable is something that can be polled: it has an epoll set to poll on, +/// and a wakeup fd for kicks +/// There are three broad types: +/// - PO_EMPTY - the empty pollable, used before file descriptors are added to +/// a pollset +/// - PO_FD - a pollable containing only one FD - used to optimize single-fd +/// pollsets (which are common with synchronous api usage) +/// - PO_MULTI - a pollable containing many fds struct pollable { pollable_type type; // immutable gpr_refcount refs; @@ -111,6 +119,8 @@ static char *pollable_desc(pollable *p) { return out; } +/// Shared empty pollable - used by pollset to poll on until the first fd is +/// added static pollable *g_empty_pollable; static grpc_error *pollable_create(pollable_type type, pollable **p); @@ -173,7 +183,10 @@ typedef enum { PWLINK_POLLABLE = 0, PWLINK_POLLSET, PWLINK_COUNT } pwlinks; struct grpc_pollset_worker { bool kicked; bool initialized_cv; +#ifndef NDEBUG + // debug aid: which thread started this worker pid_t originator; +#endif gpr_cv cv; grpc_pollset *pollset; pollable *pollable_obj; @@ -239,11 +252,6 @@ static bool append_error(grpc_error **composite, grpc_error *error, * becomes a spurious read notification on a reused fd. */ -/* The alarm system needs to be able to wakeup 'some poller' sometimes - * (specifically when a new alarm needs to be triggered earlier than the next - * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a - * case occurs. */ - static grpc_fd *fd_freelist = NULL; static gpr_mu fd_freelist_mu; @@ -543,6 +551,7 @@ static void pollset_global_shutdown(void) { gpr_tls_destroy(&g_current_thread_worker); } +/* pollset->mu must be held while calling this function */ static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { if (GRPC_TRACER_ON(grpc_polling_trace)) { @@ -562,9 +571,8 @@ static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, /* pollset->mu must be held before calling this function, * pollset->active_pollable->mu & specific_worker->pollable_obj->mu must not be * held */ -static grpc_error *pollset_kick_one(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, - grpc_pollset_worker *specific_worker) { +static grpc_error *kick_one_worker(grpc_exec_ctx *exec_ctx, + grpc_pollset_worker *specific_worker) { pollable *p = specific_worker->pollable_obj; grpc_core::mu_guard lock(&p->mu); GRPC_STATS_INC_POLLSET_KICK(exec_ctx); @@ -623,21 +631,22 @@ static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kicked_any_without_poller", pollset); } + GRPC_STATS_INC_POLLSET_KICKED_WITHOUT_POLLER(exec_ctx); pollset->kicked_without_poller = true; return GRPC_ERROR_NONE; } else { - return pollset_kick_one( - exec_ctx, pollset, - pollset->root_worker->links[PWLINK_POLLSET].next); + return kick_one_worker( + exec_ctx, pollset->root_worker->links[PWLINK_POLLSET].next); } } else { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kicked_any_but_awake", pollset); } + GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD(exec_ctx); return GRPC_ERROR_NONE; } } else { - return pollset_kick_one(exec_ctx, pollset, specific_worker); + return kick_one_worker(exec_ctx, specific_worker); } } @@ -648,7 +657,7 @@ static grpc_error *pollset_kick_all(grpc_exec_ctx *exec_ctx, grpc_pollset_worker *w = pollset->root_worker; if (w != NULL) { do { - append_error(&error, pollset_kick_one(exec_ctx, pollset, w), err_desc); + append_error(&error, kick_one_worker(exec_ctx, w), err_desc); w = w->links[PWLINK_POLLSET].next; } while (w != pollset->root_worker); } @@ -690,10 +699,10 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write"); } -static grpc_error *fd_become_pollable(grpc_fd *fd, pollable **p) { +static grpc_error *fd_get_or_become_pollable(grpc_fd *fd, pollable **p) { gpr_mu_lock(&fd->pollable_mu); grpc_error *error = GRPC_ERROR_NONE; - static const char *err_desc = "fd_become_pollable"; + static const char *err_desc = "fd_get_or_become_pollable"; if (fd->pollable_obj == NULL) { if (append_error(&error, pollable_create(PO_FD, &fd->pollable_obj), err_desc)) { @@ -773,13 +782,13 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { pollset->active_pollable = NULL; } -static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - pollable *p, grpc_millis deadline) { +static grpc_error *pollable_epoll(grpc_exec_ctx *exec_ctx, pollable *p, + grpc_millis deadline) { int timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline); if (GRPC_TRACER_ON(grpc_polling_trace)) { char *desc = pollable_desc(p); - gpr_log(GPR_DEBUG, "PS:%p poll %p[%s] for %dms", pollset, p, desc, timeout); + gpr_log(GPR_DEBUG, "POLLABLE:%p[%s] poll for %dms", p, desc, timeout); gpr_free(desc); } @@ -798,7 +807,7 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait"); if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p poll %p got %d events", pollset, p, r); + gpr_log(GPR_DEBUG, "POLLABLE:%p got %d events", p, r); } p->event_cursor = 0; @@ -934,9 +943,11 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } +#ifndef NDEBUG static long gettid(void) { return syscall(__NR_gettid); } +#endif -/* pollset->po.mu lock must be held by the caller before calling this. +/* pollset->mu lock must be held by the caller before calling this. The function pollset_work() may temporarily release the lock (pollset->po.mu) during the course of its execution but it will always re-acquire the lock and ensure that it is held by the time the function returns */ @@ -951,7 +962,9 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker worker; #define WORKER_PTR (&worker) #endif +#ifndef NDEBUG WORKER_PTR->originator = gettid(); +#endif if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRIdPTR " deadline=%" PRIdPTR " kwp=%d pollable=%p", @@ -968,8 +981,8 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_tls_set(&g_current_thread_worker, (intptr_t)WORKER_PTR); if (WORKER_PTR->pollable_obj->event_cursor == WORKER_PTR->pollable_obj->event_count) { - append_error(&error, pollset_epoll(exec_ctx, pollset, - WORKER_PTR->pollable_obj, deadline), + append_error(&error, pollable_epoll(exec_ctx, WORKER_PTR->pollable_obj, + deadline), err_desc); } append_error(&error, @@ -1000,7 +1013,7 @@ static grpc_error *pollset_transition_pollable_from_empty_to_fd_locked( } append_error(&error, pollset_kick_all(exec_ctx, pollset), err_desc); POLLABLE_UNREF(pollset->active_pollable, "pollset"); - append_error(&error, fd_become_pollable(fd, &pollset->active_pollable), + append_error(&error, fd_get_or_become_pollable(fd, &pollset->active_pollable), err_desc); return error; } |