diff options
Diffstat (limited to 'src/core/lib/iomgr/combiner.c')
-rw-r--r-- | src/core/lib/iomgr/combiner.c | 116 |
1 files changed, 61 insertions, 55 deletions
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index 52a4a3ee9f..981d3348ce 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -58,7 +58,9 @@ struct grpc_combiner { // lower bit - zero if orphaned // other bits - number of items queued on the lock gpr_atm state; - bool take_async_break_before_final_list; + // number of elements in the list that are covered by a poller: if >0, we can + // offload safely + gpr_atm covered_by_poller; bool time_to_execute_final_list; grpc_closure_list final_list; grpc_closure offload; @@ -66,14 +68,27 @@ struct grpc_combiner { static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); +typedef struct { + grpc_error *error; + bool covered_by_poller; +} error_data; + +static uintptr_t pack_error_data(error_data d) { + return ((uintptr_t)d.error) | (d.covered_by_poller ? 1 : 0); +} + +static error_data unpack_error_data(uintptr_t p) { + return (error_data){(grpc_error *)(p & ~(uintptr_t)1), p & 1}; +} + grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) { grpc_combiner *lock = gpr_malloc(sizeof(*lock)); lock->next_combiner_on_this_exec_ctx = NULL; lock->time_to_execute_final_list = false; lock->optional_workqueue = optional_workqueue; gpr_atm_no_barrier_store(&lock->state, 1); + gpr_atm_no_barrier_store(&lock->covered_by_poller, 0); gpr_mpscq_init(&lock->queue); - lock->take_async_break_before_final_list = false; grpc_closure_list_init(&lock->final_list); grpc_closure_init(&lock->offload, offload, lock); GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock)); @@ -108,13 +123,19 @@ static void queue_on_exec_ctx(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { } void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, - grpc_closure *cl, grpc_error *error) { - GRPC_COMBINER_TRACE( - gpr_log(GPR_DEBUG, "C:%p grpc_combiner_execute c=%p", lock, cl)); + grpc_closure *cl, grpc_error *error, + bool covered_by_poller) { + GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, + "C:%p grpc_combiner_execute c=%p cov=%d", lock, + cl, covered_by_poller)); GPR_TIMER_BEGIN("combiner.execute", 0); gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2); GPR_ASSERT(last & 1); // ensure lock has not been destroyed - cl->error = error; + cl->error_data.scratch = + pack_error_data((error_data){error, covered_by_poller}); + if (covered_by_poller) { + gpr_atm_no_barrier_fetch_add(&lock->covered_by_poller, 1); + } gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next); if (last == 1) { // code will be written when the exec_ctx calls @@ -152,11 +173,12 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { } if (lock->optional_workqueue != NULL && - grpc_exec_ctx_ready_to_finish(exec_ctx)) { + grpc_exec_ctx_ready_to_finish(exec_ctx) && + gpr_atm_acq_load(&lock->covered_by_poller) > 0) { GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0); - // this execution context wants to move on, and we have a workqueue (and so - // can help the execution context out): schedule remaining work to be picked - // up on the workqueue + // this execution context wants to move on, and we have a workqueue (and + // so can help the execution context out): schedule remaining work to be + // picked up on the workqueue queue_offload(exec_ctx, lock); GPR_TIMER_END("combiner.continue_exec_ctx", 0); return true; @@ -173,7 +195,8 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { // queue is in an inconsistant state: use this as a cue that we should // go off and do something else for a while (and come back later) GPR_TIMER_MARK("delay_busy", 0); - if (lock->optional_workqueue != NULL) { + if (lock->optional_workqueue != NULL && + gpr_atm_acq_load(&lock->covered_by_poller) > 0) { queue_offload(exec_ctx, lock); } GPR_TIMER_END("combiner.continue_exec_ctx", 0); @@ -181,37 +204,28 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { } GPR_TIMER_BEGIN("combiner.exec1", 0); grpc_closure *cl = (grpc_closure *)n; - grpc_error *error = cl->error; - cl->cb(exec_ctx, cl->cb_arg, error); - GRPC_ERROR_UNREF(error); + error_data err = unpack_error_data(cl->error_data.scratch); + cl->cb(exec_ctx, cl->cb_arg, err.error); + if (err.covered_by_poller) { + gpr_atm_no_barrier_fetch_add(&lock->covered_by_poller, -1); + } + GRPC_ERROR_UNREF(err.error); GPR_TIMER_END("combiner.exec1", 0); } else { - if (lock->take_async_break_before_final_list) { - GPR_TIMER_MARK("async_break", 0); - GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p take async break", lock)); - lock->take_async_break_before_final_list = false; - if (lock->optional_workqueue != NULL) { - queue_offload(exec_ctx, lock); - } - GPR_TIMER_END("combiner.continue_exec_ctx", 0); - return true; - } else { - grpc_closure *c = lock->final_list.head; - GPR_ASSERT(c != NULL); - grpc_closure_list_init(&lock->final_list); - lock->take_async_break_before_final_list = false; - int loops = 0; - while (c != NULL) { - GPR_TIMER_BEGIN("combiner.exec_1final", 0); - GRPC_COMBINER_TRACE( - gpr_log(GPR_DEBUG, "C:%p execute_final[%d] c=%p", lock, loops, c)); - grpc_closure *next = c->next_data.next; - grpc_error *error = c->error; - c->cb(exec_ctx, c->cb_arg, error); - GRPC_ERROR_UNREF(error); - c = next; - GPR_TIMER_END("combiner.exec_1final", 0); - } + grpc_closure *c = lock->final_list.head; + GPR_ASSERT(c != NULL); + grpc_closure_list_init(&lock->final_list); + int loops = 0; + while (c != NULL) { + GPR_TIMER_BEGIN("combiner.exec_1final", 0); + GRPC_COMBINER_TRACE( + gpr_log(GPR_DEBUG, "C:%p execute_final[%d] c=%p", lock, loops, c)); + grpc_closure *next = c->next_data.next; + grpc_error *error = c->error_data.error; + c->cb(exec_ctx, c->cb_arg, error); + GRPC_ERROR_UNREF(error); + c = next; + GPR_TIMER_END("combiner.exec_1final", 0); } } @@ -253,35 +267,27 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure, grpc_error *error) { grpc_combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure, - GRPC_ERROR_REF(error), false); + GRPC_ERROR_REF(error)); } void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, - grpc_closure *closure, grpc_error *error, - bool force_async_break) { - GRPC_COMBINER_TRACE(gpr_log( - GPR_DEBUG, - "C:%p grpc_combiner_execute_finally c=%p force_async_break=%d; ac=%p", - lock, closure, force_async_break, exec_ctx->active_combiner)); + grpc_closure *closure, grpc_error *error) { + GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, + "C:%p grpc_combiner_execute_finally c=%p; ac=%p", + lock, closure, exec_ctx->active_combiner)); GPR_TIMER_BEGIN("combiner.execute_finally", 0); if (exec_ctx->active_combiner != lock) { GPR_TIMER_MARK("slowpath", 0); grpc_combiner_execute(exec_ctx, lock, - grpc_closure_create(enqueue_finally, closure), error); + grpc_closure_create(enqueue_finally, closure), error, + false); GPR_TIMER_END("combiner.execute_finally", 0); return; } - if (force_async_break) { - lock->take_async_break_before_final_list = true; - } if (grpc_closure_list_empty(lock->final_list)) { gpr_atm_full_fetch_add(&lock->state, 2); } grpc_closure_list_append(&lock->final_list, closure, error); GPR_TIMER_END("combiner.execute_finally", 0); } - -void grpc_combiner_force_async_finally(grpc_combiner *lock) { - lock->take_async_break_before_final_list = true; -} |