diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc | 103 |
1 files changed, 82 insertions, 21 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index ff2140e628..2b6a9ba8c5 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -46,7 +46,7 @@ class PickFirst : public LoadBalancingPolicy { explicit PickFirst(const Args& args); void UpdateLocked(const grpc_channel_args& args) override; - bool PickLocked(PickState* pick) override; + bool PickLocked(PickState* pick, grpc_error** error) override; void CancelPickLocked(PickState* pick, grpc_error* error) override; void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_eq, @@ -56,8 +56,10 @@ class PickFirst : public LoadBalancingPolicy { grpc_connectivity_state CheckConnectivityLocked( grpc_error** connectivity_error) override; void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override; - void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override; void ExitIdleLocked() override; + void ResetBackoffLocked() override; + void FillChildRefsForChannelz(ChildRefsList* child_subchannels, + ChildRefsList* ignored) override; private: ~PickFirst(); @@ -103,10 +105,23 @@ class PickFirst : public LoadBalancingPolicy { } }; + // Helper class to ensure that any function that modifies the child refs + // data structures will update the channelz snapshot data structures before + // returning. + class AutoChildRefsUpdater { + public: + explicit AutoChildRefsUpdater(PickFirst* pf) : pf_(pf) {} + ~AutoChildRefsUpdater() { pf_->UpdateChildRefsLocked(); } + + private: + PickFirst* pf_; + }; + void ShutdownLocked() override; void StartPickingLocked(); void DestroyUnselectedSubchannelsLocked(); + void UpdateChildRefsLocked(); // All our subchannels. OrphanablePtr<PickFirstSubchannelList> subchannel_list_; @@ -122,10 +137,17 @@ class PickFirst : public LoadBalancingPolicy { PickState* pending_picks_ = nullptr; // Our connectivity state tracker. grpc_connectivity_state_tracker state_tracker_; + + /// Lock and data used to capture snapshots of this channels child + /// channels and subchannels. This data is consumed by channelz. + gpr_mu child_refs_mu_; + ChildRefsList child_subchannels_; + ChildRefsList child_channels_; }; PickFirst::PickFirst(const Args& args) : LoadBalancingPolicy(args) { GPR_ASSERT(args.client_channel_factory != nullptr); + gpr_mu_init(&child_refs_mu_); grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, "pick_first"); if (grpc_lb_pick_first_trace.enabled()) { @@ -139,6 +161,7 @@ PickFirst::~PickFirst() { if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Destroying Pick First %p", this); } + gpr_mu_destroy(&child_refs_mu_); GPR_ASSERT(subchannel_list_ == nullptr); GPR_ASSERT(latest_pending_subchannel_list_ == nullptr); GPR_ASSERT(pending_picks_ == nullptr); @@ -150,14 +173,16 @@ void PickFirst::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { PickState* pick; while ((pick = pending_picks_) != nullptr) { pending_picks_ = pick->next; - if (new_policy->PickLocked(pick)) { + grpc_error* error = GRPC_ERROR_NONE; + if (new_policy->PickLocked(pick, &error)) { // Synchronous return, schedule closure. - GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(pick->on_complete, error); } } } void PickFirst::ShutdownLocked() { + AutoChildRefsUpdater guard(this); grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p Shutting down", this); @@ -235,13 +260,25 @@ void PickFirst::ExitIdleLocked() { } } -bool PickFirst::PickLocked(PickState* pick) { +void PickFirst::ResetBackoffLocked() { + subchannel_list_->ResetBackoffLocked(); + if (latest_pending_subchannel_list_ != nullptr) { + latest_pending_subchannel_list_->ResetBackoffLocked(); + } +} + +bool PickFirst::PickLocked(PickState* pick, grpc_error** error) { // If we have a selected subchannel already, return synchronously. if (selected_ != nullptr) { pick->connected_subchannel = selected_->connected_subchannel()->Ref(); return true; } // No subchannel selected yet, so handle asynchronously. + if (pick->on_complete == nullptr) { + *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "No pick result available but synchronous result required."); + return true; + } if (!started_picking_) { StartPickingLocked(); } @@ -269,18 +306,41 @@ void PickFirst::NotifyOnStateChangeLocked(grpc_connectivity_state* current, notify); } -void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) { - if (selected_ != nullptr) { - selected_->connected_subchannel()->Ping(on_initiate, on_ack); - } else { - GRPC_CLOSURE_SCHED(on_initiate, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); - GRPC_CLOSURE_SCHED(on_ack, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); +void PickFirst::FillChildRefsForChannelz( + ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) { + mu_guard guard(&child_refs_mu_); + for (size_t i = 0; i < child_subchannels_.size(); ++i) { + // TODO(ncteisen): implement a de dup loop that is not O(n^2). Might + // have to implement lightweight set. For now, we don't care about + // performance when channelz requests are made. + bool found = false; + for (size_t j = 0; j < child_subchannels_to_fill->size(); ++j) { + if ((*child_subchannels_to_fill)[j] == child_subchannels_[i]) { + found = true; + break; + } + } + if (!found) { + child_subchannels_to_fill->push_back(child_subchannels_[i]); + } } } +void PickFirst::UpdateChildRefsLocked() { + ChildRefsList cs; + if (subchannel_list_ != nullptr) { + subchannel_list_->PopulateChildRefsList(&cs); + } + if (latest_pending_subchannel_list_ != nullptr) { + latest_pending_subchannel_list_->PopulateChildRefsList(&cs); + } + // atomically update the data that channelz will actually be looking at. + mu_guard guard(&child_refs_mu_); + child_subchannels_ = std::move(cs); +} + void PickFirst::UpdateLocked(const grpc_channel_args& args) { + AutoChildRefsUpdater guard(this); const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { if (subchannel_list_ == nullptr) { @@ -388,10 +448,12 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( grpc_connectivity_state connectivity_state, grpc_error* error) { PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy()); + AutoChildRefsUpdater guard(p); // The notification must be for a subchannel in either the current or // latest pending subchannel lists. GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() || subchannel_list() == p->latest_pending_subchannel_list_.get()); + GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN); // Handle updates for the currently selected subchannel. if (p->selected_ == this) { if (grpc_lb_pick_first_trace.enabled()) { @@ -421,14 +483,12 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( "update"), "selected_not_ready+switch_to_update"); } else { - // TODO(juanlishen): we re-resolve when the selected subchannel goes to - // TRANSIENT_FAILURE because we used to shut down in this case before - // re-resolution is introduced. But we need to investigate whether we - // really want to take any action instead of waiting for the selected - // subchannel reconnecting. - GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN); if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - // If the selected channel goes bad, request a re-resolution. + // If the selected subchannel goes bad, request a re-resolution. We also + // set the channel state to IDLE and reset started_picking_. The reason + // is that if the new state is TRANSIENT_FAILURE due to a GOAWAY + // reception we don't want to connect to the re-resolved backends until + // we leave the IDLE state. grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, "selected_changed+reresolve"); @@ -509,9 +569,10 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( // Case 1: Only set state to TRANSIENT_FAILURE if we've tried // all subchannels. if (sd->Index() == 0 && subchannel_list() == p->subchannel_list_.get()) { + p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE); grpc_connectivity_state_set( &p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "connecting_transient_failure"); + GRPC_ERROR_REF(error), "exhausted_subchannels"); } sd->StartConnectivityWatchLocked(); break; |