diff options
Diffstat (limited to 'src/core/lib/iomgr/timer_manager.cc')
-rw-r--r-- | src/core/lib/iomgr/timer_manager.cc | 356 |
1 files changed, 356 insertions, 0 deletions
diff --git a/src/core/lib/iomgr/timer_manager.cc b/src/core/lib/iomgr/timer_manager.cc new file mode 100644 index 0000000000..1248f82189 --- /dev/null +++ b/src/core/lib/iomgr/timer_manager.cc @@ -0,0 +1,356 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/lib/iomgr/timer_manager.h" + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/port_platform.h> +#include <grpc/support/thd.h> + +#include <inttypes.h> + +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/timer.h" + +typedef struct completed_thread { + gpr_thd_id t; + struct completed_thread *next; +} completed_thread; + +extern "C" grpc_tracer_flag grpc_timer_check_trace; + +// global mutex +static gpr_mu g_mu; +// are we multi-threaded +static bool g_threaded; +// cv to wait until a thread is needed +static gpr_cv g_cv_wait; +// cv for notification when threading ends +static gpr_cv g_cv_shutdown; +// number of threads in the system +static int g_thread_count; +// number of threads sitting around waiting +static int g_waiter_count; +// linked list of threads that have completed (and need joining) +static completed_thread *g_completed_threads; +// was the manager kicked by the timer system +static bool g_kicked; +// is there a thread waiting until the next timer should fire? +static bool g_has_timed_waiter; +// the deadline of the current timed waiter thread (only relevant if +// g_has_timed_waiter is true) +static grpc_millis g_timed_waiter_deadline; +// generation counter to track which thread is waiting for the next timer +static uint64_t g_timed_waiter_generation; + +static void timer_thread(void *completed_thread_ptr); + +static void gc_completed_threads(void) { + if (g_completed_threads != NULL) { + completed_thread *to_gc = g_completed_threads; + g_completed_threads = NULL; + gpr_mu_unlock(&g_mu); + while (to_gc != NULL) { + gpr_thd_join(to_gc->t); + completed_thread *next = to_gc->next; + gpr_free(to_gc); + to_gc = next; + } + gpr_mu_lock(&g_mu); + } +} + +static void start_timer_thread_and_unlock(void) { + GPR_ASSERT(g_threaded); + ++g_waiter_count; + ++g_thread_count; + gpr_mu_unlock(&g_mu); + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, "Spawn timer thread"); + } + gpr_thd_options opt = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&opt); + completed_thread *ct = (completed_thread *)gpr_malloc(sizeof(*ct)); + // The call to gpr_thd_new() has to be under the same lock used by + // gc_completed_threads(), particularly due to ct->t, which is written here + // (internally by gpr_thd_new) and read there. Otherwise it's possible for ct + // to leak through g_completed_threads and be freed in gc_completed_threads() + // before "&ct->t" is written to, causing a use-after-free. + gpr_mu_lock(&g_mu); + gpr_thd_new(&ct->t, timer_thread, ct, &opt); + gpr_mu_unlock(&g_mu); +} + +void grpc_timer_manager_tick() { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_millis next = GRPC_MILLIS_INF_FUTURE; + grpc_timer_check(&exec_ctx, &next); + grpc_exec_ctx_finish(&exec_ctx); +} + +static void run_some_timers(grpc_exec_ctx *exec_ctx) { + // if there's something to execute... + gpr_mu_lock(&g_mu); + // remove a waiter from the pool, and start another thread if necessary + --g_waiter_count; + if (g_waiter_count == 0 && g_threaded) { + start_timer_thread_and_unlock(); + } else { + // if there's no thread waiting with a timeout, kick an existing + // waiter so that the next deadline is not missed + if (!g_has_timed_waiter) { + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, "kick untimed waiter"); + } + gpr_cv_signal(&g_cv_wait); + } + gpr_mu_unlock(&g_mu); + } + // without our lock, flush the exec_ctx + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, "flush exec_ctx"); + } + grpc_exec_ctx_flush(exec_ctx); + gpr_mu_lock(&g_mu); + // garbage collect any threads hanging out that are dead + gc_completed_threads(); + // get ready to wait again + ++g_waiter_count; + gpr_mu_unlock(&g_mu); +} + +// wait until 'next' (or forever if there is already a timed waiter in the pool) +// returns true if the thread should continue executing (false if it should +// shutdown) +static bool wait_until(grpc_exec_ctx *exec_ctx, grpc_millis next) { + gpr_mu_lock(&g_mu); + // if we're not threaded anymore, leave + if (!g_threaded) { + gpr_mu_unlock(&g_mu); + return false; + } + + // If g_kicked is true at this point, it means there was a kick from the timer + // system that the timer-manager threads here missed. We cannot trust 'next' + // here any longer (since there might be an earlier deadline). So if g_kicked + // is true at this point, we should quickly exit this and get the next + // deadline from the timer system + + if (!g_kicked) { + // if there's no timed waiter, we should become one: that waiter waits + // only until the next timer should expire. All other timers wait forever + // + // 'g_timed_waiter_generation' is a global generation counter. The idea here + // is that the thread becoming a timed-waiter increments and stores this + // global counter locally in 'my_timed_waiter_generation' before going to + // sleep. After waking up, if my_timed_waiter_generation == + // g_timed_waiter_generation, it can be sure that it was the timed_waiter + // thread (and that no other thread took over while this was asleep) + // + // Initialize my_timed_waiter_generation to some value that is NOT equal to + // g_timed_waiter_generation + uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1; + + /* If there's no timed waiter, we should become one: that waiter waits only + until the next timer should expire. All other timer threads wait forever + unless their 'next' is earlier than the current timed-waiter's deadline + (in which case the thread with earlier 'next' takes over as the new timed + waiter) */ + if (next != GRPC_MILLIS_INF_FUTURE) { + if (!g_has_timed_waiter || (next < g_timed_waiter_deadline)) { + my_timed_waiter_generation = ++g_timed_waiter_generation; + g_has_timed_waiter = true; + g_timed_waiter_deadline = next; + + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + grpc_millis wait_time = next - grpc_exec_ctx_now(exec_ctx); + gpr_log(GPR_DEBUG, "sleep for a %" PRIdPTR " milliseconds", + wait_time); + } + } else { // g_timed_waiter == true && next >= g_timed_waiter_deadline + next = GRPC_MILLIS_INF_FUTURE; + } + } + + if (GRPC_TRACER_ON(grpc_timer_check_trace) && + next == GRPC_MILLIS_INF_FUTURE) { + gpr_log(GPR_DEBUG, "sleep until kicked"); + } + + gpr_cv_wait(&g_cv_wait, &g_mu, + grpc_millis_to_timespec(next, GPR_CLOCK_REALTIME)); + + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d", + my_timed_waiter_generation == g_timed_waiter_generation, + g_kicked); + } + // if this was the timed waiter, then we need to check timers, and flag + // that there's now no timed waiter... we'll look for a replacement if + // there's work to do after checking timers (code above) + if (my_timed_waiter_generation == g_timed_waiter_generation) { + g_has_timed_waiter = false; + g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE; + } + } + + // if this was a kick from the timer system, consume it (and don't stop + // this thread yet) + if (g_kicked) { + grpc_timer_consume_kick(); + g_kicked = false; + } + + gpr_mu_unlock(&g_mu); + return true; +} + +static void timer_main_loop(grpc_exec_ctx *exec_ctx) { + for (;;) { + grpc_millis next = GRPC_MILLIS_INF_FUTURE; + grpc_exec_ctx_invalidate_now(exec_ctx); + // check timer state, updates next to the next time to run a check + switch (grpc_timer_check(exec_ctx, &next)) { + case GRPC_TIMERS_FIRED: + run_some_timers(exec_ctx); + break; + case GRPC_TIMERS_NOT_CHECKED: + /* This case only happens under contention, meaning more than one timer + manager thread checked timers concurrently. + + If that happens, we're guaranteed that some other thread has just + checked timers, and this will avalanche into some other thread seeing + empty timers and doing a timed sleep. + + Consequently, we can just sleep forever here and be happy at some + saved wakeup cycles. */ + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, "timers not checked: expect another thread to"); + } + next = GRPC_MILLIS_INF_FUTURE; + /* fall through */ + case GRPC_TIMERS_CHECKED_AND_EMPTY: + if (!wait_until(exec_ctx, next)) { + return; + } + break; + } + } +} + +static void timer_thread_cleanup(completed_thread *ct) { + gpr_mu_lock(&g_mu); + // terminate the thread: drop the waiter count, thread count, and let whomever + // stopped the threading stuff know that we're done + --g_waiter_count; + --g_thread_count; + if (0 == g_thread_count) { + gpr_cv_signal(&g_cv_shutdown); + } + ct->next = g_completed_threads; + g_completed_threads = ct; + gpr_mu_unlock(&g_mu); + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, "End timer thread"); + } +} + +static void timer_thread(void *completed_thread_ptr) { + // this threads exec_ctx: we try to run things through to completion here + // since it's easy to spin up new threads + grpc_exec_ctx exec_ctx = + GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL); + timer_main_loop(&exec_ctx); + grpc_exec_ctx_finish(&exec_ctx); + timer_thread_cleanup((completed_thread *)completed_thread_ptr); +} + +static void start_threads(void) { + gpr_mu_lock(&g_mu); + if (!g_threaded) { + g_threaded = true; + start_timer_thread_and_unlock(); + } else { + g_threaded = false; + gpr_mu_unlock(&g_mu); + } +} + +void grpc_timer_manager_init(void) { + gpr_mu_init(&g_mu); + gpr_cv_init(&g_cv_wait); + gpr_cv_init(&g_cv_shutdown); + g_threaded = false; + g_thread_count = 0; + g_waiter_count = 0; + g_completed_threads = NULL; + + g_has_timed_waiter = false; + g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE; + + start_threads(); +} + +static void stop_threads(void) { + gpr_mu_lock(&g_mu); + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, "stop timer threads: threaded=%d", g_threaded); + } + if (g_threaded) { + g_threaded = false; + gpr_cv_broadcast(&g_cv_wait); + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, "num timer threads: %d", g_thread_count); + } + while (g_thread_count > 0) { + gpr_cv_wait(&g_cv_shutdown, &g_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, "num timer threads: %d", g_thread_count); + } + gc_completed_threads(); + } + } + gpr_mu_unlock(&g_mu); +} + +void grpc_timer_manager_shutdown(void) { + stop_threads(); + + gpr_mu_destroy(&g_mu); + gpr_cv_destroy(&g_cv_wait); + gpr_cv_destroy(&g_cv_shutdown); +} + +void grpc_timer_manager_set_threading(bool threaded) { + if (threaded) { + start_threads(); + } else { + stop_threads(); + } +} + +void grpc_kick_poller(void) { + gpr_mu_lock(&g_mu); + g_kicked = true; + g_has_timed_waiter = false; + g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE; + ++g_timed_waiter_generation; + gpr_cv_signal(&g_cv_wait); + gpr_mu_unlock(&g_mu); +} |