diff options
author | Craig Tiller <ctiller@google.com> | 2017-11-06 12:58:11 -0800 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-11-06 12:58:11 -0800 |
commit | 55e6820f002afdce55476460b4b6f30ca87063b1 (patch) | |
tree | 724791c9fcecbbb984bc9da486b348f26e6da8d5 /src/core/lib/iomgr/timer_generic.cc | |
parent | fe421accaf861f701f90950d6bb5bf684e6a618d (diff) | |
parent | 0f2cfdcc0f57d21fe2823268263ef5bdc4b6704c (diff) |
Merge github.com:grpc/grpc into shardier
Diffstat (limited to 'src/core/lib/iomgr/timer_generic.cc')
-rw-r--r-- | src/core/lib/iomgr/timer_generic.cc | 107 |
1 files changed, 55 insertions, 52 deletions
diff --git a/src/core/lib/iomgr/timer_generic.cc b/src/core/lib/iomgr/timer_generic.cc index 2c11e0b70c..38ac66ea2f 100644 --- a/src/core/lib/iomgr/timer_generic.cc +++ b/src/core/lib/iomgr/timer_generic.cc @@ -77,12 +77,12 @@ static size_t g_num_shards; /* Array of timer shards. Whenever a timer (grpc_timer *) is added, its address * is hashed to select the timer shard to add the timer to */ -static timer_shard *g_shards; +static timer_shard* g_shards; /* Maintains a sorted list of timer shards (sorted by their min_deadline, i.e * the deadline of the next timer in each shard). * Access to this is protected by g_shared_mutables.mu */ -static timer_shard **g_shard_queue; +static timer_shard** g_shard_queue; #ifndef NDEBUG @@ -91,7 +91,7 @@ static timer_shard **g_shard_queue; #define NUM_HASH_BUCKETS 1009 /* Prime number close to 1000 */ static gpr_mu g_hash_mu[NUM_HASH_BUCKETS]; /* One mutex per bucket */ -static grpc_timer *g_timer_ht[NUM_HASH_BUCKETS] = {NULL}; +static grpc_timer* g_timer_ht[NUM_HASH_BUCKETS] = {NULL}; static void init_timer_ht() { for (int i = 0; i < NUM_HASH_BUCKETS; i++) { @@ -99,11 +99,11 @@ static void init_timer_ht() { } } -static bool is_in_ht(grpc_timer *t) { +static bool is_in_ht(grpc_timer* t) { size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS); gpr_mu_lock(&g_hash_mu[i]); - grpc_timer *p = g_timer_ht[i]; + grpc_timer* p = g_timer_ht[i]; while (p != NULL && p != t) { p = p->hash_table_next; } @@ -112,18 +112,18 @@ static bool is_in_ht(grpc_timer *t) { return (p == t); } -static void add_to_ht(grpc_timer *t) { +static void add_to_ht(grpc_timer* t) { GPR_ASSERT(!t->hash_table_next); size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS); gpr_mu_lock(&g_hash_mu[i]); - grpc_timer *p = g_timer_ht[i]; + grpc_timer* p = g_timer_ht[i]; while (p != NULL && p != t) { p = p->hash_table_next; } if (p == t) { - grpc_closure *c = t->closure; + grpc_closure* c = t->closure; gpr_log(GPR_ERROR, "** Duplicate timer (%p) being added. Closure: (%p), created at: " "(%s:%d), scheduled at: (%s:%d) **", @@ -138,7 +138,7 @@ static void add_to_ht(grpc_timer *t) { gpr_mu_unlock(&g_hash_mu[i]); } -static void remove_from_ht(grpc_timer *t) { +static void remove_from_ht(grpc_timer* t) { size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS); bool removed = false; @@ -147,7 +147,7 @@ static void remove_from_ht(grpc_timer *t) { g_timer_ht[i] = g_timer_ht[i]->hash_table_next; removed = true; } else if (g_timer_ht[i] != NULL) { - grpc_timer *p = g_timer_ht[i]; + grpc_timer* p = g_timer_ht[i]; while (p->hash_table_next != NULL && p->hash_table_next != t) { p = p->hash_table_next; } @@ -160,7 +160,7 @@ static void remove_from_ht(grpc_timer *t) { gpr_mu_unlock(&g_hash_mu[i]); if (!removed) { - grpc_closure *c = t->closure; + grpc_closure* c = t->closure; gpr_log(GPR_ERROR, "** Removing timer (%p) that is not added to hash table. Closure " "(%p), created at: (%s:%d), scheduled at: (%s:%d) **", @@ -176,9 +176,9 @@ static void remove_from_ht(grpc_timer *t) { * be pending. A timer is added to hash table only-if it is added to the * timer shard. * Therefore, if timer->pending is false, it cannot be in hash table */ -static void validate_non_pending_timer(grpc_timer *t) { +static void validate_non_pending_timer(grpc_timer* t) { if (!t->pending && is_in_ht(t)) { - grpc_closure *c = t->closure; + grpc_closure* c = t->closure; gpr_log(GPR_ERROR, "** gpr_timer_cancel() called on a non-pending timer (%p) which " "is in the hash table. Closure: (%p), created at: (%s:%d), " @@ -228,23 +228,24 @@ static gpr_atm saturating_add(gpr_atm a, gpr_atm b) { return a + b; } -static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx, +static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx* exec_ctx, gpr_atm now, - gpr_atm *next, - grpc_error *error); + gpr_atm* next, + grpc_error* error); -static gpr_atm compute_min_deadline(timer_shard *shard) { +static gpr_atm compute_min_deadline(timer_shard* shard) { return grpc_timer_heap_is_empty(&shard->heap) ? saturating_add(shard->queue_deadline_cap, 1) : grpc_timer_heap_top(&shard->heap)->deadline; } -void grpc_timer_list_init(grpc_exec_ctx *exec_ctx) { +void grpc_timer_list_init(grpc_exec_ctx* exec_ctx) { uint32_t i; g_num_shards = GPR_MIN(1, 2 * gpr_cpu_num_cores()); - g_shards = gpr_zalloc(g_num_shards * sizeof(*g_shards)); - g_shard_queue = gpr_zalloc(g_num_shards * sizeof(*g_shard_queue)); + g_shards = (timer_shard*)gpr_zalloc(g_num_shards * sizeof(*g_shards)); + g_shard_queue = + (timer_shard**)gpr_zalloc(g_num_shards * sizeof(*g_shard_queue)); g_shared_mutables.initialized = true; g_shared_mutables.checker_mu = GPR_SPINLOCK_INITIALIZER; @@ -256,7 +257,7 @@ void grpc_timer_list_init(grpc_exec_ctx *exec_ctx) { grpc_register_tracer(&grpc_timer_check_trace); for (i = 0; i < g_num_shards; i++) { - timer_shard *shard = &g_shards[i]; + timer_shard* shard = &g_shards[i]; gpr_mu_init(&shard->mu); grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1, 0.5); @@ -271,13 +272,13 @@ void grpc_timer_list_init(grpc_exec_ctx *exec_ctx) { INIT_TIMER_HASH_TABLE(); } -void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { +void grpc_timer_list_shutdown(grpc_exec_ctx* exec_ctx) { size_t i; run_some_expired_timers( exec_ctx, GPR_ATM_MAX, NULL, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown")); for (i = 0; i < g_num_shards; i++) { - timer_shard *shard = &g_shards[i]; + timer_shard* shard = &g_shards[i]; gpr_mu_destroy(&shard->mu); grpc_timer_heap_destroy(&shard->heap); } @@ -289,19 +290,19 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { } /* returns true if the first element in the list */ -static void list_join(grpc_timer *head, grpc_timer *timer) { +static void list_join(grpc_timer* head, grpc_timer* timer) { timer->next = head; timer->prev = head->prev; timer->next->prev = timer->prev->next = timer; } -static void list_remove(grpc_timer *timer) { +static void list_remove(grpc_timer* timer) { timer->next->prev = timer->prev; timer->prev->next = timer->next; } static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) { - timer_shard *temp; + timer_shard* temp; temp = g_shard_queue[first_shard_queue_index]; g_shard_queue[first_shard_queue_index] = g_shard_queue[first_shard_queue_index + 1]; @@ -312,7 +313,7 @@ static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) { first_shard_queue_index + 1; } -static void note_deadline_change(timer_shard *shard) { +static void note_deadline_change(timer_shard* shard) { while (shard->shard_queue_index > 0 && shard->min_deadline < g_shard_queue[shard->shard_queue_index - 1]->min_deadline) { @@ -325,12 +326,12 @@ static void note_deadline_change(timer_shard *shard) { } } -void grpc_timer_init_unset(grpc_timer *timer) { timer->pending = false; } +void grpc_timer_init_unset(grpc_timer* timer) { timer->pending = false; } -void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, - grpc_millis deadline, grpc_closure *closure) { +void grpc_timer_init(grpc_exec_ctx* exec_ctx, grpc_timer* timer, + grpc_millis deadline, grpc_closure* closure) { int is_first_timer = 0; - timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)]; + timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)]; timer->closure = closure; timer->deadline = deadline; @@ -375,8 +376,9 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, list_join(&shard->list, timer); } if (GRPC_TRACER_ON(grpc_timer_trace)) { - gpr_log(GPR_DEBUG, " .. add to shard %d with queue_deadline_cap=%" PRIdPTR - " => is_first_timer=%s", + gpr_log(GPR_DEBUG, + " .. add to shard %d with queue_deadline_cap=%" PRIdPTR + " => is_first_timer=%s", (int)(shard - g_shards), shard->queue_deadline_cap, is_first_timer ? "true" : "false"); } @@ -417,13 +419,13 @@ void grpc_timer_consume_kick(void) { gpr_tls_set(&g_last_seen_min_timer, 0); } -void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { +void grpc_timer_cancel(grpc_exec_ctx* exec_ctx, grpc_timer* timer) { if (!g_shared_mutables.initialized) { /* must have already been cancelled, also the shard mutex is invalid */ return; } - timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)]; + timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)]; gpr_mu_lock(&shard->mu); if (GRPC_TRACER_ON(grpc_timer_trace)) { gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer, @@ -451,7 +453,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { 'queue_deadline_cap') into into shard->heap. Returns 'true' if shard->heap has atleast ONE element REQUIRES: shard->mu locked */ -static int refill_heap(timer_shard *shard, gpr_atm now) { +static int refill_heap(timer_shard* shard, gpr_atm now) { /* Compute the new queue window width and bound by the limits: */ double computed_deadline_delta = grpc_time_averaged_stats_update_average(&shard->stats) * @@ -488,8 +490,8 @@ static int refill_heap(timer_shard *shard, gpr_atm now) { /* This pops the next non-cancelled timer with deadline <= now from the queue, or returns NULL if there isn't one. REQUIRES: shard->mu locked */ -static grpc_timer *pop_one(timer_shard *shard, gpr_atm now) { - grpc_timer *timer; +static grpc_timer* pop_one(timer_shard* shard, gpr_atm now) { + grpc_timer* timer; for (;;) { if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, " .. shard[%d]: heap_empty=%s", @@ -519,11 +521,11 @@ static grpc_timer *pop_one(timer_shard *shard, gpr_atm now) { } /* REQUIRES: shard->mu unlocked */ -static size_t pop_timers(grpc_exec_ctx *exec_ctx, timer_shard *shard, - gpr_atm now, gpr_atm *new_min_deadline, - grpc_error *error) { +static size_t pop_timers(grpc_exec_ctx* exec_ctx, timer_shard* shard, + gpr_atm now, gpr_atm* new_min_deadline, + grpc_error* error) { size_t n = 0; - grpc_timer *timer; + grpc_timer* timer; gpr_mu_lock(&shard->mu); while ((timer = pop_one(shard, now))) { REMOVE_FROM_HASH_TABLE(timer); @@ -539,10 +541,10 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, timer_shard *shard, return n; } -static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx, +static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx* exec_ctx, gpr_atm now, - gpr_atm *next, - grpc_error *error) { + gpr_atm* next, + grpc_error* error) { grpc_timer_check_result result = GRPC_TIMERS_NOT_CHECKED; gpr_atm min_timer = gpr_atm_no_barrier_load(&g_shared_mutables.min_timer); @@ -607,8 +609,8 @@ static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx, return result; } -grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, - grpc_millis *next) { +grpc_timer_check_result grpc_timer_check(grpc_exec_ctx* exec_ctx, + grpc_millis* next) { // prelude grpc_millis now = grpc_exec_ctx_now(exec_ctx); @@ -627,21 +629,22 @@ grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, return GRPC_TIMERS_CHECKED_AND_EMPTY; } - grpc_error *shutdown_error = + grpc_error* shutdown_error = now != GRPC_MILLIS_INF_FUTURE ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system"); // tracing if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - char *next_str; + char* next_str; if (next == NULL) { next_str = gpr_strdup("NULL"); } else { gpr_asprintf(&next_str, "%" PRIdPTR, *next); } - gpr_log(GPR_DEBUG, "TIMER CHECK BEGIN: now=%" PRIdPTR - " next=%s tls_min=%" PRIdPTR " glob_min=%" PRIdPTR, + gpr_log(GPR_DEBUG, + "TIMER CHECK BEGIN: now=%" PRIdPTR " next=%s tls_min=%" PRIdPTR + " glob_min=%" PRIdPTR, now, next_str, gpr_tls_get(&g_last_seen_min_timer), gpr_atm_no_barrier_load(&g_shared_mutables.min_timer)); gpr_free(next_str); @@ -651,7 +654,7 @@ grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, run_some_expired_timers(exec_ctx, now, next, shutdown_error); // tracing if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - char *next_str; + char* next_str; if (next == NULL) { next_str = gpr_strdup("NULL"); } else { |