aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2018-04-12 15:08:36 -0700
committerGravatar Mark D. Roth <roth@google.com>2018-04-12 15:08:36 -0700
commit542bceb573b4c52a883f95ff240c6aba473790bc (patch)
tree7660eb1fc119fed412e36c866f7e3001ded3e555
parent75d9edab09f90d70dee2483feffe3e465d870bf1 (diff)
Fix race between READY notification and reffing connected subchannel.
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc3
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc41
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h43
3 files changed, 49 insertions, 38 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 03e5c89281..24a0c83b1a 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -350,6 +350,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
subchannel_list->num_subchannels());
}
if (selected_->connected_subchannel() != nullptr) {
+// FIXME: restructure to work more like RR?
sd->SetConnectedSubchannelFromLocked(selected_);
}
selected_ = sd;
@@ -433,6 +434,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
grpc_connectivity_state_name(connectivity_state()), p->shutdown_,
subchannel_list()->shutting_down(), grpc_error_string(error));
}
+// FIXME: move this to SubchannelData::OnConnectivityChangedLocked()
// If the subchannel list is shutting down, stop watching.
if (subchannel_list()->shutting_down() || error == GRPC_ERROR_CANCELLED) {
StopConnectivityWatchLocked();
@@ -502,7 +504,6 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
case GRPC_CHANNEL_READY: {
// Case 2. Promote p->latest_pending_subchannel_list_ to
// p->subchannel_list_.
- SetConnectedSubchannelFromSubchannelLocked();
if (p->latest_pending_subchannel_list_ == subchannel_list()) {
GPR_ASSERT(p->subchannel_list_ != nullptr);
p->subchannel_list_->ShutdownLocked("finish_update");
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index f9bd0c0eb4..889616e056 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -597,6 +597,7 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
subchannel_list()->shutting_down(), grpc_error_string(error));
}
GPR_ASSERT(subchannel() != nullptr);
+// FIXME: move this to SubchannelData::OnConnectivityChangedLocked()
// If the subchannel list is shutting down, stop watching.
if (subchannel_list()->shutting_down() || error == GRPC_ERROR_CANCELLED) {
StopConnectivityWatchLocked();
@@ -605,34 +606,20 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
GRPC_ERROR_UNREF(error);
return;
}
- // Process the state change.
- switch (connectivity_state()) {
- case GRPC_CHANNEL_TRANSIENT_FAILURE: {
- // Only re-resolve if we've started watching, not at startup time.
- // Otherwise, if the subchannel was already in state TRANSIENT_FAILURE
- // when the subchannel list was created, we'd wind up in a constant
- // loop of re-resolution.
- if (subchannel_list()->started_watching()) {
- if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_DEBUG,
- "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. "
- "Requesting re-resolution",
- p, subchannel());
- }
- p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE);
- }
- break;
- }
- case GRPC_CHANNEL_READY: {
- if (connected_subchannel() == nullptr) {
- SetConnectedSubchannelFromSubchannelLocked();
- }
- break;
+ // If the new state is TRANSIENT_FAILURE, re-resolve.
+ // Only do this if we've started watching, not at startup time.
+ // Otherwise, if the subchannel was already in state TRANSIENT_FAILURE
+ // when the subchannel list was created, we'd wind up in a constant
+ // loop of re-resolution.
+ if (connectivity_state() == GRPC_CHANNEL_TRANSIENT_FAILURE &&
+ subchannel_list()->started_watching()) {
+ if (grpc_lb_round_robin_trace.enabled()) {
+ gpr_log(GPR_DEBUG,
+ "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. "
+ "Requesting re-resolution",
+ p, subchannel());
}
- case GRPC_CHANNEL_SHUTDOWN:
- GPR_UNREACHABLE_CODE(return );
- case GRPC_CHANNEL_CONNECTING:
- case GRPC_CHANNEL_IDLE:; // fallthrough
+ p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE);
}
// Update state counters.
subchannel_list()->UpdateStateCountersLocked(
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index b3fc5fefe9..e13504313d 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -94,12 +94,7 @@ class SubchannelData {
return curr_connectivity_state_;
}
- // Sets the connected subchannel from the subchannel.
- void SetConnectedSubchannelFromSubchannelLocked() {
- connected_subchannel_ =
- grpc_subchannel_get_connected_subchannel(subchannel_);
- }
-
+// FIXME: remove
// An alternative to SetConnectedSubchannelFromSubchannelLocked() for
// cases where we are retaining a connected subchannel from a previous
// subchannel list. This is slightly more efficient than getting the
@@ -191,10 +186,16 @@ class SubchannelData {
// OnConnectivityChangedLocked().
grpc_connectivity_state pending_connectivity_state_unsafe_;
// Current connectivity state.
+// FIXME: move this into RR, not needed in PF because connectivity_state
+// is only used in ProcessConnectivityChangeLocked()
+// (maybe pass it as a param and eliminate the accessor method?)
grpc_connectivity_state curr_connectivity_state_;
};
// A list of subchannels.
+// FIXME: make this InternallyRefCounted, and have Orphan() do
+// ShutdownLocked()?
+// (also, maybe we don't need to take a ref to the LB policy anymore?)
template <typename SubchannelListType, typename SubchannelDataType>
class SubchannelList : public RefCountedWithTracing<SubchannelListType> {
public:
@@ -348,14 +349,36 @@ template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelData<SubchannelListType, SubchannelDataType>::
OnConnectivityChangedLocked(void* arg, grpc_error* error) {
SubchannelData* sd = static_cast<SubchannelData*>(arg);
+// FIXME: add trace logging
+ // If the subchannel is READY, get a ref to the connected subchannel.
+ if (sd->pending_connectivity_state_unsafe_ == GRPC_CHANNEL_READY) {
+ sd->connected_subchannel_ =
+ grpc_subchannel_get_connected_subchannel(sd->subchannel_);
+ // If the subchannel became disconnected between the time that this
+ // callback was scheduled and the time that it was actually run in the
+ // combiner, then the connected subchannel may have disappeared out from
+ // under us. In that case, instead of propagating the READY notification,
+ // we simply renew our watch and wait for the next notification.
+ // Note that we start the renewed watch from IDLE to make sure we
+ // get a notification for the next state, even if that state is
+ // READY again (e.g., if the subchannel has transitioned back to
+ // READY before the callback gets scheduled).
+ if (sd->connected_subchannel_ == nullptr) {
+ sd->pending_connectivity_state_unsafe_ = GRPC_CHANNEL_IDLE;
+ sd->StartConnectivityWatchLocked();
+ return;
+ }
+ }
+ // If we get TRANSIENT_FAILURE, unref the connected subchannel.
+ else if (sd->pending_connectivity_state_unsafe_ ==
+ GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ sd->connected_subchannel_.reset();
+ }
// Now that we're inside the combiner, copy the pending connectivity
// state (which was set by the connectivity state watcher) to
// curr_connectivity_state_, which is what we use inside of the combiner.
sd->curr_connectivity_state_ = sd->pending_connectivity_state_unsafe_;
- // If we get TRANSIENT_FAILURE, unref the connected subchannel.
- if (sd->curr_connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- sd->connected_subchannel_.reset();
- }
+ // Call the subclass's ProcessConnectivityChangeLocked() method.
sd->ProcessConnectivityChangeLocked(GRPC_ERROR_REF(error));
}