aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/combiner.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr/combiner.c')
-rw-r--r--src/core/lib/iomgr/combiner.c110
1 files changed, 94 insertions, 16 deletions
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index cfc67020ae..c26a73b2b7 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -56,6 +56,10 @@ int grpc_combiner_trace = 0;
struct grpc_combiner {
grpc_combiner *next_combiner_on_this_exec_ctx;
grpc_workqueue *optional_workqueue;
+ grpc_closure_scheduler uncovered_scheduler;
+ grpc_closure_scheduler covered_scheduler;
+ grpc_closure_scheduler uncovered_finally_scheduler;
+ grpc_closure_scheduler covered_finally_scheduler;
gpr_mpscq queue;
// state is:
// lower bit - zero if orphaned (STATE_UNORPHANED)
@@ -70,6 +74,26 @@ struct grpc_combiner {
grpc_closure offload;
};
+static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx,
+ grpc_closure *closure, grpc_error *error);
+static void combiner_exec_covered(grpc_exec_ctx *exec_ctx,
+ grpc_closure *closure, grpc_error *error);
+static void combiner_finally_exec_uncovered(grpc_exec_ctx *exec_ctx,
+ grpc_closure *closure,
+ grpc_error *error);
+static void combiner_finally_exec_covered(grpc_exec_ctx *exec_ctx,
+ grpc_closure *closure,
+ grpc_error *error);
+
+static const grpc_closure_scheduler_vtable scheduler_uncovered = {
+ combiner_exec_uncovered, combiner_exec_uncovered};
+static const grpc_closure_scheduler_vtable scheduler_covered = {
+ combiner_exec_covered, combiner_exec_covered};
+static const grpc_closure_scheduler_vtable finally_scheduler_uncovered = {
+ combiner_finally_exec_uncovered, combiner_finally_exec_uncovered};
+static const grpc_closure_scheduler_vtable finally_scheduler_covered = {
+ combiner_finally_exec_covered, combiner_finally_exec_covered};
+
static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
typedef struct {
@@ -102,11 +126,16 @@ grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
lock->time_to_execute_final_list = false;
lock->optional_workqueue = optional_workqueue;
lock->final_list_covered_by_poller = false;
+ lock->uncovered_scheduler.vtable = &scheduler_uncovered;
+ lock->covered_scheduler.vtable = &scheduler_covered;
+ lock->uncovered_finally_scheduler.vtable = &finally_scheduler_uncovered;
+ lock->covered_finally_scheduler.vtable = &finally_scheduler_covered;
gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
gpr_atm_no_barrier_store(&lock->elements_covered_by_poller, 0);
gpr_mpscq_init(&lock->queue);
grpc_closure_list_init(&lock->final_list);
- grpc_closure_init(&lock->offload, offload, lock);
+ grpc_closure_init(&lock->offload, offload, lock,
+ grpc_workqueue_scheduler(lock->optional_workqueue));
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock));
return lock;
}
@@ -148,9 +177,9 @@ static void push_first_on_exec_ctx(grpc_exec_ctx *exec_ctx,
}
}
-void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
- grpc_closure *cl, grpc_error *error,
- bool covered_by_poller) {
+static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
+ grpc_closure *cl, grpc_error *error,
+ bool covered_by_poller) {
GPR_TIMER_BEGIN("combiner.execute", 0);
gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT);
GRPC_COMBINER_TRACE(gpr_log(
@@ -171,6 +200,24 @@ void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
GPR_TIMER_END("combiner.execute", 0);
}
+#define COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler_name) \
+ ((grpc_combiner *)(((char *)((closure)->scheduler)) - \
+ offsetof(grpc_combiner, scheduler_name)))
+
+static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx, grpc_closure *cl,
+ grpc_error *error) {
+ combiner_exec(exec_ctx,
+ COMBINER_FROM_CLOSURE_SCHEDULER(cl, uncovered_scheduler), cl,
+ error, false);
+}
+
+static void combiner_exec_covered(grpc_exec_ctx *exec_ctx, grpc_closure *cl,
+ grpc_error *error) {
+ combiner_exec(exec_ctx,
+ COMBINER_FROM_CLOSURE_SCHEDULER(cl, covered_scheduler), cl,
+ error, true);
+}
+
static void move_next(grpc_exec_ctx *exec_ctx) {
exec_ctx->active_combiner =
exec_ctx->active_combiner->next_combiner_on_this_exec_ctx;
@@ -188,8 +235,7 @@ static void queue_offload(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
move_next(exec_ctx);
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p queue_offload --> %p", lock,
lock->optional_workqueue));
- grpc_workqueue_enqueue(exec_ctx, lock->optional_workqueue, &lock->offload,
- GRPC_ERROR_NONE);
+ grpc_closure_sched(exec_ctx, &lock->offload, GRPC_ERROR_NONE);
}
bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
@@ -312,23 +358,22 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
}
static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
- grpc_error *error) {
- grpc_combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure,
- GRPC_ERROR_REF(error), false);
-}
+ grpc_error *error);
-void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
- grpc_closure *closure, grpc_error *error,
- bool covered_by_poller) {
+static void combiner_execute_finally(grpc_exec_ctx *exec_ctx,
+ grpc_combiner *lock, grpc_closure *closure,
+ grpc_error *error,
+ bool covered_by_poller) {
GRPC_COMBINER_TRACE(gpr_log(
GPR_DEBUG, "C:%p grpc_combiner_execute_finally c=%p; ac=%p; cov=%d", lock,
closure, exec_ctx->active_combiner, covered_by_poller));
GPR_TIMER_BEGIN("combiner.execute_finally", 0);
if (exec_ctx->active_combiner != lock) {
GPR_TIMER_MARK("slowpath", 0);
- grpc_combiner_execute(exec_ctx, lock,
- grpc_closure_create(enqueue_finally, closure), error,
- false);
+ grpc_closure_sched(
+ exec_ctx, grpc_closure_create(enqueue_finally, closure,
+ grpc_combiner_scheduler(lock, false)),
+ error);
GPR_TIMER_END("combiner.execute_finally", 0);
return;
}
@@ -342,3 +387,36 @@ void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure_list_append(&lock->final_list, closure, error);
GPR_TIMER_END("combiner.execute_finally", 0);
}
+
+static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
+ grpc_error *error) {
+ combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure,
+ GRPC_ERROR_REF(error), false);
+}
+
+static void combiner_finally_exec_uncovered(grpc_exec_ctx *exec_ctx,
+ grpc_closure *cl,
+ grpc_error *error) {
+ combiner_execute_finally(exec_ctx, COMBINER_FROM_CLOSURE_SCHEDULER(
+ cl, uncovered_finally_scheduler),
+ cl, error, false);
+}
+
+static void combiner_finally_exec_covered(grpc_exec_ctx *exec_ctx,
+ grpc_closure *cl, grpc_error *error) {
+ combiner_execute_finally(
+ exec_ctx, COMBINER_FROM_CLOSURE_SCHEDULER(cl, covered_finally_scheduler),
+ cl, error, true);
+}
+
+grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *combiner,
+ bool covered_by_poller) {
+ return covered_by_poller ? &combiner->covered_scheduler
+ : &combiner->uncovered_scheduler;
+}
+
+grpc_closure_scheduler *grpc_combiner_finally_scheduler(
+ grpc_combiner *combiner, bool covered_by_poller) {
+ return covered_by_poller ? &combiner->covered_finally_scheduler
+ : &combiner->uncovered_finally_scheduler;
+}