aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr/pollset_multipoller_with_epoll.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-09-22 12:33:20 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-09-22 12:33:20 -0700
commita82950e68318a6aab6fe894fa39f7fa616c4647b (patch)
tree7d02bd1e9e1cbae1f14ad4ad1e06d3ae81a96dfe /src/core/iomgr/pollset_multipoller_with_epoll.c
parent8af4c337181322cc4fb396199c90f574cfb4163f (diff)
clang-format all core files
Diffstat (limited to 'src/core/iomgr/pollset_multipoller_with_epoll.c')
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c289
1 files changed, 127 insertions, 162 deletions
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index c62a786cf6..5626b08a47 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -45,28 +45,24 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-typedef struct wakeup_fd_hdl
-{
+typedef struct wakeup_fd_hdl {
grpc_wakeup_fd wakeup_fd;
struct wakeup_fd_hdl *next;
} wakeup_fd_hdl;
-typedef struct
-{
+typedef struct {
grpc_pollset *pollset;
grpc_fd *fd;
grpc_closure closure;
} delayed_add;
-typedef struct
-{
+typedef struct {
int epoll_fd;
wakeup_fd_hdl *free_wakeup_fds;
} pollset_hdr;
-static void
-finally_add_fd (grpc_exec_ctx * exec_ctx, grpc_pollset * pollset, grpc_fd * fd)
-{
+static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_fd *fd) {
pollset_hdr *h = pollset->data.ptr;
struct epoll_event ev;
int err;
@@ -75,98 +71,90 @@ finally_add_fd (grpc_exec_ctx * exec_ctx, grpc_pollset * pollset, grpc_fd * fd)
/* We pretend to be polling whilst adding an fd to keep the fd from being
closed during the add. This may result in a spurious wakeup being assigned
to this pollset whilst adding, but that should be benign. */
- GPR_ASSERT (grpc_fd_begin_poll (fd, pollset, 0, 0, &watcher) == 0);
- if (watcher.fd != NULL)
- {
- ev.events = (uint32_t) (EPOLLIN | EPOLLOUT | EPOLLET);
- ev.data.ptr = fd;
- err = epoll_ctl (h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
- if (err < 0)
- {
- /* FDs may be added to a pollset multiple times, so EEXIST is normal. */
- if (errno != EEXIST)
- {
- gpr_log (GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd, strerror (errno));
- }
- }
+ GPR_ASSERT(grpc_fd_begin_poll(fd, pollset, 0, 0, &watcher) == 0);
+ if (watcher.fd != NULL) {
+ ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
+ ev.data.ptr = fd;
+ err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
+ if (err < 0) {
+ /* FDs may be added to a pollset multiple times, so EEXIST is normal. */
+ if (errno != EEXIST) {
+ gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
+ strerror(errno));
+ }
}
- grpc_fd_end_poll (exec_ctx, &watcher, 0, 0);
+ }
+ grpc_fd_end_poll(exec_ctx, &watcher, 0, 0);
}
-static void
-perform_delayed_add (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_status)
-{
+static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_status) {
delayed_add *da = arg;
- if (!grpc_fd_is_orphaned (da->fd))
- {
- finally_add_fd (exec_ctx, da->pollset, da->fd);
- }
+ if (!grpc_fd_is_orphaned(da->fd)) {
+ finally_add_fd(exec_ctx, da->pollset, da->fd);
+ }
- gpr_mu_lock (&da->pollset->mu);
+ gpr_mu_lock(&da->pollset->mu);
da->pollset->in_flight_cbs--;
- if (da->pollset->shutting_down)
- {
- /* We don't care about this pollset anymore. */
- if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown)
- {
- da->pollset->called_shutdown = 1;
- grpc_exec_ctx_enqueue (exec_ctx, da->pollset->shutdown_done, 1);
- }
+ if (da->pollset->shutting_down) {
+ /* We don't care about this pollset anymore. */
+ if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) {
+ da->pollset->called_shutdown = 1;
+ grpc_exec_ctx_enqueue(exec_ctx, da->pollset->shutdown_done, 1);
}
- gpr_mu_unlock (&da->pollset->mu);
+ }
+ gpr_mu_unlock(&da->pollset->mu);
- GRPC_FD_UNREF (da->fd, "delayed_add");
+ GRPC_FD_UNREF(da->fd, "delayed_add");
- gpr_free (da);
+ gpr_free(da);
}
-static void
-multipoll_with_epoll_pollset_add_fd (grpc_exec_ctx * exec_ctx, grpc_pollset * pollset, grpc_fd * fd, int and_unlock_pollset)
-{
- if (and_unlock_pollset)
- {
- gpr_mu_unlock (&pollset->mu);
- finally_add_fd (exec_ctx, pollset, fd);
- }
- else
- {
- delayed_add *da = gpr_malloc (sizeof (*da));
- da->pollset = pollset;
- da->fd = fd;
- GRPC_FD_REF (fd, "delayed_add");
- grpc_closure_init (&da->closure, perform_delayed_add, da);
- pollset->in_flight_cbs++;
- grpc_exec_ctx_enqueue (exec_ctx, &da->closure, 1);
- }
+static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset,
+ grpc_fd *fd,
+ int and_unlock_pollset) {
+ if (and_unlock_pollset) {
+ gpr_mu_unlock(&pollset->mu);
+ finally_add_fd(exec_ctx, pollset, fd);
+ } else {
+ delayed_add *da = gpr_malloc(sizeof(*da));
+ da->pollset = pollset;
+ da->fd = fd;
+ GRPC_FD_REF(fd, "delayed_add");
+ grpc_closure_init(&da->closure, perform_delayed_add, da);
+ pollset->in_flight_cbs++;
+ grpc_exec_ctx_enqueue(exec_ctx, &da->closure, 1);
+ }
}
-static void
-multipoll_with_epoll_pollset_del_fd (grpc_exec_ctx * exec_ctx, grpc_pollset * pollset, grpc_fd * fd, int and_unlock_pollset)
-{
+static void multipoll_with_epoll_pollset_del_fd(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset,
+ grpc_fd *fd,
+ int and_unlock_pollset) {
pollset_hdr *h = pollset->data.ptr;
int err;
- if (and_unlock_pollset)
- {
- gpr_mu_unlock (&pollset->mu);
- }
+ if (and_unlock_pollset) {
+ gpr_mu_unlock(&pollset->mu);
+ }
/* Note that this can race with concurrent poll, but that should be fine since
* at worst it creates a spurious read event on a reused grpc_fd object. */
- err = epoll_ctl (h->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
- if (err < 0)
- {
- gpr_log (GPR_ERROR, "epoll_ctl del for %d failed: %s", fd->fd, strerror (errno));
- }
+ err = epoll_ctl(h->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
+ if (err < 0) {
+ gpr_log(GPR_ERROR, "epoll_ctl del for %d failed: %s", fd->fd,
+ strerror(errno));
+ }
}
/* TODO(klempner): We probably want to turn this down a bit */
#define GRPC_EPOLL_MAX_EVENTS 1000
-static void
-multipoll_with_epoll_pollset_maybe_work_and_unlock (grpc_exec_ctx * exec_ctx, grpc_pollset * pollset, grpc_pollset_worker * worker, gpr_timespec deadline, gpr_timespec now)
-{
+static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
+ grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
+ gpr_timespec deadline, gpr_timespec now) {
struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
int ep_rv;
int poll_rv;
@@ -180,116 +168,93 @@ multipoll_with_epoll_pollset_maybe_work_and_unlock (grpc_exec_ctx * exec_ctx, gr
* here.
*/
- gpr_mu_unlock (&pollset->mu);
+ gpr_mu_unlock(&pollset->mu);
- timeout_ms = grpc_poll_deadline_to_millis_timeout (deadline, now);
+ timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now);
- pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD (&worker->wakeup_fd);
+ pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
pfds[0].events = POLLIN;
pfds[0].revents = 0;
pfds[1].fd = h->epoll_fd;
pfds[1].events = POLLIN;
pfds[1].revents = 0;
- poll_rv = grpc_poll_function (pfds, 2, timeout_ms);
+ poll_rv = grpc_poll_function(pfds, 2, timeout_ms);
- if (poll_rv < 0)
- {
- if (errno != EINTR)
- {
- gpr_log (GPR_ERROR, "poll() failed: %s", strerror (errno));
- }
+ if (poll_rv < 0) {
+ if (errno != EINTR) {
+ gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
}
- else if (poll_rv == 0)
- {
- /* do nothing */
+ } else if (poll_rv == 0) {
+ /* do nothing */
+ } else {
+ if (pfds[0].revents) {
+ grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
}
- else
- {
- if (pfds[0].revents)
- {
- grpc_wakeup_fd_consume_wakeup (&worker->wakeup_fd);
- }
- if (pfds[1].revents)
- {
- do
- {
- ep_rv = epoll_wait (h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
- if (ep_rv < 0)
- {
- if (errno != EINTR)
- {
- gpr_log (GPR_ERROR, "epoll_wait() failed: %s", strerror (errno));
- }
- }
- else
- {
- int i;
- for (i = 0; i < ep_rv; ++i)
- {
- grpc_fd *fd = ep_ev[i].data.ptr;
- /* TODO(klempner): We might want to consider making err and pri
- * separate events */
- int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
- int read = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
- int write = ep_ev[i].events & EPOLLOUT;
- if (read || cancel)
- {
- grpc_fd_become_readable (exec_ctx, fd);
- }
- if (write || cancel)
- {
- grpc_fd_become_writable (exec_ctx, fd);
- }
- }
- }
- }
- while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
- }
+ if (pfds[1].revents) {
+ do {
+ ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
+ if (ep_rv < 0) {
+ if (errno != EINTR) {
+ gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno));
+ }
+ } else {
+ int i;
+ for (i = 0; i < ep_rv; ++i) {
+ grpc_fd *fd = ep_ev[i].data.ptr;
+ /* TODO(klempner): We might want to consider making err and pri
+ * separate events */
+ int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
+ int read = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
+ int write = ep_ev[i].events & EPOLLOUT;
+ if (read || cancel) {
+ grpc_fd_become_readable(exec_ctx, fd);
+ }
+ if (write || cancel) {
+ grpc_fd_become_writable(exec_ctx, fd);
+ }
+ }
+ }
+ } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
}
+ }
}
-static void
-multipoll_with_epoll_pollset_finish_shutdown (grpc_pollset * pollset)
-{
-}
+static void multipoll_with_epoll_pollset_finish_shutdown(
+ grpc_pollset *pollset) {}
-static void
-multipoll_with_epoll_pollset_destroy (grpc_pollset * pollset)
-{
+static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) {
pollset_hdr *h = pollset->data.ptr;
- close (h->epoll_fd);
- gpr_free (h);
+ close(h->epoll_fd);
+ gpr_free(h);
}
static const grpc_pollset_vtable multipoll_with_epoll_pollset = {
- multipoll_with_epoll_pollset_add_fd, multipoll_with_epoll_pollset_del_fd,
- multipoll_with_epoll_pollset_maybe_work_and_unlock,
- multipoll_with_epoll_pollset_finish_shutdown,
- multipoll_with_epoll_pollset_destroy
-};
-
-static void
-epoll_become_multipoller (grpc_exec_ctx * exec_ctx, grpc_pollset * pollset, grpc_fd ** fds, size_t nfds)
-{
+ multipoll_with_epoll_pollset_add_fd, multipoll_with_epoll_pollset_del_fd,
+ multipoll_with_epoll_pollset_maybe_work_and_unlock,
+ multipoll_with_epoll_pollset_finish_shutdown,
+ multipoll_with_epoll_pollset_destroy};
+
+static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset, grpc_fd **fds,
+ size_t nfds) {
size_t i;
- pollset_hdr *h = gpr_malloc (sizeof (pollset_hdr));
+ pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr));
pollset->vtable = &multipoll_with_epoll_pollset;
pollset->data.ptr = h;
- h->epoll_fd = epoll_create1 (EPOLL_CLOEXEC);
- if (h->epoll_fd < 0)
- {
- /* TODO(klempner): Fall back to poll here, especially on ENOSYS */
- gpr_log (GPR_ERROR, "epoll_create1 failed: %s", strerror (errno));
- abort ();
- }
- for (i = 0; i < nfds; i++)
- {
- multipoll_with_epoll_pollset_add_fd (exec_ctx, pollset, fds[i], 0);
- }
+ h->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
+ if (h->epoll_fd < 0) {
+ /* TODO(klempner): Fall back to poll here, especially on ENOSYS */
+ gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno));
+ abort();
+ }
+ for (i = 0; i < nfds; i++) {
+ multipoll_with_epoll_pollset_add_fd(exec_ctx, pollset, fds[i], 0);
+ }
}
-grpc_platform_become_multipoller_type grpc_platform_become_multipoller = epoll_become_multipoller;
+grpc_platform_become_multipoller_type grpc_platform_become_multipoller =
+ epoll_become_multipoller;
#endif /* GPR_LINUX_MULTIPOLL_WITH_EPOLL */