From c027e77d6408c7ae9b1889f64e2629fa66872f3d Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 3 May 2016 16:27:00 -0700 Subject: Progress --- src/core/lib/channel/compress_filter.c | 6 +- src/core/lib/channel/http_client_filter.c | 5 +- src/core/lib/channel/http_server_filter.c | 24 ++++--- src/core/lib/http/httpcli.c | 14 ++-- src/core/lib/iomgr/closure.c | 5 +- src/core/lib/iomgr/closure.h | 2 +- src/core/lib/iomgr/error.c | 92 ++++++++++++++++++++++++-- src/core/lib/iomgr/error.h | 29 ++++++-- src/core/lib/iomgr/ev_poll_and_epoll_posix.c | 7 +- src/core/lib/iomgr/resolve_address_posix.c | 3 +- src/core/lib/iomgr/socket_utils_common_posix.c | 34 +++++----- src/core/lib/iomgr/tcp_client_posix.c | 25 ++++--- src/core/lib/iomgr/tcp_posix.c | 26 ++++---- src/core/lib/iomgr/tcp_server.h | 4 +- src/core/lib/iomgr/tcp_server_posix.c | 66 ++++++++++-------- src/core/lib/iomgr/timer.c | 27 +++++--- src/core/lib/iomgr/workqueue.h | 2 +- src/core/lib/iomgr/workqueue_posix.c | 10 +-- src/core/lib/surface/alarm.c | 4 +- src/core/lib/surface/call.c | 53 ++++++++------- src/core/lib/surface/completion_queue.h | 2 +- 21 files changed, 286 insertions(+), 154 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index 5510c79b18..e029ec54be 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -155,11 +155,11 @@ static void process_send_initial_metadata( static void continue_send_message(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); -static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, bool success) { +static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; gpr_slice_buffer_reset_and_unref(&calld->slices); - calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, success); + calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error); } static void finish_send_message(grpc_exec_ctx *exec_ctx, @@ -205,7 +205,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, grpc_call_next_op(exec_ctx, elem, &calld->send_op); } -static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, bool success) { +static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; gpr_slice_buffer_add(&calld->slices, calld->incoming_slice); diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index 516e708d1f..ced3d75cd2 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -80,7 +80,8 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) { return md; } -static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { +static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, + grpc_error *error) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; client_recv_filter_args a; @@ -88,7 +89,7 @@ static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { a.exec_ctx = exec_ctx; grpc_metadata_batch_filter(calld->recv_initial_metadata, client_recv_filter, &a); - calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success); + calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, error); } static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) { diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c index ba865416de..433ed0ed53 100644 --- a/src/core/lib/channel/http_server_filter.c +++ b/src/core/lib/channel/http_server_filter.c @@ -142,10 +142,11 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { } } -static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { +static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, + grpc_error *err) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; - if (success) { + if (err == GRPC_ERROR_NONE) { server_filter_args a; a.elem = elem; a.exec_ctx = exec_ctx; @@ -157,27 +158,32 @@ static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { calld->seen_path && calld->seen_authority) { /* do nothing */ } else { + err = GRPC_ERROR_CREATE("Bad incoming HTTP headers"); if (!calld->seen_path) { - gpr_log(GPR_ERROR, "Missing :path header"); + err = grpc_error_add_child(err, + GRPC_ERROR_CREATE("Missing :path header")); } if (!calld->seen_authority) { - gpr_log(GPR_ERROR, "Missing :authority header"); + err = grpc_error_add_child( + err, GRPC_ERROR_CREATE("Missing :authority header")); } if (!calld->seen_method) { - gpr_log(GPR_ERROR, "Missing :method header"); + err = grpc_error_add_child(err, + GRPC_ERROR_CREATE("Missing :method header")); } if (!calld->seen_scheme) { - gpr_log(GPR_ERROR, "Missing :scheme header"); + err = grpc_error_add_child(err, + GRPC_ERROR_CREATE("Missing :scheme header")); } if (!calld->seen_te_trailers) { - gpr_log(GPR_ERROR, "Missing te trailers header"); + err = grpc_error_add_child( + err, GRPC_ERROR_CREATE("Missing te: trailers header")); } /* Error this call out */ - success = 0; grpc_call_element_send_cancel(exec_ctx, elem); } } - calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success); + calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, err); } static void hs_mutate_op(grpc_call_element *elem, diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index f22721ac8f..a59452fc59 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -117,13 +117,12 @@ static void finish(grpc_exec_ctx *exec_ctx, internal_request *req, gpr_free(req); } -static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, bool success); - static void do_read(grpc_exec_ctx *exec_ctx, internal_request *req) { grpc_endpoint_read(exec_ctx, req->ep, &req->incoming, &req->on_read); } -static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { +static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, + grpc_error *error) { internal_request *req = user_data; size_t i; @@ -137,7 +136,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { } } - if (success) { + if (error == GRPC_ERROR_NONE) { do_read(exec_ctx, req); } else if (!req->have_read_byte) { next_address(exec_ctx, req); @@ -154,9 +153,9 @@ static void on_written(grpc_exec_ctx *exec_ctx, internal_request *req) { do_read(exec_ctx, req); } -static void done_write(grpc_exec_ctx *exec_ctx, void *arg, bool success) { +static void done_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { internal_request *req = arg; - if (success) { + if (error == GRPC_ERROR_NONE) { on_written(exec_ctx, req); } else { next_address(exec_ctx, req); @@ -182,7 +181,8 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, start_write(exec_ctx, req); } -static void on_connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) { +static void on_connected(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { internal_request *req = arg; if (!req->ep) { diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c index 8738c4a5ce..885637efe9 100644 --- a/src/core/lib/iomgr/closure.c +++ b/src/core/lib/iomgr/closure.c @@ -87,12 +87,13 @@ typedef struct { grpc_closure wrapper; } wrapped_closure; -static void closure_wrapper(grpc_exec_ctx *exec_ctx, void *arg, bool success) { +static void closure_wrapper(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { wrapped_closure *wc = arg; grpc_iomgr_cb_func cb = wc->cb; void *cb_arg = wc->cb_arg; gpr_free(wc); - cb(exec_ctx, cb_arg, success); + cb(exec_ctx, cb_arg, error); } grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg) { diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index 8e76412ee0..14132d926e 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -56,7 +56,7 @@ typedef struct grpc_closure_list { * \param success An indication on the state of the iomgr. On false, cleanup * actions should be taken (eg, shutdown). */ typedef void (*grpc_iomgr_cb_func)(grpc_exec_ctx *exec_ctx, void *arg, - bool success); + grpc_error *error); /** A closure over a grpc_iomgr_cb_func. */ struct grpc_closure { diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c index 7dff3ece35..d0e4301722 100644 --- a/src/core/lib/iomgr/error.c +++ b/src/core/lib/iomgr/error.c @@ -58,6 +58,16 @@ static void destroy_err(void *err) { grpc_error_unref(err); } static void *copy_err(void *err) { return grpc_error_ref(err); } +static void destroy_time(void *tm) { gpr_free(tm); } + +static gpr_timespec *box_time(gpr_timespec tm) { + gpr_timespec *out = gpr_malloc(sizeof(*out)); + *out = tm; + return out; +} + +static void *copy_time(void *tm) { return box_time(*(gpr_timespec *)tm); } + static const gpr_avl_vtable avl_vtable_ints = {destroy_integer, copy_integer, compare_integers, destroy_integer, copy_integer}; @@ -66,6 +76,9 @@ static const gpr_avl_vtable avl_vtable_strs = {destroy_integer, copy_integer, compare_integers, destroy_string, copy_string}; +static const gpr_avl_vtable avl_vtable_times = { + destroy_integer, copy_integer, compare_integers, destroy_time, copy_time}; + static const gpr_avl_vtable avl_vtable_errs = { destroy_integer, copy_integer, compare_integers, destroy_err, copy_err}; @@ -75,6 +88,8 @@ static const char *error_int_name(grpc_error_ints key) { return "status_code"; case GRPC_ERROR_INT_ERRNO: return "errno"; + case GRPC_ERROR_INT_FILE_LINE: + return "file_line"; } GPR_UNREACHABLE_CODE(return "unknown"); } @@ -89,6 +104,16 @@ static const char *error_str_name(grpc_error_strs key) { return "target_address"; case GRPC_ERROR_STR_SYSCALL: return "syscall"; + case GRPC_ERROR_STR_FILE: + return "file"; + } + GPR_UNREACHABLE_CODE(return "unknown"); +} + +static const char *error_time_name(grpc_error_times key) { + switch (key) { + case GRPC_ERROR_TIME_CREATED: + return "created"; } GPR_UNREACHABLE_CODE(return "unknown"); } @@ -97,12 +122,14 @@ struct grpc_error { gpr_refcount refs; gpr_avl ints; gpr_avl strs; + gpr_avl times; gpr_avl errs; uintptr_t next_err; }; static bool is_special(grpc_error *err) { - return err == GRPC_ERROR_NONE || err == GRPC_ERROR_OOM; + return err == GRPC_ERROR_NONE || err == GRPC_ERROR_OOM || + err == GRPC_ERROR_CANCELLED; } grpc_error *grpc_error_ref(grpc_error *err) { @@ -116,6 +143,7 @@ static void error_destroy(grpc_error *err) { gpr_avl_unref(err->ints); gpr_avl_unref(err->strs); gpr_avl_unref(err->errs); + gpr_avl_unref(err->times); } void grpc_error_unref(grpc_error *err) { @@ -124,14 +152,29 @@ void grpc_error_unref(grpc_error *err) { } } -grpc_error *grpc_error_create(void) { +grpc_error *grpc_error_create(const char *file, int line, const char *desc, + grpc_error **referencing, + size_t num_referencing) { grpc_error *err = gpr_malloc(sizeof(*err)); if (err == NULL) { // TODO(ctiller): make gpr_malloc return NULL return GRPC_ERROR_OOM; } - err->ints = gpr_avl_create(&avl_vtable_ints); - err->strs = gpr_avl_create(&avl_vtable_strs); + err->ints = gpr_avl_add(gpr_avl_create(&avl_vtable_ints), + (void *)(uintptr_t)GRPC_ERROR_INT_FILE_LINE, + (void *)(uintptr_t)line); + err->strs = gpr_avl_add( + gpr_avl_add(gpr_avl_create(&avl_vtable_strs), + (void *)(uintptr_t)GRPC_ERROR_STR_FILE, (void *)file), + (void *)(uintptr_t)GRPC_ERROR_STR_DESCRIPTION, (void *)desc); err->errs = gpr_avl_create(&avl_vtable_errs); + for (size_t i = 0; i < num_referencing; i++) { + if (referencing[i] == GRPC_ERROR_NONE) continue; + err->errs = + gpr_avl_add(err->errs, (void *)(err->next_err++), referencing[i]); + } + err->times = gpr_avl_add(gpr_avl_create(&avl_vtable_times), + (void *)(uintptr_t)GRPC_ERROR_TIME_CREATED, + box_time(gpr_now(GPR_CLOCK_REALTIME))); err->next_err = 0; gpr_ref_init(&err->refs, 1); return err; @@ -139,12 +182,16 @@ grpc_error *grpc_error_create(void) { static grpc_error *copy_error_and_unref(grpc_error *in) { if (is_special(in)) { - return grpc_error_create(); + if (in == GRPC_ERROR_NONE) return GRPC_ERROR_CREATE("no error"); + if (in == GRPC_ERROR_OOM) return GRPC_ERROR_CREATE("oom"); + if (in == GRPC_ERROR_CANCELLED) return GRPC_ERROR_CREATE("cancelled"); + return GRPC_ERROR_CREATE("unknown"); } grpc_error *out = gpr_malloc(sizeof(*out)); out->ints = gpr_avl_ref(in->ints); out->strs = gpr_avl_ref(in->strs); out->errs = gpr_avl_ref(in->errs); + out->times = gpr_avl_ref(in->times); out->next_err = in->next_err; gpr_ref_init(&out->refs, 1); grpc_error_unref(in); @@ -173,6 +220,7 @@ grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child) { static const char *no_error_string = "null"; static const char *oom_error_string = "\"Out of memory\""; +static const char *cancelled_error_string = "\"Cancelled\""; typedef struct { char *key; @@ -211,6 +259,10 @@ static char *key_str(void *p) { return gpr_strdup(error_str_name((grpc_error_strs)(uintptr_t)p)); } +static char *key_time(void *p) { + return gpr_strdup(error_time_name((grpc_error_times)(uintptr_t)p)); +} + static char *fmt_int(void *p) { char *s; gpr_asprintf(&s, "%lld", (intptr_t)p); @@ -277,6 +329,28 @@ static char *fmt_str(void *p) { return s; } +static char *fmt_time(void *p) { + gpr_timespec tm = *(gpr_timespec *)p; + char *out; + char *pfx = "!!"; + switch (tm.clock_type) { + case GPR_CLOCK_MONOTONIC: + pfx = "@monotonic:"; + break; + case GPR_CLOCK_REALTIME: + pfx = "@"; + break; + case GPR_CLOCK_PRECISE: + pfx = "@precise:"; + break; + case GPR_TIMESPAN: + pfx = ""; + break; + } + gpr_asprintf(&out, "%s%d.%09d", pfx, tm.tv_sec, tm.tv_nsec); + return out; +} + static void add_errs(gpr_avl_node *n, char **s, size_t *sz, size_t *cap) { if (n == NULL) return; add_errs(n->left, s, sz, cap); @@ -324,12 +398,14 @@ static const char *finish_kvs(kv_pairs *kvs) { const char *grpc_error_string(grpc_error *err) { if (err == GRPC_ERROR_NONE) return no_error_string; if (err == GRPC_ERROR_OOM) return oom_error_string; + if (err == GRPC_ERROR_CANCELLED) return cancelled_error_string; kv_pairs kvs; memset(&kvs, 0, sizeof(kvs)); collect_kvs(err->ints.root, key_int, fmt_int, &kvs); collect_kvs(err->strs.root, key_str, fmt_str, &kvs); + collect_kvs(err->times.root, key_time, fmt_time, &kvs); append_kv(&kvs, gpr_strdup("referenced_errors"), errs_string(err)); qsort(kvs.kvs, kvs.num_kvs, sizeof(kv_pair), cmp_kvs); @@ -337,10 +413,12 @@ const char *grpc_error_string(grpc_error *err) { return finish_kvs(&kvs); } -grpc_error *grpc_os_error(int err, const char *call_name) { +grpc_error *grpc_os_error(const char *file, int line, int err, + const char *call_name) { return grpc_error_set_str( grpc_error_set_str( - grpc_error_set_int(grpc_error_create(), GRPC_ERROR_INT_ERRNO, err), + grpc_error_set_int(grpc_error_create(file, line, "OS Error", NULL, 0), + GRPC_ERROR_INT_ERRNO, err), GRPC_ERROR_STR_OS_ERROR, strerror(err)), GRPC_ERROR_STR_SYSCALL, call_name); } diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index 6909bf9da1..0ce684fdbc 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -36,34 +36,53 @@ #include +#include + typedef struct grpc_error grpc_error; typedef enum { + GRPC_ERROR_INT_ERRNO, + GRPC_ERROR_INT_FILE_LINE, GRPC_ERROR_INT_STATUS_CODE, - GRPC_ERROR_INT_ERRNO } grpc_error_ints; typedef enum { GRPC_ERROR_STR_DESCRIPTION, - GRPC_ERROR_STR_TARGET_ADDRESS, + GRPC_ERROR_STR_FILE, GRPC_ERROR_STR_OS_ERROR, - GRPC_ERROR_STR_SYSCALL + GRPC_ERROR_STR_SYSCALL, + GRPC_ERROR_STR_TARGET_ADDRESS, } grpc_error_strs; +typedef enum { + GRPC_ERROR_TIME_CREATED, +} grpc_error_times; + #define GRPC_ERROR_NONE ((grpc_error *)NULL) #define GRPC_ERROR_OOM ((grpc_error *)1) +#define GRPC_ERROR_CANCELLED ((grpc_error *)2) const char *grpc_error_string(grpc_error *error); void grpc_error_free_string(const char *str); -grpc_error *grpc_error_create(void); +grpc_error *grpc_error_create(const char *file, int line, const char *desc, + grpc_error **referencing, size_t num_referencing); +#define GRPC_ERROR_CREATE(desc) \ + grpc_error_create(__FILE__, __LINE__, desc, NULL, 0) +#define GRPC_ERROR_CREATE_REFERENCING(desc, errs, count) \ + grpc_error_create(__FILE__, __LINE__, desc, errs, count) grpc_error *grpc_error_ref(grpc_error *err); void grpc_error_unref(grpc_error *err); grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which, intptr_t value); +grpc_error *grpc_error_set_time(grpc_error *src, grpc_error_times which, + gpr_timespec value); grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which, const char *value); grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child); -grpc_error *grpc_os_error(int err, const char *call_name); +grpc_error *grpc_os_error(const char *file, int line, int err, + const char *call_name); +#define GRPC_OS_ERROR(err, call_name) \ + grpc_os_error(__FILE__, __LINE__, err, call_name) #endif /* GRPC_CORE_LIB_IOMGR_ERROR_H */ diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c index e6a9cecbda..d11c947df2 100644 --- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c +++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c @@ -512,8 +512,7 @@ static grpc_error *fd_shutdown_error(bool shutdown) { if (!shutdown) { return GRPC_ERROR_NONE; } else { - return grpc_error_set_str(grpc_error_create(), GRPC_ERROR_STR_DESCRIPTION, - "FD shutdown"); + return GRPC_ERROR_CREATE("FD shutdown"); } } @@ -1045,7 +1044,7 @@ typedef struct grpc_unary_promote_args { } grpc_unary_promote_args; static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, - bool success) { + grpc_error *error) { grpc_unary_promote_args *up_args = args; const grpc_pollset_vtable *original_vtable = up_args->original_vtable; grpc_pollset *pollset = up_args->pollset; @@ -1571,7 +1570,7 @@ static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg, - bool iomgr_status) { + grpc_error *error) { delayed_add *da = arg; if (!fd_is_orphaned(da->fd)) { diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c index 7c137c5d6c..e7d90b9c60 100644 --- a/src/core/lib/iomgr/resolve_address_posix.c +++ b/src/core/lib/iomgr/resolve_address_posix.c @@ -147,7 +147,8 @@ grpc_resolved_addresses *(*grpc_blocking_resolve_address)( /* Callback to be passed to grpc_executor to asynch-ify * grpc_blocking_resolve_address */ -static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, bool success) { +static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, + grpc_error *error) { request *r = rp; grpc_resolved_addresses *resolved = grpc_blocking_resolve_address(r->name, r->default_port); diff --git a/src/core/lib/iomgr/socket_utils_common_posix.c b/src/core/lib/iomgr/socket_utils_common_posix.c index 8b49d91dd9..11ab090495 100644 --- a/src/core/lib/iomgr/socket_utils_common_posix.c +++ b/src/core/lib/iomgr/socket_utils_common_posix.c @@ -60,7 +60,7 @@ grpc_error *grpc_set_socket_nonblocking(int fd, int non_blocking) { int oldflags = fcntl(fd, F_GETFL, 0); if (oldflags < 0) { - return grpc_os_error(errno, "fcntl"); + return GRPC_OS_ERROR(errno, "fcntl"); } if (non_blocking) { @@ -70,7 +70,7 @@ grpc_error *grpc_set_socket_nonblocking(int fd, int non_blocking) { } if (fcntl(fd, F_SETFL, oldflags) != 0) { - return grpc_os_error(errno, "fcntl"); + return GRPC_OS_ERROR(errno, "fcntl"); } return GRPC_ERROR_NONE; @@ -82,13 +82,13 @@ grpc_error *grpc_set_socket_no_sigpipe_if_possible(int fd) { int newval; socklen_t intlen = sizeof(newval); if (0 != setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val))) { - return grpc_os_error(errno, "setsockopt(SO_NOSIGPIPE)"); + return GRPC_OS_ERROR(errno, "setsockopt(SO_NOSIGPIPE)"); } if (0 == getsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &newval, &intlen)) { - return grpc_os_error(errno, "getsockopt(SO_NOSIGPIPE)"); + return GRPC_OS_ERROR(errno, "getsockopt(SO_NOSIGPIPE)"); } if ((newval != 0) == val) { - return grpc_error_set_str(grpc_error_create(), GRPC_ERROR_STR_grpc_os_error, + return grpc_error_set_str(GRPC_ERROR_CREATE(), GRPC_ERROR_STR_grpc_os_error, "Failed to set SO_NOSIGPIPE"); } #endif @@ -100,7 +100,7 @@ grpc_error *grpc_set_socket_ip_pktinfo_if_possible(int fd) { int get_local_ip = 1; if (0 != setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip, sizeof(get_local_ip))) { - return grpc_os_error(errno, "setsockopt(IP_PKTINFO)"); + return GRPC_OS_ERROR(errno, "setsockopt(IP_PKTINFO)"); } #endif return GRPC_ERROR_NONE; @@ -111,7 +111,7 @@ grpc_error *grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd) { int get_local_ip = 1; if (0 != setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip, sizeof(get_local_ip))) { - return grpc_os_error(errno, "setsockopt(IPV6_RECVPKTINFO)"); + return GRPC_OS_ERROR(errno, "setsockopt(IPV6_RECVPKTINFO)"); } #endif return GRPC_ERROR_NONE; @@ -121,7 +121,7 @@ grpc_error *grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd) { grpc_error *grpc_set_socket_cloexec(int fd, int close_on_exec) { int oldflags = fcntl(fd, F_GETFD, 0); if (oldflags < 0) { - return grpc_os_error(errno, "fcntl"); + return GRPC_OS_ERROR(errno, "fcntl"); } if (close_on_exec) { @@ -131,7 +131,7 @@ grpc_error *grpc_set_socket_cloexec(int fd, int close_on_exec) { } if (fcntl(fd, F_SETFD, oldflags) != 0) { - return grpc_os_error(errno, "fcntl"); + return GRPC_OS_ERROR(errno, "fcntl"); } return GRPC_ERROR_NONE; @@ -143,14 +143,13 @@ grpc_error *grpc_set_socket_reuse_addr(int fd, int reuse) { int newval; socklen_t intlen = sizeof(newval); if (0 != setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val))) { - return grpc_os_error(errno, "setsockopt(SO_REUSEADDR)"); + return GRPC_OS_ERROR(errno, "setsockopt(SO_REUSEADDR)"); } if (0 != getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &newval, &intlen)) { - return grpc_os_error(errno, "getsockopt(SO_REUSEADDR)"); + return GRPC_OS_ERROR(errno, "getsockopt(SO_REUSEADDR)"); } if ((newval != 0) != val) { - return grpc_error_set_str(grpc_error_create(), GRPC_ERROR_STR_OS_ERROR, - "Failed to set SO_REUSEADDR"); + return GRPC_ERROR_CREATE("Failed to set SO_REUSEADDR"); } return GRPC_ERROR_NONE; @@ -162,14 +161,13 @@ grpc_error *grpc_set_socket_low_latency(int fd, int low_latency) { int newval; socklen_t intlen = sizeof(newval); if (0 != setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val))) { - return grpc_os_error(errno, "setsockopt(TCP_NODELAY)"); + return GRPC_OS_ERROR(errno, "setsockopt(TCP_NODELAY)"); } if (0 != getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &newval, &intlen)) { - return grpc_os_error(errno, "getsockopt(TCP_NODELAY)"); + return GRPC_OS_ERROR(errno, "getsockopt(TCP_NODELAY)"); } if ((newval != 0) != val) { - return grpc_error_set_str(grpc_error_create(), GRPC_ERROR_STR_OS_ERROR, - "Failed to set TCP_NODELAY"); + return GRPC_ERROR_CREATE("Failed to set TCP_NODELAY"); } return GRPC_ERROR_NONE; } @@ -250,7 +248,7 @@ grpc_error *grpc_create_dualstack_socket(const struct sockaddr *addr, int type, *dsmode = family == AF_INET ? GRPC_DSMODE_IPV4 : GRPC_DSMODE_NONE; *newfd = socket(family, type, protocol); if (*newfd == -1) { - return grpc_os_error(errno, "socket"); + return GRPC_OS_ERROR(errno, "socket"); } return GRPC_ERROR_NONE; } diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index 190512c17c..04b5b625a1 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -95,12 +95,14 @@ done: return err; } -static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, bool success) { +static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { int done; async_connect *ac = acp; if (grpc_tcp_trace) { - gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_alarm: success=%d", ac->addr_str, - success); + const char *str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_alarm: error=%s", ac->addr_str, + str); + grpc_error_free_string(str); } gpr_mu_lock(&ac->mu); if (ac->fd != NULL) { @@ -115,7 +117,7 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, bool success) { } } -static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) { +static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { async_connect *ac = acp; int so_error = 0; socklen_t so_error_size; @@ -124,11 +126,12 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) { grpc_endpoint **ep = ac->ep; grpc_closure *closure = ac->closure; grpc_fd *fd; - grpc_error *error = GRPC_ERROR_NONE; if (grpc_tcp_trace) { - gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_writable: success=%d", - ac->addr_str, success); + const char *str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_writable: error=%s", + ac->addr_str, str); + grpc_error_free_string(str); } gpr_mu_lock(&ac->mu); @@ -140,14 +143,14 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) { grpc_timer_cancel(exec_ctx, &ac->alarm); gpr_mu_lock(&ac->mu); - if (success) { + if (error == GRPC_ERROR_NONE) { do { so_error_size = sizeof(so_error); err = getsockopt(grpc_fd_wrapped_fd(fd), SOL_SOCKET, SO_ERROR, &so_error, &so_error_size); } while (err < 0 && errno == EINTR); if (err < 0) { - error = grpc_os_error(errno, "getsockopt"); + error = GRPC_OS_ERROR(errno, "getsockopt"); goto finish; } else if (so_error != 0) { if (so_error == ENOBUFS) { @@ -177,7 +180,7 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) { "Connection refused"); break; default: - error = grpc_os_error(errno, "getsockopt(SO_ERROR)"); + error = GRPC_OS_ERROR(errno, "getsockopt(SO_ERROR)"); break; } goto finish; @@ -276,7 +279,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, if (errno != EWOULDBLOCK && errno != EINPROGRESS) { grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error"); - grpc_exec_ctx_push(exec_ctx, closure, grpc_os_error(errno, "connect"), + grpc_exec_ctx_push(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"), NULL); goto done; } diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 7624d77d8f..925bcf2f6e 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -101,9 +101,9 @@ typedef struct { } grpc_tcp; static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, - bool success); + grpc_error *error); static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, - bool success); + grpc_error *error); static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; @@ -155,12 +155,15 @@ static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { TCP_UNREF(exec_ctx, tcp, "destroy"); } -static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, int success) { +static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, + grpc_error *error) { grpc_closure *cb = tcp->read_cb; if (grpc_tcp_trace) { size_t i; - gpr_log(GPR_DEBUG, "read: success=%d", success); + const char *str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "read: error=%s", str); + grpc_error_free_string(str); for (i = 0; i < tcp->incoming_buffer->count; i++) { char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); @@ -171,7 +174,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, int success) { tcp->read_cb = NULL; tcp->incoming_buffer = NULL; - cb->cb(exec_ctx, cb->cb_arg, success); + cb->cb(exec_ctx, cb->cb_arg, error); } #define MAX_READ_IOVEC 4 @@ -240,7 +243,7 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { ++tcp->iov_size; } GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length); - call_read_cb(exec_ctx, tcp, 1); + call_read_cb(exec_ctx, tcp, GRPC_ERROR_NONE); TCP_UNREF(exec_ctx, tcp, "read"); } @@ -248,11 +251,11 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { } static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, - bool success) { + grpc_error *error) { grpc_tcp *tcp = (grpc_tcp *)arg; GPR_ASSERT(!tcp->finished_edge); - if (!success) { + if (error != GRPC_ERROR_NONE) { gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); call_read_cb(exec_ctx, tcp, 0); TCP_UNREF(exec_ctx, tcp, "read"); @@ -332,7 +335,7 @@ static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) { tcp->outgoing_byte_idx = unwind_byte_idx; return false; } else { - *error = grpc_os_error(errno, "sendmsg"); + *error = GRPC_OS_ERROR(errno, "sendmsg"); return true; } } @@ -361,12 +364,11 @@ static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) { } static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, - bool success) { + grpc_error *error) { grpc_tcp *tcp = (grpc_tcp *)arg; - grpc_error *error = GRPC_ERROR_NONE; grpc_closure *cb; - if (!success) { + if (error != GRPC_ERROR_NONE) { cb = tcp->write_cb; tcp->write_cb = NULL; cb->cb(exec_ctx, cb->cb_arg, 0); diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h index 99b9f29729..75c582d18f 100644 --- a/src/core/lib/iomgr/tcp_server.h +++ b/src/core/lib/iomgr/tcp_server.h @@ -73,8 +73,8 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server, but not dualstack sockets. */ /* TODO(ctiller): deprecate this, and make grpc_tcp_server_add_ports to handle all of the multiple socket port matching logic in one place */ -int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, - size_t addr_len); +grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, + size_t addr_len, int *out_port); /* Number of fds at the given port_index, or 0 if port_index is out of bounds. */ diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index aaeb384f6e..64ab1c144b 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -59,6 +59,8 @@ #include #include #include +#include + #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/socket_utils_posix.h" @@ -150,7 +152,7 @@ grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) { static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (s->shutdown_complete != NULL) { - grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, true, NULL); + grpc_exec_ctx_push(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); } gpr_mu_destroy(&s->mu); @@ -165,7 +167,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { } static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, - bool success) { + grpc_error *error) { grpc_tcp_server *s = server; gpr_mu_lock(&s->mu); s->destroyed_ports++; @@ -306,14 +308,14 @@ error: } /* event manager callback when reads are ready */ -static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) { +static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { grpc_tcp_listener *sp = arg; grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, sp->fd_index}; grpc_fd *fdobj; size_t i; - if (!success) { + if (err != GRPC_ERROR_NONE) { goto error; } @@ -420,8 +422,8 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd, return sp; } -int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, - size_t addr_len) { +grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, + size_t addr_len, int *out_port) { grpc_tcp_listener *sp; grpc_tcp_listener *sp2 = NULL; int fd; @@ -436,6 +438,7 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, int port; unsigned port_index = 0; unsigned fd_index = 0; + grpc_error *errs[2] = {GRPC_ERROR_NONE, GRPC_ERROR_NONE}; if (s->tail != NULL) { port_index = s->tail->port_index + 1; } @@ -474,26 +477,26 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, /* Try listening on IPv6 first. */ addr = (struct sockaddr *)&wild6; addr_len = sizeof(wild6); - fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode); - sp = add_socket_to_server(s, fd, addr, addr_len, port_index, fd_index); - if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { - goto done; - } - if (sp != NULL) { - ++fd_index; - } - /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */ - if (port == 0 && sp != NULL) { - grpc_sockaddr_set_port((struct sockaddr *)&wild4, sp->port); + errs[0] = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd); + if (errs[0] == GRPC_ERROR_NONE) { + sp = add_socket_to_server(s, fd, addr, addr_len, port_index, fd_index); + if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { + goto done; + } + if (sp != NULL) { + ++fd_index; + } + /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */ + if (port == 0 && sp != NULL) { + grpc_sockaddr_set_port((struct sockaddr *)&wild4, sp->port); + } + addr = (struct sockaddr *)&wild4; + addr_len = sizeof(wild4); } - addr = (struct sockaddr *)&wild4; - addr_len = sizeof(wild4); } - fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode); - if (fd < 0) { - gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno)); - } else { + errs[1] = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd); + if (errs[1] == GRPC_ERROR_NONE) { if (dsmode == GRPC_DSMODE_IPV4 && grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) { addr = (struct sockaddr *)&addr4_copy; @@ -510,9 +513,19 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, done: gpr_free(allocated_addr); if (sp != NULL) { - return sp->port; + *out_port = sp->port; + grpc_error_unref(errs[0]); + grpc_error_unref(errs[1]); + return GRPC_ERROR_NONE; } else { - return -1; + *out_port = -1; + char *addr_str = grpc_sockaddr_to_uri(addr); + grpc_error *err = grpc_error_set_str( + GRPC_ERROR_CREATE_REFERENCING("Failed to add port to server", errs, + GPR_ARRAY_SIZE(errs)), + GRPC_ERROR_STR_TARGET_ADDRESS, addr_str); + gpr_free(addr_str); + return err; } } @@ -581,7 +594,8 @@ grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) { void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s, grpc_closure *shutdown_starting) { gpr_mu_lock(&s->mu); - grpc_closure_list_add(&s->shutdown_starting, shutdown_starting, 1); + grpc_closure_list_append(&s->shutdown_starting, shutdown_starting, + GRPC_ERROR_NONE); gpr_mu_unlock(&s->mu); } diff --git a/src/core/lib/iomgr/timer.c b/src/core/lib/iomgr/timer.c index acb5b26c87..d785d1543f 100644 --- a/src/core/lib/iomgr/timer.c +++ b/src/core/lib/iomgr/timer.c @@ -73,7 +73,7 @@ static shard_type *g_shard_queue[NUM_SHARDS]; static bool g_initialized = false; static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, - gpr_timespec *next, int success); + gpr_timespec *next, grpc_error *error); static gpr_timespec compute_min_deadline(shard_type *shard) { return grpc_timer_heap_is_empty(&shard->heap) @@ -185,13 +185,16 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, if (!g_initialized) { timer->triggered = 1; - grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, false, NULL); + grpc_exec_ctx_push( + exec_ctx, &timer->closure, + GRPC_ERROR_CREATE("Attempt to create timer before initialization"), + NULL); return; } if (gpr_time_cmp(deadline, now) <= 0) { timer->triggered = 1; - grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, true, NULL); + grpc_exec_ctx_push(exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL); return; } @@ -238,7 +241,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { shard_type *shard = &g_shards[shard_idx(timer)]; gpr_mu_lock(&shard->mu); if (!timer->triggered) { - grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, false, NULL); + grpc_exec_ctx_push(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED, NULL); timer->triggered = 1; if (timer->heap_index == INVALID_HEAP_INDEX) { list_remove(timer); @@ -299,12 +302,12 @@ static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) { /* REQUIRES: shard->mu unlocked */ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, gpr_timespec now, gpr_timespec *new_min_deadline, - int success) { + grpc_error *error) { size_t n = 0; grpc_timer *timer; gpr_mu_lock(&shard->mu); while ((timer = pop_one(shard, now))) { - grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, success, NULL); + grpc_exec_ctx_push(exec_ctx, &timer->closure, grpc_error_ref(error), NULL); n++; } *new_min_deadline = compute_min_deadline(shard); @@ -313,7 +316,7 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, } static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, - gpr_timespec *next, int success) { + gpr_timespec *next, grpc_error *error) { size_t n = 0; /* TODO(ctiller): verify that there are any timers (atomically) here */ @@ -327,8 +330,8 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, /* For efficiency, we pop as many available timers as we can from the shard. This may violate perfect timer deadline ordering, but that shouldn't be a big deal because we don't make ordering guarantees. */ - n += pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline, - success); + n += + pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline, error); /* An grpc_timer_init() on the shard could intervene here, adding a new timer that is earlier than new_min_deadline. However, @@ -359,6 +362,8 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, *next, gpr_time_add(now, gpr_time_from_millis(1, GPR_TIMESPAN))); } + grpc_error_unref(error); + return (int)n; } @@ -367,5 +372,7 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, GPR_ASSERT(now.clock_type == g_clock_type); return run_some_expired_timers( exec_ctx, now, next, - gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0); + gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0 + ? GRPC_ERROR_NONE + : GRPC_ERROR_CREATE("Shutting down timer system")); } diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h index 3e2b223670..20b47e1c14 100644 --- a/src/core/lib/iomgr/workqueue.h +++ b/src/core/lib/iomgr/workqueue.h @@ -78,6 +78,6 @@ void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx, /** Add a work item to a workqueue */ void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure, - int success); + grpc_error *error); #endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_H */ diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index 80e7a0b206..e27525dd50 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -45,7 +45,7 @@ #include "src/core/lib/iomgr/ev_posix.h" -static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success); +static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); grpc_workqueue *grpc_workqueue_create(grpc_exec_ctx *exec_ctx) { char name[32]; @@ -110,10 +110,10 @@ void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { gpr_mu_unlock(&workqueue->mu); } -static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success) { +static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_workqueue *workqueue = arg; - if (!success) { + if (error != GRPC_ERROR_NONE) { gpr_mu_destroy(&workqueue->mu); /* HACK: let wakeup_fd code know that we stole the fd */ workqueue->wakeup_fd.read_fd = 0; @@ -131,12 +131,12 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success) { } void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure, - int success) { + grpc_error *error) { gpr_mu_lock(&workqueue->mu); if (grpc_closure_list_empty(workqueue->closure_list)) { grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); } - grpc_closure_list_add(&workqueue->closure_list, closure, success); + grpc_closure_list_append(&workqueue->closure_list, closure, error); gpr_mu_unlock(&workqueue->mu); } diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c index 2cf2f00b31..aa9d60ee6a 100644 --- a/src/core/lib/surface/alarm.c +++ b/src/core/lib/surface/alarm.c @@ -48,9 +48,9 @@ struct grpc_alarm { static void do_nothing_end_completion(grpc_exec_ctx *exec_ctx, void *arg, grpc_cq_completion *c) {} -static void alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, bool success) { +static void alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_alarm *alarm = arg; - grpc_cq_end_op(exec_ctx, alarm->cq, alarm->tag, success, + grpc_cq_end_op(exec_ctx, alarm->cq, alarm->tag, error, do_nothing_end_completion, NULL, &alarm->completion); } diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 9b2b94eedf..0d6c58db62 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -122,6 +122,7 @@ typedef struct batch_control { grpc_closure finish_batch; void *notify_tag; gpr_refcount steps_to_complete; + grpc_error *error; uint8_t send_initial_metadata; uint8_t send_message; @@ -130,7 +131,6 @@ typedef struct batch_control { uint8_t recv_message; uint8_t recv_final_op; uint8_t is_notify_tag_closure; - uint8_t success; } batch_control; struct grpc_call { @@ -239,9 +239,9 @@ static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, grpc_status_code status, const char *description); static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack, - bool success); + grpc_error *error); static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, - bool success); + grpc_error *error); grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, @@ -361,7 +361,8 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) { GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON); } -static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) { +static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, + grpc_error *error) { size_t i; int ii; grpc_call *c = call; @@ -706,13 +707,13 @@ typedef struct cancel_closure { grpc_status_code status; } cancel_closure; -static void done_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) { +static void done_cancel(grpc_exec_ctx *exec_ctx, void *ccp, grpc_error *error) { cancel_closure *cc = ccp; GRPC_CALL_INTERNAL_UNREF(exec_ctx, cc->call, "cancel"); gpr_free(cc); } -static void send_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) { +static void send_cancel(grpc_exec_ctx *exec_ctx, void *ccp, grpc_error *error) { grpc_transport_stream_op op; cancel_closure *cc = ccp; memset(&op, 0, sizeof(op)); @@ -739,7 +740,7 @@ static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, cc->call = c; cc->status = status; GRPC_CALL_INTERNAL_REF(c, "cancel"); - grpc_exec_ctx_enqueue(exec_ctx, &cc->closure, true, NULL); + grpc_exec_ctx_push(exec_ctx, &cc->closure, GRPC_ERROR_NONE, NULL); return GRPC_CALL_OK; } @@ -775,11 +776,11 @@ grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { return CALL_FROM_TOP_ELEM(elem); } -static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool success) { +static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_call *call = arg; gpr_mu_lock(&call->mu); call->have_alarm = 0; - if (success) { + if (error != GRPC_ERROR_NONE) { cancel_with_status(exec_ctx, call, GRPC_STATUS_DEADLINE_EXCEEDED, "Deadline Exceeded"); } @@ -961,7 +962,7 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl) { grpc_call *call = bctl->call; if (bctl->is_notify_tag_closure) { - grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success, NULL); + grpc_exec_ctx_push(exec_ctx, bctl->notify_tag, bctl->error, NULL); gpr_mu_lock(&call->mu); bctl->call->used_batches = (uint8_t)(bctl->call->used_batches & @@ -969,7 +970,7 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&call->mu); GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); } else { - grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, bctl->success, + grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, bctl->error, finish_batch_completion, bctl, &bctl->cq_completion); } } @@ -1001,11 +1002,11 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, } static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, - bool success) { + grpc_error *error) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; - if (success) { + if (error == GRPC_ERROR_NONE) { gpr_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, call->receiving_slice); continue_receiving_slices(exec_ctx, bctl); @@ -1058,15 +1059,15 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl, } static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, - bool success) { + grpc_error *error) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; gpr_mu_lock(&bctl->call->mu); - if (bctl->call->has_initial_md_been_received || !success || + if (bctl->call->has_initial_md_been_received || error != GRPC_ERROR_NONE || call->receiving_stream == NULL) { gpr_mu_unlock(&bctl->call->mu); - process_data_after_md(exec_ctx, bctlp, success); + process_data_after_md(exec_ctx, bctlp, error); } else { call->saved_receiving_stream_ready_bctlp = bctlp; gpr_mu_unlock(&bctl->call->mu); @@ -1074,14 +1075,14 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, } static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, - void *bctlp, bool success) { + void *bctlp, grpc_error *error) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; gpr_mu_lock(&call->mu); - if (!success) { - bctl->success = false; + if (error != GRPC_ERROR_NONE) { + bctl->error = grpc_error_ref(error); } else { grpc_metadata_batch *md = &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; @@ -1101,7 +1102,7 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, grpc_closure *saved_rsr_closure = grpc_closure_create( receiving_stream_ready, call->saved_receiving_stream_ready_bctlp); call->saved_receiving_stream_ready_bctlp = NULL; - grpc_exec_ctx_enqueue(exec_ctx, saved_rsr_closure, success, NULL); + grpc_exec_ctx_push(exec_ctx, saved_rsr_closure, error, NULL); } gpr_mu_unlock(&call->mu); @@ -1111,7 +1112,8 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, } } -static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) { +static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, + grpc_error *error) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; grpc_call *child_call; @@ -1119,7 +1121,7 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) { gpr_mu_lock(&call->mu); if (bctl->send_initial_metadata) { - if (!success) { + if (error != GRPC_ERROR_NONE) { set_status_code(call, STATUS_FROM_CORE, GRPC_STATUS_UNAVAILABLE); } grpc_metadata_batch_destroy( @@ -1165,9 +1167,10 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) { call->final_op.server.cancelled); } - success = 1; + grpc_error_unref(error); + error = GRPC_ERROR_NONE; } - bctl->success = success != 0; + bctl->error = grpc_error_ref(error); gpr_mu_unlock(&call->mu); if (gpr_unref(&bctl->steps_to_complete)) { post_batch_completion(exec_ctx, bctl); @@ -1201,7 +1204,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, if (nops == 0) { GRPC_CALL_INTERNAL_REF(call, "completion"); - bctl->success = 1; + bctl->error = GRPC_ERROR_NONE; if (!is_notify_tag_closure) { grpc_cq_begin_op(call->cq, notify_tag); } diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index eef82cf014..9da660ca8b 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -75,7 +75,7 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag); /* Queue a GRPC_OP_COMPLETED operation; tag must correspond to the tag passed to grpc_cq_begin_op */ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, - void *tag, int success, + void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage); -- cgit v1.2.3