diff options
Diffstat (limited to 'src/core/lib/iomgr')
30 files changed, 1247 insertions, 422 deletions
diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c index 27793c32e4..0b6c3b2539 100644 --- a/src/core/lib/iomgr/closure.c +++ b/src/core/lib/iomgr/closure.c @@ -39,25 +39,32 @@ void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, void *cb_arg) { closure->cb = cb; closure->cb_arg = cb_arg; - closure->final_data = 0; } -void grpc_closure_list_add(grpc_closure_list *closure_list, - grpc_closure *closure, bool success) { - if (closure == NULL) return; - closure->final_data = (success != 0); +void grpc_closure_list_append(grpc_closure_list *closure_list, + grpc_closure *closure, grpc_error *error) { + if (closure == NULL) { + GRPC_ERROR_UNREF(error); + return; + } + closure->error = error; + closure->next_data.next = NULL; if (closure_list->head == NULL) { closure_list->head = closure; } else { - closure_list->tail->final_data |= (uintptr_t)closure; + closure_list->tail->next_data.next = closure; } closure_list->tail = closure; } -void grpc_closure_list_fail_all(grpc_closure_list *list) { - for (grpc_closure *c = list->head; c != NULL; c = grpc_closure_next(c)) { - c->final_data &= ~(uintptr_t)1; +void grpc_closure_list_fail_all(grpc_closure_list *list, + grpc_error *forced_failure) { + for (grpc_closure *c = list->head; c != NULL; c = c->next_data.next) { + if (c->error == GRPC_ERROR_NONE) { + c->error = GRPC_ERROR_REF(forced_failure); + } } + GRPC_ERROR_UNREF(forced_failure); } bool grpc_closure_list_empty(grpc_closure_list closure_list) { @@ -71,7 +78,7 @@ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst) { if (dst->head == NULL) { *dst = *src; } else { - dst->tail->final_data |= (uintptr_t)src->head; + dst->tail->next_data.next = src->head; dst->tail = src->tail; } src->head = src->tail = NULL; @@ -83,12 +90,13 @@ typedef struct { grpc_closure wrapper; } wrapped_closure; -static void closure_wrapper(grpc_exec_ctx *exec_ctx, void *arg, bool success) { +static void closure_wrapper(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { wrapped_closure *wc = arg; grpc_iomgr_cb_func cb = wc->cb; void *cb_arg = wc->cb_arg; gpr_free(wc); - cb(exec_ctx, cb_arg, success); + cb(exec_ctx, cb_arg, error); } grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg) { @@ -98,7 +106,3 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg) { grpc_closure_init(&wc->wrapper, closure_wrapper, wc); return &wc->wrapper; } - -grpc_closure *grpc_closure_next(grpc_closure *closure) { - return (grpc_closure *)(closure->final_data & ~(uintptr_t)1); -} diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index fdc2daed9d..344b7d3c3e 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -36,6 +36,7 @@ #include <grpc/support/port_platform.h> #include <stdbool.h> +#include "src/core/lib/iomgr/error.h" struct grpc_closure; typedef struct grpc_closure grpc_closure; @@ -55,7 +56,7 @@ typedef struct grpc_closure_list { * \param success An indication on the state of the iomgr. On false, cleanup * actions should be taken (eg, shutdown). */ typedef void (*grpc_iomgr_cb_func)(grpc_exec_ctx *exec_ctx, void *arg, - bool success); + grpc_error *error); /** A closure over a grpc_iomgr_cb_func. */ struct grpc_closure { @@ -65,10 +66,12 @@ struct grpc_closure { /** Arguments to be passed to "cb". */ void *cb_arg; - /** Once enqueued, contains in the lower bit the success of the closure, - and in the upper bits the pointer to the next closure in the list. - Before enqueing for execution, this is usable for scratch data. */ - uintptr_t final_data; + grpc_error *error; + + union { + grpc_closure *next; + uintptr_t scratch; + } next_data; }; /** Initializes \a closure with \a cb and \a cb_arg. */ @@ -83,11 +86,12 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg); /** add \a closure to the end of \a list and set \a closure's success to \a * success */ -void grpc_closure_list_add(grpc_closure_list *list, grpc_closure *closure, - bool success); +void grpc_closure_list_append(grpc_closure_list *list, grpc_closure *closure, + grpc_error *error); /** force all success bits in \a list to false */ -void grpc_closure_list_fail_all(grpc_closure_list *list); +void grpc_closure_list_fail_all(grpc_closure_list *list, + grpc_error *forced_failure); /** append all closures from \a src to \a dst and empty \a src. */ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst); @@ -95,7 +99,4 @@ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst); /** return whether \a list is empty. */ bool grpc_closure_list_empty(grpc_closure_list list); -/** return the next pointer for a queued closure list */ -grpc_closure *grpc_closure_next(grpc_closure *closure); - #endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */ diff --git a/src/core/lib/iomgr/endpoint_pair_posix.c b/src/core/lib/iomgr/endpoint_pair_posix.c index e0ce47c773..e295fb4867 100644 --- a/src/core/lib/iomgr/endpoint_pair_posix.c +++ b/src/core/lib/iomgr/endpoint_pair_posix.c @@ -58,8 +58,8 @@ static void create_sockets(int sv[2]) { GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0); flags = fcntl(sv[1], F_GETFL, 0); GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); - GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv[0])); - GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv[1])); + GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv[0]) == GRPC_ERROR_NONE); + GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv[1]) == GRPC_ERROR_NONE); } grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c new file mode 100644 index 0000000000..7e5d74b495 --- /dev/null +++ b/src/core/lib/iomgr/error.c @@ -0,0 +1,492 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/error.h" + +#include <stdbool.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/avl.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/useful.h> + +static void destroy_integer(void *key) {} + +static void *copy_integer(void *key) { return key; } + +static long compare_integers(void *key1, void *key2) { + return GPR_ICMP((uintptr_t)key1, (uintptr_t)key2); +} + +static void destroy_string(void *str) { gpr_free(str); } + +static void *copy_string(void *str) { return gpr_strdup(str); } + +static void destroy_err(void *err) { GRPC_ERROR_UNREF(err); } + +static void *copy_err(void *err) { return GRPC_ERROR_REF(err); } + +static void destroy_time(void *tm) { gpr_free(tm); } + +static gpr_timespec *box_time(gpr_timespec tm) { + gpr_timespec *out = gpr_malloc(sizeof(*out)); + *out = tm; + return out; +} + +static void *copy_time(void *tm) { return box_time(*(gpr_timespec *)tm); } + +static const gpr_avl_vtable avl_vtable_ints = {destroy_integer, copy_integer, + compare_integers, + destroy_integer, copy_integer}; + +static const gpr_avl_vtable avl_vtable_strs = {destroy_integer, copy_integer, + compare_integers, destroy_string, + copy_string}; + +static const gpr_avl_vtable avl_vtable_times = { + destroy_integer, copy_integer, compare_integers, destroy_time, copy_time}; + +static const gpr_avl_vtable avl_vtable_errs = { + destroy_integer, copy_integer, compare_integers, destroy_err, copy_err}; + +static const char *error_int_name(grpc_error_ints key) { + switch (key) { + case GRPC_ERROR_INT_STATUS_CODE: + return "status_code"; + case GRPC_ERROR_INT_ERRNO: + return "errno"; + case GRPC_ERROR_INT_FILE_LINE: + return "file_line"; + case GRPC_ERROR_INT_WARNING: + return "warning"; + case GRPC_ERROR_INT_STREAM_ID: + return "stream_id"; + case GRPC_ERROR_INT_GRPC_STATUS: + return "grpc_status"; + case GRPC_ERROR_INT_OFFSET: + return "offset"; + case GRPC_ERROR_INT_INDEX: + return "index"; + case GRPC_ERROR_INT_SIZE: + return "size"; + case GRPC_ERROR_INT_HTTP2_ERROR: + return "http2_error"; + case GRPC_ERROR_INT_TSI_CODE: + return "tsi_code"; + case GRPC_ERROR_INT_SECURITY_STATUS: + return "security_status"; + case GRPC_ERROR_INT_FD: + return "fd"; + } + GPR_UNREACHABLE_CODE(return "unknown"); +} + +static const char *error_str_name(grpc_error_strs key) { + switch (key) { + case GRPC_ERROR_STR_DESCRIPTION: + return "description"; + case GRPC_ERROR_STR_OS_ERROR: + return "os_error"; + case GRPC_ERROR_STR_TARGET_ADDRESS: + return "target_address"; + case GRPC_ERROR_STR_SYSCALL: + return "syscall"; + case GRPC_ERROR_STR_FILE: + return "file"; + case GRPC_ERROR_STR_GRPC_MESSAGE: + return "grpc_message"; + case GRPC_ERROR_STR_RAW_BYTES: + return "raw_bytes"; + case GRPC_ERROR_STR_TSI_ERROR: + return "tsi_error"; + } + GPR_UNREACHABLE_CODE(return "unknown"); +} + +static const char *error_time_name(grpc_error_times key) { + switch (key) { + case GRPC_ERROR_TIME_CREATED: + return "created"; + } + GPR_UNREACHABLE_CODE(return "unknown"); +} + +struct grpc_error { + gpr_refcount refs; + gpr_avl ints; + gpr_avl strs; + gpr_avl times; + gpr_avl errs; + uintptr_t next_err; +}; + +static bool is_special(grpc_error *err) { + return err == GRPC_ERROR_NONE || err == GRPC_ERROR_OOM || + err == GRPC_ERROR_CANCELLED; +} + +grpc_error *grpc_error_ref(grpc_error *err, const char *file, int line, + const char *func) { + if (is_special(err)) return err; + gpr_log(GPR_DEBUG, "%p: %d -> %d [%s:%d %s]", err, err->refs.count, + err->refs.count + 1, file, line, func); + gpr_ref(&err->refs); + return err; +} + +static void error_destroy(grpc_error *err) { + GPR_ASSERT(!is_special(err)); + gpr_avl_unref(err->ints); + gpr_avl_unref(err->strs); + gpr_avl_unref(err->errs); + gpr_avl_unref(err->times); + gpr_free(err); +} + +void grpc_error_unref(grpc_error *err, const char *file, int line, + const char *func) { + if (is_special(err)) return; + gpr_log(GPR_DEBUG, "%p: %d -> %d [%s:%d %s]", err, err->refs.count, + err->refs.count - 1, file, line, func); + if (gpr_unref(&err->refs)) { + error_destroy(err); + } +} + +grpc_error *grpc_error_create(const char *file, int line, const char *desc, + grpc_error **referencing, + size_t num_referencing) { + grpc_error *err = gpr_malloc(sizeof(*err)); + if (err == NULL) { // TODO(ctiller): make gpr_malloc return NULL + return GRPC_ERROR_OOM; + } + gpr_log(GPR_DEBUG, "%p create [%s:%d]", err, file, line); + err->ints = gpr_avl_add(gpr_avl_create(&avl_vtable_ints), + (void *)(uintptr_t)GRPC_ERROR_INT_FILE_LINE, + (void *)(uintptr_t)line); + err->strs = gpr_avl_add( + gpr_avl_add(gpr_avl_create(&avl_vtable_strs), + (void *)(uintptr_t)GRPC_ERROR_STR_FILE, gpr_strdup(file)), + (void *)(uintptr_t)GRPC_ERROR_STR_DESCRIPTION, gpr_strdup(desc)); + err->errs = gpr_avl_create(&avl_vtable_errs); + for (size_t i = 0; i < num_referencing; i++) { + if (referencing[i] == GRPC_ERROR_NONE) continue; + err->errs = gpr_avl_add(err->errs, (void *)(err->next_err++), + GRPC_ERROR_REF(referencing[i])); + } + err->times = gpr_avl_add(gpr_avl_create(&avl_vtable_times), + (void *)(uintptr_t)GRPC_ERROR_TIME_CREATED, + box_time(gpr_now(GPR_CLOCK_REALTIME))); + err->next_err = 0; + gpr_ref_init(&err->refs, 1); + return err; +} + +static grpc_error *copy_error_and_unref(grpc_error *in) { + if (is_special(in)) { + if (in == GRPC_ERROR_NONE) return GRPC_ERROR_CREATE("no error"); + if (in == GRPC_ERROR_OOM) return GRPC_ERROR_CREATE("oom"); + if (in == GRPC_ERROR_CANCELLED) return GRPC_ERROR_CREATE("cancelled"); + return GRPC_ERROR_CREATE("unknown"); + } + grpc_error *out = gpr_malloc(sizeof(*out)); + gpr_log(GPR_DEBUG, "%p create copying", out); + out->ints = gpr_avl_ref(in->ints); + out->strs = gpr_avl_ref(in->strs); + out->errs = gpr_avl_ref(in->errs); + out->times = gpr_avl_ref(in->times); + out->next_err = in->next_err; + gpr_ref_init(&out->refs, 1); + GRPC_ERROR_UNREF(in); + return out; +} + +grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which, + intptr_t value) { + grpc_error *new = copy_error_and_unref(src); + new->ints = gpr_avl_add(new->ints, (void *)(uintptr_t)which, (void *)value); + return new; +} + +bool grpc_error_get_int(grpc_error *err, grpc_error_ints which, intptr_t *p) { + void *pp; + if (gpr_avl_maybe_get(err->ints, (void *)(uintptr_t)which, &pp)) { + if (p != NULL) *p = (intptr_t)pp; + return true; + } + return false; +} + +grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which, + const char *value) { + grpc_error *new = copy_error_and_unref(src); + new->strs = + gpr_avl_add(new->strs, (void *)(uintptr_t)which, gpr_strdup(value)); + return new; +} + +grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child) { + grpc_error *new = copy_error_and_unref(src); + new->errs = gpr_avl_add(new->errs, (void *)(new->next_err++), child); + return new; +} + +static const char *no_error_string = "null"; +static const char *oom_error_string = "\"Out of memory\""; +static const char *cancelled_error_string = "\"Cancelled\""; + +typedef struct { + char *key; + char *value; +} kv_pair; + +typedef struct { + kv_pair *kvs; + size_t num_kvs; + size_t cap_kvs; +} kv_pairs; + +static void append_kv(kv_pairs *kvs, char *key, char *value) { + if (kvs->num_kvs == kvs->cap_kvs) { + kvs->cap_kvs = GPR_MAX(3 * kvs->cap_kvs / 2, 4); + kvs->kvs = gpr_realloc(kvs->kvs, sizeof(*kvs->kvs) * kvs->cap_kvs); + } + kvs->kvs[kvs->num_kvs].key = key; + kvs->kvs[kvs->num_kvs].value = value; + kvs->num_kvs++; +} + +static void collect_kvs(gpr_avl_node *node, char *key(void *k), + char *fmt(void *v), kv_pairs *kvs) { + if (node == NULL) return; + append_kv(kvs, key(node->key), fmt(node->value)); + collect_kvs(node->left, key, fmt, kvs); + collect_kvs(node->right, key, fmt, kvs); +} + +static char *key_int(void *p) { + return gpr_strdup(error_int_name((grpc_error_ints)(uintptr_t)p)); +} + +static char *key_str(void *p) { + return gpr_strdup(error_str_name((grpc_error_strs)(uintptr_t)p)); +} + +static char *key_time(void *p) { + return gpr_strdup(error_time_name((grpc_error_times)(uintptr_t)p)); +} + +static char *fmt_int(void *p) { + char *s; + gpr_asprintf(&s, "%lld", (intptr_t)p); + return s; +} + +static void append_chr(char c, char **s, size_t *sz, size_t *cap) { + if (*sz == *cap) { + *cap = GPR_MAX(8, 3 * *cap / 2); + *s = gpr_realloc(*s, *cap); + } + (*s)[(*sz)++] = c; +} + +static void append_str(const char *str, char **s, size_t *sz, size_t *cap) { + for (const char *c = str; *c; c++) { + append_chr(*c, s, sz, cap); + } +} + +static void append_esc_str(const char *str, char **s, size_t *sz, size_t *cap) { + static const char *hex = "0123456789abcdef"; + append_chr('"', s, sz, cap); + for (const uint8_t *c = (const uint8_t *)str; *c; c++) { + if (*c < 32 || *c >= 127) { + append_chr('\\', s, sz, cap); + switch (*c) { + case '\b': + append_chr('b', s, sz, cap); + break; + case '\f': + append_chr('f', s, sz, cap); + break; + case '\n': + append_chr('n', s, sz, cap); + break; + case '\r': + append_chr('r', s, sz, cap); + break; + case '\t': + append_chr('t', s, sz, cap); + break; + default: + append_chr('u', s, sz, cap); + append_chr('0', s, sz, cap); + append_chr('0', s, sz, cap); + append_chr(hex[*c >> 4], s, sz, cap); + append_chr(hex[*c & 0x0f], s, sz, cap); + break; + } + } else { + append_chr((char)*c, s, sz, cap); + } + } + append_chr('"', s, sz, cap); +} + +static char *fmt_str(void *p) { + char *s = NULL; + size_t sz = 0; + size_t cap = 0; + append_esc_str(p, &s, &sz, &cap); + append_chr(0, &s, &sz, &cap); + return s; +} + +static char *fmt_time(void *p) { + gpr_timespec tm = *(gpr_timespec *)p; + char *out; + char *pfx = "!!"; + switch (tm.clock_type) { + case GPR_CLOCK_MONOTONIC: + pfx = "@monotonic:"; + break; + case GPR_CLOCK_REALTIME: + pfx = "@"; + break; + case GPR_CLOCK_PRECISE: + pfx = "@precise:"; + break; + case GPR_TIMESPAN: + pfx = ""; + break; + } + gpr_asprintf(&out, "\"%s%d.%09d\"", pfx, tm.tv_sec, tm.tv_nsec); + return out; +} + +static void add_errs(gpr_avl_node *n, char **s, size_t *sz, size_t *cap) { + if (n == NULL) return; + add_errs(n->left, s, sz, cap); + const char *e = grpc_error_string(n->value); + append_str(e, s, sz, cap); + grpc_error_free_string(e); + add_errs(n->right, s, sz, cap); +} + +static char *errs_string(grpc_error *err) { + char *s = NULL; + size_t sz = 0; + size_t cap = 0; + append_chr('[', &s, &sz, &cap); + add_errs(err->errs.root, &s, &sz, &cap); + append_chr(']', &s, &sz, &cap); + append_chr(0, &s, &sz, &cap); + return s; +} + +static int cmp_kvs(const void *a, const void *b) { + const kv_pair *ka = a; + const kv_pair *kb = b; + return strcmp(ka->key, kb->key); +} + +static const char *finish_kvs(kv_pairs *kvs) { + char *s = NULL; + size_t sz = 0; + size_t cap = 0; + + append_chr('{', &s, &sz, &cap); + for (size_t i = 0; i < kvs->num_kvs; i++) { + if (i != 0) append_chr(',', &s, &sz, &cap); + append_esc_str(kvs->kvs[i].key, &s, &sz, &cap); + gpr_free(kvs->kvs[i].key); + append_chr(':', &s, &sz, &cap); + append_str(kvs->kvs[i].value, &s, &sz, &cap); + gpr_free(kvs->kvs[i].value); + } + append_chr('}', &s, &sz, &cap); + append_chr(0, &s, &sz, &cap); + + gpr_free(kvs->kvs); + return s; +} + +void grpc_error_free_string(const char *str) { + if (str == no_error_string) return; + if (str == oom_error_string) return; + if (str == cancelled_error_string) return; + gpr_free((char *)str); +} + +const char *grpc_error_string(grpc_error *err) { + if (err == GRPC_ERROR_NONE) return no_error_string; + if (err == GRPC_ERROR_OOM) return oom_error_string; + if (err == GRPC_ERROR_CANCELLED) return cancelled_error_string; + + kv_pairs kvs; + memset(&kvs, 0, sizeof(kvs)); + + collect_kvs(err->ints.root, key_int, fmt_int, &kvs); + collect_kvs(err->strs.root, key_str, fmt_str, &kvs); + collect_kvs(err->times.root, key_time, fmt_time, &kvs); + if (!gpr_avl_is_empty(err->errs)) { + append_kv(&kvs, gpr_strdup("referenced_errors"), errs_string(err)); + } + + qsort(kvs.kvs, kvs.num_kvs, sizeof(kv_pair), cmp_kvs); + + return finish_kvs(&kvs); +} + +grpc_error *grpc_os_error(const char *file, int line, int err, + const char *call_name) { + return grpc_error_set_str( + grpc_error_set_str( + grpc_error_set_int(grpc_error_create(file, line, "OS Error", NULL, 0), + GRPC_ERROR_INT_ERRNO, err), + GRPC_ERROR_STR_OS_ERROR, strerror(err)), + GRPC_ERROR_STR_SYSCALL, call_name); +} + +bool grpc_log_if_error(const char *what, grpc_error *error, const char *file, + int line) { + if (error == GRPC_ERROR_NONE) return true; + const char *msg = grpc_error_string(error); + gpr_log(file, line, GPR_LOG_SEVERITY_ERROR, "%s: %s", what, msg); + grpc_error_free_string(msg); + GRPC_ERROR_UNREF(error); + return false; +} diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h new file mode 100644 index 0000000000..0cc466e8c4 --- /dev/null +++ b/src/core/lib/iomgr/error.h @@ -0,0 +1,129 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_ERROR_H +#define GRPC_CORE_LIB_IOMGR_ERROR_H + +#include <stdbool.h> +#include <stdint.h> + +#include <grpc/support/time.h> + +// Opaque representation of an error. +// Errors are refcounted objects that represent the result of an operation. +// Ownership laws: +// if a grpc_error is returned by a function, the caller owns a ref to that +// instance +// if a grpc_error is passed to a grpc_closure callback function (functions +// with the signature: +// void (*f)(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error)) +// then those functions do not automatically own a ref to error +// if a grpc_error is passed to *ANY OTHER FUNCTION* then that function takes +// ownership of the error +typedef struct grpc_error grpc_error; + +typedef enum { + GRPC_ERROR_INT_ERRNO, + GRPC_ERROR_INT_FILE_LINE, + GRPC_ERROR_INT_STATUS_CODE, + GRPC_ERROR_INT_WARNING, + GRPC_ERROR_INT_STREAM_ID, + GRPC_ERROR_INT_GRPC_STATUS, + GRPC_ERROR_INT_OFFSET, + GRPC_ERROR_INT_INDEX, + GRPC_ERROR_INT_SIZE, + GRPC_ERROR_INT_HTTP2_ERROR, + GRPC_ERROR_INT_TSI_CODE, + GRPC_ERROR_INT_SECURITY_STATUS, + GRPC_ERROR_INT_FD, +} grpc_error_ints; + +typedef enum { + GRPC_ERROR_STR_DESCRIPTION, + GRPC_ERROR_STR_FILE, + GRPC_ERROR_STR_OS_ERROR, + GRPC_ERROR_STR_SYSCALL, + GRPC_ERROR_STR_TARGET_ADDRESS, + GRPC_ERROR_STR_GRPC_MESSAGE, + GRPC_ERROR_STR_RAW_BYTES, + GRPC_ERROR_STR_TSI_ERROR, +} grpc_error_strs; + +typedef enum { + GRPC_ERROR_TIME_CREATED, +} grpc_error_times; + +#define GRPC_ERROR_NONE ((grpc_error *)NULL) +#define GRPC_ERROR_OOM ((grpc_error *)1) +#define GRPC_ERROR_CANCELLED ((grpc_error *)2) + +const char *grpc_error_string(grpc_error *error); +void grpc_error_free_string(const char *str); + +grpc_error *grpc_error_create(const char *file, int line, const char *desc, + grpc_error **referencing, size_t num_referencing); +#define GRPC_ERROR_CREATE(desc) \ + grpc_error_create(__FILE__, __LINE__, desc, NULL, 0) + +// Create an error that references some other errors. This function adds a +// reference to each error in errs - it does not consume an existing reference +#define GRPC_ERROR_CREATE_REFERENCING(desc, errs, count) \ + grpc_error_create(__FILE__, __LINE__, desc, errs, count) + +grpc_error *grpc_error_ref(grpc_error *err, const char *file, int line, + const char *func); +void grpc_error_unref(grpc_error *err, const char *file, int line, + const char *func); +#define GRPC_ERROR_REF(err) grpc_error_ref(err, __FILE__, __LINE__, __func__) +#define GRPC_ERROR_UNREF(err) \ + grpc_error_unref(err, __FILE__, __LINE__, __func__) + +grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which, + intptr_t value); +bool grpc_error_get_int(grpc_error *error, grpc_error_ints which, intptr_t *p); +grpc_error *grpc_error_set_time(grpc_error *src, grpc_error_times which, + gpr_timespec value); +grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which, + const char *value); +grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child); +grpc_error *grpc_os_error(const char *file, int line, int err, + const char *call_name); +#define GRPC_OS_ERROR(err, call_name) \ + grpc_os_error(__FILE__, __LINE__, err, call_name) + +bool grpc_log_if_error(const char *what, grpc_error *error, const char *file, + int line); +#define GRPC_LOG_IF_ERROR(what, error) \ + grpc_log_if_error((what), (error), __FILE__, __LINE__) + +#endif /* GRPC_CORE_LIB_IOMGR_ERROR_H */ diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c index 3c8127e1a8..836db6b3ce 100644 --- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c +++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c @@ -217,9 +217,10 @@ struct grpc_pollset { struct grpc_pollset_vtable { void (*add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, struct grpc_fd *fd, int and_unlock_pollset); - void (*maybe_work_and_unlock)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker *worker, - gpr_timespec deadline, gpr_timespec now); + grpc_error *(*maybe_work_and_unlock)(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + grpc_pollset_worker *worker, + gpr_timespec deadline, gpr_timespec now); void (*finish_shutdown)(grpc_pollset *pollset); void (*destroy)(grpc_pollset *pollset); }; @@ -247,9 +248,9 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, #define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2 /* As per pollset_kick, with an extended set of flags (defined above) -- mostly for fd_posix's use. */ -static void pollset_kick_ext(grpc_pollset *p, - grpc_pollset_worker *specific_worker, - uint32_t flags); +static grpc_error *pollset_kick_ext(grpc_pollset *p, + grpc_pollset_worker *specific_worker, + uint32_t flags) GRPC_MUST_USE_RESULT; /* turn a pollset into a multipoller: platform specific */ typedef void (*platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx, @@ -415,12 +416,13 @@ static bool fd_is_orphaned(grpc_fd *fd) { return (gpr_atm_acq_load(&fd->refst) & 1) == 0; } -static void pollset_kick_locked(grpc_fd_watcher *watcher) { +static grpc_error *pollset_kick_locked(grpc_fd_watcher *watcher) { gpr_mu_lock(&watcher->pollset->mu); GPR_ASSERT(watcher->worker); - pollset_kick_ext(watcher->pollset, watcher->worker, - GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); + grpc_error *err = pollset_kick_ext(watcher->pollset, watcher->worker, + GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); gpr_mu_unlock(&watcher->pollset->mu); + return err; } static void maybe_wake_one_watcher_locked(grpc_fd *fd) { @@ -459,7 +461,7 @@ static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { } else { remove_fd_from_all_epoll_sets(fd->fd); } - grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL); + grpc_exec_ctx_push(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE, NULL); } static int fd_wrapped_fd(grpc_fd *fd) { @@ -508,6 +510,14 @@ static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); } static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); } #endif +static grpc_error *fd_shutdown_error(bool shutdown) { + if (!shutdown) { + return GRPC_ERROR_NONE; + } else { + return GRPC_ERROR_CREATE("FD shutdown"); + } +} + static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st, grpc_closure *closure) { if (*st == CLOSURE_NOT_READY) { @@ -516,7 +526,8 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } else if (*st == CLOSURE_READY) { /* already ready ==> queue the closure to run immediately */ *st = CLOSURE_NOT_READY; - grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL); + grpc_exec_ctx_push(exec_ctx, closure, fd_shutdown_error(fd->shutdown), + NULL); maybe_wake_one_watcher_locked(fd); } else { /* upcallptr was set to a different closure. This is an error! */ @@ -539,7 +550,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, return 0; } else { /* waiting ==> queue closure */ - grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL); + grpc_exec_ctx_push(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL); *st = CLOSURE_NOT_READY; return 1; } @@ -717,10 +728,19 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { worker->prev->next = worker->next->prev = worker; } -static void pollset_kick_ext(grpc_pollset *p, - grpc_pollset_worker *specific_worker, - uint32_t flags) { +static void kick_append_error(grpc_error **composite, grpc_error *error) { + if (error == GRPC_ERROR_NONE) return; + if (*composite == GRPC_ERROR_NONE) { + *composite = GRPC_ERROR_CREATE("Kick Failure"); + } + *composite = grpc_error_add_child(*composite, error); +} + +static grpc_error *pollset_kick_ext(grpc_pollset *p, + grpc_pollset_worker *specific_worker, + uint32_t flags) { GPR_TIMER_BEGIN("pollset_kick_ext", 0); + grpc_error *error = GRPC_ERROR_NONE; /* pollset->mu already held */ if (specific_worker != NULL) { @@ -730,7 +750,8 @@ static void pollset_kick_ext(grpc_pollset *p, for (specific_worker = p->root_worker.next; specific_worker != &p->root_worker; specific_worker = specific_worker->next) { - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + kick_append_error( + &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); } p->kicked_without_pollers = 1; GPR_TIMER_END("pollset_kick_ext.broadcast", 0); @@ -741,14 +762,16 @@ static void pollset_kick_ext(grpc_pollset *p, specific_worker->reevaluate_polling_on_wakeup = 1; } specific_worker->kicked_specifically = 1; - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + kick_append_error(&error, + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) { GPR_TIMER_MARK("kick_yoself", 0); if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { specific_worker->reevaluate_polling_on_wakeup = 1; } specific_worker->kicked_specifically = 1; - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + kick_append_error(&error, + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); } } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) { GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); @@ -769,7 +792,8 @@ static void pollset_kick_ext(grpc_pollset *p, if (specific_worker != NULL) { GPR_TIMER_MARK("finally_kick", 0); push_back_worker(p, specific_worker); - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + kick_append_error( + &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); } } else { GPR_TIMER_MARK("kicked_no_pollers", 0); @@ -778,20 +802,21 @@ static void pollset_kick_ext(grpc_pollset *p, } GPR_TIMER_END("pollset_kick_ext", 0); + return error; } -static void pollset_kick(grpc_pollset *p, - grpc_pollset_worker *specific_worker) { - pollset_kick_ext(p, specific_worker, 0); +static grpc_error *pollset_kick(grpc_pollset *p, + grpc_pollset_worker *specific_worker) { + return pollset_kick_ext(p, specific_worker, 0); } /* global state management */ -static void pollset_global_init(void) { +static grpc_error *pollset_global_init(void) { gpr_tls_init(&g_current_thread_poller); gpr_tls_init(&g_current_thread_worker); grpc_wakeup_fd_global_init(); - grpc_wakeup_fd_init(&grpc_global_wakeup_fd); + return grpc_wakeup_fd_init(&grpc_global_wakeup_fd); } static void pollset_global_shutdown(void) { @@ -801,7 +826,9 @@ static void pollset_global_shutdown(void) { grpc_wakeup_fd_global_destroy(); } -static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); } +static grpc_error *kick_poller(void) { + return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); +} /* main interface */ @@ -864,14 +891,15 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs)); pollset->vtable->finish_shutdown(pollset); - grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL); + grpc_exec_ctx_push(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL); } -static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker_hdl, gpr_timespec now, - gpr_timespec deadline) { +static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker **worker_hdl, + gpr_timespec now, gpr_timespec deadline) { grpc_pollset_worker worker; *worker_hdl = &worker; + grpc_error *error = GRPC_ERROR_NONE; /* pollset->mu already held */ int added_worker = 0; @@ -887,7 +915,10 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset->local_wakeup_cache = worker.wakeup_fd->next; } else { worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd)); - grpc_wakeup_fd_init(&worker.wakeup_fd->fd); + error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd); + if (error != GRPC_ERROR_NONE) { + return error; + } } worker.kicked_specifically = 0; /* If there's work waiting for the pollset to be idle, and the @@ -924,8 +955,8 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset); GPR_TIMER_BEGIN("maybe_work_and_unlock", 0); - pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker, - deadline, now); + error = pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker, + deadline, now); GPR_TIMER_END("maybe_work_and_unlock", 0); locked = 0; gpr_tls_set(&g_current_thread_poller, 0); @@ -946,7 +977,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, /* If we're forced to re-evaluate polling (via pollset_kick with GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force a loop */ - if (worker.reevaluate_polling_on_wakeup) { + if (worker.reevaluate_polling_on_wakeup && error == GRPC_ERROR_NONE) { worker.reevaluate_polling_on_wakeup = 0; pollset->kicked_without_pollers = 0; if (queued_work || worker.kicked_specifically) { @@ -987,6 +1018,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } *worker_hdl = NULL; GPR_TIMER_END("pollset_work", 0); + return error; } static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -1035,7 +1067,7 @@ typedef struct grpc_unary_promote_args { } grpc_unary_promote_args; static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, - bool success) { + grpc_error *error) { grpc_unary_promote_args *up_args = args; const grpc_pollset_vtable *original_vtable = up_args->original_vtable; grpc_pollset *pollset = up_args->pollset; @@ -1137,7 +1169,8 @@ static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, up_args->promotion_closure.cb = basic_do_promote; up_args->promotion_closure.cb_arg = up_args; - grpc_closure_list_add(&pollset->idle_jobs, &up_args->promotion_closure, 1); + grpc_closure_list_append(&pollset->idle_jobs, &up_args->promotion_closure, + GRPC_ERROR_NONE); pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); exit: @@ -1146,11 +1179,19 @@ exit: } } -static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, - grpc_pollset_worker *worker, - gpr_timespec deadline, - gpr_timespec now) { +static void work_combine_error(grpc_error **composite, grpc_error *error) { + if (error == GRPC_ERROR_NONE) return; + if (*composite == GRPC_ERROR_NONE) { + *composite = GRPC_ERROR_CREATE("pollset_work"); + } + *composite = grpc_error_add_child(*composite, error); +} + +static grpc_error *basic_pollset_maybe_work_and_unlock( + grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, + gpr_timespec deadline, gpr_timespec now) { + grpc_error *error = GRPC_ERROR_NONE; + #define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) #define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) @@ -1200,7 +1241,7 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, if (r < 0) { if (errno != EINTR) { - gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); + work_combine_error(&error, GRPC_OS_ERROR(errno, "poll")); } if (fd) { fd_end_poll(exec_ctx, &fd_watcher, 0, 0); @@ -1211,10 +1252,12 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, } } else { if (pfd[0].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); + work_combine_error(&error, + grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd)); } if (pfd[1].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); + work_combine_error(&error, + grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd)); } if (nfds > 2) { fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK, @@ -1227,6 +1270,8 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, if (fd) { GRPC_FD_UNREF(fd, "basicpoll_begin"); } + + return error; } static void basic_pollset_destroy(grpc_pollset *pollset) { @@ -1287,9 +1332,11 @@ exit: } } -static void multipoll_with_poll_pollset_maybe_work_and_unlock( +static grpc_error *multipoll_with_poll_pollset_maybe_work_and_unlock( grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline, gpr_timespec now) { + grpc_error *error = GRPC_ERROR_NONE; + #define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) #define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) @@ -1364,10 +1411,12 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( } } else { if (pfds[0].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); + work_combine_error(&error, + grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd)); } if (pfds[1].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); + work_combine_error(&error, + grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd)); } for (i = 2; i < pfd_count; i++) { if (watchers[i].fd == NULL) { @@ -1381,6 +1430,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( gpr_free(pfds); gpr_free(watchers); + return error; } static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) { @@ -1560,7 +1610,7 @@ static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg, - bool iomgr_status) { + grpc_error *error) { delayed_add *da = arg; if (!fd_is_orphaned(da->fd)) { @@ -1573,7 +1623,8 @@ static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg, /* We don't care about this pollset anymore. */ if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) { da->pollset->called_shutdown = 1; - grpc_exec_ctx_enqueue(exec_ctx, da->pollset->shutdown_done, true, NULL); + grpc_exec_ctx_push(exec_ctx, da->pollset->shutdown_done, GRPC_ERROR_NONE, + NULL); } } gpr_mu_unlock(&da->pollset->mu); @@ -1597,14 +1648,14 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx, GRPC_FD_REF(fd, "delayed_add"); grpc_closure_init(&da->closure, perform_delayed_add, da); pollset->in_flight_cbs++; - grpc_exec_ctx_enqueue(exec_ctx, &da->closure, true, NULL); + grpc_exec_ctx_push(exec_ctx, &da->closure, GRPC_ERROR_NONE, NULL); } } /* TODO(klempner): We probably want to turn this down a bit */ #define GRPC_EPOLL_MAX_EVENTS 1000 -static void multipoll_with_epoll_pollset_maybe_work_and_unlock( +static grpc_error *multipoll_with_epoll_pollset_maybe_work_and_unlock( grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline, gpr_timespec now) { struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS]; @@ -1613,6 +1664,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( epoll_hdr *h = pollset->data.ptr; int timeout_ms; struct pollfd pfds[2]; + grpc_error *error = GRPC_ERROR_NONE; /* If you want to ignore epoll's ability to sanely handle parallel pollers, * for a more apples-to-apples performance comparison with poll, add a @@ -1647,7 +1699,8 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( /* do nothing */ } else { if (pfds[0].revents) { - grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); + work_combine_error(&error, + grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd)); } if (pfds[1].revents) { do { @@ -1655,7 +1708,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0); if (ep_rv < 0) { if (errno != EINTR) { - gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno)); + work_combine_error(&error, GRPC_OS_ERROR(errno, "epoll_wait")); } } else { int i; @@ -1667,7 +1720,8 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI); int write_ev = ep_ev[i].events & EPOLLOUT; if (fd == NULL) { - grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); + work_combine_error(&error, grpc_wakeup_fd_consume_wakeup( + &grpc_global_wakeup_fd)); } else { if (read_ev || cancel) { fd_become_readable(exec_ctx, fd); @@ -1681,6 +1735,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( } while (ep_rv == GRPC_EPOLL_MAX_EVENTS); } } + return error; } static void multipoll_with_epoll_pollset_finish_shutdown( @@ -1923,14 +1978,23 @@ static const grpc_event_engine_vtable vtable = { }; const grpc_event_engine_vtable *grpc_init_poll_and_epoll_posix(void) { + const char *msg; + grpc_error *err = GRPC_ERROR_NONE; #ifdef GPR_LINUX_MULTIPOLL_WITH_EPOLL platform_become_multipoller = epoll_become_multipoller; #else platform_become_multipoller = poll_become_multipoller; #endif fd_global_init(); - pollset_global_init(); + err = pollset_global_init(); + if (err != GRPC_ERROR_NONE) goto error; return &vtable; + +error: + msg = grpc_error_string(err); + gpr_log(GPR_ERROR, "%s", msg); + grpc_error_free_string(msg); + return NULL; } #endif diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 7df1751352..3b965c8b8c 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -101,15 +101,15 @@ void grpc_pollset_destroy(grpc_pollset *pollset) { g_event_engine->pollset_destroy(pollset); } -void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline) { - g_event_engine->pollset_work(exec_ctx, pollset, worker, now, deadline); +grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker **worker, gpr_timespec now, + gpr_timespec deadline) { + return g_event_engine->pollset_work(exec_ctx, pollset, worker, now, deadline); } -void grpc_pollset_kick(grpc_pollset *pollset, - grpc_pollset_worker *specific_worker) { - g_event_engine->pollset_kick(pollset, specific_worker); +grpc_error *grpc_pollset_kick(grpc_pollset *pollset, + grpc_pollset_worker *specific_worker) { + return g_event_engine->pollset_kick(pollset, specific_worker); } void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -159,6 +159,6 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx, g_event_engine->pollset_set_del_fd(exec_ctx, pollset_set, fd); } -void grpc_kick_poller(void) { g_event_engine->kick_poller(); } +grpc_error *grpc_kick_poller(void) { return g_event_engine->kick_poller(); } #endif // GPR_POSIX_SOCKET diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 1fa9f5ef2d..c9f9fd9957 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -61,11 +61,11 @@ typedef struct grpc_event_engine_vtable { grpc_closure *closure); void (*pollset_reset)(grpc_pollset *pollset); void (*pollset_destroy)(grpc_pollset *pollset); - void (*pollset_work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline); - void (*pollset_kick)(grpc_pollset *pollset, - grpc_pollset_worker *specific_worker); + grpc_error *(*pollset_work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker **worker, gpr_timespec now, + gpr_timespec deadline); + grpc_error *(*pollset_kick)(grpc_pollset *pollset, + grpc_pollset_worker *specific_worker); void (*pollset_add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, struct grpc_fd *fd); @@ -88,7 +88,7 @@ typedef struct grpc_event_engine_vtable { void (*pollset_set_del_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, grpc_fd *fd); - void (*kick_poller)(void); + grpc_error *(*kick_poller)(void); void (*shutdown_engine)(void); } grpc_event_engine_vtable; diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c index 2146c7dd1f..3155ed066a 100644 --- a/src/core/lib/iomgr/exec_ctx.c +++ b/src/core/lib/iomgr/exec_ctx.c @@ -47,11 +47,12 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { grpc_closure *c = exec_ctx->closure_list.head; exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL; while (c != NULL) { - bool success = (bool)(c->final_data & 1); - grpc_closure *next = (grpc_closure *)(c->final_data & ~(uintptr_t)1); + grpc_closure *next = c->next_data.next; + grpc_error *error = c->error; did_something = true; GPR_TIMER_BEGIN("grpc_exec_ctx_flush.cb", 0); - c->cb(exec_ctx, c->cb_arg, success); + c->cb(exec_ctx, c->cb_arg, error); + GRPC_ERROR_UNREF(error); GPR_TIMER_END("grpc_exec_ctx_flush.cb", 0); c = next; } @@ -64,11 +65,11 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) { grpc_exec_ctx_flush(exec_ctx); } -void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - bool success, - grpc_workqueue *offload_target_or_null) { +void grpc_exec_ctx_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error, + grpc_workqueue *offload_target_or_null) { GPR_ASSERT(offload_target_or_null == NULL); - grpc_closure_list_add(&exec_ctx->closure_list, closure, success); + grpc_closure_list_append(&exec_ctx->closure_list, closure, error); } void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 976cc40347..a0dd1e5017 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -83,9 +83,9 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx); * the instance is destroyed, or work may be lost. */ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx); /** Add a closure to be executed at the next flush/finish point */ -void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - bool success, - grpc_workqueue *offload_target_or_null); +void grpc_exec_ctx_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error, + grpc_workqueue *offload_target_or_null); /** Add a list of closures to be executed at the next flush/finish point. * Leaves \a list empty. */ void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c index 36e22e4271..8d7535d6fe 100644 --- a/src/core/lib/iomgr/executor.c +++ b/src/core/lib/iomgr/executor.c @@ -112,10 +112,10 @@ static void maybe_spawn_locked() { g_executor.pending_join = 1; } -void grpc_executor_enqueue(grpc_closure *closure, bool success) { +void grpc_executor_push(grpc_closure *closure, grpc_error *error) { gpr_mu_lock(&g_executor.mu); if (g_executor.shutting_down == 0) { - grpc_closure_list_add(&g_executor.closures, closure, success); + grpc_closure_list_append(&g_executor.closures, closure, error); maybe_spawn_locked(); } gpr_mu_unlock(&g_executor.mu); diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h index b7e6f51aa5..da9dcd07d0 100644 --- a/src/core/lib/iomgr/executor.h +++ b/src/core/lib/iomgr/executor.h @@ -45,7 +45,7 @@ void grpc_executor_init(); /** Enqueue \a closure for its eventual execution of \a f(arg) on a separate * thread */ -void grpc_executor_enqueue(grpc_closure *closure, bool success); +void grpc_executor_push(grpc_closure *closure, grpc_error *error); /** Shutdown the executor, running all pending work as part of the call */ void grpc_executor_shutdown(); diff --git a/src/core/lib/iomgr/pollset.h b/src/core/lib/iomgr/pollset.h index c40a474877..8d9edc8406 100644 --- a/src/core/lib/iomgr/pollset.h +++ b/src/core/lib/iomgr/pollset.h @@ -81,14 +81,15 @@ void grpc_pollset_destroy(grpc_pollset *pollset); May call grpc_closure_list_run on grpc_closure_list, without holding the pollset lock */ -void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline); +grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker **worker, gpr_timespec now, + gpr_timespec deadline) GRPC_MUST_USE_RESULT; /* Break one polling thread out of polling work for this pollset. If specific_worker is GRPC_POLLSET_KICK_BROADCAST, kick ALL the workers. Otherwise, if specific_worker is non-NULL, then kick that worker. */ -void grpc_pollset_kick(grpc_pollset *pollset, - grpc_pollset_worker *specific_worker); +grpc_error *grpc_pollset_kick(grpc_pollset *pollset, + grpc_pollset_worker *specific_worker) + GRPC_MUST_USE_RESULT; #endif /* GRPC_CORE_LIB_IOMGR_POLLSET_H */ diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h index ef198fe0f6..ddbe375755 100644 --- a/src/core/lib/iomgr/resolve_address.h +++ b/src/core/lib/iomgr/resolve_address.h @@ -50,24 +50,20 @@ typedef struct { grpc_resolved_address *addrs; } grpc_resolved_addresses; -/* Async result callback: - On success: addresses is the result, and the callee must call - grpc_resolved_addresses_destroy when it's done with them - On failure: addresses is NULL */ -typedef void (*grpc_resolve_cb)(grpc_exec_ctx *exec_ctx, void *arg, - grpc_resolved_addresses *addresses); /* Asynchronously resolve addr. Use default_port if a port isn't designated in addr, otherwise use the port in addr. */ /* TODO(ctiller): add a timeout here */ extern void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *addr, const char *default_port, - grpc_resolve_cb cb, void *arg); + grpc_closure *on_done, + grpc_resolved_addresses **addresses); /* Destroy resolved addresses */ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addresses); /* Resolve addr in a blocking fashion. Returns NULL on failure. On success, result must be freed with grpc_resolved_addresses_destroy. */ -extern grpc_resolved_addresses *(*grpc_blocking_resolve_address)( - const char *name, const char *default_port); +extern grpc_error *(*grpc_blocking_resolve_address)( + const char *name, const char *default_port, + grpc_resolved_addresses **addresses); #endif /* GRPC_CORE_LIB_IOMGR_RESOLVE_ADDRESS_H */ diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c index cae91eec20..9a320dc952 100644 --- a/src/core/lib/iomgr/resolve_address_posix.c +++ b/src/core/lib/iomgr/resolve_address_posix.c @@ -54,38 +54,33 @@ #include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/string.h" -typedef struct { - char *name; - char *default_port; - grpc_resolve_cb cb; - grpc_closure request_closure; - void *arg; -} request; - -static grpc_resolved_addresses *blocking_resolve_address_impl( - const char *name, const char *default_port) { +static grpc_error *blocking_resolve_address_impl( + const char *name, const char *default_port, + grpc_resolved_addresses **addresses) { struct addrinfo hints; struct addrinfo *result = NULL, *resp; char *host; char *port; int s; size_t i; - grpc_resolved_addresses *addrs = NULL; + grpc_error *err; if (name[0] == 'u' && name[1] == 'n' && name[2] == 'i' && name[3] == 'x' && name[4] == ':' && name[5] != 0) { - return grpc_resolve_unix_domain_address(name + 5); + return grpc_resolve_unix_domain_address(name + 5, addresses); } /* parse name, splitting it into host and port parts */ gpr_split_host_port(name, &host, &port); if (host == NULL) { - gpr_log(GPR_ERROR, "unparseable host:port: '%s'", name); + err = grpc_error_set_str(GRPC_ERROR_CREATE("unparseable host:port"), + GRPC_ERROR_STR_TARGET_ADDRESS, name); goto done; } if (port == NULL) { if (default_port == NULL) { - gpr_log(GPR_ERROR, "no port in name '%s'", name); + err = grpc_error_set_str(GRPC_ERROR_CREATE("no port in name"), + GRPC_ERROR_STR_TARGET_ADDRESS, name); goto done; } port = gpr_strdup(default_port); @@ -115,23 +110,31 @@ static grpc_resolved_addresses *blocking_resolve_address_impl( } if (s != 0) { - gpr_log(GPR_ERROR, "getaddrinfo: %s", gai_strerror(s)); + err = grpc_error_set_str( + grpc_error_set_str( + grpc_error_set_str(grpc_error_set_int(GRPC_ERROR_CREATE("OS Error"), + GRPC_ERROR_INT_ERRNO, s), + GRPC_ERROR_STR_OS_ERROR, gai_strerror(s)), + GRPC_ERROR_STR_SYSCALL, "getaddrinfo"), + GRPC_ERROR_STR_TARGET_ADDRESS, name); goto done; } /* Success path: set addrs non-NULL, fill it in */ - addrs = gpr_malloc(sizeof(grpc_resolved_addresses)); - addrs->naddrs = 0; + *addresses = gpr_malloc(sizeof(grpc_resolved_addresses)); + (*addresses)->naddrs = 0; for (resp = result; resp != NULL; resp = resp->ai_next) { - addrs->naddrs++; + (*addresses)->naddrs++; } - addrs->addrs = gpr_malloc(sizeof(grpc_resolved_address) * addrs->naddrs); + (*addresses)->addrs = + gpr_malloc(sizeof(grpc_resolved_address) * (*addresses)->naddrs); i = 0; for (resp = result; resp != NULL; resp = resp->ai_next) { - memcpy(&addrs->addrs[i].addr, resp->ai_addr, resp->ai_addrlen); - addrs->addrs[i].len = resp->ai_addrlen; + memcpy(&(*addresses)->addrs[i].addr, resp->ai_addr, resp->ai_addrlen); + (*addresses)->addrs[i].len = resp->ai_addrlen; i++; } + err = GRPC_ERROR_NONE; done: gpr_free(host); @@ -139,45 +142,59 @@ done: if (result) { freeaddrinfo(result); } - return addrs; + return err; } -grpc_resolved_addresses *(*grpc_blocking_resolve_address)( - const char *name, const char *default_port) = blocking_resolve_address_impl; +grpc_error *(*grpc_blocking_resolve_address)( + const char *name, const char *default_port, + grpc_resolved_addresses **addresses) = blocking_resolve_address_impl; + +typedef struct { + char *name; + char *default_port; + grpc_closure *on_done; + grpc_resolved_addresses **addrs_out; + grpc_closure request_closure; + void *arg; +} request; /* Callback to be passed to grpc_executor to asynch-ify * grpc_blocking_resolve_address */ -static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, bool success) { +static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, + grpc_error *error) { request *r = rp; - grpc_resolved_addresses *resolved = - grpc_blocking_resolve_address(r->name, r->default_port); - void *arg = r->arg; - grpc_resolve_cb cb = r->cb; + grpc_exec_ctx_push( + exec_ctx, r->on_done, + grpc_blocking_resolve_address(r->name, r->default_port, r->addrs_out), + NULL); gpr_free(r->name); gpr_free(r->default_port); - cb(exec_ctx, arg, resolved); gpr_free(r); } void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { - gpr_free(addrs->addrs); + if (addrs != NULL) { + gpr_free(addrs->addrs); + } gpr_free(addrs); } static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, - const char *default_port, grpc_resolve_cb cb, - void *arg) { + const char *default_port, + grpc_closure *on_done, + grpc_resolved_addresses **addrs) { request *r = gpr_malloc(sizeof(request)); grpc_closure_init(&r->request_closure, do_request_thread, r); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); - r->cb = cb; - r->arg = arg; - grpc_executor_enqueue(&r->request_closure, 1); + r->on_done = on_done; + r->addrs_out = addrs; + grpc_executor_push(&r->request_closure, GRPC_ERROR_NONE); } void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *name, - const char *default_port, grpc_resolve_cb cb, - void *arg) = resolve_address_impl; + const char *default_port, grpc_closure *on_done, + grpc_resolved_addresses **addrs) = + resolve_address_impl; #endif diff --git a/src/core/lib/iomgr/socket_utils_common_posix.c b/src/core/lib/iomgr/socket_utils_common_posix.c index fa83ceef30..d1721c910c 100644 --- a/src/core/lib/iomgr/socket_utils_common_posix.c +++ b/src/core/lib/iomgr/socket_utils_common_posix.c @@ -57,10 +57,10 @@ #include "src/core/lib/support/string.h" /* set a socket to non blocking mode */ -int grpc_set_socket_nonblocking(int fd, int non_blocking) { +grpc_error *grpc_set_socket_nonblocking(int fd, int non_blocking) { int oldflags = fcntl(fd, F_GETFL, 0); if (oldflags < 0) { - return 0; + return GRPC_OS_ERROR(errno, "fcntl"); } if (non_blocking) { @@ -70,52 +70,57 @@ int grpc_set_socket_nonblocking(int fd, int non_blocking) { } if (fcntl(fd, F_SETFL, oldflags) != 0) { - return 0; + return GRPC_OS_ERROR(errno, "fcntl"); } - return 1; + return GRPC_ERROR_NONE; } -int grpc_set_socket_no_sigpipe_if_possible(int fd) { +grpc_error *grpc_set_socket_no_sigpipe_if_possible(int fd) { #ifdef GPR_HAVE_SO_NOSIGPIPE int val = 1; int newval; socklen_t intlen = sizeof(newval); - return 0 == setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val)) && - 0 == getsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &newval, &intlen) && - (newval != 0) == val; -#else - return 1; + if (0 != setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val))) { + return GRPC_OS_ERROR(errno, "setsockopt(SO_NOSIGPIPE)"); + } + if (0 != getsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &newval, &intlen)) { + return GRPC_OS_ERROR(errno, "getsockopt(SO_NOSIGPIPE)"); + } + if ((newval != 0) != (val != 0)) { + return GRPC_ERROR_CREATE("Failed to set SO_NOSIGPIPE"); + } #endif + return GRPC_ERROR_NONE; } -int grpc_set_socket_ip_pktinfo_if_possible(int fd) { +grpc_error *grpc_set_socket_ip_pktinfo_if_possible(int fd) { #ifdef GPR_HAVE_IP_PKTINFO int get_local_ip = 1; - return 0 == setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip, - sizeof(get_local_ip)); -#else - (void)fd; - return 1; + if (0 != setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip, + sizeof(get_local_ip))) { + return GRPC_OS_ERROR(errno, "setsockopt(IP_PKTINFO)"); + } #endif + return GRPC_ERROR_NONE; } -int grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd) { +grpc_error *grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd) { #ifdef GPR_HAVE_IPV6_RECVPKTINFO int get_local_ip = 1; - return 0 == setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip, - sizeof(get_local_ip)); -#else - (void)fd; - return 1; + if (0 != setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip, + sizeof(get_local_ip))) { + return GRPC_OS_ERROR(errno, "setsockopt(IPV6_RECVPKTINFO)"); + } #endif + return GRPC_ERROR_NONE; } /* set a socket to close on exec */ -int grpc_set_socket_cloexec(int fd, int close_on_exec) { +grpc_error *grpc_set_socket_cloexec(int fd, int close_on_exec) { int oldflags = fcntl(fd, F_GETFD, 0); if (oldflags < 0) { - return 0; + return GRPC_OS_ERROR(errno, "fcntl"); } if (close_on_exec) { @@ -125,30 +130,45 @@ int grpc_set_socket_cloexec(int fd, int close_on_exec) { } if (fcntl(fd, F_SETFD, oldflags) != 0) { - return 0; + return GRPC_OS_ERROR(errno, "fcntl"); } - return 1; + return GRPC_ERROR_NONE; } /* set a socket to reuse old addresses */ -int grpc_set_socket_reuse_addr(int fd, int reuse) { +grpc_error *grpc_set_socket_reuse_addr(int fd, int reuse) { int val = (reuse != 0); int newval; socklen_t intlen = sizeof(newval); - return 0 == setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) && - 0 == getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &newval, &intlen) && - (newval != 0) == val; + if (0 != setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val))) { + return GRPC_OS_ERROR(errno, "setsockopt(SO_REUSEADDR)"); + } + if (0 != getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &newval, &intlen)) { + return GRPC_OS_ERROR(errno, "getsockopt(SO_REUSEADDR)"); + } + if ((newval != 0) != val) { + return GRPC_ERROR_CREATE("Failed to set SO_REUSEADDR"); + } + + return GRPC_ERROR_NONE; } /* disable nagle */ -int grpc_set_socket_low_latency(int fd, int low_latency) { +grpc_error *grpc_set_socket_low_latency(int fd, int low_latency) { int val = (low_latency != 0); int newval; socklen_t intlen = sizeof(newval); - return 0 == setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) && - 0 == getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &newval, &intlen) && - (newval != 0) == val; + if (0 != setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val))) { + return GRPC_OS_ERROR(errno, "setsockopt(TCP_NODELAY)"); + } + if (0 != getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &newval, &intlen)) { + return GRPC_OS_ERROR(errno, "getsockopt(TCP_NODELAY)"); + } + if ((newval != 0) != val) { + return GRPC_ERROR_CREATE("Failed to set TCP_NODELAY"); + } + return GRPC_ERROR_NONE; } static gpr_once g_probe_ipv6_once = GPR_ONCE_INIT; @@ -196,35 +216,40 @@ static int set_socket_dualstack(int fd) { } } -int grpc_create_dualstack_socket(const struct sockaddr *addr, int type, - int protocol, grpc_dualstack_mode *dsmode) { +grpc_error *grpc_create_dualstack_socket(const struct sockaddr *addr, int type, + int protocol, + grpc_dualstack_mode *dsmode, + int *newfd) { int family = addr->sa_family; if (family == AF_INET6) { - int fd; if (grpc_ipv6_loopback_available()) { - fd = socket(family, type, protocol); + *newfd = socket(family, type, protocol); } else { - fd = -1; + *newfd = -1; errno = EAFNOSUPPORT; } /* Check if we've got a valid dualstack socket. */ - if (fd >= 0 && set_socket_dualstack(fd)) { + if (*newfd >= 0 && set_socket_dualstack(*newfd)) { *dsmode = GRPC_DSMODE_DUALSTACK; - return fd; + return GRPC_ERROR_NONE; } /* If this isn't an IPv4 address, then return whatever we've got. */ if (!grpc_sockaddr_is_v4mapped(addr, NULL)) { *dsmode = GRPC_DSMODE_IPV6; - return fd; + return GRPC_ERROR_NONE; } /* Fall back to AF_INET. */ - if (fd >= 0) { - close(fd); + if (*newfd >= 0) { + close(*newfd); } family = AF_INET; } *dsmode = family == AF_INET ? GRPC_DSMODE_IPV4 : GRPC_DSMODE_NONE; - return socket(family, type, protocol); + *newfd = socket(family, type, protocol); + if (*newfd == -1) { + return GRPC_OS_ERROR(errno, "socket"); + } + return GRPC_ERROR_NONE; } #endif diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h index a8f6e5e658..4e66cbc0c5 100644 --- a/src/core/lib/iomgr/socket_utils_posix.h +++ b/src/core/lib/iomgr/socket_utils_posix.h @@ -37,21 +37,23 @@ #include <sys/socket.h> #include <unistd.h> +#include "src/core/lib/iomgr/error.h" + /* a wrapper for accept or accept4 */ int grpc_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int nonblock, int cloexec); /* set a socket to non blocking mode */ -int grpc_set_socket_nonblocking(int fd, int non_blocking); +grpc_error *grpc_set_socket_nonblocking(int fd, int non_blocking); /* set a socket to close on exec */ -int grpc_set_socket_cloexec(int fd, int close_on_exec); +grpc_error *grpc_set_socket_cloexec(int fd, int close_on_exec); /* set a socket to reuse old addresses */ -int grpc_set_socket_reuse_addr(int fd, int reuse); +grpc_error *grpc_set_socket_reuse_addr(int fd, int reuse); /* disable nagle */ -int grpc_set_socket_low_latency(int fd, int low_latency); +grpc_error *grpc_set_socket_low_latency(int fd, int low_latency); /* Returns true if this system can create AF_INET6 sockets bound to ::1. The value is probed once, and cached for the life of the process. @@ -66,17 +68,17 @@ int grpc_ipv6_loopback_available(void); /* Tries to set SO_NOSIGPIPE if available on this platform. Returns 1 on success, 0 on failure. If SO_NO_SIGPIPE is not available, returns 1. */ -int grpc_set_socket_no_sigpipe_if_possible(int fd); +grpc_error *grpc_set_socket_no_sigpipe_if_possible(int fd); /* Tries to set IP_PKTINFO if available on this platform. Returns 1 on success, 0 on failure. If IP_PKTINFO is not available, returns 1. */ -int grpc_set_socket_ip_pktinfo_if_possible(int fd); +grpc_error *grpc_set_socket_ip_pktinfo_if_possible(int fd); /* Tries to set IPV6_RECVPKTINFO if available on this platform. Returns 1 on success, 0 on failure. If IPV6_RECVPKTINFO is not available, returns 1. */ -int grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd); +grpc_error *grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd); /* An enum to keep track of IPv4/IPv6 socket modes. @@ -117,7 +119,9 @@ extern int grpc_forbid_dualstack_sockets_for_testing; IPv4, so that bind() or connect() see the correct family. Also, it's important to distinguish between DUALSTACK and IPV6 when listening on the [::] wildcard address. */ -int grpc_create_dualstack_socket(const struct sockaddr *addr, int type, - int protocol, grpc_dualstack_mode *dsmode); +grpc_error *grpc_create_dualstack_socket(const struct sockaddr *addr, int type, + int protocol, + grpc_dualstack_mode *dsmode, + int *newfd); #endif /* GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_POSIX_H */ diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index e93d5734a0..9c4ca1617f 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -71,33 +71,39 @@ typedef struct { grpc_closure *closure; } async_connect; -static int prepare_socket(const struct sockaddr *addr, int fd) { - if (fd < 0) { - goto error; +static grpc_error *prepare_socket(const struct sockaddr *addr, int fd) { + grpc_error *err = GRPC_ERROR_NONE; + + GPR_ASSERT(fd >= 0); + + err = grpc_set_socket_nonblocking(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; + err = grpc_set_socket_cloexec(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; + if (!grpc_is_unix_socket(addr)) { + err = grpc_set_socket_low_latency(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; } - - if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1) || - (!grpc_is_unix_socket(addr) && !grpc_set_socket_low_latency(fd, 1)) || - !grpc_set_socket_no_sigpipe_if_possible(fd)) { - gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd, - strerror(errno)); - goto error; - } - return 1; + err = grpc_set_socket_no_sigpipe_if_possible(fd); + if (err != GRPC_ERROR_NONE) goto error; + goto done; error: if (fd >= 0) { close(fd); } - return 0; +done: + return err; } -static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, bool success) { +static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { int done; async_connect *ac = acp; if (grpc_tcp_trace) { - gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_alarm: success=%d", ac->addr_str, - success); + const char *str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_alarm: error=%s", ac->addr_str, + str); + grpc_error_free_string(str); } gpr_mu_lock(&ac->mu); if (ac->fd != NULL) { @@ -112,7 +118,7 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, bool success) { } } -static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) { +static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { async_connect *ac = acp; int so_error = 0; socklen_t so_error_size; @@ -122,9 +128,13 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) { grpc_closure *closure = ac->closure; grpc_fd *fd; + GRPC_ERROR_REF(error); + if (grpc_tcp_trace) { - gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_writable: success=%d", - ac->addr_str, success); + const char *str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_writable: error=%s", + ac->addr_str, str); + grpc_error_free_string(str); } gpr_mu_lock(&ac->mu); @@ -136,15 +146,14 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) { grpc_timer_cancel(exec_ctx, &ac->alarm); gpr_mu_lock(&ac->mu); - if (success) { + if (error == GRPC_ERROR_NONE) { do { so_error_size = sizeof(so_error); err = getsockopt(grpc_fd_wrapped_fd(fd), SOL_SOCKET, SO_ERROR, &so_error, &so_error_size); } while (err < 0 && errno == EINTR); if (err < 0) { - gpr_log(GPR_ERROR, "failed to connect to '%s': getsockopt(ERROR): %s", - ac->addr_str, strerror(errno)); + error = GRPC_OS_ERROR(errno, "getsockopt"); goto finish; } else if (so_error != 0) { if (so_error == ENOBUFS) { @@ -169,14 +178,12 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) { } else { switch (so_error) { case ECONNREFUSED: - gpr_log( - GPR_ERROR, - "failed to connect to '%s': socket error: connection refused", - ac->addr_str); + error = grpc_error_set_int(error, GRPC_ERROR_INT_ERRNO, errno); + error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, + "Connection refused"); break; default: - gpr_log(GPR_ERROR, "failed to connect to '%s': socket error: %d", - ac->addr_str, so_error); + error = GRPC_OS_ERROR(errno, "getsockopt(SO_ERROR)"); break; } goto finish; @@ -188,8 +195,8 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) { goto finish; } } else { - gpr_log(GPR_ERROR, "failed to connect to '%s': timeout occurred", - ac->addr_str); + error = + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, "Timeout occurred"); goto finish; } @@ -203,12 +210,18 @@ finish: } done = (--ac->refs == 0); gpr_mu_unlock(&ac->mu); + if (error != GRPC_ERROR_NONE) { + error = grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION, + "Failed to connect to remote host"); + error = + grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, ac->addr_str); + } if (done) { gpr_mu_destroy(&ac->mu); gpr_free(ac->addr_str); gpr_free(ac); } - grpc_exec_ctx_enqueue(exec_ctx, closure, *ep != NULL, NULL); + grpc_exec_ctx_push(exec_ctx, closure, error, NULL); } static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, @@ -225,6 +238,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, grpc_fd *fdobj; char *name; char *addr_str; + grpc_error *error; *ep = NULL; @@ -234,9 +248,10 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, addr_len = sizeof(addr6_v4mapped); } - fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode); - if (fd < 0) { - gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno)); + error = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd); + if (error != GRPC_ERROR_NONE) { + grpc_exec_ctx_push(exec_ctx, closure, error, NULL); + return; } if (dsmode == GRPC_DSMODE_IPV4) { /* If we got an AF_INET socket, map the address back to IPv4. */ @@ -244,8 +259,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, addr = (struct sockaddr *)&addr4_copy; addr_len = sizeof(addr4_copy); } - if (!prepare_socket(addr, fd)) { - grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL); + if ((error = prepare_socket(addr, fd)) != GRPC_ERROR_NONE) { + grpc_exec_ctx_push(exec_ctx, closure, error, NULL); return; } @@ -261,14 +276,14 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, if (err >= 0) { *ep = grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str); - grpc_exec_ctx_enqueue(exec_ctx, closure, true, NULL); + grpc_exec_ctx_push(exec_ctx, closure, GRPC_ERROR_NONE, NULL); goto done; } if (errno != EWOULDBLOCK && errno != EINPROGRESS) { - gpr_log(GPR_ERROR, "connect error to '%s': %s", addr_str, strerror(errno)); grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error"); - grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL); + grpc_exec_ctx_push(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"), + NULL); goto done; } diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index e2869224f1..766a976277 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -101,9 +101,9 @@ typedef struct { } grpc_tcp; static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, - bool success); + grpc_error *error); static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, - bool success); + grpc_error *error); static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; @@ -155,12 +155,15 @@ static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { TCP_UNREF(exec_ctx, tcp, "destroy"); } -static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, int success) { +static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, + grpc_error *error) { grpc_closure *cb = tcp->read_cb; if (grpc_tcp_trace) { size_t i; - gpr_log(GPR_DEBUG, "read: success=%d", success); + const char *str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "read: error=%s", str); + grpc_error_free_string(str); for (i = 0; i < tcp->incoming_buffer->count; i++) { char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); @@ -171,7 +174,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, int success) { tcp->read_cb = NULL; tcp->incoming_buffer = NULL; - cb->cb(exec_ctx, cb->cb_arg, success); + grpc_exec_ctx_push(exec_ctx, cb, error, NULL); } #define MAX_READ_IOVEC 4 @@ -219,15 +222,14 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { /* We've consumed the edge, request a new one */ grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); } else { - /* TODO(klempner): Log interesting errors */ gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); - call_read_cb(exec_ctx, tcp, 0); + call_read_cb(exec_ctx, tcp, GRPC_OS_ERROR(errno, "recvmsg")); TCP_UNREF(exec_ctx, tcp, "read"); } } else if (read_bytes == 0) { /* 0 read size ==> end of stream */ gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); - call_read_cb(exec_ctx, tcp, 0); + call_read_cb(exec_ctx, tcp, GRPC_ERROR_CREATE("EOF")); TCP_UNREF(exec_ctx, tcp, "read"); } else { GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length); @@ -240,7 +242,7 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { ++tcp->iov_size; } GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length); - call_read_cb(exec_ctx, tcp, 1); + call_read_cb(exec_ctx, tcp, GRPC_ERROR_NONE); TCP_UNREF(exec_ctx, tcp, "read"); } @@ -248,13 +250,13 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { } static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, - bool success) { + grpc_error *error) { grpc_tcp *tcp = (grpc_tcp *)arg; GPR_ASSERT(!tcp->finished_edge); - if (!success) { + if (error != GRPC_ERROR_NONE) { gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); - call_read_cb(exec_ctx, tcp, 0); + call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error)); TCP_UNREF(exec_ctx, tcp, "read"); } else { tcp_continue_read(exec_ctx, tcp); @@ -274,14 +276,13 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->finished_edge = 0; grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); } else { - grpc_exec_ctx_enqueue(exec_ctx, &tcp->read_closure, true, NULL); + grpc_exec_ctx_push(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE, NULL); } } -typedef enum { FLUSH_DONE, FLUSH_PENDING, FLUSH_ERROR } flush_result; - +/* returns true if done, false if pending; if returning true, *error is set */ #define MAX_WRITE_IOVEC 16 -static flush_result tcp_flush(grpc_tcp *tcp) { +static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) { struct msghdr msg; struct iovec iov[MAX_WRITE_IOVEC]; msg_iovlen_type iov_size; @@ -331,10 +332,10 @@ static flush_result tcp_flush(grpc_tcp *tcp) { if (errno == EAGAIN) { tcp->outgoing_slice_idx = unwind_slice_idx; tcp->outgoing_byte_idx = unwind_byte_idx; - return FLUSH_PENDING; + return false; } else { - /* TODO(klempner): Log some of these */ - return FLUSH_ERROR; + *error = GRPC_OS_ERROR(errno, "sendmsg"); + return true; } } @@ -355,18 +356,18 @@ static flush_result tcp_flush(grpc_tcp *tcp) { } if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) { - return FLUSH_DONE; + *error = GRPC_ERROR_NONE; + return true; } }; } static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, - bool success) { + grpc_error *error) { grpc_tcp *tcp = (grpc_tcp *)arg; - flush_result status; grpc_closure *cb; - if (!success) { + if (error != GRPC_ERROR_NONE) { cb = tcp->write_cb; tcp->write_cb = NULL; cb->cb(exec_ctx, cb->cb_arg, 0); @@ -374,14 +375,13 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, return; } - status = tcp_flush(tcp); - if (status == FLUSH_PENDING) { + if (!tcp_flush(tcp, &error)) { grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); } else { cb = tcp->write_cb; tcp->write_cb = NULL; GPR_TIMER_BEGIN("tcp_handle_write.cb", 0); - cb->cb(exec_ctx, cb->cb_arg, status == FLUSH_DONE); + cb->cb(exec_ctx, cb->cb_arg, error); GPR_TIMER_END("tcp_handle_write.cb", 0); TCP_UNREF(exec_ctx, tcp, "write"); } @@ -390,7 +390,7 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, gpr_slice_buffer *buf, grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; - flush_result status; + grpc_error *error = GRPC_ERROR_NONE; if (grpc_tcp_trace) { size_t i; @@ -408,20 +408,19 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (buf->length == 0) { GPR_TIMER_END("tcp_write", 0); - grpc_exec_ctx_enqueue(exec_ctx, cb, true, NULL); + grpc_exec_ctx_push(exec_ctx, cb, GRPC_ERROR_NONE, NULL); return; } tcp->outgoing_buffer = buf; tcp->outgoing_slice_idx = 0; tcp->outgoing_byte_idx = 0; - status = tcp_flush(tcp); - if (status == FLUSH_PENDING) { + if (!tcp_flush(tcp, &error)) { TCP_REF(tcp, "write"); tcp->write_cb = cb; grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); } else { - grpc_exec_ctx_enqueue(exec_ctx, cb, status == FLUSH_DONE, NULL); + grpc_exec_ctx_push(exec_ctx, cb, error, NULL); } GPR_TIMER_END("tcp_write", 0); diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h index 99b9f29729..924121f0c7 100644 --- a/src/core/lib/iomgr/tcp_server.h +++ b/src/core/lib/iomgr/tcp_server.h @@ -57,7 +57,8 @@ typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg, /* Create a server, initially not bound to any ports. The caller owns one ref. If shutdown_complete is not NULL, it will be used by grpc_tcp_server_unref() when the ref count reaches zero. */ -grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete); +grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete, + grpc_tcp_server **server); /* Start listening to bound ports */ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server, @@ -73,8 +74,8 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server, but not dualstack sockets. */ /* TODO(ctiller): deprecate this, and make grpc_tcp_server_add_ports to handle all of the multiple socket port matching logic in one place */ -int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, - size_t addr_len); +grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, + size_t addr_len, int *out_port); /* Number of fds at the given port_index, or 0 if port_index is out of bounds. */ diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index aaeb384f6e..db70fada96 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -59,6 +59,8 @@ #include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> +#include <grpc/support/useful.h> + #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/socket_utils_posix.h" @@ -130,7 +132,8 @@ struct grpc_tcp_server { size_t pollset_count; }; -grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) { +grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete, + grpc_tcp_server **server) { grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); gpr_ref_init(&s->refs, 1); gpr_mu_init(&s->mu); @@ -145,12 +148,13 @@ grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) { s->head = NULL; s->tail = NULL; s->nports = 0; - return s; + *server = s; + return GRPC_ERROR_NONE; } static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (s->shutdown_complete != NULL) { - grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, true, NULL); + grpc_exec_ctx_push(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); } gpr_mu_destroy(&s->mu); @@ -165,7 +169,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { } static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, - bool success) { + grpc_error *error) { grpc_tcp_server *s = server; gpr_mu_lock(&s->mu); s->destroyed_ports++; @@ -259,61 +263,68 @@ static int get_max_accept_queue_size(void) { } /* Prepare a recently-created socket for listening. */ -static int prepare_socket(int fd, const struct sockaddr *addr, - size_t addr_len) { +static grpc_error *prepare_socket(int fd, const struct sockaddr *addr, + size_t addr_len, int *port) { struct sockaddr_storage sockname_temp; socklen_t sockname_len; - - if (fd < 0) { - goto error; - } - - if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1) || - (!grpc_is_unix_socket(addr) && (!grpc_set_socket_low_latency(fd, 1) || - !grpc_set_socket_reuse_addr(fd, 1))) || - !grpc_set_socket_no_sigpipe_if_possible(fd)) { - gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd, - strerror(errno)); - goto error; + grpc_error *err = GRPC_ERROR_NONE; + + GPR_ASSERT(fd >= 0); + + err = grpc_set_socket_nonblocking(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; + err = grpc_set_socket_cloexec(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; + if (!grpc_is_unix_socket(addr)) { + err = grpc_set_socket_low_latency(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; + err = grpc_set_socket_reuse_addr(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; } + err = grpc_set_socket_no_sigpipe_if_possible(fd); + if (err != GRPC_ERROR_NONE) goto error; GPR_ASSERT(addr_len < ~(socklen_t)0); if (bind(fd, addr, (socklen_t)addr_len) < 0) { - char *addr_str; - grpc_sockaddr_to_string(&addr_str, addr, 0); - gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno)); - gpr_free(addr_str); + err = GRPC_OS_ERROR(errno, "bind"); goto error; } if (listen(fd, get_max_accept_queue_size()) < 0) { - gpr_log(GPR_ERROR, "listen: %s", strerror(errno)); + err = GRPC_OS_ERROR(errno, "listen"); goto error; } sockname_len = sizeof(sockname_temp); if (getsockname(fd, (struct sockaddr *)&sockname_temp, &sockname_len) < 0) { + err = GRPC_OS_ERROR(errno, "getsockname"); goto error; } - return grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp); + *port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp); + return GRPC_ERROR_NONE; error: + GPR_ASSERT(err != GRPC_ERROR_NONE); if (fd >= 0) { close(fd); } - return -1; + grpc_error *ret = grpc_error_set_int( + GRPC_ERROR_CREATE_REFERENCING("Unable to configure socket", &err, 1), + GRPC_ERROR_INT_FD, fd); + GRPC_ERROR_UNREF(err); + return ret; } /* event manager callback when reads are ready */ -static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) { +static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { grpc_tcp_listener *sp = arg; grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, sp->fd_index}; grpc_fd *fdobj; size_t i; - if (!success) { + if (err != GRPC_ERROR_NONE) { goto error; } @@ -376,18 +387,19 @@ error: } } -static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd, - const struct sockaddr *addr, - size_t addr_len, - unsigned port_index, - unsigned fd_index) { +static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd, + const struct sockaddr *addr, + size_t addr_len, unsigned port_index, + unsigned fd_index, + grpc_tcp_listener **listener) { grpc_tcp_listener *sp = NULL; - int port; + int port = -1; char *addr_str; char *name; - port = prepare_socket(fd, addr, addr_len); - if (port >= 0) { + grpc_error *err = prepare_socket(fd, addr, addr_len, &port); + if (err == GRPC_ERROR_NONE) { + GPR_ASSERT(port > 0); grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1); gpr_asprintf(&name, "tcp-server-listener:%s", addr_str); gpr_mu_lock(&s->mu); @@ -417,11 +429,12 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd, gpr_free(name); } - return sp; + *listener = sp; + return err; } -int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, - size_t addr_len) { +grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, + size_t addr_len, int *out_port) { grpc_tcp_listener *sp; grpc_tcp_listener *sp2 = NULL; int fd; @@ -436,6 +449,7 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, int port; unsigned port_index = 0; unsigned fd_index = 0; + grpc_error *errs[2] = {GRPC_ERROR_NONE, GRPC_ERROR_NONE}; if (s->tail != NULL) { port_index = s->tail->port_index + 1; } @@ -474,33 +488,35 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, /* Try listening on IPv6 first. */ addr = (struct sockaddr *)&wild6; addr_len = sizeof(wild6); - fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode); - sp = add_socket_to_server(s, fd, addr, addr_len, port_index, fd_index); - if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { - goto done; - } - if (sp != NULL) { - ++fd_index; - } - /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */ - if (port == 0 && sp != NULL) { - grpc_sockaddr_set_port((struct sockaddr *)&wild4, sp->port); + errs[0] = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd); + if (errs[0] == GRPC_ERROR_NONE) { + errs[0] = add_socket_to_server(s, fd, addr, addr_len, port_index, + fd_index, &sp); + if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { + goto done; + } + if (sp != NULL) { + ++fd_index; + } + /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */ + if (port == 0 && sp != NULL) { + grpc_sockaddr_set_port((struct sockaddr *)&wild4, sp->port); + } + addr = (struct sockaddr *)&wild4; + addr_len = sizeof(wild4); } - addr = (struct sockaddr *)&wild4; - addr_len = sizeof(wild4); } - fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode); - if (fd < 0) { - gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno)); - } else { + errs[1] = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd); + if (errs[1] == GRPC_ERROR_NONE) { if (dsmode == GRPC_DSMODE_IPV4 && grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) { addr = (struct sockaddr *)&addr4_copy; addr_len = sizeof(addr4_copy); } sp2 = sp; - sp = add_socket_to_server(s, fd, addr, addr_len, port_index, fd_index); + errs[1] = + add_socket_to_server(s, fd, addr, addr_len, port_index, fd_index, &sp); if (sp2 != NULL && sp != NULL) { sp2->sibling = sp; sp->is_sibling = 1; @@ -510,9 +526,21 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, done: gpr_free(allocated_addr); if (sp != NULL) { - return sp->port; + *out_port = sp->port; + GRPC_ERROR_UNREF(errs[0]); + GRPC_ERROR_UNREF(errs[1]); + return GRPC_ERROR_NONE; } else { - return -1; + *out_port = -1; + char *addr_str = grpc_sockaddr_to_uri(addr); + grpc_error *err = grpc_error_set_str( + GRPC_ERROR_CREATE_REFERENCING("Failed to add port to server", errs, + GPR_ARRAY_SIZE(errs)), + GRPC_ERROR_STR_TARGET_ADDRESS, addr_str); + GRPC_ERROR_UNREF(errs[0]); + GRPC_ERROR_UNREF(errs[1]); + gpr_free(addr_str); + return err; } } @@ -581,7 +609,8 @@ grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) { void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s, grpc_closure *shutdown_starting) { gpr_mu_lock(&s->mu); - grpc_closure_list_add(&s->shutdown_starting, shutdown_starting, 1); + grpc_closure_list_append(&s->shutdown_starting, shutdown_starting, + GRPC_ERROR_NONE); gpr_mu_unlock(&s->mu); } diff --git a/src/core/lib/iomgr/timer.c b/src/core/lib/iomgr/timer.c index acb5b26c87..b46c6df287 100644 --- a/src/core/lib/iomgr/timer.c +++ b/src/core/lib/iomgr/timer.c @@ -73,7 +73,7 @@ static shard_type *g_shard_queue[NUM_SHARDS]; static bool g_initialized = false; static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, - gpr_timespec *next, int success); + gpr_timespec *next, grpc_error *error); static gpr_timespec compute_min_deadline(shard_type *shard) { return grpc_timer_heap_is_empty(&shard->heap) @@ -105,7 +105,8 @@ void grpc_timer_list_init(gpr_timespec now) { void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { int i; - run_some_expired_timers(exec_ctx, gpr_inf_future(g_clock_type), NULL, 0); + run_some_expired_timers(exec_ctx, gpr_inf_future(g_clock_type), NULL, + GRPC_ERROR_CREATE("Timer list shutdown")); for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; gpr_mu_destroy(&shard->mu); @@ -185,13 +186,16 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, if (!g_initialized) { timer->triggered = 1; - grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, false, NULL); + grpc_exec_ctx_push( + exec_ctx, &timer->closure, + GRPC_ERROR_CREATE("Attempt to create timer before initialization"), + NULL); return; } if (gpr_time_cmp(deadline, now) <= 0) { timer->triggered = 1; - grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, true, NULL); + grpc_exec_ctx_push(exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL); return; } @@ -235,10 +239,15 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, } void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { + if (!g_initialized) { + /* must have already been cancelled, also the shard mutex is invalid */ + return; + } + shard_type *shard = &g_shards[shard_idx(timer)]; gpr_mu_lock(&shard->mu); if (!timer->triggered) { - grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, false, NULL); + grpc_exec_ctx_push(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED, NULL); timer->triggered = 1; if (timer->heap_index == INVALID_HEAP_INDEX) { list_remove(timer); @@ -299,12 +308,12 @@ static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) { /* REQUIRES: shard->mu unlocked */ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, gpr_timespec now, gpr_timespec *new_min_deadline, - int success) { + grpc_error *error) { size_t n = 0; grpc_timer *timer; gpr_mu_lock(&shard->mu); while ((timer = pop_one(shard, now))) { - grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, success, NULL); + grpc_exec_ctx_push(exec_ctx, &timer->closure, GRPC_ERROR_REF(error), NULL); n++; } *new_min_deadline = compute_min_deadline(shard); @@ -313,7 +322,7 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, } static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, - gpr_timespec *next, int success) { + gpr_timespec *next, grpc_error *error) { size_t n = 0; /* TODO(ctiller): verify that there are any timers (atomically) here */ @@ -327,8 +336,8 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, /* For efficiency, we pop as many available timers as we can from the shard. This may violate perfect timer deadline ordering, but that shouldn't be a big deal because we don't make ordering guarantees. */ - n += pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline, - success); + n += + pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline, error); /* An grpc_timer_init() on the shard could intervene here, adding a new timer that is earlier than new_min_deadline. However, @@ -359,6 +368,8 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, *next, gpr_time_add(now, gpr_time_from_millis(1, GPR_TIMESPAN))); } + GRPC_ERROR_UNREF(error); + return (int)n; } @@ -367,5 +378,7 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, GPR_ASSERT(now.clock_type == g_clock_type); return run_some_expired_timers( exec_ctx, now, next, - gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0); + gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0 + ? GRPC_ERROR_NONE + : GRPC_ERROR_CREATE("Shutting down timer system")); } diff --git a/src/core/lib/iomgr/unix_sockets_posix.c b/src/core/lib/iomgr/unix_sockets_posix.c index 5767c852df..0e7670e5a5 100644 --- a/src/core/lib/iomgr/unix_sockets_posix.c +++ b/src/core/lib/iomgr/unix_sockets_posix.c @@ -47,17 +47,18 @@ void grpc_create_socketpair_if_unix(int sv[2]) { GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0); } -grpc_resolved_addresses *grpc_resolve_unix_domain_address(const char *name) { +grpc_error *grpc_resolve_unix_domain_address(const char *name, + grpc_resolved_addresses **addrs) { struct sockaddr_un *un; - grpc_resolved_addresses *addrs = gpr_malloc(sizeof(grpc_resolved_addresses)); - addrs->naddrs = 1; - addrs->addrs = gpr_malloc(sizeof(grpc_resolved_address)); - un = (struct sockaddr_un *)addrs->addrs->addr; + *addrs = gpr_malloc(sizeof(grpc_resolved_addresses)); + (*addrs)->naddrs = 1; + (*addrs)->addrs = gpr_malloc(sizeof(grpc_resolved_address)); + un = (struct sockaddr_un *)(*addrs)->addrs->addr; un->sun_family = AF_UNIX; strcpy(un->sun_path, name); - addrs->addrs->len = strlen(un->sun_path) + sizeof(un->sun_family) + 1; - return addrs; + (*addrs)->addrs->len = strlen(un->sun_path) + sizeof(un->sun_family) + 1; + return GRPC_ERROR_NONE; } int grpc_is_unix_socket(const struct sockaddr *addr) { diff --git a/src/core/lib/iomgr/unix_sockets_posix.h b/src/core/lib/iomgr/unix_sockets_posix.h index 6758c498e5..db0516d945 100644 --- a/src/core/lib/iomgr/unix_sockets_posix.h +++ b/src/core/lib/iomgr/unix_sockets_posix.h @@ -43,7 +43,8 @@ void grpc_create_socketpair_if_unix(int sv[2]); -grpc_resolved_addresses *grpc_resolve_unix_domain_address(const char *name); +grpc_error *grpc_resolve_unix_domain_address( + const char *name, grpc_resolved_addresses **addresses); int grpc_is_unix_socket(const struct sockaddr *addr); diff --git a/src/core/lib/iomgr/wakeup_fd_eventfd.c b/src/core/lib/iomgr/wakeup_fd_eventfd.c index 8a772add13..3047205833 100644 --- a/src/core/lib/iomgr/wakeup_fd_eventfd.c +++ b/src/core/lib/iomgr/wakeup_fd_eventfd.c @@ -44,29 +44,39 @@ #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" -static void eventfd_create(grpc_wakeup_fd* fd_info) { +static grpc_error* eventfd_create(grpc_wakeup_fd* fd_info) { int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); - /* TODO(klempner): Handle failure more gracefully */ - GPR_ASSERT(efd >= 0); + if (efd < 0) { + return GRPC_OS_ERROR(errno, "eventfd"); + } fd_info->read_fd = efd; fd_info->write_fd = -1; + return GRPC_ERROR_NONE; } -static void eventfd_consume(grpc_wakeup_fd* fd_info) { +static grpc_error* eventfd_consume(grpc_wakeup_fd* fd_info) { eventfd_t value; int err; do { err = eventfd_read(fd_info->read_fd, &value); } while (err < 0 && errno == EINTR); + if (err < 0) { + return GRPC_OS_ERROR(errno, "eventfd_read"); + } + return GRPC_ERROR_NONE; } -static void eventfd_wakeup(grpc_wakeup_fd* fd_info) { +static grpc_error* eventfd_wakeup(grpc_wakeup_fd* fd_info) { int err; GPR_TIMER_BEGIN("eventfd_wakeup", 0); do { err = eventfd_write(fd_info->read_fd, 1); } while (err < 0 && errno == EINTR); + if (err < 0) { + return GRPC_OS_ERROR(errno, "eventfd_read"); + } GPR_TIMER_END("eventfd_wakeup", 0); + return GRPC_ERROR_NONE; } static void eventfd_destroy(grpc_wakeup_fd* fd_info) { diff --git a/src/core/lib/iomgr/wakeup_fd_pipe.c b/src/core/lib/iomgr/wakeup_fd_pipe.c index e9b9a0119f..4e5dbdcb73 100644 --- a/src/core/lib/iomgr/wakeup_fd_pipe.c +++ b/src/core/lib/iomgr/wakeup_fd_pipe.c @@ -45,7 +45,7 @@ #include "src/core/lib/iomgr/socket_utils_posix.h" -static void pipe_init(grpc_wakeup_fd* fd_info) { +static grpc_error* pipe_init(grpc_wakeup_fd* fd_info) { int pipefd[2]; /* TODO(klempner): Make this nonfatal */ int r = pipe(pipefd); @@ -53,36 +53,40 @@ static void pipe_init(grpc_wakeup_fd* fd_info) { gpr_log(GPR_ERROR, "pipe creation failed (%d): %s", errno, strerror(errno)); abort(); } - GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[0], 1)); - GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[1], 1)); + grpc_error* err; + err = grpc_set_socket_nonblocking(pipefd[0], 1); + if (err != GRPC_ERROR_NONE) return err; + err = grpc_set_socket_nonblocking(pipefd[1], 1); + if (err != GRPC_ERROR_NONE) return err; fd_info->read_fd = pipefd[0]; fd_info->write_fd = pipefd[1]; + return GRPC_ERROR_NONE; } -static void pipe_consume(grpc_wakeup_fd* fd_info) { +static grpc_error* pipe_consume(grpc_wakeup_fd* fd_info) { char buf[128]; ssize_t r; for (;;) { r = read(fd_info->read_fd, buf, sizeof(buf)); if (r > 0) continue; - if (r == 0) return; + if (r == 0) return GRPC_ERROR_NONE; switch (errno) { case EAGAIN: - return; + return GRPC_ERROR_NONE; case EINTR: continue; default: - gpr_log(GPR_ERROR, "error reading pipe: %s", strerror(errno)); - return; + return GRPC_OS_ERROR(errno, "read"); } } } -static void pipe_wakeup(grpc_wakeup_fd* fd_info) { +static grpc_error* pipe_wakeup(grpc_wakeup_fd* fd_info) { char c = 0; while (write(fd_info->write_fd, &c, 1) != 1 && errno == EINTR) ; + return GRPC_ERROR_NONE; } static void pipe_destroy(grpc_wakeup_fd* fd_info) { diff --git a/src/core/lib/iomgr/wakeup_fd_posix.c b/src/core/lib/iomgr/wakeup_fd_posix.c index 525369c356..046208abc8 100644 --- a/src/core/lib/iomgr/wakeup_fd_posix.c +++ b/src/core/lib/iomgr/wakeup_fd_posix.c @@ -53,16 +53,16 @@ void grpc_wakeup_fd_global_init(void) { void grpc_wakeup_fd_global_destroy(void) { wakeup_fd_vtable = NULL; } -void grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info) { - wakeup_fd_vtable->init(fd_info); +grpc_error *grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info) { + return wakeup_fd_vtable->init(fd_info); } -void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd *fd_info) { - wakeup_fd_vtable->consume(fd_info); +grpc_error *grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd *fd_info) { + return wakeup_fd_vtable->consume(fd_info); } -void grpc_wakeup_fd_wakeup(grpc_wakeup_fd *fd_info) { - wakeup_fd_vtable->wakeup(fd_info); +grpc_error *grpc_wakeup_fd_wakeup(grpc_wakeup_fd *fd_info) { + return wakeup_fd_vtable->wakeup(fd_info); } void grpc_wakeup_fd_destroy(grpc_wakeup_fd *fd_info) { diff --git a/src/core/lib/iomgr/wakeup_fd_posix.h b/src/core/lib/iomgr/wakeup_fd_posix.h index 6b069c1837..e269f242d8 100644 --- a/src/core/lib/iomgr/wakeup_fd_posix.h +++ b/src/core/lib/iomgr/wakeup_fd_posix.h @@ -62,6 +62,8 @@ #ifndef GRPC_CORE_LIB_IOMGR_WAKEUP_FD_POSIX_H #define GRPC_CORE_LIB_IOMGR_WAKEUP_FD_POSIX_H +#include "src/core/lib/iomgr/error.h" + void grpc_wakeup_fd_global_init(void); void grpc_wakeup_fd_global_destroy(void); @@ -72,9 +74,9 @@ void grpc_wakeup_fd_global_init_force_fallback(void); typedef struct grpc_wakeup_fd grpc_wakeup_fd; typedef struct grpc_wakeup_fd_vtable { - void (*init)(grpc_wakeup_fd* fd_info); - void (*consume)(grpc_wakeup_fd* fd_info); - void (*wakeup)(grpc_wakeup_fd* fd_info); + grpc_error* (*init)(grpc_wakeup_fd* fd_info); + grpc_error* (*consume)(grpc_wakeup_fd* fd_info); + grpc_error* (*wakeup)(grpc_wakeup_fd* fd_info); void (*destroy)(grpc_wakeup_fd* fd_info); /* Must be called before calling any other functions */ int (*check_availability)(void); @@ -89,9 +91,10 @@ extern int grpc_allow_specialized_wakeup_fd; #define GRPC_WAKEUP_FD_GET_READ_FD(fd_info) ((fd_info)->read_fd) -void grpc_wakeup_fd_init(grpc_wakeup_fd* fd_info); -void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd* fd_info); -void grpc_wakeup_fd_wakeup(grpc_wakeup_fd* fd_info); +grpc_error* grpc_wakeup_fd_init(grpc_wakeup_fd* fd_info) GRPC_MUST_USE_RESULT; +grpc_error* grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd* fd_info) + GRPC_MUST_USE_RESULT; +grpc_error* grpc_wakeup_fd_wakeup(grpc_wakeup_fd* fd_info) GRPC_MUST_USE_RESULT; void grpc_wakeup_fd_destroy(grpc_wakeup_fd* fd_info); /* Defined in some specialized implementation's .c file, or by diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h index 3e2b223670..6e8b1218be 100644 --- a/src/core/lib/iomgr/workqueue.h +++ b/src/core/lib/iomgr/workqueue.h @@ -50,7 +50,8 @@ /* grpc_workqueue is forward declared in exec_ctx.h */ /** Create a work queue */ -grpc_workqueue *grpc_workqueue_create(grpc_exec_ctx *exec_ctx); +grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, + grpc_workqueue **workqueue); void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue); @@ -77,7 +78,7 @@ void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); /** Add a work item to a workqueue */ -void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure, - int success); +void grpc_workqueue_push(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + grpc_closure *closure, grpc_error *error); #endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_H */ diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index 80e7a0b206..297e8d3102 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -45,22 +45,27 @@ #include "src/core/lib/iomgr/ev_posix.h" -static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success); +static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -grpc_workqueue *grpc_workqueue_create(grpc_exec_ctx *exec_ctx) { +grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, + grpc_workqueue **workqueue) { char name[32]; - grpc_workqueue *workqueue = gpr_malloc(sizeof(grpc_workqueue)); - gpr_ref_init(&workqueue->refs, 1); - gpr_mu_init(&workqueue->mu); - workqueue->closure_list.head = workqueue->closure_list.tail = NULL; - grpc_wakeup_fd_init(&workqueue->wakeup_fd); - sprintf(name, "workqueue:%p", (void *)workqueue); - workqueue->wakeup_read_fd = - grpc_fd_create(GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), name); - grpc_closure_init(&workqueue->read_closure, on_readable, workqueue); - grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, - &workqueue->read_closure); - return workqueue; + *workqueue = gpr_malloc(sizeof(grpc_workqueue)); + gpr_ref_init(&(*workqueue)->refs, 1); + gpr_mu_init(&(*workqueue)->mu); + (*workqueue)->closure_list.head = (*workqueue)->closure_list.tail = NULL; + grpc_error *err = grpc_wakeup_fd_init(&(*workqueue)->wakeup_fd); + if (err != GRPC_ERROR_NONE) { + gpr_free(*workqueue); + return err; + } + sprintf(name, "workqueue:%p", (void *)(*workqueue)); + (*workqueue)->wakeup_read_fd = grpc_fd_create( + GRPC_WAKEUP_FD_GET_READ_FD(&(*workqueue)->wakeup_fd), name); + grpc_closure_init(&(*workqueue)->read_closure, on_readable, *workqueue); + grpc_fd_notify_on_read(exec_ctx, (*workqueue)->wakeup_read_fd, + &(*workqueue)->read_closure); + return GRPC_ERROR_NONE; } static void workqueue_destroy(grpc_exec_ctx *exec_ctx, @@ -103,17 +108,14 @@ void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx, void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { gpr_mu_lock(&workqueue->mu); - if (grpc_closure_list_empty(workqueue->closure_list)) { - grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); - } grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); gpr_mu_unlock(&workqueue->mu); } -static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success) { +static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_workqueue *workqueue = arg; - if (!success) { + if (error != GRPC_ERROR_NONE) { gpr_mu_destroy(&workqueue->mu); /* HACK: let wakeup_fd code know that we stole the fd */ workqueue->wakeup_fd.read_fd = 0; @@ -123,20 +125,32 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success) { } else { gpr_mu_lock(&workqueue->mu); grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); - grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); + error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); gpr_mu_unlock(&workqueue->mu); - grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, - &workqueue->read_closure); + if (error == GRPC_ERROR_NONE) { + grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, + &workqueue->read_closure); + } else { + /* recurse to get error handling */ + on_readable(exec_ctx, arg, error); + } } } -void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure, - int success) { +void grpc_workqueue_push(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + grpc_closure *closure, grpc_error *error) { + grpc_error *push_error = GRPC_ERROR_NONE; gpr_mu_lock(&workqueue->mu); if (grpc_closure_list_empty(workqueue->closure_list)) { - grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); + push_error = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); + } + grpc_closure_list_append(&workqueue->closure_list, closure, error); + if (push_error != GRPC_ERROR_NONE) { + const char *msg = grpc_error_string(push_error); + gpr_log(GPR_ERROR, "Failed to push to workqueue: %s", msg); + grpc_error_free_string(msg); + grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); } - grpc_closure_list_add(&workqueue->closure_list, closure, success); gpr_mu_unlock(&workqueue->mu); } |