author    Craig Tiller <ctiller@google.com> 2015-07-29 15:58:11 -0700
committer Craig Tiller <ctiller@google.com> 2015-07-29 15:58:11 -0700
commit    5ddbb9d405c01564b935e9ab81ae2833124e0b12 (patch)
tree      1fd4d868339987be609ebb96331c023272c7b065 /src/core/iomgr/pollset_multipoller_with_poll_posix.c
parent    4b6a0c73f318f6bc3d6e81676b06d8619e5e8325 (diff)
Allow specific pollers to be woken
Currently, if two threads call grpc_completion_queue_pluck on the same completion queue for different tags, there is a 50% chance that we deliver the completion wakeup to the wrong poller, forcing the correct poller to wait until its polling times out before it can return an event up to the application. This change tweaks our polling interfaces so that we can wake a specific poller.

Nothing has been performance tuned yet, and it is definitely sub-optimal in a number of places: wakeup file descriptors should be recycled, and we should have a path that avoids calling poll() followed by epoll(). We can probably live without those for the moment, though.

This code will fail on Windows at least (I'll do that port when I'm in the office and have a Windows machine).
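Background for readers outside the iomgr code: the old scheme funneled kicks through shared per-pollset kick state (grpc_pollset_kick_pre_poll / grpc_pollset_kick_consume in the diff below), so a kick could be consumed by whichever waiting poller saw it first. The new scheme gives each grpc_pollset_worker its own wakeup fd. The following is a minimal sketch of that pattern using a plain pipe(2) rather than gRPC's grpc_wakeup_fd abstraction; the helper names are hypothetical, not the library's API.

/* Sketch: one wakeup pipe per polling thread, so a kick targets
   exactly one poller instead of racing across all of them. */
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>

typedef struct {
  int read_fd;  /* polled at slot 0 alongside the pollset's real fds */
  int write_fd; /* another thread writes here to wake this poller only */
} wakeup_fd;

static int wakeup_fd_init(wakeup_fd *w) {
  int fds[2];
  if (pipe(fds) != 0) return -1;
  /* Non-blocking read end so draining the pipe never blocks. */
  fcntl(fds[0], F_SETFL, O_NONBLOCK);
  w->read_fd = fds[0];
  w->write_fd = fds[1];
  return 0;
}

/* Wake one specific poller by writing to its own pipe; with a single
   shared kick fd (the old scheme) any waiting poller could take it. */
static void wakeup_fd_kick(wakeup_fd *w) {
  char c = 0;
  while (write(w->write_fd, &c, 1) < 0 && errno == EINTR) {
  }
}

/* Drain the pipe after poll() reports the read end readable. */
static void wakeup_fd_consume(wakeup_fd *w) {
  char buf[32];
  while (read(w->read_fd, buf, sizeof(buf)) > 0) {
  }
}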
Diffstat (limited to 'src/core/iomgr/pollset_multipoller_with_poll_posix.c')
-rw-r--r--  src/core/iomgr/pollset_multipoller_with_poll_posix.c | 123
1 file changed, 43 insertions(+), 80 deletions(-)
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index 0084e83953..1249b1b64a 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -53,12 +53,6 @@ typedef struct {
size_t fd_count;
size_t fd_capacity;
grpc_fd **fds;
- /* fds being polled by the current poller: parallel arrays of pollfd, and
- a grpc_fd_watcher */
- size_t pfd_count;
- size_t pfd_capacity;
- grpc_fd_watcher *watchers;
- struct pollfd *pfds;
/* fds that have been removed from the pollset explicitly */
size_t del_count;
size_t del_capacity;
@@ -102,80 +96,60 @@ static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset,
}
}
-static void end_polling(grpc_pollset *pollset) {
- size_t i;
- pollset_hdr *h;
- h = pollset->data.ptr;
- for (i = 1; i < h->pfd_count; i++) {
- grpc_fd_end_poll(&h->watchers[i], h->pfds[i].revents & POLLIN,
- h->pfds[i].revents & POLLOUT);
- }
-}
-
static void multipoll_with_poll_pollset_maybe_work(
- grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now,
- int allow_synchronous_callback) {
+ grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline,
+ gpr_timespec now, int allow_synchronous_callback) {
int timeout;
int r;
- size_t i, np, nf, nd;
+ size_t i, j, pfd_count, fd_count;
pollset_hdr *h;
- grpc_kick_fd_info *kfd;
+ /* TODO(ctiller): inline some elements to avoid an allocation */
+ grpc_fd_watcher *watchers;
+ struct pollfd *pfds;
h = pollset->data.ptr;
timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
- if (h->pfd_capacity < h->fd_count + 1) {
- h->pfd_capacity = GPR_MAX(h->pfd_capacity * 3 / 2, h->fd_count + 1);
- gpr_free(h->pfds);
- gpr_free(h->watchers);
- h->pfds = gpr_malloc(sizeof(struct pollfd) * h->pfd_capacity);
- h->watchers = gpr_malloc(sizeof(grpc_fd_watcher) * h->pfd_capacity);
- }
- nf = 0;
- np = 1;
- kfd = grpc_pollset_kick_pre_poll(&pollset->kick_state);
- if (kfd == NULL) {
- /* Already kicked */
- return;
- }
- h->pfds[0].fd = GRPC_POLLSET_KICK_GET_FD(kfd);
- h->pfds[0].events = POLLIN;
- h->pfds[0].revents = POLLOUT;
+ /* TODO(ctiller): perform just one malloc here if we exceed the inline case */
+ pfds = gpr_malloc(sizeof(*pfds) * (h->fd_count + 1));
+ watchers = gpr_malloc(sizeof(*watchers) * (h->fd_count + 1));
+ fd_count = 0;
+ pfd_count = 1;
+ pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
+ pfds[0].events = POLLIN;
+ pfds[0].revents = POLLOUT;
for (i = 0; i < h->fd_count; i++) {
int remove = grpc_fd_is_orphaned(h->fds[i]);
- for (nd = 0; nd < h->del_count; nd++) {
- if (h->fds[i] == h->dels[nd]) remove = 1;
+ for (j = 0; j < h->del_count; j++) {
+ if (h->fds[i] == h->dels[j]) remove = 1;
}
if (remove) {
GRPC_FD_UNREF(h->fds[i], "multipoller");
} else {
- h->fds[nf++] = h->fds[i];
- h->watchers[np].fd = h->fds[i];
- h->pfds[np].fd = h->fds[i]->fd;
- h->pfds[np].revents = 0;
- np++;
+ h->fds[fd_count++] = h->fds[i];
+ watchers[pfd_count].fd = h->fds[i];
+ pfds[pfd_count].fd = h->fds[i]->fd;
+ pfds[pfd_count].revents = 0;
+ pfd_count++;
}
}
- h->pfd_count = np;
- h->fd_count = nf;
- for (nd = 0; nd < h->del_count; nd++) {
- GRPC_FD_UNREF(h->dels[nd], "multipoller_del");
+ for (j = 0; j < h->del_count; j++) {
+ GRPC_FD_UNREF(h->dels[j], "multipoller_del");
}
h->del_count = 0;
- if (h->pfd_count == 0) {
- end_polling(pollset);
- return;
- }
- pollset->counter++;
+ h->fd_count = fd_count;
gpr_mu_unlock(&pollset->mu);
- for (i = 1; i < np; i++) {
- h->pfds[i].events = grpc_fd_begin_poll(h->watchers[i].fd, pollset, POLLIN,
- POLLOUT, &h->watchers[i]);
+ for (i = 1; i < pfd_count; i++) {
+ pfds[i].events = grpc_fd_begin_poll(watchers[i].fd, pollset, POLLIN,
+ POLLOUT, &watchers[i]);
}
- r = poll(h->pfds, h->pfd_count, timeout);
+ r = poll(pfds, pfd_count, timeout);
- end_polling(pollset);
+ for (i = 1; i < pfd_count; i++) {
+ grpc_fd_end_poll(&watchers[i], pfds[i].revents & POLLIN,
+ pfds[i].revents & POLLOUT);
+ }
if (r < 0) {
if (errno != EINTR) {
@@ -184,35 +158,31 @@ static void multipoll_with_poll_pollset_maybe_work(
} else if (r == 0) {
/* do nothing */
} else {
- if (h->pfds[0].revents & POLLIN) {
- grpc_pollset_kick_consume(&pollset->kick_state, kfd);
+ if (pfds[0].revents & POLLIN) {
+ grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
}
- for (i = 1; i < np; i++) {
- if (h->watchers[i].fd == NULL) {
+ for (i = 1; i < pfd_count; i++) {
+ if (watchers[i].fd == NULL) {
continue;
}
- if (h->pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) {
- grpc_fd_become_readable(h->watchers[i].fd, allow_synchronous_callback);
+ if (pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) {
+ grpc_fd_become_readable(watchers[i].fd, allow_synchronous_callback);
}
- if (h->pfds[i].revents & (POLLOUT | POLLHUP | POLLERR)) {
- grpc_fd_become_writable(h->watchers[i].fd, allow_synchronous_callback);
+ if (pfds[i].revents & (POLLOUT | POLLHUP | POLLERR)) {
+ grpc_fd_become_writable(watchers[i].fd, allow_synchronous_callback);
}
}
}
- grpc_pollset_kick_post_poll(&pollset->kick_state, kfd);
- gpr_mu_lock(&pollset->mu);
- pollset->counter--;
-}
+ gpr_free(pfds);
+ gpr_free(watchers);
-static void multipoll_with_poll_pollset_kick(grpc_pollset *p) {
- grpc_pollset_force_kick(p);
+ gpr_mu_lock(&pollset->mu);
}
static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) {
size_t i;
pollset_hdr *h = pollset->data.ptr;
- GPR_ASSERT(pollset->counter == 0);
for (i = 0; i < h->fd_count; i++) {
GRPC_FD_UNREF(h->fds[i], "multipoller");
}
@@ -226,8 +196,6 @@ static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) {
static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) {
pollset_hdr *h = pollset->data.ptr;
multipoll_with_poll_pollset_finish_shutdown(pollset);
- gpr_free(h->pfds);
- gpr_free(h->watchers);
gpr_free(h->fds);
gpr_free(h->dels);
gpr_free(h);
@@ -237,7 +205,6 @@ static const grpc_pollset_vtable multipoll_with_poll_pollset = {
multipoll_with_poll_pollset_add_fd,
multipoll_with_poll_pollset_del_fd,
multipoll_with_poll_pollset_maybe_work,
- multipoll_with_poll_pollset_kick,
multipoll_with_poll_pollset_finish_shutdown,
multipoll_with_poll_pollset_destroy};
@@ -250,10 +217,6 @@ void grpc_poll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
h->fd_count = nfds;
h->fd_capacity = nfds;
h->fds = gpr_malloc(nfds * sizeof(grpc_fd *));
- h->pfd_count = 0;
- h->pfd_capacity = 0;
- h->pfds = NULL;
- h->watchers = NULL;
h->del_count = 0;
h->del_capacity = 0;
h->dels = NULL;
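For orientation, the new maybe_work body reduces to this flow: build a transient pollfd array with the worker's wakeup fd at slot 0 and the pollset's fds after it, poll, consume the wakeup if slot 0 fired, and dispatch readiness for the rest. Below is a condensed, hypothetical rendering of that flow; fd_entry and the dispatch_*/consume_wakeup helpers are illustrative stand-ins, not gRPC types.

#include <poll.h>
#include <stdlib.h>

typedef struct { int fd; } fd_entry;

static void dispatch_readable(fd_entry *e) { (void)e; /* notify readers */ }
static void dispatch_writable(fd_entry *e) { (void)e; /* notify writers */ }
static void consume_wakeup(int fd) { (void)fd; /* drain the wakeup fd */ }

static void poll_once(int wakeup_read_fd, fd_entry *entries, size_t n,
                      int timeout_ms) {
  /* Transient array per call, mirroring the gpr_malloc of pfds/watchers
     in the patch; the old code cached these in the pollset header. */
  struct pollfd *pfds = malloc(sizeof(*pfds) * (n + 1));
  size_t i;
  if (pfds == NULL) return;
  pfds[0].fd = wakeup_read_fd; /* slot 0: this worker's own wakeup fd */
  pfds[0].events = POLLIN;
  for (i = 0; i < n; i++) {
    pfds[i + 1].fd = entries[i].fd;
    pfds[i + 1].events = POLLIN | POLLOUT;
    pfds[i + 1].revents = 0;
  }
  if (poll(pfds, (nfds_t)(n + 1), timeout_ms) > 0) {
    if (pfds[0].revents & POLLIN) consume_wakeup(wakeup_read_fd);
    for (i = 0; i < n; i++) {
      /* HUP/ERR wake both directions, as in the patch. */
      if (pfds[i + 1].revents & (POLLIN | POLLHUP | POLLERR))
        dispatch_readable(&entries[i]);
      if (pfds[i + 1].revents & (POLLOUT | POLLHUP | POLLERR))
        dispatch_writable(&entries[i]);
    }
  }
  free(pfds);
}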