diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc | 97 |
1 files changed, 76 insertions, 21 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 42e8e88ec9..fea84331d8 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -57,7 +57,7 @@ class RoundRobin : public LoadBalancingPolicy { explicit RoundRobin(const Args& args); void UpdateLocked(const grpc_channel_args& args) override; - bool PickLocked(PickState* pick) override; + bool PickLocked(PickState* pick, grpc_error** error) override; void CancelPickLocked(PickState* pick, grpc_error* error) override; void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_eq, @@ -67,8 +67,10 @@ class RoundRobin : public LoadBalancingPolicy { grpc_connectivity_state CheckConnectivityLocked( grpc_error** connectivity_error) override; void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override; - void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override; void ExitIdleLocked() override; + void ResetBackoffLocked() override; + void FillChildRefsForChannelz(ChildRefsList* child_subchannels, + ChildRefsList* ignored) override; private: ~RoundRobin(); @@ -180,11 +182,24 @@ class RoundRobin : public LoadBalancingPolicy { size_t last_ready_index_ = -1; // Index into list of last pick. }; + // Helper class to ensure that any function that modifies the child refs + // data structures will update the channelz snapshot data structures before + // returning. + class AutoChildRefsUpdater { + public: + explicit AutoChildRefsUpdater(RoundRobin* rr) : rr_(rr) {} + ~AutoChildRefsUpdater() { rr_->UpdateChildRefsLocked(); } + + private: + RoundRobin* rr_; + }; + void ShutdownLocked() override; void StartPickingLocked(); bool DoPickLocked(PickState* pick); void DrainPendingPicksLocked(); + void UpdateChildRefsLocked(); /** list of subchannels */ OrphanablePtr<RoundRobinSubchannelList> subchannel_list_; @@ -202,10 +217,16 @@ class RoundRobin : public LoadBalancingPolicy { PickState* pending_picks_ = nullptr; /** our connectivity state tracker */ grpc_connectivity_state_tracker state_tracker_; + /// Lock and data used to capture snapshots of this channel's child + /// channels and subchannels. This data is consumed by channelz. + gpr_mu child_refs_mu_; + ChildRefsList child_subchannels_; + ChildRefsList child_channels_; }; RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) { GPR_ASSERT(args.client_channel_factory != nullptr); + gpr_mu_init(&child_refs_mu_); grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, "round_robin"); UpdateLocked(*args.args); @@ -220,6 +241,7 @@ RoundRobin::~RoundRobin() { if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this); } + gpr_mu_destroy(&child_refs_mu_); GPR_ASSERT(subchannel_list_ == nullptr); GPR_ASSERT(latest_pending_subchannel_list_ == nullptr); GPR_ASSERT(pending_picks_ == nullptr); @@ -231,14 +253,16 @@ void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { PickState* pick; while ((pick = pending_picks_) != nullptr) { pending_picks_ = pick->next; - if (new_policy->PickLocked(pick)) { + grpc_error* error = GRPC_ERROR_NONE; + if (new_policy->PickLocked(pick, &error)) { // Synchronous return, schedule closure. - GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(pick->on_complete, error); } } } void RoundRobin::ShutdownLocked() { + AutoChildRefsUpdater guard(this); grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_INFO, "[RR %p] Shutting down", this); @@ -310,6 +334,13 @@ void RoundRobin::ExitIdleLocked() { } } +void RoundRobin::ResetBackoffLocked() { + subchannel_list_->ResetBackoffLocked(); + if (latest_pending_subchannel_list_ != nullptr) { + latest_pending_subchannel_list_->ResetBackoffLocked(); + } +} + bool RoundRobin::DoPickLocked(PickState* pick) { const size_t next_ready_index = subchannel_list_->GetNextReadySubchannelIndexLocked(); @@ -345,7 +376,7 @@ void RoundRobin::DrainPendingPicksLocked() { } } -bool RoundRobin::PickLocked(PickState* pick) { +bool RoundRobin::PickLocked(PickState* pick, grpc_error** error) { if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", this, shutdown_); } @@ -353,6 +384,11 @@ bool RoundRobin::PickLocked(PickState* pick) { if (subchannel_list_ != nullptr) { if (DoPickLocked(pick)) return true; } + if (pick->on_complete == nullptr) { + *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "No pick result available but synchronous result required."); + return true; + } /* no pick currently available. Save for later in list of pending picks */ pick->next = pending_picks_; pending_picks_ = pick; @@ -362,6 +398,39 @@ bool RoundRobin::PickLocked(PickState* pick) { return false; } +void RoundRobin::FillChildRefsForChannelz( + ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) { + mu_guard guard(&child_refs_mu_); + for (size_t i = 0; i < child_subchannels_.size(); ++i) { + // TODO(ncteisen): implement a de dup loop that is not O(n^2). Might + // have to implement lightweight set. For now, we don't care about + // performance when channelz requests are made. + bool found = false; + for (size_t j = 0; j < child_subchannels_to_fill->size(); ++j) { + if ((*child_subchannels_to_fill)[j] == child_subchannels_[i]) { + found = true; + break; + } + } + if (!found) { + child_subchannels_to_fill->push_back(child_subchannels_[i]); + } + } +} + +void RoundRobin::UpdateChildRefsLocked() { + ChildRefsList cs; + if (subchannel_list_ != nullptr) { + subchannel_list_->PopulateChildRefsList(&cs); + } + if (latest_pending_subchannel_list_ != nullptr) { + latest_pending_subchannel_list_->PopulateChildRefsList(&cs); + } + // atomically update the data that channelz will actually be looking at. + mu_guard guard(&child_refs_mu_); + child_subchannels_ = std::move(cs); +} + void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() { if (num_subchannels() == 0) return; // Check current state of each subchannel synchronously, since any @@ -452,6 +521,7 @@ void RoundRobin::RoundRobinSubchannelList:: void RoundRobin::RoundRobinSubchannelList:: UpdateRoundRobinStateFromSubchannelStateCountsLocked() { RoundRobin* p = static_cast<RoundRobin*>(policy()); + AutoChildRefsUpdater guard(p); if (num_ready_ > 0) { if (p->subchannel_list_.get() != this) { // Promote this list to p->subchannel_list_. @@ -590,24 +660,9 @@ void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current, notify); } -void RoundRobin::PingOneLocked(grpc_closure* on_initiate, - grpc_closure* on_ack) { - const size_t next_ready_index = - subchannel_list_->GetNextReadySubchannelIndexLocked(); - if (next_ready_index < subchannel_list_->num_subchannels()) { - RoundRobinSubchannelData* selected = - subchannel_list_->subchannel(next_ready_index); - selected->connected_subchannel()->Ping(on_initiate, on_ack); - } else { - GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Round Robin not connected")); - GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Round Robin not connected")); - } -} - void RoundRobin::UpdateLocked(const grpc_channel_args& args) { const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); + AutoChildRefsUpdater guard(this); if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) { gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this); // If we don't have a current subchannel list, go into TRANSIENT_FAILURE. |