aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c4
-rw-r--r--src/core/lib/iomgr/exec_ctx.c4
-rw-r--r--src/core/lib/iomgr/executor.c26
3 files changed, 26 insertions, 8 deletions
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index ac94d2e634..e8daa1f52c 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -1420,7 +1420,9 @@ static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
workqueue_maybe_wakeup(pi);
}
grpc_closure *c = (grpc_closure *)n;
- grpc_closure_run(exec_ctx, c, c->error_data.error);
+ grpc_error *error = c->error_data.error;
+ c->cb(exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
return true;
} else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
/* n == NULL might mean there's work but it's not available to be popped
diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c
index c243bc803f..6aa788f8e5 100644
--- a/src/core/lib/iomgr/exec_ctx.c
+++ b/src/core/lib/iomgr/exec_ctx.c
@@ -66,8 +66,10 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL;
while (c != NULL) {
grpc_closure *next = c->next_data.next;
+ grpc_error *error = c->error_data.error;
did_something = true;
- grpc_closure_run(exec_ctx, c, c->error_data.error);
+ c->cb(exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
c = next;
}
} else if (!grpc_combiner_continue_exec_ctx(exec_ctx)) {
diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c
index 5f2a789e30..1342a28d8d 100644
--- a/src/core/lib/iomgr/executor.c
+++ b/src/core/lib/iomgr/executor.c
@@ -77,10 +77,18 @@ static void closure_exec_thread_func(void *ignored) {
gpr_mu_unlock(&g_executor.mu);
break;
} else {
- grpc_closure_list_sched(&exec_ctx, &g_executor.closures);
+ grpc_closure *c = g_executor.closures.head;
+ grpc_closure_list_init(&g_executor.closures);
+ gpr_mu_unlock(&g_executor.mu);
+ while (c != NULL) {
+ grpc_closure *next = c->next_data.next;
+ grpc_error *error = c->error_data.error;
+ c->cb(&exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
+ c = next;
+ }
+ grpc_exec_ctx_flush(&exec_ctx);
}
- gpr_mu_unlock(&g_executor.mu);
- grpc_exec_ctx_flush(&exec_ctx);
}
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -115,8 +123,6 @@ static void maybe_spawn_locked() {
static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error) {
gpr_mu_lock(&g_executor.mu);
- GPR_ASSERT(closure->scheduler == grpc_executor_scheduler);
- closure->scheduler = grpc_schedule_on_exec_ctx;
if (g_executor.shutting_down == 0) {
grpc_closure_list_append(&g_executor.closures, closure, error);
maybe_spawn_locked();
@@ -136,7 +142,15 @@ void grpc_executor_shutdown() {
* list below because we aren't accepting new work */
/* Execute pending callbacks, some may be performing cleanups */
- grpc_closure_list_sched(&exec_ctx, &g_executor.closures);
+ grpc_closure *c = g_executor.closures.head;
+ grpc_closure_list_init(&g_executor.closures);
+ while (c != NULL) {
+ grpc_closure *next = c->next_data.next;
+ grpc_error *error = c->error_data.error;
+ c->cb(&exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
+ c = next;
+ }
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(grpc_closure_list_empty(g_executor.closures));
if (pending_join) {