aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc367
1 files changed, 169 insertions, 198 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 9090c34412..ff2140e628 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -62,31 +62,65 @@ class PickFirst : public LoadBalancingPolicy {
private:
~PickFirst();
+ class PickFirstSubchannelList;
+
+ class PickFirstSubchannelData
+ : public SubchannelData<PickFirstSubchannelList,
+ PickFirstSubchannelData> {
+ public:
+ PickFirstSubchannelData(PickFirstSubchannelList* subchannel_list,
+ const grpc_lb_user_data_vtable* user_data_vtable,
+ const grpc_lb_address& address,
+ grpc_subchannel* subchannel,
+ grpc_combiner* combiner)
+ : SubchannelData(subchannel_list, user_data_vtable, address, subchannel,
+ combiner) {}
+
+ void ProcessConnectivityChangeLocked(
+ grpc_connectivity_state connectivity_state, grpc_error* error) override;
+ };
+
+ class PickFirstSubchannelList
+ : public SubchannelList<PickFirstSubchannelList,
+ PickFirstSubchannelData> {
+ public:
+ PickFirstSubchannelList(PickFirst* policy, TraceFlag* tracer,
+ const grpc_lb_addresses* addresses,
+ grpc_combiner* combiner,
+ grpc_client_channel_factory* client_channel_factory,
+ const grpc_channel_args& args)
+ : SubchannelList(policy, tracer, addresses, combiner,
+ client_channel_factory, args) {
+ // Need to maintain a ref to the LB policy as long as we maintain
+ // any references to subchannels, since the subchannels'
+ // pollset_sets will include the LB policy's pollset_set.
+ policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
+ }
+
+ ~PickFirstSubchannelList() {
+ PickFirst* p = static_cast<PickFirst*>(policy());
+ p->Unref(DEBUG_LOCATION, "subchannel_list");
+ }
+ };
+
void ShutdownLocked() override;
void StartPickingLocked();
void DestroyUnselectedSubchannelsLocked();
- static void OnConnectivityChangedLocked(void* arg, grpc_error* error);
-
- void SubchannelListRefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason);
- void SubchannelListUnrefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason);
-
- /** all our subchannels */
- grpc_lb_subchannel_list* subchannel_list_ = nullptr;
- /** latest pending subchannel list */
- grpc_lb_subchannel_list* latest_pending_subchannel_list_ = nullptr;
- /** selected subchannel in \a subchannel_list */
- grpc_lb_subchannel_data* selected_ = nullptr;
- /** have we started picking? */
+ // All our subchannels.
+ OrphanablePtr<PickFirstSubchannelList> subchannel_list_;
+ // Latest pending subchannel list.
+ OrphanablePtr<PickFirstSubchannelList> latest_pending_subchannel_list_;
+ // Selected subchannel in \a subchannel_list_.
+ PickFirstSubchannelData* selected_ = nullptr;
+ // Have we started picking?
bool started_picking_ = false;
- /** are we shut down? */
+ // Are we shut down?
bool shutdown_ = false;
- /** list of picks that are waiting on connectivity */
+ // List of picks that are waiting on connectivity.
PickState* pending_picks_ = nullptr;
- /** our connectivity state tracker */
+ // Our connectivity state tracker.
grpc_connectivity_state_tracker state_tracker_;
};
@@ -95,7 +129,7 @@ PickFirst::PickFirst(const Args& args) : LoadBalancingPolicy(args) {
grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
"pick_first");
if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_DEBUG, "Pick First %p created.", this);
+ gpr_log(GPR_INFO, "Pick First %p created.", this);
}
UpdateLocked(*args.args);
grpc_subchannel_index_ref();
@@ -103,7 +137,7 @@ PickFirst::PickFirst(const Args& args) : LoadBalancingPolicy(args) {
PickFirst::~PickFirst() {
if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_DEBUG, "Destroying Pick First %p", this);
+ gpr_log(GPR_INFO, "Destroying Pick First %p", this);
}
GPR_ASSERT(subchannel_list_ == nullptr);
GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
@@ -126,7 +160,7 @@ void PickFirst::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
void PickFirst::ShutdownLocked() {
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_DEBUG, "Pick First %p Shutting down", this);
+ gpr_log(GPR_INFO, "Pick First %p Shutting down", this);
}
shutdown_ = true;
PickState* pick;
@@ -137,15 +171,8 @@ void PickFirst::ShutdownLocked() {
}
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "shutdown");
- if (subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, "pf_shutdown");
- subchannel_list_ = nullptr;
- }
- if (latest_pending_subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(latest_pending_subchannel_list_,
- "pf_shutdown");
- latest_pending_subchannel_list_ = nullptr;
- }
+ subchannel_list_.reset();
+ latest_pending_subchannel_list_.reset();
TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_CANCELLED);
GRPC_ERROR_UNREF(error);
}
@@ -192,14 +219,10 @@ void PickFirst::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
void PickFirst::StartPickingLocked() {
started_picking_ = true;
- if (subchannel_list_ != nullptr && subchannel_list_->num_subchannels > 0) {
- subchannel_list_->checking_subchannel = 0;
- for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) {
- if (subchannel_list_->subchannels[i].subchannel != nullptr) {
- SubchannelListRefForConnectivityWatch(
- subchannel_list_, "connectivity_watch+start_picking");
- grpc_lb_subchannel_data_start_connectivity_watch(
- &subchannel_list_->subchannels[i]);
+ if (subchannel_list_ != nullptr) {
+ for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
+ if (subchannel_list_->subchannel(i)->subchannel() != nullptr) {
+ subchannel_list_->subchannel(i)->StartConnectivityWatchLocked();
break;
}
}
@@ -215,7 +238,7 @@ void PickFirst::ExitIdleLocked() {
bool PickFirst::PickLocked(PickState* pick) {
// If we have a selected subchannel already, return synchronously.
if (selected_ != nullptr) {
- pick->connected_subchannel = selected_->connected_subchannel;
+ pick->connected_subchannel = selected_->connected_subchannel()->Ref();
return true;
}
// No subchannel selected yet, so handle asynchronously.
@@ -228,11 +251,10 @@ bool PickFirst::PickLocked(PickState* pick) {
}
void PickFirst::DestroyUnselectedSubchannelsLocked() {
- for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) {
- grpc_lb_subchannel_data* sd = &subchannel_list_->subchannels[i];
+ for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
+ PickFirstSubchannelData* sd = subchannel_list_->subchannel(i);
if (selected_ != sd) {
- grpc_lb_subchannel_data_unref_subchannel(sd,
- "selected_different_subchannel");
+ sd->UnrefSubchannelLocked("selected_different_subchannel");
}
}
}
@@ -249,7 +271,7 @@ void PickFirst::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) {
if (selected_ != nullptr) {
- selected_->connected_subchannel->Ping(on_initiate, on_ack);
+ selected_->connected_subchannel()->Ping(on_initiate, on_ack);
} else {
GRPC_CLOSURE_SCHED(on_initiate,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
@@ -258,24 +280,6 @@ void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) {
}
}
-void PickFirst::SubchannelListRefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason) {
- // TODO(roth): We currently track this ref manually. Once the new
- // ClosureRef API is ready and the subchannel_list code has been
- // converted to a C++ API, find a way to hold the RefCountedPtr<>
- // somewhere (maybe in the subchannel_data object) instead of doing
- // this manually.
- auto self = Ref(DEBUG_LOCATION, reason);
- self.release();
- grpc_lb_subchannel_list_ref(subchannel_list, reason);
-}
-
-void PickFirst::SubchannelListUnrefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason) {
- Unref(DEBUG_LOCATION, reason);
- grpc_lb_subchannel_list_unref(subchannel_list, reason);
-}
-
void PickFirst::UpdateLocked(const grpc_channel_args& args) {
const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
@@ -295,75 +299,67 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
return;
}
const grpc_lb_addresses* addresses =
- (const grpc_lb_addresses*)arg->value.pointer.p;
+ static_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Pick First %p received update with %" PRIuPTR " addresses", this,
addresses->num_addresses);
}
- grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create(
+ auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>(
this, &grpc_lb_pick_first_trace, addresses, combiner(),
- client_channel_factory(), args, &PickFirst::OnConnectivityChangedLocked);
- if (subchannel_list->num_subchannels == 0) {
+ client_channel_factory(), args);
+ if (subchannel_list->num_subchannels() == 0) {
// Empty update or no valid subchannels. Unsubscribe from all current
// subchannels and put the channel in TRANSIENT_FAILURE.
grpc_connectivity_state_set(
&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
"pf_update_empty");
- if (subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_,
- "sl_shutdown_empty_update");
- }
- subchannel_list_ = subchannel_list; // Empty list.
+ subchannel_list_ = std::move(subchannel_list); // Empty list.
selected_ = nullptr;
return;
}
if (selected_ == nullptr) {
// We don't yet have a selected subchannel, so replace the current
// subchannel list immediately.
- if (subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_,
- "pf_update_before_selected");
+ subchannel_list_ = std::move(subchannel_list);
+ // If we've started picking, start trying to connect to the first
+ // subchannel in the new list.
+ if (started_picking_) {
+ subchannel_list_->subchannel(0)->StartConnectivityWatchLocked();
}
- subchannel_list_ = subchannel_list;
} else {
// We do have a selected subchannel.
// Check if it's present in the new list. If so, we're done.
- for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) {
- grpc_lb_subchannel_data* sd = &subchannel_list->subchannels[i];
- if (sd->subchannel == selected_->subchannel) {
+ for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
+ PickFirstSubchannelData* sd = subchannel_list->subchannel(i);
+ if (sd->subchannel() == selected_->subchannel()) {
// The currently selected subchannel is in the update: we are done.
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Pick First %p found already selected subchannel %p "
"at update index %" PRIuPTR " of %" PRIuPTR "; update done",
- this, selected_->subchannel, i,
- subchannel_list->num_subchannels);
- }
- if (selected_->connected_subchannel != nullptr) {
- sd->connected_subchannel = selected_->connected_subchannel;
+ this, selected_->subchannel(), i,
+ subchannel_list->num_subchannels());
}
- selected_ = sd;
- if (subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(
- subchannel_list_, "pf_update_includes_selected");
+ // Make sure it's in state READY. It might not be if we grabbed
+ // the combiner while a connectivity state notification
+ // informing us otherwise is pending.
+ // Note that CheckConnectivityStateLocked() also takes a ref to
+ // the connected subchannel.
+ grpc_error* error = GRPC_ERROR_NONE;
+ if (sd->CheckConnectivityStateLocked(&error) == GRPC_CHANNEL_READY) {
+ selected_ = sd;
+ subchannel_list_ = std::move(subchannel_list);
+ DestroyUnselectedSubchannelsLocked();
+ sd->StartConnectivityWatchLocked();
+ // If there was a previously pending update (which may or may
+ // not have contained the currently selected subchannel), drop
+ // it, so that it doesn't override what we've done here.
+ latest_pending_subchannel_list_.reset();
+ return;
}
- subchannel_list_ = subchannel_list;
- DestroyUnselectedSubchannelsLocked();
- SubchannelListRefForConnectivityWatch(
- subchannel_list, "connectivity_watch+replace_selected");
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
- // If there was a previously pending update (which may or may
- // not have contained the currently selected subchannel), drop
- // it, so that it doesn't override what we've done here.
- if (latest_pending_subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(
- latest_pending_subchannel_list_,
- "pf_update_includes_selected+outdated");
- latest_pending_subchannel_list_ = nullptr;
- }
- return;
+ GRPC_ERROR_UNREF(error);
}
}
// Not keeping the previous selected subchannel, so set the latest
@@ -372,88 +368,66 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
// subchannel list.
if (latest_pending_subchannel_list_ != nullptr) {
if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"Pick First %p Shutting down latest pending subchannel list "
"%p, about to be replaced by newer latest %p",
- this, latest_pending_subchannel_list_, subchannel_list);
+ this, latest_pending_subchannel_list_.get(),
+ subchannel_list.get());
}
- grpc_lb_subchannel_list_shutdown_and_unref(
- latest_pending_subchannel_list_, "sl_outdated_dont_smash");
}
- latest_pending_subchannel_list_ = subchannel_list;
- }
- // If we've started picking, start trying to connect to the first
- // subchannel in the new list.
- if (started_picking_) {
- SubchannelListRefForConnectivityWatch(subchannel_list,
- "connectivity_watch+update");
- grpc_lb_subchannel_data_start_connectivity_watch(
- &subchannel_list->subchannels[0]);
+ latest_pending_subchannel_list_ = std::move(subchannel_list);
+ // If we've started picking, start trying to connect to the first
+ // subchannel in the new list.
+ if (started_picking_) {
+ latest_pending_subchannel_list_->subchannel(0)
+ ->StartConnectivityWatchLocked();
+ }
}
}
-void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
- grpc_lb_subchannel_data* sd = static_cast<grpc_lb_subchannel_data*>(arg);
- PickFirst* p = static_cast<PickFirst*>(sd->subchannel_list->policy);
- if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_DEBUG,
- "Pick First %p connectivity changed for subchannel %p (%" PRIuPTR
- " of %" PRIuPTR
- "), subchannel_list %p: state=%s p->shutdown_=%d "
- "sd->subchannel_list->shutting_down=%d error=%s",
- p, sd->subchannel, sd->subchannel_list->checking_subchannel,
- sd->subchannel_list->num_subchannels, sd->subchannel_list,
- grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe),
- p->shutdown_, sd->subchannel_list->shutting_down,
- grpc_error_string(error));
- }
- // If the policy is shutting down, unref and return.
- if (p->shutdown_) {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_data_unref_subchannel(sd, "pf_shutdown");
- p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
- "pf_shutdown");
- return;
- }
- // If the subchannel list is shutting down, stop watching.
- if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_data_unref_subchannel(sd, "pf_sl_shutdown");
- p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
- "pf_sl_shutdown");
- return;
- }
- // If we're still here, the notification must be for a subchannel in
- // either the current or latest pending subchannel lists.
- GPR_ASSERT(sd->subchannel_list == p->subchannel_list_ ||
- sd->subchannel_list == p->latest_pending_subchannel_list_);
- // Update state.
- sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
+void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
+ grpc_connectivity_state connectivity_state, grpc_error* error) {
+ PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
+ // The notification must be for a subchannel in either the current or
+ // latest pending subchannel lists.
+ GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
+ subchannel_list() == p->latest_pending_subchannel_list_.get());
// Handle updates for the currently selected subchannel.
- if (p->selected_ == sd) {
+ if (p->selected_ == this) {
+ if (grpc_lb_pick_first_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "Pick First %p connectivity changed for selected subchannel", p);
+ }
// If the new state is anything other than READY and there is a
// pending update, switch to the pending update.
- if (sd->curr_connectivity_state != GRPC_CHANNEL_READY &&
+ if (connectivity_state != GRPC_CHANNEL_READY &&
p->latest_pending_subchannel_list_ != nullptr) {
+ if (grpc_lb_pick_first_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "Pick First %p promoting pending subchannel list %p to "
+ "replace %p",
+ p, p->latest_pending_subchannel_list_.get(),
+ p->subchannel_list_.get());
+ }
p->selected_ = nullptr;
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- p->SubchannelListUnrefForConnectivityWatch(
- sd->subchannel_list, "selected_not_ready+switch_to_update");
- grpc_lb_subchannel_list_shutdown_and_unref(
- p->subchannel_list_, "selected_not_ready+switch_to_update");
- p->subchannel_list_ = p->latest_pending_subchannel_list_;
- p->latest_pending_subchannel_list_ = nullptr;
+ StopConnectivityWatchLocked();
+ p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
grpc_connectivity_state_set(
&p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update");
+ error != GRPC_ERROR_NONE
+ ? GRPC_ERROR_REF(error)
+ : GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "selected subchannel not ready; switching to pending "
+ "update"),
+ "selected_not_ready+switch_to_update");
} else {
// TODO(juanlishen): we re-resolve when the selected subchannel goes to
// TRANSIENT_FAILURE because we used to shut down in this case before
// re-resolution is introduced. But we need to investigate whether we
// really want to take any action instead of waiting for the selected
// subchannel reconnecting.
- GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
- if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN);
+ if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If the selected channel goes bad, request a re-resolution.
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE,
@@ -462,19 +436,16 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
// In transient failure. Rely on re-resolution to recover.
p->selected_ = nullptr;
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
- "pf_selected_shutdown");
- grpc_lb_subchannel_data_unref_subchannel(
- sd, "pf_selected_shutdown"); // Unrefs connected subchannel
+ UnrefSubchannelLocked("pf_selected_shutdown");
+ StopConnectivityWatchLocked();
} else {
- grpc_connectivity_state_set(&p->state_tracker_,
- sd->curr_connectivity_state,
+ grpc_connectivity_state_set(&p->state_tracker_, connectivity_state,
GRPC_ERROR_REF(error), "selected_changed");
// Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
+ RenewConnectivityWatchLocked();
}
}
+ GRPC_ERROR_UNREF(error);
return;
}
// If we get here, there are two possible cases:
@@ -486,26 +457,27 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
// for a subchannel in p->latest_pending_subchannel_list_. The
// goal here is to find a subchannel from the update that we can
// select in place of the current one.
- switch (sd->curr_connectivity_state) {
+ switch (connectivity_state) {
case GRPC_CHANNEL_READY: {
// Case 2. Promote p->latest_pending_subchannel_list_ to
// p->subchannel_list_.
- sd->connected_subchannel =
- grpc_subchannel_get_connected_subchannel(sd->subchannel);
- if (sd->subchannel_list == p->latest_pending_subchannel_list_) {
- GPR_ASSERT(p->subchannel_list_ != nullptr);
- grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list_,
- "finish_update");
- p->subchannel_list_ = p->latest_pending_subchannel_list_;
- p->latest_pending_subchannel_list_ = nullptr;
+ if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
+ if (grpc_lb_pick_first_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "Pick First %p promoting pending subchannel list %p to "
+ "replace %p",
+ p, p->latest_pending_subchannel_list_.get(),
+ p->subchannel_list_.get());
+ }
+ p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
}
// Cases 1 and 2.
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "connecting_ready");
- p->selected_ = sd;
+ p->selected_ = this;
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p,
- sd->subchannel);
+ subchannel());
}
// Drop all other subchannels, since we are now connected.
p->DestroyUnselectedSubchannelsLocked();
@@ -513,54 +485,53 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
PickState* pick;
while ((pick = p->pending_picks_)) {
p->pending_picks_ = pick->next;
- pick->connected_subchannel = p->selected_->connected_subchannel;
+ pick->connected_subchannel =
+ p->selected_->connected_subchannel()->Ref();
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Servicing pending pick with selected subchannel %p",
- p->selected_);
+ p->selected_->subchannel());
}
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
// Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
+ RenewConnectivityWatchLocked();
break;
}
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
+ StopConnectivityWatchLocked();
+ PickFirstSubchannelData* sd = this;
do {
- sd->subchannel_list->checking_subchannel =
- (sd->subchannel_list->checking_subchannel + 1) %
- sd->subchannel_list->num_subchannels;
- sd = &sd->subchannel_list
- ->subchannels[sd->subchannel_list->checking_subchannel];
- } while (sd->subchannel == nullptr);
+ size_t next_index =
+ (sd->Index() + 1) % subchannel_list()->num_subchannels();
+ sd = subchannel_list()->subchannel(next_index);
+ } while (sd->subchannel() == nullptr);
// Case 1: Only set state to TRANSIENT_FAILURE if we've tried
// all subchannels.
- if (sd->subchannel_list->checking_subchannel == 0 &&
- sd->subchannel_list == p->subchannel_list_) {
+ if (sd->Index() == 0 && subchannel_list() == p->subchannel_list_.get()) {
grpc_connectivity_state_set(
&p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "connecting_transient_failure");
}
- // Reuses the connectivity refs from the previous watch.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
+ sd->StartConnectivityWatchLocked();
break;
}
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE: {
// Only update connectivity state in case 1.
- if (sd->subchannel_list == p->subchannel_list_) {
+ if (subchannel_list() == p->subchannel_list_.get()) {
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_REF(error),
"connecting_changed");
}
// Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
+ RenewConnectivityWatchLocked();
break;
}
case GRPC_CHANNEL_SHUTDOWN:
GPR_UNREACHABLE_CODE(break);
}
+ GRPC_ERROR_UNREF(error);
}
//