diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc | 155 |
1 files changed, 96 insertions, 59 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index d217dc0e63..9120abfa3c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -27,6 +27,7 @@ #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gprpp/mutex_lock.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" @@ -46,7 +47,7 @@ class PickFirst : public LoadBalancingPolicy { explicit PickFirst(const Args& args); void UpdateLocked(const grpc_channel_args& args) override; - bool PickLocked(PickState* pick) override; + bool PickLocked(PickState* pick, grpc_error** error) override; void CancelPickLocked(PickState* pick, grpc_error* error) override; void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_eq, @@ -56,8 +57,8 @@ class PickFirst : public LoadBalancingPolicy { grpc_connectivity_state CheckConnectivityLocked( grpc_error** connectivity_error) override; void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override; - void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override; void ExitIdleLocked() override; + void ResetBackoffLocked() override; void FillChildRefsForChannelz(ChildRefsList* child_subchannels, ChildRefsList* ignored) override; @@ -80,6 +81,11 @@ class PickFirst : public LoadBalancingPolicy { void ProcessConnectivityChangeLocked( grpc_connectivity_state connectivity_state, grpc_error* error) override; + + // Processes the connectivity change to READY for an unselected subchannel. + void ProcessUnselectedReadyLocked(); + + void CheckConnectivityStateAndStartWatchingLocked(); }; class PickFirstSubchannelList @@ -173,9 +179,10 @@ void PickFirst::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { PickState* pick; while ((pick = pending_picks_) != nullptr) { pending_picks_ = pick->next; - if (new_policy->PickLocked(pick)) { + grpc_error* error = GRPC_ERROR_NONE; + if (new_policy->PickLocked(pick, &error)) { // Synchronous return, schedule closure. - GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(pick->on_complete, error); } } } @@ -246,7 +253,8 @@ void PickFirst::StartPickingLocked() { if (subchannel_list_ != nullptr) { for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) { if (subchannel_list_->subchannel(i)->subchannel() != nullptr) { - subchannel_list_->subchannel(i)->StartConnectivityWatchLocked(); + subchannel_list_->subchannel(i) + ->CheckConnectivityStateAndStartWatchingLocked(); break; } } @@ -259,18 +267,30 @@ void PickFirst::ExitIdleLocked() { } } -bool PickFirst::PickLocked(PickState* pick) { +void PickFirst::ResetBackoffLocked() { + subchannel_list_->ResetBackoffLocked(); + if (latest_pending_subchannel_list_ != nullptr) { + latest_pending_subchannel_list_->ResetBackoffLocked(); + } +} + +bool PickFirst::PickLocked(PickState* pick, grpc_error** error) { // If we have a selected subchannel already, return synchronously. if (selected_ != nullptr) { pick->connected_subchannel = selected_->connected_subchannel()->Ref(); return true; } // No subchannel selected yet, so handle asynchronously. - if (!started_picking_) { - StartPickingLocked(); + if (pick->on_complete == nullptr) { + *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "No pick result available but synchronous result required."); + return true; } pick->next = pending_picks_; pending_picks_ = pick; + if (!started_picking_) { + StartPickingLocked(); + } return false; } @@ -293,20 +313,9 @@ void PickFirst::NotifyOnStateChangeLocked(grpc_connectivity_state* current, notify); } -void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) { - if (selected_ != nullptr) { - selected_->connected_subchannel()->Ping(on_initiate, on_ack); - } else { - GRPC_CLOSURE_SCHED(on_initiate, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); - GRPC_CLOSURE_SCHED(on_ack, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); - } -} - void PickFirst::FillChildRefsForChannelz( ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) { - mu_guard guard(&child_refs_mu_); + MutexLock lock(&child_refs_mu_); for (size_t i = 0; i < child_subchannels_.size(); ++i) { // TODO(ncteisen): implement a de dup loop that is not O(n^2). Might // have to implement lightweight set. For now, we don't care about @@ -333,7 +342,7 @@ void PickFirst::UpdateChildRefsLocked() { latest_pending_subchannel_list_->PopulateChildRefsList(&cs); } // atomically update the data that channelz will actually be looking at. - mu_guard guard(&child_refs_mu_); + MutexLock lock(&child_refs_mu_); child_subchannels_ = std::move(cs); } @@ -384,7 +393,8 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { // If we've started picking, start trying to connect to the first // subchannel in the new list. if (started_picking_) { - subchannel_list_->subchannel(0)->StartConnectivityWatchLocked(); + subchannel_list_->subchannel(0) + ->CheckConnectivityStateAndStartWatchingLocked(); } } else { // We do have a selected subchannel. @@ -438,7 +448,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { // subchannel in the new list. if (started_picking_) { latest_pending_subchannel_list_->subchannel(0) - ->StartConnectivityWatchLocked(); + ->CheckConnectivityStateAndStartWatchingLocked(); } } } @@ -517,41 +527,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( // select in place of the current one. switch (connectivity_state) { case GRPC_CHANNEL_READY: { - // Case 2. Promote p->latest_pending_subchannel_list_ to - // p->subchannel_list_. - if (subchannel_list() == p->latest_pending_subchannel_list_.get()) { - if (grpc_lb_pick_first_trace.enabled()) { - gpr_log(GPR_INFO, - "Pick First %p promoting pending subchannel list %p to " - "replace %p", - p, p->latest_pending_subchannel_list_.get(), - p->subchannel_list_.get()); - } - p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); - } - // Cases 1 and 2. - grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY, - GRPC_ERROR_NONE, "connecting_ready"); - p->selected_ = this; - if (grpc_lb_pick_first_trace.enabled()) { - gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, - subchannel()); - } - // Drop all other subchannels, since we are now connected. - p->DestroyUnselectedSubchannelsLocked(); - // Update any calls that were waiting for a pick. - PickState* pick; - while ((pick = p->pending_picks_)) { - p->pending_picks_ = pick->next; - pick->connected_subchannel = - p->selected_->connected_subchannel()->Ref(); - if (grpc_lb_pick_first_trace.enabled()) { - gpr_log(GPR_INFO, - "Servicing pending pick with selected subchannel %p", - p->selected_->subchannel()); - } - GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); - } + ProcessUnselectedReadyLocked(); // Renew notification. RenewConnectivityWatchLocked(); break; @@ -572,7 +548,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( &p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "exhausted_subchannels"); } - sd->StartConnectivityWatchLocked(); + sd->CheckConnectivityStateAndStartWatchingLocked(); break; } case GRPC_CHANNEL_CONNECTING: @@ -593,6 +569,67 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( GRPC_ERROR_UNREF(error); } +void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() { + PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy()); + // If we get here, there are two possible cases: + // 1. We do not currently have a selected subchannel, and the update is + // for a subchannel in p->subchannel_list_ that we're trying to + // connect to. The goal here is to find a subchannel that we can + // select. + // 2. We do currently have a selected subchannel, and the update is + // for a subchannel in p->latest_pending_subchannel_list_. The + // goal here is to find a subchannel from the update that we can + // select in place of the current one. + GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() || + subchannel_list() == p->latest_pending_subchannel_list_.get()); + // Case 2. Promote p->latest_pending_subchannel_list_ to p->subchannel_list_. + if (subchannel_list() == p->latest_pending_subchannel_list_.get()) { + if (grpc_lb_pick_first_trace.enabled()) { + gpr_log(GPR_INFO, + "Pick First %p promoting pending subchannel list %p to " + "replace %p", + p, p->latest_pending_subchannel_list_.get(), + p->subchannel_list_.get()); + } + p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); + } + // Cases 1 and 2. + grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY, + GRPC_ERROR_NONE, "subchannel_ready"); + p->selected_ = this; + if (grpc_lb_pick_first_trace.enabled()) { + gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel()); + } + // Drop all other subchannels, since we are now connected. + p->DestroyUnselectedSubchannelsLocked(); + // Update any calls that were waiting for a pick. + PickState* pick; + while ((pick = p->pending_picks_)) { + p->pending_picks_ = pick->next; + pick->connected_subchannel = p->selected_->connected_subchannel()->Ref(); + if (grpc_lb_pick_first_trace.enabled()) { + gpr_log(GPR_INFO, "Servicing pending pick with selected subchannel %p", + p->selected_->subchannel()); + } + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); + } +} + +void PickFirst::PickFirstSubchannelData:: + CheckConnectivityStateAndStartWatchingLocked() { + PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy()); + grpc_error* error = GRPC_ERROR_NONE; + if (p->selected_ != this && + CheckConnectivityStateLocked(&error) == GRPC_CHANNEL_READY) { + // We must process the READY subchannel before we start watching it. + // Otherwise, we won't know it's READY because we will be waiting for its + // connectivity state to change from READY. + ProcessUnselectedReadyLocked(); + } + GRPC_ERROR_UNREF(error); + StartConnectivityWatchLocked(); +} + // // factory // |