diff options
author | Craig Tiller <ctiller@google.com> | 2017-05-04 21:16:03 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-05-04 21:16:03 -0700 |
commit | 14a7f9a2a834c6429309a44312ae089711ddedac (patch) | |
tree | 486c121606e1516ec65cc8cdb46cc6705ad8aced /src | |
parent | 9fe1bb1bc8bfca4678019438aa3e7ee0f69fcdfa (diff) |
Reduce wakeups, comment code
Diffstat (limited to 'src')
-rw-r--r-- | src/core/lib/iomgr/timer_manager.c | 73 |
1 files changed, 63 insertions, 10 deletions
diff --git a/src/core/lib/iomgr/timer_manager.c b/src/core/lib/iomgr/timer_manager.c index 1d83341299..8b83cc0106 100644 --- a/src/core/lib/iomgr/timer_manager.c +++ b/src/core/lib/iomgr/timer_manager.c @@ -52,8 +52,10 @@ static int g_thread_count; static int g_waiter_count; static completed_thread *g_completed_threads; static bool g_kicked; +static bool g_has_timed_waiter; +static uint64_t g_timed_waiter_generation; -#define MAX_WAITERS 3 +#define MAX_WAITERS 2 static void timer_thread(void *unused); @@ -92,39 +94,87 @@ void grpc_timer_manager_tick() { } static void timer_thread(void *unused) { + // this threads exec_ctx: we try to run things through to completion here + // since it's easy to spin up new threads grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL); + const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC); for (;;) { - gpr_timespec next = gpr_inf_future(GPR_CLOCK_MONOTONIC); + gpr_timespec next = inf_future; gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + // check timer state, updates next to the next time to run a check if (grpc_timer_check(&exec_ctx, now, &next)) { + // if there's something to execute... gpr_mu_lock(&g_mu); + // remove a waiter from the pool, and start another thread if necessary --g_waiter_count; bool start_thread = g_waiter_count == 0; if (start_thread && g_threaded) { start_timer_thread_and_unlock(); } else { + // if there's no thread waiting with a timeout, kick an existing waiter + // so that the next deadline is not missed + if (!g_has_timed_waiter) { + gpr_log(GPR_DEBUG, "kick untimed waiter"); + gpr_cv_signal(&g_cv_wait); + } gpr_mu_unlock(&g_mu); } + // without our lock, flush the exec_ctx grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(&g_mu); + // garbage collect any threads hanging out that are dead gc_completed_threads(); + // get ready to wait again ++g_waiter_count; gpr_mu_unlock(&g_mu); } else { gpr_mu_lock(&g_mu); + // if we're not threaded anymore, leave if (!g_threaded) break; - if (gpr_cv_wait(&g_cv_wait, &g_mu, next)) { - if (g_kicked) { - grpc_timer_consume_kick(); - g_kicked = false; - } else if (g_waiter_count > MAX_WAITERS) { - break; - } + // if there's no timed waiter, we should become one: that waiter waits + // only until the next timer should expire + // all other timers wait forever + uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1; + if (!g_has_timed_waiter) { + g_has_timed_waiter = true; + // we use a generation counter to track the timed waiter so we can + // cancel an existing one quickly (and when it actually times out it'll + // figure stuff out instead of incurring a wakeup) + my_timed_waiter_generation = ++g_timed_waiter_generation; + gpr_log(GPR_DEBUG, "sleep for a while"); + } else { + next = inf_future; + gpr_log(GPR_DEBUG, "sleep until kicked"); + } + bool timed_out = gpr_cv_wait(&g_cv_wait, &g_mu, next); + // if we timed out and we have too many waiters, maybe exit this thread + bool should_stop = (timed_out && g_waiter_count > MAX_WAITERS); + gpr_log(GPR_DEBUG, "wait ended: was_timed:%d timed_out:%d kicked:%d", + my_timed_waiter_generation == g_timed_waiter_generation, + timed_out, g_kicked); + // if this was the timed waiter, then we need to check timers, and flag + // that there's now no timed waiter... we'll look for a replacement if + // there's work to do after checking timers (code above) + if (my_timed_waiter_generation == g_timed_waiter_generation) { + g_has_timed_waiter = false; + should_stop = false; + } + // if this was a kick from the timer system, consume it (and don't stop + // this thread yet) + if (g_kicked) { + grpc_timer_consume_kick(); + g_kicked = false; + should_stop = false; + } + if (should_stop) { + break; } - gpr_mu_unlock(&g_mu); } + gpr_mu_unlock(&g_mu); } + // terminate the thread: drop the waiter count, thread count, and let whomever + // stopped the threading stuff know that we're done --g_waiter_count; --g_thread_count; if (0 == g_thread_count) { @@ -135,6 +185,7 @@ static void timer_thread(void *unused) { ct->next = g_completed_threads; g_completed_threads = ct; gpr_mu_unlock(&g_mu); + grpc_exec_ctx_finish(&exec_ctx); gpr_log(GPR_DEBUG, "End timer thread"); } @@ -193,6 +244,8 @@ void grpc_timer_manager_set_threading(bool threaded) { void grpc_kick_poller(void) { gpr_mu_lock(&g_mu); g_kicked = true; + g_has_timed_waiter = false; + ++g_timed_waiter_generation; gpr_cv_signal(&g_cv_wait); gpr_mu_unlock(&g_mu); } |