diff options
author | 2017-12-06 09:05:05 -0800 | |
---|---|---|
committer | 2017-12-06 09:05:05 -0800 | |
commit | ad4d2dde0052efbbf49d64b0843c45f0381cfeb3 (patch) | |
tree | 6a657f8c6179d873b34505cdc24bce9462ca68eb /src/core/lib/iomgr/executor.cc | |
parent | a3df36cc2505a89c2f481eea4a66a87b3002844a (diff) |
Revert "All instances of exec_ctx being passed around in src/core removed"
Diffstat (limited to 'src/core/lib/iomgr/executor.cc')
-rw-r--r-- | src/core/lib/iomgr/executor.cc | 58 |
1 files changed, 31 insertions, 27 deletions
diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc index b45223ce16..fabdbdf934 100644 --- a/src/core/lib/iomgr/executor.cc +++ b/src/core/lib/iomgr/executor.cc @@ -55,7 +55,7 @@ grpc_core::TraceFlag executor_trace(false, "executor"); static void executor_thread(void* arg); -static size_t run_closures(grpc_closure_list list) { +static size_t run_closures(grpc_exec_ctx* exec_ctx, grpc_closure_list list) { size_t n = 0; grpc_closure* c = list.head; @@ -73,11 +73,11 @@ static size_t run_closures(grpc_closure_list list) { #ifndef NDEBUG c->scheduled = false; #endif - c->cb(c->cb_arg, error); + c->cb(exec_ctx, c->cb_arg, error); GRPC_ERROR_UNREF(error); c = next; n++; - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(exec_ctx); } return n; @@ -87,7 +87,7 @@ bool grpc_executor_is_threaded() { return gpr_atm_no_barrier_load(&g_cur_threads) > 0; } -void grpc_executor_set_threading(bool threading) { +void grpc_executor_set_threading(grpc_exec_ctx* exec_ctx, bool threading) { gpr_atm cur_threads = gpr_atm_no_barrier_load(&g_cur_threads); if (threading) { if (cur_threads > 0) return; @@ -125,25 +125,28 @@ void grpc_executor_set_threading(bool threading) { for (size_t i = 0; i < g_max_threads; i++) { gpr_mu_destroy(&g_thread_state[i].mu); gpr_cv_destroy(&g_thread_state[i].cv); - run_closures(g_thread_state[i].elems); + run_closures(exec_ctx, g_thread_state[i].elems); } gpr_free(g_thread_state); gpr_tls_destroy(&g_this_thread_state); } } -void grpc_executor_init() { +void grpc_executor_init(grpc_exec_ctx* exec_ctx) { gpr_atm_no_barrier_store(&g_cur_threads, 0); - grpc_executor_set_threading(true); + grpc_executor_set_threading(exec_ctx, true); } -void grpc_executor_shutdown() { grpc_executor_set_threading(false); } +void grpc_executor_shutdown(grpc_exec_ctx* exec_ctx) { + grpc_executor_set_threading(exec_ctx, false); +} static void executor_thread(void* arg) { thread_state* ts = (thread_state*)arg; gpr_tls_set(&g_this_thread_state, (intptr_t)ts); - grpc_core::ExecCtx exec_ctx(0); + grpc_exec_ctx exec_ctx = + GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, nullptr); size_t subtract_depth = 0; for (;;) { @@ -165,7 +168,7 @@ static void executor_thread(void* arg) { gpr_mu_unlock(&ts->mu); break; } - GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED(); + GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED(&exec_ctx); grpc_closure_list exec = ts->elems; ts->elems = GRPC_CLOSURE_LIST_INIT; gpr_mu_unlock(&ts->mu); @@ -173,18 +176,19 @@ static void executor_thread(void* arg) { gpr_log(GPR_DEBUG, "EXECUTOR[%d]: execute", (int)(ts - g_thread_state)); } - grpc_core::ExecCtx::Get()->InvalidateNow(); - subtract_depth = run_closures(exec); + grpc_exec_ctx_invalidate_now(&exec_ctx); + subtract_depth = run_closures(&exec_ctx, exec); } + grpc_exec_ctx_finish(&exec_ctx); } -static void executor_push(grpc_closure* closure, grpc_error* error, - bool is_short) { +static void executor_push(grpc_exec_ctx* exec_ctx, grpc_closure* closure, + grpc_error* error, bool is_short) { bool retry_push; if (is_short) { - GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS(); + GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS(exec_ctx); } else { - GRPC_STATS_INC_EXECUTOR_SCHEDULED_LONG_ITEMS(); + GRPC_STATS_INC_EXECUTOR_SCHEDULED_LONG_ITEMS(exec_ctx); } do { retry_push = false; @@ -198,16 +202,14 @@ static void executor_push(grpc_closure* closure, grpc_error* error, gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p inline", closure); #endif } - grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(), - closure, error); + grpc_closure_list_append(&exec_ctx->closure_list, closure, error); return; } thread_state* ts = (thread_state*)gpr_tls_get(&g_this_thread_state); if (ts == nullptr) { - ts = &g_thread_state[GPR_HASH_POINTER(grpc_core::ExecCtx::Get(), - cur_thread_count)]; + ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)]; } else { - GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(); + GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(exec_ctx); } thread_state* orig_ts = ts; @@ -243,7 +245,7 @@ static void executor_push(grpc_closure* closure, grpc_error* error, continue; } if (grpc_closure_list_empty(ts->elems)) { - GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED(); + GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED(exec_ctx); gpr_cv_signal(&ts->cv); } grpc_closure_list_append(&ts->elems, closure, error); @@ -267,17 +269,19 @@ static void executor_push(grpc_closure* closure, grpc_error* error, gpr_spinlock_unlock(&g_adding_thread_lock); } if (retry_push) { - GRPC_STATS_INC_EXECUTOR_PUSH_RETRIES(); + GRPC_STATS_INC_EXECUTOR_PUSH_RETRIES(exec_ctx); } } while (retry_push); } -static void executor_push_short(grpc_closure* closure, grpc_error* error) { - executor_push(closure, error, true); +static void executor_push_short(grpc_exec_ctx* exec_ctx, grpc_closure* closure, + grpc_error* error) { + executor_push(exec_ctx, closure, error, true); } -static void executor_push_long(grpc_closure* closure, grpc_error* error) { - executor_push(closure, error, false); +static void executor_push_long(grpc_exec_ctx* exec_ctx, grpc_closure* closure, + grpc_error* error) { + executor_push(exec_ctx, closure, error, false); } static const grpc_closure_scheduler_vtable executor_vtable_short = { |