diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc | 242 |
1 files changed, 101 insertions, 141 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index a46579c7f7..a9a5965ed1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -84,6 +84,7 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" @@ -113,6 +114,8 @@ #define GRPC_GRPCLB_RECONNECT_JITTER 0.2 #define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000 +#define GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN "grpc.grpclb_address_lb_token" + namespace grpc_core { TraceFlag grpc_lb_glb_trace(false, "glb"); @@ -121,7 +124,7 @@ namespace { class GrpcLb : public LoadBalancingPolicy { public: - GrpcLb(const grpc_lb_addresses* addresses, const Args& args); + explicit GrpcLb(const Args& args); void UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) override; @@ -161,9 +164,6 @@ class GrpcLb : public LoadBalancingPolicy { // Our on_complete closure and the original one. grpc_closure on_complete; grpc_closure* original_on_complete; - // The LB token associated with the pick. This is set via user_data in - // the pick. - grpc_mdelem lb_token; // Stats for client-side load reporting. RefCountedPtr<GrpcLbClientStats> client_stats; // Next pending pick. @@ -329,7 +329,7 @@ class GrpcLb : public LoadBalancingPolicy { // 0 means not using fallback. int lb_fallback_timeout_ms_ = 0; // The backend addresses from the resolver. - grpc_lb_addresses* fallback_backend_addresses_ = nullptr; + UniquePtr<ServerAddressList> fallback_backend_addresses_; // Fallback timer. bool fallback_timer_callback_pending_ = false; grpc_timer lb_fallback_timer_; @@ -349,7 +349,7 @@ class GrpcLb : public LoadBalancingPolicy { // serverlist parsing code // -// vtable for LB tokens in grpc_lb_addresses +// vtable for LB token channel arg. void* lb_token_copy(void* token) { return token == nullptr ? nullptr @@ -361,38 +361,11 @@ void lb_token_destroy(void* token) { } } int lb_token_cmp(void* token1, void* token2) { - if (token1 > token2) return 1; - if (token1 < token2) return -1; - return 0; + return GPR_ICMP(token1, token2); } -const grpc_lb_user_data_vtable lb_token_vtable = { +const grpc_arg_pointer_vtable lb_token_arg_vtable = { lb_token_copy, lb_token_destroy, lb_token_cmp}; -// Returns the backend addresses extracted from the given addresses. -grpc_lb_addresses* ExtractBackendAddresses(const grpc_lb_addresses* addresses) { - // First pass: count the number of backend addresses. - size_t num_backends = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (!addresses->addresses[i].is_balancer) { - ++num_backends; - } - } - // Second pass: actually populate the addresses and (empty) LB tokens. - grpc_lb_addresses* backend_addresses = - grpc_lb_addresses_create(num_backends, &lb_token_vtable); - size_t num_copied = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (addresses->addresses[i].is_balancer) continue; - const grpc_resolved_address* addr = &addresses->addresses[i].address; - grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr, - addr->len, false /* is_balancer */, - nullptr /* balancer_name */, - (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload); - ++num_copied; - } - return backend_addresses; -} - bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) { if (server->drop) return false; const grpc_grpclb_ip_address* ip = &server->ip_address; @@ -440,30 +413,16 @@ void ParseServer(const grpc_grpclb_server* server, } // Returns addresses extracted from \a serverlist. -grpc_lb_addresses* ProcessServerlist(const grpc_grpclb_serverlist* serverlist) { - size_t num_valid = 0; - /* first pass: count how many are valid in order to allocate the necessary - * memory in a single block */ +ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) { + ServerAddressList addresses; for (size_t i = 0; i < serverlist->num_servers; ++i) { - if (IsServerValid(serverlist->servers[i], i, true)) ++num_valid; - } - grpc_lb_addresses* lb_addresses = - grpc_lb_addresses_create(num_valid, &lb_token_vtable); - /* second pass: actually populate the addresses and LB tokens (aka user data - * to the outside world) to be read by the RR policy during its creation. - * Given that the validity tests are very cheap, they are performed again - * instead of marking the valid ones during the first pass, as this would - * incurr in an allocation due to the arbitrary number of server */ - size_t addr_idx = 0; - for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) { - const grpc_grpclb_server* server = serverlist->servers[sl_idx]; - if (!IsServerValid(serverlist->servers[sl_idx], sl_idx, false)) continue; - GPR_ASSERT(addr_idx < num_valid); - /* address processing */ + const grpc_grpclb_server* server = serverlist->servers[i]; + if (!IsServerValid(serverlist->servers[i], i, false)) continue; + // Address processing. grpc_resolved_address addr; ParseServer(server, &addr); - /* lb token processing */ - void* user_data; + // LB token processing. + void* lb_token; if (server->has_load_balance_token) { const size_t lb_token_max_length = GPR_ARRAY_SIZE(server->load_balance_token); @@ -471,7 +430,7 @@ grpc_lb_addresses* ProcessServerlist(const grpc_grpclb_serverlist* serverlist) { strnlen(server->load_balance_token, lb_token_max_length); grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer( server->load_balance_token, lb_token_length); - user_data = + lb_token = (void*)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr) .payload; } else { @@ -481,15 +440,16 @@ grpc_lb_addresses* ProcessServerlist(const grpc_grpclb_serverlist* serverlist) { "be used instead", uri); gpr_free(uri); - user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload; + lb_token = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload; } - grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len, - false /* is_balancer */, - nullptr /* balancer_name */, user_data); - ++addr_idx; - } - GPR_ASSERT(addr_idx == num_valid); - return lb_addresses; + // Add address. + grpc_arg arg = grpc_channel_arg_pointer_create( + const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), lb_token, + &lb_token_arg_vtable); + grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1); + addresses.emplace_back(addr, args); + } + return addresses; } // @@ -829,8 +789,7 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_); } else { // Dispose of the fallback. - grpc_lb_addresses_destroy(grpclb_policy->fallback_backend_addresses_); - grpclb_policy->fallback_backend_addresses_ = nullptr; + grpclb_policy->fallback_backend_addresses_.reset(); if (grpclb_policy->fallback_timer_callback_pending_) { grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_); } @@ -910,31 +869,25 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked( // helper code for creating balancer channel // -grpc_lb_addresses* ExtractBalancerAddresses( - const grpc_lb_addresses* addresses) { - size_t num_grpclb_addrs = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs; - } - // There must be at least one balancer address, or else the - // client_channel would not have chosen this LB policy. - GPR_ASSERT(num_grpclb_addrs > 0); - grpc_lb_addresses* lb_addresses = - grpc_lb_addresses_create(num_grpclb_addrs, nullptr); - size_t lb_addresses_idx = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (!addresses->addresses[i].is_balancer) continue; - if (GPR_UNLIKELY(addresses->addresses[i].user_data != nullptr)) { - gpr_log(GPR_ERROR, - "This LB policy doesn't support user data. It will be ignored"); +ServerAddressList ExtractBalancerAddresses(const ServerAddressList& addresses) { + ServerAddressList balancer_addresses; + for (size_t i = 0; i < addresses.size(); ++i) { + if (addresses[i].IsBalancer()) { + // Strip out the is_balancer channel arg, since we don't want to + // recursively use the grpclb policy in the channel used to talk to + // the balancers. Note that we do NOT strip out the balancer_name + // channel arg, since we need that to set the authority correctly + // to talk to the balancers. + static const char* args_to_remove[] = { + GRPC_ARG_ADDRESS_IS_BALANCER, + }; + balancer_addresses.emplace_back( + addresses[i].address(), + grpc_channel_args_copy_and_remove(addresses[i].args(), args_to_remove, + GPR_ARRAY_SIZE(args_to_remove))); } - grpc_lb_addresses_set_address( - lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr, - addresses->addresses[i].address.len, false /* is balancer */, - addresses->addresses[i].balancer_name, nullptr /* user data */); } - GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx); - return lb_addresses; + return balancer_addresses; } /* Returns the channel args for the LB channel, used to create a bidirectional @@ -946,10 +899,10 @@ grpc_lb_addresses* ExtractBalancerAddresses( * above the grpclb policy. * - \a args: other args inherited from the grpclb policy. */ grpc_channel_args* BuildBalancerChannelArgs( - const grpc_lb_addresses* addresses, + const ServerAddressList& addresses, FakeResolverResponseGenerator* response_generator, const grpc_channel_args* args) { - grpc_lb_addresses* lb_addresses = ExtractBalancerAddresses(addresses); + ServerAddressList balancer_addresses = ExtractBalancerAddresses(addresses); // Channel args to remove. static const char* args_to_remove[] = { // LB policy name, since we want to use the default (pick_first) in @@ -967,7 +920,7 @@ grpc_channel_args* BuildBalancerChannelArgs( // is_balancer=true. We need the LB channel to return addresses with // is_balancer=false so that it does not wind up recursively using the // grpclb LB policy, as per the special case logic in client_channel.c. - GRPC_ARG_LB_ADDRESSES, + GRPC_ARG_SERVER_ADDRESS_LIST, // The fake resolver response generator, because we are replacing it // with the one from the grpclb policy, used to propagate updates to // the LB channel. @@ -983,10 +936,10 @@ grpc_channel_args* BuildBalancerChannelArgs( }; // Channel args to add. const grpc_arg args_to_add[] = { - // New LB addresses. + // New address list. // Note that we pass these in both when creating the LB channel // and via the fake resolver. The latter is what actually gets used. - grpc_lb_addresses_create_channel_arg(lb_addresses), + CreateServerAddressListChannelArg(&balancer_addresses), // The fake resolver response generator, which we use to inject // address updates into the LB channel. grpc_core::FakeResolverResponseGenerator::MakeChannelArg( @@ -1004,18 +957,14 @@ grpc_channel_args* BuildBalancerChannelArgs( args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add, GPR_ARRAY_SIZE(args_to_add)); // Make any necessary modifications for security. - new_args = grpc_lb_policy_grpclb_modify_lb_channel_args(new_args); - // Clean up. - grpc_lb_addresses_destroy(lb_addresses); - return new_args; + return grpc_lb_policy_grpclb_modify_lb_channel_args(new_args); } // // ctor and dtor // -GrpcLb::GrpcLb(const grpc_lb_addresses* addresses, - const LoadBalancingPolicy::Args& args) +GrpcLb::GrpcLb(const LoadBalancingPolicy::Args& args) : LoadBalancingPolicy(args), response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()), lb_call_backoff_( @@ -1072,9 +1021,6 @@ GrpcLb::~GrpcLb() { if (serverlist_ != nullptr) { grpc_grpclb_destroy_serverlist(serverlist_); } - if (fallback_backend_addresses_ != nullptr) { - grpc_lb_addresses_destroy(fallback_backend_addresses_); - } grpc_subchannel_index_unref(); } @@ -1122,7 +1068,6 @@ void GrpcLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { while ((pp = pending_picks_) != nullptr) { pending_picks_ = pp->next; pp->pick->on_complete = pp->original_on_complete; - pp->pick->user_data = nullptr; grpc_error* error = GRPC_ERROR_NONE; if (new_policy->PickLocked(pp->pick, &error)) { // Synchronous return; schedule closure. @@ -1276,9 +1221,27 @@ void GrpcLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current, notify); } +// Returns the backend addresses extracted from the given addresses. +UniquePtr<ServerAddressList> ExtractBackendAddresses( + const ServerAddressList& addresses) { + void* lb_token = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload; + grpc_arg arg = grpc_channel_arg_pointer_create( + const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), lb_token, + &lb_token_arg_vtable); + auto backend_addresses = MakeUnique<ServerAddressList>(); + for (size_t i = 0; i < addresses.size(); ++i) { + if (!addresses[i].IsBalancer()) { + backend_addresses->emplace_back( + addresses[i].address(), + grpc_channel_args_copy_and_add(addresses[i].args(), &arg, 1)); + } + } + return backend_addresses; +} + void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { - const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); - if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) { + const ServerAddressList* addresses = FindServerAddressListChannelArg(&args); + if (addresses == nullptr) { // Ignore this update. gpr_log( GPR_ERROR, @@ -1286,13 +1249,8 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { this); return; } - const grpc_lb_addresses* addresses = - static_cast<const grpc_lb_addresses*>(arg->value.pointer.p); // Update fallback address list. - if (fallback_backend_addresses_ != nullptr) { - grpc_lb_addresses_destroy(fallback_backend_addresses_); - } - fallback_backend_addresses_ = ExtractBackendAddresses(addresses); + fallback_backend_addresses_ = ExtractBackendAddresses(*addresses); // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, // since we use this to trigger the client_load_reporting filter. static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME}; @@ -1303,7 +1261,7 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); // Construct args for balancer channel. grpc_channel_args* lb_channel_args = - BuildBalancerChannelArgs(addresses, response_generator_.get(), &args); + BuildBalancerChannelArgs(*addresses, response_generator_.get(), &args); // Create balancer channel if needed. if (lb_channel_ == nullptr) { char* uri_str; @@ -1509,12 +1467,17 @@ void DestroyClientStats(void* arg) { } void GrpcLb::PendingPickSetMetadataAndContext(PendingPick* pp) { - /* if connected_subchannel is nullptr, no pick has been made by the RR - * policy (e.g., all addresses failed to connect). There won't be any - * user_data/token available */ + // If connected_subchannel is nullptr, no pick has been made by the RR + // policy (e.g., all addresses failed to connect). There won't be any + // LB token available. if (pp->pick->connected_subchannel != nullptr) { - if (GPR_LIKELY(!GRPC_MDISNULL(pp->lb_token))) { - AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(pp->lb_token), + const grpc_arg* arg = + grpc_channel_args_find(pp->pick->connected_subchannel->args(), + GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN); + if (arg != nullptr) { + grpc_mdelem lb_token = { + reinterpret_cast<uintptr_t>(arg->value.pointer.p)}; + AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(lb_token), &pp->pick->lb_token_mdelem_storage, pp->pick->initial_metadata); } else { @@ -1598,12 +1561,10 @@ bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp, return true; } } - // Set client_stats and user_data. + // Set client_stats. if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) { pp->client_stats = lb_calld_->client_stats()->Ref(); } - GPR_ASSERT(pp->pick->user_data == nullptr); - pp->pick->user_data = (void**)&pp->lb_token; // Pick via the RR policy. bool pick_done = rr_policy_->PickLocked(pp->pick, error); if (pick_done) { @@ -1668,10 +1629,11 @@ void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) { } grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() { - grpc_lb_addresses* addresses; + ServerAddressList tmp_addresses; + ServerAddressList* addresses = &tmp_addresses; bool is_backend_from_grpclb_load_balancer = false; if (serverlist_ != nullptr) { - addresses = ProcessServerlist(serverlist_); + tmp_addresses = ProcessServerlist(serverlist_); is_backend_from_grpclb_load_balancer = true; } else { // If CreateOrUpdateRoundRobinPolicyLocked() is invoked when we haven't @@ -1680,14 +1642,14 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() { // empty, in which case the new round_robin policy will keep the requested // picks pending. GPR_ASSERT(fallback_backend_addresses_ != nullptr); - addresses = grpc_lb_addresses_copy(fallback_backend_addresses_); + addresses = fallback_backend_addresses_.get(); } GPR_ASSERT(addresses != nullptr); - // Replace the LB addresses in the channel args that we pass down to + // Replace the server address list in the channel args that we pass down to // the subchannel. - static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES}; + static const char* keys_to_remove[] = {GRPC_ARG_SERVER_ADDRESS_LIST}; grpc_arg args_to_add[3] = { - grpc_lb_addresses_create_channel_arg(addresses), + CreateServerAddressListChannelArg(addresses), // A channel arg indicating if the target is a backend inferred from a // grpclb load balancer. grpc_channel_arg_integer_create( @@ -1704,7 +1666,6 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() { grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove( args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add, num_args_to_add); - grpc_lb_addresses_destroy(addresses); return args; } @@ -1837,19 +1798,18 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory { OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( const LoadBalancingPolicy::Args& args) const override { /* Count the number of gRPC-LB addresses. There must be at least one. */ - const grpc_arg* arg = - grpc_channel_args_find(args.args, GRPC_ARG_LB_ADDRESSES); - if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { - return nullptr; - } - grpc_lb_addresses* addresses = - static_cast<grpc_lb_addresses*>(arg->value.pointer.p); - size_t num_grpclb_addrs = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs; + const ServerAddressList* addresses = + FindServerAddressListChannelArg(args.args); + if (addresses == nullptr) return nullptr; + bool found_balancer = false; + for (size_t i = 0; i < addresses->size(); ++i) { + if ((*addresses)[i].IsBalancer()) { + found_balancer = true; + break; + } } - if (num_grpclb_addrs == 0) return nullptr; - return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(addresses, args)); + if (!found_balancer) return nullptr; + return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(args)); } const char* name() const override { return "grpclb"; } |