diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc | 743 |
1 files changed, 369 insertions, 374 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index e534131c02..b177385065 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -73,23 +73,127 @@ class RoundRobin : public LoadBalancingPolicy { private: ~RoundRobin(); - void ShutdownLocked() override; + // Forward declaration. + class RoundRobinSubchannelList; + + // Data for a particular subchannel in a subchannel list. + // This subclass adds the following functionality: + // - Tracks user_data associated with each address, which will be + // returned along with picks that select the subchannel. + // - Tracks the previous connectivity state of the subchannel, so that + // we know how many subchannels are in each state. + class RoundRobinSubchannelData + : public SubchannelData<RoundRobinSubchannelList, + RoundRobinSubchannelData> { + public: + RoundRobinSubchannelData(RoundRobinSubchannelList* subchannel_list, + const grpc_lb_user_data_vtable* user_data_vtable, + const grpc_lb_address& address, + grpc_subchannel* subchannel, + grpc_combiner* combiner) + : SubchannelData(subchannel_list, user_data_vtable, address, subchannel, + combiner), + user_data_vtable_(user_data_vtable), + user_data_(user_data_vtable_ != nullptr + ? user_data_vtable_->copy(address.user_data) + : nullptr) {} + + void UnrefSubchannelLocked(const char* reason) override { + SubchannelData::UnrefSubchannelLocked(reason); + if (user_data_ != nullptr) { + GPR_ASSERT(user_data_vtable_ != nullptr); + user_data_vtable_->destroy(user_data_); + user_data_ = nullptr; + } + } - void StartPickingLocked(); - size_t GetNextReadySubchannelIndexLocked(); - void UpdateLastReadySubchannelIndexLocked(size_t last_ready_index); - void UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd, - grpc_error* error); + void* user_data() const { return user_data_; } + + grpc_connectivity_state connectivity_state() const { + return last_connectivity_state_; + } - static void OnConnectivityChangedLocked(void* arg, grpc_error* error); + void UpdateConnectivityStateLocked( + grpc_connectivity_state connectivity_state, grpc_error* error); + + private: + void ProcessConnectivityChangeLocked( + grpc_connectivity_state connectivity_state, grpc_error* error) override; + + const grpc_lb_user_data_vtable* user_data_vtable_; + void* user_data_ = nullptr; + grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_IDLE; + }; + + // A list of subchannels. + class RoundRobinSubchannelList + : public SubchannelList<RoundRobinSubchannelList, + RoundRobinSubchannelData> { + public: + RoundRobinSubchannelList( + RoundRobin* policy, TraceFlag* tracer, + const grpc_lb_addresses* addresses, grpc_combiner* combiner, + grpc_client_channel_factory* client_channel_factory, + const grpc_channel_args& args) + : SubchannelList(policy, tracer, addresses, combiner, + client_channel_factory, args) { + // Need to maintain a ref to the LB policy as long as we maintain + // any references to subchannels, since the subchannels' + // pollset_sets will include the LB policy's pollset_set. + policy->Ref(DEBUG_LOCATION, "subchannel_list").release(); + } - void SubchannelListRefForConnectivityWatch( - grpc_lb_subchannel_list* subchannel_list, const char* reason); - void SubchannelListUnrefForConnectivityWatch( - grpc_lb_subchannel_list* subchannel_list, const char* reason); + ~RoundRobinSubchannelList() { + GRPC_ERROR_UNREF(last_transient_failure_error_); + RoundRobin* p = static_cast<RoundRobin*>(policy()); + p->Unref(DEBUG_LOCATION, "subchannel_list"); + } + + // Starts watching the subchannels in this list. + void StartWatchingLocked(); + + // Updates the counters of subchannels in each state when a + // subchannel transitions from old_state to new_state. + // transient_failure_error is the error that is reported when + // new_state is TRANSIENT_FAILURE. + void UpdateStateCountersLocked(grpc_connectivity_state old_state, + grpc_connectivity_state new_state, + grpc_error* transient_failure_error); + + // If this subchannel list is the RR policy's current subchannel + // list, updates the RR policy's connectivity state based on the + // subchannel list's state counters. + void MaybeUpdateRoundRobinConnectivityStateLocked(); + + // Updates the RR policy's overall state based on the counters of + // subchannels in each state. + void UpdateRoundRobinStateFromSubchannelStateCountsLocked(); + + size_t GetNextReadySubchannelIndexLocked(); + void UpdateLastReadySubchannelIndexLocked(size_t last_ready_index); + + private: + size_t num_ready_ = 0; + size_t num_connecting_ = 0; + size_t num_transient_failure_ = 0; + grpc_error* last_transient_failure_error_ = GRPC_ERROR_NONE; + size_t last_ready_index_ = -1; // Index into list of last pick. + }; + + void ShutdownLocked() override; + + void StartPickingLocked(); + bool DoPickLocked(PickState* pick); + void DrainPendingPicksLocked(); /** list of subchannels */ - grpc_lb_subchannel_list* subchannel_list_ = nullptr; + OrphanablePtr<RoundRobinSubchannelList> subchannel_list_; + /** Latest version of the subchannel list. + * Subchannel connectivity callbacks will only promote updated subchannel + * lists if they equal \a latest_pending_subchannel_list. In other words, + * racing callbacks that reference outdated subchannel lists won't perform any + * update. */ + OrphanablePtr<RoundRobinSubchannelList> latest_pending_subchannel_list_; /** have we started picking? */ bool started_picking_ = false; /** are we shutting down? */ @@ -98,14 +202,6 @@ class RoundRobin : public LoadBalancingPolicy { PickState* pending_picks_ = nullptr; /** our connectivity state tracker */ grpc_connectivity_state_tracker state_tracker_; - /** Index into subchannels for last pick. */ - size_t last_ready_subchannel_index_ = 0; - /** Latest version of the subchannel list. - * Subchannel connectivity callbacks will only promote updated subchannel - * lists if they equal \a latest_pending_subchannel_list. In other words, - * racing callbacks that reference outdated subchannel lists won't perform any - * update. */ - grpc_lb_subchannel_list* latest_pending_subchannel_list_ = nullptr; }; RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) { @@ -114,15 +210,15 @@ RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) { "round_robin"); UpdateLocked(*args.args); if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, "[RR %p] Created with %" PRIuPTR " subchannels", this, - subchannel_list_->num_subchannels); + gpr_log(GPR_INFO, "[RR %p] Created with %" PRIuPTR " subchannels", this, + subchannel_list_->num_subchannels()); } grpc_subchannel_index_ref(); } RoundRobin::~RoundRobin() { if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy", this); + gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this); } GPR_ASSERT(subchannel_list_ == nullptr); GPR_ASSERT(latest_pending_subchannel_list_ == nullptr); @@ -131,68 +227,6 @@ RoundRobin::~RoundRobin() { grpc_subchannel_index_unref(); } -/** Returns the index into p->subchannel_list->subchannels of the next - * subchannel in READY state, or p->subchannel_list->num_subchannels if no - * subchannel is READY. - * - * Note that this function does *not* update p->last_ready_subchannel_index. - * The caller must do that if it returns a pick. */ -size_t RoundRobin::GetNextReadySubchannelIndexLocked() { - GPR_ASSERT(subchannel_list_ != nullptr); - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_INFO, - "[RR %p] getting next ready subchannel (out of %" PRIuPTR - "), " - "last_ready_subchannel_index=%" PRIuPTR, - this, subchannel_list_->num_subchannels, - last_ready_subchannel_index_); - } - for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) { - const size_t index = (i + last_ready_subchannel_index_ + 1) % - subchannel_list_->num_subchannels; - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log( - GPR_DEBUG, - "[RR %p] checking subchannel %p, subchannel_list %p, index %" PRIuPTR - ": state=%s", - this, subchannel_list_->subchannels[index].subchannel, - subchannel_list_, index, - grpc_connectivity_state_name( - subchannel_list_->subchannels[index].curr_connectivity_state)); - } - if (subchannel_list_->subchannels[index].curr_connectivity_state == - GRPC_CHANNEL_READY) { - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, - "[RR %p] found next ready subchannel (%p) at index %" PRIuPTR - " of subchannel_list %p", - this, subchannel_list_->subchannels[index].subchannel, index, - subchannel_list_); - } - return index; - } - } - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", this); - } - return subchannel_list_->num_subchannels; -} - -// Sets last_ready_subchannel_index_ to last_ready_index. -void RoundRobin::UpdateLastReadySubchannelIndexLocked(size_t last_ready_index) { - GPR_ASSERT(last_ready_index < subchannel_list_->num_subchannels); - last_ready_subchannel_index_ = last_ready_index; - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, - "[RR %p] setting last_ready_subchannel_index=%" PRIuPTR - " (SC %p, CSC %p)", - this, last_ready_index, - subchannel_list_->subchannels[last_ready_index].subchannel, - subchannel_list_->subchannels[last_ready_index] - .connected_subchannel.get()); - } -} - void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { PickState* pick; while ((pick = pending_picks_) != nullptr) { @@ -207,7 +241,7 @@ void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { void RoundRobin::ShutdownLocked() { grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, "[RR %p] Shutting down", this); + gpr_log(GPR_INFO, "[RR %p] Shutting down", this); } shutdown_ = true; PickState* pick; @@ -218,16 +252,8 @@ void RoundRobin::ShutdownLocked() { } grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "rr_shutdown"); - if (subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, - "sl_shutdown_rr_shutdown"); - subchannel_list_ = nullptr; - } - if (latest_pending_subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref( - latest_pending_subchannel_list_, "sl_shutdown_pending_rr_shutdown"); - latest_pending_subchannel_list_ = nullptr; - } + subchannel_list_.reset(); + latest_pending_subchannel_list_.reset(); TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_CANCELLED); GRPC_ERROR_UNREF(error); } @@ -273,70 +299,59 @@ void RoundRobin::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, GRPC_ERROR_UNREF(error); } -void RoundRobin::SubchannelListRefForConnectivityWatch( - grpc_lb_subchannel_list* subchannel_list, const char* reason) { - // TODO(roth): We currently track this ref manually. Once the new - // ClosureRef API is ready and the subchannel_list code has been - // converted to a C++ API, find a way to hold the RefCountedPtr<> - // somewhere (maybe in the subchannel_data object) instead of doing - // this manually. - auto self = Ref(DEBUG_LOCATION, reason); - self.release(); - grpc_lb_subchannel_list_ref(subchannel_list, reason); +void RoundRobin::StartPickingLocked() { + started_picking_ = true; + subchannel_list_->StartWatchingLocked(); } -void RoundRobin::SubchannelListUnrefForConnectivityWatch( - grpc_lb_subchannel_list* subchannel_list, const char* reason) { - Unref(DEBUG_LOCATION, reason); - grpc_lb_subchannel_list_unref(subchannel_list, reason); +void RoundRobin::ExitIdleLocked() { + if (!started_picking_) { + StartPickingLocked(); + } } -void RoundRobin::StartPickingLocked() { - started_picking_ = true; - for (size_t i = 0; i < subchannel_list_->num_subchannels; i++) { - if (subchannel_list_->subchannels[i].subchannel != nullptr) { - SubchannelListRefForConnectivityWatch(subchannel_list_, - "connectivity_watch"); - grpc_lb_subchannel_data_start_connectivity_watch( - &subchannel_list_->subchannels[i]); +bool RoundRobin::DoPickLocked(PickState* pick) { + const size_t next_ready_index = + subchannel_list_->GetNextReadySubchannelIndexLocked(); + if (next_ready_index < subchannel_list_->num_subchannels()) { + /* readily available, report right away */ + RoundRobinSubchannelData* sd = + subchannel_list_->subchannel(next_ready_index); + GPR_ASSERT(sd->connected_subchannel() != nullptr); + pick->connected_subchannel = sd->connected_subchannel()->Ref(); + if (pick->user_data != nullptr) { + *pick->user_data = sd->user_data(); } + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log(GPR_INFO, + "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, " + "index %" PRIuPTR ")", + this, sd->subchannel(), pick->connected_subchannel.get(), + sd->subchannel_list(), next_ready_index); + } + /* only advance the last picked pointer if the selection was used */ + subchannel_list_->UpdateLastReadySubchannelIndexLocked(next_ready_index); + return true; } + return false; } -void RoundRobin::ExitIdleLocked() { - if (!started_picking_) { - StartPickingLocked(); +void RoundRobin::DrainPendingPicksLocked() { + PickState* pick; + while ((pick = pending_picks_)) { + pending_picks_ = pick->next; + GPR_ASSERT(DoPickLocked(pick)); + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); } } bool RoundRobin::PickLocked(PickState* pick) { if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, "[RR %p] Trying to pick (shutdown: %d)", this, - shutdown_); + gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", this, shutdown_); } GPR_ASSERT(!shutdown_); if (subchannel_list_ != nullptr) { - const size_t next_ready_index = GetNextReadySubchannelIndexLocked(); - if (next_ready_index < subchannel_list_->num_subchannels) { - /* readily available, report right away */ - grpc_lb_subchannel_data* sd = - &subchannel_list_->subchannels[next_ready_index]; - pick->connected_subchannel = sd->connected_subchannel; - if (pick->user_data != nullptr) { - *pick->user_data = sd->user_data; - } - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log( - GPR_DEBUG, - "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, " - "index %" PRIuPTR ")", - this, sd->subchannel, pick->connected_subchannel.get(), - sd->subchannel_list, next_ready_index); - } - /* only advance the last picked pointer if the selection was used */ - UpdateLastReadySubchannelIndexLocked(next_ready_index); - return true; - } + if (DoPickLocked(pick)) return true; } /* no pick currently available. Save for later in list of pending picks */ if (!started_picking_) { @@ -347,36 +362,62 @@ bool RoundRobin::PickLocked(PickState* pick) { return false; } -void UpdateStateCountersLocked(grpc_lb_subchannel_data* sd) { - grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; - GPR_ASSERT(sd->prev_connectivity_state != GRPC_CHANNEL_SHUTDOWN); - GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN); - if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) { - GPR_ASSERT(subchannel_list->num_ready > 0); - --subchannel_list->num_ready; - } else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - GPR_ASSERT(subchannel_list->num_transient_failures > 0); - --subchannel_list->num_transient_failures; - } else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) { - GPR_ASSERT(subchannel_list->num_idle > 0); - --subchannel_list->num_idle; +void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() { + if (num_subchannels() == 0) return; + // Check current state of each subchannel synchronously, since any + // subchannel already used by some other channel may have a non-IDLE + // state. + for (size_t i = 0; i < num_subchannels(); ++i) { + grpc_error* error = GRPC_ERROR_NONE; + grpc_connectivity_state state = + subchannel(i)->CheckConnectivityStateLocked(&error); + if (state != GRPC_CHANNEL_IDLE) { + subchannel(i)->UpdateConnectivityStateLocked(state, error); + } } - sd->prev_connectivity_state = sd->curr_connectivity_state; - if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { - ++subchannel_list->num_ready; - } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - ++subchannel_list->num_transient_failures; - } else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) { - ++subchannel_list->num_idle; + // Now set the LB policy's state based on the subchannels' states. + UpdateRoundRobinStateFromSubchannelStateCountsLocked(); + // Start connectivity watch for each subchannel. + for (size_t i = 0; i < num_subchannels(); i++) { + if (subchannel(i)->subchannel() != nullptr) { + subchannel(i)->StartConnectivityWatchLocked(); + } } } -/** Sets the policy's connectivity status based on that of the passed-in \a sd - * (the grpc_lb_subchannel_data associated with the updated subchannel) and the - * subchannel list \a sd belongs to (sd->subchannel_list). \a error will be used - * only if the policy transitions to state TRANSIENT_FAILURE. */ -void RoundRobin::UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd, - grpc_error* error) { +void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked( + grpc_connectivity_state old_state, grpc_connectivity_state new_state, + grpc_error* transient_failure_error) { + GPR_ASSERT(old_state != GRPC_CHANNEL_SHUTDOWN); + GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN); + if (old_state == GRPC_CHANNEL_READY) { + GPR_ASSERT(num_ready_ > 0); + --num_ready_; + } else if (old_state == GRPC_CHANNEL_CONNECTING) { + GPR_ASSERT(num_connecting_ > 0); + --num_connecting_; + } else if (old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + GPR_ASSERT(num_transient_failure_ > 0); + --num_transient_failure_; + } + if (new_state == GRPC_CHANNEL_READY) { + ++num_ready_; + } else if (new_state == GRPC_CHANNEL_CONNECTING) { + ++num_connecting_; + } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + ++num_transient_failure_; + } + GRPC_ERROR_UNREF(last_transient_failure_error_); + last_transient_failure_error_ = transient_failure_error; +} + +// Sets the RR policy's connectivity state based on the current +// subchannel list. +void RoundRobin::RoundRobinSubchannelList:: + MaybeUpdateRoundRobinConnectivityStateLocked() { + RoundRobin* p = static_cast<RoundRobin*>(policy()); + // Only set connectivity state if this is the current subchannel list. + if (p->subchannel_list_.get() != this) return; /* In priority order. The first rule to match terminates the search (ie, if we * are on rule n, all previous rules were unfulfilled). * @@ -391,155 +432,151 @@ void RoundRobin::UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd, * CHECK: subchannel_list->num_transient_failures == * subchannel_list->num_subchannels. */ - grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; - GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_IDLE); - if (subchannel_list->num_ready > 0) { + if (num_ready_ > 0) { /* 1) READY */ - grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_READY, + grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "rr_ready"); - } else if (sd->curr_connectivity_state == GRPC_CHANNEL_CONNECTING) { + } else if (num_connecting_ > 0) { /* 2) CONNECTING */ - grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_CONNECTING, + grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, "rr_connecting"); - } else if (subchannel_list->num_transient_failures == - subchannel_list->num_subchannels) { + } else if (num_transient_failure_ == num_subchannels()) { /* 3) TRANSIENT_FAILURE */ - grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), + grpc_connectivity_state_set(&p->state_tracker_, + GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_REF(last_transient_failure_error_), "rr_exhausted_subchannels"); } - GRPC_ERROR_UNREF(error); } -void RoundRobin::OnConnectivityChangedLocked(void* arg, grpc_error* error) { - grpc_lb_subchannel_data* sd = static_cast<grpc_lb_subchannel_data*>(arg); - RoundRobin* p = static_cast<RoundRobin*>(sd->subchannel_list->policy); +void RoundRobin::RoundRobinSubchannelList:: + UpdateRoundRobinStateFromSubchannelStateCountsLocked() { + RoundRobin* p = static_cast<RoundRobin*>(policy()); + if (num_ready_ > 0) { + if (p->subchannel_list_.get() != this) { + // Promote this list to p->subchannel_list_. + // This list must be p->latest_pending_subchannel_list_, because + // any previous update would have been shut down already and + // therefore we would not be receiving a notification for them. + GPR_ASSERT(p->latest_pending_subchannel_list_.get() == this); + GPR_ASSERT(!shutting_down()); + if (grpc_lb_round_robin_trace.enabled()) { + const size_t old_num_subchannels = + p->subchannel_list_ != nullptr + ? p->subchannel_list_->num_subchannels() + : 0; + gpr_log(GPR_INFO, + "[RR %p] phasing out subchannel list %p (size %" PRIuPTR + ") in favor of %p (size %" PRIuPTR ")", + p, p->subchannel_list_.get(), old_num_subchannels, this, + num_subchannels()); + } + p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); + } + // Drain pending picks. + p->DrainPendingPicksLocked(); + } + // Update the RR policy's connectivity state if needed. + MaybeUpdateRoundRobinConnectivityStateLocked(); +} + +void RoundRobin::RoundRobinSubchannelData::UpdateConnectivityStateLocked( + grpc_connectivity_state connectivity_state, grpc_error* error) { + RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy()); if (grpc_lb_round_robin_trace.enabled()) { gpr_log( - GPR_DEBUG, - "[RR %p] connectivity changed for subchannel %p, subchannel_list %p: " - "prev_state=%s new_state=%s p->shutdown=%d " - "sd->subchannel_list->shutting_down=%d error=%s", - p, sd->subchannel, sd->subchannel_list, - grpc_connectivity_state_name(sd->prev_connectivity_state), - grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe), - p->shutdown_, sd->subchannel_list->shutting_down, - grpc_error_string(error)); - } - GPR_ASSERT(sd->subchannel != nullptr); - // If the policy is shutting down, unref and return. - if (p->shutdown_) { - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_data_unref_subchannel(sd, "rr_shutdown"); - p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list, - "rr_shutdown"); - return; + GPR_INFO, + "[RR %p] connectivity changed for subchannel %p, subchannel_list %p " + "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s", + p, subchannel(), subchannel_list(), Index(), + subchannel_list()->num_subchannels(), + grpc_connectivity_state_name(last_connectivity_state_), + grpc_connectivity_state_name(connectivity_state)); + } + subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_, + connectivity_state, error); + last_connectivity_state_ = connectivity_state; +} + +void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked( + grpc_connectivity_state connectivity_state, grpc_error* error) { + RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy()); + GPR_ASSERT(subchannel() != nullptr); + // If the new state is TRANSIENT_FAILURE, re-resolve. + // Only do this if we've started watching, not at startup time. + // Otherwise, if the subchannel was already in state TRANSIENT_FAILURE + // when the subchannel list was created, we'd wind up in a constant + // loop of re-resolution. + if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log(GPR_INFO, + "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. " + "Requesting re-resolution", + p, subchannel()); + } + p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE); } - // If the subchannel list is shutting down, stop watching. - if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) { - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_data_unref_subchannel(sd, "rr_sl_shutdown"); - p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list, - "rr_sl_shutdown"); - return; + // Update state counters. + UpdateConnectivityStateLocked(connectivity_state, error); + // Update overall state and renew notification. + subchannel_list()->UpdateRoundRobinStateFromSubchannelStateCountsLocked(); + RenewConnectivityWatchLocked(); +} + +/** Returns the index into p->subchannel_list->subchannels of the next + * subchannel in READY state, or p->subchannel_list->num_subchannels if no + * subchannel is READY. + * + * Note that this function does *not* update p->last_ready_subchannel_index. + * The caller must do that if it returns a pick. */ +size_t +RoundRobin::RoundRobinSubchannelList::GetNextReadySubchannelIndexLocked() { + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log(GPR_INFO, + "[RR %p] getting next ready subchannel (out of %" PRIuPTR + "), last_ready_index=%" PRIuPTR, + policy(), num_subchannels(), last_ready_index_); } - // If we're still here, the notification must be for a subchannel in - // either the current or latest pending subchannel lists. - GPR_ASSERT(sd->subchannel_list == p->subchannel_list_ || - sd->subchannel_list == p->latest_pending_subchannel_list_); - GPR_ASSERT(sd->pending_connectivity_state_unsafe != GRPC_CHANNEL_SHUTDOWN); - // Now that we're inside the combiner, copy the pending connectivity - // state (which was set by the connectivity state watcher) to - // curr_connectivity_state, which is what we use inside of the combiner. - sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; - // If the sd's new state is TRANSIENT_FAILURE, unref the *connected* - // subchannel, if any. - switch (sd->curr_connectivity_state) { - case GRPC_CHANNEL_TRANSIENT_FAILURE: { - sd->connected_subchannel.reset(); - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, - "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. " - "Requesting re-resolution", - p, sd->subchannel); - } - p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE); - break; + for (size_t i = 0; i < num_subchannels(); ++i) { + const size_t index = (i + last_ready_index_ + 1) % num_subchannels(); + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log( + GPR_INFO, + "[RR %p] checking subchannel %p, subchannel_list %p, index %" PRIuPTR + ": state=%s", + policy(), subchannel(index)->subchannel(), this, index, + grpc_connectivity_state_name( + subchannel(index)->connectivity_state())); } - case GRPC_CHANNEL_READY: { - if (sd->connected_subchannel == nullptr) { - sd->connected_subchannel = - grpc_subchannel_get_connected_subchannel(sd->subchannel); - } - if (sd->subchannel_list != p->subchannel_list_) { - // promote sd->subchannel_list to p->subchannel_list_. - // sd->subchannel_list must be equal to - // p->latest_pending_subchannel_list_ because we have already filtered - // for sds belonging to outdated subchannel lists. - GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list_); - GPR_ASSERT(!sd->subchannel_list->shutting_down); - if (grpc_lb_round_robin_trace.enabled()) { - const size_t num_subchannels = - p->subchannel_list_ != nullptr - ? p->subchannel_list_->num_subchannels - : 0; - gpr_log(GPR_DEBUG, - "[RR %p] phasing out subchannel list %p (size %" PRIuPTR - ") in favor of %p (size %" PRIuPTR ")", - p, p->subchannel_list_, num_subchannels, sd->subchannel_list, - num_subchannels); - } - if (p->subchannel_list_ != nullptr) { - // dispose of the current subchannel_list - grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list_, - "sl_phase_out_shutdown"); - } - p->subchannel_list_ = p->latest_pending_subchannel_list_; - p->latest_pending_subchannel_list_ = nullptr; - } - /* at this point we know there's at least one suitable subchannel. Go - * ahead and pick one and notify the pending suitors in - * p->pending_picks. This preemptively replicates rr_pick()'s actions. */ - const size_t next_ready_index = p->GetNextReadySubchannelIndexLocked(); - GPR_ASSERT(next_ready_index < p->subchannel_list_->num_subchannels); - grpc_lb_subchannel_data* selected = - &p->subchannel_list_->subchannels[next_ready_index]; - if (p->pending_picks_ != nullptr) { - // if the selected subchannel is going to be used for the pending - // picks, update the last picked pointer - p->UpdateLastReadySubchannelIndexLocked(next_ready_index); - } - PickState* pick; - while ((pick = p->pending_picks_)) { - p->pending_picks_ = pick->next; - pick->connected_subchannel = selected->connected_subchannel; - if (pick->user_data != nullptr) { - *pick->user_data = selected->user_data; - } - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, - "[RR %p] Fulfilling pending pick. Target <-- subchannel %p " - "(subchannel_list %p, index %" PRIuPTR ")", - p, selected->subchannel, p->subchannel_list_, - next_ready_index); - } - GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); + if (subchannel(index)->connectivity_state() == GRPC_CHANNEL_READY) { + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log(GPR_INFO, + "[RR %p] found next ready subchannel (%p) at index %" PRIuPTR + " of subchannel_list %p", + policy(), subchannel(index)->subchannel(), index, this); } - break; + return index; } - case GRPC_CHANNEL_SHUTDOWN: - GPR_UNREACHABLE_CODE(return ); - case GRPC_CHANNEL_CONNECTING: - case GRPC_CHANNEL_IDLE:; // fallthrough } - // Update state counters. - UpdateStateCountersLocked(sd); - // Only update connectivity based on the selected subchannel list. - if (sd->subchannel_list == p->subchannel_list_) { - p->UpdateConnectivityStatusLocked(sd, GRPC_ERROR_REF(error)); + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log(GPR_INFO, "[RR %p] no subchannels in ready state", this); + } + return num_subchannels(); +} + +// Sets last_ready_index_ to last_ready_index. +void RoundRobin::RoundRobinSubchannelList::UpdateLastReadySubchannelIndexLocked( + size_t last_ready_index) { + GPR_ASSERT(last_ready_index < num_subchannels()); + last_ready_index_ = last_ready_index; + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log(GPR_INFO, + "[RR %p] setting last_ready_subchannel_index=%" PRIuPTR + " (SC %p, CSC %p)", + policy(), last_ready_index, + subchannel(last_ready_index)->subchannel(), + subchannel(last_ready_index)->connected_subchannel()); } - // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(sd); } grpc_connectivity_state RoundRobin::CheckConnectivityLocked( @@ -555,11 +592,12 @@ void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current, void RoundRobin::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) { - const size_t next_ready_index = GetNextReadySubchannelIndexLocked(); - if (next_ready_index < subchannel_list_->num_subchannels) { - grpc_lb_subchannel_data* selected = - &subchannel_list_->subchannels[next_ready_index]; - selected->connected_subchannel->Ping(on_initiate, on_ack); + const size_t next_ready_index = + subchannel_list_->GetNextReadySubchannelIndexLocked(); + if (next_ready_index < subchannel_list_->num_subchannels()) { + RoundRobinSubchannelData* selected = + subchannel_list_->subchannel(next_ready_index); + selected->connected_subchannel()->Ping(on_initiate, on_ack); } else { GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Round Robin not connected")); @@ -570,7 +608,7 @@ void RoundRobin::PingOneLocked(grpc_closure* on_initiate, void RoundRobin::UpdateLocked(const grpc_channel_args& args) { const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); - if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { + if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) { gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this); // If we don't have a current subchannel list, go into TRANSIENT_FAILURE. // Otherwise, keep using the current subchannel list (ignore this update). @@ -582,80 +620,37 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args) { } return; } - grpc_lb_addresses* addresses = (grpc_lb_addresses*)arg->value.pointer.p; + grpc_lb_addresses* addresses = + static_cast<grpc_lb_addresses*>(arg->value.pointer.p); if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIuPTR " addresses", + gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses", this, addresses->num_addresses); } - grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create( - this, &grpc_lb_round_robin_trace, addresses, combiner(), - client_channel_factory(), args, &RoundRobin::OnConnectivityChangedLocked); - if (subchannel_list->num_subchannels == 0) { - grpc_connectivity_state_set( - &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), - "rr_update_empty"); - if (subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, - "sl_shutdown_empty_update"); + // Replace latest_pending_subchannel_list_. + if (latest_pending_subchannel_list_ != nullptr) { + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log(GPR_INFO, + "[RR %p] Shutting down previous pending subchannel list %p", this, + latest_pending_subchannel_list_.get()); } - subchannel_list_ = subchannel_list; // empty list - return; } - if (started_picking_) { - for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) { - const grpc_connectivity_state subchannel_state = - grpc_subchannel_check_connectivity( - subchannel_list->subchannels[i].subchannel, nullptr); - // Override the default setting of IDLE for connectivity notification - // purposes if the subchannel is already in transient failure. Otherwise - // we'd be immediately notified of the IDLE-TRANSIENT_FAILURE - // discrepancy, attempt to re-resolve and end up here again. - // TODO(roth): As part of C++-ifying the subchannel_list API, design a - // better API for notifying the LB policy of subchannel states, which can - // be used both for the subchannel's initial state and for subsequent - // state changes. This will allow us to handle this more generally instead - // of special-casing TRANSIENT_FAILURE (e.g., we can also distribute any - // pending picks across all READY subchannels rather than sending them all - // to the first one). - if (subchannel_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - subchannel_list->subchannels[i].pending_connectivity_state_unsafe = - subchannel_list->subchannels[i].curr_connectivity_state = - subchannel_list->subchannels[i].prev_connectivity_state = - subchannel_state; - --subchannel_list->num_idle; - ++subchannel_list->num_transient_failures; - } - } - if (latest_pending_subchannel_list_ != nullptr) { - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, - "[RR %p] Shutting down latest pending subchannel list %p, " - "about to be replaced by newer latest %p", - this, latest_pending_subchannel_list_, subchannel_list); - } - grpc_lb_subchannel_list_shutdown_and_unref( - latest_pending_subchannel_list_, "sl_outdated"); - } - latest_pending_subchannel_list_ = subchannel_list; - for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) { - /* Watch every new subchannel. A subchannel list becomes active the - * moment one of its subchannels is READY. At that moment, we swap - * p->subchannel_list for sd->subchannel_list, provided the subchannel - * list is still valid (ie, isn't shutting down) */ - SubchannelListRefForConnectivityWatch(subchannel_list, - "connectivity_watch"); - grpc_lb_subchannel_data_start_connectivity_watch( - &subchannel_list->subchannels[i]); + latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>( + this, &grpc_lb_round_robin_trace, addresses, combiner(), + client_channel_factory(), args); + // If we haven't started picking yet or the new list is empty, + // immediately promote the new list to the current list. + if (!started_picking_ || + latest_pending_subchannel_list_->num_subchannels() == 0) { + if (latest_pending_subchannel_list_->num_subchannels() == 0) { + grpc_connectivity_state_set( + &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), + "rr_update_empty"); } + subchannel_list_ = std::move(latest_pending_subchannel_list_); } else { - // The policy isn't picking yet. Save the update for later, disposing of - // previous version if any. - if (subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref( - subchannel_list_, "rr_update_before_started_picking"); - } - subchannel_list_ = subchannel_list; + // If we've started picking, start watching the new list. + latest_pending_subchannel_list_->StartWatchingLocked(); } } |