Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/xds/xds.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/xds/xds.cc | 645
1 file changed, 246 insertions(+), 399 deletions(-)
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
index 59923ab861..8787f5bcc2 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
@@ -26,30 +26,26 @@
 /// channel that uses pick_first to select from the list of balancer
 /// addresses.
 ///
-/// The first time the policy gets a request for a pick, a ping, or to exit
-/// the idle state, \a StartPickingLocked() is called. This method is
-/// responsible for instantiating the internal *streaming* call to the LB
-/// server (whichever address pick_first chose). The call will be complete
-/// when either the balancer sends status or when we cancel the call (e.g.,
-/// because we are shutting down). In needed, we retry the call. If we
-/// received at least one valid message from the server, a new call attempt
-/// will be made immediately; otherwise, we apply back-off delays between
-/// attempts.
+/// The first time the xDS policy gets a request for a pick or to exit the idle
+/// state, \a StartPickingLocked() is called. This method is responsible for
+/// instantiating the internal *streaming* call to the LB server (whichever
+/// address pick_first chose). The call will be complete when either the
+/// balancer sends status or when we cancel the call (e.g., because we are
+/// shutting down). In needed, we retry the call. If we received at least one
+/// valid message from the server, a new call attempt will be made immediately;
+/// otherwise, we apply back-off delays between attempts.
 ///
-/// We maintain an internal round_robin policy instance for distributing
+/// We maintain an internal child policy (round_robin) instance for distributing
 /// requests across backends. Whenever we receive a new serverlist from
-/// the balancer, we update the round_robin policy with the new list of
-/// addresses. If we cannot communicate with the balancer on startup,
-/// however, we may enter fallback mode, in which case we will populate
-/// the RR policy's addresses from the backend addresses returned by the
-/// resolver.
+/// the balancer, we update the child policy with the new list of
+/// addresses.
 ///
-/// Once an RR policy instance is in place (and getting updated as described),
-/// calls for a pick, a ping, or a cancellation will be serviced right
-/// away by forwarding them to the RR instance. Any time there's no RR
-/// policy available (i.e., right after the creation of the gRPCLB policy),
-/// pick and ping requests are added to a list of pending picks and pings
-/// to be flushed and serviced when the RR policy instance becomes available.
+/// Once a child policy instance is in place (and getting updated as
+/// described), calls for a pick, or a cancellation will be serviced right away
+/// by forwarding them to the child policy instance. Any time there's no child
+/// policy available (i.e., right after the creation of the xDS policy), pick
+/// requests are added to a list of pending picks to be flushed and serviced
+/// when the child policy instance becomes available.
 ///
 /// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
 /// high level design and details.
@@ -83,6 +79,7 @@
 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
 #include "src/core/ext/filters/client_channel/parse_address.h"
 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
+#include "src/core/ext/filters/client_channel/server_address.h"
 #include "src/core/ext/filters/client_channel/subchannel_index.h"
 #include "src/core/lib/backoff/backoff.h"
 #include "src/core/lib/channel/channel_args.h"
@@ -118,11 +115,16 @@ TraceFlag grpc_lb_xds_trace(false, "xds");
 
 namespace {
 
+constexpr char kXds[] = "xds_experimental";
+
 class XdsLb : public LoadBalancingPolicy {
  public:
-  XdsLb(const grpc_lb_addresses* addresses, const Args& args);
+  explicit XdsLb(const Args& args);
+
+  const char* name() const override { return kXds; }
 
-  void UpdateLocked(const grpc_channel_args& args) override;
+  void UpdateLocked(const grpc_channel_args& args,
+                    grpc_json* lb_config) override;
   bool PickLocked(PickState* pick, grpc_error** error) override;
   void CancelPickLocked(PickState* pick, grpc_error* error) override;
   void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
@@ -141,10 +143,10 @@ class XdsLb : public LoadBalancingPolicy {
 
  private:
   /// Linked list of pending pick requests. It stores all information needed to
-  /// eventually call (Round Robin's) pick() on them. They mainly stay pending
-  /// waiting for the RR policy to be created.
+  /// eventually call pick() on them. They mainly stay pending waiting for the
+  /// child policy to be created.
   ///
-  /// Note that when a pick is sent to the RR policy, we inject our own
+  /// Note that when a pick is sent to the child policy, we inject our own
   /// on_complete callback, so that we can intercept the result before
   /// invoking the original on_complete callback. This allows us to set the
   /// LB token metadata and add client_stats to the call context.
@@ -159,9 +161,6 @@ class XdsLb : public LoadBalancingPolicy {
    // Our on_complete closure and the original one.
    grpc_closure on_complete;
    grpc_closure* original_on_complete;
-    // The LB token associated with the pick. This is set via user_data in
-    // the pick.
-    grpc_mdelem lb_token;
    // Stats for client-side load reporting.
    RefCountedPtr<XdsLbClientStats> client_stats;
    // Next pending pick.
@@ -169,8 +168,7 @@ class XdsLb : public LoadBalancingPolicy {
  };
 
  /// Contains a call to the LB server and all the data related to the call.
-  class BalancerCallState
-      : public InternallyRefCountedWithTracing<BalancerCallState> {
+  class BalancerCallState : public InternallyRefCounted<BalancerCallState> {
   public:
    explicit BalancerCallState(
        RefCountedPtr<LoadBalancingPolicy> parent_xdslb_policy);
@@ -202,7 +200,6 @@ class XdsLb : public LoadBalancingPolicy {
    static bool LoadReportCountersAreZero(xds_grpclb_request* request);
 
    static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error);
-    static void ClientLoadReportDoneLocked(void* arg, grpc_error* error);
    static void OnInitialRequestSentLocked(void* arg, grpc_error* error);
    static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error);
    static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error);
@@ -261,23 +258,23 @@ class XdsLb : public LoadBalancingPolicy {
                                                    grpc_error* error);
 
  // Pending pick methods.
-  static void PendingPickSetMetadataAndContext(PendingPick* pp);
+  static void PendingPickCleanup(PendingPick* pp);
  PendingPick* PendingPickCreate(PickState* pick);
  void AddPendingPick(PendingPick* pp);
  static void OnPendingPickComplete(void* arg, grpc_error* error);
 
-  // Methods for dealing with the RR policy.
-  void CreateOrUpdateRoundRobinPolicyLocked();
-  grpc_channel_args* CreateRoundRobinPolicyArgsLocked();
-  void CreateRoundRobinPolicyLocked(const Args& args);
-  bool PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
-                                      grpc_error** error);
-  void UpdateConnectivityStateFromRoundRobinPolicyLocked(
-      grpc_error* rr_state_error);
-  static void OnRoundRobinConnectivityChangedLocked(void* arg,
-                                                    grpc_error* error);
-  static void OnRoundRobinRequestReresolutionLocked(void* arg,
-                                                    grpc_error* error);
+  // Methods for dealing with the child policy.
+  void CreateOrUpdateChildPolicyLocked();
+  grpc_channel_args* CreateChildPolicyArgsLocked();
+  void CreateChildPolicyLocked(const Args& args);
+  bool PickFromChildPolicyLocked(bool force_async, PendingPick* pp,
+                                 grpc_error** error);
+  void UpdateConnectivityStateFromChildPolicyLocked(
+      grpc_error* child_state_error);
+  static void OnChildPolicyConnectivityChangedLocked(void* arg,
+                                                     grpc_error* error);
+  static void OnChildPolicyRequestReresolutionLocked(void* arg,
+                                                     grpc_error* error);
 
  // Who the client is trying to communicate with.
  const char* server_name_ = nullptr;
@@ -324,67 +321,35 @@ class XdsLb : public LoadBalancingPolicy {
  // 0 means not using fallback.
  int lb_fallback_timeout_ms_ = 0;
  // The backend addresses from the resolver.
-  grpc_lb_addresses* fallback_backend_addresses_ = nullptr;
+  UniquePtr<ServerAddressList> fallback_backend_addresses_;
  // Fallback timer.
  bool fallback_timer_callback_pending_ = false;
  grpc_timer lb_fallback_timer_;
  grpc_closure lb_on_fallback_;
 
-  // Pending picks that are waiting on the RR policy's connectivity.
+  // Pending picks that are waiting on the xDS policy's connectivity.
  PendingPick* pending_picks_ = nullptr;
 
-  // The RR policy to use for the backends.
-  OrphanablePtr<LoadBalancingPolicy> rr_policy_;
-  grpc_connectivity_state rr_connectivity_state_;
-  grpc_closure on_rr_connectivity_changed_;
-  grpc_closure on_rr_request_reresolution_;
+  // The policy to use for the backends.
+  OrphanablePtr<LoadBalancingPolicy> child_policy_;
+  grpc_connectivity_state child_connectivity_state_;
+  grpc_closure on_child_connectivity_changed_;
+  grpc_closure on_child_request_reresolution_;
 };
 
 //
 // serverlist parsing code
 //
 
-// vtable for LB tokens in grpc_lb_addresses
-void* lb_token_copy(void* token) {
-  return token == nullptr
-             ? nullptr
-             : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload;
-}
-void lb_token_destroy(void* token) {
-  if (token != nullptr) {
-    GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token});
-  }
-}
-int lb_token_cmp(void* token1, void* token2) {
-  if (token1 > token2) return 1;
-  if (token1 < token2) return -1;
-  return 0;
-}
-const grpc_lb_user_data_vtable lb_token_vtable = {
-    lb_token_copy, lb_token_destroy, lb_token_cmp};
-
 // Returns the backend addresses extracted from the given addresses.
-grpc_lb_addresses* ExtractBackendAddresses(const grpc_lb_addresses* addresses) {
-  // First pass: count the number of backend addresses.
-  size_t num_backends = 0;
-  for (size_t i = 0; i < addresses->num_addresses; ++i) {
-    if (!addresses->addresses[i].is_balancer) {
-      ++num_backends;
+UniquePtr<ServerAddressList> ExtractBackendAddresses(
+    const ServerAddressList& addresses) {
+  auto backend_addresses = MakeUnique<ServerAddressList>();
+  for (size_t i = 0; i < addresses.size(); ++i) {
+    if (!addresses[i].IsBalancer()) {
+      backend_addresses->emplace_back(addresses[i]);
    }
  }
-  // Second pass: actually populate the addresses and (empty) LB tokens.
-  grpc_lb_addresses* backend_addresses =
-      grpc_lb_addresses_create(num_backends, &lb_token_vtable);
-  size_t num_copied = 0;
-  for (size_t i = 0; i < addresses->num_addresses; ++i) {
-    if (addresses->addresses[i].is_balancer) continue;
-    const grpc_resolved_address* addr = &addresses->addresses[i].address;
-    grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr,
-                                  addr->len, false /* is_balancer */,
-                                  nullptr /* balancer_name */,
-                                  (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload);
-    ++num_copied;
-  }
  return backend_addresses;
 }
 
@@ -434,56 +399,17 @@ void ParseServer(const xds_grpclb_server* server, grpc_resolved_address* addr) {
 }
 
 // Returns addresses extracted from \a serverlist.
-grpc_lb_addresses* ProcessServerlist(const xds_grpclb_serverlist* serverlist) {
-  size_t num_valid = 0;
-  /* first pass: count how many are valid in order to allocate the necessary
-   * memory in a single block */
+UniquePtr<ServerAddressList> ProcessServerlist(
+    const xds_grpclb_serverlist* serverlist) {
+  auto addresses = MakeUnique<ServerAddressList>();
  for (size_t i = 0; i < serverlist->num_servers; ++i) {
-    if (IsServerValid(serverlist->servers[i], i, true)) ++num_valid;
-  }
-  grpc_lb_addresses* lb_addresses =
-      grpc_lb_addresses_create(num_valid, &lb_token_vtable);
-  /* second pass: actually populate the addresses and LB tokens (aka user data
-   * to the outside world) to be read by the RR policy during its creation.
-   * Given that the validity tests are very cheap, they are performed again
-   * instead of marking the valid ones during the first pass, as this would
-   * incurr in an allocation due to the arbitrary number of server */
-  size_t addr_idx = 0;
-  for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
-    const xds_grpclb_server* server = serverlist->servers[sl_idx];
-    if (!IsServerValid(serverlist->servers[sl_idx], sl_idx, false)) continue;
-    GPR_ASSERT(addr_idx < num_valid);
-    /* address processing */
+    const xds_grpclb_server* server = serverlist->servers[i];
+    if (!IsServerValid(serverlist->servers[i], i, false)) continue;
    grpc_resolved_address addr;
    ParseServer(server, &addr);
-    /* lb token processing */
-    void* user_data;
-    if (server->has_load_balance_token) {
-      const size_t lb_token_max_length =
-          GPR_ARRAY_SIZE(server->load_balance_token);
-      const size_t lb_token_length =
-          strnlen(server->load_balance_token, lb_token_max_length);
-      grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
-          server->load_balance_token, lb_token_length);
-      user_data =
-          (void*)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr)
-              .payload;
-    } else {
-      char* uri = grpc_sockaddr_to_uri(&addr);
-      gpr_log(GPR_INFO,
-              "Missing LB token for backend address '%s'. The empty token will "
-              "be used instead",
-              uri);
-      gpr_free(uri);
-      user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
-    }
-    grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
-                                  false /* is_balancer */,
-                                  nullptr /* balancer_name */, user_data);
-    ++addr_idx;
+    addresses->emplace_back(addr, nullptr);
  }
-  GPR_ASSERT(addr_idx == num_valid);
-  return lb_addresses;
+  return addresses;
 }
 
 //
@@ -492,7 +418,7 @@ grpc_lb_addresses* ProcessServerlist(const xds_grpclb_serverlist* serverlist) {
 
 XdsLb::BalancerCallState::BalancerCallState(
    RefCountedPtr<LoadBalancingPolicy> parent_xdslb_policy)
-    : InternallyRefCountedWithTracing<BalancerCallState>(&grpc_lb_xds_trace),
+    : InternallyRefCounted<BalancerCallState>(&grpc_lb_xds_trace),
      xdslb_policy_(std::move(parent_xdslb_policy)) {
  GPR_ASSERT(xdslb_policy_ != nullptr);
  GPR_ASSERT(!xdslb_policy()->shutting_down_);
@@ -671,6 +597,7 @@ bool XdsLb::BalancerCallState::LoadReportCountersAreZero(
         (drop_entries == nullptr || drop_entries->empty());
 }
 
+// TODO(vpowar): Use LRS to send the client Load Report.
 void XdsLb::BalancerCallState::SendClientLoadReportLocked() {
  // Construct message payload.
  GPR_ASSERT(send_message_payload_ == nullptr);
@@ -688,38 +615,8 @@ void XdsLb::BalancerCallState::SendClientLoadReportLocked() {
  } else {
    last_client_load_report_counters_were_zero_ = false;
  }
-  grpc_slice request_payload_slice = xds_grpclb_request_encode(request);
-  send_message_payload_ =
-      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
-  grpc_slice_unref_internal(request_payload_slice);
+  // TODO(vpowar): Send the report on LRS stream.
  xds_grpclb_request_destroy(request);
-  // Send the report.
-  grpc_op op;
-  memset(&op, 0, sizeof(op));
-  op.op = GRPC_OP_SEND_MESSAGE;
-  op.data.send_message.send_message = send_message_payload_;
-  GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDoneLocked,
-                    this, grpc_combiner_scheduler(xdslb_policy()->combiner()));
-  grpc_call_error call_error = grpc_call_start_batch_and_execute(
-      lb_call_, &op, 1, &client_load_report_closure_);
-  if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
-    gpr_log(GPR_ERROR, "[xdslb %p] call_error=%d", xdslb_policy_.get(),
-            call_error);
-    GPR_ASSERT(GRPC_CALL_OK == call_error);
-  }
-}
-
-void XdsLb::BalancerCallState::ClientLoadReportDoneLocked(void* arg,
-                                                          grpc_error* error) {
-  BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
-  XdsLb* xdslb_policy = lb_calld->xdslb_policy();
-  grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
-  lb_calld->send_message_payload_ = nullptr;
-  if (error != GRPC_ERROR_NONE || lb_calld != xdslb_policy->lb_calld_.get()) {
-    lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
-    return;
-  }
-  lb_calld->ScheduleNextClientLoadReportLocked();
 }
 
 void XdsLb::BalancerCallState::OnInitialRequestSentLocked(void* arg,
@@ -823,8 +720,7 @@ void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked(
        xds_grpclb_destroy_serverlist(xdslb_policy->serverlist_);
      } else {
        /* or dispose of the fallback */
-        grpc_lb_addresses_destroy(xdslb_policy->fallback_backend_addresses_);
-        xdslb_policy->fallback_backend_addresses_ = nullptr;
+        xdslb_policy->fallback_backend_addresses_.reset();
        if (xdslb_policy->fallback_timer_callback_pending_) {
          grpc_timer_cancel(&xdslb_policy->lb_fallback_timer_);
        }
@@ -833,7 +729,7 @@
      // serverlist instance will be destroyed either upon the next
      // update or when the XdsLb instance is destroyed.
      xdslb_policy->serverlist_ = serverlist;
-      xdslb_policy->CreateOrUpdateRoundRobinPolicyLocked();
+      xdslb_policy->CreateOrUpdateChildPolicyLocked();
    }
  } else {
    if (grpc_lb_xds_trace.enabled()) {
@@ -866,7 +762,7 @@
        &lb_calld->lb_on_balancer_message_received_);
    GPR_ASSERT(GRPC_CALL_OK == call_error);
  } else {
-    lb_calld->Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown");
+    lb_calld->Unref(DEBUG_LOCATION, "on_message_received+xds_shutdown");
  }
 }
 
@@ -910,31 +806,15 @@
 // helper code for creating balancer channel
 //
 
-grpc_lb_addresses* ExtractBalancerAddresses(
-    const grpc_lb_addresses* addresses) {
-  size_t num_grpclb_addrs = 0;
-  for (size_t i = 0; i < addresses->num_addresses; ++i) {
-    if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
-  }
-  // There must be at least one balancer address, or else the
-  // client_channel would not have chosen this LB policy.
-  GPR_ASSERT(num_grpclb_addrs > 0);
-  grpc_lb_addresses* lb_addresses =
-      grpc_lb_addresses_create(num_grpclb_addrs, nullptr);
-  size_t lb_addresses_idx = 0;
-  for (size_t i = 0; i < addresses->num_addresses; ++i) {
-    if (!addresses->addresses[i].is_balancer) continue;
-    if (GPR_UNLIKELY(addresses->addresses[i].user_data != nullptr)) {
-      gpr_log(GPR_ERROR,
-              "This LB policy doesn't support user data. It will be ignored");
+UniquePtr<ServerAddressList> ExtractBalancerAddresses(
+    const ServerAddressList& addresses) {
+  auto balancer_addresses = MakeUnique<ServerAddressList>();
+  for (size_t i = 0; i < addresses.size(); ++i) {
+    if (addresses[i].IsBalancer()) {
+      balancer_addresses->emplace_back(addresses[i]);
    }
-    grpc_lb_addresses_set_address(
-        lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr,
-        addresses->addresses[i].address.len, false /* is balancer */,
-        addresses->addresses[i].balancer_name, nullptr /* user data */);
  }
-  GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx);
-  return lb_addresses;
+  return balancer_addresses;
 }
 
 /* Returns the channel args for the LB channel, used to create a bidirectional
@@ -944,12 +824,13 @@ grpc_lb_addresses* ExtractBalancerAddresses(
 *   - \a addresses: corresponding to the balancers.
 *   - \a response_generator: in order to propagate updates from the resolver
 *     above the grpclb policy.
- *   - \a args: other args inherited from the grpclb policy. */
+ *   - \a args: other args inherited from the xds policy. */
 grpc_channel_args* BuildBalancerChannelArgs(
-    const grpc_lb_addresses* addresses,
+    const ServerAddressList& addresses,
    FakeResolverResponseGenerator* response_generator,
    const grpc_channel_args* args) {
-  grpc_lb_addresses* lb_addresses = ExtractBalancerAddresses(addresses);
+  UniquePtr<ServerAddressList> balancer_addresses =
+      ExtractBalancerAddresses(addresses);
  // Channel args to remove.
  static const char* args_to_remove[] = {
      // LB policy name, since we want to use the default (pick_first) in
@@ -966,10 +847,10 @@ grpc_channel_args* BuildBalancerChannelArgs(
      // resolver will have is_balancer=false, whereas our own addresses have
      // is_balancer=true. We need the LB channel to return addresses with
      // is_balancer=false so that it does not wind up recursively using the
-      // grpclb LB policy, as per the special case logic in client_channel.c.
-      GRPC_ARG_LB_ADDRESSES,
+      // xds LB policy, as per the special case logic in client_channel.c.
+      GRPC_ARG_SERVER_ADDRESS_LIST,
      // The fake resolver response generator, because we are replacing it
-      // with the one from the grpclb policy, used to propagate updates to
+      // with the one from the xds policy, used to propagate updates to
      // the LB channel.
      GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
      // The LB channel should use the authority indicated by the target
@@ -983,15 +864,15 @@ grpc_channel_args* BuildBalancerChannelArgs(
  };
  // Channel args to add.
  const grpc_arg args_to_add[] = {
-      // New LB addresses.
+      // New server address list.
      // Note that we pass these in both when creating the LB channel
      // and via the fake resolver. The latter is what actually gets used.
-      grpc_lb_addresses_create_channel_arg(lb_addresses),
+      CreateServerAddressListChannelArg(balancer_addresses.get()),
      // The fake resolver response generator, which we use to inject
      // address updates into the LB channel.
      grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
          response_generator),
-      // A channel arg indicating the target is a grpclb load balancer.
+      // A channel arg indicating the target is a xds load balancer.
      grpc_channel_arg_integer_create(
          const_cast<char*>(GRPC_ARG_ADDRESS_IS_XDS_LOAD_BALANCER), 1),
      // A channel arg indicating this is an internal channels, aka it is
@@ -1004,18 +885,15 @@ grpc_channel_args* BuildBalancerChannelArgs(
      args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add,
      GPR_ARRAY_SIZE(args_to_add));
  // Make any necessary modifications for security.
-  new_args = grpc_lb_policy_xds_modify_lb_channel_args(new_args);
-  // Clean up.
-  grpc_lb_addresses_destroy(lb_addresses);
-  return new_args;
+  return grpc_lb_policy_xds_modify_lb_channel_args(new_args);
 }
 
 //
 // ctor and dtor
 //
 
-XdsLb::XdsLb(const grpc_lb_addresses* addresses,
-             const LoadBalancingPolicy::Args& args)
+// TODO(vishalpowar): Use lb_config in args to configure LB policy.
+XdsLb::XdsLb(const LoadBalancingPolicy::Args& args)
    : LoadBalancingPolicy(args),
      response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
      lb_call_backoff_(
@@ -1031,11 +909,11 @@ XdsLb::XdsLb(const grpc_lb_addresses* addresses,
  GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
                    &XdsLb::OnBalancerChannelConnectivityChangedLocked, this,
                    grpc_combiner_scheduler(args.combiner));
-  GRPC_CLOSURE_INIT(&on_rr_connectivity_changed_,
-                    &XdsLb::OnRoundRobinConnectivityChangedLocked, this,
+  GRPC_CLOSURE_INIT(&on_child_connectivity_changed_,
+                    &XdsLb::OnChildPolicyConnectivityChangedLocked, this,
                    grpc_combiner_scheduler(args.combiner));
-  GRPC_CLOSURE_INIT(&on_rr_request_reresolution_,
-                    &XdsLb::OnRoundRobinRequestReresolutionLocked, this,
+  GRPC_CLOSURE_INIT(&on_child_request_reresolution_,
+                    &XdsLb::OnChildPolicyRequestReresolutionLocked, this,
                    grpc_combiner_scheduler(args.combiner));
  grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, "xds");
  // Record server name.
@@ -1071,9 +949,6 @@ XdsLb::~XdsLb() {
  if (serverlist_ != nullptr) {
    xds_grpclb_destroy_serverlist(serverlist_);
  }
-  if (fallback_backend_addresses_ != nullptr) {
-    grpc_lb_addresses_destroy(fallback_backend_addresses_);
-  }
  grpc_subchannel_index_unref();
 }
 
@@ -1087,7 +962,7 @@ void XdsLb::ShutdownLocked() {
  if (fallback_timer_callback_pending_) {
    grpc_timer_cancel(&lb_fallback_timer_);
  }
-  rr_policy_.reset();
+  child_policy_.reset();
  TryReresolutionLocked(&grpc_lb_xds_trace, GRPC_ERROR_CANCELLED);
  // We destroy the LB channel here instead of in our destructor because
  // destroying the channel triggers a last callback to
@@ -1100,7 +975,7 @@
    gpr_mu_unlock(&lb_channel_mu_);
  }
  grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
-                              GRPC_ERROR_REF(error), "grpclb_shutdown");
+                              GRPC_ERROR_REF(error), "xds_shutdown");
  // Clear pending picks.
  PendingPick* pp;
  while ((pp = pending_picks_) != nullptr) {
@@ -1121,7 +996,6 @@ void XdsLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
  while ((pp = pending_picks_) != nullptr) {
    pending_picks_ = pp->next;
    pp->pick->on_complete = pp->original_on_complete;
-    pp->pick->user_data = nullptr;
    grpc_error* error = GRPC_ERROR_NONE;
    if (new_policy->PickLocked(pp->pick, &error)) {
      // Synchronous return; schedule closure.
@@ -1133,13 +1007,13 @@ void XdsLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
 
 // Cancel a specific pending pick.
 //
-// A grpclb pick progresses as follows:
-// - If there's a Round Robin policy (rr_policy_) available, it'll be
-//   handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From
-//   that point onwards, it'll be RR's responsibility. For cancellations, that
-//   implies the pick needs also be cancelled by the RR instance.
-// - Otherwise, without an RR instance, picks stay pending at this policy's
-//   level (grpclb), inside the pending_picks_ list. To cancel these,
+// A pick progresses as follows:
+// - If there's a child policy available, it'll be handed over to child policy
+//   (in CreateChildPolicyLocked()). From that point onwards, it'll be the
+//   child policy's responsibility. For cancellations, that implies the pick
+//   needs to be also cancelled by the child policy instance.
+// - Otherwise, without a child policy instance, picks stay pending at this
+//   policy's level (xds), inside the pending_picks_ list. To cancel these,
 // we invoke the completion closure and set the pick's connected
 // subchannel to nullptr right here.
 void XdsLb::CancelPickLocked(PickState* pick, grpc_error* error) {
@@ -1159,21 +1033,21 @@ void XdsLb::CancelPickLocked(PickState* pick, grpc_error* error) {
    }
    pp = next;
  }
-  if (rr_policy_ != nullptr) {
-    rr_policy_->CancelPickLocked(pick, GRPC_ERROR_REF(error));
+  if (child_policy_ != nullptr) {
+    child_policy_->CancelPickLocked(pick, GRPC_ERROR_REF(error));
  }
  GRPC_ERROR_UNREF(error);
 }
 
 // Cancel all pending picks.
 //
-// A grpclb pick progresses as follows:
-// - If there's a Round Robin policy (rr_policy_) available, it'll be
-//   handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From
-//   that point onwards, it'll be RR's responsibility. For cancellations, that
-//   implies the pick needs also be cancelled by the RR instance.
-// - Otherwise, without an RR instance, picks stay pending at this policy's
-//   level (grpclb), inside the pending_picks_ list. To cancel these,
+// A pick progresses as follows:
+// - If there's a child policy available, it'll be handed over to child policy
+//   (in CreateChildPolicyLocked()). From that point onwards, it'll be the
+//   child policy's responsibility. For cancellations, that implies the pick
+//   needs to be also cancelled by the child policy instance.
+// - Otherwise, without a child policy instance, picks stay pending at this
+//   policy's level (xds), inside the pending_picks_ list. To cancel these,
 // we invoke the completion closure and set the pick's connected
 // subchannel to nullptr right here.
 void XdsLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
@@ -1183,7 +1057,7 @@ void XdsLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
  pending_picks_ = nullptr;
  while (pp != nullptr) {
    PendingPick* next = pp->next;
-    if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) ==
+    if ((*pp->pick->initial_metadata_flags & initial_metadata_flags_mask) ==
        initial_metadata_flags_eq) {
      // Note: pp is deleted in this callback.
      GRPC_CLOSURE_SCHED(&pp->on_complete,
@@ -1195,10 +1069,10 @@ void XdsLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
    }
    pp = next;
  }
-  if (rr_policy_ != nullptr) {
-    rr_policy_->CancelMatchingPicksLocked(initial_metadata_flags_mask,
-                                          initial_metadata_flags_eq,
-                                          GRPC_ERROR_REF(error));
+  if (child_policy_ != nullptr) {
+    child_policy_->CancelMatchingPicksLocked(initial_metadata_flags_mask,
+                                             initial_metadata_flags_eq,
+                                             GRPC_ERROR_REF(error));
  }
  GRPC_ERROR_UNREF(error);
 }
@@ -1213,22 +1087,21 @@ void XdsLb::ResetBackoffLocked() {
  if (lb_channel_ != nullptr) {
    grpc_channel_reset_connect_backoff(lb_channel_);
  }
-  if (rr_policy_ != nullptr) {
-    rr_policy_->ResetBackoffLocked();
+  if (child_policy_ != nullptr) {
+    child_policy_->ResetBackoffLocked();
  }
 }
 
 bool XdsLb::PickLocked(PickState* pick, grpc_error** error) {
  PendingPick* pp = PendingPickCreate(pick);
  bool pick_done = false;
-  if (rr_policy_ != nullptr) {
+  if (child_policy_ != nullptr) {
    if (grpc_lb_xds_trace.enabled()) {
-      gpr_log(GPR_INFO, "[xdslb %p] about to PICK from RR %p", this,
-              rr_policy_.get());
+      gpr_log(GPR_INFO, "[xdslb %p] about to PICK from policy %p", this,
+              child_policy_.get());
    }
-    pick_done =
-        PickFromRoundRobinPolicyLocked(false /* force_async */, pp, error);
-  } else {  // rr_policy_ == NULL
+    pick_done = PickFromChildPolicyLocked(false /* force_async */, pp, error);
+  } else {  // child_policy_ == NULL
    if (pick->on_complete == nullptr) {
      *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
          "No pick result available but synchronous result required.");
@@ -1236,7 +1109,7 @@ bool XdsLb::PickLocked(PickState* pick, grpc_error** error) {
    } else {
      if (grpc_lb_xds_trace.enabled()) {
        gpr_log(GPR_INFO,
-                "[xdslb %p] No RR policy. Adding to grpclb's pending picks",
+                "[xdslb %p] No child policy. Adding to xds's pending picks",
                this);
      }
      AddPendingPick(pp);
@@ -1251,8 +1124,8 @@
 
 void XdsLb::FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
                                     channelz::ChildRefsList* child_channels) {
-  // delegate to the RoundRobin to fill the children subchannels.
-  rr_policy_->FillChildRefsForChannelz(child_subchannels, child_channels);
+  // delegate to the child_policy_ to fill the children subchannels.
+  child_policy_->FillChildRefsForChannelz(child_subchannels, child_channels);
  MutexLock lock(&lb_channel_mu_);
  if (lb_channel_ != nullptr) {
    grpc_core::channelz::ChannelNode* channel_node =
@@ -1275,21 +1148,16 @@ void XdsLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
 }
 
 void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
-  const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
-  if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
+  const ServerAddressList* addresses = FindServerAddressListChannelArg(&args);
+  if (addresses == nullptr) {
    // Ignore this update.
    gpr_log(GPR_ERROR,
            "[xdslb %p] No valid LB addresses channel arg in update, ignoring.",
            this);
    return;
  }
-  const grpc_lb_addresses* addresses =
-      static_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
  // Update fallback address list.
-  if (fallback_backend_addresses_ != nullptr) {
-    grpc_lb_addresses_destroy(fallback_backend_addresses_);
-  }
-  fallback_backend_addresses_ = ExtractBackendAddresses(addresses);
+  fallback_backend_addresses_ = ExtractBackendAddresses(*addresses);
  // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
  // since we use this to trigger the client_load_reporting filter.
  static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
@@ -1300,7 +1168,7 @@ void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
      &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
  // Construct args for balancer channel.
  grpc_channel_args* lb_channel_args =
-      BuildBalancerChannelArgs(addresses, response_generator_.get(), &args);
+      BuildBalancerChannelArgs(*addresses, response_generator_.get(), &args);
  // Create balancer channel if needed.
  if (lb_channel_ == nullptr) {
    char* uri_str;
@@ -1319,12 +1187,15 @@ void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
  grpc_channel_args_destroy(lb_channel_args);
 }
 
-void XdsLb::UpdateLocked(const grpc_channel_args& args) {
+// TODO(vishalpowar): Use lb_config to configure LB policy.
+void XdsLb::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) {
  ProcessChannelArgsLocked(args);
-  // Note: We have disabled fallback mode in the code, so we don't need to
-  // handle fallback address changes.
+  // Update the existing child policy.
+  // Note: We have disabled fallback mode in the code, so this child policy must
+  // have been created from a serverlist.
  // TODO(vpowar): Handle the fallback_address changes when we add support for
  // fallback in xDS.
+  if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked();
  // Start watching the LB channel connectivity for connection, if not
  // already doing so.
  if (!watching_lb_channel_) {
@@ -1445,8 +1316,8 @@ void XdsLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
  XdsLb* xdslb_policy = static_cast<XdsLb*>(arg);
  if (xdslb_policy->shutting_down_) goto done;
  // Re-initialize the lb_call. This should also take care of updating the
-  // embedded RR policy. Note that the current RR policy, if any, will stay in
-  // effect until an update from the new lb_call is received.
+  // child policy. Note that the current child policy, if any, will
+  // stay in effect until an update from the new lb_call is received.
  switch (xdslb_policy->lb_channel_connectivity_) {
    case GRPC_CHANNEL_CONNECTING:
    case GRPC_CHANNEL_TRANSIENT_FAILURE: {
@@ -1488,37 +1359,15 @@
 // PendingPick
 //
 
-// Adds lb_token of selected subchannel (address) to the call's initial
-// metadata.
-grpc_error* AddLbTokenToInitialMetadata(
-    grpc_mdelem lb_token, grpc_linked_mdelem* lb_token_mdelem_storage,
-    grpc_metadata_batch* initial_metadata) {
-  GPR_ASSERT(lb_token_mdelem_storage != nullptr);
-  GPR_ASSERT(!GRPC_MDISNULL(lb_token));
-  return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
-                                      lb_token);
-}
-
 // Destroy function used when embedding client stats in call context.
 void DestroyClientStats(void* arg) {
  static_cast<XdsLbClientStats*>(arg)->Unref();
 }
 
-void XdsLb::PendingPickSetMetadataAndContext(PendingPick* pp) {
-  /* if connected_subchannel is nullptr, no pick has been made by the RR
-   * policy (e.g., all addresses failed to connect). There won't be any
-   * user_data/token available */
+void XdsLb::PendingPickCleanup(PendingPick* pp) {
+  // If connected_subchannel is nullptr, no pick has been made by the
+  // child policy (e.g., all addresses failed to connect).
  if (pp->pick->connected_subchannel != nullptr) {
-    if (GPR_LIKELY(!GRPC_MDISNULL(pp->lb_token))) {
-      AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(pp->lb_token),
-                                  &pp->pick->lb_token_mdelem_storage,
-                                  pp->pick->initial_metadata);
-    } else {
-      gpr_log(GPR_ERROR,
-              "[xdslb %p] No LB token for connected subchannel pick %p",
-              pp->xdslb_policy, pp->pick);
-      abort();
-    }
    // Pass on client stats via context. Passes ownership of the reference.
    if (pp->client_stats != nullptr) {
      pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
@@ -1532,11 +1381,11 @@ void XdsLb::PendingPickSetMetadataAndContext(PendingPick* pp) {
 }
 
 /* The \a on_complete closure passed as part of the pick requires keeping a
- * reference to its associated round robin instance. We wrap this closure in
- * order to unref the round robin instance upon its invocation */
+ * reference to its associated child policy instance. We wrap this closure in
+ * order to unref the child policy instance upon its invocation */
 void XdsLb::OnPendingPickComplete(void* arg, grpc_error* error) {
  PendingPick* pp = static_cast<PendingPick*>(arg);
-  PendingPickSetMetadataAndContext(pp);
+  PendingPickCleanup(pp);
  GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error));
  Delete(pp);
 }
@@ -1558,26 +1407,24 @@ void XdsLb::AddPendingPick(PendingPick* pp) {
 }
 
 //
-// code for interacting with the RR policy
+// code for interacting with the child policy
 //
 
-// Performs a pick over \a rr_policy_. Given that a pick can return
+// Performs a pick over \a child_policy_. Given that a pick can return
 // immediately (ignoring its completion callback), we need to perform the
 // cleanups this callback would otherwise be responsible for.
 // If \a force_async is true, then we will manually schedule the
 // completion callback even if the pick is available immediately.
-bool XdsLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
-                                           grpc_error** error) {
-  // Set client_stats and user_data.
+bool XdsLb::PickFromChildPolicyLocked(bool force_async, PendingPick* pp,
+                                      grpc_error** error) {
+  // Set client_stats.
  if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
    pp->client_stats = lb_calld_->client_stats()->Ref();
  }
-  GPR_ASSERT(pp->pick->user_data == nullptr);
-  pp->pick->user_data = (void**)&pp->lb_token;
-  // Pick via the RR policy.
-  bool pick_done = rr_policy_->PickLocked(pp->pick, error);
+  // Pick via the child policy.
+  bool pick_done = child_policy_->PickLocked(pp->pick, error);
  if (pick_done) {
-    PendingPickSetMetadataAndContext(pp);
+    PendingPickCleanup(pp);
    if (force_async) {
      GRPC_CLOSURE_SCHED(pp->original_on_complete, *error);
      *error = GRPC_ERROR_NONE;
@@ -1586,71 +1433,72 @@ bool XdsLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
    Delete(pp);
  }
  // else, the pending pick will be registered and taken care of by the
-  // pending pick list inside the RR policy. Eventually,
+  // pending pick list inside the child policy. Eventually,
  // OnPendingPickComplete() will be called, which will (among other
  // things) add the LB token to the call's initial metadata.
  return pick_done;
 }
 
-void XdsLb::CreateRoundRobinPolicyLocked(const Args& args) {
-  GPR_ASSERT(rr_policy_ == nullptr);
-  rr_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
+void XdsLb::CreateChildPolicyLocked(const Args& args) {
+  GPR_ASSERT(child_policy_ == nullptr);
+  child_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
      "round_robin", args);
-  if (GPR_UNLIKELY(rr_policy_ == nullptr)) {
-    gpr_log(GPR_ERROR, "[xdslb %p] Failure creating a RoundRobin policy", this);
+  if (GPR_UNLIKELY(child_policy_ == nullptr)) {
+    gpr_log(GPR_ERROR, "[xdslb %p] Failure creating a child policy", this);
    return;
  }
  // TODO(roth): We currently track this ref manually. Once the new
  // ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
-  auto self = Ref(DEBUG_LOCATION, "on_rr_reresolution_requested");
+  auto self = Ref(DEBUG_LOCATION, "on_child_reresolution_requested");
  self.release();
-  rr_policy_->SetReresolutionClosureLocked(&on_rr_request_reresolution_);
-  grpc_error* rr_state_error = nullptr;
-  rr_connectivity_state_ = rr_policy_->CheckConnectivityLocked(&rr_state_error);
-  // Connectivity state is a function of the RR policy updated/created.
-  UpdateConnectivityStateFromRoundRobinPolicyLocked(rr_state_error);
-  // Add the gRPC LB's interested_parties pollset_set to that of the newly
-  // created RR policy. This will make the RR policy progress upon activity on
-  // gRPC LB, which in turn is tied to the application's call.
-  grpc_pollset_set_add_pollset_set(rr_policy_->interested_parties(),
+  child_policy_->SetReresolutionClosureLocked(&on_child_request_reresolution_);
+  grpc_error* child_state_error = nullptr;
+  child_connectivity_state_ =
+      child_policy_->CheckConnectivityLocked(&child_state_error);
+  // Connectivity state is a function of the child policy updated/created.
+  UpdateConnectivityStateFromChildPolicyLocked(child_state_error);
+  // Add the xDS's interested_parties pollset_set to that of the newly created
+  // child policy. This will make the child policy progress upon activity on
+  // xDS LB, which in turn is tied to the application's call.
+  grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
                                   interested_parties());
-  // Subscribe to changes to the connectivity of the new RR.
+  // Subscribe to changes to the connectivity of the new child policy.
  // TODO(roth): We currently track this ref manually. Once the new
  // ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
-  self = Ref(DEBUG_LOCATION, "on_rr_connectivity_changed");
+  self = Ref(DEBUG_LOCATION, "on_child_connectivity_changed");
  self.release();
-  rr_policy_->NotifyOnStateChangeLocked(&rr_connectivity_state_,
-                                        &on_rr_connectivity_changed_);
-  rr_policy_->ExitIdleLocked();
-  // Send pending picks to RR policy.
+  child_policy_->NotifyOnStateChangeLocked(&child_connectivity_state_,
+                                           &on_child_connectivity_changed_);
+  child_policy_->ExitIdleLocked();
+  // Send pending picks to child policy.
  PendingPick* pp;
  while ((pp = pending_picks_)) {
    pending_picks_ = pp->next;
    if (grpc_lb_xds_trace.enabled()) {
-      gpr_log(GPR_INFO,
-              "[xdslb %p] Pending pick about to (async) PICK from RR %p", this,
-              rr_policy_.get());
+      gpr_log(
+          GPR_INFO,
+          "[xdslb %p] Pending pick about to (async) PICK from child policy %p",
+          this, child_policy_.get());
    }
    grpc_error* error = GRPC_ERROR_NONE;
-    PickFromRoundRobinPolicyLocked(true /* force_async */, pp, &error);
+    PickFromChildPolicyLocked(true /* force_async */, pp, &error);
  }
 }
 
-grpc_channel_args* XdsLb::CreateRoundRobinPolicyArgsLocked() {
-  grpc_lb_addresses* addresses;
+grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() {
  bool is_backend_from_grpclb_load_balancer = false;
  // This should never be invoked if we do not have serverlist_, as fallback
  // mode is disabled for xDS plugin.
  GPR_ASSERT(serverlist_ != nullptr);
  GPR_ASSERT(serverlist_->num_servers > 0);
-  addresses = ProcessServerlist(serverlist_);
-  is_backend_from_grpclb_load_balancer = true;
+  UniquePtr<ServerAddressList> addresses = ProcessServerlist(serverlist_);
  GPR_ASSERT(addresses != nullptr);
-  // Replace the LB addresses in the channel args that we pass down to
+  is_backend_from_grpclb_load_balancer = true;
+  // Replace the server address list in the channel args that we pass down to
  // the subchannel.
-  static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
+  static const char* keys_to_remove[] = {GRPC_ARG_SERVER_ADDRESS_LIST};
  const grpc_arg args_to_add[] = {
-      grpc_lb_addresses_create_channel_arg(addresses),
+      CreateServerAddressListChannelArg(addresses.get()),
      // A channel arg indicating if the target is a backend inferred from a
      // grpclb load balancer.
      grpc_channel_arg_integer_create(
@@ -1660,70 +1508,71 @@ grpc_channel_args* XdsLb::CreateRoundRobinPolicyArgsLocked() {
  grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove(
      args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add,
      GPR_ARRAY_SIZE(args_to_add));
-  grpc_lb_addresses_destroy(addresses);
  return args;
 }
 
-void XdsLb::CreateOrUpdateRoundRobinPolicyLocked() {
+void XdsLb::CreateOrUpdateChildPolicyLocked() {
  if (shutting_down_) return;
-  grpc_channel_args* args = CreateRoundRobinPolicyArgsLocked();
+  grpc_channel_args* args = CreateChildPolicyArgsLocked();
  GPR_ASSERT(args != nullptr);
-  if (rr_policy_ != nullptr) {
+  if (child_policy_ != nullptr) {
    if (grpc_lb_xds_trace.enabled()) {
-      gpr_log(GPR_INFO, "[xdslb %p] Updating RR policy %p", this,
-              rr_policy_.get());
+      gpr_log(GPR_INFO, "[xdslb %p] Updating the child policy %p", this,
+              child_policy_.get());
    }
-    rr_policy_->UpdateLocked(*args);
+    // TODO(vishalpowar): Pass the correct LB config.
+    child_policy_->UpdateLocked(*args, nullptr);
  } else {
    LoadBalancingPolicy::Args lb_policy_args;
    lb_policy_args.combiner = combiner();
    lb_policy_args.client_channel_factory = client_channel_factory();
    lb_policy_args.args = args;
-    CreateRoundRobinPolicyLocked(lb_policy_args);
+    CreateChildPolicyLocked(lb_policy_args);
    if (grpc_lb_xds_trace.enabled()) {
-      gpr_log(GPR_INFO, "[xdslb %p] Created new RR policy %p", this,
-              rr_policy_.get());
+      gpr_log(GPR_INFO, "[xdslb %p] Created a new child policy %p", this,
+              child_policy_.get());
    }
  }
  grpc_channel_args_destroy(args);
 }
 
-void XdsLb::OnRoundRobinRequestReresolutionLocked(void* arg,
-                                                  grpc_error* error) {
+void XdsLb::OnChildPolicyRequestReresolutionLocked(void* arg,
+                                                   grpc_error* error) {
  XdsLb* xdslb_policy = static_cast<XdsLb*>(arg);
  if (xdslb_policy->shutting_down_ || error != GRPC_ERROR_NONE) {
-    xdslb_policy->Unref(DEBUG_LOCATION, "on_rr_reresolution_requested");
+    xdslb_policy->Unref(DEBUG_LOCATION, "on_child_reresolution_requested");
    return;
  }
  if (grpc_lb_xds_trace.enabled()) {
-    gpr_log(
-        GPR_INFO,
-        "[xdslb %p] Re-resolution requested from the internal RR policy (%p).",
-        xdslb_policy, xdslb_policy->rr_policy_.get());
+    gpr_log(GPR_INFO,
+            "[xdslb %p] Re-resolution requested from child policy "
+            "(%p).",
+            xdslb_policy, xdslb_policy->child_policy_.get());
  }
  // If we are talking to a balancer, we expect to get updated addresses form
-  // the balancer, so we can ignore the re-resolution request from the RR
-  // policy. Otherwise, handle the re-resolution request using the
-  // grpclb policy's original re-resolution closure.
+  // the balancer, so we can ignore the re-resolution request from the child
+  // policy.
+  // Otherwise, handle the re-resolution request using the xds policy's
+  // original re-resolution closure.
  if (xdslb_policy->lb_calld_ == nullptr ||
      !xdslb_policy->lb_calld_->seen_initial_response()) {
    xdslb_policy->TryReresolutionLocked(&grpc_lb_xds_trace, GRPC_ERROR_NONE);
  }
-  // Give back the wrapper closure to the RR policy.
-  xdslb_policy->rr_policy_->SetReresolutionClosureLocked(
-      &xdslb_policy->on_rr_request_reresolution_);
+  // Give back the wrapper closure to the child policy.
+  xdslb_policy->child_policy_->SetReresolutionClosureLocked(
+      &xdslb_policy->on_child_request_reresolution_);
 }
 
-void XdsLb::UpdateConnectivityStateFromRoundRobinPolicyLocked(
-    grpc_error* rr_state_error) {
+void XdsLb::UpdateConnectivityStateFromChildPolicyLocked(
+    grpc_error* child_state_error) {
  const grpc_connectivity_state curr_glb_state =
      grpc_connectivity_state_check(&state_tracker_);
  /* The new connectivity status is a function of the previous one and the new
-   * input coming from the status of the RR policy.
+   * input coming from the status of the child policy.
   *
-   *  current state (grpclb's)
+   *  current state (xds's)
   *  |
-   *  v  || I  |  C  |  R  |  TF  |  SD  |  <- new state (RR's)
+   *  v  || I  |  C  |  R  |  TF  |  SD  |  <- new state (child policy's)
   * ===++====+=====+=====+======+======+
   *  I || I  |  C  |  R  | [I]  | [I]  |
   * ---++----+-----+-----+------+------+
@@ -1736,52 +1585,51 @@
   * SD || NA |  NA |  NA |  NA  |  NA  | (*)
   * ---++----+-----+-----+------+------+
   *
-   * A [STATE] indicates that the old RR policy is kept. In those cases, STATE
-   * is the current state of grpclb, which is left untouched.
+   * A [STATE] indicates that the old child policy is kept. In those cases,
+   * STATE is the current state of xds, which is left untouched.
   *
   * In summary, if the new state is TRANSIENT_FAILURE or SHUTDOWN, stick to
-   * the previous RR instance.
+   * the previous child policy instance.
   *
   * Note that the status is never updated to SHUTDOWN as a result of calling
   * this function. Only glb_shutdown() has the power to set that state.
   *
   * (*) This function mustn't be called during shutting down. */
  GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
-  switch (rr_connectivity_state_) {
+  switch (child_connectivity_state_) {
    case GRPC_CHANNEL_TRANSIENT_FAILURE:
    case GRPC_CHANNEL_SHUTDOWN:
-      GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE);
+      GPR_ASSERT(child_state_error != GRPC_ERROR_NONE);
      break;
    case GRPC_CHANNEL_IDLE:
    case GRPC_CHANNEL_CONNECTING:
    case GRPC_CHANNEL_READY:
-      GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE);
+      GPR_ASSERT(child_state_error == GRPC_ERROR_NONE);
  }
  if (grpc_lb_xds_trace.enabled()) {
-    gpr_log(
-        GPR_INFO,
-        "[xdslb %p] Setting grpclb's state to %s from new RR policy %p state.",
-        this, grpc_connectivity_state_name(rr_connectivity_state_),
-        rr_policy_.get());
+    gpr_log(GPR_INFO,
+            "[xdslb %p] Setting xds's state to %s from child policy %p state.",
+            this, grpc_connectivity_state_name(child_connectivity_state_),
+            child_policy_.get());
  }
-  grpc_connectivity_state_set(&state_tracker_, rr_connectivity_state_,
-                              rr_state_error,
+  grpc_connectivity_state_set(&state_tracker_, child_connectivity_state_,
+                              child_state_error,
                              "update_lb_connectivity_status_locked");
 }
 
-void XdsLb::OnRoundRobinConnectivityChangedLocked(void* arg,
-                                                  grpc_error* error) {
+void XdsLb::OnChildPolicyConnectivityChangedLocked(void* arg,
+                                                   grpc_error* error) {
  XdsLb* xdslb_policy = static_cast<XdsLb*>(arg);
  if (xdslb_policy->shutting_down_) {
-    xdslb_policy->Unref(DEBUG_LOCATION, "on_rr_connectivity_changed");
+    xdslb_policy->Unref(DEBUG_LOCATION, "on_child_connectivity_changed");
    return;
  }
-  xdslb_policy->UpdateConnectivityStateFromRoundRobinPolicyLocked(
+  xdslb_policy->UpdateConnectivityStateFromChildPolicyLocked(
      GRPC_ERROR_REF(error));
-  // Resubscribe. Reuse the "on_rr_connectivity_changed" ref.
-  xdslb_policy->rr_policy_->NotifyOnStateChangeLocked(
-      &xdslb_policy->rr_connectivity_state_,
-      &xdslb_policy->on_rr_connectivity_changed_);
+  // Resubscribe. Reuse the "on_child_connectivity_changed" ref.
+  xdslb_policy->child_policy_->NotifyOnStateChangeLocked(
+      &xdslb_policy->child_connectivity_state_,
+      &xdslb_policy->on_child_connectivity_changed_);
 }
 
 //
@@ -1793,22 +1641,21 @@ class XdsFactory : public LoadBalancingPolicyFactory {
  OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
      const LoadBalancingPolicy::Args& args) const override {
    /* Count the number of gRPC-LB addresses. There must be at least one. */
-    const grpc_arg* arg =
-        grpc_channel_args_find(args.args, GRPC_ARG_LB_ADDRESSES);
-    if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
-      return nullptr;
-    }
-    grpc_lb_addresses* addresses =
-        static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
-    size_t num_grpclb_addrs = 0;
-    for (size_t i = 0; i < addresses->num_addresses; ++i) {
-      if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
+    const ServerAddressList* addresses =
+        FindServerAddressListChannelArg(args.args);
+    if (addresses == nullptr) return nullptr;
+    bool found_balancer_address = false;
+    for (size_t i = 0; i < addresses->size(); ++i) {
+      if ((*addresses)[i].IsBalancer()) {
+        found_balancer_address = true;
+        break;
+      }
    }
-    if (num_grpclb_addrs == 0) return nullptr;
-    return OrphanablePtr<LoadBalancingPolicy>(New<XdsLb>(addresses, args));
+    if (!found_balancer_address) return nullptr;
+    return OrphanablePtr<LoadBalancingPolicy>(New<XdsLb>(args));
  }
 
-  const char* name() const override { return "xds"; }
+  const char* name() const override { return kXds; }
 };
 
 }  // namespace