aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/timer_manager.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr/timer_manager.c')
-rw-r--r--src/core/lib/iomgr/timer_manager.c225
1 files changed, 128 insertions, 97 deletions
diff --git a/src/core/lib/iomgr/timer_manager.c b/src/core/lib/iomgr/timer_manager.c
index fd9a7a2f04..3154ddd7f5 100644
--- a/src/core/lib/iomgr/timer_manager.c
+++ b/src/core/lib/iomgr/timer_manager.c
@@ -1,33 +1,18 @@
/*
*
- * Copyright 2017, Google Inc.
- * All rights reserved.
+ * Copyright 2017 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
@@ -68,7 +53,7 @@ static bool g_has_timed_waiter;
// generation counter to track which thread is waiting for the next timer
static uint64_t g_timed_waiter_generation;
-static void timer_thread(void *unused);
+static void timer_thread(void *completed_thread_ptr);
static void gc_completed_threads(void) {
if (g_completed_threads != NULL) {
@@ -107,86 +92,124 @@ void grpc_timer_manager_tick() {
grpc_exec_ctx_finish(&exec_ctx);
}
-static void timer_thread(void *completed_thread_ptr) {
- // this threads exec_ctx: we try to run things through to completion here
- // since it's easy to spin up new threads
- grpc_exec_ctx exec_ctx =
- GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
+static void run_some_timers(grpc_exec_ctx *exec_ctx) {
+ // if there's something to execute...
+ gpr_mu_lock(&g_mu);
+ // remove a waiter from the pool, and start another thread if necessary
+ --g_waiter_count;
+ if (g_waiter_count == 0 && g_threaded) {
+ start_timer_thread_and_unlock();
+ } else {
+ // if there's no thread waiting with a timeout, kick an existing
+ // waiter
+ // so that the next deadline is not missed
+ if (!g_has_timed_waiter) {
+ if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ gpr_log(GPR_DEBUG, "kick untimed waiter");
+ }
+ gpr_cv_signal(&g_cv_wait);
+ }
+ gpr_mu_unlock(&g_mu);
+ }
+ // without our lock, flush the exec_ctx
+ grpc_exec_ctx_flush(exec_ctx);
+ gpr_mu_lock(&g_mu);
+ // garbage collect any threads hanging out that are dead
+ gc_completed_threads();
+ // get ready to wait again
+ ++g_waiter_count;
+ gpr_mu_unlock(&g_mu);
+}
+
+// wait until 'next' (or forever if there is already a timed waiter in the pool)
+// returns true if the thread should continue executing (false if it should
+// shutdown)
+static bool wait_until(gpr_timespec next) {
+ const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+ gpr_mu_lock(&g_mu);
+ // if we're not threaded anymore, leave
+ if (!g_threaded) {
+ gpr_mu_unlock(&g_mu);
+ return false;
+ }
+ // if there's no timed waiter, we should become one: that waiter waits
+ // only until the next timer should expire
+ // all other timers wait forever
+ uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1;
+ if (!g_has_timed_waiter && gpr_time_cmp(next, inf_future) != 0) {
+ g_has_timed_waiter = true;
+ // we use a generation counter to track the timed waiter so we can
+ // cancel an existing one quickly (and when it actually times out it'll
+ // figure stuff out instead of incurring a wakeup)
+ my_timed_waiter_generation = ++g_timed_waiter_generation;
+ if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ gpr_timespec wait_time = gpr_time_sub(next, gpr_now(GPR_CLOCK_MONOTONIC));
+ gpr_log(GPR_DEBUG, "sleep for a %" PRId64 ".%09d seconds",
+ wait_time.tv_sec, wait_time.tv_nsec);
+ }
+ } else {
+ next = inf_future;
+ if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ gpr_log(GPR_DEBUG, "sleep until kicked");
+ }
+ }
+ gpr_cv_wait(&g_cv_wait, &g_mu, next);
+ if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d",
+ my_timed_waiter_generation == g_timed_waiter_generation, g_kicked);
+ }
+ // if this was the timed waiter, then we need to check timers, and flag
+ // that there's now no timed waiter... we'll look for a replacement if
+ // there's work to do after checking timers (code above)
+ if (my_timed_waiter_generation == g_timed_waiter_generation) {
+ g_has_timed_waiter = false;
+ }
+ // if this was a kick from the timer system, consume it (and don't stop
+ // this thread yet)
+ if (g_kicked) {
+ grpc_timer_consume_kick();
+ g_kicked = false;
+ }
+ gpr_mu_unlock(&g_mu);
+ return true;
+}
+
+static void timer_main_loop(grpc_exec_ctx *exec_ctx) {
const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC);
for (;;) {
gpr_timespec next = inf_future;
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
// check timer state, updates next to the next time to run a check
- if (grpc_timer_check(&exec_ctx, now, &next)) {
- // if there's something to execute...
- gpr_mu_lock(&g_mu);
- // remove a waiter from the pool, and start another thread if necessary
- --g_waiter_count;
- if (g_waiter_count == 0 && g_threaded) {
- start_timer_thread_and_unlock();
- } else {
- // if there's no thread waiting with a timeout, kick an existing waiter
- // so that the next deadline is not missed
- if (!g_has_timed_waiter) {
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
- gpr_log(GPR_DEBUG, "kick untimed waiter");
- }
- gpr_cv_signal(&g_cv_wait);
- }
- gpr_mu_unlock(&g_mu);
- }
- // without our lock, flush the exec_ctx
- grpc_exec_ctx_flush(&exec_ctx);
- gpr_mu_lock(&g_mu);
- // garbage collect any threads hanging out that are dead
- gc_completed_threads();
- // get ready to wait again
- ++g_waiter_count;
- gpr_mu_unlock(&g_mu);
- } else {
- gpr_mu_lock(&g_mu);
- // if we're not threaded anymore, leave
- if (!g_threaded) break;
- // if there's no timed waiter, we should become one: that waiter waits
- // only until the next timer should expire
- // all other timers wait forever
- uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1;
- if (!g_has_timed_waiter) {
- g_has_timed_waiter = true;
- // we use a generation counter to track the timed waiter so we can
- // cancel an existing one quickly (and when it actually times out it'll
- // figure stuff out instead of incurring a wakeup)
- my_timed_waiter_generation = ++g_timed_waiter_generation;
+ switch (grpc_timer_check(exec_ctx, now, &next)) {
+ case GRPC_TIMERS_FIRED:
+ run_some_timers(exec_ctx);
+ break;
+ case GRPC_TIMERS_NOT_CHECKED:
+ /* This case only happens under contention, meaning more than one timer
+ manager thread checked timers concurrently.
+
+ If that happens, we're guaranteed that some other thread has just
+ checked timers, and this will avalanche into some other thread seeing
+ empty timers and doing a timed sleep.
+
+ Consequently, we can just sleep forever here and be happy at some
+ saved wakeup cycles. */
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
- gpr_log(GPR_DEBUG, "sleep for a while");
+ gpr_log(GPR_DEBUG, "timers not checked: expect another thread to");
}
- } else {
next = inf_future;
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
- gpr_log(GPR_DEBUG, "sleep until kicked");
+ /* fall through */
+ case GRPC_TIMERS_CHECKED_AND_EMPTY:
+ if (!wait_until(next)) {
+ return;
}
- }
- gpr_cv_wait(&g_cv_wait, &g_mu, next);
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
- gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d",
- my_timed_waiter_generation == g_timed_waiter_generation,
- g_kicked);
- }
- // if this was the timed waiter, then we need to check timers, and flag
- // that there's now no timed waiter... we'll look for a replacement if
- // there's work to do after checking timers (code above)
- if (my_timed_waiter_generation == g_timed_waiter_generation) {
- g_has_timed_waiter = false;
- }
- // if this was a kick from the timer system, consume it (and don't stop
- // this thread yet)
- if (g_kicked) {
- grpc_timer_consume_kick();
- g_kicked = false;
- }
- gpr_mu_unlock(&g_mu);
+ break;
}
}
+}
+
+static void timer_thread_cleanup(completed_thread *ct) {
+ gpr_mu_lock(&g_mu);
// terminate the thread: drop the waiter count, thread count, and let whomever
// stopped the threading stuff know that we're done
--g_waiter_count;
@@ -194,16 +217,24 @@ static void timer_thread(void *completed_thread_ptr) {
if (0 == g_thread_count) {
gpr_cv_signal(&g_cv_shutdown);
}
- completed_thread *ct = completed_thread_ptr;
ct->next = g_completed_threads;
g_completed_threads = ct;
gpr_mu_unlock(&g_mu);
- grpc_exec_ctx_finish(&exec_ctx);
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "End timer thread");
}
}
+static void timer_thread(void *completed_thread_ptr) {
+ // this threads exec_ctx: we try to run things through to completion here
+ // since it's easy to spin up new threads
+ grpc_exec_ctx exec_ctx =
+ GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
+ timer_main_loop(&exec_ctx);
+ grpc_exec_ctx_finish(&exec_ctx);
+ timer_thread_cleanup(completed_thread_ptr);
+}
+
static void start_threads(void) {
gpr_mu_lock(&g_mu);
if (!g_threaded) {