aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/timer_generic.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-03-29 13:35:37 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-03-29 13:35:37 -0700
commitfac02603a82f16b9b1865cbc66fab00ead0fe701 (patch)
tree793dd251b06de79354b5d7870079f82db479833b /src/core/lib/iomgr/timer_generic.c
parent883243ae97cfbc58a3e125b72bae7b6e6433ed49 (diff)
parentd9bc2bf8252c6a29fb7fcc21be22f7ccd285e619 (diff)
Merge branch 'atomic-timers' of github.com:ctiller/grpc into atomic-timers
Diffstat (limited to 'src/core/lib/iomgr/timer_generic.c')
-rw-r--r--src/core/lib/iomgr/timer_generic.c54
1 files changed, 35 insertions, 19 deletions
diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c
index 6bbe4a507a..e3e0e7fc3f 100644
--- a/src/core/lib/iomgr/timer_generic.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -188,12 +188,10 @@ static double ts_to_dbl(gpr_timespec ts) {
}
/* returns true if the first element in the list */
-static bool list_join(grpc_timer *head, grpc_timer *timer) {
- bool is_first = head->next == head;
+static void list_join(grpc_timer *head, grpc_timer *timer) {
timer->next = head;
timer->prev = head->prev;
timer->next->prev = timer->prev->next = timer;
- return is_first;
}
static void list_remove(grpc_timer *timer) {
@@ -268,8 +266,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
is_first_timer = grpc_timer_heap_add(&shard->heap, timer);
} else {
timer->heap_index = INVALID_HEAP_INDEX;
- is_first_timer = list_join(&shard->list, timer) &&
- grpc_timer_heap_is_empty(&shard->heap);
+ list_join(&shard->list, timer);
}
if (grpc_timer_trace) {
gpr_log(GPR_DEBUG, " .. add to shard %d with queue_deadline_cap=%" PRIdPTR
@@ -322,6 +319,10 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
gpr_mu_lock(&shard->mu);
+ if (grpc_timer_trace) {
+ gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer,
+ timer->pending ? "true" : "false");
+ }
if (timer->pending) {
grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED);
timer->pending = false;
@@ -395,6 +396,10 @@ static grpc_timer *pop_one(shard_type *shard, gpr_atm now) {
timer->deadline, now);
}
if (timer->deadline > now) return NULL;
+ if (grpc_timer_trace) {
+ gpr_log(GPR_DEBUG, "TIMER %p: FIRE %" PRIdPTR "ms late", timer,
+ now - timer->deadline);
+ }
timer->pending = false;
grpc_timer_heap_pop(&shard->heap);
return timer;
@@ -421,15 +426,7 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
gpr_atm *next, grpc_error *error) {
size_t n = 0;
- /* fetch from a thread-local first: this avoids contention on a globally
- mutable cacheline in the common case */
- gpr_atm min_timer = gpr_tls_get(&g_last_seen_min_timer);
- if (now < min_timer) {
- if (next != NULL) *next = GPR_MIN(*next, min_timer);
- return 0;
- }
-
- min_timer = gpr_atm_no_barrier_load(&g_shared_mutables.min_timer);
+ gpr_atm min_timer = gpr_atm_no_barrier_load(&g_shared_mutables.min_timer);
gpr_tls_set(&g_last_seen_min_timer, min_timer);
if (now < min_timer) {
if (next != NULL) *next = GPR_MIN(*next, min_timer);
@@ -445,7 +442,8 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
g_shard_queue[0]->min_deadline);
}
- while (g_shard_queue[0]->min_deadline < now) {
+ while (g_shard_queue[0]->min_deadline < now ||
+ (now != GPR_ATM_MAX && g_shard_queue[0]->min_deadline == now)) {
gpr_atm new_min_deadline;
/* For efficiency, we pop as many available timers as we can from the
@@ -455,11 +453,11 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline, error);
if (grpc_timer_check_trace) {
- gpr_log(GPR_DEBUG,
- " .. popped --> %" PRIdPTR
- ", shard[%d]->min_deadline %" PRIdPTR " --> %" PRIdPTR,
+ gpr_log(GPR_DEBUG, " .. popped --> %" PRIdPTR
+ ", shard[%d]->min_deadline %" PRIdPTR
+ " --> %" PRIdPTR ", now=%" PRIdPTR,
n, (int)(g_shard_queue[0] - g_shards),
- g_shard_queue[0]->min_deadline, new_min_deadline);
+ g_shard_queue[0]->min_deadline, new_min_deadline, now);
}
/* An grpc_timer_init() on the shard could intervene here, adding a new
@@ -491,10 +489,28 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
// prelude
GPR_ASSERT(now.clock_type == g_clock_type);
gpr_atm now_atm = timespec_to_atm_round_down(now);
+
+ /* fetch from a thread-local first: this avoids contention on a globally
+ mutable cacheline in the common case */
+ gpr_atm min_timer = gpr_tls_get(&g_last_seen_min_timer);
+ if (now_atm < min_timer) {
+ if (next != NULL) {
+ *next =
+ atm_to_timespec(GPR_MIN(timespec_to_atm_round_up(*next), min_timer));
+ }
+ if (grpc_timer_check_trace) {
+ gpr_log(GPR_DEBUG,
+ "TIMER CHECK SKIP: now_atm=%" PRId64 " min_timer=%" PRId64,
+ now_atm, min_timer);
+ }
+ return 0;
+ }
+
grpc_error *shutdown_error =
gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0
? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system");
+
// tracing
if (grpc_timer_check_trace) {
char *next_str;