aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2018-04-25 12:39:45 -0700
committerGravatar Mark D. Roth <roth@google.com>2018-04-25 12:41:00 -0700
commitb1c1309bfcbe7d8834965169d9e527528d477c2e (patch)
treeba4412d59ea665203fe309cc069f3943376a7ac8 /src/core/ext/filters/client_channel
parent253358da4283d8bec9292c197fcf24935d790986 (diff)
Clean up refcounting.
Diffstat (limited to 'src/core/ext/filters/client_channel')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc66
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc40
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h29
3 files changed, 33 insertions, 102 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 24a0c83b1a..0883a4ed6c 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -90,9 +90,6 @@ class PickFirst : public LoadBalancingPolicy {
const grpc_channel_args& args)
: SubchannelList(policy, tracer, addresses, combiner,
client_channel_factory, args) {}
-
- void RefForConnectivityWatch(const char* reason);
- void UnrefForConnectivityWatch(const char* reason);
};
void ShutdownLocked() override;
@@ -220,9 +217,7 @@ void PickFirst::StartPickingLocked() {
if (subchannel_list_ != nullptr) {
for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
if (subchannel_list_->subchannel(i)->subchannel() != nullptr) {
- subchannel_list_->RefForConnectivityWatch(
- "connectivity_watch+start_picking");
- subchannel_list_->subchannel(i)->StartConnectivityWatchLocked();
+ subchannel_list_->subchannel(i)->StartOrRenewConnectivityWatchLocked();
break;
}
}
@@ -332,8 +327,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
// If we've started picking, start trying to connect to the first
// subchannel in the new list.
if (started_picking_) {
- subchannel_list_->RefForConnectivityWatch("connectivity_watch+update");
- subchannel_list_->subchannel(0)->StartConnectivityWatchLocked();
+ subchannel_list_->subchannel(0)->StartOrRenewConnectivityWatchLocked();
}
} else {
// We do have a selected subchannel.
@@ -359,9 +353,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
}
subchannel_list_ = std::move(subchannel_list);
DestroyUnselectedSubchannelsLocked();
- subchannel_list_->RefForConnectivityWatch(
- "connectivity_watch+replace_selected");
- sd->StartConnectivityWatchLocked();
+ sd->StartOrRenewConnectivityWatchLocked();
// If there was a previously pending update (which may or may
// not have contained the currently selected subchannel), drop
// it, so that it doesn't override what we've done here.
@@ -391,35 +383,12 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
// If we've started picking, start trying to connect to the first
// subchannel in the new list.
if (started_picking_) {
- latest_pending_subchannel_list_->RefForConnectivityWatch(
- "connectivity_watch+update");
latest_pending_subchannel_list_->subchannel(0)
- ->StartConnectivityWatchLocked();
+ ->StartOrRenewConnectivityWatchLocked();
}
}
}
-void PickFirst::PickFirstSubchannelList::RefForConnectivityWatch(
- const char* reason) {
- // TODO(roth): We currently track these refs manually. Once the new
- // ClosureRef API is ready, find a way to pass the RefCountedPtr<>
- // along with the closures instead of doing this manually.
- // Ref subchannel list.
- Ref(DEBUG_LOCATION, reason).release();
- // Ref LB policy.
- PickFirst* p = static_cast<PickFirst*>(policy());
- p->Ref(DEBUG_LOCATION, reason).release();
-}
-
-void PickFirst::PickFirstSubchannelList::UnrefForConnectivityWatch(
- const char* reason) {
- // Unref LB policy.
- PickFirst* p = static_cast<PickFirst*>(policy());
- p->Unref(DEBUG_LOCATION, reason);
- // Unref subchannel list.
- Unref(DEBUG_LOCATION, reason);
-}
-
void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
grpc_error* error) {
PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
@@ -434,17 +403,8 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
grpc_connectivity_state_name(connectivity_state()), p->shutdown_,
subchannel_list()->shutting_down(), grpc_error_string(error));
}
-// FIXME: move this to SubchannelData::OnConnectivityChangedLocked()
- // If the subchannel list is shutting down, stop watching.
- if (subchannel_list()->shutting_down() || error == GRPC_ERROR_CANCELLED) {
- StopConnectivityWatchLocked();
- UnrefSubchannelLocked("pf_sl_shutdown");
- subchannel_list()->UnrefForConnectivityWatch("pf_sl_shutdown");
- GRPC_ERROR_UNREF(error);
- return;
- }
- // If we're still here, the notification must be for a subchannel in
- // either the current or latest pending subchannel lists.
+ // The notification must be for a subchannel in either the current or
+ // latest pending subchannel lists.
GPR_ASSERT(p->subchannel_list_ == subchannel_list() ||
p->latest_pending_subchannel_list_ == subchannel_list());
// Handle updates for the currently selected subchannel.
@@ -455,8 +415,6 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
p->latest_pending_subchannel_list_ != nullptr) {
p->selected_ = nullptr;
StopConnectivityWatchLocked();
- subchannel_list()->UnrefForConnectivityWatch(
- "selected_not_ready+switch_to_update");
subchannel_list()->ShutdownLocked("selected_not_ready+switch_to_update");
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
grpc_connectivity_state_set(
@@ -478,14 +436,13 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
// In transient failure. Rely on re-resolution to recover.
p->selected_ = nullptr;
- StopConnectivityWatchLocked();
- subchannel_list()->UnrefForConnectivityWatch("pf_selected_shutdown");
UnrefSubchannelLocked("pf_selected_shutdown");
+ StopConnectivityWatchLocked();
} else {
grpc_connectivity_state_set(&p->state_tracker_, connectivity_state(),
GRPC_ERROR_REF(error), "selected_changed");
// Renew notification.
- StartConnectivityWatchLocked();
+ StartOrRenewConnectivityWatchLocked();
}
}
GRPC_ERROR_UNREF(error);
@@ -533,7 +490,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
// Renew notification.
- StartConnectivityWatchLocked();
+ StartOrRenewConnectivityWatchLocked();
break;
}
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
@@ -551,8 +508,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
&p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "connecting_transient_failure");
}
- // Reuses the connectivity refs from the previous watch.
- sd->StartConnectivityWatchLocked();
+ sd->StartOrRenewConnectivityWatchLocked();
break;
}
case GRPC_CHANNEL_CONNECTING:
@@ -564,7 +520,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
"connecting_changed");
}
// Renew notification.
- StartConnectivityWatchLocked();
+ StartOrRenewConnectivityWatchLocked();
break;
}
case GRPC_CHANNEL_SHUTDOWN:
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 889616e056..2b2c0e5132 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -134,10 +134,6 @@ class RoundRobin : public LoadBalancingPolicy {
GRPC_ERROR_UNREF(last_transient_failure_error_);
}
- // Manages references for connectivity watches.
- void RefForConnectivityWatch(const char* reason);
- void UnrefForConnectivityWatch(const char* reason);
-
// Starts watching the subchannels in this list.
void StartWatchingLocked();
@@ -377,6 +373,7 @@ bool RoundRobin::DoPickLocked(PickState* pick) {
/* readily available, report right away */
RoundRobinSubchannelData* sd =
subchannel_list_->subchannel(next_ready_index);
+ GPR_ASSERT(sd->connected_subchannel() != nullptr);
pick->connected_subchannel = sd->connected_subchannel()->Ref();
if (pick->user_data != nullptr) {
*pick->user_data = sd->user_data();
@@ -422,27 +419,6 @@ bool RoundRobin::PickLocked(PickState* pick) {
return false;
}
-void RoundRobin::RoundRobinSubchannelList::RefForConnectivityWatch(
- const char* reason) {
- // TODO(roth): We currently track these refs manually. Once the new
- // ClosureRef API is ready, find a way to pass the RefCountedPtr<>
- // along with the closures instead of doing this manually.
- // Ref subchannel list.
- Ref(DEBUG_LOCATION, reason).release();
- // Ref LB policy.
- RoundRobin* p = static_cast<RoundRobin*>(policy());
- p->Ref(DEBUG_LOCATION, reason).release();
-}
-
-void RoundRobin::RoundRobinSubchannelList::UnrefForConnectivityWatch(
- const char* reason) {
- // Unref LB policy.
- RoundRobin* p = static_cast<RoundRobin*>(policy());
- p->Unref(DEBUG_LOCATION, reason);
- // Unref subchannel list.
- Unref(DEBUG_LOCATION, reason);
-}
-
void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
if (num_subchannels() == 0) return;
// Check current state of each subchannel synchronously, since any
@@ -474,8 +450,7 @@ void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
// Start connectivity watch for each subchannel.
for (size_t i = 0; i < num_subchannels(); i++) {
if (subchannel(i)->subchannel() != nullptr) {
- RefForConnectivityWatch("connectivity_watch");
- subchannel(i)->StartConnectivityWatchLocked();
+ subchannel(i)->StartOrRenewConnectivityWatchLocked();
}
}
}
@@ -597,15 +572,6 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
subchannel_list()->shutting_down(), grpc_error_string(error));
}
GPR_ASSERT(subchannel() != nullptr);
-// FIXME: move this to SubchannelData::OnConnectivityChangedLocked()
- // If the subchannel list is shutting down, stop watching.
- if (subchannel_list()->shutting_down() || error == GRPC_ERROR_CANCELLED) {
- StopConnectivityWatchLocked();
- UnrefSubchannelLocked("rr_sl_shutdown");
- subchannel_list()->UnrefForConnectivityWatch("rr_sl_shutdown");
- GRPC_ERROR_UNREF(error);
- return;
- }
// If the new state is TRANSIENT_FAILURE, re-resolve.
// Only do this if we've started watching, not at startup time.
// Otherwise, if the subchannel was already in state TRANSIENT_FAILURE
@@ -628,7 +594,7 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
// If we've started watching, update overall state and renew notification.
if (subchannel_list()->started_watching()) {
subchannel_list()->UpdateRoundRobinStateFromSubchannelStateCountsLocked();
- StartConnectivityWatchLocked();
+ StartOrRenewConnectivityWatchLocked();
}
GRPC_ERROR_UNREF(error);
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index b88719b747..8b843b16c0 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -109,8 +109,8 @@ class SubchannelData {
// Synchronously checks the subchannel's connectivity state. Calls
// ProcessConnectivityChangeLocked() if the state has changed.
// Must not be called while there is a connectivity notification
- // pending (i.e., between calling StartConnectivityWatchLocked() and
- // the resulting invocation of ProcessConnectivityChangeLocked()).
+ // pending (i.e., between calling StartOrRenewConnectivityWatchLocked()
+ // and the resulting invocation of ProcessConnectivityChangeLocked()).
void CheckConnectivityStateLocked() {
GPR_ASSERT(!connectivity_notification_pending_);
grpc_error* error = GRPC_ERROR_NONE;
@@ -133,15 +133,15 @@ class SubchannelData {
// Starts or renewes watching the connectivity state of the subchannel.
// ProcessConnectivityChangeLocked() will be called when the
// connectivity state changes.
- void StartConnectivityWatchLocked();
+ void StartOrRenewConnectivityWatchLocked();
// Stops watching the connectivity state of the subchannel.
void StopConnectivityWatchLocked();
// Cancels watching the connectivity state of the subchannel.
// Must be called only while there is a connectivity notification
- // pending (i.e., between calling StartConnectivityWatchLocked() and
- // the resulting invocation of ProcessConnectivityChangeLocked()).
+ // pending (i.e., between calling StartOrRenewConnectivityWatchLocked()
+ // and the resulting invocation of ProcessConnectivityChangeLocked()).
// From within ProcessConnectivityChangeLocked(), use
// StopConnectivityWatchLocked() instead.
void CancelConnectivityWatchLocked(const char* reason);
@@ -159,8 +159,8 @@ class SubchannelData {
virtual ~SubchannelData();
- // After StartConnectivityWatchLocked() is called, this method will be
- // invoked when the subchannel's connectivity state changes.
+ // After StartOrRenewConnectivityWatchLocked() is called, this method
+ // will be invoked when the subchannel's connectivity state changes.
// Implementations can use connectivity_state() to get the new
// connectivity state.
// Implementations must invoke either StopConnectivityWatch() or again
@@ -302,7 +302,7 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::
template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelData<SubchannelListType,
- SubchannelDataType>::StartConnectivityWatchLocked() {
+ SubchannelDataType>::StartOrRenewConnectivityWatchLocked() {
if (subchannel_list_->tracer()->enabled()) {
gpr_log(GPR_DEBUG,
"[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
@@ -313,7 +313,10 @@ void SubchannelData<SubchannelListType,
subchannel_,
grpc_connectivity_state_name(pending_connectivity_state_unsafe_));
}
- connectivity_notification_pending_ = true;
+ if (!connectivity_notification_pending_) {
+ subchannel_list()->Ref(DEBUG_LOCATION, "connectivity_watch").release();
+ connectivity_notification_pending_ = true;
+ }
grpc_subchannel_notify_on_state_change(
subchannel_, subchannel_list_->policy()->interested_parties(),
&pending_connectivity_state_unsafe_, &connectivity_changed_closure_);
@@ -332,6 +335,7 @@ void SubchannelData<SubchannelListType,
}
GPR_ASSERT(connectivity_notification_pending_);
connectivity_notification_pending_ = false;
+ subchannel_list()->Unref(DEBUG_LOCATION, "connectivity_watch");
}
template <typename SubchannelListType, typename SubchannelDataType>
@@ -387,9 +391,14 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::
OnConnectivityChangedLocked(void* arg, grpc_error* error) {
SubchannelData* sd = static_cast<SubchannelData*>(arg);
// FIXME: add trace logging
+ if (sd->subchannel_list()->shutting_down() || error == GRPC_ERROR_CANCELLED) {
+ sd->UnrefSubchannelLocked("connectivity_shutdown");
+ sd->StopConnectivityWatchLocked();
+ return;
+ }
if (!sd->UpdateConnectedSubchannelLocked()) {
// We don't want to report this connectivity state, so renew the watch.
- sd->StartConnectivityWatchLocked();
+ sd->StartOrRenewConnectivityWatchLocked();
return;
}
// Now that we're inside the combiner, copy the pending connectivity