diff options
author | Yash Tibrewal <yashkt@google.com> | 2017-12-06 09:05:05 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-12-06 09:05:05 -0800 |
commit | ad4d2dde0052efbbf49d64b0843c45f0381cfeb3 (patch) | |
tree | 6a657f8c6179d873b34505cdc24bce9462ca68eb /src/core/lib/iomgr/combiner.cc | |
parent | a3df36cc2505a89c2f481eea4a66a87b3002844a (diff) |
Revert "All instances of exec_ctx being passed around in src/core removed"
Diffstat (limited to 'src/core/lib/iomgr/combiner.cc')
-rw-r--r-- | src/core/lib/iomgr/combiner.cc | 128 |
1 files changed, 66 insertions, 62 deletions
diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index e4d7a6abd8..15c009dd77 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -61,15 +61,17 @@ struct grpc_combiner { gpr_refcount refs; }; -static void combiner_exec(grpc_closure* closure, grpc_error* error); -static void combiner_finally_exec(grpc_closure* closure, grpc_error* error); +static void combiner_exec(grpc_exec_ctx* exec_ctx, grpc_closure* closure, + grpc_error* error); +static void combiner_finally_exec(grpc_exec_ctx* exec_ctx, + grpc_closure* closure, grpc_error* error); static const grpc_closure_scheduler_vtable scheduler = { combiner_exec, combiner_exec, "combiner:immediately"}; static const grpc_closure_scheduler_vtable finally_scheduler = { combiner_finally_exec, combiner_finally_exec, "combiner:finally"}; -static void offload(void* arg, grpc_error* error); +static void offload(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error); grpc_combiner* grpc_combiner_create(void) { grpc_combiner* lock = (grpc_combiner*)gpr_zalloc(sizeof(*lock)); @@ -85,19 +87,19 @@ grpc_combiner* grpc_combiner_create(void) { return lock; } -static void really_destroy(grpc_combiner* lock) { +static void really_destroy(grpc_exec_ctx* exec_ctx, grpc_combiner* lock) { GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p really_destroy", lock)); GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0); gpr_mpscq_destroy(&lock->queue); gpr_free(lock); } -static void start_destroy(grpc_combiner* lock) { +static void start_destroy(grpc_exec_ctx* exec_ctx, grpc_combiner* lock) { gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_UNORPHANED); GRPC_COMBINER_TRACE(gpr_log( GPR_DEBUG, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state)); if (old_state == 1) { - really_destroy(lock); + really_destroy(exec_ctx, lock); } } @@ -113,10 +115,11 @@ static void start_destroy(grpc_combiner* lock) { #define GRPC_COMBINER_DEBUG_SPAM(op, delta) #endif -void grpc_combiner_unref(grpc_combiner* lock GRPC_COMBINER_DEBUG_ARGS) { +void grpc_combiner_unref(grpc_exec_ctx* exec_ctx, + grpc_combiner* lock GRPC_COMBINER_DEBUG_ARGS) { GRPC_COMBINER_DEBUG_SPAM("UNREF", -1); if (gpr_unref(&lock->refs)) { - start_destroy(lock); + start_destroy(exec_ctx, lock); } } @@ -126,25 +129,23 @@ grpc_combiner* grpc_combiner_ref(grpc_combiner* lock GRPC_COMBINER_DEBUG_ARGS) { return lock; } -static void push_last_on_exec_ctx(grpc_combiner* lock) { +static void push_last_on_exec_ctx(grpc_exec_ctx* exec_ctx, + grpc_combiner* lock) { lock->next_combiner_on_this_exec_ctx = nullptr; - if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == nullptr) { - grpc_core::ExecCtx::Get()->combiner_data()->active_combiner = - grpc_core::ExecCtx::Get()->combiner_data()->last_combiner = lock; + if (exec_ctx->active_combiner == nullptr) { + exec_ctx->active_combiner = exec_ctx->last_combiner = lock; } else { - grpc_core::ExecCtx::Get() - ->combiner_data() - ->last_combiner->next_combiner_on_this_exec_ctx = lock; - grpc_core::ExecCtx::Get()->combiner_data()->last_combiner = lock; + exec_ctx->last_combiner->next_combiner_on_this_exec_ctx = lock; + exec_ctx->last_combiner = lock; } } -static void push_first_on_exec_ctx(grpc_combiner* lock) { - lock->next_combiner_on_this_exec_ctx = - grpc_core::ExecCtx::Get()->combiner_data()->active_combiner; - grpc_core::ExecCtx::Get()->combiner_data()->active_combiner = lock; +static void push_first_on_exec_ctx(grpc_exec_ctx* exec_ctx, + grpc_combiner* lock) { + lock->next_combiner_on_this_exec_ctx = exec_ctx->active_combiner; + exec_ctx->active_combiner = lock; if (lock->next_combiner_on_this_exec_ctx == nullptr) { - grpc_core::ExecCtx::Get()->combiner_data()->last_combiner = lock; + exec_ctx->last_combiner = lock; } } @@ -152,8 +153,9 @@ static void push_first_on_exec_ctx(grpc_combiner* lock) { ((grpc_combiner*)(((char*)((closure)->scheduler)) - \ offsetof(grpc_combiner, scheduler_name))) -static void combiner_exec(grpc_closure* cl, grpc_error* error) { - GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_ITEMS(); +static void combiner_exec(grpc_exec_ctx* exec_ctx, grpc_closure* cl, + grpc_error* error) { + GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_ITEMS(exec_ctx); GPR_TIMER_BEGIN("combiner.execute", 0); grpc_combiner* lock = COMBINER_FROM_CLOSURE_SCHEDULER(cl, scheduler); gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT); @@ -161,19 +163,19 @@ static void combiner_exec(grpc_closure* cl, grpc_error* error) { "C:%p grpc_combiner_execute c=%p last=%" PRIdPTR, lock, cl, last)); if (last == 1) { - GRPC_STATS_INC_COMBINER_LOCKS_INITIATED(); + GRPC_STATS_INC_COMBINER_LOCKS_INITIATED(exec_ctx); GPR_TIMER_MARK("combiner.initiated", 0); gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, - (gpr_atm)grpc_core::ExecCtx::Get()); + (gpr_atm)exec_ctx); // first element on this list: add it to the list of combiner locks // executing within this exec_ctx - push_last_on_exec_ctx(lock); + push_last_on_exec_ctx(exec_ctx, lock); } else { // there may be a race with setting here: if that happens, we may delay // offload for one or two actions, and that's fine gpr_atm initiator = gpr_atm_no_barrier_load(&lock->initiating_exec_ctx_or_null); - if (initiator != 0 && initiator != (gpr_atm)grpc_core::ExecCtx::Get()) { + if (initiator != 0 && initiator != (gpr_atm)exec_ctx) { gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, 0); } } @@ -184,32 +186,29 @@ static void combiner_exec(grpc_closure* cl, grpc_error* error) { GPR_TIMER_END("combiner.execute", 0); } -static void move_next() { - grpc_core::ExecCtx::Get()->combiner_data()->active_combiner = - grpc_core::ExecCtx::Get() - ->combiner_data() - ->active_combiner->next_combiner_on_this_exec_ctx; - if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == nullptr) { - grpc_core::ExecCtx::Get()->combiner_data()->last_combiner = nullptr; +static void move_next(grpc_exec_ctx* exec_ctx) { + exec_ctx->active_combiner = + exec_ctx->active_combiner->next_combiner_on_this_exec_ctx; + if (exec_ctx->active_combiner == nullptr) { + exec_ctx->last_combiner = nullptr; } } -static void offload(void* arg, grpc_error* error) { +static void offload(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { grpc_combiner* lock = (grpc_combiner*)arg; - push_last_on_exec_ctx(lock); + push_last_on_exec_ctx(exec_ctx, lock); } -static void queue_offload(grpc_combiner* lock) { - GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED(); - move_next(); +static void queue_offload(grpc_exec_ctx* exec_ctx, grpc_combiner* lock) { + GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED(exec_ctx); + move_next(exec_ctx); GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p queue_offload", lock)); - GRPC_CLOSURE_SCHED(&lock->offload, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, &lock->offload, GRPC_ERROR_NONE); } -bool grpc_combiner_continue_exec_ctx() { +bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx* exec_ctx) { GPR_TIMER_BEGIN("combiner.continue_exec_ctx", 0); - grpc_combiner* lock = - grpc_core::ExecCtx::Get()->combiner_data()->active_combiner; + grpc_combiner* lock = exec_ctx->active_combiner; if (lock == nullptr) { GPR_TIMER_END("combiner.continue_exec_ctx", 0); return false; @@ -224,15 +223,15 @@ bool grpc_combiner_continue_exec_ctx() { "exec_ctx_ready_to_finish=%d " "time_to_execute_final_list=%d", lock, contended, - grpc_core::ExecCtx::Get()->IsReadyToFinish(), + grpc_exec_ctx_ready_to_finish(exec_ctx), lock->time_to_execute_final_list)); - if (contended && grpc_core::ExecCtx::Get()->IsReadyToFinish() && + if (contended && grpc_exec_ctx_ready_to_finish(exec_ctx) && grpc_executor_is_threaded()) { GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0); // this execution context wants to move on: schedule remaining work to be // picked up on the executor - queue_offload(lock); + queue_offload(exec_ctx, lock); GPR_TIMER_END("combiner.continue_exec_ctx", 0); return true; } @@ -248,7 +247,7 @@ bool grpc_combiner_continue_exec_ctx() { // queue is in an inconsistent state: use this as a cue that we should // go off and do something else for a while (and come back later) GPR_TIMER_MARK("delay_busy", 0); - queue_offload(lock); + queue_offload(exec_ctx, lock); GPR_TIMER_END("combiner.continue_exec_ctx", 0); return true; } @@ -258,7 +257,7 @@ bool grpc_combiner_continue_exec_ctx() { #ifndef NDEBUG cl->scheduled = false; #endif - cl->cb(cl->cb_arg, cl_err); + cl->cb(exec_ctx, cl->cb_arg, cl_err); GRPC_ERROR_UNREF(cl_err); GPR_TIMER_END("combiner.exec1", 0); } else { @@ -275,7 +274,7 @@ bool grpc_combiner_continue_exec_ctx() { #ifndef NDEBUG c->scheduled = false; #endif - c->cb(c->cb_arg, error); + c->cb(exec_ctx, c->cb_arg, error); GRPC_ERROR_UNREF(error); c = next; GPR_TIMER_END("combiner.exec_1final", 0); @@ -283,7 +282,7 @@ bool grpc_combiner_continue_exec_ctx() { } GPR_TIMER_MARK("unref", 0); - move_next(); + move_next(exec_ctx); lock->time_to_execute_final_list = false; gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_ELEM_COUNT_LOW_BIT); @@ -312,7 +311,7 @@ bool grpc_combiner_continue_exec_ctx() { return true; case OLD_STATE_WAS(true, 1): // and one count, one orphaned --> unlocked and orphaned - really_destroy(lock); + really_destroy(exec_ctx, lock); GPR_TIMER_END("combiner.continue_exec_ctx", 0); return true; case OLD_STATE_WAS(false, 0): @@ -322,24 +321,27 @@ bool grpc_combiner_continue_exec_ctx() { GPR_TIMER_END("combiner.continue_exec_ctx", 0); GPR_UNREACHABLE_CODE(return true); } - push_first_on_exec_ctx(lock); + push_first_on_exec_ctx(exec_ctx, lock); GPR_TIMER_END("combiner.continue_exec_ctx", 0); return true; } -static void enqueue_finally(void* closure, grpc_error* error); +static void enqueue_finally(grpc_exec_ctx* exec_ctx, void* closure, + grpc_error* error); -static void combiner_finally_exec(grpc_closure* closure, grpc_error* error) { - GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS(); +static void combiner_finally_exec(grpc_exec_ctx* exec_ctx, + grpc_closure* closure, grpc_error* error) { + GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS(exec_ctx); grpc_combiner* lock = COMBINER_FROM_CLOSURE_SCHEDULER(closure, finally_scheduler); - GRPC_COMBINER_TRACE(gpr_log( - GPR_DEBUG, "C:%p grpc_combiner_execute_finally c=%p; ac=%p", lock, - closure, grpc_core::ExecCtx::Get()->combiner_data()->active_combiner)); + GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, + "C:%p grpc_combiner_execute_finally c=%p; ac=%p", + lock, closure, exec_ctx->active_combiner)); GPR_TIMER_BEGIN("combiner.execute_finally", 0); - if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner != lock) { + if (exec_ctx->active_combiner != lock) { GPR_TIMER_MARK("slowpath", 0); - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(enqueue_finally, closure, + GRPC_CLOSURE_SCHED(exec_ctx, + GRPC_CLOSURE_CREATE(enqueue_finally, closure, grpc_combiner_scheduler(lock)), error); GPR_TIMER_END("combiner.execute_finally", 0); @@ -353,8 +355,10 @@ static void combiner_finally_exec(grpc_closure* closure, grpc_error* error) { GPR_TIMER_END("combiner.execute_finally", 0); } -static void enqueue_finally(void* closure, grpc_error* error) { - combiner_finally_exec((grpc_closure*)closure, GRPC_ERROR_REF(error)); +static void enqueue_finally(grpc_exec_ctx* exec_ctx, void* closure, + grpc_error* error) { + combiner_finally_exec(exec_ctx, (grpc_closure*)closure, + GRPC_ERROR_REF(error)); } grpc_closure_scheduler* grpc_combiner_scheduler(grpc_combiner* combiner) { |