aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c153
1 files changed, 87 insertions, 66 deletions
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 046ec5e740..d45f87c2f8 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -844,6 +844,8 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
/*******************************************************************************
* Pollset Definitions
*/
+GPR_TLS_DECL(g_current_thread_pollset);
+GPR_TLS_DECL(g_current_thread_worker);
static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
@@ -859,11 +861,15 @@ static void poller_kick_init() {
/* Global state management */
static void pollset_global_init(void) {
grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
+ gpr_tls_init(&g_current_thread_pollset);
+ gpr_tls_init(&g_current_thread_worker);
poller_kick_init();
}
static void pollset_global_shutdown(void) {
grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
+ gpr_tls_destroy(&g_current_thread_pollset);
+ gpr_tls_destroy(&g_current_thread_worker);
}
static void pollset_worker_kick(grpc_pollset_worker *worker) {
@@ -915,7 +921,9 @@ static void pollset_kick(grpc_pollset *p,
GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
for (worker = p->root_worker.next; worker != &p->root_worker;
worker = worker->next) {
- pollset_worker_kick(worker);
+ if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
+ pollset_worker_kick(worker);
+ }
}
} else {
p->kicked_without_pollers = true;
@@ -923,9 +931,18 @@ static void pollset_kick(grpc_pollset *p,
GPR_TIMER_END("pollset_kick.broadcast", 0);
} else {
GPR_TIMER_MARK("kicked_specifically", 0);
- pollset_worker_kick(worker);
+ if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
+ pollset_worker_kick(worker);
+ }
}
- } else {
+ } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
+ /* Since worker == NULL, it means that we can kick "any" worker on this
+ pollset 'p'. If 'p' happens to be the same pollset this thread is
+ currently polling (i.e in pollset_work() function), then there is no need
+ to kick any other worker since the current thread can just absorb the
+ kick. This is the reason why we enter this case only when
+ g_current_thread_pollset is != p */
+
GPR_TIMER_MARK("kick_anonymous", 0);
worker = pop_front_worker(p);
if (worker != NULL) {
@@ -999,6 +1016,69 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
gpr_mu_unlock(&fd->mu);
}
+/* Release the reference to pollset->polling_island and set it to NULL.
+ pollset->mu must be held */
+static void pollset_release_polling_island_locked(grpc_pollset *pollset) {
+ gpr_mu_lock(&pollset->pi_mu);
+ if (pollset->polling_island) {
+ pollset->polling_island =
+ polling_island_update_and_lock(pollset->polling_island, 1, 0);
+ polling_island_unref_and_unlock(pollset->polling_island, 1);
+ pollset->polling_island = NULL;
+ }
+ gpr_mu_unlock(&pollset->pi_mu);
+}
+
+static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset) {
+ /* The pollset cannot have any workers if we are at this stage */
+ GPR_ASSERT(!pollset_has_workers(pollset));
+
+ pollset->finish_shutdown_called = true;
+ pollset_release_polling_island_locked(pollset);
+
+ grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
+}
+
+/* pollset->mu lock must be held by the caller before calling this */
+static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_closure *closure) {
+ GPR_TIMER_BEGIN("pollset_shutdown", 0);
+ GPR_ASSERT(!pollset->shutting_down);
+ pollset->shutting_down = true;
+ pollset->shutdown_done = closure;
+ pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
+
+ /* If the pollset has any workers, we cannot call finish_shutdown_locked()
+ because it would release the underlying polling island. In such a case, we
+ let the last worker call finish_shutdown_locked() from pollset_work() */
+ if (!pollset_has_workers(pollset)) {
+ GPR_ASSERT(!pollset->finish_shutdown_called);
+ GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
+ finish_shutdown_locked(exec_ctx, pollset);
+ }
+ GPR_TIMER_END("pollset_shutdown", 0);
+}
+
+/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
+ * than destroying the mutexes, there is nothing special that needs to be done
+ * here */
+static void pollset_destroy(grpc_pollset *pollset) {
+ GPR_ASSERT(!pollset_has_workers(pollset));
+ gpr_mu_destroy(&pollset->pi_mu);
+ gpr_mu_destroy(&pollset->mu);
+}
+
+static void pollset_reset(grpc_pollset *pollset) {
+ GPR_ASSERT(pollset->shutting_down);
+ GPR_ASSERT(!pollset_has_workers(pollset));
+ pollset->shutting_down = false;
+ pollset->finish_shutdown_called = false;
+ pollset->kicked_without_pollers = false;
+ pollset->shutdown_done = NULL;
+ pollset_release_polling_island_locked(pollset);
+}
+
#define GRPC_EPOLL_MAX_EVENTS 1000
static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset, int timeout_ms,
@@ -1103,69 +1183,6 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
GPR_TIMER_END("pollset_work_and_unlock", 0);
}
-/* Release the reference to pollset->polling_island and set it to NULL.
- pollset->mu must be held */
-static void pollset_release_polling_island_locked(grpc_pollset *pollset) {
- gpr_mu_lock(&pollset->pi_mu);
- if (pollset->polling_island) {
- pollset->polling_island =
- polling_island_update_and_lock(pollset->polling_island, 1, 0);
- polling_island_unref_and_unlock(pollset->polling_island, 1);
- pollset->polling_island = NULL;
- }
- gpr_mu_unlock(&pollset->pi_mu);
-}
-
-static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
- grpc_pollset *pollset) {
- /* The pollset cannot have any workers if we are at this stage */
- GPR_ASSERT(!pollset_has_workers(pollset));
-
- pollset->finish_shutdown_called = true;
- pollset_release_polling_island_locked(pollset);
-
- grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
-}
-
-/* pollset->mu lock must be held by the caller before calling this */
-static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_closure *closure) {
- GPR_TIMER_BEGIN("pollset_shutdown", 0);
- GPR_ASSERT(!pollset->shutting_down);
- pollset->shutting_down = true;
- pollset->shutdown_done = closure;
- pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
-
- /* If the pollset has any workers, we cannot call finish_shutdown_locked()
- because it would release the underlying polling island. In such a case, we
- let the last worker call finish_shutdown_locked() from pollset_work() */
- if (!pollset_has_workers(pollset)) {
- GPR_ASSERT(!pollset->finish_shutdown_called);
- GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
- finish_shutdown_locked(exec_ctx, pollset);
- }
- GPR_TIMER_END("pollset_shutdown", 0);
-}
-
-/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
- * than destroying the mutexes, there is nothing special that needs to be done
- * here */
-static void pollset_destroy(grpc_pollset *pollset) {
- GPR_ASSERT(!pollset_has_workers(pollset));
- gpr_mu_destroy(&pollset->pi_mu);
- gpr_mu_destroy(&pollset->mu);
-}
-
-static void pollset_reset(grpc_pollset *pollset) {
- GPR_ASSERT(pollset->shutting_down);
- GPR_ASSERT(!pollset_has_workers(pollset));
- pollset->shutting_down = false;
- pollset->finish_shutdown_called = false;
- pollset->kicked_without_pollers = false;
- pollset->shutdown_done = NULL;
- pollset_release_polling_island_locked(pollset);
-}
-
/* pollset->mu lock must be held by the caller before calling this.
The function pollset_work() may temporarily release the lock (pollset->mu)
during the course of its execution but it will always re-acquire the lock and
@@ -1184,6 +1201,8 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
worker.pt_id = pthread_self();
*worker_hdl = &worker;
+ gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
+ gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
if (pollset->kicked_without_pollers) {
/* If the pollset was kicked without pollers, pretend that the current
@@ -1226,6 +1245,8 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
*worker_hdl = NULL;
+ gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
+ gpr_tls_set(&g_current_thread_worker, (intptr_t)0);
GPR_TIMER_END("pollset_work", 0);
}