diff options
author | 2017-05-01 20:57:59 +0000 | |
---|---|---|
committer | 2017-05-01 20:57:59 +0000 | |
commit | 67e229e5c124efa826ef15e8a7e16634786fabeb (patch) | |
tree | 9e0218adb3e3499082529b58e7daa6a292b39624 /src/core/lib/iomgr/ev_epoll1_linux.c | |
parent | 50da5ec21d3d8be5e76b9809242821f9e5badba1 (diff) |
Fix wakeup path
Diffstat (limited to 'src/core/lib/iomgr/ev_epoll1_linux.c')
-rw-r--r-- | src/core/lib/iomgr/ev_epoll1_linux.c | 16 |
1 files changed, 11 insertions, 5 deletions
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c index cce52b2d94..8766e4a1aa 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.c +++ b/src/core/lib/iomgr/ev_epoll1_linux.c @@ -318,7 +318,8 @@ GPR_TLS_DECL(g_current_thread_worker); static gpr_atm g_active_poller; static pollset_neighbourhood *g_neighbourhoods; static size_t g_num_neighbourhoods; -static gpr_mpscq g_workqueue_items; +static gpr_mu g_wq_mu; +static grpc_closure_list g_wq_items; /* Return true if first in list */ static bool worker_insert(grpc_pollset *pollset, grpc_pollset_worker *worker) { @@ -367,7 +368,8 @@ static grpc_error *pollset_global_init(void) { gpr_atm_no_barrier_store(&g_active_poller, 0); global_wakeup_fd.read_fd = -1; grpc_error *err = grpc_wakeup_fd_init(&global_wakeup_fd); - gpr_mpscq_init(&g_workqueue_items); + gpr_mu_init(&g_wq_mu); + g_wq_items = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT; if (err != GRPC_ERROR_NONE) return err; struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET), .data.ptr = &global_wakeup_fd}; @@ -386,7 +388,7 @@ static grpc_error *pollset_global_init(void) { static void pollset_global_shutdown(void) { gpr_tls_destroy(&g_current_thread_pollset); gpr_tls_destroy(&g_current_thread_worker); - gpr_mpscq_destroy(&g_workqueue_items); + gpr_mu_destroy(&g_wq_mu); if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd); for (size_t i = 0; i < g_num_neighbourhoods; i++) { gpr_mu_destroy(&g_neighbourhoods[i].mu); @@ -513,6 +515,9 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (gpr_atm_no_barrier_cas(&g_timer_kick, 1, 0)) { grpc_timer_consume_kick(); } + gpr_mu_lock(&g_wq_mu); + grpc_closure_list_move(&g_wq_items, &exec_ctx->closure_list); + gpr_mu_unlock(&g_wq_mu); append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd), err_desc); } else { @@ -858,8 +863,9 @@ static void wq_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, } } if (!scheduled) { - closure->error_data.error = error; - gpr_mpscq_push(&g_workqueue_items, &closure->next_data.atm_next); + gpr_mu_lock(&g_wq_mu); + grpc_closure_list_append(&g_wq_items, closure, error); + gpr_mu_unlock(&g_wq_mu); GRPC_LOG_IF_ERROR("workqueue_scheduler", grpc_wakeup_fd_wakeup(&global_wakeup_fd)); } |