aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/combiner.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-09-07 13:02:05 -0700
committerGravatar Craig Tiller <ctiller@google.com>2016-09-07 13:02:05 -0700
commit09b05fd3fd91a473c42b99cc9636c1634eeb327e (patch)
tree718241fa4db1bb16a8c7fc41d3ac407e042dbdfa /src/core/lib/iomgr/combiner.c
parenta81dac2888d4cb7b766a976aa5084a4ca797d21e (diff)
Get write batching working again
Diffstat (limited to 'src/core/lib/iomgr/combiner.c')
-rw-r--r--src/core/lib/iomgr/combiner.c21
1 files changed, 15 insertions, 6 deletions
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index 721db6337e..b2d6559751 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -62,6 +62,7 @@ struct grpc_combiner {
// offload safely
gpr_atm covered_by_poller;
bool time_to_execute_final_list;
+ bool final_list_covered_by_poller;
grpc_closure_list final_list;
grpc_closure offload;
};
@@ -81,6 +82,11 @@ static error_data unpack_error_data(uintptr_t p) {
return (error_data){(grpc_error *)(p & ~(uintptr_t)1), p & 1};
}
+static bool is_covered_by_poller(grpc_combiner *lock) {
+ return lock->final_list_covered_by_poller ||
+ gpr_atm_acq_load(&lock->covered_by_poller) > 0;
+}
+
grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
grpc_combiner *lock = gpr_malloc(sizeof(*lock));
lock->next_combiner_on_this_exec_ctx = NULL;
@@ -183,8 +189,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
}
if (lock->optional_workqueue != NULL &&
- grpc_exec_ctx_ready_to_finish(exec_ctx) &&
- gpr_atm_acq_load(&lock->covered_by_poller) > 0) {
+ grpc_exec_ctx_ready_to_finish(exec_ctx) && is_covered_by_poller(lock)) {
GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0);
// this execution context wants to move on, and we have a workqueue (and
// so can help the execution context out): schedule remaining work to be
@@ -205,8 +210,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
// queue is in an inconsistant state: use this as a cue that we should
// go off and do something else for a while (and come back later)
GPR_TIMER_MARK("delay_busy", 0);
- if (lock->optional_workqueue != NULL &&
- gpr_atm_acq_load(&lock->covered_by_poller) > 0) {
+ if (lock->optional_workqueue != NULL && is_covered_by_poller(lock)) {
queue_offload(exec_ctx, lock);
}
GPR_TIMER_END("combiner.continue_exec_ctx", 0);
@@ -225,6 +229,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
grpc_closure *c = lock->final_list.head;
GPR_ASSERT(c != NULL);
grpc_closure_list_init(&lock->final_list);
+ lock->final_list_covered_by_poller = false;
int loops = 0;
while (c != NULL) {
GPR_TIMER_BEGIN("combiner.exec_1final", 0);
@@ -277,11 +282,12 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
grpc_error *error) {
grpc_combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure,
- GRPC_ERROR_REF(error));
+ GRPC_ERROR_REF(error), false);
}
void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
- grpc_closure *closure, grpc_error *error) {
+ grpc_closure *closure, grpc_error *error,
+ bool covered_by_poller) {
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG,
"C:%p grpc_combiner_execute_finally c=%p; ac=%p",
lock, closure, exec_ctx->active_combiner));
@@ -298,6 +304,9 @@ void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
if (grpc_closure_list_empty(lock->final_list)) {
gpr_atm_full_fetch_add(&lock->state, 2);
}
+ if (covered_by_poller) {
+ lock->final_list_covered_by_poller = true;
+ }
grpc_closure_list_append(&lock->final_list, closure, error);
GPR_TIMER_END("combiner.execute_finally", 0);
}