diff options
author | Yuchen Zeng <zyc@google.com> | 2016-06-20 14:48:40 -0700 |
---|---|---|
committer | Yuchen Zeng <zyc@google.com> | 2016-06-20 14:48:40 -0700 |
commit | e5633f8cb1f2f10d0bc393f7656cfdbb39434846 (patch) | |
tree | dfc2f6e659060e933b03bc4ce664452bc55db2bf /src/core/ext/client_config | |
parent | 9790e97704161cdd9a476d61a5568283727fe611 (diff) | |
parent | 1468d4b8d5873e3f210d391ff1b74eac13436fe9 (diff) |
Merge remote-tracking branch 'upstream/master' into handler_http_response
Diffstat (limited to 'src/core/ext/client_config')
-rw-r--r-- | src/core/ext/client_config/channel_connectivity.c | 34 | ||||
-rw-r--r-- | src/core/ext/client_config/client_channel.c | 111 | ||||
-rw-r--r-- | src/core/ext/client_config/connector.h | 2 | ||||
-rw-r--r-- | src/core/ext/client_config/lb_policy.c | 6 | ||||
-rw-r--r-- | src/core/ext/client_config/lb_policy.h | 8 | ||||
-rw-r--r-- | src/core/ext/client_config/subchannel.c | 89 | ||||
-rw-r--r-- | src/core/ext/client_config/subchannel.h | 2 | ||||
-rw-r--r-- | src/core/ext/client_config/subchannel_call_holder.c | 47 |
8 files changed, 186 insertions, 113 deletions
diff --git a/src/core/ext/client_config/channel_connectivity.c b/src/core/ext/client_config/channel_connectivity.c index cc60f2485a..20c01a9a7c 100644 --- a/src/core/ext/client_config/channel_connectivity.c +++ b/src/core/ext/client_config/channel_connectivity.c @@ -75,7 +75,6 @@ typedef enum { typedef struct { gpr_mu mu; callback_phase phase; - int success; grpc_closure on_complete; grpc_timer alarm; grpc_connectivity_state state; @@ -122,7 +121,7 @@ static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw, } static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, - int due_to_completion) { + bool due_to_completion, grpc_error *error) { int delete = 0; if (due_to_completion) { @@ -130,14 +129,26 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, } gpr_mu_lock(&w->mu); + if (due_to_completion) { - w->success = 1; + if (grpc_trace_operation_failures) { + GRPC_LOG_IF_ERROR("watch_completion_error", GRPC_ERROR_REF(error)); + } + GRPC_ERROR_UNREF(error); + error = GRPC_ERROR_NONE; + } else { + if (error == GRPC_ERROR_NONE) { + error = + GRPC_ERROR_CREATE("Timed out waiting for connection state change"); + } else if (error == GRPC_ERROR_CANCELLED) { + error = GRPC_ERROR_NONE; + } } switch (w->phase) { case WAITING: w->phase = CALLING_BACK; - grpc_cq_end_op(exec_ctx, w->cq, w->tag, w->success, finished_completion, - w, &w->completion_storage); + grpc_cq_end_op(exec_ctx, w->cq, w->tag, GRPC_ERROR_REF(error), + finished_completion, w, &w->completion_storage); break; case CALLING_BACK: w->phase = CALLING_BACK_AND_FINISHED; @@ -153,14 +164,18 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, if (delete) { delete_state_watcher(exec_ctx, w); } + + GRPC_ERROR_UNREF(error); } -static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw, bool success) { - partly_done(exec_ctx, pw, 1); +static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw, + grpc_error *error) { + partly_done(exec_ctx, pw, true, GRPC_ERROR_REF(error)); } -static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, bool success) { - partly_done(exec_ctx, pw, 0); +static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, + grpc_error *error) { + partly_done(exec_ctx, pw, false, GRPC_ERROR_REF(error)); } void grpc_channel_watch_connectivity_state( @@ -185,7 +200,6 @@ void grpc_channel_watch_connectivity_state( grpc_closure_init(&w->on_complete, watch_complete, w); w->phase = WAITING; w->state = last_observed_state; - w->success = 0; w->cq = cq; w->tag = tag; w->channel = channel; diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index e297dfa0ef..1d5a7d5224 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -117,6 +117,7 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, grpc_connectivity_state state, + grpc_error *error, const char *reason) { if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE || state == GRPC_CHANNEL_SHUTDOWN) && @@ -127,11 +128,13 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, /* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY, /* check= */ 0); } - grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, reason); + grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error, + reason); } -static void on_lb_policy_state_changed_locked( - grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) { +static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx, + lb_policy_connectivity_watcher *w, + grpc_error *error) { grpc_connectivity_state publish_state = w->state; /* check if the notification is for a stale policy */ if (w->lb_policy != w->chand->lb_policy) return; @@ -143,18 +146,18 @@ static void on_lb_policy_state_changed_locked( w->chand->lb_policy = NULL; } set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state, - "lb_changed"); + GRPC_ERROR_REF(error), "lb_changed"); if (w->state != GRPC_CHANNEL_SHUTDOWN) { watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state); } } static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg, - bool iomgr_success) { + grpc_error *error) { lb_policy_connectivity_watcher *w = arg; gpr_mu_lock(&w->chand->mu_config); - on_lb_policy_state_changed_locked(exec_ctx, w); + on_lb_policy_state_changed_locked(exec_ctx, w, error); gpr_mu_unlock(&w->chand->mu_config); GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy"); @@ -176,19 +179,22 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, } static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, - bool iomgr_success) { + grpc_error *error) { channel_data *chand = arg; grpc_lb_policy *lb_policy = NULL; grpc_lb_policy *old_lb_policy; grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; int exit_idle = 0; + grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy"); if (chand->incoming_configuration != NULL) { lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration); if (lb_policy != NULL) { GRPC_LB_POLICY_REF(lb_policy, "channel"); GRPC_LB_POLICY_REF(lb_policy, "config_change"); - state = grpc_lb_policy_check_connectivity(exec_ctx, lb_policy); + GRPC_ERROR_UNREF(state_error); + state = + grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error); } grpc_client_config_unref(exec_ctx, chand->incoming_configuration); @@ -208,7 +214,9 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures, NULL); } else if (chand->resolver == NULL /* disconnected */) { - grpc_closure_list_fail_all(&chand->waiting_for_config_closures); + grpc_closure_list_fail_all( + &chand->waiting_for_config_closures, + GRPC_ERROR_CREATE_REFERENCING("Channel disconnected", &error, 1)); grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures, NULL); } @@ -218,9 +226,9 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, chand->exit_idle_when_lb_policy_arrives = 0; } - if (iomgr_success && chand->resolver) { - set_channel_connectivity_state_locked(exec_ctx, chand, state, - "new_lb+resolver"); + if (error == GRPC_ERROR_NONE && chand->resolver) { + set_channel_connectivity_state_locked( + exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver"); if (lb_policy != NULL) { watch_lb_policy(exec_ctx, chand, lb_policy, state); } @@ -235,8 +243,12 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); chand->resolver = NULL; } + grpc_error *refs[] = {error, state_error}; set_channel_connectivity_state_locked( - exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN, "resolver_gone"); + exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN, + GRPC_ERROR_CREATE_REFERENCING("Got config after disconnection", refs, + GPR_ARRAY_SIZE(refs)), + "resolver_gone"); gpr_mu_unlock(&chand->mu_config); } @@ -256,6 +268,7 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, } GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver"); + GRPC_ERROR_UNREF(state_error); } static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, @@ -263,7 +276,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport_op *op) { channel_data *chand = elem->channel_data; - grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); + grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); GPR_ASSERT(op->set_accept_stream == false); if (op->bind_pollset != NULL) { @@ -282,7 +295,9 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, if (op->send_ping != NULL) { if (chand->lb_policy == NULL) { - grpc_exec_ctx_enqueue(exec_ctx, op->send_ping, false, NULL); + grpc_exec_ctx_sched(exec_ctx, op->send_ping, + GRPC_ERROR_CREATE("Ping with no load balancing"), + NULL); } else { grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping); op->bind_pollset = NULL; @@ -290,24 +305,29 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, op->send_ping = NULL; } - if (op->disconnect && chand->resolver != NULL) { - set_channel_connectivity_state_locked(exec_ctx, chand, - GRPC_CHANNEL_SHUTDOWN, "disconnect"); - grpc_resolver_shutdown(exec_ctx, chand->resolver); - GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); - chand->resolver = NULL; - if (!chand->started_resolving) { - grpc_closure_list_fail_all(&chand->waiting_for_config_closures); - grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures, - NULL); - } - if (chand->lb_policy != NULL) { - grpc_pollset_set_del_pollset_set(exec_ctx, - chand->lb_policy->interested_parties, - chand->interested_parties); - GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); - chand->lb_policy = NULL; + if (op->disconnect_with_error != GRPC_ERROR_NONE) { + if (chand->resolver != NULL) { + set_channel_connectivity_state_locked( + exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN, + GRPC_ERROR_REF(op->disconnect_with_error), "disconnect"); + grpc_resolver_shutdown(exec_ctx, chand->resolver); + GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); + chand->resolver = NULL; + if (!chand->started_resolving) { + grpc_closure_list_fail_all(&chand->waiting_for_config_closures, + GRPC_ERROR_REF(op->disconnect_with_error)); + grpc_exec_ctx_enqueue_list(exec_ctx, + &chand->waiting_for_config_closures, NULL); + } + if (chand->lb_policy != NULL) { + grpc_pollset_set_del_pollset_set(exec_ctx, + chand->lb_policy->interested_parties, + chand->interested_parties); + GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); + chand->lb_policy = NULL; + } } + GRPC_ERROR_UNREF(op->disconnect_with_error); } gpr_mu_unlock(&chand->mu_config); } @@ -327,16 +347,17 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg, grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready); -static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) { +static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { continue_picking_args *cpa = arg; if (cpa->connected_subchannel == NULL) { /* cancelled, do nothing */ - } else if (!success) { - grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL); + } else if (error != GRPC_ERROR_NONE) { + grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL); } else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, cpa->initial_metadata_flags, cpa->connected_subchannel, cpa->on_ready)) { - grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, true, NULL); + grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL); } gpr_free(cpa); } @@ -361,11 +382,12 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, connected_subchannel); } for (closure = chand->waiting_for_config_closures.head; closure != NULL; - closure = grpc_closure_next(closure)) { + closure = closure->next_data.next) { cpa = closure->cb_arg; if (cpa->connected_subchannel == connected_subchannel) { cpa->connected_subchannel = NULL; - grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL); + grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, + GRPC_ERROR_CREATE("Pick cancelled"), NULL); } } gpr_mu_unlock(&chand->mu_config); @@ -397,10 +419,11 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, cpa->on_ready = on_ready; cpa->elem = elem; grpc_closure_init(&cpa->closure, continue_picking, cpa); - grpc_closure_list_add(&chand->waiting_for_config_closures, &cpa->closure, - 1); + grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure, + GRPC_ERROR_NONE); } else { - grpc_exec_ctx_enqueue(exec_ctx, on_ready, false, NULL); + grpc_exec_ctx_sched(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected"), + NULL); } gpr_mu_unlock(&chand->mu_config); return 0; @@ -507,7 +530,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( channel_data *chand = elem->channel_data; grpc_connectivity_state out; gpr_mu_lock(&chand->mu_config); - out = grpc_connectivity_state_check(&chand->state_tracker); + out = grpc_connectivity_state_check(&chand->state_tracker, NULL); if (out == GRPC_CHANNEL_IDLE && try_to_connect) { if (chand->lb_policy != NULL) { grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy); @@ -534,7 +557,7 @@ typedef struct { } external_connectivity_watcher; static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, - bool iomgr_success) { + grpc_error *error) { external_connectivity_watcher *w = arg; grpc_closure *follow_up = w->on_complete; grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties, @@ -542,7 +565,7 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "external_connectivity_watcher"); gpr_free(w); - follow_up->cb(exec_ctx, follow_up->cb_arg, iomgr_success); + follow_up->cb(exec_ctx, follow_up->cb_arg, error); } void grpc_client_channel_watch_connectivity_state( diff --git a/src/core/ext/client_config/connector.h b/src/core/ext/client_config/connector.h index dd85dfcb7d..ea9d23706e 100644 --- a/src/core/ext/client_config/connector.h +++ b/src/core/ext/client_config/connector.h @@ -64,7 +64,7 @@ typedef struct { grpc_transport *transport; /** channel arguments (to be passed to the filters) */ - const grpc_channel_args *channel_args; + grpc_channel_args *channel_args; } grpc_connect_out_args; struct grpc_connector_vtable { diff --git a/src/core/ext/client_config/lb_policy.c b/src/core/ext/client_config/lb_policy.c index dc1612428e..8b980b2cca 100644 --- a/src/core/ext/client_config/lb_policy.c +++ b/src/core/ext/client_config/lb_policy.c @@ -139,6 +139,8 @@ void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx, } grpc_connectivity_state grpc_lb_policy_check_connectivity( - grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { - return policy->vtable->check_connectivity(exec_ctx, policy); + grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_error **connectivity_error) { + return policy->vtable->check_connectivity(exec_ctx, policy, + connectivity_error); } diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h index 56fa11198b..3cfd041d3a 100644 --- a/src/core/ext/client_config/lb_policy.h +++ b/src/core/ext/client_config/lb_policy.h @@ -77,8 +77,9 @@ struct grpc_lb_policy_vtable { void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); /** check the current connectivity of the lb_policy */ - grpc_connectivity_state (*check_connectivity)(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *policy); + grpc_connectivity_state (*check_connectivity)( + grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_error **connectivity_error); /** call notify when the connectivity state of a channel changes from *state. Updates *state with the new state of the policy */ @@ -154,6 +155,7 @@ void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_closure *closure); grpc_connectivity_state grpc_lb_policy_check_connectivity( - grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); + grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_error **connectivity_error); #endif /* GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_H */ diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c index 85183b0564..468067ea57 100644 --- a/src/core/ext/client_config/subchannel.c +++ b/src/core/ext/client_config/subchannel.c @@ -54,7 +54,7 @@ #define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1)) #define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20 -#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 2 +#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1 #define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6 #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2 @@ -147,7 +147,7 @@ struct grpc_subchannel_call { (((grpc_subchannel_call *)(callstack)) - 1) static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, - bool iomgr_success); + grpc_error *error); #ifdef GRPC_STREAM_REFCOUNT_DEBUG #define REF_REASON reason @@ -177,7 +177,7 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, */ static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg, - bool success) { + grpc_error *error) { grpc_connected_subchannel *c = arg; grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c)); gpr_free(c); @@ -200,7 +200,7 @@ void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx, */ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, - bool success) { + grpc_error *error) { grpc_subchannel *c = arg; gpr_free((void *)c->filters); grpc_channel_args_destroy(c->args); @@ -290,8 +290,8 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, gpr_atm old_refs; old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF")); if (old_refs == 1) { - grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c), - true, NULL); + grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(subchannel_destroy, c), + GRPC_ERROR_NONE, NULL); } } @@ -382,7 +382,8 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { args.initial_connect_string = c->initial_connect_string; grpc_connectivity_state_set(exec_ctx, &c->state_tracker, - GRPC_CHANNEL_CONNECTING, "state_change"); + GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, + "state_change"); grpc_connector_connect(exec_ctx, c->connector, &args, &c->connecting_result, &c->connected); } @@ -393,16 +394,17 @@ static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { continue_connect(exec_ctx, c); } -grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { +grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c, + grpc_error **error) { grpc_connectivity_state state; gpr_mu_lock(&c->mu); - state = grpc_connectivity_state_check(&c->state_tracker); + state = grpc_connectivity_state_check(&c->state_tracker, error); gpr_mu_unlock(&c->mu); return state; } static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, - bool success) { + grpc_error *error) { external_state_watcher *w = arg; grpc_closure *follow_up = w->notify; if (w->pollset_set != NULL) { @@ -415,7 +417,7 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_unlock(&w->subchannel->mu); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, w->subchannel, "external_state_watcher"); gpr_free(w); - follow_up->cb(exec_ctx, follow_up->cb_arg, success); + follow_up->cb(exec_ctx, follow_up->cb_arg, error); } void grpc_subchannel_notify_on_state_change( @@ -469,7 +471,7 @@ void grpc_connected_subchannel_process_transport_op( } static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p, - bool iomgr_success) { + grpc_error *error) { state_watcher *sw = p; grpc_subchannel *c = sw->subchannel; gpr_mu *mu = &c->mu; @@ -477,20 +479,19 @@ static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p, gpr_mu_lock(mu); /* if we failed just leave this closure */ - if (iomgr_success) { - if (sw->connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - /* any errors on a subchannel ==> we're done, create a new one */ - sw->connectivity_state = GRPC_CHANNEL_SHUTDOWN; - } - grpc_connectivity_state_set(exec_ctx, &c->state_tracker, - sw->connectivity_state, "reflect_child"); - if (sw->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { - grpc_connected_subchannel_notify_on_state_change( - exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), NULL, - &sw->connectivity_state, &sw->closure); - GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); - sw = NULL; - } + if (sw->connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + /* any errors on a subchannel ==> we're done, create a new one */ + sw->connectivity_state = GRPC_CHANNEL_SHUTDOWN; + } + grpc_connectivity_state_set(exec_ctx, &c->state_tracker, + sw->connectivity_state, GRPC_ERROR_REF(error), + "reflect_child"); + if (sw->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), NULL, + &sw->connectivity_state, &sw->closure); + GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); + sw = NULL; } gpr_mu_unlock(mu); @@ -592,17 +593,20 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx, /* signal completion */ grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY, - "connected"); + GRPC_ERROR_NONE, "connected"); } -static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) { +static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_subchannel *c = arg; gpr_mu_lock(&c->mu); c->have_alarm = 0; if (c->disconnected) { - iomgr_success = 0; + error = GRPC_ERROR_CREATE_REFERENCING("Disconnected", &error, 1); + } else { + GRPC_ERROR_REF(error); } - if (iomgr_success) { + if (error == GRPC_ERROR_NONE) { + gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); c->next_attempt = gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC)); continue_connect(exec_ctx, c); @@ -611,11 +615,13 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) { gpr_mu_unlock(&c->mu); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); } + GRPC_ERROR_UNREF(error); } static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, - bool iomgr_success) { + grpc_error *error) { grpc_subchannel *c = arg; + grpc_channel_args *delete_channel_args = c->connecting_result.channel_args; GRPC_SUBCHANNEL_WEAK_REF(c, "connected"); gpr_mu_lock(&c->mu); @@ -627,13 +633,26 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); GPR_ASSERT(!c->have_alarm); c->have_alarm = 1; - grpc_connectivity_state_set(exec_ctx, &c->state_tracker, - GRPC_CHANNEL_TRANSIENT_FAILURE, - "connect_failed"); + grpc_connectivity_state_set( + exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1), + "connect_failed"); + gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now); + const char *errmsg = grpc_error_string(error); + gpr_log(GPR_INFO, "Connect failed: %s", errmsg); + if (gpr_time_cmp(time_til_next, gpr_time_0(time_til_next.clock_type)) <= + 0) { + gpr_log(GPR_INFO, "Retry immediately"); + } else { + gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds", + time_til_next.tv_sec, time_til_next.tv_nsec); + } grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now); + grpc_error_free_string(errmsg); } gpr_mu_unlock(&c->mu); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); + grpc_channel_args_destroy(delete_channel_args); } /* @@ -641,7 +660,7 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, */ static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, - bool success) { + grpc_error *error) { grpc_subchannel_call *c = call; GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); grpc_connected_subchannel *connection = c->connection; diff --git a/src/core/ext/client_config/subchannel.h b/src/core/ext/client_config/subchannel.h index 525f854a44..b6d39f5dc5 100644 --- a/src/core/ext/client_config/subchannel.h +++ b/src/core/ext/client_config/subchannel.h @@ -119,7 +119,7 @@ void grpc_connected_subchannel_process_transport_op( /** poll the current connectivity state of a channel */ grpc_connectivity_state grpc_subchannel_check_connectivity( - grpc_subchannel *channel); + grpc_subchannel *channel, grpc_error **error); /** call notify when the connectivity state of a channel changes from *state. Updates *state with the new state of the channel */ diff --git a/src/core/ext/client_config/subchannel_call_holder.c b/src/core/ext/client_config/subchannel_call_holder.c index 3df1f254d6..e31800edd9 100644 --- a/src/core/ext/client_config/subchannel_call_holder.c +++ b/src/core/ext/client_config/subchannel_call_holder.c @@ -43,14 +43,14 @@ #define CANCELLED_CALL ((grpc_subchannel_call *)1) static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *holder, - bool success); + grpc_error *error); static void retry_ops(grpc_exec_ctx *exec_ctx, void *retry_ops_args, - bool success); + grpc_error *error); static void add_waiting_locked(grpc_subchannel_call_holder *holder, grpc_transport_stream_op *op); static void fail_locked(grpc_exec_ctx *exec_ctx, - grpc_subchannel_call_holder *holder); + grpc_subchannel_call_holder *holder, grpc_error *error); static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder); @@ -91,7 +91,8 @@ void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *call = GET_CALL(holder); GPR_TIMER_BEGIN("grpc_subchannel_call_holder_perform_op", 0); if (call == CANCELLED_CALL) { - grpc_transport_stream_op_finish_with_failure(exec_ctx, op); + grpc_transport_stream_op_finish_with_failure(exec_ctx, op, + GRPC_ERROR_CANCELLED); GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0); return; } @@ -107,7 +108,8 @@ retry: call = GET_CALL(holder); if (call == CANCELLED_CALL) { gpr_mu_unlock(&holder->mu); - grpc_transport_stream_op_finish_with_failure(exec_ctx, op); + grpc_transport_stream_op_finish_with_failure(exec_ctx, op, + GRPC_ERROR_CANCELLED); GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0); return; } @@ -124,7 +126,10 @@ retry: } else { switch (holder->creation_phase) { case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING: - fail_locked(exec_ctx, holder); + fail_locked(exec_ctx, holder, + grpc_error_set_int(GRPC_ERROR_CREATE("Cancelled"), + GRPC_ERROR_INT_GRPC_STATUS, + op->cancel_with_status)); break; case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL: holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL, @@ -132,7 +137,8 @@ retry: break; } gpr_mu_unlock(&holder->mu); - grpc_transport_stream_op_finish_with_failure(exec_ctx, op); + grpc_transport_stream_op_finish_with_failure(exec_ctx, op, + GRPC_ERROR_CANCELLED); GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0); return; } @@ -168,7 +174,8 @@ retry: GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0); } -static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, bool success) { +static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { grpc_subchannel_call_holder *holder = arg; gpr_mu_lock(&holder->mu); GPR_ASSERT(holder->creation_phase == @@ -176,10 +183,14 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, bool success) { holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; if (holder->connected_subchannel == NULL) { gpr_atm_no_barrier_store(&holder->subchannel_call, 1); - fail_locked(exec_ctx, holder); + fail_locked(exec_ctx, holder, + GRPC_ERROR_CREATE_REFERENCING("Failed to create subchannel", + &error, 1)); } else if (1 == gpr_atm_acq_load(&holder->subchannel_call)) { /* already cancelled before subchannel became ready */ - fail_locked(exec_ctx, holder); + fail_locked(exec_ctx, holder, + GRPC_ERROR_CREATE_REFERENCING( + "Cancelled before creating subchannel", &error, 1)); } else { gpr_atm_rel_store( &holder->subchannel_call, @@ -205,18 +216,18 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, a->call = GET_CALL(holder); if (a->call == CANCELLED_CALL) { gpr_free(a); - fail_locked(exec_ctx, holder); + fail_locked(exec_ctx, holder, GRPC_ERROR_CANCELLED); return; } holder->waiting_ops = NULL; holder->waiting_ops_count = 0; holder->waiting_ops_capacity = 0; GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops"); - grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(retry_ops, a), true, - NULL); + grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(retry_ops, a), + GRPC_ERROR_NONE, NULL); } -static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, bool success) { +static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { retry_ops_args *a = args; size_t i; for (i = 0; i < a->nops; i++) { @@ -241,13 +252,15 @@ static void add_waiting_locked(grpc_subchannel_call_holder *holder, } static void fail_locked(grpc_exec_ctx *exec_ctx, - grpc_subchannel_call_holder *holder) { + grpc_subchannel_call_holder *holder, + grpc_error *error) { size_t i; for (i = 0; i < holder->waiting_ops_count; i++) { - grpc_transport_stream_op_finish_with_failure(exec_ctx, - &holder->waiting_ops[i]); + grpc_transport_stream_op_finish_with_failure( + exec_ctx, &holder->waiting_ops[i], GRPC_ERROR_REF(error)); } holder->waiting_ops_count = 0; + GRPC_ERROR_UNREF(error); } char *grpc_subchannel_call_holder_get_peer( |