aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/exec_ctx.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr/exec_ctx.c')
-rw-r--r--src/core/lib/iomgr/exec_ctx.c33
1 files changed, 21 insertions, 12 deletions
diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c
index 12e51ac092..747b462a6e 100644
--- a/src/core/lib/iomgr/exec_ctx.c
+++ b/src/core/lib/iomgr/exec_ctx.c
@@ -37,6 +37,7 @@
#include <grpc/support/sync.h>
#include <grpc/support/thd.h>
+#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
@@ -60,20 +61,28 @@ bool grpc_always_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored) {
bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
bool did_something = 0;
GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0);
- while (!grpc_closure_list_empty(exec_ctx->closure_list)) {
- grpc_closure *c = exec_ctx->closure_list.head;
- exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL;
- while (c != NULL) {
- grpc_closure *next = c->next_data.next;
- grpc_error *error = c->error;
- did_something = true;
- GPR_TIMER_BEGIN("grpc_exec_ctx_flush.cb", 0);
- c->cb(exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- GPR_TIMER_END("grpc_exec_ctx_flush.cb", 0);
- c = next;
+ for (;;) {
+ if (!grpc_closure_list_empty(exec_ctx->closure_list)) {
+ grpc_closure *c = exec_ctx->closure_list.head;
+ exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL;
+ while (c != NULL) {
+ grpc_closure *next = c->next_data.next;
+ grpc_error *error = c->error;
+ did_something = true;
+ GPR_TIMER_BEGIN("grpc_exec_ctx_flush.cb", 0);
+ c->cb(exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
+ GPR_TIMER_END("grpc_exec_ctx_flush.cb", 0);
+ c = next;
+ }
+ continue;
+ }
+ if (grpc_combiner_continue_exec_ctx(exec_ctx)) {
+ continue;
}
+ break;
}
+ GPR_ASSERT(exec_ctx->active_combiner == NULL);
if (exec_ctx->stealing_from_workqueue != NULL) {
if (grpc_exec_ctx_ready_to_finish(exec_ctx)) {
grpc_workqueue_enqueue(exec_ctx, exec_ctx->stealing_from_workqueue,