aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/client_config/client_channel.c
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2016-06-20 22:04:48 -0700
committerGravatar David Garcia Quintas <dgq@google.com>2016-06-20 22:05:32 -0700
commit280fd2a1dcc4b8873fe6b2f1df57d50c92c27e1e (patch)
treebb2ddbda53db7f8418cd8504bc18dbeb7f91bebd /src/core/ext/client_config/client_channel.c
parent8782d1b7e104b74e0bf1ecba386420088140c76c (diff)
parent5988716d9d6e33cd59631865527d73d3caa87387 (diff)
Merge branch 'master' of github.com:grpc/grpc into grpclb_v0
Diffstat (limited to 'src/core/ext/client_config/client_channel.c')
-rw-r--r--src/core/ext/client_config/client_channel.c111
1 files changed, 67 insertions, 44 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index e297dfa0ef..1d5a7d5224 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -117,6 +117,7 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
channel_data *chand,
grpc_connectivity_state state,
+ grpc_error *error,
const char *reason) {
if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
state == GRPC_CHANNEL_SHUTDOWN) &&
@@ -127,11 +128,13 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
/* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY,
/* check= */ 0);
}
- grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, reason);
+ grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
+ reason);
}
-static void on_lb_policy_state_changed_locked(
- grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) {
+static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
+ lb_policy_connectivity_watcher *w,
+ grpc_error *error) {
grpc_connectivity_state publish_state = w->state;
/* check if the notification is for a stale policy */
if (w->lb_policy != w->chand->lb_policy) return;
@@ -143,18 +146,18 @@ static void on_lb_policy_state_changed_locked(
w->chand->lb_policy = NULL;
}
set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
- "lb_changed");
+ GRPC_ERROR_REF(error), "lb_changed");
if (w->state != GRPC_CHANNEL_SHUTDOWN) {
watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state);
}
}
static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg,
- bool iomgr_success) {
+ grpc_error *error) {
lb_policy_connectivity_watcher *w = arg;
gpr_mu_lock(&w->chand->mu_config);
- on_lb_policy_state_changed_locked(exec_ctx, w);
+ on_lb_policy_state_changed_locked(exec_ctx, w, error);
gpr_mu_unlock(&w->chand->mu_config);
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy");
@@ -176,19 +179,22 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
}
static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
- bool iomgr_success) {
+ grpc_error *error) {
channel_data *chand = arg;
grpc_lb_policy *lb_policy = NULL;
grpc_lb_policy *old_lb_policy;
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
int exit_idle = 0;
+ grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy");
if (chand->incoming_configuration != NULL) {
lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
if (lb_policy != NULL) {
GRPC_LB_POLICY_REF(lb_policy, "channel");
GRPC_LB_POLICY_REF(lb_policy, "config_change");
- state = grpc_lb_policy_check_connectivity(exec_ctx, lb_policy);
+ GRPC_ERROR_UNREF(state_error);
+ state =
+ grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error);
}
grpc_client_config_unref(exec_ctx, chand->incoming_configuration);
@@ -208,7 +214,9 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures,
NULL);
} else if (chand->resolver == NULL /* disconnected */) {
- grpc_closure_list_fail_all(&chand->waiting_for_config_closures);
+ grpc_closure_list_fail_all(
+ &chand->waiting_for_config_closures,
+ GRPC_ERROR_CREATE_REFERENCING("Channel disconnected", &error, 1));
grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures,
NULL);
}
@@ -218,9 +226,9 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
chand->exit_idle_when_lb_policy_arrives = 0;
}
- if (iomgr_success && chand->resolver) {
- set_channel_connectivity_state_locked(exec_ctx, chand, state,
- "new_lb+resolver");
+ if (error == GRPC_ERROR_NONE && chand->resolver) {
+ set_channel_connectivity_state_locked(
+ exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
if (lb_policy != NULL) {
watch_lb_policy(exec_ctx, chand, lb_policy, state);
}
@@ -235,8 +243,12 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
}
+ grpc_error *refs[] = {error, state_error};
set_channel_connectivity_state_locked(
- exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN, "resolver_gone");
+ exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_CREATE_REFERENCING("Got config after disconnection", refs,
+ GPR_ARRAY_SIZE(refs)),
+ "resolver_gone");
gpr_mu_unlock(&chand->mu_config);
}
@@ -256,6 +268,7 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver");
+ GRPC_ERROR_UNREF(state_error);
}
static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
@@ -263,7 +276,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_transport_op *op) {
channel_data *chand = elem->channel_data;
- grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
GPR_ASSERT(op->set_accept_stream == false);
if (op->bind_pollset != NULL) {
@@ -282,7 +295,9 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
if (op->send_ping != NULL) {
if (chand->lb_policy == NULL) {
- grpc_exec_ctx_enqueue(exec_ctx, op->send_ping, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, op->send_ping,
+ GRPC_ERROR_CREATE("Ping with no load balancing"),
+ NULL);
} else {
grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping);
op->bind_pollset = NULL;
@@ -290,24 +305,29 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
op->send_ping = NULL;
}
- if (op->disconnect && chand->resolver != NULL) {
- set_channel_connectivity_state_locked(exec_ctx, chand,
- GRPC_CHANNEL_SHUTDOWN, "disconnect");
- grpc_resolver_shutdown(exec_ctx, chand->resolver);
- GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
- chand->resolver = NULL;
- if (!chand->started_resolving) {
- grpc_closure_list_fail_all(&chand->waiting_for_config_closures);
- grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures,
- NULL);
- }
- if (chand->lb_policy != NULL) {
- grpc_pollset_set_del_pollset_set(exec_ctx,
- chand->lb_policy->interested_parties,
- chand->interested_parties);
- GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
- chand->lb_policy = NULL;
+ if (op->disconnect_with_error != GRPC_ERROR_NONE) {
+ if (chand->resolver != NULL) {
+ set_channel_connectivity_state_locked(
+ exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_REF(op->disconnect_with_error), "disconnect");
+ grpc_resolver_shutdown(exec_ctx, chand->resolver);
+ GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
+ chand->resolver = NULL;
+ if (!chand->started_resolving) {
+ grpc_closure_list_fail_all(&chand->waiting_for_config_closures,
+ GRPC_ERROR_REF(op->disconnect_with_error));
+ grpc_exec_ctx_enqueue_list(exec_ctx,
+ &chand->waiting_for_config_closures, NULL);
+ }
+ if (chand->lb_policy != NULL) {
+ grpc_pollset_set_del_pollset_set(exec_ctx,
+ chand->lb_policy->interested_parties,
+ chand->interested_parties);
+ GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
+ chand->lb_policy = NULL;
+ }
}
+ GRPC_ERROR_UNREF(op->disconnect_with_error);
}
gpr_mu_unlock(&chand->mu_config);
}
@@ -327,16 +347,17 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready);
-static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
continue_picking_args *cpa = arg;
if (cpa->connected_subchannel == NULL) {
/* cancelled, do nothing */
- } else if (!success) {
- grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL);
+ } else if (error != GRPC_ERROR_NONE) {
+ grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL);
} else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
cpa->initial_metadata_flags,
cpa->connected_subchannel, cpa->on_ready)) {
- grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL);
}
gpr_free(cpa);
}
@@ -361,11 +382,12 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
connected_subchannel);
}
for (closure = chand->waiting_for_config_closures.head; closure != NULL;
- closure = grpc_closure_next(closure)) {
+ closure = closure->next_data.next) {
cpa = closure->cb_arg;
if (cpa->connected_subchannel == connected_subchannel) {
cpa->connected_subchannel = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, cpa->on_ready,
+ GRPC_ERROR_CREATE("Pick cancelled"), NULL);
}
}
gpr_mu_unlock(&chand->mu_config);
@@ -397,10 +419,11 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
cpa->on_ready = on_ready;
cpa->elem = elem;
grpc_closure_init(&cpa->closure, continue_picking, cpa);
- grpc_closure_list_add(&chand->waiting_for_config_closures, &cpa->closure,
- 1);
+ grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure,
+ GRPC_ERROR_NONE);
} else {
- grpc_exec_ctx_enqueue(exec_ctx, on_ready, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected"),
+ NULL);
}
gpr_mu_unlock(&chand->mu_config);
return 0;
@@ -507,7 +530,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
channel_data *chand = elem->channel_data;
grpc_connectivity_state out;
gpr_mu_lock(&chand->mu_config);
- out = grpc_connectivity_state_check(&chand->state_tracker);
+ out = grpc_connectivity_state_check(&chand->state_tracker, NULL);
if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
if (chand->lb_policy != NULL) {
grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy);
@@ -534,7 +557,7 @@ typedef struct {
} external_connectivity_watcher;
static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
- bool iomgr_success) {
+ grpc_error *error) {
external_connectivity_watcher *w = arg;
grpc_closure *follow_up = w->on_complete;
grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties,
@@ -542,7 +565,7 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
"external_connectivity_watcher");
gpr_free(w);
- follow_up->cb(exec_ctx, follow_up->cb_arg, iomgr_success);
+ follow_up->cb(exec_ctx, follow_up->cb_arg, error);
}
void grpc_client_channel_watch_connectivity_state(