diff options
author | 2016-09-07 13:02:05 -0700 | |
---|---|---|
committer | 2016-09-07 13:02:05 -0700 | |
commit | 09b05fd3fd91a473c42b99cc9636c1634eeb327e (patch) | |
tree | 718241fa4db1bb16a8c7fc41d3ac407e042dbdfa /src/core/lib/iomgr/combiner.c | |
parent | a81dac2888d4cb7b766a976aa5084a4ca797d21e (diff) |
Get write batching working again
Diffstat (limited to 'src/core/lib/iomgr/combiner.c')
-rw-r--r-- | src/core/lib/iomgr/combiner.c | 21 |
1 files changed, 15 insertions, 6 deletions
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index 721db6337e..b2d6559751 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -62,6 +62,7 @@ struct grpc_combiner { // offload safely gpr_atm covered_by_poller; bool time_to_execute_final_list; + bool final_list_covered_by_poller; grpc_closure_list final_list; grpc_closure offload; }; @@ -81,6 +82,11 @@ static error_data unpack_error_data(uintptr_t p) { return (error_data){(grpc_error *)(p & ~(uintptr_t)1), p & 1}; } +static bool is_covered_by_poller(grpc_combiner *lock) { + return lock->final_list_covered_by_poller || + gpr_atm_acq_load(&lock->covered_by_poller) > 0; +} + grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) { grpc_combiner *lock = gpr_malloc(sizeof(*lock)); lock->next_combiner_on_this_exec_ctx = NULL; @@ -183,8 +189,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { } if (lock->optional_workqueue != NULL && - grpc_exec_ctx_ready_to_finish(exec_ctx) && - gpr_atm_acq_load(&lock->covered_by_poller) > 0) { + grpc_exec_ctx_ready_to_finish(exec_ctx) && is_covered_by_poller(lock)) { GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0); // this execution context wants to move on, and we have a workqueue (and // so can help the execution context out): schedule remaining work to be @@ -205,8 +210,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { // queue is in an inconsistant state: use this as a cue that we should // go off and do something else for a while (and come back later) GPR_TIMER_MARK("delay_busy", 0); - if (lock->optional_workqueue != NULL && - gpr_atm_acq_load(&lock->covered_by_poller) > 0) { + if (lock->optional_workqueue != NULL && is_covered_by_poller(lock)) { queue_offload(exec_ctx, lock); } GPR_TIMER_END("combiner.continue_exec_ctx", 0); @@ -225,6 +229,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { grpc_closure *c = lock->final_list.head; GPR_ASSERT(c != NULL); grpc_closure_list_init(&lock->final_list); + lock->final_list_covered_by_poller = false; int loops = 0; while (c != NULL) { GPR_TIMER_BEGIN("combiner.exec_1final", 0); @@ -277,11 +282,12 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure, grpc_error *error) { grpc_combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure, - GRPC_ERROR_REF(error)); + GRPC_ERROR_REF(error), false); } void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, - grpc_closure *closure, grpc_error *error) { + grpc_closure *closure, grpc_error *error, + bool covered_by_poller) { GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p grpc_combiner_execute_finally c=%p; ac=%p", lock, closure, exec_ctx->active_combiner)); @@ -298,6 +304,9 @@ void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, if (grpc_closure_list_empty(lock->final_list)) { gpr_atm_full_fetch_add(&lock->state, 2); } + if (covered_by_poller) { + lock->final_list_covered_by_poller = true; + } grpc_closure_list_append(&lock->final_list, closure, error); GPR_TIMER_END("combiner.execute_finally", 0); } |