diff options
author | Jan Tattermusch <jtattermusch@users.noreply.github.com> | 2016-06-29 10:35:13 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-06-29 10:35:13 -0700 |
commit | 40a3e64aba597760edcdab2d9e95a0304840ebe8 (patch) | |
tree | af682d770e73746973109ed520f4ee7edf208814 /src/core/lib | |
parent | 3db76b92b92c5cb0d79fbf81e2526602357a6b84 (diff) | |
parent | 34217248cffed13526a7b2f5f0554a189e3b59cb (diff) |
Merge pull request #7123 from sreecha/epoll_perf
Trace statements in epoll poller (enabled by default to debug the recent perf regression)
Diffstat (limited to 'src/core/lib')
-rw-r--r-- | src/core/lib/iomgr/ev_epoll_linux.c | 94 |
1 files changed, 74 insertions, 20 deletions
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 475bd3c1df..cf0fe736a0 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -60,6 +60,13 @@ #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" +/* TODO: sreek - Move this to init.c and initialize this like other tracers. */ +static int grpc_polling_trace = 0; /* Disabled by default */ +#define GRPC_POLLING_TRACE(fmt, ...) \ + if (grpc_polling_trace) { \ + gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \ + } + static int grpc_wakeup_signal = -1; static bool is_grpc_wakeup_signal_initialized = false; @@ -195,7 +202,11 @@ typedef struct polling_island { * Pollset Declarations */ struct grpc_pollset_worker { - pthread_t pt_id; /* Thread id of this worker */ + /* Thread id of this worker */ + pthread_t pt_id; + + /* Used to prevent a worker from getting kicked multiple times */ + gpr_atm is_kicked; struct grpc_pollset_worker *next; struct grpc_pollset_worker *prev; }; @@ -1058,9 +1069,16 @@ static void pollset_global_shutdown(void) { static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) { grpc_error *err = GRPC_ERROR_NONE; - int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal); - if (err_num != 0) { - err = GRPC_OS_ERROR(err_num, "pthread_kill"); + + /* Kick the worker only if it was not already kicked */ + if (gpr_atm_no_barrier_cas(&worker->is_kicked, (gpr_atm)0, (gpr_atm)1)) { + GRPC_POLLING_TRACE( + "pollset_worker_kick: Kicking worker: %p (thread id: %ld)", + (void *)worker, worker->pt_id); + int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal); + if (err_num != 0) { + err = GRPC_OS_ERROR(err_num, "pthread_kill"); + } } return err; } @@ -1104,7 +1122,6 @@ static grpc_error *pollset_kick(grpc_pollset *p, GPR_TIMER_BEGIN("pollset_kick", 0); grpc_error *error = GRPC_ERROR_NONE; const char *err_desc = "Kick Failure"; - grpc_pollset_worker *worker = specific_worker; if (worker != NULL) { if (worker == GRPC_POLLSET_KICK_BROADCAST) { @@ -1270,7 +1287,8 @@ static void pollset_reset(grpc_pollset *pollset) { #define GRPC_EPOLL_MAX_EVENTS 1000 /* Note: sig_mask contains the signal mask to use *during* epoll_wait() */ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, int timeout_ms, + grpc_pollset *pollset, + grpc_pollset_worker *worker, int timeout_ms, sigset_t *sig_mask, grpc_error **error) { struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS]; int epoll_fd = -1; @@ -1298,6 +1316,8 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, } PI_ADD_REF(pollset->polling_island, "ps"); + GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p", + (void *)pollset, (void *)pollset->polling_island); } pi = polling_island_maybe_get_latest(pollset->polling_island); @@ -1331,6 +1351,9 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, } else { /* We were interrupted. Save an interation by doing a zero timeout epoll_wait to see if there are any other events of interest */ + GRPC_POLLING_TRACE( + "pollset_work: pollset: %p, worker: %p received kick", + (void *)pollset, (void *)worker); ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0); } } @@ -1347,6 +1370,10 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd), err_desc); } else if (data_ptr == &polling_island_wakeup_fd) { + GRPC_POLLING_TRACE( + "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: " + "%d) got merged", + (void *)pollset, (void *)worker, epoll_fd); /* This means that our polling island is merged with a different island. We do not have to do anything here since the subsequent call to the function pollset_work_and_unlock() will pick up the correct @@ -1394,6 +1421,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker worker; worker.next = worker.prev = NULL; worker.pt_id = pthread_self(); + gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0); *worker_hdl = &worker; @@ -1409,18 +1437,20 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset->kicked_without_pollers = 0; } else if (!pollset->shutting_down) { /* We use the posix-signal with number 'grpc_wakeup_signal' for waking up - (i.e 'kicking') a worker in the pollset. - A 'kick' is a way to inform that worker that there is some pending work - that needs immediate attention (like an event on the completion queue, - or a polling island merge that results in a new epoll-fd to wait on) and - that the worker should not spend time waiting in epoll_pwait(). - - A kick can come at anytime (i.e before/during or after the worker calls - epoll_pwait()) but in all cases we have to make sure that when a worker - gets a kick, it does not spend time in epoll_pwait(). In other words, one - kick should result in skipping/exiting of one epoll_pwait(); - - To accomplish this, we mask 'grpc_wakeup_signal' on this worker at all + (i.e 'kicking') a worker in the pollset. A 'kick' is a way to inform the + worker that there is some pending work that needs immediate attention + (like an event on the completion queue, or a polling island merge that + results in a new epoll-fd to wait on) and that the worker should not + spend time waiting in epoll_pwait(). + + A worker can be kicked anytime from the point it is added to the pollset + via push_front_worker() (or push_back_worker()) to the point it is + removed via remove_worker(). + If the worker is kicked before/during it calls epoll_pwait(), it should + immediately exit from epoll_wait(). If the worker is kicked after it + returns from epoll_wait(), then nothing really needs to be done. + + To accomplish this, we mask 'grpc_wakeup_signal' on this thread at all times *except* when it is in epoll_pwait(). This way, the worker never misses acting on a kick */ @@ -1442,11 +1472,14 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, push_front_worker(pollset, &worker); /* Add worker to pollset */ - pollset_work_and_unlock(exec_ctx, pollset, timeout_ms, &g_orig_sigmask, - &error); + pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms, + &g_orig_sigmask, &error); grpc_exec_ctx_flush(exec_ctx); gpr_mu_lock(&pollset->mu); + + /* Note: There is no need to reset worker.is_kicked to 0 since we are no + longer going to use this worker */ remove_worker(pollset, &worker); } @@ -1506,17 +1539,38 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pi_new = fd->polling_island; if (pi_new == NULL) { pi_new = polling_island_create(fd, &error); + + GRPC_POLLING_TRACE( + "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, " + "pollset: %p)", + (void *)pi_new, fd->fd, (void *)pollset); } } else if (fd->polling_island == NULL) { pi_new = polling_island_lock(pollset->polling_island); polling_island_add_fds_locked(pi_new, &fd, 1, true, &error); gpr_mu_unlock(&pi_new->mu); + + GRPC_POLLING_TRACE( + "pollset_add_fd: fd->pi was NULL. pi_new: %p (fd: %d, pollset: %p, " + "pollset->pi: %p)", + (void *)pi_new, fd->fd, (void *)pollset, + (void *)pollset->polling_island); } else if (pollset->polling_island == NULL) { pi_new = polling_island_lock(fd->polling_island); gpr_mu_unlock(&pi_new->mu); + + GRPC_POLLING_TRACE( + "pollset_add_fd: pollset->pi was NULL. pi_new: %p (fd: %d, pollset: " + "%p, fd->pi: %p", + (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island); } else { pi_new = polling_island_merge(fd->polling_island, pollset->polling_island, &error); + GRPC_POLLING_TRACE( + "pollset_add_fd: polling islands merged. pi_new: %p (fd: %d, pollset: " + "%p, fd->pi: %p, pollset->pi: %p)", + (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island, + (void *)pollset->polling_island); } /* At this point, pi_new is the polling island that both fd->polling_island |