aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.cc66
-rw-r--r--src/core/lib/iomgr/exec_ctx.cc2
-rw-r--r--src/core/lib/iomgr/executor.cc22
-rw-r--r--src/core/lib/iomgr/fork_posix.cc4
-rw-r--r--src/core/lib/iomgr/iocp_windows.cc2
-rw-r--r--src/core/lib/iomgr/iomgr.cc2
-rw-r--r--src/core/lib/iomgr/pollset_windows.cc2
-rw-r--r--src/core/lib/iomgr/resolve_address_posix.cc2
-rw-r--r--src/core/lib/iomgr/resolve_address_windows.cc2
-rw-r--r--src/core/lib/iomgr/timer_manager.cc29
-rw-r--r--src/core/lib/iomgr/wakeup_fd_cv.cc2
11 files changed, 87 insertions, 48 deletions
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index e979ff7eb5..6120f9f44b 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -38,9 +38,9 @@
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/murmur_hash.h"
-#include "src/core/lib/gpr/thd.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/wakeup_fd_cv.h"
@@ -255,8 +255,13 @@ typedef struct poll_result {
} poll_result;
typedef struct poll_args {
+ grpc_core::Thread poller_thd;
gpr_cv trigger;
int trigger_set;
+ bool harvestable;
+ gpr_cv harvest;
+ bool joinable;
+ gpr_cv join;
struct pollfd* fds;
nfds_t nfds;
poll_result* result;
@@ -266,15 +271,17 @@ typedef struct poll_args {
// This is a 2-tiered cache, we mantain a hash table
// of active poll calls, so we can wait on the result
-// of that call. We also maintain a freelist of inactive
-// poll threads.
+// of that call. We also maintain freelists of inactive
+// poll args and of dead poller threads.
typedef struct poll_hash_table {
poll_args* free_pollers;
poll_args** active_pollers;
+ poll_args* dead_pollers;
unsigned int size;
unsigned int count;
} poll_hash_table;
+// TODO(kpayson64): Eliminate use of global non-POD variables
poll_hash_table poll_cache;
grpc_cv_fd_table g_cvfds;
@@ -1301,6 +1308,7 @@ static void pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
static void run_poll(void* args);
static void cache_poller_locked(poll_args* args);
+static void cache_harvest_locked();
static void cache_insert_locked(poll_args* args) {
uint32_t key = gpr_murmur_hash3(args->fds, args->nfds * sizeof(struct pollfd),
@@ -1363,6 +1371,10 @@ static poll_args* get_poller_locked(struct pollfd* fds, nfds_t count) {
poll_args* pargs =
static_cast<poll_args*>(gpr_malloc(sizeof(struct poll_args)));
gpr_cv_init(&pargs->trigger);
+ gpr_cv_init(&pargs->harvest);
+ gpr_cv_init(&pargs->join);
+ pargs->harvestable = false;
+ pargs->joinable = false;
pargs->fds = fds;
pargs->nfds = count;
pargs->next = nullptr;
@@ -1370,11 +1382,9 @@ static poll_args* get_poller_locked(struct pollfd* fds, nfds_t count) {
pargs->trigger_set = 0;
init_result(pargs);
cache_poller_locked(pargs);
- gpr_thd_id t_id;
- gpr_thd_options opt = gpr_thd_options_default();
gpr_ref(&g_cvfds.pollcount);
- gpr_thd_options_set_detached(&opt);
- GPR_ASSERT(gpr_thd_new(&t_id, "grpc_poller", &run_poll, pargs, &opt));
+ pargs->poller_thd = grpc_core::Thread("grpc_poller", &run_poll, pargs);
+ pargs->poller_thd.Start();
return pargs;
}
@@ -1439,7 +1449,33 @@ static void cache_destroy_locked(poll_args* args) {
poll_cache.free_pollers = args->next;
}
- gpr_free(args);
+ // Now move this args to the dead poller list for later join
+ if (poll_cache.dead_pollers != nullptr) {
+ poll_cache.dead_pollers->prev = args;
+ }
+ args->prev = nullptr;
+ args->next = poll_cache.dead_pollers;
+ poll_cache.dead_pollers = args;
+}
+
+static void cache_harvest_locked() {
+ while (poll_cache.dead_pollers) {
+ poll_args* args = poll_cache.dead_pollers;
+ poll_cache.dead_pollers = poll_cache.dead_pollers->next;
+ // Keep the list consistent in case new dead pollers get added when we
+ // release the lock below to wait on joining
+ if (poll_cache.dead_pollers) {
+ poll_cache.dead_pollers->prev = nullptr;
+ }
+ args->harvestable = true;
+ gpr_cv_signal(&args->harvest);
+ while (!args->joinable) {
+ gpr_cv_wait(&args->join, &g_cvfds.mu,
+ gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ }
+ args->poller_thd.Join();
+ gpr_free(args);
+ }
}
static void decref_poll_result(poll_result* res) {
@@ -1471,6 +1507,7 @@ static void run_poll(void* args) {
poll_result* result = pargs->result;
int retval = g_cvfds.poll(result->fds, result->nfds, CV_POLL_PERIOD_MS);
gpr_mu_lock(&g_cvfds.mu);
+ cache_harvest_locked();
if (retval != 0) {
result->completed = 1;
result->retval = retval;
@@ -1490,6 +1527,7 @@ static void run_poll(void* args) {
deadline = gpr_time_add(deadline, thread_grace);
pargs->trigger_set = 0;
gpr_cv_wait(&pargs->trigger, &g_cvfds.mu, deadline);
+ cache_harvest_locked();
if (!pargs->trigger_set) {
cache_destroy_locked(pargs);
break;
@@ -1498,10 +1536,15 @@ static void run_poll(void* args) {
gpr_mu_unlock(&g_cvfds.mu);
}
- // We still have the lock here
if (gpr_unref(&g_cvfds.pollcount)) {
gpr_cv_signal(&g_cvfds.shutdown_cv);
}
+ while (!pargs->harvestable) {
+ gpr_cv_wait(&pargs->harvest, &g_cvfds.mu,
+ gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ }
+ pargs->joinable = true;
+ gpr_cv_signal(&pargs->join);
gpr_mu_unlock(&g_cvfds.mu);
}
@@ -1514,6 +1557,7 @@ static int cvfd_poll(struct pollfd* fds, nfds_t nfds, int timeout) {
nfds_t nsockfds = 0;
poll_result* result = nullptr;
gpr_mu_lock(&g_cvfds.mu);
+ cache_harvest_locked();
pollcv = static_cast<grpc_cv_node*>(gpr_malloc(sizeof(grpc_cv_node)));
pollcv->next = nullptr;
gpr_cv pollcv_cv;
@@ -1577,12 +1621,14 @@ static int cvfd_poll(struct pollfd* fds, nfds_t nfds, int timeout) {
pargs->trigger_set = 1;
gpr_cv_signal(&pargs->trigger);
gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline);
+ cache_harvest_locked();
res = result->retval;
errno = result->err;
result->watchcount--;
remove_cvn(&result->watchers, pollcv);
} else if (!skip_poll) {
gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline);
+ cache_harvest_locked();
}
idx = 0;
@@ -1639,6 +1685,7 @@ static void global_cv_fd_table_init() {
for (unsigned int i = 0; i < poll_cache.size; i++) {
poll_cache.active_pollers[i] = nullptr;
}
+ poll_cache.dead_pollers = nullptr;
gpr_mu_unlock(&g_cvfds.mu);
}
@@ -1657,6 +1704,7 @@ static void global_cv_fd_table_shutdown() {
grpc_poll_function = g_cvfds.poll;
gpr_free(g_cvfds.cvfds);
+ cache_harvest_locked();
gpr_free(poll_cache.active_pollers);
gpr_mu_unlock(&g_cvfds.mu);
diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc
index 132fe87870..2f544b20ab 100644
--- a/src/core/lib/iomgr/exec_ctx.cc
+++ b/src/core/lib/iomgr/exec_ctx.cc
@@ -23,7 +23,7 @@
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
-#include "src/core/lib/gpr/thd.h"
+#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/profiling/timers.h"
diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc
index e7f412a562..b017db53f8 100644
--- a/src/core/lib/iomgr/executor.cc
+++ b/src/core/lib/iomgr/executor.cc
@@ -29,9 +29,9 @@
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/spinlock.h"
-#include "src/core/lib/gpr/thd.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#define MAX_DEPTH 2
@@ -43,7 +43,7 @@ typedef struct {
size_t depth;
bool shutdown;
bool queued_long_job;
- gpr_thd_id id;
+ grpc_core::Thread thd;
} thread_state;
static thread_state* g_thread_state;
@@ -101,13 +101,13 @@ void grpc_executor_set_threading(bool threading) {
for (size_t i = 0; i < g_max_threads; i++) {
gpr_mu_init(&g_thread_state[i].mu);
gpr_cv_init(&g_thread_state[i].cv);
+ g_thread_state[i].thd = grpc_core::Thread();
g_thread_state[i].elems = GRPC_CLOSURE_LIST_INIT;
}
- gpr_thd_options opt = gpr_thd_options_default();
- gpr_thd_options_set_joinable(&opt);
- gpr_thd_new(&g_thread_state[0].id, "grpc_executor", executor_thread,
- &g_thread_state[0], &opt);
+ g_thread_state[0].thd =
+ grpc_core::Thread("grpc_executor", executor_thread, &g_thread_state[0]);
+ g_thread_state[0].thd.Start();
} else {
if (cur_threads == 0) return;
for (size_t i = 0; i < g_max_threads; i++) {
@@ -121,7 +121,7 @@ void grpc_executor_set_threading(bool threading) {
gpr_spinlock_lock(&g_adding_thread_lock);
gpr_spinlock_unlock(&g_adding_thread_lock);
for (gpr_atm i = 0; i < g_cur_threads; i++) {
- gpr_thd_join(g_thread_state[i].id);
+ g_thread_state[i].thd.Join();
}
gpr_atm_no_barrier_store(&g_cur_threads, 0);
for (size_t i = 0; i < g_max_threads; i++) {
@@ -264,10 +264,10 @@ static void executor_push(grpc_closure* closure, grpc_error* error,
if (cur_thread_count < g_max_threads) {
gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1);
- gpr_thd_options opt = gpr_thd_options_default();
- gpr_thd_options_set_joinable(&opt);
- gpr_thd_new(&g_thread_state[cur_thread_count].id, "gpr_executor",
- executor_thread, &g_thread_state[cur_thread_count], &opt);
+ g_thread_state[cur_thread_count].thd =
+ grpc_core::Thread("grpc_executor", executor_thread,
+ &g_thread_state[cur_thread_count]);
+ g_thread_state[cur_thread_count].thd.Start();
}
gpr_spinlock_unlock(&g_adding_thread_lock);
}
diff --git a/src/core/lib/iomgr/fork_posix.cc b/src/core/lib/iomgr/fork_posix.cc
index d32fbc4588..f8645ab157 100644
--- a/src/core/lib/iomgr/fork_posix.cc
+++ b/src/core/lib/iomgr/fork_posix.cc
@@ -29,7 +29,7 @@
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/fork.h"
-#include "src/core/lib/gpr/thd.h"
+#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/timer_manager.h"
@@ -53,7 +53,7 @@ void grpc_prefork() {
grpc_timer_manager_set_threading(false);
grpc_executor_set_threading(false);
grpc_core::ExecCtx::Get()->Flush();
- if (!gpr_await_threads(
+ if (!grpc_core::Thread::AwaitAll(
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_seconds(3, GPR_TIMESPAN)))) {
gpr_log(GPR_ERROR, "gRPC thread still active! Cannot fork!");
diff --git a/src/core/lib/iomgr/iocp_windows.cc b/src/core/lib/iomgr/iocp_windows.cc
index 5285734719..ce77231036 100644
--- a/src/core/lib/iomgr/iocp_windows.cc
+++ b/src/core/lib/iomgr/iocp_windows.cc
@@ -30,7 +30,7 @@
#include <grpc/support/log_windows.h>
#include "src/core/lib/debug/stats.h"
-#include "src/core/lib/gpr/thd.h"
+#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/iocp_windows.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/socket_windows.h"
diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc
index 70a80e1998..3c2b83a549 100644
--- a/src/core/lib/iomgr/iomgr.cc
+++ b/src/core/lib/iomgr/iomgr.cc
@@ -31,8 +31,8 @@
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h"
-#include "src/core/lib/gpr/thd.h"
#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
diff --git a/src/core/lib/iomgr/pollset_windows.cc b/src/core/lib/iomgr/pollset_windows.cc
index 62ab760875..c1b83ddc14 100644
--- a/src/core/lib/iomgr/pollset_windows.cc
+++ b/src/core/lib/iomgr/pollset_windows.cc
@@ -24,7 +24,7 @@
#include <grpc/support/log.h>
-#include "src/core/lib/gpr/thd.h"
+#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/iocp_windows.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/pollset.h"
diff --git a/src/core/lib/iomgr/resolve_address_posix.cc b/src/core/lib/iomgr/resolve_address_posix.cc
index 9307f19bcf..2f68dbe214 100644
--- a/src/core/lib/iomgr/resolve_address_posix.cc
+++ b/src/core/lib/iomgr/resolve_address_posix.cc
@@ -35,8 +35,8 @@
#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
-#include "src/core/lib/gpr/thd.h"
#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
diff --git a/src/core/lib/iomgr/resolve_address_windows.cc b/src/core/lib/iomgr/resolve_address_windows.cc
index 4e1dcaabaf..7a62c88720 100644
--- a/src/core/lib/iomgr/resolve_address_windows.cc
+++ b/src/core/lib/iomgr/resolve_address_windows.cc
@@ -37,7 +37,7 @@
#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
-#include "src/core/lib/gpr/thd.h"
+#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
diff --git a/src/core/lib/iomgr/timer_manager.cc b/src/core/lib/iomgr/timer_manager.cc
index 0210e70015..94f288af27 100644
--- a/src/core/lib/iomgr/timer_manager.cc
+++ b/src/core/lib/iomgr/timer_manager.cc
@@ -18,21 +18,20 @@
#include <grpc/support/port_platform.h>
-#include "src/core/lib/iomgr/timer_manager.h"
+#include <inttypes.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <inttypes.h>
-
#include "src/core/lib/debug/trace.h"
-#include "src/core/lib/gpr/thd.h"
+#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/iomgr/timer_manager.h"
-typedef struct completed_thread {
- gpr_thd_id t;
- struct completed_thread* next;
-} completed_thread;
+struct completed_thread {
+ grpc_core::Thread thd;
+ completed_thread* next;
+};
extern grpc_core::TraceFlag grpc_timer_check_trace;
@@ -68,7 +67,7 @@ static void gc_completed_threads(void) {
g_completed_threads = nullptr;
gpr_mu_unlock(&g_mu);
while (to_gc != nullptr) {
- gpr_thd_join(to_gc->t);
+ to_gc->thd.Join();
completed_thread* next = to_gc->next;
gpr_free(to_gc);
to_gc = next;
@@ -85,18 +84,10 @@ static void start_timer_thread_and_unlock(void) {
if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_DEBUG, "Spawn timer thread");
}
- gpr_thd_options opt = gpr_thd_options_default();
- gpr_thd_options_set_joinable(&opt);
completed_thread* ct =
static_cast<completed_thread*>(gpr_malloc(sizeof(*ct)));
- // The call to gpr_thd_new() has to be under the same lock used by
- // gc_completed_threads(), particularly due to ct->t, which is written here
- // (internally by gpr_thd_new) and read there. Otherwise it's possible for ct
- // to leak through g_completed_threads and be freed in gc_completed_threads()
- // before "&ct->t" is written to, causing a use-after-free.
- gpr_mu_lock(&g_mu);
- gpr_thd_new(&ct->t, "grpc_global_timer", timer_thread, ct, &opt);
- gpr_mu_unlock(&g_mu);
+ ct->thd = grpc_core::Thread("grpc_global_timer", timer_thread, ct);
+ ct->thd.Start();
}
void grpc_timer_manager_tick() {
diff --git a/src/core/lib/iomgr/wakeup_fd_cv.cc b/src/core/lib/iomgr/wakeup_fd_cv.cc
index ee322105ae..74faa6379e 100644
--- a/src/core/lib/iomgr/wakeup_fd_cv.cc
+++ b/src/core/lib/iomgr/wakeup_fd_cv.cc
@@ -32,8 +32,8 @@
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
-#include "src/core/lib/gpr/thd.h"
#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/gprpp/thd.h"
#define MAX_TABLE_RESIZE 256