aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-05-03 16:27:00 -0700
committerGravatar Craig Tiller <ctiller@google.com>2016-05-03 16:27:00 -0700
commitc027e77d6408c7ae9b1889f64e2629fa66872f3d (patch)
treea5c4ad14fe26fa4de665de77016ad7e69a44304c
parent7c6ba9bae47bdeb6f532b457d9848e7dc94c8e14 (diff)
Progress
-rw-r--r--src/core/lib/channel/compress_filter.c6
-rw-r--r--src/core/lib/channel/http_client_filter.c5
-rw-r--r--src/core/lib/channel/http_server_filter.c24
-rw-r--r--src/core/lib/http/httpcli.c14
-rw-r--r--src/core/lib/iomgr/closure.c5
-rw-r--r--src/core/lib/iomgr/closure.h2
-rw-r--r--src/core/lib/iomgr/error.c92
-rw-r--r--src/core/lib/iomgr/error.h29
-rw-r--r--src/core/lib/iomgr/ev_poll_and_epoll_posix.c7
-rw-r--r--src/core/lib/iomgr/resolve_address_posix.c3
-rw-r--r--src/core/lib/iomgr/socket_utils_common_posix.c34
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.c25
-rw-r--r--src/core/lib/iomgr/tcp_posix.c26
-rw-r--r--src/core/lib/iomgr/tcp_server.h4
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c66
-rw-r--r--src/core/lib/iomgr/timer.c27
-rw-r--r--src/core/lib/iomgr/workqueue.h2
-rw-r--r--src/core/lib/iomgr/workqueue_posix.c10
-rw-r--r--src/core/lib/surface/alarm.c4
-rw-r--r--src/core/lib/surface/call.c53
-rw-r--r--src/core/lib/surface/completion_queue.h2
21 files changed, 286 insertions, 154 deletions
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index 5510c79b18..e029ec54be 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -155,11 +155,11 @@ static void process_send_initial_metadata(
static void continue_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem);
-static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, bool success) {
+static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
grpc_call_element *elem = elemp;
call_data *calld = elem->call_data;
gpr_slice_buffer_reset_and_unref(&calld->slices);
- calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, success);
+ calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error);
}
static void finish_send_message(grpc_exec_ctx *exec_ctx,
@@ -205,7 +205,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_next_op(exec_ctx, elem, &calld->send_op);
}
-static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, bool success) {
+static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
grpc_call_element *elem = elemp;
call_data *calld = elem->call_data;
gpr_slice_buffer_add(&calld->slices, calld->incoming_slice);
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index 516e708d1f..ced3d75cd2 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -80,7 +80,8 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) {
return md;
}
-static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, bool success) {
+static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data,
+ grpc_error *error) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
client_recv_filter_args a;
@@ -88,7 +89,7 @@ static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, bool success) {
a.exec_ctx = exec_ctx;
grpc_metadata_batch_filter(calld->recv_initial_metadata, client_recv_filter,
&a);
- calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success);
+ calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, error);
}
static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) {
diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c
index ba865416de..433ed0ed53 100644
--- a/src/core/lib/channel/http_server_filter.c
+++ b/src/core/lib/channel/http_server_filter.c
@@ -142,10 +142,11 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
}
}
-static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, bool success) {
+static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data,
+ grpc_error *err) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
- if (success) {
+ if (err == GRPC_ERROR_NONE) {
server_filter_args a;
a.elem = elem;
a.exec_ctx = exec_ctx;
@@ -157,27 +158,32 @@ static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, bool success) {
calld->seen_path && calld->seen_authority) {
/* do nothing */
} else {
+ err = GRPC_ERROR_CREATE("Bad incoming HTTP headers");
if (!calld->seen_path) {
- gpr_log(GPR_ERROR, "Missing :path header");
+ err = grpc_error_add_child(err,
+ GRPC_ERROR_CREATE("Missing :path header"));
}
if (!calld->seen_authority) {
- gpr_log(GPR_ERROR, "Missing :authority header");
+ err = grpc_error_add_child(
+ err, GRPC_ERROR_CREATE("Missing :authority header"));
}
if (!calld->seen_method) {
- gpr_log(GPR_ERROR, "Missing :method header");
+ err = grpc_error_add_child(err,
+ GRPC_ERROR_CREATE("Missing :method header"));
}
if (!calld->seen_scheme) {
- gpr_log(GPR_ERROR, "Missing :scheme header");
+ err = grpc_error_add_child(err,
+ GRPC_ERROR_CREATE("Missing :scheme header"));
}
if (!calld->seen_te_trailers) {
- gpr_log(GPR_ERROR, "Missing te trailers header");
+ err = grpc_error_add_child(
+ err, GRPC_ERROR_CREATE("Missing te: trailers header"));
}
/* Error this call out */
- success = 0;
grpc_call_element_send_cancel(exec_ctx, elem);
}
}
- calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success);
+ calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, err);
}
static void hs_mutate_op(grpc_call_element *elem,
diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c
index f22721ac8f..a59452fc59 100644
--- a/src/core/lib/http/httpcli.c
+++ b/src/core/lib/http/httpcli.c
@@ -117,13 +117,12 @@ static void finish(grpc_exec_ctx *exec_ctx, internal_request *req,
gpr_free(req);
}
-static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, bool success);
-
static void do_read(grpc_exec_ctx *exec_ctx, internal_request *req) {
grpc_endpoint_read(exec_ctx, req->ep, &req->incoming, &req->on_read);
}
-static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, bool success) {
+static void on_read(grpc_exec_ctx *exec_ctx, void *user_data,
+ grpc_error *error) {
internal_request *req = user_data;
size_t i;
@@ -137,7 +136,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, bool success) {
}
}
- if (success) {
+ if (error == GRPC_ERROR_NONE) {
do_read(exec_ctx, req);
} else if (!req->have_read_byte) {
next_address(exec_ctx, req);
@@ -154,9 +153,9 @@ static void on_written(grpc_exec_ctx *exec_ctx, internal_request *req) {
do_read(exec_ctx, req);
}
-static void done_write(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void done_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
internal_request *req = arg;
- if (success) {
+ if (error == GRPC_ERROR_NONE) {
on_written(exec_ctx, req);
} else {
next_address(exec_ctx, req);
@@ -182,7 +181,8 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
start_write(exec_ctx, req);
}
-static void on_connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void on_connected(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
internal_request *req = arg;
if (!req->ep) {
diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c
index 8738c4a5ce..885637efe9 100644
--- a/src/core/lib/iomgr/closure.c
+++ b/src/core/lib/iomgr/closure.c
@@ -87,12 +87,13 @@ typedef struct {
grpc_closure wrapper;
} wrapped_closure;
-static void closure_wrapper(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void closure_wrapper(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
wrapped_closure *wc = arg;
grpc_iomgr_cb_func cb = wc->cb;
void *cb_arg = wc->cb_arg;
gpr_free(wc);
- cb(exec_ctx, cb_arg, success);
+ cb(exec_ctx, cb_arg, error);
}
grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg) {
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index 8e76412ee0..14132d926e 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -56,7 +56,7 @@ typedef struct grpc_closure_list {
* \param success An indication on the state of the iomgr. On false, cleanup
* actions should be taken (eg, shutdown). */
typedef void (*grpc_iomgr_cb_func)(grpc_exec_ctx *exec_ctx, void *arg,
- bool success);
+ grpc_error *error);
/** A closure over a grpc_iomgr_cb_func. */
struct grpc_closure {
diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c
index 7dff3ece35..d0e4301722 100644
--- a/src/core/lib/iomgr/error.c
+++ b/src/core/lib/iomgr/error.c
@@ -58,6 +58,16 @@ static void destroy_err(void *err) { grpc_error_unref(err); }
static void *copy_err(void *err) { return grpc_error_ref(err); }
+static void destroy_time(void *tm) { gpr_free(tm); }
+
+static gpr_timespec *box_time(gpr_timespec tm) {
+ gpr_timespec *out = gpr_malloc(sizeof(*out));
+ *out = tm;
+ return out;
+}
+
+static void *copy_time(void *tm) { return box_time(*(gpr_timespec *)tm); }
+
static const gpr_avl_vtable avl_vtable_ints = {destroy_integer, copy_integer,
compare_integers,
destroy_integer, copy_integer};
@@ -66,6 +76,9 @@ static const gpr_avl_vtable avl_vtable_strs = {destroy_integer, copy_integer,
compare_integers, destroy_string,
copy_string};
+static const gpr_avl_vtable avl_vtable_times = {
+ destroy_integer, copy_integer, compare_integers, destroy_time, copy_time};
+
static const gpr_avl_vtable avl_vtable_errs = {
destroy_integer, copy_integer, compare_integers, destroy_err, copy_err};
@@ -75,6 +88,8 @@ static const char *error_int_name(grpc_error_ints key) {
return "status_code";
case GRPC_ERROR_INT_ERRNO:
return "errno";
+ case GRPC_ERROR_INT_FILE_LINE:
+ return "file_line";
}
GPR_UNREACHABLE_CODE(return "unknown");
}
@@ -89,6 +104,16 @@ static const char *error_str_name(grpc_error_strs key) {
return "target_address";
case GRPC_ERROR_STR_SYSCALL:
return "syscall";
+ case GRPC_ERROR_STR_FILE:
+ return "file";
+ }
+ GPR_UNREACHABLE_CODE(return "unknown");
+}
+
+static const char *error_time_name(grpc_error_times key) {
+ switch (key) {
+ case GRPC_ERROR_TIME_CREATED:
+ return "created";
}
GPR_UNREACHABLE_CODE(return "unknown");
}
@@ -97,12 +122,14 @@ struct grpc_error {
gpr_refcount refs;
gpr_avl ints;
gpr_avl strs;
+ gpr_avl times;
gpr_avl errs;
uintptr_t next_err;
};
static bool is_special(grpc_error *err) {
- return err == GRPC_ERROR_NONE || err == GRPC_ERROR_OOM;
+ return err == GRPC_ERROR_NONE || err == GRPC_ERROR_OOM ||
+ err == GRPC_ERROR_CANCELLED;
}
grpc_error *grpc_error_ref(grpc_error *err) {
@@ -116,6 +143,7 @@ static void error_destroy(grpc_error *err) {
gpr_avl_unref(err->ints);
gpr_avl_unref(err->strs);
gpr_avl_unref(err->errs);
+ gpr_avl_unref(err->times);
}
void grpc_error_unref(grpc_error *err) {
@@ -124,14 +152,29 @@ void grpc_error_unref(grpc_error *err) {
}
}
-grpc_error *grpc_error_create(void) {
+grpc_error *grpc_error_create(const char *file, int line, const char *desc,
+ grpc_error **referencing,
+ size_t num_referencing) {
grpc_error *err = gpr_malloc(sizeof(*err));
if (err == NULL) { // TODO(ctiller): make gpr_malloc return NULL
return GRPC_ERROR_OOM;
}
- err->ints = gpr_avl_create(&avl_vtable_ints);
- err->strs = gpr_avl_create(&avl_vtable_strs);
+ err->ints = gpr_avl_add(gpr_avl_create(&avl_vtable_ints),
+ (void *)(uintptr_t)GRPC_ERROR_INT_FILE_LINE,
+ (void *)(uintptr_t)line);
+ err->strs = gpr_avl_add(
+ gpr_avl_add(gpr_avl_create(&avl_vtable_strs),
+ (void *)(uintptr_t)GRPC_ERROR_STR_FILE, (void *)file),
+ (void *)(uintptr_t)GRPC_ERROR_STR_DESCRIPTION, (void *)desc);
err->errs = gpr_avl_create(&avl_vtable_errs);
+ for (size_t i = 0; i < num_referencing; i++) {
+ if (referencing[i] == GRPC_ERROR_NONE) continue;
+ err->errs =
+ gpr_avl_add(err->errs, (void *)(err->next_err++), referencing[i]);
+ }
+ err->times = gpr_avl_add(gpr_avl_create(&avl_vtable_times),
+ (void *)(uintptr_t)GRPC_ERROR_TIME_CREATED,
+ box_time(gpr_now(GPR_CLOCK_REALTIME)));
err->next_err = 0;
gpr_ref_init(&err->refs, 1);
return err;
@@ -139,12 +182,16 @@ grpc_error *grpc_error_create(void) {
static grpc_error *copy_error_and_unref(grpc_error *in) {
if (is_special(in)) {
- return grpc_error_create();
+ if (in == GRPC_ERROR_NONE) return GRPC_ERROR_CREATE("no error");
+ if (in == GRPC_ERROR_OOM) return GRPC_ERROR_CREATE("oom");
+ if (in == GRPC_ERROR_CANCELLED) return GRPC_ERROR_CREATE("cancelled");
+ return GRPC_ERROR_CREATE("unknown");
}
grpc_error *out = gpr_malloc(sizeof(*out));
out->ints = gpr_avl_ref(in->ints);
out->strs = gpr_avl_ref(in->strs);
out->errs = gpr_avl_ref(in->errs);
+ out->times = gpr_avl_ref(in->times);
out->next_err = in->next_err;
gpr_ref_init(&out->refs, 1);
grpc_error_unref(in);
@@ -173,6 +220,7 @@ grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child) {
static const char *no_error_string = "null";
static const char *oom_error_string = "\"Out of memory\"";
+static const char *cancelled_error_string = "\"Cancelled\"";
typedef struct {
char *key;
@@ -211,6 +259,10 @@ static char *key_str(void *p) {
return gpr_strdup(error_str_name((grpc_error_strs)(uintptr_t)p));
}
+static char *key_time(void *p) {
+ return gpr_strdup(error_time_name((grpc_error_times)(uintptr_t)p));
+}
+
static char *fmt_int(void *p) {
char *s;
gpr_asprintf(&s, "%lld", (intptr_t)p);
@@ -277,6 +329,28 @@ static char *fmt_str(void *p) {
return s;
}
+static char *fmt_time(void *p) {
+ gpr_timespec tm = *(gpr_timespec *)p;
+ char *out;
+ char *pfx = "!!";
+ switch (tm.clock_type) {
+ case GPR_CLOCK_MONOTONIC:
+ pfx = "@monotonic:";
+ break;
+ case GPR_CLOCK_REALTIME:
+ pfx = "@";
+ break;
+ case GPR_CLOCK_PRECISE:
+ pfx = "@precise:";
+ break;
+ case GPR_TIMESPAN:
+ pfx = "";
+ break;
+ }
+ gpr_asprintf(&out, "%s%d.%09d", pfx, tm.tv_sec, tm.tv_nsec);
+ return out;
+}
+
static void add_errs(gpr_avl_node *n, char **s, size_t *sz, size_t *cap) {
if (n == NULL) return;
add_errs(n->left, s, sz, cap);
@@ -324,12 +398,14 @@ static const char *finish_kvs(kv_pairs *kvs) {
const char *grpc_error_string(grpc_error *err) {
if (err == GRPC_ERROR_NONE) return no_error_string;
if (err == GRPC_ERROR_OOM) return oom_error_string;
+ if (err == GRPC_ERROR_CANCELLED) return cancelled_error_string;
kv_pairs kvs;
memset(&kvs, 0, sizeof(kvs));
collect_kvs(err->ints.root, key_int, fmt_int, &kvs);
collect_kvs(err->strs.root, key_str, fmt_str, &kvs);
+ collect_kvs(err->times.root, key_time, fmt_time, &kvs);
append_kv(&kvs, gpr_strdup("referenced_errors"), errs_string(err));
qsort(kvs.kvs, kvs.num_kvs, sizeof(kv_pair), cmp_kvs);
@@ -337,10 +413,12 @@ const char *grpc_error_string(grpc_error *err) {
return finish_kvs(&kvs);
}
-grpc_error *grpc_os_error(int err, const char *call_name) {
+grpc_error *grpc_os_error(const char *file, int line, int err,
+ const char *call_name) {
return grpc_error_set_str(
grpc_error_set_str(
- grpc_error_set_int(grpc_error_create(), GRPC_ERROR_INT_ERRNO, err),
+ grpc_error_set_int(grpc_error_create(file, line, "OS Error", NULL, 0),
+ GRPC_ERROR_INT_ERRNO, err),
GRPC_ERROR_STR_OS_ERROR, strerror(err)),
GRPC_ERROR_STR_SYSCALL, call_name);
}
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index 6909bf9da1..0ce684fdbc 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -36,34 +36,53 @@
#include <stdint.h>
+#include <grpc/support/time.h>
+
typedef struct grpc_error grpc_error;
typedef enum {
+ GRPC_ERROR_INT_ERRNO,
+ GRPC_ERROR_INT_FILE_LINE,
GRPC_ERROR_INT_STATUS_CODE,
- GRPC_ERROR_INT_ERRNO
} grpc_error_ints;
typedef enum {
GRPC_ERROR_STR_DESCRIPTION,
- GRPC_ERROR_STR_TARGET_ADDRESS,
+ GRPC_ERROR_STR_FILE,
GRPC_ERROR_STR_OS_ERROR,
- GRPC_ERROR_STR_SYSCALL
+ GRPC_ERROR_STR_SYSCALL,
+ GRPC_ERROR_STR_TARGET_ADDRESS,
} grpc_error_strs;
+typedef enum {
+ GRPC_ERROR_TIME_CREATED,
+} grpc_error_times;
+
#define GRPC_ERROR_NONE ((grpc_error *)NULL)
#define GRPC_ERROR_OOM ((grpc_error *)1)
+#define GRPC_ERROR_CANCELLED ((grpc_error *)2)
const char *grpc_error_string(grpc_error *error);
void grpc_error_free_string(const char *str);
-grpc_error *grpc_error_create(void);
+grpc_error *grpc_error_create(const char *file, int line, const char *desc,
+ grpc_error **referencing, size_t num_referencing);
+#define GRPC_ERROR_CREATE(desc) \
+ grpc_error_create(__FILE__, __LINE__, desc, NULL, 0)
+#define GRPC_ERROR_CREATE_REFERENCING(desc, errs, count) \
+ grpc_error_create(__FILE__, __LINE__, desc, errs, count)
grpc_error *grpc_error_ref(grpc_error *err);
void grpc_error_unref(grpc_error *err);
grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which,
intptr_t value);
+grpc_error *grpc_error_set_time(grpc_error *src, grpc_error_times which,
+ gpr_timespec value);
grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which,
const char *value);
grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child);
-grpc_error *grpc_os_error(int err, const char *call_name);
+grpc_error *grpc_os_error(const char *file, int line, int err,
+ const char *call_name);
+#define GRPC_OS_ERROR(err, call_name) \
+ grpc_os_error(__FILE__, __LINE__, err, call_name)
#endif /* GRPC_CORE_LIB_IOMGR_ERROR_H */
diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
index e6a9cecbda..d11c947df2 100644
--- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
@@ -512,8 +512,7 @@ static grpc_error *fd_shutdown_error(bool shutdown) {
if (!shutdown) {
return GRPC_ERROR_NONE;
} else {
- return grpc_error_set_str(grpc_error_create(), GRPC_ERROR_STR_DESCRIPTION,
- "FD shutdown");
+ return GRPC_ERROR_CREATE("FD shutdown");
}
}
@@ -1045,7 +1044,7 @@ typedef struct grpc_unary_promote_args {
} grpc_unary_promote_args;
static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args,
- bool success) {
+ grpc_error *error) {
grpc_unary_promote_args *up_args = args;
const grpc_pollset_vtable *original_vtable = up_args->original_vtable;
grpc_pollset *pollset = up_args->pollset;
@@ -1571,7 +1570,7 @@ static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg,
- bool iomgr_status) {
+ grpc_error *error) {
delayed_add *da = arg;
if (!fd_is_orphaned(da->fd)) {
diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c
index 7c137c5d6c..e7d90b9c60 100644
--- a/src/core/lib/iomgr/resolve_address_posix.c
+++ b/src/core/lib/iomgr/resolve_address_posix.c
@@ -147,7 +147,8 @@ grpc_resolved_addresses *(*grpc_blocking_resolve_address)(
/* Callback to be passed to grpc_executor to asynch-ify
* grpc_blocking_resolve_address */
-static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, bool success) {
+static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp,
+ grpc_error *error) {
request *r = rp;
grpc_resolved_addresses *resolved =
grpc_blocking_resolve_address(r->name, r->default_port);
diff --git a/src/core/lib/iomgr/socket_utils_common_posix.c b/src/core/lib/iomgr/socket_utils_common_posix.c
index 8b49d91dd9..11ab090495 100644
--- a/src/core/lib/iomgr/socket_utils_common_posix.c
+++ b/src/core/lib/iomgr/socket_utils_common_posix.c
@@ -60,7 +60,7 @@
grpc_error *grpc_set_socket_nonblocking(int fd, int non_blocking) {
int oldflags = fcntl(fd, F_GETFL, 0);
if (oldflags < 0) {
- return grpc_os_error(errno, "fcntl");
+ return GRPC_OS_ERROR(errno, "fcntl");
}
if (non_blocking) {
@@ -70,7 +70,7 @@ grpc_error *grpc_set_socket_nonblocking(int fd, int non_blocking) {
}
if (fcntl(fd, F_SETFL, oldflags) != 0) {
- return grpc_os_error(errno, "fcntl");
+ return GRPC_OS_ERROR(errno, "fcntl");
}
return GRPC_ERROR_NONE;
@@ -82,13 +82,13 @@ grpc_error *grpc_set_socket_no_sigpipe_if_possible(int fd) {
int newval;
socklen_t intlen = sizeof(newval);
if (0 != setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val))) {
- return grpc_os_error(errno, "setsockopt(SO_NOSIGPIPE)");
+ return GRPC_OS_ERROR(errno, "setsockopt(SO_NOSIGPIPE)");
}
if (0 == getsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &newval, &intlen)) {
- return grpc_os_error(errno, "getsockopt(SO_NOSIGPIPE)");
+ return GRPC_OS_ERROR(errno, "getsockopt(SO_NOSIGPIPE)");
}
if ((newval != 0) == val) {
- return grpc_error_set_str(grpc_error_create(), GRPC_ERROR_STR_grpc_os_error,
+ return grpc_error_set_str(GRPC_ERROR_CREATE(), GRPC_ERROR_STR_grpc_os_error,
"Failed to set SO_NOSIGPIPE");
}
#endif
@@ -100,7 +100,7 @@ grpc_error *grpc_set_socket_ip_pktinfo_if_possible(int fd) {
int get_local_ip = 1;
if (0 != setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip,
sizeof(get_local_ip))) {
- return grpc_os_error(errno, "setsockopt(IP_PKTINFO)");
+ return GRPC_OS_ERROR(errno, "setsockopt(IP_PKTINFO)");
}
#endif
return GRPC_ERROR_NONE;
@@ -111,7 +111,7 @@ grpc_error *grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd) {
int get_local_ip = 1;
if (0 != setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip,
sizeof(get_local_ip))) {
- return grpc_os_error(errno, "setsockopt(IPV6_RECVPKTINFO)");
+ return GRPC_OS_ERROR(errno, "setsockopt(IPV6_RECVPKTINFO)");
}
#endif
return GRPC_ERROR_NONE;
@@ -121,7 +121,7 @@ grpc_error *grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd) {
grpc_error *grpc_set_socket_cloexec(int fd, int close_on_exec) {
int oldflags = fcntl(fd, F_GETFD, 0);
if (oldflags < 0) {
- return grpc_os_error(errno, "fcntl");
+ return GRPC_OS_ERROR(errno, "fcntl");
}
if (close_on_exec) {
@@ -131,7 +131,7 @@ grpc_error *grpc_set_socket_cloexec(int fd, int close_on_exec) {
}
if (fcntl(fd, F_SETFD, oldflags) != 0) {
- return grpc_os_error(errno, "fcntl");
+ return GRPC_OS_ERROR(errno, "fcntl");
}
return GRPC_ERROR_NONE;
@@ -143,14 +143,13 @@ grpc_error *grpc_set_socket_reuse_addr(int fd, int reuse) {
int newval;
socklen_t intlen = sizeof(newval);
if (0 != setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val))) {
- return grpc_os_error(errno, "setsockopt(SO_REUSEADDR)");
+ return GRPC_OS_ERROR(errno, "setsockopt(SO_REUSEADDR)");
}
if (0 != getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &newval, &intlen)) {
- return grpc_os_error(errno, "getsockopt(SO_REUSEADDR)");
+ return GRPC_OS_ERROR(errno, "getsockopt(SO_REUSEADDR)");
}
if ((newval != 0) != val) {
- return grpc_error_set_str(grpc_error_create(), GRPC_ERROR_STR_OS_ERROR,
- "Failed to set SO_REUSEADDR");
+ return GRPC_ERROR_CREATE("Failed to set SO_REUSEADDR");
}
return GRPC_ERROR_NONE;
@@ -162,14 +161,13 @@ grpc_error *grpc_set_socket_low_latency(int fd, int low_latency) {
int newval;
socklen_t intlen = sizeof(newval);
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val))) {
- return grpc_os_error(errno, "setsockopt(TCP_NODELAY)");
+ return GRPC_OS_ERROR(errno, "setsockopt(TCP_NODELAY)");
}
if (0 != getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &newval, &intlen)) {
- return grpc_os_error(errno, "getsockopt(TCP_NODELAY)");
+ return GRPC_OS_ERROR(errno, "getsockopt(TCP_NODELAY)");
}
if ((newval != 0) != val) {
- return grpc_error_set_str(grpc_error_create(), GRPC_ERROR_STR_OS_ERROR,
- "Failed to set TCP_NODELAY");
+ return GRPC_ERROR_CREATE("Failed to set TCP_NODELAY");
}
return GRPC_ERROR_NONE;
}
@@ -250,7 +248,7 @@ grpc_error *grpc_create_dualstack_socket(const struct sockaddr *addr, int type,
*dsmode = family == AF_INET ? GRPC_DSMODE_IPV4 : GRPC_DSMODE_NONE;
*newfd = socket(family, type, protocol);
if (*newfd == -1) {
- return grpc_os_error(errno, "socket");
+ return GRPC_OS_ERROR(errno, "socket");
}
return GRPC_ERROR_NONE;
}
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index 190512c17c..04b5b625a1 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -95,12 +95,14 @@ done:
return err;
}
-static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, bool success) {
+static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
int done;
async_connect *ac = acp;
if (grpc_tcp_trace) {
- gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_alarm: success=%d", ac->addr_str,
- success);
+ const char *str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_alarm: error=%s", ac->addr_str,
+ str);
+ grpc_error_free_string(str);
}
gpr_mu_lock(&ac->mu);
if (ac->fd != NULL) {
@@ -115,7 +117,7 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, bool success) {
}
}
-static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) {
+static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
async_connect *ac = acp;
int so_error = 0;
socklen_t so_error_size;
@@ -124,11 +126,12 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) {
grpc_endpoint **ep = ac->ep;
grpc_closure *closure = ac->closure;
grpc_fd *fd;
- grpc_error *error = GRPC_ERROR_NONE;
if (grpc_tcp_trace) {
- gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_writable: success=%d",
- ac->addr_str, success);
+ const char *str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_writable: error=%s",
+ ac->addr_str, str);
+ grpc_error_free_string(str);
}
gpr_mu_lock(&ac->mu);
@@ -140,14 +143,14 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) {
grpc_timer_cancel(exec_ctx, &ac->alarm);
gpr_mu_lock(&ac->mu);
- if (success) {
+ if (error == GRPC_ERROR_NONE) {
do {
so_error_size = sizeof(so_error);
err = getsockopt(grpc_fd_wrapped_fd(fd), SOL_SOCKET, SO_ERROR, &so_error,
&so_error_size);
} while (err < 0 && errno == EINTR);
if (err < 0) {
- error = grpc_os_error(errno, "getsockopt");
+ error = GRPC_OS_ERROR(errno, "getsockopt");
goto finish;
} else if (so_error != 0) {
if (so_error == ENOBUFS) {
@@ -177,7 +180,7 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) {
"Connection refused");
break;
default:
- error = grpc_os_error(errno, "getsockopt(SO_ERROR)");
+ error = GRPC_OS_ERROR(errno, "getsockopt(SO_ERROR)");
break;
}
goto finish;
@@ -276,7 +279,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error");
- grpc_exec_ctx_push(exec_ctx, closure, grpc_os_error(errno, "connect"),
+ grpc_exec_ctx_push(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"),
NULL);
goto done;
}
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 7624d77d8f..925bcf2f6e 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -101,9 +101,9 @@ typedef struct {
} grpc_tcp;
static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
- bool success);
+ grpc_error *error);
static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
- bool success);
+ grpc_error *error);
static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
@@ -155,12 +155,15 @@ static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
TCP_UNREF(exec_ctx, tcp, "destroy");
}
-static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, int success) {
+static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
+ grpc_error *error) {
grpc_closure *cb = tcp->read_cb;
if (grpc_tcp_trace) {
size_t i;
- gpr_log(GPR_DEBUG, "read: success=%d", success);
+ const char *str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "read: error=%s", str);
+ grpc_error_free_string(str);
for (i = 0; i < tcp->incoming_buffer->count; i++) {
char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i],
GPR_DUMP_HEX | GPR_DUMP_ASCII);
@@ -171,7 +174,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, int success) {
tcp->read_cb = NULL;
tcp->incoming_buffer = NULL;
- cb->cb(exec_ctx, cb->cb_arg, success);
+ cb->cb(exec_ctx, cb->cb_arg, error);
}
#define MAX_READ_IOVEC 4
@@ -240,7 +243,7 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
++tcp->iov_size;
}
GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length);
- call_read_cb(exec_ctx, tcp, 1);
+ call_read_cb(exec_ctx, tcp, GRPC_ERROR_NONE);
TCP_UNREF(exec_ctx, tcp, "read");
}
@@ -248,11 +251,11 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
}
static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
- bool success) {
+ grpc_error *error) {
grpc_tcp *tcp = (grpc_tcp *)arg;
GPR_ASSERT(!tcp->finished_edge);
- if (!success) {
+ if (error != GRPC_ERROR_NONE) {
gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
call_read_cb(exec_ctx, tcp, 0);
TCP_UNREF(exec_ctx, tcp, "read");
@@ -332,7 +335,7 @@ static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) {
tcp->outgoing_byte_idx = unwind_byte_idx;
return false;
} else {
- *error = grpc_os_error(errno, "sendmsg");
+ *error = GRPC_OS_ERROR(errno, "sendmsg");
return true;
}
}
@@ -361,12 +364,11 @@ static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) {
}
static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
- bool success) {
+ grpc_error *error) {
grpc_tcp *tcp = (grpc_tcp *)arg;
- grpc_error *error = GRPC_ERROR_NONE;
grpc_closure *cb;
- if (!success) {
+ if (error != GRPC_ERROR_NONE) {
cb = tcp->write_cb;
tcp->write_cb = NULL;
cb->cb(exec_ctx, cb->cb_arg, 0);
diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h
index 99b9f29729..75c582d18f 100644
--- a/src/core/lib/iomgr/tcp_server.h
+++ b/src/core/lib/iomgr/tcp_server.h
@@ -73,8 +73,8 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server,
but not dualstack sockets. */
/* TODO(ctiller): deprecate this, and make grpc_tcp_server_add_ports to handle
all of the multiple socket port matching logic in one place */
-int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
- size_t addr_len);
+grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
+ size_t addr_len, int *out_port);
/* Number of fds at the given port_index, or 0 if port_index is out of
bounds. */
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index aaeb384f6e..64ab1c144b 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -59,6 +59,8 @@
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
+#include <grpc/support/useful.h>
+
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
@@ -150,7 +152,7 @@ grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) {
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (s->shutdown_complete != NULL) {
- grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, true, NULL);
+ grpc_exec_ctx_push(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
}
gpr_mu_destroy(&s->mu);
@@ -165,7 +167,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
}
static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server,
- bool success) {
+ grpc_error *error) {
grpc_tcp_server *s = server;
gpr_mu_lock(&s->mu);
s->destroyed_ports++;
@@ -306,14 +308,14 @@ error:
}
/* event manager callback when reads are ready */
-static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
grpc_tcp_listener *sp = arg;
grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index,
sp->fd_index};
grpc_fd *fdobj;
size_t i;
- if (!success) {
+ if (err != GRPC_ERROR_NONE) {
goto error;
}
@@ -420,8 +422,8 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd,
return sp;
}
-int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
- size_t addr_len) {
+grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
+ size_t addr_len, int *out_port) {
grpc_tcp_listener *sp;
grpc_tcp_listener *sp2 = NULL;
int fd;
@@ -436,6 +438,7 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
int port;
unsigned port_index = 0;
unsigned fd_index = 0;
+ grpc_error *errs[2] = {GRPC_ERROR_NONE, GRPC_ERROR_NONE};
if (s->tail != NULL) {
port_index = s->tail->port_index + 1;
}
@@ -474,26 +477,26 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
/* Try listening on IPv6 first. */
addr = (struct sockaddr *)&wild6;
addr_len = sizeof(wild6);
- fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode);
- sp = add_socket_to_server(s, fd, addr, addr_len, port_index, fd_index);
- if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
- goto done;
- }
- if (sp != NULL) {
- ++fd_index;
- }
- /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */
- if (port == 0 && sp != NULL) {
- grpc_sockaddr_set_port((struct sockaddr *)&wild4, sp->port);
+ errs[0] = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd);
+ if (errs[0] == GRPC_ERROR_NONE) {
+ sp = add_socket_to_server(s, fd, addr, addr_len, port_index, fd_index);
+ if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
+ goto done;
+ }
+ if (sp != NULL) {
+ ++fd_index;
+ }
+ /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */
+ if (port == 0 && sp != NULL) {
+ grpc_sockaddr_set_port((struct sockaddr *)&wild4, sp->port);
+ }
+ addr = (struct sockaddr *)&wild4;
+ addr_len = sizeof(wild4);
}
- addr = (struct sockaddr *)&wild4;
- addr_len = sizeof(wild4);
}
- fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode);
- if (fd < 0) {
- gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
- } else {
+ errs[1] = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd);
+ if (errs[1] == GRPC_ERROR_NONE) {
if (dsmode == GRPC_DSMODE_IPV4 &&
grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) {
addr = (struct sockaddr *)&addr4_copy;
@@ -510,9 +513,19 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
done:
gpr_free(allocated_addr);
if (sp != NULL) {
- return sp->port;
+ *out_port = sp->port;
+ grpc_error_unref(errs[0]);
+ grpc_error_unref(errs[1]);
+ return GRPC_ERROR_NONE;
} else {
- return -1;
+ *out_port = -1;
+ char *addr_str = grpc_sockaddr_to_uri(addr);
+ grpc_error *err = grpc_error_set_str(
+ GRPC_ERROR_CREATE_REFERENCING("Failed to add port to server", errs,
+ GPR_ARRAY_SIZE(errs)),
+ GRPC_ERROR_STR_TARGET_ADDRESS, addr_str);
+ gpr_free(addr_str);
+ return err;
}
}
@@ -581,7 +594,8 @@ grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s,
grpc_closure *shutdown_starting) {
gpr_mu_lock(&s->mu);
- grpc_closure_list_add(&s->shutdown_starting, shutdown_starting, 1);
+ grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
+ GRPC_ERROR_NONE);
gpr_mu_unlock(&s->mu);
}
diff --git a/src/core/lib/iomgr/timer.c b/src/core/lib/iomgr/timer.c
index acb5b26c87..d785d1543f 100644
--- a/src/core/lib/iomgr/timer.c
+++ b/src/core/lib/iomgr/timer.c
@@ -73,7 +73,7 @@ static shard_type *g_shard_queue[NUM_SHARDS];
static bool g_initialized = false;
static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
- gpr_timespec *next, int success);
+ gpr_timespec *next, grpc_error *error);
static gpr_timespec compute_min_deadline(shard_type *shard) {
return grpc_timer_heap_is_empty(&shard->heap)
@@ -185,13 +185,16 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
if (!g_initialized) {
timer->triggered = 1;
- grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, false, NULL);
+ grpc_exec_ctx_push(
+ exec_ctx, &timer->closure,
+ GRPC_ERROR_CREATE("Attempt to create timer before initialization"),
+ NULL);
return;
}
if (gpr_time_cmp(deadline, now) <= 0) {
timer->triggered = 1;
- grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, true, NULL);
+ grpc_exec_ctx_push(exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL);
return;
}
@@ -238,7 +241,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
shard_type *shard = &g_shards[shard_idx(timer)];
gpr_mu_lock(&shard->mu);
if (!timer->triggered) {
- grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, false, NULL);
+ grpc_exec_ctx_push(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED, NULL);
timer->triggered = 1;
if (timer->heap_index == INVALID_HEAP_INDEX) {
list_remove(timer);
@@ -299,12 +302,12 @@ static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) {
/* REQUIRES: shard->mu unlocked */
static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard,
gpr_timespec now, gpr_timespec *new_min_deadline,
- int success) {
+ grpc_error *error) {
size_t n = 0;
grpc_timer *timer;
gpr_mu_lock(&shard->mu);
while ((timer = pop_one(shard, now))) {
- grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, success, NULL);
+ grpc_exec_ctx_push(exec_ctx, &timer->closure, grpc_error_ref(error), NULL);
n++;
}
*new_min_deadline = compute_min_deadline(shard);
@@ -313,7 +316,7 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard,
}
static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
- gpr_timespec *next, int success) {
+ gpr_timespec *next, grpc_error *error) {
size_t n = 0;
/* TODO(ctiller): verify that there are any timers (atomically) here */
@@ -327,8 +330,8 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
/* For efficiency, we pop as many available timers as we can from the
shard. This may violate perfect timer deadline ordering, but that
shouldn't be a big deal because we don't make ordering guarantees. */
- n += pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline,
- success);
+ n +=
+ pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline, error);
/* An grpc_timer_init() on the shard could intervene here, adding a new
timer that is earlier than new_min_deadline. However,
@@ -359,6 +362,8 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
*next, gpr_time_add(now, gpr_time_from_millis(1, GPR_TIMESPAN)));
}
+ grpc_error_unref(error);
+
return (int)n;
}
@@ -367,5 +372,7 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
GPR_ASSERT(now.clock_type == g_clock_type);
return run_some_expired_timers(
exec_ctx, now, next,
- gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0);
+ gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0
+ ? GRPC_ERROR_NONE
+ : GRPC_ERROR_CREATE("Shutting down timer system"));
}
diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h
index 3e2b223670..20b47e1c14 100644
--- a/src/core/lib/iomgr/workqueue.h
+++ b/src/core/lib/iomgr/workqueue.h
@@ -78,6 +78,6 @@ void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx,
/** Add a work item to a workqueue */
void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure,
- int success);
+ grpc_error *error);
#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_H */
diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c
index 80e7a0b206..e27525dd50 100644
--- a/src/core/lib/iomgr/workqueue_posix.c
+++ b/src/core/lib/iomgr/workqueue_posix.c
@@ -45,7 +45,7 @@
#include "src/core/lib/iomgr/ev_posix.h"
-static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success);
+static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
grpc_workqueue *grpc_workqueue_create(grpc_exec_ctx *exec_ctx) {
char name[32];
@@ -110,10 +110,10 @@ void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
gpr_mu_unlock(&workqueue->mu);
}
-static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_workqueue *workqueue = arg;
- if (!success) {
+ if (error != GRPC_ERROR_NONE) {
gpr_mu_destroy(&workqueue->mu);
/* HACK: let wakeup_fd code know that we stole the fd */
workqueue->wakeup_fd.read_fd = 0;
@@ -131,12 +131,12 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
}
void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure,
- int success) {
+ grpc_error *error) {
gpr_mu_lock(&workqueue->mu);
if (grpc_closure_list_empty(workqueue->closure_list)) {
grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd);
}
- grpc_closure_list_add(&workqueue->closure_list, closure, success);
+ grpc_closure_list_append(&workqueue->closure_list, closure, error);
gpr_mu_unlock(&workqueue->mu);
}
diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c
index 2cf2f00b31..aa9d60ee6a 100644
--- a/src/core/lib/surface/alarm.c
+++ b/src/core/lib/surface/alarm.c
@@ -48,9 +48,9 @@ struct grpc_alarm {
static void do_nothing_end_completion(grpc_exec_ctx *exec_ctx, void *arg,
grpc_cq_completion *c) {}
-static void alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_alarm *alarm = arg;
- grpc_cq_end_op(exec_ctx, alarm->cq, alarm->tag, success,
+ grpc_cq_end_op(exec_ctx, alarm->cq, alarm->tag, error,
do_nothing_end_completion, NULL, &alarm->completion);
}
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 9b2b94eedf..0d6c58db62 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -122,6 +122,7 @@ typedef struct batch_control {
grpc_closure finish_batch;
void *notify_tag;
gpr_refcount steps_to_complete;
+ grpc_error *error;
uint8_t send_initial_metadata;
uint8_t send_message;
@@ -130,7 +131,6 @@ typedef struct batch_control {
uint8_t recv_message;
uint8_t recv_final_op;
uint8_t is_notify_tag_closure;
- uint8_t success;
} batch_control;
struct grpc_call {
@@ -239,9 +239,9 @@ static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_status_code status,
const char *description);
static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack,
- bool success);
+ grpc_error *error);
static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
- bool success);
+ grpc_error *error);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
uint32_t propagation_mask,
@@ -361,7 +361,8 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) {
GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON);
}
-static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) {
+static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
+ grpc_error *error) {
size_t i;
int ii;
grpc_call *c = call;
@@ -706,13 +707,13 @@ typedef struct cancel_closure {
grpc_status_code status;
} cancel_closure;
-static void done_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) {
+static void done_cancel(grpc_exec_ctx *exec_ctx, void *ccp, grpc_error *error) {
cancel_closure *cc = ccp;
GRPC_CALL_INTERNAL_UNREF(exec_ctx, cc->call, "cancel");
gpr_free(cc);
}
-static void send_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) {
+static void send_cancel(grpc_exec_ctx *exec_ctx, void *ccp, grpc_error *error) {
grpc_transport_stream_op op;
cancel_closure *cc = ccp;
memset(&op, 0, sizeof(op));
@@ -739,7 +740,7 @@ static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
cc->call = c;
cc->status = status;
GRPC_CALL_INTERNAL_REF(c, "cancel");
- grpc_exec_ctx_enqueue(exec_ctx, &cc->closure, true, NULL);
+ grpc_exec_ctx_push(exec_ctx, &cc->closure, GRPC_ERROR_NONE, NULL);
return GRPC_CALL_OK;
}
@@ -775,11 +776,11 @@ grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
return CALL_FROM_TOP_ELEM(elem);
}
-static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_call *call = arg;
gpr_mu_lock(&call->mu);
call->have_alarm = 0;
- if (success) {
+ if (error != GRPC_ERROR_NONE) {
cancel_with_status(exec_ctx, call, GRPC_STATUS_DEADLINE_EXCEEDED,
"Deadline Exceeded");
}
@@ -961,7 +962,7 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
batch_control *bctl) {
grpc_call *call = bctl->call;
if (bctl->is_notify_tag_closure) {
- grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success, NULL);
+ grpc_exec_ctx_push(exec_ctx, bctl->notify_tag, bctl->error, NULL);
gpr_mu_lock(&call->mu);
bctl->call->used_batches =
(uint8_t)(bctl->call->used_batches &
@@ -969,7 +970,7 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
gpr_mu_unlock(&call->mu);
GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
} else {
- grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, bctl->success,
+ grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, bctl->error,
finish_batch_completion, bctl, &bctl->cq_completion);
}
}
@@ -1001,11 +1002,11 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
}
static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
- bool success) {
+ grpc_error *error) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
- if (success) {
+ if (error == GRPC_ERROR_NONE) {
gpr_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
call->receiving_slice);
continue_receiving_slices(exec_ctx, bctl);
@@ -1058,15 +1059,15 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl,
}
static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
- bool success) {
+ grpc_error *error) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
gpr_mu_lock(&bctl->call->mu);
- if (bctl->call->has_initial_md_been_received || !success ||
+ if (bctl->call->has_initial_md_been_received || error != GRPC_ERROR_NONE ||
call->receiving_stream == NULL) {
gpr_mu_unlock(&bctl->call->mu);
- process_data_after_md(exec_ctx, bctlp, success);
+ process_data_after_md(exec_ctx, bctlp, error);
} else {
call->saved_receiving_stream_ready_bctlp = bctlp;
gpr_mu_unlock(&bctl->call->mu);
@@ -1074,14 +1075,14 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
}
static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
- void *bctlp, bool success) {
+ void *bctlp, grpc_error *error) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
gpr_mu_lock(&call->mu);
- if (!success) {
- bctl->success = false;
+ if (error != GRPC_ERROR_NONE) {
+ bctl->error = grpc_error_ref(error);
} else {
grpc_metadata_batch *md =
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
@@ -1101,7 +1102,7 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
grpc_closure *saved_rsr_closure = grpc_closure_create(
receiving_stream_ready, call->saved_receiving_stream_ready_bctlp);
call->saved_receiving_stream_ready_bctlp = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, saved_rsr_closure, success, NULL);
+ grpc_exec_ctx_push(exec_ctx, saved_rsr_closure, error, NULL);
}
gpr_mu_unlock(&call->mu);
@@ -1111,7 +1112,8 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
}
}
-static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
+static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
+ grpc_error *error) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
grpc_call *child_call;
@@ -1119,7 +1121,7 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
gpr_mu_lock(&call->mu);
if (bctl->send_initial_metadata) {
- if (!success) {
+ if (error != GRPC_ERROR_NONE) {
set_status_code(call, STATUS_FROM_CORE, GRPC_STATUS_UNAVAILABLE);
}
grpc_metadata_batch_destroy(
@@ -1165,9 +1167,10 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
call->final_op.server.cancelled);
}
- success = 1;
+ grpc_error_unref(error);
+ error = GRPC_ERROR_NONE;
}
- bctl->success = success != 0;
+ bctl->error = grpc_error_ref(error);
gpr_mu_unlock(&call->mu);
if (gpr_unref(&bctl->steps_to_complete)) {
post_batch_completion(exec_ctx, bctl);
@@ -1201,7 +1204,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
if (nops == 0) {
GRPC_CALL_INTERNAL_REF(call, "completion");
- bctl->success = 1;
+ bctl->error = GRPC_ERROR_NONE;
if (!is_notify_tag_closure) {
grpc_cq_begin_op(call->cq, notify_tag);
}
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index eef82cf014..9da660ca8b 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -75,7 +75,7 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag);
/* Queue a GRPC_OP_COMPLETED operation; tag must correspond to the tag passed to
grpc_cq_begin_op */
void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
- void *tag, int success,
+ void *tag, grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage);