aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/census/grpc_filter.c6
-rw-r--r--src/core/ext/client_config/channel_connectivity.c23
-rw-r--r--src/core/ext/client_config/client_channel.c77
-rw-r--r--src/core/ext/client_config/lb_policy.c6
-rw-r--r--src/core/ext/client_config/lb_policy.h8
-rw-r--r--src/core/ext/client_config/subchannel.c71
-rw-r--r--src/core/ext/client_config/subchannel.h2
-rw-r--r--src/core/ext/client_config/subchannel_call_holder.c47
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c78
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c60
-rw-r--r--src/core/ext/resolver/dns/native/dns_resolver.c24
-rw-r--r--src/core/ext/resolver/sockaddr/sockaddr_resolver.c4
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.c6
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.c8
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2.c47
-rw-r--r--src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c28
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c67
-rw-r--r--src/core/lib/iomgr/error.c6
-rw-r--r--src/core/lib/iomgr/error.h3
-rw-r--r--src/core/lib/security/credentials/credentials.h1
-rw-r--r--src/core/lib/security/credentials/fake/fake_credentials.c8
-rw-r--r--src/core/lib/security/credentials/google_default/google_default_credentials.c23
-rw-r--r--src/core/lib/security/credentials/jwt/jwt_verifier.c20
-rw-r--r--src/core/lib/security/credentials/oauth2/oauth2_credentials.c21
-rw-r--r--src/core/lib/security/credentials/oauth2/oauth2_credentials.h4
-rw-r--r--src/core/lib/security/transport/handshake.c62
-rw-r--r--src/core/lib/security/transport/secure_endpoint.c25
-rw-r--r--src/core/lib/security/transport/server_auth_filter.c14
-rw-r--r--src/core/lib/security/transport/tsi_error.c40
-rw-r--r--src/core/lib/security/transport/tsi_error.h42
-rw-r--r--src/core/lib/surface/channel.c2
-rw-r--r--src/core/lib/surface/server.c13
-rw-r--r--src/core/lib/transport/connectivity_state.c24
-rw-r--r--src/core/lib/transport/connectivity_state.h5
-rw-r--r--src/core/lib/transport/transport.h2
35 files changed, 572 insertions, 305 deletions
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c
index 5e278ef127..df7016efe3 100644
--- a/src/core/ext/census/grpc_filter.c
+++ b/src/core/ext/census/grpc_filter.c
@@ -91,14 +91,14 @@ static void client_start_transport_op(grpc_exec_ctx *exec_ctx,
}
static void server_on_done_recv(grpc_exec_ctx *exec_ctx, void *ptr,
- bool success) {
+ grpc_error *error) {
grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- if (success) {
+ if (error == GRPC_ERROR_NONE) {
extract_and_annotate_method_tag(calld->recv_initial_metadata, calld, chand);
}
- calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success);
+ calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, error);
}
static void server_mutate_op(grpc_call_element *elem,
diff --git a/src/core/ext/client_config/channel_connectivity.c b/src/core/ext/client_config/channel_connectivity.c
index 3ebc333608..1898bf6279 100644
--- a/src/core/ext/client_config/channel_connectivity.c
+++ b/src/core/ext/client_config/channel_connectivity.c
@@ -75,7 +75,7 @@ typedef enum {
typedef struct {
gpr_mu mu;
callback_phase phase;
- int success;
+ grpc_error *error;
grpc_closure on_complete;
grpc_timer alarm;
grpc_connectivity_state state;
@@ -122,7 +122,7 @@ static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw,
}
static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
- int due_to_completion) {
+ bool due_to_completion, grpc_error *error) {
int delete = 0;
if (due_to_completion) {
@@ -131,13 +131,14 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
gpr_mu_lock(&w->mu);
if (due_to_completion) {
- w->success = 1;
+ grpc_error_unref(w->error);
+ w->error = GRPC_ERROR_NONE;
}
switch (w->phase) {
case WAITING:
w->phase = CALLING_BACK;
- grpc_cq_end_op(exec_ctx, w->cq, w->tag, w->success, finished_completion,
- w, &w->completion_storage);
+ grpc_cq_end_op(exec_ctx, w->cq, w->tag, grpc_error_ref(w->error),
+ finished_completion, w, &w->completion_storage);
break;
case CALLING_BACK:
w->phase = CALLING_BACK_AND_FINISHED;
@@ -155,12 +156,14 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
}
}
-static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw, bool success) {
- partly_done(exec_ctx, pw, 1);
+static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw,
+ grpc_error *error) {
+ partly_done(exec_ctx, pw, true, error);
}
-static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, bool success) {
- partly_done(exec_ctx, pw, 0);
+static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw,
+ grpc_error *error) {
+ partly_done(exec_ctx, pw, false, error);
}
void grpc_channel_watch_connectivity_state(
@@ -185,7 +188,7 @@ void grpc_channel_watch_connectivity_state(
grpc_closure_init(&w->on_complete, watch_complete, w);
w->phase = WAITING;
w->state = last_observed_state;
- w->success = 0;
+ w->error = GRPC_ERROR_CREATE("Some error");
w->cq = cq;
w->tag = tag;
w->channel = channel;
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index 9b5a078aec..be9e962bcd 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -117,6 +117,7 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
channel_data *chand,
grpc_connectivity_state state,
+ grpc_error *error,
const char *reason) {
if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
state == GRPC_CHANNEL_FATAL_FAILURE) &&
@@ -127,11 +128,13 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
/* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY,
/* check= */ 0);
}
- grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, reason);
+ grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
+ reason);
}
-static void on_lb_policy_state_changed_locked(
- grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) {
+static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
+ lb_policy_connectivity_watcher *w,
+ grpc_error *error) {
grpc_connectivity_state publish_state = w->state;
/* check if the notification is for a stale policy */
if (w->lb_policy != w->chand->lb_policy) return;
@@ -144,18 +147,18 @@ static void on_lb_policy_state_changed_locked(
w->chand->lb_policy = NULL;
}
set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
- "lb_changed");
+ error, "lb_changed");
if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state);
}
}
static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg,
- bool iomgr_success) {
+ grpc_error *error) {
lb_policy_connectivity_watcher *w = arg;
gpr_mu_lock(&w->chand->mu_config);
- on_lb_policy_state_changed_locked(exec_ctx, w);
+ on_lb_policy_state_changed_locked(exec_ctx, w, error);
gpr_mu_unlock(&w->chand->mu_config);
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy");
@@ -177,19 +180,22 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
}
static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
- bool iomgr_success) {
+ grpc_error *error) {
channel_data *chand = arg;
grpc_lb_policy *lb_policy = NULL;
grpc_lb_policy *old_lb_policy;
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
int exit_idle = 0;
+ grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy");
if (chand->incoming_configuration != NULL) {
lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
if (lb_policy != NULL) {
GRPC_LB_POLICY_REF(lb_policy, "channel");
GRPC_LB_POLICY_REF(lb_policy, "config_change");
- state = grpc_lb_policy_check_connectivity(exec_ctx, lb_policy);
+ grpc_error_unref(state_error);
+ state =
+ grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error);
}
grpc_client_config_unref(exec_ctx, chand->incoming_configuration);
@@ -209,7 +215,9 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures,
NULL);
} else if (chand->resolver == NULL /* disconnected */) {
- grpc_closure_list_fail_all(&chand->waiting_for_config_closures);
+ grpc_closure_list_fail_all(
+ &chand->waiting_for_config_closures,
+ GRPC_ERROR_CREATE_REFERENCING("Channel disconnected", &error, 1));
grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures,
NULL);
}
@@ -219,8 +227,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
chand->exit_idle_when_lb_policy_arrives = 0;
}
- if (iomgr_success && chand->resolver) {
- set_channel_connectivity_state_locked(exec_ctx, chand, state,
+ if (error == GRPC_ERROR_NONE && chand->resolver) {
+ set_channel_connectivity_state_locked(exec_ctx, chand, state, state_error,
"new_lb+resolver");
if (lb_policy != NULL) {
watch_lb_policy(exec_ctx, chand, lb_policy, state);
@@ -236,8 +244,12 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
}
+ grpc_error *refs[] = {error, state_error};
set_channel_connectivity_state_locked(
- exec_ctx, chand, GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
+ exec_ctx, chand, GRPC_CHANNEL_FATAL_FAILURE,
+ GRPC_ERROR_CREATE_REFERENCING("Got config after disconnection", refs,
+ GPR_ARRAY_SIZE(refs)),
+ "resolver_gone");
gpr_mu_unlock(&chand->mu_config);
}
@@ -264,7 +276,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_transport_op *op) {
channel_data *chand = elem->channel_data;
- grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL);
+ grpc_exec_ctx_push(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
GPR_ASSERT(op->set_accept_stream == false);
if (op->bind_pollset != NULL) {
@@ -283,7 +295,9 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
if (op->send_ping != NULL) {
if (chand->lb_policy == NULL) {
- grpc_exec_ctx_enqueue(exec_ctx, op->send_ping, false, NULL);
+ grpc_exec_ctx_push(exec_ctx, op->send_ping,
+ GRPC_ERROR_CREATE("Ping with no load balancing"),
+ NULL);
} else {
grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping);
op->bind_pollset = NULL;
@@ -291,14 +305,16 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
op->send_ping = NULL;
}
- if (op->disconnect && chand->resolver != NULL) {
+ if (op->disconnect_with_error != GRPC_ERROR_NONE && chand->resolver != NULL) {
set_channel_connectivity_state_locked(
- exec_ctx, chand, GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
+ exec_ctx, chand, GRPC_CHANNEL_FATAL_FAILURE,
+ grpc_error_ref(op->disconnect_with_error), "disconnect");
grpc_resolver_shutdown(exec_ctx, chand->resolver);
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
if (!chand->started_resolving) {
- grpc_closure_list_fail_all(&chand->waiting_for_config_closures);
+ grpc_closure_list_fail_all(&chand->waiting_for_config_closures,
+ op->disconnect_with_error);
grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures,
NULL);
}
@@ -328,16 +344,17 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready);
-static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
continue_picking_args *cpa = arg;
if (cpa->connected_subchannel == NULL) {
/* cancelled, do nothing */
- } else if (!success) {
- grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL);
+ } else if (error != GRPC_ERROR_NONE) {
+ grpc_exec_ctx_push(exec_ctx, cpa->on_ready, grpc_error_ref(error), NULL);
} else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
cpa->initial_metadata_flags,
cpa->connected_subchannel, cpa->on_ready)) {
- grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, true, NULL);
+ grpc_exec_ctx_push(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL);
}
gpr_free(cpa);
}
@@ -362,11 +379,12 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
connected_subchannel);
}
for (closure = chand->waiting_for_config_closures.head; closure != NULL;
- closure = grpc_closure_next(closure)) {
+ closure = closure->next_data.next) {
cpa = closure->cb_arg;
if (cpa->connected_subchannel == connected_subchannel) {
cpa->connected_subchannel = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL);
+ grpc_exec_ctx_push(exec_ctx, cpa->on_ready,
+ GRPC_ERROR_CREATE("Pick cancelled"), NULL);
}
}
gpr_mu_unlock(&chand->mu_config);
@@ -398,10 +416,11 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
cpa->on_ready = on_ready;
cpa->elem = elem;
grpc_closure_init(&cpa->closure, continue_picking, cpa);
- grpc_closure_list_add(&chand->waiting_for_config_closures, &cpa->closure,
- 1);
+ grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure,
+ GRPC_ERROR_NONE);
} else {
- grpc_exec_ctx_enqueue(exec_ctx, on_ready, false, NULL);
+ grpc_exec_ctx_push(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected"),
+ NULL);
}
gpr_mu_unlock(&chand->mu_config);
return 0;
@@ -506,7 +525,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
channel_data *chand = elem->channel_data;
grpc_connectivity_state out;
gpr_mu_lock(&chand->mu_config);
- out = grpc_connectivity_state_check(&chand->state_tracker);
+ out = grpc_connectivity_state_check(&chand->state_tracker, NULL);
if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
if (chand->lb_policy != NULL) {
grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy);
@@ -533,7 +552,7 @@ typedef struct {
} external_connectivity_watcher;
static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
- bool iomgr_success) {
+ grpc_error *error) {
external_connectivity_watcher *w = arg;
grpc_closure *follow_up = w->on_complete;
grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties,
@@ -541,7 +560,7 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
"external_connectivity_watcher");
gpr_free(w);
- follow_up->cb(exec_ctx, follow_up->cb_arg, iomgr_success);
+ follow_up->cb(exec_ctx, follow_up->cb_arg, error);
}
void grpc_client_channel_watch_connectivity_state(
diff --git a/src/core/ext/client_config/lb_policy.c b/src/core/ext/client_config/lb_policy.c
index a7ad9842dc..fcff0c9a1b 100644
--- a/src/core/ext/client_config/lb_policy.c
+++ b/src/core/ext/client_config/lb_policy.c
@@ -138,6 +138,8 @@ void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx,
}
grpc_connectivity_state grpc_lb_policy_check_connectivity(
- grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
- return policy->vtable->check_connectivity(exec_ctx, policy);
+ grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ grpc_error **connectivity_error) {
+ return policy->vtable->check_connectivity(exec_ctx, policy,
+ connectivity_error);
}
diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h
index 0384e0b2eb..13b9abc474 100644
--- a/src/core/ext/client_config/lb_policy.h
+++ b/src/core/ext/client_config/lb_policy.h
@@ -75,8 +75,9 @@ struct grpc_lb_policy_vtable {
void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
/** check the current connectivity of the lb_policy */
- grpc_connectivity_state (*check_connectivity)(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *policy);
+ grpc_connectivity_state (*check_connectivity)(
+ grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ grpc_error **connectivity_error);
/** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the policy */
@@ -152,6 +153,7 @@ void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx,
grpc_closure *closure);
grpc_connectivity_state grpc_lb_policy_check_connectivity(
- grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
+ grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ grpc_error **connectivity_error);
#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_H */
diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index bd45d3825c..2c7b227cb8 100644
--- a/src/core/ext/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -147,7 +147,7 @@ struct grpc_subchannel_call {
(((grpc_subchannel_call *)(callstack)) - 1)
static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
- bool iomgr_success);
+ grpc_error *error);
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
#define REF_REASON reason
@@ -177,7 +177,7 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
*/
static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg,
- bool success) {
+ grpc_error *error) {
grpc_connected_subchannel *c = arg;
grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c));
gpr_free(c);
@@ -200,7 +200,7 @@ void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx,
*/
static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
- bool success) {
+ grpc_error *error) {
grpc_subchannel *c = arg;
gpr_free((void *)c->filters);
grpc_channel_args_destroy(c->args);
@@ -290,8 +290,8 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx,
gpr_atm old_refs;
old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF"));
if (old_refs == 1) {
- grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c),
- true, NULL);
+ grpc_exec_ctx_push(exec_ctx, grpc_closure_create(subchannel_destroy, c),
+ GRPC_ERROR_NONE, NULL);
}
}
@@ -382,7 +382,8 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
args.initial_connect_string = c->initial_connect_string;
grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
- GRPC_CHANNEL_CONNECTING, "state_change");
+ GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
+ "state_change");
grpc_connector_connect(exec_ctx, c->connector, &args, &c->connecting_result,
&c->connected);
}
@@ -393,16 +394,17 @@ static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
continue_connect(exec_ctx, c);
}
-grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
+grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c,
+ grpc_error **error) {
grpc_connectivity_state state;
gpr_mu_lock(&c->mu);
- state = grpc_connectivity_state_check(&c->state_tracker);
+ state = grpc_connectivity_state_check(&c->state_tracker, error);
gpr_mu_unlock(&c->mu);
return state;
}
static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
- bool success) {
+ grpc_error *error) {
external_state_watcher *w = arg;
grpc_closure *follow_up = w->notify;
if (w->pollset_set != NULL) {
@@ -415,7 +417,7 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_unlock(&w->subchannel->mu);
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, w->subchannel, "external_state_watcher");
gpr_free(w);
- follow_up->cb(exec_ctx, follow_up->cb_arg, success);
+ follow_up->cb(exec_ctx, follow_up->cb_arg, error);
}
void grpc_subchannel_notify_on_state_change(
@@ -469,7 +471,7 @@ void grpc_connected_subchannel_process_transport_op(
}
static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p,
- bool iomgr_success) {
+ grpc_error *error) {
state_watcher *sw = p;
grpc_subchannel *c = sw->subchannel;
gpr_mu *mu = &c->mu;
@@ -477,20 +479,18 @@ static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p,
gpr_mu_lock(mu);
/* if we failed just leave this closure */
- if (iomgr_success) {
- if (sw->connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- /* any errors on a subchannel ==> we're done, create a new one */
- sw->connectivity_state = GRPC_CHANNEL_FATAL_FAILURE;
- }
- grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
- sw->connectivity_state, "reflect_child");
- if (sw->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) {
- grpc_connected_subchannel_notify_on_state_change(
- exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), NULL,
- &sw->connectivity_state, &sw->closure);
- GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
- sw = NULL;
- }
+ if (sw->connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ /* any errors on a subchannel ==> we're done, create a new one */
+ sw->connectivity_state = GRPC_CHANNEL_FATAL_FAILURE;
+ }
+ grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
+ sw->connectivity_state, error, "reflect_child");
+ if (sw->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) {
+ grpc_connected_subchannel_notify_on_state_change(
+ exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), NULL,
+ &sw->connectivity_state, &sw->closure);
+ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
+ sw = NULL;
}
gpr_mu_unlock(mu);
@@ -592,17 +592,19 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
/* signal completion */
grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY,
- "connected");
+ GRPC_ERROR_NONE, "connected");
}
-static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) {
+static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_subchannel *c = arg;
gpr_mu_lock(&c->mu);
c->have_alarm = 0;
if (c->disconnected) {
- iomgr_success = 0;
+ error = GRPC_ERROR_CREATE_REFERENCING("Disconnected", &error, 1);
+ } else {
+ grpc_error_ref(error);
}
- if (iomgr_success) {
+ if (error != GRPC_ERROR_NONE) {
c->next_attempt =
gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
continue_connect(exec_ctx, c);
@@ -614,7 +616,7 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) {
}
static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
- bool iomgr_success) {
+ grpc_error *error) {
grpc_subchannel *c = arg;
GRPC_SUBCHANNEL_WEAK_REF(c, "connected");
@@ -627,9 +629,10 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
GPR_ASSERT(!c->have_alarm);
c->have_alarm = 1;
- grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- "connect_failed");
+ grpc_connectivity_state_set(
+ exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1),
+ "connect_failed");
grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
}
gpr_mu_unlock(&c->mu);
@@ -641,7 +644,7 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
*/
static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call,
- bool success) {
+ grpc_error *error) {
grpc_subchannel_call *c = call;
GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
grpc_connected_subchannel *connection = c->connection;
diff --git a/src/core/ext/client_config/subchannel.h b/src/core/ext/client_config/subchannel.h
index 0765a544e8..e73f394584 100644
--- a/src/core/ext/client_config/subchannel.h
+++ b/src/core/ext/client_config/subchannel.h
@@ -118,7 +118,7 @@ void grpc_connected_subchannel_process_transport_op(
/** poll the current connectivity state of a channel */
grpc_connectivity_state grpc_subchannel_check_connectivity(
- grpc_subchannel *channel);
+ grpc_subchannel *channel, grpc_error **error);
/** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the channel */
diff --git a/src/core/ext/client_config/subchannel_call_holder.c b/src/core/ext/client_config/subchannel_call_holder.c
index 9918fbdcb4..3b10aa4474 100644
--- a/src/core/ext/client_config/subchannel_call_holder.c
+++ b/src/core/ext/client_config/subchannel_call_holder.c
@@ -43,14 +43,14 @@
#define CANCELLED_CALL ((grpc_subchannel_call *)1)
static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *holder,
- bool success);
+ grpc_error *error);
static void retry_ops(grpc_exec_ctx *exec_ctx, void *retry_ops_args,
- bool success);
+ grpc_error *error);
static void add_waiting_locked(grpc_subchannel_call_holder *holder,
grpc_transport_stream_op *op);
static void fail_locked(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_call_holder *holder);
+ grpc_subchannel_call_holder *holder, grpc_error *error);
static void retry_waiting_locked(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call_holder *holder);
@@ -90,7 +90,8 @@ void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *call = GET_CALL(holder);
GPR_TIMER_BEGIN("grpc_subchannel_call_holder_perform_op", 0);
if (call == CANCELLED_CALL) {
- grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
+ grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
+ GRPC_ERROR_CANCELLED);
GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
return;
}
@@ -106,7 +107,8 @@ retry:
call = GET_CALL(holder);
if (call == CANCELLED_CALL) {
gpr_mu_unlock(&holder->mu);
- grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
+ grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
+ GRPC_ERROR_CANCELLED);
GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
return;
}
@@ -123,7 +125,10 @@ retry:
} else {
switch (holder->creation_phase) {
case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
- fail_locked(exec_ctx, holder);
+ fail_locked(exec_ctx, holder,
+ grpc_error_set_int(GRPC_ERROR_CREATE("Cancelled"),
+ GRPC_ERROR_INT_GRPC_STATUS,
+ op->cancel_with_status));
break;
case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL,
@@ -131,7 +136,8 @@ retry:
break;
}
gpr_mu_unlock(&holder->mu);
- grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
+ grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
+ GRPC_ERROR_CANCELLED);
GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
return;
}
@@ -167,17 +173,22 @@ retry:
GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
}
-static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
grpc_subchannel_call_holder *holder = arg;
gpr_mu_lock(&holder->mu);
GPR_ASSERT(holder->creation_phase ==
GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
if (holder->connected_subchannel == NULL) {
- fail_locked(exec_ctx, holder);
+ fail_locked(exec_ctx, holder,
+ GRPC_ERROR_CREATE_REFERENCING("Failed to create subchannel",
+ &error, 1));
} else if (1 == gpr_atm_acq_load(&holder->subchannel_call)) {
/* already cancelled before subchannel became ready */
- fail_locked(exec_ctx, holder);
+ fail_locked(exec_ctx, holder,
+ GRPC_ERROR_CREATE_REFERENCING(
+ "Cancelled before creating subchannel", &error, 1));
} else {
gpr_atm_rel_store(
&holder->subchannel_call,
@@ -203,18 +214,18 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx,
a->call = GET_CALL(holder);
if (a->call == CANCELLED_CALL) {
gpr_free(a);
- fail_locked(exec_ctx, holder);
+ fail_locked(exec_ctx, holder, GRPC_ERROR_CANCELLED);
return;
}
holder->waiting_ops = NULL;
holder->waiting_ops_count = 0;
holder->waiting_ops_capacity = 0;
GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops");
- grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(retry_ops, a), true,
- NULL);
+ grpc_exec_ctx_push(exec_ctx, grpc_closure_create(retry_ops, a),
+ GRPC_ERROR_NONE, NULL);
}
-static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, bool success) {
+static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
retry_ops_args *a = args;
size_t i;
for (i = 0; i < a->nops; i++) {
@@ -239,13 +250,15 @@ static void add_waiting_locked(grpc_subchannel_call_holder *holder,
}
static void fail_locked(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_call_holder *holder) {
+ grpc_subchannel_call_holder *holder,
+ grpc_error *error) {
size_t i;
for (i = 0; i < holder->waiting_ops_count; i++) {
- grpc_transport_stream_op_finish_with_failure(exec_ctx,
- &holder->waiting_ops[i]);
+ grpc_transport_stream_op_finish_with_failure(
+ exec_ctx, &holder->waiting_ops[i], grpc_error_ref(error));
}
holder->waiting_ops_count = 0;
+ grpc_error_unref(error);
}
char *grpc_subchannel_call_holder_get_peer(
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index 0d215cd196..48e6246781 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -103,8 +103,9 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
p->shutdown = 1;
pp = p->pending_picks;
p->pending_picks = NULL;
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE, "shutdown");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE,
+ GRPC_ERROR_CREATE("Channel shutdown"), "shutdown");
/* cancel subscription */
if (selected != NULL) {
grpc_connected_subchannel_notify_on_state_change(
@@ -120,7 +121,7 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
*pp->target = NULL;
grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
pp->pollset);
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
+ grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
gpr_free(pp);
pp = next;
}
@@ -139,7 +140,8 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
pp->pollset);
*target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
+ grpc_exec_ctx_push(exec_ctx, pp->on_complete,
+ GRPC_ERROR_CREATE("Pick Cancelled"), NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -164,7 +166,8 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
initial_metadata_flags_eq) {
grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
pp->pollset);
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
+ grpc_exec_ctx_push(exec_ctx, pp->on_complete,
+ GRPC_ERROR_CREATE("Pick Cancelled"), NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -235,7 +238,7 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
- bool iomgr_success) {
+ grpc_error *error) {
pick_first_lb_policy *p = arg;
size_t i;
size_t num_subchannels = p->num_subchannels;
@@ -256,12 +259,14 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
}
static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
- bool iomgr_success) {
+ grpc_error *error) {
pick_first_lb_policy *p = arg;
grpc_subchannel *selected_subchannel;
pending_pick *pp;
grpc_connected_subchannel *selected;
+ grpc_error_ref(error);
+
gpr_mu_lock(&p->mu);
selected = GET_SELECTED(p);
@@ -276,7 +281,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
p->checking_connectivity = GRPC_CHANNEL_FATAL_FAILURE;
}
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- p->checking_connectivity, "selected_changed");
+ p->checking_connectivity, grpc_error_ref(error),
+ "selected_changed");
if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
grpc_connected_subchannel_notify_on_state_change(
exec_ctx, selected, p->base.interested_parties,
@@ -289,7 +295,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
switch (p->checking_connectivity) {
case GRPC_CHANNEL_READY:
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_READY, "connecting_ready");
+ GRPC_CHANNEL_READY, GRPC_ERROR_NONE,
+ "connecting_ready");
selected_subchannel = p->subchannels[p->checking_subchannel];
selected =
grpc_subchannel_get_connected_subchannel(selected_subchannel);
@@ -298,15 +305,16 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
/* drop the pick list: we are connected now */
GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
gpr_atm_rel_store(&p->selected, (gpr_atm)selected);
- grpc_exec_ctx_enqueue(
- exec_ctx, grpc_closure_create(destroy_subchannels, p), true, NULL);
+ grpc_exec_ctx_push(exec_ctx,
+ grpc_closure_create(destroy_subchannels, p),
+ GRPC_ERROR_NONE, NULL);
/* update any calls that were waiting for a pick */
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = selected;
grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
pp->pollset);
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
+ grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
gpr_free(pp);
}
grpc_connected_subchannel_notify_on_state_change(
@@ -318,12 +326,13 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
(p->checking_subchannel + 1) % p->num_subchannels;
if (p->checking_subchannel == 0) {
/* only trigger transient failure when we've tried all alternatives */
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- "connecting_transient_failure");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ grpc_error_ref(error), "connecting_transient_failure");
}
+ grpc_error_unref(error);
p->checking_connectivity = grpc_subchannel_check_connectivity(
- p->subchannels[p->checking_subchannel]);
+ p->subchannels[p->checking_subchannel], &error);
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
@@ -335,9 +344,9 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
break;
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_CONNECTING,
- "connecting_changed");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING,
+ grpc_error_ref(error), "connecting_changed");
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
p->base.interested_parties, &p->checking_connectivity,
@@ -350,38 +359,46 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels],
"pick_first");
if (p->num_subchannels == 0) {
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE,
- "no_more_channels");
+ grpc_error_ref(error);
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE,
+ GRPC_ERROR_CREATE_REFERENCING("Pick first exhausted channels",
+ &error, 1),
+ "no_more_channels");
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
+ grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_NONE,
+ NULL);
gpr_free(pp);
}
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base,
"pick_first_connectivity");
} else {
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- "subchannel_failed");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ grpc_error_ref(error), "subchannel_failed");
p->checking_subchannel %= p->num_subchannels;
+ grpc_error_unref(error);
p->checking_connectivity = grpc_subchannel_check_connectivity(
- p->subchannels[p->checking_subchannel]);
+ p->subchannels[p->checking_subchannel], &error);
goto loop;
}
}
}
gpr_mu_unlock(&p->mu);
+
+ grpc_error_unref(error);
}
static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *pol) {
+ grpc_lb_policy *pol,
+ grpc_error **error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
grpc_connectivity_state st;
gpr_mu_lock(&p->mu);
- st = grpc_connectivity_state_check(&p->state_tracker);
+ st = grpc_connectivity_state_check(&p->state_tracker, error);
gpr_mu_unlock(&p->mu);
return st;
}
@@ -404,7 +421,8 @@ static void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (selected) {
grpc_connected_subchannel_ping(exec_ctx, selected, closure);
} else {
- grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL);
+ grpc_exec_ctx_push(exec_ctx, closure, GRPC_ERROR_CREATE("Not connected"),
+ NULL);
}
}
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 3f6051b892..c119a25772 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -239,11 +239,13 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
+ grpc_exec_ctx_push(exec_ctx, pp->on_complete,
+ GRPC_ERROR_CREATE("Channel Shutdown"), NULL);
gpr_free(pp);
}
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE, "shutdown");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE,
+ GRPC_ERROR_CREATE("Channel Shutdown"), "shutdown");
for (i = 0; i < p->num_subchannels; i++) {
subchannel_data *sd = p->subchannels[i];
grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL,
@@ -265,7 +267,7 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
pp->pollset);
*target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
+ grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -291,7 +293,7 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
pp->pollset);
*pp->target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
+ grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -366,7 +368,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
- bool iomgr_success) {
+ grpc_error *error) {
subchannel_data *sd = arg;
round_robin_lb_policy *p = sd->policy;
pending_pick *pp;
@@ -374,6 +376,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
int unref = 0;
+ grpc_error_ref(error);
gpr_mu_lock(&p->mu);
if (p->shutdown) {
@@ -382,7 +385,8 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
switch (sd->connectivity_state) {
case GRPC_CHANNEL_READY:
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_READY, "connecting_ready");
+ GRPC_CHANNEL_READY, grpc_error_ref(error),
+ "connecting_ready");
/* add the newly connected subchannel to the list of connected ones.
* Note that it goes to the "end of the line". */
sd->ready_list_node = add_connected_sc_locked(p, sd->subchannel);
@@ -406,7 +410,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
pp->pollset);
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
+ grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
gpr_free(pp);
}
grpc_subchannel_notify_on_state_change(
@@ -415,9 +419,9 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
break;
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- sd->connectivity_state,
- "connecting_changed");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, sd->connectivity_state,
+ grpc_error_ref(error), "connecting_changed");
grpc_subchannel_notify_on_state_change(
exec_ctx, sd->subchannel, p->base.interested_parties,
&sd->connectivity_state, &sd->connectivity_changed_closure);
@@ -433,9 +437,9 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
remove_disconnected_sc_locked(p, sd->ready_list_node);
sd->ready_list_node = NULL;
}
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- "connecting_transient_failure");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ grpc_error_ref(error), "connecting_transient_failure");
break;
case GRPC_CHANNEL_FATAL_FAILURE:
if (sd->ready_list_node != NULL) {
@@ -452,19 +456,23 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
unref = 1;
if (p->num_subchannels == 0) {
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE,
- "no_more_channels");
+ grpc_error_ref(error);
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE,
+ GRPC_ERROR_CREATE_REFERENCING("Round Robin Channels Exhausted",
+ &error, 1),
+ "no_more_channels");
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
+ grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_NONE,
+ NULL);
gpr_free(pp);
}
} else {
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- "subchannel_failed");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ grpc_error_ref(error), "subchannel_failed");
}
} /* switch */
} /* !unref */
@@ -474,14 +482,17 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
if (unref) {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "round_robin_connectivity");
}
+
+ grpc_error_unref(error);
}
static grpc_connectivity_state rr_check_connectivity(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *pol) {
+ grpc_lb_policy *pol,
+ grpc_error **error) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
grpc_connectivity_state st;
gpr_mu_lock(&p->mu);
- st = grpc_connectivity_state_check(&p->state_tracker);
+ st = grpc_connectivity_state_check(&p->state_tracker, error);
gpr_mu_unlock(&p->mu);
return st;
}
@@ -509,7 +520,8 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_connected_subchannel_ping(exec_ctx, target, closure);
} else {
gpr_mu_unlock(&p->mu);
- grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL);
+ grpc_exec_ctx_push(exec_ctx, closure,
+ GRPC_ERROR_CREATE("Round Robin not connected"), NULL);
}
}
diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c
index 620ba4e2aa..0f9f5a438b 100644
--- a/src/core/ext/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/resolver/dns/native/dns_resolver.c
@@ -82,6 +82,9 @@ typedef struct {
grpc_timer retry_timer;
/** retry backoff state */
gpr_backoff backoff_state;
+
+ /** currently resolving addresses */
+ grpc_resolved_addresses *addresses;
} dns_resolver;
static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
@@ -108,7 +111,8 @@ static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) {
}
if (r->next_completion != NULL) {
*r->target_config = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
+ grpc_exec_ctx_push(exec_ctx, r->next_completion,
+ GRPC_ERROR_CREATE("Resolver Shutdown"), NULL);
r->next_completion = NULL;
}
gpr_mu_unlock(&r->mu);
@@ -143,12 +147,12 @@ static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
}
static void dns_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg,
- bool success) {
+ grpc_error *error) {
dns_resolver *r = arg;
gpr_mu_lock(&r->mu);
r->have_retry_timer = false;
- if (success) {
+ if (error == GRPC_ERROR_NONE) {
if (!r->resolving) {
dns_start_resolving_locked(exec_ctx, r);
}
@@ -159,13 +163,14 @@ static void dns_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg,
}
static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_resolved_addresses *addresses) {
+ grpc_error *error) {
dns_resolver *r = arg;
grpc_client_config *config = NULL;
grpc_lb_policy *lb_policy;
gpr_mu_lock(&r->mu);
GPR_ASSERT(r->resolving);
r->resolving = 0;
+ grpc_resolved_addresses *addresses = r->addresses;
if (addresses != NULL) {
grpc_lb_policy_args lb_policy_args;
config = grpc_client_config_create();
@@ -183,8 +188,10 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now);
gpr_timespec timeout = gpr_time_sub(next_try, now);
- gpr_log(GPR_DEBUG, "dns resolution failed: retrying in %d.%09d seconds",
- timeout.tv_sec, timeout.tv_nsec);
+ const char *msg = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "dns resolution failed: retrying in %d.%09d seconds: %s",
+ timeout.tv_sec, timeout.tv_nsec, msg);
+ grpc_error_free_string(msg);
GPR_ASSERT(!r->have_retry_timer);
r->have_retry_timer = true;
GRPC_RESOLVER_REF(&r->base, "retry-timer");
@@ -207,7 +214,8 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx,
GRPC_RESOLVER_REF(&r->base, "dns-resolving");
GPR_ASSERT(!r->resolving);
r->resolving = 1;
- grpc_resolve_address(exec_ctx, r->name, r->default_port, dns_on_resolved, r);
+ grpc_resolve_address(exec_ctx, r->name, r->default_port,
+ grpc_closure_create(dns_on_resolved, r), &r->addresses);
}
static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
@@ -218,7 +226,7 @@ static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
if (r->resolved_config) {
grpc_client_config_ref(r->resolved_config);
}
- grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
+ grpc_exec_ctx_push(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
r->next_completion = NULL;
r->published_version = r->resolved_version;
}
diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
index a4fa9acf22..90da6a93a0 100644
--- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
@@ -92,7 +92,7 @@ static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_config = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
+ grpc_exec_ctx_push(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
r->next_completion = NULL;
}
gpr_mu_unlock(&r->mu);
@@ -133,7 +133,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr");
r->published = 1;
*r->target_config = cfg;
- grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
+ grpc_exec_ctx_push(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
r->next_completion = NULL;
}
}
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
index c5d3d8d9cc..28d1be878f 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
@@ -79,11 +79,11 @@ static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) {
}
static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
- bool success) {
+ grpc_error *error) {
connector_unref(exec_ctx, arg);
}
-static void connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
connector *c = arg;
grpc_closure *notify;
grpc_endpoint *tcp = c->tcp;
@@ -109,7 +109,7 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
}
notify = c->notify;
c->notify = NULL;
- notify->cb(exec_ctx, notify->cb_arg, 1);
+ notify->cb(exec_ctx, notify->cb_arg, error);
}
static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) {}
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
index a262306085..bceef152be 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
@@ -116,19 +116,19 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
notify = c->notify;
c->notify = NULL;
/* look at c->args which are connector args. */
- notify->cb(exec_ctx, notify->cb_arg, 1);
+ notify->cb(exec_ctx, notify->cb_arg, GRPC_ERROR_NONE);
if (args_copy != NULL) grpc_channel_args_destroy(args_copy);
}
static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
- bool success) {
+ grpc_error *error) {
connector *c = arg;
grpc_channel_security_connector_do_handshake(exec_ctx, c->security_connector,
c->connecting_endpoint,
on_secure_handshake_done, c);
}
-static void connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
connector *c = arg;
grpc_closure *notify;
grpc_endpoint *tcp = c->newly_connecting_endpoint;
@@ -153,7 +153,7 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
memset(c->result, 0, sizeof(*c->result));
notify = c->notify;
c->notify = NULL;
- notify->cb(exec_ctx, notify->cb_arg, 1);
+ notify->cb(exec_ctx, notify->cb_arg, GRPC_ERROR_NONE);
}
}
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
index e21fa2a072..ef860b4223 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
@@ -35,6 +35,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/http_server_filter.h"
@@ -79,7 +80,7 @@ static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp,
grpc_closure *destroy_done) {
grpc_tcp_server *tcp = tcpp;
grpc_tcp_server_unref(exec_ctx, tcp);
- grpc_exec_ctx_enqueue(exec_ctx, destroy_done, true, NULL);
+ grpc_exec_ctx_push(exec_ctx, destroy_done, GRPC_ERROR_NONE, NULL);
}
int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
@@ -90,23 +91,27 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
int port_num = -1;
int port_temp;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_error *err = GRPC_ERROR_NONE;
GRPC_API_TRACE("grpc_server_add_insecure_http2_port(server=%p, addr=%s)", 2,
(server, addr));
- resolved = grpc_blocking_resolve_address(addr, "http");
- if (!resolved) {
+ err = grpc_blocking_resolve_address(addr, "https", &resolved);
+ if (err != GRPC_ERROR_NONE) {
goto error;
}
- tcp = grpc_tcp_server_create(NULL);
- GPR_ASSERT(tcp);
+ err = grpc_tcp_server_create(NULL, &tcp);
+ if (err != GRPC_ERROR_NONE) {
+ goto error;
+ }
+ grpc_error **errors = gpr_malloc(sizeof(*errors) * resolved->naddrs);
for (i = 0; i < resolved->naddrs; i++) {
- port_temp = grpc_tcp_server_add_port(
+ errors[i] = grpc_tcp_server_add_port(
tcp, (struct sockaddr *)&resolved->addrs[i].addr,
- resolved->addrs[i].len);
- if (port_temp > 0) {
+ resolved->addrs[i].len, &port_temp);
+ if (errors[i] == GRPC_ERROR_NONE) {
if (port_num == -1) {
port_num = port_temp;
} else {
@@ -116,14 +121,28 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
}
}
if (count == 0) {
- gpr_log(GPR_ERROR, "No address added out of total %d resolved",
- resolved->naddrs);
+ char *msg;
+ gpr_asprintf(&msg, "No address added out of total %d resolved",
+ resolved->naddrs);
+ err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, resolved->naddrs);
goto error;
+ } else if (count != resolved->naddrs) {
+ char *msg;
+ gpr_asprintf(&msg, "Only %d addresses added out of total %d resolved",
+ count, resolved->naddrs);
+ err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, resolved->naddrs);
+ gpr_free(msg);
+
+ const char *warning_message = grpc_error_string(err);
+ gpr_log(GPR_INFO, "WARNING: %s", warning_message);
+ grpc_error_free_string(warning_message);
+ /* we managed to bind some addresses: continue */
+ } else {
+ for (i = 0; i < resolved->naddrs; i++) {
+ grpc_error_unref(errors[i]);
+ }
}
- if (count != resolved->naddrs) {
- gpr_log(GPR_ERROR, "Only %d addresses added out of total %d resolved",
- count, resolved->naddrs);
- }
+ gpr_free(errors);
grpc_resolved_addresses_destroy(resolved);
/* Register with the server only upon success */
diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
index 65d0d31c4a..0702fe9118 100644
--- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
@@ -37,6 +37,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
@@ -230,15 +231,28 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
}
}
if (count == 0) {
- gpr_log(GPR_ERROR, "No address added out of total %d resolved",
- resolved->naddrs);
+ char *msg;
+ gpr_asprintf(&msg, "No address added out of total %d resolved",
+ resolved->naddrs);
+ err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, resolved->naddrs);
goto error;
+ } else if (count != resolved->naddrs) {
+ char *msg;
+ gpr_asprintf(&msg, "Only %d addresses added out of total %d resolved",
+ count, resolved->naddrs);
+ err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, resolved->naddrs);
+ gpr_free(msg);
+
+ const char *warning_message = grpc_error_string(err);
+ gpr_log(GPR_INFO, "WARNING: %s", warning_message);
+ grpc_error_free_string(warning_message);
+ /* we managed to bind some addresses: continue */
+ } else {
+ for (i = 0; i < resolved->naddrs; i++) {
+ grpc_error_unref(errors[i]);
+ }
}
- if (count != resolved->naddrs) {
- gpr_log(GPR_ERROR, "Only %d addresses added out of total %d resolved",
- count, resolved->naddrs);
- /* if it's an error, don't we want to goto error; here ? */
- }
+ gpr_free(errors);
grpc_resolved_addresses_destroy(resolved);
/* Register with the server only upon success */
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 25d37711e5..14d1c26c3a 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -91,7 +91,8 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
uint32_t value);
/** Start disconnection chain */
-static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
+static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_error *error);
/** Perform a transport_op */
static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
@@ -128,7 +129,7 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx,
static void connectivity_state_set(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
- grpc_connectivity_state state, const char *reason);
+ grpc_connectivity_state state, grpc_error *error, const char *reason);
static void check_read_ops(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global);
@@ -387,7 +388,7 @@ static void destroy_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream *s_ignored,
void *arg_ignored) {
t->destroying = 1;
- drop_connection(exec_ctx, t);
+ drop_connection(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed"));
}
static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
@@ -423,12 +424,11 @@ static void destroy_endpoint(grpc_exec_ctx *exec_ctx,
static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_ignored,
- void *arg_ignored) {
+ grpc_error *error) {
if (!t->closed) {
t->closed = 1;
connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_FATAL_FAILURE,
- "close_transport");
+ error, "close_transport");
if (t->ep) {
allow_endpoint_shutdown_locked(exec_ctx, t);
}
@@ -529,7 +529,9 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx,
s->global.id == 0);
GPR_ASSERT(!s->global.in_stream_map);
if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
- close_transport_locked(exec_ctx, t, NULL, NULL);
+ close_transport_locked(
+ exec_ctx, t,
+ GRPC_ERROR_CREATE("Last stream closed after sending goaway"));
}
if (!t->executor.parsing_active && s->global.id) {
GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map,
@@ -739,7 +741,7 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx,
allow_endpoint_shutdown_locked(exec_ctx, t);
if (error != GRPC_ERROR_NONE) {
- drop_connection(exec_ctx, t);
+ drop_connection(exec_ctx, t, grpc_error_ref(error));
}
grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing);
@@ -782,11 +784,16 @@ void grpc_chttp2_add_incoming_goaway(
uint32_t goaway_error, gpr_slice goaway_text) {
char *msg = gpr_dump_slice(goaway_text, GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg);
- gpr_free(msg);
gpr_slice_unref(goaway_text);
transport_global->seen_goaway = 1;
- connectivity_state_set(exec_ctx, transport_global, GRPC_CHANNEL_FATAL_FAILURE,
- "got_goaway");
+ connectivity_state_set(
+ exec_ctx, transport_global, GRPC_CHANNEL_FATAL_FAILURE,
+ grpc_error_set_str(
+ grpc_error_set_int(GRPC_ERROR_CREATE("GOAWAY received"),
+ GRPC_ERROR_INT_HTTP2_ERROR, goaway_error),
+ GRPC_ERROR_STR_RAW_BYTES, msg),
+ "got_goaway");
+ gpr_free(msg);
}
static void maybe_start_some_streams(
@@ -815,9 +822,9 @@ static void maybe_start_some_streams(
transport_global->next_stream_id += 2;
if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) {
- connectivity_state_set(exec_ctx, transport_global,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- "no_more_stream_ids");
+ connectivity_state_set(
+ exec_ctx, transport_global, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE("Stream IDs exhausted"), "no_more_stream_ids");
}
stream_global->outgoing_window =
@@ -1085,7 +1092,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream *s_unused,
void *stream_op) {
grpc_transport_op *op = stream_op;
- bool close_transport = op->disconnect;
+ grpc_error *close_transport = op->disconnect_with_error;
/* If there's a set_accept_stream ensure that we're not parsing
to avoid changing things out from underneath */
@@ -1110,7 +1117,9 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
t->global.last_incoming_stream_id,
(uint32_t)grpc_chttp2_grpc_status_to_http2_error(op->goaway_status),
gpr_slice_ref(*op->goaway_message), &t->global.qbuf);
- close_transport = !grpc_chttp2_has_streams(t);
+ close_transport = grpc_chttp2_has_streams(t)
+ ? GRPC_ERROR_NONE
+ : GRPC_ERROR_CREATE("GOAWAY sent");
}
if (op->set_accept_stream) {
@@ -1131,8 +1140,8 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
send_ping_locked(t, op->send_ping);
}
- if (close_transport) {
- close_transport_locked(exec_ctx, t, NULL, NULL);
+ if (close_transport != GRPC_ERROR_NONE) {
+ close_transport_locked(exec_ctx, t, close_transport);
}
}
@@ -1231,7 +1240,9 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
- close_transport_locked(exec_ctx, t, NULL, NULL);
+ close_transport_locked(
+ exec_ctx, t,
+ GRPC_ERROR_CREATE("Last stream closed after sending GOAWAY"));
}
if (grpc_chttp2_list_remove_writable_stream(&t->global, &s->global)) {
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &s->global, "chttp2_writing");
@@ -1490,8 +1501,9 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx,
grpc_chttp2_for_all_streams(&t->global, exec_ctx, cancel_stream_cb);
}
-static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
- close_transport_locked(exec_ctx, t, NULL, NULL);
+static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_error *error) {
+ close_transport_locked(exec_ctx, t, error);
end_all_the_calls(exec_ctx, t);
}
@@ -1625,10 +1637,13 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s_unused,
void *arg) {
- bool success = (bool)(uintptr_t)arg;
+ grpc_error *error = arg;
bool keep_reading = false;
- if (!success || t->closed) {
- drop_connection(exec_ctx, t);
+ if (error == GRPC_ERROR_NONE && t->closed) {
+ error = GRPC_ERROR_CREATE("Transport closed");
+ }
+ if (error != GRPC_ERROR_NONE) {
+ drop_connection(exec_ctx, t, error);
t->endpoint_reading = 0;
if (!t->executor.writing_active && t->ep) {
grpc_endpoint_destroy(exec_ctx, t->ep);
@@ -1658,13 +1673,13 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
static void connectivity_state_set(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
- grpc_connectivity_state state, const char *reason) {
+ grpc_connectivity_state state, grpc_error *error, const char *reason) {
GRPC_CHTTP2_IF_TRACING(
gpr_log(GPR_DEBUG, "set connectivity_state=%d", state));
grpc_connectivity_state_set(
exec_ctx,
&TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker,
- state, reason);
+ state, error, reason);
}
/*******************************************************************************
diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c
index 5812af5c93..2c5f9f6cbd 100644
--- a/src/core/lib/iomgr/error.c
+++ b/src/core/lib/iomgr/error.c
@@ -104,6 +104,10 @@ static const char *error_int_name(grpc_error_ints key) {
return "size";
case GRPC_ERROR_INT_HTTP2_ERROR:
return "http2_error";
+ case GRPC_ERROR_INT_TSI_CODE:
+ return "tsi_code";
+ case GRPC_ERROR_INT_SECURITY_STATUS:
+ return "security_status";
}
GPR_UNREACHABLE_CODE(return "unknown");
}
@@ -124,6 +128,8 @@ static const char *error_str_name(grpc_error_strs key) {
return "grpc_message";
case GRPC_ERROR_STR_RAW_BYTES:
return "raw_bytes";
+ case GRPC_ERROR_STR_TSI_ERROR:
+ return "tsi_error";
}
GPR_UNREACHABLE_CODE(return "unknown");
}
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index 234782ab03..143c8beffc 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -51,6 +51,8 @@ typedef enum {
GRPC_ERROR_INT_INDEX,
GRPC_ERROR_INT_SIZE,
GRPC_ERROR_INT_HTTP2_ERROR,
+ GRPC_ERROR_INT_TSI_CODE,
+ GRPC_ERROR_INT_SECURITY_STATUS,
} grpc_error_ints;
typedef enum {
@@ -61,6 +63,7 @@ typedef enum {
GRPC_ERROR_STR_TARGET_ADDRESS,
GRPC_ERROR_STR_GRPC_MESSAGE,
GRPC_ERROR_STR_RAW_BYTES,
+ GRPC_ERROR_STR_TSI_ERROR,
} grpc_error_strs;
typedef enum {
diff --git a/src/core/lib/security/credentials/credentials.h b/src/core/lib/security/credentials/credentials.h
index 5f44c7c3e3..675e02b58d 100644
--- a/src/core/lib/security/credentials/credentials.h
+++ b/src/core/lib/security/credentials/credentials.h
@@ -223,6 +223,7 @@ grpc_server_credentials *grpc_find_server_credentials_in_args(
typedef struct {
grpc_call_credentials *creds;
grpc_credentials_metadata_cb cb;
+ grpc_http_response response;
void *user_data;
} grpc_credentials_metadata_request;
diff --git a/src/core/lib/security/credentials/fake/fake_credentials.c b/src/core/lib/security/credentials/fake/fake_credentials.c
index 2a5d225078..1ff7bd14a5 100644
--- a/src/core/lib/security/credentials/fake/fake_credentials.c
+++ b/src/core/lib/security/credentials/fake/fake_credentials.c
@@ -95,7 +95,7 @@ static void md_only_test_destruct(grpc_call_credentials *creds) {
}
static void on_simulated_token_fetch_done(grpc_exec_ctx *exec_ctx,
- void *user_data, bool success) {
+ void *user_data, grpc_error *error) {
grpc_credentials_metadata_request *r =
(grpc_credentials_metadata_request *)user_data;
grpc_md_only_test_credentials *c = (grpc_md_only_test_credentials *)r->creds;
@@ -113,8 +113,9 @@ static void md_only_test_get_request_metadata(
if (c->is_async) {
grpc_credentials_metadata_request *cb_arg =
grpc_credentials_metadata_request_create(creds, cb, user_data);
- grpc_executor_enqueue(
- grpc_closure_create(on_simulated_token_fetch_done, cb_arg), true);
+ grpc_executor_push(
+ grpc_closure_create(on_simulated_token_fetch_done, cb_arg),
+ GRPC_ERROR_NONE);
} else {
cb(exec_ctx, user_data, c->md_store->entries, 1, GRPC_CREDENTIALS_OK);
}
@@ -136,4 +137,3 @@ grpc_call_credentials *grpc_md_only_test_credentials_create(
c->is_async = is_async;
return &c->base;
}
-
diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.c b/src/core/lib/security/credentials/google_default/google_default_credentials.c
index da23bba62b..a67f19ae0f 100644
--- a/src/core/lib/security/credentials/google_default/google_default_credentials.c
+++ b/src/core/lib/security/credentials/google_default/google_default_credentials.c
@@ -41,8 +41,8 @@
#include "src/core/lib/http/httpcli.h"
#include "src/core/lib/http/parser.h"
-#include "src/core/lib/security/credentials/oauth2/oauth2_credentials.h"
#include "src/core/lib/security/credentials/jwt/jwt_credentials.h"
+#include "src/core/lib/security/credentials/oauth2/oauth2_credentials.h"
#include "src/core/lib/support/env.h"
#include "src/core/lib/support/load_file.h"
#include "src/core/lib/surface/api_trace.h"
@@ -65,18 +65,20 @@ typedef struct {
grpc_pollset *pollset;
int is_done;
int success;
+ grpc_http_response response;
} compute_engine_detector;
-static void on_compute_engine_detection_http_response(
- grpc_exec_ctx *exec_ctx, void *user_data,
- const grpc_http_response *response) {
+static void on_compute_engine_detection_http_response(grpc_exec_ctx *exec_ctx,
+ void *user_data,
+ grpc_error *error) {
compute_engine_detector *detector = (compute_engine_detector *)user_data;
- if (response != NULL && response->status == 200 && response->hdr_count > 0) {
+ if (error == GRPC_ERROR_NONE && detector->response.status == 200 &&
+ detector->response.hdr_count > 0) {
/* Internet providers can return a generic response to all requests, so
it is necessary to check that metadata header is present also. */
size_t i;
- for (i = 0; i < response->hdr_count; i++) {
- grpc_http_header *header = &response->hdrs[i];
+ for (i = 0; i < detector->response.hdr_count; i++) {
+ grpc_http_header *header = &detector->response.hdrs[i];
if (strcmp(header->key, "Metadata-Flavor") == 0 &&
strcmp(header->value, "Google") == 0) {
detector->success = 1;
@@ -90,7 +92,7 @@ static void on_compute_engine_detection_http_response(
gpr_mu_unlock(g_polling_mu);
}
-static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool s) {
+static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, grpc_error *e) {
grpc_pollset_destroy(p);
}
@@ -119,9 +121,10 @@ static int is_stack_running_on_compute_engine(void) {
grpc_httpcli_get(
&exec_ctx, &context, detector.pollset, &request,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay),
- on_compute_engine_detection_http_response, &detector);
+ grpc_closure_create(on_compute_engine_detection_http_response, &detector),
+ &detector.response);
- grpc_exec_ctx_finish(&exec_ctx);
+ grpc_exec_ctx_flush(&exec_ctx);
/* Block until we get the response. This is not ideal but this should only be
called once for the lifetime of the process by the default credentials. */
diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.c b/src/core/lib/security/credentials/jwt/jwt_verifier.c
index cd6c7ce392..66aaae2c5a 100644
--- a/src/core/lib/security/credentials/jwt/jwt_verifier.c
+++ b/src/core/lib/security/credentials/jwt/jwt_verifier.c
@@ -329,6 +329,7 @@ typedef struct {
gpr_slice signed_data;
void *user_data;
grpc_jwt_verification_done_cb user_cb;
+ grpc_http_response response;
} verifier_cb_ctx;
/* Takes ownership of the header, claims and signature. */
@@ -571,9 +572,9 @@ end:
}
static void on_keys_retrieved(grpc_exec_ctx *exec_ctx, void *user_data,
- const grpc_httpcli_response *response) {
- grpc_json *json = json_from_http(response);
+ grpc_error *error) {
verifier_cb_ctx *ctx = (verifier_cb_ctx *)user_data;
+ grpc_json *json = json_from_http(&ctx->response);
EVP_PKEY *verification_key = NULL;
grpc_jwt_verifier_status status = GRPC_JWT_VERIFIER_GENERIC_ERROR;
grpc_jwt_claims *claims = NULL;
@@ -612,10 +613,11 @@ end:
}
static void on_openid_config_retrieved(grpc_exec_ctx *exec_ctx, void *user_data,
- const grpc_httpcli_response *response) {
+ grpc_error *error) {
const grpc_json *cur;
- grpc_json *json = json_from_http(response);
verifier_cb_ctx *ctx = (verifier_cb_ctx *)user_data;
+ const grpc_http_response *response = &ctx->response;
+ grpc_json *json = json_from_http(response);
grpc_httpcli_request req;
const char *jwks_uri;
@@ -644,7 +646,7 @@ static void on_openid_config_retrieved(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_httpcli_get(
exec_ctx, &ctx->verifier->http_ctx, ctx->pollset, &req,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay),
- on_keys_retrieved, ctx);
+ grpc_closure_create(on_keys_retrieved, ctx), &ctx->response);
grpc_json_destroy(json);
gpr_free(req.host);
return;
@@ -686,7 +688,7 @@ static void verifier_put_mapping(grpc_jwt_verifier *v, const char *email_domain,
static void retrieve_key_and_verify(grpc_exec_ctx *exec_ctx,
verifier_cb_ctx *ctx) {
const char *at_sign;
- grpc_httpcli_response_cb http_cb;
+ grpc_closure *http_cb;
char *path_prefix = NULL;
const char *iss;
grpc_httpcli_request req;
@@ -730,7 +732,7 @@ static void retrieve_key_and_verify(grpc_exec_ctx *exec_ctx,
*(path_prefix++) = '\0';
gpr_asprintf(&req.http.path, "/%s/%s", path_prefix, iss);
}
- http_cb = on_keys_retrieved;
+ http_cb = grpc_closure_create(on_keys_retrieved, ctx);
} else {
req.host = gpr_strdup(strstr(iss, "https://") == iss ? iss + 8 : iss);
path_prefix = strchr(req.host, '/');
@@ -741,13 +743,13 @@ static void retrieve_key_and_verify(grpc_exec_ctx *exec_ctx,
gpr_asprintf(&req.http.path, "/%s%s", path_prefix,
GRPC_OPENID_CONFIG_URL_SUFFIX);
}
- http_cb = on_openid_config_retrieved;
+ http_cb = grpc_closure_create(on_openid_config_retrieved, ctx);
}
grpc_httpcli_get(
exec_ctx, &ctx->verifier->http_ctx, ctx->pollset, &req,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay),
- http_cb, ctx);
+ http_cb, &ctx->response);
gpr_free(req.host);
gpr_free(req.http.path);
return;
diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
index 0984d1f53f..671dea31e6 100644
--- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
+++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
@@ -216,9 +216,9 @@ end:
return status;
}
-static void on_oauth2_token_fetcher_http_response(
- grpc_exec_ctx *exec_ctx, void *user_data,
- const grpc_http_response *response) {
+static void on_oauth2_token_fetcher_http_response(grpc_exec_ctx *exec_ctx,
+ void *user_data,
+ grpc_error *error) {
grpc_credentials_metadata_request *r =
(grpc_credentials_metadata_request *)user_data;
grpc_oauth2_token_fetcher_credentials *c =
@@ -228,7 +228,7 @@ static void on_oauth2_token_fetcher_http_response(
gpr_mu_lock(&c->mu);
status = grpc_oauth2_token_fetcher_credentials_parse_server_response(
- response, &c->access_token_md, &token_lifetime);
+ &r->response, &c->access_token_md, &token_lifetime);
if (status == GRPC_CREDENTIALS_OK) {
c->token_expiration =
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), token_lifetime);
@@ -296,7 +296,7 @@ static grpc_call_credentials_vtable compute_engine_vtable = {
static void compute_engine_fetch_oauth2(
grpc_exec_ctx *exec_ctx, grpc_credentials_metadata_request *metadata_req,
grpc_httpcli_context *httpcli_context, grpc_pollset *pollset,
- grpc_httpcli_response_cb response_cb, gpr_timespec deadline) {
+ grpc_iomgr_cb_func response_cb, gpr_timespec deadline) {
grpc_http_header header = {"Metadata-Flavor", "Google"};
grpc_httpcli_request request;
memset(&request, 0, sizeof(grpc_httpcli_request));
@@ -305,7 +305,8 @@ static void compute_engine_fetch_oauth2(
request.http.hdr_count = 1;
request.http.hdrs = &header;
grpc_httpcli_get(exec_ctx, httpcli_context, pollset, &request, deadline,
- response_cb, metadata_req);
+ grpc_closure_create(response_cb, metadata_req),
+ &metadata_req->response);
}
grpc_call_credentials *grpc_google_compute_engine_credentials_create(
@@ -337,7 +338,7 @@ static grpc_call_credentials_vtable refresh_token_vtable = {
static void refresh_token_fetch_oauth2(
grpc_exec_ctx *exec_ctx, grpc_credentials_metadata_request *metadata_req,
grpc_httpcli_context *httpcli_context, grpc_pollset *pollset,
- grpc_httpcli_response_cb response_cb, gpr_timespec deadline) {
+ grpc_iomgr_cb_func response_cb, gpr_timespec deadline) {
grpc_google_refresh_token_credentials *c =
(grpc_google_refresh_token_credentials *)metadata_req->creds;
grpc_http_header header = {"Content-Type",
@@ -354,7 +355,9 @@ static void refresh_token_fetch_oauth2(
request.http.hdrs = &header;
request.handshaker = &grpc_httpcli_ssl;
grpc_httpcli_post(exec_ctx, httpcli_context, pollset, &request, body,
- strlen(body), deadline, response_cb, metadata_req);
+ strlen(body), deadline,
+ grpc_closure_create(response_cb, metadata_req),
+ &metadata_req->response);
gpr_free(body);
}
@@ -426,5 +429,3 @@ grpc_call_credentials *grpc_access_token_credentials_create(
gpr_free(token_md_value);
return &c->base;
}
-
-
diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h
index 6cdcc68514..017f823ef4 100644
--- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h
+++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h
@@ -71,7 +71,7 @@ typedef void (*grpc_fetch_oauth2_func)(grpc_exec_ctx *exec_ctx,
grpc_credentials_metadata_request *req,
grpc_httpcli_context *http_context,
grpc_pollset *pollset,
- grpc_httpcli_response_cb response_cb,
+ grpc_iomgr_cb_func cb,
gpr_timespec deadline);
typedef struct {
grpc_call_credentials base;
@@ -82,7 +82,6 @@ typedef struct {
grpc_fetch_oauth2_func fetch_func;
} grpc_oauth2_token_fetcher_credentials;
-
// Google refresh token credentials.
typedef struct {
grpc_oauth2_token_fetcher_credentials base;
@@ -108,4 +107,3 @@ grpc_oauth2_token_fetcher_credentials_parse_server_response(
grpc_credentials_md_store **token_md, gpr_timespec *token_lifetime);
#endif // GRPC_CORE_LIB_SECURITY_CREDENTIALS_OAUTH2_CREDENTIALS_H
-
diff --git a/src/core/lib/security/transport/handshake.c b/src/core/lib/security/transport/handshake.c
index 6561f4b47d..b3fc3d56d9 100644
--- a/src/core/lib/security/transport/handshake.c
+++ b/src/core/lib/security/transport/handshake.c
@@ -41,6 +41,7 @@
#include <grpc/support/slice_buffer.h>
#include "src/core/lib/security/context/security_context.h"
#include "src/core/lib/security/transport/secure_endpoint.h"
+#include "src/core/lib/security/transport/tsi_error.h"
#define GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE 256
@@ -63,10 +64,11 @@ typedef struct {
} grpc_security_handshake;
static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
- void *setup, bool success);
+ void *setup,
+ grpc_error *error);
static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *setup,
- bool success);
+ grpc_error *error);
static void security_connector_remove_handshake(grpc_security_handshake *h) {
GPR_ASSERT(!h->is_client_side);
@@ -97,11 +99,11 @@ static void security_connector_remove_handshake(grpc_security_handshake *h) {
static void security_handshake_done(grpc_exec_ctx *exec_ctx,
grpc_security_handshake *h,
- int is_success) {
+ grpc_error *error) {
if (!h->is_client_side) {
security_connector_remove_handshake(h);
}
- if (is_success) {
+ if (error == GRPC_ERROR_NONE) {
h->cb(exec_ctx, h->user_data, GRPC_SECURITY_OK, h->secure_endpoint,
h->auth_context);
} else {
@@ -130,17 +132,20 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *user_data,
tsi_frame_protector *protector;
tsi_result result;
if (status != GRPC_SECURITY_OK) {
- gpr_log(GPR_ERROR, "Error checking peer.");
- security_handshake_done(exec_ctx, h, 0);
+ security_handshake_done(
+ exec_ctx, h,
+ grpc_error_set_int(GRPC_ERROR_CREATE("Error checking peer."),
+ GRPC_ERROR_INT_SECURITY_STATUS, status));
return;
}
h->auth_context = GRPC_AUTH_CONTEXT_REF(auth_context, "handshake");
result =
tsi_handshaker_create_frame_protector(h->handshaker, NULL, &protector);
if (result != TSI_OK) {
- gpr_log(GPR_ERROR, "Frame protector creation failed with error %s.",
- tsi_result_to_string(result));
- security_handshake_done(exec_ctx, h, 0);
+ security_handshake_done(
+ exec_ctx, h,
+ grpc_set_tsi_error_bits(
+ GRPC_ERROR_CREATE("Frame protector creation failed"), result));
return;
}
h->secure_endpoint =
@@ -148,7 +153,7 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *user_data,
h->left_overs.slices, h->left_overs.count);
h->left_overs.count = 0;
h->left_overs.length = 0;
- security_handshake_done(exec_ctx, h, 1);
+ security_handshake_done(exec_ctx, h, GRPC_ERROR_NONE);
return;
}
@@ -157,9 +162,9 @@ static void check_peer(grpc_exec_ctx *exec_ctx, grpc_security_handshake *h) {
tsi_result result = tsi_handshaker_extract_peer(h->handshaker, &peer);
if (result != TSI_OK) {
- gpr_log(GPR_ERROR, "Peer extraction failed with error %s",
- tsi_result_to_string(result));
- security_handshake_done(exec_ctx, h, 0);
+ security_handshake_done(
+ exec_ctx, h, grpc_set_tsi_error_bits(
+ GRPC_ERROR_CREATE("Peer extraction failed"), result));
return;
}
grpc_security_connector_check_peer(exec_ctx, h->connector, peer,
@@ -185,9 +190,9 @@ static void send_handshake_bytes_to_peer(grpc_exec_ctx *exec_ctx,
} while (result == TSI_INCOMPLETE_DATA);
if (result != TSI_OK) {
- gpr_log(GPR_ERROR, "Handshake failed with error %s",
- tsi_result_to_string(result));
- security_handshake_done(exec_ctx, h, 0);
+ security_handshake_done(
+ exec_ctx, h,
+ grpc_set_tsi_error_bits(GRPC_ERROR_CREATE("Handshake failed"), result));
return;
}
@@ -203,7 +208,7 @@ static void send_handshake_bytes_to_peer(grpc_exec_ctx *exec_ctx,
static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
void *handshake,
- bool success) {
+ grpc_error *error) {
grpc_security_handshake *h = handshake;
size_t consumed_slice_size = 0;
tsi_result result = TSI_OK;
@@ -211,9 +216,10 @@ static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
size_t num_left_overs;
int has_left_overs_in_current_slice = 0;
- if (!success) {
- gpr_log(GPR_ERROR, "Read failed.");
- security_handshake_done(exec_ctx, h, 0);
+ if (error != GRPC_ERROR_NONE) {
+ security_handshake_done(
+ exec_ctx, h,
+ GRPC_ERROR_CREATE_REFERENCING("Handshake read failed", &error, 1));
return;
}
@@ -238,9 +244,9 @@ static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
}
if (result != TSI_OK) {
- gpr_log(GPR_ERROR, "Handshake failed with error %s",
- tsi_result_to_string(result));
- security_handshake_done(exec_ctx, h, 0);
+ security_handshake_done(
+ exec_ctx, h,
+ grpc_set_tsi_error_bits(GRPC_ERROR_CREATE("Handshake failed"), result));
return;
}
@@ -270,13 +276,15 @@ static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
/* If handshake is NULL, the handshake is done. */
static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx,
- void *handshake, bool success) {
+ void *handshake, grpc_error *error) {
grpc_security_handshake *h = handshake;
/* Make sure that write is OK. */
- if (!success) {
- gpr_log(GPR_ERROR, "Write failed.");
- if (handshake != NULL) security_handshake_done(exec_ctx, h, 0);
+ if (error != GRPC_ERROR_NONE) {
+ if (handshake != NULL)
+ security_handshake_done(
+ exec_ctx, h,
+ GRPC_ERROR_CREATE_REFERENCING("Handshake write failed", &error, 1));
return;
}
diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c
index 4438c8e559..9bd5305f80 100644
--- a/src/core/lib/security/transport/secure_endpoint.c
+++ b/src/core/lib/security/transport/secure_endpoint.c
@@ -38,6 +38,7 @@
#include <grpc/support/slice_buffer.h>
#include <grpc/support/sync.h>
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/security/transport/tsi_error.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/tsi/transport_security_interface.h"
@@ -126,7 +127,7 @@ static void flush_read_staging_buffer(secure_endpoint *ep, uint8_t **cur,
}
static void call_read_cb(grpc_exec_ctx *exec_ctx, secure_endpoint *ep,
- bool success) {
+ grpc_error *error) {
if (grpc_trace_secure_endpoint) {
size_t i;
for (i = 0; i < ep->read_buffer->count; i++) {
@@ -137,11 +138,12 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, secure_endpoint *ep,
}
}
ep->read_buffer = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, ep->read_cb, success, NULL);
+ grpc_exec_ctx_push(exec_ctx, ep->read_cb, grpc_error_ref(error), NULL);
SECURE_ENDPOINT_UNREF(exec_ctx, ep, "read");
}
-static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, bool success) {
+static void on_read(grpc_exec_ctx *exec_ctx, void *user_data,
+ grpc_error *error) {
unsigned i;
uint8_t keep_looping = 0;
tsi_result result = TSI_OK;
@@ -149,9 +151,10 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, bool success) {
uint8_t *cur = GPR_SLICE_START_PTR(ep->read_staging_buffer);
uint8_t *end = GPR_SLICE_END_PTR(ep->read_staging_buffer);
- if (!success) {
+ if (error != GRPC_ERROR_NONE) {
gpr_slice_buffer_reset_and_unref(ep->read_buffer);
- call_read_cb(exec_ctx, ep, 0);
+ call_read_cb(exec_ctx, ep, GRPC_ERROR_CREATE_REFERENCING(
+ "Secure read failed", &error, 1));
return;
}
@@ -208,11 +211,12 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, bool success) {
if (result != TSI_OK) {
gpr_slice_buffer_reset_and_unref(ep->read_buffer);
- call_read_cb(exec_ctx, ep, 0);
+ call_read_cb(exec_ctx, ep, grpc_set_tsi_error_bits(
+ GRPC_ERROR_CREATE("Unwrap failed"), result));
return;
}
- call_read_cb(exec_ctx, ep, 1);
+ call_read_cb(exec_ctx, ep, GRPC_ERROR_NONE);
}
static void endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep,
@@ -226,7 +230,7 @@ static void endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep,
if (ep->leftover_bytes.count) {
gpr_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer);
GPR_ASSERT(ep->leftover_bytes.count == 0);
- on_read(exec_ctx, ep, 1);
+ on_read(exec_ctx, ep, GRPC_ERROR_NONE);
return;
}
@@ -315,7 +319,10 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep,
if (result != TSI_OK) {
/* TODO(yangg) do different things according to the error type? */
gpr_slice_buffer_reset_and_unref(&ep->output_buffer);
- grpc_exec_ctx_enqueue(exec_ctx, cb, false, NULL);
+ grpc_exec_ctx_push(
+ exec_ctx, cb,
+ grpc_set_tsi_error_bits(GRPC_ERROR_CREATE("Wrap failed"), result),
+ NULL);
return;
}
diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c
index 006a30f0c6..e0a7fb50d1 100644
--- a/src/core/lib/security/transport/server_auth_filter.c
+++ b/src/core/lib/security/transport/server_auth_filter.c
@@ -128,7 +128,7 @@ static void on_md_processing_done(
grpc_metadata_batch_filter(calld->recv_initial_metadata, remove_consumed_md,
elem);
grpc_metadata_array_destroy(&calld->md);
- calld->on_done_recv->cb(&exec_ctx, calld->on_done_recv->cb_arg, 1);
+ grpc_exec_ctx_push(&exec_ctx, calld->on_done_recv, GRPC_ERROR_NONE, NULL);
} else {
gpr_slice message;
grpc_transport_stream_op close_op;
@@ -146,18 +146,21 @@ static void on_md_processing_done(
calld->transport_op.send_trailing_metadata = NULL;
grpc_transport_stream_op_add_close(&close_op, status, &message);
grpc_call_next_op(&exec_ctx, elem, &close_op);
- calld->on_done_recv->cb(&exec_ctx, calld->on_done_recv->cb_arg, 0);
+ grpc_exec_ctx_push(&exec_ctx, calld->on_done_recv,
+ grpc_error_set_int(GRPC_ERROR_CREATE(error_details),
+ GRPC_ERROR_INT_GRPC_STATUS, status),
+ NULL);
}
grpc_exec_ctx_finish(&exec_ctx);
}
static void auth_on_recv(grpc_exec_ctx *exec_ctx, void *user_data,
- bool success) {
+ grpc_error *error) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- if (success) {
+ if (error == GRPC_ERROR_NONE) {
if (chand->creds->processor.process != NULL) {
calld->md = metadata_batch_to_md_array(calld->recv_initial_metadata);
chand->creds->processor.process(
@@ -166,7 +169,8 @@ static void auth_on_recv(grpc_exec_ctx *exec_ctx, void *user_data,
return;
}
}
- calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success);
+ grpc_exec_ctx_push(exec_ctx, calld->on_done_recv, grpc_error_ref(error),
+ NULL);
}
static void set_recv_ops_md_callbacks(grpc_call_element *elem,
diff --git a/src/core/lib/security/transport/tsi_error.c b/src/core/lib/security/transport/tsi_error.c
new file mode 100644
index 0000000000..b9fb814905
--- /dev/null
+++ b/src/core/lib/security/transport/tsi_error.c
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/security/transport/tsi_error.h"
+
+grpc_error *grpc_set_tsi_error_bits(grpc_error *error, tsi_result result) {
+ return grpc_error_set_int(grpc_error_set_str(error, GRPC_ERROR_STR_TSI_ERROR,
+ tsi_result_to_string(result)),
+ GRPC_ERROR_INT_TSI_CODE, result);
+}
diff --git a/src/core/lib/security/transport/tsi_error.h b/src/core/lib/security/transport/tsi_error.h
new file mode 100644
index 0000000000..5406b8492b
--- /dev/null
+++ b/src/core/lib/security/transport/tsi_error.h
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef TSI_ERROR_H
+#define TSI_ERROR_H
+
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/tsi/transport_security_interface.h"
+
+grpc_error *grpc_set_tsi_error_bits(grpc_error *error, tsi_result result);
+
+#endif
diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c
index ac5e1fcc0c..3c68dc0c4b 100644
--- a/src/core/lib/surface/channel.c
+++ b/src/core/lib/surface/channel.c
@@ -294,7 +294,7 @@ void grpc_channel_destroy(grpc_channel *channel) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_API_TRACE("grpc_channel_destroy(channel=%p)", 1, (channel));
memset(&op, 0, sizeof(op));
- op.disconnect = 1;
+ op.disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed");
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
elem->filter->start_transport_op(&exec_ctx, elem, &op);
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 61d1624f49..ad4d0b5481 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -273,7 +273,7 @@ static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg,
}
static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel,
- int send_goaway, int send_disconnect) {
+ int send_goaway, grpc_error *send_disconnect) {
grpc_transport_op op;
struct shutdown_cleanup_args *sc;
grpc_channel_element *elem;
@@ -284,7 +284,7 @@ static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel,
sc->slice = gpr_slice_from_copied_string("Server shutdown");
op.goaway_message = &sc->slice;
op.goaway_status = GRPC_STATUS_OK;
- op.disconnect = send_disconnect;
+ op.disconnect_with_error = send_disconnect;
grpc_closure_init(&sc->closure, shutdown_cleanup, sc);
op.on_consumed = &sc->closure;
@@ -295,7 +295,7 @@ static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel,
static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx,
channel_broadcaster *cb,
int send_goaway,
- int force_disconnect) {
+ grpc_error *force_disconnect) {
size_t i;
for (i = 0; i < cb->num_channels; i++) {
@@ -1100,7 +1100,9 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
op.set_accept_stream_user_data = chand;
op.on_connectivity_state_change = &chand->channel_connectivity_changed;
op.connectivity_state = &chand->connectivity_state;
- op.disconnect = gpr_atm_acq_load(&s->shutdown_flag) != 0;
+ if (gpr_atm_acq_load(&s->shutdown_flag) != 0) {
+ op.disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown");
+ }
grpc_transport_perform_op(exec_ctx, transport, &op);
}
@@ -1185,7 +1187,8 @@ void grpc_server_cancel_all_calls(grpc_server *server) {
channel_broadcaster_init(server, &broadcaster);
gpr_mu_unlock(&server->mu_global);
- channel_broadcaster_shutdown(&exec_ctx, &broadcaster, 0, 1);
+ channel_broadcaster_shutdown(&exec_ctx, &broadcaster, 0,
+ GRPC_ERROR_CREATE("Cancelling all calls"));
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c
index cdfc298fa9..c6b274a5da 100644
--- a/src/core/lib/transport/connectivity_state.c
+++ b/src/core/lib/transport/connectivity_state.c
@@ -69,6 +69,7 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state_tracker *tracker) {
grpc_error *error;
grpc_connectivity_state_watcher *w;
+ grpc_error_unref(tracker->current_error);
while ((w = tracker->watchers)) {
tracker->watchers = w->next;
@@ -85,11 +86,14 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx,
}
grpc_connectivity_state grpc_connectivity_state_check(
- grpc_connectivity_state_tracker *tracker) {
+ grpc_connectivity_state_tracker *tracker, grpc_error **error) {
if (grpc_connectivity_state_trace) {
gpr_log(GPR_DEBUG, "CONWATCH: %p %s: get %s", tracker, tracker->name,
grpc_connectivity_state_name(tracker->current_state));
}
+ if (error != NULL) {
+ *error = grpc_error_ref(tracker->current_error);
+ }
return tracker->current_state;
}
@@ -143,13 +147,26 @@ int grpc_connectivity_state_notify_on_state_change(
void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state state,
- const char *reason) {
+ grpc_error *error, const char *reason) {
grpc_connectivity_state_watcher *w;
if (grpc_connectivity_state_trace) {
gpr_log(GPR_DEBUG, "SET: %p %s: %s --> %s [%s]", tracker, tracker->name,
grpc_connectivity_state_name(tracker->current_state),
grpc_connectivity_state_name(state), reason);
}
+ switch (state) {
+ case GRPC_CHANNEL_CONNECTING:
+ case GRPC_CHANNEL_IDLE:
+ case GRPC_CHANNEL_READY:
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ break;
+ case GRPC_CHANNEL_FATAL_FAILURE:
+ case GRPC_CHANNEL_TRANSIENT_FAILURE:
+ GPR_ASSERT(error != GRPC_ERROR_NONE);
+ break;
+ }
+ grpc_error_unref(tracker->current_error);
+ tracker->current_error = error;
if (tracker->current_state == state) {
return;
}
@@ -158,7 +175,8 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
while ((w = tracker->watchers) != NULL) {
*w->current = tracker->current_state;
tracker->watchers = w->next;
- grpc_exec_ctx_push(exec_ctx, w->notify, GRPC_ERROR_NONE, NULL);
+ grpc_exec_ctx_push(exec_ctx, w->notify,
+ grpc_error_ref(tracker->current_error), NULL);
gpr_free(w);
}
}
diff --git a/src/core/lib/transport/connectivity_state.h b/src/core/lib/transport/connectivity_state.h
index 2eb7e09124..7a2fa52c10 100644
--- a/src/core/lib/transport/connectivity_state.h
+++ b/src/core/lib/transport/connectivity_state.h
@@ -49,6 +49,8 @@ typedef struct grpc_connectivity_state_watcher {
typedef struct {
/** current connectivity state */
grpc_connectivity_state current_state;
+ /** error associated with state */
+ grpc_error *current_error;
/** all our watchers */
grpc_connectivity_state_watcher *watchers;
/** a name to help debugging */
@@ -70,10 +72,11 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx,
void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state state,
+ grpc_error *associated_error,
const char *reason);
grpc_connectivity_state grpc_connectivity_state_check(
- grpc_connectivity_state_tracker *tracker);
+ grpc_connectivity_state_tracker *tracker, grpc_error **current_error);
/** Return 1 if the channel should start connecting, 0 otherwise.
If current==NULL cancel notify if it is already queued (success==0 in that
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index c4da1027cd..f2d750e870 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -154,7 +154,7 @@ typedef struct grpc_transport_op {
grpc_closure *on_connectivity_state_change;
grpc_connectivity_state *connectivity_state;
/** should the transport be disconnected */
- int disconnect;
+ grpc_error *disconnect_with_error;
/** should we send a goaway?
after a goaway is sent, once there are no more active calls on
the transport, the transport should disconnect */