aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/lib/iomgr/timer_manager.c73
1 files changed, 63 insertions, 10 deletions
diff --git a/src/core/lib/iomgr/timer_manager.c b/src/core/lib/iomgr/timer_manager.c
index 1d83341299..8b83cc0106 100644
--- a/src/core/lib/iomgr/timer_manager.c
+++ b/src/core/lib/iomgr/timer_manager.c
@@ -52,8 +52,10 @@ static int g_thread_count;
static int g_waiter_count;
static completed_thread *g_completed_threads;
static bool g_kicked;
+static bool g_has_timed_waiter;
+static uint64_t g_timed_waiter_generation;
-#define MAX_WAITERS 3
+#define MAX_WAITERS 2
static void timer_thread(void *unused);
@@ -92,39 +94,87 @@ void grpc_timer_manager_tick() {
}
static void timer_thread(void *unused) {
+ // this threads exec_ctx: we try to run things through to completion here
+ // since it's easy to spin up new threads
grpc_exec_ctx exec_ctx =
GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
+ const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC);
for (;;) {
- gpr_timespec next = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+ gpr_timespec next = inf_future;
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ // check timer state, updates next to the next time to run a check
if (grpc_timer_check(&exec_ctx, now, &next)) {
+ // if there's something to execute...
gpr_mu_lock(&g_mu);
+ // remove a waiter from the pool, and start another thread if necessary
--g_waiter_count;
bool start_thread = g_waiter_count == 0;
if (start_thread && g_threaded) {
start_timer_thread_and_unlock();
} else {
+ // if there's no thread waiting with a timeout, kick an existing waiter
+ // so that the next deadline is not missed
+ if (!g_has_timed_waiter) {
+ gpr_log(GPR_DEBUG, "kick untimed waiter");
+ gpr_cv_signal(&g_cv_wait);
+ }
gpr_mu_unlock(&g_mu);
}
+ // without our lock, flush the exec_ctx
grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(&g_mu);
+ // garbage collect any threads hanging out that are dead
gc_completed_threads();
+ // get ready to wait again
++g_waiter_count;
gpr_mu_unlock(&g_mu);
} else {
gpr_mu_lock(&g_mu);
+ // if we're not threaded anymore, leave
if (!g_threaded) break;
- if (gpr_cv_wait(&g_cv_wait, &g_mu, next)) {
- if (g_kicked) {
- grpc_timer_consume_kick();
- g_kicked = false;
- } else if (g_waiter_count > MAX_WAITERS) {
- break;
- }
+ // if there's no timed waiter, we should become one: that waiter waits
+ // only until the next timer should expire
+ // all other timers wait forever
+ uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1;
+ if (!g_has_timed_waiter) {
+ g_has_timed_waiter = true;
+ // we use a generation counter to track the timed waiter so we can
+ // cancel an existing one quickly (and when it actually times out it'll
+ // figure stuff out instead of incurring a wakeup)
+ my_timed_waiter_generation = ++g_timed_waiter_generation;
+ gpr_log(GPR_DEBUG, "sleep for a while");
+ } else {
+ next = inf_future;
+ gpr_log(GPR_DEBUG, "sleep until kicked");
+ }
+ bool timed_out = gpr_cv_wait(&g_cv_wait, &g_mu, next);
+ // if we timed out and we have too many waiters, maybe exit this thread
+ bool should_stop = (timed_out && g_waiter_count > MAX_WAITERS);
+ gpr_log(GPR_DEBUG, "wait ended: was_timed:%d timed_out:%d kicked:%d",
+ my_timed_waiter_generation == g_timed_waiter_generation,
+ timed_out, g_kicked);
+ // if this was the timed waiter, then we need to check timers, and flag
+ // that there's now no timed waiter... we'll look for a replacement if
+ // there's work to do after checking timers (code above)
+ if (my_timed_waiter_generation == g_timed_waiter_generation) {
+ g_has_timed_waiter = false;
+ should_stop = false;
+ }
+ // if this was a kick from the timer system, consume it (and don't stop
+ // this thread yet)
+ if (g_kicked) {
+ grpc_timer_consume_kick();
+ g_kicked = false;
+ should_stop = false;
+ }
+ if (should_stop) {
+ break;
}
- gpr_mu_unlock(&g_mu);
}
+ gpr_mu_unlock(&g_mu);
}
+ // terminate the thread: drop the waiter count, thread count, and let whomever
+ // stopped the threading stuff know that we're done
--g_waiter_count;
--g_thread_count;
if (0 == g_thread_count) {
@@ -135,6 +185,7 @@ static void timer_thread(void *unused) {
ct->next = g_completed_threads;
g_completed_threads = ct;
gpr_mu_unlock(&g_mu);
+ grpc_exec_ctx_finish(&exec_ctx);
gpr_log(GPR_DEBUG, "End timer thread");
}
@@ -193,6 +244,8 @@ void grpc_timer_manager_set_threading(bool threaded) {
void grpc_kick_poller(void) {
gpr_mu_lock(&g_mu);
g_kicked = true;
+ g_has_timed_waiter = false;
+ ++g_timed_waiter_generation;
gpr_cv_signal(&g_cv_wait);
gpr_mu_unlock(&g_mu);
}