path: root/src/core/lib/iomgr/combiner.c
author    Craig Tiller <ctiller@google.com>  2016-08-31 12:59:24 -0700
committer Craig Tiller <ctiller@google.com>  2016-08-31 12:59:24 -0700
commit    a7cd41cc46bf60aed17dea8ea2d3787814c45475 (patch)
tree      dc1b2f991fcbac444056df0aab9d1b8d8a8b8881 /src/core/lib/iomgr/combiner.c
parent    d58daa2b2f577dbf6c629e9914f405df223aac8d (diff)
Note polling coverage when taking combiner locks: resolves offload issues
Diffstat (limited to 'src/core/lib/iomgr/combiner.c')
-rw-r--r--  src/core/lib/iomgr/combiner.c | 112
1 file changed, 58 insertions(+), 54 deletions(-)
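In short, this commit replaces the combiner's take_async_break_before_final_list flag with an atomic covered_by_poller counter. Every closure handed to grpc_combiner_execute now declares whether a poller is covering it; that flag is packed into the low bit of the closure's error word, and the counter tracks how many covered closures are still queued. Offloading work to the optional workqueue is then only attempted while the counter is positive, so the combiner no longer parks work where no poller would pick it up.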
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index f1a2b29519..40be4dea7b 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -58,7 +58,9 @@ struct grpc_combiner {
// lower bit - zero if orphaned
// other bits - number of items queued on the lock
gpr_atm state;
- bool take_async_break_before_final_list;
+ // number of elements in the list that are covered by a poller: if >0, we can
+ // offload safely
+ gpr_atm covered_by_poller;
bool time_to_execute_final_list;
grpc_closure_list final_list;
grpc_closure offload;
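As the comments above note, the state word already uses a low-bit encoding: bit 0 is 1 while the lock is still referenced (zero once orphaned), and the remaining bits count queued items, which is why enqueue paths add 2 per item and assert (last & 1) before touching the queue. For example, state == 5 (binary 101) means the lock is alive with two items queued. The new covered_by_poller field applies the same pack-a-flag idea to each closure's error pointer, as sketched after the next hunk.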
@@ -66,14 +68,27 @@ struct grpc_combiner {
static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
+typedef struct {
+ grpc_error *error;
+ bool covered_by_poller;
+} error_data;
+
+static uintptr_t pack_error_data(error_data d) {
+ return ((uintptr_t)d.error) | (d.covered_by_poller ? 1 : 0);
+}
+
+static error_data unpack_error_data(uintptr_t p) {
+ return (error_data){(grpc_error *)(p & ~(uintptr_t)1), p & 1};
+}
+
grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
grpc_combiner *lock = gpr_malloc(sizeof(*lock));
lock->next_combiner_on_this_exec_ctx = NULL;
lock->time_to_execute_final_list = false;
lock->optional_workqueue = optional_workqueue;
gpr_atm_no_barrier_store(&lock->state, 1);
+ gpr_atm_no_barrier_store(&lock->covered_by_poller, 0);
gpr_mpscq_init(&lock->queue);
- lock->take_async_break_before_final_list = false;
grpc_closure_list_init(&lock->final_list);
grpc_closure_init(&lock->offload, offload, lock);
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock));
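The pack_error_data/unpack_error_data pair above relies on pointer tagging: grpc_error pointers refer to allocated objects, so they are at least 2-byte aligned, bit 0 is always zero, and that free bit can carry covered_by_poller alongside the pointer in a single uintptr_t. A self-contained sketch of the same trick, with my_error as a hypothetical stand-in type (not a gRPC API):

#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Stand-in for grpc_error: any object type whose pointers are at least
   2-byte aligned, leaving bit 0 free for a flag. */
typedef struct { int code; } my_error;

typedef struct {
  my_error *error;
  bool covered_by_poller;
} error_data;

static uintptr_t pack_error_data(error_data d) {
  /* bit 0 of an aligned pointer is 0, so OR-ing the flag in is lossless */
  return ((uintptr_t)d.error) | (d.covered_by_poller ? 1 : 0);
}

static error_data unpack_error_data(uintptr_t p) {
  /* mask off bit 0 to recover the pointer; bit 0 itself is the flag */
  return (error_data){(my_error *)(p & ~(uintptr_t)1), (p & 1) != 0};
}

int main(void) {
  my_error err = {42};
  error_data out = unpack_error_data(pack_error_data((error_data){&err, true}));
  assert(out.error == &err && out.covered_by_poller);
  printf("code=%d covered=%d\n", out.error->code, (int)out.covered_by_poller);
  return 0;
}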
@@ -108,13 +123,18 @@ static void queue_on_exec_ctx(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
}
void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
- grpc_closure *cl, grpc_error *error) {
+ grpc_closure *cl, grpc_error *error,
+ bool covered_by_poller) {
GRPC_COMBINER_TRACE(
- gpr_log(GPR_DEBUG, "C:%p grpc_combiner_execute c=%p", lock, cl));
+ gpr_log(GPR_DEBUG, "C:%p grpc_combiner_execute c=%p cov=%d", lock, cl, covered_by_poller));
GPR_TIMER_BEGIN("combiner.execute", 0);
gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2);
GPR_ASSERT(last & 1); // ensure lock has not been destroyed
- cl->error = error;
+ cl->error_data.scratch =
+ pack_error_data((error_data){error, covered_by_poller});
+ if (covered_by_poller) {
+ gpr_atm_no_barrier_fetch_add(&lock->covered_by_poller, 1);
+ }
gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next);
if (last == 1) {
// code will be written when the exec_ctx calls
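The enqueue path above and the drain loop further down keep the counter consistent: it is bumped when a covered closure is pushed and decremented after the closure's callback runs, so at any instant it equals the number of covered closures still queued. A minimal sketch of that invariant using C11 atomics; combiner_sketch, on_enqueue, on_executed, and can_offload are hypothetical names, and the real code uses gpr_atm_no_barrier_fetch_add and gpr_atm_acq_load rather than stdatomic.h:

#include <stdatomic.h>
#include <stdbool.h>

typedef struct {
  atomic_long covered_by_poller; /* number of covered closures still queued */
} combiner_sketch;

/* enqueue: count the closure while it waits on the queue */
void on_enqueue(combiner_sketch *c, bool covered) {
  if (covered) {
    atomic_fetch_add_explicit(&c->covered_by_poller, 1, memory_order_relaxed);
  }
}

/* drain: the closure has run, stop counting it */
void on_executed(combiner_sketch *c, bool was_covered) {
  if (was_covered) {
    atomic_fetch_sub_explicit(&c->covered_by_poller, 1, memory_order_relaxed);
  }
}

/* offload is only safe while some queued closure has a poller behind it */
bool can_offload(combiner_sketch *c) {
  return atomic_load_explicit(&c->covered_by_poller, memory_order_acquire) > 0;
}

int main(void) {
  combiner_sketch c = {0};
  on_enqueue(&c, true);        /* covered closure queued: counter = 1 */
  bool safe = can_offload(&c); /* true: a poller will see the work through */
  on_executed(&c, true);       /* closure ran: counter back to 0 */
  return safe && !can_offload(&c) ? 0 : 1;
}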
@@ -152,11 +172,12 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
}
if (lock->optional_workqueue != NULL &&
- grpc_exec_ctx_ready_to_finish(exec_ctx)) {
+ grpc_exec_ctx_ready_to_finish(exec_ctx) &&
+ gpr_atm_acq_load(&lock->covered_by_poller) > 0) {
GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0);
- // this execution context wants to move on, and we have a workqueue (and so
- // can help the execution context out): schedule remaining work to be picked
- // up on the workqueue
+ // this execution context wants to move on, and we have a workqueue (and
+ // so can help the execution context out): schedule remaining work to be
+ // picked up on the workqueue
queue_offload(exec_ctx, lock);
GPR_TIMER_END("combiner.continue_exec_ctx", 0);
return true;
@@ -173,7 +194,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
// queue is in an inconsistent state: use this as a cue that we should
// go off and do something else for a while (and come back later)
GPR_TIMER_MARK("delay_busy", 0);
- if (lock->optional_workqueue != NULL) {
+ if (lock->optional_workqueue != NULL && gpr_atm_acq_load(&lock->covered_by_poller) > 0) {
queue_offload(exec_ctx, lock);
}
GPR_TIMER_END("combiner.continue_exec_ctx", 0);
@@ -181,37 +202,28 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
}
GPR_TIMER_BEGIN("combiner.exec1", 0);
grpc_closure *cl = (grpc_closure *)n;
- grpc_error *error = cl->error;
- cl->cb(exec_ctx, cl->cb_arg, error);
- GRPC_ERROR_UNREF(error);
+ error_data err = unpack_error_data(cl->error_data.scratch);
+ cl->cb(exec_ctx, cl->cb_arg, err.error);
+ if (err.covered_by_poller) {
+ gpr_atm_no_barrier_fetch_add(&lock->covered_by_poller, -1);
+ }
+ GRPC_ERROR_UNREF(err.error);
GPR_TIMER_END("combiner.exec1", 0);
} else {
- if (lock->take_async_break_before_final_list) {
- GPR_TIMER_MARK("async_break", 0);
- GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p take async break", lock));
- lock->take_async_break_before_final_list = false;
- if (lock->optional_workqueue != NULL) {
- queue_offload(exec_ctx, lock);
- }
- GPR_TIMER_END("combiner.continue_exec_ctx", 0);
- return true;
- } else {
- grpc_closure *c = lock->final_list.head;
- GPR_ASSERT(c != NULL);
- grpc_closure_list_init(&lock->final_list);
- lock->take_async_break_before_final_list = false;
- int loops = 0;
- while (c != NULL) {
- GPR_TIMER_BEGIN("combiner.exec_1final", 0);
- GRPC_COMBINER_TRACE(
- gpr_log(GPR_DEBUG, "C:%p execute_final[%d] c=%p", lock, loops, c));
- grpc_closure *next = c->next_data.next;
- grpc_error *error = c->error;
- c->cb(exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- c = next;
- GPR_TIMER_END("combiner.exec_1final", 0);
- }
+ grpc_closure *c = lock->final_list.head;
+ GPR_ASSERT(c != NULL);
+ grpc_closure_list_init(&lock->final_list);
+ int loops = 0;
+ while (c != NULL) {
+ GPR_TIMER_BEGIN("combiner.exec_1final", 0);
+ GRPC_COMBINER_TRACE(
+ gpr_log(GPR_DEBUG, "C:%p execute_final[%d] c=%p", lock, loops, c));
+ grpc_closure *next = c->next_data.next;
+ grpc_error *error = c->error_data.error;
+ c->cb(exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
+ c = next;
+ GPR_TIMER_END("combiner.exec_1final", 0);
}
}
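Note that final-list closures read c->error_data.error directly and never touch the counter: only closures routed through grpc_combiner_execute carry the packed covered-by-poller bit, and the grpc_combiner_execute_finally slowpath below explicitly passes covered_by_poller = false when it re-enqueues through grpc_combiner_execute.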
@@ -250,35 +262,27 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
grpc_error *error) {
grpc_combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure,
- GRPC_ERROR_REF(error), false);
+ GRPC_ERROR_REF(error));
}
void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
- grpc_closure *closure, grpc_error *error,
- bool force_async_break) {
- GRPC_COMBINER_TRACE(gpr_log(
- GPR_DEBUG,
- "C:%p grpc_combiner_execute_finally c=%p force_async_break=%d; ac=%p",
- lock, closure, force_async_break, exec_ctx->active_combiner));
+ grpc_closure *closure, grpc_error *error) {
+ GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG,
+ "C:%p grpc_combiner_execute_finally c=%p; ac=%p",
+ lock, closure, exec_ctx->active_combiner));
GPR_TIMER_BEGIN("combiner.execute_finally", 0);
if (exec_ctx->active_combiner != lock) {
GPR_TIMER_MARK("slowpath", 0);
grpc_combiner_execute(exec_ctx, lock,
- grpc_closure_create(enqueue_finally, closure), error);
+ grpc_closure_create(enqueue_finally, closure), error,
+ false);
GPR_TIMER_END("combiner.execute_finally", 0);
return;
}
- if (force_async_break) {
- lock->take_async_break_before_final_list = true;
- }
if (grpc_closure_list_empty(lock->final_list)) {
gpr_atm_full_fetch_add(&lock->state, 2);
}
grpc_closure_list_append(&lock->final_list, closure, error);
GPR_TIMER_END("combiner.execute_finally", 0);
}
-
-void grpc_combiner_force_async_finally(grpc_combiner *lock) {
- lock->take_async_break_before_final_list = true;
-}
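For callers the migration is mechanical: grpc_combiner_execute gains a covered_by_poller argument, grpc_combiner_execute_finally drops force_async_break, and grpc_combiner_force_async_finally disappears entirely. Hypothetical call sites, before and after (cl and the error value are placeholders, and whether to pass true for covered_by_poller depends on the call site's polling context):

/* before this commit */
grpc_combiner_execute(exec_ctx, lock, cl, GRPC_ERROR_NONE);
grpc_combiner_execute_finally(exec_ctx, lock, cl, GRPC_ERROR_NONE,
                              false /* force_async_break */);

/* after this commit */
grpc_combiner_execute(exec_ctx, lock, cl, GRPC_ERROR_NONE,
                      true /* covered_by_poller */);
grpc_combiner_execute_finally(exec_ctx, lock, cl, GRPC_ERROR_NONE);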