aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/lib/iomgr/iomgr.c3
-rw-r--r--src/core/lib/iomgr/timer.h10
-rw-r--r--src/core/lib/iomgr/timer_generic.c53
-rw-r--r--src/core/lib/iomgr/timer_manager.c183
-rw-r--r--src/core/lib/iomgr/timer_uv.c6
5 files changed, 149 insertions, 106 deletions
diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c
index 1fd41c2f88..d0071bfc94 100644
--- a/src/core/lib/iomgr/iomgr.c
+++ b/src/core/lib/iomgr/iomgr.c
@@ -107,7 +107,8 @@ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx) {
}
last_warning_time = gpr_now(GPR_CLOCK_REALTIME);
}
- if (grpc_timer_check(exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL)) {
+ if (grpc_timer_check(exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL) ==
+ GRPC_TIMERS_FIRED) {
gpr_mu_unlock(&g_mu);
grpc_exec_ctx_flush(exec_ctx);
grpc_iomgr_platform_flush();
diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h
index e0338f93c7..d6758f7c3b 100644
--- a/src/core/lib/iomgr/timer.h
+++ b/src/core/lib/iomgr/timer.h
@@ -89,6 +89,12 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer);
/* iomgr internal api for dealing with timers */
+typedef enum {
+ GRPC_TIMERS_NOT_CHECKED,
+ GRPC_TIMERS_CHECKED_AND_EMPTY,
+ GRPC_TIMERS_FIRED,
+} grpc_timer_check_result;
+
/* Check for timers to be run, and run them.
Return true if timer callbacks were executed.
If next is non-null, TRY to update *next with the next running timer
@@ -96,8 +102,8 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer);
*next is never guaranteed to be updated on any given execution; however,
with high probability at least one thread in the system will see an update
at any time slice. */
-bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
- gpr_timespec *next);
+grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx,
+ gpr_timespec now, gpr_timespec *next);
void grpc_timer_list_init(gpr_timespec now);
void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx);
diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c
index b28340b71c..288782c060 100644
--- a/src/core/lib/iomgr/timer_generic.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -101,8 +101,10 @@ static gpr_atm saturating_add(gpr_atm a, gpr_atm b) {
return a + b;
}
-static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
- gpr_atm *next, grpc_error *error);
+static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx,
+ gpr_atm now,
+ gpr_atm *next,
+ grpc_error *error);
static gpr_timespec dbl_to_ts(double d) {
gpr_timespec ts;
@@ -421,9 +423,11 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard,
return n;
}
-static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
- gpr_atm *next, grpc_error *error) {
- size_t n = 0;
+static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx,
+ gpr_atm now,
+ gpr_atm *next,
+ grpc_error *error) {
+ grpc_timer_check_result result = GRPC_TIMERS_NOT_CHECKED;
gpr_atm min_timer = gpr_atm_no_barrier_load(&g_shared_mutables.min_timer);
gpr_tls_set(&g_last_seen_min_timer, min_timer);
@@ -434,6 +438,7 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
if (gpr_spinlock_trylock(&g_shared_mutables.checker_mu)) {
gpr_mu_lock(&g_shared_mutables.mu);
+ result = GRPC_TIMERS_CHECKED_AND_EMPTY;
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, " .. shard[%d]->min_deadline = %" PRIdPTR,
@@ -448,14 +453,17 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
/* For efficiency, we pop as many available timers as we can from the
shard. This may violate perfect timer deadline ordering, but that
shouldn't be a big deal because we don't make ordering guarantees. */
- n +=
- pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline, error);
+ if (pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline,
+ error) > 0) {
+ result = GRPC_TIMERS_FIRED;
+ }
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
- gpr_log(GPR_DEBUG, " .. popped --> %" PRIdPTR
- ", shard[%d]->min_deadline %" PRIdPTR
- " --> %" PRIdPTR ", now=%" PRIdPTR,
- n, (int)(g_shard_queue[0] - g_shards),
+ gpr_log(GPR_DEBUG,
+ " .. result --> %d"
+ ", shard[%d]->min_deadline %" PRIdPTR " --> %" PRIdPTR
+ ", now=%" PRIdPTR,
+ result, (int)(g_shard_queue[0] - g_shards),
g_shard_queue[0]->min_deadline, new_min_deadline, now);
}
@@ -476,26 +484,15 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
g_shard_queue[0]->min_deadline);
gpr_mu_unlock(&g_shared_mutables.mu);
gpr_spinlock_unlock(&g_shared_mutables.checker_mu);
- } else if (next != NULL) {
- /* TODO(ctiller): this forces calling code to do an short poll, and
- then retry the timer check (because this time through the timer list was
- contended).
-
- We could reduce the cost here dramatically by keeping a count of how
- many currently active pollers got through the uncontended case above
- successfully, and waking up other pollers IFF that count drops to zero.
-
- Once that count is in place, this entire else branch could disappear. */
- *next = GPR_MIN(*next, now + 1);
}
GRPC_ERROR_UNREF(error);
- return (int)n;
+ return result;
}
-bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
- gpr_timespec *next) {
+grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx,
+ gpr_timespec now, gpr_timespec *next) {
// prelude
GPR_ASSERT(now.clock_type == g_clock_type);
gpr_atm now_atm = timespec_to_atm_round_down(now);
@@ -513,7 +510,7 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
"TIMER CHECK SKIP: now_atm=%" PRIdPTR " min_timer=%" PRIdPTR,
now_atm, min_timer);
}
- return 0;
+ return GRPC_TIMERS_CHECKED_AND_EMPTY;
}
grpc_error *shutdown_error =
@@ -538,7 +535,7 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_free(next_str);
}
// actual code
- bool r;
+ grpc_timer_check_result r;
gpr_atm next_atm;
if (next == NULL) {
r = run_some_expired_timers(exec_ctx, now_atm, NULL, shutdown_error);
@@ -560,7 +557,7 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
next_str);
gpr_free(next_str);
}
- return r > 0;
+ return r;
}
#endif /* GRPC_TIMER_USE_GENERIC */
diff --git a/src/core/lib/iomgr/timer_manager.c b/src/core/lib/iomgr/timer_manager.c
index 24085093e7..767bb64961 100644
--- a/src/core/lib/iomgr/timer_manager.c
+++ b/src/core/lib/iomgr/timer_manager.c
@@ -107,86 +107,116 @@ void grpc_timer_manager_tick() {
grpc_exec_ctx_finish(&exec_ctx);
}
-static void timer_thread(void *unused) {
- // this threads exec_ctx: we try to run things through to completion here
- // since it's easy to spin up new threads
- grpc_exec_ctx exec_ctx =
- GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
+static void run_some_timers(grpc_exec_ctx *exec_ctx) {
+ // if there's something to execute...
+ gpr_mu_lock(&g_mu);
+ // remove a waiter from the pool, and start another thread if necessary
+ --g_waiter_count;
+ if (g_waiter_count == 0 && g_threaded) {
+ start_timer_thread_and_unlock();
+ } else {
+ // if there's no thread waiting with a timeout, kick an existing
+ // waiter
+ // so that the next deadline is not missed
+ if (!g_has_timed_waiter) {
+ if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ gpr_log(GPR_DEBUG, "kick untimed waiter");
+ }
+ gpr_cv_signal(&g_cv_wait);
+ }
+ gpr_mu_unlock(&g_mu);
+ }
+ // without our lock, flush the exec_ctx
+ grpc_exec_ctx_flush(exec_ctx);
+ gpr_mu_lock(&g_mu);
+ // garbage collect any threads hanging out that are dead
+ gc_completed_threads();
+ // get ready to wait again
+ ++g_waiter_count;
+ gpr_mu_unlock(&g_mu);
+}
+
+// wait until 'next' (or forever if there is already a timed waiter in the pool)
+// returns true if the thread should continue executing (false if it should
+// shutdown)
+static bool wait_until(gpr_timespec next) {
+ const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+ gpr_mu_lock(&g_mu);
+ // if we're not threaded anymore, leave
+ if (!g_threaded) return false;
+ // if there's no timed waiter, we should become one: that waiter waits
+ // only until the next timer should expire
+ // all other timers wait forever
+ uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1;
+ if (!g_has_timed_waiter && gpr_time_cmp(next, inf_future) != 0) {
+ g_has_timed_waiter = true;
+ // we use a generation counter to track the timed waiter so we can
+ // cancel an existing one quickly (and when it actually times out it'll
+ // figure stuff out instead of incurring a wakeup)
+ my_timed_waiter_generation = ++g_timed_waiter_generation;
+ if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ gpr_log(GPR_DEBUG, "sleep for a while");
+ }
+ } else {
+ next = inf_future;
+ if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ gpr_log(GPR_DEBUG, "sleep until kicked");
+ }
+ }
+ gpr_cv_wait(&g_cv_wait, &g_mu, next);
+ if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d",
+ my_timed_waiter_generation == g_timed_waiter_generation, g_kicked);
+ }
+ // if this was the timed waiter, then we need to check timers, and flag
+ // that there's now no timed waiter... we'll look for a replacement if
+ // there's work to do after checking timers (code above)
+ if (my_timed_waiter_generation == g_timed_waiter_generation) {
+ g_has_timed_waiter = false;
+ }
+ // if this was a kick from the timer system, consume it (and don't stop
+ // this thread yet)
+ if (g_kicked) {
+ grpc_timer_consume_kick();
+ g_kicked = false;
+ }
+ gpr_mu_unlock(&g_mu);
+ return true;
+}
+
+static void timer_main_loop(grpc_exec_ctx *exec_ctx) {
const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC);
for (;;) {
gpr_timespec next = inf_future;
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
// check timer state, updates next to the next time to run a check
- if (grpc_timer_check(&exec_ctx, now, &next)) {
- // if there's something to execute...
- gpr_mu_lock(&g_mu);
- // remove a waiter from the pool, and start another thread if necessary
- --g_waiter_count;
- if (g_waiter_count == 0 && g_threaded) {
- start_timer_thread_and_unlock();
- } else {
- // if there's no thread waiting with a timeout, kick an existing waiter
- // so that the next deadline is not missed
- if (!g_has_timed_waiter) {
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
- gpr_log(GPR_DEBUG, "kick untimed waiter");
- }
- gpr_cv_signal(&g_cv_wait);
- }
- gpr_mu_unlock(&g_mu);
- }
- // without our lock, flush the exec_ctx
- grpc_exec_ctx_flush(&exec_ctx);
- gpr_mu_lock(&g_mu);
- // garbage collect any threads hanging out that are dead
- gc_completed_threads();
- // get ready to wait again
- ++g_waiter_count;
- gpr_mu_unlock(&g_mu);
- } else {
- gpr_mu_lock(&g_mu);
- // if we're not threaded anymore, leave
- if (!g_threaded) break;
- // if there's no timed waiter, we should become one: that waiter waits
- // only until the next timer should expire
- // all other timers wait forever
- uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1;
- if (!g_has_timed_waiter) {
- g_has_timed_waiter = true;
- // we use a generation counter to track the timed waiter so we can
- // cancel an existing one quickly (and when it actually times out it'll
- // figure stuff out instead of incurring a wakeup)
- my_timed_waiter_generation = ++g_timed_waiter_generation;
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
- gpr_log(GPR_DEBUG, "sleep for a while");
- }
- } else {
+ switch (grpc_timer_check(exec_ctx, now, &next)) {
+ case GRPC_TIMERS_FIRED:
+ run_some_timers(exec_ctx);
+ break;
+ case GRPC_TIMERS_NOT_CHECKED:
+ /* This case only happens under contention, meaning more than one timer
+ manager thread checked timers concurrently.
+
+ If that happens, we're guaranteed that some other thread has just
+ checked timers, and this will avalanche into some other thread seeing
+ empty timers and doing a timed sleep.
+
+ Consequently, we can just sleep forever here and be happy at some
+ saved wakeup cycles. */
next = inf_future;
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
- gpr_log(GPR_DEBUG, "sleep until kicked");
+ /* fall through */
+ case GRPC_TIMERS_CHECKED_AND_EMPTY:
+ if (!wait_until(next)) {
+ return;
}
- }
- gpr_cv_wait(&g_cv_wait, &g_mu, next);
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
- gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d",
- my_timed_waiter_generation == g_timed_waiter_generation,
- g_kicked);
- }
- // if this was the timed waiter, then we need to check timers, and flag
- // that there's now no timed waiter... we'll look for a replacement if
- // there's work to do after checking timers (code above)
- if (my_timed_waiter_generation == g_timed_waiter_generation) {
- g_has_timed_waiter = false;
- }
- // if this was a kick from the timer system, consume it (and don't stop
- // this thread yet)
- if (g_kicked) {
- grpc_timer_consume_kick();
- g_kicked = false;
- }
- gpr_mu_unlock(&g_mu);
+ break;
}
}
+}
+
+static void timer_thread_cleanup(void) {
+ gpr_mu_lock(&g_mu);
// terminate the thread: drop the waiter count, thread count, and let whomever
// stopped the threading stuff know that we're done
--g_waiter_count;
@@ -199,12 +229,21 @@ static void timer_thread(void *unused) {
ct->next = g_completed_threads;
g_completed_threads = ct;
gpr_mu_unlock(&g_mu);
- grpc_exec_ctx_finish(&exec_ctx);
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "End timer thread");
}
}
+static void timer_thread(void *unused) {
+ // this threads exec_ctx: we try to run things through to completion here
+ // since it's easy to spin up new threads
+ grpc_exec_ctx exec_ctx =
+ GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
+ timer_main_loop(&exec_ctx);
+ grpc_exec_ctx_finish(&exec_ctx);
+ timer_thread_cleanup();
+}
+
static void start_threads(void) {
gpr_mu_lock(&g_mu);
if (!g_threaded) {
diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c
index 2952e44b58..967e84eb14 100644
--- a/src/core/lib/iomgr/timer_uv.c
+++ b/src/core/lib/iomgr/timer_uv.c
@@ -96,9 +96,9 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
}
}
-bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
- gpr_timespec *next) {
- return false;
+grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx,
+ gpr_timespec now, gpr_timespec *next) {
+ return GRPC_TIMERS_NOT_CHECKED;
}
void grpc_timer_list_init(gpr_timespec now) {}