diff options
Diffstat (limited to 'src/core')
3 files changed, 60 insertions, 40 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 24c381a46d..ab6d3e6a03 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -328,18 +328,11 @@ static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd, * 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING. * CHECK: sd->curr_connectivity_state == CONNECTING. * - * 3) RULE: ALL subchannels are SHUTDOWN => policy is IDLE (and requests - * re-resolution). - * CHECK: subchannel_list->num_shutdown == - * subchannel_list->num_subchannels. - * - * 4) RULE: ALL subchannels are SHUTDOWN or TRANSIENT_FAILURE => policy is - * TRANSIENT_FAILURE. - * CHECK: subchannel_list->num_shutdown + - * subchannel_list->num_transient_failures == + * 3) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is + * TRANSIENT_FAILURE. + * CHECK: subchannel_list->num_transient_failures == * subchannel_list->num_subchannels. */ - // TODO(juanlishen): For rule 4, we may want to re-resolve instead. grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; round_robin_lb_policy* p = (round_robin_lb_policy*)subchannel_list->policy; GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_IDLE); @@ -351,22 +344,12 @@ static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd, /* 2) CONNECTING */ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, "rr_connecting"); - } else if (subchannel_list->num_shutdown == + } else if (subchannel_list->num_transient_failures == subchannel_list->num_subchannels) { - /* 3) IDLE and re-resolve */ - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE, - GRPC_ERROR_NONE, - "rr_exhausted_subchannels+reresolve"); - p->started_picking = false; - grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace, - GRPC_ERROR_NONE); - } else if (subchannel_list->num_shutdown + - subchannel_list->num_transient_failures == - subchannel_list->num_subchannels) { - /* 4) TRANSIENT_FAILURE */ - grpc_connectivity_state_set(&p->state_tracker, - GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "rr_transient_failure"); + /* 3) TRANSIENT_FAILURE */ + grpc_connectivity_state_set( + &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_REF(error), "rr_exhausted_subchannels"); } GRPC_ERROR_UNREF(error); } @@ -387,6 +370,7 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { p->shutdown, sd->subchannel_list->shutting_down, grpc_error_string(error)); } + GPR_ASSERT(sd->subchannel != nullptr); // If the policy is shutting down, unref and return. if (p->shutdown) { grpc_lb_subchannel_data_stop_connectivity_watch(sd); @@ -412,14 +396,19 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { // state (which was set by the connectivity state watcher) to // curr_connectivity_state, which is what we use inside of the combiner. sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; - // Update state counters and new overall state. - update_state_counters_locked(sd); - update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error)); // If the sd's new state is TRANSIENT_FAILURE, unref the *connected* // subchannel, if any. switch (sd->curr_connectivity_state) { case GRPC_CHANNEL_TRANSIENT_FAILURE: { sd->connected_subchannel.reset(); + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log(GPR_DEBUG, + "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. " + "Requesting re-resolution", + p, sd->subchannel); + } + grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace, + GRPC_ERROR_NONE); break; } case GRPC_CHANNEL_READY: { @@ -442,8 +431,8 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { gpr_log(GPR_DEBUG, "[RR %p] phasing out subchannel list %p (size %lu) in favor " "of %p (size %lu)", - (void*)p, (void*)p->subchannel_list, num_subchannels, - (void*)sd->subchannel_list, num_subchannels); + p, p->subchannel_list, num_subchannels, sd->subchannel_list, + num_subchannels); } if (p->subchannel_list != nullptr) { // dispose of the current subchannel_list @@ -455,7 +444,8 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { } /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in - * p->pending_picks. This preemptively replicates rr_pick()'s actions. */ + * p->pending_picks. This preemptively replicates rr_pick()'s actions. + */ const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels); grpc_lb_subchannel_data* selected = @@ -488,6 +478,12 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE:; // fallthrough } + // Update state counters and new overall state. + update_state_counters_locked(sd); + // Only update connectivity based on the selected subchannel list. + if (sd->subchannel_list == p->subchannel_list) { + update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error)); + } // Renew notification. grpc_lb_subchannel_data_start_connectivity_watch(sd); } @@ -562,6 +558,30 @@ static void rr_update_locked(grpc_lb_policy* policy, return; } if (p->started_picking) { + for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) { + const grpc_connectivity_state subchannel_state = + grpc_subchannel_check_connectivity( + subchannel_list->subchannels[i].subchannel, nullptr); + // Override the default setting of IDLE for connectivity notification + // purposes if the subchannel is already in transient failure. Otherwise + // we'd be immediately notified of the IDLE-TRANSIENT_FAILURE + // discrepancy, attempt to re-resolve and end up here again. + // TODO(roth): As part of C++-ifying the subchannel_list API, design a + // better API for notifying the LB policy of subchannel states, which can + // be used both for the subchannel's initial state and for subsequent + // state changes. This will allow us to handle this more generally instead + // of special-casing TRANSIENT_FAILURE (e.g., we can also distribute any + // pending picks across all READY subchannels rather than sending them all + // to the first one). + if (subchannel_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + subchannel_list->subchannels[i].pending_connectivity_state_unsafe = + subchannel_list->subchannels[i].curr_connectivity_state = + subchannel_list->subchannels[i].prev_connectivity_state = + subchannel_state; + --subchannel_list->num_idle; + ++subchannel_list->num_transient_failures; + } + } if (p->latest_pending_subchannel_list != nullptr) { if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc index fa2ffcc796..75f7ca2d12 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc @@ -54,13 +54,15 @@ void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd, void grpc_lb_subchannel_data_start_connectivity_watch( grpc_lb_subchannel_data* sd) { if (sd->subchannel_list->tracer->enabled()) { - gpr_log(GPR_DEBUG, - "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR - " (subchannel %p): requesting connectivity change notification", - sd->subchannel_list->tracer->name(), sd->subchannel_list->policy, - sd->subchannel_list, - (size_t)(sd - sd->subchannel_list->subchannels), - sd->subchannel_list->num_subchannels, sd->subchannel); + gpr_log( + GPR_DEBUG, + "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): requesting connectivity change " + "notification (from %s)", + sd->subchannel_list->tracer->name(), sd->subchannel_list->policy, + sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels), + sd->subchannel_list->num_subchannels, sd->subchannel, + grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe)); } sd->connectivity_notification_pending = true; grpc_subchannel_notify_on_state_change( diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 3377605263..91537f3afe 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -101,8 +101,6 @@ struct grpc_lb_subchannel_list { size_t num_ready; /** how many subchannels are in state TRANSIENT_FAILURE */ size_t num_transient_failures; - /** how many subchannels are in state SHUTDOWN */ - size_t num_shutdown; /** how many subchannels are in state IDLE */ size_t num_idle; |