diff options
Diffstat (limited to 'src/core/lib/iomgr/exec_ctx.cc')
-rw-r--r-- | src/core/lib/iomgr/exec_ctx.cc | 124 |
1 files changed, 78 insertions, 46 deletions
diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc index e005437e0a..1777456342 100644 --- a/src/core/lib/iomgr/exec_ctx.cc +++ b/src/core/lib/iomgr/exec_ctx.cc @@ -25,7 +25,39 @@ #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/profiling/timers.h" -static void exec_ctx_run(grpc_closure* closure, grpc_error* error) { +bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx* exec_ctx) { + if ((exec_ctx->flags & GRPC_EXEC_CTX_FLAG_IS_FINISHED) == 0) { + if (exec_ctx->check_ready_to_finish(exec_ctx, + exec_ctx->check_ready_to_finish_arg)) { + exec_ctx->flags |= GRPC_EXEC_CTX_FLAG_IS_FINISHED; + return true; + } + return false; + } else { + return true; + } +} + +bool grpc_never_ready_to_finish(grpc_exec_ctx* exec_ctx, void* arg_ignored) { + return false; +} + +bool grpc_always_ready_to_finish(grpc_exec_ctx* exec_ctx, void* arg_ignored) { + return true; +} + +bool grpc_exec_ctx_has_work(grpc_exec_ctx* exec_ctx) { + return exec_ctx->active_combiner != nullptr || + !grpc_closure_list_empty(exec_ctx->closure_list); +} + +void grpc_exec_ctx_finish(grpc_exec_ctx* exec_ctx) { + exec_ctx->flags |= GRPC_EXEC_CTX_FLAG_IS_FINISHED; + grpc_exec_ctx_flush(exec_ctx); +} + +static void exec_ctx_run(grpc_exec_ctx* exec_ctx, grpc_closure* closure, + grpc_error* error) { #ifndef NDEBUG closure->scheduled = false; if (grpc_trace_closure.enabled()) { @@ -35,7 +67,7 @@ static void exec_ctx_run(grpc_closure* closure, grpc_error* error) { closure->line_initiated); } #endif - closure->cb(closure->cb_arg, error); + closure->cb(exec_ctx, closure->cb_arg, error); #ifndef NDEBUG if (grpc_trace_closure.enabled()) { gpr_log(GPR_DEBUG, "closure %p finished", closure); @@ -44,13 +76,42 @@ static void exec_ctx_run(grpc_closure* closure, grpc_error* error) { GRPC_ERROR_UNREF(error); } -static void exec_ctx_sched(grpc_closure* closure, grpc_error* error) { - grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(), closure, - error); +bool grpc_exec_ctx_flush(grpc_exec_ctx* exec_ctx) { + bool did_something = 0; + GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0); + for (;;) { + if (!grpc_closure_list_empty(exec_ctx->closure_list)) { + grpc_closure* c = exec_ctx->closure_list.head; + exec_ctx->closure_list.head = exec_ctx->closure_list.tail = nullptr; + while (c != nullptr) { + grpc_closure* next = c->next_data.next; + grpc_error* error = c->error_data.error; + did_something = true; + exec_ctx_run(exec_ctx, c, error); + c = next; + } + } else if (!grpc_combiner_continue_exec_ctx(exec_ctx)) { + break; + } + } + GPR_ASSERT(exec_ctx->active_combiner == nullptr); + GPR_TIMER_END("grpc_exec_ctx_flush", 0); + return did_something; +} + +static void exec_ctx_sched(grpc_exec_ctx* exec_ctx, grpc_closure* closure, + grpc_error* error) { + grpc_closure_list_append(&exec_ctx->closure_list, closure, error); } static gpr_timespec g_start_time; +void grpc_exec_ctx_global_init(void) { + g_start_time = gpr_now(GPR_CLOCK_MONOTONIC); +} + +void grpc_exec_ctx_global_shutdown(void) {} + static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) { ts = gpr_time_sub(ts, g_start_time); double x = @@ -70,6 +131,18 @@ static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) { return (gpr_atm)x; } +grpc_millis grpc_exec_ctx_now(grpc_exec_ctx* exec_ctx) { + if (!exec_ctx->now_is_valid) { + exec_ctx->now = timespec_to_atm_round_down(gpr_now(GPR_CLOCK_MONOTONIC)); + exec_ctx->now_is_valid = true; + } + return exec_ctx->now; +} + +void grpc_exec_ctx_invalidate_now(grpc_exec_ctx* exec_ctx) { + exec_ctx->now_is_valid = false; +} + gpr_timespec grpc_millis_to_timespec(grpc_millis millis, gpr_clock_type clock_type) { // special-case infinities as grpc_millis can be 32bit on some platforms @@ -102,44 +175,3 @@ static const grpc_closure_scheduler_vtable exec_ctx_scheduler_vtable = { exec_ctx_run, exec_ctx_sched, "exec_ctx"}; static grpc_closure_scheduler exec_ctx_scheduler = {&exec_ctx_scheduler_vtable}; grpc_closure_scheduler* grpc_schedule_on_exec_ctx = &exec_ctx_scheduler; - -namespace grpc_core { -GPR_TLS_CLASS_DEF(ExecCtx::exec_ctx_); - -void ExecCtx::GlobalInit(void) { - g_start_time = gpr_now(GPR_CLOCK_MONOTONIC); - gpr_tls_init(&exec_ctx_); -} - -bool ExecCtx::Flush() { - bool did_something = 0; - GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0); - for (;;) { - if (!grpc_closure_list_empty(closure_list_)) { - grpc_closure* c = closure_list_.head; - closure_list_.head = closure_list_.tail = nullptr; - while (c != nullptr) { - grpc_closure* next = c->next_data.next; - grpc_error* error = c->error_data.error; - did_something = true; - exec_ctx_run(c, error); - c = next; - } - } else if (!grpc_combiner_continue_exec_ctx()) { - break; - } - } - GPR_ASSERT(combiner_data_.active_combiner == nullptr); - GPR_TIMER_END("grpc_exec_ctx_flush", 0); - return did_something; -} - -grpc_millis ExecCtx::Now() { - if (!now_is_valid_) { - now_ = timespec_to_atm_round_down(gpr_now(GPR_CLOCK_MONOTONIC)); - now_is_valid_ = true; - } - return now_; -} - -} // namespace grpc_core |