diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc | 40 |
1 files changed, 33 insertions, 7 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 63089afbd7..2a16975131 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -82,6 +82,8 @@ class RoundRobin : public LoadBalancingPolicy { // Data for a particular subchannel in a subchannel list. // This subclass adds the following functionality: + // - Tracks user_data associated with each address, which will be + // returned along with picks that select the subchannel. // - Tracks the previous connectivity state of the subchannel, so that // we know how many subchannels are in each state. class RoundRobinSubchannelData @@ -91,9 +93,26 @@ class RoundRobin : public LoadBalancingPolicy { RoundRobinSubchannelData( SubchannelList<RoundRobinSubchannelList, RoundRobinSubchannelData>* subchannel_list, - const ServerAddress& address, grpc_subchannel* subchannel, + const grpc_lb_user_data_vtable* user_data_vtable, + const grpc_lb_address& address, grpc_subchannel* subchannel, grpc_combiner* combiner) - : SubchannelData(subchannel_list, address, subchannel, combiner) {} + : SubchannelData(subchannel_list, user_data_vtable, address, subchannel, + combiner), + user_data_vtable_(user_data_vtable), + user_data_(user_data_vtable_ != nullptr + ? user_data_vtable_->copy(address.user_data) + : nullptr) {} + + void UnrefSubchannelLocked(const char* reason) override { + SubchannelData::UnrefSubchannelLocked(reason); + if (user_data_ != nullptr) { + GPR_ASSERT(user_data_vtable_ != nullptr); + user_data_vtable_->destroy(user_data_); + user_data_ = nullptr; + } + } + + void* user_data() const { return user_data_; } grpc_connectivity_state connectivity_state() const { return last_connectivity_state_; @@ -106,6 +125,8 @@ class RoundRobin : public LoadBalancingPolicy { void ProcessConnectivityChangeLocked( grpc_connectivity_state connectivity_state, grpc_error* error) override; + const grpc_lb_user_data_vtable* user_data_vtable_; + void* user_data_ = nullptr; grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_IDLE; }; @@ -116,7 +137,7 @@ class RoundRobin : public LoadBalancingPolicy { public: RoundRobinSubchannelList( RoundRobin* policy, TraceFlag* tracer, - const ServerAddressList& addresses, grpc_combiner* combiner, + const grpc_lb_addresses* addresses, grpc_combiner* combiner, grpc_client_channel_factory* client_channel_factory, const grpc_channel_args& args) : SubchannelList(policy, tracer, addresses, combiner, @@ -333,6 +354,9 @@ bool RoundRobin::DoPickLocked(PickState* pick) { subchannel_list_->subchannel(next_ready_index); GPR_ASSERT(sd->connected_subchannel() != nullptr); pick->connected_subchannel = sd->connected_subchannel()->Ref(); + if (pick->user_data != nullptr) { + *pick->user_data = sd->user_data(); + } if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_INFO, "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, " @@ -643,9 +667,9 @@ void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current, void RoundRobin::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) { + const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); AutoChildRefsUpdater guard(this); - const ServerAddressList* addresses = FindServerAddressListChannelArg(&args); - if (addresses == nullptr) { + if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) { gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this); // If we don't have a current subchannel list, go into TRANSIENT_FAILURE. // Otherwise, keep using the current subchannel list (ignore this update). @@ -657,9 +681,11 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args, } return; } + grpc_lb_addresses* addresses = + static_cast<grpc_lb_addresses*>(arg->value.pointer.p); if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses", - this, addresses->size()); + this, addresses->num_addresses); } // Replace latest_pending_subchannel_list_. if (latest_pending_subchannel_list_ != nullptr) { @@ -670,7 +696,7 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args, } } latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>( - this, &grpc_lb_round_robin_trace, *addresses, combiner(), + this, &grpc_lb_round_robin_trace, addresses, combiner(), client_channel_factory(), args); // If we haven't started picking yet or the new list is empty, // immediately promote the new list to the current list. |