diff options
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r-- | src/core/lib/iomgr/ev_poll_posix.cc | 66 | ||||
-rw-r--r-- | src/core/lib/iomgr/exec_ctx.cc | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/executor.cc | 22 | ||||
-rw-r--r-- | src/core/lib/iomgr/fork_posix.cc | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/iocp_windows.cc | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/iomgr.cc | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/pollset_windows.cc | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/resolve_address_posix.cc | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/resolve_address_windows.cc | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_manager.cc | 29 | ||||
-rw-r--r-- | src/core/lib/iomgr/wakeup_fd_cv.cc | 2 |
11 files changed, 87 insertions, 48 deletions
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index e979ff7eb5..6120f9f44b 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -38,9 +38,9 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/gpr/murmur_hash.h" -#include "src/core/lib/gpr/thd.h" #include "src/core/lib/gpr/tls.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/wakeup_fd_cv.h" @@ -255,8 +255,13 @@ typedef struct poll_result { } poll_result; typedef struct poll_args { + grpc_core::Thread poller_thd; gpr_cv trigger; int trigger_set; + bool harvestable; + gpr_cv harvest; + bool joinable; + gpr_cv join; struct pollfd* fds; nfds_t nfds; poll_result* result; @@ -266,15 +271,17 @@ typedef struct poll_args { // This is a 2-tiered cache, we mantain a hash table // of active poll calls, so we can wait on the result -// of that call. We also maintain a freelist of inactive -// poll threads. +// of that call. We also maintain freelists of inactive +// poll args and of dead poller threads. typedef struct poll_hash_table { poll_args* free_pollers; poll_args** active_pollers; + poll_args* dead_pollers; unsigned int size; unsigned int count; } poll_hash_table; +// TODO(kpayson64): Eliminate use of global non-POD variables poll_hash_table poll_cache; grpc_cv_fd_table g_cvfds; @@ -1301,6 +1308,7 @@ static void pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) { static void run_poll(void* args); static void cache_poller_locked(poll_args* args); +static void cache_harvest_locked(); static void cache_insert_locked(poll_args* args) { uint32_t key = gpr_murmur_hash3(args->fds, args->nfds * sizeof(struct pollfd), @@ -1363,6 +1371,10 @@ static poll_args* get_poller_locked(struct pollfd* fds, nfds_t count) { poll_args* pargs = static_cast<poll_args*>(gpr_malloc(sizeof(struct poll_args))); gpr_cv_init(&pargs->trigger); + gpr_cv_init(&pargs->harvest); + gpr_cv_init(&pargs->join); + pargs->harvestable = false; + pargs->joinable = false; pargs->fds = fds; pargs->nfds = count; pargs->next = nullptr; @@ -1370,11 +1382,9 @@ static poll_args* get_poller_locked(struct pollfd* fds, nfds_t count) { pargs->trigger_set = 0; init_result(pargs); cache_poller_locked(pargs); - gpr_thd_id t_id; - gpr_thd_options opt = gpr_thd_options_default(); gpr_ref(&g_cvfds.pollcount); - gpr_thd_options_set_detached(&opt); - GPR_ASSERT(gpr_thd_new(&t_id, "grpc_poller", &run_poll, pargs, &opt)); + pargs->poller_thd = grpc_core::Thread("grpc_poller", &run_poll, pargs); + pargs->poller_thd.Start(); return pargs; } @@ -1439,7 +1449,33 @@ static void cache_destroy_locked(poll_args* args) { poll_cache.free_pollers = args->next; } - gpr_free(args); + // Now move this args to the dead poller list for later join + if (poll_cache.dead_pollers != nullptr) { + poll_cache.dead_pollers->prev = args; + } + args->prev = nullptr; + args->next = poll_cache.dead_pollers; + poll_cache.dead_pollers = args; +} + +static void cache_harvest_locked() { + while (poll_cache.dead_pollers) { + poll_args* args = poll_cache.dead_pollers; + poll_cache.dead_pollers = poll_cache.dead_pollers->next; + // Keep the list consistent in case new dead pollers get added when we + // release the lock below to wait on joining + if (poll_cache.dead_pollers) { + poll_cache.dead_pollers->prev = nullptr; + } + args->harvestable = true; + gpr_cv_signal(&args->harvest); + while (!args->joinable) { + gpr_cv_wait(&args->join, &g_cvfds.mu, + gpr_inf_future(GPR_CLOCK_MONOTONIC)); + } + args->poller_thd.Join(); + gpr_free(args); + } } static void decref_poll_result(poll_result* res) { @@ -1471,6 +1507,7 @@ static void run_poll(void* args) { poll_result* result = pargs->result; int retval = g_cvfds.poll(result->fds, result->nfds, CV_POLL_PERIOD_MS); gpr_mu_lock(&g_cvfds.mu); + cache_harvest_locked(); if (retval != 0) { result->completed = 1; result->retval = retval; @@ -1490,6 +1527,7 @@ static void run_poll(void* args) { deadline = gpr_time_add(deadline, thread_grace); pargs->trigger_set = 0; gpr_cv_wait(&pargs->trigger, &g_cvfds.mu, deadline); + cache_harvest_locked(); if (!pargs->trigger_set) { cache_destroy_locked(pargs); break; @@ -1498,10 +1536,15 @@ static void run_poll(void* args) { gpr_mu_unlock(&g_cvfds.mu); } - // We still have the lock here if (gpr_unref(&g_cvfds.pollcount)) { gpr_cv_signal(&g_cvfds.shutdown_cv); } + while (!pargs->harvestable) { + gpr_cv_wait(&pargs->harvest, &g_cvfds.mu, + gpr_inf_future(GPR_CLOCK_MONOTONIC)); + } + pargs->joinable = true; + gpr_cv_signal(&pargs->join); gpr_mu_unlock(&g_cvfds.mu); } @@ -1514,6 +1557,7 @@ static int cvfd_poll(struct pollfd* fds, nfds_t nfds, int timeout) { nfds_t nsockfds = 0; poll_result* result = nullptr; gpr_mu_lock(&g_cvfds.mu); + cache_harvest_locked(); pollcv = static_cast<grpc_cv_node*>(gpr_malloc(sizeof(grpc_cv_node))); pollcv->next = nullptr; gpr_cv pollcv_cv; @@ -1577,12 +1621,14 @@ static int cvfd_poll(struct pollfd* fds, nfds_t nfds, int timeout) { pargs->trigger_set = 1; gpr_cv_signal(&pargs->trigger); gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline); + cache_harvest_locked(); res = result->retval; errno = result->err; result->watchcount--; remove_cvn(&result->watchers, pollcv); } else if (!skip_poll) { gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline); + cache_harvest_locked(); } idx = 0; @@ -1639,6 +1685,7 @@ static void global_cv_fd_table_init() { for (unsigned int i = 0; i < poll_cache.size; i++) { poll_cache.active_pollers[i] = nullptr; } + poll_cache.dead_pollers = nullptr; gpr_mu_unlock(&g_cvfds.mu); } @@ -1657,6 +1704,7 @@ static void global_cv_fd_table_shutdown() { grpc_poll_function = g_cvfds.poll; gpr_free(g_cvfds.cvfds); + cache_harvest_locked(); gpr_free(poll_cache.active_pollers); gpr_mu_unlock(&g_cvfds.mu); diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc index 132fe87870..2f544b20ab 100644 --- a/src/core/lib/iomgr/exec_ctx.cc +++ b/src/core/lib/iomgr/exec_ctx.cc @@ -23,7 +23,7 @@ #include <grpc/support/log.h> #include <grpc/support/sync.h> -#include "src/core/lib/gpr/thd.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/profiling/timers.h" diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc index e7f412a562..b017db53f8 100644 --- a/src/core/lib/iomgr/executor.cc +++ b/src/core/lib/iomgr/executor.cc @@ -29,9 +29,9 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/gpr/spinlock.h" -#include "src/core/lib/gpr/thd.h" #include "src/core/lib/gpr/tls.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/exec_ctx.h" #define MAX_DEPTH 2 @@ -43,7 +43,7 @@ typedef struct { size_t depth; bool shutdown; bool queued_long_job; - gpr_thd_id id; + grpc_core::Thread thd; } thread_state; static thread_state* g_thread_state; @@ -101,13 +101,13 @@ void grpc_executor_set_threading(bool threading) { for (size_t i = 0; i < g_max_threads; i++) { gpr_mu_init(&g_thread_state[i].mu); gpr_cv_init(&g_thread_state[i].cv); + g_thread_state[i].thd = grpc_core::Thread(); g_thread_state[i].elems = GRPC_CLOSURE_LIST_INIT; } - gpr_thd_options opt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&opt); - gpr_thd_new(&g_thread_state[0].id, "grpc_executor", executor_thread, - &g_thread_state[0], &opt); + g_thread_state[0].thd = + grpc_core::Thread("grpc_executor", executor_thread, &g_thread_state[0]); + g_thread_state[0].thd.Start(); } else { if (cur_threads == 0) return; for (size_t i = 0; i < g_max_threads; i++) { @@ -121,7 +121,7 @@ void grpc_executor_set_threading(bool threading) { gpr_spinlock_lock(&g_adding_thread_lock); gpr_spinlock_unlock(&g_adding_thread_lock); for (gpr_atm i = 0; i < g_cur_threads; i++) { - gpr_thd_join(g_thread_state[i].id); + g_thread_state[i].thd.Join(); } gpr_atm_no_barrier_store(&g_cur_threads, 0); for (size_t i = 0; i < g_max_threads; i++) { @@ -264,10 +264,10 @@ static void executor_push(grpc_closure* closure, grpc_error* error, if (cur_thread_count < g_max_threads) { gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1); - gpr_thd_options opt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&opt); - gpr_thd_new(&g_thread_state[cur_thread_count].id, "gpr_executor", - executor_thread, &g_thread_state[cur_thread_count], &opt); + g_thread_state[cur_thread_count].thd = + grpc_core::Thread("grpc_executor", executor_thread, + &g_thread_state[cur_thread_count]); + g_thread_state[cur_thread_count].thd.Start(); } gpr_spinlock_unlock(&g_adding_thread_lock); } diff --git a/src/core/lib/iomgr/fork_posix.cc b/src/core/lib/iomgr/fork_posix.cc index d32fbc4588..f8645ab157 100644 --- a/src/core/lib/iomgr/fork_posix.cc +++ b/src/core/lib/iomgr/fork_posix.cc @@ -29,7 +29,7 @@ #include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/fork.h" -#include "src/core/lib/gpr/thd.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/timer_manager.h" @@ -53,7 +53,7 @@ void grpc_prefork() { grpc_timer_manager_set_threading(false); grpc_executor_set_threading(false); grpc_core::ExecCtx::Get()->Flush(); - if (!gpr_await_threads( + if (!grpc_core::Thread::AwaitAll( gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(3, GPR_TIMESPAN)))) { gpr_log(GPR_ERROR, "gRPC thread still active! Cannot fork!"); diff --git a/src/core/lib/iomgr/iocp_windows.cc b/src/core/lib/iomgr/iocp_windows.cc index 5285734719..ce77231036 100644 --- a/src/core/lib/iomgr/iocp_windows.cc +++ b/src/core/lib/iomgr/iocp_windows.cc @@ -30,7 +30,7 @@ #include <grpc/support/log_windows.h> #include "src/core/lib/debug/stats.h" -#include "src/core/lib/gpr/thd.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/iocp_windows.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/socket_windows.h" diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc index 70a80e1998..3c2b83a549 100644 --- a/src/core/lib/iomgr/iomgr.cc +++ b/src/core/lib/iomgr/iomgr.cc @@ -31,8 +31,8 @@ #include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/string.h" -#include "src/core/lib/gpr/thd.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr_internal.h" diff --git a/src/core/lib/iomgr/pollset_windows.cc b/src/core/lib/iomgr/pollset_windows.cc index 62ab760875..c1b83ddc14 100644 --- a/src/core/lib/iomgr/pollset_windows.cc +++ b/src/core/lib/iomgr/pollset_windows.cc @@ -24,7 +24,7 @@ #include <grpc/support/log.h> -#include "src/core/lib/gpr/thd.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/iocp_windows.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/pollset.h" diff --git a/src/core/lib/iomgr/resolve_address_posix.cc b/src/core/lib/iomgr/resolve_address_posix.cc index 9307f19bcf..2f68dbe214 100644 --- a/src/core/lib/iomgr/resolve_address_posix.cc +++ b/src/core/lib/iomgr/resolve_address_posix.cc @@ -35,8 +35,8 @@ #include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gpr/string.h" -#include "src/core/lib/gpr/thd.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr_internal.h" diff --git a/src/core/lib/iomgr/resolve_address_windows.cc b/src/core/lib/iomgr/resolve_address_windows.cc index 4e1dcaabaf..7a62c88720 100644 --- a/src/core/lib/iomgr/resolve_address_windows.cc +++ b/src/core/lib/iomgr/resolve_address_windows.cc @@ -37,7 +37,7 @@ #include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gpr/string.h" -#include "src/core/lib/gpr/thd.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr_internal.h" diff --git a/src/core/lib/iomgr/timer_manager.cc b/src/core/lib/iomgr/timer_manager.cc index 0210e70015..94f288af27 100644 --- a/src/core/lib/iomgr/timer_manager.cc +++ b/src/core/lib/iomgr/timer_manager.cc @@ -18,21 +18,20 @@ #include <grpc/support/port_platform.h> -#include "src/core/lib/iomgr/timer_manager.h" +#include <inttypes.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <inttypes.h> - #include "src/core/lib/debug/trace.h" -#include "src/core/lib/gpr/thd.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/iomgr/timer_manager.h" -typedef struct completed_thread { - gpr_thd_id t; - struct completed_thread* next; -} completed_thread; +struct completed_thread { + grpc_core::Thread thd; + completed_thread* next; +}; extern grpc_core::TraceFlag grpc_timer_check_trace; @@ -68,7 +67,7 @@ static void gc_completed_threads(void) { g_completed_threads = nullptr; gpr_mu_unlock(&g_mu); while (to_gc != nullptr) { - gpr_thd_join(to_gc->t); + to_gc->thd.Join(); completed_thread* next = to_gc->next; gpr_free(to_gc); to_gc = next; @@ -85,18 +84,10 @@ static void start_timer_thread_and_unlock(void) { if (grpc_timer_check_trace.enabled()) { gpr_log(GPR_DEBUG, "Spawn timer thread"); } - gpr_thd_options opt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&opt); completed_thread* ct = static_cast<completed_thread*>(gpr_malloc(sizeof(*ct))); - // The call to gpr_thd_new() has to be under the same lock used by - // gc_completed_threads(), particularly due to ct->t, which is written here - // (internally by gpr_thd_new) and read there. Otherwise it's possible for ct - // to leak through g_completed_threads and be freed in gc_completed_threads() - // before "&ct->t" is written to, causing a use-after-free. - gpr_mu_lock(&g_mu); - gpr_thd_new(&ct->t, "grpc_global_timer", timer_thread, ct, &opt); - gpr_mu_unlock(&g_mu); + ct->thd = grpc_core::Thread("grpc_global_timer", timer_thread, ct); + ct->thd.Start(); } void grpc_timer_manager_tick() { diff --git a/src/core/lib/iomgr/wakeup_fd_cv.cc b/src/core/lib/iomgr/wakeup_fd_cv.cc index ee322105ae..74faa6379e 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.cc +++ b/src/core/lib/iomgr/wakeup_fd_cv.cc @@ -32,8 +32,8 @@ #include <grpc/support/sync.h> #include <grpc/support/time.h> -#include "src/core/lib/gpr/thd.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/thd.h" #define MAX_TABLE_RESIZE 256 |