diff options
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 155 |
1 files changed, 49 insertions, 106 deletions
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 48111cc07b..d9bd01d668 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -92,15 +92,14 @@ void grpc_pollset_global_shutdown(void) { /* main interface */ -static void become_empty_pollset(grpc_pollset *pollset); -static void become_unary_pollset(grpc_pollset *pollset, grpc_fd *fd); +static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null); void grpc_pollset_init(grpc_pollset *pollset) { gpr_mu_init(&pollset->mu); grpc_pollset_kick_init(&pollset->kick_state); pollset->in_flight_cbs = 0; pollset->shutting_down = 0; - become_empty_pollset(pollset); + become_basic_pollset(pollset, NULL); } void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { @@ -180,80 +179,8 @@ void grpc_pollset_destroy(grpc_pollset *pollset) { } /* - * empty_pollset - a vtable that provides polling for NO file descriptors - */ - -static void empty_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { - become_unary_pollset(pollset, fd); -} - -static void empty_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {} - -static int empty_pollset_maybe_work(grpc_pollset *pollset, - gpr_timespec deadline, gpr_timespec now, - int allow_synchronous_callback) { - struct pollfd pfd; - int timeout; - int r; - - if (pollset->in_flight_cbs) { - /* Give do_promote priority so we don't starve it out */ - return 1; - } - if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { - timeout = -1; - } else { - timeout = gpr_time_to_millis(gpr_time_sub(deadline, now)); - if (timeout <= 0) { - return 1; - } - } - pfd.fd = grpc_pollset_kick_pre_poll(&pollset->kick_state); - if (pfd.fd < 0) { - /* Already kicked */ - return 1; - } - pfd.events = POLLIN; - pfd.revents = 0; - pollset->counter++; - gpr_mu_unlock(&pollset->mu); - - /* poll fd count (argument 2) is shortened by one if we have no events - to poll on - such that it only includes the kicker */ - r = poll(&pfd, 1, timeout); - - if (r < 0) { - if (errno != EINTR) { - gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); - } - } else if (r == 0) { - /* do nothing */ - } else { - if (pfd.revents & POLLIN) { - grpc_pollset_kick_consume(&pollset->kick_state); - } - } - - grpc_pollset_kick_post_poll(&pollset->kick_state); - - gpr_mu_lock(&pollset->mu); - pollset->counter--; - return 1; -} - -static void empty_pollset_destroy(grpc_pollset *pollset) {} - -static const grpc_pollset_vtable empty_pollset = { - empty_pollset_add_fd, empty_pollset_del_fd, empty_pollset_maybe_work, - kick_using_pollset_kick, empty_pollset_destroy}; - -static void become_empty_pollset(grpc_pollset *pollset) { - pollset->vtable = &empty_pollset; -} - -/* - * unary_poll_pollset - a vtable that provides polling for one file descriptor - * via poll() + * basic_pollset - a vtable that provides polling for zero or one file + * descriptor via poll() */ @@ -263,7 +190,7 @@ typedef struct grpc_unary_promote_args { grpc_fd *fd; } grpc_unary_promote_args; -static void unary_poll_do_promote(void *args, int success) { +static void basic_do_promote(void *args, int success) { grpc_unary_promote_args *up_args = args; const grpc_pollset_vtable *original_vtable = up_args->original_vtable; grpc_pollset *pollset = up_args->pollset; @@ -283,7 +210,7 @@ static void unary_poll_do_promote(void *args, int success) { /* First we need to ensure that nobody is polling concurrently */ while (pollset->counter != 0) { grpc_pollset_kick(pollset); - grpc_iomgr_add_callback(unary_poll_do_promote, up_args); + grpc_iomgr_add_callback(basic_do_promote, up_args); gpr_mu_unlock(&pollset->mu); return; } @@ -329,12 +256,13 @@ static void unary_poll_do_promote(void *args, int success) { pollset->shutdown_done_cb(pollset->shutdown_done_arg); } - /* Matching ref in unary_poll_pollset_add_fd */ + /* Matching ref in basic_pollset_add_fd */ grpc_fd_unref(fd); } -static void unary_poll_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { +static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { grpc_unary_promote_args *up_args; + GPR_ASSERT(fd); if (fd == pollset->data.ptr) return; if (!pollset->counter) { @@ -345,7 +273,10 @@ static void unary_poll_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { fds[0] = pollset->data.ptr; fds[1] = fd; - if (!grpc_fd_is_orphaned(fds[0])) { + if (fds[0] == NULL) { + pollset->data.ptr = fd; + grpc_fd_ref(fd); + } else if (!grpc_fd_is_orphaned(fds[0])) { grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds)); grpc_fd_unref(fds[0]); } else { @@ -366,19 +297,20 @@ static void unary_poll_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { up_args->pollset = pollset; up_args->fd = fd; up_args->original_vtable = pollset->vtable; - grpc_iomgr_add_callback(unary_poll_do_promote, up_args); + grpc_iomgr_add_callback(basic_do_promote, up_args); grpc_pollset_kick(pollset); } -static void unary_poll_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { +static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { + GPR_ASSERT(fd); if (fd == pollset->data.ptr) { grpc_fd_unref(pollset->data.ptr); - become_empty_pollset(pollset); + pollset->data.ptr = NULL; } } -static int unary_poll_pollset_maybe_work(grpc_pollset *pollset, +static int basic_pollset_maybe_work(grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now, int allow_synchronous_callback) { @@ -387,16 +319,16 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset, grpc_fd_watcher fd_watcher; int timeout; int r; + int nfds; if (pollset->in_flight_cbs) { /* Give do_promote priority so we don't starve it out */ return 1; } fd = pollset->data.ptr; - if (grpc_fd_is_orphaned(fd)) { + if (fd && grpc_fd_is_orphaned(fd)) { grpc_fd_unref(fd); - become_empty_pollset(pollset); - return 1; + fd = pollset->data.ptr = NULL; } if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { timeout = -1; @@ -413,19 +345,28 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset, } pfd[0].events = POLLIN; pfd[0].revents = 0; - pfd[1].fd = fd->fd; - pfd[1].revents = 0; - pollset->counter++; - gpr_mu_unlock(&pollset->mu); - - pfd[1].events = grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher); + nfds = 1; + if (fd) { + pfd[1].fd = fd->fd; + pfd[1].revents = 0; + pollset->counter++; + gpr_mu_unlock(&pollset->mu); + pfd[1].events = grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher); + if (pfd[1].events != 0) { + nfds++; + } + } else { + gpr_mu_unlock(&pollset->mu); + } /* poll fd count (argument 2) is shortened by one if we have no events to poll on - such that it only includes the kicker */ - r = poll(pfd, GPR_ARRAY_SIZE(pfd) - (pfd[1].events == 0), timeout); + r = poll(pfd, nfds, timeout); GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r); - grpc_fd_end_poll(&fd_watcher, pfd[1].revents & POLLIN, pfd[1].revents & POLLOUT); + if (fd) { + grpc_fd_end_poll(&fd_watcher, pfd[1].revents & POLLIN, pfd[1].revents & POLLOUT); + } if (r < 0) { if (errno != EINTR) { @@ -452,21 +393,23 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset, return 1; } -static void unary_poll_pollset_destroy(grpc_pollset *pollset) { +static void basic_pollset_destroy(grpc_pollset *pollset) { GPR_ASSERT(pollset->counter == 0); grpc_fd_unref(pollset->data.ptr); } -static const grpc_pollset_vtable unary_poll_pollset = { - unary_poll_pollset_add_fd, unary_poll_pollset_del_fd, - unary_poll_pollset_maybe_work, kick_using_pollset_kick, - unary_poll_pollset_destroy}; +static const grpc_pollset_vtable basic_pollset = { + basic_pollset_add_fd, basic_pollset_del_fd, + basic_pollset_maybe_work, kick_using_pollset_kick, + basic_pollset_destroy}; -static void become_unary_pollset(grpc_pollset *pollset, grpc_fd *fd) { - pollset->vtable = &unary_poll_pollset; +static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) { + pollset->vtable = &basic_pollset; pollset->counter = 0; - pollset->data.ptr = fd; - grpc_fd_ref(fd); + pollset->data.ptr = fd_or_null; + if (fd_or_null) { + grpc_fd_ref(fd_or_null); + } } #endif /* GPR_POSIX_POLLSET */ |