diff options
author | Craig Tiller <ctiller@google.com> | 2017-06-07 16:45:59 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-06-07 16:45:59 -0700 |
commit | 6b19d3b408903a9a91727d1dfd4cd1704433cfe7 (patch) | |
tree | ef53e1954a3b6c4160a3ffe4169726267140d361 /src/core | |
parent | 2f6715d49b04163a9006c4c93b5ef8a1a1e4e98c (diff) | |
parent | 6d7c49f97bc8d4498f33310368fbb54dbcd82b52 (diff) |
Merge pull request #11333 from ctiller/faster_timer_pool
Get rid of zero-length poll case now that timer pool exists
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/lib/iomgr/iomgr.c | 3 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_generic.c | 58 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_manager.c | 187 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_uv.c | 6 | ||||
-rw-r--r-- | src/core/lib/support/log.c | 6 |
6 files changed, 160 insertions, 110 deletions
diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c index 1fd41c2f88..d0071bfc94 100644 --- a/src/core/lib/iomgr/iomgr.c +++ b/src/core/lib/iomgr/iomgr.c @@ -107,7 +107,8 @@ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx) { } last_warning_time = gpr_now(GPR_CLOCK_REALTIME); } - if (grpc_timer_check(exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL)) { + if (grpc_timer_check(exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL) == + GRPC_TIMERS_FIRED) { gpr_mu_unlock(&g_mu); grpc_exec_ctx_flush(exec_ctx); grpc_iomgr_platform_flush(); diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h index e0338f93c7..d6758f7c3b 100644 --- a/src/core/lib/iomgr/timer.h +++ b/src/core/lib/iomgr/timer.h @@ -89,6 +89,12 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer); /* iomgr internal api for dealing with timers */ +typedef enum { + GRPC_TIMERS_NOT_CHECKED, + GRPC_TIMERS_CHECKED_AND_EMPTY, + GRPC_TIMERS_FIRED, +} grpc_timer_check_result; + /* Check for timers to be run, and run them. Return true if timer callbacks were executed. If next is non-null, TRY to update *next with the next running timer @@ -96,8 +102,8 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer); *next is never guaranteed to be updated on any given execution; however, with high probability at least one thread in the system will see an update at any time slice. */ -bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, - gpr_timespec *next); +grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, + gpr_timespec now, gpr_timespec *next); void grpc_timer_list_init(gpr_timespec now); void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx); diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index b28340b71c..019cd8f309 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -101,8 +101,10 @@ static gpr_atm saturating_add(gpr_atm a, gpr_atm b) { return a + b; } -static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, - gpr_atm *next, grpc_error *error); +static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx, + gpr_atm now, + gpr_atm *next, + grpc_error *error); static gpr_timespec dbl_to_ts(double d) { gpr_timespec ts; @@ -421,19 +423,22 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, return n; } -static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, - gpr_atm *next, grpc_error *error) { - size_t n = 0; +static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx, + gpr_atm now, + gpr_atm *next, + grpc_error *error) { + grpc_timer_check_result result = GRPC_TIMERS_NOT_CHECKED; gpr_atm min_timer = gpr_atm_no_barrier_load(&g_shared_mutables.min_timer); gpr_tls_set(&g_last_seen_min_timer, min_timer); if (now < min_timer) { if (next != NULL) *next = GPR_MIN(*next, min_timer); - return 0; + return GRPC_TIMERS_CHECKED_AND_EMPTY; } if (gpr_spinlock_trylock(&g_shared_mutables.checker_mu)) { gpr_mu_lock(&g_shared_mutables.mu); + result = GRPC_TIMERS_CHECKED_AND_EMPTY; if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, " .. shard[%d]->min_deadline = %" PRIdPTR, @@ -448,14 +453,17 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, /* For efficiency, we pop as many available timers as we can from the shard. This may violate perfect timer deadline ordering, but that shouldn't be a big deal because we don't make ordering guarantees. */ - n += - pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline, error); + if (pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline, + error) > 0) { + result = GRPC_TIMERS_FIRED; + } if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_log(GPR_DEBUG, " .. popped --> %" PRIdPTR - ", shard[%d]->min_deadline %" PRIdPTR - " --> %" PRIdPTR ", now=%" PRIdPTR, - n, (int)(g_shard_queue[0] - g_shards), + gpr_log(GPR_DEBUG, + " .. result --> %d" + ", shard[%d]->min_deadline %" PRIdPTR " --> %" PRIdPTR + ", now=%" PRIdPTR, + result, (int)(g_shard_queue[0] - g_shards), g_shard_queue[0]->min_deadline, new_min_deadline, now); } @@ -476,26 +484,15 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, g_shard_queue[0]->min_deadline); gpr_mu_unlock(&g_shared_mutables.mu); gpr_spinlock_unlock(&g_shared_mutables.checker_mu); - } else if (next != NULL) { - /* TODO(ctiller): this forces calling code to do an short poll, and - then retry the timer check (because this time through the timer list was - contended). - - We could reduce the cost here dramatically by keeping a count of how - many currently active pollers got through the uncontended case above - successfully, and waking up other pollers IFF that count drops to zero. - - Once that count is in place, this entire else branch could disappear. */ - *next = GPR_MIN(*next, now + 1); } GRPC_ERROR_UNREF(error); - return (int)n; + return result; } -bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, - gpr_timespec *next) { +grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, + gpr_timespec now, gpr_timespec *next) { // prelude GPR_ASSERT(now.clock_type == g_clock_type); gpr_atm now_atm = timespec_to_atm_round_down(now); @@ -513,7 +510,7 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, "TIMER CHECK SKIP: now_atm=%" PRIdPTR " min_timer=%" PRIdPTR, now_atm, min_timer); } - return 0; + return GRPC_TIMERS_CHECKED_AND_EMPTY; } grpc_error *shutdown_error = @@ -538,7 +535,7 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_free(next_str); } // actual code - bool r; + grpc_timer_check_result r; gpr_atm next_atm; if (next == NULL) { r = run_some_expired_timers(exec_ctx, now_atm, NULL, shutdown_error); @@ -556,11 +553,10 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec, next->tv_nsec, next_atm); } - gpr_log(GPR_DEBUG, "TIMER CHECK END: %d timers triggered; next=%s", r, - next_str); + gpr_log(GPR_DEBUG, "TIMER CHECK END: r=%d; next=%s", r, next_str); gpr_free(next_str); } - return r > 0; + return r; } #endif /* GRPC_TIMER_USE_GENERIC */ diff --git a/src/core/lib/iomgr/timer_manager.c b/src/core/lib/iomgr/timer_manager.c index 24085093e7..082dfe8299 100644 --- a/src/core/lib/iomgr/timer_manager.c +++ b/src/core/lib/iomgr/timer_manager.c @@ -107,86 +107,124 @@ void grpc_timer_manager_tick() { grpc_exec_ctx_finish(&exec_ctx); } -static void timer_thread(void *unused) { - // this threads exec_ctx: we try to run things through to completion here - // since it's easy to spin up new threads - grpc_exec_ctx exec_ctx = - GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL); +static void run_some_timers(grpc_exec_ctx *exec_ctx) { + // if there's something to execute... + gpr_mu_lock(&g_mu); + // remove a waiter from the pool, and start another thread if necessary + --g_waiter_count; + if (g_waiter_count == 0 && g_threaded) { + start_timer_thread_and_unlock(); + } else { + // if there's no thread waiting with a timeout, kick an existing + // waiter + // so that the next deadline is not missed + if (!g_has_timed_waiter) { + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, "kick untimed waiter"); + } + gpr_cv_signal(&g_cv_wait); + } + gpr_mu_unlock(&g_mu); + } + // without our lock, flush the exec_ctx + grpc_exec_ctx_flush(exec_ctx); + gpr_mu_lock(&g_mu); + // garbage collect any threads hanging out that are dead + gc_completed_threads(); + // get ready to wait again + ++g_waiter_count; + gpr_mu_unlock(&g_mu); +} + +// wait until 'next' (or forever if there is already a timed waiter in the pool) +// returns true if the thread should continue executing (false if it should +// shutdown) +static bool wait_until(gpr_timespec next) { + const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC); + gpr_mu_lock(&g_mu); + // if we're not threaded anymore, leave + if (!g_threaded) { + gpr_mu_unlock(&g_mu); + return false; + } + // if there's no timed waiter, we should become one: that waiter waits + // only until the next timer should expire + // all other timers wait forever + uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1; + if (!g_has_timed_waiter && gpr_time_cmp(next, inf_future) != 0) { + g_has_timed_waiter = true; + // we use a generation counter to track the timed waiter so we can + // cancel an existing one quickly (and when it actually times out it'll + // figure stuff out instead of incurring a wakeup) + my_timed_waiter_generation = ++g_timed_waiter_generation; + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_timespec wait_time = gpr_time_sub(next, gpr_now(GPR_CLOCK_MONOTONIC)); + gpr_log(GPR_DEBUG, "sleep for a %" PRId64 ".%09d seconds", + wait_time.tv_sec, wait_time.tv_nsec); + } + } else { + next = inf_future; + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, "sleep until kicked"); + } + } + gpr_cv_wait(&g_cv_wait, &g_mu, next); + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d", + my_timed_waiter_generation == g_timed_waiter_generation, g_kicked); + } + // if this was the timed waiter, then we need to check timers, and flag + // that there's now no timed waiter... we'll look for a replacement if + // there's work to do after checking timers (code above) + if (my_timed_waiter_generation == g_timed_waiter_generation) { + g_has_timed_waiter = false; + } + // if this was a kick from the timer system, consume it (and don't stop + // this thread yet) + if (g_kicked) { + grpc_timer_consume_kick(); + g_kicked = false; + } + gpr_mu_unlock(&g_mu); + return true; +} + +static void timer_main_loop(grpc_exec_ctx *exec_ctx) { const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC); for (;;) { gpr_timespec next = inf_future; gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); // check timer state, updates next to the next time to run a check - if (grpc_timer_check(&exec_ctx, now, &next)) { - // if there's something to execute... - gpr_mu_lock(&g_mu); - // remove a waiter from the pool, and start another thread if necessary - --g_waiter_count; - if (g_waiter_count == 0 && g_threaded) { - start_timer_thread_and_unlock(); - } else { - // if there's no thread waiting with a timeout, kick an existing waiter - // so that the next deadline is not missed - if (!g_has_timed_waiter) { - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_log(GPR_DEBUG, "kick untimed waiter"); - } - gpr_cv_signal(&g_cv_wait); - } - gpr_mu_unlock(&g_mu); - } - // without our lock, flush the exec_ctx - grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(&g_mu); - // garbage collect any threads hanging out that are dead - gc_completed_threads(); - // get ready to wait again - ++g_waiter_count; - gpr_mu_unlock(&g_mu); - } else { - gpr_mu_lock(&g_mu); - // if we're not threaded anymore, leave - if (!g_threaded) break; - // if there's no timed waiter, we should become one: that waiter waits - // only until the next timer should expire - // all other timers wait forever - uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1; - if (!g_has_timed_waiter) { - g_has_timed_waiter = true; - // we use a generation counter to track the timed waiter so we can - // cancel an existing one quickly (and when it actually times out it'll - // figure stuff out instead of incurring a wakeup) - my_timed_waiter_generation = ++g_timed_waiter_generation; + switch (grpc_timer_check(exec_ctx, now, &next)) { + case GRPC_TIMERS_FIRED: + run_some_timers(exec_ctx); + break; + case GRPC_TIMERS_NOT_CHECKED: + /* This case only happens under contention, meaning more than one timer + manager thread checked timers concurrently. + + If that happens, we're guaranteed that some other thread has just + checked timers, and this will avalanche into some other thread seeing + empty timers and doing a timed sleep. + + Consequently, we can just sleep forever here and be happy at some + saved wakeup cycles. */ if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_log(GPR_DEBUG, "sleep for a while"); + gpr_log(GPR_DEBUG, "timers not checked: expect another thread to"); } - } else { next = inf_future; - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_log(GPR_DEBUG, "sleep until kicked"); + /* fall through */ + case GRPC_TIMERS_CHECKED_AND_EMPTY: + if (!wait_until(next)) { + return; } - } - gpr_cv_wait(&g_cv_wait, &g_mu, next); - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d", - my_timed_waiter_generation == g_timed_waiter_generation, - g_kicked); - } - // if this was the timed waiter, then we need to check timers, and flag - // that there's now no timed waiter... we'll look for a replacement if - // there's work to do after checking timers (code above) - if (my_timed_waiter_generation == g_timed_waiter_generation) { - g_has_timed_waiter = false; - } - // if this was a kick from the timer system, consume it (and don't stop - // this thread yet) - if (g_kicked) { - grpc_timer_consume_kick(); - g_kicked = false; - } - gpr_mu_unlock(&g_mu); + break; } } +} + +static void timer_thread_cleanup(void) { + gpr_mu_lock(&g_mu); // terminate the thread: drop the waiter count, thread count, and let whomever // stopped the threading stuff know that we're done --g_waiter_count; @@ -199,12 +237,21 @@ static void timer_thread(void *unused) { ct->next = g_completed_threads; g_completed_threads = ct; gpr_mu_unlock(&g_mu); - grpc_exec_ctx_finish(&exec_ctx); if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "End timer thread"); } } +static void timer_thread(void *unused) { + // this threads exec_ctx: we try to run things through to completion here + // since it's easy to spin up new threads + grpc_exec_ctx exec_ctx = + GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL); + timer_main_loop(&exec_ctx); + grpc_exec_ctx_finish(&exec_ctx); + timer_thread_cleanup(); +} + static void start_threads(void) { gpr_mu_lock(&g_mu); if (!g_threaded) { diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c index 2952e44b58..967e84eb14 100644 --- a/src/core/lib/iomgr/timer_uv.c +++ b/src/core/lib/iomgr/timer_uv.c @@ -96,9 +96,9 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { } } -bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, - gpr_timespec *next) { - return false; +grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, + gpr_timespec now, gpr_timespec *next) { + return GRPC_TIMERS_NOT_CHECKED; } void grpc_timer_list_init(gpr_timespec now) {} diff --git a/src/core/lib/support/log.c b/src/core/lib/support/log.c index af1651dae5..4d15553768 100644 --- a/src/core/lib/support/log.c +++ b/src/core/lib/support/log.c @@ -43,7 +43,7 @@ #include <string.h> extern void gpr_default_log(gpr_log_func_args *args); -static gpr_log_func g_log_func = gpr_default_log; +static gpr_atm g_log_func = (gpr_atm)gpr_default_log; static gpr_atm g_min_severity_to_print = GPR_LOG_VERBOSITY_UNSET; const char *gpr_log_severity_string(gpr_log_severity severity) { @@ -70,7 +70,7 @@ void gpr_log_message(const char *file, int line, gpr_log_severity severity, lfargs.line = line; lfargs.severity = severity; lfargs.message = message; - g_log_func(&lfargs); + ((gpr_log_func)gpr_atm_no_barrier_load(&g_log_func))(&lfargs); } void gpr_set_log_verbosity(gpr_log_severity min_severity_to_print) { @@ -99,5 +99,5 @@ void gpr_log_verbosity_init() { } void gpr_set_log_function(gpr_log_func f) { - g_log_func = f ? f : gpr_default_log; + gpr_atm_no_barrier_store(&g_log_func, (gpr_atm)(f ? f : gpr_default_log)); } |