diff options
author | 2017-09-29 11:18:26 -0700 | |
---|---|---|
committer | 2017-09-29 11:18:26 -0700 | |
commit | 1e868f0f9539925e51aa52269c848082d23b7c4e (patch) | |
tree | 49fd965a5f58e2605864df54a1d70621367eee36 /src/core/lib/iomgr | |
parent | 710334577ce1b2de94f656ec0762eeba6effd29b (diff) | |
parent | 903f06fe3f8b1b971f1b633dff45488ca68c6708 (diff) |
Merge github.com:grpc/grpc into flowctl+millis
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r-- | src/core/lib/iomgr/executor.c | 69 | ||||
-rw-r--r-- | src/core/lib/iomgr/socket_utils_windows.c | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_generic.c | 134 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_generic.h | 3 |
4 files changed, 167 insertions, 43 deletions
diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c index cb4f506aed..58d9daf2dd 100644 --- a/src/core/lib/iomgr/executor.c +++ b/src/core/lib/iomgr/executor.c @@ -32,14 +32,16 @@ #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/support/spinlock.h" +#define MAX_DEPTH 2 + typedef struct { gpr_mu mu; gpr_cv cv; grpc_closure_list elems; + size_t depth; bool shutdown; bool queued_long_job; gpr_thd_id id; - grpc_closure_list local_elems; } thread_state; static thread_state *g_thread_state; @@ -54,35 +56,32 @@ static grpc_tracer_flag executor_trace = static void executor_thread(void *arg); -static void run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list *list) { - int n = 0; // number of closures executed +static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) { + size_t n = 0; - while (!grpc_closure_list_empty(*list)) { - grpc_closure *c = list->head; - grpc_closure_list_init(list); - while (c != NULL) { - grpc_closure *next = c->next_data.next; - grpc_error *error = c->error_data.error; - if (GRPC_TRACER_ON(executor_trace)) { + grpc_closure *c = list.head; + while (c != NULL) { + grpc_closure *next = c->next_data.next; + grpc_error *error = c->error_data.error; + if (GRPC_TRACER_ON(executor_trace)) { #ifndef NDEBUG - gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c, - c->file_created, c->line_created); + gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c, + c->file_created, c->line_created); #else - gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c); + gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c); #endif - } + } #ifndef NDEBUG - c->scheduled = false; + c->scheduled = false; #endif - n++; - c->cb(exec_ctx, c->cb_arg, error); - GRPC_ERROR_UNREF(error); - c = next; - grpc_exec_ctx_flush(exec_ctx); - } + c->cb(exec_ctx, c->cb_arg, error); + GRPC_ERROR_UNREF(error); + c = next; + n++; + grpc_exec_ctx_flush(exec_ctx); } - GRPC_STATS_INC_EXECUTOR_CLOSURES_PER_WAKEUP(exec_ctx, n); + return n; } bool grpc_executor_is_threaded() { @@ -127,7 +126,7 @@ void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) { for (size_t i = 0; i < g_max_threads; i++) { gpr_mu_destroy(&g_thread_state[i].mu); gpr_cv_destroy(&g_thread_state[i].cv); - run_closures(exec_ctx, &g_thread_state[i].elems); + run_closures(exec_ctx, g_thread_state[i].elems); } gpr_free(g_thread_state); gpr_tls_destroy(&g_this_thread_state); @@ -151,14 +150,14 @@ static void executor_thread(void *arg) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL); - GRPC_STATS_INC_EXECUTOR_THREADS_CREATED(&exec_ctx); - - bool used = false; + size_t subtract_depth = 0; for (;;) { if (GRPC_TRACER_ON(executor_trace)) { - gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step", (int)(ts - g_thread_state)); + gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step (sub_depth=%" PRIdPTR ")", + (int)(ts - g_thread_state), subtract_depth); } gpr_mu_lock(&ts->mu); + ts->depth -= subtract_depth; while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) { ts->queued_long_job = false; gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME)); @@ -171,13 +170,8 @@ static void executor_thread(void *arg) { gpr_mu_unlock(&ts->mu); break; } - if (!used) { - GRPC_STATS_INC_EXECUTOR_THREADS_USED(&exec_ctx); - used = true; - } GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED(&exec_ctx); - GPR_ASSERT(grpc_closure_list_empty(ts->local_elems)); - ts->local_elems = ts->elems; + grpc_closure_list exec = ts->elems; ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT; gpr_mu_unlock(&ts->mu); if (GRPC_TRACER_ON(executor_trace)) { @@ -185,7 +179,7 @@ static void executor_thread(void *arg) { } grpc_exec_ctx_invalidate_now(&exec_ctx); - run_closures(&exec_ctx, &ts->local_elems); + subtract_depth = run_closures(&exec_ctx, exec); } grpc_exec_ctx_finish(&exec_ctx); } @@ -218,10 +212,6 @@ static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)]; } else { GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(exec_ctx); - if (is_short) { - grpc_closure_list_append(&ts->local_elems, closure, error); - return; - } } thread_state *orig_ts = ts; @@ -261,7 +251,8 @@ static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, gpr_cv_signal(&ts->cv); } grpc_closure_list_append(&ts->elems, closure, error); - try_new_thread = ts->elems.head != closure && + ts->depth++; + try_new_thread = ts->depth > MAX_DEPTH && cur_thread_count < g_max_threads && !ts->shutdown; if (!is_short) ts->queued_long_job = true; gpr_mu_unlock(&ts->mu); diff --git a/src/core/lib/iomgr/socket_utils_windows.c b/src/core/lib/iomgr/socket_utils_windows.c index 2732c159aa..6e85e4b61f 100644 --- a/src/core/lib/iomgr/socket_utils_windows.c +++ b/src/core/lib/iomgr/socket_utils_windows.c @@ -26,12 +26,8 @@ #include <grpc/support/log.h> const char *grpc_inet_ntop(int af, const void *src, char *dst, size_t size) { -#ifdef GPR_WIN_INET_NTOP - return inet_ntop(af, src, dst, size); -#else /* Windows InetNtopA wants a mutable ip pointer */ return InetNtopA(af, (void *)src, dst, size); -#endif /* GPR_WIN_INET_NTOP */ } #endif /* GRPC_WINDOWS_SOCKETUTILS */ diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index 78765818ca..14330bd1dd 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -79,6 +79,125 @@ static timer_shard g_shards[NUM_SHARDS]; * Access to this is protected by g_shared_mutables.mu */ static timer_shard *g_shard_queue[NUM_SHARDS]; +#ifndef NDEBUG + +/* == Hash table for duplicate timer detection == */ + +#define NUM_HASH_BUCKETS 1009 /* Prime number close to 1000 */ + +static gpr_mu g_hash_mu[NUM_HASH_BUCKETS]; /* One mutex per bucket */ +static grpc_timer *g_timer_ht[NUM_HASH_BUCKETS] = {NULL}; + +static void init_timer_ht() { + for (int i = 0; i < NUM_HASH_BUCKETS; i++) { + gpr_mu_init(&g_hash_mu[i]); + } +} + +static bool is_in_ht(grpc_timer *t) { + size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS); + + gpr_mu_lock(&g_hash_mu[i]); + grpc_timer *p = g_timer_ht[i]; + while (p != NULL && p != t) { + p = p->hash_table_next; + } + gpr_mu_unlock(&g_hash_mu[i]); + + return (p == t); +} + +static void add_to_ht(grpc_timer *t) { + GPR_ASSERT(!t->hash_table_next); + size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS); + + gpr_mu_lock(&g_hash_mu[i]); + grpc_timer *p = g_timer_ht[i]; + while (p != NULL && p != t) { + p = p->hash_table_next; + } + + if (p == t) { + grpc_closure *c = t->closure; + gpr_log(GPR_ERROR, + "** Duplicate timer (%p) being added. Closure: (%p), created at: " + "(%s:%d), scheduled at: (%s:%d) **", + t, c, c->file_created, c->line_created, c->file_initiated, + c->line_initiated); + abort(); + } + + /* Timer not present in the bucket. Insert at head of the list */ + t->hash_table_next = g_timer_ht[i]; + g_timer_ht[i] = t; + gpr_mu_unlock(&g_hash_mu[i]); +} + +static void remove_from_ht(grpc_timer *t) { + size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS); + bool removed = false; + + gpr_mu_lock(&g_hash_mu[i]); + if (g_timer_ht[i] == t) { + g_timer_ht[i] = g_timer_ht[i]->hash_table_next; + removed = true; + } else if (g_timer_ht[i] != NULL) { + grpc_timer *p = g_timer_ht[i]; + while (p->hash_table_next != NULL && p->hash_table_next != t) { + p = p->hash_table_next; + } + + if (p->hash_table_next == t) { + p->hash_table_next = t->hash_table_next; + removed = true; + } + } + gpr_mu_unlock(&g_hash_mu[i]); + + if (!removed) { + grpc_closure *c = t->closure; + gpr_log(GPR_ERROR, + "** Removing timer (%p) that is not added to hash table. Closure " + "(%p), created at: (%s:%d), scheduled at: (%s:%d) **", + t, c, c->file_created, c->line_created, c->file_initiated, + c->line_initiated); + abort(); + } + + t->hash_table_next = NULL; +} + +/* If a timer is added to a timer shard (either heap or a list), it cannot + * be pending. A timer is added to hash table only-if it is added to the + * timer shard. + * Therefore, if timer->pending is false, it cannot be in hash table */ +static void validate_non_pending_timer(grpc_timer *t) { + if (!t->pending && is_in_ht(t)) { + grpc_closure *c = t->closure; + gpr_log(GPR_ERROR, + "** gpr_timer_cancel() called on a non-pending timer (%p) which " + "is in the hash table. Closure: (%p), created at: (%s:%d), " + "scheduled at: (%s:%d) **", + t, c, c->file_created, c->line_created, c->file_initiated, + c->line_initiated); + abort(); + } +} + +#define INIT_TIMER_HASH_TABLE() init_timer_ht() +#define ADD_TO_HASH_TABLE(t) add_to_ht((t)) +#define REMOVE_FROM_HASH_TABLE(t) remove_from_ht((t)) +#define VALIDATE_NON_PENDING_TIMER(t) validate_non_pending_timer((t)) + +#else + +#define INIT_TIMER_HASH_TABLE() +#define ADD_TO_HASH_TABLE(t) +#define REMOVE_FROM_HASH_TABLE(t) +#define VALIDATE_NON_PENDING_TIMER(t) + +#endif + /* Thread local variable that stores the deadline of the next timer the thread * has last-seen. This is an optimization to prevent the thread from checking * shared_mutables.min_timer (which requires acquiring shared_mutables.mu lock, @@ -139,6 +258,8 @@ void grpc_timer_list_init(grpc_exec_ctx *exec_ctx) { shard->min_deadline = compute_min_deadline(shard); g_shard_queue[i] = shard; } + + INIT_TIMER_HASH_TABLE(); } void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { @@ -202,6 +323,10 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, timer->closure = closure; timer->deadline = deadline; +#ifndef NDEBUG + timer->hash_table_next = NULL; +#endif + if (GRPC_TRACER_ON(grpc_timer_trace)) { gpr_log(GPR_DEBUG, "TIMER %p: SET %" PRIdPTR " now %" PRIdPTR " call %p[%p]", timer, @@ -229,6 +354,9 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, grpc_time_averaged_stats_add_sample(&shard->stats, (double)(deadline - now) / 1000.0); + + ADD_TO_HASH_TABLE(timer); + if (deadline < shard->queue_deadline_cap) { is_first_timer = grpc_timer_heap_add(&shard->heap, timer); } else { @@ -290,7 +418,10 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer, timer->pending ? "true" : "false"); } + if (timer->pending) { + REMOVE_FROM_HASH_TABLE(timer); + GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED); timer->pending = false; if (timer->heap_index == INVALID_HEAP_INDEX) { @@ -298,6 +429,8 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { } else { grpc_timer_heap_remove(&shard->heap, timer); } + } else { + VALIDATE_NON_PENDING_TIMER(timer); } gpr_mu_unlock(&shard->mu); } @@ -382,6 +515,7 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, timer_shard *shard, grpc_timer *timer; gpr_mu_lock(&shard->mu); while ((timer = pop_one(shard, now))) { + REMOVE_FROM_HASH_TABLE(timer); GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_REF(error)); n++; } diff --git a/src/core/lib/iomgr/timer_generic.h b/src/core/lib/iomgr/timer_generic.h index 72a4ac1f10..f0597f6ea0 100644 --- a/src/core/lib/iomgr/timer_generic.h +++ b/src/core/lib/iomgr/timer_generic.h @@ -29,6 +29,9 @@ struct grpc_timer { struct grpc_timer *next; struct grpc_timer *prev; grpc_closure *closure; +#ifndef NDEBUG + struct grpc_timer *hash_table_next; +#endif }; #endif /* GRPC_CORE_LIB_IOMGR_TIMER_GENERIC_H */ |