diff options
author | 2018-04-26 13:58:39 -0700 | |
---|---|---|
committer | 2018-04-26 13:58:39 -0700 | |
commit | 757cd4105504b5ba8b23acc199907cf27c5233a5 (patch) | |
tree | 6cdf66eda25b83df475869af4e73e19ee806c9c0 | |
parent | 88832144f44d73ff797d0028ab54b2973c217b9f (diff) |
Make SubchannelList internally ref counted.
3 files changed, 44 insertions, 73 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index be00e871b8..6506dc99d6 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -98,9 +98,9 @@ class PickFirst : public LoadBalancingPolicy { void DestroyUnselectedSubchannelsLocked(); // All our subchannels. - RefCountedPtr<PickFirstSubchannelList> subchannel_list_; + OrphanablePtr<PickFirstSubchannelList> subchannel_list_; // Latest pending subchannel list. - RefCountedPtr<PickFirstSubchannelList> latest_pending_subchannel_list_; + OrphanablePtr<PickFirstSubchannelList> latest_pending_subchannel_list_; // Selected subchannel in \a subchannel_list_. PickFirstSubchannelData* selected_ = nullptr; // Have we started picking? @@ -160,14 +160,8 @@ void PickFirst::ShutdownLocked() { } grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "shutdown"); - if (subchannel_list_ != nullptr) { - subchannel_list_->ShutdownLocked("pf_shutdown"); - subchannel_list_.reset(); - } - if (latest_pending_subchannel_list_ != nullptr) { - latest_pending_subchannel_list_->ShutdownLocked("pf_shutdown"); - latest_pending_subchannel_list_.reset(); - } + subchannel_list_.reset(); + latest_pending_subchannel_list_.reset(); TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_CANCELLED); GRPC_ERROR_UNREF(error); } @@ -300,7 +294,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { "Pick First %p received update with %" PRIuPTR " addresses", this, addresses->num_addresses); } - auto subchannel_list = MakeRefCounted<PickFirstSubchannelList>( + auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>( this, &grpc_lb_pick_first_trace, addresses, combiner(), client_channel_factory(), args); if (subchannel_list->num_subchannels() == 0) { @@ -310,9 +304,6 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), "pf_update_empty"); - if (subchannel_list_ != nullptr) { - subchannel_list_->ShutdownLocked("sl_shutdown_empty_update"); - } subchannel_list_ = std::move(subchannel_list); // Empty list. selected_ = nullptr; return; @@ -320,9 +311,6 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { if (selected_ == nullptr) { // We don't yet have a selected subchannel, so replace the current // subchannel list immediately. - if (subchannel_list_ != nullptr) { - subchannel_list_->ShutdownLocked("pf_update_before_selected"); - } subchannel_list_ = std::move(subchannel_list); // If we've started picking, start trying to connect to the first // subchannel in the new list. @@ -347,20 +335,13 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { sd->SetConnectedSubchannelFromLocked(selected_); } selected_ = sd; - if (subchannel_list_ != nullptr) { - subchannel_list_->ShutdownLocked("pf_update_includes_selected"); - } subchannel_list_ = std::move(subchannel_list); DestroyUnselectedSubchannelsLocked(); sd->StartOrRenewConnectivityWatchLocked(); // If there was a previously pending update (which may or may // not have contained the currently selected subchannel), drop // it, so that it doesn't override what we've done here. - if (latest_pending_subchannel_list_ != nullptr) { - latest_pending_subchannel_list_->ShutdownLocked( - "pf_update_includes_selected+outdated"); - latest_pending_subchannel_list_.reset(); - } + latest_pending_subchannel_list_.reset(); return; } } @@ -376,7 +357,6 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { this, latest_pending_subchannel_list_.get(), subchannel_list.get()); } - latest_pending_subchannel_list_->ShutdownLocked("sl_outdated_dont_smash"); } latest_pending_subchannel_list_ = std::move(subchannel_list); // If we've started picking, start trying to connect to the first @@ -404,8 +384,8 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( } // The notification must be for a subchannel in either the current or // latest pending subchannel lists. - GPR_ASSERT(p->subchannel_list_ == subchannel_list() || - p->latest_pending_subchannel_list_ == subchannel_list()); + GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() || + subchannel_list() == p->latest_pending_subchannel_list_.get()); // Handle updates for the currently selected subchannel. if (p->selected_ == this) { // If the new state is anything other than READY and there is a @@ -414,7 +394,6 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( p->latest_pending_subchannel_list_ != nullptr) { p->selected_ = nullptr; StopConnectivityWatchLocked(); - subchannel_list()->ShutdownLocked("selected_not_ready+switch_to_update"); p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); grpc_connectivity_state_set( &p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, @@ -460,9 +439,8 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( case GRPC_CHANNEL_READY: { // Case 2. Promote p->latest_pending_subchannel_list_ to // p->subchannel_list_. - if (p->latest_pending_subchannel_list_ == subchannel_list()) { + if (subchannel_list() == p->latest_pending_subchannel_list_.get()) { GPR_ASSERT(p->subchannel_list_ != nullptr); - p->subchannel_list_->ShutdownLocked("finish_update"); p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); } // Cases 1 and 2. @@ -502,7 +480,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( } while (sd->subchannel() == nullptr); // Case 1: Only set state to TRANSIENT_FAILURE if we've tried // all subchannels. - if (sd->Index() == 0 && p->subchannel_list_ == subchannel_list()) { + if (sd->Index() == 0 && subchannel_list() == p->subchannel_list_.get()) { grpc_connectivity_state_set( &p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "connecting_transient_failure"); @@ -513,7 +491,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE: { // Only update connectivity state in case 1. - if (p->subchannel_list_ == subchannel_list()) { + if (subchannel_list() == p->subchannel_list_.get()) { grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_REF(error), "connecting_changed"); diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 2b2c0e5132..a4bf3e7398 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -174,13 +174,13 @@ class RoundRobin : public LoadBalancingPolicy { void UpdateLastReadySubchannelIndexLocked(size_t last_ready_index); /** list of subchannels */ - RefCountedPtr<RoundRobinSubchannelList> subchannel_list_; + OrphanablePtr<RoundRobinSubchannelList> subchannel_list_; /** Latest version of the subchannel list. * Subchannel connectivity callbacks will only promote updated subchannel * lists if they equal \a latest_pending_subchannel_list. In other words, * racing callbacks that reference outdated subchannel lists won't perform any * update. */ - RefCountedPtr<RoundRobinSubchannelList> latest_pending_subchannel_list_; + OrphanablePtr<RoundRobinSubchannelList> latest_pending_subchannel_list_; /** have we started picking? */ bool started_picking_ = false; /** are we shutting down? */ @@ -303,14 +303,8 @@ void RoundRobin::ShutdownLocked() { } grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "rr_shutdown"); - if (subchannel_list_ != nullptr) { - subchannel_list_->ShutdownLocked("rr_shutdown"); - subchannel_list_.reset(); - } - if (latest_pending_subchannel_list_ != nullptr) { - latest_pending_subchannel_list_->ShutdownLocked("rr_shutdown"); - latest_pending_subchannel_list_.reset(); - } + subchannel_list_.reset(); + latest_pending_subchannel_list_.reset(); TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_CANCELLED); GRPC_ERROR_UNREF(error); } @@ -487,7 +481,7 @@ void RoundRobin::RoundRobinSubchannelList:: MaybeUpdateRoundRobinConnectivityStateLocked() { RoundRobin* p = static_cast<RoundRobin*>(policy()); // Only set connectivity state if this is the current subchannel list. - if (p->subchannel_list_ != this) return; + if (p->subchannel_list_.get() != this) return; /* In priority order. The first rule to match terminates the search (ie, if we * are on rule n, all previous rules were unfulfilled). * @@ -523,12 +517,12 @@ void RoundRobin::RoundRobinSubchannelList:: UpdateRoundRobinStateFromSubchannelStateCountsLocked() { RoundRobin* p = static_cast<RoundRobin*>(policy()); if (num_ready_ > 0) { - if (p->subchannel_list_ != this) { + if (p->subchannel_list_.get() != this) { // Promote this list to p->subchannel_list_. // This list must be p->latest_pending_subchannel_list_, because // any previous update would have been shut down already and // therefore weeded out in ProcessConnectivityChangeLocked(). - GPR_ASSERT(p->latest_pending_subchannel_list_ == this); + GPR_ASSERT(p->latest_pending_subchannel_list_.get() == this); GPR_ASSERT(!shutting_down()); if (grpc_lb_round_robin_trace.enabled()) { const size_t old_num_subchannels = @@ -541,10 +535,6 @@ void RoundRobin::RoundRobinSubchannelList:: p, p->subchannel_list_.get(), old_num_subchannels, this, num_subchannels()); } - if (p->subchannel_list_ != nullptr) { - // Dispose of the current subchannel_list. - p->subchannel_list_->ShutdownLocked("sl_phase_out_shutdown"); - } p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); p->last_ready_subchannel_index_ = -1; } @@ -652,9 +642,8 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args) { "[RR %p] Shutting down previous pending subchannel list %p", this, latest_pending_subchannel_list_.get()); } - latest_pending_subchannel_list_->ShutdownLocked("sl_outdated"); } - latest_pending_subchannel_list_ = MakeRefCounted<RoundRobinSubchannelList>( + latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>( this, &grpc_lb_round_robin_trace, addresses, combiner(), client_channel_factory(), args); // If we haven't started picking yet or the new list is empty, @@ -667,9 +656,6 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args) { GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), "rr_update_empty"); } - if (subchannel_list_ != nullptr) { - subchannel_list_->ShutdownLocked("sl_shutdown_replace_on_update"); - } subchannel_list_ = std::move(latest_pending_subchannel_list_); last_ready_subchannel_index_ = -1; } else { diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 8746678041..5fb92e22f4 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -31,6 +31,7 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/abstract.h" #include "src/core/lib/gprpp/inlined_vector.h" +#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/closure.h" @@ -118,6 +119,7 @@ class SubchannelData { pending_connectivity_state_unsafe_ = grpc_subchannel_check_connectivity(subchannel(), &error); UpdateConnectedSubchannelLocked(); +// FIXME: move the rest of this into RR if (pending_connectivity_state_unsafe_ != curr_connectivity_state_) { curr_connectivity_state_ = pending_connectivity_state_unsafe_; ProcessConnectivityChangeLocked(error); @@ -148,7 +150,7 @@ class SubchannelData { void CancelConnectivityWatchLocked(const char* reason); // Cancels any pending connectivity watch and unrefs the subchannel. - void ShutdownLocked(const char* reason); + void ShutdownLocked(); GRPC_ABSTRACT_BASE_CLASS @@ -199,11 +201,9 @@ class SubchannelData { }; // A list of subchannels. -// FIXME: make this InternallyRefCounted, and have Orphan() do -// ShutdownLocked()? -// (also, maybe we don't need to take a ref to the LB policy anymore?) template <typename SubchannelListType, typename SubchannelDataType> -class SubchannelList : public RefCountedWithTracing<SubchannelListType> { +class SubchannelList + : public InternallyRefCountedWithTracing<SubchannelListType> { public: typedef InlinedVector<SubchannelDataType, 10> SubchannelVector; @@ -213,9 +213,6 @@ class SubchannelList : public RefCountedWithTracing<SubchannelListType> { // The data for the subchannel at a particular index. SubchannelDataType* subchannel(size_t index) { return &subchannels_[index]; } - // Marks the subchannel_list as discarded. Unsubscribes all its subchannels. - void ShutdownLocked(const char* reason); - // Returns true if the subchannel list is shutting down. bool shutting_down() const { return shutting_down_; } @@ -223,6 +220,13 @@ class SubchannelList : public RefCountedWithTracing<SubchannelListType> { LoadBalancingPolicy* policy() const { return policy_; } TraceFlag* tracer() const { return tracer_; } + // Note: Caller must ensure that this is invoked inside of the combiner. + void Orphan() override { + ShutdownLocked(); + InternallyRefCountedWithTracing<SubchannelListType>::Unref(DEBUG_LOCATION, + "shutdown"); + } + GRPC_ABSTRACT_BASE_CLASS protected: @@ -238,6 +242,11 @@ class SubchannelList : public RefCountedWithTracing<SubchannelListType> { template <typename T, typename... Args> friend T* New(Args&&... args); + // For accessing Ref() and Unref(). + friend class SubchannelData<SubchannelListType, SubchannelDataType>; + + void ShutdownLocked(); + // Backpointer to owning policy. LoadBalancingPolicy* policy_; @@ -430,15 +439,14 @@ void SubchannelData<SubchannelListType, SubchannelDataType>:: } template <typename SubchannelListType, typename SubchannelDataType> -void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked( - const char* reason) { +void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked() { // If there's a pending notification for this subchannel, cancel it; // the callback is responsible for unreffing the subchannel. // Otherwise, unref the subchannel directly. if (connectivity_notification_pending_) { - CancelConnectivityWatchLocked(reason); + CancelConnectivityWatchLocked("shutdown"); } else if (subchannel_ != nullptr) { - UnrefSubchannelLocked(reason); + UnrefSubchannelLocked("shutdown"); } } @@ -452,7 +460,7 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList( const grpc_lb_addresses* addresses, grpc_combiner* combiner, grpc_client_channel_factory* client_channel_factory, const grpc_channel_args& args) - : RefCountedWithTracing<SubchannelListType>(tracer), + : InternallyRefCountedWithTracing<SubchannelListType>(tracer), policy_(policy), tracer_(tracer) { if (tracer_->enabled()) { @@ -518,17 +526,16 @@ SubchannelList<SubchannelListType, SubchannelDataType>::~SubchannelList() { } template <typename SubchannelListType, typename SubchannelDataType> -void SubchannelList<SubchannelListType, SubchannelDataType>::ShutdownLocked( - const char* reason) { +void SubchannelList<SubchannelListType, SubchannelDataType>::ShutdownLocked() { if (tracer_->enabled()) { - gpr_log(GPR_DEBUG, "[%s %p] Shutting down subchannel_list %p (%s)", - tracer_->name(), policy_, this, reason); + gpr_log(GPR_DEBUG, "[%s %p] Shutting down subchannel_list %p", + tracer_->name(), policy_, this); } GPR_ASSERT(!shutting_down_); shutting_down_ = true; for (size_t i = 0; i < subchannels_.size(); i++) { SubchannelDataType* sd = &subchannels_[i]; - sd->ShutdownLocked(reason); + sd->ShutdownLocked(); } } |