diff options
Diffstat (limited to 'src/core/lib/iomgr/executor.c')
-rw-r--r-- | src/core/lib/iomgr/executor.c | 37 |
1 files changed, 29 insertions, 8 deletions
diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c index 8d7535d6fe..852775564f 100644 --- a/src/core/lib/iomgr/executor.c +++ b/src/core/lib/iomgr/executor.c @@ -77,10 +77,18 @@ static void closure_exec_thread_func(void *ignored) { gpr_mu_unlock(&g_executor.mu); break; } else { - grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures, NULL); + grpc_closure *c = g_executor.closures.head; + grpc_closure_list_init(&g_executor.closures); + gpr_mu_unlock(&g_executor.mu); + while (c != NULL) { + grpc_closure *next = c->next_data.next; + grpc_error *error = c->error_data.error; + c->cb(&exec_ctx, c->cb_arg, error); + GRPC_ERROR_UNREF(error); + c = next; + } + grpc_exec_ctx_flush(&exec_ctx); } - gpr_mu_unlock(&g_executor.mu); - grpc_exec_ctx_flush(&exec_ctx); } grpc_exec_ctx_finish(&exec_ctx); } @@ -112,7 +120,8 @@ static void maybe_spawn_locked() { g_executor.pending_join = 1; } -void grpc_executor_push(grpc_closure *closure, grpc_error *error) { +static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error) { gpr_mu_lock(&g_executor.mu); if (g_executor.shutting_down == 0) { grpc_closure_list_append(&g_executor.closures, closure, error); @@ -121,9 +130,8 @@ void grpc_executor_push(grpc_closure *closure, grpc_error *error) { gpr_mu_unlock(&g_executor.mu); } -void grpc_executor_shutdown() { +void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx) { int pending_join; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_mu_lock(&g_executor.mu); pending_join = g_executor.pending_join; @@ -133,11 +141,24 @@ void grpc_executor_shutdown() { * list below because we aren't accepting new work */ /* Execute pending callbacks, some may be performing cleanups */ - grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures, NULL); - grpc_exec_ctx_finish(&exec_ctx); + grpc_closure *c = g_executor.closures.head; + grpc_closure_list_init(&g_executor.closures); + while (c != NULL) { + grpc_closure *next = c->next_data.next; + grpc_error *error = c->error_data.error; + c->cb(exec_ctx, c->cb_arg, error); + GRPC_ERROR_UNREF(error); + c = next; + } + grpc_exec_ctx_flush(exec_ctx); GPR_ASSERT(grpc_closure_list_empty(g_executor.closures)); if (pending_join) { gpr_thd_join(g_executor.tid); } gpr_mu_destroy(&g_executor.mu); } + +static const grpc_closure_scheduler_vtable executor_vtable = {executor_push, + executor_push}; +static grpc_closure_scheduler executor_scheduler = {&executor_vtable}; +grpc_closure_scheduler *grpc_executor_scheduler = &executor_scheduler; |