diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/xds/xds.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/xds/xds.cc | 247 |
1 files changed, 59 insertions, 188 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index faedc0a919..3c25de2386 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -79,6 +79,7 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" @@ -116,7 +117,7 @@ namespace { class XdsLb : public LoadBalancingPolicy { public: - XdsLb(const grpc_lb_addresses* addresses, const Args& args); + explicit XdsLb(const Args& args); void UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) override; @@ -156,9 +157,6 @@ class XdsLb : public LoadBalancingPolicy { // Our on_complete closure and the original one. grpc_closure on_complete; grpc_closure* original_on_complete; - // The LB token associated with the pick. This is set via user_data in - // the pick. - grpc_mdelem lb_token; // Stats for client-side load reporting. RefCountedPtr<XdsLbClientStats> client_stats; // Next pending pick. @@ -256,7 +254,7 @@ class XdsLb : public LoadBalancingPolicy { grpc_error* error); // Pending pick methods. - static void PendingPickSetMetadataAndContext(PendingPick* pp); + static void PendingPickCleanup(PendingPick* pp); PendingPick* PendingPickCreate(PickState* pick); void AddPendingPick(PendingPick* pp); static void OnPendingPickComplete(void* arg, grpc_error* error); @@ -319,7 +317,7 @@ class XdsLb : public LoadBalancingPolicy { // 0 means not using fallback. int lb_fallback_timeout_ms_ = 0; // The backend addresses from the resolver. - grpc_lb_addresses* fallback_backend_addresses_ = nullptr; + UniquePtr<ServerAddressList> fallback_backend_addresses_; // Fallback timer. bool fallback_timer_callback_pending_ = false; grpc_timer lb_fallback_timer_; @@ -339,47 +337,15 @@ class XdsLb : public LoadBalancingPolicy { // serverlist parsing code // -// vtable for LB tokens in grpc_lb_addresses -void* lb_token_copy(void* token) { - return token == nullptr - ? nullptr - : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload; -} -void lb_token_destroy(void* token) { - if (token != nullptr) { - GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token}); - } -} -int lb_token_cmp(void* token1, void* token2) { - if (token1 > token2) return 1; - if (token1 < token2) return -1; - return 0; -} -const grpc_lb_user_data_vtable lb_token_vtable = { - lb_token_copy, lb_token_destroy, lb_token_cmp}; - // Returns the backend addresses extracted from the given addresses. -grpc_lb_addresses* ExtractBackendAddresses(const grpc_lb_addresses* addresses) { - // First pass: count the number of backend addresses. - size_t num_backends = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (!addresses->addresses[i].is_balancer) { - ++num_backends; +UniquePtr<ServerAddressList> ExtractBackendAddresses( + const ServerAddressList& addresses) { + auto backend_addresses = MakeUnique<ServerAddressList>(); + for (size_t i = 0; i < addresses.size(); ++i) { + if (!addresses[i].IsBalancer()) { + backend_addresses->emplace_back(addresses[i]); } } - // Second pass: actually populate the addresses and (empty) LB tokens. - grpc_lb_addresses* backend_addresses = - grpc_lb_addresses_create(num_backends, &lb_token_vtable); - size_t num_copied = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (addresses->addresses[i].is_balancer) continue; - const grpc_resolved_address* addr = &addresses->addresses[i].address; - grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr, - addr->len, false /* is_balancer */, - nullptr /* balancer_name */, - (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload); - ++num_copied; - } return backend_addresses; } @@ -429,56 +395,17 @@ void ParseServer(const xds_grpclb_server* server, grpc_resolved_address* addr) { } // Returns addresses extracted from \a serverlist. -grpc_lb_addresses* ProcessServerlist(const xds_grpclb_serverlist* serverlist) { - size_t num_valid = 0; - /* first pass: count how many are valid in order to allocate the necessary - * memory in a single block */ +UniquePtr<ServerAddressList> ProcessServerlist( + const xds_grpclb_serverlist* serverlist) { + auto addresses = MakeUnique<ServerAddressList>(); for (size_t i = 0; i < serverlist->num_servers; ++i) { - if (IsServerValid(serverlist->servers[i], i, true)) ++num_valid; - } - grpc_lb_addresses* lb_addresses = - grpc_lb_addresses_create(num_valid, &lb_token_vtable); - /* second pass: actually populate the addresses and LB tokens (aka user data - * to the outside world) to be read by the child policy during its creation. - * Given that the validity tests are very cheap, they are performed again - * instead of marking the valid ones during the first pass, as this would - * incurr in an allocation due to the arbitrary number of server */ - size_t addr_idx = 0; - for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) { - const xds_grpclb_server* server = serverlist->servers[sl_idx]; - if (!IsServerValid(serverlist->servers[sl_idx], sl_idx, false)) continue; - GPR_ASSERT(addr_idx < num_valid); - /* address processing */ + const xds_grpclb_server* server = serverlist->servers[i]; + if (!IsServerValid(serverlist->servers[i], i, false)) continue; grpc_resolved_address addr; ParseServer(server, &addr); - /* lb token processing */ - void* user_data; - if (server->has_load_balance_token) { - const size_t lb_token_max_length = - GPR_ARRAY_SIZE(server->load_balance_token); - const size_t lb_token_length = - strnlen(server->load_balance_token, lb_token_max_length); - grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer( - server->load_balance_token, lb_token_length); - user_data = - (void*)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr) - .payload; - } else { - char* uri = grpc_sockaddr_to_uri(&addr); - gpr_log(GPR_INFO, - "Missing LB token for backend address '%s'. The empty token will " - "be used instead", - uri); - gpr_free(uri); - user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload; - } - grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len, - false /* is_balancer */, - nullptr /* balancer_name */, user_data); - ++addr_idx; + addresses->emplace_back(addr, nullptr); } - GPR_ASSERT(addr_idx == num_valid); - return lb_addresses; + return addresses; } // @@ -789,8 +716,7 @@ void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked( xds_grpclb_destroy_serverlist(xdslb_policy->serverlist_); } else { /* or dispose of the fallback */ - grpc_lb_addresses_destroy(xdslb_policy->fallback_backend_addresses_); - xdslb_policy->fallback_backend_addresses_ = nullptr; + xdslb_policy->fallback_backend_addresses_.reset(); if (xdslb_policy->fallback_timer_callback_pending_) { grpc_timer_cancel(&xdslb_policy->lb_fallback_timer_); } @@ -876,31 +802,15 @@ void XdsLb::BalancerCallState::OnBalancerStatusReceivedLocked( // helper code for creating balancer channel // -grpc_lb_addresses* ExtractBalancerAddresses( - const grpc_lb_addresses* addresses) { - size_t num_grpclb_addrs = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs; - } - // There must be at least one balancer address, or else the - // client_channel would not have chosen this LB policy. - GPR_ASSERT(num_grpclb_addrs > 0); - grpc_lb_addresses* lb_addresses = - grpc_lb_addresses_create(num_grpclb_addrs, nullptr); - size_t lb_addresses_idx = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (!addresses->addresses[i].is_balancer) continue; - if (GPR_UNLIKELY(addresses->addresses[i].user_data != nullptr)) { - gpr_log(GPR_ERROR, - "This LB policy doesn't support user data. It will be ignored"); +UniquePtr<ServerAddressList> ExtractBalancerAddresses( + const ServerAddressList& addresses) { + auto balancer_addresses = MakeUnique<ServerAddressList>(); + for (size_t i = 0; i < addresses.size(); ++i) { + if (addresses[i].IsBalancer()) { + balancer_addresses->emplace_back(addresses[i]); } - grpc_lb_addresses_set_address( - lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr, - addresses->addresses[i].address.len, false /* is balancer */, - addresses->addresses[i].balancer_name, nullptr /* user data */); } - GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx); - return lb_addresses; + return balancer_addresses; } /* Returns the channel args for the LB channel, used to create a bidirectional @@ -912,10 +822,11 @@ grpc_lb_addresses* ExtractBalancerAddresses( * above the grpclb policy. * - \a args: other args inherited from the xds policy. */ grpc_channel_args* BuildBalancerChannelArgs( - const grpc_lb_addresses* addresses, + const ServerAddressList& addresses, FakeResolverResponseGenerator* response_generator, const grpc_channel_args* args) { - grpc_lb_addresses* lb_addresses = ExtractBalancerAddresses(addresses); + UniquePtr<ServerAddressList> balancer_addresses = + ExtractBalancerAddresses(addresses); // Channel args to remove. static const char* args_to_remove[] = { // LB policy name, since we want to use the default (pick_first) in @@ -933,7 +844,7 @@ grpc_channel_args* BuildBalancerChannelArgs( // is_balancer=true. We need the LB channel to return addresses with // is_balancer=false so that it does not wind up recursively using the // xds LB policy, as per the special case logic in client_channel.c. - GRPC_ARG_LB_ADDRESSES, + GRPC_ARG_SERVER_ADDRESS_LIST, // The fake resolver response generator, because we are replacing it // with the one from the xds policy, used to propagate updates to // the LB channel. @@ -949,10 +860,10 @@ grpc_channel_args* BuildBalancerChannelArgs( }; // Channel args to add. const grpc_arg args_to_add[] = { - // New LB addresses. + // New server address list. // Note that we pass these in both when creating the LB channel // and via the fake resolver. The latter is what actually gets used. - grpc_lb_addresses_create_channel_arg(lb_addresses), + CreateServerAddressListChannelArg(balancer_addresses.get()), // The fake resolver response generator, which we use to inject // address updates into the LB channel. grpc_core::FakeResolverResponseGenerator::MakeChannelArg( @@ -970,10 +881,7 @@ grpc_channel_args* BuildBalancerChannelArgs( args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add, GPR_ARRAY_SIZE(args_to_add)); // Make any necessary modifications for security. - new_args = grpc_lb_policy_xds_modify_lb_channel_args(new_args); - // Clean up. - grpc_lb_addresses_destroy(lb_addresses); - return new_args; + return grpc_lb_policy_xds_modify_lb_channel_args(new_args); } // @@ -981,8 +889,7 @@ grpc_channel_args* BuildBalancerChannelArgs( // // TODO(vishalpowar): Use lb_config in args to configure LB policy. -XdsLb::XdsLb(const grpc_lb_addresses* addresses, - const LoadBalancingPolicy::Args& args) +XdsLb::XdsLb(const LoadBalancingPolicy::Args& args) : LoadBalancingPolicy(args), response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()), lb_call_backoff_( @@ -1038,9 +945,6 @@ XdsLb::~XdsLb() { if (serverlist_ != nullptr) { xds_grpclb_destroy_serverlist(serverlist_); } - if (fallback_backend_addresses_ != nullptr) { - grpc_lb_addresses_destroy(fallback_backend_addresses_); - } grpc_subchannel_index_unref(); } @@ -1088,7 +992,6 @@ void XdsLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { while ((pp = pending_picks_) != nullptr) { pending_picks_ = pp->next; pp->pick->on_complete = pp->original_on_complete; - pp->pick->user_data = nullptr; grpc_error* error = GRPC_ERROR_NONE; if (new_policy->PickLocked(pp->pick, &error)) { // Synchronous return; schedule closure. @@ -1241,21 +1144,16 @@ void XdsLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current, } void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { - const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); - if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) { + const ServerAddressList* addresses = FindServerAddressListChannelArg(&args); + if (addresses == nullptr) { // Ignore this update. gpr_log(GPR_ERROR, "[xdslb %p] No valid LB addresses channel arg in update, ignoring.", this); return; } - const grpc_lb_addresses* addresses = - static_cast<const grpc_lb_addresses*>(arg->value.pointer.p); // Update fallback address list. - if (fallback_backend_addresses_ != nullptr) { - grpc_lb_addresses_destroy(fallback_backend_addresses_); - } - fallback_backend_addresses_ = ExtractBackendAddresses(addresses); + fallback_backend_addresses_ = ExtractBackendAddresses(*addresses); // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, // since we use this to trigger the client_load_reporting filter. static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME}; @@ -1266,7 +1164,7 @@ void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); // Construct args for balancer channel. grpc_channel_args* lb_channel_args = - BuildBalancerChannelArgs(addresses, response_generator_.get(), &args); + BuildBalancerChannelArgs(*addresses, response_generator_.get(), &args); // Create balancer channel if needed. if (lb_channel_ == nullptr) { char* uri_str; @@ -1457,37 +1355,15 @@ void XdsLb::OnBalancerChannelConnectivityChangedLocked(void* arg, // PendingPick // -// Adds lb_token of selected subchannel (address) to the call's initial -// metadata. -grpc_error* AddLbTokenToInitialMetadata( - grpc_mdelem lb_token, grpc_linked_mdelem* lb_token_mdelem_storage, - grpc_metadata_batch* initial_metadata) { - GPR_ASSERT(lb_token_mdelem_storage != nullptr); - GPR_ASSERT(!GRPC_MDISNULL(lb_token)); - return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage, - lb_token); -} - // Destroy function used when embedding client stats in call context. void DestroyClientStats(void* arg) { static_cast<XdsLbClientStats*>(arg)->Unref(); } -void XdsLb::PendingPickSetMetadataAndContext(PendingPick* pp) { - /* if connected_subchannel is nullptr, no pick has been made by the - * child policy (e.g., all addresses failed to connect). There won't be any - * user_data/token available */ +void XdsLb::PendingPickCleanup(PendingPick* pp) { + // If connected_subchannel is nullptr, no pick has been made by the + // child policy (e.g., all addresses failed to connect). if (pp->pick->connected_subchannel != nullptr) { - if (GPR_LIKELY(!GRPC_MDISNULL(pp->lb_token))) { - AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(pp->lb_token), - &pp->pick->lb_token_mdelem_storage, - pp->pick->initial_metadata); - } else { - gpr_log(GPR_ERROR, - "[xdslb %p] No LB token for connected subchannel pick %p", - pp->xdslb_policy, pp->pick); - abort(); - } // Pass on client stats via context. Passes ownership of the reference. if (pp->client_stats != nullptr) { pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value = @@ -1505,7 +1381,7 @@ void XdsLb::PendingPickSetMetadataAndContext(PendingPick* pp) { * order to unref the child policy instance upon its invocation */ void XdsLb::OnPendingPickComplete(void* arg, grpc_error* error) { PendingPick* pp = static_cast<PendingPick*>(arg); - PendingPickSetMetadataAndContext(pp); + PendingPickCleanup(pp); GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error)); Delete(pp); } @@ -1537,16 +1413,14 @@ void XdsLb::AddPendingPick(PendingPick* pp) { // completion callback even if the pick is available immediately. bool XdsLb::PickFromChildPolicyLocked(bool force_async, PendingPick* pp, grpc_error** error) { - // Set client_stats and user_data. + // Set client_stats. if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) { pp->client_stats = lb_calld_->client_stats()->Ref(); } - GPR_ASSERT(pp->pick->user_data == nullptr); - pp->pick->user_data = (void**)&pp->lb_token; // Pick via the child policy. bool pick_done = child_policy_->PickLocked(pp->pick, error); if (pick_done) { - PendingPickSetMetadataAndContext(pp); + PendingPickCleanup(pp); if (force_async) { GRPC_CLOSURE_SCHED(pp->original_on_complete, *error); *error = GRPC_ERROR_NONE; @@ -1608,20 +1482,19 @@ void XdsLb::CreateChildPolicyLocked(const Args& args) { } grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() { - grpc_lb_addresses* addresses; bool is_backend_from_grpclb_load_balancer = false; // This should never be invoked if we do not have serverlist_, as fallback // mode is disabled for xDS plugin. GPR_ASSERT(serverlist_ != nullptr); GPR_ASSERT(serverlist_->num_servers > 0); - addresses = ProcessServerlist(serverlist_); - is_backend_from_grpclb_load_balancer = true; + UniquePtr<ServerAddressList> addresses = ProcessServerlist(serverlist_); GPR_ASSERT(addresses != nullptr); - // Replace the LB addresses in the channel args that we pass down to + is_backend_from_grpclb_load_balancer = true; + // Replace the server address list in the channel args that we pass down to // the subchannel. - static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES}; + static const char* keys_to_remove[] = {GRPC_ARG_SERVER_ADDRESS_LIST}; const grpc_arg args_to_add[] = { - grpc_lb_addresses_create_channel_arg(addresses), + CreateServerAddressListChannelArg(addresses.get()), // A channel arg indicating if the target is a backend inferred from a // grpclb load balancer. grpc_channel_arg_integer_create( @@ -1631,7 +1504,6 @@ grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() { grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove( args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add, GPR_ARRAY_SIZE(args_to_add)); - grpc_lb_addresses_destroy(addresses); return args; } @@ -1765,19 +1637,18 @@ class XdsFactory : public LoadBalancingPolicyFactory { OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( const LoadBalancingPolicy::Args& args) const override { /* Count the number of gRPC-LB addresses. There must be at least one. */ - const grpc_arg* arg = - grpc_channel_args_find(args.args, GRPC_ARG_LB_ADDRESSES); - if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { - return nullptr; - } - grpc_lb_addresses* addresses = - static_cast<grpc_lb_addresses*>(arg->value.pointer.p); - size_t num_grpclb_addrs = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs; + const ServerAddressList* addresses = + FindServerAddressListChannelArg(args.args); + if (addresses == nullptr) return nullptr; + bool found_balancer_address = false; + for (size_t i = 0; i < addresses->size(); ++i) { + if ((*addresses)[i].IsBalancer()) { + found_balancer_address = true; + break; + } } - if (num_grpclb_addrs == 0) return nullptr; - return OrphanablePtr<LoadBalancingPolicy>(New<XdsLb>(addresses, args)); + if (!found_balancer_address) return nullptr; + return OrphanablePtr<LoadBalancingPolicy>(New<XdsLb>(args)); } const char* name() const override { return "xds_experimental"; } |