aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2018-05-02 07:26:12 -0700
committerGravatar GitHub <noreply@github.com>2018-05-02 07:26:12 -0700
commit7df24412237760f9dbae6b1bd7b8f89c95c9c913 (patch)
tree49a029c8d5663b9aa8b29fea38e9c509f4cb7644 /src
parente21c2ef5cc1d64f3934ead91287b9ba0a6b6aa8b (diff)
parent026b20eebba34889892b07a16ebe7ec6b674c404 (diff)
Merge pull request #14886 from markdroth/c++_subchannel_list
Convert subchannel_list code to C++.
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc357
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc726
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc253
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h596
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py1
5 files changed, 1023 insertions, 910 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index a07e90322c..76df976698 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -62,31 +62,65 @@ class PickFirst : public LoadBalancingPolicy {
private:
~PickFirst();
+ class PickFirstSubchannelList;
+
+ class PickFirstSubchannelData
+ : public SubchannelData<PickFirstSubchannelList,
+ PickFirstSubchannelData> {
+ public:
+ PickFirstSubchannelData(PickFirstSubchannelList* subchannel_list,
+ const grpc_lb_user_data_vtable* user_data_vtable,
+ const grpc_lb_address& address,
+ grpc_subchannel* subchannel,
+ grpc_combiner* combiner)
+ : SubchannelData(subchannel_list, user_data_vtable, address, subchannel,
+ combiner) {}
+
+ void ProcessConnectivityChangeLocked(
+ grpc_connectivity_state connectivity_state, grpc_error* error) override;
+ };
+
+ class PickFirstSubchannelList
+ : public SubchannelList<PickFirstSubchannelList,
+ PickFirstSubchannelData> {
+ public:
+ PickFirstSubchannelList(PickFirst* policy, TraceFlag* tracer,
+ const grpc_lb_addresses* addresses,
+ grpc_combiner* combiner,
+ grpc_client_channel_factory* client_channel_factory,
+ const grpc_channel_args& args)
+ : SubchannelList(policy, tracer, addresses, combiner,
+ client_channel_factory, args) {
+ // Need to maintain a ref to the LB policy as long as we maintain
+ // any references to subchannels, since the subchannels'
+ // pollset_sets will include the LB policy's pollset_set.
+ policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
+ }
+
+ ~PickFirstSubchannelList() {
+ PickFirst* p = static_cast<PickFirst*>(policy());
+ p->Unref(DEBUG_LOCATION, "subchannel_list");
+ }
+ };
+
void ShutdownLocked() override;
void StartPickingLocked();
void DestroyUnselectedSubchannelsLocked();
- static void OnConnectivityChangedLocked(void* arg, grpc_error* error);
-
- void SubchannelListRefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason);
- void SubchannelListUnrefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason);
-
- /** all our subchannels */
- grpc_lb_subchannel_list* subchannel_list_ = nullptr;
- /** latest pending subchannel list */
- grpc_lb_subchannel_list* latest_pending_subchannel_list_ = nullptr;
- /** selected subchannel in \a subchannel_list */
- grpc_lb_subchannel_data* selected_ = nullptr;
- /** have we started picking? */
+ // All our subchannels.
+ OrphanablePtr<PickFirstSubchannelList> subchannel_list_;
+ // Latest pending subchannel list.
+ OrphanablePtr<PickFirstSubchannelList> latest_pending_subchannel_list_;
+ // Selected subchannel in \a subchannel_list_.
+ PickFirstSubchannelData* selected_ = nullptr;
+ // Have we started picking?
bool started_picking_ = false;
- /** are we shut down? */
+ // Are we shut down?
bool shutdown_ = false;
- /** list of picks that are waiting on connectivity */
+ // List of picks that are waiting on connectivity.
PickState* pending_picks_ = nullptr;
- /** our connectivity state tracker */
+ // Our connectivity state tracker.
grpc_connectivity_state_tracker state_tracker_;
};
@@ -137,15 +171,8 @@ void PickFirst::ShutdownLocked() {
}
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "shutdown");
- if (subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, "pf_shutdown");
- subchannel_list_ = nullptr;
- }
- if (latest_pending_subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(latest_pending_subchannel_list_,
- "pf_shutdown");
- latest_pending_subchannel_list_ = nullptr;
- }
+ subchannel_list_.reset();
+ latest_pending_subchannel_list_.reset();
TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_CANCELLED);
GRPC_ERROR_UNREF(error);
}
@@ -192,14 +219,10 @@ void PickFirst::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
void PickFirst::StartPickingLocked() {
started_picking_ = true;
- if (subchannel_list_ != nullptr && subchannel_list_->num_subchannels > 0) {
- subchannel_list_->checking_subchannel = 0;
- for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) {
- if (subchannel_list_->subchannels[i].subchannel != nullptr) {
- SubchannelListRefForConnectivityWatch(
- subchannel_list_, "connectivity_watch+start_picking");
- grpc_lb_subchannel_data_start_connectivity_watch(
- &subchannel_list_->subchannels[i]);
+ if (subchannel_list_ != nullptr) {
+ for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
+ if (subchannel_list_->subchannel(i)->subchannel() != nullptr) {
+ subchannel_list_->subchannel(i)->StartConnectivityWatchLocked();
break;
}
}
@@ -215,7 +238,7 @@ void PickFirst::ExitIdleLocked() {
bool PickFirst::PickLocked(PickState* pick) {
// If we have a selected subchannel already, return synchronously.
if (selected_ != nullptr) {
- pick->connected_subchannel = selected_->connected_subchannel;
+ pick->connected_subchannel = selected_->connected_subchannel()->Ref();
return true;
}
// No subchannel selected yet, so handle asynchronously.
@@ -228,11 +251,10 @@ bool PickFirst::PickLocked(PickState* pick) {
}
void PickFirst::DestroyUnselectedSubchannelsLocked() {
- for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) {
- grpc_lb_subchannel_data* sd = &subchannel_list_->subchannels[i];
+ for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
+ PickFirstSubchannelData* sd = subchannel_list_->subchannel(i);
if (selected_ != sd) {
- grpc_lb_subchannel_data_unref_subchannel(sd,
- "selected_different_subchannel");
+ sd->UnrefSubchannelLocked("selected_different_subchannel");
}
}
}
@@ -249,7 +271,7 @@ void PickFirst::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) {
if (selected_ != nullptr) {
- selected_->connected_subchannel->Ping(on_initiate, on_ack);
+ selected_->connected_subchannel()->Ping(on_initiate, on_ack);
} else {
GRPC_CLOSURE_SCHED(on_initiate,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
@@ -258,24 +280,6 @@ void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) {
}
}
-void PickFirst::SubchannelListRefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason) {
- // TODO(roth): We currently track this ref manually. Once the new
- // ClosureRef API is ready and the subchannel_list code has been
- // converted to a C++ API, find a way to hold the RefCountedPtr<>
- // somewhere (maybe in the subchannel_data object) instead of doing
- // this manually.
- auto self = Ref(DEBUG_LOCATION, reason);
- self.release();
- grpc_lb_subchannel_list_ref(subchannel_list, reason);
-}
-
-void PickFirst::SubchannelListUnrefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason) {
- Unref(DEBUG_LOCATION, reason);
- grpc_lb_subchannel_list_unref(subchannel_list, reason);
-}
-
void PickFirst::UpdateLocked(const grpc_channel_args& args) {
const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
@@ -295,75 +299,67 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
return;
}
const grpc_lb_addresses* addresses =
- (const grpc_lb_addresses*)arg->value.pointer.p;
+ static_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Pick First %p received update with %" PRIuPTR " addresses", this,
addresses->num_addresses);
}
- grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create(
+ auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>(
this, &grpc_lb_pick_first_trace, addresses, combiner(),
- client_channel_factory(), args, &PickFirst::OnConnectivityChangedLocked);
- if (subchannel_list->num_subchannels == 0) {
+ client_channel_factory(), args);
+ if (subchannel_list->num_subchannels() == 0) {
// Empty update or no valid subchannels. Unsubscribe from all current
// subchannels and put the channel in TRANSIENT_FAILURE.
grpc_connectivity_state_set(
&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
"pf_update_empty");
- if (subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_,
- "sl_shutdown_empty_update");
- }
- subchannel_list_ = subchannel_list; // Empty list.
+ subchannel_list_ = std::move(subchannel_list); // Empty list.
selected_ = nullptr;
return;
}
if (selected_ == nullptr) {
// We don't yet have a selected subchannel, so replace the current
// subchannel list immediately.
- if (subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_,
- "pf_update_before_selected");
+ subchannel_list_ = std::move(subchannel_list);
+ // If we've started picking, start trying to connect to the first
+ // subchannel in the new list.
+ if (started_picking_) {
+ subchannel_list_->subchannel(0)->StartConnectivityWatchLocked();
}
- subchannel_list_ = subchannel_list;
} else {
// We do have a selected subchannel.
// Check if it's present in the new list. If so, we're done.
- for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) {
- grpc_lb_subchannel_data* sd = &subchannel_list->subchannels[i];
- if (sd->subchannel == selected_->subchannel) {
+ for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
+ PickFirstSubchannelData* sd = subchannel_list->subchannel(i);
+ if (sd->subchannel() == selected_->subchannel()) {
// The currently selected subchannel is in the update: we are done.
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Pick First %p found already selected subchannel %p "
"at update index %" PRIuPTR " of %" PRIuPTR "; update done",
- this, selected_->subchannel, i,
- subchannel_list->num_subchannels);
- }
- if (selected_->connected_subchannel != nullptr) {
- sd->connected_subchannel = selected_->connected_subchannel;
- }
- selected_ = sd;
- if (subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(
- subchannel_list_, "pf_update_includes_selected");
+ this, selected_->subchannel(), i,
+ subchannel_list->num_subchannels());
}
- subchannel_list_ = subchannel_list;
- DestroyUnselectedSubchannelsLocked();
- SubchannelListRefForConnectivityWatch(
- subchannel_list, "connectivity_watch+replace_selected");
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
- // If there was a previously pending update (which may or may
- // not have contained the currently selected subchannel), drop
- // it, so that it doesn't override what we've done here.
- if (latest_pending_subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(
- latest_pending_subchannel_list_,
- "pf_update_includes_selected+outdated");
- latest_pending_subchannel_list_ = nullptr;
+ // Make sure it's in state READY. It might not be if we grabbed
+ // the combiner while a connectivity state notification
+ // informing us otherwise is pending.
+ // Note that CheckConnectivityStateLocked() also takes a ref to
+ // the connected subchannel.
+ grpc_error* error = GRPC_ERROR_NONE;
+ if (sd->CheckConnectivityStateLocked(&error) == GRPC_CHANNEL_READY) {
+ selected_ = sd;
+ subchannel_list_ = std::move(subchannel_list);
+ DestroyUnselectedSubchannelsLocked();
+ sd->StartConnectivityWatchLocked();
+ // If there was a previously pending update (which may or may
+ // not have contained the currently selected subchannel), drop
+ // it, so that it doesn't override what we've done here.
+ latest_pending_subchannel_list_.reset();
+ return;
}
- return;
+ GRPC_ERROR_UNREF(error);
}
}
// Not keeping the previous selected subchannel, so set the latest
@@ -375,85 +371,63 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
gpr_log(GPR_INFO,
"Pick First %p Shutting down latest pending subchannel list "
"%p, about to be replaced by newer latest %p",
- this, latest_pending_subchannel_list_, subchannel_list);
+ this, latest_pending_subchannel_list_.get(),
+ subchannel_list.get());
}
- grpc_lb_subchannel_list_shutdown_and_unref(
- latest_pending_subchannel_list_, "sl_outdated_dont_smash");
}
- latest_pending_subchannel_list_ = subchannel_list;
- }
- // If we've started picking, start trying to connect to the first
- // subchannel in the new list.
- if (started_picking_) {
- SubchannelListRefForConnectivityWatch(subchannel_list,
- "connectivity_watch+update");
- grpc_lb_subchannel_data_start_connectivity_watch(
- &subchannel_list->subchannels[0]);
+ latest_pending_subchannel_list_ = std::move(subchannel_list);
+ // If we've started picking, start trying to connect to the first
+ // subchannel in the new list.
+ if (started_picking_) {
+ latest_pending_subchannel_list_->subchannel(0)
+ ->StartConnectivityWatchLocked();
+ }
}
}
-void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
- grpc_lb_subchannel_data* sd = static_cast<grpc_lb_subchannel_data*>(arg);
- PickFirst* p = static_cast<PickFirst*>(sd->subchannel_list->policy);
- if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_INFO,
- "Pick First %p connectivity changed for subchannel %p (%" PRIuPTR
- " of %" PRIuPTR
- "), subchannel_list %p: state=%s p->shutdown_=%d "
- "sd->subchannel_list->shutting_down=%d error=%s",
- p, sd->subchannel, sd->subchannel_list->checking_subchannel,
- sd->subchannel_list->num_subchannels, sd->subchannel_list,
- grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe),
- p->shutdown_, sd->subchannel_list->shutting_down,
- grpc_error_string(error));
- }
- // If the policy is shutting down, unref and return.
- if (p->shutdown_) {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_data_unref_subchannel(sd, "pf_shutdown");
- p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
- "pf_shutdown");
- return;
- }
- // If the subchannel list is shutting down, stop watching.
- if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_data_unref_subchannel(sd, "pf_sl_shutdown");
- p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
- "pf_sl_shutdown");
- return;
- }
- // If we're still here, the notification must be for a subchannel in
- // either the current or latest pending subchannel lists.
- GPR_ASSERT(sd->subchannel_list == p->subchannel_list_ ||
- sd->subchannel_list == p->latest_pending_subchannel_list_);
- // Update state.
- sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
+void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
+ grpc_connectivity_state connectivity_state, grpc_error* error) {
+ PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
+ // The notification must be for a subchannel in either the current or
+ // latest pending subchannel lists.
+ GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
+ subchannel_list() == p->latest_pending_subchannel_list_.get());
// Handle updates for the currently selected subchannel.
- if (p->selected_ == sd) {
+ if (p->selected_ == this) {
+ if (grpc_lb_pick_first_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "Pick First %p connectivity changed for selected subchannel", p);
+ }
// If the new state is anything other than READY and there is a
// pending update, switch to the pending update.
- if (sd->curr_connectivity_state != GRPC_CHANNEL_READY &&
+ if (connectivity_state != GRPC_CHANNEL_READY &&
p->latest_pending_subchannel_list_ != nullptr) {
+ if (grpc_lb_pick_first_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "Pick First %p promoting pending subchannel list %p to "
+ "replace %p",
+ p, p->latest_pending_subchannel_list_.get(),
+ p->subchannel_list_.get());
+ }
p->selected_ = nullptr;
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- p->SubchannelListUnrefForConnectivityWatch(
- sd->subchannel_list, "selected_not_ready+switch_to_update");
- grpc_lb_subchannel_list_shutdown_and_unref(
- p->subchannel_list_, "selected_not_ready+switch_to_update");
- p->subchannel_list_ = p->latest_pending_subchannel_list_;
- p->latest_pending_subchannel_list_ = nullptr;
+ StopConnectivityWatchLocked();
+ p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
grpc_connectivity_state_set(
&p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update");
+ error != GRPC_ERROR_NONE
+ ? GRPC_ERROR_REF(error)
+ : GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "selected subchannel not ready; switching to pending "
+ "update"),
+ "selected_not_ready+switch_to_update");
} else {
// TODO(juanlishen): we re-resolve when the selected subchannel goes to
// TRANSIENT_FAILURE because we used to shut down in this case before
// re-resolution is introduced. But we need to investigate whether we
// really want to take any action instead of waiting for the selected
// subchannel reconnecting.
- GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
- if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN);
+ if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If the selected channel goes bad, request a re-resolution.
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE,
@@ -462,19 +436,16 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
// In transient failure. Rely on re-resolution to recover.
p->selected_ = nullptr;
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
- "pf_selected_shutdown");
- grpc_lb_subchannel_data_unref_subchannel(
- sd, "pf_selected_shutdown"); // Unrefs connected subchannel
+ UnrefSubchannelLocked("pf_selected_shutdown");
+ StopConnectivityWatchLocked();
} else {
- grpc_connectivity_state_set(&p->state_tracker_,
- sd->curr_connectivity_state,
+ grpc_connectivity_state_set(&p->state_tracker_, connectivity_state,
GRPC_ERROR_REF(error), "selected_changed");
// Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
+ RenewConnectivityWatchLocked();
}
}
+ GRPC_ERROR_UNREF(error);
return;
}
// If we get here, there are two possible cases:
@@ -486,26 +457,27 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
// for a subchannel in p->latest_pending_subchannel_list_. The
// goal here is to find a subchannel from the update that we can
// select in place of the current one.
- switch (sd->curr_connectivity_state) {
+ switch (connectivity_state) {
case GRPC_CHANNEL_READY: {
// Case 2. Promote p->latest_pending_subchannel_list_ to
// p->subchannel_list_.
- sd->connected_subchannel =
- grpc_subchannel_get_connected_subchannel(sd->subchannel);
- if (sd->subchannel_list == p->latest_pending_subchannel_list_) {
- GPR_ASSERT(p->subchannel_list_ != nullptr);
- grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list_,
- "finish_update");
- p->subchannel_list_ = p->latest_pending_subchannel_list_;
- p->latest_pending_subchannel_list_ = nullptr;
+ if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
+ if (grpc_lb_pick_first_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "Pick First %p promoting pending subchannel list %p to "
+ "replace %p",
+ p, p->latest_pending_subchannel_list_.get(),
+ p->subchannel_list_.get());
+ }
+ p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
}
// Cases 1 and 2.
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "connecting_ready");
- p->selected_ = sd;
+ p->selected_ = this;
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p,
- sd->subchannel);
+ subchannel());
}
// Drop all other subchannels, since we are now connected.
p->DestroyUnselectedSubchannelsLocked();
@@ -513,7 +485,8 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
PickState* pick;
while ((pick = p->pending_picks_)) {
p->pending_picks_ = pick->next;
- pick->connected_subchannel = p->selected_->connected_subchannel;
+ pick->connected_subchannel =
+ p->selected_->connected_subchannel()->Ref();
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Servicing pending pick with selected subchannel %p",
@@ -522,45 +495,43 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
// Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
+ RenewConnectivityWatchLocked();
break;
}
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
+ StopConnectivityWatchLocked();
+ PickFirstSubchannelData* sd = this;
do {
- sd->subchannel_list->checking_subchannel =
- (sd->subchannel_list->checking_subchannel + 1) %
- sd->subchannel_list->num_subchannels;
- sd = &sd->subchannel_list
- ->subchannels[sd->subchannel_list->checking_subchannel];
- } while (sd->subchannel == nullptr);
+ size_t next_index =
+ (sd->Index() + 1) % subchannel_list()->num_subchannels();
+ sd = subchannel_list()->subchannel(next_index);
+ } while (sd->subchannel() == nullptr);
// Case 1: Only set state to TRANSIENT_FAILURE if we've tried
// all subchannels.
- if (sd->subchannel_list->checking_subchannel == 0 &&
- sd->subchannel_list == p->subchannel_list_) {
+ if (sd->Index() == 0 && subchannel_list() == p->subchannel_list_.get()) {
grpc_connectivity_state_set(
&p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "connecting_transient_failure");
}
- // Reuses the connectivity refs from the previous watch.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
+ sd->StartConnectivityWatchLocked();
break;
}
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE: {
// Only update connectivity state in case 1.
- if (sd->subchannel_list == p->subchannel_list_) {
+ if (subchannel_list() == p->subchannel_list_.get()) {
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_REF(error),
"connecting_changed");
}
// Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
+ RenewConnectivityWatchLocked();
break;
}
case GRPC_CHANNEL_SHUTDOWN:
GPR_UNREACHABLE_CODE(break);
}
+ GRPC_ERROR_UNREF(error);
}
//
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index b470016bd9..79e8ad5663 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -73,23 +73,127 @@ class RoundRobin : public LoadBalancingPolicy {
private:
~RoundRobin();
- void ShutdownLocked() override;
+ // Forward declaration.
+ class RoundRobinSubchannelList;
+
+ // Data for a particular subchannel in a subchannel list.
+ // This subclass adds the following functionality:
+ // - Tracks user_data associated with each address, which will be
+ // returned along with picks that select the subchannel.
+ // - Tracks the previous connectivity state of the subchannel, so that
+ // we know how many subchannels are in each state.
+ class RoundRobinSubchannelData
+ : public SubchannelData<RoundRobinSubchannelList,
+ RoundRobinSubchannelData> {
+ public:
+ RoundRobinSubchannelData(RoundRobinSubchannelList* subchannel_list,
+ const grpc_lb_user_data_vtable* user_data_vtable,
+ const grpc_lb_address& address,
+ grpc_subchannel* subchannel,
+ grpc_combiner* combiner)
+ : SubchannelData(subchannel_list, user_data_vtable, address, subchannel,
+ combiner),
+ user_data_vtable_(user_data_vtable),
+ user_data_(user_data_vtable_ != nullptr
+ ? user_data_vtable_->copy(address.user_data)
+ : nullptr) {}
+
+ void UnrefSubchannelLocked(const char* reason) override {
+ SubchannelData::UnrefSubchannelLocked(reason);
+ if (user_data_ != nullptr) {
+ GPR_ASSERT(user_data_vtable_ != nullptr);
+ user_data_vtable_->destroy(user_data_);
+ user_data_ = nullptr;
+ }
+ }
- void StartPickingLocked();
- size_t GetNextReadySubchannelIndexLocked();
- void UpdateLastReadySubchannelIndexLocked(size_t last_ready_index);
- void UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd,
- grpc_error* error);
+ void* user_data() const { return user_data_; }
+
+ grpc_connectivity_state connectivity_state() const {
+ return last_connectivity_state_;
+ }
+
+ void UpdateConnectivityStateLocked(
+ grpc_connectivity_state connectivity_state, grpc_error* error);
+
+ private:
+ void ProcessConnectivityChangeLocked(
+ grpc_connectivity_state connectivity_state, grpc_error* error) override;
+
+ const grpc_lb_user_data_vtable* user_data_vtable_;
+ void* user_data_ = nullptr;
+ grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_IDLE;
+ };
+
+ // A list of subchannels.
+ class RoundRobinSubchannelList
+ : public SubchannelList<RoundRobinSubchannelList,
+ RoundRobinSubchannelData> {
+ public:
+ RoundRobinSubchannelList(
+ RoundRobin* policy, TraceFlag* tracer,
+ const grpc_lb_addresses* addresses, grpc_combiner* combiner,
+ grpc_client_channel_factory* client_channel_factory,
+ const grpc_channel_args& args)
+ : SubchannelList(policy, tracer, addresses, combiner,
+ client_channel_factory, args) {
+ // Need to maintain a ref to the LB policy as long as we maintain
+ // any references to subchannels, since the subchannels'
+ // pollset_sets will include the LB policy's pollset_set.
+ policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
+ }
+
+ ~RoundRobinSubchannelList() {
+ GRPC_ERROR_UNREF(last_transient_failure_error_);
+ RoundRobin* p = static_cast<RoundRobin*>(policy());
+ p->Unref(DEBUG_LOCATION, "subchannel_list");
+ }
- static void OnConnectivityChangedLocked(void* arg, grpc_error* error);
+ // Starts watching the subchannels in this list.
+ void StartWatchingLocked();
+
+ // Updates the counters of subchannels in each state when a
+ // subchannel transitions from old_state to new_state.
+ // transient_failure_error is the error that is reported when
+ // new_state is TRANSIENT_FAILURE.
+ void UpdateStateCountersLocked(grpc_connectivity_state old_state,
+ grpc_connectivity_state new_state,
+ grpc_error* transient_failure_error);
+
+ // If this subchannel list is the RR policy's current subchannel
+ // list, updates the RR policy's connectivity state based on the
+ // subchannel list's state counters.
+ void MaybeUpdateRoundRobinConnectivityStateLocked();
+
+ // Updates the RR policy's overall state based on the counters of
+ // subchannels in each state.
+ void UpdateRoundRobinStateFromSubchannelStateCountsLocked();
+
+ size_t GetNextReadySubchannelIndexLocked();
+ void UpdateLastReadySubchannelIndexLocked(size_t last_ready_index);
+
+ private:
+ size_t num_ready_ = 0;
+ size_t num_connecting_ = 0;
+ size_t num_transient_failure_ = 0;
+ grpc_error* last_transient_failure_error_ = GRPC_ERROR_NONE;
+ size_t last_ready_index_ = -1; // Index into list of last pick.
+ };
- void SubchannelListRefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason);
- void SubchannelListUnrefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason);
+ void ShutdownLocked() override;
+
+ void StartPickingLocked();
+ bool DoPickLocked(PickState* pick);
+ void DrainPendingPicksLocked();
/** list of subchannels */
- grpc_lb_subchannel_list* subchannel_list_ = nullptr;
+ OrphanablePtr<RoundRobinSubchannelList> subchannel_list_;
+ /** Latest version of the subchannel list.
+ * Subchannel connectivity callbacks will only promote updated subchannel
+ * lists if they equal \a latest_pending_subchannel_list. In other words,
+ * racing callbacks that reference outdated subchannel lists won't perform any
+ * update. */
+ OrphanablePtr<RoundRobinSubchannelList> latest_pending_subchannel_list_;
/** have we started picking? */
bool started_picking_ = false;
/** are we shutting down? */
@@ -98,14 +202,6 @@ class RoundRobin : public LoadBalancingPolicy {
PickState* pending_picks_ = nullptr;
/** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker_;
- /** Index into subchannels for last pick. */
- size_t last_ready_subchannel_index_ = 0;
- /** Latest version of the subchannel list.
- * Subchannel connectivity callbacks will only promote updated subchannel
- * lists if they equal \a latest_pending_subchannel_list. In other words,
- * racing callbacks that reference outdated subchannel lists won't perform any
- * update. */
- grpc_lb_subchannel_list* latest_pending_subchannel_list_ = nullptr;
};
RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) {
@@ -115,7 +211,7 @@ RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) {
UpdateLocked(*args.args);
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Created with %" PRIuPTR " subchannels", this,
- subchannel_list_->num_subchannels);
+ subchannel_list_->num_subchannels());
}
grpc_subchannel_index_ref();
}
@@ -131,68 +227,6 @@ RoundRobin::~RoundRobin() {
grpc_subchannel_index_unref();
}
-/** Returns the index into p->subchannel_list->subchannels of the next
- * subchannel in READY state, or p->subchannel_list->num_subchannels if no
- * subchannel is READY.
- *
- * Note that this function does *not* update p->last_ready_subchannel_index.
- * The caller must do that if it returns a pick. */
-size_t RoundRobin::GetNextReadySubchannelIndexLocked() {
- GPR_ASSERT(subchannel_list_ != nullptr);
- if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[RR %p] getting next ready subchannel (out of %" PRIuPTR
- "), "
- "last_ready_subchannel_index=%" PRIuPTR,
- this, subchannel_list_->num_subchannels,
- last_ready_subchannel_index_);
- }
- for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) {
- const size_t index = (i + last_ready_subchannel_index_ + 1) %
- subchannel_list_->num_subchannels;
- if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(
- GPR_INFO,
- "[RR %p] checking subchannel %p, subchannel_list %p, index %" PRIuPTR
- ": state=%s",
- this, subchannel_list_->subchannels[index].subchannel,
- subchannel_list_, index,
- grpc_connectivity_state_name(
- subchannel_list_->subchannels[index].curr_connectivity_state));
- }
- if (subchannel_list_->subchannels[index].curr_connectivity_state ==
- GRPC_CHANNEL_READY) {
- if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[RR %p] found next ready subchannel (%p) at index %" PRIuPTR
- " of subchannel_list %p",
- this, subchannel_list_->subchannels[index].subchannel, index,
- subchannel_list_);
- }
- return index;
- }
- }
- if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_INFO, "[RR %p] no subchannels in ready state", this);
- }
- return subchannel_list_->num_subchannels;
-}
-
-// Sets last_ready_subchannel_index_ to last_ready_index.
-void RoundRobin::UpdateLastReadySubchannelIndexLocked(size_t last_ready_index) {
- GPR_ASSERT(last_ready_index < subchannel_list_->num_subchannels);
- last_ready_subchannel_index_ = last_ready_index;
- if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[RR %p] setting last_ready_subchannel_index=%" PRIuPTR
- " (SC %p, CSC %p)",
- this, last_ready_index,
- subchannel_list_->subchannels[last_ready_index].subchannel,
- subchannel_list_->subchannels[last_ready_index]
- .connected_subchannel.get());
- }
-}
-
void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
PickState* pick;
while ((pick = pending_picks_) != nullptr) {
@@ -218,16 +252,8 @@ void RoundRobin::ShutdownLocked() {
}
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "rr_shutdown");
- if (subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_,
- "sl_shutdown_rr_shutdown");
- subchannel_list_ = nullptr;
- }
- if (latest_pending_subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(
- latest_pending_subchannel_list_, "sl_shutdown_pending_rr_shutdown");
- latest_pending_subchannel_list_ = nullptr;
- }
+ subchannel_list_.reset();
+ latest_pending_subchannel_list_.reset();
TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_CANCELLED);
GRPC_ERROR_UNREF(error);
}
@@ -273,39 +299,49 @@ void RoundRobin::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
GRPC_ERROR_UNREF(error);
}
-void RoundRobin::SubchannelListRefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason) {
- // TODO(roth): We currently track this ref manually. Once the new
- // ClosureRef API is ready and the subchannel_list code has been
- // converted to a C++ API, find a way to hold the RefCountedPtr<>
- // somewhere (maybe in the subchannel_data object) instead of doing
- // this manually.
- auto self = Ref(DEBUG_LOCATION, reason);
- self.release();
- grpc_lb_subchannel_list_ref(subchannel_list, reason);
+void RoundRobin::StartPickingLocked() {
+ started_picking_ = true;
+ subchannel_list_->StartWatchingLocked();
}
-void RoundRobin::SubchannelListUnrefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason) {
- Unref(DEBUG_LOCATION, reason);
- grpc_lb_subchannel_list_unref(subchannel_list, reason);
+void RoundRobin::ExitIdleLocked() {
+ if (!started_picking_) {
+ StartPickingLocked();
+ }
}
-void RoundRobin::StartPickingLocked() {
- started_picking_ = true;
- for (size_t i = 0; i < subchannel_list_->num_subchannels; i++) {
- if (subchannel_list_->subchannels[i].subchannel != nullptr) {
- SubchannelListRefForConnectivityWatch(subchannel_list_,
- "connectivity_watch");
- grpc_lb_subchannel_data_start_connectivity_watch(
- &subchannel_list_->subchannels[i]);
+bool RoundRobin::DoPickLocked(PickState* pick) {
+ const size_t next_ready_index =
+ subchannel_list_->GetNextReadySubchannelIndexLocked();
+ if (next_ready_index < subchannel_list_->num_subchannels()) {
+ /* readily available, report right away */
+ RoundRobinSubchannelData* sd =
+ subchannel_list_->subchannel(next_ready_index);
+ GPR_ASSERT(sd->connected_subchannel() != nullptr);
+ pick->connected_subchannel = sd->connected_subchannel()->Ref();
+ if (pick->user_data != nullptr) {
+ *pick->user_data = sd->user_data();
}
+ if (grpc_lb_round_robin_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, "
+ "index %" PRIuPTR ")",
+ this, sd->subchannel(), pick->connected_subchannel.get(),
+ sd->subchannel_list(), next_ready_index);
+ }
+ /* only advance the last picked pointer if the selection was used */
+ subchannel_list_->UpdateLastReadySubchannelIndexLocked(next_ready_index);
+ return true;
}
+ return false;
}
-void RoundRobin::ExitIdleLocked() {
- if (!started_picking_) {
- StartPickingLocked();
+void RoundRobin::DrainPendingPicksLocked() {
+ PickState* pick;
+ while ((pick = pending_picks_)) {
+ pending_picks_ = pick->next;
+ GPR_ASSERT(DoPickLocked(pick));
+ GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
}
@@ -315,27 +351,7 @@ bool RoundRobin::PickLocked(PickState* pick) {
}
GPR_ASSERT(!shutdown_);
if (subchannel_list_ != nullptr) {
- const size_t next_ready_index = GetNextReadySubchannelIndexLocked();
- if (next_ready_index < subchannel_list_->num_subchannels) {
- /* readily available, report right away */
- grpc_lb_subchannel_data* sd =
- &subchannel_list_->subchannels[next_ready_index];
- pick->connected_subchannel = sd->connected_subchannel;
- if (pick->user_data != nullptr) {
- *pick->user_data = sd->user_data;
- }
- if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(
- GPR_INFO,
- "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, "
- "index %" PRIuPTR ")",
- this, sd->subchannel, pick->connected_subchannel.get(),
- sd->subchannel_list, next_ready_index);
- }
- /* only advance the last picked pointer if the selection was used */
- UpdateLastReadySubchannelIndexLocked(next_ready_index);
- return true;
- }
+ if (DoPickLocked(pick)) return true;
}
/* no pick currently available. Save for later in list of pending picks */
if (!started_picking_) {
@@ -346,36 +362,62 @@ bool RoundRobin::PickLocked(PickState* pick) {
return false;
}
-void UpdateStateCountersLocked(grpc_lb_subchannel_data* sd) {
- grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list;
- GPR_ASSERT(sd->prev_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
- GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
- if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) {
- GPR_ASSERT(subchannel_list->num_ready > 0);
- --subchannel_list->num_ready;
- } else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- GPR_ASSERT(subchannel_list->num_transient_failures > 0);
- --subchannel_list->num_transient_failures;
- } else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) {
- GPR_ASSERT(subchannel_list->num_idle > 0);
- --subchannel_list->num_idle;
+void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
+ if (num_subchannels() == 0) return;
+ // Check current state of each subchannel synchronously, since any
+ // subchannel already used by some other channel may have a non-IDLE
+ // state.
+ for (size_t i = 0; i < num_subchannels(); ++i) {
+ grpc_error* error = GRPC_ERROR_NONE;
+ grpc_connectivity_state state =
+ subchannel(i)->CheckConnectivityStateLocked(&error);
+ if (state != GRPC_CHANNEL_IDLE) {
+ subchannel(i)->UpdateConnectivityStateLocked(state, error);
+ }
}
- sd->prev_connectivity_state = sd->curr_connectivity_state;
- if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
- ++subchannel_list->num_ready;
- } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- ++subchannel_list->num_transient_failures;
- } else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) {
- ++subchannel_list->num_idle;
+ // Now set the LB policy's state based on the subchannels' states.
+ UpdateRoundRobinStateFromSubchannelStateCountsLocked();
+ // Start connectivity watch for each subchannel.
+ for (size_t i = 0; i < num_subchannels(); i++) {
+ if (subchannel(i)->subchannel() != nullptr) {
+ subchannel(i)->StartConnectivityWatchLocked();
+ }
}
}
-/** Sets the policy's connectivity status based on that of the passed-in \a sd
- * (the grpc_lb_subchannel_data associated with the updated subchannel) and the
- * subchannel list \a sd belongs to (sd->subchannel_list). \a error will be used
- * only if the policy transitions to state TRANSIENT_FAILURE. */
-void RoundRobin::UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd,
- grpc_error* error) {
+void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked(
+ grpc_connectivity_state old_state, grpc_connectivity_state new_state,
+ grpc_error* transient_failure_error) {
+ GPR_ASSERT(old_state != GRPC_CHANNEL_SHUTDOWN);
+ GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
+ if (old_state == GRPC_CHANNEL_READY) {
+ GPR_ASSERT(num_ready_ > 0);
+ --num_ready_;
+ } else if (old_state == GRPC_CHANNEL_CONNECTING) {
+ GPR_ASSERT(num_connecting_ > 0);
+ --num_connecting_;
+ } else if (old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ GPR_ASSERT(num_transient_failure_ > 0);
+ --num_transient_failure_;
+ }
+ if (new_state == GRPC_CHANNEL_READY) {
+ ++num_ready_;
+ } else if (new_state == GRPC_CHANNEL_CONNECTING) {
+ ++num_connecting_;
+ } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ ++num_transient_failure_;
+ }
+ GRPC_ERROR_UNREF(last_transient_failure_error_);
+ last_transient_failure_error_ = transient_failure_error;
+}
+
+// Sets the RR policy's connectivity state based on the current
+// subchannel list.
+void RoundRobin::RoundRobinSubchannelList::
+ MaybeUpdateRoundRobinConnectivityStateLocked() {
+ RoundRobin* p = static_cast<RoundRobin*>(policy());
+ // Only set connectivity state if this is the current subchannel list.
+ if (p->subchannel_list_.get() != this) return;
/* In priority order. The first rule to match terminates the search (ie, if we
* are on rule n, all previous rules were unfulfilled).
*
@@ -390,155 +432,151 @@ void RoundRobin::UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd,
* CHECK: subchannel_list->num_transient_failures ==
* subchannel_list->num_subchannels.
*/
- grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list;
- GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_IDLE);
- if (subchannel_list->num_ready > 0) {
+ if (num_ready_ > 0) {
/* 1) READY */
- grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_READY,
+ grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "rr_ready");
- } else if (sd->curr_connectivity_state == GRPC_CHANNEL_CONNECTING) {
+ } else if (num_connecting_ > 0) {
/* 2) CONNECTING */
- grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_CONNECTING,
+ grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_NONE, "rr_connecting");
- } else if (subchannel_list->num_transient_failures ==
- subchannel_list->num_subchannels) {
+ } else if (num_transient_failure_ == num_subchannels()) {
/* 3) TRANSIENT_FAILURE */
- grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_REF(error),
+ grpc_connectivity_state_set(&p->state_tracker_,
+ GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_REF(last_transient_failure_error_),
"rr_exhausted_subchannels");
}
- GRPC_ERROR_UNREF(error);
}
-void RoundRobin::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
- grpc_lb_subchannel_data* sd = static_cast<grpc_lb_subchannel_data*>(arg);
- RoundRobin* p = static_cast<RoundRobin*>(sd->subchannel_list->policy);
+void RoundRobin::RoundRobinSubchannelList::
+ UpdateRoundRobinStateFromSubchannelStateCountsLocked() {
+ RoundRobin* p = static_cast<RoundRobin*>(policy());
+ if (num_ready_ > 0) {
+ if (p->subchannel_list_.get() != this) {
+ // Promote this list to p->subchannel_list_.
+ // This list must be p->latest_pending_subchannel_list_, because
+ // any previous update would have been shut down already and
+ // therefore we would not be receiving a notification for them.
+ GPR_ASSERT(p->latest_pending_subchannel_list_.get() == this);
+ GPR_ASSERT(!shutting_down());
+ if (grpc_lb_round_robin_trace.enabled()) {
+ const size_t old_num_subchannels =
+ p->subchannel_list_ != nullptr
+ ? p->subchannel_list_->num_subchannels()
+ : 0;
+ gpr_log(GPR_INFO,
+ "[RR %p] phasing out subchannel list %p (size %" PRIuPTR
+ ") in favor of %p (size %" PRIuPTR ")",
+ p, p->subchannel_list_.get(), old_num_subchannels, this,
+ num_subchannels());
+ }
+ p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
+ }
+ // Drain pending picks.
+ p->DrainPendingPicksLocked();
+ }
+ // Update the RR policy's connectivity state if needed.
+ MaybeUpdateRoundRobinConnectivityStateLocked();
+}
+
+void RoundRobin::RoundRobinSubchannelData::UpdateConnectivityStateLocked(
+ grpc_connectivity_state connectivity_state, grpc_error* error) {
+ RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(
GPR_INFO,
- "[RR %p] connectivity changed for subchannel %p, subchannel_list %p: "
- "prev_state=%s new_state=%s p->shutdown=%d "
- "sd->subchannel_list->shutting_down=%d error=%s",
- p, sd->subchannel, sd->subchannel_list,
- grpc_connectivity_state_name(sd->prev_connectivity_state),
- grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe),
- p->shutdown_, sd->subchannel_list->shutting_down,
- grpc_error_string(error));
- }
- GPR_ASSERT(sd->subchannel != nullptr);
- // If the policy is shutting down, unref and return.
- if (p->shutdown_) {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_data_unref_subchannel(sd, "rr_shutdown");
- p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
- "rr_shutdown");
- return;
+ "[RR %p] connectivity changed for subchannel %p, subchannel_list %p "
+ "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s",
+ p, subchannel(), subchannel_list(), Index(),
+ subchannel_list()->num_subchannels(),
+ grpc_connectivity_state_name(last_connectivity_state_),
+ grpc_connectivity_state_name(connectivity_state));
+ }
+ subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_,
+ connectivity_state, error);
+ last_connectivity_state_ = connectivity_state;
+}
+
+void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
+ grpc_connectivity_state connectivity_state, grpc_error* error) {
+ RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
+ GPR_ASSERT(subchannel() != nullptr);
+ // If the new state is TRANSIENT_FAILURE, re-resolve.
+ // Only do this if we've started watching, not at startup time.
+ // Otherwise, if the subchannel was already in state TRANSIENT_FAILURE
+ // when the subchannel list was created, we'd wind up in a constant
+ // loop of re-resolution.
+ if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ if (grpc_lb_round_robin_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. "
+ "Requesting re-resolution",
+ p, subchannel());
+ }
+ p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE);
}
- // If the subchannel list is shutting down, stop watching.
- if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_data_unref_subchannel(sd, "rr_sl_shutdown");
- p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
- "rr_sl_shutdown");
- return;
+ // Update state counters.
+ UpdateConnectivityStateLocked(connectivity_state, error);
+ // Update overall state and renew notification.
+ subchannel_list()->UpdateRoundRobinStateFromSubchannelStateCountsLocked();
+ RenewConnectivityWatchLocked();
+}
+
+/** Returns the index into p->subchannel_list->subchannels of the next
+ * subchannel in READY state, or p->subchannel_list->num_subchannels if no
+ * subchannel is READY.
+ *
+ * Note that this function does *not* update p->last_ready_subchannel_index.
+ * The caller must do that if it returns a pick. */
+size_t
+RoundRobin::RoundRobinSubchannelList::GetNextReadySubchannelIndexLocked() {
+ if (grpc_lb_round_robin_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[RR %p] getting next ready subchannel (out of %" PRIuPTR
+ "), last_ready_index=%" PRIuPTR,
+ policy(), num_subchannels(), last_ready_index_);
}
- // If we're still here, the notification must be for a subchannel in
- // either the current or latest pending subchannel lists.
- GPR_ASSERT(sd->subchannel_list == p->subchannel_list_ ||
- sd->subchannel_list == p->latest_pending_subchannel_list_);
- GPR_ASSERT(sd->pending_connectivity_state_unsafe != GRPC_CHANNEL_SHUTDOWN);
- // Now that we're inside the combiner, copy the pending connectivity
- // state (which was set by the connectivity state watcher) to
- // curr_connectivity_state, which is what we use inside of the combiner.
- sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
- // If the sd's new state is TRANSIENT_FAILURE, unref the *connected*
- // subchannel, if any.
- switch (sd->curr_connectivity_state) {
- case GRPC_CHANNEL_TRANSIENT_FAILURE: {
- sd->connected_subchannel.reset();
+ for (size_t i = 0; i < num_subchannels(); ++i) {
+ const size_t index = (i + last_ready_index_ + 1) % num_subchannels();
+ if (grpc_lb_round_robin_trace.enabled()) {
+ gpr_log(
+ GPR_INFO,
+ "[RR %p] checking subchannel %p, subchannel_list %p, index %" PRIuPTR
+ ": state=%s",
+ policy(), subchannel(index)->subchannel(), this, index,
+ grpc_connectivity_state_name(
+ subchannel(index)->connectivity_state()));
+ }
+ if (subchannel(index)->connectivity_state() == GRPC_CHANNEL_READY) {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO,
- "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. "
- "Requesting re-resolution",
- p, sd->subchannel);
- }
- p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE);
- break;
- }
- case GRPC_CHANNEL_READY: {
- if (sd->connected_subchannel == nullptr) {
- sd->connected_subchannel =
- grpc_subchannel_get_connected_subchannel(sd->subchannel);
- }
- if (sd->subchannel_list != p->subchannel_list_) {
- // promote sd->subchannel_list to p->subchannel_list_.
- // sd->subchannel_list must be equal to
- // p->latest_pending_subchannel_list_ because we have already filtered
- // for sds belonging to outdated subchannel lists.
- GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list_);
- GPR_ASSERT(!sd->subchannel_list->shutting_down);
- if (grpc_lb_round_robin_trace.enabled()) {
- const size_t num_subchannels =
- p->subchannel_list_ != nullptr
- ? p->subchannel_list_->num_subchannels
- : 0;
- gpr_log(GPR_INFO,
- "[RR %p] phasing out subchannel list %p (size %" PRIuPTR
- ") in favor of %p (size %" PRIuPTR ")",
- p, p->subchannel_list_, num_subchannels, sd->subchannel_list,
- num_subchannels);
- }
- if (p->subchannel_list_ != nullptr) {
- // dispose of the current subchannel_list
- grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list_,
- "sl_phase_out_shutdown");
- }
- p->subchannel_list_ = p->latest_pending_subchannel_list_;
- p->latest_pending_subchannel_list_ = nullptr;
- }
- /* at this point we know there's at least one suitable subchannel. Go
- * ahead and pick one and notify the pending suitors in
- * p->pending_picks. This preemptively replicates rr_pick()'s actions. */
- const size_t next_ready_index = p->GetNextReadySubchannelIndexLocked();
- GPR_ASSERT(next_ready_index < p->subchannel_list_->num_subchannels);
- grpc_lb_subchannel_data* selected =
- &p->subchannel_list_->subchannels[next_ready_index];
- if (p->pending_picks_ != nullptr) {
- // if the selected subchannel is going to be used for the pending
- // picks, update the last picked pointer
- p->UpdateLastReadySubchannelIndexLocked(next_ready_index);
- }
- PickState* pick;
- while ((pick = p->pending_picks_)) {
- p->pending_picks_ = pick->next;
- pick->connected_subchannel = selected->connected_subchannel;
- if (pick->user_data != nullptr) {
- *pick->user_data = selected->user_data;
- }
- if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[RR %p] Fulfilling pending pick. Target <-- subchannel %p "
- "(subchannel_list %p, index %" PRIuPTR ")",
- p, selected->subchannel, p->subchannel_list_,
- next_ready_index);
- }
- GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
+ "[RR %p] found next ready subchannel (%p) at index %" PRIuPTR
+ " of subchannel_list %p",
+ policy(), subchannel(index)->subchannel(), index, this);
}
- break;
+ return index;
}
- case GRPC_CHANNEL_SHUTDOWN:
- GPR_UNREACHABLE_CODE(return );
- case GRPC_CHANNEL_CONNECTING:
- case GRPC_CHANNEL_IDLE:; // fallthrough
}
- // Update state counters.
- UpdateStateCountersLocked(sd);
- // Only update connectivity based on the selected subchannel list.
- if (sd->subchannel_list == p->subchannel_list_) {
- p->UpdateConnectivityStatusLocked(sd, GRPC_ERROR_REF(error));
+ if (grpc_lb_round_robin_trace.enabled()) {
+ gpr_log(GPR_INFO, "[RR %p] no subchannels in ready state", this);
+ }
+ return num_subchannels();
+}
+
+// Sets last_ready_index_ to last_ready_index.
+void RoundRobin::RoundRobinSubchannelList::UpdateLastReadySubchannelIndexLocked(
+ size_t last_ready_index) {
+ GPR_ASSERT(last_ready_index < num_subchannels());
+ last_ready_index_ = last_ready_index;
+ if (grpc_lb_round_robin_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[RR %p] setting last_ready_subchannel_index=%" PRIuPTR
+ " (SC %p, CSC %p)",
+ policy(), last_ready_index,
+ subchannel(last_ready_index)->subchannel(),
+ subchannel(last_ready_index)->connected_subchannel());
}
- // Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
}
grpc_connectivity_state RoundRobin::CheckConnectivityLocked(
@@ -554,11 +592,12 @@ void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
void RoundRobin::PingOneLocked(grpc_closure* on_initiate,
grpc_closure* on_ack) {
- const size_t next_ready_index = GetNextReadySubchannelIndexLocked();
- if (next_ready_index < subchannel_list_->num_subchannels) {
- grpc_lb_subchannel_data* selected =
- &subchannel_list_->subchannels[next_ready_index];
- selected->connected_subchannel->Ping(on_initiate, on_ack);
+ const size_t next_ready_index =
+ subchannel_list_->GetNextReadySubchannelIndexLocked();
+ if (next_ready_index < subchannel_list_->num_subchannels()) {
+ RoundRobinSubchannelData* selected =
+ subchannel_list_->subchannel(next_ready_index);
+ selected->connected_subchannel()->Ping(on_initiate, on_ack);
} else {
GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Round Robin not connected"));
@@ -581,80 +620,37 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args) {
}
return;
}
- grpc_lb_addresses* addresses = (grpc_lb_addresses*)arg->value.pointer.p;
+ grpc_lb_addresses* addresses =
+ static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses",
this, addresses->num_addresses);
}
- grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create(
- this, &grpc_lb_round_robin_trace, addresses, combiner(),
- client_channel_factory(), args, &RoundRobin::OnConnectivityChangedLocked);
- if (subchannel_list->num_subchannels == 0) {
- grpc_connectivity_state_set(
- &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
- "rr_update_empty");
- if (subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_,
- "sl_shutdown_empty_update");
+ // Replace latest_pending_subchannel_list_.
+ if (latest_pending_subchannel_list_ != nullptr) {
+ if (grpc_lb_round_robin_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[RR %p] Shutting down previous pending subchannel list %p", this,
+ latest_pending_subchannel_list_.get());
}
- subchannel_list_ = subchannel_list; // empty list
- return;
}
- if (started_picking_) {
- for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) {
- const grpc_connectivity_state subchannel_state =
- grpc_subchannel_check_connectivity(
- subchannel_list->subchannels[i].subchannel, nullptr);
- // Override the default setting of IDLE for connectivity notification
- // purposes if the subchannel is already in transient failure. Otherwise
- // we'd be immediately notified of the IDLE-TRANSIENT_FAILURE
- // discrepancy, attempt to re-resolve and end up here again.
- // TODO(roth): As part of C++-ifying the subchannel_list API, design a
- // better API for notifying the LB policy of subchannel states, which can
- // be used both for the subchannel's initial state and for subsequent
- // state changes. This will allow us to handle this more generally instead
- // of special-casing TRANSIENT_FAILURE (e.g., we can also distribute any
- // pending picks across all READY subchannels rather than sending them all
- // to the first one).
- if (subchannel_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- subchannel_list->subchannels[i].pending_connectivity_state_unsafe =
- subchannel_list->subchannels[i].curr_connectivity_state =
- subchannel_list->subchannels[i].prev_connectivity_state =
- subchannel_state;
- --subchannel_list->num_idle;
- ++subchannel_list->num_transient_failures;
- }
- }
- if (latest_pending_subchannel_list_ != nullptr) {
- if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[RR %p] Shutting down latest pending subchannel list %p, "
- "about to be replaced by newer latest %p",
- this, latest_pending_subchannel_list_, subchannel_list);
- }
- grpc_lb_subchannel_list_shutdown_and_unref(
- latest_pending_subchannel_list_, "sl_outdated");
- }
- latest_pending_subchannel_list_ = subchannel_list;
- for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) {
- /* Watch every new subchannel. A subchannel list becomes active the
- * moment one of its subchannels is READY. At that moment, we swap
- * p->subchannel_list for sd->subchannel_list, provided the subchannel
- * list is still valid (ie, isn't shutting down) */
- SubchannelListRefForConnectivityWatch(subchannel_list,
- "connectivity_watch");
- grpc_lb_subchannel_data_start_connectivity_watch(
- &subchannel_list->subchannels[i]);
+ latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>(
+ this, &grpc_lb_round_robin_trace, addresses, combiner(),
+ client_channel_factory(), args);
+ // If we haven't started picking yet or the new list is empty,
+ // immediately promote the new list to the current list.
+ if (!started_picking_ ||
+ latest_pending_subchannel_list_->num_subchannels() == 0) {
+ if (latest_pending_subchannel_list_->num_subchannels() == 0) {
+ grpc_connectivity_state_set(
+ &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
+ "rr_update_empty");
}
+ subchannel_list_ = std::move(latest_pending_subchannel_list_);
} else {
- // The policy isn't picking yet. Save the update for later, disposing of
- // previous version if any.
- if (subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(
- subchannel_list_, "rr_update_before_started_picking");
- }
- subchannel_list_ = subchannel_list;
+ // If we've started picking, start watching the new list.
+ latest_pending_subchannel_list_->StartWatchingLocked();
}
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
deleted file mode 100644
index 257db57575..0000000000
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- *
- * Copyright 2015 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#include <string.h>
-
-#include <grpc/support/alloc.h>
-
-#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
-#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/debug/trace.h"
-#include "src/core/lib/iomgr/closure.h"
-#include "src/core/lib/iomgr/combiner.h"
-#include "src/core/lib/iomgr/sockaddr_utils.h"
-#include "src/core/lib/transport/connectivity_state.h"
-
-void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd,
- const char* reason) {
- if (sd->subchannel != nullptr) {
- if (sd->subchannel_list->tracer->enabled()) {
- gpr_log(GPR_INFO,
- "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
- " (subchannel %p): unreffing subchannel",
- sd->subchannel_list->tracer->name(), sd->subchannel_list->policy,
- sd->subchannel_list,
- static_cast<size_t>(sd - sd->subchannel_list->subchannels),
- sd->subchannel_list->num_subchannels, sd->subchannel);
- }
- GRPC_SUBCHANNEL_UNREF(sd->subchannel, reason);
- sd->subchannel = nullptr;
- sd->connected_subchannel.reset();
- if (sd->user_data != nullptr) {
- GPR_ASSERT(sd->user_data_vtable != nullptr);
- sd->user_data_vtable->destroy(sd->user_data);
- sd->user_data = nullptr;
- }
- }
-}
-
-void grpc_lb_subchannel_data_start_connectivity_watch(
- grpc_lb_subchannel_data* sd) {
- if (sd->subchannel_list->tracer->enabled()) {
- gpr_log(
- GPR_INFO,
- "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
- " (subchannel %p): requesting connectivity change "
- "notification (from %s)",
- sd->subchannel_list->tracer->name(), sd->subchannel_list->policy,
- sd->subchannel_list,
- static_cast<size_t>(sd - sd->subchannel_list->subchannels),
- sd->subchannel_list->num_subchannels, sd->subchannel,
- grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe));
- }
- sd->connectivity_notification_pending = true;
- grpc_subchannel_notify_on_state_change(
- sd->subchannel, sd->subchannel_list->policy->interested_parties(),
- &sd->pending_connectivity_state_unsafe,
- &sd->connectivity_changed_closure);
-}
-
-void grpc_lb_subchannel_data_stop_connectivity_watch(
- grpc_lb_subchannel_data* sd) {
- if (sd->subchannel_list->tracer->enabled()) {
- gpr_log(GPR_INFO,
- "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
- " (subchannel %p): stopping connectivity watch",
- sd->subchannel_list->tracer->name(), sd->subchannel_list->policy,
- sd->subchannel_list,
- static_cast<size_t>(sd - sd->subchannel_list->subchannels),
- sd->subchannel_list->num_subchannels, sd->subchannel);
- }
- GPR_ASSERT(sd->connectivity_notification_pending);
- sd->connectivity_notification_pending = false;
-}
-
-grpc_lb_subchannel_list* grpc_lb_subchannel_list_create(
- grpc_core::LoadBalancingPolicy* p, grpc_core::TraceFlag* tracer,
- const grpc_lb_addresses* addresses, grpc_combiner* combiner,
- grpc_client_channel_factory* client_channel_factory,
- const grpc_channel_args& args, grpc_iomgr_cb_func connectivity_changed_cb) {
- grpc_lb_subchannel_list* subchannel_list =
- static_cast<grpc_lb_subchannel_list*>(
- gpr_zalloc(sizeof(*subchannel_list)));
- if (tracer->enabled()) {
- gpr_log(GPR_INFO,
- "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
- tracer->name(), p, subchannel_list, addresses->num_addresses);
- }
- subchannel_list->policy = p;
- subchannel_list->tracer = tracer;
- gpr_ref_init(&subchannel_list->refcount, 1);
- subchannel_list->subchannels = static_cast<grpc_lb_subchannel_data*>(
- gpr_zalloc(sizeof(grpc_lb_subchannel_data) * addresses->num_addresses));
- // We need to remove the LB addresses in order to be able to compare the
- // subchannel keys of subchannels from a different batch of addresses.
- static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
- GRPC_ARG_LB_ADDRESSES};
- // Create a subchannel for each address.
- grpc_subchannel_args sc_args;
- size_t subchannel_index = 0;
- for (size_t i = 0; i < addresses->num_addresses; i++) {
- // If there were any balancer, we would have chosen grpclb policy instead.
- GPR_ASSERT(!addresses->addresses[i].is_balancer);
- memset(&sc_args, 0, sizeof(grpc_subchannel_args));
- grpc_arg addr_arg =
- grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
- grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
- &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, 1);
- gpr_free(addr_arg.value.string);
- sc_args.args = new_args;
- grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel(
- client_channel_factory, &sc_args);
- grpc_channel_args_destroy(new_args);
- if (subchannel == nullptr) {
- // Subchannel could not be created.
- if (tracer->enabled()) {
- char* address_uri =
- grpc_sockaddr_to_uri(&addresses->addresses[i].address);
- gpr_log(GPR_INFO,
- "[%s %p] could not create subchannel for address uri %s, "
- "ignoring",
- tracer->name(), subchannel_list->policy, address_uri);
- gpr_free(address_uri);
- }
- continue;
- }
- if (tracer->enabled()) {
- char* address_uri =
- grpc_sockaddr_to_uri(&addresses->addresses[i].address);
- gpr_log(GPR_INFO,
- "[%s %p] subchannel list %p index %" PRIuPTR
- ": Created subchannel %p for address uri %s",
- tracer->name(), p, subchannel_list, subchannel_index, subchannel,
- address_uri);
- gpr_free(address_uri);
- }
- grpc_lb_subchannel_data* sd =
- &subchannel_list->subchannels[subchannel_index++];
- sd->subchannel_list = subchannel_list;
- sd->subchannel = subchannel;
- GRPC_CLOSURE_INIT(&sd->connectivity_changed_closure,
- connectivity_changed_cb, sd,
- grpc_combiner_scheduler(combiner));
- // We assume that the current state is IDLE. If not, we'll get a
- // callback telling us that.
- sd->prev_connectivity_state = GRPC_CHANNEL_IDLE;
- sd->curr_connectivity_state = GRPC_CHANNEL_IDLE;
- sd->pending_connectivity_state_unsafe = GRPC_CHANNEL_IDLE;
- sd->user_data_vtable = addresses->user_data_vtable;
- if (sd->user_data_vtable != nullptr) {
- sd->user_data =
- sd->user_data_vtable->copy(addresses->addresses[i].user_data);
- }
- }
- subchannel_list->num_subchannels = subchannel_index;
- subchannel_list->num_idle = subchannel_index;
- return subchannel_list;
-}
-
-static void subchannel_list_destroy(grpc_lb_subchannel_list* subchannel_list) {
- if (subchannel_list->tracer->enabled()) {
- gpr_log(GPR_INFO, "[%s %p] Destroying subchannel_list %p",
- subchannel_list->tracer->name(), subchannel_list->policy,
- subchannel_list);
- }
- for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
- grpc_lb_subchannel_data* sd = &subchannel_list->subchannels[i];
- grpc_lb_subchannel_data_unref_subchannel(sd, "subchannel_list_destroy");
- }
- gpr_free(subchannel_list->subchannels);
- gpr_free(subchannel_list);
-}
-
-void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list* subchannel_list,
- const char* reason) {
- gpr_ref_non_zero(&subchannel_list->refcount);
- if (subchannel_list->tracer->enabled()) {
- const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
- gpr_log(GPR_INFO, "[%s %p] subchannel_list %p REF %lu->%lu (%s)",
- subchannel_list->tracer->name(), subchannel_list->policy,
- subchannel_list, static_cast<unsigned long>(count - 1),
- static_cast<unsigned long>(count), reason);
- }
-}
-
-void grpc_lb_subchannel_list_unref(grpc_lb_subchannel_list* subchannel_list,
- const char* reason) {
- const bool done = gpr_unref(&subchannel_list->refcount);
- if (subchannel_list->tracer->enabled()) {
- const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
- gpr_log(GPR_INFO, "[%s %p] subchannel_list %p UNREF %lu->%lu (%s)",
- subchannel_list->tracer->name(), subchannel_list->policy,
- subchannel_list, static_cast<unsigned long>(count + 1),
- static_cast<unsigned long>(count), reason);
- }
- if (done) {
- subchannel_list_destroy(subchannel_list);
- }
-}
-
-static void subchannel_data_cancel_connectivity_watch(
- grpc_lb_subchannel_data* sd, const char* reason) {
- if (sd->subchannel_list->tracer->enabled()) {
- gpr_log(GPR_INFO,
- "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
- " (subchannel %p): canceling connectivity watch (%s)",
- sd->subchannel_list->tracer->name(), sd->subchannel_list->policy,
- sd->subchannel_list,
- static_cast<size_t>(sd - sd->subchannel_list->subchannels),
- sd->subchannel_list->num_subchannels, sd->subchannel, reason);
- }
- grpc_subchannel_notify_on_state_change(sd->subchannel, nullptr, nullptr,
- &sd->connectivity_changed_closure);
-}
-
-void grpc_lb_subchannel_list_shutdown_and_unref(
- grpc_lb_subchannel_list* subchannel_list, const char* reason) {
- if (subchannel_list->tracer->enabled()) {
- gpr_log(GPR_INFO, "[%s %p] Shutting down subchannel_list %p (%s)",
- subchannel_list->tracer->name(), subchannel_list->policy,
- subchannel_list, reason);
- }
- GPR_ASSERT(!subchannel_list->shutting_down);
- subchannel_list->shutting_down = true;
- for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
- grpc_lb_subchannel_data* sd = &subchannel_list->subchannels[i];
- // If there's a pending notification for this subchannel, cancel it;
- // the callback is responsible for unreffing the subchannel.
- // Otherwise, unref the subchannel directly.
- if (sd->connectivity_notification_pending) {
- subchannel_data_cancel_connectivity_watch(sd, reason);
- } else if (sd->subchannel != nullptr) {
- grpc_lb_subchannel_data_unref_subchannel(sd, reason);
- }
- }
- grpc_lb_subchannel_list_unref(subchannel_list, reason);
-}
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index 6889d596ac..7e2046bcdc 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -21,116 +21,516 @@
#include <grpc/support/port_platform.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/gprpp/abstract.h"
+#include "src/core/lib/gprpp/inlined_vector.h"
+#include "src/core/lib/gprpp/orphanable.h"
+#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/combiner.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
-// TODO(roth): This code is intended to be shared between pick_first and
-// round_robin. However, the interface needs more work to provide clean
-// encapsulation. For example, the structs here have some fields that are
-// only used in one of the two (e.g., the state counters in
-// grpc_lb_subchannel_list and the prev_connectivity_state field in
-// grpc_lb_subchannel_data are only used in round_robin, and the
-// checking_subchannel field in grpc_lb_subchannel_list is only used by
-// pick_first). Also, there is probably some code duplication between the
-// connectivity state notification callback code in both pick_first and
-// round_robin that could be refactored and moved here. In a future PR,
-// need to clean this up.
-
-typedef struct grpc_lb_subchannel_list grpc_lb_subchannel_list;
-
-typedef struct {
- /** backpointer to owning subchannel list */
- grpc_lb_subchannel_list* subchannel_list;
- /** subchannel itself */
- grpc_subchannel* subchannel;
- grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel;
- /** Is a connectivity notification pending? */
- bool connectivity_notification_pending;
- /** notification that connectivity has changed on subchannel */
- grpc_closure connectivity_changed_closure;
- /** previous and current connectivity states. Updated by \a
- * \a connectivity_changed_closure based on
- * \a pending_connectivity_state_unsafe. */
- grpc_connectivity_state prev_connectivity_state;
- grpc_connectivity_state curr_connectivity_state;
- /** connectivity state to be updated by
- * grpc_subchannel_notify_on_state_change(), not guarded by
- * the combiner. To be copied to \a curr_connectivity_state by
- * \a connectivity_changed_closure. */
- grpc_connectivity_state pending_connectivity_state_unsafe;
- /** the subchannel's target user data */
- void* user_data;
- /** vtable to operate over \a user_data */
- const grpc_lb_user_data_vtable* user_data_vtable;
-} grpc_lb_subchannel_data;
-
-/// Unrefs the subchannel contained in sd.
-void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd,
- const char* reason);
-
-/// Starts watching the connectivity state of the subchannel.
-/// The connectivity_changed_cb callback must invoke either
-/// grpc_lb_subchannel_data_stop_connectivity_watch() or again call
-/// grpc_lb_subchannel_data_start_connectivity_watch().
-void grpc_lb_subchannel_data_start_connectivity_watch(
- grpc_lb_subchannel_data* sd);
-
-/// Stops watching the connectivity state of the subchannel.
-void grpc_lb_subchannel_data_stop_connectivity_watch(
- grpc_lb_subchannel_data* sd);
-
-struct grpc_lb_subchannel_list {
- /** backpointer to owning policy */
- grpc_core::LoadBalancingPolicy* policy;
-
- grpc_core::TraceFlag* tracer;
-
- /** all our subchannels */
- size_t num_subchannels;
- grpc_lb_subchannel_data* subchannels;
-
- /** Index into subchannels of the one we're currently checking.
- * Used when connecting to subchannels serially instead of in parallel. */
- // TODO(roth): When we have time, we can probably make this go away
- // and compute the index dynamically by subtracting
- // subchannel_list->subchannels from the subchannel_data pointer.
- size_t checking_subchannel;
-
- /** how many subchannels are in state READY */
- size_t num_ready;
- /** how many subchannels are in state TRANSIENT_FAILURE */
- size_t num_transient_failures;
- /** how many subchannels are in state IDLE */
- size_t num_idle;
-
- /** There will be one ref for each entry in subchannels for which there is a
- * pending connectivity state watcher callback. */
- gpr_refcount refcount;
-
- /** Is this list shutting down? This may be true due to the shutdown of the
- * policy itself or because a newer update has arrived while this one hadn't
- * finished processing. */
- bool shutting_down;
+// Code for maintaining a list of subchannels within an LB policy.
+//
+// To use this, callers must create their own subclasses, like so:
+/*
+
+class MySubchannelList; // Forward declaration.
+
+class MySubchannelData
+ : public SubchannelData<MySubchannelList, MySubchannelData> {
+ public:
+ void ProcessConnectivityChangeLocked(
+ grpc_connectivity_state connectivity_state, grpc_error* error) override {
+ // ...code to handle connectivity changes...
+ }
+};
+
+class MySubchannelList
+ : public SubchannelList<MySubchannelList, MySubchannelData> {
};
-grpc_lb_subchannel_list* grpc_lb_subchannel_list_create(
- grpc_core::LoadBalancingPolicy* p, grpc_core::TraceFlag* tracer,
+*/
+// All methods with a Locked() suffix must be called from within the
+// client_channel combiner.
+
+namespace grpc_core {
+
+// Stores data for a particular subchannel in a subchannel list.
+// Callers must create a subclass that implements the
+// ProcessConnectivityChangeLocked() method.
+template <typename SubchannelListType, typename SubchannelDataType>
+class SubchannelData {
+ public:
+ // Returns a pointer to the subchannel list containing this object.
+ SubchannelListType* subchannel_list() const { return subchannel_list_; }
+
+ // Returns the index into the subchannel list of this object.
+ size_t Index() const {
+ return static_cast<size_t>(static_cast<const SubchannelDataType*>(this) -
+ subchannel_list_->subchannel(0));
+ }
+
+ // Returns a pointer to the subchannel.
+ grpc_subchannel* subchannel() const { return subchannel_; }
+
+ // Returns the connected subchannel. Will be null if the subchannel
+ // is not connected.
+ ConnectedSubchannel* connected_subchannel() const {
+ return connected_subchannel_.get();
+ }
+
+ // Synchronously checks the subchannel's connectivity state.
+ // Must not be called while there is a connectivity notification
+ // pending (i.e., between calling StartConnectivityWatchLocked() or
+ // RenewConnectivityWatchLocked() and the resulting invocation of
+ // ProcessConnectivityChangeLocked()).
+ grpc_connectivity_state CheckConnectivityStateLocked(grpc_error** error) {
+ GPR_ASSERT(!connectivity_notification_pending_);
+ pending_connectivity_state_unsafe_ =
+ grpc_subchannel_check_connectivity(subchannel(), error);
+ UpdateConnectedSubchannelLocked();
+ return pending_connectivity_state_unsafe_;
+ }
+
+ // Unrefs the subchannel. May be used if an individual subchannel is
+ // no longer needed even though the subchannel list as a whole is not
+ // being unreffed.
+ virtual void UnrefSubchannelLocked(const char* reason);
+
+ // Starts watching the connectivity state of the subchannel.
+ // ProcessConnectivityChangeLocked() will be called when the
+ // connectivity state changes.
+ void StartConnectivityWatchLocked();
+
+ // Renews watching the connectivity state of the subchannel.
+ void RenewConnectivityWatchLocked();
+
+ // Stops watching the connectivity state of the subchannel.
+ void StopConnectivityWatchLocked();
+
+ // Cancels watching the connectivity state of the subchannel.
+ // Must be called only while there is a connectivity notification
+ // pending (i.e., between calling StartConnectivityWatchLocked() or
+ // RenewConnectivityWatchLocked() and the resulting invocation of
+ // ProcessConnectivityChangeLocked()).
+ // From within ProcessConnectivityChangeLocked(), use
+ // StopConnectivityWatchLocked() instead.
+ void CancelConnectivityWatchLocked(const char* reason);
+
+ // Cancels any pending connectivity watch and unrefs the subchannel.
+ void ShutdownLocked();
+
+ GRPC_ABSTRACT_BASE_CLASS
+
+ protected:
+ SubchannelData(SubchannelListType* subchannel_list,
+ const grpc_lb_user_data_vtable* user_data_vtable,
+ const grpc_lb_address& address, grpc_subchannel* subchannel,
+ grpc_combiner* combiner);
+
+ virtual ~SubchannelData();
+
+ // After StartConnectivityWatchLocked() or RenewConnectivityWatchLocked()
+ // is called, this method will be invoked when the subchannel's connectivity
+ // state changes.
+ // Implementations must invoke either RenewConnectivityWatchLocked() or
+ // StopConnectivityWatchLocked() before returning.
+ virtual void ProcessConnectivityChangeLocked(
+ grpc_connectivity_state connectivity_state,
+ grpc_error* error) GRPC_ABSTRACT;
+
+ private:
+ // Updates connected_subchannel_ based on pending_connectivity_state_unsafe_.
+ // Returns true if the connectivity state should be reported.
+ bool UpdateConnectedSubchannelLocked();
+
+ static void OnConnectivityChangedLocked(void* arg, grpc_error* error);
+
+ // Backpointer to owning subchannel list. Not owned.
+ SubchannelListType* subchannel_list_;
+
+ // The subchannel and connected subchannel.
+ grpc_subchannel* subchannel_;
+ RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
+
+ // Notification that connectivity has changed on subchannel.
+ grpc_closure connectivity_changed_closure_;
+ // Is a connectivity notification pending?
+ bool connectivity_notification_pending_ = false;
+ // Connectivity state to be updated by
+ // grpc_subchannel_notify_on_state_change(), not guarded by
+ // the combiner.
+ grpc_connectivity_state pending_connectivity_state_unsafe_;
+};
+
+// A list of subchannels.
+template <typename SubchannelListType, typename SubchannelDataType>
+class SubchannelList
+ : public InternallyRefCountedWithTracing<SubchannelListType> {
+ public:
+ typedef InlinedVector<SubchannelDataType, 10> SubchannelVector;
+
+ // The number of subchannels in the list.
+ size_t num_subchannels() const { return subchannels_.size(); }
+
+ // The data for the subchannel at a particular index.
+ SubchannelDataType* subchannel(size_t index) { return &subchannels_[index]; }
+
+ // Returns true if the subchannel list is shutting down.
+ bool shutting_down() const { return shutting_down_; }
+
+ // Accessors.
+ LoadBalancingPolicy* policy() const { return policy_; }
+ TraceFlag* tracer() const { return tracer_; }
+
+ // Note: Caller must ensure that this is invoked inside of the combiner.
+ void Orphan() override {
+ ShutdownLocked();
+ InternallyRefCountedWithTracing<SubchannelListType>::Unref(DEBUG_LOCATION,
+ "shutdown");
+ }
+
+ GRPC_ABSTRACT_BASE_CLASS
+
+ protected:
+ SubchannelList(LoadBalancingPolicy* policy, TraceFlag* tracer,
+ const grpc_lb_addresses* addresses, grpc_combiner* combiner,
+ grpc_client_channel_factory* client_channel_factory,
+ const grpc_channel_args& args);
+
+ virtual ~SubchannelList();
+
+ private:
+ // So New() can call our private ctor.
+ template <typename T, typename... Args>
+ friend T* New(Args&&... args);
+
+ // For accessing Ref() and Unref().
+ friend class SubchannelData<SubchannelListType, SubchannelDataType>;
+
+ void ShutdownLocked();
+
+ // Backpointer to owning policy.
+ LoadBalancingPolicy* policy_;
+
+ TraceFlag* tracer_;
+
+ grpc_combiner* combiner_;
+
+ // The list of subchannels.
+ SubchannelVector subchannels_;
+
+ // Is this list shutting down? This may be true due to the shutdown of the
+ // policy itself or because a newer update has arrived while this one hadn't
+ // finished processing.
+ bool shutting_down_ = false;
+};
+
+//
+// implementation -- no user-servicable parts below
+//
+
+//
+// SubchannelData
+//
+
+template <typename SubchannelListType, typename SubchannelDataType>
+SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData(
+ SubchannelListType* subchannel_list,
+ const grpc_lb_user_data_vtable* user_data_vtable,
+ const grpc_lb_address& address, grpc_subchannel* subchannel,
+ grpc_combiner* combiner)
+ : subchannel_list_(subchannel_list),
+ subchannel_(subchannel),
+ // We assume that the current state is IDLE. If not, we'll get a
+ // callback telling us that.
+ pending_connectivity_state_unsafe_(GRPC_CHANNEL_IDLE) {
+ GRPC_CLOSURE_INIT(
+ &connectivity_changed_closure_,
+ (&SubchannelData<SubchannelListType,
+ SubchannelDataType>::OnConnectivityChangedLocked),
+ this, grpc_combiner_scheduler(combiner));
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+SubchannelData<SubchannelListType, SubchannelDataType>::~SubchannelData() {
+ UnrefSubchannelLocked("subchannel_data_destroy");
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType, SubchannelDataType>::
+ UnrefSubchannelLocked(const char* reason) {
+ if (subchannel_ != nullptr) {
+ if (subchannel_list_->tracer()->enabled()) {
+ gpr_log(GPR_INFO,
+ "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+ " (subchannel %p): unreffing subchannel",
+ subchannel_list_->tracer()->name(), subchannel_list_->policy(),
+ subchannel_list_, Index(), subchannel_list_->num_subchannels(),
+ subchannel_);
+ }
+ GRPC_SUBCHANNEL_UNREF(subchannel_, reason);
+ subchannel_ = nullptr;
+ connected_subchannel_.reset();
+ }
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType,
+ SubchannelDataType>::StartConnectivityWatchLocked() {
+ if (subchannel_list_->tracer()->enabled()) {
+ gpr_log(GPR_INFO,
+ "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+ " (subchannel %p): starting watch: requesting connectivity change "
+ "notification (from %s)",
+ subchannel_list_->tracer()->name(), subchannel_list_->policy(),
+ subchannel_list_, Index(), subchannel_list_->num_subchannels(),
+ subchannel_,
+ grpc_connectivity_state_name(pending_connectivity_state_unsafe_));
+ }
+ GPR_ASSERT(!connectivity_notification_pending_);
+ connectivity_notification_pending_ = true;
+ subchannel_list()->Ref(DEBUG_LOCATION, "connectivity_watch").release();
+ grpc_subchannel_notify_on_state_change(
+ subchannel_, subchannel_list_->policy()->interested_parties(),
+ &pending_connectivity_state_unsafe_, &connectivity_changed_closure_);
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType,
+ SubchannelDataType>::RenewConnectivityWatchLocked() {
+ if (subchannel_list_->tracer()->enabled()) {
+ gpr_log(GPR_INFO,
+ "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+ " (subchannel %p): renewing watch: requesting connectivity change "
+ "notification (from %s)",
+ subchannel_list_->tracer()->name(), subchannel_list_->policy(),
+ subchannel_list_, Index(), subchannel_list_->num_subchannels(),
+ subchannel_,
+ grpc_connectivity_state_name(pending_connectivity_state_unsafe_));
+ }
+ GPR_ASSERT(connectivity_notification_pending_);
+ grpc_subchannel_notify_on_state_change(
+ subchannel_, subchannel_list_->policy()->interested_parties(),
+ &pending_connectivity_state_unsafe_, &connectivity_changed_closure_);
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType,
+ SubchannelDataType>::StopConnectivityWatchLocked() {
+ if (subchannel_list_->tracer()->enabled()) {
+ gpr_log(GPR_INFO,
+ "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+ " (subchannel %p): stopping connectivity watch",
+ subchannel_list_->tracer()->name(), subchannel_list_->policy(),
+ subchannel_list_, Index(), subchannel_list_->num_subchannels(),
+ subchannel_);
+ }
+ GPR_ASSERT(connectivity_notification_pending_);
+ connectivity_notification_pending_ = false;
+ subchannel_list()->Unref(DEBUG_LOCATION, "connectivity_watch");
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType, SubchannelDataType>::
+ CancelConnectivityWatchLocked(const char* reason) {
+ if (subchannel_list_->tracer()->enabled()) {
+ gpr_log(GPR_INFO,
+ "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+ " (subchannel %p): canceling connectivity watch (%s)",
+ subchannel_list_->tracer()->name(), subchannel_list_->policy(),
+ subchannel_list_, Index(), subchannel_list_->num_subchannels(),
+ subchannel_, reason);
+ }
+ GPR_ASSERT(connectivity_notification_pending_);
+ grpc_subchannel_notify_on_state_change(subchannel_, nullptr, nullptr,
+ &connectivity_changed_closure_);
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+bool SubchannelData<SubchannelListType,
+ SubchannelDataType>::UpdateConnectedSubchannelLocked() {
+ // If the subchannel is READY, take a ref to the connected subchannel.
+ if (pending_connectivity_state_unsafe_ == GRPC_CHANNEL_READY) {
+ connected_subchannel_ =
+ grpc_subchannel_get_connected_subchannel(subchannel_);
+ // If the subchannel became disconnected between the time that READY
+ // was reported and the time we got here (e.g., between when a
+ // notification callback is scheduled and when it was actually run in
+ // the combiner), then the connected subchannel may have disappeared out
+ // from under us. In that case, we don't actually want to consider the
+ // subchannel to be in state READY. Instead, we use IDLE as the
+ // basis for any future connectivity watch; this is the one state that
+ // the subchannel will never transition back into, so this ensures
+ // that we will get a notification for the next state, even if that state
+ // is READY again (e.g., if the subchannel has transitioned back to
+ // READY before the next watch gets requested).
+ if (connected_subchannel_ == nullptr) {
+ if (subchannel_list_->tracer()->enabled()) {
+ gpr_log(GPR_INFO,
+ "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+ " (subchannel %p): state is READY but connected subchannel is "
+ "null; moving to state IDLE",
+ subchannel_list_->tracer()->name(), subchannel_list_->policy(),
+ subchannel_list_, Index(), subchannel_list_->num_subchannels(),
+ subchannel_);
+ }
+ pending_connectivity_state_unsafe_ = GRPC_CHANNEL_IDLE;
+ return false;
+ }
+ } else {
+ // For any state other than READY, unref the connected subchannel.
+ connected_subchannel_.reset();
+ }
+ return true;
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType, SubchannelDataType>::
+ OnConnectivityChangedLocked(void* arg, grpc_error* error) {
+ SubchannelData* sd = static_cast<SubchannelData*>(arg);
+ if (sd->subchannel_list_->tracer()->enabled()) {
+ gpr_log(
+ GPR_INFO,
+ "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+ " (subchannel %p): connectivity changed: state=%s, error=%s, "
+ "shutting_down=%d",
+ sd->subchannel_list_->tracer()->name(), sd->subchannel_list_->policy(),
+ sd->subchannel_list_, sd->Index(),
+ sd->subchannel_list_->num_subchannels(), sd->subchannel_,
+ grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe_),
+ grpc_error_string(error), sd->subchannel_list_->shutting_down());
+ }
+ // If shutting down, unref subchannel and stop watching.
+ if (sd->subchannel_list_->shutting_down() || error == GRPC_ERROR_CANCELLED) {
+ sd->UnrefSubchannelLocked("connectivity_shutdown");
+ sd->StopConnectivityWatchLocked();
+ return;
+ }
+ // Get or release ref to connected subchannel.
+ if (!sd->UpdateConnectedSubchannelLocked()) {
+ // We don't want to report this connectivity state, so renew the watch.
+ sd->RenewConnectivityWatchLocked();
+ return;
+ }
+ // Call the subclass's ProcessConnectivityChangeLocked() method.
+ sd->ProcessConnectivityChangeLocked(sd->pending_connectivity_state_unsafe_,
+ GRPC_ERROR_REF(error));
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
+ // If there's a pending notification for this subchannel, cancel it;
+ // the callback is responsible for unreffing the subchannel.
+ // Otherwise, unref the subchannel directly.
+ if (connectivity_notification_pending_) {
+ CancelConnectivityWatchLocked("shutdown");
+ } else if (subchannel_ != nullptr) {
+ UnrefSubchannelLocked("shutdown");
+ }
+}
+
+//
+// SubchannelList
+//
+
+template <typename SubchannelListType, typename SubchannelDataType>
+SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
+ LoadBalancingPolicy* policy, TraceFlag* tracer,
const grpc_lb_addresses* addresses, grpc_combiner* combiner,
grpc_client_channel_factory* client_channel_factory,
- const grpc_channel_args& args, grpc_iomgr_cb_func connectivity_changed_cb);
+ const grpc_channel_args& args)
+ : InternallyRefCountedWithTracing<SubchannelListType>(tracer),
+ policy_(policy),
+ tracer_(tracer),
+ combiner_(GRPC_COMBINER_REF(combiner, "subchannel_list")) {
+ if (tracer_->enabled()) {
+ gpr_log(GPR_INFO,
+ "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
+ tracer_->name(), policy, this, addresses->num_addresses);
+ }
+ subchannels_.reserve(addresses->num_addresses);
+ // We need to remove the LB addresses in order to be able to compare the
+ // subchannel keys of subchannels from a different batch of addresses.
+ static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
+ GRPC_ARG_LB_ADDRESSES};
+ // Create a subchannel for each address.
+ grpc_subchannel_args sc_args;
+ for (size_t i = 0; i < addresses->num_addresses; i++) {
+ // If there were any balancer, we would have chosen grpclb policy instead.
+ GPR_ASSERT(!addresses->addresses[i].is_balancer);
+ memset(&sc_args, 0, sizeof(grpc_subchannel_args));
+ grpc_arg addr_arg =
+ grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
+ grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
+ &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, 1);
+ gpr_free(addr_arg.value.string);
+ sc_args.args = new_args;
+ grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel(
+ client_channel_factory, &sc_args);
+ grpc_channel_args_destroy(new_args);
+ if (subchannel == nullptr) {
+ // Subchannel could not be created.
+ if (tracer_->enabled()) {
+ char* address_uri =
+ grpc_sockaddr_to_uri(&addresses->addresses[i].address);
+ gpr_log(GPR_INFO,
+ "[%s %p] could not create subchannel for address uri %s, "
+ "ignoring",
+ tracer_->name(), policy_, address_uri);
+ gpr_free(address_uri);
+ }
+ continue;
+ }
+ if (tracer_->enabled()) {
+ char* address_uri =
+ grpc_sockaddr_to_uri(&addresses->addresses[i].address);
+ gpr_log(GPR_INFO,
+ "[%s %p] subchannel list %p index %" PRIuPTR
+ ": Created subchannel %p for address uri %s",
+ tracer_->name(), policy_, this, subchannels_.size(), subchannel,
+ address_uri);
+ gpr_free(address_uri);
+ }
+ subchannels_.emplace_back(static_cast<SubchannelListType*>(this),
+ addresses->user_data_vtable,
+ addresses->addresses[i], subchannel, combiner);
+ }
+}
-void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list* subchannel_list,
- const char* reason);
+template <typename SubchannelListType, typename SubchannelDataType>
+SubchannelList<SubchannelListType, SubchannelDataType>::~SubchannelList() {
+ if (tracer_->enabled()) {
+ gpr_log(GPR_INFO, "[%s %p] Destroying subchannel_list %p", tracer_->name(),
+ policy_, this);
+ }
+ GRPC_COMBINER_UNREF(combiner_, "subchannel_list");
+}
-void grpc_lb_subchannel_list_unref(grpc_lb_subchannel_list* subchannel_list,
- const char* reason);
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelList<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
+ if (tracer_->enabled()) {
+ gpr_log(GPR_INFO, "[%s %p] Shutting down subchannel_list %p",
+ tracer_->name(), policy_, this);
+ }
+ GPR_ASSERT(!shutting_down_);
+ shutting_down_ = true;
+ for (size_t i = 0; i < subchannels_.size(); i++) {
+ SubchannelDataType* sd = &subchannels_[i];
+ sd->ShutdownLocked();
+ }
+}
-/// Mark subchannel_list as discarded. Unsubscribes all its subchannels. The
-/// connectivity state notification callback will ultimately unref it.
-void grpc_lb_subchannel_list_shutdown_and_unref(
- grpc_lb_subchannel_list* subchannel_list, const char* reason);
+} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index b1f9d2018e..234f7634e2 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -344,7 +344,6 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c',
'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc',
'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc',
- 'src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc',
'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc',