aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-09-29 11:18:26 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-09-29 11:18:26 -0700
commit1e868f0f9539925e51aa52269c848082d23b7c4e (patch)
tree49fd965a5f58e2605864df54a1d70621367eee36 /src/core/lib/iomgr
parent710334577ce1b2de94f656ec0762eeba6effd29b (diff)
parent903f06fe3f8b1b971f1b633dff45488ca68c6708 (diff)
Merge github.com:grpc/grpc into flowctl+millis
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/executor.c69
-rw-r--r--src/core/lib/iomgr/socket_utils_windows.c4
-rw-r--r--src/core/lib/iomgr/timer_generic.c134
-rw-r--r--src/core/lib/iomgr/timer_generic.h3
4 files changed, 167 insertions, 43 deletions
diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c
index cb4f506aed..58d9daf2dd 100644
--- a/src/core/lib/iomgr/executor.c
+++ b/src/core/lib/iomgr/executor.c
@@ -32,14 +32,16 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/support/spinlock.h"
+#define MAX_DEPTH 2
+
typedef struct {
gpr_mu mu;
gpr_cv cv;
grpc_closure_list elems;
+ size_t depth;
bool shutdown;
bool queued_long_job;
gpr_thd_id id;
- grpc_closure_list local_elems;
} thread_state;
static thread_state *g_thread_state;
@@ -54,35 +56,32 @@ static grpc_tracer_flag executor_trace =
static void executor_thread(void *arg);
-static void run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list *list) {
- int n = 0; // number of closures executed
+static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) {
+ size_t n = 0;
- while (!grpc_closure_list_empty(*list)) {
- grpc_closure *c = list->head;
- grpc_closure_list_init(list);
- while (c != NULL) {
- grpc_closure *next = c->next_data.next;
- grpc_error *error = c->error_data.error;
- if (GRPC_TRACER_ON(executor_trace)) {
+ grpc_closure *c = list.head;
+ while (c != NULL) {
+ grpc_closure *next = c->next_data.next;
+ grpc_error *error = c->error_data.error;
+ if (GRPC_TRACER_ON(executor_trace)) {
#ifndef NDEBUG
- gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c,
- c->file_created, c->line_created);
+ gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c,
+ c->file_created, c->line_created);
#else
- gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c);
+ gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c);
#endif
- }
+ }
#ifndef NDEBUG
- c->scheduled = false;
+ c->scheduled = false;
#endif
- n++;
- c->cb(exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- c = next;
- grpc_exec_ctx_flush(exec_ctx);
- }
+ c->cb(exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
+ c = next;
+ n++;
+ grpc_exec_ctx_flush(exec_ctx);
}
- GRPC_STATS_INC_EXECUTOR_CLOSURES_PER_WAKEUP(exec_ctx, n);
+ return n;
}
bool grpc_executor_is_threaded() {
@@ -127,7 +126,7 @@ void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) {
for (size_t i = 0; i < g_max_threads; i++) {
gpr_mu_destroy(&g_thread_state[i].mu);
gpr_cv_destroy(&g_thread_state[i].cv);
- run_closures(exec_ctx, &g_thread_state[i].elems);
+ run_closures(exec_ctx, g_thread_state[i].elems);
}
gpr_free(g_thread_state);
gpr_tls_destroy(&g_this_thread_state);
@@ -151,14 +150,14 @@ static void executor_thread(void *arg) {
grpc_exec_ctx exec_ctx =
GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
- GRPC_STATS_INC_EXECUTOR_THREADS_CREATED(&exec_ctx);
-
- bool used = false;
+ size_t subtract_depth = 0;
for (;;) {
if (GRPC_TRACER_ON(executor_trace)) {
- gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step", (int)(ts - g_thread_state));
+ gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step (sub_depth=%" PRIdPTR ")",
+ (int)(ts - g_thread_state), subtract_depth);
}
gpr_mu_lock(&ts->mu);
+ ts->depth -= subtract_depth;
while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
ts->queued_long_job = false;
gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
@@ -171,13 +170,8 @@ static void executor_thread(void *arg) {
gpr_mu_unlock(&ts->mu);
break;
}
- if (!used) {
- GRPC_STATS_INC_EXECUTOR_THREADS_USED(&exec_ctx);
- used = true;
- }
GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED(&exec_ctx);
- GPR_ASSERT(grpc_closure_list_empty(ts->local_elems));
- ts->local_elems = ts->elems;
+ grpc_closure_list exec = ts->elems;
ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
gpr_mu_unlock(&ts->mu);
if (GRPC_TRACER_ON(executor_trace)) {
@@ -185,7 +179,7 @@ static void executor_thread(void *arg) {
}
grpc_exec_ctx_invalidate_now(&exec_ctx);
- run_closures(&exec_ctx, &ts->local_elems);
+ subtract_depth = run_closures(&exec_ctx, exec);
}
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -218,10 +212,6 @@ static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)];
} else {
GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(exec_ctx);
- if (is_short) {
- grpc_closure_list_append(&ts->local_elems, closure, error);
- return;
- }
}
thread_state *orig_ts = ts;
@@ -261,7 +251,8 @@ static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
gpr_cv_signal(&ts->cv);
}
grpc_closure_list_append(&ts->elems, closure, error);
- try_new_thread = ts->elems.head != closure &&
+ ts->depth++;
+ try_new_thread = ts->depth > MAX_DEPTH &&
cur_thread_count < g_max_threads && !ts->shutdown;
if (!is_short) ts->queued_long_job = true;
gpr_mu_unlock(&ts->mu);
diff --git a/src/core/lib/iomgr/socket_utils_windows.c b/src/core/lib/iomgr/socket_utils_windows.c
index 2732c159aa..6e85e4b61f 100644
--- a/src/core/lib/iomgr/socket_utils_windows.c
+++ b/src/core/lib/iomgr/socket_utils_windows.c
@@ -26,12 +26,8 @@
#include <grpc/support/log.h>
const char *grpc_inet_ntop(int af, const void *src, char *dst, size_t size) {
-#ifdef GPR_WIN_INET_NTOP
- return inet_ntop(af, src, dst, size);
-#else
/* Windows InetNtopA wants a mutable ip pointer */
return InetNtopA(af, (void *)src, dst, size);
-#endif /* GPR_WIN_INET_NTOP */
}
#endif /* GRPC_WINDOWS_SOCKETUTILS */
diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c
index 78765818ca..14330bd1dd 100644
--- a/src/core/lib/iomgr/timer_generic.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -79,6 +79,125 @@ static timer_shard g_shards[NUM_SHARDS];
* Access to this is protected by g_shared_mutables.mu */
static timer_shard *g_shard_queue[NUM_SHARDS];
+#ifndef NDEBUG
+
+/* == Hash table for duplicate timer detection == */
+
+#define NUM_HASH_BUCKETS 1009 /* Prime number close to 1000 */
+
+static gpr_mu g_hash_mu[NUM_HASH_BUCKETS]; /* One mutex per bucket */
+static grpc_timer *g_timer_ht[NUM_HASH_BUCKETS] = {NULL};
+
+static void init_timer_ht() {
+ for (int i = 0; i < NUM_HASH_BUCKETS; i++) {
+ gpr_mu_init(&g_hash_mu[i]);
+ }
+}
+
+static bool is_in_ht(grpc_timer *t) {
+ size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS);
+
+ gpr_mu_lock(&g_hash_mu[i]);
+ grpc_timer *p = g_timer_ht[i];
+ while (p != NULL && p != t) {
+ p = p->hash_table_next;
+ }
+ gpr_mu_unlock(&g_hash_mu[i]);
+
+ return (p == t);
+}
+
+static void add_to_ht(grpc_timer *t) {
+ GPR_ASSERT(!t->hash_table_next);
+ size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS);
+
+ gpr_mu_lock(&g_hash_mu[i]);
+ grpc_timer *p = g_timer_ht[i];
+ while (p != NULL && p != t) {
+ p = p->hash_table_next;
+ }
+
+ if (p == t) {
+ grpc_closure *c = t->closure;
+ gpr_log(GPR_ERROR,
+ "** Duplicate timer (%p) being added. Closure: (%p), created at: "
+ "(%s:%d), scheduled at: (%s:%d) **",
+ t, c, c->file_created, c->line_created, c->file_initiated,
+ c->line_initiated);
+ abort();
+ }
+
+ /* Timer not present in the bucket. Insert at head of the list */
+ t->hash_table_next = g_timer_ht[i];
+ g_timer_ht[i] = t;
+ gpr_mu_unlock(&g_hash_mu[i]);
+}
+
+static void remove_from_ht(grpc_timer *t) {
+ size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS);
+ bool removed = false;
+
+ gpr_mu_lock(&g_hash_mu[i]);
+ if (g_timer_ht[i] == t) {
+ g_timer_ht[i] = g_timer_ht[i]->hash_table_next;
+ removed = true;
+ } else if (g_timer_ht[i] != NULL) {
+ grpc_timer *p = g_timer_ht[i];
+ while (p->hash_table_next != NULL && p->hash_table_next != t) {
+ p = p->hash_table_next;
+ }
+
+ if (p->hash_table_next == t) {
+ p->hash_table_next = t->hash_table_next;
+ removed = true;
+ }
+ }
+ gpr_mu_unlock(&g_hash_mu[i]);
+
+ if (!removed) {
+ grpc_closure *c = t->closure;
+ gpr_log(GPR_ERROR,
+ "** Removing timer (%p) that is not added to hash table. Closure "
+ "(%p), created at: (%s:%d), scheduled at: (%s:%d) **",
+ t, c, c->file_created, c->line_created, c->file_initiated,
+ c->line_initiated);
+ abort();
+ }
+
+ t->hash_table_next = NULL;
+}
+
+/* If a timer is added to a timer shard (either heap or a list), it cannot
+ * be pending. A timer is added to hash table only-if it is added to the
+ * timer shard.
+ * Therefore, if timer->pending is false, it cannot be in hash table */
+static void validate_non_pending_timer(grpc_timer *t) {
+ if (!t->pending && is_in_ht(t)) {
+ grpc_closure *c = t->closure;
+ gpr_log(GPR_ERROR,
+ "** gpr_timer_cancel() called on a non-pending timer (%p) which "
+ "is in the hash table. Closure: (%p), created at: (%s:%d), "
+ "scheduled at: (%s:%d) **",
+ t, c, c->file_created, c->line_created, c->file_initiated,
+ c->line_initiated);
+ abort();
+ }
+}
+
+#define INIT_TIMER_HASH_TABLE() init_timer_ht()
+#define ADD_TO_HASH_TABLE(t) add_to_ht((t))
+#define REMOVE_FROM_HASH_TABLE(t) remove_from_ht((t))
+#define VALIDATE_NON_PENDING_TIMER(t) validate_non_pending_timer((t))
+
+#else
+
+#define INIT_TIMER_HASH_TABLE()
+#define ADD_TO_HASH_TABLE(t)
+#define REMOVE_FROM_HASH_TABLE(t)
+#define VALIDATE_NON_PENDING_TIMER(t)
+
+#endif
+
/* Thread local variable that stores the deadline of the next timer the thread
* has last-seen. This is an optimization to prevent the thread from checking
* shared_mutables.min_timer (which requires acquiring shared_mutables.mu lock,
@@ -139,6 +258,8 @@ void grpc_timer_list_init(grpc_exec_ctx *exec_ctx) {
shard->min_deadline = compute_min_deadline(shard);
g_shard_queue[i] = shard;
}
+
+ INIT_TIMER_HASH_TABLE();
}
void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {
@@ -202,6 +323,10 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
timer->closure = closure;
timer->deadline = deadline;
+#ifndef NDEBUG
+ timer->hash_table_next = NULL;
+#endif
+
if (GRPC_TRACER_ON(grpc_timer_trace)) {
gpr_log(GPR_DEBUG,
"TIMER %p: SET %" PRIdPTR " now %" PRIdPTR " call %p[%p]", timer,
@@ -229,6 +354,9 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
grpc_time_averaged_stats_add_sample(&shard->stats,
(double)(deadline - now) / 1000.0);
+
+ ADD_TO_HASH_TABLE(timer);
+
if (deadline < shard->queue_deadline_cap) {
is_first_timer = grpc_timer_heap_add(&shard->heap, timer);
} else {
@@ -290,7 +418,10 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer,
timer->pending ? "true" : "false");
}
+
if (timer->pending) {
+ REMOVE_FROM_HASH_TABLE(timer);
+
GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED);
timer->pending = false;
if (timer->heap_index == INVALID_HEAP_INDEX) {
@@ -298,6 +429,8 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
} else {
grpc_timer_heap_remove(&shard->heap, timer);
}
+ } else {
+ VALIDATE_NON_PENDING_TIMER(timer);
}
gpr_mu_unlock(&shard->mu);
}
@@ -382,6 +515,7 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, timer_shard *shard,
grpc_timer *timer;
gpr_mu_lock(&shard->mu);
while ((timer = pop_one(shard, now))) {
+ REMOVE_FROM_HASH_TABLE(timer);
GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_REF(error));
n++;
}
diff --git a/src/core/lib/iomgr/timer_generic.h b/src/core/lib/iomgr/timer_generic.h
index 72a4ac1f10..f0597f6ea0 100644
--- a/src/core/lib/iomgr/timer_generic.h
+++ b/src/core/lib/iomgr/timer_generic.h
@@ -29,6 +29,9 @@ struct grpc_timer {
struct grpc_timer *next;
struct grpc_timer *prev;
grpc_closure *closure;
+#ifndef NDEBUG
+ struct grpc_timer *hash_table_next;
+#endif
};
#endif /* GRPC_CORE_LIB_IOMGR_TIMER_GENERIC_H */