diff options
Diffstat (limited to 'src/core/iomgr/pollset_multipoller_with_epoll.c')
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_epoll.c | 81 |
1 files changed, 50 insertions, 31 deletions
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 481bdc4ede..d26e60f665 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -42,6 +42,7 @@ #include <unistd.h> #include "src/core/iomgr/fd_posix.h" +#include "src/core/support/block_annotate.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -53,7 +54,7 @@ typedef struct wakeup_fd_hdl { typedef struct { grpc_pollset *pollset; grpc_fd *fd; - grpc_iomgr_closure closure; + grpc_closure closure; } delayed_add; typedef struct { @@ -61,7 +62,8 @@ typedef struct { wakeup_fd_hdl *free_wakeup_fds; } pollset_hdr; -static void finally_add_fd(grpc_pollset *pollset, grpc_fd *fd) { +static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_fd *fd) { pollset_hdr *h = pollset->data.ptr; struct epoll_event ev; int err; @@ -83,15 +85,15 @@ static void finally_add_fd(grpc_pollset *pollset, grpc_fd *fd) { } } } - grpc_fd_end_poll(&watcher, 0, 0); + grpc_fd_end_poll(exec_ctx, &watcher, 0, 0); } -static void perform_delayed_add(void *arg, int iomgr_status) { +static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_status) { delayed_add *da = arg; - int do_shutdown_cb = 0; if (!grpc_fd_is_orphaned(da->fd)) { - finally_add_fd(da->pollset, da->fd); + finally_add_fd(exec_ctx, da->pollset, da->fd); } gpr_mu_lock(&da->pollset->mu); @@ -100,38 +102,36 @@ static void perform_delayed_add(void *arg, int iomgr_status) { /* We don't care about this pollset anymore. */ if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) { da->pollset->called_shutdown = 1; - do_shutdown_cb = 1; + grpc_exec_ctx_enqueue(exec_ctx, da->pollset->shutdown_done, 1); } } gpr_mu_unlock(&da->pollset->mu); GRPC_FD_UNREF(da->fd, "delayed_add"); - if (do_shutdown_cb) { - da->pollset->shutdown_done_cb(da->pollset->shutdown_done_arg); - } - gpr_free(da); } -static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset, +static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, grpc_fd *fd, int and_unlock_pollset) { if (and_unlock_pollset) { gpr_mu_unlock(&pollset->mu); - finally_add_fd(pollset, fd); + finally_add_fd(exec_ctx, pollset, fd); } else { delayed_add *da = gpr_malloc(sizeof(*da)); da->pollset = pollset; da->fd = fd; GRPC_FD_REF(fd, "delayed_add"); - grpc_iomgr_closure_init(&da->closure, perform_delayed_add, da); + grpc_closure_init(&da->closure, perform_delayed_add, da); pollset->in_flight_cbs++; - grpc_iomgr_add_callback(&da->closure); + grpc_exec_ctx_enqueue(exec_ctx, &da->closure, 1); } } -static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset, +static void multipoll_with_epoll_pollset_del_fd(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, grpc_fd *fd, int and_unlock_pollset) { pollset_hdr *h = pollset->data.ptr; @@ -153,9 +153,9 @@ static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset, /* TODO(klempner): We probably want to turn this down a bit */ #define GRPC_EPOLL_MAX_EVENTS 1000 -static void multipoll_with_epoll_pollset_maybe_work( - grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline, - gpr_timespec now, int allow_synchronous_callback) { +static void multipoll_with_epoll_pollset_maybe_work_and_unlock( + grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, + gpr_timespec deadline, gpr_timespec now) { struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS]; int ep_rv; int poll_rv; @@ -180,7 +180,11 @@ static void multipoll_with_epoll_pollset_maybe_work( pfds[1].events = POLLIN; pfds[1].revents = 0; + /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid + even going into the blocking annotation if possible */ + GRPC_SCHEDULING_START_BLOCKING_REGION; poll_rv = grpc_poll_function(pfds, 2, timeout_ms); + GRPC_SCHEDULING_END_BLOCKING_REGION; if (poll_rv < 0) { if (errno != EINTR) { @@ -194,6 +198,7 @@ static void multipoll_with_epoll_pollset_maybe_work( } if (pfds[1].revents) { do { + /* The following epoll_wait never blocks; it has a timeout of 0 */ ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0); if (ep_rv < 0) { if (errno != EINTR) { @@ -206,21 +211,23 @@ static void multipoll_with_epoll_pollset_maybe_work( /* TODO(klempner): We might want to consider making err and pri * separate events */ int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP); - int read = ep_ev[i].events & (EPOLLIN | EPOLLPRI); - int write = ep_ev[i].events & EPOLLOUT; - if (read || cancel) { - grpc_fd_become_readable(fd, allow_synchronous_callback); - } - if (write || cancel) { - grpc_fd_become_writable(fd, allow_synchronous_callback); + int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI); + int write_ev = ep_ev[i].events & EPOLLOUT; + if (fd == NULL) { + grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); + } else { + if (read_ev || cancel) { + grpc_fd_become_readable(exec_ctx, fd); + } + if (write_ev || cancel) { + grpc_fd_become_writable(exec_ctx, fd); + } } } } } while (ep_rv == GRPC_EPOLL_MAX_EVENTS); } } - - gpr_mu_lock(&pollset->mu); } static void multipoll_with_epoll_pollset_finish_shutdown( @@ -234,14 +241,17 @@ static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) { static const grpc_pollset_vtable multipoll_with_epoll_pollset = { multipoll_with_epoll_pollset_add_fd, multipoll_with_epoll_pollset_del_fd, - multipoll_with_epoll_pollset_maybe_work, + multipoll_with_epoll_pollset_maybe_work_and_unlock, multipoll_with_epoll_pollset_finish_shutdown, multipoll_with_epoll_pollset_destroy}; -static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds, +static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, grpc_fd **fds, size_t nfds) { size_t i; pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr)); + struct epoll_event ev; + int err; pollset->vtable = &multipoll_with_epoll_pollset; pollset->data.ptr = h; @@ -251,8 +261,17 @@ static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds, gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno)); abort(); } + + ev.events = (uint32_t)(EPOLLIN | EPOLLET); + ev.data.ptr = NULL; + err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev); + if (err < 0) { + gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), + strerror(errno)); + } + for (i = 0; i < nfds; i++) { - multipoll_with_epoll_pollset_add_fd(pollset, fds[i], 0); + multipoll_with_epoll_pollset_add_fd(exec_ctx, pollset, fds[i], 0); } } |