diff options
Diffstat (limited to 'src/core/lib/iomgr/combiner.c')
-rw-r--r-- | src/core/lib/iomgr/combiner.c | 110 |
1 files changed, 94 insertions, 16 deletions
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index cfc67020ae..c26a73b2b7 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -56,6 +56,10 @@ int grpc_combiner_trace = 0; struct grpc_combiner { grpc_combiner *next_combiner_on_this_exec_ctx; grpc_workqueue *optional_workqueue; + grpc_closure_scheduler uncovered_scheduler; + grpc_closure_scheduler covered_scheduler; + grpc_closure_scheduler uncovered_finally_scheduler; + grpc_closure_scheduler covered_finally_scheduler; gpr_mpscq queue; // state is: // lower bit - zero if orphaned (STATE_UNORPHANED) @@ -70,6 +74,26 @@ struct grpc_combiner { grpc_closure offload; }; +static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx, + grpc_closure *closure, grpc_error *error); +static void combiner_exec_covered(grpc_exec_ctx *exec_ctx, + grpc_closure *closure, grpc_error *error); +static void combiner_finally_exec_uncovered(grpc_exec_ctx *exec_ctx, + grpc_closure *closure, + grpc_error *error); +static void combiner_finally_exec_covered(grpc_exec_ctx *exec_ctx, + grpc_closure *closure, + grpc_error *error); + +static const grpc_closure_scheduler_vtable scheduler_uncovered = { + combiner_exec_uncovered, combiner_exec_uncovered}; +static const grpc_closure_scheduler_vtable scheduler_covered = { + combiner_exec_covered, combiner_exec_covered}; +static const grpc_closure_scheduler_vtable finally_scheduler_uncovered = { + combiner_finally_exec_uncovered, combiner_finally_exec_uncovered}; +static const grpc_closure_scheduler_vtable finally_scheduler_covered = { + combiner_finally_exec_covered, combiner_finally_exec_covered}; + static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); typedef struct { @@ -102,11 +126,16 @@ grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) { lock->time_to_execute_final_list = false; lock->optional_workqueue = optional_workqueue; lock->final_list_covered_by_poller = false; + lock->uncovered_scheduler.vtable = &scheduler_uncovered; + lock->covered_scheduler.vtable = &scheduler_covered; + lock->uncovered_finally_scheduler.vtable = &finally_scheduler_uncovered; + lock->covered_finally_scheduler.vtable = &finally_scheduler_covered; gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED); gpr_atm_no_barrier_store(&lock->elements_covered_by_poller, 0); gpr_mpscq_init(&lock->queue); grpc_closure_list_init(&lock->final_list); - grpc_closure_init(&lock->offload, offload, lock); + grpc_closure_init(&lock->offload, offload, lock, + grpc_workqueue_scheduler(lock->optional_workqueue)); GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock)); return lock; } @@ -148,9 +177,9 @@ static void push_first_on_exec_ctx(grpc_exec_ctx *exec_ctx, } } -void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, - grpc_closure *cl, grpc_error *error, - bool covered_by_poller) { +static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, + grpc_closure *cl, grpc_error *error, + bool covered_by_poller) { GPR_TIMER_BEGIN("combiner.execute", 0); gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT); GRPC_COMBINER_TRACE(gpr_log( @@ -171,6 +200,24 @@ void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, GPR_TIMER_END("combiner.execute", 0); } +#define COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler_name) \ + ((grpc_combiner *)(((char *)((closure)->scheduler)) - \ + offsetof(grpc_combiner, scheduler_name))) + +static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx, grpc_closure *cl, + grpc_error *error) { + combiner_exec(exec_ctx, + COMBINER_FROM_CLOSURE_SCHEDULER(cl, uncovered_scheduler), cl, + error, false); +} + +static void combiner_exec_covered(grpc_exec_ctx *exec_ctx, grpc_closure *cl, + grpc_error *error) { + combiner_exec(exec_ctx, + COMBINER_FROM_CLOSURE_SCHEDULER(cl, covered_scheduler), cl, + error, true); +} + static void move_next(grpc_exec_ctx *exec_ctx) { exec_ctx->active_combiner = exec_ctx->active_combiner->next_combiner_on_this_exec_ctx; @@ -188,8 +235,7 @@ static void queue_offload(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { move_next(exec_ctx); GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p queue_offload --> %p", lock, lock->optional_workqueue)); - grpc_workqueue_enqueue(exec_ctx, lock->optional_workqueue, &lock->offload, - GRPC_ERROR_NONE); + grpc_closure_sched(exec_ctx, &lock->offload, GRPC_ERROR_NONE); } bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { @@ -312,23 +358,22 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { } static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure, - grpc_error *error) { - grpc_combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure, - GRPC_ERROR_REF(error), false); -} + grpc_error *error); -void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, - grpc_closure *closure, grpc_error *error, - bool covered_by_poller) { +static void combiner_execute_finally(grpc_exec_ctx *exec_ctx, + grpc_combiner *lock, grpc_closure *closure, + grpc_error *error, + bool covered_by_poller) { GRPC_COMBINER_TRACE(gpr_log( GPR_DEBUG, "C:%p grpc_combiner_execute_finally c=%p; ac=%p; cov=%d", lock, closure, exec_ctx->active_combiner, covered_by_poller)); GPR_TIMER_BEGIN("combiner.execute_finally", 0); if (exec_ctx->active_combiner != lock) { GPR_TIMER_MARK("slowpath", 0); - grpc_combiner_execute(exec_ctx, lock, - grpc_closure_create(enqueue_finally, closure), error, - false); + grpc_closure_sched( + exec_ctx, grpc_closure_create(enqueue_finally, closure, + grpc_combiner_scheduler(lock, false)), + error); GPR_TIMER_END("combiner.execute_finally", 0); return; } @@ -342,3 +387,36 @@ void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, grpc_closure_list_append(&lock->final_list, closure, error); GPR_TIMER_END("combiner.execute_finally", 0); } + +static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure, + grpc_error *error) { + combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure, + GRPC_ERROR_REF(error), false); +} + +static void combiner_finally_exec_uncovered(grpc_exec_ctx *exec_ctx, + grpc_closure *cl, + grpc_error *error) { + combiner_execute_finally(exec_ctx, COMBINER_FROM_CLOSURE_SCHEDULER( + cl, uncovered_finally_scheduler), + cl, error, false); +} + +static void combiner_finally_exec_covered(grpc_exec_ctx *exec_ctx, + grpc_closure *cl, grpc_error *error) { + combiner_execute_finally( + exec_ctx, COMBINER_FROM_CLOSURE_SCHEDULER(cl, covered_finally_scheduler), + cl, error, true); +} + +grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *combiner, + bool covered_by_poller) { + return covered_by_poller ? &combiner->covered_scheduler + : &combiner->uncovered_scheduler; +} + +grpc_closure_scheduler *grpc_combiner_finally_scheduler( + grpc_combiner *combiner, bool covered_by_poller) { + return covered_by_poller ? &combiner->covered_finally_scheduler + : &combiner->uncovered_finally_scheduler; +} |