diff options
author | Craig Tiller <ctiller@google.com> | 2016-05-06 17:12:37 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-05-06 17:12:37 -0700 |
commit | 4f1d0f337b15748d171ec21dddba1f6d80e94778 (patch) | |
tree | 255d62f007305913e744efc72ac429583dc166f1 /src/core/lib/iomgr | |
parent | 80384bd2e389b38f594f74055051e46d62509bf3 (diff) |
Error reporting progress
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r-- | src/core/lib/iomgr/error.c | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/error.h | 6 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_poll_and_epoll_posix.c | 131 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_posix.c | 16 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_posix.h | 12 | ||||
-rw-r--r-- | src/core/lib/iomgr/pollset.h | 11 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client_posix.c | 15 | ||||
-rw-r--r-- | src/core/lib/iomgr/wakeup_fd_pipe.c | 22 | ||||
-rw-r--r-- | src/core/lib/iomgr/wakeup_fd_posix.c | 12 | ||||
-rw-r--r-- | src/core/lib/iomgr/wakeup_fd_posix.h | 15 | ||||
-rw-r--r-- | src/core/lib/iomgr/workqueue.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/workqueue_posix.c | 62 |
12 files changed, 210 insertions, 112 deletions
diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c index d5587a4194..bc50d6870f 100644 --- a/src/core/lib/iomgr/error.c +++ b/src/core/lib/iomgr/error.c @@ -469,3 +469,13 @@ grpc_error *grpc_os_error(const char *file, int line, int err, GRPC_ERROR_STR_OS_ERROR, strerror(err)), GRPC_ERROR_STR_SYSCALL, call_name); } + +bool grpc_log_if_error(const char *what, grpc_error *error, const char *file, + int line) { + if (error == GRPC_ERROR_NONE) return true; + const char *msg = grpc_error_string(error); + gpr_log(file, line, GPR_LOG_SEVERITY_ERROR, "%s: %s", what, msg); + grpc_error_free_string(msg); + GRPC_ERROR_UNREF(error); + return false; +} diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index 98661a6a2c..2b56aede75 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -34,6 +34,7 @@ #ifndef GRPC_CORE_LIB_IOMGR_ERROR_H #define GRPC_CORE_LIB_IOMGR_ERROR_H +#include <stdbool.h> #include <stdint.h> #include <grpc/support/time.h> @@ -114,4 +115,9 @@ grpc_error *grpc_os_error(const char *file, int line, int err, #define GRPC_OS_ERROR(err, call_name) \ grpc_os_error(__FILE__, __LINE__, err, call_name) +bool grpc_log_if_error(const char *what, grpc_error *error, const char *file, + int line); +#define GRPC_LOG_IF_ERROR(what, error) \ + grpc_log_if_error((what), (error), __FILE__, __LINE__) + #endif /* GRPC_CORE_LIB_IOMGR_ERROR_H */ diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c index d11c947df2..288a44103b 100644 --- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c +++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c @@ -217,9 +217,10 @@ struct grpc_pollset { struct grpc_pollset_vtable { void (*add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, struct grpc_fd *fd, int and_unlock_pollset); - void (*maybe_work_and_unlock)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker *worker, - gpr_timespec deadline, gpr_timespec now); + grpc_error *(*maybe_work_and_unlock)(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + grpc_pollset_worker *worker, + gpr_timespec deadline, gpr_timespec now); void (*finish_shutdown)(grpc_pollset *pollset); void (*destroy)(grpc_pollset *pollset); }; @@ -247,9 +248,9 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, #define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2 /* As per pollset_kick, with an extended set of flags (defined above) -- mostly for fd_posix's use. */ -static void pollset_kick_ext(grpc_pollset *p, - grpc_pollset_worker *specific_worker, - uint32_t flags); +static grpc_error *pollset_kick_ext(grpc_pollset *p, + grpc_pollset_worker *specific_worker, + uint32_t flags) GRPC_MUST_USE_RESULT; /* turn a pollset into a multipoller: platform specific */ typedef void (*platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx, @@ -415,12 +416,13 @@ static bool fd_is_orphaned(grpc_fd *fd) { return (gpr_atm_acq_load(&fd->refst) & 1) == 0; } -static void pollset_kick_locked(grpc_fd_watcher *watcher) { +static grpc_error *pollset_kick_locked(grpc_fd_watcher *watcher) { gpr_mu_lock(&watcher->pollset->mu); GPR_ASSERT(watcher->worker); - pollset_kick_ext(watcher->pollset, watcher->worker, - GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); + grpc_error *err = pollset_kick_ext(watcher->pollset, watcher->worker, + GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); gpr_mu_unlock(&watcher->pollset->mu); + return err; } static void maybe_wake_one_watcher_locked(grpc_fd *fd) { @@ -726,10 +728,19 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { worker->prev->next = worker->next->prev = worker; } -static void pollset_kick_ext(grpc_pollset *p, - grpc_pollset_worker *specific_worker, - uint32_t flags) { +static void kick_append_error(grpc_error **composite, grpc_error *error) { + if (error == GRPC_ERROR_NONE) return; + if (*composite == GRPC_ERROR_NONE) { + *composite = GRPC_ERROR_CREATE("Kick Failure"); + } + *composite = grpc_error_add_child(*composite, error); +} + +static grpc_error *pollset_kick_ext(grpc_pollset *p, + grpc_pollset_worker *specific_worker, + uint32_t flags) { GPR_TIMER_BEGIN("pollset_kick_ext", 0); + grpc_error *error = GRPC_ERROR_NONE; /* pollset->mu already held */ if (specific_worker != NULL) { @@ -739,7 +750,8 @@ static void pollset_kick_ext(grpc_pollset *p, for (specific_worker = p->root_worker.next; specific_worker != &p->root_worker; specific_worker = specific_worker->next) { - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + kick_append_error( + &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); } p->kicked_without_pollers = 1; GPR_TIMER_END("pollset_kick_ext.broadcast", 0); @@ -750,14 +762,16 @@ static void pollset_kick_ext(grpc_pollset *p, specific_worker->reevaluate_polling_on_wakeup = 1; } specific_worker->kicked_specifically = 1; - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + kick_append_error(&error, + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) { GPR_TIMER_MARK("kick_yoself", 0); if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { specific_worker->reevaluate_polling_on_wakeup = 1; } specific_worker->kicked_specifically = 1; - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + kick_append_error(&error, + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); } } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) { GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); @@ -778,7 +792,8 @@ static void pollset_kick_ext(grpc_pollset *p, if (specific_worker != NULL) { GPR_TIMER_MARK("finally_kick", 0); push_back_worker(p, specific_worker); - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + kick_append_error( + &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); } } else { GPR_TIMER_MARK("kicked_no_pollers", 0); @@ -787,20 +802,21 @@ static void pollset_kick_ext(grpc_pollset *p, } GPR_TIMER_END("pollset_kick_ext", 0); + return error; } -static void pollset_kick(grpc_pollset *p, - grpc_pollset_worker *specific_worker) { - pollset_kick_ext(p, specific_worker, 0); +static grpc_error *pollset_kick(grpc_pollset *p, + grpc_pollset_worker *specific_worker) { + return pollset_kick_ext(p, specific_worker, 0); } /* global state management */ -static void pollset_global_init(void) { +static grpc_error *pollset_global_init(void) { gpr_tls_init(&g_current_thread_poller); gpr_tls_init(&g_current_thread_worker); grpc_wakeup_fd_global_init(); - grpc_wakeup_fd_init(&grpc_global_wakeup_fd); + return grpc_wakeup_fd_init(&grpc_global_wakeup_fd); } static void pollset_global_shutdown(void) { @@ -810,7 +826,9 @@ static void pollset_global_shutdown(void) { grpc_wakeup_fd_global_destroy(); } -static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); } +static grpc_error *kick_poller(void) { + return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); +} /* main interface */ @@ -876,11 +894,12 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { grpc_exec_ctx_push(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL); } -static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker_hdl, gpr_timespec now, - gpr_timespec deadline) { +static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker **worker_hdl, + gpr_timespec now, gpr_timespec deadline) { grpc_pollset_worker worker; *worker_hdl = &worker; + grpc_error *error = GRPC_ERROR_NONE; /* pollset->mu already held */ int added_worker = 0; @@ -896,7 +915,10 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset->local_wakeup_cache = worker.wakeup_fd->next; } else { worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd)); - grpc_wakeup_fd_init(&worker.wakeup_fd->fd); + error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd); + if (error != GRPC_ERROR_NONE) { + return error; + } } worker.kicked_specifically = 0; /* If there's work waiting for the pollset to be idle, and the @@ -933,8 +955,8 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset); GPR_TIMER_BEGIN("maybe_work_and_unlock", 0); - pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker, - deadline, now); + error = pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker, + deadline, now); GPR_TIMER_END("maybe_work_and_unlock", 0); locked = 0; gpr_tls_set(&g_current_thread_poller, 0); @@ -955,7 +977,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, /* If we're forced to re-evaluate polling (via pollset_kick with GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force a loop */ - if (worker.reevaluate_polling_on_wakeup) { + if (worker.reevaluate_polling_on_wakeup && error == GRPC_ERROR_NONE) { worker.reevaluate_polling_on_wakeup = 0; pollset->kicked_without_pollers = 0; if (queued_work || worker.kicked_specifically) { @@ -996,6 +1018,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } *worker_hdl = NULL; GPR_TIMER_END("pollset_work", 0); + return error; } static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -1156,11 +1179,19 @@ exit: } } -static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, - grpc_pollset_worker *worker, - gpr_timespec deadline, - gpr_timespec now) { +static void work_combine_error(grpc_error **composite, grpc_error *error) { + if (error == GRPC_ERROR_NONE) return; + if (*composite == GRPC_ERROR_NONE) { + *composite = GRPC_ERROR_CREATE("pollset_work"); + } + *composite = grpc_error_add_child(*composite, error); +} + +static grpc_error *basic_pollset_maybe_work_and_unlock( + grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, + gpr_timespec deadline, gpr_timespec now) { + grpc_error *error = GRPC_ERROR_NONE; + #define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) #define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) @@ -1210,7 +1241,7 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, if (r < 0) { if (errno != EINTR) { - gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); + work_combine_error(&error, GRPC_OS_ERROR(errno, "poll")); } if (fd) { fd_end_poll(exec_ctx, &fd_watcher, 0, 0); @@ -1221,10 +1252,12 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, } } else { if (pfd[0].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); + work_combine_error(&error, + grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd)); } if (pfd[1].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); + work_combine_error(&error, + grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd)); } if (nfds > 2) { fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK, @@ -1237,6 +1270,8 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, if (fd) { GRPC_FD_UNREF(fd, "basicpoll_begin"); } + + return error; } static void basic_pollset_destroy(grpc_pollset *pollset) { @@ -1297,9 +1332,11 @@ exit: } } -static void multipoll_with_poll_pollset_maybe_work_and_unlock( +static grpc_error *multipoll_with_poll_pollset_maybe_work_and_unlock( grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline, gpr_timespec now) { + grpc_error *error = GRPC_ERROR_NONE; + #define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) #define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) @@ -1374,10 +1411,12 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( } } else { if (pfds[0].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); + work_combine_error(&error, + grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd)); } if (pfds[1].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); + work_combine_error(&error, + grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd)); } for (i = 2; i < pfd_count; i++) { if (watchers[i].fd == NULL) { @@ -1391,6 +1430,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( gpr_free(pfds); gpr_free(watchers); + return error; } static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) { @@ -1934,14 +1974,23 @@ static const grpc_event_engine_vtable vtable = { }; const grpc_event_engine_vtable *grpc_init_poll_and_epoll_posix(void) { + const char *msg; + grpc_error *err = GRPC_ERROR_NONE; #ifdef GPR_LINUX_MULTIPOLL_WITH_EPOLL platform_become_multipoller = epoll_become_multipoller; #else platform_become_multipoller = poll_become_multipoller; #endif fd_global_init(); - pollset_global_init(); + err = pollset_global_init(); + if (err != GRPC_ERROR_NONE) goto error; return &vtable; + +error: + msg = grpc_error_string(err); + gpr_log(GPR_ERROR, "%s", msg); + grpc_error_free_string(msg); + return NULL; } #endif diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 7df1751352..3b965c8b8c 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -101,15 +101,15 @@ void grpc_pollset_destroy(grpc_pollset *pollset) { g_event_engine->pollset_destroy(pollset); } -void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline) { - g_event_engine->pollset_work(exec_ctx, pollset, worker, now, deadline); +grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker **worker, gpr_timespec now, + gpr_timespec deadline) { + return g_event_engine->pollset_work(exec_ctx, pollset, worker, now, deadline); } -void grpc_pollset_kick(grpc_pollset *pollset, - grpc_pollset_worker *specific_worker) { - g_event_engine->pollset_kick(pollset, specific_worker); +grpc_error *grpc_pollset_kick(grpc_pollset *pollset, + grpc_pollset_worker *specific_worker) { + return g_event_engine->pollset_kick(pollset, specific_worker); } void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -159,6 +159,6 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx, g_event_engine->pollset_set_del_fd(exec_ctx, pollset_set, fd); } -void grpc_kick_poller(void) { g_event_engine->kick_poller(); } +grpc_error *grpc_kick_poller(void) { return g_event_engine->kick_poller(); } #endif // GPR_POSIX_SOCKET diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 1fa9f5ef2d..c9f9fd9957 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -61,11 +61,11 @@ typedef struct grpc_event_engine_vtable { grpc_closure *closure); void (*pollset_reset)(grpc_pollset *pollset); void (*pollset_destroy)(grpc_pollset *pollset); - void (*pollset_work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline); - void (*pollset_kick)(grpc_pollset *pollset, - grpc_pollset_worker *specific_worker); + grpc_error *(*pollset_work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker **worker, gpr_timespec now, + gpr_timespec deadline); + grpc_error *(*pollset_kick)(grpc_pollset *pollset, + grpc_pollset_worker *specific_worker); void (*pollset_add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, struct grpc_fd *fd); @@ -88,7 +88,7 @@ typedef struct grpc_event_engine_vtable { void (*pollset_set_del_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, grpc_fd *fd); - void (*kick_poller)(void); + grpc_error *(*kick_poller)(void); void (*shutdown_engine)(void); } grpc_event_engine_vtable; diff --git a/src/core/lib/iomgr/pollset.h b/src/core/lib/iomgr/pollset.h index c40a474877..8d9edc8406 100644 --- a/src/core/lib/iomgr/pollset.h +++ b/src/core/lib/iomgr/pollset.h @@ -81,14 +81,15 @@ void grpc_pollset_destroy(grpc_pollset *pollset); May call grpc_closure_list_run on grpc_closure_list, without holding the pollset lock */ -void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline); +grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker **worker, gpr_timespec now, + gpr_timespec deadline) GRPC_MUST_USE_RESULT; /* Break one polling thread out of polling work for this pollset. If specific_worker is GRPC_POLLSET_KICK_BROADCAST, kick ALL the workers. Otherwise, if specific_worker is non-NULL, then kick that worker. */ -void grpc_pollset_kick(grpc_pollset *pollset, - grpc_pollset_worker *specific_worker); +grpc_error *grpc_pollset_kick(grpc_pollset *pollset, + grpc_pollset_worker *specific_worker) + GRPC_MUST_USE_RESULT; #endif /* GRPC_CORE_LIB_IOMGR_POLLSET_H */ diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index f5a3fb4aa3..c70f11b40a 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -76,13 +76,16 @@ static grpc_error *prepare_socket(const struct sockaddr *addr, int fd) { GPR_ASSERT(fd >= 0); - if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1) || - (!grpc_is_unix_socket(addr) && !grpc_set_socket_low_latency(fd, 1)) || - !grpc_set_socket_no_sigpipe_if_possible(fd)) { - gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd, - strerror(errno)); - goto error; + err = grpc_set_socket_nonblocking(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; + err = grpc_set_socket_cloexec(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; + if (!grpc_is_unix_socket(addr)) { + err = grpc_set_socket_low_latency(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; } + err = grpc_set_socket_no_sigpipe_if_possible(fd); + if (err != GRPC_ERROR_NONE) goto error; goto done; error: diff --git a/src/core/lib/iomgr/wakeup_fd_pipe.c b/src/core/lib/iomgr/wakeup_fd_pipe.c index e9b9a0119f..4e5dbdcb73 100644 --- a/src/core/lib/iomgr/wakeup_fd_pipe.c +++ b/src/core/lib/iomgr/wakeup_fd_pipe.c @@ -45,7 +45,7 @@ #include "src/core/lib/iomgr/socket_utils_posix.h" -static void pipe_init(grpc_wakeup_fd* fd_info) { +static grpc_error* pipe_init(grpc_wakeup_fd* fd_info) { int pipefd[2]; /* TODO(klempner): Make this nonfatal */ int r = pipe(pipefd); @@ -53,36 +53,40 @@ static void pipe_init(grpc_wakeup_fd* fd_info) { gpr_log(GPR_ERROR, "pipe creation failed (%d): %s", errno, strerror(errno)); abort(); } - GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[0], 1)); - GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[1], 1)); + grpc_error* err; + err = grpc_set_socket_nonblocking(pipefd[0], 1); + if (err != GRPC_ERROR_NONE) return err; + err = grpc_set_socket_nonblocking(pipefd[1], 1); + if (err != GRPC_ERROR_NONE) return err; fd_info->read_fd = pipefd[0]; fd_info->write_fd = pipefd[1]; + return GRPC_ERROR_NONE; } -static void pipe_consume(grpc_wakeup_fd* fd_info) { +static grpc_error* pipe_consume(grpc_wakeup_fd* fd_info) { char buf[128]; ssize_t r; for (;;) { r = read(fd_info->read_fd, buf, sizeof(buf)); if (r > 0) continue; - if (r == 0) return; + if (r == 0) return GRPC_ERROR_NONE; switch (errno) { case EAGAIN: - return; + return GRPC_ERROR_NONE; case EINTR: continue; default: - gpr_log(GPR_ERROR, "error reading pipe: %s", strerror(errno)); - return; + return GRPC_OS_ERROR(errno, "read"); } } } -static void pipe_wakeup(grpc_wakeup_fd* fd_info) { +static grpc_error* pipe_wakeup(grpc_wakeup_fd* fd_info) { char c = 0; while (write(fd_info->write_fd, &c, 1) != 1 && errno == EINTR) ; + return GRPC_ERROR_NONE; } static void pipe_destroy(grpc_wakeup_fd* fd_info) { diff --git a/src/core/lib/iomgr/wakeup_fd_posix.c b/src/core/lib/iomgr/wakeup_fd_posix.c index 525369c356..046208abc8 100644 --- a/src/core/lib/iomgr/wakeup_fd_posix.c +++ b/src/core/lib/iomgr/wakeup_fd_posix.c @@ -53,16 +53,16 @@ void grpc_wakeup_fd_global_init(void) { void grpc_wakeup_fd_global_destroy(void) { wakeup_fd_vtable = NULL; } -void grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info) { - wakeup_fd_vtable->init(fd_info); +grpc_error *grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info) { + return wakeup_fd_vtable->init(fd_info); } -void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd *fd_info) { - wakeup_fd_vtable->consume(fd_info); +grpc_error *grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd *fd_info) { + return wakeup_fd_vtable->consume(fd_info); } -void grpc_wakeup_fd_wakeup(grpc_wakeup_fd *fd_info) { - wakeup_fd_vtable->wakeup(fd_info); +grpc_error *grpc_wakeup_fd_wakeup(grpc_wakeup_fd *fd_info) { + return wakeup_fd_vtable->wakeup(fd_info); } void grpc_wakeup_fd_destroy(grpc_wakeup_fd *fd_info) { diff --git a/src/core/lib/iomgr/wakeup_fd_posix.h b/src/core/lib/iomgr/wakeup_fd_posix.h index 6b069c1837..e269f242d8 100644 --- a/src/core/lib/iomgr/wakeup_fd_posix.h +++ b/src/core/lib/iomgr/wakeup_fd_posix.h @@ -62,6 +62,8 @@ #ifndef GRPC_CORE_LIB_IOMGR_WAKEUP_FD_POSIX_H #define GRPC_CORE_LIB_IOMGR_WAKEUP_FD_POSIX_H +#include "src/core/lib/iomgr/error.h" + void grpc_wakeup_fd_global_init(void); void grpc_wakeup_fd_global_destroy(void); @@ -72,9 +74,9 @@ void grpc_wakeup_fd_global_init_force_fallback(void); typedef struct grpc_wakeup_fd grpc_wakeup_fd; typedef struct grpc_wakeup_fd_vtable { - void (*init)(grpc_wakeup_fd* fd_info); - void (*consume)(grpc_wakeup_fd* fd_info); - void (*wakeup)(grpc_wakeup_fd* fd_info); + grpc_error* (*init)(grpc_wakeup_fd* fd_info); + grpc_error* (*consume)(grpc_wakeup_fd* fd_info); + grpc_error* (*wakeup)(grpc_wakeup_fd* fd_info); void (*destroy)(grpc_wakeup_fd* fd_info); /* Must be called before calling any other functions */ int (*check_availability)(void); @@ -89,9 +91,10 @@ extern int grpc_allow_specialized_wakeup_fd; #define GRPC_WAKEUP_FD_GET_READ_FD(fd_info) ((fd_info)->read_fd) -void grpc_wakeup_fd_init(grpc_wakeup_fd* fd_info); -void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd* fd_info); -void grpc_wakeup_fd_wakeup(grpc_wakeup_fd* fd_info); +grpc_error* grpc_wakeup_fd_init(grpc_wakeup_fd* fd_info) GRPC_MUST_USE_RESULT; +grpc_error* grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd* fd_info) + GRPC_MUST_USE_RESULT; +grpc_error* grpc_wakeup_fd_wakeup(grpc_wakeup_fd* fd_info) GRPC_MUST_USE_RESULT; void grpc_wakeup_fd_destroy(grpc_wakeup_fd* fd_info); /* Defined in some specialized implementation's .c file, or by diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h index 20b47e1c14..cb6aed9bd4 100644 --- a/src/core/lib/iomgr/workqueue.h +++ b/src/core/lib/iomgr/workqueue.h @@ -50,9 +50,11 @@ /* grpc_workqueue is forward declared in exec_ctx.h */ /** Create a work queue */ -grpc_workqueue *grpc_workqueue_create(grpc_exec_ctx *exec_ctx); +grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, + grpc_workqueue **workqueue); -void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue); +grpc_error *grpc_workqueue_flush( + grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) GRPC_MUST_USE_RESULT; #define GRPC_WORKQUEUE_REFCOUNT_DEBUG #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG @@ -77,7 +79,7 @@ void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); /** Add a work item to a workqueue */ -void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure, - grpc_error *error); +void grpc_workqueue_push(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + grpc_closure *closure, grpc_error *error); #endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_H */ diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index e27525dd50..1187e627a2 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -47,20 +47,25 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -grpc_workqueue *grpc_workqueue_create(grpc_exec_ctx *exec_ctx) { +grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, + grpc_workqueue **workqueue) { char name[32]; - grpc_workqueue *workqueue = gpr_malloc(sizeof(grpc_workqueue)); - gpr_ref_init(&workqueue->refs, 1); - gpr_mu_init(&workqueue->mu); - workqueue->closure_list.head = workqueue->closure_list.tail = NULL; - grpc_wakeup_fd_init(&workqueue->wakeup_fd); - sprintf(name, "workqueue:%p", (void *)workqueue); - workqueue->wakeup_read_fd = - grpc_fd_create(GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), name); - grpc_closure_init(&workqueue->read_closure, on_readable, workqueue); - grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, - &workqueue->read_closure); - return workqueue; + *workqueue = gpr_malloc(sizeof(grpc_workqueue)); + gpr_ref_init(&(*workqueue)->refs, 1); + gpr_mu_init(&(*workqueue)->mu); + (*workqueue)->closure_list.head = (*workqueue)->closure_list.tail = NULL; + grpc_error *err = grpc_wakeup_fd_init(&(*workqueue)->wakeup_fd); + if (err != GRPC_ERROR_NONE) { + gpr_free(*workqueue); + return err; + } + sprintf(name, "workqueue:%p", (void *)(*workqueue)); + (*workqueue)->wakeup_read_fd = grpc_fd_create( + GRPC_WAKEUP_FD_GET_READ_FD(&(*workqueue)->wakeup_fd), name); + grpc_closure_init(&(*workqueue)->read_closure, on_readable, workqueue); + grpc_fd_notify_on_read(exec_ctx, (*workqueue)->wakeup_read_fd, + &(*workqueue)->read_closure); + return GRPC_ERROR_NONE; } static void workqueue_destroy(grpc_exec_ctx *exec_ctx, @@ -101,13 +106,16 @@ void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset_add_fd(exec_ctx, pollset, workqueue->wakeup_read_fd); } -void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { +grpc_error *grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, + grpc_workqueue *workqueue) { + grpc_error *error = GRPC_ERROR_NONE; gpr_mu_lock(&workqueue->mu); if (grpc_closure_list_empty(workqueue->closure_list)) { - grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); + error = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); } grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); gpr_mu_unlock(&workqueue->mu); + return error; } static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -123,20 +131,32 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { } else { gpr_mu_lock(&workqueue->mu); grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); - grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); + error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); gpr_mu_unlock(&workqueue->mu); - grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, - &workqueue->read_closure); + if (error == GRPC_ERROR_NONE) { + grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, + &workqueue->read_closure); + } else { + /* recurse to get error handling */ + on_readable(exec_ctx, arg, error); + } } } -void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure, - grpc_error *error) { +void grpc_workqueue_push(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + grpc_closure *closure, grpc_error *error) { + grpc_error *push_error = GRPC_ERROR_NONE; gpr_mu_lock(&workqueue->mu); if (grpc_closure_list_empty(workqueue->closure_list)) { - grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); + push_error = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); } grpc_closure_list_append(&workqueue->closure_list, closure, error); + if (push_error != GRPC_ERROR_NONE) { + const char *msg = grpc_error_string(push_error); + gpr_log(GPR_ERROR, "Failed to push to workqueue: %s", msg); + grpc_error_free_string(msg); + grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); + } gpr_mu_unlock(&workqueue->mu); } |