diff options
author | 2018-03-30 13:28:56 -0700 | |
---|---|---|
committer | 2018-03-30 13:28:56 -0700 | |
commit | 7c1b5db3bb000a7c69d9d8151c66fecbacce64c3 (patch) | |
tree | 29f3f4b62097d3c0b7235e4b1ee89eb6b4023bc1 /src/core/ext/filters/client_channel/lb_policy | |
parent | d536dba4d3234ba28066a31f7a092ce9daa89ec4 (diff) |
Convert subchannel_list code to C++.
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy')
4 files changed, 775 insertions, 696 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 9090c34412..b593f93d7b 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -62,31 +62,57 @@ class PickFirst : public LoadBalancingPolicy { private: ~PickFirst(); + class PickFirstSubchannelList; + + class PickFirstSubchannelData + : public SubchannelData<PickFirstSubchannelList, + PickFirstSubchannelData> { + public: + PickFirstSubchannelData(PickFirstSubchannelList* subchannel_list, + const grpc_lb_user_data_vtable* user_data_vtable, + const grpc_lb_address& address, + grpc_subchannel* subchannel, + grpc_combiner* combiner) + : SubchannelData(subchannel_list, user_data_vtable, address, + subchannel, combiner) {} + + void ProcessConnectivityChangeLocked(grpc_error* error) override; + }; + + class PickFirstSubchannelList + : public SubchannelList<PickFirstSubchannelList, + PickFirstSubchannelData> { + public: + PickFirstSubchannelList( + PickFirst* policy, TraceFlag* tracer, + const grpc_lb_addresses* addresses, grpc_combiner* combiner, + grpc_client_channel_factory* client_channel_factory, + const grpc_channel_args& args) + : SubchannelList(policy, tracer, addresses, combiner, + client_channel_factory, args) {} + + void RefForConnectivityWatch(const char* reason); + void UnrefForConnectivityWatch(const char* reason); + }; + void ShutdownLocked() override; void StartPickingLocked(); void DestroyUnselectedSubchannelsLocked(); - static void OnConnectivityChangedLocked(void* arg, grpc_error* error); - - void SubchannelListRefForConnectivityWatch( - grpc_lb_subchannel_list* subchannel_list, const char* reason); - void SubchannelListUnrefForConnectivityWatch( - grpc_lb_subchannel_list* subchannel_list, const char* reason); - - /** all our subchannels */ - grpc_lb_subchannel_list* subchannel_list_ = nullptr; - /** latest pending subchannel list */ - grpc_lb_subchannel_list* latest_pending_subchannel_list_ = nullptr; - /** selected subchannel in \a subchannel_list */ - grpc_lb_subchannel_data* selected_ = nullptr; - /** have we started picking? */ + // All our subchannels. + RefCountedPtr<PickFirstSubchannelList> subchannel_list_; + // Latest pending subchannel list. + RefCountedPtr<PickFirstSubchannelList> latest_pending_subchannel_list_; + // Selected subchannel in \a subchannel_list_. + PickFirstSubchannelData* selected_ = nullptr; + // Have we started picking? bool started_picking_ = false; - /** are we shut down? */ + // Are we shut down? bool shutdown_ = false; - /** list of picks that are waiting on connectivity */ + // List of picks that are waiting on connectivity. PickState* pending_picks_ = nullptr; - /** our connectivity state tracker */ + // Our connectivity state tracker. grpc_connectivity_state_tracker state_tracker_; }; @@ -138,13 +164,12 @@ void PickFirst::ShutdownLocked() { grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "shutdown"); if (subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, "pf_shutdown"); - subchannel_list_ = nullptr; + subchannel_list_->ShutdownLocked("pf_shutdown"); + subchannel_list_.reset(); } if (latest_pending_subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(latest_pending_subchannel_list_, - "pf_shutdown"); - latest_pending_subchannel_list_ = nullptr; + latest_pending_subchannel_list_->ShutdownLocked("pf_shutdown"); + latest_pending_subchannel_list_.reset(); } TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_CANCELLED); GRPC_ERROR_UNREF(error); @@ -192,14 +217,12 @@ void PickFirst::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, void PickFirst::StartPickingLocked() { started_picking_ = true; - if (subchannel_list_ != nullptr && subchannel_list_->num_subchannels > 0) { - subchannel_list_->checking_subchannel = 0; - for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) { - if (subchannel_list_->subchannels[i].subchannel != nullptr) { - SubchannelListRefForConnectivityWatch( - subchannel_list_, "connectivity_watch+start_picking"); - grpc_lb_subchannel_data_start_connectivity_watch( - &subchannel_list_->subchannels[i]); + if (subchannel_list_ != nullptr) { + for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) { + if (subchannel_list_->subchannel(i)->subchannel() != nullptr) { + subchannel_list_->RefForConnectivityWatch( + "connectivity_watch+start_picking"); + subchannel_list_->subchannel(i)->StartConnectivityWatchLocked(); break; } } @@ -215,7 +238,7 @@ void PickFirst::ExitIdleLocked() { bool PickFirst::PickLocked(PickState* pick) { // If we have a selected subchannel already, return synchronously. if (selected_ != nullptr) { - pick->connected_subchannel = selected_->connected_subchannel; + pick->connected_subchannel = selected_->connected_subchannel()->Ref(); return true; } // No subchannel selected yet, so handle asynchronously. @@ -228,11 +251,10 @@ bool PickFirst::PickLocked(PickState* pick) { } void PickFirst::DestroyUnselectedSubchannelsLocked() { - for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) { - grpc_lb_subchannel_data* sd = &subchannel_list_->subchannels[i]; + for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) { + PickFirstSubchannelData* sd = subchannel_list_->subchannel(i); if (selected_ != sd) { - grpc_lb_subchannel_data_unref_subchannel(sd, - "selected_different_subchannel"); + sd->UnrefSubchannelLocked("selected_different_subchannel"); } } } @@ -249,7 +271,7 @@ void PickFirst::NotifyOnStateChangeLocked(grpc_connectivity_state* current, void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) { if (selected_ != nullptr) { - selected_->connected_subchannel->Ping(on_initiate, on_ack); + selected_->connected_subchannel()->Ping(on_initiate, on_ack); } else { GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); @@ -258,24 +280,6 @@ void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) { } } -void PickFirst::SubchannelListRefForConnectivityWatch( - grpc_lb_subchannel_list* subchannel_list, const char* reason) { - // TODO(roth): We currently track this ref manually. Once the new - // ClosureRef API is ready and the subchannel_list code has been - // converted to a C++ API, find a way to hold the RefCountedPtr<> - // somewhere (maybe in the subchannel_data object) instead of doing - // this manually. - auto self = Ref(DEBUG_LOCATION, reason); - self.release(); - grpc_lb_subchannel_list_ref(subchannel_list, reason); -} - -void PickFirst::SubchannelListUnrefForConnectivityWatch( - grpc_lb_subchannel_list* subchannel_list, const char* reason) { - Unref(DEBUG_LOCATION, reason); - grpc_lb_subchannel_list_unref(subchannel_list, reason); -} - void PickFirst::UpdateLocked(const grpc_channel_args& args) { const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { @@ -301,10 +305,10 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { "Pick First %p received update with %" PRIuPTR " addresses", this, addresses->num_addresses); } - grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create( + auto subchannel_list = MakeRefCounted<PickFirstSubchannelList>( this, &grpc_lb_pick_first_trace, addresses, combiner(), - client_channel_factory(), args, &PickFirst::OnConnectivityChangedLocked); - if (subchannel_list->num_subchannels == 0) { + client_channel_factory(), args); + if (subchannel_list->num_subchannels() == 0) { // Empty update or no valid subchannels. Unsubscribe from all current // subchannels and put the channel in TRANSIENT_FAILURE. grpc_connectivity_state_set( @@ -312,10 +316,9 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), "pf_update_empty"); if (subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, - "sl_shutdown_empty_update"); + subchannel_list_->ShutdownLocked("sl_shutdown_empty_update"); } - subchannel_list_ = subchannel_list; // Empty list. + subchannel_list_ = std::move(subchannel_list); // Empty list. selected_ = nullptr; return; } @@ -323,45 +326,48 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { // We don't yet have a selected subchannel, so replace the current // subchannel list immediately. if (subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, - "pf_update_before_selected"); + subchannel_list_->ShutdownLocked("pf_update_before_selected"); + } + subchannel_list_ = std::move(subchannel_list); + // If we've started picking, start trying to connect to the first + // subchannel in the new list. + if (started_picking_) { + subchannel_list_->RefForConnectivityWatch("connectivity_watch+update"); + subchannel_list_->subchannel(0)->StartConnectivityWatchLocked(); } - subchannel_list_ = subchannel_list; } else { // We do have a selected subchannel. // Check if it's present in the new list. If so, we're done. - for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) { - grpc_lb_subchannel_data* sd = &subchannel_list->subchannels[i]; - if (sd->subchannel == selected_->subchannel) { + for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { + PickFirstSubchannelData* sd = subchannel_list->subchannel(i); + if (sd->subchannel() == selected_->subchannel()) { // The currently selected subchannel is in the update: we are done. if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p found already selected subchannel %p " "at update index %" PRIuPTR " of %" PRIuPTR "; update done", - this, selected_->subchannel, i, - subchannel_list->num_subchannels); + this, selected_->subchannel(), i, + subchannel_list->num_subchannels()); } - if (selected_->connected_subchannel != nullptr) { - sd->connected_subchannel = selected_->connected_subchannel; + if (selected_->connected_subchannel() != nullptr) { + sd->SetConnectedSubchannelFromLocked(selected_); } selected_ = sd; if (subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref( - subchannel_list_, "pf_update_includes_selected"); + subchannel_list_->ShutdownLocked("pf_update_includes_selected"); } - subchannel_list_ = subchannel_list; + subchannel_list_ = std::move(subchannel_list); DestroyUnselectedSubchannelsLocked(); - SubchannelListRefForConnectivityWatch( - subchannel_list, "connectivity_watch+replace_selected"); - grpc_lb_subchannel_data_start_connectivity_watch(sd); + subchannel_list_->RefForConnectivityWatch( + "connectivity_watch+replace_selected"); + sd->StartConnectivityWatchLocked(); // If there was a previously pending update (which may or may // not have contained the currently selected subchannel), drop // it, so that it doesn't override what we've done here. if (latest_pending_subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref( - latest_pending_subchannel_list_, + latest_pending_subchannel_list_->ShutdownLocked( "pf_update_includes_selected+outdated"); - latest_pending_subchannel_list_ = nullptr; + latest_pending_subchannel_list_.reset(); } return; } @@ -375,74 +381,89 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { gpr_log(GPR_DEBUG, "Pick First %p Shutting down latest pending subchannel list " "%p, about to be replaced by newer latest %p", - this, latest_pending_subchannel_list_, subchannel_list); + this, latest_pending_subchannel_list_.get(), + subchannel_list.get()); } - grpc_lb_subchannel_list_shutdown_and_unref( - latest_pending_subchannel_list_, "sl_outdated_dont_smash"); + latest_pending_subchannel_list_->ShutdownLocked("sl_outdated_dont_smash"); + } + latest_pending_subchannel_list_ = std::move(subchannel_list); + // If we've started picking, start trying to connect to the first + // subchannel in the new list. + if (started_picking_) { + latest_pending_subchannel_list_->RefForConnectivityWatch( + "connectivity_watch+update"); + latest_pending_subchannel_list_->subchannel(0) + ->StartConnectivityWatchLocked(); } - latest_pending_subchannel_list_ = subchannel_list; - } - // If we've started picking, start trying to connect to the first - // subchannel in the new list. - if (started_picking_) { - SubchannelListRefForConnectivityWatch(subchannel_list, - "connectivity_watch+update"); - grpc_lb_subchannel_data_start_connectivity_watch( - &subchannel_list->subchannels[0]); } } -void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) { - grpc_lb_subchannel_data* sd = static_cast<grpc_lb_subchannel_data*>(arg); - PickFirst* p = static_cast<PickFirst*>(sd->subchannel_list->policy); +void PickFirst::PickFirstSubchannelList::RefForConnectivityWatch( + const char* reason) { + // TODO(roth): We currently track these refs manually. Once the new + // ClosureRef API is ready, find a way to pass the RefCountedPtr<> + // along with the closures instead of doing this manually. + // Ref subchannel list. + Ref(DEBUG_LOCATION, reason).release(); + // Ref LB policy. + PickFirst* p = static_cast<PickFirst*>(policy()); + p->Ref(DEBUG_LOCATION, reason).release(); +} + +void PickFirst::PickFirstSubchannelList::UnrefForConnectivityWatch( + const char* reason) { + // Unref LB policy. + PickFirst* p = static_cast<PickFirst*>(policy()); + p->Unref(DEBUG_LOCATION, reason); + // Unref subchannel list. + Unref(DEBUG_LOCATION, reason); +} + +void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( + grpc_error* error) { + PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy()); if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_DEBUG, "Pick First %p connectivity changed for subchannel %p (%" PRIuPTR " of %" PRIuPTR "), subchannel_list %p: state=%s p->shutdown_=%d " "sd->subchannel_list->shutting_down=%d error=%s", - p, sd->subchannel, sd->subchannel_list->checking_subchannel, - sd->subchannel_list->num_subchannels, sd->subchannel_list, - grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe), - p->shutdown_, sd->subchannel_list->shutting_down, + p, subchannel(), Index(), subchannel_list()->num_subchannels(), + subchannel_list(), + grpc_connectivity_state_name(connectivity_state()), + p->shutdown_, subchannel_list()->shutting_down(), grpc_error_string(error)); } // If the policy is shutting down, unref and return. if (p->shutdown_) { - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_data_unref_subchannel(sd, "pf_shutdown"); - p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list, - "pf_shutdown"); + StopConnectivityWatchLocked(); + UnrefSubchannelLocked("pf_shutdown"); + subchannel_list()->UnrefForConnectivityWatch("pf_shutdown"); return; } // If the subchannel list is shutting down, stop watching. - if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) { - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_data_unref_subchannel(sd, "pf_sl_shutdown"); - p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list, - "pf_sl_shutdown"); + if (subchannel_list()->shutting_down() || error == GRPC_ERROR_CANCELLED) { + StopConnectivityWatchLocked(); + UnrefSubchannelLocked("pf_sl_shutdown"); + subchannel_list()->UnrefForConnectivityWatch("pf_sl_shutdown"); return; } // If we're still here, the notification must be for a subchannel in // either the current or latest pending subchannel lists. - GPR_ASSERT(sd->subchannel_list == p->subchannel_list_ || - sd->subchannel_list == p->latest_pending_subchannel_list_); - // Update state. - sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; + GPR_ASSERT(p->subchannel_list_ == subchannel_list() || + p->latest_pending_subchannel_list_ == subchannel_list()); // Handle updates for the currently selected subchannel. - if (p->selected_ == sd) { + if (p->selected_ == this) { // If the new state is anything other than READY and there is a // pending update, switch to the pending update. - if (sd->curr_connectivity_state != GRPC_CHANNEL_READY && + if (connectivity_state() != GRPC_CHANNEL_READY && p->latest_pending_subchannel_list_ != nullptr) { p->selected_ = nullptr; - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - p->SubchannelListUnrefForConnectivityWatch( - sd->subchannel_list, "selected_not_ready+switch_to_update"); - grpc_lb_subchannel_list_shutdown_and_unref( - p->subchannel_list_, "selected_not_ready+switch_to_update"); - p->subchannel_list_ = p->latest_pending_subchannel_list_; - p->latest_pending_subchannel_list_ = nullptr; + StopConnectivityWatchLocked(); + subchannel_list()->UnrefForConnectivityWatch( + "selected_not_ready+switch_to_update"); + subchannel_list()->ShutdownLocked("selected_not_ready+switch_to_update"); + p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); grpc_connectivity_state_set( &p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update"); @@ -452,8 +473,8 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) { // re-resolution is introduced. But we need to investigate whether we // really want to take any action instead of waiting for the selected // subchannel reconnecting. - GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN); - if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + GPR_ASSERT(connectivity_state() != GRPC_CHANNEL_SHUTDOWN); + if (connectivity_state() == GRPC_CHANNEL_TRANSIENT_FAILURE) { // If the selected channel goes bad, request a re-resolution. grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, @@ -462,17 +483,14 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) { p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE); // In transient failure. Rely on re-resolution to recover. p->selected_ = nullptr; - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list, - "pf_selected_shutdown"); - grpc_lb_subchannel_data_unref_subchannel( - sd, "pf_selected_shutdown"); // Unrefs connected subchannel + StopConnectivityWatchLocked(); + subchannel_list()->UnrefForConnectivityWatch("pf_selected_shutdown"); + UnrefSubchannelLocked("pf_selected_shutdown"); } else { - grpc_connectivity_state_set(&p->state_tracker_, - sd->curr_connectivity_state, + grpc_connectivity_state_set(&p->state_tracker_, connectivity_state(), GRPC_ERROR_REF(error), "selected_changed"); // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(sd); + StartConnectivityWatchLocked(); } } return; @@ -486,26 +504,23 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) { // for a subchannel in p->latest_pending_subchannel_list_. The // goal here is to find a subchannel from the update that we can // select in place of the current one. - switch (sd->curr_connectivity_state) { + switch (connectivity_state()) { case GRPC_CHANNEL_READY: { // Case 2. Promote p->latest_pending_subchannel_list_ to // p->subchannel_list_. - sd->connected_subchannel = - grpc_subchannel_get_connected_subchannel(sd->subchannel); - if (sd->subchannel_list == p->latest_pending_subchannel_list_) { + GetConnectedSubchannelFromSubchannelLocked(); + if (p->latest_pending_subchannel_list_ == subchannel_list()) { GPR_ASSERT(p->subchannel_list_ != nullptr); - grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list_, - "finish_update"); - p->subchannel_list_ = p->latest_pending_subchannel_list_; - p->latest_pending_subchannel_list_ = nullptr; + p->subchannel_list_->ShutdownLocked("finish_update"); + p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); } // Cases 1 and 2. grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "connecting_ready"); - p->selected_ = sd; + p->selected_ = this; if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, - sd->subchannel); + subchannel()); } // Drop all other subchannels, since we are now connected. p->DestroyUnselectedSubchannelsLocked(); @@ -513,7 +528,8 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) { PickState* pick; while ((pick = p->pending_picks_)) { p->pending_picks_ = pick->next; - pick->connected_subchannel = p->selected_->connected_subchannel; + pick->connected_subchannel = + p->selected_->connected_subchannel()->Ref(); if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Servicing pending pick with selected subchannel %p", @@ -522,40 +538,38 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) { GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); } // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(sd); + StartConnectivityWatchLocked(); break; } case GRPC_CHANNEL_TRANSIENT_FAILURE: { - grpc_lb_subchannel_data_stop_connectivity_watch(sd); + StopConnectivityWatchLocked(); + PickFirstSubchannelData* sd = this; do { - sd->subchannel_list->checking_subchannel = - (sd->subchannel_list->checking_subchannel + 1) % - sd->subchannel_list->num_subchannels; - sd = &sd->subchannel_list - ->subchannels[sd->subchannel_list->checking_subchannel]; - } while (sd->subchannel == nullptr); + size_t next_index = + (sd->Index() + 1) % subchannel_list()->num_subchannels(); + sd = subchannel_list()->subchannel(next_index); + } while (sd->subchannel() == nullptr); // Case 1: Only set state to TRANSIENT_FAILURE if we've tried // all subchannels. - if (sd->subchannel_list->checking_subchannel == 0 && - sd->subchannel_list == p->subchannel_list_) { + if (sd->Index() == 0 && p->subchannel_list_ == subchannel_list()) { grpc_connectivity_state_set( &p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "connecting_transient_failure"); } // Reuses the connectivity refs from the previous watch. - grpc_lb_subchannel_data_start_connectivity_watch(sd); + sd->StartConnectivityWatchLocked(); break; } case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE: { // Only update connectivity state in case 1. - if (sd->subchannel_list == p->subchannel_list_) { + if (p->subchannel_list_ == subchannel_list()) { grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_REF(error), "connecting_changed"); } // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(sd); + StartConnectivityWatchLocked(); break; } case GRPC_CHANNEL_SHUTDOWN: diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index e534131c02..a9d9227ea5 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -73,23 +73,93 @@ class RoundRobin : public LoadBalancingPolicy { private: ~RoundRobin(); + class RoundRobinSubchannelList; + + class RoundRobinSubchannelData + : public SubchannelData<RoundRobinSubchannelList, + RoundRobinSubchannelData> { + public: + RoundRobinSubchannelData(RoundRobinSubchannelList* subchannel_list, + const grpc_lb_user_data_vtable* user_data_vtable, + const grpc_lb_address& address, + grpc_subchannel* subchannel, + grpc_combiner* combiner) + : SubchannelData(subchannel_list, user_data_vtable, address, + subchannel, combiner), + user_data_vtable_(user_data_vtable), + user_data_(user_data_vtable_ != nullptr + ? user_data_vtable_->copy(address.user_data) + : nullptr) {} + + void ProcessConnectivityChangeLocked(grpc_error* error) override; + + void UnrefSubchannelLocked(const char* reason) override { + SubchannelData::UnrefSubchannelLocked(reason); + if (user_data_ != nullptr) { + GPR_ASSERT(user_data_vtable_ != nullptr); + user_data_vtable_->destroy(user_data_); + user_data_ = nullptr; + } + } + + void* user_data() const { return user_data_; } + + grpc_connectivity_state CheckConnectivityStateLocked() override { + prev_connectivity_state_ = SubchannelData::CheckConnectivityStateLocked(); + return prev_connectivity_state_; + } + + private: + const grpc_lb_user_data_vtable* user_data_vtable_; + void* user_data_ = nullptr; + grpc_connectivity_state prev_connectivity_state_ = GRPC_CHANNEL_IDLE; + }; + + class RoundRobinSubchannelList + : public SubchannelList<RoundRobinSubchannelList, + RoundRobinSubchannelData> { + public: + RoundRobinSubchannelList( + RoundRobin* policy, TraceFlag* tracer, + const grpc_lb_addresses* addresses, grpc_combiner* combiner, + grpc_client_channel_factory* client_channel_factory, + const grpc_channel_args& args) + : SubchannelList(policy, tracer, addresses, combiner, + client_channel_factory, args), + num_idle_(num_subchannels()) {} + + void RefForConnectivityWatch(const char* reason); + void UnrefForConnectivityWatch(const char* reason); + + void UpdateStateCountersLocked(grpc_connectivity_state old_state, + grpc_connectivity_state new_state); + + size_t num_ready() const { return num_ready_; } + size_t num_transient_failure() const { return num_transient_failure_; } + size_t num_idle() const { return num_idle_; } + + private: + size_t num_ready_ = 0; + size_t num_transient_failure_ = 0; + size_t num_idle_; + }; + void ShutdownLocked() override; void StartPickingLocked(); size_t GetNextReadySubchannelIndexLocked(); void UpdateLastReadySubchannelIndexLocked(size_t last_ready_index); - void UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd, - grpc_error* error); - - static void OnConnectivityChangedLocked(void* arg, grpc_error* error); - - void SubchannelListRefForConnectivityWatch( - grpc_lb_subchannel_list* subchannel_list, const char* reason); - void SubchannelListUnrefForConnectivityWatch( - grpc_lb_subchannel_list* subchannel_list, const char* reason); + void UpdateConnectivityStateLocked(grpc_connectivity_state state, + grpc_error* error); /** list of subchannels */ - grpc_lb_subchannel_list* subchannel_list_ = nullptr; + RefCountedPtr<RoundRobinSubchannelList> subchannel_list_; + /** Latest version of the subchannel list. + * Subchannel connectivity callbacks will only promote updated subchannel + * lists if they equal \a latest_pending_subchannel_list. In other words, + * racing callbacks that reference outdated subchannel lists won't perform any + * update. */ + RefCountedPtr<RoundRobinSubchannelList> latest_pending_subchannel_list_; /** have we started picking? */ bool started_picking_ = false; /** are we shutting down? */ @@ -98,14 +168,8 @@ class RoundRobin : public LoadBalancingPolicy { PickState* pending_picks_ = nullptr; /** our connectivity state tracker */ grpc_connectivity_state_tracker state_tracker_; - /** Index into subchannels for last pick. */ + /** Index into subchannel_list_ for last pick. */ size_t last_ready_subchannel_index_ = 0; - /** Latest version of the subchannel list. - * Subchannel connectivity callbacks will only promote updated subchannel - * lists if they equal \a latest_pending_subchannel_list. In other words, - * racing callbacks that reference outdated subchannel lists won't perform any - * update. */ - grpc_lb_subchannel_list* latest_pending_subchannel_list_ = nullptr; }; RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) { @@ -115,7 +179,7 @@ RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) { UpdateLocked(*args.args); if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Created with %" PRIuPTR " subchannels", this, - subchannel_list_->num_subchannels); + subchannel_list_->num_subchannels()); } grpc_subchannel_index_ref(); } @@ -144,30 +208,30 @@ size_t RoundRobin::GetNextReadySubchannelIndexLocked() { "[RR %p] getting next ready subchannel (out of %" PRIuPTR "), " "last_ready_subchannel_index=%" PRIuPTR, - this, subchannel_list_->num_subchannels, + this, subchannel_list_->num_subchannels(), last_ready_subchannel_index_); } - for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) { + for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) { const size_t index = (i + last_ready_subchannel_index_ + 1) % - subchannel_list_->num_subchannels; + subchannel_list_->num_subchannels(); if (grpc_lb_round_robin_trace.enabled()) { gpr_log( GPR_DEBUG, "[RR %p] checking subchannel %p, subchannel_list %p, index %" PRIuPTR ": state=%s", - this, subchannel_list_->subchannels[index].subchannel, - subchannel_list_, index, + this, subchannel_list_->subchannel(index)->subchannel(), + subchannel_list_.get(), index, grpc_connectivity_state_name( - subchannel_list_->subchannels[index].curr_connectivity_state)); + subchannel_list_->subchannel(index)->connectivity_state())); } - if (subchannel_list_->subchannels[index].curr_connectivity_state == + if (subchannel_list_->subchannel(index)->connectivity_state() == GRPC_CHANNEL_READY) { if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] found next ready subchannel (%p) at index %" PRIuPTR " of subchannel_list %p", - this, subchannel_list_->subchannels[index].subchannel, index, - subchannel_list_); + this, subchannel_list_->subchannel(index)->subchannel(), + index, subchannel_list_.get()); } return index; } @@ -175,21 +239,21 @@ size_t RoundRobin::GetNextReadySubchannelIndexLocked() { if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", this); } - return subchannel_list_->num_subchannels; + return subchannel_list_->num_subchannels(); } // Sets last_ready_subchannel_index_ to last_ready_index. void RoundRobin::UpdateLastReadySubchannelIndexLocked(size_t last_ready_index) { - GPR_ASSERT(last_ready_index < subchannel_list_->num_subchannels); + GPR_ASSERT(last_ready_index < subchannel_list_->num_subchannels()); last_ready_subchannel_index_ = last_ready_index; if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] setting last_ready_subchannel_index=%" PRIuPTR " (SC %p, CSC %p)", this, last_ready_index, - subchannel_list_->subchannels[last_ready_index].subchannel, - subchannel_list_->subchannels[last_ready_index] - .connected_subchannel.get()); + subchannel_list_->subchannel(last_ready_index)->subchannel(), + subchannel_list_->subchannel(last_ready_index) + ->connected_subchannel()); } } @@ -219,14 +283,12 @@ void RoundRobin::ShutdownLocked() { grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "rr_shutdown"); if (subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, - "sl_shutdown_rr_shutdown"); - subchannel_list_ = nullptr; + subchannel_list_->ShutdownLocked("rr_shutdown"); + subchannel_list_.reset(); } if (latest_pending_subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref( - latest_pending_subchannel_list_, "sl_shutdown_pending_rr_shutdown"); - latest_pending_subchannel_list_ = nullptr; + latest_pending_subchannel_list_->ShutdownLocked("rr_shutdown"); + latest_pending_subchannel_list_.reset(); } TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_CANCELLED); GRPC_ERROR_UNREF(error); @@ -273,32 +335,12 @@ void RoundRobin::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, GRPC_ERROR_UNREF(error); } -void RoundRobin::SubchannelListRefForConnectivityWatch( - grpc_lb_subchannel_list* subchannel_list, const char* reason) { - // TODO(roth): We currently track this ref manually. Once the new - // ClosureRef API is ready and the subchannel_list code has been - // converted to a C++ API, find a way to hold the RefCountedPtr<> - // somewhere (maybe in the subchannel_data object) instead of doing - // this manually. - auto self = Ref(DEBUG_LOCATION, reason); - self.release(); - grpc_lb_subchannel_list_ref(subchannel_list, reason); -} - -void RoundRobin::SubchannelListUnrefForConnectivityWatch( - grpc_lb_subchannel_list* subchannel_list, const char* reason) { - Unref(DEBUG_LOCATION, reason); - grpc_lb_subchannel_list_unref(subchannel_list, reason); -} - void RoundRobin::StartPickingLocked() { started_picking_ = true; - for (size_t i = 0; i < subchannel_list_->num_subchannels; i++) { - if (subchannel_list_->subchannels[i].subchannel != nullptr) { - SubchannelListRefForConnectivityWatch(subchannel_list_, - "connectivity_watch"); - grpc_lb_subchannel_data_start_connectivity_watch( - &subchannel_list_->subchannels[i]); + for (size_t i = 0; i < subchannel_list_->num_subchannels(); i++) { + if (subchannel_list_->subchannel(i)->subchannel() != nullptr) { + subchannel_list_->RefForConnectivityWatch("connectivity_watch"); + subchannel_list_->subchannel(i)->StartConnectivityWatchLocked(); } } } @@ -317,21 +359,21 @@ bool RoundRobin::PickLocked(PickState* pick) { GPR_ASSERT(!shutdown_); if (subchannel_list_ != nullptr) { const size_t next_ready_index = GetNextReadySubchannelIndexLocked(); - if (next_ready_index < subchannel_list_->num_subchannels) { + if (next_ready_index < subchannel_list_->num_subchannels()) { /* readily available, report right away */ - grpc_lb_subchannel_data* sd = - &subchannel_list_->subchannels[next_ready_index]; - pick->connected_subchannel = sd->connected_subchannel; + RoundRobinSubchannelData* sd = + subchannel_list_->subchannel(next_ready_index); + pick->connected_subchannel = sd->connected_subchannel()->Ref(); if (pick->user_data != nullptr) { - *pick->user_data = sd->user_data; + *pick->user_data = sd->user_data(); } if (grpc_lb_round_robin_trace.enabled()) { gpr_log( GPR_DEBUG, "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, " "index %" PRIuPTR ")", - this, sd->subchannel, pick->connected_subchannel.get(), - sd->subchannel_list, next_ready_index); + this, sd->subchannel(), pick->connected_subchannel.get(), + sd->subchannel_list(), next_ready_index); } /* only advance the last picked pointer if the selection was used */ UpdateLastReadySubchannelIndexLocked(next_ready_index); @@ -347,36 +389,12 @@ bool RoundRobin::PickLocked(PickState* pick) { return false; } -void UpdateStateCountersLocked(grpc_lb_subchannel_data* sd) { - grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; - GPR_ASSERT(sd->prev_connectivity_state != GRPC_CHANNEL_SHUTDOWN); - GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN); - if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) { - GPR_ASSERT(subchannel_list->num_ready > 0); - --subchannel_list->num_ready; - } else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - GPR_ASSERT(subchannel_list->num_transient_failures > 0); - --subchannel_list->num_transient_failures; - } else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) { - GPR_ASSERT(subchannel_list->num_idle > 0); - --subchannel_list->num_idle; - } - sd->prev_connectivity_state = sd->curr_connectivity_state; - if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { - ++subchannel_list->num_ready; - } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - ++subchannel_list->num_transient_failures; - } else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) { - ++subchannel_list->num_idle; - } -} - /** Sets the policy's connectivity status based on that of the passed-in \a sd * (the grpc_lb_subchannel_data associated with the updated subchannel) and the * subchannel list \a sd belongs to (sd->subchannel_list). \a error will be used * only if the policy transitions to state TRANSIENT_FAILURE. */ -void RoundRobin::UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd, - grpc_error* error) { +void RoundRobin::UpdateConnectivityStateLocked(grpc_connectivity_state state, + grpc_error* error) { /* In priority order. The first rule to match terminates the search (ie, if we * are on rule n, all previous rules were unfulfilled). * @@ -391,18 +409,16 @@ void RoundRobin::UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd, * CHECK: subchannel_list->num_transient_failures == * subchannel_list->num_subchannels. */ - grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; - GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_IDLE); - if (subchannel_list->num_ready > 0) { + if (subchannel_list_->num_ready() > 0) { /* 1) READY */ grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "rr_ready"); - } else if (sd->curr_connectivity_state == GRPC_CHANNEL_CONNECTING) { + } else if (state == GRPC_CHANNEL_CONNECTING) { /* 2) CONNECTING */ grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, "rr_connecting"); - } else if (subchannel_list->num_transient_failures == - subchannel_list->num_subchannels) { + } else if (subchannel_list_->num_transient_failure() == + subchannel_list_->num_subchannels()) { /* 3) TRANSIENT_FAILURE */ grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), @@ -411,99 +427,134 @@ void RoundRobin::UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd, GRPC_ERROR_UNREF(error); } -void RoundRobin::OnConnectivityChangedLocked(void* arg, grpc_error* error) { - grpc_lb_subchannel_data* sd = static_cast<grpc_lb_subchannel_data*>(arg); - RoundRobin* p = static_cast<RoundRobin*>(sd->subchannel_list->policy); +void RoundRobin::RoundRobinSubchannelList::RefForConnectivityWatch( + const char* reason) { + // TODO(roth): We currently track these refs manually. Once the new + // ClosureRef API is ready, find a way to pass the RefCountedPtr<> + // along with the closures instead of doing this manually. + // Ref subchannel list. + Ref(DEBUG_LOCATION, reason).release(); + // Ref LB policy. + RoundRobin* p = static_cast<RoundRobin*>(policy()); + p->Ref(DEBUG_LOCATION, reason).release(); +} + +void RoundRobin::RoundRobinSubchannelList::UnrefForConnectivityWatch( + const char* reason) { + // Unref LB policy. + RoundRobin* p = static_cast<RoundRobin*>(policy()); + p->Unref(DEBUG_LOCATION, reason); + // Unref subchannel list. + Unref(DEBUG_LOCATION, reason); +} + +void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked( + grpc_connectivity_state old_state, grpc_connectivity_state new_state) { + GPR_ASSERT(old_state != GRPC_CHANNEL_SHUTDOWN); + GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN); + if (old_state == GRPC_CHANNEL_READY) { + GPR_ASSERT(num_ready_ > 0); + --num_ready_; + } else if (old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + GPR_ASSERT(num_transient_failure_ > 0); + --num_transient_failure_; + } else if (old_state == GRPC_CHANNEL_IDLE) { + GPR_ASSERT(num_idle_ > 0); + --num_idle_; + } + if (new_state == GRPC_CHANNEL_READY) { + ++num_ready_; + } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + ++num_transient_failure_; + } else if (new_state == GRPC_CHANNEL_IDLE) { + ++num_idle_; + } +} + +void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked( + grpc_error* error) { + RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy()); if (grpc_lb_round_robin_trace.enabled()) { gpr_log( GPR_DEBUG, "[RR %p] connectivity changed for subchannel %p, subchannel_list %p: " "prev_state=%s new_state=%s p->shutdown=%d " "sd->subchannel_list->shutting_down=%d error=%s", - p, sd->subchannel, sd->subchannel_list, - grpc_connectivity_state_name(sd->prev_connectivity_state), - grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe), - p->shutdown_, sd->subchannel_list->shutting_down, + p, subchannel(), subchannel_list(), + grpc_connectivity_state_name(prev_connectivity_state_), + grpc_connectivity_state_name(connectivity_state()), + p->shutdown_, subchannel_list()->shutting_down(), grpc_error_string(error)); } - GPR_ASSERT(sd->subchannel != nullptr); + GPR_ASSERT(subchannel() != nullptr); // If the policy is shutting down, unref and return. if (p->shutdown_) { - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_data_unref_subchannel(sd, "rr_shutdown"); - p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list, - "rr_shutdown"); + StopConnectivityWatchLocked(); + UnrefSubchannelLocked("rr_shutdown"); + subchannel_list()->UnrefForConnectivityWatch("rr_shutdown"); return; } // If the subchannel list is shutting down, stop watching. - if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) { - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_data_unref_subchannel(sd, "rr_sl_shutdown"); - p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list, - "rr_sl_shutdown"); + if (subchannel_list()->shutting_down() || error == GRPC_ERROR_CANCELLED) { + StopConnectivityWatchLocked(); + UnrefSubchannelLocked("rr_sl_shutdown"); + subchannel_list()->UnrefForConnectivityWatch("rr_sl_shutdown"); return; } + GPR_ASSERT(connectivity_state() != GRPC_CHANNEL_SHUTDOWN); // If we're still here, the notification must be for a subchannel in // either the current or latest pending subchannel lists. - GPR_ASSERT(sd->subchannel_list == p->subchannel_list_ || - sd->subchannel_list == p->latest_pending_subchannel_list_); - GPR_ASSERT(sd->pending_connectivity_state_unsafe != GRPC_CHANNEL_SHUTDOWN); - // Now that we're inside the combiner, copy the pending connectivity - // state (which was set by the connectivity state watcher) to - // curr_connectivity_state, which is what we use inside of the combiner. - sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; + GPR_ASSERT(p->subchannel_list_ == subchannel_list() || + p->latest_pending_subchannel_list_ == subchannel_list()); // If the sd's new state is TRANSIENT_FAILURE, unref the *connected* // subchannel, if any. - switch (sd->curr_connectivity_state) { + switch (connectivity_state()) { case GRPC_CHANNEL_TRANSIENT_FAILURE: { - sd->connected_subchannel.reset(); + clear_connected_subchannel(); if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. " "Requesting re-resolution", - p, sd->subchannel); + p, subchannel()); } p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE); break; } case GRPC_CHANNEL_READY: { - if (sd->connected_subchannel == nullptr) { - sd->connected_subchannel = - grpc_subchannel_get_connected_subchannel(sd->subchannel); + if (connected_subchannel() == nullptr) { + GetConnectedSubchannelFromSubchannelLocked(); } - if (sd->subchannel_list != p->subchannel_list_) { - // promote sd->subchannel_list to p->subchannel_list_. - // sd->subchannel_list must be equal to + if (p->subchannel_list_ != subchannel_list()) { + // promote subchannel_list() to p->subchannel_list_. + // subchannel_list() must be equal to // p->latest_pending_subchannel_list_ because we have already filtered - // for sds belonging to outdated subchannel lists. - GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list_); - GPR_ASSERT(!sd->subchannel_list->shutting_down); + // for subchannels belonging to outdated subchannel lists. + GPR_ASSERT(p->latest_pending_subchannel_list_ == subchannel_list()); + GPR_ASSERT(!subchannel_list()->shutting_down()); if (grpc_lb_round_robin_trace.enabled()) { const size_t num_subchannels = p->subchannel_list_ != nullptr - ? p->subchannel_list_->num_subchannels + ? p->subchannel_list_->num_subchannels() : 0; gpr_log(GPR_DEBUG, "[RR %p] phasing out subchannel list %p (size %" PRIuPTR ") in favor of %p (size %" PRIuPTR ")", - p, p->subchannel_list_, num_subchannels, sd->subchannel_list, - num_subchannels); + p, p->subchannel_list_.get(), num_subchannels, + subchannel_list(), subchannel_list()->num_subchannels()); } if (p->subchannel_list_ != nullptr) { // dispose of the current subchannel_list - grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list_, - "sl_phase_out_shutdown"); + p->subchannel_list_->ShutdownLocked("sl_phase_out_shutdown"); } - p->subchannel_list_ = p->latest_pending_subchannel_list_; - p->latest_pending_subchannel_list_ = nullptr; + p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); } /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in * p->pending_picks. This preemptively replicates rr_pick()'s actions. */ const size_t next_ready_index = p->GetNextReadySubchannelIndexLocked(); - GPR_ASSERT(next_ready_index < p->subchannel_list_->num_subchannels); - grpc_lb_subchannel_data* selected = - &p->subchannel_list_->subchannels[next_ready_index]; + GPR_ASSERT(next_ready_index < p->subchannel_list_->num_subchannels()); + RoundRobinSubchannelData* selected = + p->subchannel_list_->subchannel(next_ready_index); if (p->pending_picks_ != nullptr) { // if the selected subchannel is going to be used for the pending // picks, update the last picked pointer @@ -512,15 +563,15 @@ void RoundRobin::OnConnectivityChangedLocked(void* arg, grpc_error* error) { PickState* pick; while ((pick = p->pending_picks_)) { p->pending_picks_ = pick->next; - pick->connected_subchannel = selected->connected_subchannel; + pick->connected_subchannel = selected->connected_subchannel()->Ref(); if (pick->user_data != nullptr) { - *pick->user_data = selected->user_data; + *pick->user_data = selected->user_data(); } if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Fulfilling pending pick. Target <-- subchannel %p " "(subchannel_list %p, index %" PRIuPTR ")", - p, selected->subchannel, p->subchannel_list_, + p, selected->subchannel(), p->subchannel_list_.get(), next_ready_index); } GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); @@ -533,13 +584,16 @@ void RoundRobin::OnConnectivityChangedLocked(void* arg, grpc_error* error) { case GRPC_CHANNEL_IDLE:; // fallthrough } // Update state counters. - UpdateStateCountersLocked(sd); + subchannel_list()->UpdateStateCountersLocked(prev_connectivity_state_, + connectivity_state()); + prev_connectivity_state_ = connectivity_state(); // Only update connectivity based on the selected subchannel list. - if (sd->subchannel_list == p->subchannel_list_) { - p->UpdateConnectivityStatusLocked(sd, GRPC_ERROR_REF(error)); + if (p->subchannel_list_ == subchannel_list()) { + p->UpdateConnectivityStateLocked(connectivity_state(), + GRPC_ERROR_REF(error)); } // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(sd); + StartConnectivityWatchLocked(); } grpc_connectivity_state RoundRobin::CheckConnectivityLocked( @@ -556,10 +610,10 @@ void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current, void RoundRobin::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) { const size_t next_ready_index = GetNextReadySubchannelIndexLocked(); - if (next_ready_index < subchannel_list_->num_subchannels) { - grpc_lb_subchannel_data* selected = - &subchannel_list_->subchannels[next_ready_index]; - selected->connected_subchannel->Ping(on_initiate, on_ack); + if (next_ready_index < subchannel_list_->num_subchannels()) { + RoundRobinSubchannelData* selected = + subchannel_list_->subchannel(next_ready_index); + selected->connected_subchannel()->Ping(on_initiate, on_ack); } else { GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Round Robin not connected")); @@ -587,30 +641,29 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args) { gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIuPTR " addresses", this, addresses->num_addresses); } - grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create( + auto subchannel_list = MakeRefCounted<RoundRobinSubchannelList>( this, &grpc_lb_round_robin_trace, addresses, combiner(), - client_channel_factory(), args, &RoundRobin::OnConnectivityChangedLocked); - if (subchannel_list->num_subchannels == 0) { + client_channel_factory(), args); + if (subchannel_list->num_subchannels() == 0) { grpc_connectivity_state_set( &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), "rr_update_empty"); if (subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, - "sl_shutdown_empty_update"); + subchannel_list_->ShutdownLocked("sl_shutdown_empty_update"); } - subchannel_list_ = subchannel_list; // empty list + subchannel_list_ = std::move(subchannel_list); // empty list return; } if (started_picking_) { - for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) { + for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { const grpc_connectivity_state subchannel_state = - grpc_subchannel_check_connectivity( - subchannel_list->subchannels[i].subchannel, nullptr); + subchannel_list->subchannel(i)->CheckConnectivityStateLocked(); // Override the default setting of IDLE for connectivity notification // purposes if the subchannel is already in transient failure. Otherwise // we'd be immediately notified of the IDLE-TRANSIENT_FAILURE - // discrepancy, attempt to re-resolve and end up here again. + // discrepancy, attempt to re-resolve, and end up here again. +// FIXME // TODO(roth): As part of C++-ifying the subchannel_list API, design a // better API for notifying the LB policy of subchannel states, which can // be used both for the subchannel's initial state and for subsequent @@ -619,43 +672,36 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args) { // pending picks across all READY subchannels rather than sending them all // to the first one). if (subchannel_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - subchannel_list->subchannels[i].pending_connectivity_state_unsafe = - subchannel_list->subchannels[i].curr_connectivity_state = - subchannel_list->subchannels[i].prev_connectivity_state = - subchannel_state; - --subchannel_list->num_idle; - ++subchannel_list->num_transient_failures; + subchannel_list->UpdateStateCountersLocked(GRPC_CHANNEL_IDLE, + subchannel_state); } } + for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { + /* Watch every new subchannel. A subchannel list becomes active the + * moment one of its subchannels is READY. At that moment, we swap + * p->subchannel_list for sd->subchannel_list, provided the subchannel + * list is still valid (ie, isn't shutting down) */ + subchannel_list->RefForConnectivityWatch("connectivity_watch"); + subchannel_list->subchannel(i)->StartConnectivityWatchLocked(); + } if (latest_pending_subchannel_list_ != nullptr) { if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Shutting down latest pending subchannel list %p, " "about to be replaced by newer latest %p", - this, latest_pending_subchannel_list_, subchannel_list); + this, latest_pending_subchannel_list_.get(), + subchannel_list.get()); } - grpc_lb_subchannel_list_shutdown_and_unref( - latest_pending_subchannel_list_, "sl_outdated"); - } - latest_pending_subchannel_list_ = subchannel_list; - for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) { - /* Watch every new subchannel. A subchannel list becomes active the - * moment one of its subchannels is READY. At that moment, we swap - * p->subchannel_list for sd->subchannel_list, provided the subchannel - * list is still valid (ie, isn't shutting down) */ - SubchannelListRefForConnectivityWatch(subchannel_list, - "connectivity_watch"); - grpc_lb_subchannel_data_start_connectivity_watch( - &subchannel_list->subchannels[i]); + latest_pending_subchannel_list_->ShutdownLocked("sl_outdated"); } + latest_pending_subchannel_list_ = std::move(subchannel_list); } else { // The policy isn't picking yet. Save the update for later, disposing of // previous version if any. if (subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref( - subchannel_list_, "rr_update_before_started_picking"); + subchannel_list_->ShutdownLocked("rr_update_before_started_picking"); } - subchannel_list_ = subchannel_list; + subchannel_list_ = std::move(subchannel_list); } } diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc deleted file mode 100644 index 79cb64c6c6..0000000000 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc +++ /dev/null @@ -1,253 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <grpc/support/port_platform.h> - -#include <string.h> - -#include <grpc/support/alloc.h> - -#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h" -#include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/debug/trace.h" -#include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/iomgr/combiner.h" -#include "src/core/lib/iomgr/sockaddr_utils.h" -#include "src/core/lib/transport/connectivity_state.h" - -void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd, - const char* reason) { - if (sd->subchannel != nullptr) { - if (sd->subchannel_list->tracer->enabled()) { - gpr_log(GPR_DEBUG, - "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR - " (subchannel %p): unreffing subchannel", - sd->subchannel_list->tracer->name(), sd->subchannel_list->policy, - sd->subchannel_list, - static_cast<size_t>(sd - sd->subchannel_list->subchannels), - sd->subchannel_list->num_subchannels, sd->subchannel); - } - GRPC_SUBCHANNEL_UNREF(sd->subchannel, reason); - sd->subchannel = nullptr; - sd->connected_subchannel.reset(); - if (sd->user_data != nullptr) { - GPR_ASSERT(sd->user_data_vtable != nullptr); - sd->user_data_vtable->destroy(sd->user_data); - sd->user_data = nullptr; - } - } -} - -void grpc_lb_subchannel_data_start_connectivity_watch( - grpc_lb_subchannel_data* sd) { - if (sd->subchannel_list->tracer->enabled()) { - gpr_log( - GPR_DEBUG, - "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR - " (subchannel %p): requesting connectivity change " - "notification (from %s)", - sd->subchannel_list->tracer->name(), sd->subchannel_list->policy, - sd->subchannel_list, - static_cast<size_t>(sd - sd->subchannel_list->subchannels), - sd->subchannel_list->num_subchannels, sd->subchannel, - grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe)); - } - sd->connectivity_notification_pending = true; - grpc_subchannel_notify_on_state_change( - sd->subchannel, sd->subchannel_list->policy->interested_parties(), - &sd->pending_connectivity_state_unsafe, - &sd->connectivity_changed_closure); -} - -void grpc_lb_subchannel_data_stop_connectivity_watch( - grpc_lb_subchannel_data* sd) { - if (sd->subchannel_list->tracer->enabled()) { - gpr_log(GPR_DEBUG, - "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR - " (subchannel %p): stopping connectivity watch", - sd->subchannel_list->tracer->name(), sd->subchannel_list->policy, - sd->subchannel_list, - static_cast<size_t>(sd - sd->subchannel_list->subchannels), - sd->subchannel_list->num_subchannels, sd->subchannel); - } - GPR_ASSERT(sd->connectivity_notification_pending); - sd->connectivity_notification_pending = false; -} - -grpc_lb_subchannel_list* grpc_lb_subchannel_list_create( - grpc_core::LoadBalancingPolicy* p, grpc_core::TraceFlag* tracer, - const grpc_lb_addresses* addresses, grpc_combiner* combiner, - grpc_client_channel_factory* client_channel_factory, - const grpc_channel_args& args, grpc_iomgr_cb_func connectivity_changed_cb) { - grpc_lb_subchannel_list* subchannel_list = - static_cast<grpc_lb_subchannel_list*>( - gpr_zalloc(sizeof(*subchannel_list))); - if (tracer->enabled()) { - gpr_log(GPR_DEBUG, - "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels", - tracer->name(), p, subchannel_list, addresses->num_addresses); - } - subchannel_list->policy = p; - subchannel_list->tracer = tracer; - gpr_ref_init(&subchannel_list->refcount, 1); - subchannel_list->subchannels = static_cast<grpc_lb_subchannel_data*>( - gpr_zalloc(sizeof(grpc_lb_subchannel_data) * addresses->num_addresses)); - // We need to remove the LB addresses in order to be able to compare the - // subchannel keys of subchannels from a different batch of addresses. - static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, - GRPC_ARG_LB_ADDRESSES}; - // Create a subchannel for each address. - grpc_subchannel_args sc_args; - size_t subchannel_index = 0; - for (size_t i = 0; i < addresses->num_addresses; i++) { - // If there were any balancer, we would have chosen grpclb policy instead. - GPR_ASSERT(!addresses->addresses[i].is_balancer); - memset(&sc_args, 0, sizeof(grpc_subchannel_args)); - grpc_arg addr_arg = - grpc_create_subchannel_address_arg(&addresses->addresses[i].address); - grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove( - &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, 1); - gpr_free(addr_arg.value.string); - sc_args.args = new_args; - grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel( - client_channel_factory, &sc_args); - grpc_channel_args_destroy(new_args); - if (subchannel == nullptr) { - // Subchannel could not be created. - if (tracer->enabled()) { - char* address_uri = - grpc_sockaddr_to_uri(&addresses->addresses[i].address); - gpr_log(GPR_DEBUG, - "[%s %p] could not create subchannel for address uri %s, " - "ignoring", - tracer->name(), subchannel_list->policy, address_uri); - gpr_free(address_uri); - } - continue; - } - if (tracer->enabled()) { - char* address_uri = - grpc_sockaddr_to_uri(&addresses->addresses[i].address); - gpr_log(GPR_DEBUG, - "[%s %p] subchannel list %p index %" PRIuPTR - ": Created subchannel %p for address uri %s", - tracer->name(), p, subchannel_list, subchannel_index, subchannel, - address_uri); - gpr_free(address_uri); - } - grpc_lb_subchannel_data* sd = - &subchannel_list->subchannels[subchannel_index++]; - sd->subchannel_list = subchannel_list; - sd->subchannel = subchannel; - GRPC_CLOSURE_INIT(&sd->connectivity_changed_closure, - connectivity_changed_cb, sd, - grpc_combiner_scheduler(combiner)); - // We assume that the current state is IDLE. If not, we'll get a - // callback telling us that. - sd->prev_connectivity_state = GRPC_CHANNEL_IDLE; - sd->curr_connectivity_state = GRPC_CHANNEL_IDLE; - sd->pending_connectivity_state_unsafe = GRPC_CHANNEL_IDLE; - sd->user_data_vtable = addresses->user_data_vtable; - if (sd->user_data_vtable != nullptr) { - sd->user_data = - sd->user_data_vtable->copy(addresses->addresses[i].user_data); - } - } - subchannel_list->num_subchannels = subchannel_index; - subchannel_list->num_idle = subchannel_index; - return subchannel_list; -} - -static void subchannel_list_destroy(grpc_lb_subchannel_list* subchannel_list) { - if (subchannel_list->tracer->enabled()) { - gpr_log(GPR_DEBUG, "[%s %p] Destroying subchannel_list %p", - subchannel_list->tracer->name(), subchannel_list->policy, - subchannel_list); - } - for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { - grpc_lb_subchannel_data* sd = &subchannel_list->subchannels[i]; - grpc_lb_subchannel_data_unref_subchannel(sd, "subchannel_list_destroy"); - } - gpr_free(subchannel_list->subchannels); - gpr_free(subchannel_list); -} - -void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list* subchannel_list, - const char* reason) { - gpr_ref_non_zero(&subchannel_list->refcount); - if (subchannel_list->tracer->enabled()) { - const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); - gpr_log(GPR_DEBUG, "[%s %p] subchannel_list %p REF %lu->%lu (%s)", - subchannel_list->tracer->name(), subchannel_list->policy, - subchannel_list, static_cast<unsigned long>(count - 1), - static_cast<unsigned long>(count), reason); - } -} - -void grpc_lb_subchannel_list_unref(grpc_lb_subchannel_list* subchannel_list, - const char* reason) { - const bool done = gpr_unref(&subchannel_list->refcount); - if (subchannel_list->tracer->enabled()) { - const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); - gpr_log(GPR_DEBUG, "[%s %p] subchannel_list %p UNREF %lu->%lu (%s)", - subchannel_list->tracer->name(), subchannel_list->policy, - subchannel_list, static_cast<unsigned long>(count + 1), - static_cast<unsigned long>(count), reason); - } - if (done) { - subchannel_list_destroy(subchannel_list); - } -} - -static void subchannel_data_cancel_connectivity_watch( - grpc_lb_subchannel_data* sd, const char* reason) { - if (sd->subchannel_list->tracer->enabled()) { - gpr_log(GPR_DEBUG, - "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR - " (subchannel %p): canceling connectivity watch (%s)", - sd->subchannel_list->tracer->name(), sd->subchannel_list->policy, - sd->subchannel_list, - static_cast<size_t>(sd - sd->subchannel_list->subchannels), - sd->subchannel_list->num_subchannels, sd->subchannel, reason); - } - grpc_subchannel_notify_on_state_change(sd->subchannel, nullptr, nullptr, - &sd->connectivity_changed_closure); -} - -void grpc_lb_subchannel_list_shutdown_and_unref( - grpc_lb_subchannel_list* subchannel_list, const char* reason) { - if (subchannel_list->tracer->enabled()) { - gpr_log(GPR_DEBUG, "[%s %p] Shutting down subchannel_list %p (%s)", - subchannel_list->tracer->name(), subchannel_list->policy, - subchannel_list, reason); - } - GPR_ASSERT(!subchannel_list->shutting_down); - subchannel_list->shutting_down = true; - for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { - grpc_lb_subchannel_data* sd = &subchannel_list->subchannels[i]; - // If there's a pending notification for this subchannel, cancel it; - // the callback is responsible for unreffing the subchannel. - // Otherwise, unref the subchannel directly. - if (sd->connectivity_notification_pending) { - subchannel_data_cancel_connectivity_watch(sd, reason); - } else if (sd->subchannel != nullptr) { - grpc_lb_subchannel_data_unref_subchannel(sd, reason); - } - } - grpc_lb_subchannel_list_unref(subchannel_list, reason); -} diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 6889d596ac..d717ba0e37 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -21,116 +21,388 @@ #include <grpc/support/port_platform.h> +#include <string.h> + +#include <grpc/support/alloc.h> + #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/subchannel.h" +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/gprpp/abstract.h" +#include "src/core/lib/gprpp/inlined_vector.h" +#include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/combiner.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" -// TODO(roth): This code is intended to be shared between pick_first and -// round_robin. However, the interface needs more work to provide clean -// encapsulation. For example, the structs here have some fields that are -// only used in one of the two (e.g., the state counters in -// grpc_lb_subchannel_list and the prev_connectivity_state field in -// grpc_lb_subchannel_data are only used in round_robin, and the -// checking_subchannel field in grpc_lb_subchannel_list is only used by -// pick_first). Also, there is probably some code duplication between the -// connectivity state notification callback code in both pick_first and -// round_robin that could be refactored and moved here. In a future PR, -// need to clean this up. - -typedef struct grpc_lb_subchannel_list grpc_lb_subchannel_list; - -typedef struct { - /** backpointer to owning subchannel list */ - grpc_lb_subchannel_list* subchannel_list; - /** subchannel itself */ - grpc_subchannel* subchannel; - grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel; - /** Is a connectivity notification pending? */ - bool connectivity_notification_pending; - /** notification that connectivity has changed on subchannel */ - grpc_closure connectivity_changed_closure; - /** previous and current connectivity states. Updated by \a - * \a connectivity_changed_closure based on - * \a pending_connectivity_state_unsafe. */ - grpc_connectivity_state prev_connectivity_state; - grpc_connectivity_state curr_connectivity_state; - /** connectivity state to be updated by - * grpc_subchannel_notify_on_state_change(), not guarded by - * the combiner. To be copied to \a curr_connectivity_state by - * \a connectivity_changed_closure. */ - grpc_connectivity_state pending_connectivity_state_unsafe; - /** the subchannel's target user data */ - void* user_data; - /** vtable to operate over \a user_data */ - const grpc_lb_user_data_vtable* user_data_vtable; -} grpc_lb_subchannel_data; - -/// Unrefs the subchannel contained in sd. -void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd, - const char* reason); - -/// Starts watching the connectivity state of the subchannel. -/// The connectivity_changed_cb callback must invoke either -/// grpc_lb_subchannel_data_stop_connectivity_watch() or again call -/// grpc_lb_subchannel_data_start_connectivity_watch(). -void grpc_lb_subchannel_data_start_connectivity_watch( - grpc_lb_subchannel_data* sd); - -/// Stops watching the connectivity state of the subchannel. -void grpc_lb_subchannel_data_stop_connectivity_watch( - grpc_lb_subchannel_data* sd); - -struct grpc_lb_subchannel_list { - /** backpointer to owning policy */ - grpc_core::LoadBalancingPolicy* policy; - - grpc_core::TraceFlag* tracer; - - /** all our subchannels */ - size_t num_subchannels; - grpc_lb_subchannel_data* subchannels; - - /** Index into subchannels of the one we're currently checking. - * Used when connecting to subchannels serially instead of in parallel. */ - // TODO(roth): When we have time, we can probably make this go away - // and compute the index dynamically by subtracting - // subchannel_list->subchannels from the subchannel_data pointer. - size_t checking_subchannel; - - /** how many subchannels are in state READY */ - size_t num_ready; - /** how many subchannels are in state TRANSIENT_FAILURE */ - size_t num_transient_failures; - /** how many subchannels are in state IDLE */ - size_t num_idle; - - /** There will be one ref for each entry in subchannels for which there is a - * pending connectivity state watcher callback. */ - gpr_refcount refcount; - - /** Is this list shutting down? This may be true due to the shutdown of the - * policy itself or because a newer update has arrived while this one hadn't - * finished processing. */ - bool shutting_down; +// FIXME: add comments + +namespace grpc_core { + +template <typename SubchannelListType, typename SubchannelDataType> +class SubchannelData { + public: + // Returns the index into subchannel_list_ of this object. + size_t Index() const { + return static_cast<size_t>(static_cast<const SubchannelDataType*>(this) - + subchannel_list_->subchannel(0)); + } + + SubchannelListType* subchannel_list() const { return subchannel_list_; } + + grpc_subchannel* subchannel() const { return subchannel_; } + + ConnectedSubchannel* connected_subchannel() const { + return connected_subchannel_.get(); + } + +// FIXME: maybe do this automatically in OnConnectivityChangedLocked() +// when the new state is TRANSIENT_FAILURE? + void clear_connected_subchannel() { connected_subchannel_.reset(); } + + void GetConnectedSubchannelFromSubchannelLocked() { + connected_subchannel_ = + grpc_subchannel_get_connected_subchannel(subchannel_); + } + + void SetConnectedSubchannelFromLocked(SubchannelData* other) { + connected_subchannel_ = other->connected_subchannel_; // Adds ref. + } + + bool connectivity_notification_pending() const { + return connectivity_notification_pending_; + } + grpc_connectivity_state connectivity_state() const { + return curr_connectivity_state_; + } + + virtual grpc_connectivity_state CheckConnectivityStateLocked() { + pending_connectivity_state_unsafe_ = + grpc_subchannel_check_connectivity(subchannel(), nullptr); + curr_connectivity_state_ = pending_connectivity_state_unsafe_; + return curr_connectivity_state_; + } + + // Unrefs the subchannel. + virtual void UnrefSubchannelLocked(const char* reason); + + /// Starts watching the connectivity state of the subchannel. + /// The connectivity_changed_cb callback must invoke either + /// StopConnectivityWatch() or again call StartConnectivityWatch(). + void StartConnectivityWatchLocked(); + + /// Stops watching the connectivity state of the subchannel. + void StopConnectivityWatchLocked(); + + /// Cancels watching the connectivity state of the subchannel. + void CancelConnectivityWatchLocked(const char* reason); + + void ShutdownLocked(const char* reason); + + GRPC_ABSTRACT_BASE_CLASS + + protected: + SubchannelData( + SubchannelListType* subchannel_list, + const grpc_lb_user_data_vtable* user_data_vtable, + const grpc_lb_address& address, grpc_subchannel* subchannel, + grpc_combiner* combiner); + + virtual ~SubchannelData(); + +// FIXME: define API + virtual void ProcessConnectivityChangeLocked(grpc_error* error) GRPC_ABSTRACT; + + private: + static void OnConnectivityChangedLocked(void* arg, grpc_error* error); + + // Backpointer to owning subchannel list. Not owned. + SubchannelListType* subchannel_list_; + + // The subchannel and connected subchannel. + grpc_subchannel* subchannel_; + RefCountedPtr<ConnectedSubchannel> connected_subchannel_; + + // Notification that connectivity has changed on subchannel. + grpc_closure connectivity_changed_closure_; + // Is a connectivity notification pending? + bool connectivity_notification_pending_; + // Connectivity state to be updated by + // grpc_subchannel_notify_on_state_change(), not guarded by + // the combiner. Will be copied to \a curr_connectivity_state_ by + // \a connectivity_changed_closure_. + grpc_connectivity_state pending_connectivity_state_unsafe_; + // Current connectivity state. + grpc_connectivity_state curr_connectivity_state_; }; -grpc_lb_subchannel_list* grpc_lb_subchannel_list_create( - grpc_core::LoadBalancingPolicy* p, grpc_core::TraceFlag* tracer, +template <typename SubchannelListType, typename SubchannelDataType> +class SubchannelList + : public RefCountedWithTracing<SubchannelListType> { + public: + typedef InlinedVector<SubchannelDataType, 10> SubchannelVector; + + size_t num_subchannels() const { return subchannels_.size(); } + SubchannelDataType* subchannel(size_t index) { return &subchannels_[index]; } + + // Marks the subchannel_list as discarded. Unsubscribes all its subchannels. + void ShutdownLocked(const char* reason); + + bool shutting_down() const { return shutting_down_; } + + LoadBalancingPolicy* policy() const { return policy_; } + TraceFlag* tracer() const { return tracer_; } + + GRPC_ABSTRACT_BASE_CLASS + + protected: + SubchannelList(LoadBalancingPolicy* policy, TraceFlag* tracer, + const grpc_lb_addresses* addresses, grpc_combiner* combiner, + grpc_client_channel_factory* client_channel_factory, + const grpc_channel_args& args); + + virtual ~SubchannelList(); + + private: + // So New() can call our private ctor. + template <typename T, typename... Args> + friend T* New(Args&&... args); + + // Backpointer to owning policy. + LoadBalancingPolicy* policy_; + + TraceFlag* tracer_; + + // The list of subchannels. + SubchannelVector subchannels_; + + // Is this list shutting down? This may be true due to the shutdown of the + // policy itself or because a newer update has arrived while this one hadn't + // finished processing. + bool shutting_down_ = false; +}; + +// +// implementation -- no user-servicable parts below +// + +// +// SubchannelData +// + +template <typename SubchannelListType, typename SubchannelDataType> +SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData( + SubchannelListType* subchannel_list, + const grpc_lb_user_data_vtable* user_data_vtable, + const grpc_lb_address& address, grpc_subchannel* subchannel, + grpc_combiner* combiner) + : subchannel_list_(subchannel_list), + subchannel_(subchannel), + // We assume that the current state is IDLE. If not, we'll get a + // callback telling us that. + pending_connectivity_state_unsafe_(GRPC_CHANNEL_IDLE), + curr_connectivity_state_(GRPC_CHANNEL_IDLE) { + GRPC_CLOSURE_INIT( + &connectivity_changed_closure_, + (&SubchannelData<SubchannelListType, + SubchannelDataType>::OnConnectivityChangedLocked), + this, grpc_combiner_scheduler(combiner)); +} + +template <typename SubchannelListType, typename SubchannelDataType> +SubchannelData<SubchannelListType, SubchannelDataType>::~SubchannelData() { + UnrefSubchannelLocked("subchannel_data_destroy"); +} + +template <typename SubchannelListType, typename SubchannelDataType> +void SubchannelData<SubchannelListType, + SubchannelDataType>::UnrefSubchannelLocked( + const char* reason) { + if (subchannel_ != nullptr) { + if (subchannel_list_->tracer()->enabled()) { + gpr_log(GPR_DEBUG, + "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): unreffing subchannel", + subchannel_list_->tracer()->name(), subchannel_list_->policy(), + subchannel_list_, Index(), + subchannel_list_->num_subchannels(), subchannel_); + } + GRPC_SUBCHANNEL_UNREF(subchannel_, reason); + subchannel_ = nullptr; + connected_subchannel_.reset(); + } +} + +template <typename SubchannelListType, typename SubchannelDataType> +void SubchannelData<SubchannelListType, + SubchannelDataType>::StartConnectivityWatchLocked() { + if (subchannel_list_->tracer()->enabled()) { + gpr_log( + GPR_DEBUG, + "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): requesting connectivity change " + "notification (from %s)", + subchannel_list_->tracer()->name(), subchannel_list_->policy(), + subchannel_list_, Index(), + subchannel_list_->num_subchannels(), subchannel_, + grpc_connectivity_state_name(pending_connectivity_state_unsafe_)); + } + connectivity_notification_pending_ = true; + grpc_subchannel_notify_on_state_change( + subchannel_, subchannel_list_->policy()->interested_parties(), + &pending_connectivity_state_unsafe_, + &connectivity_changed_closure_); +} + +template <typename SubchannelListType, typename SubchannelDataType> +void SubchannelData<SubchannelListType, + SubchannelDataType>::StopConnectivityWatchLocked() { + if (subchannel_list_->tracer()->enabled()) { + gpr_log(GPR_DEBUG, + "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): stopping connectivity watch", + subchannel_list_->tracer()->name(), subchannel_list_->policy(), + subchannel_list_, Index(), + subchannel_list_->num_subchannels(), subchannel_); + } + GPR_ASSERT(connectivity_notification_pending_); + connectivity_notification_pending_ = false; +} + +template <typename SubchannelListType, typename SubchannelDataType> +void SubchannelData<SubchannelListType, + SubchannelDataType>::CancelConnectivityWatchLocked( + const char* reason) { + if (subchannel_list_->tracer()->enabled()) { + gpr_log(GPR_DEBUG, + "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): canceling connectivity watch (%s)", + subchannel_list_->tracer()->name(), subchannel_list_->policy(), + subchannel_list_, Index(), + subchannel_list_->num_subchannels(), subchannel_, reason); + } + grpc_subchannel_notify_on_state_change(subchannel_, nullptr, nullptr, + &connectivity_changed_closure_); +} + +template <typename SubchannelListType, typename SubchannelDataType> +void SubchannelData<SubchannelListType, + SubchannelDataType>::OnConnectivityChangedLocked( + void* arg, grpc_error* error) { + SubchannelData* sd = static_cast<SubchannelData*>(arg); + // Now that we're inside the combiner, copy the pending connectivity + // state (which was set by the connectivity state watcher) to + // curr_connectivity_state_, which is what we use inside of the combiner. + sd->curr_connectivity_state_ = sd->pending_connectivity_state_unsafe_; + sd->ProcessConnectivityChangeLocked(error); +} + +template <typename SubchannelListType, typename SubchannelDataType> +void SubchannelData<SubchannelListType, + SubchannelDataType>::ShutdownLocked(const char* reason) { + // If there's a pending notification for this subchannel, cancel it; + // the callback is responsible for unreffing the subchannel. + // Otherwise, unref the subchannel directly. + if (connectivity_notification_pending_) { + CancelConnectivityWatchLocked(reason); + } else if (subchannel_ != nullptr) { + UnrefSubchannelLocked(reason); + } +} + +// +// SubchannelList +// + +template <typename SubchannelListType, typename SubchannelDataType> +SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList( + LoadBalancingPolicy* policy, TraceFlag* tracer, const grpc_lb_addresses* addresses, grpc_combiner* combiner, grpc_client_channel_factory* client_channel_factory, - const grpc_channel_args& args, grpc_iomgr_cb_func connectivity_changed_cb); + const grpc_channel_args& args) + : RefCountedWithTracing<SubchannelListType>(tracer), + policy_(policy), + tracer_(tracer) { + if (tracer_->enabled()) { + gpr_log(GPR_DEBUG, + "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels", + tracer_->name(), policy, this, addresses->num_addresses); + } + subchannels_.reserve(addresses->num_addresses); + // We need to remove the LB addresses in order to be able to compare the + // subchannel keys of subchannels from a different batch of addresses. + static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, + GRPC_ARG_LB_ADDRESSES}; + // Create a subchannel for each address. + grpc_subchannel_args sc_args; + for (size_t i = 0; i < addresses->num_addresses; i++) { + // If there were any balancer, we would have chosen grpclb policy instead. + GPR_ASSERT(!addresses->addresses[i].is_balancer); + memset(&sc_args, 0, sizeof(grpc_subchannel_args)); + grpc_arg addr_arg = + grpc_create_subchannel_address_arg(&addresses->addresses[i].address); + grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove( + &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, 1); + gpr_free(addr_arg.value.string); + sc_args.args = new_args; + grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel( + client_channel_factory, &sc_args); + grpc_channel_args_destroy(new_args); + if (subchannel == nullptr) { + // Subchannel could not be created. + if (tracer_->enabled()) { + char* address_uri = + grpc_sockaddr_to_uri(&addresses->addresses[i].address); + gpr_log(GPR_DEBUG, + "[%s %p] could not create subchannel for address uri %s, " + "ignoring", + tracer_->name(), policy_, address_uri); + gpr_free(address_uri); + } + continue; + } + if (tracer_->enabled()) { + char* address_uri = + grpc_sockaddr_to_uri(&addresses->addresses[i].address); + gpr_log(GPR_DEBUG, + "[%s %p] subchannel list %p index %" PRIuPTR + ": Created subchannel %p for address uri %s", + tracer_->name(), policy_, this, subchannels_.size(), subchannel, + address_uri); + gpr_free(address_uri); + } + subchannels_.emplace_back(static_cast<SubchannelListType*>(this), + addresses->user_data_vtable, + addresses->addresses[i], subchannel, combiner); + } +} -void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list* subchannel_list, - const char* reason); +template <typename SubchannelListType, typename SubchannelDataType> +SubchannelList<SubchannelListType, SubchannelDataType>::~SubchannelList() { + if (tracer_->enabled()) { + gpr_log(GPR_DEBUG, "[%s %p] Destroying subchannel_list %p", + tracer_->name(), policy_, this); + } +} -void grpc_lb_subchannel_list_unref(grpc_lb_subchannel_list* subchannel_list, - const char* reason); +template <typename SubchannelListType, typename SubchannelDataType> +void SubchannelList<SubchannelListType, + SubchannelDataType>::ShutdownLocked(const char* reason) { + if (tracer_->enabled()) { + gpr_log(GPR_DEBUG, "[%s %p] Shutting down subchannel_list %p (%s)", + tracer_->name(), policy_, this, reason); + } + GPR_ASSERT(!shutting_down_); + shutting_down_ = true; + for (size_t i = 0; i < subchannels_.size(); i++) { + SubchannelDataType* sd = &subchannels_[i]; + sd->ShutdownLocked(reason); + } +} -/// Mark subchannel_list as discarded. Unsubscribes all its subchannels. The -/// connectivity state notification callback will ultimately unref it. -void grpc_lb_subchannel_list_shutdown_and_unref( - grpc_lb_subchannel_list* subchannel_list, const char* reason); +} // namespace grpc_core #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */ |