diff options
Diffstat (limited to 'src/core/lib/iomgr/ev_poll_and_epoll_posix.c')
-rw-r--r-- | src/core/lib/iomgr/ev_poll_and_epoll_posix.c | 131 |
1 files changed, 90 insertions, 41 deletions
diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c index d11c947df2..288a44103b 100644 --- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c +++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c @@ -217,9 +217,10 @@ struct grpc_pollset { struct grpc_pollset_vtable { void (*add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, struct grpc_fd *fd, int and_unlock_pollset); - void (*maybe_work_and_unlock)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker *worker, - gpr_timespec deadline, gpr_timespec now); + grpc_error *(*maybe_work_and_unlock)(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + grpc_pollset_worker *worker, + gpr_timespec deadline, gpr_timespec now); void (*finish_shutdown)(grpc_pollset *pollset); void (*destroy)(grpc_pollset *pollset); }; @@ -247,9 +248,9 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, #define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2 /* As per pollset_kick, with an extended set of flags (defined above) -- mostly for fd_posix's use. */ -static void pollset_kick_ext(grpc_pollset *p, - grpc_pollset_worker *specific_worker, - uint32_t flags); +static grpc_error *pollset_kick_ext(grpc_pollset *p, + grpc_pollset_worker *specific_worker, + uint32_t flags) GRPC_MUST_USE_RESULT; /* turn a pollset into a multipoller: platform specific */ typedef void (*platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx, @@ -415,12 +416,13 @@ static bool fd_is_orphaned(grpc_fd *fd) { return (gpr_atm_acq_load(&fd->refst) & 1) == 0; } -static void pollset_kick_locked(grpc_fd_watcher *watcher) { +static grpc_error *pollset_kick_locked(grpc_fd_watcher *watcher) { gpr_mu_lock(&watcher->pollset->mu); GPR_ASSERT(watcher->worker); - pollset_kick_ext(watcher->pollset, watcher->worker, - GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); + grpc_error *err = pollset_kick_ext(watcher->pollset, watcher->worker, + GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); gpr_mu_unlock(&watcher->pollset->mu); + return err; } static void maybe_wake_one_watcher_locked(grpc_fd *fd) { @@ -726,10 +728,19 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { worker->prev->next = worker->next->prev = worker; } -static void pollset_kick_ext(grpc_pollset *p, - grpc_pollset_worker *specific_worker, - uint32_t flags) { +static void kick_append_error(grpc_error **composite, grpc_error *error) { + if (error == GRPC_ERROR_NONE) return; + if (*composite == GRPC_ERROR_NONE) { + *composite = GRPC_ERROR_CREATE("Kick Failure"); + } + *composite = grpc_error_add_child(*composite, error); +} + +static grpc_error *pollset_kick_ext(grpc_pollset *p, + grpc_pollset_worker *specific_worker, + uint32_t flags) { GPR_TIMER_BEGIN("pollset_kick_ext", 0); + grpc_error *error = GRPC_ERROR_NONE; /* pollset->mu already held */ if (specific_worker != NULL) { @@ -739,7 +750,8 @@ static void pollset_kick_ext(grpc_pollset *p, for (specific_worker = p->root_worker.next; specific_worker != &p->root_worker; specific_worker = specific_worker->next) { - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + kick_append_error( + &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); } p->kicked_without_pollers = 1; GPR_TIMER_END("pollset_kick_ext.broadcast", 0); @@ -750,14 +762,16 @@ static void pollset_kick_ext(grpc_pollset *p, specific_worker->reevaluate_polling_on_wakeup = 1; } specific_worker->kicked_specifically = 1; - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + kick_append_error(&error, + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) { GPR_TIMER_MARK("kick_yoself", 0); if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { specific_worker->reevaluate_polling_on_wakeup = 1; } specific_worker->kicked_specifically = 1; - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + kick_append_error(&error, + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); } } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) { GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); @@ -778,7 +792,8 @@ static void pollset_kick_ext(grpc_pollset *p, if (specific_worker != NULL) { GPR_TIMER_MARK("finally_kick", 0); push_back_worker(p, specific_worker); - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + kick_append_error( + &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); } } else { GPR_TIMER_MARK("kicked_no_pollers", 0); @@ -787,20 +802,21 @@ static void pollset_kick_ext(grpc_pollset *p, } GPR_TIMER_END("pollset_kick_ext", 0); + return error; } -static void pollset_kick(grpc_pollset *p, - grpc_pollset_worker *specific_worker) { - pollset_kick_ext(p, specific_worker, 0); +static grpc_error *pollset_kick(grpc_pollset *p, + grpc_pollset_worker *specific_worker) { + return pollset_kick_ext(p, specific_worker, 0); } /* global state management */ -static void pollset_global_init(void) { +static grpc_error *pollset_global_init(void) { gpr_tls_init(&g_current_thread_poller); gpr_tls_init(&g_current_thread_worker); grpc_wakeup_fd_global_init(); - grpc_wakeup_fd_init(&grpc_global_wakeup_fd); + return grpc_wakeup_fd_init(&grpc_global_wakeup_fd); } static void pollset_global_shutdown(void) { @@ -810,7 +826,9 @@ static void pollset_global_shutdown(void) { grpc_wakeup_fd_global_destroy(); } -static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); } +static grpc_error *kick_poller(void) { + return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); +} /* main interface */ @@ -876,11 +894,12 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { grpc_exec_ctx_push(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL); } -static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker_hdl, gpr_timespec now, - gpr_timespec deadline) { +static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker **worker_hdl, + gpr_timespec now, gpr_timespec deadline) { grpc_pollset_worker worker; *worker_hdl = &worker; + grpc_error *error = GRPC_ERROR_NONE; /* pollset->mu already held */ int added_worker = 0; @@ -896,7 +915,10 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset->local_wakeup_cache = worker.wakeup_fd->next; } else { worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd)); - grpc_wakeup_fd_init(&worker.wakeup_fd->fd); + error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd); + if (error != GRPC_ERROR_NONE) { + return error; + } } worker.kicked_specifically = 0; /* If there's work waiting for the pollset to be idle, and the @@ -933,8 +955,8 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset); GPR_TIMER_BEGIN("maybe_work_and_unlock", 0); - pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker, - deadline, now); + error = pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker, + deadline, now); GPR_TIMER_END("maybe_work_and_unlock", 0); locked = 0; gpr_tls_set(&g_current_thread_poller, 0); @@ -955,7 +977,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, /* If we're forced to re-evaluate polling (via pollset_kick with GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force a loop */ - if (worker.reevaluate_polling_on_wakeup) { + if (worker.reevaluate_polling_on_wakeup && error == GRPC_ERROR_NONE) { worker.reevaluate_polling_on_wakeup = 0; pollset->kicked_without_pollers = 0; if (queued_work || worker.kicked_specifically) { @@ -996,6 +1018,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } *worker_hdl = NULL; GPR_TIMER_END("pollset_work", 0); + return error; } static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -1156,11 +1179,19 @@ exit: } } -static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, - grpc_pollset_worker *worker, - gpr_timespec deadline, - gpr_timespec now) { +static void work_combine_error(grpc_error **composite, grpc_error *error) { + if (error == GRPC_ERROR_NONE) return; + if (*composite == GRPC_ERROR_NONE) { + *composite = GRPC_ERROR_CREATE("pollset_work"); + } + *composite = grpc_error_add_child(*composite, error); +} + +static grpc_error *basic_pollset_maybe_work_and_unlock( + grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, + gpr_timespec deadline, gpr_timespec now) { + grpc_error *error = GRPC_ERROR_NONE; + #define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) #define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) @@ -1210,7 +1241,7 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, if (r < 0) { if (errno != EINTR) { - gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); + work_combine_error(&error, GRPC_OS_ERROR(errno, "poll")); } if (fd) { fd_end_poll(exec_ctx, &fd_watcher, 0, 0); @@ -1221,10 +1252,12 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, } } else { if (pfd[0].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); + work_combine_error(&error, + grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd)); } if (pfd[1].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); + work_combine_error(&error, + grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd)); } if (nfds > 2) { fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK, @@ -1237,6 +1270,8 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, if (fd) { GRPC_FD_UNREF(fd, "basicpoll_begin"); } + + return error; } static void basic_pollset_destroy(grpc_pollset *pollset) { @@ -1297,9 +1332,11 @@ exit: } } -static void multipoll_with_poll_pollset_maybe_work_and_unlock( +static grpc_error *multipoll_with_poll_pollset_maybe_work_and_unlock( grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline, gpr_timespec now) { + grpc_error *error = GRPC_ERROR_NONE; + #define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) #define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) @@ -1374,10 +1411,12 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( } } else { if (pfds[0].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); + work_combine_error(&error, + grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd)); } if (pfds[1].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); + work_combine_error(&error, + grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd)); } for (i = 2; i < pfd_count; i++) { if (watchers[i].fd == NULL) { @@ -1391,6 +1430,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( gpr_free(pfds); gpr_free(watchers); + return error; } static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) { @@ -1934,14 +1974,23 @@ static const grpc_event_engine_vtable vtable = { }; const grpc_event_engine_vtable *grpc_init_poll_and_epoll_posix(void) { + const char *msg; + grpc_error *err = GRPC_ERROR_NONE; #ifdef GPR_LINUX_MULTIPOLL_WITH_EPOLL platform_become_multipoller = epoll_become_multipoller; #else platform_become_multipoller = poll_become_multipoller; #endif fd_global_init(); - pollset_global_init(); + err = pollset_global_init(); + if (err != GRPC_ERROR_NONE) goto error; return &vtable; + +error: + msg = grpc_error_string(err); + gpr_log(GPR_ERROR, "%s", msg); + grpc_error_free_string(msg); + return NULL; } #endif |