aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/executor.c
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2017-09-20 11:41:46 -0700
committerGravatar Sree Kuchibhotla <sreek@google.com>2017-09-20 11:41:46 -0700
commita27764e5939fd4ca866d4fd453448cc6d4b87e06 (patch)
treee69fc12aa9be99f98627ea12cdcb69172c52eb43 /src/core/lib/iomgr/executor.c
parente16329dd5f2be074ce72564666e83a33e34e468c (diff)
parent31aea7ee3a2c188d805cdf0b7be6ae446720bc43 (diff)
Merge branch 'master' into debug-timers
Diffstat (limited to 'src/core/lib/iomgr/executor.c')
-rw-r--r--src/core/lib/iomgr/executor.c69
1 files changed, 39 insertions, 30 deletions
diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c
index 892385d7d7..2439f15a8a 100644
--- a/src/core/lib/iomgr/executor.c
+++ b/src/core/lib/iomgr/executor.c
@@ -32,16 +32,14 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/support/spinlock.h"
-#define MAX_DEPTH 2
-
typedef struct {
gpr_mu mu;
gpr_cv cv;
grpc_closure_list elems;
- size_t depth;
bool shutdown;
bool queued_long_job;
gpr_thd_id id;
+ grpc_closure_list local_elems;
} thread_state;
static thread_state *g_thread_state;
@@ -56,32 +54,35 @@ static grpc_tracer_flag executor_trace =
static void executor_thread(void *arg);
-static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) {
- size_t n = 0;
+static void run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list *list) {
+ int n = 0; // number of closures executed
- grpc_closure *c = list.head;
- while (c != NULL) {
- grpc_closure *next = c->next_data.next;
- grpc_error *error = c->error_data.error;
- if (GRPC_TRACER_ON(executor_trace)) {
+ while (!grpc_closure_list_empty(*list)) {
+ grpc_closure *c = list->head;
+ grpc_closure_list_init(list);
+ while (c != NULL) {
+ grpc_closure *next = c->next_data.next;
+ grpc_error *error = c->error_data.error;
+ if (GRPC_TRACER_ON(executor_trace)) {
#ifndef NDEBUG
- gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c,
- c->file_created, c->line_created);
+ gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c,
+ c->file_created, c->line_created);
#else
- gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c);
+ gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c);
#endif
- }
+ }
#ifndef NDEBUG
- c->scheduled = false;
+ c->scheduled = false;
#endif
- c->cb(exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- c = next;
- n++;
- grpc_exec_ctx_flush(exec_ctx);
+ n++;
+ c->cb(exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
+ c = next;
+ grpc_exec_ctx_flush(exec_ctx);
+ }
}
- return n;
+ GRPC_STATS_INC_EXECUTOR_CLOSURES_PER_WAKEUP(exec_ctx, n);
}
bool grpc_executor_is_threaded() {
@@ -126,7 +127,7 @@ void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) {
for (size_t i = 0; i < g_max_threads; i++) {
gpr_mu_destroy(&g_thread_state[i].mu);
gpr_cv_destroy(&g_thread_state[i].cv);
- run_closures(exec_ctx, g_thread_state[i].elems);
+ run_closures(exec_ctx, &g_thread_state[i].elems);
}
gpr_free(g_thread_state);
gpr_tls_destroy(&g_this_thread_state);
@@ -150,14 +151,14 @@ static void executor_thread(void *arg) {
grpc_exec_ctx exec_ctx =
GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
- size_t subtract_depth = 0;
+ GRPC_STATS_INC_EXECUTOR_THREADS_CREATED(&exec_ctx);
+
+ bool used = false;
for (;;) {
if (GRPC_TRACER_ON(executor_trace)) {
- gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step (sub_depth=%" PRIdPTR ")",
- (int)(ts - g_thread_state), subtract_depth);
+ gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step", (int)(ts - g_thread_state));
}
gpr_mu_lock(&ts->mu);
- ts->depth -= subtract_depth;
while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
ts->queued_long_job = false;
gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
@@ -170,15 +171,20 @@ static void executor_thread(void *arg) {
gpr_mu_unlock(&ts->mu);
break;
}
+ if (!used) {
+ GRPC_STATS_INC_EXECUTOR_THREADS_USED(&exec_ctx);
+ used = true;
+ }
GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED(&exec_ctx);
- grpc_closure_list exec = ts->elems;
+ GPR_ASSERT(grpc_closure_list_empty(ts->local_elems));
+ ts->local_elems = ts->elems;
ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
gpr_mu_unlock(&ts->mu);
if (GRPC_TRACER_ON(executor_trace)) {
gpr_log(GPR_DEBUG, "EXECUTOR[%d]: execute", (int)(ts - g_thread_state));
}
- subtract_depth = run_closures(&exec_ctx, exec);
+ run_closures(&exec_ctx, &ts->local_elems);
}
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -211,6 +217,10 @@ static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)];
} else {
GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(exec_ctx);
+ if (is_short) {
+ grpc_closure_list_append(&ts->local_elems, closure, error);
+ return;
+ }
}
thread_state *orig_ts = ts;
@@ -250,8 +260,7 @@ static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
gpr_cv_signal(&ts->cv);
}
grpc_closure_list_append(&ts->elems, closure, error);
- ts->depth++;
- try_new_thread = ts->depth > MAX_DEPTH &&
+ try_new_thread = ts->elems.head != closure &&
cur_thread_count < g_max_threads && !ts->shutdown;
if (!is_short) ts->queued_long_job = true;
gpr_mu_unlock(&ts->mu);