diff options
author | Yash Tibrewal <yashkt@google.com> | 2017-10-13 16:07:13 -0700 |
---|---|---|
committer | Yash Tibrewal <yashkt@google.com> | 2017-10-18 17:12:19 -0700 |
commit | 0ee7574732a06e8cace4e099a678f4bd5dbff679 (patch) | |
tree | e43d5de442fdcc3d39cd5af687f319fa39612d3f /src/core/lib/iomgr/combiner.cc | |
parent | 6bf5f833efe2cb9e2ecc14358dd9699cd5d05263 (diff) |
Removing instances of exec_ctx being passed around in functions in
src/core. exec_ctx is now a thread_local pointer of type ExecCtx instead of
grpc_exec_ctx which is initialized whenever ExecCtx is instantiated. ExecCtx
also keeps track of the previous exec_ctx so that nesting of exec_ctx is
allowed. This means that there is only one exec_ctx being used at any
time. Also, grpc_exec_ctx_finish is called in the destructor of the
object, and the previous exec_ctx is restored to avoid breaking current
functionality. The code still explicitly calls grpc_exec_ctx_finish
because removing all such instances causes the code to break.
Diffstat (limited to 'src/core/lib/iomgr/combiner.cc')
-rw-r--r-- | src/core/lib/iomgr/combiner.cc | 90 |
1 files changed, 39 insertions, 51 deletions
diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index 53f4b7eaa7..c6d6d0fc43 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -62,17 +62,15 @@ struct grpc_combiner { gpr_refcount refs; }; -static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error); -static void combiner_finally_exec(grpc_exec_ctx *exec_ctx, - grpc_closure *closure, grpc_error *error); +static void combiner_exec(grpc_closure *closure, grpc_error *error); +static void combiner_finally_exec(grpc_closure *closure, grpc_error *error); static const grpc_closure_scheduler_vtable scheduler = { combiner_exec, combiner_exec, "combiner:immediately"}; static const grpc_closure_scheduler_vtable finally_scheduler = { combiner_finally_exec, combiner_finally_exec, "combiner:finally"}; -static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); +static void offload(void *arg, grpc_error *error); grpc_combiner *grpc_combiner_create(void) { grpc_combiner *lock = (grpc_combiner *)gpr_zalloc(sizeof(*lock)); @@ -88,19 +86,19 @@ grpc_combiner *grpc_combiner_create(void) { return lock; } -static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { +static void really_destroy(grpc_combiner *lock) { GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p really_destroy", lock)); GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0); gpr_mpscq_destroy(&lock->queue); gpr_free(lock); } -static void start_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { +static void start_destroy(grpc_combiner *lock) { gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_UNORPHANED); GRPC_COMBINER_TRACE(gpr_log( GPR_DEBUG, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state)); if (old_state == 1) { - really_destroy(exec_ctx, lock); + really_destroy(lock); } } @@ -116,11 +114,10 @@ static void start_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { #define GRPC_COMBINER_DEBUG_SPAM(op, delta) #endif -void grpc_combiner_unref(grpc_exec_ctx *exec_ctx, - grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS) { +void grpc_combiner_unref(grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS) { GRPC_COMBINER_DEBUG_SPAM("UNREF", -1); if (gpr_unref(&lock->refs)) { - start_destroy(exec_ctx, lock); + start_destroy(lock); } } @@ -130,8 +127,7 @@ grpc_combiner *grpc_combiner_ref(grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS) { return lock; } -static void push_last_on_exec_ctx(grpc_exec_ctx *exec_ctx, - grpc_combiner *lock) { +static void push_last_on_exec_ctx(grpc_combiner *lock) { lock->next_combiner_on_this_exec_ctx = NULL; if (exec_ctx->active_combiner == NULL) { exec_ctx->active_combiner = exec_ctx->last_combiner = lock; @@ -141,8 +137,7 @@ static void push_last_on_exec_ctx(grpc_exec_ctx *exec_ctx, } } -static void push_first_on_exec_ctx(grpc_exec_ctx *exec_ctx, - grpc_combiner *lock) { +static void push_first_on_exec_ctx(grpc_combiner *lock) { lock->next_combiner_on_this_exec_ctx = exec_ctx->active_combiner; exec_ctx->active_combiner = lock; if (lock->next_combiner_on_this_exec_ctx == NULL) { @@ -154,9 +149,8 @@ static void push_first_on_exec_ctx(grpc_exec_ctx *exec_ctx, ((grpc_combiner *)(((char *)((closure)->scheduler)) - \ offsetof(grpc_combiner, scheduler_name))) -static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_closure *cl, - grpc_error *error) { - GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_ITEMS(exec_ctx); +static void combiner_exec(grpc_closure *cl, grpc_error *error) { + GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_ITEMS(); GPR_TIMER_BEGIN("combiner.execute", 0); grpc_combiner *lock = COMBINER_FROM_CLOSURE_SCHEDULER(cl, scheduler); gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT); @@ -164,19 +158,19 @@ static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_closure *cl, "C:%p grpc_combiner_execute c=%p last=%" PRIdPTR, lock, cl, last)); if (last == 1) { - GRPC_STATS_INC_COMBINER_LOCKS_INITIATED(exec_ctx); + GRPC_STATS_INC_COMBINER_LOCKS_INITIATED(); GPR_TIMER_MARK("combiner.initiated", 0); gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, - (gpr_atm)exec_ctx); + (gpr_atm)&exec_ctx); // first element on this list: add it to the list of combiner locks // executing within this exec_ctx - push_last_on_exec_ctx(exec_ctx, lock); + push_last_on_exec_ctx(lock); } else { // there may be a race with setting here: if that happens, we may delay // offload for one or two actions, and that's fine gpr_atm initiator = gpr_atm_no_barrier_load(&lock->initiating_exec_ctx_or_null); - if (initiator != 0 && initiator != (gpr_atm)exec_ctx) { + if (initiator != 0 && initiator != (gpr_atm)&exec_ctx) { gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, 0); } } @@ -187,7 +181,7 @@ static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_closure *cl, GPR_TIMER_END("combiner.execute", 0); } -static void move_next(grpc_exec_ctx *exec_ctx) { +static void move_next() { exec_ctx->active_combiner = exec_ctx->active_combiner->next_combiner_on_this_exec_ctx; if (exec_ctx->active_combiner == NULL) { @@ -195,19 +189,19 @@ static void move_next(grpc_exec_ctx *exec_ctx) { } } -static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { +static void offload(void *arg, grpc_error *error) { grpc_combiner *lock = (grpc_combiner *)arg; - push_last_on_exec_ctx(exec_ctx, lock); + push_last_on_exec_ctx(lock); } -static void queue_offload(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { - GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED(exec_ctx); - move_next(exec_ctx); +static void queue_offload(grpc_combiner *lock) { + GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED(); + move_next(); GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p queue_offload", lock)); - GRPC_CLOSURE_SCHED(exec_ctx, &lock->offload, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&lock->offload, GRPC_ERROR_NONE); } -bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { +bool grpc_combiner_continue_exec_ctx() { GPR_TIMER_BEGIN("combiner.continue_exec_ctx", 0); grpc_combiner *lock = exec_ctx->active_combiner; if (lock == NULL) { @@ -223,16 +217,15 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { "contended=%d " "exec_ctx_ready_to_finish=%d " "time_to_execute_final_list=%d", - lock, contended, - grpc_exec_ctx_ready_to_finish(exec_ctx), + lock, contended, grpc_exec_ctx_ready_to_finish(), lock->time_to_execute_final_list)); - if (contended && grpc_exec_ctx_ready_to_finish(exec_ctx) && + if (contended && grpc_exec_ctx_ready_to_finish() && grpc_executor_is_threaded()) { GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0); // this execution context wants to move on: schedule remaining work to be // picked up on the executor - queue_offload(exec_ctx, lock); + queue_offload(lock); GPR_TIMER_END("combiner.continue_exec_ctx", 0); return true; } @@ -248,7 +241,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { // queue is in an inconsistent state: use this as a cue that we should // go off and do something else for a while (and come back later) GPR_TIMER_MARK("delay_busy", 0); - queue_offload(exec_ctx, lock); + queue_offload(lock); GPR_TIMER_END("combiner.continue_exec_ctx", 0); return true; } @@ -258,7 +251,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { #ifndef NDEBUG cl->scheduled = false; #endif - cl->cb(exec_ctx, cl->cb_arg, cl_err); + cl->cb(cl->cb_arg, cl_err); GRPC_ERROR_UNREF(cl_err); GPR_TIMER_END("combiner.exec1", 0); } else { @@ -275,7 +268,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { #ifndef NDEBUG c->scheduled = false; #endif - c->cb(exec_ctx, c->cb_arg, error); + c->cb(c->cb_arg, error); GRPC_ERROR_UNREF(error); c = next; GPR_TIMER_END("combiner.exec_1final", 0); @@ -283,7 +276,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { } GPR_TIMER_MARK("unref", 0); - move_next(exec_ctx); + move_next(); lock->time_to_execute_final_list = false; gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_ELEM_COUNT_LOW_BIT); @@ -312,7 +305,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { return true; case OLD_STATE_WAS(true, 1): // and one count, one orphaned --> unlocked and orphaned - really_destroy(exec_ctx, lock); + really_destroy(lock); GPR_TIMER_END("combiner.continue_exec_ctx", 0); return true; case OLD_STATE_WAS(false, 0): @@ -322,17 +315,15 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { GPR_TIMER_END("combiner.continue_exec_ctx", 0); GPR_UNREACHABLE_CODE(return true); } - push_first_on_exec_ctx(exec_ctx, lock); + push_first_on_exec_ctx(lock); GPR_TIMER_END("combiner.continue_exec_ctx", 0); return true; } -static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure, - grpc_error *error); +static void enqueue_finally(void *closure, grpc_error *error); -static void combiner_finally_exec(grpc_exec_ctx *exec_ctx, - grpc_closure *closure, grpc_error *error) { - GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS(exec_ctx); +static void combiner_finally_exec(grpc_closure *closure, grpc_error *error) { + GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS(); grpc_combiner *lock = COMBINER_FROM_CLOSURE_SCHEDULER(closure, finally_scheduler); GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, @@ -341,8 +332,7 @@ static void combiner_finally_exec(grpc_exec_ctx *exec_ctx, GPR_TIMER_BEGIN("combiner.execute_finally", 0); if (exec_ctx->active_combiner != lock) { GPR_TIMER_MARK("slowpath", 0); - GRPC_CLOSURE_SCHED(exec_ctx, - GRPC_CLOSURE_CREATE(enqueue_finally, closure, + GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(enqueue_finally, closure, grpc_combiner_scheduler(lock)), error); GPR_TIMER_END("combiner.execute_finally", 0); @@ -356,10 +346,8 @@ static void combiner_finally_exec(grpc_exec_ctx *exec_ctx, GPR_TIMER_END("combiner.execute_finally", 0); } -static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure, - grpc_error *error) { - combiner_finally_exec(exec_ctx, (grpc_closure *)closure, - GRPC_ERROR_REF(error)); +static void enqueue_finally(void *closure, grpc_error *error) { + combiner_finally_exec((grpc_closure *)closure, GRPC_ERROR_REF(error)); } grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *combiner) { |