aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2018-03-30 13:28:56 -0700
committerGravatar Mark D. Roth <roth@google.com>2018-03-30 13:28:56 -0700
commit7c1b5db3bb000a7c69d9d8151c66fecbacce64c3 (patch)
tree29f3f4b62097d3c0b7235e4b1ee89eb6b4023bc1 /src/core/ext/filters/client_channel
parentd536dba4d3234ba28066a31f7a092ce9daa89ec4 (diff)
Convert subchannel_list code to C++.
Diffstat (limited to 'src/core/ext/filters/client_channel')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc27
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc322
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc428
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc253
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h468
5 files changed, 791 insertions, 707 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 51f9ae000a..a10bfea8b1 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -924,7 +924,9 @@ typedef struct client_channel_call_data {
// Note: We inline the cache for the first 3 send_message ops and use
// dynamic allocation after that. This number was essentially picked
// at random; it could be changed in the future to tune performance.
- grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3> send_messages;
+ grpc_core::ManualConstructor<
+ grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3>>
+ send_messages;
// send_trailing_metadata
bool seen_send_trailing_metadata;
grpc_linked_mdelem* send_trailing_metadata_storage;
@@ -974,7 +976,7 @@ static void maybe_cache_send_ops_for_batch(call_data* calld,
gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache)));
new (cache) grpc_core::ByteStreamCache(
std::move(batch->payload->send_message.send_message));
- calld->send_messages.push_back(cache);
+ calld->send_messages->push_back(cache);
}
// Save metadata batch for send_trailing_metadata ops.
if (batch->send_trailing_metadata) {
@@ -1008,7 +1010,7 @@ static void free_cached_send_op_data_after_commit(
"]",
chand, calld, i);
}
- calld->send_messages[i]->Destroy();
+ (*calld->send_messages)[i]->Destroy();
}
if (retry_state->completed_send_trailing_metadata) {
grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
@@ -1032,7 +1034,7 @@ static void free_cached_send_op_data_for_completed_batch(
"]",
chand, calld, retry_state->completed_send_message_count - 1);
}
- calld->send_messages[retry_state->completed_send_message_count - 1]
+ (*calld->send_messages)[retry_state->completed_send_message_count - 1]
->Destroy();
}
if (batch_data->batch.send_trailing_metadata) {
@@ -1280,7 +1282,8 @@ static bool pending_batch_is_completed(
return false;
}
if (pending->batch->send_message &&
- retry_state->completed_send_message_count < calld->send_messages.size()) {
+ retry_state->completed_send_message_count <
+ calld->send_messages->size()) {
return false;
}
if (pending->batch->send_trailing_metadata &&
@@ -1315,7 +1318,7 @@ static bool pending_batch_is_unstarted(
return true;
}
if (pending->batch->send_message &&
- retry_state->started_send_message_count < calld->send_messages.size()) {
+ retry_state->started_send_message_count < calld->send_messages->size()) {
return true;
}
if (pending->batch->send_trailing_metadata &&
@@ -1817,7 +1820,7 @@ static void add_closures_for_replay_or_pending_send_ops(
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
bool have_pending_send_message_ops =
- retry_state->started_send_message_count < calld->send_messages.size();
+ retry_state->started_send_message_count < calld->send_messages->size();
bool have_pending_send_trailing_metadata_op =
calld->seen_send_trailing_metadata &&
!retry_state->started_send_trailing_metadata;
@@ -2133,7 +2136,7 @@ static void add_retriable_send_message_op(
chand, calld, retry_state->started_send_message_count);
}
grpc_core::ByteStreamCache* cache =
- calld->send_messages[retry_state->started_send_message_count];
+ (*calld->send_messages)[retry_state->started_send_message_count];
++retry_state->started_send_message_count;
batch_data->send_message.Init(cache);
batch_data->batch.send_message = true;
@@ -2254,7 +2257,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
}
// send_message.
// Note that we can only have one send_message op in flight at a time.
- if (retry_state->started_send_message_count < calld->send_messages.size() &&
+ if (retry_state->started_send_message_count < calld->send_messages->size() &&
retry_state->started_send_message_count ==
retry_state->completed_send_message_count &&
!calld->pending_send_message) {
@@ -2274,7 +2277,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
// to start, since we can't send down any more send_message ops after
// send_trailing_metadata.
if (calld->seen_send_trailing_metadata &&
- retry_state->started_send_message_count == calld->send_messages.size() &&
+ retry_state->started_send_message_count == calld->send_messages->size() &&
!retry_state->started_send_trailing_metadata &&
!calld->pending_send_trailing_metadata) {
if (grpc_client_channel_trace.enabled()) {
@@ -2325,7 +2328,7 @@ static void add_subchannel_batches_for_pending_batches(
// send_message ops after send_trailing_metadata.
if (batch->send_trailing_metadata &&
(retry_state->started_send_message_count + batch->send_message <
- calld->send_messages.size() ||
+ calld->send_messages->size() ||
retry_state->started_send_trailing_metadata)) {
continue;
}
@@ -2976,6 +2979,7 @@ static grpc_error* cc_init_call_elem(grpc_call_element* elem,
calld->deadline);
}
calld->enable_retries = chand->enable_retries;
+ calld->send_messages.Init();
return GRPC_ERROR_NONE;
}
@@ -3011,6 +3015,7 @@ static void cc_destroy_call_elem(grpc_call_element* elem,
calld->pick.subchannel_call_context[i].value);
}
}
+ calld->send_messages.Destroy();
GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 9090c34412..b593f93d7b 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -62,31 +62,57 @@ class PickFirst : public LoadBalancingPolicy {
private:
~PickFirst();
+ class PickFirstSubchannelList;
+
+ class PickFirstSubchannelData
+ : public SubchannelData<PickFirstSubchannelList,
+ PickFirstSubchannelData> {
+ public:
+ PickFirstSubchannelData(PickFirstSubchannelList* subchannel_list,
+ const grpc_lb_user_data_vtable* user_data_vtable,
+ const grpc_lb_address& address,
+ grpc_subchannel* subchannel,
+ grpc_combiner* combiner)
+ : SubchannelData(subchannel_list, user_data_vtable, address,
+ subchannel, combiner) {}
+
+ void ProcessConnectivityChangeLocked(grpc_error* error) override;
+ };
+
+ class PickFirstSubchannelList
+ : public SubchannelList<PickFirstSubchannelList,
+ PickFirstSubchannelData> {
+ public:
+ PickFirstSubchannelList(
+ PickFirst* policy, TraceFlag* tracer,
+ const grpc_lb_addresses* addresses, grpc_combiner* combiner,
+ grpc_client_channel_factory* client_channel_factory,
+ const grpc_channel_args& args)
+ : SubchannelList(policy, tracer, addresses, combiner,
+ client_channel_factory, args) {}
+
+ void RefForConnectivityWatch(const char* reason);
+ void UnrefForConnectivityWatch(const char* reason);
+ };
+
void ShutdownLocked() override;
void StartPickingLocked();
void DestroyUnselectedSubchannelsLocked();
- static void OnConnectivityChangedLocked(void* arg, grpc_error* error);
-
- void SubchannelListRefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason);
- void SubchannelListUnrefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason);
-
- /** all our subchannels */
- grpc_lb_subchannel_list* subchannel_list_ = nullptr;
- /** latest pending subchannel list */
- grpc_lb_subchannel_list* latest_pending_subchannel_list_ = nullptr;
- /** selected subchannel in \a subchannel_list */
- grpc_lb_subchannel_data* selected_ = nullptr;
- /** have we started picking? */
+ // All our subchannels.
+ RefCountedPtr<PickFirstSubchannelList> subchannel_list_;
+ // Latest pending subchannel list.
+ RefCountedPtr<PickFirstSubchannelList> latest_pending_subchannel_list_;
+ // Selected subchannel in \a subchannel_list_.
+ PickFirstSubchannelData* selected_ = nullptr;
+ // Have we started picking?
bool started_picking_ = false;
- /** are we shut down? */
+ // Are we shut down?
bool shutdown_ = false;
- /** list of picks that are waiting on connectivity */
+ // List of picks that are waiting on connectivity.
PickState* pending_picks_ = nullptr;
- /** our connectivity state tracker */
+ // Our connectivity state tracker.
grpc_connectivity_state_tracker state_tracker_;
};
@@ -138,13 +164,12 @@ void PickFirst::ShutdownLocked() {
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "shutdown");
if (subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, "pf_shutdown");
- subchannel_list_ = nullptr;
+ subchannel_list_->ShutdownLocked("pf_shutdown");
+ subchannel_list_.reset();
}
if (latest_pending_subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(latest_pending_subchannel_list_,
- "pf_shutdown");
- latest_pending_subchannel_list_ = nullptr;
+ latest_pending_subchannel_list_->ShutdownLocked("pf_shutdown");
+ latest_pending_subchannel_list_.reset();
}
TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_CANCELLED);
GRPC_ERROR_UNREF(error);
@@ -192,14 +217,12 @@ void PickFirst::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
void PickFirst::StartPickingLocked() {
started_picking_ = true;
- if (subchannel_list_ != nullptr && subchannel_list_->num_subchannels > 0) {
- subchannel_list_->checking_subchannel = 0;
- for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) {
- if (subchannel_list_->subchannels[i].subchannel != nullptr) {
- SubchannelListRefForConnectivityWatch(
- subchannel_list_, "connectivity_watch+start_picking");
- grpc_lb_subchannel_data_start_connectivity_watch(
- &subchannel_list_->subchannels[i]);
+ if (subchannel_list_ != nullptr) {
+ for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
+ if (subchannel_list_->subchannel(i)->subchannel() != nullptr) {
+ subchannel_list_->RefForConnectivityWatch(
+ "connectivity_watch+start_picking");
+ subchannel_list_->subchannel(i)->StartConnectivityWatchLocked();
break;
}
}
@@ -215,7 +238,7 @@ void PickFirst::ExitIdleLocked() {
bool PickFirst::PickLocked(PickState* pick) {
// If we have a selected subchannel already, return synchronously.
if (selected_ != nullptr) {
- pick->connected_subchannel = selected_->connected_subchannel;
+ pick->connected_subchannel = selected_->connected_subchannel()->Ref();
return true;
}
// No subchannel selected yet, so handle asynchronously.
@@ -228,11 +251,10 @@ bool PickFirst::PickLocked(PickState* pick) {
}
void PickFirst::DestroyUnselectedSubchannelsLocked() {
- for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) {
- grpc_lb_subchannel_data* sd = &subchannel_list_->subchannels[i];
+ for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
+ PickFirstSubchannelData* sd = subchannel_list_->subchannel(i);
if (selected_ != sd) {
- grpc_lb_subchannel_data_unref_subchannel(sd,
- "selected_different_subchannel");
+ sd->UnrefSubchannelLocked("selected_different_subchannel");
}
}
}
@@ -249,7 +271,7 @@ void PickFirst::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) {
if (selected_ != nullptr) {
- selected_->connected_subchannel->Ping(on_initiate, on_ack);
+ selected_->connected_subchannel()->Ping(on_initiate, on_ack);
} else {
GRPC_CLOSURE_SCHED(on_initiate,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
@@ -258,24 +280,6 @@ void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) {
}
}
-void PickFirst::SubchannelListRefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason) {
- // TODO(roth): We currently track this ref manually. Once the new
- // ClosureRef API is ready and the subchannel_list code has been
- // converted to a C++ API, find a way to hold the RefCountedPtr<>
- // somewhere (maybe in the subchannel_data object) instead of doing
- // this manually.
- auto self = Ref(DEBUG_LOCATION, reason);
- self.release();
- grpc_lb_subchannel_list_ref(subchannel_list, reason);
-}
-
-void PickFirst::SubchannelListUnrefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason) {
- Unref(DEBUG_LOCATION, reason);
- grpc_lb_subchannel_list_unref(subchannel_list, reason);
-}
-
void PickFirst::UpdateLocked(const grpc_channel_args& args) {
const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
@@ -301,10 +305,10 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
"Pick First %p received update with %" PRIuPTR " addresses", this,
addresses->num_addresses);
}
- grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create(
+ auto subchannel_list = MakeRefCounted<PickFirstSubchannelList>(
this, &grpc_lb_pick_first_trace, addresses, combiner(),
- client_channel_factory(), args, &PickFirst::OnConnectivityChangedLocked);
- if (subchannel_list->num_subchannels == 0) {
+ client_channel_factory(), args);
+ if (subchannel_list->num_subchannels() == 0) {
// Empty update or no valid subchannels. Unsubscribe from all current
// subchannels and put the channel in TRANSIENT_FAILURE.
grpc_connectivity_state_set(
@@ -312,10 +316,9 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
"pf_update_empty");
if (subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_,
- "sl_shutdown_empty_update");
+ subchannel_list_->ShutdownLocked("sl_shutdown_empty_update");
}
- subchannel_list_ = subchannel_list; // Empty list.
+ subchannel_list_ = std::move(subchannel_list); // Empty list.
selected_ = nullptr;
return;
}
@@ -323,45 +326,48 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
// We don't yet have a selected subchannel, so replace the current
// subchannel list immediately.
if (subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_,
- "pf_update_before_selected");
+ subchannel_list_->ShutdownLocked("pf_update_before_selected");
+ }
+ subchannel_list_ = std::move(subchannel_list);
+ // If we've started picking, start trying to connect to the first
+ // subchannel in the new list.
+ if (started_picking_) {
+ subchannel_list_->RefForConnectivityWatch("connectivity_watch+update");
+ subchannel_list_->subchannel(0)->StartConnectivityWatchLocked();
}
- subchannel_list_ = subchannel_list;
} else {
// We do have a selected subchannel.
// Check if it's present in the new list. If so, we're done.
- for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) {
- grpc_lb_subchannel_data* sd = &subchannel_list->subchannels[i];
- if (sd->subchannel == selected_->subchannel) {
+ for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
+ PickFirstSubchannelData* sd = subchannel_list->subchannel(i);
+ if (sd->subchannel() == selected_->subchannel()) {
// The currently selected subchannel is in the update: we are done.
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Pick First %p found already selected subchannel %p "
"at update index %" PRIuPTR " of %" PRIuPTR "; update done",
- this, selected_->subchannel, i,
- subchannel_list->num_subchannels);
+ this, selected_->subchannel(), i,
+ subchannel_list->num_subchannels());
}
- if (selected_->connected_subchannel != nullptr) {
- sd->connected_subchannel = selected_->connected_subchannel;
+ if (selected_->connected_subchannel() != nullptr) {
+ sd->SetConnectedSubchannelFromLocked(selected_);
}
selected_ = sd;
if (subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(
- subchannel_list_, "pf_update_includes_selected");
+ subchannel_list_->ShutdownLocked("pf_update_includes_selected");
}
- subchannel_list_ = subchannel_list;
+ subchannel_list_ = std::move(subchannel_list);
DestroyUnselectedSubchannelsLocked();
- SubchannelListRefForConnectivityWatch(
- subchannel_list, "connectivity_watch+replace_selected");
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
+ subchannel_list_->RefForConnectivityWatch(
+ "connectivity_watch+replace_selected");
+ sd->StartConnectivityWatchLocked();
// If there was a previously pending update (which may or may
// not have contained the currently selected subchannel), drop
// it, so that it doesn't override what we've done here.
if (latest_pending_subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(
- latest_pending_subchannel_list_,
+ latest_pending_subchannel_list_->ShutdownLocked(
"pf_update_includes_selected+outdated");
- latest_pending_subchannel_list_ = nullptr;
+ latest_pending_subchannel_list_.reset();
}
return;
}
@@ -375,74 +381,89 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
gpr_log(GPR_DEBUG,
"Pick First %p Shutting down latest pending subchannel list "
"%p, about to be replaced by newer latest %p",
- this, latest_pending_subchannel_list_, subchannel_list);
+ this, latest_pending_subchannel_list_.get(),
+ subchannel_list.get());
}
- grpc_lb_subchannel_list_shutdown_and_unref(
- latest_pending_subchannel_list_, "sl_outdated_dont_smash");
+ latest_pending_subchannel_list_->ShutdownLocked("sl_outdated_dont_smash");
+ }
+ latest_pending_subchannel_list_ = std::move(subchannel_list);
+ // If we've started picking, start trying to connect to the first
+ // subchannel in the new list.
+ if (started_picking_) {
+ latest_pending_subchannel_list_->RefForConnectivityWatch(
+ "connectivity_watch+update");
+ latest_pending_subchannel_list_->subchannel(0)
+ ->StartConnectivityWatchLocked();
}
- latest_pending_subchannel_list_ = subchannel_list;
- }
- // If we've started picking, start trying to connect to the first
- // subchannel in the new list.
- if (started_picking_) {
- SubchannelListRefForConnectivityWatch(subchannel_list,
- "connectivity_watch+update");
- grpc_lb_subchannel_data_start_connectivity_watch(
- &subchannel_list->subchannels[0]);
}
}
-void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
- grpc_lb_subchannel_data* sd = static_cast<grpc_lb_subchannel_data*>(arg);
- PickFirst* p = static_cast<PickFirst*>(sd->subchannel_list->policy);
+void PickFirst::PickFirstSubchannelList::RefForConnectivityWatch(
+ const char* reason) {
+ // TODO(roth): We currently track these refs manually. Once the new
+ // ClosureRef API is ready, find a way to pass the RefCountedPtr<>
+ // along with the closures instead of doing this manually.
+ // Ref subchannel list.
+ Ref(DEBUG_LOCATION, reason).release();
+ // Ref LB policy.
+ PickFirst* p = static_cast<PickFirst*>(policy());
+ p->Ref(DEBUG_LOCATION, reason).release();
+}
+
+void PickFirst::PickFirstSubchannelList::UnrefForConnectivityWatch(
+ const char* reason) {
+ // Unref LB policy.
+ PickFirst* p = static_cast<PickFirst*>(policy());
+ p->Unref(DEBUG_LOCATION, reason);
+ // Unref subchannel list.
+ Unref(DEBUG_LOCATION, reason);
+}
+
+void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
+ grpc_error* error) {
+ PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_DEBUG,
"Pick First %p connectivity changed for subchannel %p (%" PRIuPTR
" of %" PRIuPTR
"), subchannel_list %p: state=%s p->shutdown_=%d "
"sd->subchannel_list->shutting_down=%d error=%s",
- p, sd->subchannel, sd->subchannel_list->checking_subchannel,
- sd->subchannel_list->num_subchannels, sd->subchannel_list,
- grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe),
- p->shutdown_, sd->subchannel_list->shutting_down,
+ p, subchannel(), Index(), subchannel_list()->num_subchannels(),
+ subchannel_list(),
+ grpc_connectivity_state_name(connectivity_state()),
+ p->shutdown_, subchannel_list()->shutting_down(),
grpc_error_string(error));
}
// If the policy is shutting down, unref and return.
if (p->shutdown_) {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_data_unref_subchannel(sd, "pf_shutdown");
- p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
- "pf_shutdown");
+ StopConnectivityWatchLocked();
+ UnrefSubchannelLocked("pf_shutdown");
+ subchannel_list()->UnrefForConnectivityWatch("pf_shutdown");
return;
}
// If the subchannel list is shutting down, stop watching.
- if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_data_unref_subchannel(sd, "pf_sl_shutdown");
- p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
- "pf_sl_shutdown");
+ if (subchannel_list()->shutting_down() || error == GRPC_ERROR_CANCELLED) {
+ StopConnectivityWatchLocked();
+ UnrefSubchannelLocked("pf_sl_shutdown");
+ subchannel_list()->UnrefForConnectivityWatch("pf_sl_shutdown");
return;
}
// If we're still here, the notification must be for a subchannel in
// either the current or latest pending subchannel lists.
- GPR_ASSERT(sd->subchannel_list == p->subchannel_list_ ||
- sd->subchannel_list == p->latest_pending_subchannel_list_);
- // Update state.
- sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
+ GPR_ASSERT(p->subchannel_list_ == subchannel_list() ||
+ p->latest_pending_subchannel_list_ == subchannel_list());
// Handle updates for the currently selected subchannel.
- if (p->selected_ == sd) {
+ if (p->selected_ == this) {
// If the new state is anything other than READY and there is a
// pending update, switch to the pending update.
- if (sd->curr_connectivity_state != GRPC_CHANNEL_READY &&
+ if (connectivity_state() != GRPC_CHANNEL_READY &&
p->latest_pending_subchannel_list_ != nullptr) {
p->selected_ = nullptr;
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- p->SubchannelListUnrefForConnectivityWatch(
- sd->subchannel_list, "selected_not_ready+switch_to_update");
- grpc_lb_subchannel_list_shutdown_and_unref(
- p->subchannel_list_, "selected_not_ready+switch_to_update");
- p->subchannel_list_ = p->latest_pending_subchannel_list_;
- p->latest_pending_subchannel_list_ = nullptr;
+ StopConnectivityWatchLocked();
+ subchannel_list()->UnrefForConnectivityWatch(
+ "selected_not_ready+switch_to_update");
+ subchannel_list()->ShutdownLocked("selected_not_ready+switch_to_update");
+ p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
grpc_connectivity_state_set(
&p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update");
@@ -452,8 +473,8 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
// re-resolution is introduced. But we need to investigate whether we
// really want to take any action instead of waiting for the selected
// subchannel reconnecting.
- GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
- if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ GPR_ASSERT(connectivity_state() != GRPC_CHANNEL_SHUTDOWN);
+ if (connectivity_state() == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If the selected channel goes bad, request a re-resolution.
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE,
@@ -462,17 +483,14 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
// In transient failure. Rely on re-resolution to recover.
p->selected_ = nullptr;
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
- "pf_selected_shutdown");
- grpc_lb_subchannel_data_unref_subchannel(
- sd, "pf_selected_shutdown"); // Unrefs connected subchannel
+ StopConnectivityWatchLocked();
+ subchannel_list()->UnrefForConnectivityWatch("pf_selected_shutdown");
+ UnrefSubchannelLocked("pf_selected_shutdown");
} else {
- grpc_connectivity_state_set(&p->state_tracker_,
- sd->curr_connectivity_state,
+ grpc_connectivity_state_set(&p->state_tracker_, connectivity_state(),
GRPC_ERROR_REF(error), "selected_changed");
// Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
+ StartConnectivityWatchLocked();
}
}
return;
@@ -486,26 +504,23 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
// for a subchannel in p->latest_pending_subchannel_list_. The
// goal here is to find a subchannel from the update that we can
// select in place of the current one.
- switch (sd->curr_connectivity_state) {
+ switch (connectivity_state()) {
case GRPC_CHANNEL_READY: {
// Case 2. Promote p->latest_pending_subchannel_list_ to
// p->subchannel_list_.
- sd->connected_subchannel =
- grpc_subchannel_get_connected_subchannel(sd->subchannel);
- if (sd->subchannel_list == p->latest_pending_subchannel_list_) {
+ GetConnectedSubchannelFromSubchannelLocked();
+ if (p->latest_pending_subchannel_list_ == subchannel_list()) {
GPR_ASSERT(p->subchannel_list_ != nullptr);
- grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list_,
- "finish_update");
- p->subchannel_list_ = p->latest_pending_subchannel_list_;
- p->latest_pending_subchannel_list_ = nullptr;
+ p->subchannel_list_->ShutdownLocked("finish_update");
+ p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
}
// Cases 1 and 2.
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "connecting_ready");
- p->selected_ = sd;
+ p->selected_ = this;
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p,
- sd->subchannel);
+ subchannel());
}
// Drop all other subchannels, since we are now connected.
p->DestroyUnselectedSubchannelsLocked();
@@ -513,7 +528,8 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
PickState* pick;
while ((pick = p->pending_picks_)) {
p->pending_picks_ = pick->next;
- pick->connected_subchannel = p->selected_->connected_subchannel;
+ pick->connected_subchannel =
+ p->selected_->connected_subchannel()->Ref();
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Servicing pending pick with selected subchannel %p",
@@ -522,40 +538,38 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
// Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
+ StartConnectivityWatchLocked();
break;
}
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
+ StopConnectivityWatchLocked();
+ PickFirstSubchannelData* sd = this;
do {
- sd->subchannel_list->checking_subchannel =
- (sd->subchannel_list->checking_subchannel + 1) %
- sd->subchannel_list->num_subchannels;
- sd = &sd->subchannel_list
- ->subchannels[sd->subchannel_list->checking_subchannel];
- } while (sd->subchannel == nullptr);
+ size_t next_index =
+ (sd->Index() + 1) % subchannel_list()->num_subchannels();
+ sd = subchannel_list()->subchannel(next_index);
+ } while (sd->subchannel() == nullptr);
// Case 1: Only set state to TRANSIENT_FAILURE if we've tried
// all subchannels.
- if (sd->subchannel_list->checking_subchannel == 0 &&
- sd->subchannel_list == p->subchannel_list_) {
+ if (sd->Index() == 0 && p->subchannel_list_ == subchannel_list()) {
grpc_connectivity_state_set(
&p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "connecting_transient_failure");
}
// Reuses the connectivity refs from the previous watch.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
+ sd->StartConnectivityWatchLocked();
break;
}
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE: {
// Only update connectivity state in case 1.
- if (sd->subchannel_list == p->subchannel_list_) {
+ if (p->subchannel_list_ == subchannel_list()) {
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_REF(error),
"connecting_changed");
}
// Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
+ StartConnectivityWatchLocked();
break;
}
case GRPC_CHANNEL_SHUTDOWN:
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index e534131c02..a9d9227ea5 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -73,23 +73,93 @@ class RoundRobin : public LoadBalancingPolicy {
private:
~RoundRobin();
+ class RoundRobinSubchannelList;
+
+ class RoundRobinSubchannelData
+ : public SubchannelData<RoundRobinSubchannelList,
+ RoundRobinSubchannelData> {
+ public:
+ RoundRobinSubchannelData(RoundRobinSubchannelList* subchannel_list,
+ const grpc_lb_user_data_vtable* user_data_vtable,
+ const grpc_lb_address& address,
+ grpc_subchannel* subchannel,
+ grpc_combiner* combiner)
+ : SubchannelData(subchannel_list, user_data_vtable, address,
+ subchannel, combiner),
+ user_data_vtable_(user_data_vtable),
+ user_data_(user_data_vtable_ != nullptr
+ ? user_data_vtable_->copy(address.user_data)
+ : nullptr) {}
+
+ void ProcessConnectivityChangeLocked(grpc_error* error) override;
+
+ void UnrefSubchannelLocked(const char* reason) override {
+ SubchannelData::UnrefSubchannelLocked(reason);
+ if (user_data_ != nullptr) {
+ GPR_ASSERT(user_data_vtable_ != nullptr);
+ user_data_vtable_->destroy(user_data_);
+ user_data_ = nullptr;
+ }
+ }
+
+ void* user_data() const { return user_data_; }
+
+ grpc_connectivity_state CheckConnectivityStateLocked() override {
+ prev_connectivity_state_ = SubchannelData::CheckConnectivityStateLocked();
+ return prev_connectivity_state_;
+ }
+
+ private:
+ const grpc_lb_user_data_vtable* user_data_vtable_;
+ void* user_data_ = nullptr;
+ grpc_connectivity_state prev_connectivity_state_ = GRPC_CHANNEL_IDLE;
+ };
+
+ class RoundRobinSubchannelList
+ : public SubchannelList<RoundRobinSubchannelList,
+ RoundRobinSubchannelData> {
+ public:
+ RoundRobinSubchannelList(
+ RoundRobin* policy, TraceFlag* tracer,
+ const grpc_lb_addresses* addresses, grpc_combiner* combiner,
+ grpc_client_channel_factory* client_channel_factory,
+ const grpc_channel_args& args)
+ : SubchannelList(policy, tracer, addresses, combiner,
+ client_channel_factory, args),
+ num_idle_(num_subchannels()) {}
+
+ void RefForConnectivityWatch(const char* reason);
+ void UnrefForConnectivityWatch(const char* reason);
+
+ void UpdateStateCountersLocked(grpc_connectivity_state old_state,
+ grpc_connectivity_state new_state);
+
+ size_t num_ready() const { return num_ready_; }
+ size_t num_transient_failure() const { return num_transient_failure_; }
+ size_t num_idle() const { return num_idle_; }
+
+ private:
+ size_t num_ready_ = 0;
+ size_t num_transient_failure_ = 0;
+ size_t num_idle_;
+ };
+
void ShutdownLocked() override;
void StartPickingLocked();
size_t GetNextReadySubchannelIndexLocked();
void UpdateLastReadySubchannelIndexLocked(size_t last_ready_index);
- void UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd,
- grpc_error* error);
-
- static void OnConnectivityChangedLocked(void* arg, grpc_error* error);
-
- void SubchannelListRefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason);
- void SubchannelListUnrefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason);
+ void UpdateConnectivityStateLocked(grpc_connectivity_state state,
+ grpc_error* error);
/** list of subchannels */
- grpc_lb_subchannel_list* subchannel_list_ = nullptr;
+ RefCountedPtr<RoundRobinSubchannelList> subchannel_list_;
+ /** Latest version of the subchannel list.
+ * Subchannel connectivity callbacks will only promote updated subchannel
+ * lists if they equal \a latest_pending_subchannel_list. In other words,
+ * racing callbacks that reference outdated subchannel lists won't perform any
+ * update. */
+ RefCountedPtr<RoundRobinSubchannelList> latest_pending_subchannel_list_;
/** have we started picking? */
bool started_picking_ = false;
/** are we shutting down? */
@@ -98,14 +168,8 @@ class RoundRobin : public LoadBalancingPolicy {
PickState* pending_picks_ = nullptr;
/** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker_;
- /** Index into subchannels for last pick. */
+ /** Index into subchannel_list_ for last pick. */
size_t last_ready_subchannel_index_ = 0;
- /** Latest version of the subchannel list.
- * Subchannel connectivity callbacks will only promote updated subchannel
- * lists if they equal \a latest_pending_subchannel_list. In other words,
- * racing callbacks that reference outdated subchannel lists won't perform any
- * update. */
- grpc_lb_subchannel_list* latest_pending_subchannel_list_ = nullptr;
};
RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) {
@@ -115,7 +179,7 @@ RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) {
UpdateLocked(*args.args);
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG, "[RR %p] Created with %" PRIuPTR " subchannels", this,
- subchannel_list_->num_subchannels);
+ subchannel_list_->num_subchannels());
}
grpc_subchannel_index_ref();
}
@@ -144,30 +208,30 @@ size_t RoundRobin::GetNextReadySubchannelIndexLocked() {
"[RR %p] getting next ready subchannel (out of %" PRIuPTR
"), "
"last_ready_subchannel_index=%" PRIuPTR,
- this, subchannel_list_->num_subchannels,
+ this, subchannel_list_->num_subchannels(),
last_ready_subchannel_index_);
}
- for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) {
+ for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
const size_t index = (i + last_ready_subchannel_index_ + 1) %
- subchannel_list_->num_subchannels;
+ subchannel_list_->num_subchannels();
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(
GPR_DEBUG,
"[RR %p] checking subchannel %p, subchannel_list %p, index %" PRIuPTR
": state=%s",
- this, subchannel_list_->subchannels[index].subchannel,
- subchannel_list_, index,
+ this, subchannel_list_->subchannel(index)->subchannel(),
+ subchannel_list_.get(), index,
grpc_connectivity_state_name(
- subchannel_list_->subchannels[index].curr_connectivity_state));
+ subchannel_list_->subchannel(index)->connectivity_state()));
}
- if (subchannel_list_->subchannels[index].curr_connectivity_state ==
+ if (subchannel_list_->subchannel(index)->connectivity_state() ==
GRPC_CHANNEL_READY) {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG,
"[RR %p] found next ready subchannel (%p) at index %" PRIuPTR
" of subchannel_list %p",
- this, subchannel_list_->subchannels[index].subchannel, index,
- subchannel_list_);
+ this, subchannel_list_->subchannel(index)->subchannel(),
+ index, subchannel_list_.get());
}
return index;
}
@@ -175,21 +239,21 @@ size_t RoundRobin::GetNextReadySubchannelIndexLocked() {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", this);
}
- return subchannel_list_->num_subchannels;
+ return subchannel_list_->num_subchannels();
}
// Sets last_ready_subchannel_index_ to last_ready_index.
void RoundRobin::UpdateLastReadySubchannelIndexLocked(size_t last_ready_index) {
- GPR_ASSERT(last_ready_index < subchannel_list_->num_subchannels);
+ GPR_ASSERT(last_ready_index < subchannel_list_->num_subchannels());
last_ready_subchannel_index_ = last_ready_index;
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG,
"[RR %p] setting last_ready_subchannel_index=%" PRIuPTR
" (SC %p, CSC %p)",
this, last_ready_index,
- subchannel_list_->subchannels[last_ready_index].subchannel,
- subchannel_list_->subchannels[last_ready_index]
- .connected_subchannel.get());
+ subchannel_list_->subchannel(last_ready_index)->subchannel(),
+ subchannel_list_->subchannel(last_ready_index)
+ ->connected_subchannel());
}
}
@@ -219,14 +283,12 @@ void RoundRobin::ShutdownLocked() {
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "rr_shutdown");
if (subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_,
- "sl_shutdown_rr_shutdown");
- subchannel_list_ = nullptr;
+ subchannel_list_->ShutdownLocked("rr_shutdown");
+ subchannel_list_.reset();
}
if (latest_pending_subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(
- latest_pending_subchannel_list_, "sl_shutdown_pending_rr_shutdown");
- latest_pending_subchannel_list_ = nullptr;
+ latest_pending_subchannel_list_->ShutdownLocked("rr_shutdown");
+ latest_pending_subchannel_list_.reset();
}
TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_CANCELLED);
GRPC_ERROR_UNREF(error);
@@ -273,32 +335,12 @@ void RoundRobin::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
GRPC_ERROR_UNREF(error);
}
-void RoundRobin::SubchannelListRefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason) {
- // TODO(roth): We currently track this ref manually. Once the new
- // ClosureRef API is ready and the subchannel_list code has been
- // converted to a C++ API, find a way to hold the RefCountedPtr<>
- // somewhere (maybe in the subchannel_data object) instead of doing
- // this manually.
- auto self = Ref(DEBUG_LOCATION, reason);
- self.release();
- grpc_lb_subchannel_list_ref(subchannel_list, reason);
-}
-
-void RoundRobin::SubchannelListUnrefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason) {
- Unref(DEBUG_LOCATION, reason);
- grpc_lb_subchannel_list_unref(subchannel_list, reason);
-}
-
void RoundRobin::StartPickingLocked() {
started_picking_ = true;
- for (size_t i = 0; i < subchannel_list_->num_subchannels; i++) {
- if (subchannel_list_->subchannels[i].subchannel != nullptr) {
- SubchannelListRefForConnectivityWatch(subchannel_list_,
- "connectivity_watch");
- grpc_lb_subchannel_data_start_connectivity_watch(
- &subchannel_list_->subchannels[i]);
+ for (size_t i = 0; i < subchannel_list_->num_subchannels(); i++) {
+ if (subchannel_list_->subchannel(i)->subchannel() != nullptr) {
+ subchannel_list_->RefForConnectivityWatch("connectivity_watch");
+ subchannel_list_->subchannel(i)->StartConnectivityWatchLocked();
}
}
}
@@ -317,21 +359,21 @@ bool RoundRobin::PickLocked(PickState* pick) {
GPR_ASSERT(!shutdown_);
if (subchannel_list_ != nullptr) {
const size_t next_ready_index = GetNextReadySubchannelIndexLocked();
- if (next_ready_index < subchannel_list_->num_subchannels) {
+ if (next_ready_index < subchannel_list_->num_subchannels()) {
/* readily available, report right away */
- grpc_lb_subchannel_data* sd =
- &subchannel_list_->subchannels[next_ready_index];
- pick->connected_subchannel = sd->connected_subchannel;
+ RoundRobinSubchannelData* sd =
+ subchannel_list_->subchannel(next_ready_index);
+ pick->connected_subchannel = sd->connected_subchannel()->Ref();
if (pick->user_data != nullptr) {
- *pick->user_data = sd->user_data;
+ *pick->user_data = sd->user_data();
}
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(
GPR_DEBUG,
"[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, "
"index %" PRIuPTR ")",
- this, sd->subchannel, pick->connected_subchannel.get(),
- sd->subchannel_list, next_ready_index);
+ this, sd->subchannel(), pick->connected_subchannel.get(),
+ sd->subchannel_list(), next_ready_index);
}
/* only advance the last picked pointer if the selection was used */
UpdateLastReadySubchannelIndexLocked(next_ready_index);
@@ -347,36 +389,12 @@ bool RoundRobin::PickLocked(PickState* pick) {
return false;
}
-void UpdateStateCountersLocked(grpc_lb_subchannel_data* sd) {
- grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list;
- GPR_ASSERT(sd->prev_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
- GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
- if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) {
- GPR_ASSERT(subchannel_list->num_ready > 0);
- --subchannel_list->num_ready;
- } else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- GPR_ASSERT(subchannel_list->num_transient_failures > 0);
- --subchannel_list->num_transient_failures;
- } else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) {
- GPR_ASSERT(subchannel_list->num_idle > 0);
- --subchannel_list->num_idle;
- }
- sd->prev_connectivity_state = sd->curr_connectivity_state;
- if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
- ++subchannel_list->num_ready;
- } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- ++subchannel_list->num_transient_failures;
- } else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) {
- ++subchannel_list->num_idle;
- }
-}
-
/** Sets the policy's connectivity status based on that of the passed-in \a sd
* (the grpc_lb_subchannel_data associated with the updated subchannel) and the
* subchannel list \a sd belongs to (sd->subchannel_list). \a error will be used
* only if the policy transitions to state TRANSIENT_FAILURE. */
-void RoundRobin::UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd,
- grpc_error* error) {
+void RoundRobin::UpdateConnectivityStateLocked(grpc_connectivity_state state,
+ grpc_error* error) {
/* In priority order. The first rule to match terminates the search (ie, if we
* are on rule n, all previous rules were unfulfilled).
*
@@ -391,18 +409,16 @@ void RoundRobin::UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd,
* CHECK: subchannel_list->num_transient_failures ==
* subchannel_list->num_subchannels.
*/
- grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list;
- GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_IDLE);
- if (subchannel_list->num_ready > 0) {
+ if (subchannel_list_->num_ready() > 0) {
/* 1) READY */
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "rr_ready");
- } else if (sd->curr_connectivity_state == GRPC_CHANNEL_CONNECTING) {
+ } else if (state == GRPC_CHANNEL_CONNECTING) {
/* 2) CONNECTING */
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_NONE, "rr_connecting");
- } else if (subchannel_list->num_transient_failures ==
- subchannel_list->num_subchannels) {
+ } else if (subchannel_list_->num_transient_failure() ==
+ subchannel_list_->num_subchannels()) {
/* 3) TRANSIENT_FAILURE */
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error),
@@ -411,99 +427,134 @@ void RoundRobin::UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd,
GRPC_ERROR_UNREF(error);
}
-void RoundRobin::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
- grpc_lb_subchannel_data* sd = static_cast<grpc_lb_subchannel_data*>(arg);
- RoundRobin* p = static_cast<RoundRobin*>(sd->subchannel_list->policy);
+void RoundRobin::RoundRobinSubchannelList::RefForConnectivityWatch(
+ const char* reason) {
+ // TODO(roth): We currently track these refs manually. Once the new
+ // ClosureRef API is ready, find a way to pass the RefCountedPtr<>
+ // along with the closures instead of doing this manually.
+ // Ref subchannel list.
+ Ref(DEBUG_LOCATION, reason).release();
+ // Ref LB policy.
+ RoundRobin* p = static_cast<RoundRobin*>(policy());
+ p->Ref(DEBUG_LOCATION, reason).release();
+}
+
+void RoundRobin::RoundRobinSubchannelList::UnrefForConnectivityWatch(
+ const char* reason) {
+ // Unref LB policy.
+ RoundRobin* p = static_cast<RoundRobin*>(policy());
+ p->Unref(DEBUG_LOCATION, reason);
+ // Unref subchannel list.
+ Unref(DEBUG_LOCATION, reason);
+}
+
+void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked(
+ grpc_connectivity_state old_state, grpc_connectivity_state new_state) {
+ GPR_ASSERT(old_state != GRPC_CHANNEL_SHUTDOWN);
+ GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
+ if (old_state == GRPC_CHANNEL_READY) {
+ GPR_ASSERT(num_ready_ > 0);
+ --num_ready_;
+ } else if (old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ GPR_ASSERT(num_transient_failure_ > 0);
+ --num_transient_failure_;
+ } else if (old_state == GRPC_CHANNEL_IDLE) {
+ GPR_ASSERT(num_idle_ > 0);
+ --num_idle_;
+ }
+ if (new_state == GRPC_CHANNEL_READY) {
+ ++num_ready_;
+ } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ ++num_transient_failure_;
+ } else if (new_state == GRPC_CHANNEL_IDLE) {
+ ++num_idle_;
+ }
+}
+
+void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
+ grpc_error* error) {
+ RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(
GPR_DEBUG,
"[RR %p] connectivity changed for subchannel %p, subchannel_list %p: "
"prev_state=%s new_state=%s p->shutdown=%d "
"sd->subchannel_list->shutting_down=%d error=%s",
- p, sd->subchannel, sd->subchannel_list,
- grpc_connectivity_state_name(sd->prev_connectivity_state),
- grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe),
- p->shutdown_, sd->subchannel_list->shutting_down,
+ p, subchannel(), subchannel_list(),
+ grpc_connectivity_state_name(prev_connectivity_state_),
+ grpc_connectivity_state_name(connectivity_state()),
+ p->shutdown_, subchannel_list()->shutting_down(),
grpc_error_string(error));
}
- GPR_ASSERT(sd->subchannel != nullptr);
+ GPR_ASSERT(subchannel() != nullptr);
// If the policy is shutting down, unref and return.
if (p->shutdown_) {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_data_unref_subchannel(sd, "rr_shutdown");
- p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
- "rr_shutdown");
+ StopConnectivityWatchLocked();
+ UnrefSubchannelLocked("rr_shutdown");
+ subchannel_list()->UnrefForConnectivityWatch("rr_shutdown");
return;
}
// If the subchannel list is shutting down, stop watching.
- if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_data_unref_subchannel(sd, "rr_sl_shutdown");
- p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
- "rr_sl_shutdown");
+ if (subchannel_list()->shutting_down() || error == GRPC_ERROR_CANCELLED) {
+ StopConnectivityWatchLocked();
+ UnrefSubchannelLocked("rr_sl_shutdown");
+ subchannel_list()->UnrefForConnectivityWatch("rr_sl_shutdown");
return;
}
+ GPR_ASSERT(connectivity_state() != GRPC_CHANNEL_SHUTDOWN);
// If we're still here, the notification must be for a subchannel in
// either the current or latest pending subchannel lists.
- GPR_ASSERT(sd->subchannel_list == p->subchannel_list_ ||
- sd->subchannel_list == p->latest_pending_subchannel_list_);
- GPR_ASSERT(sd->pending_connectivity_state_unsafe != GRPC_CHANNEL_SHUTDOWN);
- // Now that we're inside the combiner, copy the pending connectivity
- // state (which was set by the connectivity state watcher) to
- // curr_connectivity_state, which is what we use inside of the combiner.
- sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
+ GPR_ASSERT(p->subchannel_list_ == subchannel_list() ||
+ p->latest_pending_subchannel_list_ == subchannel_list());
// If the sd's new state is TRANSIENT_FAILURE, unref the *connected*
// subchannel, if any.
- switch (sd->curr_connectivity_state) {
+ switch (connectivity_state()) {
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
- sd->connected_subchannel.reset();
+ clear_connected_subchannel();
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG,
"[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. "
"Requesting re-resolution",
- p, sd->subchannel);
+ p, subchannel());
}
p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE);
break;
}
case GRPC_CHANNEL_READY: {
- if (sd->connected_subchannel == nullptr) {
- sd->connected_subchannel =
- grpc_subchannel_get_connected_subchannel(sd->subchannel);
+ if (connected_subchannel() == nullptr) {
+ GetConnectedSubchannelFromSubchannelLocked();
}
- if (sd->subchannel_list != p->subchannel_list_) {
- // promote sd->subchannel_list to p->subchannel_list_.
- // sd->subchannel_list must be equal to
+ if (p->subchannel_list_ != subchannel_list()) {
+ // promote subchannel_list() to p->subchannel_list_.
+ // subchannel_list() must be equal to
// p->latest_pending_subchannel_list_ because we have already filtered
- // for sds belonging to outdated subchannel lists.
- GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list_);
- GPR_ASSERT(!sd->subchannel_list->shutting_down);
+ // for subchannels belonging to outdated subchannel lists.
+ GPR_ASSERT(p->latest_pending_subchannel_list_ == subchannel_list());
+ GPR_ASSERT(!subchannel_list()->shutting_down());
if (grpc_lb_round_robin_trace.enabled()) {
const size_t num_subchannels =
p->subchannel_list_ != nullptr
- ? p->subchannel_list_->num_subchannels
+ ? p->subchannel_list_->num_subchannels()
: 0;
gpr_log(GPR_DEBUG,
"[RR %p] phasing out subchannel list %p (size %" PRIuPTR
") in favor of %p (size %" PRIuPTR ")",
- p, p->subchannel_list_, num_subchannels, sd->subchannel_list,
- num_subchannels);
+ p, p->subchannel_list_.get(), num_subchannels,
+ subchannel_list(), subchannel_list()->num_subchannels());
}
if (p->subchannel_list_ != nullptr) {
// dispose of the current subchannel_list
- grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list_,
- "sl_phase_out_shutdown");
+ p->subchannel_list_->ShutdownLocked("sl_phase_out_shutdown");
}
- p->subchannel_list_ = p->latest_pending_subchannel_list_;
- p->latest_pending_subchannel_list_ = nullptr;
+ p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
}
/* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in
* p->pending_picks. This preemptively replicates rr_pick()'s actions. */
const size_t next_ready_index = p->GetNextReadySubchannelIndexLocked();
- GPR_ASSERT(next_ready_index < p->subchannel_list_->num_subchannels);
- grpc_lb_subchannel_data* selected =
- &p->subchannel_list_->subchannels[next_ready_index];
+ GPR_ASSERT(next_ready_index < p->subchannel_list_->num_subchannels());
+ RoundRobinSubchannelData* selected =
+ p->subchannel_list_->subchannel(next_ready_index);
if (p->pending_picks_ != nullptr) {
// if the selected subchannel is going to be used for the pending
// picks, update the last picked pointer
@@ -512,15 +563,15 @@ void RoundRobin::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
PickState* pick;
while ((pick = p->pending_picks_)) {
p->pending_picks_ = pick->next;
- pick->connected_subchannel = selected->connected_subchannel;
+ pick->connected_subchannel = selected->connected_subchannel()->Ref();
if (pick->user_data != nullptr) {
- *pick->user_data = selected->user_data;
+ *pick->user_data = selected->user_data();
}
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG,
"[RR %p] Fulfilling pending pick. Target <-- subchannel %p "
"(subchannel_list %p, index %" PRIuPTR ")",
- p, selected->subchannel, p->subchannel_list_,
+ p, selected->subchannel(), p->subchannel_list_.get(),
next_ready_index);
}
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
@@ -533,13 +584,16 @@ void RoundRobin::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
case GRPC_CHANNEL_IDLE:; // fallthrough
}
// Update state counters.
- UpdateStateCountersLocked(sd);
+ subchannel_list()->UpdateStateCountersLocked(prev_connectivity_state_,
+ connectivity_state());
+ prev_connectivity_state_ = connectivity_state();
// Only update connectivity based on the selected subchannel list.
- if (sd->subchannel_list == p->subchannel_list_) {
- p->UpdateConnectivityStatusLocked(sd, GRPC_ERROR_REF(error));
+ if (p->subchannel_list_ == subchannel_list()) {
+ p->UpdateConnectivityStateLocked(connectivity_state(),
+ GRPC_ERROR_REF(error));
}
// Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
+ StartConnectivityWatchLocked();
}
grpc_connectivity_state RoundRobin::CheckConnectivityLocked(
@@ -556,10 +610,10 @@ void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
void RoundRobin::PingOneLocked(grpc_closure* on_initiate,
grpc_closure* on_ack) {
const size_t next_ready_index = GetNextReadySubchannelIndexLocked();
- if (next_ready_index < subchannel_list_->num_subchannels) {
- grpc_lb_subchannel_data* selected =
- &subchannel_list_->subchannels[next_ready_index];
- selected->connected_subchannel->Ping(on_initiate, on_ack);
+ if (next_ready_index < subchannel_list_->num_subchannels()) {
+ RoundRobinSubchannelData* selected =
+ subchannel_list_->subchannel(next_ready_index);
+ selected->connected_subchannel()->Ping(on_initiate, on_ack);
} else {
GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Round Robin not connected"));
@@ -587,30 +641,29 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args) {
gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIuPTR " addresses",
this, addresses->num_addresses);
}
- grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create(
+ auto subchannel_list = MakeRefCounted<RoundRobinSubchannelList>(
this, &grpc_lb_round_robin_trace, addresses, combiner(),
- client_channel_factory(), args, &RoundRobin::OnConnectivityChangedLocked);
- if (subchannel_list->num_subchannels == 0) {
+ client_channel_factory(), args);
+ if (subchannel_list->num_subchannels() == 0) {
grpc_connectivity_state_set(
&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
"rr_update_empty");
if (subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_,
- "sl_shutdown_empty_update");
+ subchannel_list_->ShutdownLocked("sl_shutdown_empty_update");
}
- subchannel_list_ = subchannel_list; // empty list
+ subchannel_list_ = std::move(subchannel_list); // empty list
return;
}
if (started_picking_) {
- for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) {
+ for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
const grpc_connectivity_state subchannel_state =
- grpc_subchannel_check_connectivity(
- subchannel_list->subchannels[i].subchannel, nullptr);
+ subchannel_list->subchannel(i)->CheckConnectivityStateLocked();
// Override the default setting of IDLE for connectivity notification
// purposes if the subchannel is already in transient failure. Otherwise
// we'd be immediately notified of the IDLE-TRANSIENT_FAILURE
- // discrepancy, attempt to re-resolve and end up here again.
+ // discrepancy, attempt to re-resolve, and end up here again.
+// FIXME
// TODO(roth): As part of C++-ifying the subchannel_list API, design a
// better API for notifying the LB policy of subchannel states, which can
// be used both for the subchannel's initial state and for subsequent
@@ -619,43 +672,36 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args) {
// pending picks across all READY subchannels rather than sending them all
// to the first one).
if (subchannel_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- subchannel_list->subchannels[i].pending_connectivity_state_unsafe =
- subchannel_list->subchannels[i].curr_connectivity_state =
- subchannel_list->subchannels[i].prev_connectivity_state =
- subchannel_state;
- --subchannel_list->num_idle;
- ++subchannel_list->num_transient_failures;
+ subchannel_list->UpdateStateCountersLocked(GRPC_CHANNEL_IDLE,
+ subchannel_state);
}
}
+ for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
+ /* Watch every new subchannel. A subchannel list becomes active the
+ * moment one of its subchannels is READY. At that moment, we swap
+ * p->subchannel_list for sd->subchannel_list, provided the subchannel
+ * list is still valid (ie, isn't shutting down) */
+ subchannel_list->RefForConnectivityWatch("connectivity_watch");
+ subchannel_list->subchannel(i)->StartConnectivityWatchLocked();
+ }
if (latest_pending_subchannel_list_ != nullptr) {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG,
"[RR %p] Shutting down latest pending subchannel list %p, "
"about to be replaced by newer latest %p",
- this, latest_pending_subchannel_list_, subchannel_list);
+ this, latest_pending_subchannel_list_.get(),
+ subchannel_list.get());
}
- grpc_lb_subchannel_list_shutdown_and_unref(
- latest_pending_subchannel_list_, "sl_outdated");
- }
- latest_pending_subchannel_list_ = subchannel_list;
- for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) {
- /* Watch every new subchannel. A subchannel list becomes active the
- * moment one of its subchannels is READY. At that moment, we swap
- * p->subchannel_list for sd->subchannel_list, provided the subchannel
- * list is still valid (ie, isn't shutting down) */
- SubchannelListRefForConnectivityWatch(subchannel_list,
- "connectivity_watch");
- grpc_lb_subchannel_data_start_connectivity_watch(
- &subchannel_list->subchannels[i]);
+ latest_pending_subchannel_list_->ShutdownLocked("sl_outdated");
}
+ latest_pending_subchannel_list_ = std::move(subchannel_list);
} else {
// The policy isn't picking yet. Save the update for later, disposing of
// previous version if any.
if (subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(
- subchannel_list_, "rr_update_before_started_picking");
+ subchannel_list_->ShutdownLocked("rr_update_before_started_picking");
}
- subchannel_list_ = subchannel_list;
+ subchannel_list_ = std::move(subchannel_list);
}
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
deleted file mode 100644
index 79cb64c6c6..0000000000
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- *
- * Copyright 2015 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#include <string.h>
-
-#include <grpc/support/alloc.h>
-
-#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
-#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/debug/trace.h"
-#include "src/core/lib/iomgr/closure.h"
-#include "src/core/lib/iomgr/combiner.h"
-#include "src/core/lib/iomgr/sockaddr_utils.h"
-#include "src/core/lib/transport/connectivity_state.h"
-
-void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd,
- const char* reason) {
- if (sd->subchannel != nullptr) {
- if (sd->subchannel_list->tracer->enabled()) {
- gpr_log(GPR_DEBUG,
- "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
- " (subchannel %p): unreffing subchannel",
- sd->subchannel_list->tracer->name(), sd->subchannel_list->policy,
- sd->subchannel_list,
- static_cast<size_t>(sd - sd->subchannel_list->subchannels),
- sd->subchannel_list->num_subchannels, sd->subchannel);
- }
- GRPC_SUBCHANNEL_UNREF(sd->subchannel, reason);
- sd->subchannel = nullptr;
- sd->connected_subchannel.reset();
- if (sd->user_data != nullptr) {
- GPR_ASSERT(sd->user_data_vtable != nullptr);
- sd->user_data_vtable->destroy(sd->user_data);
- sd->user_data = nullptr;
- }
- }
-}
-
-void grpc_lb_subchannel_data_start_connectivity_watch(
- grpc_lb_subchannel_data* sd) {
- if (sd->subchannel_list->tracer->enabled()) {
- gpr_log(
- GPR_DEBUG,
- "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
- " (subchannel %p): requesting connectivity change "
- "notification (from %s)",
- sd->subchannel_list->tracer->name(), sd->subchannel_list->policy,
- sd->subchannel_list,
- static_cast<size_t>(sd - sd->subchannel_list->subchannels),
- sd->subchannel_list->num_subchannels, sd->subchannel,
- grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe));
- }
- sd->connectivity_notification_pending = true;
- grpc_subchannel_notify_on_state_change(
- sd->subchannel, sd->subchannel_list->policy->interested_parties(),
- &sd->pending_connectivity_state_unsafe,
- &sd->connectivity_changed_closure);
-}
-
-void grpc_lb_subchannel_data_stop_connectivity_watch(
- grpc_lb_subchannel_data* sd) {
- if (sd->subchannel_list->tracer->enabled()) {
- gpr_log(GPR_DEBUG,
- "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
- " (subchannel %p): stopping connectivity watch",
- sd->subchannel_list->tracer->name(), sd->subchannel_list->policy,
- sd->subchannel_list,
- static_cast<size_t>(sd - sd->subchannel_list->subchannels),
- sd->subchannel_list->num_subchannels, sd->subchannel);
- }
- GPR_ASSERT(sd->connectivity_notification_pending);
- sd->connectivity_notification_pending = false;
-}
-
-grpc_lb_subchannel_list* grpc_lb_subchannel_list_create(
- grpc_core::LoadBalancingPolicy* p, grpc_core::TraceFlag* tracer,
- const grpc_lb_addresses* addresses, grpc_combiner* combiner,
- grpc_client_channel_factory* client_channel_factory,
- const grpc_channel_args& args, grpc_iomgr_cb_func connectivity_changed_cb) {
- grpc_lb_subchannel_list* subchannel_list =
- static_cast<grpc_lb_subchannel_list*>(
- gpr_zalloc(sizeof(*subchannel_list)));
- if (tracer->enabled()) {
- gpr_log(GPR_DEBUG,
- "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
- tracer->name(), p, subchannel_list, addresses->num_addresses);
- }
- subchannel_list->policy = p;
- subchannel_list->tracer = tracer;
- gpr_ref_init(&subchannel_list->refcount, 1);
- subchannel_list->subchannels = static_cast<grpc_lb_subchannel_data*>(
- gpr_zalloc(sizeof(grpc_lb_subchannel_data) * addresses->num_addresses));
- // We need to remove the LB addresses in order to be able to compare the
- // subchannel keys of subchannels from a different batch of addresses.
- static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
- GRPC_ARG_LB_ADDRESSES};
- // Create a subchannel for each address.
- grpc_subchannel_args sc_args;
- size_t subchannel_index = 0;
- for (size_t i = 0; i < addresses->num_addresses; i++) {
- // If there were any balancer, we would have chosen grpclb policy instead.
- GPR_ASSERT(!addresses->addresses[i].is_balancer);
- memset(&sc_args, 0, sizeof(grpc_subchannel_args));
- grpc_arg addr_arg =
- grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
- grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
- &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, 1);
- gpr_free(addr_arg.value.string);
- sc_args.args = new_args;
- grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel(
- client_channel_factory, &sc_args);
- grpc_channel_args_destroy(new_args);
- if (subchannel == nullptr) {
- // Subchannel could not be created.
- if (tracer->enabled()) {
- char* address_uri =
- grpc_sockaddr_to_uri(&addresses->addresses[i].address);
- gpr_log(GPR_DEBUG,
- "[%s %p] could not create subchannel for address uri %s, "
- "ignoring",
- tracer->name(), subchannel_list->policy, address_uri);
- gpr_free(address_uri);
- }
- continue;
- }
- if (tracer->enabled()) {
- char* address_uri =
- grpc_sockaddr_to_uri(&addresses->addresses[i].address);
- gpr_log(GPR_DEBUG,
- "[%s %p] subchannel list %p index %" PRIuPTR
- ": Created subchannel %p for address uri %s",
- tracer->name(), p, subchannel_list, subchannel_index, subchannel,
- address_uri);
- gpr_free(address_uri);
- }
- grpc_lb_subchannel_data* sd =
- &subchannel_list->subchannels[subchannel_index++];
- sd->subchannel_list = subchannel_list;
- sd->subchannel = subchannel;
- GRPC_CLOSURE_INIT(&sd->connectivity_changed_closure,
- connectivity_changed_cb, sd,
- grpc_combiner_scheduler(combiner));
- // We assume that the current state is IDLE. If not, we'll get a
- // callback telling us that.
- sd->prev_connectivity_state = GRPC_CHANNEL_IDLE;
- sd->curr_connectivity_state = GRPC_CHANNEL_IDLE;
- sd->pending_connectivity_state_unsafe = GRPC_CHANNEL_IDLE;
- sd->user_data_vtable = addresses->user_data_vtable;
- if (sd->user_data_vtable != nullptr) {
- sd->user_data =
- sd->user_data_vtable->copy(addresses->addresses[i].user_data);
- }
- }
- subchannel_list->num_subchannels = subchannel_index;
- subchannel_list->num_idle = subchannel_index;
- return subchannel_list;
-}
-
-static void subchannel_list_destroy(grpc_lb_subchannel_list* subchannel_list) {
- if (subchannel_list->tracer->enabled()) {
- gpr_log(GPR_DEBUG, "[%s %p] Destroying subchannel_list %p",
- subchannel_list->tracer->name(), subchannel_list->policy,
- subchannel_list);
- }
- for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
- grpc_lb_subchannel_data* sd = &subchannel_list->subchannels[i];
- grpc_lb_subchannel_data_unref_subchannel(sd, "subchannel_list_destroy");
- }
- gpr_free(subchannel_list->subchannels);
- gpr_free(subchannel_list);
-}
-
-void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list* subchannel_list,
- const char* reason) {
- gpr_ref_non_zero(&subchannel_list->refcount);
- if (subchannel_list->tracer->enabled()) {
- const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
- gpr_log(GPR_DEBUG, "[%s %p] subchannel_list %p REF %lu->%lu (%s)",
- subchannel_list->tracer->name(), subchannel_list->policy,
- subchannel_list, static_cast<unsigned long>(count - 1),
- static_cast<unsigned long>(count), reason);
- }
-}
-
-void grpc_lb_subchannel_list_unref(grpc_lb_subchannel_list* subchannel_list,
- const char* reason) {
- const bool done = gpr_unref(&subchannel_list->refcount);
- if (subchannel_list->tracer->enabled()) {
- const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
- gpr_log(GPR_DEBUG, "[%s %p] subchannel_list %p UNREF %lu->%lu (%s)",
- subchannel_list->tracer->name(), subchannel_list->policy,
- subchannel_list, static_cast<unsigned long>(count + 1),
- static_cast<unsigned long>(count), reason);
- }
- if (done) {
- subchannel_list_destroy(subchannel_list);
- }
-}
-
-static void subchannel_data_cancel_connectivity_watch(
- grpc_lb_subchannel_data* sd, const char* reason) {
- if (sd->subchannel_list->tracer->enabled()) {
- gpr_log(GPR_DEBUG,
- "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
- " (subchannel %p): canceling connectivity watch (%s)",
- sd->subchannel_list->tracer->name(), sd->subchannel_list->policy,
- sd->subchannel_list,
- static_cast<size_t>(sd - sd->subchannel_list->subchannels),
- sd->subchannel_list->num_subchannels, sd->subchannel, reason);
- }
- grpc_subchannel_notify_on_state_change(sd->subchannel, nullptr, nullptr,
- &sd->connectivity_changed_closure);
-}
-
-void grpc_lb_subchannel_list_shutdown_and_unref(
- grpc_lb_subchannel_list* subchannel_list, const char* reason) {
- if (subchannel_list->tracer->enabled()) {
- gpr_log(GPR_DEBUG, "[%s %p] Shutting down subchannel_list %p (%s)",
- subchannel_list->tracer->name(), subchannel_list->policy,
- subchannel_list, reason);
- }
- GPR_ASSERT(!subchannel_list->shutting_down);
- subchannel_list->shutting_down = true;
- for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
- grpc_lb_subchannel_data* sd = &subchannel_list->subchannels[i];
- // If there's a pending notification for this subchannel, cancel it;
- // the callback is responsible for unreffing the subchannel.
- // Otherwise, unref the subchannel directly.
- if (sd->connectivity_notification_pending) {
- subchannel_data_cancel_connectivity_watch(sd, reason);
- } else if (sd->subchannel != nullptr) {
- grpc_lb_subchannel_data_unref_subchannel(sd, reason);
- }
- }
- grpc_lb_subchannel_list_unref(subchannel_list, reason);
-}
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index 6889d596ac..d717ba0e37 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -21,116 +21,388 @@
#include <grpc/support/port_platform.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/gprpp/abstract.h"
+#include "src/core/lib/gprpp/inlined_vector.h"
+#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/combiner.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
-// TODO(roth): This code is intended to be shared between pick_first and
-// round_robin. However, the interface needs more work to provide clean
-// encapsulation. For example, the structs here have some fields that are
-// only used in one of the two (e.g., the state counters in
-// grpc_lb_subchannel_list and the prev_connectivity_state field in
-// grpc_lb_subchannel_data are only used in round_robin, and the
-// checking_subchannel field in grpc_lb_subchannel_list is only used by
-// pick_first). Also, there is probably some code duplication between the
-// connectivity state notification callback code in both pick_first and
-// round_robin that could be refactored and moved here. In a future PR,
-// need to clean this up.
-
-typedef struct grpc_lb_subchannel_list grpc_lb_subchannel_list;
-
-typedef struct {
- /** backpointer to owning subchannel list */
- grpc_lb_subchannel_list* subchannel_list;
- /** subchannel itself */
- grpc_subchannel* subchannel;
- grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel;
- /** Is a connectivity notification pending? */
- bool connectivity_notification_pending;
- /** notification that connectivity has changed on subchannel */
- grpc_closure connectivity_changed_closure;
- /** previous and current connectivity states. Updated by \a
- * \a connectivity_changed_closure based on
- * \a pending_connectivity_state_unsafe. */
- grpc_connectivity_state prev_connectivity_state;
- grpc_connectivity_state curr_connectivity_state;
- /** connectivity state to be updated by
- * grpc_subchannel_notify_on_state_change(), not guarded by
- * the combiner. To be copied to \a curr_connectivity_state by
- * \a connectivity_changed_closure. */
- grpc_connectivity_state pending_connectivity_state_unsafe;
- /** the subchannel's target user data */
- void* user_data;
- /** vtable to operate over \a user_data */
- const grpc_lb_user_data_vtable* user_data_vtable;
-} grpc_lb_subchannel_data;
-
-/// Unrefs the subchannel contained in sd.
-void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd,
- const char* reason);
-
-/// Starts watching the connectivity state of the subchannel.
-/// The connectivity_changed_cb callback must invoke either
-/// grpc_lb_subchannel_data_stop_connectivity_watch() or again call
-/// grpc_lb_subchannel_data_start_connectivity_watch().
-void grpc_lb_subchannel_data_start_connectivity_watch(
- grpc_lb_subchannel_data* sd);
-
-/// Stops watching the connectivity state of the subchannel.
-void grpc_lb_subchannel_data_stop_connectivity_watch(
- grpc_lb_subchannel_data* sd);
-
-struct grpc_lb_subchannel_list {
- /** backpointer to owning policy */
- grpc_core::LoadBalancingPolicy* policy;
-
- grpc_core::TraceFlag* tracer;
-
- /** all our subchannels */
- size_t num_subchannels;
- grpc_lb_subchannel_data* subchannels;
-
- /** Index into subchannels of the one we're currently checking.
- * Used when connecting to subchannels serially instead of in parallel. */
- // TODO(roth): When we have time, we can probably make this go away
- // and compute the index dynamically by subtracting
- // subchannel_list->subchannels from the subchannel_data pointer.
- size_t checking_subchannel;
-
- /** how many subchannels are in state READY */
- size_t num_ready;
- /** how many subchannels are in state TRANSIENT_FAILURE */
- size_t num_transient_failures;
- /** how many subchannels are in state IDLE */
- size_t num_idle;
-
- /** There will be one ref for each entry in subchannels for which there is a
- * pending connectivity state watcher callback. */
- gpr_refcount refcount;
-
- /** Is this list shutting down? This may be true due to the shutdown of the
- * policy itself or because a newer update has arrived while this one hadn't
- * finished processing. */
- bool shutting_down;
+// FIXME: add comments
+
+namespace grpc_core {
+
+template <typename SubchannelListType, typename SubchannelDataType>
+class SubchannelData {
+ public:
+ // Returns the index into subchannel_list_ of this object.
+ size_t Index() const {
+ return static_cast<size_t>(static_cast<const SubchannelDataType*>(this) -
+ subchannel_list_->subchannel(0));
+ }
+
+ SubchannelListType* subchannel_list() const { return subchannel_list_; }
+
+ grpc_subchannel* subchannel() const { return subchannel_; }
+
+ ConnectedSubchannel* connected_subchannel() const {
+ return connected_subchannel_.get();
+ }
+
+// FIXME: maybe do this automatically in OnConnectivityChangedLocked()
+// when the new state is TRANSIENT_FAILURE?
+ void clear_connected_subchannel() { connected_subchannel_.reset(); }
+
+ void GetConnectedSubchannelFromSubchannelLocked() {
+ connected_subchannel_ =
+ grpc_subchannel_get_connected_subchannel(subchannel_);
+ }
+
+ void SetConnectedSubchannelFromLocked(SubchannelData* other) {
+ connected_subchannel_ = other->connected_subchannel_; // Adds ref.
+ }
+
+ bool connectivity_notification_pending() const {
+ return connectivity_notification_pending_;
+ }
+ grpc_connectivity_state connectivity_state() const {
+ return curr_connectivity_state_;
+ }
+
+ virtual grpc_connectivity_state CheckConnectivityStateLocked() {
+ pending_connectivity_state_unsafe_ =
+ grpc_subchannel_check_connectivity(subchannel(), nullptr);
+ curr_connectivity_state_ = pending_connectivity_state_unsafe_;
+ return curr_connectivity_state_;
+ }
+
+ // Unrefs the subchannel.
+ virtual void UnrefSubchannelLocked(const char* reason);
+
+ /// Starts watching the connectivity state of the subchannel.
+ /// The connectivity_changed_cb callback must invoke either
+ /// StopConnectivityWatch() or again call StartConnectivityWatch().
+ void StartConnectivityWatchLocked();
+
+ /// Stops watching the connectivity state of the subchannel.
+ void StopConnectivityWatchLocked();
+
+ /// Cancels watching the connectivity state of the subchannel.
+ void CancelConnectivityWatchLocked(const char* reason);
+
+ void ShutdownLocked(const char* reason);
+
+ GRPC_ABSTRACT_BASE_CLASS
+
+ protected:
+ SubchannelData(
+ SubchannelListType* subchannel_list,
+ const grpc_lb_user_data_vtable* user_data_vtable,
+ const grpc_lb_address& address, grpc_subchannel* subchannel,
+ grpc_combiner* combiner);
+
+ virtual ~SubchannelData();
+
+// FIXME: define API
+ virtual void ProcessConnectivityChangeLocked(grpc_error* error) GRPC_ABSTRACT;
+
+ private:
+ static void OnConnectivityChangedLocked(void* arg, grpc_error* error);
+
+ // Backpointer to owning subchannel list. Not owned.
+ SubchannelListType* subchannel_list_;
+
+ // The subchannel and connected subchannel.
+ grpc_subchannel* subchannel_;
+ RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
+
+ // Notification that connectivity has changed on subchannel.
+ grpc_closure connectivity_changed_closure_;
+ // Is a connectivity notification pending?
+ bool connectivity_notification_pending_;
+ // Connectivity state to be updated by
+ // grpc_subchannel_notify_on_state_change(), not guarded by
+ // the combiner. Will be copied to \a curr_connectivity_state_ by
+ // \a connectivity_changed_closure_.
+ grpc_connectivity_state pending_connectivity_state_unsafe_;
+ // Current connectivity state.
+ grpc_connectivity_state curr_connectivity_state_;
};
-grpc_lb_subchannel_list* grpc_lb_subchannel_list_create(
- grpc_core::LoadBalancingPolicy* p, grpc_core::TraceFlag* tracer,
+template <typename SubchannelListType, typename SubchannelDataType>
+class SubchannelList
+ : public RefCountedWithTracing<SubchannelListType> {
+ public:
+ typedef InlinedVector<SubchannelDataType, 10> SubchannelVector;
+
+ size_t num_subchannels() const { return subchannels_.size(); }
+ SubchannelDataType* subchannel(size_t index) { return &subchannels_[index]; }
+
+ // Marks the subchannel_list as discarded. Unsubscribes all its subchannels.
+ void ShutdownLocked(const char* reason);
+
+ bool shutting_down() const { return shutting_down_; }
+
+ LoadBalancingPolicy* policy() const { return policy_; }
+ TraceFlag* tracer() const { return tracer_; }
+
+ GRPC_ABSTRACT_BASE_CLASS
+
+ protected:
+ SubchannelList(LoadBalancingPolicy* policy, TraceFlag* tracer,
+ const grpc_lb_addresses* addresses, grpc_combiner* combiner,
+ grpc_client_channel_factory* client_channel_factory,
+ const grpc_channel_args& args);
+
+ virtual ~SubchannelList();
+
+ private:
+ // So New() can call our private ctor.
+ template <typename T, typename... Args>
+ friend T* New(Args&&... args);
+
+ // Backpointer to owning policy.
+ LoadBalancingPolicy* policy_;
+
+ TraceFlag* tracer_;
+
+ // The list of subchannels.
+ SubchannelVector subchannels_;
+
+ // Is this list shutting down? This may be true due to the shutdown of the
+ // policy itself or because a newer update has arrived while this one hadn't
+ // finished processing.
+ bool shutting_down_ = false;
+};
+
+//
+// implementation -- no user-servicable parts below
+//
+
+//
+// SubchannelData
+//
+
+template <typename SubchannelListType, typename SubchannelDataType>
+SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData(
+ SubchannelListType* subchannel_list,
+ const grpc_lb_user_data_vtable* user_data_vtable,
+ const grpc_lb_address& address, grpc_subchannel* subchannel,
+ grpc_combiner* combiner)
+ : subchannel_list_(subchannel_list),
+ subchannel_(subchannel),
+ // We assume that the current state is IDLE. If not, we'll get a
+ // callback telling us that.
+ pending_connectivity_state_unsafe_(GRPC_CHANNEL_IDLE),
+ curr_connectivity_state_(GRPC_CHANNEL_IDLE) {
+ GRPC_CLOSURE_INIT(
+ &connectivity_changed_closure_,
+ (&SubchannelData<SubchannelListType,
+ SubchannelDataType>::OnConnectivityChangedLocked),
+ this, grpc_combiner_scheduler(combiner));
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+SubchannelData<SubchannelListType, SubchannelDataType>::~SubchannelData() {
+ UnrefSubchannelLocked("subchannel_data_destroy");
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType,
+ SubchannelDataType>::UnrefSubchannelLocked(
+ const char* reason) {
+ if (subchannel_ != nullptr) {
+ if (subchannel_list_->tracer()->enabled()) {
+ gpr_log(GPR_DEBUG,
+ "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+ " (subchannel %p): unreffing subchannel",
+ subchannel_list_->tracer()->name(), subchannel_list_->policy(),
+ subchannel_list_, Index(),
+ subchannel_list_->num_subchannels(), subchannel_);
+ }
+ GRPC_SUBCHANNEL_UNREF(subchannel_, reason);
+ subchannel_ = nullptr;
+ connected_subchannel_.reset();
+ }
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType,
+ SubchannelDataType>::StartConnectivityWatchLocked() {
+ if (subchannel_list_->tracer()->enabled()) {
+ gpr_log(
+ GPR_DEBUG,
+ "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+ " (subchannel %p): requesting connectivity change "
+ "notification (from %s)",
+ subchannel_list_->tracer()->name(), subchannel_list_->policy(),
+ subchannel_list_, Index(),
+ subchannel_list_->num_subchannels(), subchannel_,
+ grpc_connectivity_state_name(pending_connectivity_state_unsafe_));
+ }
+ connectivity_notification_pending_ = true;
+ grpc_subchannel_notify_on_state_change(
+ subchannel_, subchannel_list_->policy()->interested_parties(),
+ &pending_connectivity_state_unsafe_,
+ &connectivity_changed_closure_);
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType,
+ SubchannelDataType>::StopConnectivityWatchLocked() {
+ if (subchannel_list_->tracer()->enabled()) {
+ gpr_log(GPR_DEBUG,
+ "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+ " (subchannel %p): stopping connectivity watch",
+ subchannel_list_->tracer()->name(), subchannel_list_->policy(),
+ subchannel_list_, Index(),
+ subchannel_list_->num_subchannels(), subchannel_);
+ }
+ GPR_ASSERT(connectivity_notification_pending_);
+ connectivity_notification_pending_ = false;
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType,
+ SubchannelDataType>::CancelConnectivityWatchLocked(
+ const char* reason) {
+ if (subchannel_list_->tracer()->enabled()) {
+ gpr_log(GPR_DEBUG,
+ "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+ " (subchannel %p): canceling connectivity watch (%s)",
+ subchannel_list_->tracer()->name(), subchannel_list_->policy(),
+ subchannel_list_, Index(),
+ subchannel_list_->num_subchannels(), subchannel_, reason);
+ }
+ grpc_subchannel_notify_on_state_change(subchannel_, nullptr, nullptr,
+ &connectivity_changed_closure_);
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType,
+ SubchannelDataType>::OnConnectivityChangedLocked(
+ void* arg, grpc_error* error) {
+ SubchannelData* sd = static_cast<SubchannelData*>(arg);
+ // Now that we're inside the combiner, copy the pending connectivity
+ // state (which was set by the connectivity state watcher) to
+ // curr_connectivity_state_, which is what we use inside of the combiner.
+ sd->curr_connectivity_state_ = sd->pending_connectivity_state_unsafe_;
+ sd->ProcessConnectivityChangeLocked(error);
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType,
+ SubchannelDataType>::ShutdownLocked(const char* reason) {
+ // If there's a pending notification for this subchannel, cancel it;
+ // the callback is responsible for unreffing the subchannel.
+ // Otherwise, unref the subchannel directly.
+ if (connectivity_notification_pending_) {
+ CancelConnectivityWatchLocked(reason);
+ } else if (subchannel_ != nullptr) {
+ UnrefSubchannelLocked(reason);
+ }
+}
+
+//
+// SubchannelList
+//
+
+template <typename SubchannelListType, typename SubchannelDataType>
+SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
+ LoadBalancingPolicy* policy, TraceFlag* tracer,
const grpc_lb_addresses* addresses, grpc_combiner* combiner,
grpc_client_channel_factory* client_channel_factory,
- const grpc_channel_args& args, grpc_iomgr_cb_func connectivity_changed_cb);
+ const grpc_channel_args& args)
+ : RefCountedWithTracing<SubchannelListType>(tracer),
+ policy_(policy),
+ tracer_(tracer) {
+ if (tracer_->enabled()) {
+ gpr_log(GPR_DEBUG,
+ "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
+ tracer_->name(), policy, this, addresses->num_addresses);
+ }
+ subchannels_.reserve(addresses->num_addresses);
+ // We need to remove the LB addresses in order to be able to compare the
+ // subchannel keys of subchannels from a different batch of addresses.
+ static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
+ GRPC_ARG_LB_ADDRESSES};
+ // Create a subchannel for each address.
+ grpc_subchannel_args sc_args;
+ for (size_t i = 0; i < addresses->num_addresses; i++) {
+ // If there were any balancer, we would have chosen grpclb policy instead.
+ GPR_ASSERT(!addresses->addresses[i].is_balancer);
+ memset(&sc_args, 0, sizeof(grpc_subchannel_args));
+ grpc_arg addr_arg =
+ grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
+ grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
+ &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, 1);
+ gpr_free(addr_arg.value.string);
+ sc_args.args = new_args;
+ grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel(
+ client_channel_factory, &sc_args);
+ grpc_channel_args_destroy(new_args);
+ if (subchannel == nullptr) {
+ // Subchannel could not be created.
+ if (tracer_->enabled()) {
+ char* address_uri =
+ grpc_sockaddr_to_uri(&addresses->addresses[i].address);
+ gpr_log(GPR_DEBUG,
+ "[%s %p] could not create subchannel for address uri %s, "
+ "ignoring",
+ tracer_->name(), policy_, address_uri);
+ gpr_free(address_uri);
+ }
+ continue;
+ }
+ if (tracer_->enabled()) {
+ char* address_uri =
+ grpc_sockaddr_to_uri(&addresses->addresses[i].address);
+ gpr_log(GPR_DEBUG,
+ "[%s %p] subchannel list %p index %" PRIuPTR
+ ": Created subchannel %p for address uri %s",
+ tracer_->name(), policy_, this, subchannels_.size(), subchannel,
+ address_uri);
+ gpr_free(address_uri);
+ }
+ subchannels_.emplace_back(static_cast<SubchannelListType*>(this),
+ addresses->user_data_vtable,
+ addresses->addresses[i], subchannel, combiner);
+ }
+}
-void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list* subchannel_list,
- const char* reason);
+template <typename SubchannelListType, typename SubchannelDataType>
+SubchannelList<SubchannelListType, SubchannelDataType>::~SubchannelList() {
+ if (tracer_->enabled()) {
+ gpr_log(GPR_DEBUG, "[%s %p] Destroying subchannel_list %p",
+ tracer_->name(), policy_, this);
+ }
+}
-void grpc_lb_subchannel_list_unref(grpc_lb_subchannel_list* subchannel_list,
- const char* reason);
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelList<SubchannelListType,
+ SubchannelDataType>::ShutdownLocked(const char* reason) {
+ if (tracer_->enabled()) {
+ gpr_log(GPR_DEBUG, "[%s %p] Shutting down subchannel_list %p (%s)",
+ tracer_->name(), policy_, this, reason);
+ }
+ GPR_ASSERT(!shutting_down_);
+ shutting_down_ = true;
+ for (size_t i = 0; i < subchannels_.size(); i++) {
+ SubchannelDataType* sd = &subchannels_[i];
+ sd->ShutdownLocked(reason);
+ }
+}
-/// Mark subchannel_list as discarded. Unsubscribes all its subchannels. The
-/// connectivity state notification callback will ultimately unref it.
-void grpc_lb_subchannel_list_shutdown_and_unref(
- grpc_lb_subchannel_list* subchannel_list, const char* reason);
+} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */