diff options
author | Craig Tiller <ctiller@google.com> | 2016-06-01 10:28:15 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-06-01 10:28:15 -0700 |
commit | 5e3a0ef666ce8f00744066df304d49e4f00124fe (patch) | |
tree | 9012126f4936fedb0fe370944a57f70a3c918028 /src/core | |
parent | 4c751ebd505d4759744489dbaae72a477f272f78 (diff) |
Reintegrate
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/lib/iomgr/ev_poll_and_epoll_posix.c | 169 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_poll_posix.c | 4 |
2 files changed, 110 insertions, 63 deletions
diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c index 943c404f91..325b3c89b1 100644 --- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c +++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c @@ -221,9 +221,10 @@ struct grpc_pollset { struct grpc_pollset_vtable { void (*add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, struct grpc_fd *fd, int and_unlock_pollset); - void (*maybe_work_and_unlock)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker *worker, - gpr_timespec deadline, gpr_timespec now); + grpc_error *(*maybe_work_and_unlock)(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + grpc_pollset_worker *worker, + gpr_timespec deadline, gpr_timespec now); void (*finish_shutdown)(grpc_pollset *pollset); void (*destroy)(grpc_pollset *pollset); }; @@ -251,9 +252,9 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, #define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2 /* As per pollset_kick, with an extended set of flags (defined above) -- mostly for fd_posix's use. */ -static void pollset_kick_ext(grpc_pollset *p, - grpc_pollset_worker *specific_worker, - uint32_t flags); +static grpc_error *pollset_kick_ext(grpc_pollset *p, + grpc_pollset_worker *specific_worker, + uint32_t flags) GRPC_MUST_USE_RESULT; /* turn a pollset into a multipoller: platform specific */ typedef void (*platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx, @@ -420,12 +421,13 @@ static bool fd_is_orphaned(grpc_fd *fd) { return (gpr_atm_acq_load(&fd->refst) & 1) == 0; } -static void pollset_kick_locked(grpc_fd_watcher *watcher) { +static grpc_error *pollset_kick_locked(grpc_fd_watcher *watcher) { gpr_mu_lock(&watcher->pollset->mu); GPR_ASSERT(watcher->worker); - pollset_kick_ext(watcher->pollset, watcher->worker, - GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); + grpc_error *err = pollset_kick_ext(watcher->pollset, watcher->worker, + GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); gpr_mu_unlock(&watcher->pollset->mu); + return err; } static void maybe_wake_one_watcher_locked(grpc_fd *fd) { @@ -464,7 +466,7 @@ static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { } else { remove_fd_from_all_epoll_sets(fd->fd); } - grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL); + grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE, NULL); } static int fd_wrapped_fd(grpc_fd *fd) { @@ -513,6 +515,14 @@ static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); } static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); } #endif +static grpc_error *fd_shutdown_error(bool shutdown) { + if (!shutdown) { + return GRPC_ERROR_NONE; + } else { + return GRPC_ERROR_CREATE("FD shutdown"); + } +} + static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st, grpc_closure *closure) { if (*st == CLOSURE_NOT_READY) { @@ -521,7 +531,8 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } else if (*st == CLOSURE_READY) { /* already ready ==> queue the closure to run immediately */ *st = CLOSURE_NOT_READY; - grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL); + grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown), + NULL); maybe_wake_one_watcher_locked(fd); } else { /* upcallptr was set to a different closure. This is an error! */ @@ -544,7 +555,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, return 0; } else { /* waiting ==> queue closure */ - grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL); + grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL); *st = CLOSURE_NOT_READY; return 1; } @@ -744,10 +755,19 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { worker->prev->next = worker->next->prev = worker; } -static void pollset_kick_ext(grpc_pollset *p, - grpc_pollset_worker *specific_worker, - uint32_t flags) { +static void kick_append_error(grpc_error **composite, grpc_error *error) { + if (error == GRPC_ERROR_NONE) return; + if (*composite == GRPC_ERROR_NONE) { + *composite = GRPC_ERROR_CREATE("Kick Failure"); + } + *composite = grpc_error_add_child(*composite, error); +} + +static grpc_error *pollset_kick_ext(grpc_pollset *p, + grpc_pollset_worker *specific_worker, + uint32_t flags) { GPR_TIMER_BEGIN("pollset_kick_ext", 0); + grpc_error *error = GRPC_ERROR_NONE; /* pollset->mu already held */ if (specific_worker != NULL) { @@ -757,25 +777,28 @@ static void pollset_kick_ext(grpc_pollset *p, for (specific_worker = p->root_worker.next; specific_worker != &p->root_worker; specific_worker = specific_worker->next) { - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + kick_append_error( + &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); } - p->kicked_without_pollers = 1; + p->kicked_without_pollers = true; GPR_TIMER_END("pollset_kick_ext.broadcast", 0); } else if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)specific_worker) { GPR_TIMER_MARK("different_thread_worker", 0); if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { - specific_worker->reevaluate_polling_on_wakeup = 1; + specific_worker->reevaluate_polling_on_wakeup = true; } - specific_worker->kicked_specifically = 1; - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + specific_worker->kicked_specifically = true; + kick_append_error(&error, + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) { GPR_TIMER_MARK("kick_yoself", 0); if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { - specific_worker->reevaluate_polling_on_wakeup = 1; + specific_worker->reevaluate_polling_on_wakeup = true; } - specific_worker->kicked_specifically = 1; - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + specific_worker->kicked_specifically = true; + kick_append_error(&error, + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); } } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) { GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); @@ -783,14 +806,9 @@ static void pollset_kick_ext(grpc_pollset *p, specific_worker = pop_front_worker(p); if (specific_worker != NULL) { if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) { - /* Prefer not to kick self. Push the worker to the end of the list and - * pop the one from front */ GPR_TIMER_MARK("kick_anonymous_not_self", 0); push_back_worker(p, specific_worker); specific_worker = pop_front_worker(p); - /* If there was only one worker on the pollset, we would get the same - * worker we pushed (the one set on current thread local) back. If so, - * kick it only if GRPC_POLLSET_CAN_KICK_SELF flag is set */ if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 && gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) { @@ -801,28 +819,30 @@ static void pollset_kick_ext(grpc_pollset *p, if (specific_worker != NULL) { GPR_TIMER_MARK("finally_kick", 0); push_back_worker(p, specific_worker); - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + kick_append_error( + &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); } } else { GPR_TIMER_MARK("kicked_no_pollers", 0); - p->kicked_without_pollers = 1; + p->kicked_without_pollers = true; } } GPR_TIMER_END("pollset_kick_ext", 0); + return error; } -static void pollset_kick(grpc_pollset *p, - grpc_pollset_worker *specific_worker) { - pollset_kick_ext(p, specific_worker, 0); +static grpc_error *pollset_kick(grpc_pollset *p, + grpc_pollset_worker *specific_worker) { + return pollset_kick_ext(p, specific_worker, 0); } /* global state management */ -static void pollset_global_init(void) { +static grpc_error *pollset_global_init(void) { gpr_tls_init(&g_current_thread_poller); gpr_tls_init(&g_current_thread_worker); - grpc_wakeup_fd_init(&grpc_global_wakeup_fd); + return grpc_wakeup_fd_init(&grpc_global_wakeup_fd); } static void pollset_global_shutdown(void) { @@ -831,7 +851,9 @@ static void pollset_global_shutdown(void) { gpr_tls_destroy(&g_current_thread_worker); } -static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); } +static grpc_error *kick_poller(void) { + return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); +} /* main interface */ @@ -894,14 +916,23 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs)); pollset->vtable->finish_shutdown(pollset); - grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL); + grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL); } -static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker_hdl, gpr_timespec now, - gpr_timespec deadline) { +static void work_combine_error(grpc_error **composite, grpc_error *error) { + if (error == GRPC_ERROR_NONE) return; + if (*composite == GRPC_ERROR_NONE) { + *composite = GRPC_ERROR_CREATE("pollset_work"); + } + *composite = grpc_error_add_child(*composite, error); +} + +static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker **worker_hdl, + gpr_timespec now, gpr_timespec deadline) { grpc_pollset_worker worker; *worker_hdl = &worker; + grpc_error *error = GRPC_ERROR_NONE; /* pollset->mu already held */ int added_worker = 0; @@ -917,7 +948,10 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset->local_wakeup_cache = worker.wakeup_fd->next; } else { worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd)); - grpc_wakeup_fd_init(&worker.wakeup_fd->fd); + error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd); + if (error != GRPC_ERROR_NONE) { + return error; + } } worker.kicked_specifically = 0; /* If there's work waiting for the pollset to be idle, and the @@ -954,8 +988,9 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset); GPR_TIMER_BEGIN("maybe_work_and_unlock", 0); - pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker, - deadline, now); + work_combine_error(&error, + pollset->vtable->maybe_work_and_unlock( + exec_ctx, pollset, &worker, deadline, now)); GPR_TIMER_END("maybe_work_and_unlock", 0); locked = 0; gpr_tls_set(&g_current_thread_poller, 0); @@ -1017,6 +1052,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } *worker_hdl = NULL; GPR_TIMER_END("pollset_work", 0); + return error; } static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -1065,7 +1101,7 @@ typedef struct grpc_unary_promote_args { } grpc_unary_promote_args; static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, - bool success) { + grpc_error *error) { grpc_unary_promote_args *up_args = args; const grpc_pollset_vtable *original_vtable = up_args->original_vtable; grpc_pollset *pollset = up_args->pollset; @@ -1167,7 +1203,8 @@ static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, up_args->promotion_closure.cb = basic_do_promote; up_args->promotion_closure.cb_arg = up_args; - grpc_closure_list_add(&pollset->idle_jobs, &up_args->promotion_closure, 1); + grpc_closure_list_append(&pollset->idle_jobs, &up_args->promotion_closure, + GRPC_ERROR_NONE); pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); exit: @@ -1176,11 +1213,9 @@ exit: } } -static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, - grpc_pollset_worker *worker, - gpr_timespec deadline, - gpr_timespec now) { +static grpc_error *basic_pollset_maybe_work_and_unlock( + grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, + gpr_timespec deadline, gpr_timespec now) { #define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) #define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) @@ -1190,6 +1225,7 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, int timeout; int r; nfds_t nfds; + grpc_error *error = GRPC_ERROR_NONE; fd = pollset->data.ptr; if (fd && fd_is_orphaned(fd)) { @@ -1230,7 +1266,7 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, if (r < 0) { if (errno != EINTR) { - gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); + work_combine_error(&error, GRPC_OS_ERROR(errno, "poll")); } if (fd) { fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL); @@ -1241,10 +1277,12 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, } } else { if (pfd[0].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); + work_combine_error(&error, + grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd)); } if (pfd[1].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); + work_combine_error(&error, + grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd)); } if (nfds > 2) { fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK, @@ -1257,6 +1295,8 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, if (fd) { GRPC_FD_UNREF(fd, "basicpoll_begin"); } + + return error; } static void basic_pollset_destroy(grpc_pollset *pollset) { @@ -1383,7 +1423,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( if (r < 0) { if (errno != EINTR) { - gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); + work_combine_error(&error, GRPC_OS_ERROR(errno, "poll")); } for (i = 2; i < pfd_count; i++) { fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL); @@ -1601,7 +1641,7 @@ static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg, - bool iomgr_status) { + grpc_error *error) { delayed_add *da = arg; if (!fd_is_orphaned(da->fd)) { @@ -1614,7 +1654,8 @@ static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg, /* We don't care about this pollset anymore. */ if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) { da->pollset->called_shutdown = 1; - grpc_exec_ctx_enqueue(exec_ctx, da->pollset->shutdown_done, true, NULL); + grpc_exec_ctx_sched(exec_ctx, da->pollset->shutdown_done, GRPC_ERROR_NONE, + NULL); } } gpr_mu_unlock(&da->pollset->mu); @@ -1638,14 +1679,14 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx, GRPC_FD_REF(fd, "delayed_add"); grpc_closure_init(&da->closure, perform_delayed_add, da); pollset->in_flight_cbs++; - grpc_exec_ctx_enqueue(exec_ctx, &da->closure, true, NULL); + grpc_exec_ctx_sched(exec_ctx, &da->closure, GRPC_ERROR_NONE, NULL); } } /* TODO(klempner): We probably want to turn this down a bit */ #define GRPC_EPOLL_MAX_EVENTS 1000 -static void multipoll_with_epoll_pollset_maybe_work_and_unlock( +static grpc_error *multipoll_with_epoll_pollset_maybe_work_and_unlock( grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline, gpr_timespec now) { struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS]; @@ -1654,6 +1695,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( epoll_hdr *h = pollset->data.ptr; int timeout_ms; struct pollfd pfds[2]; + grpc_error *error = GRPC_ERROR_NONE; /* If you want to ignore epoll's ability to sanely handle parallel pollers, * for a more apples-to-apples performance comparison with poll, add a @@ -1682,13 +1724,14 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( if (poll_rv < 0) { if (errno != EINTR) { - gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); + work_combine_error(&error, GRPC_OS_ERROR(errno, "poll")); } } else if (poll_rv == 0) { /* do nothing */ } else { if (pfds[0].revents) { - grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); + work_combine_error(&error, + grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd)); } if (pfds[1].revents) { do { @@ -1696,7 +1739,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0); if (ep_rv < 0) { if (errno != EINTR) { - gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno)); + work_combine_error(&error, GRPC_OS_ERROR(errno, "epoll_wait")); } } else { int i; @@ -1708,7 +1751,8 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI); int write_ev = ep_ev[i].events & EPOLLOUT; if (fd == NULL) { - grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); + work_combine_error(&error, grpc_wakeup_fd_consume_wakeup( + &grpc_global_wakeup_fd)); } else { if (read_ev || cancel) { fd_become_readable(exec_ctx, fd, pollset); @@ -1722,6 +1766,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( } while (ep_rv == GRPC_EPOLL_MAX_EVENTS); } } + return error; } static void multipoll_with_epoll_pollset_finish_shutdown( diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 9fe0671e98..87d48a47ab 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -1240,7 +1240,9 @@ static const grpc_event_engine_vtable vtable = { }; const grpc_event_engine_vtable *grpc_init_poll_posix(void) { - pollset_global_init(); + if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) { + return NULL; + } return &vtable; } |