diff options
Diffstat (limited to 'src/core/lib/channel/client_channel.c')
-rw-r--r-- | src/core/lib/channel/client_channel.c | 40 |
1 files changed, 31 insertions, 9 deletions
diff --git a/src/core/lib/channel/client_channel.c b/src/core/lib/channel/client_channel.c index 3f7cf1cf97..55867ba80c 100644 --- a/src/core/lib/channel/client_channel.c +++ b/src/core/lib/channel/client_channel.c @@ -114,6 +114,22 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state); +static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, + channel_data *chand, + grpc_connectivity_state state, + const char *reason) { + if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE || + state == GRPC_CHANNEL_FATAL_FAILURE) && + chand->lb_policy != NULL) { + /* cancel fail-fast picks */ + grpc_lb_policy_cancel_picks( + exec_ctx, chand->lb_policy, + /* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY, + /* check= */ 0); + } + grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, reason); +} + static void on_lb_policy_state_changed_locked( grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) { grpc_connectivity_state publish_state = w->state; @@ -127,8 +143,8 @@ static void on_lb_policy_state_changed_locked( GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel"); w->chand->lb_policy = NULL; } - grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, publish_state, - "lb_changed"); + set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state, + "lb_changed"); if (w->state != GRPC_CHANNEL_FATAL_FAILURE) { watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state); } @@ -200,8 +216,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, } if (iomgr_success && chand->resolver) { - grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, - "new_lb+resolver"); + set_channel_connectivity_state_locked(exec_ctx, chand, state, + "new_lb+resolver"); if (lb_policy != NULL) { watch_lb_policy(exec_ctx, chand, lb_policy, state); } @@ -216,8 +232,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); chand->resolver = NULL; } - grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, - GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone"); + set_channel_connectivity_state_locked( + exec_ctx, chand, GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone"); gpr_mu_unlock(&chand->mu_config); } @@ -272,8 +288,8 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, } if (op->disconnect && chand->resolver != NULL) { - grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, - GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); + set_channel_connectivity_state_locked( + exec_ctx, chand, GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); grpc_resolver_shutdown(exec_ctx, chand->resolver); GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); chand->resolver = NULL; @@ -290,6 +306,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, typedef struct { grpc_metadata_batch *initial_metadata; + uint32_t initial_metadata_flags; grpc_connected_subchannel **connected_subchannel; grpc_closure *on_ready; grpc_call_element *elem; @@ -298,6 +315,7 @@ typedef struct { static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata, + uint32_t initial_metadata_flags, grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready); @@ -308,6 +326,7 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) { } else if (cpa->connected_subchannel == NULL) { /* cancelled, do nothing */ } else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, + cpa->initial_metadata_flags, cpa->connected_subchannel, cpa->on_ready)) { grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, true, NULL); } @@ -316,6 +335,7 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) { static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, grpc_metadata_batch *initial_metadata, + uint32_t initial_metadata_flags, grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready) { grpc_call_element *elem = elemp; @@ -349,7 +369,8 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, GRPC_LB_POLICY_REF(lb_policy, "cc_pick_subchannel"); gpr_mu_unlock(&chand->mu_config); r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollset, - initial_metadata, connected_subchannel, on_ready); + initial_metadata, initial_metadata_flags, + connected_subchannel, on_ready); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "cc_pick_subchannel"); return r; } @@ -362,6 +383,7 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, } cpa = gpr_malloc(sizeof(*cpa)); cpa->initial_metadata = initial_metadata; + cpa->initial_metadata_flags = initial_metadata_flags; cpa->connected_subchannel = connected_subchannel; cpa->on_ready = on_ready; cpa->elem = elem; |