/* * * Copyright 2017, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are * met: * * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above * copyright notice, this list of conditions and the following disclaimer * in the documentation and/or other materials provided with the * distribution. * * Neither the name of Google Inc. nor the names of its * contributors may be used to endorse or promote products derived from * this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ #include "src/core/lib/iomgr/timer_manager.h" #include #include #include #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/timer.h" typedef struct completed_thread { gpr_thd_id t; struct completed_thread *next; } completed_thread; extern grpc_tracer_flag grpc_timer_check_trace; // global mutex static gpr_mu g_mu; // are we multi-threaded static bool g_threaded; // cv to wait until a thread is needed static gpr_cv g_cv_wait; // cv for notification when threading ends static gpr_cv g_cv_shutdown; // number of threads in the system static int g_thread_count; // number of threads sitting around waiting static int g_waiter_count; // linked list of threads that have completed (and need joining) static completed_thread *g_completed_threads; // was the manager kicked by the timer system static bool g_kicked; // is there a thread waiting until the next timer should fire? static bool g_has_timed_waiter; // generation counter to track which thread is waiting for the next timer static uint64_t g_timed_waiter_generation; static void timer_thread(void *unused); static void gc_completed_threads(void) { if (g_completed_threads != NULL) { completed_thread *to_gc = g_completed_threads; g_completed_threads = NULL; gpr_mu_unlock(&g_mu); while (to_gc != NULL) { gpr_thd_join(to_gc->t); completed_thread *next = to_gc->next; gpr_free(to_gc); to_gc = next; } gpr_mu_lock(&g_mu); } } static void start_timer_thread_and_unlock(void) { GPR_ASSERT(g_threaded); ++g_waiter_count; ++g_thread_count; gpr_mu_unlock(&g_mu); if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "Spawn timer thread"); } gpr_thd_id thd; gpr_thd_options opt = gpr_thd_options_default(); gpr_thd_options_set_joinable(&opt); gpr_thd_new(&thd, timer_thread, NULL, &opt); } void grpc_timer_manager_tick() { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_timespec next = gpr_inf_future(GPR_CLOCK_MONOTONIC); gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); grpc_timer_check(&exec_ctx, now, &next); grpc_exec_ctx_finish(&exec_ctx); } static void timer_thread(void *unused) { // this threads exec_ctx: we try to run things through to completion here // since it's easy to spin up new threads grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL); const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC); for (;;) { gpr_timespec next = inf_future; gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); // check timer state, updates next to the next time to run a check if (grpc_timer_check(&exec_ctx, now, &next)) { // if there's something to execute... gpr_mu_lock(&g_mu); // remove a waiter from the pool, and start another thread if necessary --g_waiter_count; if (g_waiter_count == 0 && g_threaded) { start_timer_thread_and_unlock(); } else { // if there's no thread waiting with a timeout, kick an existing waiter // so that the next deadline is not missed if (!g_has_timed_waiter) { if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "kick untimed waiter"); } gpr_cv_signal(&g_cv_wait); } gpr_mu_unlock(&g_mu); } // without our lock, flush the exec_ctx grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(&g_mu); // garbage collect any threads hanging out that are dead gc_completed_threads(); // get ready to wait again ++g_waiter_count; gpr_mu_unlock(&g_mu); } else { gpr_mu_lock(&g_mu); // if we're not threaded anymore, leave if (!g_threaded) break; // if there's no timed waiter, we should become one: that waiter waits // only until the next timer should expire // all other timers wait forever uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1; if (!g_has_timed_waiter) { g_has_timed_waiter = true; // we use a generation counter to track the timed waiter so we can // cancel an existing one quickly (and when it actually times out it'll // figure stuff out instead of incurring a wakeup) my_timed_waiter_generation = ++g_timed_waiter_generation; if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "sleep for a while"); } } else { next = inf_future; if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "sleep until kicked"); } } gpr_cv_wait(&g_cv_wait, &g_mu, next); if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d", my_timed_waiter_generation == g_timed_waiter_generation, g_kicked); } // if this was the timed waiter, then we need to check timers, and flag // that there's now no timed waiter... we'll look for a replacement if // there's work to do after checking timers (code above) if (my_timed_waiter_generation == g_timed_waiter_generation) { g_has_timed_waiter = false; } // if this was a kick from the timer system, consume it (and don't stop // this thread yet) if (g_kicked) { grpc_timer_consume_kick(); g_kicked = false; } gpr_mu_unlock(&g_mu); } } // terminate the thread: drop the waiter count, thread count, and let whomever // stopped the threading stuff know that we're done --g_waiter_count; --g_thread_count; if (0 == g_thread_count) { gpr_cv_signal(&g_cv_shutdown); } completed_thread *ct = gpr_malloc(sizeof(*ct)); ct->t = gpr_thd_currentid(); ct->next = g_completed_threads; g_completed_threads = ct; gpr_mu_unlock(&g_mu); grpc_exec_ctx_finish(&exec_ctx); if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "End timer thread"); } } static void start_threads(void) { gpr_mu_lock(&g_mu); if (!g_threaded) { g_threaded = true; start_timer_thread_and_unlock(); } else { g_threaded = false; gpr_mu_unlock(&g_mu); } } void grpc_timer_manager_init(void) { gpr_mu_init(&g_mu); gpr_cv_init(&g_cv_wait); gpr_cv_init(&g_cv_shutdown); g_threaded = false; g_thread_count = 0; g_waiter_count = 0; g_completed_threads = NULL; start_threads(); } static void stop_threads(void) { gpr_mu_lock(&g_mu); if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "stop timer threads: threaded=%d", g_threaded); } if (g_threaded) { g_threaded = false; gpr_cv_broadcast(&g_cv_wait); if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "num timer threads: %d", g_thread_count); } while (g_thread_count > 0) { gpr_cv_wait(&g_cv_shutdown, &g_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "num timer threads: %d", g_thread_count); } gc_completed_threads(); } } gpr_mu_unlock(&g_mu); } void grpc_timer_manager_shutdown(void) { stop_threads(); gpr_mu_destroy(&g_mu); gpr_cv_destroy(&g_cv_wait); gpr_cv_destroy(&g_cv_shutdown); } void grpc_timer_manager_set_threading(bool threaded) { if (threaded) { start_threads(); } else { stop_threads(); } } void grpc_kick_poller(void) { gpr_mu_lock(&g_mu); g_kicked = true; g_has_timed_waiter = false; ++g_timed_waiter_generation; gpr_cv_signal(&g_cv_wait); gpr_mu_unlock(&g_mu); }