diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc | 367 |
1 files changed, 169 insertions, 198 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 9090c34412..ff2140e628 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -62,31 +62,65 @@ class PickFirst : public LoadBalancingPolicy { private: ~PickFirst(); + class PickFirstSubchannelList; + + class PickFirstSubchannelData + : public SubchannelData<PickFirstSubchannelList, + PickFirstSubchannelData> { + public: + PickFirstSubchannelData(PickFirstSubchannelList* subchannel_list, + const grpc_lb_user_data_vtable* user_data_vtable, + const grpc_lb_address& address, + grpc_subchannel* subchannel, + grpc_combiner* combiner) + : SubchannelData(subchannel_list, user_data_vtable, address, subchannel, + combiner) {} + + void ProcessConnectivityChangeLocked( + grpc_connectivity_state connectivity_state, grpc_error* error) override; + }; + + class PickFirstSubchannelList + : public SubchannelList<PickFirstSubchannelList, + PickFirstSubchannelData> { + public: + PickFirstSubchannelList(PickFirst* policy, TraceFlag* tracer, + const grpc_lb_addresses* addresses, + grpc_combiner* combiner, + grpc_client_channel_factory* client_channel_factory, + const grpc_channel_args& args) + : SubchannelList(policy, tracer, addresses, combiner, + client_channel_factory, args) { + // Need to maintain a ref to the LB policy as long as we maintain + // any references to subchannels, since the subchannels' + // pollset_sets will include the LB policy's pollset_set. + policy->Ref(DEBUG_LOCATION, "subchannel_list").release(); + } + + ~PickFirstSubchannelList() { + PickFirst* p = static_cast<PickFirst*>(policy()); + p->Unref(DEBUG_LOCATION, "subchannel_list"); + } + }; + void ShutdownLocked() override; void StartPickingLocked(); void DestroyUnselectedSubchannelsLocked(); - static void OnConnectivityChangedLocked(void* arg, grpc_error* error); - - void SubchannelListRefForConnectivityWatch( - grpc_lb_subchannel_list* subchannel_list, const char* reason); - void SubchannelListUnrefForConnectivityWatch( - grpc_lb_subchannel_list* subchannel_list, const char* reason); - - /** all our subchannels */ - grpc_lb_subchannel_list* subchannel_list_ = nullptr; - /** latest pending subchannel list */ - grpc_lb_subchannel_list* latest_pending_subchannel_list_ = nullptr; - /** selected subchannel in \a subchannel_list */ - grpc_lb_subchannel_data* selected_ = nullptr; - /** have we started picking? */ + // All our subchannels. + OrphanablePtr<PickFirstSubchannelList> subchannel_list_; + // Latest pending subchannel list. + OrphanablePtr<PickFirstSubchannelList> latest_pending_subchannel_list_; + // Selected subchannel in \a subchannel_list_. + PickFirstSubchannelData* selected_ = nullptr; + // Have we started picking? bool started_picking_ = false; - /** are we shut down? */ + // Are we shut down? bool shutdown_ = false; - /** list of picks that are waiting on connectivity */ + // List of picks that are waiting on connectivity. PickState* pending_picks_ = nullptr; - /** our connectivity state tracker */ + // Our connectivity state tracker. grpc_connectivity_state_tracker state_tracker_; }; @@ -95,7 +129,7 @@ PickFirst::PickFirst(const Args& args) : LoadBalancingPolicy(args) { grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, "pick_first"); if (grpc_lb_pick_first_trace.enabled()) { - gpr_log(GPR_DEBUG, "Pick First %p created.", this); + gpr_log(GPR_INFO, "Pick First %p created.", this); } UpdateLocked(*args.args); grpc_subchannel_index_ref(); @@ -103,7 +137,7 @@ PickFirst::PickFirst(const Args& args) : LoadBalancingPolicy(args) { PickFirst::~PickFirst() { if (grpc_lb_pick_first_trace.enabled()) { - gpr_log(GPR_DEBUG, "Destroying Pick First %p", this); + gpr_log(GPR_INFO, "Destroying Pick First %p", this); } GPR_ASSERT(subchannel_list_ == nullptr); GPR_ASSERT(latest_pending_subchannel_list_ == nullptr); @@ -126,7 +160,7 @@ void PickFirst::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { void PickFirst::ShutdownLocked() { grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); if (grpc_lb_pick_first_trace.enabled()) { - gpr_log(GPR_DEBUG, "Pick First %p Shutting down", this); + gpr_log(GPR_INFO, "Pick First %p Shutting down", this); } shutdown_ = true; PickState* pick; @@ -137,15 +171,8 @@ void PickFirst::ShutdownLocked() { } grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "shutdown"); - if (subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, "pf_shutdown"); - subchannel_list_ = nullptr; - } - if (latest_pending_subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(latest_pending_subchannel_list_, - "pf_shutdown"); - latest_pending_subchannel_list_ = nullptr; - } + subchannel_list_.reset(); + latest_pending_subchannel_list_.reset(); TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_CANCELLED); GRPC_ERROR_UNREF(error); } @@ -192,14 +219,10 @@ void PickFirst::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, void PickFirst::StartPickingLocked() { started_picking_ = true; - if (subchannel_list_ != nullptr && subchannel_list_->num_subchannels > 0) { - subchannel_list_->checking_subchannel = 0; - for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) { - if (subchannel_list_->subchannels[i].subchannel != nullptr) { - SubchannelListRefForConnectivityWatch( - subchannel_list_, "connectivity_watch+start_picking"); - grpc_lb_subchannel_data_start_connectivity_watch( - &subchannel_list_->subchannels[i]); + if (subchannel_list_ != nullptr) { + for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) { + if (subchannel_list_->subchannel(i)->subchannel() != nullptr) { + subchannel_list_->subchannel(i)->StartConnectivityWatchLocked(); break; } } @@ -215,7 +238,7 @@ void PickFirst::ExitIdleLocked() { bool PickFirst::PickLocked(PickState* pick) { // If we have a selected subchannel already, return synchronously. if (selected_ != nullptr) { - pick->connected_subchannel = selected_->connected_subchannel; + pick->connected_subchannel = selected_->connected_subchannel()->Ref(); return true; } // No subchannel selected yet, so handle asynchronously. @@ -228,11 +251,10 @@ bool PickFirst::PickLocked(PickState* pick) { } void PickFirst::DestroyUnselectedSubchannelsLocked() { - for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) { - grpc_lb_subchannel_data* sd = &subchannel_list_->subchannels[i]; + for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) { + PickFirstSubchannelData* sd = subchannel_list_->subchannel(i); if (selected_ != sd) { - grpc_lb_subchannel_data_unref_subchannel(sd, - "selected_different_subchannel"); + sd->UnrefSubchannelLocked("selected_different_subchannel"); } } } @@ -249,7 +271,7 @@ void PickFirst::NotifyOnStateChangeLocked(grpc_connectivity_state* current, void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) { if (selected_ != nullptr) { - selected_->connected_subchannel->Ping(on_initiate, on_ack); + selected_->connected_subchannel()->Ping(on_initiate, on_ack); } else { GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); @@ -258,24 +280,6 @@ void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) { } } -void PickFirst::SubchannelListRefForConnectivityWatch( - grpc_lb_subchannel_list* subchannel_list, const char* reason) { - // TODO(roth): We currently track this ref manually. Once the new - // ClosureRef API is ready and the subchannel_list code has been - // converted to a C++ API, find a way to hold the RefCountedPtr<> - // somewhere (maybe in the subchannel_data object) instead of doing - // this manually. - auto self = Ref(DEBUG_LOCATION, reason); - self.release(); - grpc_lb_subchannel_list_ref(subchannel_list, reason); -} - -void PickFirst::SubchannelListUnrefForConnectivityWatch( - grpc_lb_subchannel_list* subchannel_list, const char* reason) { - Unref(DEBUG_LOCATION, reason); - grpc_lb_subchannel_list_unref(subchannel_list, reason); -} - void PickFirst::UpdateLocked(const grpc_channel_args& args) { const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { @@ -295,75 +299,67 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { return; } const grpc_lb_addresses* addresses = - (const grpc_lb_addresses*)arg->value.pointer.p; + static_cast<const grpc_lb_addresses*>(arg->value.pointer.p); if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p received update with %" PRIuPTR " addresses", this, addresses->num_addresses); } - grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create( + auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>( this, &grpc_lb_pick_first_trace, addresses, combiner(), - client_channel_factory(), args, &PickFirst::OnConnectivityChangedLocked); - if (subchannel_list->num_subchannels == 0) { + client_channel_factory(), args); + if (subchannel_list->num_subchannels() == 0) { // Empty update or no valid subchannels. Unsubscribe from all current // subchannels and put the channel in TRANSIENT_FAILURE. grpc_connectivity_state_set( &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), "pf_update_empty"); - if (subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, - "sl_shutdown_empty_update"); - } - subchannel_list_ = subchannel_list; // Empty list. + subchannel_list_ = std::move(subchannel_list); // Empty list. selected_ = nullptr; return; } if (selected_ == nullptr) { // We don't yet have a selected subchannel, so replace the current // subchannel list immediately. - if (subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, - "pf_update_before_selected"); + subchannel_list_ = std::move(subchannel_list); + // If we've started picking, start trying to connect to the first + // subchannel in the new list. + if (started_picking_) { + subchannel_list_->subchannel(0)->StartConnectivityWatchLocked(); } - subchannel_list_ = subchannel_list; } else { // We do have a selected subchannel. // Check if it's present in the new list. If so, we're done. - for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) { - grpc_lb_subchannel_data* sd = &subchannel_list->subchannels[i]; - if (sd->subchannel == selected_->subchannel) { + for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { + PickFirstSubchannelData* sd = subchannel_list->subchannel(i); + if (sd->subchannel() == selected_->subchannel()) { // The currently selected subchannel is in the update: we are done. if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p found already selected subchannel %p " "at update index %" PRIuPTR " of %" PRIuPTR "; update done", - this, selected_->subchannel, i, - subchannel_list->num_subchannels); - } - if (selected_->connected_subchannel != nullptr) { - sd->connected_subchannel = selected_->connected_subchannel; + this, selected_->subchannel(), i, + subchannel_list->num_subchannels()); } - selected_ = sd; - if (subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref( - subchannel_list_, "pf_update_includes_selected"); + // Make sure it's in state READY. It might not be if we grabbed + // the combiner while a connectivity state notification + // informing us otherwise is pending. + // Note that CheckConnectivityStateLocked() also takes a ref to + // the connected subchannel. + grpc_error* error = GRPC_ERROR_NONE; + if (sd->CheckConnectivityStateLocked(&error) == GRPC_CHANNEL_READY) { + selected_ = sd; + subchannel_list_ = std::move(subchannel_list); + DestroyUnselectedSubchannelsLocked(); + sd->StartConnectivityWatchLocked(); + // If there was a previously pending update (which may or may + // not have contained the currently selected subchannel), drop + // it, so that it doesn't override what we've done here. + latest_pending_subchannel_list_.reset(); + return; } - subchannel_list_ = subchannel_list; - DestroyUnselectedSubchannelsLocked(); - SubchannelListRefForConnectivityWatch( - subchannel_list, "connectivity_watch+replace_selected"); - grpc_lb_subchannel_data_start_connectivity_watch(sd); - // If there was a previously pending update (which may or may - // not have contained the currently selected subchannel), drop - // it, so that it doesn't override what we've done here. - if (latest_pending_subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref( - latest_pending_subchannel_list_, - "pf_update_includes_selected+outdated"); - latest_pending_subchannel_list_ = nullptr; - } - return; + GRPC_ERROR_UNREF(error); } } // Not keeping the previous selected subchannel, so set the latest @@ -372,88 +368,66 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { // subchannel list. if (latest_pending_subchannel_list_ != nullptr) { if (grpc_lb_pick_first_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "Pick First %p Shutting down latest pending subchannel list " "%p, about to be replaced by newer latest %p", - this, latest_pending_subchannel_list_, subchannel_list); + this, latest_pending_subchannel_list_.get(), + subchannel_list.get()); } - grpc_lb_subchannel_list_shutdown_and_unref( - latest_pending_subchannel_list_, "sl_outdated_dont_smash"); } - latest_pending_subchannel_list_ = subchannel_list; - } - // If we've started picking, start trying to connect to the first - // subchannel in the new list. - if (started_picking_) { - SubchannelListRefForConnectivityWatch(subchannel_list, - "connectivity_watch+update"); - grpc_lb_subchannel_data_start_connectivity_watch( - &subchannel_list->subchannels[0]); + latest_pending_subchannel_list_ = std::move(subchannel_list); + // If we've started picking, start trying to connect to the first + // subchannel in the new list. + if (started_picking_) { + latest_pending_subchannel_list_->subchannel(0) + ->StartConnectivityWatchLocked(); + } } } -void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) { - grpc_lb_subchannel_data* sd = static_cast<grpc_lb_subchannel_data*>(arg); - PickFirst* p = static_cast<PickFirst*>(sd->subchannel_list->policy); - if (grpc_lb_pick_first_trace.enabled()) { - gpr_log(GPR_DEBUG, - "Pick First %p connectivity changed for subchannel %p (%" PRIuPTR - " of %" PRIuPTR - "), subchannel_list %p: state=%s p->shutdown_=%d " - "sd->subchannel_list->shutting_down=%d error=%s", - p, sd->subchannel, sd->subchannel_list->checking_subchannel, - sd->subchannel_list->num_subchannels, sd->subchannel_list, - grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe), - p->shutdown_, sd->subchannel_list->shutting_down, - grpc_error_string(error)); - } - // If the policy is shutting down, unref and return. - if (p->shutdown_) { - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_data_unref_subchannel(sd, "pf_shutdown"); - p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list, - "pf_shutdown"); - return; - } - // If the subchannel list is shutting down, stop watching. - if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) { - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_data_unref_subchannel(sd, "pf_sl_shutdown"); - p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list, - "pf_sl_shutdown"); - return; - } - // If we're still here, the notification must be for a subchannel in - // either the current or latest pending subchannel lists. - GPR_ASSERT(sd->subchannel_list == p->subchannel_list_ || - sd->subchannel_list == p->latest_pending_subchannel_list_); - // Update state. - sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; +void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( + grpc_connectivity_state connectivity_state, grpc_error* error) { + PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy()); + // The notification must be for a subchannel in either the current or + // latest pending subchannel lists. + GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() || + subchannel_list() == p->latest_pending_subchannel_list_.get()); // Handle updates for the currently selected subchannel. - if (p->selected_ == sd) { + if (p->selected_ == this) { + if (grpc_lb_pick_first_trace.enabled()) { + gpr_log(GPR_INFO, + "Pick First %p connectivity changed for selected subchannel", p); + } // If the new state is anything other than READY and there is a // pending update, switch to the pending update. - if (sd->curr_connectivity_state != GRPC_CHANNEL_READY && + if (connectivity_state != GRPC_CHANNEL_READY && p->latest_pending_subchannel_list_ != nullptr) { + if (grpc_lb_pick_first_trace.enabled()) { + gpr_log(GPR_INFO, + "Pick First %p promoting pending subchannel list %p to " + "replace %p", + p, p->latest_pending_subchannel_list_.get(), + p->subchannel_list_.get()); + } p->selected_ = nullptr; - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - p->SubchannelListUnrefForConnectivityWatch( - sd->subchannel_list, "selected_not_ready+switch_to_update"); - grpc_lb_subchannel_list_shutdown_and_unref( - p->subchannel_list_, "selected_not_ready+switch_to_update"); - p->subchannel_list_ = p->latest_pending_subchannel_list_; - p->latest_pending_subchannel_list_ = nullptr; + StopConnectivityWatchLocked(); + p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); grpc_connectivity_state_set( &p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update"); + error != GRPC_ERROR_NONE + ? GRPC_ERROR_REF(error) + : GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "selected subchannel not ready; switching to pending " + "update"), + "selected_not_ready+switch_to_update"); } else { // TODO(juanlishen): we re-resolve when the selected subchannel goes to // TRANSIENT_FAILURE because we used to shut down in this case before // re-resolution is introduced. But we need to investigate whether we // really want to take any action instead of waiting for the selected // subchannel reconnecting. - GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN); - if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN); + if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { // If the selected channel goes bad, request a re-resolution. grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, @@ -462,19 +436,16 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) { p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE); // In transient failure. Rely on re-resolution to recover. p->selected_ = nullptr; - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list, - "pf_selected_shutdown"); - grpc_lb_subchannel_data_unref_subchannel( - sd, "pf_selected_shutdown"); // Unrefs connected subchannel + UnrefSubchannelLocked("pf_selected_shutdown"); + StopConnectivityWatchLocked(); } else { - grpc_connectivity_state_set(&p->state_tracker_, - sd->curr_connectivity_state, + grpc_connectivity_state_set(&p->state_tracker_, connectivity_state, GRPC_ERROR_REF(error), "selected_changed"); // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(sd); + RenewConnectivityWatchLocked(); } } + GRPC_ERROR_UNREF(error); return; } // If we get here, there are two possible cases: @@ -486,26 +457,27 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) { // for a subchannel in p->latest_pending_subchannel_list_. The // goal here is to find a subchannel from the update that we can // select in place of the current one. - switch (sd->curr_connectivity_state) { + switch (connectivity_state) { case GRPC_CHANNEL_READY: { // Case 2. Promote p->latest_pending_subchannel_list_ to // p->subchannel_list_. - sd->connected_subchannel = - grpc_subchannel_get_connected_subchannel(sd->subchannel); - if (sd->subchannel_list == p->latest_pending_subchannel_list_) { - GPR_ASSERT(p->subchannel_list_ != nullptr); - grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list_, - "finish_update"); - p->subchannel_list_ = p->latest_pending_subchannel_list_; - p->latest_pending_subchannel_list_ = nullptr; + if (subchannel_list() == p->latest_pending_subchannel_list_.get()) { + if (grpc_lb_pick_first_trace.enabled()) { + gpr_log(GPR_INFO, + "Pick First %p promoting pending subchannel list %p to " + "replace %p", + p, p->latest_pending_subchannel_list_.get(), + p->subchannel_list_.get()); + } + p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); } // Cases 1 and 2. grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "connecting_ready"); - p->selected_ = sd; + p->selected_ = this; if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, - sd->subchannel); + subchannel()); } // Drop all other subchannels, since we are now connected. p->DestroyUnselectedSubchannelsLocked(); @@ -513,54 +485,53 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) { PickState* pick; while ((pick = p->pending_picks_)) { p->pending_picks_ = pick->next; - pick->connected_subchannel = p->selected_->connected_subchannel; + pick->connected_subchannel = + p->selected_->connected_subchannel()->Ref(); if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Servicing pending pick with selected subchannel %p", - p->selected_); + p->selected_->subchannel()); } GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); } // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(sd); + RenewConnectivityWatchLocked(); break; } case GRPC_CHANNEL_TRANSIENT_FAILURE: { - grpc_lb_subchannel_data_stop_connectivity_watch(sd); + StopConnectivityWatchLocked(); + PickFirstSubchannelData* sd = this; do { - sd->subchannel_list->checking_subchannel = - (sd->subchannel_list->checking_subchannel + 1) % - sd->subchannel_list->num_subchannels; - sd = &sd->subchannel_list - ->subchannels[sd->subchannel_list->checking_subchannel]; - } while (sd->subchannel == nullptr); + size_t next_index = + (sd->Index() + 1) % subchannel_list()->num_subchannels(); + sd = subchannel_list()->subchannel(next_index); + } while (sd->subchannel() == nullptr); // Case 1: Only set state to TRANSIENT_FAILURE if we've tried // all subchannels. - if (sd->subchannel_list->checking_subchannel == 0 && - sd->subchannel_list == p->subchannel_list_) { + if (sd->Index() == 0 && subchannel_list() == p->subchannel_list_.get()) { grpc_connectivity_state_set( &p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "connecting_transient_failure"); } - // Reuses the connectivity refs from the previous watch. - grpc_lb_subchannel_data_start_connectivity_watch(sd); + sd->StartConnectivityWatchLocked(); break; } case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE: { // Only update connectivity state in case 1. - if (sd->subchannel_list == p->subchannel_list_) { + if (subchannel_list() == p->subchannel_list_.get()) { grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_REF(error), "connecting_changed"); } // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(sd); + RenewConnectivityWatchLocked(); break; } case GRPC_CHANNEL_SHUTDOWN: GPR_UNREACHABLE_CODE(break); } + GRPC_ERROR_UNREF(error); } // |