diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc | 93 |
1 files changed, 43 insertions, 50 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index d454401a66..74c17612a2 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -24,6 +24,7 @@ #include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/lib/channel/channel_args.h" @@ -75,11 +76,9 @@ class PickFirst : public LoadBalancingPolicy { PickFirstSubchannelData( SubchannelList<PickFirstSubchannelList, PickFirstSubchannelData>* subchannel_list, - const grpc_lb_user_data_vtable* user_data_vtable, - const grpc_lb_address& address, grpc_subchannel* subchannel, + const ServerAddress& address, grpc_subchannel* subchannel, grpc_combiner* combiner) - : SubchannelData(subchannel_list, user_data_vtable, address, subchannel, - combiner) {} + : SubchannelData(subchannel_list, address, subchannel, combiner) {} void ProcessConnectivityChangeLocked( grpc_connectivity_state connectivity_state, grpc_error* error) override; @@ -95,7 +94,7 @@ class PickFirst : public LoadBalancingPolicy { PickFirstSubchannelData> { public: PickFirstSubchannelList(PickFirst* policy, TraceFlag* tracer, - const grpc_lb_addresses* addresses, + const ServerAddressList& addresses, grpc_combiner* combiner, grpc_client_channel_factory* client_channel_factory, const grpc_channel_args& args) @@ -337,8 +336,8 @@ void PickFirst::UpdateChildRefsLocked() { void PickFirst::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) { AutoChildRefsUpdater guard(this); - const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); - if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { + const ServerAddressList* addresses = FindServerAddressListChannelArg(&args); + if (addresses == nullptr) { if (subchannel_list_ == nullptr) { // If we don't have a current subchannel list, go into TRANSIENT FAILURE. grpc_connectivity_state_set( @@ -354,19 +353,17 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args, } return; } - const grpc_lb_addresses* addresses = - static_cast<const grpc_lb_addresses*>(arg->value.pointer.p); if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p received update with %" PRIuPTR " addresses", this, - addresses->num_addresses); + addresses->size()); } grpc_arg new_arg = grpc_channel_arg_integer_create( const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1); grpc_channel_args* new_args = grpc_channel_args_copy_and_add(&args, &new_arg, 1); auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>( - this, &grpc_lb_pick_first_trace, addresses, combiner(), + this, &grpc_lb_pick_first_trace, *addresses, combiner(), client_channel_factory(), *new_args); grpc_channel_args_destroy(new_args); if (subchannel_list->num_subchannels() == 0) { @@ -380,6 +377,31 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args, selected_ = nullptr; return; } + // If one of the subchannels in the new list is already in state + // READY, then select it immediately. This can happen when the + // currently selected subchannel is also present in the update. It + // can also happen if one of the subchannels in the update is already + // in the subchannel index because it's in use by another channel. + for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { + PickFirstSubchannelData* sd = subchannel_list->subchannel(i); + grpc_error* error = GRPC_ERROR_NONE; + grpc_connectivity_state state = sd->CheckConnectivityStateLocked(&error); + GRPC_ERROR_UNREF(error); + if (state == GRPC_CHANNEL_READY) { + subchannel_list_ = std::move(subchannel_list); + sd->ProcessUnselectedReadyLocked(); + sd->StartConnectivityWatchLocked(); + // If there was a previously pending update (which may or may + // not have contained the currently selected subchannel), drop + // it, so that it doesn't override what we've done here. + latest_pending_subchannel_list_.reset(); + // Make sure that subsequent calls to ExitIdleLocked() don't cause + // us to start watching a subchannel other than the one we've + // selected. + started_picking_ = true; + return; + } + } if (selected_ == nullptr) { // We don't yet have a selected subchannel, so replace the current // subchannel list immediately. @@ -387,46 +409,14 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args, // If we've started picking, start trying to connect to the first // subchannel in the new list. if (started_picking_) { - subchannel_list_->subchannel(0) - ->CheckConnectivityStateAndStartWatchingLocked(); + // Note: No need to use CheckConnectivityStateAndStartWatchingLocked() + // here, since we've already checked the initial connectivity + // state of all subchannels above. + subchannel_list_->subchannel(0)->StartConnectivityWatchLocked(); } } else { - // We do have a selected subchannel. - // Check if it's present in the new list. If so, we're done. - for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { - PickFirstSubchannelData* sd = subchannel_list->subchannel(i); - if (sd->subchannel() == selected_->subchannel()) { - // The currently selected subchannel is in the update: we are done. - if (grpc_lb_pick_first_trace.enabled()) { - gpr_log(GPR_INFO, - "Pick First %p found already selected subchannel %p " - "at update index %" PRIuPTR " of %" PRIuPTR "; update done", - this, selected_->subchannel(), i, - subchannel_list->num_subchannels()); - } - // Make sure it's in state READY. It might not be if we grabbed - // the combiner while a connectivity state notification - // informing us otherwise is pending. - // Note that CheckConnectivityStateLocked() also takes a ref to - // the connected subchannel. - grpc_error* error = GRPC_ERROR_NONE; - if (sd->CheckConnectivityStateLocked(&error) == GRPC_CHANNEL_READY) { - selected_ = sd; - subchannel_list_ = std::move(subchannel_list); - sd->StartConnectivityWatchLocked(); - // If there was a previously pending update (which may or may - // not have contained the currently selected subchannel), drop - // it, so that it doesn't override what we've done here. - latest_pending_subchannel_list_.reset(); - return; - } - GRPC_ERROR_UNREF(error); - } - } - // Not keeping the previous selected subchannel, so set the latest - // pending subchannel list to the new subchannel list. We will wait - // for it to report READY before swapping it into the current - // subchannel list. + // We do have a selected subchannel, so keep using it until one of + // the subchannels in the new list reports READY. if (latest_pending_subchannel_list_ != nullptr) { if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, @@ -440,8 +430,11 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args, // If we've started picking, start trying to connect to the first // subchannel in the new list. if (started_picking_) { + // Note: No need to use CheckConnectivityStateAndStartWatchingLocked() + // here, since we've already checked the initial connectivity + // state of all subchannels above. latest_pending_subchannel_list_->subchannel(0) - ->CheckConnectivityStateAndStartWatchingLocked(); + ->StartConnectivityWatchLocked(); } } } |