aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr/pollset_multipoller_with_epoll.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/iomgr/pollset_multipoller_with_epoll.c')
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c81
1 files changed, 50 insertions, 31 deletions
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 481bdc4ede..d26e60f665 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -42,6 +42,7 @@
#include <unistd.h>
#include "src/core/iomgr/fd_posix.h"
+#include "src/core/support/block_annotate.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -53,7 +54,7 @@ typedef struct wakeup_fd_hdl {
typedef struct {
grpc_pollset *pollset;
grpc_fd *fd;
- grpc_iomgr_closure closure;
+ grpc_closure closure;
} delayed_add;
typedef struct {
@@ -61,7 +62,8 @@ typedef struct {
wakeup_fd_hdl *free_wakeup_fds;
} pollset_hdr;
-static void finally_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
+static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_fd *fd) {
pollset_hdr *h = pollset->data.ptr;
struct epoll_event ev;
int err;
@@ -83,15 +85,15 @@ static void finally_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
}
}
}
- grpc_fd_end_poll(&watcher, 0, 0);
+ grpc_fd_end_poll(exec_ctx, &watcher, 0, 0);
}
-static void perform_delayed_add(void *arg, int iomgr_status) {
+static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_status) {
delayed_add *da = arg;
- int do_shutdown_cb = 0;
if (!grpc_fd_is_orphaned(da->fd)) {
- finally_add_fd(da->pollset, da->fd);
+ finally_add_fd(exec_ctx, da->pollset, da->fd);
}
gpr_mu_lock(&da->pollset->mu);
@@ -100,38 +102,36 @@ static void perform_delayed_add(void *arg, int iomgr_status) {
/* We don't care about this pollset anymore. */
if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) {
da->pollset->called_shutdown = 1;
- do_shutdown_cb = 1;
+ grpc_exec_ctx_enqueue(exec_ctx, da->pollset->shutdown_done, 1);
}
}
gpr_mu_unlock(&da->pollset->mu);
GRPC_FD_UNREF(da->fd, "delayed_add");
- if (do_shutdown_cb) {
- da->pollset->shutdown_done_cb(da->pollset->shutdown_done_arg);
- }
-
gpr_free(da);
}
-static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
+static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset,
grpc_fd *fd,
int and_unlock_pollset) {
if (and_unlock_pollset) {
gpr_mu_unlock(&pollset->mu);
- finally_add_fd(pollset, fd);
+ finally_add_fd(exec_ctx, pollset, fd);
} else {
delayed_add *da = gpr_malloc(sizeof(*da));
da->pollset = pollset;
da->fd = fd;
GRPC_FD_REF(fd, "delayed_add");
- grpc_iomgr_closure_init(&da->closure, perform_delayed_add, da);
+ grpc_closure_init(&da->closure, perform_delayed_add, da);
pollset->in_flight_cbs++;
- grpc_iomgr_add_callback(&da->closure);
+ grpc_exec_ctx_enqueue(exec_ctx, &da->closure, 1);
}
}
-static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset,
+static void multipoll_with_epoll_pollset_del_fd(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset,
grpc_fd *fd,
int and_unlock_pollset) {
pollset_hdr *h = pollset->data.ptr;
@@ -153,9 +153,9 @@ static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset,
/* TODO(klempner): We probably want to turn this down a bit */
#define GRPC_EPOLL_MAX_EVENTS 1000
-static void multipoll_with_epoll_pollset_maybe_work(
- grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline,
- gpr_timespec now, int allow_synchronous_callback) {
+static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
+ grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
+ gpr_timespec deadline, gpr_timespec now) {
struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
int ep_rv;
int poll_rv;
@@ -180,7 +180,11 @@ static void multipoll_with_epoll_pollset_maybe_work(
pfds[1].events = POLLIN;
pfds[1].revents = 0;
+ /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
+ even going into the blocking annotation if possible */
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
poll_rv = grpc_poll_function(pfds, 2, timeout_ms);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
if (poll_rv < 0) {
if (errno != EINTR) {
@@ -194,6 +198,7 @@ static void multipoll_with_epoll_pollset_maybe_work(
}
if (pfds[1].revents) {
do {
+ /* The following epoll_wait never blocks; it has a timeout of 0 */
ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
if (ep_rv < 0) {
if (errno != EINTR) {
@@ -206,21 +211,23 @@ static void multipoll_with_epoll_pollset_maybe_work(
/* TODO(klempner): We might want to consider making err and pri
* separate events */
int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
- int read = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
- int write = ep_ev[i].events & EPOLLOUT;
- if (read || cancel) {
- grpc_fd_become_readable(fd, allow_synchronous_callback);
- }
- if (write || cancel) {
- grpc_fd_become_writable(fd, allow_synchronous_callback);
+ int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
+ int write_ev = ep_ev[i].events & EPOLLOUT;
+ if (fd == NULL) {
+ grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
+ } else {
+ if (read_ev || cancel) {
+ grpc_fd_become_readable(exec_ctx, fd);
+ }
+ if (write_ev || cancel) {
+ grpc_fd_become_writable(exec_ctx, fd);
+ }
}
}
}
} while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
}
}
-
- gpr_mu_lock(&pollset->mu);
}
static void multipoll_with_epoll_pollset_finish_shutdown(
@@ -234,14 +241,17 @@ static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) {
static const grpc_pollset_vtable multipoll_with_epoll_pollset = {
multipoll_with_epoll_pollset_add_fd, multipoll_with_epoll_pollset_del_fd,
- multipoll_with_epoll_pollset_maybe_work,
+ multipoll_with_epoll_pollset_maybe_work_and_unlock,
multipoll_with_epoll_pollset_finish_shutdown,
multipoll_with_epoll_pollset_destroy};
-static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
+static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset, grpc_fd **fds,
size_t nfds) {
size_t i;
pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr));
+ struct epoll_event ev;
+ int err;
pollset->vtable = &multipoll_with_epoll_pollset;
pollset->data.ptr = h;
@@ -251,8 +261,17 @@ static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno));
abort();
}
+
+ ev.events = (uint32_t)(EPOLLIN | EPOLLET);
+ ev.data.ptr = NULL;
+ err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev);
+ if (err < 0) {
+ gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd),
+ strerror(errno));
+ }
+
for (i = 0; i < nfds; i++) {
- multipoll_with_epoll_pollset_add_fd(pollset, fds[i], 0);
+ multipoll_with_epoll_pollset_add_fd(exec_ctx, pollset, fds[i], 0);
}
}