aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2018-04-26 13:58:39 -0700
committerGravatar Mark D. Roth <roth@google.com>2018-04-26 13:58:39 -0700
commit757cd4105504b5ba8b23acc199907cf27c5233a5 (patch)
tree6cdf66eda25b83df475869af4e73e19ee806c9c0
parent88832144f44d73ff797d0028ab54b2973c217b9f (diff)
Make SubchannelList internally ref counted.
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc44
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc30
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h43
3 files changed, 44 insertions, 73 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index be00e871b8..6506dc99d6 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -98,9 +98,9 @@ class PickFirst : public LoadBalancingPolicy {
void DestroyUnselectedSubchannelsLocked();
// All our subchannels.
- RefCountedPtr<PickFirstSubchannelList> subchannel_list_;
+ OrphanablePtr<PickFirstSubchannelList> subchannel_list_;
// Latest pending subchannel list.
- RefCountedPtr<PickFirstSubchannelList> latest_pending_subchannel_list_;
+ OrphanablePtr<PickFirstSubchannelList> latest_pending_subchannel_list_;
// Selected subchannel in \a subchannel_list_.
PickFirstSubchannelData* selected_ = nullptr;
// Have we started picking?
@@ -160,14 +160,8 @@ void PickFirst::ShutdownLocked() {
}
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "shutdown");
- if (subchannel_list_ != nullptr) {
- subchannel_list_->ShutdownLocked("pf_shutdown");
- subchannel_list_.reset();
- }
- if (latest_pending_subchannel_list_ != nullptr) {
- latest_pending_subchannel_list_->ShutdownLocked("pf_shutdown");
- latest_pending_subchannel_list_.reset();
- }
+ subchannel_list_.reset();
+ latest_pending_subchannel_list_.reset();
TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_CANCELLED);
GRPC_ERROR_UNREF(error);
}
@@ -300,7 +294,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
"Pick First %p received update with %" PRIuPTR " addresses", this,
addresses->num_addresses);
}
- auto subchannel_list = MakeRefCounted<PickFirstSubchannelList>(
+ auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>(
this, &grpc_lb_pick_first_trace, addresses, combiner(),
client_channel_factory(), args);
if (subchannel_list->num_subchannels() == 0) {
@@ -310,9 +304,6 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
"pf_update_empty");
- if (subchannel_list_ != nullptr) {
- subchannel_list_->ShutdownLocked("sl_shutdown_empty_update");
- }
subchannel_list_ = std::move(subchannel_list); // Empty list.
selected_ = nullptr;
return;
@@ -320,9 +311,6 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
if (selected_ == nullptr) {
// We don't yet have a selected subchannel, so replace the current
// subchannel list immediately.
- if (subchannel_list_ != nullptr) {
- subchannel_list_->ShutdownLocked("pf_update_before_selected");
- }
subchannel_list_ = std::move(subchannel_list);
// If we've started picking, start trying to connect to the first
// subchannel in the new list.
@@ -347,20 +335,13 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
sd->SetConnectedSubchannelFromLocked(selected_);
}
selected_ = sd;
- if (subchannel_list_ != nullptr) {
- subchannel_list_->ShutdownLocked("pf_update_includes_selected");
- }
subchannel_list_ = std::move(subchannel_list);
DestroyUnselectedSubchannelsLocked();
sd->StartOrRenewConnectivityWatchLocked();
// If there was a previously pending update (which may or may
// not have contained the currently selected subchannel), drop
// it, so that it doesn't override what we've done here.
- if (latest_pending_subchannel_list_ != nullptr) {
- latest_pending_subchannel_list_->ShutdownLocked(
- "pf_update_includes_selected+outdated");
- latest_pending_subchannel_list_.reset();
- }
+ latest_pending_subchannel_list_.reset();
return;
}
}
@@ -376,7 +357,6 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
this, latest_pending_subchannel_list_.get(),
subchannel_list.get());
}
- latest_pending_subchannel_list_->ShutdownLocked("sl_outdated_dont_smash");
}
latest_pending_subchannel_list_ = std::move(subchannel_list);
// If we've started picking, start trying to connect to the first
@@ -404,8 +384,8 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
}
// The notification must be for a subchannel in either the current or
// latest pending subchannel lists.
- GPR_ASSERT(p->subchannel_list_ == subchannel_list() ||
- p->latest_pending_subchannel_list_ == subchannel_list());
+ GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
+ subchannel_list() == p->latest_pending_subchannel_list_.get());
// Handle updates for the currently selected subchannel.
if (p->selected_ == this) {
// If the new state is anything other than READY and there is a
@@ -414,7 +394,6 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
p->latest_pending_subchannel_list_ != nullptr) {
p->selected_ = nullptr;
StopConnectivityWatchLocked();
- subchannel_list()->ShutdownLocked("selected_not_ready+switch_to_update");
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
grpc_connectivity_state_set(
&p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
@@ -460,9 +439,8 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
case GRPC_CHANNEL_READY: {
// Case 2. Promote p->latest_pending_subchannel_list_ to
// p->subchannel_list_.
- if (p->latest_pending_subchannel_list_ == subchannel_list()) {
+ if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
GPR_ASSERT(p->subchannel_list_ != nullptr);
- p->subchannel_list_->ShutdownLocked("finish_update");
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
}
// Cases 1 and 2.
@@ -502,7 +480,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
} while (sd->subchannel() == nullptr);
// Case 1: Only set state to TRANSIENT_FAILURE if we've tried
// all subchannels.
- if (sd->Index() == 0 && p->subchannel_list_ == subchannel_list()) {
+ if (sd->Index() == 0 && subchannel_list() == p->subchannel_list_.get()) {
grpc_connectivity_state_set(
&p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "connecting_transient_failure");
@@ -513,7 +491,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE: {
// Only update connectivity state in case 1.
- if (p->subchannel_list_ == subchannel_list()) {
+ if (subchannel_list() == p->subchannel_list_.get()) {
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_REF(error),
"connecting_changed");
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 2b2c0e5132..a4bf3e7398 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -174,13 +174,13 @@ class RoundRobin : public LoadBalancingPolicy {
void UpdateLastReadySubchannelIndexLocked(size_t last_ready_index);
/** list of subchannels */
- RefCountedPtr<RoundRobinSubchannelList> subchannel_list_;
+ OrphanablePtr<RoundRobinSubchannelList> subchannel_list_;
/** Latest version of the subchannel list.
* Subchannel connectivity callbacks will only promote updated subchannel
* lists if they equal \a latest_pending_subchannel_list. In other words,
* racing callbacks that reference outdated subchannel lists won't perform any
* update. */
- RefCountedPtr<RoundRobinSubchannelList> latest_pending_subchannel_list_;
+ OrphanablePtr<RoundRobinSubchannelList> latest_pending_subchannel_list_;
/** have we started picking? */
bool started_picking_ = false;
/** are we shutting down? */
@@ -303,14 +303,8 @@ void RoundRobin::ShutdownLocked() {
}
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "rr_shutdown");
- if (subchannel_list_ != nullptr) {
- subchannel_list_->ShutdownLocked("rr_shutdown");
- subchannel_list_.reset();
- }
- if (latest_pending_subchannel_list_ != nullptr) {
- latest_pending_subchannel_list_->ShutdownLocked("rr_shutdown");
- latest_pending_subchannel_list_.reset();
- }
+ subchannel_list_.reset();
+ latest_pending_subchannel_list_.reset();
TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_CANCELLED);
GRPC_ERROR_UNREF(error);
}
@@ -487,7 +481,7 @@ void RoundRobin::RoundRobinSubchannelList::
MaybeUpdateRoundRobinConnectivityStateLocked() {
RoundRobin* p = static_cast<RoundRobin*>(policy());
// Only set connectivity state if this is the current subchannel list.
- if (p->subchannel_list_ != this) return;
+ if (p->subchannel_list_.get() != this) return;
/* In priority order. The first rule to match terminates the search (ie, if we
* are on rule n, all previous rules were unfulfilled).
*
@@ -523,12 +517,12 @@ void RoundRobin::RoundRobinSubchannelList::
UpdateRoundRobinStateFromSubchannelStateCountsLocked() {
RoundRobin* p = static_cast<RoundRobin*>(policy());
if (num_ready_ > 0) {
- if (p->subchannel_list_ != this) {
+ if (p->subchannel_list_.get() != this) {
// Promote this list to p->subchannel_list_.
// This list must be p->latest_pending_subchannel_list_, because
// any previous update would have been shut down already and
// therefore weeded out in ProcessConnectivityChangeLocked().
- GPR_ASSERT(p->latest_pending_subchannel_list_ == this);
+ GPR_ASSERT(p->latest_pending_subchannel_list_.get() == this);
GPR_ASSERT(!shutting_down());
if (grpc_lb_round_robin_trace.enabled()) {
const size_t old_num_subchannels =
@@ -541,10 +535,6 @@ void RoundRobin::RoundRobinSubchannelList::
p, p->subchannel_list_.get(), old_num_subchannels, this,
num_subchannels());
}
- if (p->subchannel_list_ != nullptr) {
- // Dispose of the current subchannel_list.
- p->subchannel_list_->ShutdownLocked("sl_phase_out_shutdown");
- }
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
p->last_ready_subchannel_index_ = -1;
}
@@ -652,9 +642,8 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args) {
"[RR %p] Shutting down previous pending subchannel list %p", this,
latest_pending_subchannel_list_.get());
}
- latest_pending_subchannel_list_->ShutdownLocked("sl_outdated");
}
- latest_pending_subchannel_list_ = MakeRefCounted<RoundRobinSubchannelList>(
+ latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>(
this, &grpc_lb_round_robin_trace, addresses, combiner(),
client_channel_factory(), args);
// If we haven't started picking yet or the new list is empty,
@@ -667,9 +656,6 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args) {
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
"rr_update_empty");
}
- if (subchannel_list_ != nullptr) {
- subchannel_list_->ShutdownLocked("sl_shutdown_replace_on_update");
- }
subchannel_list_ = std::move(latest_pending_subchannel_list_);
last_ready_subchannel_index_ = -1;
} else {
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index 8746678041..5fb92e22f4 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -31,6 +31,7 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/abstract.h"
#include "src/core/lib/gprpp/inlined_vector.h"
+#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/closure.h"
@@ -118,6 +119,7 @@ class SubchannelData {
pending_connectivity_state_unsafe_ =
grpc_subchannel_check_connectivity(subchannel(), &error);
UpdateConnectedSubchannelLocked();
+// FIXME: move the rest of this into RR
if (pending_connectivity_state_unsafe_ != curr_connectivity_state_) {
curr_connectivity_state_ = pending_connectivity_state_unsafe_;
ProcessConnectivityChangeLocked(error);
@@ -148,7 +150,7 @@ class SubchannelData {
void CancelConnectivityWatchLocked(const char* reason);
// Cancels any pending connectivity watch and unrefs the subchannel.
- void ShutdownLocked(const char* reason);
+ void ShutdownLocked();
GRPC_ABSTRACT_BASE_CLASS
@@ -199,11 +201,9 @@ class SubchannelData {
};
// A list of subchannels.
-// FIXME: make this InternallyRefCounted, and have Orphan() do
-// ShutdownLocked()?
-// (also, maybe we don't need to take a ref to the LB policy anymore?)
template <typename SubchannelListType, typename SubchannelDataType>
-class SubchannelList : public RefCountedWithTracing<SubchannelListType> {
+class SubchannelList
+ : public InternallyRefCountedWithTracing<SubchannelListType> {
public:
typedef InlinedVector<SubchannelDataType, 10> SubchannelVector;
@@ -213,9 +213,6 @@ class SubchannelList : public RefCountedWithTracing<SubchannelListType> {
// The data for the subchannel at a particular index.
SubchannelDataType* subchannel(size_t index) { return &subchannels_[index]; }
- // Marks the subchannel_list as discarded. Unsubscribes all its subchannels.
- void ShutdownLocked(const char* reason);
-
// Returns true if the subchannel list is shutting down.
bool shutting_down() const { return shutting_down_; }
@@ -223,6 +220,13 @@ class SubchannelList : public RefCountedWithTracing<SubchannelListType> {
LoadBalancingPolicy* policy() const { return policy_; }
TraceFlag* tracer() const { return tracer_; }
+ // Note: Caller must ensure that this is invoked inside of the combiner.
+ void Orphan() override {
+ ShutdownLocked();
+ InternallyRefCountedWithTracing<SubchannelListType>::Unref(DEBUG_LOCATION,
+ "shutdown");
+ }
+
GRPC_ABSTRACT_BASE_CLASS
protected:
@@ -238,6 +242,11 @@ class SubchannelList : public RefCountedWithTracing<SubchannelListType> {
template <typename T, typename... Args>
friend T* New(Args&&... args);
+ // For accessing Ref() and Unref().
+ friend class SubchannelData<SubchannelListType, SubchannelDataType>;
+
+ void ShutdownLocked();
+
// Backpointer to owning policy.
LoadBalancingPolicy* policy_;
@@ -430,15 +439,14 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::
}
template <typename SubchannelListType, typename SubchannelDataType>
-void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked(
- const char* reason) {
+void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
// If there's a pending notification for this subchannel, cancel it;
// the callback is responsible for unreffing the subchannel.
// Otherwise, unref the subchannel directly.
if (connectivity_notification_pending_) {
- CancelConnectivityWatchLocked(reason);
+ CancelConnectivityWatchLocked("shutdown");
} else if (subchannel_ != nullptr) {
- UnrefSubchannelLocked(reason);
+ UnrefSubchannelLocked("shutdown");
}
}
@@ -452,7 +460,7 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
const grpc_lb_addresses* addresses, grpc_combiner* combiner,
grpc_client_channel_factory* client_channel_factory,
const grpc_channel_args& args)
- : RefCountedWithTracing<SubchannelListType>(tracer),
+ : InternallyRefCountedWithTracing<SubchannelListType>(tracer),
policy_(policy),
tracer_(tracer) {
if (tracer_->enabled()) {
@@ -518,17 +526,16 @@ SubchannelList<SubchannelListType, SubchannelDataType>::~SubchannelList() {
}
template <typename SubchannelListType, typename SubchannelDataType>
-void SubchannelList<SubchannelListType, SubchannelDataType>::ShutdownLocked(
- const char* reason) {
+void SubchannelList<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
if (tracer_->enabled()) {
- gpr_log(GPR_DEBUG, "[%s %p] Shutting down subchannel_list %p (%s)",
- tracer_->name(), policy_, this, reason);
+ gpr_log(GPR_DEBUG, "[%s %p] Shutting down subchannel_list %p",
+ tracer_->name(), policy_, this);
}
GPR_ASSERT(!shutting_down_);
shutting_down_ = true;
for (size_t i = 0; i < subchannels_.size(); i++) {
SubchannelDataType* sd = &subchannels_[i];
- sd->ShutdownLocked(reason);
+ sd->ShutdownLocked();
}
}