diff options
Diffstat (limited to 'src/core/lib')
-rw-r--r-- | src/core/lib/iomgr/ev_epoll_linux.c | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/exec_ctx.c | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/executor.c | 26 |
3 files changed, 26 insertions, 8 deletions
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index ac94d2e634..e8daa1f52c 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -1420,7 +1420,9 @@ static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx, workqueue_maybe_wakeup(pi); } grpc_closure *c = (grpc_closure *)n; - grpc_closure_run(exec_ctx, c, c->error_data.error); + grpc_error *error = c->error_data.error; + c->cb(exec_ctx, c->cb_arg, error); + GRPC_ERROR_UNREF(error); return true; } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) { /* n == NULL might mean there's work but it's not available to be popped diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c index c243bc803f..6aa788f8e5 100644 --- a/src/core/lib/iomgr/exec_ctx.c +++ b/src/core/lib/iomgr/exec_ctx.c @@ -66,8 +66,10 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL; while (c != NULL) { grpc_closure *next = c->next_data.next; + grpc_error *error = c->error_data.error; did_something = true; - grpc_closure_run(exec_ctx, c, c->error_data.error); + c->cb(exec_ctx, c->cb_arg, error); + GRPC_ERROR_UNREF(error); c = next; } } else if (!grpc_combiner_continue_exec_ctx(exec_ctx)) { diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c index 5f2a789e30..1342a28d8d 100644 --- a/src/core/lib/iomgr/executor.c +++ b/src/core/lib/iomgr/executor.c @@ -77,10 +77,18 @@ static void closure_exec_thread_func(void *ignored) { gpr_mu_unlock(&g_executor.mu); break; } else { - grpc_closure_list_sched(&exec_ctx, &g_executor.closures); + grpc_closure *c = g_executor.closures.head; + grpc_closure_list_init(&g_executor.closures); + gpr_mu_unlock(&g_executor.mu); + while (c != NULL) { + grpc_closure *next = c->next_data.next; + grpc_error *error = c->error_data.error; + c->cb(&exec_ctx, c->cb_arg, error); + GRPC_ERROR_UNREF(error); + c = next; + } + grpc_exec_ctx_flush(&exec_ctx); } - gpr_mu_unlock(&g_executor.mu); - grpc_exec_ctx_flush(&exec_ctx); } grpc_exec_ctx_finish(&exec_ctx); } @@ -115,8 +123,6 @@ static void maybe_spawn_locked() { static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error) { gpr_mu_lock(&g_executor.mu); - GPR_ASSERT(closure->scheduler == grpc_executor_scheduler); - closure->scheduler = grpc_schedule_on_exec_ctx; if (g_executor.shutting_down == 0) { grpc_closure_list_append(&g_executor.closures, closure, error); maybe_spawn_locked(); @@ -136,7 +142,15 @@ void grpc_executor_shutdown() { * list below because we aren't accepting new work */ /* Execute pending callbacks, some may be performing cleanups */ - grpc_closure_list_sched(&exec_ctx, &g_executor.closures); + grpc_closure *c = g_executor.closures.head; + grpc_closure_list_init(&g_executor.closures); + while (c != NULL) { + grpc_closure *next = c->next_data.next; + grpc_error *error = c->error_data.error; + c->cb(&exec_ctx, c->cb_arg, error); + GRPC_ERROR_UNREF(error); + c = next; + } grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(grpc_closure_list_empty(g_executor.closures)); if (pending_join) { |