diff options
Diffstat (limited to 'src/core/ext/filters/client_channel')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc | 42 | ||||
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc | 2 |
2 files changed, 24 insertions, 20 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 8119377504..f321fec444 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -397,10 +397,6 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, sd->subchannel_list == p->latest_pending_subchannel_list); // Update state counters. sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; - if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - ++sd->subchannel_list->num_shutdown; - } - sd->prev_connectivity_state = sd->curr_connectivity_state; // Handle updates for the currently selected subchannel. if (p->selected == sd) { // If the new state is anything other than READY and there is a @@ -447,6 +443,10 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, // for a subchannel in p->latest_pending_subchannel_list. The // goal here is to find a subchannel from the update that we can // select in place of the current one. + if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE || + sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { + grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); + } while (true) { switch (sd->curr_connectivity_state) { case GRPC_CHANNEL_INIT: @@ -494,10 +494,13 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, return; } case GRPC_CHANNEL_TRANSIENT_FAILURE: { - grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); - sd->subchannel_list->checking_subchannel = - (sd->subchannel_list->checking_subchannel + 1) % - sd->subchannel_list->num_subchannels; + do { + sd->subchannel_list->checking_subchannel = + (sd->subchannel_list->checking_subchannel + 1) % + sd->subchannel_list->num_subchannels; + sd = &sd->subchannel_list + ->subchannels[sd->subchannel_list->checking_subchannel]; + } while (sd->subchannel == NULL); // Case 1: Only set state to TRANSIENT_FAILURE if we've tried // all subchannels. if (sd->subchannel_list->checking_subchannel == 0 && @@ -506,10 +509,9 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "connecting_transient_failure"); } - sd = &sd->subchannel_list - ->subchannels[sd->subchannel_list->checking_subchannel]; sd->curr_connectivity_state = grpc_subchannel_check_connectivity(sd->subchannel, &error); + GRPC_ERROR_UNREF(error); if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { // Reuses the connectivity refs from the previous watch. grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); @@ -530,11 +532,18 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, return; } case GRPC_CHANNEL_SHUTDOWN: { - grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "pf_candidate_shutdown"); - if (sd->subchannel_list->num_shutdown == - sd->subchannel_list->num_subchannels) { + // Advance to next subchannel and check its state. + grpc_lb_subchannel_data *original_sd = sd; + do { + sd->subchannel_list->checking_subchannel = + (sd->subchannel_list->checking_subchannel + 1) % + sd->subchannel_list->num_subchannels; + sd = &sd->subchannel_list + ->subchannels[sd->subchannel_list->checking_subchannel]; + } while (sd->subchannel == NULL && sd != original_sd); + if (sd == original_sd) { grpc_lb_subchannel_list_unref_for_connectivity_watch( exec_ctx, sd->subchannel_list, "pf_candidate_shutdown"); shutdown_locked(exec_ctx, p, @@ -547,14 +556,9 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "subchannel_failed"); } - // Advance to next subchannel and check its state. - sd->subchannel_list->checking_subchannel = - (sd->subchannel_list->checking_subchannel + 1) % - sd->subchannel_list->num_subchannels; - sd = &sd->subchannel_list - ->subchannels[sd->subchannel_list->checking_subchannel]; sd->curr_connectivity_state = grpc_subchannel_check_connectivity(sd->subchannel, &error); + GRPC_ERROR_UNREF(error); if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { // Reuses the connectivity refs from the previous watch. grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc index 8d7e084a2e..c30416d124 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc @@ -79,7 +79,6 @@ void grpc_lb_subchannel_data_start_connectivity_watch( void grpc_lb_subchannel_data_stop_connectivity_watch( grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd) { - GPR_ASSERT(sd->connectivity_notification_pending); if (GRPC_TRACER_ON(grpc_lb_round_robin_trace) || GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { gpr_log(GPR_DEBUG, @@ -89,6 +88,7 @@ void grpc_lb_subchannel_data_stop_connectivity_watch( (size_t)(sd - sd->subchannel_list->subchannels), sd->subchannel_list->num_subchannels, sd->subchannel); } + GPR_ASSERT(sd->connectivity_notification_pending); sd->connectivity_notification_pending = false; } |