author     Mark D. Roth <roth@google.com>  2018-12-07 12:41:51 -0800
committer  Mark D. Roth <roth@google.com>  2018-12-07 12:41:51 -0800
commit  dedff37b4f569e888836b0cf92a9d6de2ddec326 (patch)
tree    f4640b9fd8893b735a790aa69e1a6bfbedcabc6f /src/core/ext/filters/client_channel/lb_policy
parent  680a3546815d564fca8f2abe0cae441e78c25798 (diff)
Allow encoding arbitrary channel args on a per-address basis.
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy')
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc                 242
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h            2
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc   33
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h         2
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc          19
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc        40
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/subchannel_list.h                 53
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/xds/xds.cc                       247
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h                  2
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc         33
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h        2
11 files changed, 241 insertions, 434 deletions
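
For context, this patch replaces grpc_lb_addresses with the new ServerAddressList type, in which each ServerAddress carries its own grpc_channel_args. The following is a minimal, hypothetical sketch (not part of the patch) of how a resolver might attach an arbitrary arg to a single address, assuming the ServerAddress constructor and the CreateServerAddressListChannelArg() helper that appear in the hunks below, and assuming the address list takes ownership of the args it is given; the arg key used here is illustrative only.

#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/lib/channel/channel_args.h"

// Builds a resolver result in which one address carries a private channel arg.
grpc_channel_args* MakeResultWithPerAddressArg(
    const grpc_resolved_address& addr) {
  grpc_core::ServerAddressList addresses;
  // Attach an arbitrary integer arg to this address (key is illustrative).
  grpc_arg per_address_arg = grpc_channel_arg_integer_create(
      const_cast<char*>("grpc.example.per_address_flag"), 1);
  grpc_channel_args* address_args =
      grpc_channel_args_copy_and_add(nullptr, &per_address_arg, 1);
  addresses.emplace_back(addr, address_args);  // List owns address_args.
  // Wrap the whole list in the single GRPC_ARG_SERVER_ADDRESS_LIST arg that
  // LB policies look up via FindServerAddressListChannelArg().
  grpc_arg list_arg = grpc_core::CreateServerAddressListChannelArg(&addresses);
  return grpc_channel_args_copy_and_add(nullptr, &list_arg, 1);
}
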
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index a46579c7f7..a9a5965ed1 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -84,6 +84,7 @@
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
+#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
@@ -113,6 +114,8 @@
#define GRPC_GRPCLB_RECONNECT_JITTER 0.2
#define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000
+#define GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN "grpc.grpclb_address_lb_token"
+
namespace grpc_core {
TraceFlag grpc_lb_glb_trace(false, "glb");
@@ -121,7 +124,7 @@ namespace {
class GrpcLb : public LoadBalancingPolicy {
public:
- GrpcLb(const grpc_lb_addresses* addresses, const Args& args);
+ explicit GrpcLb(const Args& args);
void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) override;
@@ -161,9 +164,6 @@ class GrpcLb : public LoadBalancingPolicy {
// Our on_complete closure and the original one.
grpc_closure on_complete;
grpc_closure* original_on_complete;
- // The LB token associated with the pick. This is set via user_data in
- // the pick.
- grpc_mdelem lb_token;
// Stats for client-side load reporting.
RefCountedPtr<GrpcLbClientStats> client_stats;
// Next pending pick.
@@ -329,7 +329,7 @@ class GrpcLb : public LoadBalancingPolicy {
// 0 means not using fallback.
int lb_fallback_timeout_ms_ = 0;
// The backend addresses from the resolver.
- grpc_lb_addresses* fallback_backend_addresses_ = nullptr;
+ UniquePtr<ServerAddressList> fallback_backend_addresses_;
// Fallback timer.
bool fallback_timer_callback_pending_ = false;
grpc_timer lb_fallback_timer_;
@@ -349,7 +349,7 @@ class GrpcLb : public LoadBalancingPolicy {
// serverlist parsing code
//
-// vtable for LB tokens in grpc_lb_addresses
+// vtable for LB token channel arg.
void* lb_token_copy(void* token) {
return token == nullptr
? nullptr
@@ -361,38 +361,11 @@ void lb_token_destroy(void* token) {
}
}
int lb_token_cmp(void* token1, void* token2) {
- if (token1 > token2) return 1;
- if (token1 < token2) return -1;
- return 0;
+ return GPR_ICMP(token1, token2);
}
-const grpc_lb_user_data_vtable lb_token_vtable = {
+const grpc_arg_pointer_vtable lb_token_arg_vtable = {
lb_token_copy, lb_token_destroy, lb_token_cmp};
-// Returns the backend addresses extracted from the given addresses.
-grpc_lb_addresses* ExtractBackendAddresses(const grpc_lb_addresses* addresses) {
- // First pass: count the number of backend addresses.
- size_t num_backends = 0;
- for (size_t i = 0; i < addresses->num_addresses; ++i) {
- if (!addresses->addresses[i].is_balancer) {
- ++num_backends;
- }
- }
- // Second pass: actually populate the addresses and (empty) LB tokens.
- grpc_lb_addresses* backend_addresses =
- grpc_lb_addresses_create(num_backends, &lb_token_vtable);
- size_t num_copied = 0;
- for (size_t i = 0; i < addresses->num_addresses; ++i) {
- if (addresses->addresses[i].is_balancer) continue;
- const grpc_resolved_address* addr = &addresses->addresses[i].address;
- grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr,
- addr->len, false /* is_balancer */,
- nullptr /* balancer_name */,
- (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload);
- ++num_copied;
- }
- return backend_addresses;
-}
-
bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) {
if (server->drop) return false;
const grpc_grpclb_ip_address* ip = &server->ip_address;
@@ -440,30 +413,16 @@ void ParseServer(const grpc_grpclb_server* server,
}
// Returns addresses extracted from \a serverlist.
-grpc_lb_addresses* ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
- size_t num_valid = 0;
- /* first pass: count how many are valid in order to allocate the necessary
- * memory in a single block */
+ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
+ ServerAddressList addresses;
for (size_t i = 0; i < serverlist->num_servers; ++i) {
- if (IsServerValid(serverlist->servers[i], i, true)) ++num_valid;
- }
- grpc_lb_addresses* lb_addresses =
- grpc_lb_addresses_create(num_valid, &lb_token_vtable);
- /* second pass: actually populate the addresses and LB tokens (aka user data
- * to the outside world) to be read by the RR policy during its creation.
- * Given that the validity tests are very cheap, they are performed again
- * instead of marking the valid ones during the first pass, as this would
- * incurr in an allocation due to the arbitrary number of server */
- size_t addr_idx = 0;
- for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
- const grpc_grpclb_server* server = serverlist->servers[sl_idx];
- if (!IsServerValid(serverlist->servers[sl_idx], sl_idx, false)) continue;
- GPR_ASSERT(addr_idx < num_valid);
- /* address processing */
+ const grpc_grpclb_server* server = serverlist->servers[i];
+ if (!IsServerValid(serverlist->servers[i], i, false)) continue;
+ // Address processing.
grpc_resolved_address addr;
ParseServer(server, &addr);
- /* lb token processing */
- void* user_data;
+ // LB token processing.
+ void* lb_token;
if (server->has_load_balance_token) {
const size_t lb_token_max_length =
GPR_ARRAY_SIZE(server->load_balance_token);
@@ -471,7 +430,7 @@ grpc_lb_addresses* ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
strnlen(server->load_balance_token, lb_token_max_length);
grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
server->load_balance_token, lb_token_length);
- user_data =
+ lb_token =
(void*)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr)
.payload;
} else {
@@ -481,15 +440,16 @@ grpc_lb_addresses* ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
"be used instead",
uri);
gpr_free(uri);
- user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
+ lb_token = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
}
- grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
- false /* is_balancer */,
- nullptr /* balancer_name */, user_data);
- ++addr_idx;
- }
- GPR_ASSERT(addr_idx == num_valid);
- return lb_addresses;
+ // Add address.
+ grpc_arg arg = grpc_channel_arg_pointer_create(
+ const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), lb_token,
+ &lb_token_arg_vtable);
+ grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1);
+ addresses.emplace_back(addr, args);
+ }
+ return addresses;
}
//
@@ -829,8 +789,7 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_);
} else {
// Dispose of the fallback.
- grpc_lb_addresses_destroy(grpclb_policy->fallback_backend_addresses_);
- grpclb_policy->fallback_backend_addresses_ = nullptr;
+ grpclb_policy->fallback_backend_addresses_.reset();
if (grpclb_policy->fallback_timer_callback_pending_) {
grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
}
@@ -910,31 +869,25 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
// helper code for creating balancer channel
//
-grpc_lb_addresses* ExtractBalancerAddresses(
- const grpc_lb_addresses* addresses) {
- size_t num_grpclb_addrs = 0;
- for (size_t i = 0; i < addresses->num_addresses; ++i) {
- if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
- }
- // There must be at least one balancer address, or else the
- // client_channel would not have chosen this LB policy.
- GPR_ASSERT(num_grpclb_addrs > 0);
- grpc_lb_addresses* lb_addresses =
- grpc_lb_addresses_create(num_grpclb_addrs, nullptr);
- size_t lb_addresses_idx = 0;
- for (size_t i = 0; i < addresses->num_addresses; ++i) {
- if (!addresses->addresses[i].is_balancer) continue;
- if (GPR_UNLIKELY(addresses->addresses[i].user_data != nullptr)) {
- gpr_log(GPR_ERROR,
- "This LB policy doesn't support user data. It will be ignored");
+ServerAddressList ExtractBalancerAddresses(const ServerAddressList& addresses) {
+ ServerAddressList balancer_addresses;
+ for (size_t i = 0; i < addresses.size(); ++i) {
+ if (addresses[i].IsBalancer()) {
+ // Strip out the is_balancer channel arg, since we don't want to
+ // recursively use the grpclb policy in the channel used to talk to
+ // the balancers. Note that we do NOT strip out the balancer_name
+ // channel arg, since we need that to set the authority correctly
+ // to talk to the balancers.
+ static const char* args_to_remove[] = {
+ GRPC_ARG_ADDRESS_IS_BALANCER,
+ };
+ balancer_addresses.emplace_back(
+ addresses[i].address(),
+ grpc_channel_args_copy_and_remove(addresses[i].args(), args_to_remove,
+ GPR_ARRAY_SIZE(args_to_remove)));
}
- grpc_lb_addresses_set_address(
- lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr,
- addresses->addresses[i].address.len, false /* is balancer */,
- addresses->addresses[i].balancer_name, nullptr /* user data */);
}
- GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx);
- return lb_addresses;
+ return balancer_addresses;
}
/* Returns the channel args for the LB channel, used to create a bidirectional
@@ -946,10 +899,10 @@ grpc_lb_addresses* ExtractBalancerAddresses(
* above the grpclb policy.
* - \a args: other args inherited from the grpclb policy. */
grpc_channel_args* BuildBalancerChannelArgs(
- const grpc_lb_addresses* addresses,
+ const ServerAddressList& addresses,
FakeResolverResponseGenerator* response_generator,
const grpc_channel_args* args) {
- grpc_lb_addresses* lb_addresses = ExtractBalancerAddresses(addresses);
+ ServerAddressList balancer_addresses = ExtractBalancerAddresses(addresses);
// Channel args to remove.
static const char* args_to_remove[] = {
// LB policy name, since we want to use the default (pick_first) in
@@ -967,7 +920,7 @@ grpc_channel_args* BuildBalancerChannelArgs(
// is_balancer=true. We need the LB channel to return addresses with
// is_balancer=false so that it does not wind up recursively using the
// grpclb LB policy, as per the special case logic in client_channel.c.
- GRPC_ARG_LB_ADDRESSES,
+ GRPC_ARG_SERVER_ADDRESS_LIST,
// The fake resolver response generator, because we are replacing it
// with the one from the grpclb policy, used to propagate updates to
// the LB channel.
@@ -983,10 +936,10 @@ grpc_channel_args* BuildBalancerChannelArgs(
};
// Channel args to add.
const grpc_arg args_to_add[] = {
- // New LB addresses.
+ // New address list.
// Note that we pass these in both when creating the LB channel
// and via the fake resolver. The latter is what actually gets used.
- grpc_lb_addresses_create_channel_arg(lb_addresses),
+ CreateServerAddressListChannelArg(&balancer_addresses),
// The fake resolver response generator, which we use to inject
// address updates into the LB channel.
grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
@@ -1004,18 +957,14 @@ grpc_channel_args* BuildBalancerChannelArgs(
args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add,
GPR_ARRAY_SIZE(args_to_add));
// Make any necessary modifications for security.
- new_args = grpc_lb_policy_grpclb_modify_lb_channel_args(new_args);
- // Clean up.
- grpc_lb_addresses_destroy(lb_addresses);
- return new_args;
+ return grpc_lb_policy_grpclb_modify_lb_channel_args(new_args);
}
//
// ctor and dtor
//
-GrpcLb::GrpcLb(const grpc_lb_addresses* addresses,
- const LoadBalancingPolicy::Args& args)
+GrpcLb::GrpcLb(const LoadBalancingPolicy::Args& args)
: LoadBalancingPolicy(args),
response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
lb_call_backoff_(
@@ -1072,9 +1021,6 @@ GrpcLb::~GrpcLb() {
if (serverlist_ != nullptr) {
grpc_grpclb_destroy_serverlist(serverlist_);
}
- if (fallback_backend_addresses_ != nullptr) {
- grpc_lb_addresses_destroy(fallback_backend_addresses_);
- }
grpc_subchannel_index_unref();
}
@@ -1122,7 +1068,6 @@ void GrpcLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
while ((pp = pending_picks_) != nullptr) {
pending_picks_ = pp->next;
pp->pick->on_complete = pp->original_on_complete;
- pp->pick->user_data = nullptr;
grpc_error* error = GRPC_ERROR_NONE;
if (new_policy->PickLocked(pp->pick, &error)) {
// Synchronous return; schedule closure.
@@ -1276,9 +1221,27 @@ void GrpcLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
notify);
}
+// Returns the backend addresses extracted from the given addresses.
+UniquePtr<ServerAddressList> ExtractBackendAddresses(
+ const ServerAddressList& addresses) {
+ void* lb_token = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
+ grpc_arg arg = grpc_channel_arg_pointer_create(
+ const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), lb_token,
+ &lb_token_arg_vtable);
+ auto backend_addresses = MakeUnique<ServerAddressList>();
+ for (size_t i = 0; i < addresses.size(); ++i) {
+ if (!addresses[i].IsBalancer()) {
+ backend_addresses->emplace_back(
+ addresses[i].address(),
+ grpc_channel_args_copy_and_add(addresses[i].args(), &arg, 1));
+ }
+ }
+ return backend_addresses;
+}
+
void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
- const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
- if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
+ const ServerAddressList* addresses = FindServerAddressListChannelArg(&args);
+ if (addresses == nullptr) {
// Ignore this update.
gpr_log(
GPR_ERROR,
@@ -1286,13 +1249,8 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
this);
return;
}
- const grpc_lb_addresses* addresses =
- static_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
// Update fallback address list.
- if (fallback_backend_addresses_ != nullptr) {
- grpc_lb_addresses_destroy(fallback_backend_addresses_);
- }
- fallback_backend_addresses_ = ExtractBackendAddresses(addresses);
+ fallback_backend_addresses_ = ExtractBackendAddresses(*addresses);
// Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
// since we use this to trigger the client_load_reporting filter.
static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
@@ -1303,7 +1261,7 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
&args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
// Construct args for balancer channel.
grpc_channel_args* lb_channel_args =
- BuildBalancerChannelArgs(addresses, response_generator_.get(), &args);
+ BuildBalancerChannelArgs(*addresses, response_generator_.get(), &args);
// Create balancer channel if needed.
if (lb_channel_ == nullptr) {
char* uri_str;
@@ -1509,12 +1467,17 @@ void DestroyClientStats(void* arg) {
}
void GrpcLb::PendingPickSetMetadataAndContext(PendingPick* pp) {
- /* if connected_subchannel is nullptr, no pick has been made by the RR
- * policy (e.g., all addresses failed to connect). There won't be any
- * user_data/token available */
+ // If connected_subchannel is nullptr, no pick has been made by the RR
+ // policy (e.g., all addresses failed to connect). There won't be any
+ // LB token available.
if (pp->pick->connected_subchannel != nullptr) {
- if (GPR_LIKELY(!GRPC_MDISNULL(pp->lb_token))) {
- AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(pp->lb_token),
+ const grpc_arg* arg =
+ grpc_channel_args_find(pp->pick->connected_subchannel->args(),
+ GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN);
+ if (arg != nullptr) {
+ grpc_mdelem lb_token = {
+ reinterpret_cast<uintptr_t>(arg->value.pointer.p)};
+ AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(lb_token),
&pp->pick->lb_token_mdelem_storage,
pp->pick->initial_metadata);
} else {
@@ -1598,12 +1561,10 @@ bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
return true;
}
}
- // Set client_stats and user_data.
+ // Set client_stats.
if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
pp->client_stats = lb_calld_->client_stats()->Ref();
}
- GPR_ASSERT(pp->pick->user_data == nullptr);
- pp->pick->user_data = (void**)&pp->lb_token;
// Pick via the RR policy.
bool pick_done = rr_policy_->PickLocked(pp->pick, error);
if (pick_done) {
@@ -1668,10 +1629,11 @@ void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) {
}
grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
- grpc_lb_addresses* addresses;
+ ServerAddressList tmp_addresses;
+ ServerAddressList* addresses = &tmp_addresses;
bool is_backend_from_grpclb_load_balancer = false;
if (serverlist_ != nullptr) {
- addresses = ProcessServerlist(serverlist_);
+ tmp_addresses = ProcessServerlist(serverlist_);
is_backend_from_grpclb_load_balancer = true;
} else {
// If CreateOrUpdateRoundRobinPolicyLocked() is invoked when we haven't
@@ -1680,14 +1642,14 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
// empty, in which case the new round_robin policy will keep the requested
// picks pending.
GPR_ASSERT(fallback_backend_addresses_ != nullptr);
- addresses = grpc_lb_addresses_copy(fallback_backend_addresses_);
+ addresses = fallback_backend_addresses_.get();
}
GPR_ASSERT(addresses != nullptr);
- // Replace the LB addresses in the channel args that we pass down to
+ // Replace the server address list in the channel args that we pass down to
// the subchannel.
- static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
+ static const char* keys_to_remove[] = {GRPC_ARG_SERVER_ADDRESS_LIST};
grpc_arg args_to_add[3] = {
- grpc_lb_addresses_create_channel_arg(addresses),
+ CreateServerAddressListChannelArg(addresses),
// A channel arg indicating if the target is a backend inferred from a
// grpclb load balancer.
grpc_channel_arg_integer_create(
@@ -1704,7 +1666,6 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove(
args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add,
num_args_to_add);
- grpc_lb_addresses_destroy(addresses);
return args;
}
@@ -1837,19 +1798,18 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory {
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
const LoadBalancingPolicy::Args& args) const override {
/* Count the number of gRPC-LB addresses. There must be at least one. */
- const grpc_arg* arg =
- grpc_channel_args_find(args.args, GRPC_ARG_LB_ADDRESSES);
- if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
- return nullptr;
- }
- grpc_lb_addresses* addresses =
- static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
- size_t num_grpclb_addrs = 0;
- for (size_t i = 0; i < addresses->num_addresses; ++i) {
- if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
+ const ServerAddressList* addresses =
+ FindServerAddressListChannelArg(args.args);
+ if (addresses == nullptr) return nullptr;
+ bool found_balancer = false;
+ for (size_t i = 0; i < addresses->size(); ++i) {
+ if ((*addresses)[i].IsBalancer()) {
+ found_balancer = true;
+ break;
+ }
}
- if (num_grpclb_addrs == 0) return nullptr;
- return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(addresses, args));
+ if (!found_balancer) return nullptr;
+ return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(args));
}
const char* name() const override { return "grpclb"; }
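
With the change above, the grpclb LB token is no longer passed through the pick's user_data; it rides along in each address's channel args under GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN and is recovered from the connected subchannel's args at pick time. Below is a minimal sketch of that lookup, assuming the arg layout used in the hunk above; the helper name is hypothetical.

grpc_mdelem GetLbTokenFromSubchannelArgs(const grpc_channel_args* args) {
  const grpc_arg* arg =
      grpc_channel_args_find(args, GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN);
  if (arg == nullptr || arg->type != GRPC_ARG_POINTER ||
      arg->value.pointer.p == nullptr) {
    return GRPC_MDNULL;  // No token attached to this address.
  }
  // The arg stores the mdelem payload as a void*; rebuild the mdelem handle.
  return grpc_mdelem{reinterpret_cast<uintptr_t>(arg->value.pointer.p)};
}
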
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
index 825065a9c3..3b2dc370eb 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
@@ -21,7 +21,7 @@
#include <grpc/support/port_platform.h>
-#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
+#include <grpc/impl/codegen/grpc_types.h>
/// Makes any necessary modifications to \a args for use in the grpclb
/// balancer channel.
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc
index 441efd5e23..6e8fbdcab7 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc
@@ -26,6 +26,7 @@
#include <grpc/support/string_util.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@@ -42,22 +43,23 @@ int BalancerNameCmp(const grpc_core::UniquePtr<char>& a,
}
RefCountedPtr<TargetAuthorityTable> CreateTargetAuthorityTable(
- grpc_lb_addresses* addresses) {
+ const ServerAddressList& addresses) {
TargetAuthorityTable::Entry* target_authority_entries =
- static_cast<TargetAuthorityTable::Entry*>(gpr_zalloc(
- sizeof(*target_authority_entries) * addresses->num_addresses));
- for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ static_cast<TargetAuthorityTable::Entry*>(
+ gpr_zalloc(sizeof(*target_authority_entries) * addresses.size()));
+ for (size_t i = 0; i < addresses.size(); ++i) {
char* addr_str;
- GPR_ASSERT(grpc_sockaddr_to_string(
- &addr_str, &addresses->addresses[i].address, true) > 0);
+ GPR_ASSERT(
+ grpc_sockaddr_to_string(&addr_str, &addresses[i].address(), true) > 0);
target_authority_entries[i].key = grpc_slice_from_copied_string(addr_str);
- target_authority_entries[i].value.reset(
- gpr_strdup(addresses->addresses[i].balancer_name));
gpr_free(addr_str);
+ char* balancer_name = grpc_channel_arg_get_string(grpc_channel_args_find(
+ addresses[i].args(), GRPC_ARG_ADDRESS_BALANCER_NAME));
+ target_authority_entries[i].value.reset(gpr_strdup(balancer_name));
}
RefCountedPtr<TargetAuthorityTable> target_authority_table =
- TargetAuthorityTable::Create(addresses->num_addresses,
- target_authority_entries, BalancerNameCmp);
+ TargetAuthorityTable::Create(addresses.size(), target_authority_entries,
+ BalancerNameCmp);
gpr_free(target_authority_entries);
return target_authority_table;
}
@@ -72,13 +74,12 @@ grpc_channel_args* grpc_lb_policy_grpclb_modify_lb_channel_args(
grpc_arg args_to_add[2];
size_t num_args_to_add = 0;
// Add arg for targets info table.
- const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_LB_ADDRESSES);
- GPR_ASSERT(arg != nullptr);
- GPR_ASSERT(arg->type == GRPC_ARG_POINTER);
- grpc_lb_addresses* addresses =
- static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
+ grpc_core::ServerAddressList* addresses =
+ grpc_core::FindServerAddressListChannelArg(args);
+ GPR_ASSERT(addresses != nullptr);
grpc_core::RefCountedPtr<grpc_core::TargetAuthorityTable>
- target_authority_table = grpc_core::CreateTargetAuthorityTable(addresses);
+ target_authority_table =
+ grpc_core::CreateTargetAuthorityTable(*addresses);
args_to_add[num_args_to_add++] =
grpc_core::CreateTargetAuthorityTableChannelArg(
target_authority_table.get());
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
index 9ca7b28d8e..71d371c880 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
@@ -25,7 +25,7 @@
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h"
-#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
#define GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH 128
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index d1a05f1255..74c17612a2 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -24,6 +24,7 @@
#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
+#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/channel/channel_args.h"
@@ -75,11 +76,9 @@ class PickFirst : public LoadBalancingPolicy {
PickFirstSubchannelData(
SubchannelList<PickFirstSubchannelList, PickFirstSubchannelData>*
subchannel_list,
- const grpc_lb_user_data_vtable* user_data_vtable,
- const grpc_lb_address& address, grpc_subchannel* subchannel,
+ const ServerAddress& address, grpc_subchannel* subchannel,
grpc_combiner* combiner)
- : SubchannelData(subchannel_list, user_data_vtable, address, subchannel,
- combiner) {}
+ : SubchannelData(subchannel_list, address, subchannel, combiner) {}
void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state, grpc_error* error) override;
@@ -95,7 +94,7 @@ class PickFirst : public LoadBalancingPolicy {
PickFirstSubchannelData> {
public:
PickFirstSubchannelList(PickFirst* policy, TraceFlag* tracer,
- const grpc_lb_addresses* addresses,
+ const ServerAddressList& addresses,
grpc_combiner* combiner,
grpc_client_channel_factory* client_channel_factory,
const grpc_channel_args& args)
@@ -337,8 +336,8 @@ void PickFirst::UpdateChildRefsLocked() {
void PickFirst::UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) {
AutoChildRefsUpdater guard(this);
- const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
- if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
+ const ServerAddressList* addresses = FindServerAddressListChannelArg(&args);
+ if (addresses == nullptr) {
if (subchannel_list_ == nullptr) {
// If we don't have a current subchannel list, go into TRANSIENT FAILURE.
grpc_connectivity_state_set(
@@ -354,19 +353,17 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args,
}
return;
}
- const grpc_lb_addresses* addresses =
- static_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Pick First %p received update with %" PRIuPTR " addresses", this,
- addresses->num_addresses);
+ addresses->size());
}
grpc_arg new_arg = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1);
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add(&args, &new_arg, 1);
auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>(
- this, &grpc_lb_pick_first_trace, addresses, combiner(),
+ this, &grpc_lb_pick_first_trace, *addresses, combiner(),
client_channel_factory(), *new_args);
grpc_channel_args_destroy(new_args);
if (subchannel_list->num_subchannels() == 0) {
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 2a16975131..63089afbd7 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -82,8 +82,6 @@ class RoundRobin : public LoadBalancingPolicy {
// Data for a particular subchannel in a subchannel list.
// This subclass adds the following functionality:
- // - Tracks user_data associated with each address, which will be
- // returned along with picks that select the subchannel.
// - Tracks the previous connectivity state of the subchannel, so that
// we know how many subchannels are in each state.
class RoundRobinSubchannelData
@@ -93,26 +91,9 @@ class RoundRobin : public LoadBalancingPolicy {
RoundRobinSubchannelData(
SubchannelList<RoundRobinSubchannelList, RoundRobinSubchannelData>*
subchannel_list,
- const grpc_lb_user_data_vtable* user_data_vtable,
- const grpc_lb_address& address, grpc_subchannel* subchannel,
+ const ServerAddress& address, grpc_subchannel* subchannel,
grpc_combiner* combiner)
- : SubchannelData(subchannel_list, user_data_vtable, address, subchannel,
- combiner),
- user_data_vtable_(user_data_vtable),
- user_data_(user_data_vtable_ != nullptr
- ? user_data_vtable_->copy(address.user_data)
- : nullptr) {}
-
- void UnrefSubchannelLocked(const char* reason) override {
- SubchannelData::UnrefSubchannelLocked(reason);
- if (user_data_ != nullptr) {
- GPR_ASSERT(user_data_vtable_ != nullptr);
- user_data_vtable_->destroy(user_data_);
- user_data_ = nullptr;
- }
- }
-
- void* user_data() const { return user_data_; }
+ : SubchannelData(subchannel_list, address, subchannel, combiner) {}
grpc_connectivity_state connectivity_state() const {
return last_connectivity_state_;
@@ -125,8 +106,6 @@ class RoundRobin : public LoadBalancingPolicy {
void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state, grpc_error* error) override;
- const grpc_lb_user_data_vtable* user_data_vtable_;
- void* user_data_ = nullptr;
grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_IDLE;
};
@@ -137,7 +116,7 @@ class RoundRobin : public LoadBalancingPolicy {
public:
RoundRobinSubchannelList(
RoundRobin* policy, TraceFlag* tracer,
- const grpc_lb_addresses* addresses, grpc_combiner* combiner,
+ const ServerAddressList& addresses, grpc_combiner* combiner,
grpc_client_channel_factory* client_channel_factory,
const grpc_channel_args& args)
: SubchannelList(policy, tracer, addresses, combiner,
@@ -354,9 +333,6 @@ bool RoundRobin::DoPickLocked(PickState* pick) {
subchannel_list_->subchannel(next_ready_index);
GPR_ASSERT(sd->connected_subchannel() != nullptr);
pick->connected_subchannel = sd->connected_subchannel()->Ref();
- if (pick->user_data != nullptr) {
- *pick->user_data = sd->user_data();
- }
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO,
"[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, "
@@ -667,9 +643,9 @@ void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
void RoundRobin::UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) {
- const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
AutoChildRefsUpdater guard(this);
- if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
+ const ServerAddressList* addresses = FindServerAddressListChannelArg(&args);
+ if (addresses == nullptr) {
gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this);
// If we don't have a current subchannel list, go into TRANSIENT_FAILURE.
// Otherwise, keep using the current subchannel list (ignore this update).
@@ -681,11 +657,9 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args,
}
return;
}
- grpc_lb_addresses* addresses =
- static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses",
- this, addresses->num_addresses);
+ this, addresses->size());
}
// Replace latest_pending_subchannel_list_.
if (latest_pending_subchannel_list_ != nullptr) {
@@ -696,7 +670,7 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args,
}
}
latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>(
- this, &grpc_lb_round_robin_trace, addresses, combiner(),
+ this, &grpc_lb_round_robin_trace, *addresses, combiner(),
client_channel_factory(), args);
// If we haven't started picking yet or the new list is empty,
// immediately promote the new list to the current list.
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index f31401502c..6f31a643c1 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -26,6 +26,7 @@
#include <grpc/support/alloc.h>
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
+#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
@@ -141,8 +142,7 @@ class SubchannelData {
protected:
SubchannelData(
SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
- const grpc_lb_user_data_vtable* user_data_vtable,
- const grpc_lb_address& address, grpc_subchannel* subchannel,
+ const ServerAddress& address, grpc_subchannel* subchannel,
grpc_combiner* combiner);
virtual ~SubchannelData();
@@ -156,9 +156,8 @@ class SubchannelData {
grpc_connectivity_state connectivity_state,
grpc_error* error) GRPC_ABSTRACT;
- // Unrefs the subchannel. May be overridden by subclasses that need
- // to perform extra cleanup when unreffing the subchannel.
- virtual void UnrefSubchannelLocked(const char* reason);
+ // Unrefs the subchannel.
+ void UnrefSubchannelLocked(const char* reason);
private:
// Updates connected_subchannel_ based on pending_connectivity_state_unsafe_.
@@ -232,7 +231,7 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
protected:
SubchannelList(LoadBalancingPolicy* policy, TraceFlag* tracer,
- const grpc_lb_addresses* addresses, grpc_combiner* combiner,
+ const ServerAddressList& addresses, grpc_combiner* combiner,
grpc_client_channel_factory* client_channel_factory,
const grpc_channel_args& args);
@@ -277,8 +276,7 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
template <typename SubchannelListType, typename SubchannelDataType>
SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData(
SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
- const grpc_lb_user_data_vtable* user_data_vtable,
- const grpc_lb_address& address, grpc_subchannel* subchannel,
+ const ServerAddress& address, grpc_subchannel* subchannel,
grpc_combiner* combiner)
: subchannel_list_(subchannel_list),
subchannel_(subchannel),
@@ -488,7 +486,7 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
template <typename SubchannelListType, typename SubchannelDataType>
SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
LoadBalancingPolicy* policy, TraceFlag* tracer,
- const grpc_lb_addresses* addresses, grpc_combiner* combiner,
+ const ServerAddressList& addresses, grpc_combiner* combiner,
grpc_client_channel_factory* client_channel_factory,
const grpc_channel_args& args)
: InternallyRefCounted<SubchannelListType>(tracer),
@@ -498,9 +496,9 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
if (tracer_->enabled()) {
gpr_log(GPR_INFO,
"[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
- tracer_->name(), policy, this, addresses->num_addresses);
+ tracer_->name(), policy, this, addresses.size());
}
- subchannels_.reserve(addresses->num_addresses);
+ subchannels_.reserve(addresses.size());
// We need to remove the LB addresses in order to be able to compare the
// subchannel keys of subchannels from a different batch of addresses.
// We also remove the inhibit-health-checking arg, since we are
@@ -508,19 +506,27 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
inhibit_health_checking_ = grpc_channel_arg_get_bool(
grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false);
static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
- GRPC_ARG_LB_ADDRESSES,
+ GRPC_ARG_SERVER_ADDRESS_LIST,
GRPC_ARG_INHIBIT_HEALTH_CHECKING};
// Create a subchannel for each address.
grpc_subchannel_args sc_args;
- for (size_t i = 0; i < addresses->num_addresses; i++) {
- // If there were any balancer, we would have chosen grpclb policy instead.
- GPR_ASSERT(!addresses->addresses[i].is_balancer);
+ for (size_t i = 0; i < addresses.size(); i++) {
+ // If there were any balancer addresses, we would have chosen grpclb
+ // policy, which does not use a SubchannelList.
+ GPR_ASSERT(!addresses[i].IsBalancer());
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
- grpc_arg addr_arg =
- grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
+ InlinedVector<grpc_arg, 4> args_to_add;
+ args_to_add.emplace_back(
+ grpc_create_subchannel_address_arg(&addresses[i].address()));
+ if (addresses[i].args() != nullptr) {
+ for (size_t j = 0; j < addresses[i].args()->num_args; ++j) {
+ args_to_add.emplace_back(addresses[i].args()->args[j]);
+ }
+ }
grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
- &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, 1);
- gpr_free(addr_arg.value.string);
+ &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove),
+ args_to_add.data(), args_to_add.size());
+ gpr_free(args_to_add[0].value.string);
sc_args.args = new_args;
grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel(
client_channel_factory, &sc_args);
@@ -528,8 +534,7 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
if (subchannel == nullptr) {
// Subchannel could not be created.
if (tracer_->enabled()) {
- char* address_uri =
- grpc_sockaddr_to_uri(&addresses->addresses[i].address);
+ char* address_uri = grpc_sockaddr_to_uri(&addresses[i].address());
gpr_log(GPR_INFO,
"[%s %p] could not create subchannel for address uri %s, "
"ignoring",
@@ -539,8 +544,7 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
continue;
}
if (tracer_->enabled()) {
- char* address_uri =
- grpc_sockaddr_to_uri(&addresses->addresses[i].address);
+ char* address_uri = grpc_sockaddr_to_uri(&addresses[i].address());
gpr_log(GPR_INFO,
"[%s %p] subchannel list %p index %" PRIuPTR
": Created subchannel %p for address uri %s",
@@ -548,8 +552,7 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
address_uri);
gpr_free(address_uri);
}
- subchannels_.emplace_back(this, addresses->user_data_vtable,
- addresses->addresses[i], subchannel, combiner);
+ subchannels_.emplace_back(this, addresses[i], subchannel, combiner);
}
}
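
The subchannel_list.h hunk above is where per-address args actually reach the transport: each address's args are appended to the channel args used to create its subchannel. A condensed, hypothetical restatement of that merge step, assuming the ServerAddress accessors shown above (the function name is illustrative):

grpc_channel_args* BuildSubchannelArgs(const grpc_channel_args& lb_args,
                                       const grpc_core::ServerAddress& address) {
  static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
                                         GRPC_ARG_SERVER_ADDRESS_LIST,
                                         GRPC_ARG_INHIBIT_HEALTH_CHECKING};
  grpc_core::InlinedVector<grpc_arg, 4> args_to_add;
  args_to_add.emplace_back(
      grpc_create_subchannel_address_arg(&address.address()));
  // Fold in whatever per-address args the resolver or balancer attached.
  if (address.args() != nullptr) {
    for (size_t i = 0; i < address.args()->num_args; ++i) {
      args_to_add.emplace_back(address.args()->args[i]);
    }
  }
  grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
      &lb_args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove),
      args_to_add.data(), args_to_add.size());
  gpr_free(args_to_add[0].value.string);  // Free the address-arg URI string.
  return new_args;
}
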
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
index faedc0a919..3c25de2386 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
@@ -79,6 +79,7 @@
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
+#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
@@ -116,7 +117,7 @@ namespace {
class XdsLb : public LoadBalancingPolicy {
public:
- XdsLb(const grpc_lb_addresses* addresses, const Args& args);
+ explicit XdsLb(const Args& args);
void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) override;
@@ -156,9 +157,6 @@ class XdsLb : public LoadBalancingPolicy {
// Our on_complete closure and the original one.
grpc_closure on_complete;
grpc_closure* original_on_complete;
- // The LB token associated with the pick. This is set via user_data in
- // the pick.
- grpc_mdelem lb_token;
// Stats for client-side load reporting.
RefCountedPtr<XdsLbClientStats> client_stats;
// Next pending pick.
@@ -256,7 +254,7 @@ class XdsLb : public LoadBalancingPolicy {
grpc_error* error);
// Pending pick methods.
- static void PendingPickSetMetadataAndContext(PendingPick* pp);
+ static void PendingPickCleanup(PendingPick* pp);
PendingPick* PendingPickCreate(PickState* pick);
void AddPendingPick(PendingPick* pp);
static void OnPendingPickComplete(void* arg, grpc_error* error);
@@ -319,7 +317,7 @@ class XdsLb : public LoadBalancingPolicy {
// 0 means not using fallback.
int lb_fallback_timeout_ms_ = 0;
// The backend addresses from the resolver.
- grpc_lb_addresses* fallback_backend_addresses_ = nullptr;
+ UniquePtr<ServerAddressList> fallback_backend_addresses_;
// Fallback timer.
bool fallback_timer_callback_pending_ = false;
grpc_timer lb_fallback_timer_;
@@ -339,47 +337,15 @@ class XdsLb : public LoadBalancingPolicy {
// serverlist parsing code
//
-// vtable for LB tokens in grpc_lb_addresses
-void* lb_token_copy(void* token) {
- return token == nullptr
- ? nullptr
- : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload;
-}
-void lb_token_destroy(void* token) {
- if (token != nullptr) {
- GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token});
- }
-}
-int lb_token_cmp(void* token1, void* token2) {
- if (token1 > token2) return 1;
- if (token1 < token2) return -1;
- return 0;
-}
-const grpc_lb_user_data_vtable lb_token_vtable = {
- lb_token_copy, lb_token_destroy, lb_token_cmp};
-
// Returns the backend addresses extracted from the given addresses.
-grpc_lb_addresses* ExtractBackendAddresses(const grpc_lb_addresses* addresses) {
- // First pass: count the number of backend addresses.
- size_t num_backends = 0;
- for (size_t i = 0; i < addresses->num_addresses; ++i) {
- if (!addresses->addresses[i].is_balancer) {
- ++num_backends;
+UniquePtr<ServerAddressList> ExtractBackendAddresses(
+ const ServerAddressList& addresses) {
+ auto backend_addresses = MakeUnique<ServerAddressList>();
+ for (size_t i = 0; i < addresses.size(); ++i) {
+ if (!addresses[i].IsBalancer()) {
+ backend_addresses->emplace_back(addresses[i]);
}
}
- // Second pass: actually populate the addresses and (empty) LB tokens.
- grpc_lb_addresses* backend_addresses =
- grpc_lb_addresses_create(num_backends, &lb_token_vtable);
- size_t num_copied = 0;
- for (size_t i = 0; i < addresses->num_addresses; ++i) {
- if (addresses->addresses[i].is_balancer) continue;
- const grpc_resolved_address* addr = &addresses->addresses[i].address;
- grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr,
- addr->len, false /* is_balancer */,
- nullptr /* balancer_name */,
- (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload);
- ++num_copied;
- }
return backend_addresses;
}
@@ -429,56 +395,17 @@ void ParseServer(const xds_grpclb_server* server, grpc_resolved_address* addr) {
}
// Returns addresses extracted from \a serverlist.
-grpc_lb_addresses* ProcessServerlist(const xds_grpclb_serverlist* serverlist) {
- size_t num_valid = 0;
- /* first pass: count how many are valid in order to allocate the necessary
- * memory in a single block */
+UniquePtr<ServerAddressList> ProcessServerlist(
+ const xds_grpclb_serverlist* serverlist) {
+ auto addresses = MakeUnique<ServerAddressList>();
for (size_t i = 0; i < serverlist->num_servers; ++i) {
- if (IsServerValid(serverlist->servers[i], i, true)) ++num_valid;
- }
- grpc_lb_addresses* lb_addresses =
- grpc_lb_addresses_create(num_valid, &lb_token_vtable);
- /* second pass: actually populate the addresses and LB tokens (aka user data
- * to the outside world) to be read by the child policy during its creation.
- * Given that the validity tests are very cheap, they are performed again
- * instead of marking the valid ones during the first pass, as this would
- * incurr in an allocation due to the arbitrary number of server */
- size_t addr_idx = 0;
- for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
- const xds_grpclb_server* server = serverlist->servers[sl_idx];
- if (!IsServerValid(serverlist->servers[sl_idx], sl_idx, false)) continue;
- GPR_ASSERT(addr_idx < num_valid);
- /* address processing */
+ const xds_grpclb_server* server = serverlist->servers[i];
+ if (!IsServerValid(serverlist->servers[i], i, false)) continue;
grpc_resolved_address addr;
ParseServer(server, &addr);
- /* lb token processing */
- void* user_data;
- if (server->has_load_balance_token) {
- const size_t lb_token_max_length =
- GPR_ARRAY_SIZE(server->load_balance_token);
- const size_t lb_token_length =
- strnlen(server->load_balance_token, lb_token_max_length);
- grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
- server->load_balance_token, lb_token_length);
- user_data =
- (void*)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr)
- .payload;
- } else {
- char* uri = grpc_sockaddr_to_uri(&addr);
- gpr_log(GPR_INFO,
- "Missing LB token for backend address '%s'. The empty token will "
- "be used instead",
- uri);
- gpr_free(uri);
- user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
- }
- grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
- false /* is_balancer */,
- nullptr /* balancer_name */, user_data);
- ++addr_idx;
+ addresses->emplace_back(addr, nullptr);
}
- GPR_ASSERT(addr_idx == num_valid);
- return lb_addresses;
+ return addresses;
}
//
@@ -789,8 +716,7 @@ void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked(
xds_grpclb_destroy_serverlist(xdslb_policy->serverlist_);
} else {
/* or dispose of the fallback */
- grpc_lb_addresses_destroy(xdslb_policy->fallback_backend_addresses_);
- xdslb_policy->fallback_backend_addresses_ = nullptr;
+ xdslb_policy->fallback_backend_addresses_.reset();
if (xdslb_policy->fallback_timer_callback_pending_) {
grpc_timer_cancel(&xdslb_policy->lb_fallback_timer_);
}
@@ -876,31 +802,15 @@ void XdsLb::BalancerCallState::OnBalancerStatusReceivedLocked(
// helper code for creating balancer channel
//
-grpc_lb_addresses* ExtractBalancerAddresses(
- const grpc_lb_addresses* addresses) {
- size_t num_grpclb_addrs = 0;
- for (size_t i = 0; i < addresses->num_addresses; ++i) {
- if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
- }
- // There must be at least one balancer address, or else the
- // client_channel would not have chosen this LB policy.
- GPR_ASSERT(num_grpclb_addrs > 0);
- grpc_lb_addresses* lb_addresses =
- grpc_lb_addresses_create(num_grpclb_addrs, nullptr);
- size_t lb_addresses_idx = 0;
- for (size_t i = 0; i < addresses->num_addresses; ++i) {
- if (!addresses->addresses[i].is_balancer) continue;
- if (GPR_UNLIKELY(addresses->addresses[i].user_data != nullptr)) {
- gpr_log(GPR_ERROR,
- "This LB policy doesn't support user data. It will be ignored");
+UniquePtr<ServerAddressList> ExtractBalancerAddresses(
+ const ServerAddressList& addresses) {
+ auto balancer_addresses = MakeUnique<ServerAddressList>();
+ for (size_t i = 0; i < addresses.size(); ++i) {
+ if (addresses[i].IsBalancer()) {
+ balancer_addresses->emplace_back(addresses[i]);
}
- grpc_lb_addresses_set_address(
- lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr,
- addresses->addresses[i].address.len, false /* is balancer */,
- addresses->addresses[i].balancer_name, nullptr /* user data */);
}
- GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx);
- return lb_addresses;
+ return balancer_addresses;
}
/* Returns the channel args for the LB channel, used to create a bidirectional
@@ -912,10 +822,11 @@ grpc_lb_addresses* ExtractBalancerAddresses(
* above the grpclb policy.
* - \a args: other args inherited from the xds policy. */
grpc_channel_args* BuildBalancerChannelArgs(
- const grpc_lb_addresses* addresses,
+ const ServerAddressList& addresses,
FakeResolverResponseGenerator* response_generator,
const grpc_channel_args* args) {
- grpc_lb_addresses* lb_addresses = ExtractBalancerAddresses(addresses);
+ UniquePtr<ServerAddressList> balancer_addresses =
+ ExtractBalancerAddresses(addresses);
// Channel args to remove.
static const char* args_to_remove[] = {
// LB policy name, since we want to use the default (pick_first) in
@@ -933,7 +844,7 @@ grpc_channel_args* BuildBalancerChannelArgs(
// is_balancer=true. We need the LB channel to return addresses with
// is_balancer=false so that it does not wind up recursively using the
// xds LB policy, as per the special case logic in client_channel.c.
- GRPC_ARG_LB_ADDRESSES,
+ GRPC_ARG_SERVER_ADDRESS_LIST,
// The fake resolver response generator, because we are replacing it
// with the one from the xds policy, used to propagate updates to
// the LB channel.
@@ -949,10 +860,10 @@ grpc_channel_args* BuildBalancerChannelArgs(
};
// Channel args to add.
const grpc_arg args_to_add[] = {
- // New LB addresses.
+ // New server address list.
// Note that we pass these in both when creating the LB channel
// and via the fake resolver. The latter is what actually gets used.
- grpc_lb_addresses_create_channel_arg(lb_addresses),
+ CreateServerAddressListChannelArg(balancer_addresses.get()),
// The fake resolver response generator, which we use to inject
// address updates into the LB channel.
grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
@@ -970,10 +881,7 @@ grpc_channel_args* BuildBalancerChannelArgs(
args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add,
GPR_ARRAY_SIZE(args_to_add));
// Make any necessary modifications for security.
- new_args = grpc_lb_policy_xds_modify_lb_channel_args(new_args);
- // Clean up.
- grpc_lb_addresses_destroy(lb_addresses);
- return new_args;
+ return grpc_lb_policy_xds_modify_lb_channel_args(new_args);
}
//
@@ -981,8 +889,7 @@ grpc_channel_args* BuildBalancerChannelArgs(
//
// TODO(vishalpowar): Use lb_config in args to configure LB policy.
-XdsLb::XdsLb(const grpc_lb_addresses* addresses,
- const LoadBalancingPolicy::Args& args)
+XdsLb::XdsLb(const LoadBalancingPolicy::Args& args)
: LoadBalancingPolicy(args),
response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
lb_call_backoff_(
@@ -1038,9 +945,6 @@ XdsLb::~XdsLb() {
if (serverlist_ != nullptr) {
xds_grpclb_destroy_serverlist(serverlist_);
}
- if (fallback_backend_addresses_ != nullptr) {
- grpc_lb_addresses_destroy(fallback_backend_addresses_);
- }
grpc_subchannel_index_unref();
}
@@ -1088,7 +992,6 @@ void XdsLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
while ((pp = pending_picks_) != nullptr) {
pending_picks_ = pp->next;
pp->pick->on_complete = pp->original_on_complete;
- pp->pick->user_data = nullptr;
grpc_error* error = GRPC_ERROR_NONE;
if (new_policy->PickLocked(pp->pick, &error)) {
// Synchronous return; schedule closure.
@@ -1241,21 +1144,16 @@ void XdsLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
}
void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
- const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
- if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
+ const ServerAddressList* addresses = FindServerAddressListChannelArg(&args);
+ if (addresses == nullptr) {
// Ignore this update.
gpr_log(GPR_ERROR,
"[xdslb %p] No valid LB addresses channel arg in update, ignoring.",
this);
return;
}
- const grpc_lb_addresses* addresses =
- static_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
// Update fallback address list.
- if (fallback_backend_addresses_ != nullptr) {
- grpc_lb_addresses_destroy(fallback_backend_addresses_);
- }
- fallback_backend_addresses_ = ExtractBackendAddresses(addresses);
+ fallback_backend_addresses_ = ExtractBackendAddresses(*addresses);
// Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
// since we use this to trigger the client_load_reporting filter.
static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
@@ -1266,7 +1164,7 @@ void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
&args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
// Construct args for balancer channel.
grpc_channel_args* lb_channel_args =
- BuildBalancerChannelArgs(addresses, response_generator_.get(), &args);
+ BuildBalancerChannelArgs(*addresses, response_generator_.get(), &args);
// Create balancer channel if needed.
if (lb_channel_ == nullptr) {
char* uri_str;
@@ -1457,37 +1355,15 @@ void XdsLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
// PendingPick
//
-// Adds lb_token of selected subchannel (address) to the call's initial
-// metadata.
-grpc_error* AddLbTokenToInitialMetadata(
- grpc_mdelem lb_token, grpc_linked_mdelem* lb_token_mdelem_storage,
- grpc_metadata_batch* initial_metadata) {
- GPR_ASSERT(lb_token_mdelem_storage != nullptr);
- GPR_ASSERT(!GRPC_MDISNULL(lb_token));
- return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
- lb_token);
-}
-
// Destroy function used when embedding client stats in call context.
void DestroyClientStats(void* arg) {
static_cast<XdsLbClientStats*>(arg)->Unref();
}
-void XdsLb::PendingPickSetMetadataAndContext(PendingPick* pp) {
- /* if connected_subchannel is nullptr, no pick has been made by the
- * child policy (e.g., all addresses failed to connect). There won't be any
- * user_data/token available */
+void XdsLb::PendingPickCleanup(PendingPick* pp) {
+ // If connected_subchannel is nullptr, no pick has been made by the
+ // child policy (e.g., all addresses failed to connect).
if (pp->pick->connected_subchannel != nullptr) {
- if (GPR_LIKELY(!GRPC_MDISNULL(pp->lb_token))) {
- AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(pp->lb_token),
- &pp->pick->lb_token_mdelem_storage,
- pp->pick->initial_metadata);
- } else {
- gpr_log(GPR_ERROR,
- "[xdslb %p] No LB token for connected subchannel pick %p",
- pp->xdslb_policy, pp->pick);
- abort();
- }
// Pass on client stats via context. Passes ownership of the reference.
if (pp->client_stats != nullptr) {
pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
@@ -1505,7 +1381,7 @@ void XdsLb::PendingPickSetMetadataAndContext(PendingPick* pp) {
* order to unref the child policy instance upon its invocation */
void XdsLb::OnPendingPickComplete(void* arg, grpc_error* error) {
PendingPick* pp = static_cast<PendingPick*>(arg);
- PendingPickSetMetadataAndContext(pp);
+ PendingPickCleanup(pp);
GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error));
Delete(pp);
}
@@ -1537,16 +1413,14 @@ void XdsLb::AddPendingPick(PendingPick* pp) {
// completion callback even if the pick is available immediately.
bool XdsLb::PickFromChildPolicyLocked(bool force_async, PendingPick* pp,
grpc_error** error) {
- // Set client_stats and user_data.
+ // Set client_stats.
if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
pp->client_stats = lb_calld_->client_stats()->Ref();
}
- GPR_ASSERT(pp->pick->user_data == nullptr);
- pp->pick->user_data = (void**)&pp->lb_token;
// Pick via the child policy.
bool pick_done = child_policy_->PickLocked(pp->pick, error);
if (pick_done) {
- PendingPickSetMetadataAndContext(pp);
+ PendingPickCleanup(pp);
if (force_async) {
GRPC_CLOSURE_SCHED(pp->original_on_complete, *error);
*error = GRPC_ERROR_NONE;
@@ -1608,20 +1482,19 @@ void XdsLb::CreateChildPolicyLocked(const Args& args) {
}
grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() {
- grpc_lb_addresses* addresses;
bool is_backend_from_grpclb_load_balancer = false;
// This should never be invoked if we do not have serverlist_, as fallback
// mode is disabled for xDS plugin.
GPR_ASSERT(serverlist_ != nullptr);
GPR_ASSERT(serverlist_->num_servers > 0);
- addresses = ProcessServerlist(serverlist_);
- is_backend_from_grpclb_load_balancer = true;
+ UniquePtr<ServerAddressList> addresses = ProcessServerlist(serverlist_);
GPR_ASSERT(addresses != nullptr);
- // Replace the LB addresses in the channel args that we pass down to
+ is_backend_from_grpclb_load_balancer = true;
+ // Replace the server address list in the channel args that we pass down to
// the subchannel.
- static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
+ static const char* keys_to_remove[] = {GRPC_ARG_SERVER_ADDRESS_LIST};
const grpc_arg args_to_add[] = {
- grpc_lb_addresses_create_channel_arg(addresses),
+ CreateServerAddressListChannelArg(addresses.get()),
// A channel arg indicating if the target is a backend inferred from a
// grpclb load balancer.
grpc_channel_arg_integer_create(
@@ -1631,7 +1504,6 @@ grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() {
grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove(
args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add,
GPR_ARRAY_SIZE(args_to_add));
- grpc_lb_addresses_destroy(addresses);
return args;
}
@@ -1765,19 +1637,18 @@ class XdsFactory : public LoadBalancingPolicyFactory {
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
const LoadBalancingPolicy::Args& args) const override {
/* Count the number of gRPC-LB addresses. There must be at least one. */
- const grpc_arg* arg =
- grpc_channel_args_find(args.args, GRPC_ARG_LB_ADDRESSES);
- if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
- return nullptr;
- }
- grpc_lb_addresses* addresses =
- static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
- size_t num_grpclb_addrs = 0;
- for (size_t i = 0; i < addresses->num_addresses; ++i) {
- if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
+ const ServerAddressList* addresses =
+ FindServerAddressListChannelArg(args.args);
+ if (addresses == nullptr) return nullptr;
+ bool found_balancer_address = false;
+ for (size_t i = 0; i < addresses->size(); ++i) {
+ if ((*addresses)[i].IsBalancer()) {
+ found_balancer_address = true;
+ break;
+ }
}
- if (num_grpclb_addrs == 0) return nullptr;
- return OrphanablePtr<LoadBalancingPolicy>(New<XdsLb>(addresses, args));
+ if (!found_balancer_address) return nullptr;
+ return OrphanablePtr<LoadBalancingPolicy>(New<XdsLb>(args));
}
const char* name() const override { return "xds_experimental"; }
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h
index 32c4acc8a3..f713b7f563 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h
@@ -21,7 +21,7 @@
#include <grpc/support/port_platform.h>
-#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
+#include <grpc/impl/codegen/grpc_types.h>
/// Makes any necessary modifications to \a args for use in the xds
/// balancer channel.
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc
index 5ab72efce4..9a11f8e39f 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc
@@ -25,6 +25,7 @@
#include <string.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@@ -41,22 +42,23 @@ int BalancerNameCmp(const grpc_core::UniquePtr<char>& a,
}
RefCountedPtr<TargetAuthorityTable> CreateTargetAuthorityTable(
- grpc_lb_addresses* addresses) {
+ const ServerAddressList& addresses) {
TargetAuthorityTable::Entry* target_authority_entries =
- static_cast<TargetAuthorityTable::Entry*>(gpr_zalloc(
- sizeof(*target_authority_entries) * addresses->num_addresses));
- for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ static_cast<TargetAuthorityTable::Entry*>(
+ gpr_zalloc(sizeof(*target_authority_entries) * addresses.size()));
+ for (size_t i = 0; i < addresses.size(); ++i) {
char* addr_str;
- GPR_ASSERT(grpc_sockaddr_to_string(
- &addr_str, &addresses->addresses[i].address, true) > 0);
+ GPR_ASSERT(
+ grpc_sockaddr_to_string(&addr_str, &addresses[i].address(), true) > 0);
target_authority_entries[i].key = grpc_slice_from_copied_string(addr_str);
- target_authority_entries[i].value.reset(
- gpr_strdup(addresses->addresses[i].balancer_name));
gpr_free(addr_str);
+ char* balancer_name = grpc_channel_arg_get_string(grpc_channel_args_find(
+ addresses[i].args(), GRPC_ARG_ADDRESS_BALANCER_NAME));
+ target_authority_entries[i].value.reset(gpr_strdup(balancer_name));
}
RefCountedPtr<TargetAuthorityTable> target_authority_table =
- TargetAuthorityTable::Create(addresses->num_addresses,
- target_authority_entries, BalancerNameCmp);
+ TargetAuthorityTable::Create(addresses.size(), target_authority_entries,
+ BalancerNameCmp);
gpr_free(target_authority_entries);
return target_authority_table;
}
@@ -71,13 +73,12 @@ grpc_channel_args* grpc_lb_policy_xds_modify_lb_channel_args(
grpc_arg args_to_add[2];
size_t num_args_to_add = 0;
// Add arg for targets info table.
- const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_LB_ADDRESSES);
- GPR_ASSERT(arg != nullptr);
- GPR_ASSERT(arg->type == GRPC_ARG_POINTER);
- grpc_lb_addresses* addresses =
- static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
+ grpc_core::ServerAddressList* addresses =
+ grpc_core::FindServerAddressListChannelArg(args);
+ GPR_ASSERT(addresses != nullptr);
grpc_core::RefCountedPtr<grpc_core::TargetAuthorityTable>
- target_authority_table = grpc_core::CreateTargetAuthorityTable(addresses);
+ target_authority_table =
+ grpc_core::CreateTargetAuthorityTable(*addresses);
args_to_add[num_args_to_add++] =
grpc_core::CreateTargetAuthorityTableChannelArg(
target_authority_table.get());
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h
index 9d08defa7e..6704995641 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h
@@ -25,7 +25,7 @@
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h"
-#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
#define XDS_SERVICE_NAME_MAX_LENGTH 128