aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr/pollset_multipoller_with_poll_posix.c
diff options
context:
space:
mode:
authorGravatar ctiller <ctiller@google.com>2015-01-07 14:03:30 -0800
committerGravatar Tim Emiola <temiola@google.com>2015-01-08 13:01:56 -0800
commit1a277ecd93e908a47a905d323a4a1a77287cede1 (patch)
treed1648196ca8201d778e73c7c9e59a8ac90613763 /src/core/iomgr/pollset_multipoller_with_poll_posix.c
parent3040cb7c434e83c0e70839ac20218f1c2d77e1eb (diff)
Remove libevent.
Fixed any exposed bugs across the stack. Add a poll() based implementation. Heavily leverages pollset infrastructure to allow small polls to be the norm. Exposes a mechanism to plug in epoll/kqueue for platforms where we have them. Simplify iomgr callbacks to return one bit of success or failure (instead of the multi valued result that was mostly unused previously). This will ease the burden on new implementations, and the previous system provided no real value anyway. Removed timeouts on endpoint read/write routines. This simplifies porting burden by providing a more orthogonal interface, and the functionality can always be replicated when desired by using an alarm combined with endpoint_shutdown. I'm fairly certain we ended up with this interface because it was convenient to do from libevent. Things that need attention still: - adding an fd to a pollset is O(n^2) - but this is probably ok given that we'll not use this for multipolling once platform specific implementations are added. - we rely on the backup poller too often - especially for SSL handshakes and for client connection establishment we should have a better mechanism ([] [] - Linux needs to use epoll for multiple fds, FreeBSD variants (including Darwin) need to use kqueue. ([] [] - Linux needs to use eventfd for poll kicking. ([] Change on 2015/01/07 by ctiller <ctiller@google.com> ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=83461069
Diffstat (limited to 'src/core/iomgr/pollset_multipoller_with_poll_posix.c')
-rw-r--r--src/core/iomgr/pollset_multipoller_with_poll_posix.c237
1 files changed, 237 insertions, 0 deletions
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
new file mode 100644
index 0000000000..06c7a5a0dd
--- /dev/null
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -0,0 +1,237 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_POSIX_MULTIPOLL_WITH_POLL
+
+#include "src/core/iomgr/pollset_posix.h"
+
+#include <errno.h>
+#include <poll.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "src/core/iomgr/fd_posix.h"
+#include "src/core/iomgr/iomgr_internal.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/useful.h>
+
+typedef struct {
+ /* all polled fds */
+ size_t fd_count;
+ size_t fd_capacity;
+ grpc_fd **fds;
+ /* fds being polled by the current poller: parallel arrays of pollfd and the
+ * grpc_fd* that the pollfd was constructed from */
+ size_t pfd_count;
+ size_t pfd_capacity;
+ grpc_fd **selfds;
+ struct pollfd *pfds;
+ /* fds that have been removed from the pollset explicitly */
+ size_t del_count;
+ size_t del_capacity;
+ grpc_fd **dels;
+} pollset_hdr;
+
+static void multipoll_with_poll_pollset_add_fd(grpc_pollset *pollset,
+ grpc_fd *fd) {
+ size_t i;
+ pollset_hdr *h = pollset->data.ptr;
+ /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
+ for (i = 0; i < h->fd_count; i++) {
+ if (h->fds[i] == fd) return;
+ }
+ if (h->fd_count == h->fd_capacity) {
+ h->fd_capacity = GPR_MAX(h->fd_capacity + 8, h->fd_count * 3 / 2);
+ h->fds = gpr_realloc(h->fds, sizeof(grpc_fd *) * h->fd_capacity);
+ }
+ h->fds[h->fd_count++] = fd;
+ grpc_fd_ref(fd);
+}
+
+static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset,
+ grpc_fd *fd) {
+ /* will get removed next poll cycle */
+ pollset_hdr *h = pollset->data.ptr;
+ if (h->del_count == h->del_capacity) {
+ h->del_capacity = GPR_MAX(h->del_capacity + 8, h->del_count * 3 / 2);
+ h->dels = gpr_realloc(h->dels, sizeof(grpc_fd *) * h->del_capacity);
+ }
+ h->dels[h->del_count++] = fd;
+ grpc_fd_ref(fd);
+}
+
+static void end_polling(grpc_pollset *pollset) {
+ size_t i;
+ pollset_hdr *h;
+ h = pollset->data.ptr;
+ for (i = 1; i < h->pfd_count; i++) {
+ grpc_fd_end_poll(h->selfds[i], pollset);
+ }
+}
+
+static int multipoll_with_poll_pollset_maybe_work(
+ grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now,
+ int allow_synchronous_callback) {
+ int timeout;
+ int r;
+ size_t i, np, nf, nd;
+ pollset_hdr *h;
+
+ if (pollset->counter) {
+ return 0;
+ }
+ h = pollset->data.ptr;
+ if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
+ timeout = -1;
+ } else {
+ timeout = gpr_time_to_millis(gpr_time_sub(deadline, now));
+ if (timeout <= 0) {
+ return 1;
+ }
+ }
+ if (h->pfd_capacity < h->fd_count + 1) {
+ h->pfd_capacity = GPR_MAX(h->pfd_capacity * 3 / 2, h->fd_count + 1);
+ gpr_free(h->pfds);
+ gpr_free(h->selfds);
+ h->pfds = gpr_malloc(sizeof(struct pollfd) * h->pfd_capacity);
+ h->selfds = gpr_malloc(sizeof(grpc_fd *) * h->pfd_capacity);
+ }
+ nf = 0;
+ np = 1;
+ h->pfds[0].fd = grpc_kick_read_fd(pollset);
+ h->pfds[0].events = POLLIN;
+ h->pfds[0].revents = POLLOUT;
+ for (i = 0; i < h->fd_count; i++) {
+ int remove = grpc_fd_is_orphaned(h->fds[i]);
+ for (nd = 0; nd < h->del_count; nd++) {
+ if (h->fds[i] == h->dels[nd]) remove = 1;
+ }
+ if (remove) {
+ grpc_fd_unref(h->fds[i]);
+ } else {
+ h->fds[nf++] = h->fds[i];
+ h->pfds[np].events =
+ grpc_fd_begin_poll(h->fds[i], pollset, POLLIN, POLLOUT);
+ h->selfds[np] = h->fds[i];
+ h->pfds[np].fd = h->fds[i]->fd;
+ h->pfds[np].revents = 0;
+ np++;
+ }
+ }
+ h->pfd_count = np;
+ h->fd_count = nf;
+ for (nd = 0; nd < h->del_count; nd++) {
+ grpc_fd_unref(h->dels[nd]);
+ }
+ h->del_count = 0;
+ if (h->pfd_count == 0) {
+ end_polling(pollset);
+ return 0;
+ }
+ pollset->counter = 1;
+ gpr_mu_unlock(&pollset->mu);
+
+ r = poll(h->pfds, h->pfd_count, timeout);
+ if (r < 0) {
+ gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
+ } else if (r == 0) {
+ /* do nothing */
+ } else {
+ if (h->pfds[0].revents & POLLIN) {
+ grpc_kick_drain(pollset);
+ }
+ for (i = 1; i < np; i++) {
+ if (h->pfds[i].revents & POLLIN) {
+ grpc_fd_become_readable(h->selfds[i], allow_synchronous_callback);
+ }
+ if (h->pfds[i].revents & POLLOUT) {
+ grpc_fd_become_writable(h->selfds[i], allow_synchronous_callback);
+ }
+ }
+ }
+ end_polling(pollset);
+
+ gpr_mu_lock(&pollset->mu);
+ pollset->counter = 0;
+ gpr_cv_broadcast(&pollset->cv);
+ return 1;
+}
+
+static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) {
+ size_t i;
+ pollset_hdr *h = pollset->data.ptr;
+ GPR_ASSERT(pollset->counter == 0);
+ for (i = 0; i < h->fd_count; i++) {
+ grpc_fd_unref(h->fds[i]);
+ }
+ for (i = 0; i < h->del_count; i++) {
+ grpc_fd_unref(h->dels[i]);
+ }
+ gpr_free(h->pfds);
+ gpr_free(h->selfds);
+ gpr_free(h->fds);
+ gpr_free(h->dels);
+ gpr_free(h);
+}
+
+static const grpc_pollset_vtable multipoll_with_poll_pollset = {
+ multipoll_with_poll_pollset_add_fd, multipoll_with_poll_pollset_del_fd,
+ multipoll_with_poll_pollset_maybe_work,
+ multipoll_with_poll_pollset_destroy};
+
+void grpc_platform_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
+ size_t nfds) {
+ size_t i;
+ pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr));
+ pollset->vtable = &multipoll_with_poll_pollset;
+ pollset->data.ptr = h;
+ h->fd_count = nfds;
+ h->fd_capacity = nfds;
+ h->fds = gpr_malloc(nfds * sizeof(grpc_fd *));
+ h->pfd_count = 0;
+ h->pfd_capacity = 0;
+ h->pfds = NULL;
+ h->selfds = NULL;
+ h->del_count = 0;
+ h->del_capacity = 0;
+ h->dels = NULL;
+ for (i = 0; i < nfds; i++) {
+ h->fds[i] = fds[i];
+ grpc_fd_ref(fds[i]);
+ }
+}
+
+#endif