path: root/src/core/lib/iomgr/timer_generic.cc
Diffstat (limited to 'src/core/lib/iomgr/timer_generic.cc')
-rw-r--r--  src/core/lib/iomgr/timer_generic.cc  174
1 file changed, 123 insertions, 51 deletions
diff --git a/src/core/lib/iomgr/timer_generic.cc b/src/core/lib/iomgr/timer_generic.cc
index 0c6f236f83..4294162af7 100644
--- a/src/core/lib/iomgr/timer_generic.cc
+++ b/src/core/lib/iomgr/timer_generic.cc
@@ -34,6 +34,7 @@
#include "src/core/lib/gpr/spinlock.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/time_averaged_stats.h"
#include "src/core/lib/iomgr/timer_heap.h"
@@ -59,9 +60,9 @@ typedef struct {
gpr_mu mu;
grpc_time_averaged_stats stats;
/* All and only timers with deadlines <= this will be in the heap. */
- gpr_atm queue_deadline_cap;
+ grpc_millis queue_deadline_cap;
/* The deadline of the next timer due in this shard */
- gpr_atm min_deadline;
+ grpc_millis min_deadline;
/* Index of this timer_shard in the g_shard_queue */
uint32_t shard_queue_index;
/* This holds all timers with deadlines < queue_deadline_cap. Timers in this
@@ -209,15 +210,23 @@ static void validate_non_pending_timer(grpc_timer* t) {
#endif
+#if GPR_ARCH_64
+/* NOTE: TODO(sreek) - Currently, thread local storage support in grpc is only
+   for intptr_t, which on 32-bit machines is not wide enough to hold
+   grpc_millis (a 64-bit type). Adding thread local support for 64-bit values
+   is a lot of work for very little gain, so we currently restrict this
+   optimization to 64-bit machines only. */
+
/* Thread local variable that stores the deadline of the next timer the thread
* has last-seen. This is an optimization to prevent the thread from checking
* shared_mutables.min_timer (which requires acquiring shared_mutables.mu lock,
* an expensive operation) */
GPR_TLS_DECL(g_last_seen_min_timer);
+#endif
struct shared_mutables {
/* The deadline of the next timer due across all timer shards */
- gpr_atm min_timer;
+ grpc_millis min_timer;
/* Allow only one run_some_expired_timers at once */
gpr_spinlock checker_mu;
bool initialized;
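
In the hunk above, shared_mutables.min_timer becomes a grpc_millis (64-bit) instead of a gpr_atm, and the thread-local g_last_seen_min_timer cache is compiled only on 64-bit targets because gpr TLS can only hold an intptr_t. A minimal sketch of how that cache is meant to short-circuit the fast path, using a hypothetical helper name that is not part of this diff:

#if GPR_ARCH_64
/* Hypothetical fast-path check (illustration only): a thread compares 'now'
   against its cached copy of the global minimum deadline and skips the shared
   lock entirely when nothing can be due yet. */
static bool nothing_due_yet(grpc_millis now) {
  return now < static_cast<grpc_millis>(gpr_tls_get(&g_last_seen_min_timer));
}
#endif
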
@@ -227,18 +236,18 @@ struct shared_mutables {
static struct shared_mutables g_shared_mutables;
-static gpr_atm saturating_add(gpr_atm a, gpr_atm b) {
- if (a > GPR_ATM_MAX - b) {
- return GPR_ATM_MAX;
+static grpc_millis saturating_add(grpc_millis a, grpc_millis b) {
+ if (a > GRPC_MILLIS_INF_FUTURE - b) {
+ return GRPC_MILLIS_INF_FUTURE;
}
return a + b;
}
-static grpc_timer_check_result run_some_expired_timers(gpr_atm now,
- gpr_atm* next,
+static grpc_timer_check_result run_some_expired_timers(grpc_millis now,
+ grpc_millis* next,
grpc_error* error);
-static gpr_atm compute_min_deadline(timer_shard* shard) {
+static grpc_millis compute_min_deadline(timer_shard* shard) {
return grpc_timer_heap_is_empty(&shard->heap)
? saturating_add(shard->queue_deadline_cap, 1)
: grpc_timer_heap_top(&shard->heap)->deadline;
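
saturating_add is retyped to grpc_millis and clamps at GRPC_MILLIS_INF_FUTURE rather than GPR_ATM_MAX, so a queue_deadline_cap computed near the sentinel cannot overflow a signed 64-bit value and wrap into the past. An illustrative check of that behavior (assumed usage, not part of this diff):

/* Adding to the "infinite future" sentinel stays clamped instead of
   wrapping around to a small, already-expired deadline. */
grpc_millis cap = saturating_add(GRPC_MILLIS_INF_FUTURE, 1);
GPR_ASSERT(cap == GRPC_MILLIS_INF_FUTURE);
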
@@ -257,8 +266,11 @@ static void timer_list_init() {
g_shared_mutables.checker_mu = GPR_SPINLOCK_INITIALIZER;
gpr_mu_init(&g_shared_mutables.mu);
g_shared_mutables.min_timer = grpc_core::ExecCtx::Get()->Now();
+
+#if GPR_ARCH_64
gpr_tls_init(&g_last_seen_min_timer);
gpr_tls_set(&g_last_seen_min_timer, 0);
+#endif
for (i = 0; i < g_num_shards; i++) {
timer_shard* shard = &g_shards[i];
@@ -287,7 +299,11 @@ static void timer_list_shutdown() {
grpc_timer_heap_destroy(&shard->heap);
}
gpr_mu_destroy(&g_shared_mutables.mu);
+
+#if GPR_ARCH_64
gpr_tls_destroy(&g_last_seen_min_timer);
+#endif
+
gpr_free(g_shards);
gpr_free(g_shard_queue);
g_shared_mutables.initialized = false;
@@ -346,9 +362,9 @@ static void timer_init(grpc_timer* timer, grpc_millis deadline,
#endif
if (grpc_timer_trace.enabled()) {
- gpr_log(GPR_DEBUG,
- "TIMER %p: SET %" PRIdPTR " now %" PRIdPTR " call %p[%p]", timer,
- deadline, grpc_core::ExecCtx::Get()->Now(), closure, closure->cb);
+ gpr_log(GPR_INFO, "TIMER %p: SET %" PRId64 " now %" PRId64 " call %p[%p]",
+ timer, deadline, grpc_core::ExecCtx::Get()->Now(), closure,
+ closure->cb);
}
if (!g_shared_mutables.initialized) {
@@ -382,8 +398,8 @@ static void timer_init(grpc_timer* timer, grpc_millis deadline,
list_join(&shard->list, timer);
}
if (grpc_timer_trace.enabled()) {
- gpr_log(GPR_DEBUG,
- " .. add to shard %d with queue_deadline_cap=%" PRIdPTR
+ gpr_log(GPR_INFO,
+ " .. add to shard %d with queue_deadline_cap=%" PRId64
" => is_first_timer=%s",
static_cast<int>(shard - g_shards), shard->queue_deadline_cap,
is_first_timer ? "true" : "false");
@@ -404,15 +420,27 @@ static void timer_init(grpc_timer* timer, grpc_millis deadline,
if (is_first_timer) {
gpr_mu_lock(&g_shared_mutables.mu);
if (grpc_timer_trace.enabled()) {
- gpr_log(GPR_DEBUG, " .. old shard min_deadline=%" PRIdPTR,
+ gpr_log(GPR_INFO, " .. old shard min_deadline=%" PRId64,
shard->min_deadline);
}
if (deadline < shard->min_deadline) {
- gpr_atm old_min_deadline = g_shard_queue[0]->min_deadline;
+ grpc_millis old_min_deadline = g_shard_queue[0]->min_deadline;
shard->min_deadline = deadline;
note_deadline_change(shard);
if (shard->shard_queue_index == 0 && deadline < old_min_deadline) {
- gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, deadline);
+#if GPR_ARCH_64
+ // TODO: sreek - Using c-style cast here. static_cast<> gives an error
+ // (on mac platforms complaining that gpr_atm* is (long *) while
+ // (&g_shared_mutables.min_timer) is a (long long *). The cast should be
+ // safe since we know that both are pointer types and 64-bit wide.
+ gpr_atm_no_barrier_store((gpr_atm*)(&g_shared_mutables.min_timer),
+ deadline);
+#else
+      // On 32-bit systems, gpr_atm_no_barrier_store does not work on 64-bit
+      // types (like grpc_millis), so all reads and writes to the
+      // g_shared_mutables.min_timer variable are done under g_shared_mutables.mu.
+ g_shared_mutables.min_timer = deadline;
+#endif
grpc_kick_poller();
}
}
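
The store to g_shared_mutables.min_timer now has two shapes: a relaxed atomic store through a gpr_atm* cast on 64-bit builds, and a plain store protected by g_shared_mutables.mu on 32-bit builds, where gpr_atm is too narrow for a grpc_millis. A hypothetical helper (not in this diff) capturing the same pattern the hunk above open-codes, assuming the caller already holds g_shared_mutables.mu as the call sites in this file do:

static void set_min_timer_while_holding_lock(grpc_millis deadline) {
#if GPR_ARCH_64
  /* 64-bit: the field is also read lock-free elsewhere, so publish it with a
     relaxed atomic store. */
  gpr_atm_no_barrier_store((gpr_atm*)(&g_shared_mutables.min_timer), deadline);
#else
  /* 32-bit: all readers take g_shared_mutables.mu, so a plain store suffices. */
  g_shared_mutables.min_timer = deadline;
#endif
}
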
@@ -421,8 +449,10 @@ static void timer_init(grpc_timer* timer, grpc_millis deadline,
}
static void timer_consume_kick(void) {
- /* force re-evaluation of last seeen min */
+#if GPR_ARCH_64
+ /* Force re-evaluation of last seen min */
gpr_tls_set(&g_last_seen_min_timer, 0);
+#endif
}
static void timer_cancel(grpc_timer* timer) {
@@ -434,7 +464,7 @@ static void timer_cancel(grpc_timer* timer) {
timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)];
gpr_mu_lock(&shard->mu);
if (grpc_timer_trace.enabled()) {
- gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer,
+ gpr_log(GPR_INFO, "TIMER %p: CANCEL pending=%s", timer,
timer->pending ? "true" : "false");
}
@@ -459,7 +489,7 @@ static void timer_cancel(grpc_timer* timer) {
'queue_deadline_cap') into into shard->heap.
Returns 'true' if shard->heap has atleast ONE element
REQUIRES: shard->mu locked */
-static int refill_heap(timer_shard* shard, gpr_atm now) {
+static int refill_heap(timer_shard* shard, grpc_millis now) {
/* Compute the new queue window width and bound by the limits: */
double computed_deadline_delta =
grpc_time_averaged_stats_update_average(&shard->stats) *
@@ -472,10 +502,10 @@ static int refill_heap(timer_shard* shard, gpr_atm now) {
/* Compute the new cap and put all timers under it into the queue: */
shard->queue_deadline_cap =
saturating_add(GPR_MAX(now, shard->queue_deadline_cap),
- static_cast<gpr_atm>(deadline_delta * 1000.0));
+ static_cast<grpc_millis>(deadline_delta * 1000.0));
if (grpc_timer_check_trace.enabled()) {
- gpr_log(GPR_DEBUG, " .. shard[%d]->queue_deadline_cap --> %" PRIdPTR,
+ gpr_log(GPR_INFO, " .. shard[%d]->queue_deadline_cap --> %" PRId64,
static_cast<int>(shard - g_shards), shard->queue_deadline_cap);
}
for (timer = shard->list.next; timer != &shard->list; timer = next) {
@@ -483,7 +513,7 @@ static int refill_heap(timer_shard* shard, gpr_atm now) {
if (timer->deadline < shard->queue_deadline_cap) {
if (grpc_timer_check_trace.enabled()) {
- gpr_log(GPR_DEBUG, " .. add timer with deadline %" PRIdPTR " to heap",
+ gpr_log(GPR_INFO, " .. add timer with deadline %" PRId64 " to heap",
timer->deadline);
}
list_remove(timer);
@@ -496,11 +526,11 @@ static int refill_heap(timer_shard* shard, gpr_atm now) {
/* This pops the next non-cancelled timer with deadline <= now from the
queue, or returns NULL if there isn't one.
REQUIRES: shard->mu locked */
-static grpc_timer* pop_one(timer_shard* shard, gpr_atm now) {
+static grpc_timer* pop_one(timer_shard* shard, grpc_millis now) {
grpc_timer* timer;
for (;;) {
if (grpc_timer_check_trace.enabled()) {
- gpr_log(GPR_DEBUG, " .. shard[%d]: heap_empty=%s",
+ gpr_log(GPR_INFO, " .. shard[%d]: heap_empty=%s",
static_cast<int>(shard - g_shards),
grpc_timer_heap_is_empty(&shard->heap) ? "true" : "false");
}
@@ -510,13 +540,13 @@ static grpc_timer* pop_one(timer_shard* shard, gpr_atm now) {
}
timer = grpc_timer_heap_top(&shard->heap);
if (grpc_timer_check_trace.enabled()) {
- gpr_log(GPR_DEBUG,
- " .. check top timer deadline=%" PRIdPTR " now=%" PRIdPTR,
+ gpr_log(GPR_INFO,
+ " .. check top timer deadline=%" PRId64 " now=%" PRId64,
timer->deadline, now);
}
if (timer->deadline > now) return nullptr;
if (grpc_timer_trace.enabled()) {
- gpr_log(GPR_DEBUG, "TIMER %p: FIRE %" PRIdPTR "ms late via %s scheduler",
+ gpr_log(GPR_INFO, "TIMER %p: FIRE %" PRId64 "ms late via %s scheduler",
timer, now - timer->deadline,
timer->closure->scheduler->vtable->name);
}
@@ -527,8 +557,8 @@ static grpc_timer* pop_one(timer_shard* shard, gpr_atm now) {
}
/* REQUIRES: shard->mu unlocked */
-static size_t pop_timers(timer_shard* shard, gpr_atm now,
- gpr_atm* new_min_deadline, grpc_error* error) {
+static size_t pop_timers(timer_shard* shard, grpc_millis now,
+ grpc_millis* new_min_deadline, grpc_error* error) {
size_t n = 0;
grpc_timer* timer;
gpr_mu_lock(&shard->mu);
@@ -540,19 +570,33 @@ static size_t pop_timers(timer_shard* shard, gpr_atm now,
*new_min_deadline = compute_min_deadline(shard);
gpr_mu_unlock(&shard->mu);
if (grpc_timer_check_trace.enabled()) {
- gpr_log(GPR_DEBUG, " .. shard[%d] popped %" PRIdPTR,
+ gpr_log(GPR_INFO, " .. shard[%d] popped %" PRIdPTR,
static_cast<int>(shard - g_shards), n);
}
return n;
}
-static grpc_timer_check_result run_some_expired_timers(gpr_atm now,
- gpr_atm* next,
+static grpc_timer_check_result run_some_expired_timers(grpc_millis now,
+ grpc_millis* next,
grpc_error* error) {
grpc_timer_check_result result = GRPC_TIMERS_NOT_CHECKED;
- gpr_atm min_timer = gpr_atm_no_barrier_load(&g_shared_mutables.min_timer);
+#if GPR_ARCH_64
+ // TODO: sreek - Using c-style cast here. static_cast<> gives an error (on
+ // mac platforms complaining that gpr_atm* is (long *) while
+ // (&g_shared_mutables.min_timer) is a (long long *). The cast should be
+ // safe since we know that both are pointer types and 64-bit wide
+ grpc_millis min_timer = static_cast<grpc_millis>(
+ gpr_atm_no_barrier_load((gpr_atm*)(&g_shared_mutables.min_timer)));
gpr_tls_set(&g_last_seen_min_timer, min_timer);
+#else
+ // On 32-bit systems, gpr_atm_no_barrier_load does not work on 64-bit types
+ // (like grpc_millis). So all reads and writes to g_shared_mutables.min_timer
+ // are done under g_shared_mutables.mu
+ gpr_mu_lock(&g_shared_mutables.mu);
+ grpc_millis min_timer = g_shared_mutables.min_timer;
+ gpr_mu_unlock(&g_shared_mutables.mu);
+#endif
if (now < min_timer) {
if (next != nullptr) *next = GPR_MIN(*next, min_timer);
return GRPC_TIMERS_CHECKED_AND_EMPTY;
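
run_some_expired_timers mirrors the write side: on 64-bit it does a lock-free relaxed load of g_shared_mutables.min_timer and refreshes the thread-local cache, while on 32-bit it reads the field under g_shared_mutables.mu. A hypothetical read helper (not in this diff) matching the store sketch above:

static grpc_millis read_min_timer() {
#if GPR_ARCH_64
  /* 64-bit: relaxed atomic load; no lock needed just to peek at the value. */
  return static_cast<grpc_millis>(
      gpr_atm_no_barrier_load((gpr_atm*)(&g_shared_mutables.min_timer)));
#else
  /* 32-bit: no 64-bit atomic load is available, so take the mutex. */
  gpr_mu_lock(&g_shared_mutables.mu);
  grpc_millis m = g_shared_mutables.min_timer;
  gpr_mu_unlock(&g_shared_mutables.mu);
  return m;
#endif
}
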
@@ -563,14 +607,15 @@ static grpc_timer_check_result run_some_expired_timers(gpr_atm now,
result = GRPC_TIMERS_CHECKED_AND_EMPTY;
if (grpc_timer_check_trace.enabled()) {
- gpr_log(GPR_DEBUG, " .. shard[%d]->min_deadline = %" PRIdPTR,
+ gpr_log(GPR_INFO, " .. shard[%d]->min_deadline = %" PRId64,
static_cast<int>(g_shard_queue[0] - g_shards),
g_shard_queue[0]->min_deadline);
}
while (g_shard_queue[0]->min_deadline < now ||
- (now != GPR_ATM_MAX && g_shard_queue[0]->min_deadline == now)) {
- gpr_atm new_min_deadline;
+ (now != GRPC_MILLIS_INF_FUTURE &&
+ g_shard_queue[0]->min_deadline == now)) {
+ grpc_millis new_min_deadline;
/* For efficiency, we pop as many available timers as we can from the
shard. This may violate perfect timer deadline ordering, but that
@@ -580,10 +625,10 @@ static grpc_timer_check_result run_some_expired_timers(gpr_atm now,
}
if (grpc_timer_check_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
" .. result --> %d"
- ", shard[%d]->min_deadline %" PRIdPTR " --> %" PRIdPTR
- ", now=%" PRIdPTR,
+ ", shard[%d]->min_deadline %" PRId64 " --> %" PRId64
+ ", now=%" PRId64,
result, static_cast<int>(g_shard_queue[0] - g_shards),
g_shard_queue[0]->min_deadline, new_min_deadline, now);
}
@@ -601,8 +646,19 @@ static grpc_timer_check_result run_some_expired_timers(gpr_atm now,
*next = GPR_MIN(*next, g_shard_queue[0]->min_deadline);
}
- gpr_atm_no_barrier_store(&g_shared_mutables.min_timer,
+#if GPR_ARCH_64
+ // TODO: sreek - Using c-style cast here. static_cast<> gives an error (on
+ // mac platforms complaining that gpr_atm* is (long *) while
+ // (&g_shared_mutables.min_timer) is a (long long *). The cast should be
+ // safe since we know that both are pointer types and 64-bit wide
+ gpr_atm_no_barrier_store((gpr_atm*)(&g_shared_mutables.min_timer),
g_shard_queue[0]->min_deadline);
+#else
+ // On 32-bit systems, gpr_atm_no_barrier_store does not work on 64-bit
+ // types (like grpc_millis). So all reads and writes to
+ // g_shared_mutables.min_timer are done under g_shared_mutables.mu
+ g_shared_mutables.min_timer = g_shard_queue[0]->min_deadline;
+#endif
gpr_mu_unlock(&g_shared_mutables.mu);
gpr_spinlock_unlock(&g_shared_mutables.checker_mu);
}
@@ -616,17 +672,28 @@ static grpc_timer_check_result timer_check(grpc_millis* next) {
// prelude
grpc_millis now = grpc_core::ExecCtx::Get()->Now();
+#if GPR_ARCH_64
/* fetch from a thread-local first: this avoids contention on a globally
mutable cacheline in the common case */
grpc_millis min_timer = gpr_tls_get(&g_last_seen_min_timer);
+#else
+ // On 32-bit systems, we currently do not have thread local support for 64-bit
+ // types. In this case, directly read from g_shared_mutables.min_timer.
+ // Also, note that on 32-bit systems, gpr_atm_no_barrier_store does not work
+ // on 64-bit types (like grpc_millis). So all reads and writes to
+ // g_shared_mutables.min_timer are done under g_shared_mutables.mu
+ gpr_mu_lock(&g_shared_mutables.mu);
+ grpc_millis min_timer = g_shared_mutables.min_timer;
+ gpr_mu_unlock(&g_shared_mutables.mu);
+#endif
+
if (now < min_timer) {
if (next != nullptr) {
*next = GPR_MIN(*next, min_timer);
}
if (grpc_timer_check_trace.enabled()) {
- gpr_log(GPR_DEBUG,
- "TIMER CHECK SKIP: now=%" PRIdPTR " min_timer=%" PRIdPTR, now,
- min_timer);
+ gpr_log(GPR_INFO, "TIMER CHECK SKIP: now=%" PRId64 " min_timer=%" PRId64,
+ now, min_timer);
}
return GRPC_TIMERS_CHECKED_AND_EMPTY;
}
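
timer_check keeps the same split for its early-out: on 64-bit builds the common "nothing due yet" case is answered from g_last_seen_min_timer without touching any lock, while 32-bit builds pay for a mutex acquisition on every check. An assumed call pattern (illustration only) for a polling thread, using the public grpc_timer_check entry point that dispatches to this implementation:

grpc_millis next = GRPC_MILLIS_INF_FUTURE;
grpc_timer_check_result r = grpc_timer_check(&next);
if (r == GRPC_TIMERS_CHECKED_AND_EMPTY) {
  /* Nothing fired; 'next' bounds how long the poller may sleep. */
}
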
@@ -642,13 +709,18 @@ static grpc_timer_check_result timer_check(grpc_millis* next) {
if (next == nullptr) {
next_str = gpr_strdup("NULL");
} else {
- gpr_asprintf(&next_str, "%" PRIdPTR, *next);
+ gpr_asprintf(&next_str, "%" PRId64, *next);
}
- gpr_log(GPR_DEBUG,
- "TIMER CHECK BEGIN: now=%" PRIdPTR " next=%s tls_min=%" PRIdPTR
+#if GPR_ARCH_64
+ gpr_log(GPR_INFO,
+ "TIMER CHECK BEGIN: now=%" PRId64 " next=%s tls_min=%" PRId64
" glob_min=%" PRIdPTR,
- now, next_str, gpr_tls_get(&g_last_seen_min_timer),
- gpr_atm_no_barrier_load(&g_shared_mutables.min_timer));
+ now, next_str, min_timer,
+ gpr_atm_no_barrier_load((gpr_atm*)(&g_shared_mutables.min_timer)));
+#else
+ gpr_log(GPR_INFO, "TIMER CHECK BEGIN: now=%" PRId64 " next=%s min=%" PRId64,
+ now, next_str, min_timer);
+#endif
gpr_free(next_str);
}
// actual code
@@ -660,9 +732,9 @@ static grpc_timer_check_result timer_check(grpc_millis* next) {
if (next == nullptr) {
next_str = gpr_strdup("NULL");
} else {
- gpr_asprintf(&next_str, "%" PRIdPTR, *next);
+ gpr_asprintf(&next_str, "%" PRId64, *next);
}
- gpr_log(GPR_DEBUG, "TIMER CHECK END: r=%d; next=%s", r, next_str);
+ gpr_log(GPR_INFO, "TIMER CHECK END: r=%d; next=%s", r, next_str);
gpr_free(next_str);
}
return r;