aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/lib/iomgr/timer_generic.c68
1 files changed, 45 insertions, 23 deletions
diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c
index bf73d2c685..e6a9eb0e86 100644
--- a/src/core/lib/iomgr/timer_generic.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -44,41 +44,63 @@
grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false);
grpc_tracer_flag grpc_timer_check_trace = GRPC_TRACER_INITIALIZER(false);
+/* A "timer shard". Contains a 'heap' and a 'list' of timers. All timers with
+ * deadlines earlier than 'queue_deadline" cap are maintained in the heap and
+ * others are maintained in the list (unordered). This helps to keep the number
+ * of elements in the heap low.
+ *
+ * The 'queue_deadline_cap' gets recomputed periodically based on the timer
+ * stats maintained in 'stats' and the relevant timers are then moved from the
+ * 'list' to 'heap'
+ */
typedef struct {
gpr_mu mu;
grpc_time_averaged_stats stats;
/* All and only timers with deadlines <= this will be in the heap. */
gpr_atm queue_deadline_cap;
+ /* The deadline of the next timer due in this shard */
gpr_atm min_deadline;
- /* Index in the g_shard_queue */
+ /* Index of this timer_shard in the g_shard_queue */
uint32_t shard_queue_index;
/* This holds all timers with deadlines < queue_deadline_cap. Timers in this
list have the top bit of their deadline set to 0. */
grpc_timer_heap heap;
/* This holds timers whose deadline is >= queue_deadline_cap. */
grpc_timer list;
-} shard_type;
+} timer_shard;
+
+/* Array of timer shards. Whenever a timer (grpc_timer *) is added, its address
+ * is hashed to select the timer shard to add the timer to */
+static timer_shard g_shards[NUM_SHARDS];
+
+/* Maintains a sorted list of timer shards (sorted by their min_deadline, i.e
+ * the deadline of the next timer in each shard).
+ * Access to this is protected by g_shared_mutables.mu */
+static timer_shard *g_shard_queue[NUM_SHARDS];
+
+/* Thread local variable that stores the deadline of the next timer the thread
+ * has last-seen. This is an optimization to prevent the thread from checking
+ * shared_mutables.min_timer (which requires acquiring shared_mutables.mu lock,
+ * an expensive operation) */
+GPR_TLS_DECL(g_last_seen_min_timer);
struct shared_mutables {
+ /* The deadline of the next timer due across all timer shards */
gpr_atm min_timer;
/* Allow only one run_some_expired_timers at once */
gpr_spinlock checker_mu;
bool initialized;
- /* Protects g_shard_queue */
+ /* Protects g_shard_queue (and the shared_mutables struct itself) */
gpr_mu mu;
} GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE);
static struct shared_mutables g_shared_mutables = {
.checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER, .initialized = false,
};
+
static gpr_clock_type g_clock_type;
-static shard_type g_shards[NUM_SHARDS];
-/* Protected by g_shared_mutables.mu */
-static shard_type *g_shard_queue[NUM_SHARDS];
static gpr_timespec g_start_time;
-GPR_TLS_DECL(g_last_seen_min_timer);
-
static gpr_atm saturating_add(gpr_atm a, gpr_atm b) {
if (a > GPR_ATM_MAX - b) {
return GPR_ATM_MAX;
@@ -122,7 +144,7 @@ static gpr_timespec atm_to_timespec(gpr_atm x) {
return gpr_time_add(g_start_time, dbl_to_ts((double)x / 1000.0));
}
-static gpr_atm compute_min_deadline(shard_type *shard) {
+static gpr_atm compute_min_deadline(timer_shard *shard) {
return grpc_timer_heap_is_empty(&shard->heap)
? saturating_add(shard->queue_deadline_cap, 1)
: grpc_timer_heap_top(&shard->heap)->deadline;
@@ -142,7 +164,7 @@ void grpc_timer_list_init(gpr_timespec now) {
grpc_register_tracer("timer_check", &grpc_timer_check_trace);
for (i = 0; i < NUM_SHARDS; i++) {
- shard_type *shard = &g_shards[i];
+ timer_shard *shard = &g_shards[i];
gpr_mu_init(&shard->mu);
grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1,
0.5);
@@ -161,7 +183,7 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {
exec_ctx, GPR_ATM_MAX, NULL,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown"));
for (i = 0; i < NUM_SHARDS; i++) {
- shard_type *shard = &g_shards[i];
+ timer_shard *shard = &g_shards[i];
gpr_mu_destroy(&shard->mu);
grpc_timer_heap_destroy(&shard->heap);
}
@@ -187,7 +209,7 @@ static void list_remove(grpc_timer *timer) {
}
static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) {
- shard_type *temp;
+ timer_shard *temp;
temp = g_shard_queue[first_shard_queue_index];
g_shard_queue[first_shard_queue_index] =
g_shard_queue[first_shard_queue_index + 1];
@@ -198,7 +220,7 @@ static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) {
first_shard_queue_index + 1;
}
-static void note_deadline_change(shard_type *shard) {
+static void note_deadline_change(timer_shard *shard) {
while (shard->shard_queue_index > 0 &&
shard->min_deadline <
g_shard_queue[shard->shard_queue_index - 1]->min_deadline) {
@@ -215,7 +237,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
gpr_timespec deadline, grpc_closure *closure,
gpr_timespec now) {
int is_first_timer = 0;
- shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
+ timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
GPR_ASSERT(deadline.clock_type == g_clock_type);
GPR_ASSERT(now.clock_type == g_clock_type);
timer->closure = closure;
@@ -303,7 +325,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
return;
}
- shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
+ timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
gpr_mu_lock(&shard->mu);
if (GRPC_TRACER_ON(grpc_timer_trace)) {
gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer,
@@ -321,12 +343,12 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
gpr_mu_unlock(&shard->mu);
}
-/* This is called when the queue is empty and "now" has reached the
- queue_deadline_cap. We compute a new queue deadline and then scan the map
- for timers that fall at or under it. Returns true if the queue is no
- longer empty.
+/* Rebalances the timer shard by computing a new 'queue_deadline_cap' and moving
+ all relevant timers in shard->list (i.e timers with deadlines earlier than
+ 'queue_deadline_cap') into into shard->heap.
+ Returns 'true' if shard->heap has atleast ONE element
REQUIRES: shard->mu locked */
-static int refill_queue(shard_type *shard, gpr_atm now) {
+static int refill_heap(timer_shard *shard, gpr_atm now) {
/* Compute the new queue window width and bound by the limits: */
double computed_deadline_delta =
grpc_time_averaged_stats_update_average(&shard->stats) *
@@ -363,7 +385,7 @@ static int refill_queue(shard_type *shard, gpr_atm now) {
/* This pops the next non-cancelled timer with deadline <= now from the
queue, or returns NULL if there isn't one.
REQUIRES: shard->mu locked */
-static grpc_timer *pop_one(shard_type *shard, gpr_atm now) {
+static grpc_timer *pop_one(timer_shard *shard, gpr_atm now) {
grpc_timer *timer;
for (;;) {
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
@@ -373,7 +395,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_atm now) {
}
if (grpc_timer_heap_is_empty(&shard->heap)) {
if (now < shard->queue_deadline_cap) return NULL;
- if (!refill_queue(shard, now)) return NULL;
+ if (!refill_heap(shard, now)) return NULL;
}
timer = grpc_timer_heap_top(&shard->heap);
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
@@ -393,7 +415,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_atm now) {
}
/* REQUIRES: shard->mu unlocked */
-static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard,
+static size_t pop_timers(grpc_exec_ctx *exec_ctx, timer_shard *shard,
gpr_atm now, gpr_atm *new_min_deadline,
grpc_error *error) {
size_t n = 0;