aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/exec_ctx.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr/exec_ctx.c')
-rw-r--r--src/core/lib/iomgr/exec_ctx.c61
1 files changed, 50 insertions, 11 deletions
diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c
index ac7785ec13..604713e578 100644
--- a/src/core/lib/iomgr/exec_ctx.c
+++ b/src/core/lib/iomgr/exec_ctx.c
@@ -37,6 +37,7 @@
#include <grpc/support/sync.h>
#include <grpc/support/thd.h>
+#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
@@ -60,18 +61,43 @@ bool grpc_always_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored) {
bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
bool did_something = 0;
GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0);
- while (!grpc_closure_list_empty(exec_ctx->closure_list)) {
- grpc_closure *c = exec_ctx->closure_list.head;
- exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL;
- while (c != NULL) {
- grpc_closure *next = c->next_data.next;
- grpc_error *error = c->error;
- did_something = true;
- GPR_TIMER_BEGIN("grpc_exec_ctx_flush.cb", 0);
+ for (;;) {
+ if (!grpc_closure_list_empty(exec_ctx->closure_list)) {
+ grpc_closure *c = exec_ctx->closure_list.head;
+ exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL;
+ while (c != NULL) {
+ grpc_closure *next = c->next_data.next;
+ did_something = true;
+ grpc_closure_run(exec_ctx, c, c->error_data.error);
+ c = next;
+ }
+ } else if (!grpc_combiner_continue_exec_ctx(exec_ctx)) {
+ break;
+ }
+ }
+ GPR_ASSERT(exec_ctx->active_combiner == NULL);
+ if (exec_ctx->stealing_from_workqueue != NULL) {
+ if (grpc_exec_ctx_ready_to_finish(exec_ctx)) {
+ grpc_workqueue_enqueue(exec_ctx, exec_ctx->stealing_from_workqueue,
+ exec_ctx->stolen_closure,
+ exec_ctx->stolen_closure->error_data.error);
+ GRPC_WORKQUEUE_UNREF(exec_ctx, exec_ctx->stealing_from_workqueue,
+ "exec_ctx_sched");
+ exec_ctx->stealing_from_workqueue = NULL;
+ exec_ctx->stolen_closure = NULL;
+ } else {
+ grpc_closure *c = exec_ctx->stolen_closure;
+ GRPC_WORKQUEUE_UNREF(exec_ctx, exec_ctx->stealing_from_workqueue,
+ "exec_ctx_sched");
+ exec_ctx->stealing_from_workqueue = NULL;
+ exec_ctx->stolen_closure = NULL;
+ grpc_error *error = c->error_data.error;
+ GPR_TIMER_BEGIN("grpc_exec_ctx_flush.stolen_cb", 0);
c->cb(exec_ctx, c->cb_arg, error);
GRPC_ERROR_UNREF(error);
- GPR_TIMER_END("grpc_exec_ctx_flush.cb", 0);
- c = next;
+ GPR_TIMER_END("grpc_exec_ctx_flush.stolen_cb", 0);
+ grpc_exec_ctx_flush(exec_ctx);
+ did_something = true;
}
}
GPR_TIMER_END("grpc_exec_ctx_flush", 0);
@@ -86,12 +112,25 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {
void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error,
grpc_workqueue *offload_target_or_null) {
+ GPR_TIMER_BEGIN("grpc_exec_ctx_sched", 0);
if (offload_target_or_null == NULL) {
grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
- } else {
+ } else if (exec_ctx->stealing_from_workqueue == NULL) {
+ exec_ctx->stealing_from_workqueue = offload_target_or_null;
+ closure->error_data.error = error;
+ exec_ctx->stolen_closure = closure;
+ } else if (exec_ctx->stealing_from_workqueue != offload_target_or_null) {
grpc_workqueue_enqueue(exec_ctx, offload_target_or_null, closure, error);
GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched");
+ } else { /* stealing_from_workqueue == offload_target_or_null */
+ grpc_workqueue_enqueue(exec_ctx, offload_target_or_null,
+ exec_ctx->stolen_closure,
+ exec_ctx->stolen_closure->error_data.error);
+ closure->error_data.error = error;
+ exec_ctx->stolen_closure = closure;
+ GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched");
}
+ GPR_TIMER_END("grpc_exec_ctx_sched", 0);
}
void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,