diff options
author | Mark D. Roth <roth@google.com> | 2018-12-07 12:41:51 -0800 |
---|---|---|
committer | Mark D. Roth <roth@google.com> | 2018-12-07 12:41:51 -0800 |
commit | dedff37b4f569e888836b0cf92a9d6de2ddec326 (patch) | |
tree | f4640b9fd8893b735a790aa69e1a6bfbedcabc6f /src/core | |
parent | 680a3546815d564fca8f2abe0cae441e78c25798 (diff) |
Allow encoding arbitrary channel args on a per-address basis.
Diffstat (limited to 'src/core')
32 files changed, 618 insertions, 849 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index ebc412b468..70aac47231 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -38,6 +38,7 @@ #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/ext/filters/client_channel/resolver_result_parsing.h" #include "src/core/ext/filters/client_channel/retry_throttle.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/deadline/deadline_filter.h" #include "src/core/lib/backoff/backoff.h" @@ -62,6 +63,7 @@ #include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/status_metadata.h" +using grpc_core::ServerAddressList; using grpc_core::internal::ClientChannelMethodParams; using grpc_core::internal::ClientChannelMethodParamsTable; using grpc_core::internal::ProcessedResolverResult; @@ -383,16 +385,10 @@ static void create_new_lb_policy_locked( static void maybe_add_trace_message_for_address_changes_locked( channel_data* chand, TraceStringVector* trace_strings) { - int resolution_contains_addresses = false; - const grpc_arg* channel_arg = - grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES); - if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) { - grpc_lb_addresses* addresses = - static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p); - if (addresses->num_addresses > 0) { - resolution_contains_addresses = true; - } - } + const ServerAddressList* addresses = + grpc_core::FindServerAddressListChannelArg(chand->resolver_result); + const bool resolution_contains_addresses = + addresses != nullptr && addresses->size() > 0; if (!resolution_contains_addresses && chand->previous_resolution_contained_addresses) { trace_strings->push_back(gpr_strdup("Address list became empty")); diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 7034da6249..6b76fe5d5d 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -55,7 +55,7 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> { grpc_client_channel_factory* client_channel_factory = nullptr; /// Channel args from the resolver. /// Note that the LB policy gets the set of addresses from the - /// GRPC_ARG_LB_ADDRESSES channel arg. + /// GRPC_ARG_SERVER_ADDRESS_LIST channel arg. grpc_channel_args* args = nullptr; /// Load balancing config from the resolver. grpc_json* lb_config = nullptr; @@ -80,11 +80,6 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> { /// Will be populated with context to pass to the subchannel call, if /// needed. grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT] = {}; - /// Upon success, \a *user_data will be set to whatever opaque information - /// may need to be propagated from the LB policy, or nullptr if not needed. - // TODO(roth): As part of revamping our metadata APIs, try to find a - // way to clean this up and C++-ify it. - void** user_data = nullptr; /// Next pointer. For internal use by LB policy. PickState* next = nullptr; }; @@ -95,7 +90,7 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> { /// Updates the policy with a new set of \a args and a new \a lb_config from /// the resolver. Note that the LB policy gets the set of addresses from the - /// GRPC_ARG_LB_ADDRESSES channel arg. + /// GRPC_ARG_SERVER_ADDRESS_LIST channel arg. virtual void UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) GRPC_ABSTRACT; diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index a46579c7f7..a9a5965ed1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -84,6 +84,7 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" @@ -113,6 +114,8 @@ #define GRPC_GRPCLB_RECONNECT_JITTER 0.2 #define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000 +#define GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN "grpc.grpclb_address_lb_token" + namespace grpc_core { TraceFlag grpc_lb_glb_trace(false, "glb"); @@ -121,7 +124,7 @@ namespace { class GrpcLb : public LoadBalancingPolicy { public: - GrpcLb(const grpc_lb_addresses* addresses, const Args& args); + explicit GrpcLb(const Args& args); void UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) override; @@ -161,9 +164,6 @@ class GrpcLb : public LoadBalancingPolicy { // Our on_complete closure and the original one. grpc_closure on_complete; grpc_closure* original_on_complete; - // The LB token associated with the pick. This is set via user_data in - // the pick. - grpc_mdelem lb_token; // Stats for client-side load reporting. RefCountedPtr<GrpcLbClientStats> client_stats; // Next pending pick. @@ -329,7 +329,7 @@ class GrpcLb : public LoadBalancingPolicy { // 0 means not using fallback. int lb_fallback_timeout_ms_ = 0; // The backend addresses from the resolver. - grpc_lb_addresses* fallback_backend_addresses_ = nullptr; + UniquePtr<ServerAddressList> fallback_backend_addresses_; // Fallback timer. bool fallback_timer_callback_pending_ = false; grpc_timer lb_fallback_timer_; @@ -349,7 +349,7 @@ class GrpcLb : public LoadBalancingPolicy { // serverlist parsing code // -// vtable for LB tokens in grpc_lb_addresses +// vtable for LB token channel arg. void* lb_token_copy(void* token) { return token == nullptr ? nullptr @@ -361,38 +361,11 @@ void lb_token_destroy(void* token) { } } int lb_token_cmp(void* token1, void* token2) { - if (token1 > token2) return 1; - if (token1 < token2) return -1; - return 0; + return GPR_ICMP(token1, token2); } -const grpc_lb_user_data_vtable lb_token_vtable = { +const grpc_arg_pointer_vtable lb_token_arg_vtable = { lb_token_copy, lb_token_destroy, lb_token_cmp}; -// Returns the backend addresses extracted from the given addresses. -grpc_lb_addresses* ExtractBackendAddresses(const grpc_lb_addresses* addresses) { - // First pass: count the number of backend addresses. - size_t num_backends = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (!addresses->addresses[i].is_balancer) { - ++num_backends; - } - } - // Second pass: actually populate the addresses and (empty) LB tokens. - grpc_lb_addresses* backend_addresses = - grpc_lb_addresses_create(num_backends, &lb_token_vtable); - size_t num_copied = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (addresses->addresses[i].is_balancer) continue; - const grpc_resolved_address* addr = &addresses->addresses[i].address; - grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr, - addr->len, false /* is_balancer */, - nullptr /* balancer_name */, - (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload); - ++num_copied; - } - return backend_addresses; -} - bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) { if (server->drop) return false; const grpc_grpclb_ip_address* ip = &server->ip_address; @@ -440,30 +413,16 @@ void ParseServer(const grpc_grpclb_server* server, } // Returns addresses extracted from \a serverlist. -grpc_lb_addresses* ProcessServerlist(const grpc_grpclb_serverlist* serverlist) { - size_t num_valid = 0; - /* first pass: count how many are valid in order to allocate the necessary - * memory in a single block */ +ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) { + ServerAddressList addresses; for (size_t i = 0; i < serverlist->num_servers; ++i) { - if (IsServerValid(serverlist->servers[i], i, true)) ++num_valid; - } - grpc_lb_addresses* lb_addresses = - grpc_lb_addresses_create(num_valid, &lb_token_vtable); - /* second pass: actually populate the addresses and LB tokens (aka user data - * to the outside world) to be read by the RR policy during its creation. - * Given that the validity tests are very cheap, they are performed again - * instead of marking the valid ones during the first pass, as this would - * incurr in an allocation due to the arbitrary number of server */ - size_t addr_idx = 0; - for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) { - const grpc_grpclb_server* server = serverlist->servers[sl_idx]; - if (!IsServerValid(serverlist->servers[sl_idx], sl_idx, false)) continue; - GPR_ASSERT(addr_idx < num_valid); - /* address processing */ + const grpc_grpclb_server* server = serverlist->servers[i]; + if (!IsServerValid(serverlist->servers[i], i, false)) continue; + // Address processing. grpc_resolved_address addr; ParseServer(server, &addr); - /* lb token processing */ - void* user_data; + // LB token processing. + void* lb_token; if (server->has_load_balance_token) { const size_t lb_token_max_length = GPR_ARRAY_SIZE(server->load_balance_token); @@ -471,7 +430,7 @@ grpc_lb_addresses* ProcessServerlist(const grpc_grpclb_serverlist* serverlist) { strnlen(server->load_balance_token, lb_token_max_length); grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer( server->load_balance_token, lb_token_length); - user_data = + lb_token = (void*)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr) .payload; } else { @@ -481,15 +440,16 @@ grpc_lb_addresses* ProcessServerlist(const grpc_grpclb_serverlist* serverlist) { "be used instead", uri); gpr_free(uri); - user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload; + lb_token = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload; } - grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len, - false /* is_balancer */, - nullptr /* balancer_name */, user_data); - ++addr_idx; - } - GPR_ASSERT(addr_idx == num_valid); - return lb_addresses; + // Add address. + grpc_arg arg = grpc_channel_arg_pointer_create( + const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), lb_token, + &lb_token_arg_vtable); + grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1); + addresses.emplace_back(addr, args); + } + return addresses; } // @@ -829,8 +789,7 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_); } else { // Dispose of the fallback. - grpc_lb_addresses_destroy(grpclb_policy->fallback_backend_addresses_); - grpclb_policy->fallback_backend_addresses_ = nullptr; + grpclb_policy->fallback_backend_addresses_.reset(); if (grpclb_policy->fallback_timer_callback_pending_) { grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_); } @@ -910,31 +869,25 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked( // helper code for creating balancer channel // -grpc_lb_addresses* ExtractBalancerAddresses( - const grpc_lb_addresses* addresses) { - size_t num_grpclb_addrs = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs; - } - // There must be at least one balancer address, or else the - // client_channel would not have chosen this LB policy. - GPR_ASSERT(num_grpclb_addrs > 0); - grpc_lb_addresses* lb_addresses = - grpc_lb_addresses_create(num_grpclb_addrs, nullptr); - size_t lb_addresses_idx = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (!addresses->addresses[i].is_balancer) continue; - if (GPR_UNLIKELY(addresses->addresses[i].user_data != nullptr)) { - gpr_log(GPR_ERROR, - "This LB policy doesn't support user data. It will be ignored"); +ServerAddressList ExtractBalancerAddresses(const ServerAddressList& addresses) { + ServerAddressList balancer_addresses; + for (size_t i = 0; i < addresses.size(); ++i) { + if (addresses[i].IsBalancer()) { + // Strip out the is_balancer channel arg, since we don't want to + // recursively use the grpclb policy in the channel used to talk to + // the balancers. Note that we do NOT strip out the balancer_name + // channel arg, since we need that to set the authority correctly + // to talk to the balancers. + static const char* args_to_remove[] = { + GRPC_ARG_ADDRESS_IS_BALANCER, + }; + balancer_addresses.emplace_back( + addresses[i].address(), + grpc_channel_args_copy_and_remove(addresses[i].args(), args_to_remove, + GPR_ARRAY_SIZE(args_to_remove))); } - grpc_lb_addresses_set_address( - lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr, - addresses->addresses[i].address.len, false /* is balancer */, - addresses->addresses[i].balancer_name, nullptr /* user data */); } - GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx); - return lb_addresses; + return balancer_addresses; } /* Returns the channel args for the LB channel, used to create a bidirectional @@ -946,10 +899,10 @@ grpc_lb_addresses* ExtractBalancerAddresses( * above the grpclb policy. * - \a args: other args inherited from the grpclb policy. */ grpc_channel_args* BuildBalancerChannelArgs( - const grpc_lb_addresses* addresses, + const ServerAddressList& addresses, FakeResolverResponseGenerator* response_generator, const grpc_channel_args* args) { - grpc_lb_addresses* lb_addresses = ExtractBalancerAddresses(addresses); + ServerAddressList balancer_addresses = ExtractBalancerAddresses(addresses); // Channel args to remove. static const char* args_to_remove[] = { // LB policy name, since we want to use the default (pick_first) in @@ -967,7 +920,7 @@ grpc_channel_args* BuildBalancerChannelArgs( // is_balancer=true. We need the LB channel to return addresses with // is_balancer=false so that it does not wind up recursively using the // grpclb LB policy, as per the special case logic in client_channel.c. - GRPC_ARG_LB_ADDRESSES, + GRPC_ARG_SERVER_ADDRESS_LIST, // The fake resolver response generator, because we are replacing it // with the one from the grpclb policy, used to propagate updates to // the LB channel. @@ -983,10 +936,10 @@ grpc_channel_args* BuildBalancerChannelArgs( }; // Channel args to add. const grpc_arg args_to_add[] = { - // New LB addresses. + // New address list. // Note that we pass these in both when creating the LB channel // and via the fake resolver. The latter is what actually gets used. - grpc_lb_addresses_create_channel_arg(lb_addresses), + CreateServerAddressListChannelArg(&balancer_addresses), // The fake resolver response generator, which we use to inject // address updates into the LB channel. grpc_core::FakeResolverResponseGenerator::MakeChannelArg( @@ -1004,18 +957,14 @@ grpc_channel_args* BuildBalancerChannelArgs( args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add, GPR_ARRAY_SIZE(args_to_add)); // Make any necessary modifications for security. - new_args = grpc_lb_policy_grpclb_modify_lb_channel_args(new_args); - // Clean up. - grpc_lb_addresses_destroy(lb_addresses); - return new_args; + return grpc_lb_policy_grpclb_modify_lb_channel_args(new_args); } // // ctor and dtor // -GrpcLb::GrpcLb(const grpc_lb_addresses* addresses, - const LoadBalancingPolicy::Args& args) +GrpcLb::GrpcLb(const LoadBalancingPolicy::Args& args) : LoadBalancingPolicy(args), response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()), lb_call_backoff_( @@ -1072,9 +1021,6 @@ GrpcLb::~GrpcLb() { if (serverlist_ != nullptr) { grpc_grpclb_destroy_serverlist(serverlist_); } - if (fallback_backend_addresses_ != nullptr) { - grpc_lb_addresses_destroy(fallback_backend_addresses_); - } grpc_subchannel_index_unref(); } @@ -1122,7 +1068,6 @@ void GrpcLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { while ((pp = pending_picks_) != nullptr) { pending_picks_ = pp->next; pp->pick->on_complete = pp->original_on_complete; - pp->pick->user_data = nullptr; grpc_error* error = GRPC_ERROR_NONE; if (new_policy->PickLocked(pp->pick, &error)) { // Synchronous return; schedule closure. @@ -1276,9 +1221,27 @@ void GrpcLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current, notify); } +// Returns the backend addresses extracted from the given addresses. +UniquePtr<ServerAddressList> ExtractBackendAddresses( + const ServerAddressList& addresses) { + void* lb_token = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload; + grpc_arg arg = grpc_channel_arg_pointer_create( + const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), lb_token, + &lb_token_arg_vtable); + auto backend_addresses = MakeUnique<ServerAddressList>(); + for (size_t i = 0; i < addresses.size(); ++i) { + if (!addresses[i].IsBalancer()) { + backend_addresses->emplace_back( + addresses[i].address(), + grpc_channel_args_copy_and_add(addresses[i].args(), &arg, 1)); + } + } + return backend_addresses; +} + void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { - const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); - if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) { + const ServerAddressList* addresses = FindServerAddressListChannelArg(&args); + if (addresses == nullptr) { // Ignore this update. gpr_log( GPR_ERROR, @@ -1286,13 +1249,8 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { this); return; } - const grpc_lb_addresses* addresses = - static_cast<const grpc_lb_addresses*>(arg->value.pointer.p); // Update fallback address list. - if (fallback_backend_addresses_ != nullptr) { - grpc_lb_addresses_destroy(fallback_backend_addresses_); - } - fallback_backend_addresses_ = ExtractBackendAddresses(addresses); + fallback_backend_addresses_ = ExtractBackendAddresses(*addresses); // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, // since we use this to trigger the client_load_reporting filter. static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME}; @@ -1303,7 +1261,7 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); // Construct args for balancer channel. grpc_channel_args* lb_channel_args = - BuildBalancerChannelArgs(addresses, response_generator_.get(), &args); + BuildBalancerChannelArgs(*addresses, response_generator_.get(), &args); // Create balancer channel if needed. if (lb_channel_ == nullptr) { char* uri_str; @@ -1509,12 +1467,17 @@ void DestroyClientStats(void* arg) { } void GrpcLb::PendingPickSetMetadataAndContext(PendingPick* pp) { - /* if connected_subchannel is nullptr, no pick has been made by the RR - * policy (e.g., all addresses failed to connect). There won't be any - * user_data/token available */ + // If connected_subchannel is nullptr, no pick has been made by the RR + // policy (e.g., all addresses failed to connect). There won't be any + // LB token available. if (pp->pick->connected_subchannel != nullptr) { - if (GPR_LIKELY(!GRPC_MDISNULL(pp->lb_token))) { - AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(pp->lb_token), + const grpc_arg* arg = + grpc_channel_args_find(pp->pick->connected_subchannel->args(), + GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN); + if (arg != nullptr) { + grpc_mdelem lb_token = { + reinterpret_cast<uintptr_t>(arg->value.pointer.p)}; + AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(lb_token), &pp->pick->lb_token_mdelem_storage, pp->pick->initial_metadata); } else { @@ -1598,12 +1561,10 @@ bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp, return true; } } - // Set client_stats and user_data. + // Set client_stats. if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) { pp->client_stats = lb_calld_->client_stats()->Ref(); } - GPR_ASSERT(pp->pick->user_data == nullptr); - pp->pick->user_data = (void**)&pp->lb_token; // Pick via the RR policy. bool pick_done = rr_policy_->PickLocked(pp->pick, error); if (pick_done) { @@ -1668,10 +1629,11 @@ void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) { } grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() { - grpc_lb_addresses* addresses; + ServerAddressList tmp_addresses; + ServerAddressList* addresses = &tmp_addresses; bool is_backend_from_grpclb_load_balancer = false; if (serverlist_ != nullptr) { - addresses = ProcessServerlist(serverlist_); + tmp_addresses = ProcessServerlist(serverlist_); is_backend_from_grpclb_load_balancer = true; } else { // If CreateOrUpdateRoundRobinPolicyLocked() is invoked when we haven't @@ -1680,14 +1642,14 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() { // empty, in which case the new round_robin policy will keep the requested // picks pending. GPR_ASSERT(fallback_backend_addresses_ != nullptr); - addresses = grpc_lb_addresses_copy(fallback_backend_addresses_); + addresses = fallback_backend_addresses_.get(); } GPR_ASSERT(addresses != nullptr); - // Replace the LB addresses in the channel args that we pass down to + // Replace the server address list in the channel args that we pass down to // the subchannel. - static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES}; + static const char* keys_to_remove[] = {GRPC_ARG_SERVER_ADDRESS_LIST}; grpc_arg args_to_add[3] = { - grpc_lb_addresses_create_channel_arg(addresses), + CreateServerAddressListChannelArg(addresses), // A channel arg indicating if the target is a backend inferred from a // grpclb load balancer. grpc_channel_arg_integer_create( @@ -1704,7 +1666,6 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() { grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove( args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add, num_args_to_add); - grpc_lb_addresses_destroy(addresses); return args; } @@ -1837,19 +1798,18 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory { OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( const LoadBalancingPolicy::Args& args) const override { /* Count the number of gRPC-LB addresses. There must be at least one. */ - const grpc_arg* arg = - grpc_channel_args_find(args.args, GRPC_ARG_LB_ADDRESSES); - if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { - return nullptr; - } - grpc_lb_addresses* addresses = - static_cast<grpc_lb_addresses*>(arg->value.pointer.p); - size_t num_grpclb_addrs = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs; + const ServerAddressList* addresses = + FindServerAddressListChannelArg(args.args); + if (addresses == nullptr) return nullptr; + bool found_balancer = false; + for (size_t i = 0; i < addresses->size(); ++i) { + if ((*addresses)[i].IsBalancer()) { + found_balancer = true; + break; + } } - if (num_grpclb_addrs == 0) return nullptr; - return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(addresses, args)); + if (!found_balancer) return nullptr; + return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(args)); } const char* name() const override { return "grpclb"; } diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h index 825065a9c3..3b2dc370eb 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h @@ -21,7 +21,7 @@ #include <grpc/support/port_platform.h> -#include "src/core/ext/filters/client_channel/lb_policy_factory.h" +#include <grpc/impl/codegen/grpc_types.h> /// Makes any necessary modifications to \a args for use in the grpclb /// balancer channel. diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc index 441efd5e23..6e8fbdcab7 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc @@ -26,6 +26,7 @@ #include <grpc/support/string_util.h> #include "src/core/ext/filters/client_channel/client_channel.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -42,22 +43,23 @@ int BalancerNameCmp(const grpc_core::UniquePtr<char>& a, } RefCountedPtr<TargetAuthorityTable> CreateTargetAuthorityTable( - grpc_lb_addresses* addresses) { + const ServerAddressList& addresses) { TargetAuthorityTable::Entry* target_authority_entries = - static_cast<TargetAuthorityTable::Entry*>(gpr_zalloc( - sizeof(*target_authority_entries) * addresses->num_addresses)); - for (size_t i = 0; i < addresses->num_addresses; ++i) { + static_cast<TargetAuthorityTable::Entry*>( + gpr_zalloc(sizeof(*target_authority_entries) * addresses.size())); + for (size_t i = 0; i < addresses.size(); ++i) { char* addr_str; - GPR_ASSERT(grpc_sockaddr_to_string( - &addr_str, &addresses->addresses[i].address, true) > 0); + GPR_ASSERT( + grpc_sockaddr_to_string(&addr_str, &addresses[i].address(), true) > 0); target_authority_entries[i].key = grpc_slice_from_copied_string(addr_str); - target_authority_entries[i].value.reset( - gpr_strdup(addresses->addresses[i].balancer_name)); gpr_free(addr_str); + char* balancer_name = grpc_channel_arg_get_string(grpc_channel_args_find( + addresses[i].args(), GRPC_ARG_ADDRESS_BALANCER_NAME)); + target_authority_entries[i].value.reset(gpr_strdup(balancer_name)); } RefCountedPtr<TargetAuthorityTable> target_authority_table = - TargetAuthorityTable::Create(addresses->num_addresses, - target_authority_entries, BalancerNameCmp); + TargetAuthorityTable::Create(addresses.size(), target_authority_entries, + BalancerNameCmp); gpr_free(target_authority_entries); return target_authority_table; } @@ -72,13 +74,12 @@ grpc_channel_args* grpc_lb_policy_grpclb_modify_lb_channel_args( grpc_arg args_to_add[2]; size_t num_args_to_add = 0; // Add arg for targets info table. - const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_LB_ADDRESSES); - GPR_ASSERT(arg != nullptr); - GPR_ASSERT(arg->type == GRPC_ARG_POINTER); - grpc_lb_addresses* addresses = - static_cast<grpc_lb_addresses*>(arg->value.pointer.p); + grpc_core::ServerAddressList* addresses = + grpc_core::FindServerAddressListChannelArg(args); + GPR_ASSERT(addresses != nullptr); grpc_core::RefCountedPtr<grpc_core::TargetAuthorityTable> - target_authority_table = grpc_core::CreateTargetAuthorityTable(addresses); + target_authority_table = + grpc_core::CreateTargetAuthorityTable(*addresses); args_to_add[num_args_to_add++] = grpc_core::CreateTargetAuthorityTableChannelArg( target_authority_table.get()); diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h index 9ca7b28d8e..71d371c880 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h @@ -25,7 +25,7 @@ #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h" -#include "src/core/ext/filters/client_channel/lb_policy_factory.h" +#include "src/core/lib/iomgr/exec_ctx.h" #define GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH 128 diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index d1a05f1255..74c17612a2 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -24,6 +24,7 @@ #include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/lib/channel/channel_args.h" @@ -75,11 +76,9 @@ class PickFirst : public LoadBalancingPolicy { PickFirstSubchannelData( SubchannelList<PickFirstSubchannelList, PickFirstSubchannelData>* subchannel_list, - const grpc_lb_user_data_vtable* user_data_vtable, - const grpc_lb_address& address, grpc_subchannel* subchannel, + const ServerAddress& address, grpc_subchannel* subchannel, grpc_combiner* combiner) - : SubchannelData(subchannel_list, user_data_vtable, address, subchannel, - combiner) {} + : SubchannelData(subchannel_list, address, subchannel, combiner) {} void ProcessConnectivityChangeLocked( grpc_connectivity_state connectivity_state, grpc_error* error) override; @@ -95,7 +94,7 @@ class PickFirst : public LoadBalancingPolicy { PickFirstSubchannelData> { public: PickFirstSubchannelList(PickFirst* policy, TraceFlag* tracer, - const grpc_lb_addresses* addresses, + const ServerAddressList& addresses, grpc_combiner* combiner, grpc_client_channel_factory* client_channel_factory, const grpc_channel_args& args) @@ -337,8 +336,8 @@ void PickFirst::UpdateChildRefsLocked() { void PickFirst::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) { AutoChildRefsUpdater guard(this); - const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); - if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { + const ServerAddressList* addresses = FindServerAddressListChannelArg(&args); + if (addresses == nullptr) { if (subchannel_list_ == nullptr) { // If we don't have a current subchannel list, go into TRANSIENT FAILURE. grpc_connectivity_state_set( @@ -354,19 +353,17 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args, } return; } - const grpc_lb_addresses* addresses = - static_cast<const grpc_lb_addresses*>(arg->value.pointer.p); if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p received update with %" PRIuPTR " addresses", this, - addresses->num_addresses); + addresses->size()); } grpc_arg new_arg = grpc_channel_arg_integer_create( const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1); grpc_channel_args* new_args = grpc_channel_args_copy_and_add(&args, &new_arg, 1); auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>( - this, &grpc_lb_pick_first_trace, addresses, combiner(), + this, &grpc_lb_pick_first_trace, *addresses, combiner(), client_channel_factory(), *new_args); grpc_channel_args_destroy(new_args); if (subchannel_list->num_subchannels() == 0) { diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 2a16975131..63089afbd7 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -82,8 +82,6 @@ class RoundRobin : public LoadBalancingPolicy { // Data for a particular subchannel in a subchannel list. // This subclass adds the following functionality: - // - Tracks user_data associated with each address, which will be - // returned along with picks that select the subchannel. // - Tracks the previous connectivity state of the subchannel, so that // we know how many subchannels are in each state. class RoundRobinSubchannelData @@ -93,26 +91,9 @@ class RoundRobin : public LoadBalancingPolicy { RoundRobinSubchannelData( SubchannelList<RoundRobinSubchannelList, RoundRobinSubchannelData>* subchannel_list, - const grpc_lb_user_data_vtable* user_data_vtable, - const grpc_lb_address& address, grpc_subchannel* subchannel, + const ServerAddress& address, grpc_subchannel* subchannel, grpc_combiner* combiner) - : SubchannelData(subchannel_list, user_data_vtable, address, subchannel, - combiner), - user_data_vtable_(user_data_vtable), - user_data_(user_data_vtable_ != nullptr - ? user_data_vtable_->copy(address.user_data) - : nullptr) {} - - void UnrefSubchannelLocked(const char* reason) override { - SubchannelData::UnrefSubchannelLocked(reason); - if (user_data_ != nullptr) { - GPR_ASSERT(user_data_vtable_ != nullptr); - user_data_vtable_->destroy(user_data_); - user_data_ = nullptr; - } - } - - void* user_data() const { return user_data_; } + : SubchannelData(subchannel_list, address, subchannel, combiner) {} grpc_connectivity_state connectivity_state() const { return last_connectivity_state_; @@ -125,8 +106,6 @@ class RoundRobin : public LoadBalancingPolicy { void ProcessConnectivityChangeLocked( grpc_connectivity_state connectivity_state, grpc_error* error) override; - const grpc_lb_user_data_vtable* user_data_vtable_; - void* user_data_ = nullptr; grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_IDLE; }; @@ -137,7 +116,7 @@ class RoundRobin : public LoadBalancingPolicy { public: RoundRobinSubchannelList( RoundRobin* policy, TraceFlag* tracer, - const grpc_lb_addresses* addresses, grpc_combiner* combiner, + const ServerAddressList& addresses, grpc_combiner* combiner, grpc_client_channel_factory* client_channel_factory, const grpc_channel_args& args) : SubchannelList(policy, tracer, addresses, combiner, @@ -354,9 +333,6 @@ bool RoundRobin::DoPickLocked(PickState* pick) { subchannel_list_->subchannel(next_ready_index); GPR_ASSERT(sd->connected_subchannel() != nullptr); pick->connected_subchannel = sd->connected_subchannel()->Ref(); - if (pick->user_data != nullptr) { - *pick->user_data = sd->user_data(); - } if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_INFO, "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, " @@ -667,9 +643,9 @@ void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current, void RoundRobin::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) { - const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); AutoChildRefsUpdater guard(this); - if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) { + const ServerAddressList* addresses = FindServerAddressListChannelArg(&args); + if (addresses == nullptr) { gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this); // If we don't have a current subchannel list, go into TRANSIENT_FAILURE. // Otherwise, keep using the current subchannel list (ignore this update). @@ -681,11 +657,9 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args, } return; } - grpc_lb_addresses* addresses = - static_cast<grpc_lb_addresses*>(arg->value.pointer.p); if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses", - this, addresses->num_addresses); + this, addresses->size()); } // Replace latest_pending_subchannel_list_. if (latest_pending_subchannel_list_ != nullptr) { @@ -696,7 +670,7 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args, } } latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>( - this, &grpc_lb_round_robin_trace, addresses, combiner(), + this, &grpc_lb_round_robin_trace, *addresses, combiner(), client_channel_factory(), args); // If we haven't started picking yet or the new list is empty, // immediately promote the new list to the current list. diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index f31401502c..6f31a643c1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -26,6 +26,7 @@ #include <grpc/support/alloc.h> #include "src/core/ext/filters/client_channel/lb_policy_registry.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/trace.h" @@ -141,8 +142,7 @@ class SubchannelData { protected: SubchannelData( SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list, - const grpc_lb_user_data_vtable* user_data_vtable, - const grpc_lb_address& address, grpc_subchannel* subchannel, + const ServerAddress& address, grpc_subchannel* subchannel, grpc_combiner* combiner); virtual ~SubchannelData(); @@ -156,9 +156,8 @@ class SubchannelData { grpc_connectivity_state connectivity_state, grpc_error* error) GRPC_ABSTRACT; - // Unrefs the subchannel. May be overridden by subclasses that need - // to perform extra cleanup when unreffing the subchannel. - virtual void UnrefSubchannelLocked(const char* reason); + // Unrefs the subchannel. + void UnrefSubchannelLocked(const char* reason); private: // Updates connected_subchannel_ based on pending_connectivity_state_unsafe_. @@ -232,7 +231,7 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> { protected: SubchannelList(LoadBalancingPolicy* policy, TraceFlag* tracer, - const grpc_lb_addresses* addresses, grpc_combiner* combiner, + const ServerAddressList& addresses, grpc_combiner* combiner, grpc_client_channel_factory* client_channel_factory, const grpc_channel_args& args); @@ -277,8 +276,7 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> { template <typename SubchannelListType, typename SubchannelDataType> SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData( SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list, - const grpc_lb_user_data_vtable* user_data_vtable, - const grpc_lb_address& address, grpc_subchannel* subchannel, + const ServerAddress& address, grpc_subchannel* subchannel, grpc_combiner* combiner) : subchannel_list_(subchannel_list), subchannel_(subchannel), @@ -488,7 +486,7 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked() { template <typename SubchannelListType, typename SubchannelDataType> SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList( LoadBalancingPolicy* policy, TraceFlag* tracer, - const grpc_lb_addresses* addresses, grpc_combiner* combiner, + const ServerAddressList& addresses, grpc_combiner* combiner, grpc_client_channel_factory* client_channel_factory, const grpc_channel_args& args) : InternallyRefCounted<SubchannelListType>(tracer), @@ -498,9 +496,9 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList( if (tracer_->enabled()) { gpr_log(GPR_INFO, "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels", - tracer_->name(), policy, this, addresses->num_addresses); + tracer_->name(), policy, this, addresses.size()); } - subchannels_.reserve(addresses->num_addresses); + subchannels_.reserve(addresses.size()); // We need to remove the LB addresses in order to be able to compare the // subchannel keys of subchannels from a different batch of addresses. // We also remove the inhibit-health-checking arg, since we are @@ -508,19 +506,27 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList( inhibit_health_checking_ = grpc_channel_arg_get_bool( grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false); static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, - GRPC_ARG_LB_ADDRESSES, + GRPC_ARG_SERVER_ADDRESS_LIST, GRPC_ARG_INHIBIT_HEALTH_CHECKING}; // Create a subchannel for each address. grpc_subchannel_args sc_args; - for (size_t i = 0; i < addresses->num_addresses; i++) { - // If there were any balancer, we would have chosen grpclb policy instead. - GPR_ASSERT(!addresses->addresses[i].is_balancer); + for (size_t i = 0; i < addresses.size(); i++) { + // If there were any balancer addresses, we would have chosen grpclb + // policy, which does not use a SubchannelList. + GPR_ASSERT(!addresses[i].IsBalancer()); memset(&sc_args, 0, sizeof(grpc_subchannel_args)); - grpc_arg addr_arg = - grpc_create_subchannel_address_arg(&addresses->addresses[i].address); + InlinedVector<grpc_arg, 4> args_to_add; + args_to_add.emplace_back( + grpc_create_subchannel_address_arg(&addresses[i].address())); + if (addresses[i].args() != nullptr) { + for (size_t j = 0; j < addresses[i].args()->num_args; ++j) { + args_to_add.emplace_back(addresses[i].args()->args[j]); + } + } grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove( - &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, 1); - gpr_free(addr_arg.value.string); + &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), + args_to_add.data(), args_to_add.size()); + gpr_free(args_to_add[0].value.string); sc_args.args = new_args; grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel( client_channel_factory, &sc_args); @@ -528,8 +534,7 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList( if (subchannel == nullptr) { // Subchannel could not be created. if (tracer_->enabled()) { - char* address_uri = - grpc_sockaddr_to_uri(&addresses->addresses[i].address); + char* address_uri = grpc_sockaddr_to_uri(&addresses[i].address()); gpr_log(GPR_INFO, "[%s %p] could not create subchannel for address uri %s, " "ignoring", @@ -539,8 +544,7 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList( continue; } if (tracer_->enabled()) { - char* address_uri = - grpc_sockaddr_to_uri(&addresses->addresses[i].address); + char* address_uri = grpc_sockaddr_to_uri(&addresses[i].address()); gpr_log(GPR_INFO, "[%s %p] subchannel list %p index %" PRIuPTR ": Created subchannel %p for address uri %s", @@ -548,8 +552,7 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList( address_uri); gpr_free(address_uri); } - subchannels_.emplace_back(this, addresses->user_data_vtable, - addresses->addresses[i], subchannel, combiner); + subchannels_.emplace_back(this, addresses[i], subchannel, combiner); } } diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index faedc0a919..3c25de2386 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -79,6 +79,7 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" @@ -116,7 +117,7 @@ namespace { class XdsLb : public LoadBalancingPolicy { public: - XdsLb(const grpc_lb_addresses* addresses, const Args& args); + explicit XdsLb(const Args& args); void UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) override; @@ -156,9 +157,6 @@ class XdsLb : public LoadBalancingPolicy { // Our on_complete closure and the original one. grpc_closure on_complete; grpc_closure* original_on_complete; - // The LB token associated with the pick. This is set via user_data in - // the pick. - grpc_mdelem lb_token; // Stats for client-side load reporting. RefCountedPtr<XdsLbClientStats> client_stats; // Next pending pick. @@ -256,7 +254,7 @@ class XdsLb : public LoadBalancingPolicy { grpc_error* error); // Pending pick methods. - static void PendingPickSetMetadataAndContext(PendingPick* pp); + static void PendingPickCleanup(PendingPick* pp); PendingPick* PendingPickCreate(PickState* pick); void AddPendingPick(PendingPick* pp); static void OnPendingPickComplete(void* arg, grpc_error* error); @@ -319,7 +317,7 @@ class XdsLb : public LoadBalancingPolicy { // 0 means not using fallback. int lb_fallback_timeout_ms_ = 0; // The backend addresses from the resolver. - grpc_lb_addresses* fallback_backend_addresses_ = nullptr; + UniquePtr<ServerAddressList> fallback_backend_addresses_; // Fallback timer. bool fallback_timer_callback_pending_ = false; grpc_timer lb_fallback_timer_; @@ -339,47 +337,15 @@ class XdsLb : public LoadBalancingPolicy { // serverlist parsing code // -// vtable for LB tokens in grpc_lb_addresses -void* lb_token_copy(void* token) { - return token == nullptr - ? nullptr - : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload; -} -void lb_token_destroy(void* token) { - if (token != nullptr) { - GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token}); - } -} -int lb_token_cmp(void* token1, void* token2) { - if (token1 > token2) return 1; - if (token1 < token2) return -1; - return 0; -} -const grpc_lb_user_data_vtable lb_token_vtable = { - lb_token_copy, lb_token_destroy, lb_token_cmp}; - // Returns the backend addresses extracted from the given addresses. -grpc_lb_addresses* ExtractBackendAddresses(const grpc_lb_addresses* addresses) { - // First pass: count the number of backend addresses. - size_t num_backends = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (!addresses->addresses[i].is_balancer) { - ++num_backends; +UniquePtr<ServerAddressList> ExtractBackendAddresses( + const ServerAddressList& addresses) { + auto backend_addresses = MakeUnique<ServerAddressList>(); + for (size_t i = 0; i < addresses.size(); ++i) { + if (!addresses[i].IsBalancer()) { + backend_addresses->emplace_back(addresses[i]); } } - // Second pass: actually populate the addresses and (empty) LB tokens. - grpc_lb_addresses* backend_addresses = - grpc_lb_addresses_create(num_backends, &lb_token_vtable); - size_t num_copied = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (addresses->addresses[i].is_balancer) continue; - const grpc_resolved_address* addr = &addresses->addresses[i].address; - grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr, - addr->len, false /* is_balancer */, - nullptr /* balancer_name */, - (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload); - ++num_copied; - } return backend_addresses; } @@ -429,56 +395,17 @@ void ParseServer(const xds_grpclb_server* server, grpc_resolved_address* addr) { } // Returns addresses extracted from \a serverlist. -grpc_lb_addresses* ProcessServerlist(const xds_grpclb_serverlist* serverlist) { - size_t num_valid = 0; - /* first pass: count how many are valid in order to allocate the necessary - * memory in a single block */ +UniquePtr<ServerAddressList> ProcessServerlist( + const xds_grpclb_serverlist* serverlist) { + auto addresses = MakeUnique<ServerAddressList>(); for (size_t i = 0; i < serverlist->num_servers; ++i) { - if (IsServerValid(serverlist->servers[i], i, true)) ++num_valid; - } - grpc_lb_addresses* lb_addresses = - grpc_lb_addresses_create(num_valid, &lb_token_vtable); - /* second pass: actually populate the addresses and LB tokens (aka user data - * to the outside world) to be read by the child policy during its creation. - * Given that the validity tests are very cheap, they are performed again - * instead of marking the valid ones during the first pass, as this would - * incurr in an allocation due to the arbitrary number of server */ - size_t addr_idx = 0; - for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) { - const xds_grpclb_server* server = serverlist->servers[sl_idx]; - if (!IsServerValid(serverlist->servers[sl_idx], sl_idx, false)) continue; - GPR_ASSERT(addr_idx < num_valid); - /* address processing */ + const xds_grpclb_server* server = serverlist->servers[i]; + if (!IsServerValid(serverlist->servers[i], i, false)) continue; grpc_resolved_address addr; ParseServer(server, &addr); - /* lb token processing */ - void* user_data; - if (server->has_load_balance_token) { - const size_t lb_token_max_length = - GPR_ARRAY_SIZE(server->load_balance_token); - const size_t lb_token_length = - strnlen(server->load_balance_token, lb_token_max_length); - grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer( - server->load_balance_token, lb_token_length); - user_data = - (void*)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr) - .payload; - } else { - char* uri = grpc_sockaddr_to_uri(&addr); - gpr_log(GPR_INFO, - "Missing LB token for backend address '%s'. The empty token will " - "be used instead", - uri); - gpr_free(uri); - user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload; - } - grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len, - false /* is_balancer */, - nullptr /* balancer_name */, user_data); - ++addr_idx; + addresses->emplace_back(addr, nullptr); } - GPR_ASSERT(addr_idx == num_valid); - return lb_addresses; + return addresses; } // @@ -789,8 +716,7 @@ void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked( xds_grpclb_destroy_serverlist(xdslb_policy->serverlist_); } else { /* or dispose of the fallback */ - grpc_lb_addresses_destroy(xdslb_policy->fallback_backend_addresses_); - xdslb_policy->fallback_backend_addresses_ = nullptr; + xdslb_policy->fallback_backend_addresses_.reset(); if (xdslb_policy->fallback_timer_callback_pending_) { grpc_timer_cancel(&xdslb_policy->lb_fallback_timer_); } @@ -876,31 +802,15 @@ void XdsLb::BalancerCallState::OnBalancerStatusReceivedLocked( // helper code for creating balancer channel // -grpc_lb_addresses* ExtractBalancerAddresses( - const grpc_lb_addresses* addresses) { - size_t num_grpclb_addrs = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs; - } - // There must be at least one balancer address, or else the - // client_channel would not have chosen this LB policy. - GPR_ASSERT(num_grpclb_addrs > 0); - grpc_lb_addresses* lb_addresses = - grpc_lb_addresses_create(num_grpclb_addrs, nullptr); - size_t lb_addresses_idx = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (!addresses->addresses[i].is_balancer) continue; - if (GPR_UNLIKELY(addresses->addresses[i].user_data != nullptr)) { - gpr_log(GPR_ERROR, - "This LB policy doesn't support user data. It will be ignored"); +UniquePtr<ServerAddressList> ExtractBalancerAddresses( + const ServerAddressList& addresses) { + auto balancer_addresses = MakeUnique<ServerAddressList>(); + for (size_t i = 0; i < addresses.size(); ++i) { + if (addresses[i].IsBalancer()) { + balancer_addresses->emplace_back(addresses[i]); } - grpc_lb_addresses_set_address( - lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr, - addresses->addresses[i].address.len, false /* is balancer */, - addresses->addresses[i].balancer_name, nullptr /* user data */); } - GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx); - return lb_addresses; + return balancer_addresses; } /* Returns the channel args for the LB channel, used to create a bidirectional @@ -912,10 +822,11 @@ grpc_lb_addresses* ExtractBalancerAddresses( * above the grpclb policy. * - \a args: other args inherited from the xds policy. */ grpc_channel_args* BuildBalancerChannelArgs( - const grpc_lb_addresses* addresses, + const ServerAddressList& addresses, FakeResolverResponseGenerator* response_generator, const grpc_channel_args* args) { - grpc_lb_addresses* lb_addresses = ExtractBalancerAddresses(addresses); + UniquePtr<ServerAddressList> balancer_addresses = + ExtractBalancerAddresses(addresses); // Channel args to remove. static const char* args_to_remove[] = { // LB policy name, since we want to use the default (pick_first) in @@ -933,7 +844,7 @@ grpc_channel_args* BuildBalancerChannelArgs( // is_balancer=true. We need the LB channel to return addresses with // is_balancer=false so that it does not wind up recursively using the // xds LB policy, as per the special case logic in client_channel.c. - GRPC_ARG_LB_ADDRESSES, + GRPC_ARG_SERVER_ADDRESS_LIST, // The fake resolver response generator, because we are replacing it // with the one from the xds policy, used to propagate updates to // the LB channel. @@ -949,10 +860,10 @@ grpc_channel_args* BuildBalancerChannelArgs( }; // Channel args to add. const grpc_arg args_to_add[] = { - // New LB addresses. + // New server address list. // Note that we pass these in both when creating the LB channel // and via the fake resolver. The latter is what actually gets used. - grpc_lb_addresses_create_channel_arg(lb_addresses), + CreateServerAddressListChannelArg(balancer_addresses.get()), // The fake resolver response generator, which we use to inject // address updates into the LB channel. grpc_core::FakeResolverResponseGenerator::MakeChannelArg( @@ -970,10 +881,7 @@ grpc_channel_args* BuildBalancerChannelArgs( args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add, GPR_ARRAY_SIZE(args_to_add)); // Make any necessary modifications for security. - new_args = grpc_lb_policy_xds_modify_lb_channel_args(new_args); - // Clean up. - grpc_lb_addresses_destroy(lb_addresses); - return new_args; + return grpc_lb_policy_xds_modify_lb_channel_args(new_args); } // @@ -981,8 +889,7 @@ grpc_channel_args* BuildBalancerChannelArgs( // // TODO(vishalpowar): Use lb_config in args to configure LB policy. -XdsLb::XdsLb(const grpc_lb_addresses* addresses, - const LoadBalancingPolicy::Args& args) +XdsLb::XdsLb(const LoadBalancingPolicy::Args& args) : LoadBalancingPolicy(args), response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()), lb_call_backoff_( @@ -1038,9 +945,6 @@ XdsLb::~XdsLb() { if (serverlist_ != nullptr) { xds_grpclb_destroy_serverlist(serverlist_); } - if (fallback_backend_addresses_ != nullptr) { - grpc_lb_addresses_destroy(fallback_backend_addresses_); - } grpc_subchannel_index_unref(); } @@ -1088,7 +992,6 @@ void XdsLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { while ((pp = pending_picks_) != nullptr) { pending_picks_ = pp->next; pp->pick->on_complete = pp->original_on_complete; - pp->pick->user_data = nullptr; grpc_error* error = GRPC_ERROR_NONE; if (new_policy->PickLocked(pp->pick, &error)) { // Synchronous return; schedule closure. @@ -1241,21 +1144,16 @@ void XdsLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current, } void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { - const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); - if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) { + const ServerAddressList* addresses = FindServerAddressListChannelArg(&args); + if (addresses == nullptr) { // Ignore this update. gpr_log(GPR_ERROR, "[xdslb %p] No valid LB addresses channel arg in update, ignoring.", this); return; } - const grpc_lb_addresses* addresses = - static_cast<const grpc_lb_addresses*>(arg->value.pointer.p); // Update fallback address list. - if (fallback_backend_addresses_ != nullptr) { - grpc_lb_addresses_destroy(fallback_backend_addresses_); - } - fallback_backend_addresses_ = ExtractBackendAddresses(addresses); + fallback_backend_addresses_ = ExtractBackendAddresses(*addresses); // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, // since we use this to trigger the client_load_reporting filter. static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME}; @@ -1266,7 +1164,7 @@ void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); // Construct args for balancer channel. grpc_channel_args* lb_channel_args = - BuildBalancerChannelArgs(addresses, response_generator_.get(), &args); + BuildBalancerChannelArgs(*addresses, response_generator_.get(), &args); // Create balancer channel if needed. if (lb_channel_ == nullptr) { char* uri_str; @@ -1457,37 +1355,15 @@ void XdsLb::OnBalancerChannelConnectivityChangedLocked(void* arg, // PendingPick // -// Adds lb_token of selected subchannel (address) to the call's initial -// metadata. -grpc_error* AddLbTokenToInitialMetadata( - grpc_mdelem lb_token, grpc_linked_mdelem* lb_token_mdelem_storage, - grpc_metadata_batch* initial_metadata) { - GPR_ASSERT(lb_token_mdelem_storage != nullptr); - GPR_ASSERT(!GRPC_MDISNULL(lb_token)); - return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage, - lb_token); -} - // Destroy function used when embedding client stats in call context. void DestroyClientStats(void* arg) { static_cast<XdsLbClientStats*>(arg)->Unref(); } -void XdsLb::PendingPickSetMetadataAndContext(PendingPick* pp) { - /* if connected_subchannel is nullptr, no pick has been made by the - * child policy (e.g., all addresses failed to connect). There won't be any - * user_data/token available */ +void XdsLb::PendingPickCleanup(PendingPick* pp) { + // If connected_subchannel is nullptr, no pick has been made by the + // child policy (e.g., all addresses failed to connect). if (pp->pick->connected_subchannel != nullptr) { - if (GPR_LIKELY(!GRPC_MDISNULL(pp->lb_token))) { - AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(pp->lb_token), - &pp->pick->lb_token_mdelem_storage, - pp->pick->initial_metadata); - } else { - gpr_log(GPR_ERROR, - "[xdslb %p] No LB token for connected subchannel pick %p", - pp->xdslb_policy, pp->pick); - abort(); - } // Pass on client stats via context. Passes ownership of the reference. if (pp->client_stats != nullptr) { pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value = @@ -1505,7 +1381,7 @@ void XdsLb::PendingPickSetMetadataAndContext(PendingPick* pp) { * order to unref the child policy instance upon its invocation */ void XdsLb::OnPendingPickComplete(void* arg, grpc_error* error) { PendingPick* pp = static_cast<PendingPick*>(arg); - PendingPickSetMetadataAndContext(pp); + PendingPickCleanup(pp); GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error)); Delete(pp); } @@ -1537,16 +1413,14 @@ void XdsLb::AddPendingPick(PendingPick* pp) { // completion callback even if the pick is available immediately. bool XdsLb::PickFromChildPolicyLocked(bool force_async, PendingPick* pp, grpc_error** error) { - // Set client_stats and user_data. + // Set client_stats. if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) { pp->client_stats = lb_calld_->client_stats()->Ref(); } - GPR_ASSERT(pp->pick->user_data == nullptr); - pp->pick->user_data = (void**)&pp->lb_token; // Pick via the child policy. bool pick_done = child_policy_->PickLocked(pp->pick, error); if (pick_done) { - PendingPickSetMetadataAndContext(pp); + PendingPickCleanup(pp); if (force_async) { GRPC_CLOSURE_SCHED(pp->original_on_complete, *error); *error = GRPC_ERROR_NONE; @@ -1608,20 +1482,19 @@ void XdsLb::CreateChildPolicyLocked(const Args& args) { } grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() { - grpc_lb_addresses* addresses; bool is_backend_from_grpclb_load_balancer = false; // This should never be invoked if we do not have serverlist_, as fallback // mode is disabled for xDS plugin. GPR_ASSERT(serverlist_ != nullptr); GPR_ASSERT(serverlist_->num_servers > 0); - addresses = ProcessServerlist(serverlist_); - is_backend_from_grpclb_load_balancer = true; + UniquePtr<ServerAddressList> addresses = ProcessServerlist(serverlist_); GPR_ASSERT(addresses != nullptr); - // Replace the LB addresses in the channel args that we pass down to + is_backend_from_grpclb_load_balancer = true; + // Replace the server address list in the channel args that we pass down to // the subchannel. - static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES}; + static const char* keys_to_remove[] = {GRPC_ARG_SERVER_ADDRESS_LIST}; const grpc_arg args_to_add[] = { - grpc_lb_addresses_create_channel_arg(addresses), + CreateServerAddressListChannelArg(addresses.get()), // A channel arg indicating if the target is a backend inferred from a // grpclb load balancer. grpc_channel_arg_integer_create( @@ -1631,7 +1504,6 @@ grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() { grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove( args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add, GPR_ARRAY_SIZE(args_to_add)); - grpc_lb_addresses_destroy(addresses); return args; } @@ -1765,19 +1637,18 @@ class XdsFactory : public LoadBalancingPolicyFactory { OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( const LoadBalancingPolicy::Args& args) const override { /* Count the number of gRPC-LB addresses. There must be at least one. */ - const grpc_arg* arg = - grpc_channel_args_find(args.args, GRPC_ARG_LB_ADDRESSES); - if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { - return nullptr; - } - grpc_lb_addresses* addresses = - static_cast<grpc_lb_addresses*>(arg->value.pointer.p); - size_t num_grpclb_addrs = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs; + const ServerAddressList* addresses = + FindServerAddressListChannelArg(args.args); + if (addresses == nullptr) return nullptr; + bool found_balancer_address = false; + for (size_t i = 0; i < addresses->size(); ++i) { + if ((*addresses)[i].IsBalancer()) { + found_balancer_address = true; + break; + } } - if (num_grpclb_addrs == 0) return nullptr; - return OrphanablePtr<LoadBalancingPolicy>(New<XdsLb>(addresses, args)); + if (!found_balancer_address) return nullptr; + return OrphanablePtr<LoadBalancingPolicy>(New<XdsLb>(args)); } const char* name() const override { return "xds_experimental"; } diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h index 32c4acc8a3..f713b7f563 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h @@ -21,7 +21,7 @@ #include <grpc/support/port_platform.h> -#include "src/core/ext/filters/client_channel/lb_policy_factory.h" +#include <grpc/impl/codegen/grpc_types.h> /// Makes any necessary modifications to \a args for use in the xds /// balancer channel. diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc index 5ab72efce4..9a11f8e39f 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc @@ -25,6 +25,7 @@ #include <string.h> #include "src/core/ext/filters/client_channel/client_channel.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -41,22 +42,23 @@ int BalancerNameCmp(const grpc_core::UniquePtr<char>& a, } RefCountedPtr<TargetAuthorityTable> CreateTargetAuthorityTable( - grpc_lb_addresses* addresses) { + const ServerAddressList& addresses) { TargetAuthorityTable::Entry* target_authority_entries = - static_cast<TargetAuthorityTable::Entry*>(gpr_zalloc( - sizeof(*target_authority_entries) * addresses->num_addresses)); - for (size_t i = 0; i < addresses->num_addresses; ++i) { + static_cast<TargetAuthorityTable::Entry*>( + gpr_zalloc(sizeof(*target_authority_entries) * addresses.size())); + for (size_t i = 0; i < addresses.size(); ++i) { char* addr_str; - GPR_ASSERT(grpc_sockaddr_to_string( - &addr_str, &addresses->addresses[i].address, true) > 0); + GPR_ASSERT( + grpc_sockaddr_to_string(&addr_str, &addresses[i].address(), true) > 0); target_authority_entries[i].key = grpc_slice_from_copied_string(addr_str); - target_authority_entries[i].value.reset( - gpr_strdup(addresses->addresses[i].balancer_name)); gpr_free(addr_str); + char* balancer_name = grpc_channel_arg_get_string(grpc_channel_args_find( + addresses[i].args(), GRPC_ARG_ADDRESS_BALANCER_NAME)); + target_authority_entries[i].value.reset(gpr_strdup(balancer_name)); } RefCountedPtr<TargetAuthorityTable> target_authority_table = - TargetAuthorityTable::Create(addresses->num_addresses, - target_authority_entries, BalancerNameCmp); + TargetAuthorityTable::Create(addresses.size(), target_authority_entries, + BalancerNameCmp); gpr_free(target_authority_entries); return target_authority_table; } @@ -71,13 +73,12 @@ grpc_channel_args* grpc_lb_policy_xds_modify_lb_channel_args( grpc_arg args_to_add[2]; size_t num_args_to_add = 0; // Add arg for targets info table. - const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_LB_ADDRESSES); - GPR_ASSERT(arg != nullptr); - GPR_ASSERT(arg->type == GRPC_ARG_POINTER); - grpc_lb_addresses* addresses = - static_cast<grpc_lb_addresses*>(arg->value.pointer.p); + grpc_core::ServerAddressList* addresses = + grpc_core::FindServerAddressListChannelArg(args); + GPR_ASSERT(addresses != nullptr); grpc_core::RefCountedPtr<grpc_core::TargetAuthorityTable> - target_authority_table = grpc_core::CreateTargetAuthorityTable(addresses); + target_authority_table = + grpc_core::CreateTargetAuthorityTable(*addresses); args_to_add[num_args_to_add++] = grpc_core::CreateTargetAuthorityTableChannelArg( target_authority_table.get()); diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h index 9d08defa7e..6704995641 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h @@ -25,7 +25,7 @@ #include "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h" #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h" -#include "src/core/ext/filters/client_channel/lb_policy_factory.h" +#include "src/core/lib/iomgr/exec_ctx.h" #define XDS_SERVICE_NAME_MAX_LENGTH 128 diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.cc b/src/core/ext/filters/client_channel/lb_policy_factory.cc deleted file mode 100644 index 5c6363d295..0000000000 --- a/src/core/ext/filters/client_channel/lb_policy_factory.cc +++ /dev/null @@ -1,163 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <grpc/support/port_platform.h> - -#include <string.h> - -#include <grpc/support/alloc.h> -#include <grpc/support/string_util.h> - -#include "src/core/lib/channel/channel_args.h" - -#include "src/core/ext/filters/client_channel/lb_policy_factory.h" -#include "src/core/ext/filters/client_channel/parse_address.h" - -grpc_lb_addresses* grpc_lb_addresses_create( - size_t num_addresses, const grpc_lb_user_data_vtable* user_data_vtable) { - grpc_lb_addresses* addresses = - static_cast<grpc_lb_addresses*>(gpr_zalloc(sizeof(grpc_lb_addresses))); - addresses->num_addresses = num_addresses; - addresses->user_data_vtable = user_data_vtable; - const size_t addresses_size = sizeof(grpc_lb_address) * num_addresses; - addresses->addresses = - static_cast<grpc_lb_address*>(gpr_zalloc(addresses_size)); - return addresses; -} - -grpc_lb_addresses* grpc_lb_addresses_copy(const grpc_lb_addresses* addresses) { - grpc_lb_addresses* new_addresses = grpc_lb_addresses_create( - addresses->num_addresses, addresses->user_data_vtable); - memcpy(new_addresses->addresses, addresses->addresses, - sizeof(grpc_lb_address) * addresses->num_addresses); - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (new_addresses->addresses[i].balancer_name != nullptr) { - new_addresses->addresses[i].balancer_name = - gpr_strdup(new_addresses->addresses[i].balancer_name); - } - if (new_addresses->addresses[i].user_data != nullptr) { - new_addresses->addresses[i].user_data = addresses->user_data_vtable->copy( - new_addresses->addresses[i].user_data); - } - } - return new_addresses; -} - -void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index, - const void* address, size_t address_len, - bool is_balancer, const char* balancer_name, - void* user_data) { - GPR_ASSERT(index < addresses->num_addresses); - if (user_data != nullptr) GPR_ASSERT(addresses->user_data_vtable != nullptr); - grpc_lb_address* target = &addresses->addresses[index]; - memcpy(target->address.addr, address, address_len); - target->address.len = static_cast<socklen_t>(address_len); - target->is_balancer = is_balancer; - target->balancer_name = gpr_strdup(balancer_name); - target->user_data = user_data; -} - -bool grpc_lb_addresses_set_address_from_uri(grpc_lb_addresses* addresses, - size_t index, const grpc_uri* uri, - bool is_balancer, - const char* balancer_name, - void* user_data) { - grpc_resolved_address address; - if (!grpc_parse_uri(uri, &address)) return false; - grpc_lb_addresses_set_address(addresses, index, address.addr, address.len, - is_balancer, balancer_name, user_data); - return true; -} - -int grpc_lb_addresses_cmp(const grpc_lb_addresses* addresses1, - const grpc_lb_addresses* addresses2) { - if (addresses1->num_addresses > addresses2->num_addresses) return 1; - if (addresses1->num_addresses < addresses2->num_addresses) return -1; - if (addresses1->user_data_vtable > addresses2->user_data_vtable) return 1; - if (addresses1->user_data_vtable < addresses2->user_data_vtable) return -1; - for (size_t i = 0; i < addresses1->num_addresses; ++i) { - const grpc_lb_address* target1 = &addresses1->addresses[i]; - const grpc_lb_address* target2 = &addresses2->addresses[i]; - if (target1->address.len > target2->address.len) return 1; - if (target1->address.len < target2->address.len) return -1; - int retval = memcmp(target1->address.addr, target2->address.addr, - target1->address.len); - if (retval != 0) return retval; - if (target1->is_balancer > target2->is_balancer) return 1; - if (target1->is_balancer < target2->is_balancer) return -1; - const char* balancer_name1 = - target1->balancer_name != nullptr ? target1->balancer_name : ""; - const char* balancer_name2 = - target2->balancer_name != nullptr ? target2->balancer_name : ""; - retval = strcmp(balancer_name1, balancer_name2); - if (retval != 0) return retval; - if (addresses1->user_data_vtable != nullptr) { - retval = addresses1->user_data_vtable->cmp(target1->user_data, - target2->user_data); - if (retval != 0) return retval; - } - } - return 0; -} - -void grpc_lb_addresses_destroy(grpc_lb_addresses* addresses) { - for (size_t i = 0; i < addresses->num_addresses; ++i) { - gpr_free(addresses->addresses[i].balancer_name); - if (addresses->addresses[i].user_data != nullptr) { - addresses->user_data_vtable->destroy(addresses->addresses[i].user_data); - } - } - gpr_free(addresses->addresses); - gpr_free(addresses); -} - -static void* lb_addresses_copy(void* addresses) { - return grpc_lb_addresses_copy(static_cast<grpc_lb_addresses*>(addresses)); -} -static void lb_addresses_destroy(void* addresses) { - grpc_lb_addresses_destroy(static_cast<grpc_lb_addresses*>(addresses)); -} -static int lb_addresses_cmp(void* addresses1, void* addresses2) { - return grpc_lb_addresses_cmp(static_cast<grpc_lb_addresses*>(addresses1), - static_cast<grpc_lb_addresses*>(addresses2)); -} -static const grpc_arg_pointer_vtable lb_addresses_arg_vtable = { - lb_addresses_copy, lb_addresses_destroy, lb_addresses_cmp}; - -grpc_arg grpc_lb_addresses_create_channel_arg( - const grpc_lb_addresses* addresses) { - return grpc_channel_arg_pointer_create( - (char*)GRPC_ARG_LB_ADDRESSES, (void*)addresses, &lb_addresses_arg_vtable); -} - -grpc_lb_addresses* grpc_lb_addresses_find_channel_arg( - const grpc_channel_args* channel_args) { - const grpc_arg* lb_addresses_arg = - grpc_channel_args_find(channel_args, GRPC_ARG_LB_ADDRESSES); - if (lb_addresses_arg == nullptr || lb_addresses_arg->type != GRPC_ARG_POINTER) - return nullptr; - return static_cast<grpc_lb_addresses*>(lb_addresses_arg->value.pointer.p); -} - -bool grpc_lb_addresses_contains_balancer_address( - const grpc_lb_addresses& addresses) { - for (size_t i = 0; i < addresses.num_addresses; ++i) { - if (addresses.addresses[i].is_balancer) return true; - } - return false; -} diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.h b/src/core/ext/filters/client_channel/lb_policy_factory.h index a59deadb26..a165ebafab 100644 --- a/src/core/ext/filters/client_channel/lb_policy_factory.h +++ b/src/core/ext/filters/client_channel/lb_policy_factory.h @@ -21,91 +21,9 @@ #include <grpc/support/port_platform.h> -#include "src/core/lib/iomgr/resolve_address.h" - -#include "src/core/ext/filters/client_channel/client_channel_factory.h" #include "src/core/ext/filters/client_channel/lb_policy.h" -#include "src/core/lib/uri/uri_parser.h" - -// -// representation of an LB address -// - -// Channel arg key for grpc_lb_addresses. -#define GRPC_ARG_LB_ADDRESSES "grpc.lb_addresses" - -/** A resolved address alongside any LB related information associated with it. - * \a user_data, if not NULL, contains opaque data meant to be consumed by the - * gRPC LB policy. Note that no all LB policies support \a user_data as input. - * Those who don't will simply ignore it and will correspondingly return NULL in - * their namesake pick() output argument. */ -// TODO(roth): Once we figure out a better way of handling user_data in -// LB policies, convert these structs to C++ classes. -typedef struct grpc_lb_address { - grpc_resolved_address address; - bool is_balancer; - char* balancer_name; /* For secure naming. */ - void* user_data; -} grpc_lb_address; - -typedef struct grpc_lb_user_data_vtable { - void* (*copy)(void*); - void (*destroy)(void*); - int (*cmp)(void*, void*); -} grpc_lb_user_data_vtable; - -typedef struct grpc_lb_addresses { - size_t num_addresses; - grpc_lb_address* addresses; - const grpc_lb_user_data_vtable* user_data_vtable; -} grpc_lb_addresses; - -/** Returns a grpc_addresses struct with enough space for - \a num_addresses addresses. The \a user_data_vtable argument may be - NULL if no user data will be added. */ -grpc_lb_addresses* grpc_lb_addresses_create( - size_t num_addresses, const grpc_lb_user_data_vtable* user_data_vtable); - -/** Creates a copy of \a addresses. */ -grpc_lb_addresses* grpc_lb_addresses_copy(const grpc_lb_addresses* addresses); - -/** Sets the value of the address at index \a index of \a addresses. - * \a address is a socket address of length \a address_len. */ -void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index, - const void* address, size_t address_len, - bool is_balancer, const char* balancer_name, - void* user_data); - -/** Sets the value of the address at index \a index of \a addresses from \a uri. - * Returns true upon success, false otherwise. */ -bool grpc_lb_addresses_set_address_from_uri(grpc_lb_addresses* addresses, - size_t index, const grpc_uri* uri, - bool is_balancer, - const char* balancer_name, - void* user_data); - -/** Compares \a addresses1 and \a addresses2. */ -int grpc_lb_addresses_cmp(const grpc_lb_addresses* addresses1, - const grpc_lb_addresses* addresses2); - -/** Destroys \a addresses. */ -void grpc_lb_addresses_destroy(grpc_lb_addresses* addresses); - -/** Returns a channel arg containing \a addresses. */ -grpc_arg grpc_lb_addresses_create_channel_arg( - const grpc_lb_addresses* addresses); - -/** Returns the \a grpc_lb_addresses instance in \a channel_args or NULL */ -grpc_lb_addresses* grpc_lb_addresses_find_channel_arg( - const grpc_channel_args* channel_args); - -// Returns true if addresses contains at least one balancer address. -bool grpc_lb_addresses_contains_balancer_address( - const grpc_lb_addresses& addresses); - -// -// LB policy factory -// +#include "src/core/lib/gprpp/abstract.h" +#include "src/core/lib/gprpp/orphanable.h" namespace grpc_core { diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 4ebc2c8161..c8425ae336 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -33,6 +33,7 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/env.h" @@ -117,7 +118,7 @@ class AresDnsResolver : public Resolver { /// retry backoff state BackOff backoff_; /// currently resolving addresses - grpc_lb_addresses* lb_addresses_ = nullptr; + UniquePtr<ServerAddressList> addresses_; /// currently resolving service config char* service_config_json_ = nullptr; // has shutdown been initiated @@ -314,13 +315,13 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) { r->resolving_ = false; gpr_free(r->pending_request_); r->pending_request_ = nullptr; - if (r->lb_addresses_ != nullptr) { + if (r->addresses_ != nullptr) { static const char* args_to_remove[1]; size_t num_args_to_remove = 0; grpc_arg args_to_add[2]; size_t num_args_to_add = 0; args_to_add[num_args_to_add++] = - grpc_lb_addresses_create_channel_arg(r->lb_addresses_); + CreateServerAddressListChannelArg(r->addresses_.get()); char* service_config_string = nullptr; if (r->service_config_json_ != nullptr) { service_config_string = ChooseServiceConfig(r->service_config_json_); @@ -337,7 +338,7 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) { r->channel_args_, args_to_remove, num_args_to_remove, args_to_add, num_args_to_add); gpr_free(service_config_string); - grpc_lb_addresses_destroy(r->lb_addresses_); + r->addresses_.reset(); // Reset backoff state so that we start from the beginning when the // next request gets triggered. r->backoff_.Reset(); @@ -412,11 +413,10 @@ void AresDnsResolver::StartResolvingLocked() { self.release(); GPR_ASSERT(!resolving_); resolving_ = true; - lb_addresses_ = nullptr; service_config_json_ = nullptr; pending_request_ = grpc_dns_lookup_ares_locked( dns_server_, name_to_resolve_, kDefaultPort, interested_parties_, - &on_resolved_, &lb_addresses_, true /* check_grpclb */, + &on_resolved_, &addresses_, true /* check_grpclb */, request_service_config_ ? &service_config_json_ : nullptr, query_timeout_ms_, combiner()); last_resolution_timestamp_ = grpc_core::ExecCtx::Get()->Now(); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc index f42b1e309d..8abc34c6ed 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc @@ -31,6 +31,7 @@ #include <grpc/support/time.h> #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/timer.h" diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc index 55715869b6..1b1c2303da 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -37,12 +37,16 @@ #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" #include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/nameser.h" #include "src/core/lib/iomgr/sockaddr_utils.h" +using grpc_core::ServerAddress; +using grpc_core::ServerAddressList; + static gpr_once g_basic_init = GPR_ONCE_INIT; static gpr_mu g_init_mu; @@ -58,7 +62,7 @@ struct grpc_ares_request { /** closure to call when the request completes */ grpc_closure* on_done; /** the pointer to receive the resolved addresses */ - grpc_lb_addresses** lb_addrs_out; + grpc_core::UniquePtr<grpc_core::ServerAddressList>* addresses_out; /** the pointer to receive the service config in JSON */ char** service_config_json_out; /** the evernt driver used by this request */ @@ -87,12 +91,11 @@ typedef struct grpc_ares_hostbyname_request { static void do_basic_init(void) { gpr_mu_init(&g_init_mu); } -static void log_address_sorting_list(grpc_lb_addresses* lb_addrs, +static void log_address_sorting_list(const ServerAddressList& addresses, const char* input_output_str) { - for (size_t i = 0; i < lb_addrs->num_addresses; i++) { + for (size_t i = 0; i < addresses.size(); i++) { char* addr_str; - if (grpc_sockaddr_to_string(&addr_str, &lb_addrs->addresses[i].address, - true)) { + if (grpc_sockaddr_to_string(&addr_str, &addresses[i].address(), true)) { gpr_log(GPR_DEBUG, "c-ares address sorting: %s[%" PRIuPTR "]=%s", input_output_str, i, addr_str); gpr_free(addr_str); @@ -104,29 +107,28 @@ static void log_address_sorting_list(grpc_lb_addresses* lb_addrs, } } -void grpc_cares_wrapper_address_sorting_sort(grpc_lb_addresses* lb_addrs) { +void grpc_cares_wrapper_address_sorting_sort(ServerAddressList* addresses) { if (grpc_trace_cares_address_sorting.enabled()) { - log_address_sorting_list(lb_addrs, "input"); + log_address_sorting_list(*addresses, "input"); } address_sorting_sortable* sortables = (address_sorting_sortable*)gpr_zalloc( - sizeof(address_sorting_sortable) * lb_addrs->num_addresses); - for (size_t i = 0; i < lb_addrs->num_addresses; i++) { - sortables[i].user_data = &lb_addrs->addresses[i]; - memcpy(&sortables[i].dest_addr.addr, &lb_addrs->addresses[i].address.addr, - lb_addrs->addresses[i].address.len); - sortables[i].dest_addr.len = lb_addrs->addresses[i].address.len; + sizeof(address_sorting_sortable) * addresses->size()); + for (size_t i = 0; i < addresses->size(); ++i) { + sortables[i].user_data = &(*addresses)[i]; + memcpy(&sortables[i].dest_addr.addr, &(*addresses)[i].address().addr, + (*addresses)[i].address().len); + sortables[i].dest_addr.len = (*addresses)[i].address().len; } - address_sorting_rfc_6724_sort(sortables, lb_addrs->num_addresses); - grpc_lb_address* sorted_lb_addrs = (grpc_lb_address*)gpr_zalloc( - sizeof(grpc_lb_address) * lb_addrs->num_addresses); - for (size_t i = 0; i < lb_addrs->num_addresses; i++) { - sorted_lb_addrs[i] = *(grpc_lb_address*)sortables[i].user_data; + address_sorting_rfc_6724_sort(sortables, addresses->size()); + ServerAddressList sorted; + sorted.reserve(addresses->size()); + for (size_t i = 0; i < addresses->size(); ++i) { + sorted.emplace_back(*static_cast<ServerAddress*>(sortables[i].user_data)); } gpr_free(sortables); - gpr_free(lb_addrs->addresses); - lb_addrs->addresses = sorted_lb_addrs; + *addresses = std::move(sorted); if (grpc_trace_cares_address_sorting.enabled()) { - log_address_sorting_list(lb_addrs, "output"); + log_address_sorting_list(*addresses, "output"); } } @@ -145,9 +147,9 @@ void grpc_ares_complete_request_locked(grpc_ares_request* r) { /* Invoke on_done callback and destroy the request */ r->ev_driver = nullptr; - grpc_lb_addresses* lb_addrs = *(r->lb_addrs_out); - if (lb_addrs != nullptr) { - grpc_cares_wrapper_address_sorting_sort(lb_addrs); + ServerAddressList* addresses = r->addresses_out->get(); + if (addresses != nullptr) { + grpc_cares_wrapper_address_sorting_sort(addresses); } GRPC_CLOSURE_SCHED(r->on_done, r->error); } @@ -181,33 +183,30 @@ static void on_hostbyname_done_locked(void* arg, int status, int timeouts, GRPC_ERROR_UNREF(r->error); r->error = GRPC_ERROR_NONE; r->success = true; - grpc_lb_addresses** lb_addresses = r->lb_addrs_out; - if (*lb_addresses == nullptr) { - *lb_addresses = grpc_lb_addresses_create(0, nullptr); - } - size_t prev_naddr = (*lb_addresses)->num_addresses; - size_t i; - for (i = 0; hostent->h_addr_list[i] != nullptr; i++) { + if (*r->addresses_out == nullptr) { + *r->addresses_out = grpc_core::MakeUnique<ServerAddressList>(); } - (*lb_addresses)->num_addresses += i; - (*lb_addresses)->addresses = static_cast<grpc_lb_address*>( - gpr_realloc((*lb_addresses)->addresses, - sizeof(grpc_lb_address) * (*lb_addresses)->num_addresses)); - for (i = prev_naddr; i < (*lb_addresses)->num_addresses; i++) { + ServerAddressList& addresses = **r->addresses_out; + for (size_t i = 0; hostent->h_addr_list[i] != nullptr; ++i) { + grpc_core::InlinedVector<grpc_arg, 2> args_to_add; + if (hr->is_balancer) { + args_to_add.emplace_back(grpc_channel_arg_integer_create( + const_cast<char*>(GRPC_ARG_ADDRESS_IS_BALANCER), 1)); + args_to_add.emplace_back(grpc_channel_arg_string_create( + const_cast<char*>(GRPC_ARG_ADDRESS_BALANCER_NAME), hr->host)); + } + grpc_channel_args* args = grpc_channel_args_copy_and_add( + nullptr, args_to_add.data(), args_to_add.size()); switch (hostent->h_addrtype) { case AF_INET6: { size_t addr_len = sizeof(struct sockaddr_in6); struct sockaddr_in6 addr; memset(&addr, 0, addr_len); - memcpy(&addr.sin6_addr, hostent->h_addr_list[i - prev_naddr], + memcpy(&addr.sin6_addr, hostent->h_addr_list[i], sizeof(struct in6_addr)); addr.sin6_family = static_cast<unsigned char>(hostent->h_addrtype); addr.sin6_port = hr->port; - grpc_lb_addresses_set_address( - *lb_addresses, i, &addr, addr_len, - hr->is_balancer /* is_balancer */, - hr->is_balancer ? hr->host : nullptr /* balancer_name */, - nullptr /* user_data */); + addresses.emplace_back(&addr, addr_len, args); char output[INET6_ADDRSTRLEN]; ares_inet_ntop(AF_INET6, &addr.sin6_addr, output, INET6_ADDRSTRLEN); gpr_log(GPR_DEBUG, @@ -220,15 +219,11 @@ static void on_hostbyname_done_locked(void* arg, int status, int timeouts, size_t addr_len = sizeof(struct sockaddr_in); struct sockaddr_in addr; memset(&addr, 0, addr_len); - memcpy(&addr.sin_addr, hostent->h_addr_list[i - prev_naddr], + memcpy(&addr.sin_addr, hostent->h_addr_list[i], sizeof(struct in_addr)); addr.sin_family = static_cast<unsigned char>(hostent->h_addrtype); addr.sin_port = hr->port; - grpc_lb_addresses_set_address( - *lb_addresses, i, &addr, addr_len, - hr->is_balancer /* is_balancer */, - hr->is_balancer ? hr->host : nullptr /* balancer_name */, - nullptr /* user_data */); + addresses.emplace_back(&addr, addr_len, args); char output[INET_ADDRSTRLEN]; ares_inet_ntop(AF_INET, &addr.sin_addr, output, INET_ADDRSTRLEN); gpr_log(GPR_DEBUG, @@ -467,11 +462,10 @@ error_cleanup: gpr_free(port); } -static bool inner_resolve_as_ip_literal_locked(const char* name, - const char* default_port, - grpc_lb_addresses** addrs, - char** host, char** port, - char** hostport) { +static bool inner_resolve_as_ip_literal_locked( + const char* name, const char* default_port, + grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs, char** host, + char** port, char** hostport) { gpr_split_host_port(name, host, port); if (*host == nullptr) { gpr_log(GPR_ERROR, @@ -495,18 +489,16 @@ static bool inner_resolve_as_ip_literal_locked(const char* name, if (grpc_parse_ipv4_hostport(*hostport, &addr, false /* log errors */) || grpc_parse_ipv6_hostport(*hostport, &addr, false /* log errors */)) { GPR_ASSERT(*addrs == nullptr); - *addrs = grpc_lb_addresses_create(1, nullptr); - grpc_lb_addresses_set_address( - *addrs, 0, addr.addr, addr.len, false /* is_balancer */, - nullptr /* balancer_name */, nullptr /* user_data */); + *addrs = grpc_core::MakeUnique<ServerAddressList>(); + (*addrs)->emplace_back(addr.addr, addr.len, nullptr /* args */); return true; } return false; } -static bool resolve_as_ip_literal_locked(const char* name, - const char* default_port, - grpc_lb_addresses** addrs) { +static bool resolve_as_ip_literal_locked( + const char* name, const char* default_port, + grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs) { char* host = nullptr; char* port = nullptr; char* hostport = nullptr; @@ -521,13 +513,14 @@ static bool resolve_as_ip_literal_locked(const char* name, static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json, - int query_timeout_ms, grpc_combiner* combiner) { + grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs, + bool check_grpclb, char** service_config_json, int query_timeout_ms, + grpc_combiner* combiner) { grpc_ares_request* r = static_cast<grpc_ares_request*>(gpr_zalloc(sizeof(grpc_ares_request))); r->ev_driver = nullptr; r->on_done = on_done; - r->lb_addrs_out = addrs; + r->addresses_out = addrs; r->service_config_json_out = service_config_json; r->success = false; r->error = GRPC_ERROR_NONE; @@ -553,8 +546,8 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( grpc_ares_request* (*grpc_dns_lookup_ares_locked)( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json, - int query_timeout_ms, + grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs, + bool check_grpclb, char** service_config_json, int query_timeout_ms, grpc_combiner* combiner) = grpc_dns_lookup_ares_locked_impl; static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) { @@ -599,8 +592,8 @@ typedef struct grpc_resolve_address_ares_request { grpc_combiner* combiner; /** the pointer to receive the resolved addresses */ grpc_resolved_addresses** addrs_out; - /** currently resolving lb addresses */ - grpc_lb_addresses* lb_addrs; + /** currently resolving addresses */ + grpc_core::UniquePtr<ServerAddressList> addresses; /** closure to call when the resolve_address_ares request completes */ grpc_closure* on_resolve_address_done; /** a closure wrapping on_resolve_address_done, which should be invoked when @@ -613,7 +606,7 @@ typedef struct grpc_resolve_address_ares_request { /* pollset_set to be driven by */ grpc_pollset_set* interested_parties; /* underlying ares_request that the query is performed on */ - grpc_ares_request* ares_request; + grpc_ares_request* ares_request = nullptr; } grpc_resolve_address_ares_request; static void on_dns_lookup_done_locked(void* arg, grpc_error* error) { @@ -621,25 +614,24 @@ static void on_dns_lookup_done_locked(void* arg, grpc_error* error) { static_cast<grpc_resolve_address_ares_request*>(arg); gpr_free(r->ares_request); grpc_resolved_addresses** resolved_addresses = r->addrs_out; - if (r->lb_addrs == nullptr || r->lb_addrs->num_addresses == 0) { + if (r->addresses == nullptr || r->addresses->empty()) { *resolved_addresses = nullptr; } else { *resolved_addresses = static_cast<grpc_resolved_addresses*>( gpr_zalloc(sizeof(grpc_resolved_addresses))); - (*resolved_addresses)->naddrs = r->lb_addrs->num_addresses; + (*resolved_addresses)->naddrs = r->addresses->size(); (*resolved_addresses)->addrs = static_cast<grpc_resolved_address*>(gpr_zalloc( sizeof(grpc_resolved_address) * (*resolved_addresses)->naddrs)); - for (size_t i = 0; i < (*resolved_addresses)->naddrs; i++) { - GPR_ASSERT(!r->lb_addrs->addresses[i].is_balancer); - memcpy(&(*resolved_addresses)->addrs[i], - &r->lb_addrs->addresses[i].address, sizeof(grpc_resolved_address)); + for (size_t i = 0; i < (*resolved_addresses)->naddrs; ++i) { + GPR_ASSERT(!(*r->addresses)[i].IsBalancer()); + memcpy(&(*resolved_addresses)->addrs[i], &(*r->addresses)[i].address(), + sizeof(grpc_resolved_address)); } } GRPC_CLOSURE_SCHED(r->on_resolve_address_done, GRPC_ERROR_REF(error)); - if (r->lb_addrs != nullptr) grpc_lb_addresses_destroy(r->lb_addrs); GRPC_COMBINER_UNREF(r->combiner, "on_dns_lookup_done_cb"); - gpr_free(r); + grpc_core::Delete(r); } static void grpc_resolve_address_invoke_dns_lookup_ares_locked( @@ -648,7 +640,7 @@ static void grpc_resolve_address_invoke_dns_lookup_ares_locked( static_cast<grpc_resolve_address_ares_request*>(arg); r->ares_request = grpc_dns_lookup_ares_locked( nullptr /* dns_server */, r->name, r->default_port, r->interested_parties, - &r->on_dns_lookup_done_locked, &r->lb_addrs, false /* check_grpclb */, + &r->on_dns_lookup_done_locked, &r->addresses, false /* check_grpclb */, nullptr /* service_config_json */, GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS, r->combiner); } @@ -659,8 +651,7 @@ static void grpc_resolve_address_ares_impl(const char* name, grpc_closure* on_done, grpc_resolved_addresses** addrs) { grpc_resolve_address_ares_request* r = - static_cast<grpc_resolve_address_ares_request*>( - gpr_zalloc(sizeof(grpc_resolve_address_ares_request))); + grpc_core::New<grpc_resolve_address_ares_request>(); r->combiner = grpc_combiner_create(); r->addrs_out = addrs; r->on_resolve_address_done = on_done; diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h index 9acef1d0ca..2808250456 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -21,7 +21,7 @@ #include <grpc/support/port_platform.h> -#include "src/core/ext/filters/client_channel/lb_policy_factory.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/resolve_address.h" @@ -61,8 +61,9 @@ extern void (*grpc_resolve_address_ares)(const char* name, extern grpc_ares_request* (*grpc_dns_lookup_ares_locked)( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_lb_addresses** addresses, bool check_grpclb, - char** service_config_json, int query_timeout_ms, grpc_combiner* combiner); + grpc_core::UniquePtr<grpc_core::ServerAddressList>* addresses, + bool check_grpclb, char** service_config_json, int query_timeout_ms, + grpc_combiner* combiner); /* Cancel the pending grpc_ares_request \a request */ extern void (*grpc_cancel_ares_request_locked)(grpc_ares_request* request); @@ -89,10 +90,12 @@ bool grpc_ares_query_ipv6(); * Returns a bool indicating whether or not such an action was performed. * See https://github.com/grpc/grpc/issues/15158. */ bool grpc_ares_maybe_resolve_localhost_manually_locked( - const char* name, const char* default_port, grpc_lb_addresses** addrs); + const char* name, const char* default_port, + grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs); /* Sorts destinations in lb_addrs according to RFC 6724. */ -void grpc_cares_wrapper_address_sorting_sort(grpc_lb_addresses* lb_addrs); +void grpc_cares_wrapper_address_sorting_sort( + grpc_core::ServerAddressList* addresses); #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_WRAPPER_H \ */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc index fc78b18304..1f4701c999 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc @@ -29,16 +29,17 @@ struct grpc_ares_request { static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json, - int query_timeout_ms, grpc_combiner* combiner) { + grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs, + bool check_grpclb, char** service_config_json, int query_timeout_ms, + grpc_combiner* combiner) { return NULL; } grpc_ares_request* (*grpc_dns_lookup_ares_locked)( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json, - int query_timeout_ms, + grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs, + bool check_grpclb, char** service_config_json, int query_timeout_ms, grpc_combiner* combiner) = grpc_dns_lookup_ares_locked_impl; static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) {} diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc index 639eec2323..028d844216 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc @@ -27,7 +27,8 @@ bool grpc_ares_query_ipv6() { return grpc_ipv6_loopback_available(); } bool grpc_ares_maybe_resolve_localhost_manually_locked( - const char* name, const char* default_port, grpc_lb_addresses** addrs) { + const char* name, const char* default_port, + grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs) { return false; } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc index 7e34784691..202452f1b2 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc @@ -23,9 +23,9 @@ #include <grpc/support/string_util.h> -#include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/socket_windows.h" @@ -33,8 +33,9 @@ bool grpc_ares_query_ipv6() { return grpc_ipv6_loopback_available(); } static bool inner_maybe_resolve_localhost_manually_locked( - const char* name, const char* default_port, grpc_lb_addresses** addrs, - char** host, char** port) { + const char* name, const char* default_port, + grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs, char** host, + char** port) { gpr_split_host_port(name, host, port); if (*host == nullptr) { gpr_log(GPR_ERROR, @@ -55,7 +56,7 @@ static bool inner_maybe_resolve_localhost_manually_locked( } if (gpr_stricmp(*host, "localhost") == 0) { GPR_ASSERT(*addrs == nullptr); - *addrs = grpc_lb_addresses_create(2, nullptr); + *addrs = grpc_core::MakeUnique<grpc_core::ServerAddressList>(); uint16_t numeric_port = grpc_strhtons(*port); // Append the ipv6 loopback address. struct sockaddr_in6 ipv6_loopback_addr; @@ -63,10 +64,8 @@ static bool inner_maybe_resolve_localhost_manually_locked( ((char*)&ipv6_loopback_addr.sin6_addr)[15] = 1; ipv6_loopback_addr.sin6_family = AF_INET6; ipv6_loopback_addr.sin6_port = numeric_port; - grpc_lb_addresses_set_address( - *addrs, 0, &ipv6_loopback_addr, sizeof(ipv6_loopback_addr), - false /* is_balancer */, nullptr /* balancer_name */, - nullptr /* user_data */); + (*addrs)->emplace_back(&ipv6_loopback_addr, sizeof(ipv6_loopback_addr), + nullptr /* args */); // Append the ipv4 loopback address. struct sockaddr_in ipv4_loopback_addr; memset(&ipv4_loopback_addr, 0, sizeof(ipv4_loopback_addr)); @@ -74,19 +73,18 @@ static bool inner_maybe_resolve_localhost_manually_locked( ((char*)&ipv4_loopback_addr.sin_addr)[3] = 0x01; ipv4_loopback_addr.sin_family = AF_INET; ipv4_loopback_addr.sin_port = numeric_port; - grpc_lb_addresses_set_address( - *addrs, 1, &ipv4_loopback_addr, sizeof(ipv4_loopback_addr), - false /* is_balancer */, nullptr /* balancer_name */, - nullptr /* user_data */); + (*addrs)->emplace_back(&ipv4_loopback_addr, sizeof(ipv4_loopback_addr), + nullptr /* args */); // Let the address sorter figure out which one should be tried first. - grpc_cares_wrapper_address_sorting_sort(*addrs); + grpc_cares_wrapper_address_sorting_sort(addrs->get()); return true; } return false; } bool grpc_ares_maybe_resolve_localhost_manually_locked( - const char* name, const char* default_port, grpc_lb_addresses** addrs) { + const char* name, const char* default_port, + grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs) { char* host = nullptr; char* port = nullptr; bool out = inner_maybe_resolve_localhost_manually_locked(name, default_port, diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc index 65ff1ec1a5..c365f1abfd 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc @@ -26,8 +26,8 @@ #include <grpc/support/string_util.h> #include <grpc/support/time.h> -#include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/env.h" @@ -198,18 +198,14 @@ void NativeDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) { grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(r->name_to_resolve_)); if (r->addresses_ != nullptr) { - grpc_lb_addresses* addresses = grpc_lb_addresses_create( - r->addresses_->naddrs, nullptr /* user_data_vtable */); + ServerAddressList addresses; for (size_t i = 0; i < r->addresses_->naddrs; ++i) { - grpc_lb_addresses_set_address( - addresses, i, &r->addresses_->addrs[i].addr, - r->addresses_->addrs[i].len, false /* is_balancer */, - nullptr /* balancer_name */, nullptr /* user_data */); + addresses.emplace_back(&r->addresses_->addrs[i].addr, + r->addresses_->addrs[i].len, nullptr /* args */); } - grpc_arg new_arg = grpc_lb_addresses_create_channel_arg(addresses); + grpc_arg new_arg = CreateServerAddressListChannelArg(&addresses); result = grpc_channel_args_copy_and_add(r->channel_args_, &new_arg, 1); grpc_resolved_addresses_destroy(r->addresses_); - grpc_lb_addresses_destroy(addresses); // Reset backoff state so that we start from the beginning when the // next request gets triggered. r->backoff_.Reset(); diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc index 3aa690bea4..258339491c 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc @@ -28,12 +28,13 @@ #include <grpc/support/alloc.h> #include <grpc/support/string_util.h> -#include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gpr/useful.h" #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/resolve_address.h" diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h index 7f69059351..d86111c382 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h @@ -19,10 +19,9 @@ #include <grpc/support/port_platform.h> -#include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/ref_counted.h" -#include "src/core/lib/uri/uri_parser.h" +#include "src/core/lib/iomgr/error.h" #define GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR \ "grpc.fake_resolver.response_generator" diff --git a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc index 801734764b..1654747a79 100644 --- a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc @@ -26,9 +26,9 @@ #include <grpc/support/alloc.h> #include <grpc/support/string_util.h> -#include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gpr/string.h" @@ -45,7 +45,8 @@ namespace { class SockaddrResolver : public Resolver { public: /// Takes ownership of \a addresses. - SockaddrResolver(const ResolverArgs& args, grpc_lb_addresses* addresses); + SockaddrResolver(const ResolverArgs& args, + UniquePtr<ServerAddressList> addresses); void NextLocked(grpc_channel_args** result, grpc_closure* on_complete) override; @@ -58,7 +59,7 @@ class SockaddrResolver : public Resolver { void MaybeFinishNextLocked(); /// the addresses that we've "resolved" - grpc_lb_addresses* addresses_ = nullptr; + UniquePtr<ServerAddressList> addresses_; /// channel args grpc_channel_args* channel_args_ = nullptr; /// have we published? @@ -70,13 +71,12 @@ class SockaddrResolver : public Resolver { }; SockaddrResolver::SockaddrResolver(const ResolverArgs& args, - grpc_lb_addresses* addresses) + UniquePtr<ServerAddressList> addresses) : Resolver(args.combiner), - addresses_(addresses), + addresses_(std::move(addresses)), channel_args_(grpc_channel_args_copy(args.args)) {} SockaddrResolver::~SockaddrResolver() { - grpc_lb_addresses_destroy(addresses_); grpc_channel_args_destroy(channel_args_); } @@ -100,7 +100,7 @@ void SockaddrResolver::ShutdownLocked() { void SockaddrResolver::MaybeFinishNextLocked() { if (next_completion_ != nullptr && !published_) { published_ = true; - grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses_); + grpc_arg arg = CreateServerAddressListChannelArg(addresses_.get()); *target_result_ = grpc_channel_args_copy_and_add(channel_args_, &arg, 1); GRPC_CLOSURE_SCHED(next_completion_, GRPC_ERROR_NONE); next_completion_ = nullptr; @@ -127,27 +127,27 @@ OrphanablePtr<Resolver> CreateSockaddrResolver( grpc_slice_buffer path_parts; grpc_slice_buffer_init(&path_parts); grpc_slice_split(path_slice, ",", &path_parts); - grpc_lb_addresses* addresses = grpc_lb_addresses_create( - path_parts.count, nullptr /* user_data_vtable */); + auto addresses = MakeUnique<ServerAddressList>(); bool errors_found = false; - for (size_t i = 0; i < addresses->num_addresses; i++) { + for (size_t i = 0; i < path_parts.count; i++) { grpc_uri ith_uri = *args.uri; - char* part_str = grpc_slice_to_c_string(path_parts.slices[i]); - ith_uri.path = part_str; - if (!parse(&ith_uri, &addresses->addresses[i].address)) { + UniquePtr<char> part_str(grpc_slice_to_c_string(path_parts.slices[i])); + ith_uri.path = part_str.get(); + grpc_resolved_address addr; + if (!parse(&ith_uri, &addr)) { errors_found = true; /* GPR_TRUE */ + break; } - gpr_free(part_str); - if (errors_found) break; + addresses->emplace_back(addr, nullptr /* args */); } grpc_slice_buffer_destroy_internal(&path_parts); grpc_slice_unref_internal(path_slice); if (errors_found) { - grpc_lb_addresses_destroy(addresses); return OrphanablePtr<Resolver>(nullptr); } // Instantiate resolver. - return OrphanablePtr<Resolver>(New<SockaddrResolver>(args, addresses)); + return OrphanablePtr<Resolver>( + New<SockaddrResolver>(args, std::move(addresses))); } class IPv4ResolverFactory : public ResolverFactory { diff --git a/src/core/ext/filters/client_channel/resolver_result_parsing.cc b/src/core/ext/filters/client_channel/resolver_result_parsing.cc index 4f7fd6b424..22b06db45c 100644 --- a/src/core/ext/filters/client_channel/resolver_result_parsing.cc +++ b/src/core/ext/filters/client_channel/resolver_result_parsing.cc @@ -30,9 +30,11 @@ #include "src/core/ext/filters/client_channel/client_channel.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/lib/channel/status_util.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/memory.h" +#include "src/core/lib/uri/uri_parser.h" // As per the retry design, we do not allow more than 5 retry attempts. #define MAX_MAX_RETRY_ATTEMPTS 5 @@ -99,12 +101,18 @@ void ProcessedResolverResult::ProcessLbPolicyName( } // Special case: If at least one balancer address is present, we use // the grpclb policy, regardless of what the resolver has returned. - const grpc_arg* channel_arg = - grpc_channel_args_find(resolver_result, GRPC_ARG_LB_ADDRESSES); - if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) { - grpc_lb_addresses* addresses = - static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p); - if (grpc_lb_addresses_contains_balancer_address(*addresses)) { + const ServerAddressList* addresses = + FindServerAddressListChannelArg(resolver_result); + if (addresses != nullptr) { + bool found_balancer_address = false; + for (size_t i = 0; i < addresses->size(); ++i) { + const ServerAddress& address = (*addresses)[i]; + if (address.IsBalancer()) { + found_balancer_address = true; + break; + } + } + if (found_balancer_address) { if (lb_policy_name_ != nullptr && strcmp(lb_policy_name_.get(), "grpclb") != 0) { gpr_log(GPR_INFO, diff --git a/src/core/ext/filters/client_channel/server_address.cc b/src/core/ext/filters/client_channel/server_address.cc new file mode 100644 index 0000000000..ec33cbbd95 --- /dev/null +++ b/src/core/ext/filters/client_channel/server_address.cc @@ -0,0 +1,103 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/core/ext/filters/client_channel/server_address.h" + +#include <string.h> + +namespace grpc_core { + +// +// ServerAddress +// + +ServerAddress::ServerAddress(const grpc_resolved_address& address, + grpc_channel_args* args) + : address_(address), args_(args) {} + +ServerAddress::ServerAddress(const void* address, size_t address_len, + grpc_channel_args* args) + : args_(args) { + memcpy(address_.addr, address, address_len); + address_.len = static_cast<socklen_t>(address_len); +} + +int ServerAddress::Cmp(const ServerAddress& other) const { + if (address_.len > other.address_.len) return 1; + if (address_.len < other.address_.len) return -1; + int retval = memcmp(address_.addr, other.address_.addr, address_.len); + if (retval != 0) return retval; + return grpc_channel_args_compare(args_, other.args_); +} + +bool ServerAddress::IsBalancer() const { + return grpc_channel_arg_get_bool( + grpc_channel_args_find(args_, GRPC_ARG_ADDRESS_IS_BALANCER), false); +} + +// +// ServerAddressList +// + +namespace { + +void* ServerAddressListCopy(void* addresses) { + ServerAddressList* a = static_cast<ServerAddressList*>(addresses); + return New<ServerAddressList>(*a); +} + +void ServerAddressListDestroy(void* addresses) { + ServerAddressList* a = static_cast<ServerAddressList*>(addresses); + Delete(a); +} + +int ServerAddressListCompare(void* addresses1, void* addresses2) { + ServerAddressList* a1 = static_cast<ServerAddressList*>(addresses1); + ServerAddressList* a2 = static_cast<ServerAddressList*>(addresses2); + if (a1->size() > a2->size()) return 1; + if (a1->size() < a2->size()) return -1; + for (size_t i = 0; i < a1->size(); ++i) { + int retval = (*a1)[i].Cmp((*a2)[i]); + if (retval != 0) return retval; + } + return 0; +} + +const grpc_arg_pointer_vtable server_addresses_arg_vtable = { + ServerAddressListCopy, ServerAddressListDestroy, ServerAddressListCompare}; + +} // namespace + +grpc_arg CreateServerAddressListChannelArg(const ServerAddressList* addresses) { + return grpc_channel_arg_pointer_create( + const_cast<char*>(GRPC_ARG_SERVER_ADDRESS_LIST), + const_cast<ServerAddressList*>(addresses), &server_addresses_arg_vtable); +} + +ServerAddressList* FindServerAddressListChannelArg( + const grpc_channel_args* channel_args) { + const grpc_arg* lb_addresses_arg = + grpc_channel_args_find(channel_args, GRPC_ARG_SERVER_ADDRESS_LIST); + if (lb_addresses_arg == nullptr || lb_addresses_arg->type != GRPC_ARG_POINTER) + return nullptr; + return static_cast<ServerAddressList*>(lb_addresses_arg->value.pointer.p); +} + +} // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/server_address.h b/src/core/ext/filters/client_channel/server_address.h new file mode 100644 index 0000000000..3a1bf1df67 --- /dev/null +++ b/src/core/ext/filters/client_channel/server_address.h @@ -0,0 +1,108 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SERVER_ADDRESS_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SERVER_ADDRESS_H + +#include <grpc/support/port_platform.h> + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gprpp/inlined_vector.h" +#include "src/core/lib/iomgr/resolve_address.h" +#include "src/core/lib/uri/uri_parser.h" + +// Channel arg key for ServerAddressList. +#define GRPC_ARG_SERVER_ADDRESS_LIST "grpc.server_address_list" + +// Channel arg key for a bool indicating whether an address is a grpclb +// load balancer (as opposed to a backend). +#define GRPC_ARG_ADDRESS_IS_BALANCER "grpc.address_is_balancer" + +// Channel arg key for a string indicating an address's balancer name. +#define GRPC_ARG_ADDRESS_BALANCER_NAME "grpc.address_balancer_name" + +namespace grpc_core { + +// +// ServerAddress +// + +// A server address is a grpc_resolved_address with an associated set of +// channel args. Any args present here will be merged into the channel +// args when a subchannel is created for this address. +class ServerAddress { + public: + // Takes ownership of args. + ServerAddress(const grpc_resolved_address& address, grpc_channel_args* args); + ServerAddress(const void* address, size_t address_len, + grpc_channel_args* args); + + ~ServerAddress() { grpc_channel_args_destroy(args_); } + + // Copyable. + ServerAddress(const ServerAddress& other) + : address_(other.address_), args_(grpc_channel_args_copy(other.args_)) {} + ServerAddress& operator=(const ServerAddress& other) { + address_ = other.address_; + grpc_channel_args_destroy(args_); + args_ = grpc_channel_args_copy(other.args_); + return *this; + } + + // Movable. + ServerAddress(ServerAddress&& other) + : address_(other.address_), args_(other.args_) { + other.args_ = nullptr; + } + ServerAddress& operator=(ServerAddress&& other) { + address_ = other.address_; + args_ = other.args_; + other.args_ = nullptr; + return *this; + } + + bool operator==(const ServerAddress& other) const { return Cmp(other) == 0; } + + int Cmp(const ServerAddress& other) const; + + const grpc_resolved_address& address() const { return address_; } + const grpc_channel_args* args() const { return args_; } + + bool IsBalancer() const; + + private: + grpc_resolved_address address_; + grpc_channel_args* args_; +}; + +// +// ServerAddressList +// + +typedef InlinedVector<ServerAddress, 1> ServerAddressList; + +// Returns a channel arg containing \a addresses. +grpc_arg CreateServerAddressListChannelArg(const ServerAddressList* addresses); + +// Returns the ServerListAddress instance in channel_args or NULL. +ServerAddressList* FindServerAddressListChannelArg( + const grpc_channel_args* channel_args); + +} // namespace grpc_core + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SERVER_ADDRESS_H */ diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 0817b1dd39..3100889e3f 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -837,7 +837,7 @@ static bool publish_transport_locked(grpc_subchannel* c) { /* publish */ c->connected_subchannel.reset(grpc_core::New<grpc_core::ConnectedSubchannel>( - stk, c->channelz_subchannel, socket_uuid)); + stk, c->args, c->channelz_subchannel, socket_uuid)); gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p", c->connected_subchannel.get(), c); @@ -1068,16 +1068,18 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) { namespace grpc_core { ConnectedSubchannel::ConnectedSubchannel( - grpc_channel_stack* channel_stack, + grpc_channel_stack* channel_stack, const grpc_channel_args* args, grpc_core::RefCountedPtr<grpc_core::channelz::SubchannelNode> channelz_subchannel, intptr_t socket_uuid) : RefCounted<ConnectedSubchannel>(&grpc_trace_stream_refcount), channel_stack_(channel_stack), + args_(grpc_channel_args_copy(args)), channelz_subchannel_(std::move(channelz_subchannel)), socket_uuid_(socket_uuid) {} ConnectedSubchannel::~ConnectedSubchannel() { + grpc_channel_args_destroy(args_); GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor"); } diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 69c2456ec2..14f87f2c68 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -85,28 +85,31 @@ class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> { size_t parent_data_size; }; - explicit ConnectedSubchannel( - grpc_channel_stack* channel_stack, + ConnectedSubchannel( + grpc_channel_stack* channel_stack, const grpc_channel_args* args, grpc_core::RefCountedPtr<grpc_core::channelz::SubchannelNode> channelz_subchannel, intptr_t socket_uuid); ~ConnectedSubchannel(); - grpc_channel_stack* channel_stack() { return channel_stack_; } void NotifyOnStateChange(grpc_pollset_set* interested_parties, grpc_connectivity_state* state, grpc_closure* closure); void Ping(grpc_closure* on_initiate, grpc_closure* on_ack); grpc_error* CreateCall(const CallArgs& args, grpc_subchannel_call** call); - channelz::SubchannelNode* channelz_subchannel() { + + grpc_channel_stack* channel_stack() const { return channel_stack_; } + const grpc_channel_args* args() const { return args_; } + channelz::SubchannelNode* channelz_subchannel() const { return channelz_subchannel_.get(); } - intptr_t socket_uuid() { return socket_uuid_; } + intptr_t socket_uuid() const { return socket_uuid_; } size_t GetInitialCallSizeEstimate(size_t parent_data_size) const; private: grpc_channel_stack* channel_stack_; + grpc_channel_args* args_; // ref counted pointer to the channelz node in this connected subchannel's // owning subchannel. grpc_core::RefCountedPtr<grpc_core::channelz::SubchannelNode> diff --git a/src/core/lib/iomgr/sockaddr_utils.cc b/src/core/lib/iomgr/sockaddr_utils.cc index 1b66dceb13..0839bdfef2 100644 --- a/src/core/lib/iomgr/sockaddr_utils.cc +++ b/src/core/lib/iomgr/sockaddr_utils.cc @@ -217,6 +217,7 @@ void grpc_string_to_sockaddr(grpc_resolved_address* out, char* addr, int port) { } char* grpc_sockaddr_to_uri(const grpc_resolved_address* resolved_addr) { + if (resolved_addr->len == 0) return nullptr; grpc_resolved_address addr_normalized; if (grpc_sockaddr_is_v4mapped(resolved_addr, &addr_normalized)) { resolved_addr = &addr_normalized; |