about | summary | refs | log | tree | commit | diff | homepage
diff options
context:
space:
mode:
author: Mark D. Roth <roth@google.com> 2018-04-27 08:24:30 -0700
committer: Mark D. Roth <roth@google.com> 2018-04-27 08:24:30 -0700
commit: 717c100c8c5be62c76d9edbcd8b4036b3fb98bfe (patch)
tree: 854dc33c0f3fc27882cc073fefacfab2b6b96366
parent: 0839ac6d180c439abc23d50e254c61754a64d130 (diff)
Clean up connectivity state tracking.
-rw-r--r-- src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc | 17
-rw-r--r-- src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc | 74
-rw-r--r-- src/core/ext/filters/client_channel/lb_policy/subchannel_list.h | 51
3 files changed, 56 insertions, 86 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index dc98a92178..1fecdebccf 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -76,7 +76,8 @@ class PickFirst : public LoadBalancingPolicy {
: SubchannelData(subchannel_list, user_data_vtable, address, subchannel,
combiner) {}
- void ProcessConnectivityChangeLocked(grpc_error* error) override;
+ void ProcessConnectivityChangeLocked(
+ grpc_connectivity_state connectivity_state, grpc_error* error) override;
};
class PickFirstSubchannelList
@@ -369,7 +370,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
}
void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
- grpc_error* error) {
+ grpc_connectivity_state connectivity_state, grpc_error* error) {
PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
@@ -379,7 +380,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
"sd->subchannel_list->shutting_down=%d error=%s",
p, subchannel(), Index(), subchannel_list()->num_subchannels(),
subchannel_list(),
- grpc_connectivity_state_name(connectivity_state()), p->shutdown_,
+ grpc_connectivity_state_name(connectivity_state), p->shutdown_,
subchannel_list()->shutting_down(), grpc_error_string(error));
}
// The notification must be for a subchannel in either the current or
@@ -390,7 +391,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
if (p->selected_ == this) {
// If the new state is anything other than READY and there is a
// pending update, switch to the pending update.
- if (connectivity_state() != GRPC_CHANNEL_READY &&
+ if (connectivity_state != GRPC_CHANNEL_READY &&
p->latest_pending_subchannel_list_ != nullptr) {
p->selected_ = nullptr;
StopConnectivityWatchLocked();
@@ -404,8 +405,8 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
// re-resolution is introduced. But we need to investigate whether we
// really want to take any action instead of waiting for the selected
// subchannel reconnecting.
- GPR_ASSERT(connectivity_state() != GRPC_CHANNEL_SHUTDOWN);
- if (connectivity_state() == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN);
+ if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If the selected channel goes bad, request a re-resolution.
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE,
@@ -417,7 +418,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
UnrefSubchannelLocked("pf_selected_shutdown");
StopConnectivityWatchLocked();
} else {
- grpc_connectivity_state_set(&p->state_tracker_, connectivity_state(),
+ grpc_connectivity_state_set(&p->state_tracker_, connectivity_state,
GRPC_ERROR_REF(error), "selected_changed");
// Renew notification.
RenewConnectivityWatchLocked();
@@ -435,7 +436,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
// for a subchannel in p->latest_pending_subchannel_list_. The
// goal here is to find a subchannel from the update that we can
// select in place of the current one.
- switch (connectivity_state()) {
+ switch (connectivity_state) {
case GRPC_CHANNEL_READY: {
// Case 2. Promote p->latest_pending_subchannel_list_ to
// p->subchannel_list_.
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index d6a738ffc9..764d2c8ad7 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -109,12 +109,20 @@ class RoundRobin : public LoadBalancingPolicy {
void* user_data() const { return user_data_; }
+ grpc_connectivity_state connectivity_state() const {
+ return last_connectivity_state_;
+ }
+
+ void UpdateConnectivityStateLocked(
+ grpc_connectivity_state connectivity_state, grpc_error* error);
+
private:
- void ProcessConnectivityChangeLocked(grpc_error* error) override;
+ void ProcessConnectivityChangeLocked(
+ grpc_connectivity_state connectivity_state, grpc_error* error) override;
const grpc_lb_user_data_vtable* user_data_vtable_;
void* user_data_ = nullptr;
- grpc_connectivity_state prev_connectivity_state_ = GRPC_CHANNEL_IDLE;
+ grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_IDLE;
};
// A list of subchannels.
@@ -137,9 +145,6 @@ class RoundRobin : public LoadBalancingPolicy {
// Starts watching the subchannels in this list.
void StartWatchingLocked();
- // Returns true if we have started watching.
- bool started_watching() const { return started_watching_; }
-
// Updates the counters of subchannels in each state when a
// subchannel transitions from old_state to new_state.
// transient_failure_error is the error that is reported when
@@ -158,7 +163,6 @@ class RoundRobin : public LoadBalancingPolicy {
void UpdateRoundRobinStateFromSubchannelStateCountsLocked();
private:
- bool started_watching_ = false;
size_t num_ready_ = 0;
size_t num_connecting_ = 0;
size_t num_transient_failure_ = 0;
@@ -416,29 +420,16 @@ void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
if (num_subchannels() == 0) return;
// Check current state of each subchannel synchronously, since any
// subchannel already used by some other channel may have a non-IDLE
- // state. This will invoke ProcessConnectivityChangeLocked() for each
- // subchannel whose state is not IDLE. However, because started_watching_
- // is still false, the code there will do two special things:
- //
- // - It will skip re-resolution for any subchannel in state
- // TRANSIENT_FAILURE, since doing this at start-watching-time would
- // cause us to enter an endless loop of re-resolution (i.e.,
- // re-resolution would cause a new update, and the new update would
- // immediately trigger a new re-resolution).
- //
- // - It will not call UpdateRoundRobinStateFromSubchannelStateCountsLocked();
- // instead, we call that here after all subchannels have been checked.
- // This allows us to act more intelligently based on the state of all
- // subchannels, rather than just acting on the first one. For example,
- // if there is more than one pending pick, this allows us to spread the
- // picks across all READY subchannels rather than sending them all to
- // the first subchannel that reports READY.
+ // state.
for (size_t i = 0; i < num_subchannels(); ++i) {
- subchannel(i)->CheckConnectivityStateLocked();
+ grpc_error* error = GRPC_ERROR_NONE;
+ grpc_connectivity_state state =
+ subchannel(i)->CheckConnectivityStateLocked(&error);
+ if (state != GRPC_CHANNEL_IDLE) {
+ subchannel(i)->UpdateConnectivityStateLocked(state, error);
+ }
}
- // Now set started_watching_ to true and call
- // UpdateRoundRobinStateFromSubchannelStateCountsLocked().
- started_watching_ = true;
+ // Now set the LB policy's state based on the subchannels' states.
UpdateRoundRobinStateFromSubchannelStateCountsLocked();
// Start connectivity watch for each subchannel.
for (size_t i = 0; i < num_subchannels(); i++) {
@@ -544,8 +535,15 @@ void RoundRobin::RoundRobinSubchannelList::
MaybeUpdateRoundRobinConnectivityStateLocked();
}
+void RoundRobin::RoundRobinSubchannelData::UpdateConnectivityStateLocked(
+ grpc_connectivity_state connectivity_state, grpc_error* error) {
+ subchannel_list()->UpdateStateCountersLocked(
+ last_connectivity_state_, connectivity_state, GRPC_ERROR_REF(error));
+ last_connectivity_state_ = connectivity_state;
+}
+
void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
- grpc_error* error) {
+ grpc_connectivity_state connectivity_state, grpc_error* error) {
RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(
@@ -556,8 +554,8 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
"p->shutdown=%d sd->subchannel_list->shutting_down=%d error=%s",
p, subchannel(), subchannel_list(), Index(),
subchannel_list()->num_subchannels(),
- grpc_connectivity_state_name(prev_connectivity_state_),
- grpc_connectivity_state_name(connectivity_state()), p->shutdown_,
+ grpc_connectivity_state_name(last_connectivity_state_),
+ grpc_connectivity_state_name(connectivity_state), p->shutdown_,
subchannel_list()->shutting_down(), grpc_error_string(error));
}
GPR_ASSERT(subchannel() != nullptr);
@@ -566,8 +564,7 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
// Otherwise, if the subchannel was already in state TRANSIENT_FAILURE
// when the subchannel list was created, we'd wind up in a constant
// loop of re-resolution.
- if (connectivity_state() == GRPC_CHANNEL_TRANSIENT_FAILURE &&
- subchannel_list()->started_watching()) {
+ if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO,
"[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. "
@@ -577,15 +574,10 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE);
}
// Update state counters.
- subchannel_list()->UpdateStateCountersLocked(
- prev_connectivity_state_, connectivity_state(), GRPC_ERROR_REF(error));
- prev_connectivity_state_ = connectivity_state();
- // If we've started watching, update overall state and renew notification.
- if (subchannel_list()->started_watching()) {
- subchannel_list()->UpdateRoundRobinStateFromSubchannelStateCountsLocked();
- RenewConnectivityWatchLocked();
- }
- GRPC_ERROR_UNREF(error);
+ UpdateConnectivityStateLocked(connectivity_state, error);
+ // Update overall state and renew notification.
+ subchannel_list()->UpdateRoundRobinStateFromSubchannelStateCountsLocked();
+ RenewConnectivityWatchLocked();
}
grpc_connectivity_state RoundRobin::CheckConnectivityLocked(
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index bae3f0ba71..bad50c461c 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -49,7 +49,8 @@ class MySubchannelList; // Forward declaration.
class MySubchannelData
: public SubchannelData<MySubchannelList, MySubchannelData> {
public:
- void ProcessConnectivityChangeLocked(grpc_error* error) override {
+ void ProcessConnectivityChangeLocked(
+ grpc_connectivity_state connectivity_state, grpc_error* error) override {
// ...code to handle connectivity changes...
}
};
@@ -88,13 +89,7 @@ class SubchannelData {
return connected_subchannel_.get();
}
- // The current connectivity state.
- // May be called from ProcessConnectivityChangeLocked() to determine
- // the state that the subchannel has transitioned into.
- grpc_connectivity_state connectivity_state() const {
- return curr_connectivity_state_;
- }
-
+// FIXME: remove
// Used to set the connected subchannel in cases where we are retaining a
// subchannel from a previous subchannel list. This is slightly more
// efficient than getting the connected subchannel from the subchannel,
@@ -108,25 +103,17 @@ class SubchannelData {
connected_subchannel_ = other->connected_subchannel_; // Adds ref.
}
- // Synchronously checks the subchannel's connectivity state. Calls
- // ProcessConnectivityChangeLocked() if the state has changed.
+ // Synchronously checks the subchannel's connectivity state.
// Must not be called while there is a connectivity notification
// pending (i.e., between calling StartConnectivityWatchLocked() or
// RenewConnectivityWatchLocked() and the resulting invocation of
// ProcessConnectivityChangeLocked()).
- void CheckConnectivityStateLocked() {
+ grpc_connectivity_state CheckConnectivityStateLocked(grpc_error** error) {
GPR_ASSERT(!connectivity_notification_pending_);
- grpc_error* error = GRPC_ERROR_NONE;
pending_connectivity_state_unsafe_ =
- grpc_subchannel_check_connectivity(subchannel(), &error);
+ grpc_subchannel_check_connectivity(subchannel(), error);
UpdateConnectedSubchannelLocked();
-// FIXME: move the rest of this into RR
- if (pending_connectivity_state_unsafe_ != curr_connectivity_state_) {
- curr_connectivity_state_ = pending_connectivity_state_unsafe_;
- ProcessConnectivityChangeLocked(error);
- } else {
- GRPC_ERROR_UNREF(error);
- }
+ return pending_connectivity_state_unsafe_;
}
// Unrefs the subchannel. May be used if an individual subchannel is
@@ -170,11 +157,11 @@ class SubchannelData {
// After StartConnectivityWatchLocked() or RenewConnectivityWatchLocked()
// is called, this method will be invoked when the subchannel's connectivity
// state changes.
- // Implementations can use connectivity_state() to get the new
- // connectivity state.
// Implementations must invoke either RenewConnectivityWatchLocked() or
// StopConnectivityWatchLocked() before returning.
- virtual void ProcessConnectivityChangeLocked(grpc_error* error) GRPC_ABSTRACT;
+ virtual void ProcessConnectivityChangeLocked(
+ grpc_connectivity_state connectivity_state,
+ grpc_error* error) GRPC_ABSTRACT;
private:
// Updates connected_subchannel_ based on pending_connectivity_state_unsafe_.
@@ -196,14 +183,8 @@ class SubchannelData {
bool connectivity_notification_pending_ = false;
// Connectivity state to be updated by
// grpc_subchannel_notify_on_state_change(), not guarded by
- // the combiner. Will be copied to curr_connectivity_state_ by
- // OnConnectivityChangedLocked().
+ // the combiner.
grpc_connectivity_state pending_connectivity_state_unsafe_;
- // Current connectivity state.
-// FIXME: move this into RR, not needed in PF because connectivity_state
-// is only used in ProcessConnectivityChangeLocked()
-// (maybe pass it as a param and eliminate the accessor method?)
- grpc_connectivity_state curr_connectivity_state_;
};
// A list of subchannels.
@@ -287,8 +268,7 @@ SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData(
subchannel_(subchannel),
// We assume that the current state is IDLE. If not, we'll get a
// callback telling us that.
- pending_connectivity_state_unsafe_(GRPC_CHANNEL_IDLE),
- curr_connectivity_state_(GRPC_CHANNEL_IDLE) {
+ pending_connectivity_state_unsafe_(GRPC_CHANNEL_IDLE) {
GRPC_CLOSURE_INIT(
&connectivity_changed_closure_,
(&SubchannelData<SubchannelListType,
@@ -457,12 +437,9 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::
sd->RenewConnectivityWatchLocked();
return;
}
- // Now that we're inside the combiner, copy the pending connectivity
- // state (which was set by the connectivity state watcher) to
- // curr_connectivity_state_, which is what we use inside of the combiner.
- sd->curr_connectivity_state_ = sd->pending_connectivity_state_unsafe_;
// Call the subclass's ProcessConnectivityChangeLocked() method.
- sd->ProcessConnectivityChangeLocked(GRPC_ERROR_REF(error));
+ sd->ProcessConnectivityChangeLocked(sd->pending_connectivity_state_unsafe_,
+ GRPC_ERROR_REF(error));
}
template <typename SubchannelListType, typename SubchannelDataType>