path: root/src/core/lib/iomgr/combiner.cc
author: Yash Tibrewal <yashkt@google.com>	2017-12-06 09:05:05 -0800
committer: GitHub <noreply@github.com>	2017-12-06 09:05:05 -0800
commit: ad4d2dde0052efbbf49d64b0843c45f0381cfeb3 (patch)
tree: 6a657f8c6179d873b34505cdc24bce9462ca68eb /src/core/lib/iomgr/combiner.cc
parent: a3df36cc2505a89c2f481eea4a66a87b3002844a (diff)
Revert "All instances of exec_ctx being passed around in src/core removed"
Diffstat (limited to 'src/core/lib/iomgr/combiner.cc')
-rw-r--r--  src/core/lib/iomgr/combiner.cc | 128
1 file changed, 66 insertions(+), 62 deletions(-)
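
The revert below threads a grpc_exec_ctx* parameter back through the combiner's internal functions instead of having each callee read the ambient execution context via grpc_core::ExecCtx::Get(). The following minimal, standalone sketch contrasts the two calling conventions; the exec_ctx, g_ctx, and schedule_* names are hypothetical stand-ins chosen for illustration and are not gRPC's actual types or API.

// Illustrative sketch only: hypothetical stand-ins, not gRPC code.
#include <cstdio>

struct exec_ctx { int pending = 0; };    // stand-in for grpc_exec_ctx
thread_local exec_ctx* g_ctx = nullptr;  // stand-in for grpc_core::ExecCtx::Get()

// Ambient-context style (the change being reverted): the callee reaches for
// a thread-local execution context.
void schedule_implicit() { g_ctx->pending++; }

// Explicit-threading style (restored by this revert): the caller passes the
// execution context down through every call.
void schedule_explicit(exec_ctx* ctx) { ctx->pending++; }

int main() {
  exec_ctx ctx;
  g_ctx = &ctx;
  schedule_implicit();       // uses the ambient context
  schedule_explicit(&ctx);   // context passed explicitly
  std::printf("pending=%d\n", ctx.pending);  // prints: pending=2
  return 0;
}
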
diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc
index e4d7a6abd8..15c009dd77 100644
--- a/src/core/lib/iomgr/combiner.cc
+++ b/src/core/lib/iomgr/combiner.cc
@@ -61,15 +61,17 @@ struct grpc_combiner {
gpr_refcount refs;
};
-static void combiner_exec(grpc_closure* closure, grpc_error* error);
-static void combiner_finally_exec(grpc_closure* closure, grpc_error* error);
+static void combiner_exec(grpc_exec_ctx* exec_ctx, grpc_closure* closure,
+ grpc_error* error);
+static void combiner_finally_exec(grpc_exec_ctx* exec_ctx,
+ grpc_closure* closure, grpc_error* error);
static const grpc_closure_scheduler_vtable scheduler = {
combiner_exec, combiner_exec, "combiner:immediately"};
static const grpc_closure_scheduler_vtable finally_scheduler = {
combiner_finally_exec, combiner_finally_exec, "combiner:finally"};
-static void offload(void* arg, grpc_error* error);
+static void offload(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error);
grpc_combiner* grpc_combiner_create(void) {
grpc_combiner* lock = (grpc_combiner*)gpr_zalloc(sizeof(*lock));
@@ -85,19 +87,19 @@ grpc_combiner* grpc_combiner_create(void) {
return lock;
}
-static void really_destroy(grpc_combiner* lock) {
+static void really_destroy(grpc_exec_ctx* exec_ctx, grpc_combiner* lock) {
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p really_destroy", lock));
GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0);
gpr_mpscq_destroy(&lock->queue);
gpr_free(lock);
}
-static void start_destroy(grpc_combiner* lock) {
+static void start_destroy(grpc_exec_ctx* exec_ctx, grpc_combiner* lock) {
gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_UNORPHANED);
GRPC_COMBINER_TRACE(gpr_log(
GPR_DEBUG, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state));
if (old_state == 1) {
- really_destroy(lock);
+ really_destroy(exec_ctx, lock);
}
}
@@ -113,10 +115,11 @@ static void start_destroy(grpc_combiner* lock) {
#define GRPC_COMBINER_DEBUG_SPAM(op, delta)
#endif
-void grpc_combiner_unref(grpc_combiner* lock GRPC_COMBINER_DEBUG_ARGS) {
+void grpc_combiner_unref(grpc_exec_ctx* exec_ctx,
+ grpc_combiner* lock GRPC_COMBINER_DEBUG_ARGS) {
GRPC_COMBINER_DEBUG_SPAM("UNREF", -1);
if (gpr_unref(&lock->refs)) {
- start_destroy(lock);
+ start_destroy(exec_ctx, lock);
}
}
@@ -126,25 +129,23 @@ grpc_combiner* grpc_combiner_ref(grpc_combiner* lock GRPC_COMBINER_DEBUG_ARGS) {
return lock;
}
-static void push_last_on_exec_ctx(grpc_combiner* lock) {
+static void push_last_on_exec_ctx(grpc_exec_ctx* exec_ctx,
+ grpc_combiner* lock) {
lock->next_combiner_on_this_exec_ctx = nullptr;
- if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == nullptr) {
- grpc_core::ExecCtx::Get()->combiner_data()->active_combiner =
- grpc_core::ExecCtx::Get()->combiner_data()->last_combiner = lock;
+ if (exec_ctx->active_combiner == nullptr) {
+ exec_ctx->active_combiner = exec_ctx->last_combiner = lock;
} else {
- grpc_core::ExecCtx::Get()
- ->combiner_data()
- ->last_combiner->next_combiner_on_this_exec_ctx = lock;
- grpc_core::ExecCtx::Get()->combiner_data()->last_combiner = lock;
+ exec_ctx->last_combiner->next_combiner_on_this_exec_ctx = lock;
+ exec_ctx->last_combiner = lock;
}
}
-static void push_first_on_exec_ctx(grpc_combiner* lock) {
- lock->next_combiner_on_this_exec_ctx =
- grpc_core::ExecCtx::Get()->combiner_data()->active_combiner;
- grpc_core::ExecCtx::Get()->combiner_data()->active_combiner = lock;
+static void push_first_on_exec_ctx(grpc_exec_ctx* exec_ctx,
+ grpc_combiner* lock) {
+ lock->next_combiner_on_this_exec_ctx = exec_ctx->active_combiner;
+ exec_ctx->active_combiner = lock;
if (lock->next_combiner_on_this_exec_ctx == nullptr) {
- grpc_core::ExecCtx::Get()->combiner_data()->last_combiner = lock;
+ exec_ctx->last_combiner = lock;
}
}
@@ -152,8 +153,9 @@ static void push_first_on_exec_ctx(grpc_combiner* lock) {
((grpc_combiner*)(((char*)((closure)->scheduler)) - \
offsetof(grpc_combiner, scheduler_name)))
-static void combiner_exec(grpc_closure* cl, grpc_error* error) {
- GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_ITEMS();
+static void combiner_exec(grpc_exec_ctx* exec_ctx, grpc_closure* cl,
+ grpc_error* error) {
+ GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_ITEMS(exec_ctx);
GPR_TIMER_BEGIN("combiner.execute", 0);
grpc_combiner* lock = COMBINER_FROM_CLOSURE_SCHEDULER(cl, scheduler);
gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT);
@@ -161,19 +163,19 @@ static void combiner_exec(grpc_closure* cl, grpc_error* error) {
"C:%p grpc_combiner_execute c=%p last=%" PRIdPTR,
lock, cl, last));
if (last == 1) {
- GRPC_STATS_INC_COMBINER_LOCKS_INITIATED();
+ GRPC_STATS_INC_COMBINER_LOCKS_INITIATED(exec_ctx);
GPR_TIMER_MARK("combiner.initiated", 0);
gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null,
- (gpr_atm)grpc_core::ExecCtx::Get());
+ (gpr_atm)exec_ctx);
// first element on this list: add it to the list of combiner locks
// executing within this exec_ctx
- push_last_on_exec_ctx(lock);
+ push_last_on_exec_ctx(exec_ctx, lock);
} else {
// there may be a race with setting here: if that happens, we may delay
// offload for one or two actions, and that's fine
gpr_atm initiator =
gpr_atm_no_barrier_load(&lock->initiating_exec_ctx_or_null);
- if (initiator != 0 && initiator != (gpr_atm)grpc_core::ExecCtx::Get()) {
+ if (initiator != 0 && initiator != (gpr_atm)exec_ctx) {
gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, 0);
}
}
@@ -184,32 +186,29 @@ static void combiner_exec(grpc_closure* cl, grpc_error* error) {
GPR_TIMER_END("combiner.execute", 0);
}
-static void move_next() {
- grpc_core::ExecCtx::Get()->combiner_data()->active_combiner =
- grpc_core::ExecCtx::Get()
- ->combiner_data()
- ->active_combiner->next_combiner_on_this_exec_ctx;
- if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == nullptr) {
- grpc_core::ExecCtx::Get()->combiner_data()->last_combiner = nullptr;
+static void move_next(grpc_exec_ctx* exec_ctx) {
+ exec_ctx->active_combiner =
+ exec_ctx->active_combiner->next_combiner_on_this_exec_ctx;
+ if (exec_ctx->active_combiner == nullptr) {
+ exec_ctx->last_combiner = nullptr;
}
}
-static void offload(void* arg, grpc_error* error) {
+static void offload(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
grpc_combiner* lock = (grpc_combiner*)arg;
- push_last_on_exec_ctx(lock);
+ push_last_on_exec_ctx(exec_ctx, lock);
}
-static void queue_offload(grpc_combiner* lock) {
- GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED();
- move_next();
+static void queue_offload(grpc_exec_ctx* exec_ctx, grpc_combiner* lock) {
+ GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED(exec_ctx);
+ move_next(exec_ctx);
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p queue_offload", lock));
- GRPC_CLOSURE_SCHED(&lock->offload, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, &lock->offload, GRPC_ERROR_NONE);
}
-bool grpc_combiner_continue_exec_ctx() {
+bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx* exec_ctx) {
GPR_TIMER_BEGIN("combiner.continue_exec_ctx", 0);
- grpc_combiner* lock =
- grpc_core::ExecCtx::Get()->combiner_data()->active_combiner;
+ grpc_combiner* lock = exec_ctx->active_combiner;
if (lock == nullptr) {
GPR_TIMER_END("combiner.continue_exec_ctx", 0);
return false;
@@ -224,15 +223,15 @@ bool grpc_combiner_continue_exec_ctx() {
"exec_ctx_ready_to_finish=%d "
"time_to_execute_final_list=%d",
lock, contended,
- grpc_core::ExecCtx::Get()->IsReadyToFinish(),
+ grpc_exec_ctx_ready_to_finish(exec_ctx),
lock->time_to_execute_final_list));
- if (contended && grpc_core::ExecCtx::Get()->IsReadyToFinish() &&
+ if (contended && grpc_exec_ctx_ready_to_finish(exec_ctx) &&
grpc_executor_is_threaded()) {
GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0);
// this execution context wants to move on: schedule remaining work to be
// picked up on the executor
- queue_offload(lock);
+ queue_offload(exec_ctx, lock);
GPR_TIMER_END("combiner.continue_exec_ctx", 0);
return true;
}
@@ -248,7 +247,7 @@ bool grpc_combiner_continue_exec_ctx() {
// queue is in an inconsistent state: use this as a cue that we should
// go off and do something else for a while (and come back later)
GPR_TIMER_MARK("delay_busy", 0);
- queue_offload(lock);
+ queue_offload(exec_ctx, lock);
GPR_TIMER_END("combiner.continue_exec_ctx", 0);
return true;
}
@@ -258,7 +257,7 @@ bool grpc_combiner_continue_exec_ctx() {
#ifndef NDEBUG
cl->scheduled = false;
#endif
- cl->cb(cl->cb_arg, cl_err);
+ cl->cb(exec_ctx, cl->cb_arg, cl_err);
GRPC_ERROR_UNREF(cl_err);
GPR_TIMER_END("combiner.exec1", 0);
} else {
@@ -275,7 +274,7 @@ bool grpc_combiner_continue_exec_ctx() {
#ifndef NDEBUG
c->scheduled = false;
#endif
- c->cb(c->cb_arg, error);
+ c->cb(exec_ctx, c->cb_arg, error);
GRPC_ERROR_UNREF(error);
c = next;
GPR_TIMER_END("combiner.exec_1final", 0);
@@ -283,7 +282,7 @@ bool grpc_combiner_continue_exec_ctx() {
}
GPR_TIMER_MARK("unref", 0);
- move_next();
+ move_next(exec_ctx);
lock->time_to_execute_final_list = false;
gpr_atm old_state =
gpr_atm_full_fetch_add(&lock->state, -STATE_ELEM_COUNT_LOW_BIT);
@@ -312,7 +311,7 @@ bool grpc_combiner_continue_exec_ctx() {
return true;
case OLD_STATE_WAS(true, 1):
// and one count, one orphaned --> unlocked and orphaned
- really_destroy(lock);
+ really_destroy(exec_ctx, lock);
GPR_TIMER_END("combiner.continue_exec_ctx", 0);
return true;
case OLD_STATE_WAS(false, 0):
@@ -322,24 +321,27 @@ bool grpc_combiner_continue_exec_ctx() {
GPR_TIMER_END("combiner.continue_exec_ctx", 0);
GPR_UNREACHABLE_CODE(return true);
}
- push_first_on_exec_ctx(lock);
+ push_first_on_exec_ctx(exec_ctx, lock);
GPR_TIMER_END("combiner.continue_exec_ctx", 0);
return true;
}
-static void enqueue_finally(void* closure, grpc_error* error);
+static void enqueue_finally(grpc_exec_ctx* exec_ctx, void* closure,
+ grpc_error* error);
-static void combiner_finally_exec(grpc_closure* closure, grpc_error* error) {
- GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS();
+static void combiner_finally_exec(grpc_exec_ctx* exec_ctx,
+ grpc_closure* closure, grpc_error* error) {
+ GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS(exec_ctx);
grpc_combiner* lock =
COMBINER_FROM_CLOSURE_SCHEDULER(closure, finally_scheduler);
- GRPC_COMBINER_TRACE(gpr_log(
- GPR_DEBUG, "C:%p grpc_combiner_execute_finally c=%p; ac=%p", lock,
- closure, grpc_core::ExecCtx::Get()->combiner_data()->active_combiner));
+ GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG,
+ "C:%p grpc_combiner_execute_finally c=%p; ac=%p",
+ lock, closure, exec_ctx->active_combiner));
GPR_TIMER_BEGIN("combiner.execute_finally", 0);
- if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner != lock) {
+ if (exec_ctx->active_combiner != lock) {
GPR_TIMER_MARK("slowpath", 0);
- GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(enqueue_finally, closure,
+ GRPC_CLOSURE_SCHED(exec_ctx,
+ GRPC_CLOSURE_CREATE(enqueue_finally, closure,
grpc_combiner_scheduler(lock)),
error);
GPR_TIMER_END("combiner.execute_finally", 0);
@@ -353,8 +355,10 @@ static void combiner_finally_exec(grpc_closure* closure, grpc_error* error) {
GPR_TIMER_END("combiner.execute_finally", 0);
}
-static void enqueue_finally(void* closure, grpc_error* error) {
- combiner_finally_exec((grpc_closure*)closure, GRPC_ERROR_REF(error));
+static void enqueue_finally(grpc_exec_ctx* exec_ctx, void* closure,
+ grpc_error* error) {
+ combiner_finally_exec(exec_ctx, (grpc_closure*)closure,
+ GRPC_ERROR_REF(error));
}
grpc_closure_scheduler* grpc_combiner_scheduler(grpc_combiner* combiner) {