diff options
Diffstat (limited to 'src/core/lib/iomgr/workqueue_posix.c')
-rw-r--r-- | src/core/lib/iomgr/workqueue_posix.c | 66 |
1 files changed, 40 insertions, 26 deletions
diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index 80e7a0b206..45e0f6063b 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -45,22 +45,27 @@ #include "src/core/lib/iomgr/ev_posix.h" -static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success); +static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -grpc_workqueue *grpc_workqueue_create(grpc_exec_ctx *exec_ctx) { +grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, + grpc_workqueue **workqueue) { char name[32]; - grpc_workqueue *workqueue = gpr_malloc(sizeof(grpc_workqueue)); - gpr_ref_init(&workqueue->refs, 1); - gpr_mu_init(&workqueue->mu); - workqueue->closure_list.head = workqueue->closure_list.tail = NULL; - grpc_wakeup_fd_init(&workqueue->wakeup_fd); - sprintf(name, "workqueue:%p", (void *)workqueue); - workqueue->wakeup_read_fd = - grpc_fd_create(GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), name); - grpc_closure_init(&workqueue->read_closure, on_readable, workqueue); - grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, - &workqueue->read_closure); - return workqueue; + *workqueue = gpr_malloc(sizeof(grpc_workqueue)); + gpr_ref_init(&(*workqueue)->refs, 1); + gpr_mu_init(&(*workqueue)->mu); + (*workqueue)->closure_list.head = (*workqueue)->closure_list.tail = NULL; + grpc_error *err = grpc_wakeup_fd_init(&(*workqueue)->wakeup_fd); + if (err != GRPC_ERROR_NONE) { + gpr_free(*workqueue); + return err; + } + sprintf(name, "workqueue:%p", (void *)(*workqueue)); + (*workqueue)->wakeup_read_fd = grpc_fd_create( + GRPC_WAKEUP_FD_GET_READ_FD(&(*workqueue)->wakeup_fd), name); + grpc_closure_init(&(*workqueue)->read_closure, on_readable, *workqueue); + grpc_fd_notify_on_read(exec_ctx, (*workqueue)->wakeup_read_fd, + &(*workqueue)->read_closure); + return GRPC_ERROR_NONE; } static void workqueue_destroy(grpc_exec_ctx *exec_ctx, @@ -103,17 +108,14 @@ void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx, void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { gpr_mu_lock(&workqueue->mu); - if (grpc_closure_list_empty(workqueue->closure_list)) { - grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); - } grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); gpr_mu_unlock(&workqueue->mu); } -static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success) { +static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_workqueue *workqueue = arg; - if (!success) { + if (error != GRPC_ERROR_NONE) { gpr_mu_destroy(&workqueue->mu); /* HACK: let wakeup_fd code know that we stole the fd */ workqueue->wakeup_fd.read_fd = 0; @@ -123,20 +125,32 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success) { } else { gpr_mu_lock(&workqueue->mu); grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); - grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); + error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); gpr_mu_unlock(&workqueue->mu); - grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, - &workqueue->read_closure); + if (error == GRPC_ERROR_NONE) { + grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, + &workqueue->read_closure); + } else { + /* recurse to get error handling */ + on_readable(exec_ctx, arg, error); + } } } -void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure, - int success) { +void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + grpc_closure *closure, grpc_error *error) { + grpc_error *push_error = GRPC_ERROR_NONE; gpr_mu_lock(&workqueue->mu); if (grpc_closure_list_empty(workqueue->closure_list)) { - grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); + push_error = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); + } + grpc_closure_list_append(&workqueue->closure_list, closure, error); + if (push_error != GRPC_ERROR_NONE) { + const char *msg = grpc_error_string(push_error); + gpr_log(GPR_ERROR, "Failed to push to workqueue: %s", msg); + grpc_error_free_string(msg); + grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); } - grpc_closure_list_add(&workqueue->closure_list, closure, success); gpr_mu_unlock(&workqueue->mu); } |