Diffstat (limited to 'src/core/lib/iomgr')
 src/core/lib/iomgr/closure.c | 36
 src/core/lib/iomgr/closure.h | 34
 src/core/lib/iomgr/endpoint.h | 2
 src/core/lib/iomgr/endpoint_pair_posix.c | 4
 src/core/lib/iomgr/error.c | 535
 src/core/lib/iomgr/error.h | 192
 src/core/lib/iomgr/ev_poll_and_epoll_posix.c | 273
 src/core/lib/iomgr/ev_poll_posix.c | 1267
 src/core/lib/iomgr/ev_poll_posix.h | 41
 src/core/lib/iomgr/ev_posix.c | 118
 src/core/lib/iomgr/ev_posix.h | 24
 src/core/lib/iomgr/exec_ctx.c | 32
 src/core/lib/iomgr/exec_ctx.h | 35
 src/core/lib/iomgr/executor.c | 4
 src/core/lib/iomgr/executor.h | 2
 src/core/lib/iomgr/iocp_windows.c | 43
 src/core/lib/iomgr/iocp_windows.h | 8
 src/core/lib/iomgr/iomgr.c | 9
 src/core/lib/iomgr/iomgr_posix.c | 6
 src/core/lib/iomgr/iomgr_windows.c | 2
 src/core/lib/iomgr/load_file.c | 89
 src/core/lib/iomgr/load_file.h | 56
 src/core/lib/iomgr/polling_entity.c | 104
 src/core/lib/iomgr/polling_entity.h | 81
 src/core/lib/iomgr/pollset.h | 11
 src/core/lib/iomgr/pollset_set_windows.c | 5
 src/core/lib/iomgr/pollset_windows.c | 16
 src/core/lib/iomgr/resolve_address.h | 14
 src/core/lib/iomgr/resolve_address_posix.c | 93
 src/core/lib/iomgr/resolve_address_windows.c | 88
 src/core/lib/iomgr/sockaddr.h | 4
 src/core/lib/iomgr/{sockaddr_win32.h => sockaddr_windows.h} | 6
 src/core/lib/iomgr/socket_utils_common_posix.c | 137
 src/core/lib/iomgr/socket_utils_posix.h | 31
 src/core/lib/iomgr/socket_windows.c | 63
 src/core/lib/iomgr/socket_windows.h | 13
 src/core/lib/iomgr/tcp_client_posix.c | 93
 src/core/lib/iomgr/tcp_client_windows.c | 83
 src/core/lib/iomgr/tcp_posix.c | 82
 src/core/lib/iomgr/tcp_server.h | 8
 src/core/lib/iomgr/tcp_server_posix.c | 167
 src/core/lib/iomgr/tcp_server_windows.c | 215
 src/core/lib/iomgr/tcp_windows.c | 95
 src/core/lib/iomgr/tcp_windows.h | 2
 src/core/lib/iomgr/timer.c | 39
 src/core/lib/iomgr/udp_server.c | 32
 src/core/lib/iomgr/udp_server.h | 6
 src/core/lib/iomgr/unix_sockets_posix.c | 15
 src/core/lib/iomgr/unix_sockets_posix.h | 3
 src/core/lib/iomgr/unix_sockets_posix_noop.c | 6
 src/core/lib/iomgr/wakeup_fd_eventfd.c | 20
 src/core/lib/iomgr/wakeup_fd_pipe.c | 22
 src/core/lib/iomgr/wakeup_fd_posix.c | 12
 src/core/lib/iomgr/wakeup_fd_posix.h | 15
 src/core/lib/iomgr/workqueue.h | 9
 src/core/lib/iomgr/workqueue_posix.c | 66
 src/core/lib/iomgr/workqueue_windows.c | 4
 57 files changed, 3681 insertions(+), 791 deletions(-)
diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c
index 27793c32e4..0b6c3b2539 100644
--- a/src/core/lib/iomgr/closure.c
+++ b/src/core/lib/iomgr/closure.c
@@ -39,25 +39,32 @@ void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
void *cb_arg) {
closure->cb = cb;
closure->cb_arg = cb_arg;
- closure->final_data = 0;
}
-void grpc_closure_list_add(grpc_closure_list *closure_list,
- grpc_closure *closure, bool success) {
- if (closure == NULL) return;
- closure->final_data = (success != 0);
+void grpc_closure_list_append(grpc_closure_list *closure_list,
+ grpc_closure *closure, grpc_error *error) {
+ if (closure == NULL) {
+ GRPC_ERROR_UNREF(error);
+ return;
+ }
+ closure->error = error;
+ closure->next_data.next = NULL;
if (closure_list->head == NULL) {
closure_list->head = closure;
} else {
- closure_list->tail->final_data |= (uintptr_t)closure;
+ closure_list->tail->next_data.next = closure;
}
closure_list->tail = closure;
}
-void grpc_closure_list_fail_all(grpc_closure_list *list) {
- for (grpc_closure *c = list->head; c != NULL; c = grpc_closure_next(c)) {
- c->final_data &= ~(uintptr_t)1;
+void grpc_closure_list_fail_all(grpc_closure_list *list,
+ grpc_error *forced_failure) {
+ for (grpc_closure *c = list->head; c != NULL; c = c->next_data.next) {
+ if (c->error == GRPC_ERROR_NONE) {
+ c->error = GRPC_ERROR_REF(forced_failure);
+ }
}
+ GRPC_ERROR_UNREF(forced_failure);
}
bool grpc_closure_list_empty(grpc_closure_list closure_list) {
@@ -71,7 +78,7 @@ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst) {
if (dst->head == NULL) {
*dst = *src;
} else {
- dst->tail->final_data |= (uintptr_t)src->head;
+ dst->tail->next_data.next = src->head;
dst->tail = src->tail;
}
src->head = src->tail = NULL;
@@ -83,12 +90,13 @@ typedef struct {
grpc_closure wrapper;
} wrapped_closure;
-static void closure_wrapper(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void closure_wrapper(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
wrapped_closure *wc = arg;
grpc_iomgr_cb_func cb = wc->cb;
void *cb_arg = wc->cb_arg;
gpr_free(wc);
- cb(exec_ctx, cb_arg, success);
+ cb(exec_ctx, cb_arg, error);
}
grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg) {
@@ -98,7 +106,3 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg) {
grpc_closure_init(&wc->wrapper, closure_wrapper, wc);
return &wc->wrapper;
}
-
-grpc_closure *grpc_closure_next(grpc_closure *closure) {
- return (grpc_closure *)(closure->final_data & ~(uintptr_t)1);
-}
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index fdc2daed9d..08e59a168e 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -36,6 +36,7 @@
#include <grpc/support/port_platform.h>
#include <stdbool.h>
+#include "src/core/lib/iomgr/error.h"
struct grpc_closure;
typedef struct grpc_closure grpc_closure;
@@ -52,10 +53,10 @@ typedef struct grpc_closure_list {
/** gRPC Callback definition.
*
* \param arg Arbitrary input.
- * \param success An indication on the state of the iomgr. On false, cleanup
- * actions should be taken (eg, shutdown). */
+ * \param error GRPC_ERROR_NONE if no error occurred, otherwise some grpc_error
+ * describing what went wrong */
typedef void (*grpc_iomgr_cb_func)(grpc_exec_ctx *exec_ctx, void *arg,
- bool success);
+ grpc_error *error);
/** A closure over a grpc_iomgr_cb_func. */
struct grpc_closure {
@@ -65,10 +66,15 @@ struct grpc_closure {
/** Arguments to be passed to "cb". */
void *cb_arg;
- /** Once enqueued, contains in the lower bit the success of the closure,
- and in the upper bits the pointer to the next closure in the list.
- Before enqueing for execution, this is usable for scratch data. */
- uintptr_t final_data;
+ /** Once queued, the result of the closure. Before then: scratch space */
+ grpc_error *error;
+
+ /** Once queued, next indicates the next queued closure; before then, scratch
+ * space */
+ union {
+ grpc_closure *next;
+ uintptr_t scratch;
+ } next_data;
};
/** Initializes \a closure with \a cb and \a cb_arg. */
@@ -81,13 +87,14 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg);
#define GRPC_CLOSURE_LIST_INIT \
{ NULL, NULL }
-/** add \a closure to the end of \a list and set \a closure's success to \a
- * success */
-void grpc_closure_list_add(grpc_closure_list *list, grpc_closure *closure,
- bool success);
+/** add \a closure to the end of \a list
+ and set \a closure's result to \a error */
+void grpc_closure_list_append(grpc_closure_list *list, grpc_closure *closure,
+ grpc_error *error);
/** force all success bits in \a list to false */
-void grpc_closure_list_fail_all(grpc_closure_list *list);
+void grpc_closure_list_fail_all(grpc_closure_list *list,
+ grpc_error *forced_failure);
/** append all closures from \a src to \a dst and empty \a src. */
void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst);
@@ -95,7 +102,4 @@ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst);
/** return whether \a list is empty. */
bool grpc_closure_list_empty(grpc_closure_list list);
-/** return the next pointer for a queued closure list */
-grpc_closure *grpc_closure_next(grpc_closure *closure);
-
#endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */
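Note: a minimal sketch of the new closure-list API introduced above, assuming the in-tree headers from this change; on_done and demo are hypothetical names, not part of the patch:

#include <grpc/support/log.h>
#include "src/core/lib/iomgr/closure.h"

/* callbacks borrow the error: they do not own a ref to it (see error.h) */
static void on_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
  if (error != GRPC_ERROR_NONE) {
    const char *msg = grpc_error_string(error);
    gpr_log(GPR_ERROR, "on_done: %s", msg);
    grpc_error_free_string(msg);
  }
}

static void demo(void) {
  grpc_closure c;
  grpc_closure_list list = GRPC_CLOSURE_LIST_INIT;
  grpc_closure_init(&c, on_done, NULL);
  /* append consumes one ref to the error it is handed */
  grpc_closure_list_append(&list, &c, GRPC_ERROR_NONE);
  /* entries still marked GRPC_ERROR_NONE pick up the forced failure;
     fail_all unrefs forced_failure once when it is done */
  grpc_closure_list_fail_all(&list, GRPC_ERROR_CREATE("shutting down"));
}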
diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h
index 3877ceb1e2..f9808bbda1 100644
--- a/src/core/lib/iomgr/endpoint.h
+++ b/src/core/lib/iomgr/endpoint.h
@@ -82,7 +82,7 @@ char *grpc_endpoint_get_peer(grpc_endpoint *ep);
void grpc_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
gpr_slice_buffer *slices, grpc_closure *cb);
-/* Causes any pending read/write callbacks to run immediately with
+/* Causes any pending and future read/write callbacks to run immediately with
success==0 */
void grpc_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
void grpc_endpoint_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
diff --git a/src/core/lib/iomgr/endpoint_pair_posix.c b/src/core/lib/iomgr/endpoint_pair_posix.c
index e0ce47c773..e295fb4867 100644
--- a/src/core/lib/iomgr/endpoint_pair_posix.c
+++ b/src/core/lib/iomgr/endpoint_pair_posix.c
@@ -58,8 +58,8 @@ static void create_sockets(int sv[2]) {
GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
flags = fcntl(sv[1], F_GETFL, 0);
GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
- GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv[0]));
- GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv[1]));
+ GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv[0]) == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv[1]) == GRPC_ERROR_NONE);
}
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
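Note: the socket helpers now return a grpc_error * instead of a bool, so call sites pick between fatal and logged handling. A sketch under that assumption (fd is a hypothetical descriptor):

/* fatal on failure, as endpoint_pair_posix.c does above */
GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(fd) == GRPC_ERROR_NONE);

/* non-fatal: log with file/line, then drop the error ref */
GRPC_LOG_IF_ERROR("set_no_sigpipe",
                  grpc_set_socket_no_sigpipe_if_possible(fd));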
diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c
new file mode 100644
index 0000000000..540fb4fa7e
--- /dev/null
+++ b/src/core/lib/iomgr/error.c
@@ -0,0 +1,535 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/error.h"
+
+#include <inttypes.h>
+#include <stdbool.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/avl.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
+
+#ifdef GPR_WINDOWS
+#include <grpc/support/log_windows.h>
+#endif
+
+static void destroy_integer(void *key) {}
+
+static void *copy_integer(void *key) { return key; }
+
+static long compare_integers(void *key1, void *key2) {
+ return GPR_ICMP((uintptr_t)key1, (uintptr_t)key2);
+}
+
+static void destroy_string(void *str) { gpr_free(str); }
+
+static void *copy_string(void *str) { return gpr_strdup(str); }
+
+static void destroy_err(void *err) { GRPC_ERROR_UNREF(err); }
+
+static void *copy_err(void *err) { return GRPC_ERROR_REF(err); }
+
+static void destroy_time(void *tm) { gpr_free(tm); }
+
+static gpr_timespec *box_time(gpr_timespec tm) {
+ gpr_timespec *out = gpr_malloc(sizeof(*out));
+ *out = tm;
+ return out;
+}
+
+static void *copy_time(void *tm) { return box_time(*(gpr_timespec *)tm); }
+
+static const gpr_avl_vtable avl_vtable_ints = {destroy_integer, copy_integer,
+ compare_integers,
+ destroy_integer, copy_integer};
+
+static const gpr_avl_vtable avl_vtable_strs = {destroy_integer, copy_integer,
+ compare_integers, destroy_string,
+ copy_string};
+
+static const gpr_avl_vtable avl_vtable_times = {
+ destroy_integer, copy_integer, compare_integers, destroy_time, copy_time};
+
+static const gpr_avl_vtable avl_vtable_errs = {
+ destroy_integer, copy_integer, compare_integers, destroy_err, copy_err};
+
+static const char *error_int_name(grpc_error_ints key) {
+ switch (key) {
+ case GRPC_ERROR_INT_ERRNO:
+ return "errno";
+ case GRPC_ERROR_INT_FILE_LINE:
+ return "file_line";
+ case GRPC_ERROR_INT_STREAM_ID:
+ return "stream_id";
+ case GRPC_ERROR_INT_GRPC_STATUS:
+ return "grpc_status";
+ case GRPC_ERROR_INT_OFFSET:
+ return "offset";
+ case GRPC_ERROR_INT_INDEX:
+ return "index";
+ case GRPC_ERROR_INT_SIZE:
+ return "size";
+ case GRPC_ERROR_INT_HTTP2_ERROR:
+ return "http2_error";
+ case GRPC_ERROR_INT_TSI_CODE:
+ return "tsi_code";
+ case GRPC_ERROR_INT_SECURITY_STATUS:
+ return "security_status";
+ case GRPC_ERROR_INT_FD:
+ return "fd";
+ case GRPC_ERROR_INT_WSA_ERROR:
+ return "wsa_error";
+ case GRPC_ERROR_INT_HTTP_STATUS:
+ return "http_status";
+ }
+ GPR_UNREACHABLE_CODE(return "unknown");
+}
+
+static const char *error_str_name(grpc_error_strs key) {
+ switch (key) {
+ case GRPC_ERROR_STR_DESCRIPTION:
+ return "description";
+ case GRPC_ERROR_STR_OS_ERROR:
+ return "os_error";
+ case GRPC_ERROR_STR_TARGET_ADDRESS:
+ return "target_address";
+ case GRPC_ERROR_STR_SYSCALL:
+ return "syscall";
+ case GRPC_ERROR_STR_FILE:
+ return "file";
+ case GRPC_ERROR_STR_GRPC_MESSAGE:
+ return "grpc_message";
+ case GRPC_ERROR_STR_RAW_BYTES:
+ return "raw_bytes";
+ case GRPC_ERROR_STR_TSI_ERROR:
+ return "tsi_error";
+ case GRPC_ERROR_STR_FILENAME:
+ return "filename";
+ }
+ GPR_UNREACHABLE_CODE(return "unknown");
+}
+
+static const char *error_time_name(grpc_error_times key) {
+ switch (key) {
+ case GRPC_ERROR_TIME_CREATED:
+ return "created";
+ }
+ GPR_UNREACHABLE_CODE(return "unknown");
+}
+
+struct grpc_error {
+ gpr_refcount refs;
+ gpr_avl ints;
+ gpr_avl strs;
+ gpr_avl times;
+ gpr_avl errs;
+ uintptr_t next_err;
+};
+
+static bool is_special(grpc_error *err) {
+ return err == GRPC_ERROR_NONE || err == GRPC_ERROR_OOM ||
+ err == GRPC_ERROR_CANCELLED;
+}
+
+#ifdef GRPC_ERROR_REFCOUNT_DEBUG
+grpc_error *grpc_error_ref(grpc_error *err, const char *file, int line,
+ const char *func) {
+ if (is_special(err)) return err;
+ gpr_log(GPR_DEBUG, "%p: %d -> %d [%s:%d %s]", err, err->refs.count,
+ err->refs.count + 1, file, line, func);
+ gpr_ref(&err->refs);
+ return err;
+}
+#else
+grpc_error *grpc_error_ref(grpc_error *err) {
+ if (is_special(err)) return err;
+ gpr_ref(&err->refs);
+ return err;
+}
+#endif
+
+static void error_destroy(grpc_error *err) {
+ GPR_ASSERT(!is_special(err));
+ gpr_avl_unref(err->ints);
+ gpr_avl_unref(err->strs);
+ gpr_avl_unref(err->errs);
+ gpr_avl_unref(err->times);
+ gpr_free(err);
+}
+
+#ifdef GRPC_ERROR_REFCOUNT_DEBUG
+void grpc_error_unref(grpc_error *err, const char *file, int line,
+ const char *func) {
+ if (is_special(err)) return;
+ gpr_log(GPR_DEBUG, "%p: %d -> %d [%s:%d %s]", err, err->refs.count,
+ err->refs.count - 1, file, line, func);
+ if (gpr_unref(&err->refs)) {
+ error_destroy(err);
+ }
+}
+#else
+void grpc_error_unref(grpc_error *err) {
+ if (is_special(err)) return;
+ if (gpr_unref(&err->refs)) {
+ error_destroy(err);
+ }
+}
+#endif
+
+grpc_error *grpc_error_create(const char *file, int line, const char *desc,
+ grpc_error **referencing,
+ size_t num_referencing) {
+ grpc_error *err = gpr_malloc(sizeof(*err));
+ if (err == NULL) { // TODO(ctiller): make gpr_malloc return NULL
+ return GRPC_ERROR_OOM;
+ }
+#ifdef GRPC_ERROR_REFCOUNT_DEBUG
+ gpr_log(GPR_DEBUG, "%p create [%s:%d]", err, file, line);
+#endif
+ err->ints = gpr_avl_add(gpr_avl_create(&avl_vtable_ints),
+ (void *)(uintptr_t)GRPC_ERROR_INT_FILE_LINE,
+ (void *)(uintptr_t)line);
+ err->strs = gpr_avl_add(
+ gpr_avl_add(gpr_avl_create(&avl_vtable_strs),
+ (void *)(uintptr_t)GRPC_ERROR_STR_FILE, gpr_strdup(file)),
+ (void *)(uintptr_t)GRPC_ERROR_STR_DESCRIPTION, gpr_strdup(desc));
+ err->errs = gpr_avl_create(&avl_vtable_errs);
+ err->next_err = 0;
+ for (size_t i = 0; i < num_referencing; i++) {
+ if (referencing[i] == GRPC_ERROR_NONE) continue;
+ err->errs = gpr_avl_add(err->errs, (void *)(err->next_err++),
+ GRPC_ERROR_REF(referencing[i]));
+ }
+ err->times = gpr_avl_add(gpr_avl_create(&avl_vtable_times),
+ (void *)(uintptr_t)GRPC_ERROR_TIME_CREATED,
+ box_time(gpr_now(GPR_CLOCK_REALTIME)));
+ gpr_ref_init(&err->refs, 1);
+ return err;
+}
+
+static grpc_error *copy_error_and_unref(grpc_error *in) {
+ if (is_special(in)) {
+ if (in == GRPC_ERROR_NONE) return GRPC_ERROR_CREATE("no error");
+ if (in == GRPC_ERROR_OOM) return GRPC_ERROR_CREATE("oom");
+ if (in == GRPC_ERROR_CANCELLED) return GRPC_ERROR_CREATE("cancelled");
+ return GRPC_ERROR_CREATE("unknown");
+ }
+ grpc_error *out = gpr_malloc(sizeof(*out));
+#ifdef GRPC_ERROR_REFCOUNT_DEBUG
+ gpr_log(GPR_DEBUG, "%p create copying", out);
+#endif
+ out->ints = gpr_avl_ref(in->ints);
+ out->strs = gpr_avl_ref(in->strs);
+ out->errs = gpr_avl_ref(in->errs);
+ out->times = gpr_avl_ref(in->times);
+ out->next_err = in->next_err;
+ gpr_ref_init(&out->refs, 1);
+ GRPC_ERROR_UNREF(in);
+ return out;
+}
+
+grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which,
+ intptr_t value) {
+ grpc_error *new = copy_error_and_unref(src);
+ new->ints = gpr_avl_add(new->ints, (void *)(uintptr_t)which, (void *)value);
+ return new;
+}
+
+bool grpc_error_get_int(grpc_error *err, grpc_error_ints which, intptr_t *p) {
+ void *pp;
+ if (gpr_avl_maybe_get(err->ints, (void *)(uintptr_t)which, &pp)) {
+ if (p != NULL) *p = (intptr_t)pp;
+ return true;
+ }
+ return false;
+}
+
+grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which,
+ const char *value) {
+ grpc_error *new = copy_error_and_unref(src);
+ new->strs =
+ gpr_avl_add(new->strs, (void *)(uintptr_t)which, gpr_strdup(value));
+ return new;
+}
+
+grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child) {
+ grpc_error *new = copy_error_and_unref(src);
+ new->errs = gpr_avl_add(new->errs, (void *)(new->next_err++), child);
+ return new;
+}
+
+static const char *no_error_string = "null";
+static const char *oom_error_string = "\"Out of memory\"";
+static const char *cancelled_error_string = "\"Cancelled\"";
+
+typedef struct {
+ char *key;
+ char *value;
+} kv_pair;
+
+typedef struct {
+ kv_pair *kvs;
+ size_t num_kvs;
+ size_t cap_kvs;
+} kv_pairs;
+
+static void append_kv(kv_pairs *kvs, char *key, char *value) {
+ if (kvs->num_kvs == kvs->cap_kvs) {
+ kvs->cap_kvs = GPR_MAX(3 * kvs->cap_kvs / 2, 4);
+ kvs->kvs = gpr_realloc(kvs->kvs, sizeof(*kvs->kvs) * kvs->cap_kvs);
+ }
+ kvs->kvs[kvs->num_kvs].key = key;
+ kvs->kvs[kvs->num_kvs].value = value;
+ kvs->num_kvs++;
+}
+
+static void collect_kvs(gpr_avl_node *node, char *key(void *k),
+ char *fmt(void *v), kv_pairs *kvs) {
+ if (node == NULL) return;
+ append_kv(kvs, key(node->key), fmt(node->value));
+ collect_kvs(node->left, key, fmt, kvs);
+ collect_kvs(node->right, key, fmt, kvs);
+}
+
+static char *key_int(void *p) {
+ return gpr_strdup(error_int_name((grpc_error_ints)(uintptr_t)p));
+}
+
+static char *key_str(void *p) {
+ return gpr_strdup(error_str_name((grpc_error_strs)(uintptr_t)p));
+}
+
+static char *key_time(void *p) {
+ return gpr_strdup(error_time_name((grpc_error_times)(uintptr_t)p));
+}
+
+static char *fmt_int(void *p) {
+ char *s;
+ gpr_asprintf(&s, "%" PRIdPTR, (intptr_t)p);
+ return s;
+}
+
+static void append_chr(char c, char **s, size_t *sz, size_t *cap) {
+ if (*sz == *cap) {
+ *cap = GPR_MAX(8, 3 * *cap / 2);
+ *s = gpr_realloc(*s, *cap);
+ }
+ (*s)[(*sz)++] = c;
+}
+
+static void append_str(const char *str, char **s, size_t *sz, size_t *cap) {
+ for (const char *c = str; *c; c++) {
+ append_chr(*c, s, sz, cap);
+ }
+}
+
+static void append_esc_str(const char *str, char **s, size_t *sz, size_t *cap) {
+ static const char *hex = "0123456789abcdef";
+ append_chr('"', s, sz, cap);
+ for (const uint8_t *c = (const uint8_t *)str; *c; c++) {
+ if (*c < 32 || *c >= 127) {
+ append_chr('\\', s, sz, cap);
+ switch (*c) {
+ case '\b':
+ append_chr('b', s, sz, cap);
+ break;
+ case '\f':
+ append_chr('f', s, sz, cap);
+ break;
+ case '\n':
+ append_chr('n', s, sz, cap);
+ break;
+ case '\r':
+ append_chr('r', s, sz, cap);
+ break;
+ case '\t':
+ append_chr('t', s, sz, cap);
+ break;
+ default:
+ append_chr('u', s, sz, cap);
+ append_chr('0', s, sz, cap);
+ append_chr('0', s, sz, cap);
+ append_chr(hex[*c >> 4], s, sz, cap);
+ append_chr(hex[*c & 0x0f], s, sz, cap);
+ break;
+ }
+ } else {
+ append_chr((char)*c, s, sz, cap);
+ }
+ }
+ append_chr('"', s, sz, cap);
+}
+
+static char *fmt_str(void *p) {
+ char *s = NULL;
+ size_t sz = 0;
+ size_t cap = 0;
+ append_esc_str(p, &s, &sz, &cap);
+ append_chr(0, &s, &sz, &cap);
+ return s;
+}
+
+static char *fmt_time(void *p) {
+ gpr_timespec tm = *(gpr_timespec *)p;
+ char *out;
+ char *pfx = "!!";
+ switch (tm.clock_type) {
+ case GPR_CLOCK_MONOTONIC:
+ pfx = "@monotonic:";
+ break;
+ case GPR_CLOCK_REALTIME:
+ pfx = "@";
+ break;
+ case GPR_CLOCK_PRECISE:
+ pfx = "@precise:";
+ break;
+ case GPR_TIMESPAN:
+ pfx = "";
+ break;
+ }
+ gpr_asprintf(&out, "\"%s%" PRId64 ".%09d\"", pfx, tm.tv_sec, tm.tv_nsec);
+ return out;
+}
+
+static void add_errs(gpr_avl_node *n, char **s, size_t *sz, size_t *cap) {
+ if (n == NULL) return;
+ add_errs(n->left, s, sz, cap);
+ const char *e = grpc_error_string(n->value);
+ append_str(e, s, sz, cap);
+ grpc_error_free_string(e);
+ add_errs(n->right, s, sz, cap);
+}
+
+static char *errs_string(grpc_error *err) {
+ char *s = NULL;
+ size_t sz = 0;
+ size_t cap = 0;
+ append_chr('[', &s, &sz, &cap);
+ add_errs(err->errs.root, &s, &sz, &cap);
+ append_chr(']', &s, &sz, &cap);
+ append_chr(0, &s, &sz, &cap);
+ return s;
+}
+
+static int cmp_kvs(const void *a, const void *b) {
+ const kv_pair *ka = a;
+ const kv_pair *kb = b;
+ return strcmp(ka->key, kb->key);
+}
+
+static const char *finish_kvs(kv_pairs *kvs) {
+ char *s = NULL;
+ size_t sz = 0;
+ size_t cap = 0;
+
+ append_chr('{', &s, &sz, &cap);
+ for (size_t i = 0; i < kvs->num_kvs; i++) {
+ if (i != 0) append_chr(',', &s, &sz, &cap);
+ append_esc_str(kvs->kvs[i].key, &s, &sz, &cap);
+ gpr_free(kvs->kvs[i].key);
+ append_chr(':', &s, &sz, &cap);
+ append_str(kvs->kvs[i].value, &s, &sz, &cap);
+ gpr_free(kvs->kvs[i].value);
+ }
+ append_chr('}', &s, &sz, &cap);
+ append_chr(0, &s, &sz, &cap);
+
+ gpr_free(kvs->kvs);
+ return s;
+}
+
+void grpc_error_free_string(const char *str) {
+ if (str == no_error_string) return;
+ if (str == oom_error_string) return;
+ if (str == cancelled_error_string) return;
+ gpr_free((char *)str);
+}
+
+const char *grpc_error_string(grpc_error *err) {
+ if (err == GRPC_ERROR_NONE) return no_error_string;
+ if (err == GRPC_ERROR_OOM) return oom_error_string;
+ if (err == GRPC_ERROR_CANCELLED) return cancelled_error_string;
+
+ kv_pairs kvs;
+ memset(&kvs, 0, sizeof(kvs));
+
+ collect_kvs(err->ints.root, key_int, fmt_int, &kvs);
+ collect_kvs(err->strs.root, key_str, fmt_str, &kvs);
+ collect_kvs(err->times.root, key_time, fmt_time, &kvs);
+ if (!gpr_avl_is_empty(err->errs)) {
+ append_kv(&kvs, gpr_strdup("referenced_errors"), errs_string(err));
+ }
+
+ qsort(kvs.kvs, kvs.num_kvs, sizeof(kv_pair), cmp_kvs);
+
+ return finish_kvs(&kvs);
+}
+
+grpc_error *grpc_os_error(const char *file, int line, int err,
+ const char *call_name) {
+ return grpc_error_set_str(
+ grpc_error_set_str(
+ grpc_error_set_int(grpc_error_create(file, line, "OS Error", NULL, 0),
+ GRPC_ERROR_INT_ERRNO, err),
+ GRPC_ERROR_STR_OS_ERROR, strerror(err)),
+ GRPC_ERROR_STR_SYSCALL, call_name);
+}
+
+#ifdef GPR_WINDOWS
+grpc_error *grpc_wsa_error(const char *file, int line, int err,
+ const char *call_name) {
+ char *utf8_message = gpr_format_message(err);
+ grpc_error *error = grpc_error_set_str(
+ grpc_error_set_str(
+ grpc_error_set_int(grpc_error_create(file, line, "OS Error", NULL, 0),
+ GRPC_ERROR_INT_WSA_ERROR, err),
+ GRPC_ERROR_STR_OS_ERROR, utf8_message),
+ GRPC_ERROR_STR_SYSCALL, call_name);
+ gpr_free(utf8_message);
+ return error;
+}
+#endif
+
+bool grpc_log_if_error(const char *what, grpc_error *error, const char *file,
+ int line) {
+ if (error == GRPC_ERROR_NONE) return true;
+ const char *msg = grpc_error_string(error);
+ gpr_log(file, line, GPR_LOG_SEVERITY_ERROR, "%s: %s", what, msg);
+ grpc_error_free_string(msg);
+ GRPC_ERROR_UNREF(error);
+ return false;
+}
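Note: grpc_error_string renders the attached ints/strs/times (plus any referenced_errors) as a JSON-like object with sorted keys; the three special errors return fixed static strings, which grpc_error_free_string leaves alone. A usage sketch, assuming <errno.h> for ENOENT:

#include <errno.h>

grpc_error *err = GRPC_OS_ERROR(ENOENT, "open");
const char *msg = grpc_error_string(err);
/* e.g. {"created":"@1457653...","description":"OS Error","errno":2,
   "file":"foo.c","file_line":42,
   "os_error":"No such file or directory","syscall":"open"} */
gpr_log(GPR_ERROR, "%s", msg);
grpc_error_free_string(msg); /* no-op for the special static strings */
GRPC_ERROR_UNREF(err);       /* drop the ref returned by GRPC_OS_ERROR */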
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
new file mode 100644
index 0000000000..69cdf3028e
--- /dev/null
+++ b/src/core/lib/iomgr/error.h
@@ -0,0 +1,192 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_ERROR_H
+#define GRPC_CORE_LIB_IOMGR_ERROR_H
+
+#include <stdbool.h>
+#include <stdint.h>
+
+#include <grpc/support/time.h>
+
+/// Opaque representation of an error.
+/// Errors are refcounted objects that represent the result of an operation.
+/// Ownership laws:
+/// if a grpc_error is returned by a function, the caller owns a ref to that
+/// instance
+/// if a grpc_error is passed to a grpc_closure callback function (functions
+/// with the signature:
+/// void (*f)(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error))
+/// then those functions do not automatically own a ref to error
+/// if a grpc_error is passed to *ANY OTHER FUNCTION* then that function takes
+/// ownership of the error
+/// Errors have:
+/// a set of ints, strings, and timestamps that describe the error
+/// always present are:
+/// GRPC_ERROR_STR_FILE, GRPC_ERROR_INT_FILE_LINE - source location the error
+/// was generated
+/// GRPC_ERROR_STR_DESCRIPTION - a human readable description of the error
+/// GRPC_ERROR_TIME_CREATED - a timestamp indicating when the error happened
+/// an error can also have children; these are other errors that are believed
+/// to have contributed to this one. By accumulating children, we can begin
+/// to root cause high level failures from low level failures, without having
+/// to derive execution paths from log lines
+typedef struct grpc_error grpc_error;
+
+typedef enum {
+ /// 'errno' from the operating system
+ GRPC_ERROR_INT_ERRNO,
+ /// __LINE__ from the call site creating the error
+ GRPC_ERROR_INT_FILE_LINE,
+ /// stream identifier: for errors that are associated with an individual
+ /// wire stream
+ GRPC_ERROR_INT_STREAM_ID,
+ /// grpc status code representing this error
+ GRPC_ERROR_INT_GRPC_STATUS,
+ /// offset into some binary blob (usually represented by
+ /// GRPC_ERROR_STR_RAW_BYTES) where the error occurred
+ GRPC_ERROR_INT_OFFSET,
+ /// context sensitive index associated with the error
+ GRPC_ERROR_INT_INDEX,
+ /// context sensitive size associated with the error
+ GRPC_ERROR_INT_SIZE,
+ /// http2 error code associated with the error (see the HTTP2 RFC)
+ GRPC_ERROR_INT_HTTP2_ERROR,
+ /// TSI status code associated with the error
+ GRPC_ERROR_INT_TSI_CODE,
+ /// grpc_security_status associated with the error
+ GRPC_ERROR_INT_SECURITY_STATUS,
+ /// WSAGetLastError() reported when this error occurred
+ GRPC_ERROR_INT_WSA_ERROR,
+ /// File descriptor associated with this error
+ GRPC_ERROR_INT_FD,
+ /// HTTP status (e.g. 404)
+ GRPC_ERROR_INT_HTTP_STATUS,
+} grpc_error_ints;
+
+typedef enum {
+ /// top-level textual description of this error
+ GRPC_ERROR_STR_DESCRIPTION,
+ /// source file in which this error occurred
+ GRPC_ERROR_STR_FILE,
+ /// operating system description of this error
+ GRPC_ERROR_STR_OS_ERROR,
+ /// syscall that generated this error
+ GRPC_ERROR_STR_SYSCALL,
+ /// peer we were trying to communicate with when this error occurred
+ GRPC_ERROR_STR_TARGET_ADDRESS,
+ /// grpc status message associated with this error
+ GRPC_ERROR_STR_GRPC_MESSAGE,
+ /// hex dump (or similar) with the data that generated this error
+ GRPC_ERROR_STR_RAW_BYTES,
+ /// tsi error string associated with this error
+ GRPC_ERROR_STR_TSI_ERROR,
+ /// filename that we were trying to read/write when this error occurred
+ GRPC_ERROR_STR_FILENAME,
+} grpc_error_strs;
+
+typedef enum {
+ /// timestamp of error creation
+ GRPC_ERROR_TIME_CREATED,
+} grpc_error_times;
+
+#define GRPC_ERROR_NONE ((grpc_error *)NULL)
+#define GRPC_ERROR_OOM ((grpc_error *)1)
+#define GRPC_ERROR_CANCELLED ((grpc_error *)2)
+
+const char *grpc_error_string(grpc_error *error);
+void grpc_error_free_string(const char *str);
+
+/// Create an error - but use GRPC_ERROR_CREATE instead
+grpc_error *grpc_error_create(const char *file, int line, const char *desc,
+ grpc_error **referencing, size_t num_referencing);
+/// Create an error (this is the preferred way of generating an error that is
+/// not due to a system call - for system calls, use GRPC_OS_ERROR or
+/// GRPC_WSA_ERROR as appropriate)
+/// \a referencing is an array of num_referencing elements indicating one or
+/// more errors that are believed to have contributed to this one
+/// err = grpc_error_create(x, y, z, r, nr) is equivalent to:
+/// err = grpc_error_create(x, y, z, NULL, 0);
+/// for (i=0; i<nr; i++) err = grpc_error_add_child(err, r[i]);
+#define GRPC_ERROR_CREATE(desc) \
+ grpc_error_create(__FILE__, __LINE__, desc, NULL, 0)
+
+// Create an error that references some other errors. This function adds a
+// reference to each error in errs - it does not consume an existing reference
+#define GRPC_ERROR_CREATE_REFERENCING(desc, errs, count) \
+ grpc_error_create(__FILE__, __LINE__, desc, errs, count)
+
+//#define GRPC_ERROR_REFCOUNT_DEBUG
+#ifdef GRPC_ERROR_REFCOUNT_DEBUG
+grpc_error *grpc_error_ref(grpc_error *err, const char *file, int line,
+ const char *func);
+void grpc_error_unref(grpc_error *err, const char *file, int line,
+ const char *func);
+#define GRPC_ERROR_REF(err) grpc_error_ref(err, __FILE__, __LINE__, __func__)
+#define GRPC_ERROR_UNREF(err) \
+ grpc_error_unref(err, __FILE__, __LINE__, __func__)
+#else
+grpc_error *grpc_error_ref(grpc_error *err);
+void grpc_error_unref(grpc_error *err);
+#define GRPC_ERROR_REF(err) grpc_error_ref(err)
+#define GRPC_ERROR_UNREF(err) grpc_error_unref(err)
+#endif
+
+grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which,
+ intptr_t value);
+bool grpc_error_get_int(grpc_error *error, grpc_error_ints which, intptr_t *p);
+grpc_error *grpc_error_set_time(grpc_error *src, grpc_error_times which,
+ gpr_timespec value);
+grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which,
+ const char *value);
+/// Add a child error: an error that is believed to have contributed to this
+/// error occurring. Allows root causing high level errors from lower level
+/// errors that contributed to them.
+grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child);
+grpc_error *grpc_os_error(const char *file, int line, int err,
+ const char *call_name);
+/// create an error associated with errno!=0 (an 'operating system' error)
+#define GRPC_OS_ERROR(err, call_name) \
+ grpc_os_error(__FILE__, __LINE__, err, call_name)
+grpc_error *grpc_wsa_error(const char *file, int line, int err,
+ const char *call_name);
+/// windows only: create an error associated with WSAGetLastError()!=0
+#define GRPC_WSA_ERROR(err, call_name) \
+ grpc_wsa_error(__FILE__, __LINE__, err, call_name)
+
+bool grpc_log_if_error(const char *what, grpc_error *error, const char *file,
+ int line);
+#define GRPC_LOG_IF_ERROR(what, error) \
+ grpc_log_if_error((what), (error), __FILE__, __LINE__)
+
+#endif /* GRPC_CORE_LIB_IOMGR_ERROR_H */
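Note: a worked example of the ownership rules documented above; set_int/set_str consume the ref on src and return a new owned error, while get_int is a read-only query. GRPC_STATUS_UNAVAILABLE (from <grpc/status.h>) is used purely for illustration:

#include <grpc/status.h>

grpc_error *error = GRPC_ERROR_CREATE("connect failed"); /* caller owns a ref */
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
                           GRPC_STATUS_UNAVAILABLE);      /* consumes old ref */
error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS,
                           "ipv4:127.0.0.1:50051");
intptr_t status;
if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) {
  /* status == GRPC_STATUS_UNAVAILABLE; no ownership transferred */
}
GRPC_ERROR_UNREF(error); /* release the single ref we hold */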
diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
index 3c8127e1a8..9e306af5fa 100644
--- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
@@ -126,6 +126,9 @@ struct grpc_fd {
grpc_closure *on_done_closure;
grpc_iomgr_object iomgr_object;
+
+ /* The pollset that last noticed and notified that the fd is readable */
+ grpc_pollset *read_notifier_pollset;
};
/* Begin polling on an fd.
@@ -147,7 +150,8 @@ static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
if got_read or got_write are 1, also does the become_{readable,writable} as
appropriate. */
static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec,
- int got_read, int got_write);
+ int got_read, int got_write,
+ grpc_pollset *read_notifier_pollset);
/* Return 1 if this fd is orphaned, 0 otherwise */
static bool fd_is_orphaned(grpc_fd *fd);
@@ -217,9 +221,10 @@ struct grpc_pollset {
struct grpc_pollset_vtable {
void (*add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
struct grpc_fd *fd, int and_unlock_pollset);
- void (*maybe_work_and_unlock)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_pollset_worker *worker,
- gpr_timespec deadline, gpr_timespec now);
+ grpc_error *(*maybe_work_and_unlock)(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset,
+ grpc_pollset_worker *worker,
+ gpr_timespec deadline, gpr_timespec now);
void (*finish_shutdown)(grpc_pollset *pollset);
void (*destroy)(grpc_pollset *pollset);
};
@@ -247,9 +252,9 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
/* As per pollset_kick, with an extended set of flags (defined above)
-- mostly for fd_posix's use. */
-static void pollset_kick_ext(grpc_pollset *p,
- grpc_pollset_worker *specific_worker,
- uint32_t flags);
+static grpc_error *pollset_kick_ext(grpc_pollset *p,
+ grpc_pollset_worker *specific_worker,
+ uint32_t flags) GRPC_MUST_USE_RESULT;
/* turn a pollset into a multipoller: platform specific */
typedef void (*platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx,
@@ -342,6 +347,7 @@ static grpc_fd *alloc_fd(int fd) {
r->on_done_closure = NULL;
r->closed = 0;
r->released = 0;
+ r->read_notifier_pollset = NULL;
gpr_mu_unlock(&r->mu);
return r;
}
@@ -415,12 +421,13 @@ static bool fd_is_orphaned(grpc_fd *fd) {
return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}
-static void pollset_kick_locked(grpc_fd_watcher *watcher) {
+static grpc_error *pollset_kick_locked(grpc_fd_watcher *watcher) {
gpr_mu_lock(&watcher->pollset->mu);
GPR_ASSERT(watcher->worker);
- pollset_kick_ext(watcher->pollset, watcher->worker,
- GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
+ grpc_error *err = pollset_kick_ext(watcher->pollset, watcher->worker,
+ GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
gpr_mu_unlock(&watcher->pollset->mu);
+ return err;
}
static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
@@ -459,7 +466,7 @@ static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
} else {
remove_fd_from_all_epoll_sets(fd->fd);
}
- grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE, NULL);
}
static int fd_wrapped_fd(grpc_fd *fd) {
@@ -508,15 +515,27 @@ static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
+static grpc_error *fd_shutdown_error(bool shutdown) {
+ if (!shutdown) {
+ return GRPC_ERROR_NONE;
+ } else {
+ return GRPC_ERROR_CREATE("FD shutdown");
+ }
+}
+
static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure **st, grpc_closure *closure) {
- if (*st == CLOSURE_NOT_READY) {
+ if (fd->shutdown) {
+ grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
+ NULL);
+ } else if (*st == CLOSURE_NOT_READY) {
/* not ready ==> switch to a waiting state by setting the closure */
*st = closure;
} else if (*st == CLOSURE_READY) {
/* already ready ==> queue the closure to run immediately */
*st = CLOSURE_NOT_READY;
- grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL);
+ grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
+ NULL);
maybe_wake_one_watcher_locked(fd);
} else {
/* upcallptr was set to a different closure. This is an error! */
@@ -539,19 +558,35 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
return 0;
} else {
/* waiting ==> queue closure */
- grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL);
+ grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
*st = CLOSURE_NOT_READY;
return 1;
}
}
+static void set_read_notifier_pollset_locked(
+ grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *read_notifier_pollset) {
+ fd->read_notifier_pollset = read_notifier_pollset;
+}
+
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
gpr_mu_lock(&fd->mu);
- GPR_ASSERT(!fd->shutdown);
- fd->shutdown = 1;
- set_ready_locked(exec_ctx, fd, &fd->read_closure);
- set_ready_locked(exec_ctx, fd, &fd->write_closure);
+ /* only shutdown once */
+ if (!fd->shutdown) {
+ fd->shutdown = 1;
+ /* signal read/write closed to OS so that future operations fail */
+ shutdown(fd->fd, SHUT_RDWR);
+ set_ready_locked(exec_ctx, fd, &fd->read_closure);
+ set_ready_locked(exec_ctx, fd, &fd->write_closure);
+ }
+ gpr_mu_unlock(&fd->mu);
+}
+
+static bool fd_is_shutdown(grpc_fd *fd) {
+ gpr_mu_lock(&fd->mu);
+ bool r = fd->shutdown;
gpr_mu_unlock(&fd->mu);
+ return r;
}
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
@@ -568,6 +603,18 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
gpr_mu_unlock(&fd->mu);
}
+/* Return the read-notifier pollset */
+static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_fd *fd) {
+ grpc_pollset *notifier = NULL;
+
+ gpr_mu_lock(&fd->mu);
+ notifier = fd->read_notifier_pollset;
+ gpr_mu_unlock(&fd->mu);
+
+ return notifier;
+}
+
static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
grpc_pollset_worker *worker, uint32_t read_mask,
uint32_t write_mask, grpc_fd_watcher *watcher) {
@@ -620,7 +667,8 @@ static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
}
static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
- int got_read, int got_write) {
+ int got_read, int got_write,
+ grpc_pollset *read_notifier_pollset) {
int was_polling = 0;
int kick = 0;
grpc_fd *fd = watcher->fd;
@@ -656,6 +704,10 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) {
kick = 1;
}
+
+ if (read_notifier_pollset != NULL) {
+ set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset);
+ }
}
if (got_write) {
if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) {
@@ -717,10 +769,19 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->prev->next = worker->next->prev = worker;
}
-static void pollset_kick_ext(grpc_pollset *p,
- grpc_pollset_worker *specific_worker,
- uint32_t flags) {
+static void kick_append_error(grpc_error **composite, grpc_error *error) {
+ if (error == GRPC_ERROR_NONE) return;
+ if (*composite == GRPC_ERROR_NONE) {
+ *composite = GRPC_ERROR_CREATE("Kick Failure");
+ }
+ *composite = grpc_error_add_child(*composite, error);
+}
+
+static grpc_error *pollset_kick_ext(grpc_pollset *p,
+ grpc_pollset_worker *specific_worker,
+ uint32_t flags) {
GPR_TIMER_BEGIN("pollset_kick_ext", 0);
+ grpc_error *error = GRPC_ERROR_NONE;
/* pollset->mu already held */
if (specific_worker != NULL) {
@@ -730,25 +791,28 @@ static void pollset_kick_ext(grpc_pollset *p,
for (specific_worker = p->root_worker.next;
specific_worker != &p->root_worker;
specific_worker = specific_worker->next) {
- grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
+ kick_append_error(
+ &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
}
- p->kicked_without_pollers = 1;
+ p->kicked_without_pollers = true;
GPR_TIMER_END("pollset_kick_ext.broadcast", 0);
} else if (gpr_tls_get(&g_current_thread_worker) !=
(intptr_t)specific_worker) {
GPR_TIMER_MARK("different_thread_worker", 0);
if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
- specific_worker->reevaluate_polling_on_wakeup = 1;
+ specific_worker->reevaluate_polling_on_wakeup = true;
}
- specific_worker->kicked_specifically = 1;
- grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
+ specific_worker->kicked_specifically = true;
+ kick_append_error(&error,
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
} else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
GPR_TIMER_MARK("kick_yoself", 0);
if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
- specific_worker->reevaluate_polling_on_wakeup = 1;
+ specific_worker->reevaluate_polling_on_wakeup = true;
}
- specific_worker->kicked_specifically = 1;
- grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
+ specific_worker->kicked_specifically = true;
+ kick_append_error(&error,
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
}
} else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) {
GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
@@ -769,39 +833,41 @@ static void pollset_kick_ext(grpc_pollset *p,
if (specific_worker != NULL) {
GPR_TIMER_MARK("finally_kick", 0);
push_back_worker(p, specific_worker);
- grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
+ kick_append_error(
+ &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
}
} else {
GPR_TIMER_MARK("kicked_no_pollers", 0);
- p->kicked_without_pollers = 1;
+ p->kicked_without_pollers = true;
}
}
GPR_TIMER_END("pollset_kick_ext", 0);
+ return error;
}
-static void pollset_kick(grpc_pollset *p,
- grpc_pollset_worker *specific_worker) {
- pollset_kick_ext(p, specific_worker, 0);
+static grpc_error *pollset_kick(grpc_pollset *p,
+ grpc_pollset_worker *specific_worker) {
+ return pollset_kick_ext(p, specific_worker, 0);
}
/* global state management */
-static void pollset_global_init(void) {
+static grpc_error *pollset_global_init(void) {
gpr_tls_init(&g_current_thread_poller);
gpr_tls_init(&g_current_thread_worker);
- grpc_wakeup_fd_global_init();
- grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
+ return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}
static void pollset_global_shutdown(void) {
grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
gpr_tls_destroy(&g_current_thread_poller);
gpr_tls_destroy(&g_current_thread_worker);
- grpc_wakeup_fd_global_destroy();
}
-static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }
+static grpc_error *kick_poller(void) {
+ return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
+}
/* main interface */
@@ -864,14 +930,23 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs));
pollset->vtable->finish_shutdown(pollset);
- grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
+}
+
+static void work_combine_error(grpc_error **composite, grpc_error *error) {
+ if (error == GRPC_ERROR_NONE) return;
+ if (*composite == GRPC_ERROR_NONE) {
+ *composite = GRPC_ERROR_CREATE("pollset_work");
+ }
+ *composite = grpc_error_add_child(*composite, error);
}
-static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_pollset_worker **worker_hdl, gpr_timespec now,
- gpr_timespec deadline) {
+static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_pollset_worker **worker_hdl,
+ gpr_timespec now, gpr_timespec deadline) {
grpc_pollset_worker worker;
*worker_hdl = &worker;
+ grpc_error *error = GRPC_ERROR_NONE;
/* pollset->mu already held */
int added_worker = 0;
@@ -887,7 +962,10 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset->local_wakeup_cache = worker.wakeup_fd->next;
} else {
worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd));
- grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
+ error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
+ if (error != GRPC_ERROR_NONE) {
+ return error;
+ }
}
worker.kicked_specifically = 0;
/* If there's work waiting for the pollset to be idle, and the
@@ -924,8 +1002,9 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
GPR_TIMER_BEGIN("maybe_work_and_unlock", 0);
- pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker,
- deadline, now);
+ work_combine_error(&error,
+ pollset->vtable->maybe_work_and_unlock(
+ exec_ctx, pollset, &worker, deadline, now));
GPR_TIMER_END("maybe_work_and_unlock", 0);
locked = 0;
gpr_tls_set(&g_current_thread_poller, 0);
@@ -987,6 +1066,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
*worker_hdl = NULL;
GPR_TIMER_END("pollset_work", 0);
+ return error;
}
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@@ -1035,7 +1115,7 @@ typedef struct grpc_unary_promote_args {
} grpc_unary_promote_args;
static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args,
- bool success) {
+ grpc_error *error) {
grpc_unary_promote_args *up_args = args;
const grpc_pollset_vtable *original_vtable = up_args->original_vtable;
grpc_pollset *pollset = up_args->pollset;
@@ -1137,7 +1217,8 @@ static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
up_args->promotion_closure.cb = basic_do_promote;
up_args->promotion_closure.cb_arg = up_args;
- grpc_closure_list_add(&pollset->idle_jobs, &up_args->promotion_closure, 1);
+ grpc_closure_list_append(&pollset->idle_jobs, &up_args->promotion_closure,
+ GRPC_ERROR_NONE);
pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
exit:
@@ -1146,11 +1227,9 @@ exit:
}
}
-static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
- grpc_pollset *pollset,
- grpc_pollset_worker *worker,
- gpr_timespec deadline,
- gpr_timespec now) {
+static grpc_error *basic_pollset_maybe_work_and_unlock(
+ grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
+ gpr_timespec deadline, gpr_timespec now) {
#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
@@ -1160,6 +1239,7 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
int timeout;
int r;
nfds_t nfds;
+ grpc_error *error = GRPC_ERROR_NONE;
fd = pollset->data.ptr;
if (fd && fd_is_orphaned(fd)) {
@@ -1200,33 +1280,37 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
if (r < 0) {
if (errno != EINTR) {
- gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
+ work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
}
if (fd) {
- fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
+ fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL);
}
} else if (r == 0) {
if (fd) {
- fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
+ fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL);
}
} else {
if (pfd[0].revents & POLLIN_CHECK) {
- grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
+ work_combine_error(&error,
+ grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd));
}
if (pfd[1].revents & POLLIN_CHECK) {
- grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd);
+ work_combine_error(&error,
+ grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd));
}
if (nfds > 2) {
fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK,
- pfd[2].revents & POLLOUT_CHECK);
+ pfd[2].revents & POLLOUT_CHECK, pollset);
} else if (fd) {
- fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
+ fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL);
}
}
if (fd) {
GRPC_FD_UNREF(fd, "basicpoll_begin");
}
+
+ return error;
}
static void basic_pollset_destroy(grpc_pollset *pollset) {
@@ -1287,7 +1371,7 @@ exit:
}
}
-static void multipoll_with_poll_pollset_maybe_work_and_unlock(
+static grpc_error *multipoll_with_poll_pollset_maybe_work_and_unlock(
grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec deadline, gpr_timespec now) {
#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
@@ -1301,6 +1385,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
/* TODO(ctiller): inline some elements to avoid an allocation */
grpc_fd_watcher *watchers;
struct pollfd *pfds;
+ grpc_error *error = GRPC_ERROR_NONE;
h = pollset->data.ptr;
timeout = poll_deadline_to_millis_timeout(deadline, now);
@@ -1353,34 +1438,38 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
if (r < 0) {
if (errno != EINTR) {
- gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
+ work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
}
for (i = 2; i < pfd_count; i++) {
- fd_end_poll(exec_ctx, &watchers[i], 0, 0);
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
}
} else if (r == 0) {
for (i = 2; i < pfd_count; i++) {
- fd_end_poll(exec_ctx, &watchers[i], 0, 0);
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
}
} else {
if (pfds[0].revents & POLLIN_CHECK) {
- grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
+ work_combine_error(&error,
+ grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd));
}
if (pfds[1].revents & POLLIN_CHECK) {
- grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd);
+ work_combine_error(&error,
+ grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd));
}
for (i = 2; i < pfd_count; i++) {
if (watchers[i].fd == NULL) {
- fd_end_poll(exec_ctx, &watchers[i], 0, 0);
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
continue;
}
fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK,
- pfds[i].revents & POLLOUT_CHECK);
+ pfds[i].revents & POLLOUT_CHECK, pollset);
}
}
gpr_free(pfds);
gpr_free(watchers);
+
+ return error;
}
static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) {
@@ -1451,20 +1540,31 @@ static void poll_become_multipoller(grpc_exec_ctx *exec_ctx,
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
-static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) {
+static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st,
+ grpc_pollset *read_notifier_pollset) {
/* only one set_ready can be active at once (but there may be a racing
notify_on) */
gpr_mu_lock(&fd->mu);
set_ready_locked(exec_ctx, fd, st);
+
+ /* A non-NULL read_notifier_pollset means that the fd is readable. */
+ if (read_notifier_pollset != NULL) {
+ /* Note: Since the fd might be a part of multiple pollsets, this might be
+ * called multiple times (for each time the fd becomes readable) and it is
+ * okay to set the fd's read-notifier pollset to any one of these pollsets */
+ set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset);
+ }
+
gpr_mu_unlock(&fd->mu);
}
-static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- set_ready(exec_ctx, fd, &fd->read_closure);
+static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_pollset *notifier_pollset) {
+ set_ready(exec_ctx, fd, &fd->read_closure, notifier_pollset);
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- set_ready(exec_ctx, fd, &fd->write_closure);
+ set_ready(exec_ctx, fd, &fd->write_closure, NULL);
}
struct epoll_fd_list {
@@ -1556,11 +1656,11 @@ static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
}
}
- fd_end_poll(exec_ctx, &watcher, 0, 0);
+ fd_end_poll(exec_ctx, &watcher, 0, 0, NULL);
}
static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg,
- bool iomgr_status) {
+ grpc_error *error) {
delayed_add *da = arg;
if (!fd_is_orphaned(da->fd)) {
@@ -1573,7 +1673,8 @@ static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg,
/* We don't care about this pollset anymore. */
if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) {
da->pollset->called_shutdown = 1;
- grpc_exec_ctx_enqueue(exec_ctx, da->pollset->shutdown_done, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, da->pollset->shutdown_done, GRPC_ERROR_NONE,
+ NULL);
}
}
gpr_mu_unlock(&da->pollset->mu);
@@ -1597,14 +1698,14 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx,
GRPC_FD_REF(fd, "delayed_add");
grpc_closure_init(&da->closure, perform_delayed_add, da);
pollset->in_flight_cbs++;
- grpc_exec_ctx_enqueue(exec_ctx, &da->closure, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, &da->closure, GRPC_ERROR_NONE, NULL);
}
}
/* TODO(klempner): We probably want to turn this down a bit */
#define GRPC_EPOLL_MAX_EVENTS 1000
-static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
+static grpc_error *multipoll_with_epoll_pollset_maybe_work_and_unlock(
grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec deadline, gpr_timespec now) {
struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
@@ -1613,6 +1714,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
epoll_hdr *h = pollset->data.ptr;
int timeout_ms;
struct pollfd pfds[2];
+ grpc_error *error = GRPC_ERROR_NONE;
/* If you want to ignore epoll's ability to sanely handle parallel pollers,
* for a more apples-to-apples performance comparison with poll, add a
@@ -1641,13 +1743,14 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
if (poll_rv < 0) {
if (errno != EINTR) {
- gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
+ work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
}
} else if (poll_rv == 0) {
/* do nothing */
} else {
if (pfds[0].revents) {
- grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd);
+ work_combine_error(&error,
+ grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd));
}
if (pfds[1].revents) {
do {
@@ -1655,7 +1758,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
if (ep_rv < 0) {
if (errno != EINTR) {
- gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno));
+ work_combine_error(&error, GRPC_OS_ERROR(errno, "epoll_wait"));
}
} else {
int i;
@@ -1667,10 +1770,11 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
int write_ev = ep_ev[i].events & EPOLLOUT;
if (fd == NULL) {
- grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
+ work_combine_error(&error, grpc_wakeup_fd_consume_wakeup(
+ &grpc_global_wakeup_fd));
} else {
if (read_ev || cancel) {
- fd_become_readable(exec_ctx, fd);
+ fd_become_readable(exec_ctx, fd, pollset);
}
if (write_ev || cancel) {
fd_become_writable(exec_ctx, fd);
@@ -1681,6 +1785,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
} while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
}
}
+ return error;
}
static void multipoll_with_epoll_pollset_finish_shutdown(
@@ -1897,8 +2002,10 @@ static const grpc_event_engine_vtable vtable = {
.fd_wrapped_fd = fd_wrapped_fd,
.fd_orphan = fd_orphan,
.fd_shutdown = fd_shutdown,
+ .fd_is_shutdown = fd_is_shutdown,
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
+ .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
new file mode 100644
index 0000000000..45c0a5e954
--- /dev/null
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -0,0 +1,1267 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_POSIX_SOCKET
+
+#include "src/core/lib/iomgr/ev_poll_posix.h"
+
+#include <assert.h>
+#include <errno.h>
+#include <poll.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/tls.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/lib/iomgr/iomgr_internal.h"
+#include "src/core/lib/iomgr/wakeup_fd_posix.h"
+#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/support/block_annotate.h"
+
+/*******************************************************************************
+ * FD declarations
+ */
+
+typedef struct grpc_fd_watcher {
+ struct grpc_fd_watcher *next;
+ struct grpc_fd_watcher *prev;
+ grpc_pollset *pollset;
+ grpc_pollset_worker *worker;
+ grpc_fd *fd;
+} grpc_fd_watcher;
+
+struct grpc_fd {
+ int fd;
+ /* refst format:
+ bit0: 1=active/0=orphaned
+ bit1-n: refcount
+ meaning that mostly we ref by two to avoid altering the orphaned bit,
+ and just unref by 1 when we're ready to flag the object as orphaned */
+ gpr_atm refst;
+
+ gpr_mu mu;
+ int shutdown;
+ int closed;
+ int released;
+
+ /* The watcher list.
+
+ The following watcher related fields are protected by watcher_mu.
+
+ An fd_watcher is an ephemeral object created when a poller begins
+ polling an fd, and destroyed once that poll completes.
+
+ It records whether the poller should poll this fd for reads, writes,
+ both, or neither.
+
+ If a watcher is asked to poll for reads or writes, the read_watcher
+ or write_watcher fields are set respectively. A watcher may be asked
+ to poll for both, in which case both fields will be set.
+
+ read_watcher and write_watcher may be NULL if no watcher has been
+ asked to poll for reads or writes.
+
+ If an fd_watcher is not asked to poll for reads or writes, it's added
+ to a linked list of inactive watchers, rooted at inactive_watcher_root.
+ If a poller is later needed, one of the inactive watchers may be kicked
+ out of its poll loop to take on that responsibility. */
+ grpc_fd_watcher inactive_watcher_root;
+ grpc_fd_watcher *read_watcher;
+ grpc_fd_watcher *write_watcher;
+
+ grpc_closure *read_closure;
+ grpc_closure *write_closure;
+
+ grpc_closure *on_done_closure;
+
+ grpc_iomgr_object iomgr_object;
+
+ /* The pollset that last noticed and notified that the fd is readable */
+ grpc_pollset *read_notifier_pollset;
+};
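To make the refst encoding concrete, here is a sketch (illustrative only, a single-threaded view) of how the counter evolves across the ref/orphan/unref calls defined later in this file:

    /* fd_create       refst = 1          active (bit0 set), no extra refs
       fd_ref(fd)      refst = 1 + 2 = 3  ref_by(2) leaves bit0 untouched
       fd_orphan(...)  refst = 3 + 1 = 4  ref_by(1) clears the active bit...
                       refst = 4 - 2 = 2  ...then unref_by(2) drops the ref
       fd_unref(fd)    refst = 2 - 2 = 0  last unref: the fd is destroyed */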
+
+/* Begin polling on an fd.
+ Registers the given pollset's interest in this fd, so that if read or
+ write interest changes, the pollset can be kicked to pick up the new
+ interest.
+ Return value is:
+ (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
+ i.e. a combination of read_mask and write_mask determined by the fd's current
+ interest in said events.
+ Polling strategies that do not need to alter their behavior depending on the
+ fd's current interest (such as epoll) do not need to call this function.
+ MUST NOT be called with a pollset lock taken */
+static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
+ grpc_pollset_worker *worker, uint32_t read_mask,
+ uint32_t write_mask, grpc_fd_watcher *rec);
+/* Complete polling previously started with fd_begin_poll
+ MUST NOT be called with a pollset lock taken
+ if got_read or got_write are 1, also does the become_{readable,writable} as
+ appropriate. */
+static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec,
+ int got_read, int got_write,
+ grpc_pollset *read_notifier_pollset);
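A minimal usage sketch of this begin/end protocol, assuming fd, pollset, worker and exec_ctx are in scope, no pollset lock is held, and timeout_ms is a hypothetical timeout (pollset_work below is the real caller):

    grpc_fd_watcher w;
    struct pollfd pfd;
    pfd.fd = fd->fd;
    pfd.revents = 0;
    /* ask the fd which events this poller should actually watch */
    pfd.events = (short)fd_begin_poll(fd, pollset, worker, POLLIN, POLLOUT, &w);
    int r = grpc_poll_function(&pfd, 1, timeout_ms);
    /* always pair with fd_end_poll, reporting which events fired */
    fd_end_poll(exec_ctx, &w, r > 0 && (pfd.revents & POLLIN),
                r > 0 && (pfd.revents & POLLOUT), pollset);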
+
+/* Return 1 if this fd is orphaned, 0 otherwise */
+static bool fd_is_orphaned(grpc_fd *fd);
+
+/* Reference counting for fds */
+/*#define GRPC_FD_REF_COUNT_DEBUG*/
+#ifdef GRPC_FD_REF_COUNT_DEBUG
+static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
+static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
+ int line);
+#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
+#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
+#else
+static void fd_ref(grpc_fd *fd);
+static void fd_unref(grpc_fd *fd);
+#define GRPC_FD_REF(fd, reason) fd_ref(fd)
+#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
+#endif
+
+#define CLOSURE_NOT_READY ((grpc_closure *)0)
+#define CLOSURE_READY ((grpc_closure *)1)
+
+/*******************************************************************************
+ * pollset declarations
+ */
+
+typedef struct grpc_cached_wakeup_fd {
+ grpc_wakeup_fd fd;
+ struct grpc_cached_wakeup_fd *next;
+} grpc_cached_wakeup_fd;
+
+struct grpc_pollset_worker {
+ grpc_cached_wakeup_fd *wakeup_fd;
+ int reevaluate_polling_on_wakeup;
+ int kicked_specifically;
+ struct grpc_pollset_worker *next;
+ struct grpc_pollset_worker *prev;
+};
+
+struct grpc_pollset {
+ gpr_mu mu;
+ grpc_pollset_worker root_worker;
+ int shutting_down;
+ int called_shutdown;
+ int kicked_without_pollers;
+ grpc_closure *shutdown_done;
+ grpc_closure_list idle_jobs;
+ /* all polled fds */
+ size_t fd_count;
+ size_t fd_capacity;
+ grpc_fd **fds;
+ /* Local cache of eventfds for workers */
+ grpc_cached_wakeup_fd *local_wakeup_cache;
+};
+
+/* Add an fd to a pollset */
+static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ struct grpc_fd *fd);
+
+static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set, grpc_fd *fd);
+
+/* Convert a timespec to milliseconds:
+ - very small or negative poll times are clamped to zero to do a
+ non-blocking poll (which becomes spin polling)
+ - other small values are rounded up to one millisecond
+ - polls longer than a millisecond are rounded up to the next whole
+ millisecond to avoid spinning
+ - infinite timeouts are converted to -1 */
+static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
+ gpr_timespec now);
+
+/* Allow kick to wakeup the currently polling worker */
+#define GRPC_POLLSET_CAN_KICK_SELF 1
+/* Force the wakee to repoll when awoken */
+#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
+/* As per pollset_kick, with an extended set of flags (defined above)
+ -- mostly for fd_posix's use. */
+static grpc_error *pollset_kick_ext(grpc_pollset *p,
+ grpc_pollset_worker *specific_worker,
+ uint32_t flags) GRPC_MUST_USE_RESULT;
+
+/* Return 1 if the pollset has active threads in pollset_work (pollset must
+ * be locked) */
+static int pollset_has_workers(grpc_pollset *pollset);
+
+/*******************************************************************************
+ * pollset_set definitions
+ */
+
+struct grpc_pollset_set {
+ gpr_mu mu;
+
+ size_t pollset_count;
+ size_t pollset_capacity;
+ grpc_pollset **pollsets;
+
+ size_t pollset_set_count;
+ size_t pollset_set_capacity;
+ struct grpc_pollset_set **pollset_sets;
+
+ size_t fd_count;
+ size_t fd_capacity;
+ grpc_fd **fds;
+};
+
+/*******************************************************************************
+ * fd_posix.c
+ */
+
+#ifdef GRPC_FD_REF_COUNT_DEBUG
+#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
+#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
+static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
+ int line) {
+ gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
+ gpr_atm_no_barrier_load(&fd->refst),
+ gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
+#else
+#define REF_BY(fd, n, reason) ref_by(fd, n)
+#define UNREF_BY(fd, n, reason) unref_by(fd, n)
+static void ref_by(grpc_fd *fd, int n) {
+#endif
+ GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
+}
+
+#ifdef GRPC_FD_REF_COUNT_DEBUG
+static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
+ int line) {
+ gpr_atm old;
+ gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
+ gpr_atm_no_barrier_load(&fd->refst),
+ gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
+#else
+static void unref_by(grpc_fd *fd, int n) {
+ gpr_atm old;
+#endif
+ old = gpr_atm_full_fetch_add(&fd->refst, -n);
+ if (old == n) {
+ gpr_mu_destroy(&fd->mu);
+ grpc_iomgr_unregister_object(&fd->iomgr_object);
+ gpr_free(fd);
+ } else {
+ GPR_ASSERT(old > n);
+ }
+}
+
+static grpc_fd *fd_create(int fd, const char *name) {
+ grpc_fd *r = gpr_malloc(sizeof(*r));
+ gpr_mu_init(&r->mu);
+ gpr_atm_rel_store(&r->refst, 1);
+ r->shutdown = 0;
+ r->read_closure = CLOSURE_NOT_READY;
+ r->write_closure = CLOSURE_NOT_READY;
+ r->fd = fd;
+ r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
+ &r->inactive_watcher_root;
+ r->read_watcher = r->write_watcher = NULL;
+ r->on_done_closure = NULL;
+ r->closed = 0;
+ r->released = 0;
+ r->read_notifier_pollset = NULL;
+
+ char *name2;
+ gpr_asprintf(&name2, "%s fd=%d", name, fd);
+ grpc_iomgr_register_object(&r->iomgr_object, name2);
+ gpr_free(name2);
+#ifdef GRPC_FD_REF_COUNT_DEBUG
+ gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, name);
+#endif
+ return r;
+}
+
+static bool fd_is_orphaned(grpc_fd *fd) {
+ return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
+}
+
+/* Return the read-notifier pollset */
+static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_fd *fd) {
+ grpc_pollset *notifier = NULL;
+
+ gpr_mu_lock(&fd->mu);
+ notifier = fd->read_notifier_pollset;
+ gpr_mu_unlock(&fd->mu);
+
+ return notifier;
+}
+
+static grpc_error *pollset_kick_locked(grpc_fd_watcher *watcher) {
+ gpr_mu_lock(&watcher->pollset->mu);
+ GPR_ASSERT(watcher->worker);
+ grpc_error *err = pollset_kick_ext(watcher->pollset, watcher->worker,
+ GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
+ gpr_mu_unlock(&watcher->pollset->mu);
+ return err;
+}
+
+static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
+ if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
+ pollset_kick_locked(fd->inactive_watcher_root.next);
+ } else if (fd->read_watcher) {
+ pollset_kick_locked(fd->read_watcher);
+ } else if (fd->write_watcher) {
+ pollset_kick_locked(fd->write_watcher);
+ }
+}
+
+static void wake_all_watchers_locked(grpc_fd *fd) {
+ grpc_fd_watcher *watcher;
+ for (watcher = fd->inactive_watcher_root.next;
+ watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
+ pollset_kick_locked(watcher);
+ }
+ if (fd->read_watcher) {
+ pollset_kick_locked(fd->read_watcher);
+ }
+ if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
+ pollset_kick_locked(fd->write_watcher);
+ }
+}
+
+static int has_watchers(grpc_fd *fd) {
+ return fd->read_watcher != NULL || fd->write_watcher != NULL ||
+ fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
+}
+
+static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
+ fd->closed = 1;
+ if (!fd->released) {
+ close(fd->fd);
+ }
+ grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE, NULL);
+}
+
+static int fd_wrapped_fd(grpc_fd *fd) {
+ if (fd->released || fd->closed) {
+ return -1;
+ } else {
+ return fd->fd;
+ }
+}
+
+static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure *on_done, int *release_fd,
+ const char *reason) {
+ fd->on_done_closure = on_done;
+ fd->released = release_fd != NULL;
+ if (!fd->released) {
+ shutdown(fd->fd, SHUT_RDWR);
+ } else {
+ *release_fd = fd->fd;
+ }
+ gpr_mu_lock(&fd->mu);
+ REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
+ if (!has_watchers(fd)) {
+ close_fd_locked(exec_ctx, fd);
+ } else {
+ wake_all_watchers_locked(fd);
+ }
+ gpr_mu_unlock(&fd->mu);
+ UNREF_BY(fd, 2, reason); /* drop the reference */
+}
+
+/* increment refcount by two to avoid changing the orphan bit */
+#ifdef GRPC_FD_REF_COUNT_DEBUG
+static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
+ int line) {
+ ref_by(fd, 2, reason, file, line);
+}
+
+static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
+ int line) {
+ unref_by(fd, 2, reason, file, line);
+}
+#else
+static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
+
+static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
+#endif
+
+static grpc_error *fd_shutdown_error(bool shutdown) {
+ if (!shutdown) {
+ return GRPC_ERROR_NONE;
+ } else {
+ return GRPC_ERROR_CREATE("FD shutdown");
+ }
+}
+
+static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure **st, grpc_closure *closure) {
+ if (fd->shutdown) {
+ grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
+ NULL);
+ } else if (*st == CLOSURE_NOT_READY) {
+ /* not ready ==> switch to a waiting state by setting the closure */
+ *st = closure;
+ } else if (*st == CLOSURE_READY) {
+ /* already ready ==> queue the closure to run immediately */
+ *st = CLOSURE_NOT_READY;
+ grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
+ NULL);
+ maybe_wake_one_watcher_locked(fd);
+ } else {
+ /* *st already holds a different pending closure. This is a usage error! */
+ gpr_log(GPR_ERROR,
+ "User called a notify_on function with a previous callback still "
+ "pending");
+ abort();
+ }
+}
+
+/* Returns 1 if a pending closure was scheduled, i.e. the state moved from
+ "waiting on a closure" back to not-ready. */
+static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure **st) {
+ if (*st == CLOSURE_READY) {
+ /* duplicate ready ==> ignore */
+ return 0;
+ } else if (*st == CLOSURE_NOT_READY) {
+ /* not ready, and not waiting ==> flag ready */
+ *st = CLOSURE_READY;
+ return 0;
+ } else {
+ /* waiting ==> queue closure */
+ grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
+ *st = CLOSURE_NOT_READY;
+ return 1;
+ }
+}
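Together, notify_on_locked and set_ready_locked drive a three-state machine per closure slot; a compact summary of the transitions over the values stored in *st (the shutdown short-circuit aside):

    /* state               notify_on(c)                   set_ready
       CLOSURE_NOT_READY   *st = c (wait)                 *st = CLOSURE_READY
       CLOSURE_READY       *st = NOT_READY, c runs now    ignored (duplicate)
       c (waiting)         abort(): double notify_on      *st = NOT_READY,
                                                          c scheduled, ret 1 */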
+
+static void set_read_notifier_pollset_locked(
+ grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *read_notifier_pollset) {
+ fd->read_notifier_pollset = read_notifier_pollset;
+}
+
+static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
+ gpr_mu_lock(&fd->mu);
+ /* only shutdown once */
+ if (!fd->shutdown) {
+ fd->shutdown = 1;
+ /* signal read/write closed to OS so that future operations fail */
+ shutdown(fd->fd, SHUT_RDWR);
+ set_ready_locked(exec_ctx, fd, &fd->read_closure);
+ set_ready_locked(exec_ctx, fd, &fd->write_closure);
+ }
+ gpr_mu_unlock(&fd->mu);
+}
+
+static bool fd_is_shutdown(grpc_fd *fd) {
+ gpr_mu_lock(&fd->mu);
+ bool r = fd->shutdown;
+ gpr_mu_unlock(&fd->mu);
+ return r;
+}
+
+static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure *closure) {
+ gpr_mu_lock(&fd->mu);
+ notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
+ gpr_mu_unlock(&fd->mu);
+}
+
+static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure *closure) {
+ gpr_mu_lock(&fd->mu);
+ notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
+ gpr_mu_unlock(&fd->mu);
+}
+
+static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
+ grpc_pollset_worker *worker, uint32_t read_mask,
+ uint32_t write_mask, grpc_fd_watcher *watcher) {
+ uint32_t mask = 0;
+ grpc_closure *cur;
+ int requested;
+ /* keep track of pollers that have requested our events, in case the fd's
+ interest changes and those pollers need to be kicked */
+ GRPC_FD_REF(fd, "poll");
+
+ gpr_mu_lock(&fd->mu);
+
+ /* if we are shutdown, then don't add to the watcher set */
+ if (fd->shutdown) {
+ watcher->fd = NULL;
+ watcher->pollset = NULL;
+ watcher->worker = NULL;
+ gpr_mu_unlock(&fd->mu);
+ GRPC_FD_UNREF(fd, "poll");
+ return 0;
+ }
+
+ /* if there is nobody polling for read, but we need to, then start doing so */
+ cur = fd->read_closure;
+ requested = cur != CLOSURE_READY;
+ if (read_mask && fd->read_watcher == NULL && requested) {
+ fd->read_watcher = watcher;
+ mask |= read_mask;
+ }
+ /* if there is nobody polling for write, but we need to, then start doing so
+ */
+ cur = fd->write_closure;
+ requested = cur != CLOSURE_READY;
+ if (write_mask && fd->write_watcher == NULL && requested) {
+ fd->write_watcher = watcher;
+ mask |= write_mask;
+ }
+ /* if not polling, remember this watcher in case we need someone to later */
+ if (mask == 0 && worker != NULL) {
+ watcher->next = &fd->inactive_watcher_root;
+ watcher->prev = watcher->next->prev;
+ watcher->next->prev = watcher->prev->next = watcher;
+ }
+ watcher->pollset = pollset;
+ watcher->worker = worker;
+ watcher->fd = fd;
+ gpr_mu_unlock(&fd->mu);
+
+ return mask;
+}
+
+static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
+ int got_read, int got_write,
+ grpc_pollset *read_notifier_pollset) {
+ int was_polling = 0;
+ int kick = 0;
+ grpc_fd *fd = watcher->fd;
+
+ if (fd == NULL) {
+ return;
+ }
+
+ gpr_mu_lock(&fd->mu);
+
+ if (watcher == fd->read_watcher) {
+ /* remove read watcher, kick if we still need a read */
+ was_polling = 1;
+ if (!got_read) {
+ kick = 1;
+ }
+ fd->read_watcher = NULL;
+ }
+ if (watcher == fd->write_watcher) {
+ /* remove write watcher, kick if we still need a write */
+ was_polling = 1;
+ if (!got_write) {
+ kick = 1;
+ }
+ fd->write_watcher = NULL;
+ }
+ if (!was_polling && watcher->worker != NULL) {
+ /* remove from inactive list */
+ watcher->next->prev = watcher->prev;
+ watcher->prev->next = watcher->next;
+ }
+ if (got_read) {
+ if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) {
+ kick = 1;
+ }
+ if (read_notifier_pollset != NULL) {
+ set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset);
+ }
+ }
+ if (got_write) {
+ if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) {
+ kick = 1;
+ }
+ }
+ if (kick) {
+ maybe_wake_one_watcher_locked(fd);
+ }
+ if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
+ close_fd_locked(exec_ctx, fd);
+ }
+ gpr_mu_unlock(&fd->mu);
+
+ GRPC_FD_UNREF(fd, "poll");
+}
+
+/*******************************************************************************
+ * pollset_posix.c
+ */
+
+GPR_TLS_DECL(g_current_thread_poller);
+GPR_TLS_DECL(g_current_thread_worker);
+
+static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->prev->next = worker->next;
+ worker->next->prev = worker->prev;
+}
+
+static int pollset_has_workers(grpc_pollset *p) {
+ return p->root_worker.next != &p->root_worker;
+}
+
+static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
+ if (pollset_has_workers(p)) {
+ grpc_pollset_worker *w = p->root_worker.next;
+ remove_worker(p, w);
+ return w;
+ } else {
+ return NULL;
+ }
+}
+
+static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->next = &p->root_worker;
+ worker->prev = worker->next->prev;
+ worker->prev->next = worker->next->prev = worker;
+}
+
+static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->prev = &p->root_worker;
+ worker->next = worker->prev->next;
+ worker->prev->next = worker->next->prev = worker;
+}
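These helpers maintain an intrusive doubly-linked ring of workers rooted at p->root_worker:

    /* empty ring:  root.next == root.prev == &root (pollset_has_workers == 0)
       one worker:  root <-> w <-> root
       pop_front_worker unlinks root.next; push_front/push_back splice the
       worker in at the head or tail of the ring. */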
+
+static void kick_append_error(grpc_error **composite, grpc_error *error) {
+ if (error == GRPC_ERROR_NONE) return;
+ if (*composite == GRPC_ERROR_NONE) {
+ *composite = GRPC_ERROR_CREATE("Kick Failure");
+ }
+ *composite = grpc_error_add_child(*composite, error);
+}
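For example, if two wakeup-fd writes fail during a broadcast kick, the caller ends up with one "Kick Failure" error carrying both OS errors as children; GRPC_ERROR_NONE inputs are no-ops, so the common success path allocates nothing. A sketch (the EBADF/"eventfd_write" values are hypothetical):

    grpc_error *composite = GRPC_ERROR_NONE;
    kick_append_error(&composite, GRPC_OS_ERROR(EBADF, "eventfd_write"));
    kick_append_error(&composite, GRPC_ERROR_NONE); /* ignored */
    /* composite: "Kick Failure" with one child OS error */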
+
+static grpc_error *pollset_kick_ext(grpc_pollset *p,
+ grpc_pollset_worker *specific_worker,
+ uint32_t flags) {
+ GPR_TIMER_BEGIN("pollset_kick_ext", 0);
+ grpc_error *error = GRPC_ERROR_NONE;
+
+ /* pollset->mu already held */
+ if (specific_worker != NULL) {
+ if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
+ GPR_TIMER_BEGIN("pollset_kick_ext.broadcast", 0);
+ GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
+ for (specific_worker = p->root_worker.next;
+ specific_worker != &p->root_worker;
+ specific_worker = specific_worker->next) {
+ kick_append_error(
+ &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
+ }
+ p->kicked_without_pollers = true;
+ GPR_TIMER_END("pollset_kick_ext.broadcast", 0);
+ } else if (gpr_tls_get(&g_current_thread_worker) !=
+ (intptr_t)specific_worker) {
+ GPR_TIMER_MARK("different_thread_worker", 0);
+ if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
+ specific_worker->reevaluate_polling_on_wakeup = true;
+ }
+ specific_worker->kicked_specifically = true;
+ kick_append_error(&error,
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
+ } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
+ GPR_TIMER_MARK("kick_yoself", 0);
+ if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
+ specific_worker->reevaluate_polling_on_wakeup = true;
+ }
+ specific_worker->kicked_specifically = true;
+ kick_append_error(&error,
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
+ }
+ } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) {
+ GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
+ GPR_TIMER_MARK("kick_anonymous", 0);
+ specific_worker = pop_front_worker(p);
+ if (specific_worker != NULL) {
+ if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
+ GPR_TIMER_MARK("kick_anonymous_not_self", 0);
+ push_back_worker(p, specific_worker);
+ specific_worker = pop_front_worker(p);
+ if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
+ gpr_tls_get(&g_current_thread_worker) ==
+ (intptr_t)specific_worker) {
+ push_back_worker(p, specific_worker);
+ specific_worker = NULL;
+ }
+ }
+ if (specific_worker != NULL) {
+ GPR_TIMER_MARK("finally_kick", 0);
+ push_back_worker(p, specific_worker);
+ kick_append_error(
+ &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
+ }
+ } else {
+ GPR_TIMER_MARK("kicked_no_pollers", 0);
+ p->kicked_without_pollers = true;
+ }
+ }
+
+ GPR_TIMER_END("pollset_kick_ext", 0);
+ GRPC_LOG_IF_ERROR("pollset_kick_ext", GRPC_ERROR_REF(error));
+ return error;
+}
+
+static grpc_error *pollset_kick(grpc_pollset *p,
+ grpc_pollset_worker *specific_worker) {
+ return pollset_kick_ext(p, specific_worker, 0);
+}
+
+/* global state management */
+
+static grpc_error *pollset_global_init(void) {
+ gpr_tls_init(&g_current_thread_poller);
+ gpr_tls_init(&g_current_thread_worker);
+ return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
+}
+
+static void pollset_global_shutdown(void) {
+ grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
+ gpr_tls_destroy(&g_current_thread_poller);
+ gpr_tls_destroy(&g_current_thread_worker);
+}
+
+static grpc_error *kick_poller(void) {
+ return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
+}
+
+/* main interface */
+
+static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
+ gpr_mu_init(&pollset->mu);
+ *mu = &pollset->mu;
+ pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
+ pollset->shutting_down = 0;
+ pollset->called_shutdown = 0;
+ pollset->kicked_without_pollers = 0;
+ pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL;
+ pollset->local_wakeup_cache = NULL;
+ pollset->kicked_without_pollers = 0;
+ pollset->fd_count = 0;
+ pollset->fd_capacity = 0;
+ pollset->fds = NULL;
+}
+
+static void pollset_destroy(grpc_pollset *pollset) {
+ GPR_ASSERT(!pollset_has_workers(pollset));
+ GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
+ while (pollset->local_wakeup_cache) {
+ grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next;
+ grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
+ gpr_free(pollset->local_wakeup_cache);
+ pollset->local_wakeup_cache = next;
+ }
+ gpr_free(pollset->fds);
+ gpr_mu_destroy(&pollset->mu);
+}
+
+static void pollset_reset(grpc_pollset *pollset) {
+ GPR_ASSERT(pollset->shutting_down);
+ GPR_ASSERT(!pollset_has_workers(pollset));
+ GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
+ GPR_ASSERT(pollset->fd_count == 0);
+ pollset->shutting_down = 0;
+ pollset->called_shutdown = 0;
+ pollset->kicked_without_pollers = 0;
+}
+
+static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_fd *fd) {
+ gpr_mu_lock(&pollset->mu);
+ size_t i;
+ /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
+ for (i = 0; i < pollset->fd_count; i++) {
+ if (pollset->fds[i] == fd) goto exit;
+ }
+ if (pollset->fd_count == pollset->fd_capacity) {
+ pollset->fd_capacity =
+ GPR_MAX(pollset->fd_capacity + 8, pollset->fd_count * 3 / 2);
+ pollset->fds =
+ gpr_realloc(pollset->fds, sizeof(grpc_fd *) * pollset->fd_capacity);
+ }
+ pollset->fds[pollset->fd_count++] = fd;
+ GRPC_FD_REF(fd, "multipoller");
+ pollset_kick(pollset, NULL);
+exit:
+ gpr_mu_unlock(&pollset->mu);
+}
+
+static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
+ GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs));
+ size_t i;
+ for (i = 0; i < pollset->fd_count; i++) {
+ GRPC_FD_UNREF(pollset->fds[i], "multipoller");
+ }
+ pollset->fd_count = 0;
+ grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
+}
+
+static void work_combine_error(grpc_error **composite, grpc_error *error) {
+ if (error == GRPC_ERROR_NONE) return;
+ if (*composite == GRPC_ERROR_NONE) {
+ *composite = GRPC_ERROR_CREATE("pollset_work");
+ }
+ *composite = grpc_error_add_child(*composite, error);
+}
+
+static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_pollset_worker **worker_hdl,
+ gpr_timespec now, gpr_timespec deadline) {
+ grpc_pollset_worker worker;
+ *worker_hdl = &worker;
+ grpc_error *error = GRPC_ERROR_NONE;
+
+ /* pollset->mu already held */
+ int added_worker = 0;
+ int locked = 1;
+ int queued_work = 0;
+ int keep_polling = 0;
+ GPR_TIMER_BEGIN("pollset_work", 0);
+ /* this must happen before we (potentially) drop pollset->mu */
+ worker.next = worker.prev = NULL;
+ worker.reevaluate_polling_on_wakeup = 0;
+ if (pollset->local_wakeup_cache != NULL) {
+ worker.wakeup_fd = pollset->local_wakeup_cache;
+ pollset->local_wakeup_cache = worker.wakeup_fd->next;
+ } else {
+ worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd));
+ error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
+ if (error != GRPC_ERROR_NONE) {
+ GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
+ return error;
+ }
+ }
+ worker.kicked_specifically = 0;
+ /* If there's work waiting for the pollset to be idle, and the
+ pollset is idle, then do that work */
+ if (!pollset_has_workers(pollset) &&
+ !grpc_closure_list_empty(pollset->idle_jobs)) {
+ GPR_TIMER_MARK("pollset_work.idle_jobs", 0);
+ grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
+ goto done;
+ }
+ /* If we're shutting down then we don't execute any extended work */
+ if (pollset->shutting_down) {
+ GPR_TIMER_MARK("pollset_work.shutting_down", 0);
+ goto done;
+ }
+ /* Start polling, and keep doing so while we're being asked to
+ re-evaluate our pollers (this allows poll() based pollers to
+ ensure they don't miss wakeups) */
+ keep_polling = 1;
+ gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
+ while (keep_polling) {
+ keep_polling = 0;
+ if (!pollset->kicked_without_pollers) {
+ if (!added_worker) {
+ push_front_worker(pollset, &worker);
+ added_worker = 1;
+ gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
+ }
+ GPR_TIMER_BEGIN("maybe_work_and_unlock", 0);
+#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
+#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
+
+ int timeout;
+ int r;
+ size_t i, fd_count;
+ nfds_t pfd_count;
+ /* TODO(ctiller): inline some elements to avoid an allocation */
+ grpc_fd_watcher *watchers;
+ struct pollfd *pfds;
+
+ timeout = poll_deadline_to_millis_timeout(deadline, now);
+ /* TODO(ctiller): perform just one malloc here if we exceed the inline
+ * case */
+ pfds = gpr_malloc(sizeof(*pfds) * (pollset->fd_count + 2));
+ watchers = gpr_malloc(sizeof(*watchers) * (pollset->fd_count + 2));
+ fd_count = 0;
+ pfd_count = 2;
+ pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
+ pfds[0].events = POLLIN;
+ pfds[0].revents = 0;
+ pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd);
+ pfds[1].events = POLLIN;
+ pfds[1].revents = 0;
+ for (i = 0; i < pollset->fd_count; i++) {
+ if (fd_is_orphaned(pollset->fds[i])) {
+ GRPC_FD_UNREF(pollset->fds[i], "multipoller");
+ } else {
+ pollset->fds[fd_count++] = pollset->fds[i];
+ watchers[pfd_count].fd = pollset->fds[i];
+ GRPC_FD_REF(watchers[pfd_count].fd, "multipoller_start");
+ pfds[pfd_count].fd = pollset->fds[i]->fd;
+ pfds[pfd_count].revents = 0;
+ pfd_count++;
+ }
+ }
+ pollset->fd_count = fd_count;
+ gpr_mu_unlock(&pollset->mu);
+
+ for (i = 2; i < pfd_count; i++) {
+ grpc_fd *fd = watchers[i].fd;
+ pfds[i].events = (short)fd_begin_poll(fd, pollset, &worker, POLLIN,
+ POLLOUT, &watchers[i]);
+ GRPC_FD_UNREF(fd, "multipoller_start");
+ }
+
+ /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
+ even going into the blocking annotation if possible */
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
+ r = grpc_poll_function(pfds, pfd_count, timeout);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
+
+ if (r < 0) {
+ if (errno != EINTR) {
+ work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
+ }
+ for (i = 2; i < pfd_count; i++) {
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
+ }
+ } else if (r == 0) {
+ for (i = 2; i < pfd_count; i++) {
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
+ }
+ } else {
+ if (pfds[0].revents & POLLIN_CHECK) {
+ work_combine_error(
+ &error, grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd));
+ }
+ if (pfds[1].revents & POLLIN_CHECK) {
+ work_combine_error(
+ &error, grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd));
+ }
+ for (i = 2; i < pfd_count; i++) {
+ if (watchers[i].fd == NULL) {
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
+ } else {
+ fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK,
+ pfds[i].revents & POLLOUT_CHECK, pollset);
+ }
+ }
+ }
+
+ gpr_free(pfds);
+ gpr_free(watchers);
+ GPR_TIMER_END("maybe_work_and_unlock", 0);
+ locked = 0;
+ } else {
+ GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
+ pollset->kicked_without_pollers = 0;
+ }
+ /* Finished execution - start cleaning up.
+ Note that we may arrive here from outside the enclosing while() loop.
+ In that case we won't loop, though, as we haven't added the worker to
+ the worker list, which means nobody could ask us to re-evaluate
+ polling. */
+ done:
+ if (!locked) {
+ queued_work |= grpc_exec_ctx_flush(exec_ctx);
+ gpr_mu_lock(&pollset->mu);
+ locked = 1;
+ }
+ /* If we're forced to re-evaluate polling (via pollset_kick with
+ GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
+ a loop */
+ if (worker.reevaluate_polling_on_wakeup && error == GRPC_ERROR_NONE) {
+ worker.reevaluate_polling_on_wakeup = 0;
+ pollset->kicked_without_pollers = 0;
+ if (queued_work || worker.kicked_specifically) {
+ /* If there's queued work on the list, then set the deadline to be
+ immediate so we get back out of the polling loop quickly */
+ deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
+ }
+ keep_polling = 1;
+ }
+ if (keep_polling) {
+ now = gpr_now(now.clock_type);
+ }
+ }
+ gpr_tls_set(&g_current_thread_poller, 0);
+ if (added_worker) {
+ remove_worker(pollset, &worker);
+ gpr_tls_set(&g_current_thread_worker, 0);
+ }
+ /* release wakeup fd to the local pool */
+ worker.wakeup_fd->next = pollset->local_wakeup_cache;
+ pollset->local_wakeup_cache = worker.wakeup_fd;
+ /* check shutdown conditions */
+ if (pollset->shutting_down) {
+ if (pollset_has_workers(pollset)) {
+ pollset_kick(pollset, NULL);
+ } else if (!pollset->called_shutdown) {
+ pollset->called_shutdown = 1;
+ gpr_mu_unlock(&pollset->mu);
+ finish_shutdown(exec_ctx, pollset);
+ grpc_exec_ctx_flush(exec_ctx);
+ /* Continuing to access pollset here is safe -- it is the caller's
+ * responsibility not to destroy the pollset while it has outstanding
+ * calls to pollset_work.
+ * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
+ gpr_mu_lock(&pollset->mu);
+ } else if (!grpc_closure_list_empty(pollset->idle_jobs)) {
+ grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
+ gpr_mu_unlock(&pollset->mu);
+ grpc_exec_ctx_flush(exec_ctx);
+ gpr_mu_lock(&pollset->mu);
+ }
+ }
+ *worker_hdl = NULL;
+ GPR_TIMER_END("pollset_work", 0);
+ GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
+ return error;
+}
+
+static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_closure *closure) {
+ GPR_ASSERT(!pollset->shutting_down);
+ pollset->shutting_down = 1;
+ pollset->shutdown_done = closure;
+ pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
+ if (!pollset_has_workers(pollset)) {
+ grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
+ }
+ if (!pollset->called_shutdown && !pollset_has_workers(pollset)) {
+ pollset->called_shutdown = 1;
+ finish_shutdown(exec_ctx, pollset);
+ }
+}
+
+static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
+ gpr_timespec now) {
+ gpr_timespec timeout;
+ static const int64_t max_spin_polling_us = 10;
+ if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
+ return -1;
+ }
+ if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
+ max_spin_polling_us,
+ GPR_TIMESPAN))) <= 0) {
+ return 0;
+ }
+ timeout = gpr_time_sub(deadline, now);
+ return gpr_time_to_millis(gpr_time_add(
+ timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
+}
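Worked examples of this conversion, with now = t and the 10us spin-poll threshold above:

    /* deadline                        -> return value
       gpr_inf_future(...)             -> -1  (block until kicked)
       anything <= t + 10us            ->  0  (non-blocking spin poll)
       t + 2.3ms                       ->  3  (rounded up to a whole ms) */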
+
+/*******************************************************************************
+ * pollset_set_posix.c
+ */
+
+static grpc_pollset_set *pollset_set_create(void) {
+ grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
+ memset(pollset_set, 0, sizeof(*pollset_set));
+ gpr_mu_init(&pollset_set->mu);
+ return pollset_set;
+}
+
+static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
+ size_t i;
+ gpr_mu_destroy(&pollset_set->mu);
+ for (i = 0; i < pollset_set->fd_count; i++) {
+ GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
+ }
+ gpr_free(pollset_set->pollsets);
+ gpr_free(pollset_set->pollset_sets);
+ gpr_free(pollset_set->fds);
+ gpr_free(pollset_set);
+}
+
+static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set,
+ grpc_pollset *pollset) {
+ size_t i, j;
+ gpr_mu_lock(&pollset_set->mu);
+ if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
+ pollset_set->pollset_capacity =
+ GPR_MAX(8, 2 * pollset_set->pollset_capacity);
+ pollset_set->pollsets =
+ gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
+ sizeof(*pollset_set->pollsets));
+ }
+ pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
+ for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
+ if (fd_is_orphaned(pollset_set->fds[i])) {
+ GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
+ } else {
+ pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
+ pollset_set->fds[j++] = pollset_set->fds[i];
+ }
+ }
+ pollset_set->fd_count = j;
+ gpr_mu_unlock(&pollset_set->mu);
+}
+
+static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set,
+ grpc_pollset *pollset) {
+ size_t i;
+ gpr_mu_lock(&pollset_set->mu);
+ for (i = 0; i < pollset_set->pollset_count; i++) {
+ if (pollset_set->pollsets[i] == pollset) {
+ pollset_set->pollset_count--;
+ GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
+ pollset_set->pollsets[pollset_set->pollset_count]);
+ break;
+ }
+ }
+ gpr_mu_unlock(&pollset_set->mu);
+}
+
+static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *bag,
+ grpc_pollset_set *item) {
+ size_t i, j;
+ gpr_mu_lock(&bag->mu);
+ if (bag->pollset_set_count == bag->pollset_set_capacity) {
+ bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
+ bag->pollset_sets =
+ gpr_realloc(bag->pollset_sets,
+ bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
+ }
+ bag->pollset_sets[bag->pollset_set_count++] = item;
+ for (i = 0, j = 0; i < bag->fd_count; i++) {
+ if (fd_is_orphaned(bag->fds[i])) {
+ GRPC_FD_UNREF(bag->fds[i], "pollset_set");
+ } else {
+ pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
+ bag->fds[j++] = bag->fds[i];
+ }
+ }
+ bag->fd_count = j;
+ gpr_mu_unlock(&bag->mu);
+}
+
+static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *bag,
+ grpc_pollset_set *item) {
+ size_t i;
+ gpr_mu_lock(&bag->mu);
+ for (i = 0; i < bag->pollset_set_count; i++) {
+ if (bag->pollset_sets[i] == item) {
+ bag->pollset_set_count--;
+ GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
+ bag->pollset_sets[bag->pollset_set_count]);
+ break;
+ }
+ }
+ gpr_mu_unlock(&bag->mu);
+}
+
+static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set, grpc_fd *fd) {
+ size_t i;
+ gpr_mu_lock(&pollset_set->mu);
+ if (pollset_set->fd_count == pollset_set->fd_capacity) {
+ pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
+ pollset_set->fds = gpr_realloc(
+ pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
+ }
+ GRPC_FD_REF(fd, "pollset_set");
+ pollset_set->fds[pollset_set->fd_count++] = fd;
+ for (i = 0; i < pollset_set->pollset_count; i++) {
+ pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
+ }
+ for (i = 0; i < pollset_set->pollset_set_count; i++) {
+ pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
+ }
+ gpr_mu_unlock(&pollset_set->mu);
+}
+
+static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set, grpc_fd *fd) {
+ size_t i;
+ gpr_mu_lock(&pollset_set->mu);
+ for (i = 0; i < pollset_set->fd_count; i++) {
+ if (pollset_set->fds[i] == fd) {
+ pollset_set->fd_count--;
+ GPR_SWAP(grpc_fd *, pollset_set->fds[i],
+ pollset_set->fds[pollset_set->fd_count]);
+ GRPC_FD_UNREF(fd, "pollset_set");
+ break;
+ }
+ }
+ for (i = 0; i < pollset_set->pollset_set_count; i++) {
+ pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
+ }
+ gpr_mu_unlock(&pollset_set->mu);
+}
+
+/*******************************************************************************
+ * event engine binding
+ */
+
+static void shutdown_engine(void) { pollset_global_shutdown(); }
+
+static const grpc_event_engine_vtable vtable = {
+ .pollset_size = sizeof(grpc_pollset),
+
+ .fd_create = fd_create,
+ .fd_wrapped_fd = fd_wrapped_fd,
+ .fd_orphan = fd_orphan,
+ .fd_shutdown = fd_shutdown,
+ .fd_is_shutdown = fd_is_shutdown,
+ .fd_notify_on_read = fd_notify_on_read,
+ .fd_notify_on_write = fd_notify_on_write,
+ .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
+
+ .pollset_init = pollset_init,
+ .pollset_shutdown = pollset_shutdown,
+ .pollset_reset = pollset_reset,
+ .pollset_destroy = pollset_destroy,
+ .pollset_work = pollset_work,
+ .pollset_kick = pollset_kick,
+ .pollset_add_fd = pollset_add_fd,
+
+ .pollset_set_create = pollset_set_create,
+ .pollset_set_destroy = pollset_set_destroy,
+ .pollset_set_add_pollset = pollset_set_add_pollset,
+ .pollset_set_del_pollset = pollset_set_del_pollset,
+ .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
+ .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
+ .pollset_set_add_fd = pollset_set_add_fd,
+ .pollset_set_del_fd = pollset_set_del_fd,
+
+ .kick_poller = kick_poller,
+
+ .shutdown_engine = shutdown_engine,
+};
+
+const grpc_event_engine_vtable *grpc_init_poll_posix(void) {
+ if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
+ return NULL;
+ }
+ return &vtable;
+}
+
+#endif
diff --git a/src/core/lib/iomgr/ev_poll_posix.h b/src/core/lib/iomgr/ev_poll_posix.h
new file mode 100644
index 0000000000..291736a2db
--- /dev/null
+++ b/src/core/lib/iomgr/ev_poll_posix.h
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H
+#define GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H
+
+#include "src/core/lib/iomgr/ev_posix.h"
+
+const grpc_event_engine_vtable *grpc_init_poll_posix(void);
+
+#endif /* GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H */
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index 7df1751352..4c360f13a3 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -37,23 +37,104 @@
#include "src/core/lib/iomgr/ev_posix.h"
+#include <string.h>
+
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/ev_poll_and_epoll_posix.h"
+#include "src/core/lib/iomgr/ev_poll_posix.h"
+#include "src/core/lib/support/env.h"
+
+/** Default poll() function - a pointer so that it can be overridden by some
+ * tests */
+grpc_poll_function_type grpc_poll_function = poll;
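A test can interpose on all polling by swapping this pointer; a sketch, where counting_poll and g_poll_calls are hypothetical and not part of the tree:

    static int g_poll_calls = 0;
    static int counting_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
      g_poll_calls++; /* observe every poll the library performs */
      return poll(fds, nfds, timeout);
    }
    /* in test setup: grpc_poll_function = counting_poll; */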
static const grpc_event_engine_vtable *g_event_engine;
-grpc_poll_function_type grpc_poll_function = poll;
+typedef const grpc_event_engine_vtable *(*event_engine_factory_fn)(void);
+
+typedef struct {
+ const char *name;
+ event_engine_factory_fn factory;
+} event_engine_factory;
+
+static const event_engine_factory g_factories[] = {
+ {"poll", grpc_init_poll_posix}, {"legacy", grpc_init_poll_and_epoll_posix},
+};
+
+static void add(const char *beg, const char *end, char ***ss, size_t *ns) {
+ size_t n = *ns;
+ size_t np = n + 1;
+ char *s;
+ size_t len;
+ GPR_ASSERT(end >= beg);
+ len = (size_t)(end - beg);
+ s = gpr_malloc(len + 1);
+ memcpy(s, beg, len);
+ s[len] = 0;
+ *ss = gpr_realloc(*ss, sizeof(char **) * np);
+ (*ss)[n] = s;
+ *ns = np;
+}
+
+static void split(const char *s, char ***ss, size_t *ns) {
+ const char *c = strchr(s, ',');
+ if (c == NULL) {
+ add(s, s + strlen(s), ss, ns);
+ } else {
+ add(s, c, ss, ns);
+ split(c + 1, ss, ns);
+ }
+}
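For example:

    /* split("poll,legacy", &ss, &ns) -> ss = {"poll", "legacy"}, ns = 2
       split("poll,", &ss, &ns)       -> ss = {"poll", ""}, ns = 2; the
       empty string matches no engine name in try_engine below */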
+
+static bool is(const char *want, const char *have) {
+ return 0 == strcmp(want, "all") || 0 == strcmp(want, have);
+}
+
+static void try_engine(const char *engine) {
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) {
+ if (is(engine, g_factories[i].name)) {
+ if ((g_event_engine = g_factories[i].factory())) {
+ gpr_log(GPR_DEBUG, "Using polling engine: %s", g_factories[i].name);
+ return;
+ }
+ }
+ }
+}
void grpc_event_engine_init(void) {
- if ((g_event_engine = grpc_init_poll_and_epoll_posix())) {
- return;
+ char *s = gpr_getenv("GRPC_POLL_STRATEGY");
+ if (s == NULL) {
+ s = gpr_strdup("all");
+ }
+
+ char **strings = NULL;
+ size_t nstrings = 0;
+ split(s, &strings, &nstrings);
+
+ for (size_t i = 0; g_event_engine == NULL && i < nstrings; i++) {
+ try_engine(strings[i]);
+ }
+
+ for (size_t i = 0; i < nstrings; i++) {
+ gpr_free(strings[i]);
+ }
+ gpr_free(strings);
+ gpr_free(s);
+
+ if (g_event_engine == NULL) {
+ gpr_log(GPR_ERROR, "No event engine could be initialized");
+ abort();
}
- gpr_log(GPR_ERROR, "No event engine could be initialized");
- abort();
}
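Engine selection is thus driven by the GRPC_POLL_STRATEGY environment variable; for instance:

    /* GRPC_POLL_STRATEGY=poll         force the poll-only engine
       GRPC_POLL_STRATEGY=legacy,poll  try poll_and_epoll, fall back to poll
       unset (treated as "all")        first factory in g_factories that
                                       initializes successfully wins */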
-void grpc_event_engine_shutdown(void) { g_event_engine->shutdown_engine(); }
+void grpc_event_engine_shutdown(void) {
+ g_event_engine->shutdown_engine();
+ g_event_engine = NULL;
+}
grpc_fd *grpc_fd_create(int fd, const char *name) {
return g_event_engine->fd_create(fd, name);
@@ -72,6 +153,10 @@ void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
g_event_engine->fd_shutdown(exec_ctx, fd);
}
+bool grpc_fd_is_shutdown(grpc_fd *fd) {
+ return g_event_engine->fd_is_shutdown(fd);
+}
+
void grpc_fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
g_event_engine->fd_notify_on_read(exec_ctx, fd, closure);
@@ -82,6 +167,11 @@ void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
g_event_engine->fd_notify_on_write(exec_ctx, fd, closure);
}
+grpc_pollset *grpc_fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_fd *fd) {
+ return g_event_engine->fd_get_read_notifier_pollset(exec_ctx, fd);
+}
+
size_t grpc_pollset_size(void) { return g_event_engine->pollset_size; }
void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
@@ -101,15 +191,15 @@ void grpc_pollset_destroy(grpc_pollset *pollset) {
g_event_engine->pollset_destroy(pollset);
}
-void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_pollset_worker **worker, gpr_timespec now,
- gpr_timespec deadline) {
- g_event_engine->pollset_work(exec_ctx, pollset, worker, now, deadline);
+grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_pollset_worker **worker, gpr_timespec now,
+ gpr_timespec deadline) {
+ return g_event_engine->pollset_work(exec_ctx, pollset, worker, now, deadline);
}
-void grpc_pollset_kick(grpc_pollset *pollset,
- grpc_pollset_worker *specific_worker) {
- g_event_engine->pollset_kick(pollset, specific_worker);
+grpc_error *grpc_pollset_kick(grpc_pollset *pollset,
+ grpc_pollset_worker *specific_worker) {
+ return g_event_engine->pollset_kick(pollset, specific_worker);
}
void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@@ -159,6 +249,6 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
g_event_engine->pollset_set_del_fd(exec_ctx, pollset_set, fd);
}
-void grpc_kick_poller(void) { g_event_engine->kick_poller(); }
+grpc_error *grpc_kick_poller(void) { return g_event_engine->kick_poller(); }
#endif // GPR_POSIX_SOCKET
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 1fa9f5ef2d..87e9bc4d46 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -55,17 +55,20 @@ typedef struct grpc_event_engine_vtable {
grpc_closure *closure);
void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure);
+ bool (*fd_is_shutdown)(grpc_fd *fd);
+ grpc_pollset *(*fd_get_read_notifier_pollset)(grpc_exec_ctx *exec_ctx,
+ grpc_fd *fd);
void (*pollset_init)(grpc_pollset *pollset, gpr_mu **mu);
void (*pollset_shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure);
void (*pollset_reset)(grpc_pollset *pollset);
void (*pollset_destroy)(grpc_pollset *pollset);
- void (*pollset_work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_pollset_worker **worker, gpr_timespec now,
- gpr_timespec deadline);
- void (*pollset_kick)(grpc_pollset *pollset,
- grpc_pollset_worker *specific_worker);
+ grpc_error *(*pollset_work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_pollset_worker **worker, gpr_timespec now,
+ gpr_timespec deadline);
+ grpc_error *(*pollset_kick)(grpc_pollset *pollset,
+ grpc_pollset_worker *specific_worker);
void (*pollset_add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
struct grpc_fd *fd);
@@ -88,7 +91,7 @@ typedef struct grpc_event_engine_vtable {
void (*pollset_set_del_fd)(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pollset_set, grpc_fd *fd);
- void (*kick_poller)(void);
+ grpc_error *(*kick_poller)(void);
void (*shutdown_engine)(void);
} grpc_event_engine_vtable;
@@ -114,7 +117,10 @@ int grpc_fd_wrapped_fd(grpc_fd *fd);
void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
int *release_fd, const char *reason);
-/* Cause any current callbacks to error out with GRPC_CALLBACK_CANCELLED. */
+/* Has grpc_fd_shutdown been called on an fd? */
+bool grpc_fd_is_shutdown(grpc_fd *fd);
+
+/* Cause any current and future callbacks to fail. */
void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd);
/* Register read interest, causing read_cb to be called once when fd becomes
@@ -137,6 +143,10 @@ void grpc_fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure);
+/* Return the read notifier pollset from the fd */
+grpc_pollset *grpc_fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_fd *fd);
+
/* pollset_posix functions */
/* Add an fd to a pollset */
diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c
index 2146c7dd1f..c44aafcddf 100644
--- a/src/core/lib/iomgr/exec_ctx.c
+++ b/src/core/lib/iomgr/exec_ctx.c
@@ -39,6 +39,22 @@
#include "src/core/lib/profiling/timers.h"
+bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx) {
+ if (!exec_ctx->cached_ready_to_finish) {
+ exec_ctx->cached_ready_to_finish = exec_ctx->check_ready_to_finish(
+ exec_ctx, exec_ctx->check_ready_to_finish_arg);
+ }
+ return exec_ctx->cached_ready_to_finish;
+}
+
+bool grpc_never_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored) {
+ return false;
+}
+
+bool grpc_always_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored) {
+ return true;
+}
+
#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER
bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
bool did_something = 0;
@@ -47,11 +63,12 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
grpc_closure *c = exec_ctx->closure_list.head;
exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL;
while (c != NULL) {
- bool success = (bool)(c->final_data & 1);
- grpc_closure *next = (grpc_closure *)(c->final_data & ~(uintptr_t)1);
+ grpc_closure *next = c->next_data.next;
+ grpc_error *error = c->error;
did_something = true;
GPR_TIMER_BEGIN("grpc_exec_ctx_flush.cb", 0);
- c->cb(exec_ctx, c->cb_arg, success);
+ c->cb(exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
GPR_TIMER_END("grpc_exec_ctx_flush.cb", 0);
c = next;
}
@@ -61,14 +78,15 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
}
void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {
+ exec_ctx->cached_ready_to_finish = true;
grpc_exec_ctx_flush(exec_ctx);
}
-void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- bool success,
- grpc_workqueue *offload_target_or_null) {
+void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error,
+ grpc_workqueue *offload_target_or_null) {
GPR_ASSERT(offload_target_or_null == NULL);
- grpc_closure_list_add(&exec_ctx->closure_list, closure, success);
+ grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
}
void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index 976cc40347..38f27d9b13 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -53,6 +53,9 @@ typedef struct grpc_workqueue grpc_workqueue;
* - track a list of work that needs to be delayed until the top of the
* call stack (this provides a convenient mechanism to run callbacks
* without worrying about locking issues)
+ * - provide a decision maker (via grpc_exec_ctx_ready_to_finish) that
+ * signals whether a borrowed thread should continue to do work or
+ * should actively try to finish up and return the thread to its owner
*
* CONVENTIONS:
* Instance of this must ALWAYS be constructed on the stack, never
@@ -63,18 +66,26 @@ typedef struct grpc_workqueue grpc_workqueue;
*/
struct grpc_exec_ctx {
grpc_closure_list closure_list;
+ bool cached_ready_to_finish;
+ void *check_ready_to_finish_arg;
+ bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg);
};
-#define GRPC_EXEC_CTX_INIT \
- { GRPC_CLOSURE_LIST_INIT }
+#define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \
+ { GRPC_CLOSURE_LIST_INIT, false, finish_check_arg, finish_check }
#else
struct grpc_exec_ctx {
- int unused;
+ bool cached_ready_to_finish;
+ void *check_ready_to_finish_arg;
+ bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg);
};
-#define GRPC_EXEC_CTX_INIT \
- { 0 }
+#define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \
+ { false, finish_check_arg, finish_check }
#endif
+#define GRPC_EXEC_CTX_INIT \
+ GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(grpc_never_ready_to_finish, NULL)
+
/** Flush any work that has been enqueued onto this grpc_exec_ctx.
* Caller must guarantee that no interfering locks are held.
* Returns true if work was performed, false otherwise. */
@@ -83,9 +94,17 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx);
* the instance is destroyed, or work may be lost. */
void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx);
/** Add a closure to be executed at the next flush/finish point */
-void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- bool success,
- grpc_workqueue *offload_target_or_null);
+void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error,
+ grpc_workqueue *offload_target_or_null);
+/** Returns true if we'd like to leave this execution context as soon as
+ possible: useful for deciding whether to do something more or not depending
+ on outside context */
+bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx);
+/** A finish check that is never ready to finish */
+bool grpc_never_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored);
+/** A finish check that is always ready to finish */
+bool grpc_always_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored);
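A sketch of how a caller might use the finish-check hook; deadline_passed and borrow_thread_until are hypothetical, and the predicate just has to match the check_ready_to_finish signature:

    static bool deadline_passed(grpc_exec_ctx *exec_ctx, void *arg) {
      return gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC),
                          *(gpr_timespec *)arg) >= 0;
    }

    void borrow_thread_until(gpr_timespec deadline) {
      grpc_exec_ctx exec_ctx =
          GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(deadline_passed, &deadline);
      while (!grpc_exec_ctx_ready_to_finish(&exec_ctx)) {
        /* ... do a unit of work on the borrowed thread ... */
      }
      grpc_exec_ctx_finish(&exec_ctx);
    }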
/** Add a list of closures to be executed at the next flush/finish point.
* Leaves \a list empty. */
void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c
index 36e22e4271..8d7535d6fe 100644
--- a/src/core/lib/iomgr/executor.c
+++ b/src/core/lib/iomgr/executor.c
@@ -112,10 +112,10 @@ static void maybe_spawn_locked() {
g_executor.pending_join = 1;
}
-void grpc_executor_enqueue(grpc_closure *closure, bool success) {
+void grpc_executor_push(grpc_closure *closure, grpc_error *error) {
gpr_mu_lock(&g_executor.mu);
if (g_executor.shutting_down == 0) {
- grpc_closure_list_add(&g_executor.closures, closure, success);
+ grpc_closure_list_append(&g_executor.closures, closure, error);
maybe_spawn_locked();
}
gpr_mu_unlock(&g_executor.mu);
diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h
index b7e6f51aa5..da9dcd07d0 100644
--- a/src/core/lib/iomgr/executor.h
+++ b/src/core/lib/iomgr/executor.h
@@ -45,7 +45,7 @@ void grpc_executor_init();
/** Enqueue \a closure for its eventual execution of \a f(arg) on a separate
* thread */
-void grpc_executor_enqueue(grpc_closure *closure, bool success);
+void grpc_executor_push(grpc_closure *closure, grpc_error *error);
/** Shutdown the executor, running all pending work as part of the call */
void grpc_executor_shutdown();
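The push API mirrors grpc_exec_ctx_sched but targets an executor-owned thread; a minimal sketch (blocking_task and g_task are hypothetical, and the closure must outlive the push):

static void blocking_task(grpc_exec_ctx *exec_ctx, void *arg,
                          grpc_error *error) {
  /* runs on a thread owned by the executor */
}

static grpc_closure g_task;

static void push_example(void) {
  grpc_closure_init(&g_task, blocking_task, NULL);
  grpc_executor_push(&g_task, GRPC_ERROR_NONE);
}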
diff --git a/src/core/lib/iomgr/iocp_windows.c b/src/core/lib/iomgr/iocp_windows.c
index d46558ab1b..2532e52e48 100644
--- a/src/core/lib/iomgr/iocp_windows.c
+++ b/src/core/lib/iomgr/iocp_windows.c
@@ -39,7 +39,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <grpc/support/log_win32.h>
+#include <grpc/support/log_windows.h>
#include <grpc/support/thd.h>
#include "src/core/lib/iomgr/iocp_windows.h"
@@ -104,7 +104,6 @@ grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx,
} else if (overlapped == &socket->read_info.overlapped) {
info = &socket->read_info;
} else {
- gpr_log(GPR_ERROR, "Unknown IOCP operation");
abort();
}
success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
@@ -112,16 +111,7 @@ grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx,
info->bytes_transfered = bytes;
info->wsa_error = success ? 0 : WSAGetLastError();
GPR_ASSERT(overlapped == &info->overlapped);
- GPR_ASSERT(!info->has_pending_iocp);
- gpr_mu_lock(&socket->state_mu);
- if (info->closure) {
- closure = info->closure;
- info->closure = NULL;
- } else {
- info->has_pending_iocp = 1;
- }
- gpr_mu_unlock(&socket->state_mu);
- grpc_exec_ctx_enqueue(exec_ctx, closure, true, NULL);
+ grpc_socket_become_ready(exec_ctx, socket, info);
return GRPC_IOCP_WORK_WORK;
}
@@ -176,33 +166,4 @@ void grpc_iocp_add_socket(grpc_winsocket *socket) {
GPR_ASSERT(ret == g_iocp);
}
-/* Calling notify_on_read or write means either of two things:
- -) The IOCP already completed in the background, and we need to call
- the callback now.
- -) The IOCP hasn't completed yet, and we're queuing it for later. */
-static void socket_notify_on_iocp(grpc_exec_ctx *exec_ctx,
- grpc_winsocket *socket, grpc_closure *closure,
- grpc_winsocket_callback_info *info) {
- GPR_ASSERT(info->closure == NULL);
- gpr_mu_lock(&socket->state_mu);
- if (info->has_pending_iocp) {
- info->has_pending_iocp = 0;
- grpc_exec_ctx_enqueue(exec_ctx, closure, true, NULL);
- } else {
- info->closure = closure;
- }
- gpr_mu_unlock(&socket->state_mu);
-}
-
-void grpc_socket_notify_on_write(grpc_exec_ctx *exec_ctx,
- grpc_winsocket *socket,
- grpc_closure *closure) {
- socket_notify_on_iocp(exec_ctx, socket, closure, &socket->write_info);
-}
-
-void grpc_socket_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket,
- grpc_closure *closure) {
- socket_notify_on_iocp(exec_ctx, socket, closure, &socket->read_info);
-}
-
#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/lib/iomgr/iocp_windows.h b/src/core/lib/iomgr/iocp_windows.h
index ae210fa7d7..011fee38ff 100644
--- a/src/core/lib/iomgr/iocp_windows.h
+++ b/src/core/lib/iomgr/iocp_windows.h
@@ -52,12 +52,4 @@ void grpc_iocp_flush(void);
void grpc_iocp_shutdown(void);
void grpc_iocp_add_socket(grpc_winsocket *);
-void grpc_socket_notify_on_write(grpc_exec_ctx *exec_ctx,
- grpc_winsocket *winsocket,
- grpc_closure *closure);
-
-void grpc_socket_notify_on_read(grpc_exec_ctx *exec_ctx,
- grpc_winsocket *winsocket,
- grpc_closure *closure);
-
#endif /* GRPC_CORE_LIB_IOMGR_IOCP_WINDOWS_H */
diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c
index 60cef8ba77..89292a153e 100644
--- a/src/core/lib/iomgr/iomgr.c
+++ b/src/core/lib/iomgr/iomgr.c
@@ -96,7 +96,8 @@ void grpc_iomgr_shutdown(void) {
gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), last_warning_time),
gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
if (g_root_object.next != &g_root_object) {
- gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed",
+ gpr_log(GPR_DEBUG,
+ "Waiting for %" PRIuPTR " iomgr objects to be destroyed",
count_objects());
}
last_warning_time = gpr_now(GPR_CLOCK_REALTIME);
@@ -114,9 +115,9 @@ void grpc_iomgr_shutdown(void) {
if (gpr_cv_wait(&g_rcv, &g_mu, short_deadline)) {
if (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), shutdown_deadline) > 0) {
if (g_root_object.next != &g_root_object) {
- gpr_log(GPR_DEBUG,
- "Failed to free %d iomgr objects before shutdown deadline: "
- "memory leaks are likely",
+ gpr_log(GPR_DEBUG, "Failed to free %" PRIuPTR
+ " iomgr objects before shutdown deadline: "
+ "memory leaks are likely",
count_objects());
dump_objects("LEAKED");
if (grpc_iomgr_abort_on_leaks()) {
diff --git a/src/core/lib/iomgr/iomgr_posix.c b/src/core/lib/iomgr/iomgr_posix.c
index 016c501f75..cede97f4c6 100644
--- a/src/core/lib/iomgr/iomgr_posix.c
+++ b/src/core/lib/iomgr/iomgr_posix.c
@@ -41,12 +41,16 @@
#include "src/core/lib/iomgr/tcp_posix.h"
void grpc_iomgr_platform_init(void) {
+ grpc_wakeup_fd_global_init();
grpc_event_engine_init();
grpc_register_tracer("tcp", &grpc_tcp_trace);
}
void grpc_iomgr_platform_flush(void) {}
-void grpc_iomgr_platform_shutdown(void) { grpc_event_engine_shutdown(); }
+void grpc_iomgr_platform_shutdown(void) {
+ grpc_event_engine_shutdown();
+ grpc_wakeup_fd_global_destroy();
+}
#endif /* GRPC_POSIX_SOCKET */
diff --git a/src/core/lib/iomgr/iomgr_windows.c b/src/core/lib/iomgr/iomgr_windows.c
index 398517fc75..7653f6e635 100644
--- a/src/core/lib/iomgr/iomgr_windows.c
+++ b/src/core/lib/iomgr/iomgr_windows.c
@@ -35,7 +35,7 @@
#ifdef GPR_WINSOCK_SOCKET
-#include "src/core/lib/iomgr/sockaddr_win32.h"
+#include "src/core/lib/iomgr/sockaddr_windows.h"
#include <grpc/support/log.h>
diff --git a/src/core/lib/iomgr/load_file.c b/src/core/lib/iomgr/load_file.c
new file mode 100644
index 0000000000..b62ecbc534
--- /dev/null
+++ b/src/core/lib/iomgr/load_file.c
@@ -0,0 +1,89 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/load_file.h"
+
+#include <errno.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/support/block_annotate.h"
+#include "src/core/lib/support/string.h"
+
+grpc_error *grpc_load_file(const char *filename, int add_null_terminator,
+ gpr_slice *output) {
+ unsigned char *contents = NULL;
+ size_t contents_size = 0;
+ gpr_slice result = gpr_empty_slice();
+ FILE *file;
+ size_t bytes_read = 0;
+ grpc_error *error = GRPC_ERROR_NONE;
+
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
+ file = fopen(filename, "rb");
+ if (file == NULL) {
+ error = GRPC_OS_ERROR(errno, "fopen");
+ goto end;
+ }
+ fseek(file, 0, SEEK_END);
+ /* Converting to size_t on the assumption that it will not fail */
+ contents_size = (size_t)ftell(file);
+ fseek(file, 0, SEEK_SET);
+ contents = gpr_malloc(contents_size + (add_null_terminator ? 1 : 0));
+ bytes_read = fread(contents, 1, contents_size, file);
+ if (bytes_read < contents_size) {
+ error = GRPC_OS_ERROR(errno, "fread");
+ GPR_ASSERT(ferror(file));
+ goto end;
+ }
+ if (add_null_terminator) {
+ contents[contents_size++] = 0;
+ }
+ result = gpr_slice_new(contents, contents_size, gpr_free);
+
+end:
+ *output = result;
+ if (file != NULL) fclose(file);
+ if (error != GRPC_ERROR_NONE) {
+ grpc_error *error_out = grpc_error_set_str(
+ GRPC_ERROR_CREATE_REFERENCING("Failed to load file", &error, 1),
+ GRPC_ERROR_STR_FILENAME, filename);
+ GRPC_ERROR_UNREF(error);
+ error = error_out;
+ }
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
+ return error;
+}
diff --git a/src/core/lib/iomgr/load_file.h b/src/core/lib/iomgr/load_file.h
new file mode 100644
index 0000000000..9aac2225d1
--- /dev/null
+++ b/src/core/lib/iomgr/load_file.h
@@ -0,0 +1,56 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_LOAD_FILE_H
+#define GRPC_CORE_LIB_IOMGR_LOAD_FILE_H
+
+#include <stdio.h>
+
+#include <grpc/support/slice.h>
+
+#include "src/core/lib/iomgr/error.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* Loads the content of a file into a slice. add_null_terminator will add
+ a NULL terminator if non-zero. */
+grpc_error *grpc_load_file(const char *filename, int add_null_terminator,
+ gpr_slice *slice);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* GRPC_CORE_LIB_IOMGR_LOAD_FILE_H */
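A minimal caller sketch for the new loader; the path is hypothetical, and the slice is owned by the caller on success:

gpr_slice contents;
grpc_error *err = grpc_load_file("/etc/example/roots.pem", 1, &contents);
if (err != GRPC_ERROR_NONE) {
  const char *msg = grpc_error_string(err);
  gpr_log(GPR_ERROR, "%s", msg);
  grpc_error_free_string(msg);
  GRPC_ERROR_UNREF(err);
} else {
  /* ... use contents ... */
  gpr_slice_unref(contents);
}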
diff --git a/src/core/lib/iomgr/polling_entity.c b/src/core/lib/iomgr/polling_entity.c
new file mode 100644
index 0000000000..d1686aa12f
--- /dev/null
+++ b/src/core/lib/iomgr/polling_entity.c
@@ -0,0 +1,104 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/lib/iomgr/polling_entity.h"
+
+grpc_polling_entity grpc_polling_entity_create_from_pollset_set(
+ grpc_pollset_set *pollset_set) {
+ grpc_polling_entity pollent;
+ pollent.pollent.pollset_set = pollset_set;
+ pollent.tag = POPS_POLLSET_SET;
+ return pollent;
+}
+
+grpc_polling_entity grpc_polling_entity_create_from_pollset(
+ grpc_pollset *pollset) {
+ grpc_polling_entity pollent;
+ pollent.pollent.pollset = pollset;
+ pollent.tag = POPS_POLLSET;
+ return pollent;
+}
+
+grpc_pollset *grpc_polling_entity_pollset(grpc_polling_entity *pollent) {
+ if (pollent->tag == POPS_POLLSET) {
+ return pollent->pollent.pollset;
+ }
+ return NULL;
+}
+
+grpc_pollset_set *grpc_polling_entity_pollset_set(
+ grpc_polling_entity *pollent) {
+ if (pollent->tag == POPS_POLLSET_SET) {
+ return pollent->pollent.pollset_set;
+ }
+ return NULL;
+}
+
+bool grpc_polling_entity_is_empty(const grpc_polling_entity *pollent) {
+ return pollent->tag == POPS_NONE;
+}
+
+void grpc_polling_entity_add_to_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_polling_entity *pollent,
+ grpc_pollset_set *pss_dst) {
+ if (pollent->tag == POPS_POLLSET) {
+ GPR_ASSERT(pollent->pollent.pollset != NULL);
+ grpc_pollset_set_add_pollset(exec_ctx, pss_dst, pollent->pollent.pollset);
+ } else if (pollent->tag == POPS_POLLSET_SET) {
+ GPR_ASSERT(pollent->pollent.pollset_set != NULL);
+ grpc_pollset_set_add_pollset_set(exec_ctx, pss_dst,
+ pollent->pollent.pollset_set);
+ } else {
+ gpr_log(GPR_ERROR, "Invalid grpc_polling_entity tag '%d'", pollent->tag);
+ abort();
+ }
+}
+
+void grpc_polling_entity_del_from_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_polling_entity *pollent,
+ grpc_pollset_set *pss_dst) {
+ if (pollent->tag == POPS_POLLSET) {
+ GPR_ASSERT(pollent->pollent.pollset != NULL);
+ grpc_pollset_set_del_pollset(exec_ctx, pss_dst, pollent->pollent.pollset);
+ } else if (pollent->tag == POPS_POLLSET_SET) {
+ GPR_ASSERT(pollent->pollent.pollset_set != NULL);
+ grpc_pollset_set_del_pollset_set(exec_ctx, pss_dst,
+ pollent->pollent.pollset_set);
+ } else {
+ gpr_log(GPR_ERROR, "Invalid grpc_polling_entity tag '%d'", pollent->tag);
+ abort();
+ }
+}
diff --git a/src/core/lib/iomgr/polling_entity.h b/src/core/lib/iomgr/polling_entity.h
new file mode 100644
index 0000000000..e81531053c
--- /dev/null
+++ b/src/core/lib/iomgr/polling_entity.h
@@ -0,0 +1,81 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_POLLING_ENTITY_H
+#define GRPC_CORE_LIB_IOMGR_POLLING_ENTITY_H
+
+#include "src/core/lib/iomgr/pollset.h"
+#include "src/core/lib/iomgr/pollset_set.h"
+
+/* A grpc_polling_entity is a pollset-or-pollset_set container. It allows
+ * functions that accept a pollset XOR a pollset_set to do so through an
+ * abstract interface. No ownership is taken. */
+
+typedef struct grpc_polling_entity {
+ union {
+ grpc_pollset *pollset;
+ grpc_pollset_set *pollset_set;
+ } pollent;
+ enum pops_tag { POPS_NONE, POPS_POLLSET, POPS_POLLSET_SET } tag;
+} grpc_polling_entity;
+
+grpc_polling_entity grpc_polling_entity_create_from_pollset_set(
+ grpc_pollset_set *pollset_set);
+grpc_polling_entity grpc_polling_entity_create_from_pollset(
+ grpc_pollset *pollset);
+
+/** If \a pollent contains a pollset, return it. Otherwise, return NULL */
+grpc_pollset *grpc_polling_entity_pollset(grpc_polling_entity *pollent);
+
+/** If \a pollent contains a pollset_set, return it. Otherwise, return NULL */
+grpc_pollset_set *grpc_polling_entity_pollset_set(grpc_polling_entity *pollent);
+
+bool grpc_polling_entity_is_empty(const grpc_polling_entity *pollent);
+
+/** Add the pollset or pollset_set in \a pollent to the destination
+ * pollset_set \a pss_dst */
+void grpc_polling_entity_add_to_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_polling_entity *pollent,
+ grpc_pollset_set *pss_dst);
+
+/** Delete the pollset or pollset_set in \a pollent from the destination
+ * pollset_set \a pss_dst */
+void grpc_polling_entity_del_from_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_polling_entity *pollent,
+ grpc_pollset_set *pss_dst);
+/* pollset_set specific */
+
+#endif /* GRPC_CORE_LIB_IOMGR_POLLING_ENTITY_H */
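A short sketch of the intended use, assuming a grpc_pollset *pollset and a grpc_pollset_set *interested_parties are already in scope:

grpc_polling_entity pe = grpc_polling_entity_create_from_pollset(pollset);
/* the same calls work unchanged if pe had wrapped a pollset_set */
grpc_polling_entity_add_to_pollset_set(exec_ctx, &pe, interested_parties);
/* ... polling happens ... */
grpc_polling_entity_del_from_pollset_set(exec_ctx, &pe, interested_parties);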
diff --git a/src/core/lib/iomgr/pollset.h b/src/core/lib/iomgr/pollset.h
index c40a474877..8d9edc8406 100644
--- a/src/core/lib/iomgr/pollset.h
+++ b/src/core/lib/iomgr/pollset.h
@@ -81,14 +81,15 @@ void grpc_pollset_destroy(grpc_pollset *pollset);
May call grpc_closure_list_run on grpc_closure_list, without holding the
pollset
lock */
-void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_pollset_worker **worker, gpr_timespec now,
- gpr_timespec deadline);
+grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_pollset_worker **worker, gpr_timespec now,
+ gpr_timespec deadline) GRPC_MUST_USE_RESULT;
/* Break one polling thread out of polling work for this pollset.
If specific_worker is GRPC_POLLSET_KICK_BROADCAST, kick ALL the workers.
Otherwise, if specific_worker is non-NULL, then kick that worker. */
-void grpc_pollset_kick(grpc_pollset *pollset,
- grpc_pollset_worker *specific_worker);
+grpc_error *grpc_pollset_kick(grpc_pollset *pollset,
+ grpc_pollset_worker *specific_worker)
+ GRPC_MUST_USE_RESULT;
#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_H */
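Since both functions are now GRPC_MUST_USE_RESULT, callers have to consume the returned error explicitly; a minimal sketch, with pollset, worker, now, and deadline assumed in scope:

grpc_error *error =
    grpc_pollset_work(exec_ctx, pollset, &worker, now, deadline);
if (error != GRPC_ERROR_NONE) {
  const char *msg = grpc_error_string(error);
  gpr_log(GPR_ERROR, "pollset_work failed: %s", msg);
  grpc_error_free_string(msg);
  GRPC_ERROR_UNREF(error);
}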
diff --git a/src/core/lib/iomgr/pollset_set_windows.c b/src/core/lib/iomgr/pollset_set_windows.c
index 89f60b92fb..a35a9766fc 100644
--- a/src/core/lib/iomgr/pollset_set_windows.c
+++ b/src/core/lib/iomgr/pollset_set_windows.c
@@ -32,12 +32,15 @@
*/
#include <grpc/support/port_platform.h>
+#include <stdint.h>
#ifdef GPR_WINSOCK_SOCKET
#include "src/core/lib/iomgr/pollset_set_windows.h"
-grpc_pollset_set* grpc_pollset_set_create(void) { return NULL; }
+grpc_pollset_set* grpc_pollset_set_create(void) {
+ return (grpc_pollset_set*)((intptr_t)0xdeafbeef);
+}
void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set) {}
diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c
index bff5c586f8..626dd784b3 100644
--- a/src/core/lib/iomgr/pollset_windows.c
+++ b/src/core/lib/iomgr/pollset_windows.c
@@ -109,7 +109,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset->shutting_down = 1;
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
if (!pollset->is_iocp_worker) {
- grpc_exec_ctx_enqueue(exec_ctx, closure, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
} else {
pollset->on_shutdown = closure;
}
@@ -127,9 +127,9 @@ void grpc_pollset_reset(grpc_pollset *pollset) {
pollset->on_shutdown = NULL;
}
-void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_pollset_worker **worker_hdl, gpr_timespec now,
- gpr_timespec deadline) {
+grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_pollset_worker **worker_hdl,
+ gpr_timespec now, gpr_timespec deadline) {
grpc_pollset_worker worker;
*worker_hdl = &worker;
@@ -167,7 +167,8 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
if (pollset->shutting_down && pollset->on_shutdown != NULL) {
- grpc_exec_ctx_enqueue(exec_ctx, pollset->on_shutdown, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, pollset->on_shutdown, GRPC_ERROR_NONE,
+ NULL);
pollset->on_shutdown = NULL;
}
goto done;
@@ -197,9 +198,11 @@ done:
}
gpr_cv_destroy(&worker.cv);
*worker_hdl = NULL;
+ return GRPC_ERROR_NONE;
}
-void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
+grpc_error *grpc_pollset_kick(grpc_pollset *p,
+ grpc_pollset_worker *specific_worker) {
if (specific_worker != NULL) {
if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
for (specific_worker =
@@ -233,6 +236,7 @@ void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
p->kicked_without_pollers = 1;
}
}
+ return GRPC_ERROR_NONE;
}
void grpc_kick_poller(void) { grpc_iocp_kick(); }
diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h
index ef198fe0f6..ddbe375755 100644
--- a/src/core/lib/iomgr/resolve_address.h
+++ b/src/core/lib/iomgr/resolve_address.h
@@ -50,24 +50,20 @@ typedef struct {
grpc_resolved_address *addrs;
} grpc_resolved_addresses;
-/* Async result callback:
- On success: addresses is the result, and the callee must call
- grpc_resolved_addresses_destroy when it's done with them
- On failure: addresses is NULL */
-typedef void (*grpc_resolve_cb)(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_resolved_addresses *addresses);
/* Asynchronously resolve addr. Use default_port if a port isn't designated
in addr, otherwise use the port in addr. */
/* TODO(ctiller): add a timeout here */
extern void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *addr,
const char *default_port,
- grpc_resolve_cb cb, void *arg);
+ grpc_closure *on_done,
+ grpc_resolved_addresses **addresses);
/* Destroy resolved addresses */
void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addresses);
-/* Resolve addr in a blocking fashion. Returns NULL on failure. On success,
-   result must be freed with grpc_resolved_addresses_destroy. */
+/* Resolve addr in a blocking fashion. On success, result must be freed with
+   grpc_resolved_addresses_destroy. */
-extern grpc_resolved_addresses *(*grpc_blocking_resolve_address)(
- const char *name, const char *default_port);
+extern grpc_error *(*grpc_blocking_resolve_address)(
+ const char *name, const char *default_port,
+ grpc_resolved_addresses **addresses);
#endif /* GRPC_CORE_LIB_IOMGR_RESOLVE_ADDRESS_H */
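The callback type is gone: the resolver now fills in an out-parameter before running a caller-supplied closure. A minimal sketch (the names and target address are hypothetical):

static grpc_resolved_addresses *g_addresses;
static grpc_closure g_on_done;

static void on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
                        grpc_error *error) {
  if (error == GRPC_ERROR_NONE) {
    /* g_addresses was filled in by the resolver and is owned here */
    grpc_resolved_addresses_destroy(g_addresses);
  }
}

static void resolve_example(grpc_exec_ctx *exec_ctx) {
  grpc_closure_init(&g_on_done, on_resolved, NULL);
  grpc_resolve_address(exec_ctx, "example.com:50051", "443", &g_on_done,
                       &g_addresses);
}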
diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c
index cae91eec20..4e9f978584 100644
--- a/src/core/lib/iomgr/resolve_address_posix.c
+++ b/src/core/lib/iomgr/resolve_address_posix.c
@@ -54,38 +54,33 @@
#include "src/core/lib/support/block_annotate.h"
#include "src/core/lib/support/string.h"
-typedef struct {
- char *name;
- char *default_port;
- grpc_resolve_cb cb;
- grpc_closure request_closure;
- void *arg;
-} request;
-
-static grpc_resolved_addresses *blocking_resolve_address_impl(
- const char *name, const char *default_port) {
+static grpc_error *blocking_resolve_address_impl(
+ const char *name, const char *default_port,
+ grpc_resolved_addresses **addresses) {
struct addrinfo hints;
struct addrinfo *result = NULL, *resp;
char *host;
char *port;
int s;
size_t i;
- grpc_resolved_addresses *addrs = NULL;
+ grpc_error *err;
if (name[0] == 'u' && name[1] == 'n' && name[2] == 'i' && name[3] == 'x' &&
name[4] == ':' && name[5] != 0) {
- return grpc_resolve_unix_domain_address(name + 5);
+ return grpc_resolve_unix_domain_address(name + 5, addresses);
}
/* parse name, splitting it into host and port parts */
gpr_split_host_port(name, &host, &port);
if (host == NULL) {
- gpr_log(GPR_ERROR, "unparseable host:port: '%s'", name);
+ err = grpc_error_set_str(GRPC_ERROR_CREATE("unparseable host:port"),
+ GRPC_ERROR_STR_TARGET_ADDRESS, name);
goto done;
}
if (port == NULL) {
if (default_port == NULL) {
- gpr_log(GPR_ERROR, "no port in name '%s'", name);
+ err = grpc_error_set_str(GRPC_ERROR_CREATE("no port in name"),
+ GRPC_ERROR_STR_TARGET_ADDRESS, name);
goto done;
}
port = gpr_strdup(default_port);
@@ -115,23 +110,31 @@ static grpc_resolved_addresses *blocking_resolve_address_impl(
}
if (s != 0) {
- gpr_log(GPR_ERROR, "getaddrinfo: %s", gai_strerror(s));
+ err = grpc_error_set_str(
+ grpc_error_set_str(
+ grpc_error_set_str(grpc_error_set_int(GRPC_ERROR_CREATE("OS Error"),
+ GRPC_ERROR_INT_ERRNO, s),
+ GRPC_ERROR_STR_OS_ERROR, gai_strerror(s)),
+ GRPC_ERROR_STR_SYSCALL, "getaddrinfo"),
+ GRPC_ERROR_STR_TARGET_ADDRESS, name);
goto done;
}
/* Success path: set addrs non-NULL, fill it in */
- addrs = gpr_malloc(sizeof(grpc_resolved_addresses));
- addrs->naddrs = 0;
+ *addresses = gpr_malloc(sizeof(grpc_resolved_addresses));
+ (*addresses)->naddrs = 0;
for (resp = result; resp != NULL; resp = resp->ai_next) {
- addrs->naddrs++;
+ (*addresses)->naddrs++;
}
- addrs->addrs = gpr_malloc(sizeof(grpc_resolved_address) * addrs->naddrs);
+ (*addresses)->addrs =
+ gpr_malloc(sizeof(grpc_resolved_address) * (*addresses)->naddrs);
i = 0;
for (resp = result; resp != NULL; resp = resp->ai_next) {
- memcpy(&addrs->addrs[i].addr, resp->ai_addr, resp->ai_addrlen);
- addrs->addrs[i].len = resp->ai_addrlen;
+ memcpy(&(*addresses)->addrs[i].addr, resp->ai_addr, resp->ai_addrlen);
+ (*addresses)->addrs[i].len = resp->ai_addrlen;
i++;
}
+ err = GRPC_ERROR_NONE;
done:
gpr_free(host);
@@ -139,45 +142,59 @@ done:
if (result) {
freeaddrinfo(result);
}
- return addrs;
+ return err;
}
-grpc_resolved_addresses *(*grpc_blocking_resolve_address)(
- const char *name, const char *default_port) = blocking_resolve_address_impl;
+grpc_error *(*grpc_blocking_resolve_address)(
+ const char *name, const char *default_port,
+ grpc_resolved_addresses **addresses) = blocking_resolve_address_impl;
+
+typedef struct {
+ char *name;
+ char *default_port;
+ grpc_closure *on_done;
+ grpc_resolved_addresses **addrs_out;
+ grpc_closure request_closure;
+ void *arg;
+} request;
/* Callback to be passed to grpc_executor to asynch-ify
* grpc_blocking_resolve_address */
-static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, bool success) {
+static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp,
+ grpc_error *error) {
request *r = rp;
- grpc_resolved_addresses *resolved =
- grpc_blocking_resolve_address(r->name, r->default_port);
- void *arg = r->arg;
- grpc_resolve_cb cb = r->cb;
+ grpc_exec_ctx_sched(
+ exec_ctx, r->on_done,
+ grpc_blocking_resolve_address(r->name, r->default_port, r->addrs_out),
+ NULL);
gpr_free(r->name);
gpr_free(r->default_port);
- cb(exec_ctx, arg, resolved);
gpr_free(r);
}
void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
- gpr_free(addrs->addrs);
+ if (addrs != NULL) {
+ gpr_free(addrs->addrs);
+ }
gpr_free(addrs);
}
static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
- const char *default_port, grpc_resolve_cb cb,
- void *arg) {
+ const char *default_port,
+ grpc_closure *on_done,
+ grpc_resolved_addresses **addrs) {
request *r = gpr_malloc(sizeof(request));
grpc_closure_init(&r->request_closure, do_request_thread, r);
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
- r->cb = cb;
- r->arg = arg;
- grpc_executor_enqueue(&r->request_closure, 1);
+ r->on_done = on_done;
+ r->addrs_out = addrs;
+ grpc_executor_push(&r->request_closure, GRPC_ERROR_NONE);
}
void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *name,
- const char *default_port, grpc_resolve_cb cb,
- void *arg) = resolve_address_impl;
+ const char *default_port, grpc_closure *on_done,
+ grpc_resolved_addresses **addrs) =
+ resolve_address_impl;
#endif
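The nested grpc_error_set_* calls above fold errno, the failing syscall, and the target address into a single error object; the same pattern in isolation:

grpc_error *err = grpc_error_set_str(
    grpc_error_set_int(GRPC_ERROR_CREATE("OS Error"), GRPC_ERROR_INT_ERRNO, s),
    GRPC_ERROR_STR_SYSCALL, "getaddrinfo");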
diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c
index 914736234d..2af8af82dc 100644
--- a/src/core/lib/iomgr/resolve_address_windows.c
+++ b/src/core/lib/iomgr/resolve_address_windows.c
@@ -43,7 +43,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
-#include <grpc/support/log_win32.h>
+#include <grpc/support/log_windows.h>
#include <grpc/support/string_util.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
@@ -56,30 +56,37 @@
typedef struct {
char *name;
char *default_port;
- grpc_resolve_cb cb;
grpc_closure request_closure;
- void *arg;
+ grpc_closure *on_done;
+ grpc_resolved_addresses **addresses;
} request;
-static grpc_resolved_addresses *blocking_resolve_address_impl(
- const char *name, const char *default_port) {
+static grpc_error *blocking_resolve_address_impl(
+ const char *name, const char *default_port,
+ grpc_resolved_addresses **addresses) {
struct addrinfo hints;
struct addrinfo *result = NULL, *resp;
char *host;
char *port;
int s;
size_t i;
- grpc_resolved_addresses *addrs = NULL;
+ grpc_error *error = GRPC_ERROR_NONE;
/* parse name, splitting it into host and port parts */
gpr_split_host_port(name, &host, &port);
if (host == NULL) {
- gpr_log(GPR_ERROR, "unparseable host:port: '%s'", name);
+ char *msg;
+ gpr_asprintf(&msg, "unparseable host:port: '%s'", name);
+ error = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
goto done;
}
if (port == NULL) {
if (default_port == NULL) {
- gpr_log(GPR_ERROR, "no port in name '%s'", name);
+ char *msg;
+ gpr_asprintf(&msg, "no port in name '%s'", name);
+ error = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
goto done;
}
port = gpr_strdup(default_port);
@@ -95,31 +102,30 @@ static grpc_resolved_addresses *blocking_resolve_address_impl(
s = getaddrinfo(host, port, &hints, &result);
GRPC_SCHEDULING_END_BLOCKING_REGION;
if (s != 0) {
- char *error_message = gpr_format_message(s);
- gpr_log(GPR_ERROR, "getaddrinfo: %s", error_message);
- gpr_free(error_message);
+ error = GRPC_WSA_ERROR(WSAGetLastError(), "getaddrinfo");
goto done;
}
/* Success path: set addrs non-NULL, fill it in */
- addrs = gpr_malloc(sizeof(grpc_resolved_addresses));
- addrs->naddrs = 0;
+ (*addresses) = gpr_malloc(sizeof(grpc_resolved_addresses));
+ (*addresses)->naddrs = 0;
for (resp = result; resp != NULL; resp = resp->ai_next) {
- addrs->naddrs++;
+ (*addresses)->naddrs++;
}
- addrs->addrs = gpr_malloc(sizeof(grpc_resolved_address) * addrs->naddrs);
+ (*addresses)->addrs =
+ gpr_malloc(sizeof(grpc_resolved_address) * (*addresses)->naddrs);
i = 0;
for (resp = result; resp != NULL; resp = resp->ai_next) {
- memcpy(&addrs->addrs[i].addr, resp->ai_addr, resp->ai_addrlen);
- addrs->addrs[i].len = resp->ai_addrlen;
+ memcpy(&(*addresses)->addrs[i].addr, resp->ai_addr, resp->ai_addrlen);
+ (*addresses)->addrs[i].len = resp->ai_addrlen;
i++;
}
{
- for (i = 0; i < addrs->naddrs; i++) {
+ for (i = 0; i < (*addresses)->naddrs; i++) {
char *buf;
- grpc_sockaddr_to_string(&buf, (struct sockaddr *)&addrs->addrs[i].addr,
- 0);
+ grpc_sockaddr_to_string(
+ &buf, (struct sockaddr *)&(*addresses)->addrs[i].addr, 0);
gpr_free(buf);
}
}
@@ -130,45 +136,53 @@ done:
if (result) {
freeaddrinfo(result);
}
- return addrs;
+ return error;
}
-grpc_resolved_addresses *(*grpc_blocking_resolve_address)(
- const char *name, const char *default_port) = blocking_resolve_address_impl;
+grpc_error *(*grpc_blocking_resolve_address)(
+ const char *name, const char *default_port,
+ grpc_resolved_addresses **addresses) = blocking_resolve_address_impl;
/* Callback to be passed to grpc_executor to asynch-ify
* grpc_blocking_resolve_address */
-static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, bool success) {
+static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp,
+ grpc_error *error) {
request *r = rp;
- grpc_resolved_addresses *resolved =
- grpc_blocking_resolve_address(r->name, r->default_port);
- void *arg = r->arg;
- grpc_resolve_cb cb = r->cb;
+ if (error == GRPC_ERROR_NONE) {
+ error =
+ grpc_blocking_resolve_address(r->name, r->default_port, r->addresses);
+ } else {
+ GRPC_ERROR_REF(error);
+ }
+ grpc_exec_ctx_sched(exec_ctx, r->on_done, error, NULL);
gpr_free(r->name);
gpr_free(r->default_port);
- cb(exec_ctx, arg, resolved);
gpr_free(r);
}
void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
- gpr_free(addrs->addrs);
+ if (addrs != NULL) {
+ gpr_free(addrs->addrs);
+ }
gpr_free(addrs);
}
static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
- const char *default_port, grpc_resolve_cb cb,
- void *arg) {
+ const char *default_port,
+ grpc_closure *on_done,
+ grpc_resolved_addresses **addresses) {
request *r = gpr_malloc(sizeof(request));
grpc_closure_init(&r->request_closure, do_request_thread, r);
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
- r->cb = cb;
- r->arg = arg;
- grpc_executor_enqueue(&r->request_closure, 1);
+ r->on_done = on_done;
+ r->addresses = addresses;
+ grpc_executor_push(&r->request_closure, GRPC_ERROR_NONE);
}
void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *name,
- const char *default_port, grpc_resolve_cb cb,
- void *arg) = resolve_address_impl;
+ const char *default_port, grpc_closure *on_done,
+ grpc_resolved_addresses **addresses) =
+ resolve_address_impl;
#endif
diff --git a/src/core/lib/iomgr/sockaddr.h b/src/core/lib/iomgr/sockaddr.h
index 891a2f094f..5563d0b8a6 100644
--- a/src/core/lib/iomgr/sockaddr.h
+++ b/src/core/lib/iomgr/sockaddr.h
@@ -36,8 +36,8 @@
#include <grpc/support/port_platform.h>
-#ifdef GPR_WIN32
-#include "src/core/lib/iomgr/sockaddr_win32.h"
+#ifdef GPR_WINDOWS
+#include "src/core/lib/iomgr/sockaddr_windows.h"
#endif
#ifdef GPR_POSIX_SOCKETADDR
diff --git a/src/core/lib/iomgr/sockaddr_win32.h b/src/core/lib/iomgr/sockaddr_windows.h
index 02aeae7619..971db5b32b 100644
--- a/src/core/lib/iomgr/sockaddr_win32.h
+++ b/src/core/lib/iomgr/sockaddr_windows.h
@@ -31,8 +31,8 @@
*
*/
-#ifndef GRPC_CORE_LIB_IOMGR_SOCKADDR_WIN32_H
-#define GRPC_CORE_LIB_IOMGR_SOCKADDR_WIN32_H
+#ifndef GRPC_CORE_LIB_IOMGR_SOCKADDR_WINDOWS_H
+#define GRPC_CORE_LIB_IOMGR_SOCKADDR_WINDOWS_H
#include <winsock2.h>
#include <ws2tcpip.h>
@@ -40,4 +40,4 @@
// must be included after the above
#include <mswsock.h>
-#endif /* GRPC_CORE_LIB_IOMGR_SOCKADDR_WIN32_H */
+#endif /* GRPC_CORE_LIB_IOMGR_SOCKADDR_WINDOWS_H */
diff --git a/src/core/lib/iomgr/socket_utils_common_posix.c b/src/core/lib/iomgr/socket_utils_common_posix.c
index fa83ceef30..3a1371617e 100644
--- a/src/core/lib/iomgr/socket_utils_common_posix.c
+++ b/src/core/lib/iomgr/socket_utils_common_posix.c
@@ -49,6 +49,7 @@
#include <sys/types.h>
#include <unistd.h>
+#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
@@ -57,10 +58,10 @@
#include "src/core/lib/support/string.h"
/* set a socket to non blocking mode */
-int grpc_set_socket_nonblocking(int fd, int non_blocking) {
+grpc_error *grpc_set_socket_nonblocking(int fd, int non_blocking) {
int oldflags = fcntl(fd, F_GETFL, 0);
if (oldflags < 0) {
- return 0;
+ return GRPC_OS_ERROR(errno, "fcntl");
}
if (non_blocking) {
@@ -70,52 +71,71 @@ int grpc_set_socket_nonblocking(int fd, int non_blocking) {
}
if (fcntl(fd, F_SETFL, oldflags) != 0) {
- return 0;
+ return GRPC_OS_ERROR(errno, "fcntl");
}
- return 1;
+ return GRPC_ERROR_NONE;
}
-int grpc_set_socket_no_sigpipe_if_possible(int fd) {
+grpc_error *grpc_set_socket_no_sigpipe_if_possible(int fd) {
#ifdef GPR_HAVE_SO_NOSIGPIPE
int val = 1;
int newval;
socklen_t intlen = sizeof(newval);
- return 0 == setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val)) &&
- 0 == getsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &newval, &intlen) &&
- (newval != 0) == val;
-#else
- return 1;
+ if (0 != setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val))) {
+ return GRPC_OS_ERROR(errno, "setsockopt(SO_NOSIGPIPE)");
+ }
+ if (0 != getsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &newval, &intlen)) {
+ return GRPC_OS_ERROR(errno, "getsockopt(SO_NOSIGPIPE)");
+ }
+ if ((newval != 0) != (val != 0)) {
+ return GRPC_ERROR_CREATE("Failed to set SO_NOSIGPIPE");
+ }
#endif
+ return GRPC_ERROR_NONE;
}
-int grpc_set_socket_ip_pktinfo_if_possible(int fd) {
+grpc_error *grpc_set_socket_ip_pktinfo_if_possible(int fd) {
#ifdef GPR_HAVE_IP_PKTINFO
int get_local_ip = 1;
- return 0 == setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip,
- sizeof(get_local_ip));
-#else
- (void)fd;
- return 1;
+ if (0 != setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip,
+ sizeof(get_local_ip))) {
+ return GRPC_OS_ERROR(errno, "setsockopt(IP_PKTINFO)");
+ }
#endif
+ return GRPC_ERROR_NONE;
}
-int grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd) {
+grpc_error *grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd) {
#ifdef GPR_HAVE_IPV6_RECVPKTINFO
int get_local_ip = 1;
- return 0 == setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip,
- sizeof(get_local_ip));
-#else
- (void)fd;
- return 1;
+ if (0 != setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip,
+ sizeof(get_local_ip))) {
+ return GRPC_OS_ERROR(errno, "setsockopt(IPV6_RECVPKTINFO)");
+ }
#endif
+ return GRPC_ERROR_NONE;
+}
+
+grpc_error *grpc_set_socket_sndbuf(int fd, int buffer_size_bytes) {
+ return 0 == setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buffer_size_bytes,
+ sizeof(buffer_size_bytes))
+ ? GRPC_ERROR_NONE
+ : GRPC_OS_ERROR(errno, "setsockopt(SO_SNDBUF)");
+}
+
+grpc_error *grpc_set_socket_rcvbuf(int fd, int buffer_size_bytes) {
+ return 0 == setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buffer_size_bytes,
+ sizeof(buffer_size_bytes))
+ ? GRPC_ERROR_NONE
+ : GRPC_OS_ERROR(errno, "setsockopt(SO_RCVBUF)");
}
/* set a socket to close on exec */
-int grpc_set_socket_cloexec(int fd, int close_on_exec) {
+grpc_error *grpc_set_socket_cloexec(int fd, int close_on_exec) {
int oldflags = fcntl(fd, F_GETFD, 0);
if (oldflags < 0) {
- return 0;
+ return GRPC_OS_ERROR(errno, "fcntl");
}
if (close_on_exec) {
@@ -125,30 +145,45 @@ int grpc_set_socket_cloexec(int fd, int close_on_exec) {
}
if (fcntl(fd, F_SETFD, oldflags) != 0) {
- return 0;
+ return GRPC_OS_ERROR(errno, "fcntl");
}
- return 1;
+ return GRPC_ERROR_NONE;
}
/* set a socket to reuse old addresses */
-int grpc_set_socket_reuse_addr(int fd, int reuse) {
+grpc_error *grpc_set_socket_reuse_addr(int fd, int reuse) {
int val = (reuse != 0);
int newval;
socklen_t intlen = sizeof(newval);
- return 0 == setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) &&
- 0 == getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &newval, &intlen) &&
- (newval != 0) == val;
+ if (0 != setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val))) {
+ return GRPC_OS_ERROR(errno, "setsockopt(SO_REUSEADDR)");
+ }
+ if (0 != getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &newval, &intlen)) {
+ return GRPC_OS_ERROR(errno, "getsockopt(SO_REUSEADDR)");
+ }
+ if ((newval != 0) != val) {
+ return GRPC_ERROR_CREATE("Failed to set SO_REUSEADDR");
+ }
+
+ return GRPC_ERROR_NONE;
}
/* disable nagle */
-int grpc_set_socket_low_latency(int fd, int low_latency) {
+grpc_error *grpc_set_socket_low_latency(int fd, int low_latency) {
int val = (low_latency != 0);
int newval;
socklen_t intlen = sizeof(newval);
- return 0 == setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) &&
- 0 == getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &newval, &intlen) &&
- (newval != 0) == val;
+ if (0 != setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val))) {
+ return GRPC_OS_ERROR(errno, "setsockopt(TCP_NODELAY)");
+ }
+ if (0 != getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &newval, &intlen)) {
+ return GRPC_OS_ERROR(errno, "getsockopt(TCP_NODELAY)");
+ }
+ if ((newval != 0) != val) {
+ return GRPC_ERROR_CREATE("Failed to set TCP_NODELAY");
+ }
+ return GRPC_ERROR_NONE;
}
static gpr_once g_probe_ipv6_once = GPR_ONCE_INIT;
@@ -196,35 +231,47 @@ static int set_socket_dualstack(int fd) {
}
}
-int grpc_create_dualstack_socket(const struct sockaddr *addr, int type,
- int protocol, grpc_dualstack_mode *dsmode) {
+static grpc_error *error_for_fd(int fd, const struct sockaddr *addr) {
+ if (fd >= 0) return GRPC_ERROR_NONE;
+ char *addr_str;
+ grpc_sockaddr_to_string(&addr_str, addr, 0);
+ grpc_error *err = grpc_error_set_str(GRPC_OS_ERROR(errno, "socket"),
+ GRPC_ERROR_STR_TARGET_ADDRESS, addr_str);
+ gpr_free(addr_str);
+ return err;
+}
+
+grpc_error *grpc_create_dualstack_socket(const struct sockaddr *addr, int type,
+ int protocol,
+ grpc_dualstack_mode *dsmode,
+ int *newfd) {
int family = addr->sa_family;
if (family == AF_INET6) {
- int fd;
if (grpc_ipv6_loopback_available()) {
- fd = socket(family, type, protocol);
+ *newfd = socket(family, type, protocol);
} else {
- fd = -1;
+ *newfd = -1;
errno = EAFNOSUPPORT;
}
/* Check if we've got a valid dualstack socket. */
- if (fd >= 0 && set_socket_dualstack(fd)) {
+ if (*newfd >= 0 && set_socket_dualstack(*newfd)) {
*dsmode = GRPC_DSMODE_DUALSTACK;
- return fd;
+ return GRPC_ERROR_NONE;
}
/* If this isn't an IPv4 address, then return whatever we've got. */
if (!grpc_sockaddr_is_v4mapped(addr, NULL)) {
*dsmode = GRPC_DSMODE_IPV6;
- return fd;
+ return error_for_fd(*newfd, addr);
}
/* Fall back to AF_INET. */
- if (fd >= 0) {
- close(fd);
+ if (*newfd >= 0) {
+ close(*newfd);
}
family = AF_INET;
}
*dsmode = family == AF_INET ? GRPC_DSMODE_IPV4 : GRPC_DSMODE_NONE;
- return socket(family, type, protocol);
+ *newfd = socket(family, type, protocol);
+ return error_for_fd(*newfd, addr);
}
#endif
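With the fd returned through an out-parameter, the descriptor and the error now travel separately; a caller sketch, with addr assumed in scope:

int fd;
grpc_dualstack_mode dsmode;
grpc_error *error =
    grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd);
if (error != GRPC_ERROR_NONE) {
  /* on failure there is no valid fd to close; just consume the error */
  GRPC_ERROR_UNREF(error);
  return;
}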
diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h
index a8f6e5e658..30ff39dfa3 100644
--- a/src/core/lib/iomgr/socket_utils_posix.h
+++ b/src/core/lib/iomgr/socket_utils_posix.h
@@ -37,21 +37,23 @@
#include <sys/socket.h>
#include <unistd.h>
+#include "src/core/lib/iomgr/error.h"
+
/* a wrapper for accept or accept4 */
int grpc_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen,
int nonblock, int cloexec);
/* set a socket to non blocking mode */
-int grpc_set_socket_nonblocking(int fd, int non_blocking);
+grpc_error *grpc_set_socket_nonblocking(int fd, int non_blocking);
/* set a socket to close on exec */
-int grpc_set_socket_cloexec(int fd, int close_on_exec);
+grpc_error *grpc_set_socket_cloexec(int fd, int close_on_exec);
/* set a socket to reuse old addresses */
-int grpc_set_socket_reuse_addr(int fd, int reuse);
+grpc_error *grpc_set_socket_reuse_addr(int fd, int reuse);
/* disable nagle */
-int grpc_set_socket_low_latency(int fd, int low_latency);
+grpc_error *grpc_set_socket_low_latency(int fd, int low_latency);
/* Returns true if this system can create AF_INET6 sockets bound to ::1.
The value is probed once, and cached for the life of the process.
@@ -64,19 +66,22 @@ int grpc_set_socket_low_latency(int fd, int low_latency);
int grpc_ipv6_loopback_available(void);
/* Tries to set SO_NOSIGPIPE if available on this platform.
- Returns 1 on success, 0 on failure.
- If SO_NO_SIGPIPE is not available, returns 1. */
+ If SO_NOSIGPIPE is not available, returns GRPC_ERROR_NONE. */
-int grpc_set_socket_no_sigpipe_if_possible(int fd);
+grpc_error *grpc_set_socket_no_sigpipe_if_possible(int fd);
/* Tries to set IP_PKTINFO if available on this platform.
- Returns 1 on success, 0 on failure.
- If IP_PKTINFO is not available, returns 1. */
+ If IP_PKTINFO is not available, returns GRPC_ERROR_NONE. */
-int grpc_set_socket_ip_pktinfo_if_possible(int fd);
+grpc_error *grpc_set_socket_ip_pktinfo_if_possible(int fd);
/* Tries to set IPV6_RECVPKTINFO if available on this platform.
- Returns 1 on success, 0 on failure.
- If IPV6_RECVPKTINFO is not available, returns 1. */
+ If IPV6_RECVPKTINFO is not available, returns GRPC_ERROR_NONE. */
-int grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd);
+grpc_error *grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd);
+
+/* Tries to set the socket's send buffer to given size. */
+grpc_error *grpc_set_socket_sndbuf(int fd, int buffer_size_bytes);
+
+/* Tries to set the socket's receive buffer to given size. */
+grpc_error *grpc_set_socket_rcvbuf(int fd, int buffer_size_bytes);
/* An enum to keep track of IPv4/IPv6 socket modes.
@@ -117,7 +122,9 @@ extern int grpc_forbid_dualstack_sockets_for_testing;
IPv4, so that bind() or connect() see the correct family.
Also, it's important to distinguish between DUALSTACK and IPV6 when
listening on the [::] wildcard address. */
-int grpc_create_dualstack_socket(const struct sockaddr *addr, int type,
- int protocol, grpc_dualstack_mode *dsmode);
+grpc_error *grpc_create_dualstack_socket(const struct sockaddr *addr, int type,
+ int protocol,
+ grpc_dualstack_mode *dsmode,
+ int *newfd);
#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_POSIX_H */
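With every setter returning a grpc_error, option setup composes as a chain of early returns rather than a boolean conjunction; a hypothetical configure_fd, mirroring prepare_socket in tcp_client_posix.c below:

static grpc_error *configure_fd(int fd) {
  grpc_error *err = grpc_set_socket_nonblocking(fd, 1);
  if (err != GRPC_ERROR_NONE) return err;
  err = grpc_set_socket_cloexec(fd, 1);
  if (err != GRPC_ERROR_NONE) return err;
  return grpc_set_socket_low_latency(fd, 1);
}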
diff --git a/src/core/lib/iomgr/socket_windows.c b/src/core/lib/iomgr/socket_windows.c
index ebd77e0372..d7d5f6f157 100644
--- a/src/core/lib/iomgr/socket_windows.c
+++ b/src/core/lib/iomgr/socket_windows.c
@@ -42,7 +42,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <grpc/support/log_win32.h>
+#include <grpc/support/log_windows.h>
#include <grpc/support/string_util.h>
#include "src/core/lib/iomgr/iocp_windows.h"
@@ -91,10 +91,69 @@ void grpc_winsocket_shutdown(grpc_winsocket *winsocket) {
closesocket(winsocket->socket);
}
-void grpc_winsocket_destroy(grpc_winsocket *winsocket) {
+static void destroy(grpc_winsocket *winsocket) {
grpc_iomgr_unregister_object(&winsocket->iomgr_object);
gpr_mu_destroy(&winsocket->state_mu);
gpr_free(winsocket);
}
+static bool check_destroyable(grpc_winsocket *winsocket) {
+ return winsocket->destroy_called == true &&
+ winsocket->write_info.closure == NULL &&
+ winsocket->read_info.closure == NULL;
+}
+
+void grpc_winsocket_destroy(grpc_winsocket *winsocket) {
+ gpr_mu_lock(&winsocket->state_mu);
+ GPR_ASSERT(!winsocket->destroy_called);
+ winsocket->destroy_called = true;
+ bool should_destroy = check_destroyable(winsocket);
+ gpr_mu_unlock(&winsocket->state_mu);
+ if (should_destroy) destroy(winsocket);
+}
+
+/* Calling notify_on_read or write means either of two things:
+   -) The IOCP already completed in the background, and we need to call
+      the callback now.
+   -) The IOCP hasn't completed yet, and we're queuing it for later. */
+static void socket_notify_on_iocp(grpc_exec_ctx *exec_ctx,
+ grpc_winsocket *socket, grpc_closure *closure,
+ grpc_winsocket_callback_info *info) {
+ GPR_ASSERT(info->closure == NULL);
+ gpr_mu_lock(&socket->state_mu);
+ if (info->has_pending_iocp) {
+ info->has_pending_iocp = 0;
+ grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
+ } else {
+ info->closure = closure;
+ }
+ gpr_mu_unlock(&socket->state_mu);
+}
+
+void grpc_socket_notify_on_write(grpc_exec_ctx *exec_ctx,
+ grpc_winsocket *socket,
+ grpc_closure *closure) {
+ socket_notify_on_iocp(exec_ctx, socket, closure, &socket->write_info);
+}
+
+void grpc_socket_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket,
+ grpc_closure *closure) {
+ socket_notify_on_iocp(exec_ctx, socket, closure, &socket->read_info);
+}
+
+void grpc_socket_become_ready(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket,
+ grpc_winsocket_callback_info *info) {
+ GPR_ASSERT(!info->has_pending_iocp);
+ gpr_mu_lock(&socket->state_mu);
+ if (info->closure) {
+ grpc_exec_ctx_sched(exec_ctx, info->closure, GRPC_ERROR_NONE, NULL);
+ info->closure = NULL;
+ } else {
+ info->has_pending_iocp = 1;
+ }
+ bool should_destroy = check_destroyable(socket);
+ gpr_mu_unlock(&socket->state_mu);
+ if (should_destroy) destroy(socket);
+}
+
#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/lib/iomgr/socket_windows.h b/src/core/lib/iomgr/socket_windows.h
index 73c4384987..490d0e0a06 100644
--- a/src/core/lib/iomgr/socket_windows.h
+++ b/src/core/lib/iomgr/socket_windows.h
@@ -81,6 +81,7 @@ typedef struct grpc_winsocket_callback_info {
is closer to what happens in posix world. */
typedef struct grpc_winsocket {
SOCKET socket;
+ bool destroy_called;
grpc_winsocket_callback_info write_info;
grpc_winsocket_callback_info read_info;
@@ -108,4 +109,16 @@ void grpc_winsocket_shutdown(grpc_winsocket *socket);
/* Destroy a socket. Should only be called if there's no pending operation. */
void grpc_winsocket_destroy(grpc_winsocket *socket);
+void grpc_socket_notify_on_write(grpc_exec_ctx *exec_ctx,
+ grpc_winsocket *winsocket,
+ grpc_closure *closure);
+
+void grpc_socket_notify_on_read(grpc_exec_ctx *exec_ctx,
+ grpc_winsocket *winsocket,
+ grpc_closure *closure);
+
+void grpc_socket_become_ready(grpc_exec_ctx *exec_ctx,
+ grpc_winsocket *winsocket,
+ grpc_winsocket_callback_info *ci);
+
#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_WINDOWS_H */
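The notify/become-ready pair forms a one-shot handshake: whichever side arrives second fires the closure. A minimal sketch of both sides (read_cb and cb_arg are hypothetical):

/* consumer side: register interest in the next read completion */
grpc_closure on_read;
grpc_closure_init(&on_read, read_cb, cb_arg);
grpc_socket_notify_on_read(exec_ctx, winsocket, &on_read);

/* IOCP side: mark the operation complete, firing on_read if registered */
grpc_socket_become_ready(exec_ctx, winsocket, &winsocket->read_info);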
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index e93d5734a0..80c7a3f128 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -71,33 +71,39 @@ typedef struct {
grpc_closure *closure;
} async_connect;
-static int prepare_socket(const struct sockaddr *addr, int fd) {
- if (fd < 0) {
- goto error;
+static grpc_error *prepare_socket(const struct sockaddr *addr, int fd) {
+ grpc_error *err = GRPC_ERROR_NONE;
+
+ GPR_ASSERT(fd >= 0);
+
+ err = grpc_set_socket_nonblocking(fd, 1);
+ if (err != GRPC_ERROR_NONE) goto error;
+ err = grpc_set_socket_cloexec(fd, 1);
+ if (err != GRPC_ERROR_NONE) goto error;
+ if (!grpc_is_unix_socket(addr)) {
+ err = grpc_set_socket_low_latency(fd, 1);
+ if (err != GRPC_ERROR_NONE) goto error;
}
-
- if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1) ||
- (!grpc_is_unix_socket(addr) && !grpc_set_socket_low_latency(fd, 1)) ||
- !grpc_set_socket_no_sigpipe_if_possible(fd)) {
- gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd,
- strerror(errno));
- goto error;
- }
- return 1;
+ err = grpc_set_socket_no_sigpipe_if_possible(fd);
+ if (err != GRPC_ERROR_NONE) goto error;
+ goto done;
error:
if (fd >= 0) {
close(fd);
}
- return 0;
+done:
+ return err;
}
-static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, bool success) {
+static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
int done;
async_connect *ac = acp;
if (grpc_tcp_trace) {
- gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_alarm: success=%d", ac->addr_str,
- success);
+ const char *str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_alarm: error=%s", ac->addr_str,
+ str);
+ grpc_error_free_string(str);
}
gpr_mu_lock(&ac->mu);
if (ac->fd != NULL) {
@@ -112,7 +118,7 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, bool success) {
}
}
-static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) {
+static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
async_connect *ac = acp;
int so_error = 0;
socklen_t so_error_size;
@@ -122,9 +128,13 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) {
grpc_closure *closure = ac->closure;
grpc_fd *fd;
+ GRPC_ERROR_REF(error);
+
if (grpc_tcp_trace) {
- gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_writable: success=%d",
- ac->addr_str, success);
+ const char *str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_writable: error=%s",
+ ac->addr_str, str);
+ grpc_error_free_string(str);
}
gpr_mu_lock(&ac->mu);
@@ -136,15 +146,14 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) {
grpc_timer_cancel(exec_ctx, &ac->alarm);
gpr_mu_lock(&ac->mu);
- if (success) {
+ if (error == GRPC_ERROR_NONE) {
do {
so_error_size = sizeof(so_error);
err = getsockopt(grpc_fd_wrapped_fd(fd), SOL_SOCKET, SO_ERROR, &so_error,
&so_error_size);
} while (err < 0 && errno == EINTR);
if (err < 0) {
- gpr_log(GPR_ERROR, "failed to connect to '%s': getsockopt(ERROR): %s",
- ac->addr_str, strerror(errno));
+ error = GRPC_OS_ERROR(errno, "getsockopt");
goto finish;
} else if (so_error != 0) {
if (so_error == ENOBUFS) {
@@ -169,14 +178,12 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) {
} else {
switch (so_error) {
case ECONNREFUSED:
- gpr_log(
- GPR_ERROR,
- "failed to connect to '%s': socket error: connection refused",
- ac->addr_str);
+ error = grpc_error_set_int(error, GRPC_ERROR_INT_ERRNO, errno);
+ error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
+ "Connection refused");
break;
default:
- gpr_log(GPR_ERROR, "failed to connect to '%s': socket error: %d",
- ac->addr_str, so_error);
+ error = GRPC_OS_ERROR(errno, "getsockopt(SO_ERROR)");
break;
}
goto finish;
@@ -188,8 +195,8 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) {
goto finish;
}
} else {
- gpr_log(GPR_ERROR, "failed to connect to '%s': timeout occurred",
- ac->addr_str);
+ error =
+ grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, "Timeout occurred");
goto finish;
}
@@ -203,12 +210,18 @@ finish:
}
done = (--ac->refs == 0);
gpr_mu_unlock(&ac->mu);
+ if (error != GRPC_ERROR_NONE) {
+ error = grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION,
+ "Failed to connect to remote host");
+ error =
+ grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, ac->addr_str);
+ }
if (done) {
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_str);
gpr_free(ac);
}
- grpc_exec_ctx_enqueue(exec_ctx, closure, *ep != NULL, NULL);
+ grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
}
static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
@@ -225,6 +238,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
grpc_fd *fdobj;
char *name;
char *addr_str;
+ grpc_error *error;
*ep = NULL;
@@ -234,9 +248,10 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
addr_len = sizeof(addr6_v4mapped);
}
- fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode);
- if (fd < 0) {
- gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
+ error = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd);
+ if (error != GRPC_ERROR_NONE) {
+ grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+ return;
}
if (dsmode == GRPC_DSMODE_IPV4) {
/* If we got an AF_INET socket, map the address back to IPv4. */
@@ -244,8 +259,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
addr = (struct sockaddr *)&addr4_copy;
addr_len = sizeof(addr4_copy);
}
- if (!prepare_socket(addr, fd)) {
- grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL);
+ if ((error = prepare_socket(addr, fd)) != GRPC_ERROR_NONE) {
+ grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
return;
}
@@ -261,14 +276,14 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
if (err >= 0) {
*ep = grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str);
- grpc_exec_ctx_enqueue(exec_ctx, closure, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
goto done;
}
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
- gpr_log(GPR_ERROR, "connect error to '%s': %s", addr_str, strerror(errno));
grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error");
- grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"),
+ NULL);
goto done;
}
diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c
index 7d78beb15a..562cb9c6bf 100644
--- a/src/core/lib/iomgr/tcp_client_windows.c
+++ b/src/core/lib/iomgr/tcp_client_windows.c
@@ -35,11 +35,11 @@
#ifdef GPR_WINSOCK_SOCKET
-#include "src/core/lib/iomgr/sockaddr_win32.h"
+#include "src/core/lib/iomgr/sockaddr_windows.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <grpc/support/log_win32.h>
+#include <grpc/support/log_windows.h>
#include <grpc/support/slice_buffer.h>
#include <grpc/support/useful.h>
@@ -63,59 +63,65 @@ typedef struct {
grpc_endpoint **endpoint;
} async_connect;
-static void async_connect_unlock_and_cleanup(async_connect *ac) {
+static void async_connect_unlock_and_cleanup(async_connect *ac,
+ grpc_winsocket *socket) {
int done = (--ac->refs == 0);
gpr_mu_unlock(&ac->mu);
if (done) {
- if (ac->socket != NULL) grpc_winsocket_destroy(ac->socket);
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_name);
gpr_free(ac);
}
+ if (socket != NULL) grpc_winsocket_destroy(socket);
}
-static void on_alarm(grpc_exec_ctx *exec_ctx, void *acp, bool occured) {
+static void on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
async_connect *ac = acp;
gpr_mu_lock(&ac->mu);
- /* If the alarm didn't occur, it got cancelled. */
- if (ac->socket != NULL && occured) {
- grpc_winsocket_shutdown(ac->socket);
+ grpc_winsocket *socket = ac->socket;
+ ac->socket = NULL;
+ if (socket != NULL) {
+ grpc_winsocket_shutdown(socket);
}
- async_connect_unlock_and_cleanup(ac);
+ async_connect_unlock_and_cleanup(ac, socket);
}
-static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, bool from_iocp) {
+static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
async_connect *ac = acp;
- SOCKET sock = ac->socket->socket;
grpc_endpoint **ep = ac->endpoint;
- grpc_winsocket_callback_info *info = &ac->socket->write_info;
+ GPR_ASSERT(*ep == NULL);
grpc_closure *on_done = ac->on_done;
+ GRPC_ERROR_REF(error);
+
+ gpr_mu_lock(&ac->mu);
+ grpc_winsocket *socket = ac->socket;
+ ac->socket = NULL;
+ gpr_mu_unlock(&ac->mu);
+
grpc_timer_cancel(exec_ctx, &ac->alarm);
gpr_mu_lock(&ac->mu);
- if (from_iocp) {
+ if (error == GRPC_ERROR_NONE && socket != NULL) {
DWORD transfered_bytes = 0;
DWORD flags;
- BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
- &transfered_bytes, FALSE, &flags);
+ BOOL wsa_success =
+ WSAGetOverlappedResult(socket->socket, &socket->write_info.overlapped,
+ &transfered_bytes, FALSE, &flags);
GPR_ASSERT(transfered_bytes == 0);
if (!wsa_success) {
- char *utf8_message = gpr_format_message(WSAGetLastError());
- gpr_log(GPR_ERROR, "on_connect error connecting to '%s': %s",
- ac->addr_name, utf8_message);
- gpr_free(utf8_message);
+ error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx");
} else {
- *ep = grpc_tcp_create(ac->socket, ac->addr_name);
- ac->socket = NULL;
+ *ep = grpc_tcp_create(socket, ac->addr_name);
+ socket = NULL;
}
}
- async_connect_unlock_and_cleanup(ac);
+ async_connect_unlock_and_cleanup(ac, socket);
/* If the connection was aborted, the callback was already called when
the deadline was met. */
- on_done->cb(exec_ctx, on_done->cb_arg, *ep != NULL);
+ grpc_exec_ctx_sched(exec_ctx, on_done, error, NULL);
}
/* Tries to issue one async connection, then schedules both an IOCP
@@ -135,9 +141,8 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
LPFN_CONNECTEX ConnectEx;
GUID guid = WSAID_CONNECTEX;
DWORD ioctl_num_bytes;
- const char *message = NULL;
- char *utf8_message;
grpc_winsocket_callback_info *info;
+ grpc_error *error = GRPC_ERROR_NONE;
*endpoint = NULL;
@@ -150,12 +155,12 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
WSA_FLAG_OVERLAPPED);
if (sock == INVALID_SOCKET) {
- message = "Unable to create socket: %s";
+ error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
goto failure;
}
- if (!grpc_tcp_prepare_socket(sock)) {
- message = "Unable to set socket options: %s";
+ error = grpc_tcp_prepare_socket(sock);
+ if (error != GRPC_ERROR_NONE) {
goto failure;
}
@@ -166,7 +171,8 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
&ConnectEx, sizeof(ConnectEx), &ioctl_num_bytes, NULL, NULL);
if (status != 0) {
- message = "Unable to retrieve ConnectEx pointer: %s";
+ error = GRPC_WSA_ERROR(WSAGetLastError(),
+ "WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER)");
goto failure;
}
@@ -174,7 +180,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
status = bind(sock, (struct sockaddr *)&local_address, sizeof(local_address));
if (status != 0) {
- message = "Unable to bind socket: %s";
+ error = GRPC_WSA_ERROR(WSAGetLastError(), "bind");
goto failure;
}
@@ -186,9 +192,9 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
/* It wouldn't be unusual to get a success immediately. But we'll still get
an IOCP notification, so let's ignore it. */
if (!success) {
- int error = WSAGetLastError();
- if (error != ERROR_IO_PENDING) {
- message = "ConnectEx failed: %s";
+ int last_error = WSAGetLastError();
+ if (last_error != ERROR_IO_PENDING) {
+ error = GRPC_WSA_ERROR(last_error, "ConnectEx");
goto failure;
}
}
@@ -208,15 +214,18 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
return;
failure:
- utf8_message = gpr_format_message(WSAGetLastError());
- gpr_log(GPR_ERROR, message, utf8_message);
- gpr_free(utf8_message);
+ GPR_ASSERT(error != GRPC_ERROR_NONE);
+ char *target_uri = grpc_sockaddr_to_uri(addr);
+ grpc_error *final_error = grpc_error_set_str(
+ GRPC_ERROR_CREATE_REFERENCING("Failed to connect", &error, 1),
+ GRPC_ERROR_STR_TARGET_ADDRESS, target_uri);
+ GRPC_ERROR_UNREF(error);
if (socket != NULL) {
grpc_winsocket_destroy(socket);
} else if (sock != INVALID_SOCKET) {
closesocket(sock);
}
- grpc_exec_ctx_enqueue(exec_ctx, on_done, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, on_done, final_error, NULL);
}
#endif /* GPR_WINSOCK_SOCKET */
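
The Windows client's failure path composes errors rather than formatting log
strings: the low-level GRPC_WSA_ERROR is wrapped by a referencing
"Failed to connect" error carrying the target address, and the inner ref is
dropped once captured. A sketch of the same composition, assuming a winsock
build (GRPC_WSA_ERROR is Windows-only); the parameters are placeholders:

    #include "src/core/lib/iomgr/error.h"

    /* Wrap a winsock error code in a higher-level error. The referencing
       error takes its own ref on `inner`, so we release ours afterwards. */
    static grpc_error *wrap_wsa_error(int wsa_code, const char *target_uri) {
      grpc_error *inner = GRPC_WSA_ERROR(wsa_code, "ConnectEx");
      grpc_error *outer = grpc_error_set_str(
          GRPC_ERROR_CREATE_REFERENCING("Failed to connect", &inner, 1),
          GRPC_ERROR_STR_TARGET_ADDRESS, target_uri);
      GRPC_ERROR_UNREF(inner);
      return outer;
    }
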
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 7210aef5d5..017f52c367 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -38,6 +38,7 @@
#include "src/core/lib/iomgr/tcp_posix.h"
#include <errno.h>
+#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
@@ -74,7 +75,7 @@ typedef struct {
grpc_endpoint base;
grpc_fd *em_fd;
int fd;
- int finished_edge;
+ bool finished_edge;
msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */
size_t slice_size;
gpr_refcount refcount;
@@ -101,9 +102,9 @@ typedef struct {
} grpc_tcp;
static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
- bool success);
+ grpc_error *error);
static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
- bool success);
+ grpc_error *error);
static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
@@ -155,23 +156,26 @@ static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
TCP_UNREF(exec_ctx, tcp, "destroy");
}
-static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, int success) {
+static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
+ grpc_error *error) {
grpc_closure *cb = tcp->read_cb;
- if (grpc_tcp_trace) {
+ if (false && grpc_tcp_trace) {
size_t i;
- gpr_log(GPR_DEBUG, "read: success=%d", success);
+ const char *str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "read: error=%s", str);
+ grpc_error_free_string(str);
for (i = 0; i < tcp->incoming_buffer->count; i++) {
char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i],
GPR_DUMP_HEX | GPR_DUMP_ASCII);
- gpr_log(GPR_DEBUG, "READ %p: %s", tcp, dump);
+ gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump);
gpr_free(dump);
}
}
tcp->read_cb = NULL;
tcp->incoming_buffer = NULL;
- cb->cb(exec_ctx, cb->cb_arg, success);
+ grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
}
#define MAX_READ_IOVEC 4
@@ -219,15 +223,14 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
/* We've consumed the edge, request a new one */
grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
} else {
- /* TODO(klempner): Log interesting errors */
gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- call_read_cb(exec_ctx, tcp, 0);
+ call_read_cb(exec_ctx, tcp, GRPC_OS_ERROR(errno, "recvmsg"));
TCP_UNREF(exec_ctx, tcp, "read");
}
} else if (read_bytes == 0) {
/* 0 read size ==> end of stream */
gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- call_read_cb(exec_ctx, tcp, 0);
+ call_read_cb(exec_ctx, tcp, GRPC_ERROR_CREATE("EOF"));
TCP_UNREF(exec_ctx, tcp, "read");
} else {
GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
@@ -240,7 +243,7 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
++tcp->iov_size;
}
GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length);
- call_read_cb(exec_ctx, tcp, 1);
+ call_read_cb(exec_ctx, tcp, GRPC_ERROR_NONE);
TCP_UNREF(exec_ctx, tcp, "read");
}
@@ -248,13 +251,13 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
}
static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
- bool success) {
+ grpc_error *error) {
grpc_tcp *tcp = (grpc_tcp *)arg;
GPR_ASSERT(!tcp->finished_edge);
- if (!success) {
+ if (error != GRPC_ERROR_NONE) {
gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- call_read_cb(exec_ctx, tcp, 0);
+ call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error));
TCP_UNREF(exec_ctx, tcp, "read");
} else {
tcp_continue_read(exec_ctx, tcp);
@@ -271,17 +274,16 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
gpr_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
TCP_REF(tcp, "read");
if (tcp->finished_edge) {
- tcp->finished_edge = 0;
+ tcp->finished_edge = false;
grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
} else {
- grpc_exec_ctx_enqueue(exec_ctx, &tcp->read_closure, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE, NULL);
}
}
-typedef enum { FLUSH_DONE, FLUSH_PENDING, FLUSH_ERROR } flush_result;
-
+/* returns true if done, false if pending; if returning true, *error is set */
#define MAX_WRITE_IOVEC 16
-static flush_result tcp_flush(grpc_tcp *tcp) {
+static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) {
struct msghdr msg;
struct iovec iov[MAX_WRITE_IOVEC];
msg_iovlen_type iov_size;
@@ -331,10 +333,10 @@ static flush_result tcp_flush(grpc_tcp *tcp) {
if (errno == EAGAIN) {
tcp->outgoing_slice_idx = unwind_slice_idx;
tcp->outgoing_byte_idx = unwind_byte_idx;
- return FLUSH_PENDING;
+ return false;
} else {
- /* TODO(klempner): Log some of these */
- return FLUSH_ERROR;
+ *error = GRPC_OS_ERROR(errno, "sendmsg");
+ return true;
}
}
@@ -355,50 +357,50 @@ static flush_result tcp_flush(grpc_tcp *tcp) {
}
if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) {
- return FLUSH_DONE;
+ *error = GRPC_ERROR_NONE;
+ return true;
}
};
}
static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
- bool success) {
+ grpc_error *error) {
grpc_tcp *tcp = (grpc_tcp *)arg;
- flush_result status;
grpc_closure *cb;
- if (!success) {
+ if (error != GRPC_ERROR_NONE) {
cb = tcp->write_cb;
tcp->write_cb = NULL;
- cb->cb(exec_ctx, cb->cb_arg, 0);
+ cb->cb(exec_ctx, cb->cb_arg, error);
TCP_UNREF(exec_ctx, tcp, "write");
return;
}
- status = tcp_flush(tcp);
- if (status == FLUSH_PENDING) {
+ if (!tcp_flush(tcp, &error)) {
grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
} else {
cb = tcp->write_cb;
tcp->write_cb = NULL;
GPR_TIMER_BEGIN("tcp_handle_write.cb", 0);
- cb->cb(exec_ctx, cb->cb_arg, status == FLUSH_DONE);
+ cb->cb(exec_ctx, cb->cb_arg, error);
GPR_TIMER_END("tcp_handle_write.cb", 0);
TCP_UNREF(exec_ctx, tcp, "write");
+ GRPC_ERROR_UNREF(error);
}
}
static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
gpr_slice_buffer *buf, grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- flush_result status;
+ grpc_error *error = GRPC_ERROR_NONE;
- if (grpc_tcp_trace) {
+ if (false && grpc_tcp_trace) {
size_t i;
for (i = 0; i < buf->count; i++) {
char *data =
gpr_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
- gpr_log(GPR_DEBUG, "WRITE %p: %s", tcp, data);
+ gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data);
gpr_free(data);
}
}
@@ -408,20 +410,22 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (buf->length == 0) {
GPR_TIMER_END("tcp_write", 0);
- grpc_exec_ctx_enqueue(exec_ctx, cb, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, cb, grpc_fd_is_shutdown(tcp->em_fd)
+ ? GRPC_ERROR_CREATE("EOF")
+ : GRPC_ERROR_NONE,
+ NULL);
return;
}
tcp->outgoing_buffer = buf;
tcp->outgoing_slice_idx = 0;
tcp->outgoing_byte_idx = 0;
- status = tcp_flush(tcp);
- if (status == FLUSH_PENDING) {
+ if (!tcp_flush(tcp, &error)) {
TCP_REF(tcp, "write");
tcp->write_cb = cb;
grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
} else {
- grpc_exec_ctx_enqueue(exec_ctx, cb, status == FLUSH_DONE, NULL);
+ grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
}
GPR_TIMER_END("tcp_write", 0);
@@ -461,7 +465,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
tcp->incoming_buffer = NULL;
tcp->slice_size = slice_size;
tcp->iov_size = 1;
- tcp->finished_edge = 1;
+ tcp->finished_edge = true;
/* paired with unref in grpc_tcp_destroy */
gpr_ref_init(&tcp->refcount, 1);
tcp->em_fd = em_fd;
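
tcp_flush replaces the three-state flush_result enum with a two-value
convention: return true when the operation finished (and set *error, which
may be GRPC_ERROR_NONE), false when it would block and must be retried. A
hedged sketch of a caller under that convention; try_flush stands in for
tcp_flush and is not a real API:

    #include <stdbool.h>

    #include "src/core/lib/iomgr/error.h"
    #include "src/core/lib/iomgr/exec_ctx.h"

    /* Stand-in for tcp_flush: "done" return plus out-param error. */
    static bool try_flush(grpc_error **error) {
      *error = GRPC_ERROR_NONE;
      return true;
    }

    static void drive_write(grpc_exec_ctx *exec_ctx, grpc_closure *cb) {
      grpc_error *error = GRPC_ERROR_NONE;
      if (!try_flush(&error)) {
        return; /* pending: re-arm writability and fire cb later */
      }
      /* done: error is GRPC_ERROR_NONE on success, an OS error otherwise */
      grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
    }
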
diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h
index 99b9f29729..1d6e70dbe9 100644
--- a/src/core/lib/iomgr/tcp_server.h
+++ b/src/core/lib/iomgr/tcp_server.h
@@ -52,12 +52,14 @@ typedef struct grpc_tcp_server_acceptor {
/* Called for newly connected TCP connections. */
typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg,
grpc_endpoint *ep,
+ grpc_pollset *accepting_pollset,
grpc_tcp_server_acceptor *acceptor);
/* Create a server, initially not bound to any ports. The caller owns one ref.
If shutdown_complete is not NULL, it will be used by
grpc_tcp_server_unref() when the ref count reaches zero. */
-grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete);
+grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete,
+ grpc_tcp_server **server);
/* Start listening to bound ports */
void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server,
@@ -73,8 +75,8 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server,
but not dualstack sockets. */
/* TODO(ctiller): deprecate this, and make grpc_tcp_server_add_ports to handle
all of the multiple socket port matching logic in one place */
-int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
- size_t addr_len);
+grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
+ size_t addr_len, int *out_port);
/* Number of fds at the given port_index, or 0 if port_index is out of
bounds. */
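
Both the constructor and port binding move to the out-parameter style: the
grpc_error* is the return value and the result travels through a pointer
argument. A sketch of a caller under the new signatures; the addr arguments
are assumed to be a prepared sockaddr, and the error handling shown is one
choice, not something the header prescribes:

    #include "src/core/lib/iomgr/tcp_server.h"

    static grpc_error *create_and_bind(const void *addr, size_t addr_len,
                                       grpc_tcp_server **server) {
      grpc_error *err =
          grpc_tcp_server_create(NULL /* shutdown_complete */, server);
      if (err != GRPC_ERROR_NONE) return err;
      int port = -1;
      err = grpc_tcp_server_add_port(*server, addr, addr_len, &port);
      return err; /* on GRPC_ERROR_NONE, port holds the bound port */
    }
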
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index aaeb384f6e..6fb547bb36 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -59,6 +59,8 @@
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
+#include <grpc/support/useful.h>
+
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
@@ -128,9 +130,13 @@ struct grpc_tcp_server {
grpc_pollset **pollsets;
/* number of pollsets in the pollsets array */
size_t pollset_count;
+
+ /* next pollset to assign a channel to */
+ size_t next_pollset_to_assign;
};
-grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) {
+grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete,
+ grpc_tcp_server **server) {
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
gpr_ref_init(&s->refs, 1);
gpr_mu_init(&s->mu);
@@ -145,12 +151,14 @@ grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) {
s->head = NULL;
s->tail = NULL;
s->nports = 0;
- return s;
+ s->next_pollset_to_assign = 0;
+ *server = s;
+ return GRPC_ERROR_NONE;
}
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (s->shutdown_complete != NULL) {
- grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
}
gpr_mu_destroy(&s->mu);
@@ -165,7 +173,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
}
static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server,
- bool success) {
+ grpc_error *error) {
grpc_tcp_server *s = server;
gpr_mu_lock(&s->mu);
s->destroyed_ports++;
@@ -259,64 +267,75 @@ static int get_max_accept_queue_size(void) {
}
/* Prepare a recently-created socket for listening. */
-static int prepare_socket(int fd, const struct sockaddr *addr,
- size_t addr_len) {
+static grpc_error *prepare_socket(int fd, const struct sockaddr *addr,
+ size_t addr_len, int *port) {
struct sockaddr_storage sockname_temp;
socklen_t sockname_len;
-
- if (fd < 0) {
- goto error;
- }
-
- if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1) ||
- (!grpc_is_unix_socket(addr) && (!grpc_set_socket_low_latency(fd, 1) ||
- !grpc_set_socket_reuse_addr(fd, 1))) ||
- !grpc_set_socket_no_sigpipe_if_possible(fd)) {
- gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd,
- strerror(errno));
- goto error;
+ grpc_error *err = GRPC_ERROR_NONE;
+
+ GPR_ASSERT(fd >= 0);
+
+ err = grpc_set_socket_nonblocking(fd, 1);
+ if (err != GRPC_ERROR_NONE) goto error;
+ err = grpc_set_socket_cloexec(fd, 1);
+ if (err != GRPC_ERROR_NONE) goto error;
+ if (!grpc_is_unix_socket(addr)) {
+ err = grpc_set_socket_low_latency(fd, 1);
+ if (err != GRPC_ERROR_NONE) goto error;
+ err = grpc_set_socket_reuse_addr(fd, 1);
+ if (err != GRPC_ERROR_NONE) goto error;
}
+ err = grpc_set_socket_no_sigpipe_if_possible(fd);
+ if (err != GRPC_ERROR_NONE) goto error;
GPR_ASSERT(addr_len < ~(socklen_t)0);
if (bind(fd, addr, (socklen_t)addr_len) < 0) {
- char *addr_str;
- grpc_sockaddr_to_string(&addr_str, addr, 0);
- gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno));
- gpr_free(addr_str);
+ err = GRPC_OS_ERROR(errno, "bind");
goto error;
}
if (listen(fd, get_max_accept_queue_size()) < 0) {
- gpr_log(GPR_ERROR, "listen: %s", strerror(errno));
+ err = GRPC_OS_ERROR(errno, "listen");
goto error;
}
sockname_len = sizeof(sockname_temp);
if (getsockname(fd, (struct sockaddr *)&sockname_temp, &sockname_len) < 0) {
+ err = GRPC_OS_ERROR(errno, "getsockname");
goto error;
}
- return grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
+ *port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
+ return GRPC_ERROR_NONE;
error:
+ GPR_ASSERT(err != GRPC_ERROR_NONE);
if (fd >= 0) {
close(fd);
}
- return -1;
+ grpc_error *ret = grpc_error_set_int(
+ GRPC_ERROR_CREATE_REFERENCING("Unable to configure socket", &err, 1),
+ GRPC_ERROR_INT_FD, fd);
+ GRPC_ERROR_UNREF(err);
+ return ret;
}
/* event manager callback when reads are ready */
-static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
grpc_tcp_listener *sp = arg;
grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index,
sp->fd_index};
+ grpc_pollset *read_notifier_pollset = NULL;
grpc_fd *fdobj;
- size_t i;
- if (!success) {
+ if (err != GRPC_ERROR_NONE) {
goto error;
}
+ read_notifier_pollset =
+ sp->server->pollsets[(sp->server->next_pollset_to_assign++) %
+ sp->server->pollset_count];
+
/* loop until accept4 returns EAGAIN, and then re-arm notification */
for (;;) {
struct sockaddr_storage addr;
@@ -349,16 +368,18 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
}
fdobj = grpc_fd_create(fd, name);
- /* TODO(ctiller): revise this when we have server-side sharding
- of channels -- we certainly should not be automatically adding every
- incoming channel to every pollset owned by the server */
- for (i = 0; i < sp->server->pollset_count; i++) {
- grpc_pollset_add_fd(exec_ctx, sp->server->pollsets[i], fdobj);
+
+ if (read_notifier_pollset == NULL) {
+ gpr_log(GPR_ERROR, "Read notifier pollset is not set on the fd");
+ goto error;
}
+
+ grpc_pollset_add_fd(exec_ctx, read_notifier_pollset, fdobj);
+
sp->server->on_accept_cb(
exec_ctx, sp->server->on_accept_cb_arg,
grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
- &acceptor);
+ read_notifier_pollset, &acceptor);
gpr_free(name);
gpr_free(addr_str);
@@ -376,18 +397,19 @@ error:
}
}
-static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd,
- const struct sockaddr *addr,
- size_t addr_len,
- unsigned port_index,
- unsigned fd_index) {
+static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd,
+ const struct sockaddr *addr,
+ size_t addr_len, unsigned port_index,
+ unsigned fd_index,
+ grpc_tcp_listener **listener) {
grpc_tcp_listener *sp = NULL;
- int port;
+ int port = -1;
char *addr_str;
char *name;
- port = prepare_socket(fd, addr, addr_len);
- if (port >= 0) {
+ grpc_error *err = prepare_socket(fd, addr, addr_len, &port);
+ if (err == GRPC_ERROR_NONE) {
+ GPR_ASSERT(port > 0);
    grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)addr, 1);
gpr_asprintf(&name, "tcp-server-listener:%s", addr_str);
gpr_mu_lock(&s->mu);
@@ -417,11 +439,12 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd,
gpr_free(name);
}
- return sp;
+ *listener = sp;
+ return err;
}
-int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
- size_t addr_len) {
+grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
+ size_t addr_len, int *out_port) {
grpc_tcp_listener *sp;
grpc_tcp_listener *sp2 = NULL;
int fd;
@@ -436,6 +459,7 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
int port;
unsigned port_index = 0;
unsigned fd_index = 0;
+ grpc_error *errs[2] = {GRPC_ERROR_NONE, GRPC_ERROR_NONE};
if (s->tail != NULL) {
port_index = s->tail->port_index + 1;
}
@@ -474,33 +498,35 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
/* Try listening on IPv6 first. */
addr = (struct sockaddr *)&wild6;
addr_len = sizeof(wild6);
- fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode);
- sp = add_socket_to_server(s, fd, addr, addr_len, port_index, fd_index);
- if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
- goto done;
- }
- if (sp != NULL) {
- ++fd_index;
- }
- /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */
- if (port == 0 && sp != NULL) {
- grpc_sockaddr_set_port((struct sockaddr *)&wild4, sp->port);
+ errs[0] = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd);
+ if (errs[0] == GRPC_ERROR_NONE) {
+ errs[0] = add_socket_to_server(s, fd, addr, addr_len, port_index,
+ fd_index, &sp);
+ if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
+ goto done;
+ }
+ if (sp != NULL) {
+ ++fd_index;
+ }
+ /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */
+ if (port == 0 && sp != NULL) {
+ grpc_sockaddr_set_port((struct sockaddr *)&wild4, sp->port);
+ }
}
addr = (struct sockaddr *)&wild4;
addr_len = sizeof(wild4);
}
- fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode);
- if (fd < 0) {
- gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
- } else {
+ errs[1] = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd);
+ if (errs[1] == GRPC_ERROR_NONE) {
if (dsmode == GRPC_DSMODE_IPV4 &&
grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) {
addr = (struct sockaddr *)&addr4_copy;
addr_len = sizeof(addr4_copy);
}
sp2 = sp;
- sp = add_socket_to_server(s, fd, addr, addr_len, port_index, fd_index);
+ errs[1] =
+ add_socket_to_server(s, fd, addr, addr_len, port_index, fd_index, &sp);
if (sp2 != NULL && sp != NULL) {
sp2->sibling = sp;
sp->is_sibling = 1;
@@ -510,9 +536,21 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
done:
gpr_free(allocated_addr);
if (sp != NULL) {
- return sp->port;
+ *out_port = sp->port;
+ GRPC_ERROR_UNREF(errs[0]);
+ GRPC_ERROR_UNREF(errs[1]);
+ return GRPC_ERROR_NONE;
} else {
- return -1;
+ *out_port = -1;
+ char *addr_str = grpc_sockaddr_to_uri(addr);
+ grpc_error *err = grpc_error_set_str(
+ GRPC_ERROR_CREATE_REFERENCING("Failed to add port to server", errs,
+ GPR_ARRAY_SIZE(errs)),
+ GRPC_ERROR_STR_TARGET_ADDRESS, addr_str);
+ GRPC_ERROR_UNREF(errs[0]);
+ GRPC_ERROR_UNREF(errs[1]);
+ gpr_free(addr_str);
+ return err;
}
}
@@ -581,7 +619,8 @@ grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s,
grpc_closure *shutdown_starting) {
gpr_mu_lock(&s->mu);
- grpc_closure_list_add(&s->shutdown_starting, shutdown_starting, 1);
+ grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
+ GRPC_ERROR_NONE);
gpr_mu_unlock(&s->mu);
}
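
grpc_tcp_server_add_port keeps the per-attempt errors in errs[2] and, when no
listener was created, folds them into one referencing error. A sketch of that
aggregation idiom; the message is taken from the hunk above, the helper
itself is illustrative:

    #include <stddef.h>

    #include "src/core/lib/iomgr/error.h"

    /* The referencing error holds its own refs on errs[0..n), so the
       originals are released once it has been built. */
    static grpc_error *combine_errors(grpc_error **errs, size_t n) {
      grpc_error *out =
          GRPC_ERROR_CREATE_REFERENCING("Failed to add port to server", errs, n);
      for (size_t i = 0; i < n; i++) {
        GRPC_ERROR_UNREF(errs[i]);
      }
      return out;
    }
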
diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c
index 125f521d87..86982bc183 100644
--- a/src/core/lib/iomgr/tcp_server_windows.c
+++ b/src/core/lib/iomgr/tcp_server_windows.c
@@ -41,7 +41,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <grpc/support/log_win32.h>
+#include <grpc/support/log_windows.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
@@ -102,7 +102,8 @@ struct grpc_tcp_server {
/* Public function. Allocates the proper data structures to hold a
grpc_tcp_server. */
-grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) {
+grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete,
+ grpc_tcp_server **server) {
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
gpr_ref_init(&s->refs, 1);
gpr_mu_init(&s->mu);
@@ -114,12 +115,13 @@ grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) {
s->shutdown_starting.head = NULL;
s->shutdown_starting.tail = NULL;
s->shutdown_complete = shutdown_complete;
- return s;
+ *server = s;
+ return GRPC_ERROR_NONE;
}
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (s->shutdown_complete != NULL) {
- grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
}
/* Now that the accepts have been aborted, we can destroy the sockets.
@@ -143,7 +145,8 @@ grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s,
grpc_closure *shutdown_starting) {
gpr_mu_lock(&s->mu);
- grpc_closure_list_add(&s->shutdown_starting, shutdown_starting, 1);
+ grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
+ GRPC_ERROR_NONE);
gpr_mu_unlock(&s->mu);
}
@@ -187,51 +190,49 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
}
/* Prepare (bind) a recently-created socket for listening. */
-static int prepare_socket(SOCKET sock, const struct sockaddr *addr,
- size_t addr_len) {
+static grpc_error *prepare_socket(SOCKET sock, const struct sockaddr *addr,
+ size_t addr_len, int *port) {
struct sockaddr_storage sockname_temp;
socklen_t sockname_len;
+ grpc_error *error = GRPC_ERROR_NONE;
- if (sock == INVALID_SOCKET) goto error;
-
- if (!grpc_tcp_prepare_socket(sock)) {
- char *utf8_message = gpr_format_message(WSAGetLastError());
- gpr_log(GPR_ERROR, "Unable to prepare socket: %s", utf8_message);
- gpr_free(utf8_message);
- goto error;
+ error = grpc_tcp_prepare_socket(sock);
+ if (error != GRPC_ERROR_NONE) {
+ goto failure;
}
if (bind(sock, addr, (int)addr_len) == SOCKET_ERROR) {
- char *addr_str;
- char *utf8_message = gpr_format_message(WSAGetLastError());
- grpc_sockaddr_to_string(&addr_str, addr, 0);
- gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, utf8_message);
- gpr_free(utf8_message);
- gpr_free(addr_str);
- goto error;
+ error = GRPC_WSA_ERROR(WSAGetLastError(), "bind");
+ goto failure;
}
if (listen(sock, SOMAXCONN) == SOCKET_ERROR) {
- char *utf8_message = gpr_format_message(WSAGetLastError());
- gpr_log(GPR_ERROR, "listen: %s", utf8_message);
- gpr_free(utf8_message);
- goto error;
+ error = GRPC_WSA_ERROR(WSAGetLastError(), "listen");
+ goto failure;
}
sockname_len = sizeof(sockname_temp);
if (getsockname(sock, (struct sockaddr *)&sockname_temp, &sockname_len) ==
SOCKET_ERROR) {
- char *utf8_message = gpr_format_message(WSAGetLastError());
- gpr_log(GPR_ERROR, "getsockname: %s", utf8_message);
- gpr_free(utf8_message);
- goto error;
+ error = GRPC_WSA_ERROR(WSAGetLastError(), "getsockname");
+ goto failure;
}
- return grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
+ *port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
+ return GRPC_ERROR_NONE;
-error:
+failure:
+ GPR_ASSERT(error != GRPC_ERROR_NONE);
+ char *tgtaddr = grpc_sockaddr_to_uri(addr);
+ grpc_error *final_error = grpc_error_set_int(
+ grpc_error_set_str(GRPC_ERROR_CREATE_REFERENCING(
+ "Failed to prepare server socket", &error, 1),
+ GRPC_ERROR_STR_TARGET_ADDRESS, tgtaddr),
+ GRPC_ERROR_INT_FD, (intptr_t)sock);
+ gpr_free(tgtaddr);
+ GRPC_ERROR_UNREF(error);
if (sock != INVALID_SOCKET) closesocket(sock);
- return -1;
+  return final_error;
}
static void decrement_active_ports_and_notify(grpc_exec_ctx *exec_ctx,
@@ -251,26 +252,23 @@ static void decrement_active_ports_and_notify(grpc_exec_ctx *exec_ctx,
/* In order to do an async accept, we need to create a socket first which
will be the one assigned to the new incoming connection. */
-static void start_accept(grpc_exec_ctx *exec_ctx, grpc_tcp_listener *port) {
+static grpc_error *start_accept(grpc_exec_ctx *exec_ctx,
+ grpc_tcp_listener *port) {
SOCKET sock = INVALID_SOCKET;
- char *message;
- char *utf8_message;
BOOL success;
DWORD addrlen = sizeof(struct sockaddr_in6) + 16;
DWORD bytes_received = 0;
+ grpc_error *error = GRPC_ERROR_NONE;
sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
WSA_FLAG_OVERLAPPED);
-
if (sock == INVALID_SOCKET) {
- message = "Unable to create socket: %s";
+ error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
goto failure;
}
- if (!grpc_tcp_prepare_socket(sock)) {
- message = "Unable to prepare socket: %s";
- goto failure;
- }
+ error = grpc_tcp_prepare_socket(sock);
+ if (error != GRPC_ERROR_NONE) goto failure;
/* Start the "accept" asynchronously. */
success = port->AcceptEx(port->socket->socket, sock, port->addresses, 0,
@@ -280,9 +278,9 @@ static void start_accept(grpc_exec_ctx *exec_ctx, grpc_tcp_listener *port) {
/* It is possible to get an accept immediately without delay. However, we
will still get an IOCP notification for it. So let's just ignore it. */
if (!success) {
- int error = WSAGetLastError();
- if (error != ERROR_IO_PENDING) {
- message = "AcceptEx failed: %s";
+ int last_error = WSAGetLastError();
+ if (last_error != ERROR_IO_PENDING) {
+ error = GRPC_WSA_ERROR(last_error, "AcceptEx");
goto failure;
}
}
@@ -291,9 +289,10 @@ static void start_accept(grpc_exec_ctx *exec_ctx, grpc_tcp_listener *port) {
immediately process an accept that happened in the meantime. */
port->new_socket = sock;
grpc_socket_notify_on_read(exec_ctx, port->socket, &port->on_accept);
- return;
+ return error;
failure:
+ GPR_ASSERT(error != GRPC_ERROR_NONE);
if (port->shutting_down) {
/* We are abandoning the listener port, take that into account to prevent
occasional hangs on shutdown. The hang happens when sp->shutting_down
@@ -301,16 +300,15 @@ failure:
but we fail there because the listening port has been closed in the
meantime. */
decrement_active_ports_and_notify(exec_ctx, port);
- return;
+ GRPC_ERROR_UNREF(error);
+ return GRPC_ERROR_NONE;
}
- utf8_message = gpr_format_message(WSAGetLastError());
- gpr_log(GPR_ERROR, message, utf8_message);
- gpr_free(utf8_message);
if (sock != INVALID_SOCKET) closesocket(sock);
+ return error;
}
/* Event manager callback when reads are ready. */
-static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, bool from_iocp) {
+static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_tcp_listener *sp = arg;
grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, 0};
SOCKET sock = sp->new_socket;
@@ -328,7 +326,10 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, bool from_iocp) {
/* The general mechanism for shutting down is to queue abortion calls. While
this is necessary in the read/write case, it's useless for the accept
case. We only need to adjust the pending callback count */
- if (!from_iocp) {
+ if (error != GRPC_ERROR_NONE) {
+ const char *msg = grpc_error_string(error);
+ gpr_log(GPR_INFO, "Skipping on_accept due to error: %s", msg);
+ grpc_error_free_string(msg);
return;
}
@@ -379,28 +380,28 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, bool from_iocp) {
/* The only time we should call our callback, is where we successfully
managed to accept a connection, and created an endpoint. */
- if (ep)
- sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep,
+ if (ep) {
+ sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep, NULL,
&acceptor);
+ }
/* As we were notified from the IOCP of one and exactly one accept,
     the former socket we created has now either been destroyed or assigned
to the new connection. We need to create a new one for the next
connection. */
- start_accept(exec_ctx, sp);
+ GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept(exec_ctx, sp)));
}
-static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
- const struct sockaddr *addr,
- size_t addr_len,
- unsigned port_index) {
+static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
+ const struct sockaddr *addr,
+ size_t addr_len, unsigned port_index,
+ grpc_tcp_listener **listener) {
grpc_tcp_listener *sp = NULL;
- int port;
+ int port = -1;
int status;
GUID guid = WSAID_ACCEPTEX;
DWORD ioctl_num_bytes;
LPFN_ACCEPTEX AcceptEx;
-
- if (sock == INVALID_SOCKET) return NULL;
+ grpc_error *error = GRPC_ERROR_NONE;
/* We need to grab the AcceptEx pointer for that port, as it may be
interface-dependent. We'll cache it to avoid doing that again. */
@@ -416,44 +417,49 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
return NULL;
}
- port = prepare_socket(sock, addr, addr_len);
- if (port >= 0) {
- gpr_mu_lock(&s->mu);
- GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
- sp = gpr_malloc(sizeof(grpc_tcp_listener));
- sp->next = NULL;
- if (s->head == NULL) {
- s->head = sp;
- } else {
- s->tail->next = sp;
- }
- s->tail = sp;
- sp->server = s;
- sp->socket = grpc_winsocket_create(sock, "listener");
- sp->shutting_down = 0;
- sp->AcceptEx = AcceptEx;
- sp->new_socket = INVALID_SOCKET;
- sp->port = port;
- sp->port_index = port_index;
- grpc_closure_init(&sp->on_accept, on_accept, sp);
- GPR_ASSERT(sp->socket);
- gpr_mu_unlock(&s->mu);
+ error = prepare_socket(sock, addr, addr_len, &port);
+ if (error != GRPC_ERROR_NONE) {
+ return error;
+ }
+
+ GPR_ASSERT(port >= 0);
+ gpr_mu_lock(&s->mu);
+ GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
+ sp = gpr_malloc(sizeof(grpc_tcp_listener));
+ sp->next = NULL;
+ if (s->head == NULL) {
+ s->head = sp;
+ } else {
+ s->tail->next = sp;
}
+ s->tail = sp;
+ sp->server = s;
+ sp->socket = grpc_winsocket_create(sock, "listener");
+ sp->shutting_down = 0;
+ sp->AcceptEx = AcceptEx;
+ sp->new_socket = INVALID_SOCKET;
+ sp->port = port;
+ sp->port_index = port_index;
+ grpc_closure_init(&sp->on_accept, on_accept, sp);
+ GPR_ASSERT(sp->socket);
+ gpr_mu_unlock(&s->mu);
+ *listener = sp;
- return sp;
+ return GRPC_ERROR_NONE;
}
-int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
- size_t addr_len) {
- grpc_tcp_listener *sp;
+grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
+ size_t addr_len, int *port) {
+ grpc_tcp_listener *sp = NULL;
SOCKET sock;
struct sockaddr_in6 addr6_v4mapped;
struct sockaddr_in6 wildcard;
struct sockaddr *allocated_addr = NULL;
struct sockaddr_storage sockname_temp;
socklen_t sockname_len;
- int port;
unsigned port_index = 0;
+ grpc_error *error = GRPC_ERROR_NONE;
+
if (s->tail != NULL) {
port_index = s->tail->port_index + 1;
}
@@ -465,11 +471,11 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
sockname_len = sizeof(sockname_temp);
if (0 == getsockname(sp->socket->socket,
(struct sockaddr *)&sockname_temp, &sockname_len)) {
- port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
- if (port > 0) {
+ *port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
+ if (*port > 0) {
allocated_addr = gpr_malloc(addr_len);
memcpy(allocated_addr, addr, addr_len);
- grpc_sockaddr_set_port(allocated_addr, port);
+ grpc_sockaddr_set_port(allocated_addr, *port);
addr = allocated_addr;
break;
}
@@ -483,8 +489,8 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
}
/* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
- if (grpc_sockaddr_is_wildcard(addr, &port)) {
- grpc_sockaddr_make_wildcard6(port, &wildcard);
+ if (grpc_sockaddr_is_wildcard(addr, port)) {
+ grpc_sockaddr_make_wildcard6(*port, &wildcard);
addr = (struct sockaddr *)&wildcard;
addr_len = sizeof(wildcard);
@@ -493,19 +499,26 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
WSA_FLAG_OVERLAPPED);
if (sock == INVALID_SOCKET) {
- char *utf8_message = gpr_format_message(WSAGetLastError());
- gpr_log(GPR_ERROR, "unable to create socket: %s", utf8_message);
- gpr_free(utf8_message);
+ error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
+ goto done;
}
- sp = add_socket_to_server(s, sock, addr, addr_len, port_index);
+ error = add_socket_to_server(s, sock, addr, addr_len, port_index, &sp);
+
+done:
gpr_free(allocated_addr);
- if (sp) {
- return sp->port;
+ if (error != GRPC_ERROR_NONE) {
+ grpc_error *error_out = GRPC_ERROR_CREATE_REFERENCING(
+ "Failed to add port to server", &error, 1);
+ GRPC_ERROR_UNREF(error);
+ error = error_out;
+ *port = -1;
} else {
- return -1;
+ GPR_ASSERT(sp != NULL);
+ *port = sp->port;
}
+ return error;
}
void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
@@ -520,7 +533,7 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
s->on_accept_cb = on_accept_cb;
s->on_accept_cb_arg = on_accept_cb_arg;
for (sp = s->head; sp; sp = sp->next) {
- start_accept(exec_ctx, sp);
+ GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept(exec_ctx, sp)));
s->active_ports++;
}
gpr_mu_unlock(&s->mu);
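
start_accept now returns a grpc_error*, and call sites with nowhere to
propagate it go through GRPC_LOG_IF_ERROR, which logs and releases a failure
and reports whether the call succeeded. A sketch of that boundary pattern;
fallible_step is a placeholder, not an API from this diff:

    #include "src/core/lib/iomgr/error.h"

    static grpc_error *fallible_step(void) { return GRPC_ERROR_NONE; }

    static void at_boundary(void) {
      /* true on GRPC_ERROR_NONE; otherwise the error is logged and freed */
      if (!GRPC_LOG_IF_ERROR("fallible_step", fallible_step())) {
        /* recover or assert, as the call sites above do with GPR_ASSERT */
      }
    }
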
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c
index 551149e1a6..b2af8030aa 100644
--- a/src/core/lib/iomgr/tcp_windows.c
+++ b/src/core/lib/iomgr/tcp_windows.c
@@ -37,11 +37,11 @@
#include <limits.h>
-#include "src/core/lib/iomgr/sockaddr_win32.h"
+#include "src/core/lib/iomgr/sockaddr_windows.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <grpc/support/log_win32.h>
+#include <grpc/support/log_windows.h>
#include <grpc/support/slice_buffer.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
@@ -61,27 +61,34 @@
#define GRPC_FIONBIO FIONBIO
#endif
-static int set_non_block(SOCKET sock) {
+static grpc_error *set_non_block(SOCKET sock) {
int status;
uint32_t param = 1;
DWORD ret;
status = WSAIoctl(sock, GRPC_FIONBIO, &param, sizeof(param), NULL, 0, &ret,
NULL, NULL);
- return status == 0;
+ return status == 0
+ ? GRPC_ERROR_NONE
+ : GRPC_WSA_ERROR(WSAGetLastError(), "WSAIoctl(GRPC_FIONBIO)");
}
-static int set_dualstack(SOCKET sock) {
+static grpc_error *set_dualstack(SOCKET sock) {
int status;
unsigned long param = 0;
status = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char *)&param,
sizeof(param));
- return status == 0;
+ return status == 0
+ ? GRPC_ERROR_NONE
+ : GRPC_WSA_ERROR(WSAGetLastError(), "setsockopt(IPV6_V6ONLY)");
}
-int grpc_tcp_prepare_socket(SOCKET sock) {
- if (!set_non_block(sock)) return 0;
- if (!set_dualstack(sock)) return 0;
- return 1;
+grpc_error *grpc_tcp_prepare_socket(SOCKET sock) {
+ grpc_error *err;
+ err = set_non_block(sock);
+ if (err != GRPC_ERROR_NONE) return err;
+ err = set_dualstack(sock);
+ if (err != GRPC_ERROR_NONE) return err;
+ return GRPC_ERROR_NONE;
}
typedef struct grpc_tcp {
@@ -148,39 +155,35 @@ static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
#endif
/* Asynchronous callback from the IOCP, or the background thread. */
-static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, bool success) {
+static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
grpc_tcp *tcp = tcpp;
grpc_closure *cb = tcp->read_cb;
grpc_winsocket *socket = tcp->socket;
gpr_slice sub;
grpc_winsocket_callback_info *info = &socket->read_info;
- if (success) {
+ GRPC_ERROR_REF(error);
+
+ if (error == GRPC_ERROR_NONE) {
if (info->wsa_error != 0 && !tcp->shutting_down) {
- if (info->wsa_error != WSAECONNRESET) {
- char *utf8_message = gpr_format_message(info->wsa_error);
- gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message);
- gpr_free(utf8_message);
- }
- success = 0;
+ char *utf8_message = gpr_format_message(info->wsa_error);
+ error = GRPC_ERROR_CREATE(utf8_message);
+ gpr_free(utf8_message);
gpr_slice_unref(tcp->read_slice);
} else {
if (info->bytes_transfered != 0 && !tcp->shutting_down) {
sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered);
gpr_slice_buffer_add(tcp->read_slices, sub);
- success = 1;
} else {
gpr_slice_unref(tcp->read_slice);
- success = 0;
+ error = GRPC_ERROR_CREATE("End of TCP stream");
}
}
}
tcp->read_cb = NULL;
TCP_UNREF(tcp, "read");
- if (cb) {
- cb->cb(exec_ctx, cb->cb_arg, success);
- }
+ grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
}
static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
@@ -194,7 +197,8 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
WSABUF buffer;
if (tcp->shutting_down) {
- grpc_exec_ctx_enqueue(exec_ctx, cb, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, cb,
+ GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL);
return;
}
@@ -218,7 +222,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
/* Did we get data immediately ? Yay. */
if (info->wsa_error != WSAEWOULDBLOCK) {
info->bytes_transfered = bytes_read;
- grpc_exec_ctx_enqueue(exec_ctx, &tcp->on_read, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, &tcp->on_read, GRPC_ERROR_NONE, NULL);
return;
}
@@ -231,7 +235,8 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
info->wsa_error = wsa_error;
- grpc_exec_ctx_enqueue(exec_ctx, &tcp->on_read, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, &tcp->on_read,
+ GRPC_WSA_ERROR(info->wsa_error, "WSARecv"), NULL);
return;
}
}
@@ -240,32 +245,29 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
}
/* Asynchronous callback from the IOCP, or the background thread. */
-static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, bool success) {
+static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
grpc_tcp *tcp = (grpc_tcp *)tcpp;
grpc_winsocket *handle = tcp->socket;
grpc_winsocket_callback_info *info = &handle->write_info;
grpc_closure *cb;
+ GRPC_ERROR_REF(error);
+
gpr_mu_lock(&tcp->mu);
cb = tcp->write_cb;
tcp->write_cb = NULL;
gpr_mu_unlock(&tcp->mu);
- if (success) {
+ if (error == GRPC_ERROR_NONE) {
if (info->wsa_error != 0) {
- if (info->wsa_error != WSAECONNRESET) {
- char *utf8_message = gpr_format_message(info->wsa_error);
- gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message);
- gpr_free(utf8_message);
- }
- success = 0;
+ error = GRPC_WSA_ERROR(info->wsa_error, "WSASend");
} else {
GPR_ASSERT(info->bytes_transfered == tcp->write_slices->length);
}
}
TCP_UNREF(tcp, "write");
- cb->cb(exec_ctx, cb->cb_arg, success);
+ grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
}
/* Initiates a write. */
@@ -283,7 +285,8 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
size_t len;
if (tcp->shutting_down) {
- grpc_exec_ctx_enqueue(exec_ctx, cb, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, cb,
+ GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL);
return;
}
@@ -311,19 +314,10 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
connection that has its send queue filled up. But if we don't, then we can
avoid doing an async write operation at all. */
if (info->wsa_error != WSAEWOULDBLOCK) {
- bool ok = false;
- if (status == 0) {
- ok = true;
- GPR_ASSERT(bytes_sent == tcp->write_slices->length);
- } else {
- if (info->wsa_error != WSAECONNRESET) {
- char *utf8_message = gpr_format_message(info->wsa_error);
- gpr_log(GPR_ERROR, "WSASend error: %s", utf8_message);
- gpr_free(utf8_message);
- }
- }
- if (allocated) gpr_free(allocated);
- grpc_exec_ctx_enqueue(exec_ctx, cb, ok, NULL);
+ grpc_error *error = status == 0
+ ? GRPC_ERROR_NONE
+ : GRPC_WSA_ERROR(info->wsa_error, "WSASend");
+      grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
+      if (allocated) gpr_free(allocated);
return;
}
@@ -340,7 +334,8 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
TCP_UNREF(tcp, "write");
- grpc_exec_ctx_enqueue(exec_ctx, cb, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"),
+ NULL);
return;
}
}
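
grpc_tcp_prepare_socket shows the early-return chain this diff uses for
multi-step setup: each step returns its error unchanged and the first failure
wins. A sketch of the shape, with placeholder steps:

    #include "src/core/lib/iomgr/error.h"

    static grpc_error *step_nonblock(void) { return GRPC_ERROR_NONE; }
    static grpc_error *step_dualstack(void) { return GRPC_ERROR_NONE; }

    static grpc_error *prepare(void) {
      grpc_error *err = step_nonblock();
      if (err != GRPC_ERROR_NONE) return err;
      err = step_dualstack();
      if (err != GRPC_ERROR_NONE) return err;
      return GRPC_ERROR_NONE;
    }
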
diff --git a/src/core/lib/iomgr/tcp_windows.h b/src/core/lib/iomgr/tcp_windows.h
index a2f58eddd5..86d777235e 100644
--- a/src/core/lib/iomgr/tcp_windows.h
+++ b/src/core/lib/iomgr/tcp_windows.h
@@ -52,6 +52,6 @@
*/
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string);
-int grpc_tcp_prepare_socket(SOCKET sock);
+grpc_error *grpc_tcp_prepare_socket(SOCKET sock);
#endif /* GRPC_CORE_LIB_IOMGR_TCP_WINDOWS_H */
diff --git a/src/core/lib/iomgr/timer.c b/src/core/lib/iomgr/timer.c
index acb5b26c87..9975fa1671 100644
--- a/src/core/lib/iomgr/timer.c
+++ b/src/core/lib/iomgr/timer.c
@@ -73,7 +73,7 @@ static shard_type *g_shard_queue[NUM_SHARDS];
static bool g_initialized = false;
static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
- gpr_timespec *next, int success);
+ gpr_timespec *next, grpc_error *error);
static gpr_timespec compute_min_deadline(shard_type *shard) {
return grpc_timer_heap_is_empty(&shard->heap)
@@ -105,7 +105,8 @@ void grpc_timer_list_init(gpr_timespec now) {
void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {
int i;
- run_some_expired_timers(exec_ctx, gpr_inf_future(g_clock_type), NULL, 0);
+ run_some_expired_timers(exec_ctx, gpr_inf_future(g_clock_type), NULL,
+ GRPC_ERROR_CREATE("Timer list shutdown"));
for (i = 0; i < NUM_SHARDS; i++) {
shard_type *shard = &g_shards[i];
gpr_mu_destroy(&shard->mu);
@@ -185,13 +186,16 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
if (!g_initialized) {
timer->triggered = 1;
- grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, false, NULL);
+ grpc_exec_ctx_sched(
+ exec_ctx, &timer->closure,
+ GRPC_ERROR_CREATE("Attempt to create timer before initialization"),
+ NULL);
return;
}
if (gpr_time_cmp(deadline, now) <= 0) {
timer->triggered = 1;
- grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL);
return;
}
@@ -235,10 +239,15 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
}
void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
+ if (!g_initialized) {
+ /* must have already been cancelled, also the shard mutex is invalid */
+ return;
+ }
+
shard_type *shard = &g_shards[shard_idx(timer)];
gpr_mu_lock(&shard->mu);
if (!timer->triggered) {
- grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED, NULL);
timer->triggered = 1;
if (timer->heap_index == INVALID_HEAP_INDEX) {
list_remove(timer);
@@ -278,8 +287,8 @@ static int refill_queue(shard_type *shard, gpr_timespec now) {
return !grpc_timer_heap_is_empty(&shard->heap);
}
-/* This pops the next non-cancelled timer with deadline <= now from the queue,
- or returns NULL if there isn't one.
+/* This pops the next non-cancelled timer with deadline <= now from the
+ queue, or returns NULL if there isn't one.
REQUIRES: shard->mu locked */
static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) {
grpc_timer *timer;
@@ -299,12 +308,12 @@ static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) {
/* REQUIRES: shard->mu unlocked */
static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard,
gpr_timespec now, gpr_timespec *new_min_deadline,
- int success) {
+ grpc_error *error) {
size_t n = 0;
grpc_timer *timer;
gpr_mu_lock(&shard->mu);
while ((timer = pop_one(shard, now))) {
- grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, success, NULL);
+ grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_REF(error), NULL);
n++;
}
*new_min_deadline = compute_min_deadline(shard);
@@ -313,7 +322,7 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard,
}
static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
- gpr_timespec *next, int success) {
+ gpr_timespec *next, grpc_error *error) {
size_t n = 0;
/* TODO(ctiller): verify that there are any timers (atomically) here */
@@ -327,8 +336,8 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
/* For efficiency, we pop as many available timers as we can from the
shard. This may violate perfect timer deadline ordering, but that
shouldn't be a big deal because we don't make ordering guarantees. */
- n += pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline,
- success);
+ n +=
+ pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline, error);
  /* A grpc_timer_init() on the shard could intervene here, adding a new
timer that is earlier than new_min_deadline. However,
@@ -359,6 +368,8 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
*next, gpr_time_add(now, gpr_time_from_millis(1, GPR_TIMESPAN)));
}
+ GRPC_ERROR_UNREF(error);
+
return (int)n;
}
@@ -367,5 +378,7 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
GPR_ASSERT(now.clock_type == g_clock_type);
return run_some_expired_timers(
exec_ctx, now, next,
- gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0);
+ gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0
+ ? GRPC_ERROR_NONE
+ : GRPC_ERROR_CREATE("Shutting down timer system"));
}
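
pop_timers schedules many closures against a single error: each
grpc_exec_ctx_sched consumes one reference, so the loop takes a fresh
GRPC_ERROR_REF per closure and run_some_expired_timers drops the original ref
once at the end. A sketch of that ownership rule; the closure array is
illustrative:

    #include <stddef.h>

    #include "src/core/lib/iomgr/error.h"
    #include "src/core/lib/iomgr/exec_ctx.h"

    /* Hand one error to n closures; consumes the caller's ref on error. */
    static void sched_all(grpc_exec_ctx *exec_ctx, grpc_closure **closures,
                          size_t n, grpc_error *error) {
      for (size_t i = 0; i < n; i++) {
        grpc_exec_ctx_sched(exec_ctx, closures[i], GRPC_ERROR_REF(error), NULL);
      }
      GRPC_ERROR_UNREF(error);
    }
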
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index df6cf956d9..16150687d3 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -81,6 +81,7 @@ typedef struct {
grpc_closure read_closure;
grpc_closure destroyed_closure;
grpc_udp_server_read_cb read_cb;
+ grpc_udp_server_orphan_cb orphan_cb;
} server_port;
/* the overall server */
@@ -168,6 +169,10 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
server_port *sp = &s->ports[i];
sp->destroyed_closure.cb = destroyed_port;
sp->destroyed_closure.cb_arg = s;
+
+ GPR_ASSERT(sp->orphan_cb);
+ sp->orphan_cb(sp->emfd);
+
grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
"udp_listener_shutdown");
}
@@ -205,6 +210,8 @@ static int prepare_socket(int fd, const struct sockaddr *addr,
size_t addr_len) {
struct sockaddr_storage sockname_temp;
socklen_t sockname_len;
+ /* Set send/receive socket buffers to 1 MB */
+ int buffer_size_bytes = 1024 * 1024;
if (fd < 0) {
goto error;
@@ -234,6 +241,18 @@ static int prepare_socket(int fd, const struct sockaddr *addr,
goto error;
}
+ if (!grpc_set_socket_sndbuf(fd, buffer_size_bytes)) {
+ gpr_log(GPR_ERROR, "Failed to set send buffer size to %d bytes",
+            buffer_size_bytes);
+ goto error;
+ }
+
+ if (!grpc_set_socket_rcvbuf(fd, buffer_size_bytes)) {
+ gpr_log(GPR_ERROR, "Failed to set receive buffer size to %d bytes",
+            buffer_size_bytes);
+ goto error;
+ }
+
return grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
error:
@@ -268,7 +287,8 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
static int add_socket_to_server(grpc_udp_server *s, int fd,
const struct sockaddr *addr, size_t addr_len,
- grpc_udp_server_read_cb read_cb) {
+ grpc_udp_server_read_cb read_cb,
+ grpc_udp_server_orphan_cb orphan_cb) {
server_port *sp;
int port;
char *addr_str;
@@ -292,6 +312,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
memcpy(sp->addr.untyped, addr, addr_len);
sp->addr_len = addr_len;
sp->read_cb = read_cb;
+ sp->orphan_cb = orphan_cb;
GPR_ASSERT(sp->emfd);
gpr_mu_unlock(&s->mu);
gpr_free(name);
@@ -301,7 +322,8 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
}
int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
- size_t addr_len, grpc_udp_server_read_cb read_cb) {
+ size_t addr_len, grpc_udp_server_read_cb read_cb,
+ grpc_udp_server_orphan_cb orphan_cb) {
int allocated_port1 = -1;
int allocated_port2 = -1;
unsigned i;
@@ -348,7 +370,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
addr = (struct sockaddr *)&wild6;
addr_len = sizeof(wild6);
-    fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode);
+    /* grpc_create_dualstack_socket now reports a grpc_error*; drop it here
+       until the UDP path is converted */
+    GRPC_ERROR_UNREF(grpc_create_dualstack_socket(addr, SOCK_DGRAM,
+                                                  IPPROTO_UDP, &dsmode, &fd));
- allocated_port1 = add_socket_to_server(s, fd, addr, addr_len, read_cb);
+ allocated_port1 =
+ add_socket_to_server(s, fd, addr, addr_len, read_cb, orphan_cb);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done;
}
@@ -370,7 +393,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
addr = (struct sockaddr *)&addr4_copy;
addr_len = sizeof(addr4_copy);
}
- allocated_port2 = add_socket_to_server(s, fd, addr, addr_len, read_cb);
+ allocated_port2 =
+ add_socket_to_server(s, fd, addr, addr_len, read_cb, orphan_cb);
done:
gpr_free(allocated_addr);
diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h
index d8cf957a22..33c5ce11cd 100644
--- a/src/core/lib/iomgr/udp_server.h
+++ b/src/core/lib/iomgr/udp_server.h
@@ -48,6 +48,9 @@ typedef struct grpc_udp_server grpc_udp_server;
typedef void (*grpc_udp_server_read_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,
struct grpc_server *server);
+/* Called when the grpc_fd is about to be orphaned (and the FD closed). */
+typedef void (*grpc_udp_server_orphan_cb)(grpc_fd *emfd);
+
/* Create a server, initially not bound to any ports */
grpc_udp_server *grpc_udp_server_create(void);
@@ -69,7 +72,8 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index);
/* TODO(ctiller): deprecate this, and make grpc_udp_server_add_ports to handle
all of the multiple socket port matching logic in one place */
int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
- size_t addr_len, grpc_udp_server_read_cb read_cb);
+ size_t addr_len, grpc_udp_server_read_cb read_cb,
+ grpc_udp_server_orphan_cb orphan_cb);
void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *server,
grpc_closure *on_done);
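
The new grpc_udp_server_orphan_cb gives the owner a hook immediately before
each listening fd is orphaned and closed. A sketch of registering both
callbacks under the extended signature; the callback bodies are placeholders:

    #include "src/core/lib/iomgr/udp_server.h"

    static void my_read_cb(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,
                           struct grpc_server *server) {
      /* drain and dispatch datagrams here */
    }

    static void my_orphan_cb(grpc_fd *emfd) {
      /* release per-fd state; the fd is closed right after this returns */
    }

    static int bind_udp(grpc_udp_server *s, const void *addr, size_t addr_len) {
      /* returns the bound port, or -1 on failure, as before */
      return grpc_udp_server_add_port(s, addr, addr_len, my_read_cb,
                                      my_orphan_cb);
    }
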
diff --git a/src/core/lib/iomgr/unix_sockets_posix.c b/src/core/lib/iomgr/unix_sockets_posix.c
index 5767c852df..0e7670e5a5 100644
--- a/src/core/lib/iomgr/unix_sockets_posix.c
+++ b/src/core/lib/iomgr/unix_sockets_posix.c
@@ -47,17 +47,18 @@ void grpc_create_socketpair_if_unix(int sv[2]) {
GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
}
-grpc_resolved_addresses *grpc_resolve_unix_domain_address(const char *name) {
+grpc_error *grpc_resolve_unix_domain_address(const char *name,
+ grpc_resolved_addresses **addrs) {
struct sockaddr_un *un;
- grpc_resolved_addresses *addrs = gpr_malloc(sizeof(grpc_resolved_addresses));
- addrs->naddrs = 1;
- addrs->addrs = gpr_malloc(sizeof(grpc_resolved_address));
- un = (struct sockaddr_un *)addrs->addrs->addr;
+ *addrs = gpr_malloc(sizeof(grpc_resolved_addresses));
+ (*addrs)->naddrs = 1;
+ (*addrs)->addrs = gpr_malloc(sizeof(grpc_resolved_address));
+ un = (struct sockaddr_un *)(*addrs)->addrs->addr;
un->sun_family = AF_UNIX;
strcpy(un->sun_path, name);
- addrs->addrs->len = strlen(un->sun_path) + sizeof(un->sun_family) + 1;
- return addrs;
+ (*addrs)->addrs->len = strlen(un->sun_path) + sizeof(un->sun_family) + 1;
+ return GRPC_ERROR_NONE;
}
int grpc_is_unix_socket(const struct sockaddr *addr) {
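
grpc_resolve_unix_domain_address follows the same conversion: the resolved
addresses come back through an out-parameter and the return value is a
grpc_error*. A sketch of a caller; the socket path is a placeholder:

    #include "src/core/lib/iomgr/unix_sockets_posix.h"

    static grpc_error *resolve_example(grpc_resolved_addresses **addrs) {
      /* on success, *addrs holds one AF_UNIX address; the caller frees it */
      return grpc_resolve_unix_domain_address("/tmp/example.sock", addrs);
    }
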
diff --git a/src/core/lib/iomgr/unix_sockets_posix.h b/src/core/lib/iomgr/unix_sockets_posix.h
index 6758c498e5..db0516d945 100644
--- a/src/core/lib/iomgr/unix_sockets_posix.h
+++ b/src/core/lib/iomgr/unix_sockets_posix.h
@@ -43,7 +43,8 @@
void grpc_create_socketpair_if_unix(int sv[2]);
-grpc_resolved_addresses *grpc_resolve_unix_domain_address(const char *name);
+grpc_error *grpc_resolve_unix_domain_address(
+ const char *name, grpc_resolved_addresses **addresses);
int grpc_is_unix_socket(const struct sockaddr *addr);
diff --git a/src/core/lib/iomgr/unix_sockets_posix_noop.c b/src/core/lib/iomgr/unix_sockets_posix_noop.c
index d30952789f..56b47c3daf 100644
--- a/src/core/lib/iomgr/unix_sockets_posix_noop.c
+++ b/src/core/lib/iomgr/unix_sockets_posix_noop.c
@@ -44,8 +44,10 @@ void grpc_create_socketpair_if_unix(int sv[2]) {
GPR_ASSERT(0);
}
-grpc_resolved_addresses *grpc_resolve_unix_domain_address(const char *name) {
- return NULL;
+grpc_error *grpc_resolve_unix_domain_address(
+ const char *name, grpc_resolved_addresses **addresses) {
+ *addresses = NULL;
+ return GRPC_ERROR_CREATE("Unix domain sockets are not supported on Windows");
}
int grpc_is_unix_socket(const struct sockaddr *addr) { return false; }
diff --git a/src/core/lib/iomgr/wakeup_fd_eventfd.c b/src/core/lib/iomgr/wakeup_fd_eventfd.c
index 8a772add13..667b4a5f90 100644
--- a/src/core/lib/iomgr/wakeup_fd_eventfd.c
+++ b/src/core/lib/iomgr/wakeup_fd_eventfd.c
@@ -44,29 +44,40 @@
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
-static void eventfd_create(grpc_wakeup_fd* fd_info) {
+static grpc_error* eventfd_create(grpc_wakeup_fd* fd_info) {
int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
- /* TODO(klempner): Handle failure more gracefully */
- GPR_ASSERT(efd >= 0);
+ if (efd < 0) {
+ return GRPC_OS_ERROR(errno, "eventfd");
+ }
fd_info->read_fd = efd;
fd_info->write_fd = -1;
+ return GRPC_ERROR_NONE;
}
-static void eventfd_consume(grpc_wakeup_fd* fd_info) {
+static grpc_error* eventfd_consume(grpc_wakeup_fd* fd_info) {
eventfd_t value;
int err;
do {
err = eventfd_read(fd_info->read_fd, &value);
} while (err < 0 && errno == EINTR);
+ if (err < 0 && errno != EAGAIN) {
+ return GRPC_OS_ERROR(errno, "eventfd_read");
+ }
+ return GRPC_ERROR_NONE;
}
-static void eventfd_wakeup(grpc_wakeup_fd* fd_info) {
+static grpc_error* eventfd_wakeup(grpc_wakeup_fd* fd_info) {
int err;
GPR_TIMER_BEGIN("eventfd_wakeup", 0);
do {
err = eventfd_write(fd_info->read_fd, 1);
} while (err < 0 && errno == EINTR);
+  if (err < 0) {
+    GPR_TIMER_END("eventfd_wakeup", 0);
+    return GRPC_OS_ERROR(errno, "eventfd_write");
+  }
GPR_TIMER_END("eventfd_wakeup", 0);
+ return GRPC_ERROR_NONE;
}
static void eventfd_destroy(grpc_wakeup_fd* fd_info) {
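For reference, the eventfd semantics the code above relies on, as a standalone sketch (not gRPC code): writes accumulate into a counter, a single read drains it, and a nonblocking read with nothing pending fails with EAGAIN, which is why eventfd_consume treats EAGAIN as success:

#include <assert.h>
#include <errno.h>
#include <sys/eventfd.h>

int main(void) {
  int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
  assert(efd >= 0);
  eventfd_t value;
  assert(eventfd_write(efd, 1) == 0);  /* first wakeup */
  assert(eventfd_write(efd, 1) == 0);  /* coalesces with the first */
  assert(eventfd_read(efd, &value) == 0 && value == 2);        /* drains both */
  assert(eventfd_read(efd, &value) == -1 && errno == EAGAIN);  /* now empty */
  return 0;
}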
diff --git a/src/core/lib/iomgr/wakeup_fd_pipe.c b/src/core/lib/iomgr/wakeup_fd_pipe.c
index e9b9a0119f..4e5dbdcb73 100644
--- a/src/core/lib/iomgr/wakeup_fd_pipe.c
+++ b/src/core/lib/iomgr/wakeup_fd_pipe.c
@@ -45,7 +45,6 @@
#include "src/core/lib/iomgr/socket_utils_posix.h"
-static void pipe_init(grpc_wakeup_fd* fd_info) {
+static grpc_error* pipe_init(grpc_wakeup_fd* fd_info) {
int pipefd[2];
-  /* TODO(klempner): Make this nonfatal */
   int r = pipe(pipefd);
@@ -53,36 +53,39 @@ static void pipe_init(grpc_wakeup_fd* fd_info) {
-    gpr_log(GPR_ERROR, "pipe creation failed (%d): %s", errno, strerror(errno));
-    abort();
+    return GRPC_OS_ERROR(errno, "pipe");
   }
- GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[0], 1));
- GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[1], 1));
+ grpc_error* err;
+ err = grpc_set_socket_nonblocking(pipefd[0], 1);
+ if (err != GRPC_ERROR_NONE) return err;
+ err = grpc_set_socket_nonblocking(pipefd[1], 1);
+ if (err != GRPC_ERROR_NONE) return err;
fd_info->read_fd = pipefd[0];
fd_info->write_fd = pipefd[1];
+ return GRPC_ERROR_NONE;
}
-static void pipe_consume(grpc_wakeup_fd* fd_info) {
+static grpc_error* pipe_consume(grpc_wakeup_fd* fd_info) {
char buf[128];
ssize_t r;
for (;;) {
r = read(fd_info->read_fd, buf, sizeof(buf));
if (r > 0) continue;
- if (r == 0) return;
+ if (r == 0) return GRPC_ERROR_NONE;
switch (errno) {
case EAGAIN:
- return;
+ return GRPC_ERROR_NONE;
case EINTR:
continue;
default:
- gpr_log(GPR_ERROR, "error reading pipe: %s", strerror(errno));
- return;
+ return GRPC_OS_ERROR(errno, "read");
}
}
}
-static void pipe_wakeup(grpc_wakeup_fd* fd_info) {
+static grpc_error* pipe_wakeup(grpc_wakeup_fd* fd_info) {
char c = 0;
while (write(fd_info->write_fd, &c, 1) != 1 && errno == EINTR)
;
+ return GRPC_ERROR_NONE;
}
static void pipe_destroy(grpc_wakeup_fd* fd_info) {
diff --git a/src/core/lib/iomgr/wakeup_fd_posix.c b/src/core/lib/iomgr/wakeup_fd_posix.c
index 525369c356..046208abc8 100644
--- a/src/core/lib/iomgr/wakeup_fd_posix.c
+++ b/src/core/lib/iomgr/wakeup_fd_posix.c
@@ -53,16 +53,16 @@ void grpc_wakeup_fd_global_init(void) {
void grpc_wakeup_fd_global_destroy(void) { wakeup_fd_vtable = NULL; }
-void grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info) {
- wakeup_fd_vtable->init(fd_info);
+grpc_error *grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info) {
+ return wakeup_fd_vtable->init(fd_info);
}
-void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd *fd_info) {
- wakeup_fd_vtable->consume(fd_info);
+grpc_error *grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd *fd_info) {
+ return wakeup_fd_vtable->consume(fd_info);
}
-void grpc_wakeup_fd_wakeup(grpc_wakeup_fd *fd_info) {
- wakeup_fd_vtable->wakeup(fd_info);
+grpc_error *grpc_wakeup_fd_wakeup(grpc_wakeup_fd *fd_info) {
+ return wakeup_fd_vtable->wakeup(fd_info);
}
void grpc_wakeup_fd_destroy(grpc_wakeup_fd *fd_info) {
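For context, grpc_wakeup_fd_global_init selects wakeup_fd_vtable roughly as follows (a sketch of the assumed shape, not this file's verbatim code; the vtable names come from wakeup_fd_posix.h and wakeup_fd_pipe.h):

void grpc_wakeup_fd_global_init(void) {
  /* prefer the specialized implementation (eventfd on Linux) when it is
     both allowed and available; otherwise fall back to a pipe pair */
  if (grpc_allow_specialized_wakeup_fd &&
      grpc_specialized_wakeup_fd_vtable.check_availability()) {
    wakeup_fd_vtable = &grpc_specialized_wakeup_fd_vtable;
  } else {
    wakeup_fd_vtable = &grpc_pipe_wakeup_fd_vtable;
  }
}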
diff --git a/src/core/lib/iomgr/wakeup_fd_posix.h b/src/core/lib/iomgr/wakeup_fd_posix.h
index 6b069c1837..e269f242d8 100644
--- a/src/core/lib/iomgr/wakeup_fd_posix.h
+++ b/src/core/lib/iomgr/wakeup_fd_posix.h
@@ -62,6 +62,8 @@
#ifndef GRPC_CORE_LIB_IOMGR_WAKEUP_FD_POSIX_H
#define GRPC_CORE_LIB_IOMGR_WAKEUP_FD_POSIX_H
+#include "src/core/lib/iomgr/error.h"
+
void grpc_wakeup_fd_global_init(void);
void grpc_wakeup_fd_global_destroy(void);
@@ -72,9 +74,9 @@ void grpc_wakeup_fd_global_init_force_fallback(void);
typedef struct grpc_wakeup_fd grpc_wakeup_fd;
typedef struct grpc_wakeup_fd_vtable {
- void (*init)(grpc_wakeup_fd* fd_info);
- void (*consume)(grpc_wakeup_fd* fd_info);
- void (*wakeup)(grpc_wakeup_fd* fd_info);
+ grpc_error* (*init)(grpc_wakeup_fd* fd_info);
+ grpc_error* (*consume)(grpc_wakeup_fd* fd_info);
+ grpc_error* (*wakeup)(grpc_wakeup_fd* fd_info);
void (*destroy)(grpc_wakeup_fd* fd_info);
/* Must be called before calling any other functions */
int (*check_availability)(void);
@@ -89,9 +91,10 @@ extern int grpc_allow_specialized_wakeup_fd;
#define GRPC_WAKEUP_FD_GET_READ_FD(fd_info) ((fd_info)->read_fd)
-void grpc_wakeup_fd_init(grpc_wakeup_fd* fd_info);
-void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd* fd_info);
-void grpc_wakeup_fd_wakeup(grpc_wakeup_fd* fd_info);
+grpc_error* grpc_wakeup_fd_init(grpc_wakeup_fd* fd_info) GRPC_MUST_USE_RESULT;
+grpc_error* grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd* fd_info)
+ GRPC_MUST_USE_RESULT;
+grpc_error* grpc_wakeup_fd_wakeup(grpc_wakeup_fd* fd_info) GRPC_MUST_USE_RESULT;
void grpc_wakeup_fd_destroy(grpc_wakeup_fd* fd_info);
/* Defined in some specialized implementation's .c file, or by
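With GRPC_MUST_USE_RESULT the compiler now flags dropped errors, so each call site needs an explicit policy. A sketch of a log-and-release wrapper (the wrapper name and its policy are hypothetical):

static void wakeup_or_log(grpc_wakeup_fd *wfd) {
  grpc_error *err = grpc_wakeup_fd_wakeup(wfd);
  if (err != GRPC_ERROR_NONE) {
    const char *msg = grpc_error_string(err);
    gpr_log(GPR_ERROR, "wakeup failed: %s", msg);
    grpc_error_free_string(msg);
    GRPC_ERROR_UNREF(err);
  }
}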
diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h
index 3e2b223670..5cc40eea50 100644
--- a/src/core/lib/iomgr/workqueue.h
+++ b/src/core/lib/iomgr/workqueue.h
@@ -43,14 +43,15 @@
#include "src/core/lib/iomgr/workqueue_posix.h"
#endif
-#ifdef GPR_WIN32
+#ifdef GPR_WINDOWS
#include "src/core/lib/iomgr/workqueue_windows.h"
#endif
/* grpc_workqueue is forward declared in exec_ctx.h */
/** Create a work queue */
-grpc_workqueue *grpc_workqueue_create(grpc_exec_ctx *exec_ctx);
+grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
+ grpc_workqueue **workqueue);
void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
@@ -77,7 +78,7 @@ void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset);
/** Add a work item to a workqueue */
-void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure,
- int success);
+void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+ grpc_closure *closure, grpc_error *error);
#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_H */
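Caller-side shape of both revised entry points, as a sketch (exec_ctx and closure are assumed to exist in the caller):

grpc_workqueue *wq = NULL;
grpc_error *err = grpc_workqueue_create(exec_ctx, &wq);
if (err != GRPC_ERROR_NONE) {
  /* creation failed and wq was never allocated: surface or log err */
} else {
  /* hand the closure (and the error it should eventually be invoked with)
     to the queue; GRPC_ERROR_NONE marks a successful completion */
  grpc_workqueue_enqueue(exec_ctx, wq, closure, GRPC_ERROR_NONE);
}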
diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c
index 80e7a0b206..45e0f6063b 100644
--- a/src/core/lib/iomgr/workqueue_posix.c
+++ b/src/core/lib/iomgr/workqueue_posix.c
@@ -45,22 +45,27 @@
#include "src/core/lib/iomgr/ev_posix.h"
-static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success);
+static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
-grpc_workqueue *grpc_workqueue_create(grpc_exec_ctx *exec_ctx) {
+grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
+ grpc_workqueue **workqueue) {
char name[32];
- grpc_workqueue *workqueue = gpr_malloc(sizeof(grpc_workqueue));
- gpr_ref_init(&workqueue->refs, 1);
- gpr_mu_init(&workqueue->mu);
- workqueue->closure_list.head = workqueue->closure_list.tail = NULL;
- grpc_wakeup_fd_init(&workqueue->wakeup_fd);
- sprintf(name, "workqueue:%p", (void *)workqueue);
- workqueue->wakeup_read_fd =
- grpc_fd_create(GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), name);
- grpc_closure_init(&workqueue->read_closure, on_readable, workqueue);
- grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd,
- &workqueue->read_closure);
- return workqueue;
+ *workqueue = gpr_malloc(sizeof(grpc_workqueue));
+ gpr_ref_init(&(*workqueue)->refs, 1);
+ gpr_mu_init(&(*workqueue)->mu);
+ (*workqueue)->closure_list.head = (*workqueue)->closure_list.tail = NULL;
+ grpc_error *err = grpc_wakeup_fd_init(&(*workqueue)->wakeup_fd);
+ if (err != GRPC_ERROR_NONE) {
+ gpr_free(*workqueue);
+ return err;
+ }
+ sprintf(name, "workqueue:%p", (void *)(*workqueue));
+ (*workqueue)->wakeup_read_fd = grpc_fd_create(
+ GRPC_WAKEUP_FD_GET_READ_FD(&(*workqueue)->wakeup_fd), name);
+ grpc_closure_init(&(*workqueue)->read_closure, on_readable, *workqueue);
+ grpc_fd_notify_on_read(exec_ctx, (*workqueue)->wakeup_read_fd,
+ &(*workqueue)->read_closure);
+ return GRPC_ERROR_NONE;
}
static void workqueue_destroy(grpc_exec_ctx *exec_ctx,
@@ -103,17 +108,14 @@ void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx,
void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
gpr_mu_lock(&workqueue->mu);
- if (grpc_closure_list_empty(workqueue->closure_list)) {
- grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd);
- }
grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
gpr_mu_unlock(&workqueue->mu);
}
-static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_workqueue *workqueue = arg;
- if (!success) {
+ if (error != GRPC_ERROR_NONE) {
gpr_mu_destroy(&workqueue->mu);
/* HACK: let wakeup_fd code know that we stole the fd */
workqueue->wakeup_fd.read_fd = 0;
@@ -123,20 +125,33 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
} else {
gpr_mu_lock(&workqueue->mu);
grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
- grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd);
+ error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd);
gpr_mu_unlock(&workqueue->mu);
- grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd,
- &workqueue->read_closure);
+ if (error == GRPC_ERROR_NONE) {
+ grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd,
+ &workqueue->read_closure);
+ } else {
+ /* recurse to get error handling */
+ on_readable(exec_ctx, arg, error);
+ }
}
}
-void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure,
- int success) {
+void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+ grpc_closure *closure, grpc_error *error) {
+ grpc_error *push_error = GRPC_ERROR_NONE;
gpr_mu_lock(&workqueue->mu);
if (grpc_closure_list_empty(workqueue->closure_list)) {
- grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd);
+ push_error = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd);
+ }
+ grpc_closure_list_append(&workqueue->closure_list, closure, error);
+ if (push_error != GRPC_ERROR_NONE) {
+ const char *msg = grpc_error_string(push_error);
+ gpr_log(GPR_ERROR, "Failed to push to workqueue: %s", msg);
+ grpc_error_free_string(msg);
+    grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
+    GRPC_ERROR_UNREF(push_error);
}
- grpc_closure_list_add(&workqueue->closure_list, closure, success);
gpr_mu_unlock(&workqueue->mu);
}
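Two details above are worth spelling out: the wakeup fd is written only when the list transitions from empty to nonempty (later items piggyback on the already-pending wakeup), and if that single write fails the closures are drained into the exec_ctx so they still run. A simplified restatement of the pattern (not this file's verbatim code; q, closure, and error stand in for the function's arguments):

grpc_error *wake_err = GRPC_ERROR_NONE;
gpr_mu_lock(&q->mu);
if (grpc_closure_list_empty(q->closure_list)) {
  /* first item in: the poller must be told there is work */
  wake_err = grpc_wakeup_fd_wakeup(&q->wakeup_fd);
}
grpc_closure_list_append(&q->closure_list, closure, error);
if (wake_err != GRPC_ERROR_NONE) {
  /* the poller may never notice this item: run the list locally instead */
  grpc_exec_ctx_enqueue_list(exec_ctx, &q->closure_list, NULL);
  GRPC_ERROR_UNREF(wake_err);
}
gpr_mu_unlock(&q->mu);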
diff --git a/src/core/lib/iomgr/workqueue_windows.c b/src/core/lib/iomgr/workqueue_windows.c
index c3c0446a57..275f040b1c 100644
--- a/src/core/lib/iomgr/workqueue_windows.c
+++ b/src/core/lib/iomgr/workqueue_windows.c
@@ -33,8 +33,8 @@
#include <grpc/support/port_platform.h>
-#ifdef GPR_WIN32
+#ifdef GPR_WINDOWS
#include "src/core/lib/iomgr/workqueue.h"
-#endif /* GPR_WIN32 */
+#endif /* GPR_WINDOWS */