diff options
author | 2018-08-09 18:50:53 -0700 | |
---|---|---|
committer | 2018-08-09 21:54:11 -0700 | |
commit | 9b72650125cd0f8b21ebd77825388e7dc3a9d191 (patch) | |
tree | 0f26a1ec357fb807255842522678dc188fd0bf43 /src/core | |
parent | 4bdb0e398c686fa6af586bb5cd28f32b0f458da4 (diff) |
PF: Check connectivity state before watching
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc | 103 | ||||
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/subchannel_list.h | 20 |
2 files changed, 84 insertions, 39 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 2b6a9ba8c5..2c08245a8e 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -80,6 +80,11 @@ class PickFirst : public LoadBalancingPolicy { void ProcessConnectivityChangeLocked( grpc_connectivity_state connectivity_state, grpc_error* error) override; + + // Processes the connectivity change to READY for an unselected subchannel. + void ProcessUnselectedReadyLocked(); + + void StartConnectivityWatchLocked() override; }; class PickFirstSubchannelList @@ -519,41 +524,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( // select in place of the current one. switch (connectivity_state) { case GRPC_CHANNEL_READY: { - // Case 2. Promote p->latest_pending_subchannel_list_ to - // p->subchannel_list_. - if (subchannel_list() == p->latest_pending_subchannel_list_.get()) { - if (grpc_lb_pick_first_trace.enabled()) { - gpr_log(GPR_INFO, - "Pick First %p promoting pending subchannel list %p to " - "replace %p", - p, p->latest_pending_subchannel_list_.get(), - p->subchannel_list_.get()); - } - p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); - } - // Cases 1 and 2. - grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY, - GRPC_ERROR_NONE, "connecting_ready"); - p->selected_ = this; - if (grpc_lb_pick_first_trace.enabled()) { - gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, - subchannel()); - } - // Drop all other subchannels, since we are now connected. - p->DestroyUnselectedSubchannelsLocked(); - // Update any calls that were waiting for a pick. - PickState* pick; - while ((pick = p->pending_picks_)) { - p->pending_picks_ = pick->next; - pick->connected_subchannel = - p->selected_->connected_subchannel()->Ref(); - if (grpc_lb_pick_first_trace.enabled()) { - gpr_log(GPR_INFO, - "Servicing pending pick with selected subchannel %p", - p->selected_->subchannel()); - } - GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); - } + ProcessUnselectedReadyLocked(); // Renew notification. RenewConnectivityWatchLocked(); break; @@ -595,6 +566,68 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( GRPC_ERROR_UNREF(error); } +void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() { + PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy()); + GPR_ASSERT(p->selected_ != this); + GPR_ASSERT(connectivity_state() == GRPC_CHANNEL_READY); + // If we get here, there are two possible cases: + // 1. We do not currently have a selected subchannel, and the update is + // for a subchannel in p->subchannel_list_ that we're trying to + // connect to. The goal here is to find a subchannel that we can + // select. + // 2. We do currently have a selected subchannel, and the update is + // for a subchannel in p->latest_pending_subchannel_list_. The + // goal here is to find a subchannel from the update that we can + // select in place of the current one. + GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() || + subchannel_list() == p->latest_pending_subchannel_list_.get()); + // Case 2. Promote p->latest_pending_subchannel_list_ to p->subchannel_list_. + if (subchannel_list() == p->latest_pending_subchannel_list_.get()) { + if (grpc_lb_pick_first_trace.enabled()) { + gpr_log(GPR_INFO, + "Pick First %p promoting pending subchannel list %p to " + "replace %p", + p, p->latest_pending_subchannel_list_.get(), + p->subchannel_list_.get()); + } + p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); + } + // Cases 1 and 2. + grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY, + GRPC_ERROR_NONE, "subchannel_ready"); + p->selected_ = this; + if (grpc_lb_pick_first_trace.enabled()) { + gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel()); + } + // Drop all other subchannels, since we are now connected. + p->DestroyUnselectedSubchannelsLocked(); + // Update any calls that were waiting for a pick. + PickState* pick; + while ((pick = p->pending_picks_)) { + p->pending_picks_ = pick->next; + pick->connected_subchannel = p->selected_->connected_subchannel()->Ref(); + if (grpc_lb_pick_first_trace.enabled()) { + gpr_log(GPR_INFO, "Servicing pending pick with selected subchannel %p", + p->selected_->subchannel()); + } + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); + } +} + +void PickFirst::PickFirstSubchannelData::StartConnectivityWatchLocked() { + PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy()); + grpc_error* error = GRPC_ERROR_NONE; + if (p->selected_ != this && + CheckConnectivityStateLocked(&error) == GRPC_CHANNEL_READY) { + // We must process the READY subchannel before we start watching it. + // Otherwise, we won't know it's READY because we will be waiting for its + // connectivity state to change from READY. + ProcessUnselectedReadyLocked(); + } + GRPC_ERROR_UNREF(error); + SubchannelData::StartConnectivityWatchLocked(); +} + // // factory // diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 0fa2f04e73..0a75fc10c0 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -115,7 +115,7 @@ class SubchannelData { // Starts watching the connectivity state of the subchannel. // ProcessConnectivityChangeLocked() will be called when the // connectivity state changes. - void StartConnectivityWatchLocked(); + virtual void StartConnectivityWatchLocked(); // Renews watching the connectivity state of the subchannel. void RenewConnectivityWatchLocked(); @@ -154,6 +154,10 @@ class SubchannelData { grpc_connectivity_state connectivity_state, grpc_error* error) GRPC_ABSTRACT; + // Returns the connectivity state. Must be called only while there is no + // connectivity notification pending. + grpc_connectivity_state connectivity_state() const; + private: // Updates connected_subchannel_ based on pending_connectivity_state_unsafe_. // Returns true if the connectivity state should be reported. @@ -317,6 +321,13 @@ void SubchannelData<SubchannelListType, } template <typename SubchannelListType, typename SubchannelDataType> +grpc_connectivity_state SubchannelData< + SubchannelListType, SubchannelDataType>::connectivity_state() const { + GPR_ASSERT(!connectivity_notification_pending_); + return pending_connectivity_state_unsafe_; +} + +template <typename SubchannelListType, typename SubchannelDataType> void SubchannelData<SubchannelListType, SubchannelDataType>::StartConnectivityWatchLocked() { if (subchannel_list_->tracer()->enabled()) { @@ -350,7 +361,8 @@ void SubchannelData<SubchannelListType, subchannel_, grpc_connectivity_state_name(pending_connectivity_state_unsafe_)); } - GPR_ASSERT(connectivity_notification_pending_); + GPR_ASSERT(!connectivity_notification_pending_); + connectivity_notification_pending_ = true; grpc_subchannel_notify_on_state_change( subchannel_, subchannel_list_->policy()->interested_parties(), &pending_connectivity_state_unsafe_, &connectivity_changed_closure_); @@ -367,8 +379,7 @@ void SubchannelData<SubchannelListType, subchannel_list_, Index(), subchannel_list_->num_subchannels(), subchannel_); } - GPR_ASSERT(connectivity_notification_pending_); - connectivity_notification_pending_ = false; + GPR_ASSERT(!connectivity_notification_pending_); subchannel_list()->Unref(DEBUG_LOCATION, "connectivity_watch"); } @@ -442,6 +453,7 @@ void SubchannelData<SubchannelListType, SubchannelDataType>:: grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe_), grpc_error_string(error), sd->subchannel_list_->shutting_down()); } + sd->connectivity_notification_pending_ = false; // If shutting down, unref subchannel and stop watching. if (sd->subchannel_list_->shutting_down() || error == GRPC_ERROR_CANCELLED) { sd->UnrefSubchannelLocked("connectivity_shutdown"); |