aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/executor.cc
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2017-12-06 09:05:05 -0800
committerGravatar GitHub <noreply@github.com>2017-12-06 09:05:05 -0800
commitad4d2dde0052efbbf49d64b0843c45f0381cfeb3 (patch)
tree6a657f8c6179d873b34505cdc24bce9462ca68eb /src/core/lib/iomgr/executor.cc
parenta3df36cc2505a89c2f481eea4a66a87b3002844a (diff)
Revert "All instances of exec_ctx being passed around in src/core removed"
Diffstat (limited to 'src/core/lib/iomgr/executor.cc')
-rw-r--r--src/core/lib/iomgr/executor.cc58
1 files changed, 31 insertions, 27 deletions
diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc
index b45223ce16..fabdbdf934 100644
--- a/src/core/lib/iomgr/executor.cc
+++ b/src/core/lib/iomgr/executor.cc
@@ -55,7 +55,7 @@ grpc_core::TraceFlag executor_trace(false, "executor");
static void executor_thread(void* arg);
-static size_t run_closures(grpc_closure_list list) {
+static size_t run_closures(grpc_exec_ctx* exec_ctx, grpc_closure_list list) {
size_t n = 0;
grpc_closure* c = list.head;
@@ -73,11 +73,11 @@ static size_t run_closures(grpc_closure_list list) {
#ifndef NDEBUG
c->scheduled = false;
#endif
- c->cb(c->cb_arg, error);
+ c->cb(exec_ctx, c->cb_arg, error);
GRPC_ERROR_UNREF(error);
c = next;
n++;
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(exec_ctx);
}
return n;
@@ -87,7 +87,7 @@ bool grpc_executor_is_threaded() {
return gpr_atm_no_barrier_load(&g_cur_threads) > 0;
}
-void grpc_executor_set_threading(bool threading) {
+void grpc_executor_set_threading(grpc_exec_ctx* exec_ctx, bool threading) {
gpr_atm cur_threads = gpr_atm_no_barrier_load(&g_cur_threads);
if (threading) {
if (cur_threads > 0) return;
@@ -125,25 +125,28 @@ void grpc_executor_set_threading(bool threading) {
for (size_t i = 0; i < g_max_threads; i++) {
gpr_mu_destroy(&g_thread_state[i].mu);
gpr_cv_destroy(&g_thread_state[i].cv);
- run_closures(g_thread_state[i].elems);
+ run_closures(exec_ctx, g_thread_state[i].elems);
}
gpr_free(g_thread_state);
gpr_tls_destroy(&g_this_thread_state);
}
}
-void grpc_executor_init() {
+void grpc_executor_init(grpc_exec_ctx* exec_ctx) {
gpr_atm_no_barrier_store(&g_cur_threads, 0);
- grpc_executor_set_threading(true);
+ grpc_executor_set_threading(exec_ctx, true);
}
-void grpc_executor_shutdown() { grpc_executor_set_threading(false); }
+void grpc_executor_shutdown(grpc_exec_ctx* exec_ctx) {
+ grpc_executor_set_threading(exec_ctx, false);
+}
static void executor_thread(void* arg) {
thread_state* ts = (thread_state*)arg;
gpr_tls_set(&g_this_thread_state, (intptr_t)ts);
- grpc_core::ExecCtx exec_ctx(0);
+ grpc_exec_ctx exec_ctx =
+ GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, nullptr);
size_t subtract_depth = 0;
for (;;) {
@@ -165,7 +168,7 @@ static void executor_thread(void* arg) {
gpr_mu_unlock(&ts->mu);
break;
}
- GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED();
+ GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED(&exec_ctx);
grpc_closure_list exec = ts->elems;
ts->elems = GRPC_CLOSURE_LIST_INIT;
gpr_mu_unlock(&ts->mu);
@@ -173,18 +176,19 @@ static void executor_thread(void* arg) {
gpr_log(GPR_DEBUG, "EXECUTOR[%d]: execute", (int)(ts - g_thread_state));
}
- grpc_core::ExecCtx::Get()->InvalidateNow();
- subtract_depth = run_closures(exec);
+ grpc_exec_ctx_invalidate_now(&exec_ctx);
+ subtract_depth = run_closures(&exec_ctx, exec);
}
+ grpc_exec_ctx_finish(&exec_ctx);
}
-static void executor_push(grpc_closure* closure, grpc_error* error,
- bool is_short) {
+static void executor_push(grpc_exec_ctx* exec_ctx, grpc_closure* closure,
+ grpc_error* error, bool is_short) {
bool retry_push;
if (is_short) {
- GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS();
+ GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS(exec_ctx);
} else {
- GRPC_STATS_INC_EXECUTOR_SCHEDULED_LONG_ITEMS();
+ GRPC_STATS_INC_EXECUTOR_SCHEDULED_LONG_ITEMS(exec_ctx);
}
do {
retry_push = false;
@@ -198,16 +202,14 @@ static void executor_push(grpc_closure* closure, grpc_error* error,
gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p inline", closure);
#endif
}
- grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(),
- closure, error);
+ grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
return;
}
thread_state* ts = (thread_state*)gpr_tls_get(&g_this_thread_state);
if (ts == nullptr) {
- ts = &g_thread_state[GPR_HASH_POINTER(grpc_core::ExecCtx::Get(),
- cur_thread_count)];
+ ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)];
} else {
- GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF();
+ GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(exec_ctx);
}
thread_state* orig_ts = ts;
@@ -243,7 +245,7 @@ static void executor_push(grpc_closure* closure, grpc_error* error,
continue;
}
if (grpc_closure_list_empty(ts->elems)) {
- GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED();
+ GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED(exec_ctx);
gpr_cv_signal(&ts->cv);
}
grpc_closure_list_append(&ts->elems, closure, error);
@@ -267,17 +269,19 @@ static void executor_push(grpc_closure* closure, grpc_error* error,
gpr_spinlock_unlock(&g_adding_thread_lock);
}
if (retry_push) {
- GRPC_STATS_INC_EXECUTOR_PUSH_RETRIES();
+ GRPC_STATS_INC_EXECUTOR_PUSH_RETRIES(exec_ctx);
}
} while (retry_push);
}
-static void executor_push_short(grpc_closure* closure, grpc_error* error) {
- executor_push(closure, error, true);
+static void executor_push_short(grpc_exec_ctx* exec_ctx, grpc_closure* closure,
+ grpc_error* error) {
+ executor_push(exec_ctx, closure, error, true);
}
-static void executor_push_long(grpc_closure* closure, grpc_error* error) {
- executor_push(closure, error, false);
+static void executor_push_long(grpc_exec_ctx* exec_ctx, grpc_closure* closure,
+ grpc_error* error) {
+ executor_push(exec_ctx, closure, error, false);
}
static const grpc_closure_scheduler_vtable executor_vtable_short = {