diff options
Diffstat (limited to 'src/core/lib/iomgr/workqueue_posix.c')
-rw-r--r-- | src/core/lib/iomgr/workqueue_posix.c | 97 |
1 files changed, 26 insertions, 71 deletions
diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index ecfea68f56..e0d6dac230 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -44,7 +44,6 @@ #include <grpc/support/useful.h> #include "src/core/lib/iomgr/ev_posix.h" -#include "src/core/lib/profiling/timers.h" static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); @@ -53,7 +52,8 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, char name[32]; *workqueue = gpr_malloc(sizeof(grpc_workqueue)); gpr_ref_init(&(*workqueue)->refs, 1); - gpr_atm_no_barrier_store(&(*workqueue)->state, 1); + gpr_mu_init(&(*workqueue)->mu); + (*workqueue)->closure_list.head = (*workqueue)->closure_list.tail = NULL; grpc_error *err = grpc_wakeup_fd_init(&(*workqueue)->wakeup_fd); if (err != GRPC_ERROR_NONE) { gpr_free(*workqueue); @@ -62,7 +62,6 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, sprintf(name, "workqueue:%p", (void *)(*workqueue)); (*workqueue)->wakeup_read_fd = grpc_fd_create( GRPC_WAKEUP_FD_GET_READ_FD(&(*workqueue)->wakeup_fd), name); - gpr_mpscq_init(&(*workqueue)->queue); grpc_closure_init(&(*workqueue)->read_closure, on_readable, *workqueue); grpc_fd_notify_on_read(exec_ctx, (*workqueue)->wakeup_read_fd, &(*workqueue)->read_closure); @@ -71,79 +70,57 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, static void workqueue_destroy(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { + grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); grpc_fd_shutdown(exec_ctx, workqueue->wakeup_read_fd); } -static void workqueue_orphan(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue) { - if (gpr_atm_full_fetch_add(&workqueue->state, -1) == 1) { - workqueue_destroy(exec_ctx, workqueue); - } -} - #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line, const char *reason) { - if (workqueue == NULL) return; gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p ref %d -> %d %s", workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count + 1, reason); - gpr_ref(&workqueue->refs); -} #else void grpc_workqueue_ref(grpc_workqueue *workqueue) { - if (workqueue == NULL) return; +#endif gpr_ref(&workqueue->refs); } -#endif #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, const char *file, int line, const char *reason) { - if (workqueue == NULL) return; gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p unref %d -> %d %s", workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count - 1, reason); - if (gpr_unref(&workqueue->refs)) { - workqueue_orphan(exec_ctx, workqueue); - } -} #else void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { - if (workqueue == NULL) return; +#endif if (gpr_unref(&workqueue->refs)) { - workqueue_orphan(exec_ctx, workqueue); + workqueue_destroy(exec_ctx, workqueue); } } -#endif - -static void drain(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { - abort(); -} -static void wakeup(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { - GPR_TIMER_MARK("workqueue.wakeup", 0); - grpc_error *err = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); - if (!GRPC_LOG_IF_ERROR("wakeupfd_wakeup", err)) { - drain(exec_ctx, workqueue); - } +void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { + gpr_mu_lock(&workqueue->mu); + grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); + gpr_mu_unlock(&workqueue->mu); } static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - GPR_TIMER_BEGIN("workqueue.on_readable", 0); - grpc_workqueue *workqueue = arg; if (error != GRPC_ERROR_NONE) { + gpr_mu_destroy(&workqueue->mu); /* HACK: let wakeup_fd code know that we stole the fd */ workqueue->wakeup_fd.read_fd = 0; grpc_wakeup_fd_destroy(&workqueue->wakeup_fd); grpc_fd_orphan(exec_ctx, workqueue->wakeup_read_fd, NULL, NULL, "destroy"); - GPR_ASSERT(gpr_atm_no_barrier_load(&workqueue->state) == 0); gpr_free(workqueue); } else { + gpr_mu_lock(&workqueue->mu); + grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); - gpr_mpscq_node *n = gpr_mpscq_pop(&workqueue->queue); + gpr_mu_unlock(&workqueue->mu); if (error == GRPC_ERROR_NONE) { grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, &workqueue->read_closure); @@ -151,46 +128,24 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { /* recurse to get error handling */ on_readable(exec_ctx, arg, error); } - if (n == NULL) { - /* try again - queue in an inconsistant state */ - wakeup(exec_ctx, workqueue); - } else { - switch (gpr_atm_full_fetch_add(&workqueue->state, -2)) { - case 3: // had one count, one unorphaned --> done, unorphaned - break; - case 2: // had one count, one orphaned --> done, orphaned - workqueue_destroy(exec_ctx, workqueue); - break; - case 1: - case 0: - // these values are illegal - representing an already done or - // deleted workqueue - GPR_UNREACHABLE_CODE(break); - default: - // schedule a wakeup since there's more to do - wakeup(exec_ctx, workqueue); - } - grpc_closure *cl = (grpc_closure *)n; - grpc_error *clerr = cl->error; - cl->cb(exec_ctx, cl->cb_arg, clerr); - GRPC_ERROR_UNREF(clerr); - } } - - GPR_TIMER_END("workqueue.on_readable", 0); } void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, grpc_closure *closure, grpc_error *error) { - GPR_TIMER_BEGIN("workqueue.enqueue", 0); - gpr_atm last = gpr_atm_full_fetch_add(&workqueue->state, 2); - GPR_ASSERT(last & 1); - closure->error = error; - gpr_mpscq_push(&workqueue->queue, &closure->next_data.atm_next); - if (last == 1) { - wakeup(exec_ctx, workqueue); + grpc_error *push_error = GRPC_ERROR_NONE; + gpr_mu_lock(&workqueue->mu); + if (grpc_closure_list_empty(workqueue->closure_list)) { + push_error = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); + } + grpc_closure_list_append(&workqueue->closure_list, closure, error); + if (push_error != GRPC_ERROR_NONE) { + const char *msg = grpc_error_string(push_error); + gpr_log(GPR_ERROR, "Failed to push to workqueue: %s", msg); + grpc_error_free_string(msg); + grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); } - GPR_TIMER_END("workqueue.enqueue", 0); + gpr_mu_unlock(&workqueue->mu); } #endif /* GPR_POSIX_SOCKET */ |