aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/channel/channel_args.c9
-rw-r--r--src/core/lib/channel/compress_filter.c12
-rw-r--r--src/core/lib/channel/compress_filter.h2
-rw-r--r--src/core/lib/http/parser.c3
-rw-r--r--src/core/lib/iomgr/ev_poll_and_epoll_posix.c2
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c1212
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.h41
-rw-r--r--src/core/lib/iomgr/ev_posix.c93
-rw-r--r--src/core/lib/iomgr/exec_ctx.c17
-rw-r--r--src/core/lib/iomgr/exec_ctx.h29
-rw-r--r--src/core/lib/iomgr/iomgr_posix.c6
-rw-r--r--src/core/lib/iomgr/udp_server.c18
-rw-r--r--src/core/lib/iomgr/udp_server.h6
-rw-r--r--src/core/lib/support/string_util_win32.c2
-rw-r--r--src/core/lib/surface/byte_buffer_reader.c19
-rw-r--r--src/core/lib/surface/call.c32
-rw-r--r--src/core/lib/surface/init.c2
17 files changed, 1464 insertions, 41 deletions
diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c
index 28d2d78d00..893cf0700e 100644
--- a/src/core/lib/channel/channel_args.c
+++ b/src/core/lib/channel/channel_args.c
@@ -170,7 +170,7 @@ grpc_compression_algorithm grpc_channel_args_get_compression_algorithm(
if (a == NULL) return 0;
for (i = 0; i < a->num_args; ++i) {
if (a->args[i].type == GRPC_ARG_INTEGER &&
- !strcmp(GRPC_COMPRESSION_ALGORITHM_ARG, a->args[i].key)) {
+ !strcmp(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, a->args[i].key)) {
return (grpc_compression_algorithm)a->args[i].value.integer;
break;
}
@@ -182,7 +182,7 @@ grpc_channel_args *grpc_channel_args_set_compression_algorithm(
grpc_channel_args *a, grpc_compression_algorithm algorithm) {
grpc_arg tmp;
tmp.type = GRPC_ARG_INTEGER;
- tmp.key = GRPC_COMPRESSION_ALGORITHM_ARG;
+ tmp.key = GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM;
tmp.value.integer = algorithm;
return grpc_channel_args_copy_and_add(a, &tmp, 1);
}
@@ -196,7 +196,8 @@ static int find_compression_algorithm_states_bitset(const grpc_channel_args *a,
size_t i;
for (i = 0; i < a->num_args; ++i) {
if (a->args[i].type == GRPC_ARG_INTEGER &&
- !strcmp(GRPC_COMPRESSION_ALGORITHM_STATE_ARG, a->args[i].key)) {
+ !strcmp(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
+ a->args[i].key)) {
*states_arg = &a->args[i].value.integer;
return 1; /* GPR_TRUE */
}
@@ -222,7 +223,7 @@ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
/* create a new arg */
grpc_arg tmp;
tmp.type = GRPC_ARG_INTEGER;
- tmp.key = GRPC_COMPRESSION_ALGORITHM_STATE_ARG;
+ tmp.key = GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET;
/* all enabled by default */
tmp.value.integer = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
if (state != 0) {
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index 5510c79b18..0e548c61b8 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -47,7 +47,7 @@
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/static_metadata.h"
-int grpc_compress_filter_trace = 0;
+int grpc_compression_trace = 0;
typedef struct call_data {
gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */
@@ -171,7 +171,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
did_compress =
grpc_msg_compress(calld->compression_algorithm, &calld->slices, &tmp);
if (did_compress) {
- if (grpc_compress_filter_trace) {
+ if (grpc_compression_trace) {
char *algo_name;
const size_t before_size = calld->slices.length;
const size_t after_size = tmp.length;
@@ -185,12 +185,14 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
gpr_slice_buffer_swap(&calld->slices, &tmp);
calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
} else {
- if (grpc_compress_filter_trace) {
+ if (grpc_compression_trace) {
char *algo_name;
GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm,
&algo_name));
- gpr_log(GPR_DEBUG, "Algorithm '%s' enabled but decided not to compress.",
- algo_name);
+ gpr_log(
+ GPR_DEBUG,
+ "Algorithm '%s' enabled but decided not to compress. Input size: %d",
+ algo_name, calld->slices.length);
}
}
diff --git a/src/core/lib/channel/compress_filter.h b/src/core/lib/channel/compress_filter.h
index cf5879d82e..0ce5d08837 100644
--- a/src/core/lib/channel/compress_filter.h
+++ b/src/core/lib/channel/compress_filter.h
@@ -38,7 +38,7 @@
#define GRPC_COMPRESS_REQUEST_ALGORITHM_KEY "grpc-internal-encoding-request"
-extern int grpc_compress_filter_trace;
+extern int grpc_compression_trace;
/** Compression filter for outgoing data.
*
diff --git a/src/core/lib/http/parser.c b/src/core/lib/http/parser.c
index a7efb5e73e..09b2ed40d1 100644
--- a/src/core/lib/http/parser.c
+++ b/src/core/lib/http/parser.c
@@ -161,8 +161,9 @@ static int add_header(grpc_http_parser *parser) {
cur++;
}
if (cur == end) {
- if (grpc_http1_trace)
+ if (grpc_http1_trace) {
gpr_log(GPR_ERROR, "Didn't find ':' in header string");
+ }
goto error;
}
GPR_ASSERT(cur >= beg);
diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
index 3c8127e1a8..aeb6e28665 100644
--- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
@@ -790,7 +790,6 @@ static void pollset_kick(grpc_pollset *p,
static void pollset_global_init(void) {
gpr_tls_init(&g_current_thread_poller);
gpr_tls_init(&g_current_thread_worker);
- grpc_wakeup_fd_global_init();
grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}
@@ -798,7 +797,6 @@ static void pollset_global_shutdown(void) {
grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
gpr_tls_destroy(&g_current_thread_poller);
gpr_tls_destroy(&g_current_thread_worker);
- grpc_wakeup_fd_global_destroy();
}
static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
new file mode 100644
index 0000000000..e91ae40212
--- /dev/null
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -0,0 +1,1212 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_POSIX_SOCKET
+
+#include "src/core/lib/iomgr/ev_poll_posix.h"
+
+#include <assert.h>
+#include <errno.h>
+#include <poll.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/tls.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/lib/iomgr/iomgr_internal.h"
+#include "src/core/lib/iomgr/wakeup_fd_posix.h"
+#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/support/block_annotate.h"
+
+/*******************************************************************************
+ * FD declarations
+ */
+
+typedef struct grpc_fd_watcher {
+ struct grpc_fd_watcher *next;
+ struct grpc_fd_watcher *prev;
+ grpc_pollset *pollset;
+ grpc_pollset_worker *worker;
+ grpc_fd *fd;
+} grpc_fd_watcher;
+
+struct grpc_fd {
+ int fd;
+ /* refst format:
+ bit0: 1=active/0=orphaned
+ bit1-n: refcount
+ meaning that mostly we ref by two to avoid altering the orphaned bit,
+ and just unref by 1 when we're ready to flag the object as orphaned */
+ gpr_atm refst;
+
+ gpr_mu mu;
+ int shutdown;
+ int closed;
+ int released;
+
+ /* The watcher list.
+
+ The following watcher related fields are protected by watcher_mu.
+
+ An fd_watcher is an ephemeral object created when an fd wants to
+ begin polling, and destroyed after the poll.
+
+ It denotes the fd's interest in whether to read poll or write poll
+ or both or neither on this fd.
+
+ If a watcher is asked to poll for reads or writes, the read_watcher
+ or write_watcher fields are set respectively. A watcher may be asked
+ to poll for both, in which case both fields will be set.
+
+ read_watcher and write_watcher may be NULL if no watcher has been
+ asked to poll for reads or writes.
+
+ If an fd_watcher is not asked to poll for reads or writes, it's added
+ to a linked list of inactive watchers, rooted at inactive_watcher_root.
+ If at a later time there becomes need of a poller to poll, one of
+ the inactive pollers may be kicked out of their poll loops to take
+ that responsibility. */
+ grpc_fd_watcher inactive_watcher_root;
+ grpc_fd_watcher *read_watcher;
+ grpc_fd_watcher *write_watcher;
+
+ grpc_closure *read_closure;
+ grpc_closure *write_closure;
+
+ grpc_closure *on_done_closure;
+
+ grpc_iomgr_object iomgr_object;
+};
+
+/* Begin polling on an fd.
+ Registers that the given pollset is interested in this fd - so that if read
+ or writability interest changes, the pollset can be kicked to pick up that
+ new interest.
+ Return value is:
+ (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
+ i.e. a combination of read_mask and write_mask determined by the fd's current
+ interest in said events.
+ Polling strategies that do not need to alter their behavior depending on the
+ fd's current interest (such as epoll) do not need to call this function.
+ MUST NOT be called with a pollset lock taken */
+static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
+ grpc_pollset_worker *worker, uint32_t read_mask,
+ uint32_t write_mask, grpc_fd_watcher *rec);
+/* Complete polling previously started with fd_begin_poll
+ MUST NOT be called with a pollset lock taken
+ if got_read or got_write are 1, also does the become_{readable,writable} as
+ appropriate. */
+static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec,
+ int got_read, int got_write);
+
+/* Return 1 if this fd is orphaned, 0 otherwise */
+static bool fd_is_orphaned(grpc_fd *fd);
+
+/* Reference counting for fds */
+/*#define GRPC_FD_REF_COUNT_DEBUG*/
+#ifdef GRPC_FD_REF_COUNT_DEBUG
+static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
+static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
+ int line);
+#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
+#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
+#else
+static void fd_ref(grpc_fd *fd);
+static void fd_unref(grpc_fd *fd);
+#define GRPC_FD_REF(fd, reason) fd_ref(fd)
+#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
+#endif
+
+#define CLOSURE_NOT_READY ((grpc_closure *)0)
+#define CLOSURE_READY ((grpc_closure *)1)
+
+/*******************************************************************************
+ * pollset declarations
+ */
+
+typedef struct grpc_cached_wakeup_fd {
+ grpc_wakeup_fd fd;
+ struct grpc_cached_wakeup_fd *next;
+} grpc_cached_wakeup_fd;
+
+struct grpc_pollset_worker {
+ grpc_cached_wakeup_fd *wakeup_fd;
+ int reevaluate_polling_on_wakeup;
+ int kicked_specifically;
+ struct grpc_pollset_worker *next;
+ struct grpc_pollset_worker *prev;
+};
+
+struct grpc_pollset {
+ gpr_mu mu;
+ grpc_pollset_worker root_worker;
+ int in_flight_cbs;
+ int shutting_down;
+ int called_shutdown;
+ int kicked_without_pollers;
+ grpc_closure *shutdown_done;
+ grpc_closure_list idle_jobs;
+ /* all polled fds */
+ size_t fd_count;
+ size_t fd_capacity;
+ grpc_fd **fds;
+ /* fds that have been removed from the pollset explicitly */
+ size_t del_count;
+ size_t del_capacity;
+ grpc_fd **dels;
+ /* Local cache of eventfds for workers */
+ grpc_cached_wakeup_fd *local_wakeup_cache;
+};
+
+/* Add an fd to a pollset */
+static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ struct grpc_fd *fd);
+
+static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set, grpc_fd *fd);
+
+/* Convert a timespec to milliseconds:
+ - very small or negative poll times are clamped to zero to do a
+ non-blocking poll (which becomes spin polling)
+ - other small values are rounded up to one millisecond
+ - longer than a millisecond polls are rounded up to the next nearest
+ millisecond to avoid spinning
+ - infinite timeouts are converted to -1 */
+static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
+ gpr_timespec now);
+
+/* Allow kick to wakeup the currently polling worker */
+#define GRPC_POLLSET_CAN_KICK_SELF 1
+/* Force the wakee to repoll when awoken */
+#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
+/* As per pollset_kick, with an extended set of flags (defined above)
+ -- mostly for fd_posix's use. */
+static void pollset_kick_ext(grpc_pollset *p,
+ grpc_pollset_worker *specific_worker,
+ uint32_t flags);
+
+/* Return 1 if the pollset has active threads in pollset_work (pollset must
+ * be locked) */
+static int pollset_has_workers(grpc_pollset *pollset);
+
+/*******************************************************************************
+ * pollset_set definitions
+ */
+
+struct grpc_pollset_set {
+ gpr_mu mu;
+
+ size_t pollset_count;
+ size_t pollset_capacity;
+ grpc_pollset **pollsets;
+
+ size_t pollset_set_count;
+ size_t pollset_set_capacity;
+ struct grpc_pollset_set **pollset_sets;
+
+ size_t fd_count;
+ size_t fd_capacity;
+ grpc_fd **fds;
+};
+
+/*******************************************************************************
+ * fd_posix.c
+ */
+
+#ifdef GRPC_FD_REF_COUNT_DEBUG
+#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
+#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
+static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
+ int line) {
+ gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
+ gpr_atm_no_barrier_load(&fd->refst),
+ gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
+#else
+#define REF_BY(fd, n, reason) ref_by(fd, n)
+#define UNREF_BY(fd, n, reason) unref_by(fd, n)
+static void ref_by(grpc_fd *fd, int n) {
+#endif
+ GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
+}
+
+#ifdef GRPC_FD_REF_COUNT_DEBUG
+static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
+ int line) {
+ gpr_atm old;
+ gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
+ gpr_atm_no_barrier_load(&fd->refst),
+ gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
+#else
+static void unref_by(grpc_fd *fd, int n) {
+ gpr_atm old;
+#endif
+ old = gpr_atm_full_fetch_add(&fd->refst, -n);
+ if (old == n) {
+ gpr_mu_destroy(&fd->mu);
+ grpc_iomgr_unregister_object(&fd->iomgr_object);
+ gpr_free(fd);
+ } else {
+ GPR_ASSERT(old > n);
+ }
+}
+
+static grpc_fd *fd_create(int fd, const char *name) {
+ grpc_fd *r = gpr_malloc(sizeof(*r));
+ gpr_mu_init(&r->mu);
+ gpr_atm_rel_store(&r->refst, 1);
+ r->shutdown = 0;
+ r->read_closure = CLOSURE_NOT_READY;
+ r->write_closure = CLOSURE_NOT_READY;
+ r->fd = fd;
+ r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
+ &r->inactive_watcher_root;
+ r->read_watcher = r->write_watcher = NULL;
+ r->on_done_closure = NULL;
+ r->closed = 0;
+ r->released = 0;
+
+ char *name2;
+ gpr_asprintf(&name2, "%s fd=%d", name, fd);
+ grpc_iomgr_register_object(&r->iomgr_object, name2);
+ gpr_free(name2);
+#ifdef GRPC_FD_REF_COUNT_DEBUG
+ gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, name);
+#endif
+ return r;
+}
+
+static bool fd_is_orphaned(grpc_fd *fd) {
+ return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
+}
+
+static void pollset_kick_locked(grpc_fd_watcher *watcher) {
+ gpr_mu_lock(&watcher->pollset->mu);
+ GPR_ASSERT(watcher->worker);
+ pollset_kick_ext(watcher->pollset, watcher->worker,
+ GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
+ gpr_mu_unlock(&watcher->pollset->mu);
+}
+
+static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
+ if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
+ pollset_kick_locked(fd->inactive_watcher_root.next);
+ } else if (fd->read_watcher) {
+ pollset_kick_locked(fd->read_watcher);
+ } else if (fd->write_watcher) {
+ pollset_kick_locked(fd->write_watcher);
+ }
+}
+
+static void wake_all_watchers_locked(grpc_fd *fd) {
+ grpc_fd_watcher *watcher;
+ for (watcher = fd->inactive_watcher_root.next;
+ watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
+ pollset_kick_locked(watcher);
+ }
+ if (fd->read_watcher) {
+ pollset_kick_locked(fd->read_watcher);
+ }
+ if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
+ pollset_kick_locked(fd->write_watcher);
+ }
+}
+
+static int has_watchers(grpc_fd *fd) {
+ return fd->read_watcher != NULL || fd->write_watcher != NULL ||
+ fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
+}
+
+static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
+ fd->closed = 1;
+ if (!fd->released) {
+ close(fd->fd);
+ }
+ grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL);
+}
+
+static int fd_wrapped_fd(grpc_fd *fd) {
+ if (fd->released || fd->closed) {
+ return -1;
+ } else {
+ return fd->fd;
+ }
+}
+
+static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure *on_done, int *release_fd,
+ const char *reason) {
+ fd->on_done_closure = on_done;
+ fd->released = release_fd != NULL;
+ if (!fd->released) {
+ shutdown(fd->fd, SHUT_RDWR);
+ } else {
+ *release_fd = fd->fd;
+ }
+ gpr_mu_lock(&fd->mu);
+ REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
+ if (!has_watchers(fd)) {
+ close_fd_locked(exec_ctx, fd);
+ } else {
+ wake_all_watchers_locked(fd);
+ }
+ gpr_mu_unlock(&fd->mu);
+ UNREF_BY(fd, 2, reason); /* drop the reference */
+}
+
+/* increment refcount by two to avoid changing the orphan bit */
+#ifdef GRPC_FD_REF_COUNT_DEBUG
+static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
+ int line) {
+ ref_by(fd, 2, reason, file, line);
+}
+
+static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
+ int line) {
+ unref_by(fd, 2, reason, file, line);
+}
+#else
+static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
+
+static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
+#endif
+
+static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure **st, grpc_closure *closure) {
+ if (*st == CLOSURE_NOT_READY) {
+ /* not ready ==> switch to a waiting state by setting the closure */
+ *st = closure;
+ } else if (*st == CLOSURE_READY) {
+ /* already ready ==> queue the closure to run immediately */
+ *st = CLOSURE_NOT_READY;
+ grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL);
+ maybe_wake_one_watcher_locked(fd);
+ } else {
+ /* upcallptr was set to a different closure. This is an error! */
+ gpr_log(GPR_ERROR,
+ "User called a notify_on function with a previous callback still "
+ "pending");
+ abort();
+ }
+}
+
+/* returns 1 if state becomes not ready */
+static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure **st) {
+ if (*st == CLOSURE_READY) {
+ /* duplicate ready ==> ignore */
+ return 0;
+ } else if (*st == CLOSURE_NOT_READY) {
+ /* not ready, and not waiting ==> flag ready */
+ *st = CLOSURE_READY;
+ return 0;
+ } else {
+ /* waiting ==> queue closure */
+ grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL);
+ *st = CLOSURE_NOT_READY;
+ return 1;
+ }
+}
+
+static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
+ gpr_mu_lock(&fd->mu);
+ GPR_ASSERT(!fd->shutdown);
+ fd->shutdown = 1;
+ set_ready_locked(exec_ctx, fd, &fd->read_closure);
+ set_ready_locked(exec_ctx, fd, &fd->write_closure);
+ gpr_mu_unlock(&fd->mu);
+}
+
+static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure *closure) {
+ gpr_mu_lock(&fd->mu);
+ notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
+ gpr_mu_unlock(&fd->mu);
+}
+
+static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure *closure) {
+ gpr_mu_lock(&fd->mu);
+ notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
+ gpr_mu_unlock(&fd->mu);
+}
+
+static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
+ grpc_pollset_worker *worker, uint32_t read_mask,
+ uint32_t write_mask, grpc_fd_watcher *watcher) {
+ uint32_t mask = 0;
+ grpc_closure *cur;
+ int requested;
+ /* keep track of pollers that have requested our events, in case they change
+ */
+ GRPC_FD_REF(fd, "poll");
+
+ gpr_mu_lock(&fd->mu);
+
+ /* if we are shutdown, then don't add to the watcher set */
+ if (fd->shutdown) {
+ watcher->fd = NULL;
+ watcher->pollset = NULL;
+ watcher->worker = NULL;
+ gpr_mu_unlock(&fd->mu);
+ GRPC_FD_UNREF(fd, "poll");
+ return 0;
+ }
+
+ /* if there is nobody polling for read, but we need to, then start doing so */
+ cur = fd->read_closure;
+ requested = cur != CLOSURE_READY;
+ if (read_mask && fd->read_watcher == NULL && requested) {
+ fd->read_watcher = watcher;
+ mask |= read_mask;
+ }
+ /* if there is nobody polling for write, but we need to, then start doing so
+ */
+ cur = fd->write_closure;
+ requested = cur != CLOSURE_READY;
+ if (write_mask && fd->write_watcher == NULL && requested) {
+ fd->write_watcher = watcher;
+ mask |= write_mask;
+ }
+ /* if not polling, remember this watcher in case we need someone to later */
+ if (mask == 0 && worker != NULL) {
+ watcher->next = &fd->inactive_watcher_root;
+ watcher->prev = watcher->next->prev;
+ watcher->next->prev = watcher->prev->next = watcher;
+ }
+ watcher->pollset = pollset;
+ watcher->worker = worker;
+ watcher->fd = fd;
+ gpr_mu_unlock(&fd->mu);
+
+ return mask;
+}
+
+static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
+ int got_read, int got_write) {
+ int was_polling = 0;
+ int kick = 0;
+ grpc_fd *fd = watcher->fd;
+
+ if (fd == NULL) {
+ return;
+ }
+
+ gpr_mu_lock(&fd->mu);
+
+ if (watcher == fd->read_watcher) {
+ /* remove read watcher, kick if we still need a read */
+ was_polling = 1;
+ if (!got_read) {
+ kick = 1;
+ }
+ fd->read_watcher = NULL;
+ }
+ if (watcher == fd->write_watcher) {
+ /* remove write watcher, kick if we still need a write */
+ was_polling = 1;
+ if (!got_write) {
+ kick = 1;
+ }
+ fd->write_watcher = NULL;
+ }
+ if (!was_polling && watcher->worker != NULL) {
+ /* remove from inactive list */
+ watcher->next->prev = watcher->prev;
+ watcher->prev->next = watcher->next;
+ }
+ if (got_read) {
+ if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) {
+ kick = 1;
+ }
+ }
+ if (got_write) {
+ if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) {
+ kick = 1;
+ }
+ }
+ if (kick) {
+ maybe_wake_one_watcher_locked(fd);
+ }
+ if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
+ close_fd_locked(exec_ctx, fd);
+ }
+ gpr_mu_unlock(&fd->mu);
+
+ GRPC_FD_UNREF(fd, "poll");
+}
+
+/*******************************************************************************
+ * pollset_posix.c
+ */
+
+GPR_TLS_DECL(g_current_thread_poller);
+GPR_TLS_DECL(g_current_thread_worker);
+
+static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->prev->next = worker->next;
+ worker->next->prev = worker->prev;
+}
+
+static int pollset_has_workers(grpc_pollset *p) {
+ return p->root_worker.next != &p->root_worker;
+}
+
+static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
+ if (pollset_has_workers(p)) {
+ grpc_pollset_worker *w = p->root_worker.next;
+ remove_worker(p, w);
+ return w;
+ } else {
+ return NULL;
+ }
+}
+
+static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->next = &p->root_worker;
+ worker->prev = worker->next->prev;
+ worker->prev->next = worker->next->prev = worker;
+}
+
+static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->prev = &p->root_worker;
+ worker->next = worker->prev->next;
+ worker->prev->next = worker->next->prev = worker;
+}
+
+static void pollset_kick_ext(grpc_pollset *p,
+ grpc_pollset_worker *specific_worker,
+ uint32_t flags) {
+ GPR_TIMER_BEGIN("pollset_kick_ext", 0);
+
+ /* pollset->mu already held */
+ if (specific_worker != NULL) {
+ if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
+ GPR_TIMER_BEGIN("pollset_kick_ext.broadcast", 0);
+ GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
+ for (specific_worker = p->root_worker.next;
+ specific_worker != &p->root_worker;
+ specific_worker = specific_worker->next) {
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
+ }
+ p->kicked_without_pollers = 1;
+ GPR_TIMER_END("pollset_kick_ext.broadcast", 0);
+ } else if (gpr_tls_get(&g_current_thread_worker) !=
+ (intptr_t)specific_worker) {
+ GPR_TIMER_MARK("different_thread_worker", 0);
+ if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
+ specific_worker->reevaluate_polling_on_wakeup = 1;
+ }
+ specific_worker->kicked_specifically = 1;
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
+ } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
+ GPR_TIMER_MARK("kick_yoself", 0);
+ if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
+ specific_worker->reevaluate_polling_on_wakeup = 1;
+ }
+ specific_worker->kicked_specifically = 1;
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
+ }
+ } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) {
+ GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
+ GPR_TIMER_MARK("kick_anonymous", 0);
+ specific_worker = pop_front_worker(p);
+ if (specific_worker != NULL) {
+ if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
+ GPR_TIMER_MARK("kick_anonymous_not_self", 0);
+ push_back_worker(p, specific_worker);
+ specific_worker = pop_front_worker(p);
+ if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
+ gpr_tls_get(&g_current_thread_worker) ==
+ (intptr_t)specific_worker) {
+ push_back_worker(p, specific_worker);
+ specific_worker = NULL;
+ }
+ }
+ if (specific_worker != NULL) {
+ GPR_TIMER_MARK("finally_kick", 0);
+ push_back_worker(p, specific_worker);
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
+ }
+ } else {
+ GPR_TIMER_MARK("kicked_no_pollers", 0);
+ p->kicked_without_pollers = 1;
+ }
+ }
+
+ GPR_TIMER_END("pollset_kick_ext", 0);
+}
+
+static void pollset_kick(grpc_pollset *p,
+ grpc_pollset_worker *specific_worker) {
+ pollset_kick_ext(p, specific_worker, 0);
+}
+
+/* global state management */
+
+static void pollset_global_init(void) {
+ gpr_tls_init(&g_current_thread_poller);
+ gpr_tls_init(&g_current_thread_worker);
+ grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
+}
+
+static void pollset_global_shutdown(void) {
+ grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
+ gpr_tls_destroy(&g_current_thread_poller);
+ gpr_tls_destroy(&g_current_thread_worker);
+}
+
+static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }
+
+/* main interface */
+
+static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
+ gpr_mu_init(&pollset->mu);
+ *mu = &pollset->mu;
+ pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
+ pollset->in_flight_cbs = 0;
+ pollset->shutting_down = 0;
+ pollset->called_shutdown = 0;
+ pollset->kicked_without_pollers = 0;
+ pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL;
+ pollset->local_wakeup_cache = NULL;
+ pollset->kicked_without_pollers = 0;
+ pollset->fd_count = 0;
+ pollset->fd_capacity = 0;
+ pollset->del_count = 0;
+ pollset->del_capacity = 0;
+ pollset->fds = NULL;
+ pollset->dels = NULL;
+}
+
+static void pollset_destroy(grpc_pollset *pollset) {
+ GPR_ASSERT(pollset->in_flight_cbs == 0);
+ GPR_ASSERT(!pollset_has_workers(pollset));
+ GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
+ while (pollset->local_wakeup_cache) {
+ grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next;
+ grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
+ gpr_free(pollset->local_wakeup_cache);
+ pollset->local_wakeup_cache = next;
+ }
+ gpr_free(pollset->fds);
+ gpr_free(pollset->dels);
+ gpr_mu_destroy(&pollset->mu);
+}
+
+static void pollset_reset(grpc_pollset *pollset) {
+ GPR_ASSERT(pollset->shutting_down);
+ GPR_ASSERT(pollset->in_flight_cbs == 0);
+ GPR_ASSERT(!pollset_has_workers(pollset));
+ GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
+ GPR_ASSERT(pollset->fd_count == 0);
+ GPR_ASSERT(pollset->del_count == 0);
+ pollset->shutting_down = 0;
+ pollset->called_shutdown = 0;
+ pollset->kicked_without_pollers = 0;
+}
+
+static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_fd *fd) {
+ gpr_mu_lock(&pollset->mu);
+ size_t i;
+ /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
+ for (i = 0; i < pollset->fd_count; i++) {
+ if (pollset->fds[i] == fd) goto exit;
+ }
+ if (pollset->fd_count == pollset->fd_capacity) {
+ pollset->fd_capacity =
+ GPR_MAX(pollset->fd_capacity + 8, pollset->fd_count * 3 / 2);
+ pollset->fds =
+ gpr_realloc(pollset->fds, sizeof(grpc_fd *) * pollset->fd_capacity);
+ }
+ pollset->fds[pollset->fd_count++] = fd;
+ GRPC_FD_REF(fd, "multipoller");
+ pollset_kick(pollset, NULL);
+exit:
+ gpr_mu_unlock(&pollset->mu);
+}
+
+static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
+ GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs));
+ size_t i;
+ for (i = 0; i < pollset->fd_count; i++) {
+ GRPC_FD_UNREF(pollset->fds[i], "multipoller");
+ }
+ for (i = 0; i < pollset->del_count; i++) {
+ GRPC_FD_UNREF(pollset->dels[i], "multipoller_del");
+ }
+ pollset->fd_count = 0;
+ pollset->del_count = 0;
+ grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
+}
+
+static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_pollset_worker **worker_hdl, gpr_timespec now,
+ gpr_timespec deadline) {
+ grpc_pollset_worker worker;
+ *worker_hdl = &worker;
+
+ /* pollset->mu already held */
+ int added_worker = 0;
+ int locked = 1;
+ int queued_work = 0;
+ int keep_polling = 0;
+ GPR_TIMER_BEGIN("pollset_work", 0);
+ /* this must happen before we (potentially) drop pollset->mu */
+ worker.next = worker.prev = NULL;
+ worker.reevaluate_polling_on_wakeup = 0;
+ if (pollset->local_wakeup_cache != NULL) {
+ worker.wakeup_fd = pollset->local_wakeup_cache;
+ pollset->local_wakeup_cache = worker.wakeup_fd->next;
+ } else {
+ worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd));
+ grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
+ }
+ worker.kicked_specifically = 0;
+ /* If there's work waiting for the pollset to be idle, and the
+ pollset is idle, then do that work */
+ if (!pollset_has_workers(pollset) &&
+ !grpc_closure_list_empty(pollset->idle_jobs)) {
+ GPR_TIMER_MARK("pollset_work.idle_jobs", 0);
+ grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
+ goto done;
+ }
+ /* If we're shutting down then we don't execute any extended work */
+ if (pollset->shutting_down) {
+ GPR_TIMER_MARK("pollset_work.shutting_down", 0);
+ goto done;
+ }
+ /* Give do_promote priority so we don't starve it out */
+ if (pollset->in_flight_cbs) {
+ GPR_TIMER_MARK("pollset_work.in_flight_cbs", 0);
+ gpr_mu_unlock(&pollset->mu);
+ locked = 0;
+ goto done;
+ }
+ /* Start polling, and keep doing so while we're being asked to
+ re-evaluate our pollers (this allows poll() based pollers to
+ ensure they don't miss wakeups) */
+ keep_polling = 1;
+ gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
+ while (keep_polling) {
+ keep_polling = 0;
+ if (!pollset->kicked_without_pollers) {
+ if (!added_worker) {
+ push_front_worker(pollset, &worker);
+ added_worker = 1;
+ gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
+ }
+ GPR_TIMER_BEGIN("maybe_work_and_unlock", 0);
+#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
+#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
+
+ int timeout;
+ int r;
+ size_t i, j, fd_count;
+ nfds_t pfd_count;
+ /* TODO(ctiller): inline some elements to avoid an allocation */
+ grpc_fd_watcher *watchers;
+ struct pollfd *pfds;
+
+ timeout = poll_deadline_to_millis_timeout(deadline, now);
+ /* TODO(ctiller): perform just one malloc here if we exceed the inline
+ * case */
+ pfds = gpr_malloc(sizeof(*pfds) * (pollset->fd_count + 2));
+ watchers = gpr_malloc(sizeof(*watchers) * (pollset->fd_count + 2));
+ fd_count = 0;
+ pfd_count = 2;
+ pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
+ pfds[0].events = POLLIN;
+ pfds[0].revents = 0;
+ pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd);
+ pfds[1].events = POLLIN;
+ pfds[1].revents = 0;
+ for (i = 0; i < pollset->fd_count; i++) {
+ int remove = fd_is_orphaned(pollset->fds[i]);
+ for (j = 0; !remove && j < pollset->del_count; j++) {
+ if (pollset->fds[i] == pollset->dels[j]) remove = 1;
+ }
+ if (remove) {
+ GRPC_FD_UNREF(pollset->fds[i], "multipoller");
+ } else {
+ pollset->fds[fd_count++] = pollset->fds[i];
+ watchers[pfd_count].fd = pollset->fds[i];
+ GRPC_FD_REF(watchers[pfd_count].fd, "multipoller_start");
+ pfds[pfd_count].fd = pollset->fds[i]->fd;
+ pfds[pfd_count].revents = 0;
+ pfd_count++;
+ }
+ }
+ for (j = 0; j < pollset->del_count; j++) {
+ GRPC_FD_UNREF(pollset->dels[j], "multipoller_del");
+ }
+ pollset->del_count = 0;
+ pollset->fd_count = fd_count;
+ gpr_mu_unlock(&pollset->mu);
+
+ for (i = 2; i < pfd_count; i++) {
+ grpc_fd *fd = watchers[i].fd;
+ pfds[i].events = (short)fd_begin_poll(fd, pollset, &worker, POLLIN,
+ POLLOUT, &watchers[i]);
+ GRPC_FD_UNREF(fd, "multipoller_start");
+ }
+
+ /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
+ even going into the blocking annotation if possible */
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
+ r = grpc_poll_function(pfds, pfd_count, timeout);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
+
+ if (r < 0) {
+ if (errno != EINTR) {
+ gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
+ }
+ for (i = 2; i < pfd_count; i++) {
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0);
+ }
+ } else if (r == 0) {
+ for (i = 2; i < pfd_count; i++) {
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0);
+ }
+ } else {
+ if (pfds[0].revents & POLLIN_CHECK) {
+ grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
+ }
+ if (pfds[1].revents & POLLIN_CHECK) {
+ grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd);
+ }
+ for (i = 2; i < pfd_count; i++) {
+ if (watchers[i].fd == NULL) {
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0);
+ } else {
+ fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK,
+ pfds[i].revents & POLLOUT_CHECK);
+ }
+ }
+ }
+
+ gpr_free(pfds);
+ gpr_free(watchers);
+ GPR_TIMER_END("maybe_work_and_unlock", 0);
+ locked = 0;
+ } else {
+ GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
+ pollset->kicked_without_pollers = 0;
+ }
+ /* Finished execution - start cleaning up.
+ Note that we may arrive here from outside the enclosing while() loop.
+ In that case we won't loop though as we haven't added worker to the
+ worker list, which means nobody could ask us to re-evaluate polling). */
+ done:
+ if (!locked) {
+ queued_work |= grpc_exec_ctx_flush(exec_ctx);
+ gpr_mu_lock(&pollset->mu);
+ locked = 1;
+ }
+ /* If we're forced to re-evaluate polling (via pollset_kick with
+ GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
+ a loop */
+ if (worker.reevaluate_polling_on_wakeup) {
+ worker.reevaluate_polling_on_wakeup = 0;
+ pollset->kicked_without_pollers = 0;
+ if (queued_work || worker.kicked_specifically) {
+ /* If there's queued work on the list, then set the deadline to be
+ immediate so we get back out of the polling loop quickly */
+ deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
+ }
+ keep_polling = 1;
+ }
+ if (keep_polling) {
+ now = gpr_now(now.clock_type);
+ }
+ }
+ gpr_tls_set(&g_current_thread_poller, 0);
+ if (added_worker) {
+ remove_worker(pollset, &worker);
+ gpr_tls_set(&g_current_thread_worker, 0);
+ }
+ /* release wakeup fd to the local pool */
+ worker.wakeup_fd->next = pollset->local_wakeup_cache;
+ pollset->local_wakeup_cache = worker.wakeup_fd;
+ /* check shutdown conditions */
+ if (pollset->shutting_down) {
+ if (pollset_has_workers(pollset)) {
+ pollset_kick(pollset, NULL);
+ } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) {
+ pollset->called_shutdown = 1;
+ gpr_mu_unlock(&pollset->mu);
+ finish_shutdown(exec_ctx, pollset);
+ grpc_exec_ctx_flush(exec_ctx);
+ /* Continuing to access pollset here is safe -- it is the caller's
+ * responsibility to not destroy when it has outstanding calls to
+ * pollset_work.
+ * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
+ gpr_mu_lock(&pollset->mu);
+ } else if (!grpc_closure_list_empty(pollset->idle_jobs)) {
+ grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
+ gpr_mu_unlock(&pollset->mu);
+ grpc_exec_ctx_flush(exec_ctx);
+ gpr_mu_lock(&pollset->mu);
+ }
+ }
+ *worker_hdl = NULL;
+ GPR_TIMER_END("pollset_work", 0);
+}
+
+static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_closure *closure) {
+ GPR_ASSERT(!pollset->shutting_down);
+ pollset->shutting_down = 1;
+ pollset->shutdown_done = closure;
+ pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
+ if (!pollset_has_workers(pollset)) {
+ grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
+ }
+ if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 &&
+ !pollset_has_workers(pollset)) {
+ pollset->called_shutdown = 1;
+ finish_shutdown(exec_ctx, pollset);
+ }
+}
+
+static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
+ gpr_timespec now) {
+ gpr_timespec timeout;
+ static const int64_t max_spin_polling_us = 10;
+ if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
+ return -1;
+ }
+ if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
+ max_spin_polling_us,
+ GPR_TIMESPAN))) <= 0) {
+ return 0;
+ }
+ timeout = gpr_time_sub(deadline, now);
+ return gpr_time_to_millis(gpr_time_add(
+ timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
+}
+
+/*******************************************************************************
+ * pollset_set_posix.c
+ */
+
+static grpc_pollset_set *pollset_set_create(void) {
+ grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
+ memset(pollset_set, 0, sizeof(*pollset_set));
+ gpr_mu_init(&pollset_set->mu);
+ return pollset_set;
+}
+
+static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
+ size_t i;
+ gpr_mu_destroy(&pollset_set->mu);
+ for (i = 0; i < pollset_set->fd_count; i++) {
+ GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
+ }
+ gpr_free(pollset_set->pollsets);
+ gpr_free(pollset_set->pollset_sets);
+ gpr_free(pollset_set->fds);
+ gpr_free(pollset_set);
+}
+
+static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set,
+ grpc_pollset *pollset) {
+ size_t i, j;
+ gpr_mu_lock(&pollset_set->mu);
+ if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
+ pollset_set->pollset_capacity =
+ GPR_MAX(8, 2 * pollset_set->pollset_capacity);
+ pollset_set->pollsets =
+ gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
+ sizeof(*pollset_set->pollsets));
+ }
+ pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
+ for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
+ if (fd_is_orphaned(pollset_set->fds[i])) {
+ GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
+ } else {
+ pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
+ pollset_set->fds[j++] = pollset_set->fds[i];
+ }
+ }
+ pollset_set->fd_count = j;
+ gpr_mu_unlock(&pollset_set->mu);
+}
+
+static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set,
+ grpc_pollset *pollset) {
+ size_t i;
+ gpr_mu_lock(&pollset_set->mu);
+ for (i = 0; i < pollset_set->pollset_count; i++) {
+ if (pollset_set->pollsets[i] == pollset) {
+ pollset_set->pollset_count--;
+ GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
+ pollset_set->pollsets[pollset_set->pollset_count]);
+ break;
+ }
+ }
+ gpr_mu_unlock(&pollset_set->mu);
+}
+
+static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *bag,
+ grpc_pollset_set *item) {
+ size_t i, j;
+ gpr_mu_lock(&bag->mu);
+ if (bag->pollset_set_count == bag->pollset_set_capacity) {
+ bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
+ bag->pollset_sets =
+ gpr_realloc(bag->pollset_sets,
+ bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
+ }
+ bag->pollset_sets[bag->pollset_set_count++] = item;
+ for (i = 0, j = 0; i < bag->fd_count; i++) {
+ if (fd_is_orphaned(bag->fds[i])) {
+ GRPC_FD_UNREF(bag->fds[i], "pollset_set");
+ } else {
+ pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
+ bag->fds[j++] = bag->fds[i];
+ }
+ }
+ bag->fd_count = j;
+ gpr_mu_unlock(&bag->mu);
+}
+
+static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *bag,
+ grpc_pollset_set *item) {
+ size_t i;
+ gpr_mu_lock(&bag->mu);
+ for (i = 0; i < bag->pollset_set_count; i++) {
+ if (bag->pollset_sets[i] == item) {
+ bag->pollset_set_count--;
+ GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
+ bag->pollset_sets[bag->pollset_set_count]);
+ break;
+ }
+ }
+ gpr_mu_unlock(&bag->mu);
+}
+
+static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set, grpc_fd *fd) {
+ size_t i;
+ gpr_mu_lock(&pollset_set->mu);
+ if (pollset_set->fd_count == pollset_set->fd_capacity) {
+ pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
+ pollset_set->fds = gpr_realloc(
+ pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
+ }
+ GRPC_FD_REF(fd, "pollset_set");
+ pollset_set->fds[pollset_set->fd_count++] = fd;
+ for (i = 0; i < pollset_set->pollset_count; i++) {
+ pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
+ }
+ for (i = 0; i < pollset_set->pollset_set_count; i++) {
+ pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
+ }
+ gpr_mu_unlock(&pollset_set->mu);
+}
+
+static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set, grpc_fd *fd) {
+ size_t i;
+ gpr_mu_lock(&pollset_set->mu);
+ for (i = 0; i < pollset_set->fd_count; i++) {
+ if (pollset_set->fds[i] == fd) {
+ pollset_set->fd_count--;
+ GPR_SWAP(grpc_fd *, pollset_set->fds[i],
+ pollset_set->fds[pollset_set->fd_count]);
+ GRPC_FD_UNREF(fd, "pollset_set");
+ break;
+ }
+ }
+ for (i = 0; i < pollset_set->pollset_set_count; i++) {
+ pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
+ }
+ gpr_mu_unlock(&pollset_set->mu);
+}
+
+/*******************************************************************************
+ * event engine binding
+ */
+
+static void shutdown_engine(void) { pollset_global_shutdown(); }
+
+static const grpc_event_engine_vtable vtable = {
+ .pollset_size = sizeof(grpc_pollset),
+
+ .fd_create = fd_create,
+ .fd_wrapped_fd = fd_wrapped_fd,
+ .fd_orphan = fd_orphan,
+ .fd_shutdown = fd_shutdown,
+ .fd_notify_on_read = fd_notify_on_read,
+ .fd_notify_on_write = fd_notify_on_write,
+
+ .pollset_init = pollset_init,
+ .pollset_shutdown = pollset_shutdown,
+ .pollset_reset = pollset_reset,
+ .pollset_destroy = pollset_destroy,
+ .pollset_work = pollset_work,
+ .pollset_kick = pollset_kick,
+ .pollset_add_fd = pollset_add_fd,
+
+ .pollset_set_create = pollset_set_create,
+ .pollset_set_destroy = pollset_set_destroy,
+ .pollset_set_add_pollset = pollset_set_add_pollset,
+ .pollset_set_del_pollset = pollset_set_del_pollset,
+ .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
+ .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
+ .pollset_set_add_fd = pollset_set_add_fd,
+ .pollset_set_del_fd = pollset_set_del_fd,
+
+ .kick_poller = kick_poller,
+
+ .shutdown_engine = shutdown_engine,
+};
+
+const grpc_event_engine_vtable *grpc_init_poll_posix(void) {
+ pollset_global_init();
+ return &vtable;
+}
+
+#endif
diff --git a/src/core/lib/iomgr/ev_poll_posix.h b/src/core/lib/iomgr/ev_poll_posix.h
new file mode 100644
index 0000000000..291736a2db
--- /dev/null
+++ b/src/core/lib/iomgr/ev_poll_posix.h
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H
+#define GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H
+
+#include "src/core/lib/iomgr/ev_posix.h"
+
+const grpc_event_engine_vtable *grpc_init_poll_posix(void);
+
+#endif /* GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H */
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index 7df1751352..a7dfc9552d 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -37,23 +37,104 @@
#include "src/core/lib/iomgr/ev_posix.h"
+#include <string.h>
+
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/ev_poll_and_epoll_posix.h"
+#include "src/core/lib/iomgr/ev_poll_posix.h"
+#include "src/core/lib/support/env.h"
+
+/** Default poll() function - a pointer so that it can be overridden by some
+ * tests */
+grpc_poll_function_type grpc_poll_function = poll;
static const grpc_event_engine_vtable *g_event_engine;
-grpc_poll_function_type grpc_poll_function = poll;
+typedef const grpc_event_engine_vtable *(*event_engine_factory_fn)(void);
+
+typedef struct {
+ const char *name;
+ event_engine_factory_fn factory;
+} event_engine_factory;
+
+static const event_engine_factory g_factories[] = {
+ {"poll", grpc_init_poll_posix}, {"legacy", grpc_init_poll_and_epoll_posix},
+};
+
+static void add(const char *beg, const char *end, char ***ss, size_t *ns) {
+ size_t n = *ns;
+ size_t np = n + 1;
+ char *s;
+ size_t len;
+ GPR_ASSERT(end >= beg);
+ len = (size_t)(end - beg);
+ s = gpr_malloc(len + 1);
+ memcpy(s, beg, len);
+ s[len] = 0;
+ *ss = gpr_realloc(*ss, sizeof(char **) * np);
+ (*ss)[n] = s;
+ *ns = np;
+}
+
+static void split(const char *s, char ***ss, size_t *ns) {
+ const char *c = strchr(s, ',');
+ if (c == NULL) {
+ add(s, s + strlen(s), ss, ns);
+ } else {
+ add(s, c, ss, ns);
+ split(c + 1, ss, ns);
+ }
+}
+
+static bool is(const char *want, const char *have) {
+ return 0 == strcmp(want, "all") || 0 == strcmp(want, have);
+}
+
+static void try_engine(const char *engine) {
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) {
+ if (is(engine, g_factories[i].name)) {
+ if ((g_event_engine = g_factories[i].factory())) {
+ gpr_log(GPR_DEBUG, "Using polling engine: %s", g_factories[i].name);
+ return;
+ }
+ }
+ }
+}
void grpc_event_engine_init(void) {
- if ((g_event_engine = grpc_init_poll_and_epoll_posix())) {
- return;
+ char *s = gpr_getenv("GRPC_POLL_STRATEGY");
+ if (s == NULL) {
+ s = gpr_strdup("all");
+ }
+
+ char **strings = NULL;
+ size_t nstrings = 0;
+ split(s, &strings, &nstrings);
+
+ for (size_t i = 0; g_event_engine == NULL && i < nstrings; i++) {
+ try_engine(strings[i]);
+ }
+
+ for (size_t i = 0; i < nstrings; i++) {
+ gpr_free(strings[i]);
+ }
+ gpr_free(strings);
+ gpr_free(s);
+
+ if (g_event_engine == NULL) {
+ gpr_log(GPR_ERROR, "No event engine could be initialized");
+ abort();
}
- gpr_log(GPR_ERROR, "No event engine could be initialized");
- abort();
}
-void grpc_event_engine_shutdown(void) { g_event_engine->shutdown_engine(); }
+void grpc_event_engine_shutdown(void) {
+ g_event_engine->shutdown_engine();
+ g_event_engine = NULL;
+}
grpc_fd *grpc_fd_create(int fd, const char *name) {
return g_event_engine->fd_create(fd, name);
diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c
index 2146c7dd1f..e451479073 100644
--- a/src/core/lib/iomgr/exec_ctx.c
+++ b/src/core/lib/iomgr/exec_ctx.c
@@ -39,6 +39,22 @@
#include "src/core/lib/profiling/timers.h"
+bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx) {
+ if (!exec_ctx->cached_ready_to_finish) {
+ exec_ctx->cached_ready_to_finish = exec_ctx->check_ready_to_finish(
+ exec_ctx, exec_ctx->check_ready_to_finish_arg);
+ }
+ return exec_ctx->cached_ready_to_finish;
+}
+
+bool grpc_never_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored) {
+ return false;
+}
+
+bool grpc_always_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored) {
+ return true;
+}
+
#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER
bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
bool did_something = 0;
@@ -61,6 +77,7 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
}
void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {
+ exec_ctx->cached_ready_to_finish = true;
grpc_exec_ctx_flush(exec_ctx);
}
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index 976cc40347..9d47a262f8 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -53,6 +53,9 @@ typedef struct grpc_workqueue grpc_workqueue;
* - track a list of work that needs to be delayed until the top of the
* call stack (this provides a convenient mechanism to run callbacks
* without worrying about locking issues)
+ * - provide a decision maker (via grpc_exec_ctx_ready_to_finish) that provides
+ * signal as to whether a borrowed thread should continue to do work or
+ * should actively try to finish up and get this thread back to its owner
*
* CONVENTIONS:
* Instance of this must ALWAYS be constructed on the stack, never
@@ -63,18 +66,26 @@ typedef struct grpc_workqueue grpc_workqueue;
*/
struct grpc_exec_ctx {
grpc_closure_list closure_list;
+ bool cached_ready_to_finish;
+ void *check_ready_to_finish_arg;
+ bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg);
};
-#define GRPC_EXEC_CTX_INIT \
- { GRPC_CLOSURE_LIST_INIT }
+#define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \
+ { GRPC_CLOSURE_LIST_INIT, false, finish_check_arg, finish_check }
#else
struct grpc_exec_ctx {
- int unused;
+ bool cached_ready_to_finish;
+ void *check_ready_to_finish_arg;
+ bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg);
};
-#define GRPC_EXEC_CTX_INIT \
- { 0 }
+#define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \
+ { false, finish_check_arg, finish_check }
#endif
+#define GRPC_EXEC_CTX_INIT \
+ GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(grpc_never_ready_to_finish, NULL)
+
/** Flush any work that has been enqueued onto this grpc_exec_ctx.
* Caller must guarantee that no interfering locks are held.
* Returns true if work was performed, false otherwise. */
@@ -86,6 +97,14 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx);
void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
bool success,
grpc_workqueue *offload_target_or_null);
+/** Returns true if we'd like to leave this execution context as soon as
+ possible: useful for deciding whether to do something more or not depending
+ on outside context */
+bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx);
+/** A finish check that is never ready to finish */
+bool grpc_never_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored);
+/** A finish check that is always ready to finish */
+bool grpc_always_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored);
/** Add a list of closures to be executed at the next flush/finish point.
* Leaves \a list empty. */
void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/iomgr/iomgr_posix.c b/src/core/lib/iomgr/iomgr_posix.c
index 016c501f75..cede97f4c6 100644
--- a/src/core/lib/iomgr/iomgr_posix.c
+++ b/src/core/lib/iomgr/iomgr_posix.c
@@ -41,12 +41,16 @@
#include "src/core/lib/iomgr/tcp_posix.h"
void grpc_iomgr_platform_init(void) {
+ grpc_wakeup_fd_global_init();
grpc_event_engine_init();
grpc_register_tracer("tcp", &grpc_tcp_trace);
}
void grpc_iomgr_platform_flush(void) {}
-void grpc_iomgr_platform_shutdown(void) { grpc_event_engine_shutdown(); }
+void grpc_iomgr_platform_shutdown(void) {
+ grpc_event_engine_shutdown();
+ grpc_wakeup_fd_global_destroy();
+}
#endif /* GRPC_POSIX_SOCKET */
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index df6cf956d9..98ffccd59b 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -81,6 +81,7 @@ typedef struct {
grpc_closure read_closure;
grpc_closure destroyed_closure;
grpc_udp_server_read_cb read_cb;
+ grpc_udp_server_orphan_cb orphan_cb;
} server_port;
/* the overall server */
@@ -168,6 +169,10 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
server_port *sp = &s->ports[i];
sp->destroyed_closure.cb = destroyed_port;
sp->destroyed_closure.cb_arg = s;
+
+ GPR_ASSERT(sp->orphan_cb);
+ sp->orphan_cb(sp->emfd);
+
grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
"udp_listener_shutdown");
}
@@ -268,7 +273,8 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
static int add_socket_to_server(grpc_udp_server *s, int fd,
const struct sockaddr *addr, size_t addr_len,
- grpc_udp_server_read_cb read_cb) {
+ grpc_udp_server_read_cb read_cb,
+ grpc_udp_server_orphan_cb orphan_cb) {
server_port *sp;
int port;
char *addr_str;
@@ -292,6 +298,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
memcpy(sp->addr.untyped, addr, addr_len);
sp->addr_len = addr_len;
sp->read_cb = read_cb;
+ sp->orphan_cb = orphan_cb;
GPR_ASSERT(sp->emfd);
gpr_mu_unlock(&s->mu);
gpr_free(name);
@@ -301,7 +308,8 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
}
int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
- size_t addr_len, grpc_udp_server_read_cb read_cb) {
+ size_t addr_len, grpc_udp_server_read_cb read_cb,
+ grpc_udp_server_orphan_cb orphan_cb) {
int allocated_port1 = -1;
int allocated_port2 = -1;
unsigned i;
@@ -348,7 +356,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
addr = (struct sockaddr *)&wild6;
addr_len = sizeof(wild6);
fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode);
- allocated_port1 = add_socket_to_server(s, fd, addr, addr_len, read_cb);
+ allocated_port1 =
+ add_socket_to_server(s, fd, addr, addr_len, read_cb, orphan_cb);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done;
}
@@ -370,7 +379,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
addr = (struct sockaddr *)&addr4_copy;
addr_len = sizeof(addr4_copy);
}
- allocated_port2 = add_socket_to_server(s, fd, addr, addr_len, read_cb);
+ allocated_port2 =
+ add_socket_to_server(s, fd, addr, addr_len, read_cb, orphan_cb);
done:
gpr_free(allocated_addr);
diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h
index d8cf957a22..33c5ce11cd 100644
--- a/src/core/lib/iomgr/udp_server.h
+++ b/src/core/lib/iomgr/udp_server.h
@@ -48,6 +48,9 @@ typedef struct grpc_udp_server grpc_udp_server;
typedef void (*grpc_udp_server_read_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,
struct grpc_server *server);
+/* Called when the grpc_fd is about to be orphaned (and the FD closed). */
+typedef void (*grpc_udp_server_orphan_cb)(grpc_fd *emfd);
+
/* Create a server, initially not bound to any ports */
grpc_udp_server *grpc_udp_server_create(void);
@@ -69,7 +72,8 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index);
/* TODO(ctiller): deprecate this, and make grpc_udp_server_add_ports to handle
all of the multiple socket port matching logic in one place */
int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
- size_t addr_len, grpc_udp_server_read_cb read_cb);
+ size_t addr_len, grpc_udp_server_read_cb read_cb,
+ grpc_udp_server_orphan_cb orphan_cb);
void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *server,
grpc_closure *on_done);
diff --git a/src/core/lib/support/string_util_win32.c b/src/core/lib/support/string_util_win32.c
index f3cb0c050f..0d7bcdb5aa 100644
--- a/src/core/lib/support/string_util_win32.c
+++ b/src/core/lib/support/string_util_win32.c
@@ -83,7 +83,7 @@ char *gpr_format_message(int messageid) {
DWORD status = FormatMessage(
FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM |
FORMAT_MESSAGE_IGNORE_INSERTS,
- NULL, (DWORD)messageid, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+ NULL, (DWORD)messageid, MAKELANGID(LANG_ENGLISH, SUBLANG_DEFAULT),
(LPTSTR)(&tmessage), 0, NULL);
if (status == 0) return gpr_strdup("Unable to retrieve error string");
message = gpr_tchar_to_char(tmessage);
diff --git a/src/core/lib/surface/byte_buffer_reader.c b/src/core/lib/surface/byte_buffer_reader.c
index 809fd5f1fa..c97079f638 100644
--- a/src/core/lib/surface/byte_buffer_reader.c
+++ b/src/core/lib/surface/byte_buffer_reader.c
@@ -62,12 +62,19 @@ void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
case GRPC_BB_RAW:
gpr_slice_buffer_init(&decompressed_slices_buffer);
if (is_compressed(reader->buffer_in)) {
- grpc_msg_decompress(reader->buffer_in->data.raw.compression,
- &reader->buffer_in->data.raw.slice_buffer,
- &decompressed_slices_buffer);
- reader->buffer_out =
- grpc_raw_byte_buffer_create(decompressed_slices_buffer.slices,
- decompressed_slices_buffer.count);
+ if (grpc_msg_decompress(reader->buffer_in->data.raw.compression,
+ &reader->buffer_in->data.raw.slice_buffer,
+ &decompressed_slices_buffer) == 0) {
+ gpr_log(GPR_ERROR,
+ "Unexpected error decompressing data for algorithm with enum "
+ "value '%d'. Reading data as if it were uncompressed.",
+ reader->buffer_in->data.raw.compression);
+ reader->buffer_out = reader->buffer_in;
+ } else { /* all fine */
+ reader->buffer_out =
+ grpc_raw_byte_buffer_create(decompressed_slices_buffer.slices,
+ decompressed_slices_buffer.count);
+ }
gpr_slice_buffer_destroy(&decompressed_slices_buffer);
} else { /* not compressed, use the input buffer as output */
reader->buffer_out = reader->buffer_in;
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 9b2b94eedf..c8728fa278 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -261,6 +261,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
call->channel = channel;
call->cq = cq;
call->parent = parent_call;
+ /* Always support no compression */
+ GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
call->is_client = server_transport_data == NULL;
if (call->is_client) {
GPR_ASSERT(add_initial_metadata_count < MAX_SEND_EXTRA_METADATA_COUNT);
@@ -408,6 +410,7 @@ static void set_status_code(grpc_call *call, status_source source,
static void set_compression_algorithm(grpc_call *call,
grpc_compression_algorithm algo) {
+ GPR_ASSERT(algo < GRPC_COMPRESS_ALGORITHMS_COUNT);
call->compression_algorithm = algo;
}
@@ -828,12 +831,16 @@ static uint32_t decode_status(grpc_mdelem *md) {
return status;
}
-static uint32_t decode_compression(grpc_mdelem *md) {
+static grpc_compression_algorithm decode_compression(grpc_mdelem *md) {
grpc_compression_algorithm algorithm =
grpc_compression_algorithm_from_mdstr(md->value);
if (algorithm == GRPC_COMPRESS_ALGORITHMS_COUNT) {
const char *md_c_str = grpc_mdstr_as_c_string(md->value);
- gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str);
+ gpr_log(GPR_ERROR,
+ "Invalid incoming compression algorithm: '%s'. Interpreting "
+ "incoming data as uncompressed.",
+ md_c_str);
+ return GRPC_COMPRESS_NONE;
}
return algorithm;
}
@@ -1087,6 +1094,24 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
grpc_metadata_batch_filter(md, recv_initial_filter, call);
+ /* make sure the received grpc-encoding is amongst the ones listed in
+ * grpc-accept-encoding */
+
+ GPR_ASSERT(call->encodings_accepted_by_peer != 0);
+ if (!GPR_BITGET(call->encodings_accepted_by_peer,
+ call->compression_algorithm)) {
+ extern int grpc_compression_trace;
+ if (grpc_compression_trace) {
+ char *algo_name;
+ grpc_compression_algorithm_name(call->compression_algorithm,
+ &algo_name);
+ gpr_log(GPR_ERROR,
+ "Compression algorithm (grpc-encoding = '%s') not present in "
+ "the bitset of accepted encodings (grpc-accept-encodings: "
+ "'0x%x')",
+ algo_name, call->encodings_accepted_by_peer);
+ }
+ }
if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
0 &&
!call->is_client) {
@@ -1474,7 +1499,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
grpc_call_error err;
GRPC_API_TRACE(
- "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, reserved=%p)",
+ "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, "
+ "reserved=%p)",
5, (call, ops, (unsigned long)nops, tag, reserved));
if (reserved != NULL) {
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index 57c6897626..1c8b709015 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -164,7 +164,7 @@ void grpc_init(void) {
grpc_register_tracer("channel_stack_builder",
&grpc_trace_channel_stack_builder);
grpc_register_tracer("http1", &grpc_http1_trace);
- grpc_register_tracer("compression", &grpc_compress_filter_trace);
+ grpc_register_tracer("compression", &grpc_compression_trace);
grpc_security_pre_init();
grpc_iomgr_init();
grpc_executor_init();