aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc42
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc2
2 files changed, 24 insertions, 20 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 8119377504..f321fec444 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -397,10 +397,6 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
sd->subchannel_list == p->latest_pending_subchannel_list);
// Update state counters.
sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
- if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- ++sd->subchannel_list->num_shutdown;
- }
- sd->prev_connectivity_state = sd->curr_connectivity_state;
// Handle updates for the currently selected subchannel.
if (p->selected == sd) {
// If the new state is anything other than READY and there is a
@@ -447,6 +443,10 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
// for a subchannel in p->latest_pending_subchannel_list. The
// goal here is to find a subchannel from the update that we can
// select in place of the current one.
+ if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
+ sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
+ grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
+ }
while (true) {
switch (sd->curr_connectivity_state) {
case GRPC_CHANNEL_INIT:
@@ -494,10 +494,13 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
return;
}
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
- grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
- sd->subchannel_list->checking_subchannel =
- (sd->subchannel_list->checking_subchannel + 1) %
- sd->subchannel_list->num_subchannels;
+ do {
+ sd->subchannel_list->checking_subchannel =
+ (sd->subchannel_list->checking_subchannel + 1) %
+ sd->subchannel_list->num_subchannels;
+ sd = &sd->subchannel_list
+ ->subchannels[sd->subchannel_list->checking_subchannel];
+ } while (sd->subchannel == NULL);
// Case 1: Only set state to TRANSIENT_FAILURE if we've tried
// all subchannels.
if (sd->subchannel_list->checking_subchannel == 0 &&
@@ -506,10 +509,9 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "connecting_transient_failure");
}
- sd = &sd->subchannel_list
- ->subchannels[sd->subchannel_list->checking_subchannel];
sd->curr_connectivity_state =
grpc_subchannel_check_connectivity(sd->subchannel, &error);
+ GRPC_ERROR_UNREF(error);
if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// Reuses the connectivity refs from the previous watch.
grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
@@ -530,11 +532,18 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
return;
}
case GRPC_CHANNEL_SHUTDOWN: {
- grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd,
"pf_candidate_shutdown");
- if (sd->subchannel_list->num_shutdown ==
- sd->subchannel_list->num_subchannels) {
+ // Advance to next subchannel and check its state.
+ grpc_lb_subchannel_data *original_sd = sd;
+ do {
+ sd->subchannel_list->checking_subchannel =
+ (sd->subchannel_list->checking_subchannel + 1) %
+ sd->subchannel_list->num_subchannels;
+ sd = &sd->subchannel_list
+ ->subchannels[sd->subchannel_list->checking_subchannel];
+ } while (sd->subchannel == NULL && sd != original_sd);
+ if (sd == original_sd) {
grpc_lb_subchannel_list_unref_for_connectivity_watch(
exec_ctx, sd->subchannel_list, "pf_candidate_shutdown");
shutdown_locked(exec_ctx, p,
@@ -547,14 +556,9 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "subchannel_failed");
}
- // Advance to next subchannel and check its state.
- sd->subchannel_list->checking_subchannel =
- (sd->subchannel_list->checking_subchannel + 1) %
- sd->subchannel_list->num_subchannels;
- sd = &sd->subchannel_list
- ->subchannels[sd->subchannel_list->checking_subchannel];
sd->curr_connectivity_state =
grpc_subchannel_check_connectivity(sd->subchannel, &error);
+ GRPC_ERROR_UNREF(error);
if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// Reuses the connectivity refs from the previous watch.
grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
index 8d7e084a2e..c30416d124 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
@@ -79,7 +79,6 @@ void grpc_lb_subchannel_data_start_connectivity_watch(
void grpc_lb_subchannel_data_stop_connectivity_watch(
grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd) {
- GPR_ASSERT(sd->connectivity_notification_pending);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace) ||
GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_DEBUG,
@@ -89,6 +88,7 @@ void grpc_lb_subchannel_data_stop_connectivity_watch(
(size_t)(sd - sd->subchannel_list->subchannels),
sd->subchannel_list->num_subchannels, sd->subchannel);
}
+ GPR_ASSERT(sd->connectivity_notification_pending);
sd->connectivity_notification_pending = false;
}