diff options
author | 2017-03-16 16:25:12 -0700 | |
---|---|---|
committer | 2017-03-16 16:25:12 -0700 | |
commit | 7b2dd93362099eef75b49fe33b93692bb148b93f (patch) | |
tree | 550d87c1a8ba64d4728e1a00bc97ac35686c32a0 /src/core | |
parent | da73580d2c61134529d31651cc18584ac5924227 (diff) |
Track milliseconds since process start in timer heap
Allows reducing a lock-then-check to an atomic load and check on the
fast path of timer checks.
Reduces locks per RPC by about 5.
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/lib/iomgr/timer_generic.c | 111 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_generic.h | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_heap.c | 16 |
3 files changed, 74 insertions, 55 deletions
diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index d4df96c214..090b4dc2d4 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -56,8 +56,8 @@ typedef struct { gpr_mu mu; grpc_time_averaged_stats stats; /* All and only timers with deadlines <= this will be in the heap. */ - gpr_timespec queue_deadline_cap; - gpr_timespec min_deadline; + gpr_atm queue_deadline_cap; + gpr_atm min_deadline; /* Index in the g_shard_queue */ uint32_t shard_queue_index; /* This holds all timers with deadlines < queue_deadline_cap. Timers in this @@ -76,11 +76,32 @@ static shard_type g_shards[NUM_SHARDS]; /* Protected by g_mu */ static shard_type *g_shard_queue[NUM_SHARDS]; static bool g_initialized = false; +static gpr_timespec g_start_time; +static gpr_atm g_min_timer; -static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, - gpr_timespec *next, grpc_error *error); +static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, + gpr_atm *next, grpc_error *error); -static gpr_timespec compute_min_deadline(shard_type *shard) { +static gpr_timespec dbl_to_ts(double d) { + gpr_timespec ts; + ts.tv_sec = (int64_t)d; + ts.tv_nsec = (int32_t)(1e9 * (d - (double)ts.tv_sec)); + ts.clock_type = GPR_TIMESPAN; + return ts; +} + +static gpr_atm timespec_to_atm(gpr_timespec ts) { + double x = gpr_timespec_to_micros(gpr_time_sub(ts, g_start_time)) / 1000.0; + if (x < 0) return 0; + if (x > GPR_ATM_MAX) return GPR_ATM_MAX; + return (gpr_atm)x; +} + +static gpr_timespec atm_to_timespec(gpr_atm x) { + return gpr_time_add(g_start_time, dbl_to_ts((double)x / 1000.0)); +} + +static gpr_atm compute_min_deadline(shard_type *shard) { return grpc_timer_heap_is_empty(&shard->heap) ? shard->queue_deadline_cap : grpc_timer_heap_top(&shard->heap)->deadline; @@ -92,13 +113,15 @@ void grpc_timer_list_init(gpr_timespec now) { g_initialized = true; gpr_mu_init(&g_mu); g_clock_type = now.clock_type; + g_start_time = now; + g_min_timer = timespec_to_atm(now); for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; gpr_mu_init(&shard->mu); grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1, 0.5); - shard->queue_deadline_cap = now; + shard->queue_deadline_cap = timespec_to_atm(now); shard->shard_queue_index = i; grpc_timer_heap_init(&shard->heap); shard->list.next = shard->list.prev = &shard->list; @@ -109,7 +132,7 @@ void grpc_timer_list_init(gpr_timespec now) { void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { int i; - run_some_expired_timers(exec_ctx, gpr_inf_future(g_clock_type), NULL, + run_some_expired_timers(exec_ctx, GPR_ATM_MAX, NULL, GRPC_ERROR_CREATE("Timer list shutdown")); for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; @@ -124,14 +147,6 @@ static double ts_to_dbl(gpr_timespec ts) { return (double)ts.tv_sec + 1e-9 * ts.tv_nsec; } -static gpr_timespec dbl_to_ts(double d) { - gpr_timespec ts; - ts.tv_sec = (int64_t)d; - ts.tv_nsec = (int32_t)(1e9 * (d - (double)ts.tv_sec)); - ts.clock_type = GPR_TIMESPAN; - return ts; -} - static void list_join(grpc_timer *head, grpc_timer *timer) { timer->next = head; timer->prev = head->prev; @@ -157,15 +172,13 @@ static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) { static void note_deadline_change(shard_type *shard) { while (shard->shard_queue_index > 0 && - gpr_time_cmp( - shard->min_deadline, - g_shard_queue[shard->shard_queue_index - 1]->min_deadline) < 0) { + shard->min_deadline < + g_shard_queue[shard->shard_queue_index - 1]->min_deadline) { swap_adjacent_shards_in_queue(shard->shard_queue_index - 1); } while (shard->shard_queue_index < NUM_SHARDS - 1 && - gpr_time_cmp( - shard->min_deadline, - g_shard_queue[shard->shard_queue_index + 1]->min_deadline) > 0) { + shard->min_deadline > + g_shard_queue[shard->shard_queue_index + 1]->min_deadline) { swap_adjacent_shards_in_queue(shard->shard_queue_index); } } @@ -178,7 +191,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, GPR_ASSERT(deadline.clock_type == g_clock_type); GPR_ASSERT(now.clock_type == g_clock_type); timer->closure = closure; - timer->deadline = deadline; + timer->deadline = timespec_to_atm(deadline); if (!g_initialized) { timer->pending = false; @@ -200,7 +213,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, grpc_time_averaged_stats_add_sample(&shard->stats, ts_to_dbl(gpr_time_sub(deadline, now))); - if (gpr_time_cmp(deadline, shard->queue_deadline_cap) < 0) { + if (timer->deadline < shard->queue_deadline_cap) { is_first_timer = grpc_timer_heap_add(&shard->heap, timer); } else { timer->heap_index = INVALID_HEAP_INDEX; @@ -221,12 +234,12 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, grpc_timer_check. */ if (is_first_timer) { gpr_mu_lock(&g_mu); - if (gpr_time_cmp(deadline, shard->min_deadline) < 0) { - gpr_timespec old_min_deadline = g_shard_queue[0]->min_deadline; - shard->min_deadline = deadline; + if (timer->deadline < shard->min_deadline) { + gpr_atm old_min_deadline = g_shard_queue[0]->min_deadline; + shard->min_deadline = timer->deadline; note_deadline_change(shard); - if (shard->shard_queue_index == 0 && - gpr_time_cmp(deadline, old_min_deadline) < 0) { + if (shard->shard_queue_index == 0 && timer->deadline < old_min_deadline) { + gpr_atm_no_barrier_store(&g_min_timer, timer->deadline); grpc_kick_poller(); } } @@ -259,7 +272,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { for timers that fall at or under it. Returns true if the queue is no longer empty. REQUIRES: shard->mu locked */ -static int refill_queue(shard_type *shard, gpr_timespec now) { +static int refill_queue(shard_type *shard, gpr_atm now) { /* Compute the new queue window width and bound by the limits: */ double computed_deadline_delta = grpc_time_averaged_stats_update_average(&shard->stats) * @@ -270,12 +283,12 @@ static int refill_queue(shard_type *shard, gpr_timespec now) { grpc_timer *timer, *next; /* Compute the new cap and put all timers under it into the queue: */ - shard->queue_deadline_cap = gpr_time_add( - gpr_time_max(now, shard->queue_deadline_cap), dbl_to_ts(deadline_delta)); + shard->queue_deadline_cap = GPR_MAX(now, shard->queue_deadline_cap) + + (gpr_atm)(deadline_delta * 1000.0); for (timer = shard->list.next; timer != &shard->list; timer = next) { next = timer->next; - if (gpr_time_cmp(timer->deadline, shard->queue_deadline_cap) < 0) { + if (timer->deadline < shard->queue_deadline_cap) { list_remove(timer); grpc_timer_heap_add(&shard->heap, timer); } @@ -286,15 +299,15 @@ static int refill_queue(shard_type *shard, gpr_timespec now) { /* This pops the next non-cancelled timer with deadline <= now from the queue, or returns NULL if there isn't one. REQUIRES: shard->mu locked */ -static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) { +static grpc_timer *pop_one(shard_type *shard, gpr_atm now) { grpc_timer *timer; for (;;) { if (grpc_timer_heap_is_empty(&shard->heap)) { - if (gpr_time_cmp(now, shard->queue_deadline_cap) < 0) return NULL; + if (now < shard->queue_deadline_cap) return NULL; if (!refill_queue(shard, now)) return NULL; } timer = grpc_timer_heap_top(&shard->heap); - if (gpr_time_cmp(timer->deadline, now) > 0) return NULL; + if (timer->deadline > now) return NULL; timer->pending = false; grpc_timer_heap_pop(&shard->heap); return timer; @@ -303,7 +316,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) { /* REQUIRES: shard->mu unlocked */ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, - gpr_timespec now, gpr_timespec *new_min_deadline, + gpr_atm now, gpr_atm *new_min_deadline, grpc_error *error) { size_t n = 0; grpc_timer *timer; @@ -317,17 +330,19 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, return n; } -static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, - gpr_timespec *next, grpc_error *error) { +static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, + gpr_atm *next, grpc_error *error) { size_t n = 0; - /* TODO(ctiller): verify that there are any timers (atomically) here */ + if (now < gpr_atm_no_barrier_load(&g_min_timer)) { + return 0; + } if (gpr_spinlock_trylock(&g_checker_mu)) { gpr_mu_lock(&g_mu); - while (gpr_time_cmp(g_shard_queue[0]->min_deadline, now) < 0) { - gpr_timespec new_min_deadline; + while (g_shard_queue[0]->min_deadline < now) { + gpr_atm new_min_deadline; /* For efficiency, we pop as many available timers as we can from the shard. This may violate perfect timer deadline ordering, but that @@ -345,9 +360,10 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, } if (next) { - *next = gpr_time_min(*next, g_shard_queue[0]->min_deadline); + *next = GPR_MIN(*next, g_shard_queue[0]->min_deadline); } + gpr_atm_no_barrier_store(&g_min_timer, g_shard_queue[0]->min_deadline); gpr_mu_unlock(&g_mu); gpr_spinlock_unlock(&g_checker_mu); } else if (next != NULL) { @@ -360,8 +376,7 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, successfully, and waking up other pollers IFF that count drops to zero. Once that count is in place, this entire else branch could disappear. */ - *next = gpr_time_min( - *next, gpr_time_add(now, gpr_time_from_millis(1, GPR_TIMESPAN))); + *next = GPR_MIN(*next, now + 1); } GRPC_ERROR_UNREF(error); @@ -372,11 +387,15 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_timespec *next) { GPR_ASSERT(now.clock_type == g_clock_type); - return run_some_expired_timers( - exec_ctx, now, next, + gpr_atm now_atm = timespec_to_atm(now); + gpr_atm next_atm; + bool r = run_some_expired_timers( + exec_ctx, now_atm, &next_atm, gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0 ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE("Shutting down timer system")); + if (next != NULL) *next = atm_to_timespec(next_atm); + return r; } #endif /* GRPC_TIMER_USE_GENERIC */ diff --git a/src/core/lib/iomgr/timer_generic.h b/src/core/lib/iomgr/timer_generic.h index 1608dce9fb..c79a431aa0 100644 --- a/src/core/lib/iomgr/timer_generic.h +++ b/src/core/lib/iomgr/timer_generic.h @@ -38,7 +38,7 @@ #include "src/core/lib/iomgr/exec_ctx.h" struct grpc_timer { - gpr_timespec deadline; + gpr_atm deadline; uint32_t heap_index; /* INVALID_HEAP_INDEX if not in heap */ bool pending; struct grpc_timer *next; diff --git a/src/core/lib/iomgr/timer_heap.c b/src/core/lib/iomgr/timer_heap.c index f736d335e6..03ccfe023a 100644 --- a/src/core/lib/iomgr/timer_heap.c +++ b/src/core/lib/iomgr/timer_heap.c @@ -50,7 +50,7 @@ static void adjust_upwards(grpc_timer **first, uint32_t i, grpc_timer *t) { while (i > 0) { uint32_t parent = (uint32_t)(((int)i - 1) / 2); - if (gpr_time_cmp(first[parent]->deadline, t->deadline) <= 0) break; + if (first[parent]->deadline <= t->deadline) break; first[i] = first[parent]; first[i]->heap_index = i; i = parent; @@ -68,12 +68,12 @@ static void adjust_downwards(grpc_timer **first, uint32_t i, uint32_t length, uint32_t left_child = 1u + 2u * i; if (left_child >= length) break; uint32_t right_child = left_child + 1; - uint32_t next_i = right_child < length && - gpr_time_cmp(first[left_child]->deadline, - first[right_child]->deadline) > 0 - ? right_child - : left_child; - if (gpr_time_cmp(t->deadline, first[next_i]->deadline) <= 0) break; + uint32_t next_i = + right_child < length && + first[left_child]->deadline > first[right_child]->deadline + ? right_child + : left_child; + if (t->deadline <= first[next_i]->deadline) break; first[i] = first[next_i]; first[i]->heap_index = i; i = next_i; @@ -97,7 +97,7 @@ static void maybe_shrink(grpc_timer_heap *heap) { static void note_changed_priority(grpc_timer_heap *heap, grpc_timer *timer) { uint32_t i = timer->heap_index; uint32_t parent = (uint32_t)(((int)i - 1) / 2); - if (gpr_time_cmp(heap->timers[parent]->deadline, timer->deadline) > 0) { + if (heap->timers[parent]->deadline > timer->deadline) { adjust_upwards(heap->timers, i, timer); } else { adjust_downwards(heap->timers, i, heap->timer_count, timer); |