aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Juanli Shen <juanlishen@google.com>2018-08-09 18:50:53 -0700
committerGravatar Juanli Shen <juanlishen@google.com>2018-08-09 21:54:11 -0700
commit9b72650125cd0f8b21ebd77825388e7dc3a9d191 (patch)
tree0f26a1ec357fb807255842522678dc188fd0bf43 /src
parent4bdb0e398c686fa6af586bb5cd28f32b0f458da4 (diff)
PF: Check connectivity state before watching
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc103
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h20
2 files changed, 84 insertions, 39 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 2b6a9ba8c5..2c08245a8e 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -80,6 +80,11 @@ class PickFirst : public LoadBalancingPolicy {
void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state, grpc_error* error) override;
+
+ // Processes the connectivity change to READY for an unselected subchannel.
+ void ProcessUnselectedReadyLocked();
+
+ void StartConnectivityWatchLocked() override;
};
class PickFirstSubchannelList
@@ -519,41 +524,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
// select in place of the current one.
switch (connectivity_state) {
case GRPC_CHANNEL_READY: {
- // Case 2. Promote p->latest_pending_subchannel_list_ to
- // p->subchannel_list_.
- if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
- if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_INFO,
- "Pick First %p promoting pending subchannel list %p to "
- "replace %p",
- p, p->latest_pending_subchannel_list_.get(),
- p->subchannel_list_.get());
- }
- p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
- }
- // Cases 1 and 2.
- grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY,
- GRPC_ERROR_NONE, "connecting_ready");
- p->selected_ = this;
- if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p,
- subchannel());
- }
- // Drop all other subchannels, since we are now connected.
- p->DestroyUnselectedSubchannelsLocked();
- // Update any calls that were waiting for a pick.
- PickState* pick;
- while ((pick = p->pending_picks_)) {
- p->pending_picks_ = pick->next;
- pick->connected_subchannel =
- p->selected_->connected_subchannel()->Ref();
- if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_INFO,
- "Servicing pending pick with selected subchannel %p",
- p->selected_->subchannel());
- }
- GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
- }
+ ProcessUnselectedReadyLocked();
// Renew notification.
RenewConnectivityWatchLocked();
break;
@@ -595,6 +566,68 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
GRPC_ERROR_UNREF(error);
}
+void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
+ PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
+ GPR_ASSERT(p->selected_ != this);
+ GPR_ASSERT(connectivity_state() == GRPC_CHANNEL_READY);
+ // If we get here, there are two possible cases:
+ // 1. We do not currently have a selected subchannel, and the update is
+ // for a subchannel in p->subchannel_list_ that we're trying to
+ // connect to. The goal here is to find a subchannel that we can
+ // select.
+ // 2. We do currently have a selected subchannel, and the update is
+ // for a subchannel in p->latest_pending_subchannel_list_. The
+ // goal here is to find a subchannel from the update that we can
+ // select in place of the current one.
+ GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
+ subchannel_list() == p->latest_pending_subchannel_list_.get());
+ // Case 2. Promote p->latest_pending_subchannel_list_ to p->subchannel_list_.
+ if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
+ if (grpc_lb_pick_first_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "Pick First %p promoting pending subchannel list %p to "
+ "replace %p",
+ p, p->latest_pending_subchannel_list_.get(),
+ p->subchannel_list_.get());
+ }
+ p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
+ }
+ // Cases 1 and 2.
+ grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY,
+ GRPC_ERROR_NONE, "subchannel_ready");
+ p->selected_ = this;
+ if (grpc_lb_pick_first_trace.enabled()) {
+ gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel());
+ }
+ // Drop all other subchannels, since we are now connected.
+ p->DestroyUnselectedSubchannelsLocked();
+ // Update any calls that were waiting for a pick.
+ PickState* pick;
+ while ((pick = p->pending_picks_)) {
+ p->pending_picks_ = pick->next;
+ pick->connected_subchannel = p->selected_->connected_subchannel()->Ref();
+ if (grpc_lb_pick_first_trace.enabled()) {
+ gpr_log(GPR_INFO, "Servicing pending pick with selected subchannel %p",
+ p->selected_->subchannel());
+ }
+ GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
+ }
+}
+
+void PickFirst::PickFirstSubchannelData::StartConnectivityWatchLocked() {
+ PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
+ grpc_error* error = GRPC_ERROR_NONE;
+ if (p->selected_ != this &&
+ CheckConnectivityStateLocked(&error) == GRPC_CHANNEL_READY) {
+ // We must process the READY subchannel before we start watching it.
+ // Otherwise, we won't know it's READY because we will be waiting for its
+ // connectivity state to change from READY.
+ ProcessUnselectedReadyLocked();
+ }
+ GRPC_ERROR_UNREF(error);
+ SubchannelData::StartConnectivityWatchLocked();
+}
+
//
// factory
//
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index 0fa2f04e73..0a75fc10c0 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -115,7 +115,7 @@ class SubchannelData {
// Starts watching the connectivity state of the subchannel.
// ProcessConnectivityChangeLocked() will be called when the
// connectivity state changes.
- void StartConnectivityWatchLocked();
+ virtual void StartConnectivityWatchLocked();
// Renews watching the connectivity state of the subchannel.
void RenewConnectivityWatchLocked();
@@ -154,6 +154,10 @@ class SubchannelData {
grpc_connectivity_state connectivity_state,
grpc_error* error) GRPC_ABSTRACT;
+ // Returns the connectivity state. Must be called only while there is no
+ // connectivity notification pending.
+ grpc_connectivity_state connectivity_state() const;
+
private:
// Updates connected_subchannel_ based on pending_connectivity_state_unsafe_.
// Returns true if the connectivity state should be reported.
@@ -317,6 +321,13 @@ void SubchannelData<SubchannelListType,
}
template <typename SubchannelListType, typename SubchannelDataType>
+grpc_connectivity_state SubchannelData<
+ SubchannelListType, SubchannelDataType>::connectivity_state() const {
+ GPR_ASSERT(!connectivity_notification_pending_);
+ return pending_connectivity_state_unsafe_;
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelData<SubchannelListType,
SubchannelDataType>::StartConnectivityWatchLocked() {
if (subchannel_list_->tracer()->enabled()) {
@@ -350,7 +361,8 @@ void SubchannelData<SubchannelListType,
subchannel_,
grpc_connectivity_state_name(pending_connectivity_state_unsafe_));
}
- GPR_ASSERT(connectivity_notification_pending_);
+ GPR_ASSERT(!connectivity_notification_pending_);
+ connectivity_notification_pending_ = true;
grpc_subchannel_notify_on_state_change(
subchannel_, subchannel_list_->policy()->interested_parties(),
&pending_connectivity_state_unsafe_, &connectivity_changed_closure_);
@@ -367,8 +379,7 @@ void SubchannelData<SubchannelListType,
subchannel_list_, Index(), subchannel_list_->num_subchannels(),
subchannel_);
}
- GPR_ASSERT(connectivity_notification_pending_);
- connectivity_notification_pending_ = false;
+ GPR_ASSERT(!connectivity_notification_pending_);
subchannel_list()->Unref(DEBUG_LOCATION, "connectivity_watch");
}
@@ -442,6 +453,7 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::
grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe_),
grpc_error_string(error), sd->subchannel_list_->shutting_down());
}
+ sd->connectivity_notification_pending_ = false;
// If shutting down, unref subchannel and stop watching.
if (sd->subchannel_list_->shutting_down() || error == GRPC_ERROR_CANCELLED) {
sd->UnrefSubchannelLocked("connectivity_shutdown");