aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc155
1 files changed, 96 insertions, 59 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index d217dc0e63..9120abfa3c 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -27,6 +27,7 @@
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
@@ -46,7 +47,7 @@ class PickFirst : public LoadBalancingPolicy {
explicit PickFirst(const Args& args);
void UpdateLocked(const grpc_channel_args& args) override;
- bool PickLocked(PickState* pick) override;
+ bool PickLocked(PickState* pick, grpc_error** error) override;
void CancelPickLocked(PickState* pick, grpc_error* error) override;
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
@@ -56,8 +57,8 @@ class PickFirst : public LoadBalancingPolicy {
grpc_connectivity_state CheckConnectivityLocked(
grpc_error** connectivity_error) override;
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
- void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
void ExitIdleLocked() override;
+ void ResetBackoffLocked() override;
void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
ChildRefsList* ignored) override;
@@ -80,6 +81,11 @@ class PickFirst : public LoadBalancingPolicy {
void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state, grpc_error* error) override;
+
+ // Processes the connectivity change to READY for an unselected subchannel.
+ void ProcessUnselectedReadyLocked();
+
+ void CheckConnectivityStateAndStartWatchingLocked();
};
class PickFirstSubchannelList
@@ -173,9 +179,10 @@ void PickFirst::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
PickState* pick;
while ((pick = pending_picks_) != nullptr) {
pending_picks_ = pick->next;
- if (new_policy->PickLocked(pick)) {
+ grpc_error* error = GRPC_ERROR_NONE;
+ if (new_policy->PickLocked(pick, &error)) {
// Synchronous return, schedule closure.
- GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(pick->on_complete, error);
}
}
}
@@ -246,7 +253,8 @@ void PickFirst::StartPickingLocked() {
if (subchannel_list_ != nullptr) {
for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
if (subchannel_list_->subchannel(i)->subchannel() != nullptr) {
- subchannel_list_->subchannel(i)->StartConnectivityWatchLocked();
+ subchannel_list_->subchannel(i)
+ ->CheckConnectivityStateAndStartWatchingLocked();
break;
}
}
@@ -259,18 +267,30 @@ void PickFirst::ExitIdleLocked() {
}
}
-bool PickFirst::PickLocked(PickState* pick) {
+void PickFirst::ResetBackoffLocked() {
+ subchannel_list_->ResetBackoffLocked();
+ if (latest_pending_subchannel_list_ != nullptr) {
+ latest_pending_subchannel_list_->ResetBackoffLocked();
+ }
+}
+
+bool PickFirst::PickLocked(PickState* pick, grpc_error** error) {
// If we have a selected subchannel already, return synchronously.
if (selected_ != nullptr) {
pick->connected_subchannel = selected_->connected_subchannel()->Ref();
return true;
}
// No subchannel selected yet, so handle asynchronously.
- if (!started_picking_) {
- StartPickingLocked();
+ if (pick->on_complete == nullptr) {
+ *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "No pick result available but synchronous result required.");
+ return true;
}
pick->next = pending_picks_;
pending_picks_ = pick;
+ if (!started_picking_) {
+ StartPickingLocked();
+ }
return false;
}
@@ -293,20 +313,9 @@ void PickFirst::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
notify);
}
-void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) {
- if (selected_ != nullptr) {
- selected_->connected_subchannel()->Ping(on_initiate, on_ack);
- } else {
- GRPC_CLOSURE_SCHED(on_initiate,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
- GRPC_CLOSURE_SCHED(on_ack,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
- }
-}
-
void PickFirst::FillChildRefsForChannelz(
ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) {
- mu_guard guard(&child_refs_mu_);
+ MutexLock lock(&child_refs_mu_);
for (size_t i = 0; i < child_subchannels_.size(); ++i) {
// TODO(ncteisen): implement a de dup loop that is not O(n^2). Might
// have to implement lightweight set. For now, we don't care about
@@ -333,7 +342,7 @@ void PickFirst::UpdateChildRefsLocked() {
latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
}
// atomically update the data that channelz will actually be looking at.
- mu_guard guard(&child_refs_mu_);
+ MutexLock lock(&child_refs_mu_);
child_subchannels_ = std::move(cs);
}
@@ -384,7 +393,8 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
// If we've started picking, start trying to connect to the first
// subchannel in the new list.
if (started_picking_) {
- subchannel_list_->subchannel(0)->StartConnectivityWatchLocked();
+ subchannel_list_->subchannel(0)
+ ->CheckConnectivityStateAndStartWatchingLocked();
}
} else {
// We do have a selected subchannel.
@@ -438,7 +448,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
// subchannel in the new list.
if (started_picking_) {
latest_pending_subchannel_list_->subchannel(0)
- ->StartConnectivityWatchLocked();
+ ->CheckConnectivityStateAndStartWatchingLocked();
}
}
}
@@ -517,41 +527,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
// select in place of the current one.
switch (connectivity_state) {
case GRPC_CHANNEL_READY: {
- // Case 2. Promote p->latest_pending_subchannel_list_ to
- // p->subchannel_list_.
- if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
- if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_INFO,
- "Pick First %p promoting pending subchannel list %p to "
- "replace %p",
- p, p->latest_pending_subchannel_list_.get(),
- p->subchannel_list_.get());
- }
- p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
- }
- // Cases 1 and 2.
- grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY,
- GRPC_ERROR_NONE, "connecting_ready");
- p->selected_ = this;
- if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p,
- subchannel());
- }
- // Drop all other subchannels, since we are now connected.
- p->DestroyUnselectedSubchannelsLocked();
- // Update any calls that were waiting for a pick.
- PickState* pick;
- while ((pick = p->pending_picks_)) {
- p->pending_picks_ = pick->next;
- pick->connected_subchannel =
- p->selected_->connected_subchannel()->Ref();
- if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_INFO,
- "Servicing pending pick with selected subchannel %p",
- p->selected_->subchannel());
- }
- GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
- }
+ ProcessUnselectedReadyLocked();
// Renew notification.
RenewConnectivityWatchLocked();
break;
@@ -572,7 +548,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
&p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "exhausted_subchannels");
}
- sd->StartConnectivityWatchLocked();
+ sd->CheckConnectivityStateAndStartWatchingLocked();
break;
}
case GRPC_CHANNEL_CONNECTING:
@@ -593,6 +569,67 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
GRPC_ERROR_UNREF(error);
}
+void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
+ PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
+ // If we get here, there are two possible cases:
+ // 1. We do not currently have a selected subchannel, and the update is
+ // for a subchannel in p->subchannel_list_ that we're trying to
+ // connect to. The goal here is to find a subchannel that we can
+ // select.
+ // 2. We do currently have a selected subchannel, and the update is
+ // for a subchannel in p->latest_pending_subchannel_list_. The
+ // goal here is to find a subchannel from the update that we can
+ // select in place of the current one.
+ GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
+ subchannel_list() == p->latest_pending_subchannel_list_.get());
+ // Case 2. Promote p->latest_pending_subchannel_list_ to p->subchannel_list_.
+ if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
+ if (grpc_lb_pick_first_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "Pick First %p promoting pending subchannel list %p to "
+ "replace %p",
+ p, p->latest_pending_subchannel_list_.get(),
+ p->subchannel_list_.get());
+ }
+ p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
+ }
+ // Cases 1 and 2.
+ grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY,
+ GRPC_ERROR_NONE, "subchannel_ready");
+ p->selected_ = this;
+ if (grpc_lb_pick_first_trace.enabled()) {
+ gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel());
+ }
+ // Drop all other subchannels, since we are now connected.
+ p->DestroyUnselectedSubchannelsLocked();
+ // Update any calls that were waiting for a pick.
+ PickState* pick;
+ while ((pick = p->pending_picks_)) {
+ p->pending_picks_ = pick->next;
+ pick->connected_subchannel = p->selected_->connected_subchannel()->Ref();
+ if (grpc_lb_pick_first_trace.enabled()) {
+ gpr_log(GPR_INFO, "Servicing pending pick with selected subchannel %p",
+ p->selected_->subchannel());
+ }
+ GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
+ }
+}
+
+void PickFirst::PickFirstSubchannelData::
+ CheckConnectivityStateAndStartWatchingLocked() {
+ PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
+ grpc_error* error = GRPC_ERROR_NONE;
+ if (p->selected_ != this &&
+ CheckConnectivityStateLocked(&error) == GRPC_CHANNEL_READY) {
+ // We must process the READY subchannel before we start watching it.
+ // Otherwise, we won't know it's READY because we will be waiting for its
+ // connectivity state to change from READY.
+ ProcessUnselectedReadyLocked();
+ }
+ GRPC_ERROR_UNREF(error);
+ StartConnectivityWatchLocked();
+}
+
//
// factory
//