diff options
author | Craig Tiller <ctiller@google.com> | 2017-05-01 13:51:14 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-05-01 13:51:14 -0700 |
commit | 50da5ec21d3d8be5e76b9809242821f9e5badba1 (patch) | |
tree | 3d4b984e3430a920f1d810529300f8a04dabbdf5 /src | |
parent | a95bacf7dba3d5ffc8968729525a4cd5ff0ff88c (diff) |
Add workqueue
Diffstat (limited to 'src')
-rw-r--r-- | src/core/lib/iomgr/ev_epoll1_linux.c | 84 | ||||
-rw-r--r-- | src/core/lib/support/mpscq.c | 13 | ||||
-rw-r--r-- | src/core/lib/support/mpscq.h | 4 |
3 files changed, 80 insertions, 21 deletions
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c index 29254e6324..cce52b2d94 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.c +++ b/src/core/lib/iomgr/ev_epoll1_linux.c @@ -104,6 +104,7 @@ struct grpc_pollset_worker { grpc_pollset_worker *next; grpc_pollset_worker *prev; gpr_cv cv; + grpc_closure_list schedule_on_end_work; }; #define MAX_NEIGHBOURHOODS 1024 @@ -288,7 +289,7 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { - return NULL; /* TODO(ctiller): add a global workqueue */ + return (grpc_workqueue *)0xb0b51ed; } static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, @@ -317,6 +318,7 @@ GPR_TLS_DECL(g_current_thread_worker); static gpr_atm g_active_poller; static pollset_neighbourhood *g_neighbourhoods; static size_t g_num_neighbourhoods; +static gpr_mpscq g_workqueue_items; /* Return true if first in list */ static bool worker_insert(grpc_pollset *pollset, grpc_pollset_worker *worker) { @@ -365,6 +367,7 @@ static grpc_error *pollset_global_init(void) { gpr_atm_no_barrier_store(&g_active_poller, 0); global_wakeup_fd.read_fd = -1; grpc_error *err = grpc_wakeup_fd_init(&global_wakeup_fd); + gpr_mpscq_init(&g_workqueue_items); if (err != GRPC_ERROR_NONE) return err; struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET), .data.ptr = &global_wakeup_fd}; @@ -383,6 +386,7 @@ static grpc_error *pollset_global_init(void) { static void pollset_global_shutdown(void) { gpr_tls_destroy(&g_current_thread_pollset); gpr_tls_destroy(&g_current_thread_worker); + gpr_mpscq_destroy(&g_workqueue_items); if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd); for (size_t i = 0; i < g_num_neighbourhoods; i++) { gpr_mu_destroy(&g_neighbourhoods[i].mu); @@ -528,30 +532,13 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, return error; } -#if 0 -static void verify_all_entries_in_neighbourhood_list( - grpc_pollset *root, bool should_be_seen_inactive) { - if (root == NULL) return; - grpc_pollset *p = root; - do { - GPR_ASSERT(p->seen_inactive == should_be_seen_inactive); - p = p->next; - } while (p != root); -} - -static void verify_neighbourhood_lists(pollset_neighbourhood *neighbourhood) { - // assumes neighbourhood->mu locked - verify_all_entries_in_neighbourhood_list(neighbourhood->active_root, false); - verify_all_entries_in_neighbourhood_list(neighbourhood->inactive_root, true); -} -#endif - static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, grpc_pollset_worker **worker_hdl, gpr_timespec *now, gpr_timespec deadline) { if (worker_hdl != NULL) *worker_hdl = worker; worker->initialized_cv = false; worker->kick_state = UNKICKED; + worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT; pollset->begin_refs++; if (pollset->seen_inactive) { @@ -669,6 +656,8 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl) { if (worker_hdl != NULL) *worker_hdl = NULL; worker->kick_state = KICKED; + grpc_closure_list_move(&worker->schedule_on_end_work, + &exec_ctx->closure_list); if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) { if (worker->next != worker && worker->next->kick_state == UNKICKED) { GPR_ASSERT(worker->next->initialized_cv); @@ -712,6 +701,10 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_exec_ctx_flush(exec_ctx); gpr_mu_lock(&pollset->mu); } + } else if (grpc_exec_ctx_has_work(exec_ctx)) { + gpr_mu_unlock(&pollset->mu); + grpc_exec_ctx_flush(exec_ctx); + gpr_mu_lock(&pollset->mu); } if (worker->initialized_cv) { gpr_cv_destroy(&worker->cv); @@ -828,8 +821,59 @@ static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {} #endif +static void wq_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error) { + // find a neighbourhood to wakeup + bool scheduled = false; + size_t initial_neighbourhood = choose_neighbourhood(); + for (size_t i = 0; !scheduled && i < g_num_neighbourhoods; i++) { + pollset_neighbourhood *neighbourhood = + &g_neighbourhoods[(initial_neighbourhood + i) % g_num_neighbourhoods]; + if (gpr_mu_trylock(&neighbourhood->mu)) { + if (neighbourhood->active_root != NULL) { + grpc_pollset *inspect = neighbourhood->active_root; + do { + if (gpr_mu_trylock(&inspect->mu)) { + if (inspect->root_worker != NULL) { + grpc_pollset_worker *inspect_worker = inspect->root_worker; + do { + if (inspect_worker->kick_state == UNKICKED) { + inspect_worker->kick_state = KICKED; + grpc_closure_list_append( + &inspect_worker->schedule_on_end_work, closure, error); + if (inspect_worker->initialized_cv) { + gpr_cv_signal(&inspect_worker->cv); + } + scheduled = true; + } + inspect_worker = inspect_worker->next; + } while (!scheduled && inspect_worker != inspect->root_worker); + } + gpr_mu_unlock(&inspect->mu); + } + inspect = inspect->next; + } while (!scheduled && inspect != neighbourhood->active_root); + } + gpr_mu_unlock(&neighbourhood->mu); + } + } + if (!scheduled) { + closure->error_data.error = error; + gpr_mpscq_push(&g_workqueue_items, &closure->next_data.atm_next); + GRPC_LOG_IF_ERROR("workqueue_scheduler", + grpc_wakeup_fd_wakeup(&global_wakeup_fd)); + } +} + +static const grpc_closure_scheduler_vtable + singleton_workqueue_scheduler_vtable = {wq_sched, wq_sched, + "epoll1_workqueue"}; + +static grpc_closure_scheduler singleton_workqueue_scheduler = { + &singleton_workqueue_scheduler_vtable}; + static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) { - return grpc_schedule_on_exec_ctx; + return &singleton_workqueue_scheduler; } /******************************************************************************* diff --git a/src/core/lib/support/mpscq.c b/src/core/lib/support/mpscq.c index 5b9323275a..1015cc6776 100644 --- a/src/core/lib/support/mpscq.c +++ b/src/core/lib/support/mpscq.c @@ -54,21 +54,31 @@ void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) { } gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) { + bool empty; + return gpr_mpscq_pop_and_check_end(q, &empty); +} + +gpr_mpscq_node *gpr_mpscq_pop_and_check_end(gpr_mpscq *q, bool *empty) { gpr_mpscq_node *tail = q->tail; gpr_mpscq_node *next = (gpr_mpscq_node *)gpr_atm_acq_load(&tail->next); if (tail == &q->stub) { // indicates the list is actually (ephemerally) empty - if (next == NULL) return NULL; + if (next == NULL) { + *empty = true; + return NULL; + } q->tail = next; tail = next; next = (gpr_mpscq_node *)gpr_atm_acq_load(&tail->next); } if (next != NULL) { + *empty = false; q->tail = next; return tail; } gpr_mpscq_node *head = (gpr_mpscq_node *)gpr_atm_acq_load(&q->head); if (tail != head) { + *empty = false; // indicates a retry is in order: we're still adding return NULL; } @@ -79,5 +89,6 @@ gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) { return tail; } // indicates a retry is in order: we're still adding + *empty = false; return NULL; } diff --git a/src/core/lib/support/mpscq.h b/src/core/lib/support/mpscq.h index 977a117952..24c89f90c9 100644 --- a/src/core/lib/support/mpscq.h +++ b/src/core/lib/support/mpscq.h @@ -35,6 +35,7 @@ #define GRPC_CORE_LIB_SUPPORT_MPSCQ_H #include <grpc/support/atm.h> +#include <stdbool.h> #include <stddef.h> // Multiple-producer single-consumer lock free queue, based upon the @@ -62,4 +63,7 @@ void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n); // the queue is empty!!) gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q); +// Pop a node; sets *empty to true if the queue is empty, or false if it is not +gpr_mpscq_node *gpr_mpscq_pop_and_check_end(gpr_mpscq *q, bool *empty); + #endif /* GRPC_CORE_LIB_SUPPORT_MPSCQ_H */ |