diff options
Diffstat (limited to 'src/core/lib')
-rw-r--r-- | src/core/lib/iomgr/closure.c | 26 | ||||
-rw-r--r-- | src/core/lib/iomgr/closure.h | 21 | ||||
-rw-r--r-- | src/core/lib/iomgr/error.c | 346 | ||||
-rw-r--r-- | src/core/lib/iomgr/error.h | 69 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_poll_and_epoll_posix.c | 26 | ||||
-rw-r--r-- | src/core/lib/iomgr/exec_ctx.c | 15 | ||||
-rw-r--r-- | src/core/lib/iomgr/exec_ctx.h | 6 | ||||
-rw-r--r-- | src/core/lib/iomgr/executor.c | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/executor.h | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/resolve_address_posix.c | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/socket_utils_common_posix.c | 118 | ||||
-rw-r--r-- | src/core/lib/iomgr/socket_utils_posix.h | 22 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client_posix.c | 53 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.c | 32 |
14 files changed, 605 insertions, 137 deletions
diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c index 27793c32e4..8738c4a5ce 100644 --- a/src/core/lib/iomgr/closure.c +++ b/src/core/lib/iomgr/closure.c @@ -39,25 +39,29 @@ void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, void *cb_arg) { closure->cb = cb; closure->cb_arg = cb_arg; - closure->final_data = 0; } -void grpc_closure_list_add(grpc_closure_list *closure_list, - grpc_closure *closure, bool success) { +void grpc_closure_list_append(grpc_closure_list *closure_list, + grpc_closure *closure, grpc_error *error) { if (closure == NULL) return; - closure->final_data = (success != 0); + closure->final_data.error = error; + closure->next = NULL; if (closure_list->head == NULL) { closure_list->head = closure; } else { - closure_list->tail->final_data |= (uintptr_t)closure; + closure_list->tail->next = closure; } closure_list->tail = closure; } -void grpc_closure_list_fail_all(grpc_closure_list *list) { - for (grpc_closure *c = list->head; c != NULL; c = grpc_closure_next(c)) { - c->final_data &= ~(uintptr_t)1; +void grpc_closure_list_fail_all(grpc_closure_list *list, + grpc_error *forced_failure) { + for (grpc_closure *c = list->head; c != NULL; c = c->next) { + if (c->final_data.error == NULL) { + c->final_data.error = grpc_error_ref(forced_failure); + } } + grpc_error_unref(forced_failure); } bool grpc_closure_list_empty(grpc_closure_list closure_list) { @@ -71,7 +75,7 @@ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst) { if (dst->head == NULL) { *dst = *src; } else { - dst->tail->final_data |= (uintptr_t)src->head; + dst->tail->next = src->head; dst->tail = src->tail; } src->head = src->tail = NULL; @@ -98,7 +102,3 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg) { grpc_closure_init(&wc->wrapper, closure_wrapper, wc); return &wc->wrapper; } - -grpc_closure *grpc_closure_next(grpc_closure *closure) { - return (grpc_closure *)(closure->final_data & ~(uintptr_t)1); -} diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index fdc2daed9d..8e76412ee0 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -36,6 +36,7 @@ #include <grpc/support/port_platform.h> #include <stdbool.h> +#include "src/core/lib/iomgr/error.h" struct grpc_closure; typedef struct grpc_closure grpc_closure; @@ -65,10 +66,12 @@ struct grpc_closure { /** Arguments to be passed to "cb". */ void *cb_arg; - /** Once enqueued, contains in the lower bit the success of the closure, - and in the upper bits the pointer to the next closure in the list. - Before enqueing for execution, this is usable for scratch data. */ - uintptr_t final_data; + union { + grpc_error *error; + uintptr_t scratch; + } final_data; + + grpc_closure *next; }; /** Initializes \a closure with \a cb and \a cb_arg. */ @@ -83,11 +86,12 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg); /** add \a closure to the end of \a list and set \a closure's success to \a * success */ -void grpc_closure_list_add(grpc_closure_list *list, grpc_closure *closure, - bool success); +void grpc_closure_list_append(grpc_closure_list *list, grpc_closure *closure, + grpc_error *error); /** force all success bits in \a list to false */ -void grpc_closure_list_fail_all(grpc_closure_list *list); +void grpc_closure_list_fail_all(grpc_closure_list *list, + grpc_error *forced_failure); /** append all closures from \a src to \a dst and empty \a src. */ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst); @@ -95,7 +99,4 @@ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst); /** return whether \a list is empty. */ bool grpc_closure_list_empty(grpc_closure_list list); -/** return the next pointer for a queued closure list */ -grpc_closure *grpc_closure_next(grpc_closure *closure); - #endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */ diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c new file mode 100644 index 0000000000..7dff3ece35 --- /dev/null +++ b/src/core/lib/iomgr/error.c @@ -0,0 +1,346 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/error.h" + +#include <stdbool.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/avl.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/useful.h> + +static void destroy_integer(void *key) {} + +static void *copy_integer(void *key) { return key; } + +static long compare_integers(void *key1, void *key2) { + return GPR_ICMP((uintptr_t)key1, (uintptr_t)key2); +} + +static void destroy_string(void *str) { gpr_free(str); } + +static void *copy_string(void *str) { return gpr_strdup(str); } + +static void destroy_err(void *err) { grpc_error_unref(err); } + +static void *copy_err(void *err) { return grpc_error_ref(err); } + +static const gpr_avl_vtable avl_vtable_ints = {destroy_integer, copy_integer, + compare_integers, + destroy_integer, copy_integer}; + +static const gpr_avl_vtable avl_vtable_strs = {destroy_integer, copy_integer, + compare_integers, destroy_string, + copy_string}; + +static const gpr_avl_vtable avl_vtable_errs = { + destroy_integer, copy_integer, compare_integers, destroy_err, copy_err}; + +static const char *error_int_name(grpc_error_ints key) { + switch (key) { + case GRPC_ERROR_INT_STATUS_CODE: + return "status_code"; + case GRPC_ERROR_INT_ERRNO: + return "errno"; + } + GPR_UNREACHABLE_CODE(return "unknown"); +} + +static const char *error_str_name(grpc_error_strs key) { + switch (key) { + case GRPC_ERROR_STR_DESCRIPTION: + return "description"; + case GRPC_ERROR_STR_OS_ERROR: + return "os_error"; + case GRPC_ERROR_STR_TARGET_ADDRESS: + return "target_address"; + case GRPC_ERROR_STR_SYSCALL: + return "syscall"; + } + GPR_UNREACHABLE_CODE(return "unknown"); +} + +struct grpc_error { + gpr_refcount refs; + gpr_avl ints; + gpr_avl strs; + gpr_avl errs; + uintptr_t next_err; +}; + +static bool is_special(grpc_error *err) { + return err == GRPC_ERROR_NONE || err == GRPC_ERROR_OOM; +} + +grpc_error *grpc_error_ref(grpc_error *err) { + if (is_special(err)) return err; + gpr_ref(&err->refs); + return err; +} + +static void error_destroy(grpc_error *err) { + GPR_ASSERT(!is_special(err)); + gpr_avl_unref(err->ints); + gpr_avl_unref(err->strs); + gpr_avl_unref(err->errs); +} + +void grpc_error_unref(grpc_error *err) { + if (!is_special(err) && gpr_unref(&err->refs)) { + error_destroy(err); + } +} + +grpc_error *grpc_error_create(void) { + grpc_error *err = gpr_malloc(sizeof(*err)); + if (err == NULL) { // TODO(ctiller): make gpr_malloc return NULL + return GRPC_ERROR_OOM; + } + err->ints = gpr_avl_create(&avl_vtable_ints); + err->strs = gpr_avl_create(&avl_vtable_strs); + err->errs = gpr_avl_create(&avl_vtable_errs); + err->next_err = 0; + gpr_ref_init(&err->refs, 1); + return err; +} + +static grpc_error *copy_error_and_unref(grpc_error *in) { + if (is_special(in)) { + return grpc_error_create(); + } + grpc_error *out = gpr_malloc(sizeof(*out)); + out->ints = gpr_avl_ref(in->ints); + out->strs = gpr_avl_ref(in->strs); + out->errs = gpr_avl_ref(in->errs); + out->next_err = in->next_err; + gpr_ref_init(&out->refs, 1); + grpc_error_unref(in); + return out; +} + +grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which, + intptr_t value) { + grpc_error *new = copy_error_and_unref(src); + new->ints = gpr_avl_add(new->ints, (void *)(uintptr_t)which, (void *)value); + return new; +} + +grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which, + const char *value) { + grpc_error *new = copy_error_and_unref(src); + new->strs = gpr_avl_add(new->strs, (void *)(uintptr_t)which, (void *)value); + return new; +} + +grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child) { + grpc_error *new = copy_error_and_unref(src); + new->errs = gpr_avl_add(new->errs, (void *)(new->next_err++), child); + return new; +} + +static const char *no_error_string = "null"; +static const char *oom_error_string = "\"Out of memory\""; + +typedef struct { + char *key; + char *value; +} kv_pair; + +typedef struct { + kv_pair *kvs; + size_t num_kvs; + size_t cap_kvs; +} kv_pairs; + +static void append_kv(kv_pairs *kvs, char *key, char *value) { + if (kvs->num_kvs == kvs->cap_kvs) { + kvs->cap_kvs = GPR_MAX(3 * kvs->cap_kvs / 2, 4); + kvs->kvs = gpr_realloc(kvs->kvs, sizeof(*kvs->kvs) * kvs->cap_kvs); + } + kvs->kvs[kvs->num_kvs].key = key; + kvs->kvs[kvs->num_kvs].value = value; + kvs->num_kvs++; +} + +static void collect_kvs(gpr_avl_node *node, char *key(void *k), + char *fmt(void *v), kv_pairs *kvs) { + if (node == NULL) return; + append_kv(kvs, key(node->key), fmt(node->value)); + collect_kvs(node->left, key, fmt, kvs); + collect_kvs(node->right, key, fmt, kvs); +} + +static char *key_int(void *p) { + return gpr_strdup(error_int_name((grpc_error_ints)(uintptr_t)p)); +} + +static char *key_str(void *p) { + return gpr_strdup(error_str_name((grpc_error_strs)(uintptr_t)p)); +} + +static char *fmt_int(void *p) { + char *s; + gpr_asprintf(&s, "%lld", (intptr_t)p); + return s; +} + +static void append_chr(char c, char **s, size_t *sz, size_t *cap) { + if (*sz == *cap) { + *cap = GPR_MAX(8, 3 * *cap / 2); + *s = gpr_realloc(*s, *cap); + } + (*s)[(*sz)++] = c; +} + +static void append_str(const char *str, char **s, size_t *sz, size_t *cap) { + for (const char *c = str; *c; c++) { + append_chr(*c, s, sz, cap); + } +} + +static void append_esc_str(const char *str, char **s, size_t *sz, size_t *cap) { + static const char *hex = "0123456789abcdef"; + append_chr('"', s, sz, cap); + for (const uint8_t *c = (const uint8_t *)str; *c; c++) { + if (*c < 32 || *c >= 127) { + append_chr('\\', s, sz, cap); + switch (*c) { + case '\b': + append_chr('b', s, sz, cap); + break; + case '\f': + append_chr('f', s, sz, cap); + break; + case '\n': + append_chr('n', s, sz, cap); + break; + case '\r': + append_chr('r', s, sz, cap); + break; + case '\t': + append_chr('t', s, sz, cap); + break; + default: + append_chr('u', s, sz, cap); + append_chr('0', s, sz, cap); + append_chr('0', s, sz, cap); + append_chr(hex[*c >> 4], s, sz, cap); + append_chr(hex[*c & 0x0f], s, sz, cap); + break; + } + } else { + append_chr((char)*c, s, sz, cap); + } + } + append_chr('"', s, sz, cap); + append_chr(0, s, sz, cap); +} + +static char *fmt_str(void *p) { + char *s = NULL; + size_t sz = 0; + size_t cap = 0; + append_esc_str(p, &s, &sz, &cap); + return s; +} + +static void add_errs(gpr_avl_node *n, char **s, size_t *sz, size_t *cap) { + if (n == NULL) return; + add_errs(n->left, s, sz, cap); + const char *e = grpc_error_string(n->value); + append_str(e, s, sz, cap); + grpc_error_free_string(e); + add_errs(n->right, s, sz, cap); +} + +static char *errs_string(grpc_error *err) { + char *s = NULL; + size_t sz = 0; + size_t cap = 0; + append_chr('[', &s, &sz, &cap); + add_errs(err->errs.root, &s, &sz, &cap); + append_chr(']', &s, &sz, &cap); + return s; +} + +static int cmp_kvs(const void *a, const void *b) { + const kv_pair *ka = a; + const kv_pair *kb = b; + return strcmp(ka->key, kb->key); +} + +static const char *finish_kvs(kv_pairs *kvs) { + char *s = NULL; + size_t sz = 0; + size_t cap = 0; + + append_chr('{', &s, &sz, &cap); + for (size_t i = 0; i < kvs->num_kvs; i++) { + append_esc_str(kvs->kvs[i].key, &s, &sz, &cap); + gpr_free(kvs->kvs[i].key); + append_chr(':', &s, &sz, &cap); + append_str(kvs->kvs[i].value, &s, &sz, &cap); + gpr_free(kvs->kvs[i].value); + } + append_chr('}', &s, &sz, &cap); + + gpr_free(kvs->kvs); + return s; +} + +const char *grpc_error_string(grpc_error *err) { + if (err == GRPC_ERROR_NONE) return no_error_string; + if (err == GRPC_ERROR_OOM) return oom_error_string; + + kv_pairs kvs; + memset(&kvs, 0, sizeof(kvs)); + + collect_kvs(err->ints.root, key_int, fmt_int, &kvs); + collect_kvs(err->strs.root, key_str, fmt_str, &kvs); + append_kv(&kvs, gpr_strdup("referenced_errors"), errs_string(err)); + + qsort(kvs.kvs, kvs.num_kvs, sizeof(kv_pair), cmp_kvs); + + return finish_kvs(&kvs); +} + +grpc_error *grpc_os_error(int err, const char *call_name) { + return grpc_error_set_str( + grpc_error_set_str( + grpc_error_set_int(grpc_error_create(), GRPC_ERROR_INT_ERRNO, err), + GRPC_ERROR_STR_OS_ERROR, strerror(err)), + GRPC_ERROR_STR_SYSCALL, call_name); +} diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h new file mode 100644 index 0000000000..6909bf9da1 --- /dev/null +++ b/src/core/lib/iomgr/error.h @@ -0,0 +1,69 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_ERROR_H +#define GRPC_CORE_LIB_IOMGR_ERROR_H + +#include <stdint.h> + +typedef struct grpc_error grpc_error; + +typedef enum { + GRPC_ERROR_INT_STATUS_CODE, + GRPC_ERROR_INT_ERRNO +} grpc_error_ints; + +typedef enum { + GRPC_ERROR_STR_DESCRIPTION, + GRPC_ERROR_STR_TARGET_ADDRESS, + GRPC_ERROR_STR_OS_ERROR, + GRPC_ERROR_STR_SYSCALL +} grpc_error_strs; + +#define GRPC_ERROR_NONE ((grpc_error *)NULL) +#define GRPC_ERROR_OOM ((grpc_error *)1) + +const char *grpc_error_string(grpc_error *error); +void grpc_error_free_string(const char *str); + +grpc_error *grpc_error_create(void); +grpc_error *grpc_error_ref(grpc_error *err); +void grpc_error_unref(grpc_error *err); +grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which, + intptr_t value); +grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which, + const char *value); +grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child); +grpc_error *grpc_os_error(int err, const char *call_name); + +#endif /* GRPC_CORE_LIB_IOMGR_ERROR_H */ diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c index 3c8127e1a8..e6a9cecbda 100644 --- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c +++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c @@ -459,7 +459,7 @@ static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { } else { remove_fd_from_all_epoll_sets(fd->fd); } - grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL); + grpc_exec_ctx_push(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE, NULL); } static int fd_wrapped_fd(grpc_fd *fd) { @@ -508,6 +508,15 @@ static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); } static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); } #endif +static grpc_error *fd_shutdown_error(bool shutdown) { + if (!shutdown) { + return GRPC_ERROR_NONE; + } else { + return grpc_error_set_str(grpc_error_create(), GRPC_ERROR_STR_DESCRIPTION, + "FD shutdown"); + } +} + static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st, grpc_closure *closure) { if (*st == CLOSURE_NOT_READY) { @@ -516,7 +525,8 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } else if (*st == CLOSURE_READY) { /* already ready ==> queue the closure to run immediately */ *st = CLOSURE_NOT_READY; - grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL); + grpc_exec_ctx_push(exec_ctx, closure, fd_shutdown_error(fd->shutdown), + NULL); maybe_wake_one_watcher_locked(fd); } else { /* upcallptr was set to a different closure. This is an error! */ @@ -539,7 +549,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, return 0; } else { /* waiting ==> queue closure */ - grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL); + grpc_exec_ctx_push(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL); *st = CLOSURE_NOT_READY; return 1; } @@ -864,7 +874,7 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs)); pollset->vtable->finish_shutdown(pollset); - grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL); + grpc_exec_ctx_push(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL); } static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -1137,7 +1147,8 @@ static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, up_args->promotion_closure.cb = basic_do_promote; up_args->promotion_closure.cb_arg = up_args; - grpc_closure_list_add(&pollset->idle_jobs, &up_args->promotion_closure, 1); + grpc_closure_list_append(&pollset->idle_jobs, &up_args->promotion_closure, + GRPC_ERROR_NONE); pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); exit: @@ -1573,7 +1584,8 @@ static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg, /* We don't care about this pollset anymore. */ if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) { da->pollset->called_shutdown = 1; - grpc_exec_ctx_enqueue(exec_ctx, da->pollset->shutdown_done, true, NULL); + grpc_exec_ctx_push(exec_ctx, da->pollset->shutdown_done, GRPC_ERROR_NONE, + NULL); } } gpr_mu_unlock(&da->pollset->mu); @@ -1597,7 +1609,7 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx, GRPC_FD_REF(fd, "delayed_add"); grpc_closure_init(&da->closure, perform_delayed_add, da); pollset->in_flight_cbs++; - grpc_exec_ctx_enqueue(exec_ctx, &da->closure, true, NULL); + grpc_exec_ctx_push(exec_ctx, &da->closure, GRPC_ERROR_NONE, NULL); } } diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c index 2146c7dd1f..1111462132 100644 --- a/src/core/lib/iomgr/exec_ctx.c +++ b/src/core/lib/iomgr/exec_ctx.c @@ -47,11 +47,12 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { grpc_closure *c = exec_ctx->closure_list.head; exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL; while (c != NULL) { - bool success = (bool)(c->final_data & 1); - grpc_closure *next = (grpc_closure *)(c->final_data & ~(uintptr_t)1); + grpc_closure *next = c->next; + grpc_error *error = c->final_data.error; did_something = true; GPR_TIMER_BEGIN("grpc_exec_ctx_flush.cb", 0); - c->cb(exec_ctx, c->cb_arg, success); + c->cb(exec_ctx, c->cb_arg, error); + grpc_error_unref(error); GPR_TIMER_END("grpc_exec_ctx_flush.cb", 0); c = next; } @@ -64,11 +65,11 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) { grpc_exec_ctx_flush(exec_ctx); } -void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - bool success, - grpc_workqueue *offload_target_or_null) { +void grpc_exec_ctx_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error, + grpc_workqueue *offload_target_or_null) { GPR_ASSERT(offload_target_or_null == NULL); - grpc_closure_list_add(&exec_ctx->closure_list, closure, success); + grpc_closure_list_append(&exec_ctx->closure_list, closure, error); } void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 976cc40347..a0dd1e5017 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -83,9 +83,9 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx); * the instance is destroyed, or work may be lost. */ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx); /** Add a closure to be executed at the next flush/finish point */ -void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - bool success, - grpc_workqueue *offload_target_or_null); +void grpc_exec_ctx_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error, + grpc_workqueue *offload_target_or_null); /** Add a list of closures to be executed at the next flush/finish point. * Leaves \a list empty. */ void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c index 36e22e4271..8d7535d6fe 100644 --- a/src/core/lib/iomgr/executor.c +++ b/src/core/lib/iomgr/executor.c @@ -112,10 +112,10 @@ static void maybe_spawn_locked() { g_executor.pending_join = 1; } -void grpc_executor_enqueue(grpc_closure *closure, bool success) { +void grpc_executor_push(grpc_closure *closure, grpc_error *error) { gpr_mu_lock(&g_executor.mu); if (g_executor.shutting_down == 0) { - grpc_closure_list_add(&g_executor.closures, closure, success); + grpc_closure_list_append(&g_executor.closures, closure, error); maybe_spawn_locked(); } gpr_mu_unlock(&g_executor.mu); diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h index b7e6f51aa5..da9dcd07d0 100644 --- a/src/core/lib/iomgr/executor.h +++ b/src/core/lib/iomgr/executor.h @@ -45,7 +45,7 @@ void grpc_executor_init(); /** Enqueue \a closure for its eventual execution of \a f(arg) on a separate * thread */ -void grpc_executor_enqueue(grpc_closure *closure, bool success); +void grpc_executor_push(grpc_closure *closure, grpc_error *error); /** Shutdown the executor, running all pending work as part of the call */ void grpc_executor_shutdown(); diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c index cae91eec20..7c137c5d6c 100644 --- a/src/core/lib/iomgr/resolve_address_posix.c +++ b/src/core/lib/iomgr/resolve_address_posix.c @@ -173,7 +173,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, r->default_port = gpr_strdup(default_port); r->cb = cb; r->arg = arg; - grpc_executor_enqueue(&r->request_closure, 1); + grpc_executor_push(&r->request_closure, GRPC_ERROR_NONE); } void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *name, diff --git a/src/core/lib/iomgr/socket_utils_common_posix.c b/src/core/lib/iomgr/socket_utils_common_posix.c index fa83ceef30..8b49d91dd9 100644 --- a/src/core/lib/iomgr/socket_utils_common_posix.c +++ b/src/core/lib/iomgr/socket_utils_common_posix.c @@ -57,10 +57,10 @@ #include "src/core/lib/support/string.h" /* set a socket to non blocking mode */ -int grpc_set_socket_nonblocking(int fd, int non_blocking) { +grpc_error *grpc_set_socket_nonblocking(int fd, int non_blocking) { int oldflags = fcntl(fd, F_GETFL, 0); if (oldflags < 0) { - return 0; + return grpc_os_error(errno, "fcntl"); } if (non_blocking) { @@ -70,52 +70,58 @@ int grpc_set_socket_nonblocking(int fd, int non_blocking) { } if (fcntl(fd, F_SETFL, oldflags) != 0) { - return 0; + return grpc_os_error(errno, "fcntl"); } - return 1; + return GRPC_ERROR_NONE; } -int grpc_set_socket_no_sigpipe_if_possible(int fd) { +grpc_error *grpc_set_socket_no_sigpipe_if_possible(int fd) { #ifdef GPR_HAVE_SO_NOSIGPIPE int val = 1; int newval; socklen_t intlen = sizeof(newval); - return 0 == setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val)) && - 0 == getsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &newval, &intlen) && - (newval != 0) == val; -#else - return 1; + if (0 != setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val))) { + return grpc_os_error(errno, "setsockopt(SO_NOSIGPIPE)"); + } + if (0 == getsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &newval, &intlen)) { + return grpc_os_error(errno, "getsockopt(SO_NOSIGPIPE)"); + } + if ((newval != 0) == val) { + return grpc_error_set_str(grpc_error_create(), GRPC_ERROR_STR_grpc_os_error, + "Failed to set SO_NOSIGPIPE"); + } #endif + return GRPC_ERROR_NONE; } -int grpc_set_socket_ip_pktinfo_if_possible(int fd) { +grpc_error *grpc_set_socket_ip_pktinfo_if_possible(int fd) { #ifdef GPR_HAVE_IP_PKTINFO int get_local_ip = 1; - return 0 == setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip, - sizeof(get_local_ip)); -#else - (void)fd; - return 1; + if (0 != setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip, + sizeof(get_local_ip))) { + return grpc_os_error(errno, "setsockopt(IP_PKTINFO)"); + } #endif + return GRPC_ERROR_NONE; } -int grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd) { +grpc_error *grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd) { #ifdef GPR_HAVE_IPV6_RECVPKTINFO int get_local_ip = 1; - return 0 == setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip, - sizeof(get_local_ip)); -#else - (void)fd; - return 1; + if (0 != setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip, + sizeof(get_local_ip))) { + return grpc_os_error(errno, "setsockopt(IPV6_RECVPKTINFO)"); + } #endif + return GRPC_ERROR_NONE; } /* set a socket to close on exec */ -int grpc_set_socket_cloexec(int fd, int close_on_exec) { +grpc_error *grpc_set_socket_cloexec(int fd, int close_on_exec) { int oldflags = fcntl(fd, F_GETFD, 0); if (oldflags < 0) { - return 0; + return grpc_os_error(errno, "fcntl"); } if (close_on_exec) { @@ -125,30 +131,47 @@ int grpc_set_socket_cloexec(int fd, int close_on_exec) { } if (fcntl(fd, F_SETFD, oldflags) != 0) { - return 0; + return grpc_os_error(errno, "fcntl"); } - return 1; + return GRPC_ERROR_NONE; } /* set a socket to reuse old addresses */ -int grpc_set_socket_reuse_addr(int fd, int reuse) { +grpc_error *grpc_set_socket_reuse_addr(int fd, int reuse) { int val = (reuse != 0); int newval; socklen_t intlen = sizeof(newval); - return 0 == setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) && - 0 == getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &newval, &intlen) && - (newval != 0) == val; + if (0 != setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val))) { + return grpc_os_error(errno, "setsockopt(SO_REUSEADDR)"); + } + if (0 != getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &newval, &intlen)) { + return grpc_os_error(errno, "getsockopt(SO_REUSEADDR)"); + } + if ((newval != 0) != val) { + return grpc_error_set_str(grpc_error_create(), GRPC_ERROR_STR_OS_ERROR, + "Failed to set SO_REUSEADDR"); + } + + return GRPC_ERROR_NONE; } /* disable nagle */ -int grpc_set_socket_low_latency(int fd, int low_latency) { +grpc_error *grpc_set_socket_low_latency(int fd, int low_latency) { int val = (low_latency != 0); int newval; socklen_t intlen = sizeof(newval); - return 0 == setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) && - 0 == getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &newval, &intlen) && - (newval != 0) == val; + if (0 != setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val))) { + return grpc_os_error(errno, "setsockopt(TCP_NODELAY)"); + } + if (0 != getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &newval, &intlen)) { + return grpc_os_error(errno, "getsockopt(TCP_NODELAY)"); + } + if ((newval != 0) != val) { + return grpc_error_set_str(grpc_error_create(), GRPC_ERROR_STR_OS_ERROR, + "Failed to set TCP_NODELAY"); + } + return GRPC_ERROR_NONE; } static gpr_once g_probe_ipv6_once = GPR_ONCE_INIT; @@ -196,35 +219,40 @@ static int set_socket_dualstack(int fd) { } } -int grpc_create_dualstack_socket(const struct sockaddr *addr, int type, - int protocol, grpc_dualstack_mode *dsmode) { +grpc_error *grpc_create_dualstack_socket(const struct sockaddr *addr, int type, + int protocol, + grpc_dualstack_mode *dsmode, + int *newfd) { int family = addr->sa_family; if (family == AF_INET6) { - int fd; if (grpc_ipv6_loopback_available()) { - fd = socket(family, type, protocol); + *newfd = socket(family, type, protocol); } else { - fd = -1; + *newfd = -1; errno = EAFNOSUPPORT; } /* Check if we've got a valid dualstack socket. */ - if (fd >= 0 && set_socket_dualstack(fd)) { + if (*newfd >= 0 && set_socket_dualstack(*newfd)) { *dsmode = GRPC_DSMODE_DUALSTACK; - return fd; + return GRPC_ERROR_NONE; } /* If this isn't an IPv4 address, then return whatever we've got. */ if (!grpc_sockaddr_is_v4mapped(addr, NULL)) { *dsmode = GRPC_DSMODE_IPV6; - return fd; + return GRPC_ERROR_NONE; } /* Fall back to AF_INET. */ - if (fd >= 0) { - close(fd); + if (*newfd >= 0) { + close(*newfd); } family = AF_INET; } *dsmode = family == AF_INET ? GRPC_DSMODE_IPV4 : GRPC_DSMODE_NONE; - return socket(family, type, protocol); + *newfd = socket(family, type, protocol); + if (*newfd == -1) { + return grpc_os_error(errno, "socket"); + } + return GRPC_ERROR_NONE; } #endif diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h index a8f6e5e658..4e66cbc0c5 100644 --- a/src/core/lib/iomgr/socket_utils_posix.h +++ b/src/core/lib/iomgr/socket_utils_posix.h @@ -37,21 +37,23 @@ #include <sys/socket.h> #include <unistd.h> +#include "src/core/lib/iomgr/error.h" + /* a wrapper for accept or accept4 */ int grpc_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int nonblock, int cloexec); /* set a socket to non blocking mode */ -int grpc_set_socket_nonblocking(int fd, int non_blocking); +grpc_error *grpc_set_socket_nonblocking(int fd, int non_blocking); /* set a socket to close on exec */ -int grpc_set_socket_cloexec(int fd, int close_on_exec); +grpc_error *grpc_set_socket_cloexec(int fd, int close_on_exec); /* set a socket to reuse old addresses */ -int grpc_set_socket_reuse_addr(int fd, int reuse); +grpc_error *grpc_set_socket_reuse_addr(int fd, int reuse); /* disable nagle */ -int grpc_set_socket_low_latency(int fd, int low_latency); +grpc_error *grpc_set_socket_low_latency(int fd, int low_latency); /* Returns true if this system can create AF_INET6 sockets bound to ::1. The value is probed once, and cached for the life of the process. @@ -66,17 +68,17 @@ int grpc_ipv6_loopback_available(void); /* Tries to set SO_NOSIGPIPE if available on this platform. Returns 1 on success, 0 on failure. If SO_NO_SIGPIPE is not available, returns 1. */ -int grpc_set_socket_no_sigpipe_if_possible(int fd); +grpc_error *grpc_set_socket_no_sigpipe_if_possible(int fd); /* Tries to set IP_PKTINFO if available on this platform. Returns 1 on success, 0 on failure. If IP_PKTINFO is not available, returns 1. */ -int grpc_set_socket_ip_pktinfo_if_possible(int fd); +grpc_error *grpc_set_socket_ip_pktinfo_if_possible(int fd); /* Tries to set IPV6_RECVPKTINFO if available on this platform. Returns 1 on success, 0 on failure. If IPV6_RECVPKTINFO is not available, returns 1. */ -int grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd); +grpc_error *grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd); /* An enum to keep track of IPv4/IPv6 socket modes. @@ -117,7 +119,9 @@ extern int grpc_forbid_dualstack_sockets_for_testing; IPv4, so that bind() or connect() see the correct family. Also, it's important to distinguish between DUALSTACK and IPV6 when listening on the [::] wildcard address. */ -int grpc_create_dualstack_socket(const struct sockaddr *addr, int type, - int protocol, grpc_dualstack_mode *dsmode); +grpc_error *grpc_create_dualstack_socket(const struct sockaddr *addr, int type, + int protocol, + grpc_dualstack_mode *dsmode, + int *newfd); #endif /* GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_POSIX_H */ diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index e93d5734a0..190512c17c 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -71,7 +71,9 @@ typedef struct { grpc_closure *closure; } async_connect; -static int prepare_socket(const struct sockaddr *addr, int fd) { +static grpc_error *prepare_socket(const struct sockaddr *addr, int fd) { + grpc_error *err = GRPC_ERROR_NONE; + if (fd < 0) { goto error; } @@ -83,13 +85,14 @@ static int prepare_socket(const struct sockaddr *addr, int fd) { strerror(errno)); goto error; } - return 1; + goto done; error: if (fd >= 0) { close(fd); } - return 0; +done: + return err; } static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, bool success) { @@ -121,6 +124,7 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) { grpc_endpoint **ep = ac->ep; grpc_closure *closure = ac->closure; grpc_fd *fd; + grpc_error *error = GRPC_ERROR_NONE; if (grpc_tcp_trace) { gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_writable: success=%d", @@ -143,8 +147,7 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) { &so_error_size); } while (err < 0 && errno == EINTR); if (err < 0) { - gpr_log(GPR_ERROR, "failed to connect to '%s': getsockopt(ERROR): %s", - ac->addr_str, strerror(errno)); + error = grpc_os_error(errno, "getsockopt"); goto finish; } else if (so_error != 0) { if (so_error == ENOBUFS) { @@ -169,14 +172,12 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) { } else { switch (so_error) { case ECONNREFUSED: - gpr_log( - GPR_ERROR, - "failed to connect to '%s': socket error: connection refused", - ac->addr_str); + error = grpc_error_set_int(error, GRPC_ERROR_INT_ERRNO, errno); + error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, + "Connection refused"); break; default: - gpr_log(GPR_ERROR, "failed to connect to '%s': socket error: %d", - ac->addr_str, so_error); + error = grpc_os_error(errno, "getsockopt(SO_ERROR)"); break; } goto finish; @@ -188,8 +189,8 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) { goto finish; } } else { - gpr_log(GPR_ERROR, "failed to connect to '%s': timeout occurred", - ac->addr_str); + error = + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, "Timeout occurred"); goto finish; } @@ -203,12 +204,18 @@ finish: } done = (--ac->refs == 0); gpr_mu_unlock(&ac->mu); + if (error != GRPC_ERROR_NONE) { + error = grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION, + "Failed to connect to remote host"); + error = + grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, ac->addr_str); + } if (done) { gpr_mu_destroy(&ac->mu); gpr_free(ac->addr_str); gpr_free(ac); } - grpc_exec_ctx_enqueue(exec_ctx, closure, *ep != NULL, NULL); + grpc_exec_ctx_push(exec_ctx, closure, error, NULL); } static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, @@ -225,6 +232,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, grpc_fd *fdobj; char *name; char *addr_str; + grpc_error *error; *ep = NULL; @@ -234,9 +242,10 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, addr_len = sizeof(addr6_v4mapped); } - fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode); - if (fd < 0) { - gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno)); + error = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd); + if (error != GRPC_ERROR_NONE) { + grpc_exec_ctx_push(exec_ctx, closure, error, NULL); + return; } if (dsmode == GRPC_DSMODE_IPV4) { /* If we got an AF_INET socket, map the address back to IPv4. */ @@ -244,8 +253,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, addr = (struct sockaddr *)&addr4_copy; addr_len = sizeof(addr4_copy); } - if (!prepare_socket(addr, fd)) { - grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL); + if ((error = prepare_socket(addr, fd)) != GRPC_ERROR_NONE) { + grpc_exec_ctx_push(exec_ctx, closure, error, NULL); return; } @@ -261,14 +270,14 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, if (err >= 0) { *ep = grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str); - grpc_exec_ctx_enqueue(exec_ctx, closure, true, NULL); + grpc_exec_ctx_push(exec_ctx, closure, GRPC_ERROR_NONE, NULL); goto done; } if (errno != EWOULDBLOCK && errno != EINPROGRESS) { - gpr_log(GPR_ERROR, "connect error to '%s': %s", addr_str, strerror(errno)); grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error"); - grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL); + grpc_exec_ctx_push(exec_ctx, closure, grpc_os_error(errno, "connect"), + NULL); goto done; } diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 7210aef5d5..7624d77d8f 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -274,14 +274,13 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->finished_edge = 0; grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); } else { - grpc_exec_ctx_enqueue(exec_ctx, &tcp->read_closure, true, NULL); + grpc_exec_ctx_push(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE, NULL); } } -typedef enum { FLUSH_DONE, FLUSH_PENDING, FLUSH_ERROR } flush_result; - +/* returns true if done, false if pending; if returning true, *error is set */ #define MAX_WRITE_IOVEC 16 -static flush_result tcp_flush(grpc_tcp *tcp) { +static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) { struct msghdr msg; struct iovec iov[MAX_WRITE_IOVEC]; msg_iovlen_type iov_size; @@ -331,10 +330,10 @@ static flush_result tcp_flush(grpc_tcp *tcp) { if (errno == EAGAIN) { tcp->outgoing_slice_idx = unwind_slice_idx; tcp->outgoing_byte_idx = unwind_byte_idx; - return FLUSH_PENDING; + return false; } else { - /* TODO(klempner): Log some of these */ - return FLUSH_ERROR; + *error = grpc_os_error(errno, "sendmsg"); + return true; } } @@ -355,7 +354,8 @@ static flush_result tcp_flush(grpc_tcp *tcp) { } if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) { - return FLUSH_DONE; + *error = GRPC_ERROR_NONE; + return true; } }; } @@ -363,7 +363,7 @@ static flush_result tcp_flush(grpc_tcp *tcp) { static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, bool success) { grpc_tcp *tcp = (grpc_tcp *)arg; - flush_result status; + grpc_error *error = GRPC_ERROR_NONE; grpc_closure *cb; if (!success) { @@ -374,14 +374,13 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, return; } - status = tcp_flush(tcp); - if (status == FLUSH_PENDING) { + if (!tcp_flush(tcp, &error)) { grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); } else { cb = tcp->write_cb; tcp->write_cb = NULL; GPR_TIMER_BEGIN("tcp_handle_write.cb", 0); - cb->cb(exec_ctx, cb->cb_arg, status == FLUSH_DONE); + cb->cb(exec_ctx, cb->cb_arg, error); GPR_TIMER_END("tcp_handle_write.cb", 0); TCP_UNREF(exec_ctx, tcp, "write"); } @@ -390,7 +389,7 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, gpr_slice_buffer *buf, grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; - flush_result status; + grpc_error *error = GRPC_ERROR_NONE; if (grpc_tcp_trace) { size_t i; @@ -408,20 +407,19 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (buf->length == 0) { GPR_TIMER_END("tcp_write", 0); - grpc_exec_ctx_enqueue(exec_ctx, cb, true, NULL); + grpc_exec_ctx_push(exec_ctx, cb, GRPC_ERROR_NONE, NULL); return; } tcp->outgoing_buffer = buf; tcp->outgoing_slice_idx = 0; tcp->outgoing_byte_idx = 0; - status = tcp_flush(tcp); - if (status == FLUSH_PENDING) { + if (!tcp_flush(tcp, &error)) { TCP_REF(tcp, "write"); tcp->write_cb = cb; grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); } else { - grpc_exec_ctx_enqueue(exec_ctx, cb, status == FLUSH_DONE, NULL); + grpc_exec_ctx_push(exec_ctx, cb, error, NULL); } GPR_TIMER_END("tcp_write", 0); |