diff options
-rw-r--r-- | src/core/lib/iomgr/ev_epoll_linux.c | 153 |
1 files changed, 87 insertions, 66 deletions
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 046ec5e740..d45f87c2f8 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -844,6 +844,8 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, /******************************************************************************* * Pollset Definitions */ +GPR_TLS_DECL(g_current_thread_pollset); +GPR_TLS_DECL(g_current_thread_worker); static void sig_handler(int sig_num) { #ifdef GRPC_EPOLL_DEBUG @@ -859,11 +861,15 @@ static void poller_kick_init() { /* Global state management */ static void pollset_global_init(void) { grpc_wakeup_fd_init(&grpc_global_wakeup_fd); + gpr_tls_init(&g_current_thread_pollset); + gpr_tls_init(&g_current_thread_worker); poller_kick_init(); } static void pollset_global_shutdown(void) { grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd); + gpr_tls_destroy(&g_current_thread_pollset); + gpr_tls_destroy(&g_current_thread_worker); } static void pollset_worker_kick(grpc_pollset_worker *worker) { @@ -915,7 +921,9 @@ static void pollset_kick(grpc_pollset *p, GPR_TIMER_BEGIN("pollset_kick.broadcast", 0); for (worker = p->root_worker.next; worker != &p->root_worker; worker = worker->next) { - pollset_worker_kick(worker); + if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) { + pollset_worker_kick(worker); + } } } else { p->kicked_without_pollers = true; @@ -923,9 +931,18 @@ static void pollset_kick(grpc_pollset *p, GPR_TIMER_END("pollset_kick.broadcast", 0); } else { GPR_TIMER_MARK("kicked_specifically", 0); - pollset_worker_kick(worker); + if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) { + pollset_worker_kick(worker); + } } - } else { + } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) { + /* Since worker == NULL, it means that we can kick "any" worker on this + pollset 'p'. If 'p' happens to be the same pollset this thread is + currently polling (i.e in pollset_work() function), then there is no need + to kick any other worker since the current thread can just absorb the + kick. This is the reason why we enter this case only when + g_current_thread_pollset is != p */ + GPR_TIMER_MARK("kick_anonymous", 0); worker = pop_front_worker(p); if (worker != NULL) { @@ -999,6 +1016,69 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { gpr_mu_unlock(&fd->mu); } +/* Release the reference to pollset->polling_island and set it to NULL. + pollset->mu must be held */ +static void pollset_release_polling_island_locked(grpc_pollset *pollset) { + gpr_mu_lock(&pollset->pi_mu); + if (pollset->polling_island) { + pollset->polling_island = + polling_island_update_and_lock(pollset->polling_island, 1, 0); + polling_island_unref_and_unlock(pollset->polling_island, 1); + pollset->polling_island = NULL; + } + gpr_mu_unlock(&pollset->pi_mu); +} + +static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset) { + /* The pollset cannot have any workers if we are at this stage */ + GPR_ASSERT(!pollset_has_workers(pollset)); + + pollset->finish_shutdown_called = true; + pollset_release_polling_island_locked(pollset); + + grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL); +} + +/* pollset->mu lock must be held by the caller before calling this */ +static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_closure *closure) { + GPR_TIMER_BEGIN("pollset_shutdown", 0); + GPR_ASSERT(!pollset->shutting_down); + pollset->shutting_down = true; + pollset->shutdown_done = closure; + pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); + + /* If the pollset has any workers, we cannot call finish_shutdown_locked() + because it would release the underlying polling island. In such a case, we + let the last worker call finish_shutdown_locked() from pollset_work() */ + if (!pollset_has_workers(pollset)) { + GPR_ASSERT(!pollset->finish_shutdown_called); + GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0); + finish_shutdown_locked(exec_ctx, pollset); + } + GPR_TIMER_END("pollset_shutdown", 0); +} + +/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other + * than destroying the mutexes, there is nothing special that needs to be done + * here */ +static void pollset_destroy(grpc_pollset *pollset) { + GPR_ASSERT(!pollset_has_workers(pollset)); + gpr_mu_destroy(&pollset->pi_mu); + gpr_mu_destroy(&pollset->mu); +} + +static void pollset_reset(grpc_pollset *pollset) { + GPR_ASSERT(pollset->shutting_down); + GPR_ASSERT(!pollset_has_workers(pollset)); + pollset->shutting_down = false; + pollset->finish_shutdown_called = false; + pollset->kicked_without_pollers = false; + pollset->shutdown_done = NULL; + pollset_release_polling_island_locked(pollset); +} + #define GRPC_EPOLL_MAX_EVENTS 1000 static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, int timeout_ms, @@ -1103,69 +1183,6 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, GPR_TIMER_END("pollset_work_and_unlock", 0); } -/* Release the reference to pollset->polling_island and set it to NULL. - pollset->mu must be held */ -static void pollset_release_polling_island_locked(grpc_pollset *pollset) { - gpr_mu_lock(&pollset->pi_mu); - if (pollset->polling_island) { - pollset->polling_island = - polling_island_update_and_lock(pollset->polling_island, 1, 0); - polling_island_unref_and_unlock(pollset->polling_island, 1); - pollset->polling_island = NULL; - } - gpr_mu_unlock(&pollset->pi_mu); -} - -static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset) { - /* The pollset cannot have any workers if we are at this stage */ - GPR_ASSERT(!pollset_has_workers(pollset)); - - pollset->finish_shutdown_called = true; - pollset_release_polling_island_locked(pollset); - - grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL); -} - -/* pollset->mu lock must be held by the caller before calling this */ -static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_closure *closure) { - GPR_TIMER_BEGIN("pollset_shutdown", 0); - GPR_ASSERT(!pollset->shutting_down); - pollset->shutting_down = true; - pollset->shutdown_done = closure; - pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); - - /* If the pollset has any workers, we cannot call finish_shutdown_locked() - because it would release the underlying polling island. In such a case, we - let the last worker call finish_shutdown_locked() from pollset_work() */ - if (!pollset_has_workers(pollset)) { - GPR_ASSERT(!pollset->finish_shutdown_called); - GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0); - finish_shutdown_locked(exec_ctx, pollset); - } - GPR_TIMER_END("pollset_shutdown", 0); -} - -/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other - * than destroying the mutexes, there is nothing special that needs to be done - * here */ -static void pollset_destroy(grpc_pollset *pollset) { - GPR_ASSERT(!pollset_has_workers(pollset)); - gpr_mu_destroy(&pollset->pi_mu); - gpr_mu_destroy(&pollset->mu); -} - -static void pollset_reset(grpc_pollset *pollset) { - GPR_ASSERT(pollset->shutting_down); - GPR_ASSERT(!pollset_has_workers(pollset)); - pollset->shutting_down = false; - pollset->finish_shutdown_called = false; - pollset->kicked_without_pollers = false; - pollset->shutdown_done = NULL; - pollset_release_polling_island_locked(pollset); -} - /* pollset->mu lock must be held by the caller before calling this. The function pollset_work() may temporarily release the lock (pollset->mu) during the course of its execution but it will always re-acquire the lock and @@ -1184,6 +1201,8 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, worker.pt_id = pthread_self(); *worker_hdl = &worker; + gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); + gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); if (pollset->kicked_without_pollers) { /* If the pollset was kicked without pollers, pretend that the current @@ -1226,6 +1245,8 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } *worker_hdl = NULL; + gpr_tls_set(&g_current_thread_pollset, (intptr_t)0); + gpr_tls_set(&g_current_thread_worker, (intptr_t)0); GPR_TIMER_END("pollset_work", 0); } |