aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
authorGravatar hcaseyal <hcaseyal@gmail.com>2018-12-07 16:13:37 -0800
committerGravatar GitHub <noreply@github.com>2018-12-07 16:13:37 -0800
commit27e2ba31bffa1c27fe12ad7d55e70b450eb777a8 (patch)
tree55b94f815d4213e0a82ebeb3766758d6793afb0b /src/core/ext
parentbc91ebf74c5e7f1cca4de5b6b906344194ba6c4a (diff)
Revert "Allow encoding arbitrary channel args on a per-address basis."
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc16
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.h9
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc242
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc33
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc19
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc40
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h53
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/xds/xds.cc247
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc33
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy_factory.cc163
-rw-r--r--src/core/ext/filters/client_channel/lb_policy_factory.h86
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc12
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc1
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc149
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h13
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc9
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc3
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc26
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc14
-rw-r--r--src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc3
-rw-r--r--src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h3
-rw-r--r--src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc34
-rw-r--r--src/core/ext/filters/client_channel/resolver_result_parsing.cc20
-rw-r--r--src/core/ext/filters/client_channel/server_address.cc103
-rw-r--r--src/core/ext/filters/client_channel/server_address.h108
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc6
-rw-r--r--src/core/ext/filters/client_channel/subchannel.h13
31 files changed, 849 insertions, 617 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 70aac47231..ebc412b468 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -38,7 +38,6 @@
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
-#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/backoff/backoff.h"
@@ -63,7 +62,6 @@
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/status_metadata.h"
-using grpc_core::ServerAddressList;
using grpc_core::internal::ClientChannelMethodParams;
using grpc_core::internal::ClientChannelMethodParamsTable;
using grpc_core::internal::ProcessedResolverResult;
@@ -385,10 +383,16 @@ static void create_new_lb_policy_locked(
static void maybe_add_trace_message_for_address_changes_locked(
channel_data* chand, TraceStringVector* trace_strings) {
- const ServerAddressList* addresses =
- grpc_core::FindServerAddressListChannelArg(chand->resolver_result);
- const bool resolution_contains_addresses =
- addresses != nullptr && addresses->size() > 0;
+ int resolution_contains_addresses = false;
+ const grpc_arg* channel_arg =
+ grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
+ if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
+ grpc_lb_addresses* addresses =
+ static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p);
+ if (addresses->num_addresses > 0) {
+ resolution_contains_addresses = true;
+ }
+ }
if (!resolution_contains_addresses &&
chand->previous_resolution_contained_addresses) {
trace_strings->push_back(gpr_strdup("Address list became empty"));
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index 6b76fe5d5d..7034da6249 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -55,7 +55,7 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
grpc_client_channel_factory* client_channel_factory = nullptr;
/// Channel args from the resolver.
/// Note that the LB policy gets the set of addresses from the
- /// GRPC_ARG_SERVER_ADDRESS_LIST channel arg.
+ /// GRPC_ARG_LB_ADDRESSES channel arg.
grpc_channel_args* args = nullptr;
/// Load balancing config from the resolver.
grpc_json* lb_config = nullptr;
@@ -80,6 +80,11 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// Will be populated with context to pass to the subchannel call, if
/// needed.
grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT] = {};
+ /// Upon success, \a *user_data will be set to whatever opaque information
+ /// may need to be propagated from the LB policy, or nullptr if not needed.
+ // TODO(roth): As part of revamping our metadata APIs, try to find a
+ // way to clean this up and C++-ify it.
+ void** user_data = nullptr;
/// Next pointer. For internal use by LB policy.
PickState* next = nullptr;
};
@@ -90,7 +95,7 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// Updates the policy with a new set of \a args and a new \a lb_config from
/// the resolver. Note that the LB policy gets the set of addresses from the
- /// GRPC_ARG_SERVER_ADDRESS_LIST channel arg.
+ /// GRPC_ARG_LB_ADDRESSES channel arg.
virtual void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) GRPC_ABSTRACT;
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index a9a5965ed1..a46579c7f7 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -84,7 +84,6 @@
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
-#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
@@ -114,8 +113,6 @@
#define GRPC_GRPCLB_RECONNECT_JITTER 0.2
#define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000
-#define GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN "grpc.grpclb_address_lb_token"
-
namespace grpc_core {
TraceFlag grpc_lb_glb_trace(false, "glb");
@@ -124,7 +121,7 @@ namespace {
class GrpcLb : public LoadBalancingPolicy {
public:
- explicit GrpcLb(const Args& args);
+ GrpcLb(const grpc_lb_addresses* addresses, const Args& args);
void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) override;
@@ -164,6 +161,9 @@ class GrpcLb : public LoadBalancingPolicy {
// Our on_complete closure and the original one.
grpc_closure on_complete;
grpc_closure* original_on_complete;
+ // The LB token associated with the pick. This is set via user_data in
+ // the pick.
+ grpc_mdelem lb_token;
// Stats for client-side load reporting.
RefCountedPtr<GrpcLbClientStats> client_stats;
// Next pending pick.
@@ -329,7 +329,7 @@ class GrpcLb : public LoadBalancingPolicy {
// 0 means not using fallback.
int lb_fallback_timeout_ms_ = 0;
// The backend addresses from the resolver.
- UniquePtr<ServerAddressList> fallback_backend_addresses_;
+ grpc_lb_addresses* fallback_backend_addresses_ = nullptr;
// Fallback timer.
bool fallback_timer_callback_pending_ = false;
grpc_timer lb_fallback_timer_;
@@ -349,7 +349,7 @@ class GrpcLb : public LoadBalancingPolicy {
// serverlist parsing code
//
-// vtable for LB token channel arg.
+// vtable for LB tokens in grpc_lb_addresses
void* lb_token_copy(void* token) {
return token == nullptr
? nullptr
@@ -361,11 +361,38 @@ void lb_token_destroy(void* token) {
}
}
int lb_token_cmp(void* token1, void* token2) {
- return GPR_ICMP(token1, token2);
+ if (token1 > token2) return 1;
+ if (token1 < token2) return -1;
+ return 0;
}
-const grpc_arg_pointer_vtable lb_token_arg_vtable = {
+const grpc_lb_user_data_vtable lb_token_vtable = {
lb_token_copy, lb_token_destroy, lb_token_cmp};
+// Returns the backend addresses extracted from the given addresses.
+grpc_lb_addresses* ExtractBackendAddresses(const grpc_lb_addresses* addresses) {
+ // First pass: count the number of backend addresses.
+ size_t num_backends = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (!addresses->addresses[i].is_balancer) {
+ ++num_backends;
+ }
+ }
+ // Second pass: actually populate the addresses and (empty) LB tokens.
+ grpc_lb_addresses* backend_addresses =
+ grpc_lb_addresses_create(num_backends, &lb_token_vtable);
+ size_t num_copied = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (addresses->addresses[i].is_balancer) continue;
+ const grpc_resolved_address* addr = &addresses->addresses[i].address;
+ grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr,
+ addr->len, false /* is_balancer */,
+ nullptr /* balancer_name */,
+ (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload);
+ ++num_copied;
+ }
+ return backend_addresses;
+}
+
bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) {
if (server->drop) return false;
const grpc_grpclb_ip_address* ip = &server->ip_address;
@@ -413,16 +440,30 @@ void ParseServer(const grpc_grpclb_server* server,
}
// Returns addresses extracted from \a serverlist.
-ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
- ServerAddressList addresses;
+grpc_lb_addresses* ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
+ size_t num_valid = 0;
+ /* first pass: count how many are valid in order to allocate the necessary
+ * memory in a single block */
for (size_t i = 0; i < serverlist->num_servers; ++i) {
- const grpc_grpclb_server* server = serverlist->servers[i];
- if (!IsServerValid(serverlist->servers[i], i, false)) continue;
- // Address processing.
+ if (IsServerValid(serverlist->servers[i], i, true)) ++num_valid;
+ }
+ grpc_lb_addresses* lb_addresses =
+ grpc_lb_addresses_create(num_valid, &lb_token_vtable);
+ /* second pass: actually populate the addresses and LB tokens (aka user data
+ * to the outside world) to be read by the RR policy during its creation.
+ * Given that the validity tests are very cheap, they are performed again
+ * instead of marking the valid ones during the first pass, as this would
+ * incurr in an allocation due to the arbitrary number of server */
+ size_t addr_idx = 0;
+ for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
+ const grpc_grpclb_server* server = serverlist->servers[sl_idx];
+ if (!IsServerValid(serverlist->servers[sl_idx], sl_idx, false)) continue;
+ GPR_ASSERT(addr_idx < num_valid);
+ /* address processing */
grpc_resolved_address addr;
ParseServer(server, &addr);
- // LB token processing.
- void* lb_token;
+ /* lb token processing */
+ void* user_data;
if (server->has_load_balance_token) {
const size_t lb_token_max_length =
GPR_ARRAY_SIZE(server->load_balance_token);
@@ -430,7 +471,7 @@ ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
strnlen(server->load_balance_token, lb_token_max_length);
grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
server->load_balance_token, lb_token_length);
- lb_token =
+ user_data =
(void*)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr)
.payload;
} else {
@@ -440,16 +481,15 @@ ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
"be used instead",
uri);
gpr_free(uri);
- lb_token = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
+ user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
}
- // Add address.
- grpc_arg arg = grpc_channel_arg_pointer_create(
- const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), lb_token,
- &lb_token_arg_vtable);
- grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1);
- addresses.emplace_back(addr, args);
- }
- return addresses;
+ grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
+ false /* is_balancer */,
+ nullptr /* balancer_name */, user_data);
+ ++addr_idx;
+ }
+ GPR_ASSERT(addr_idx == num_valid);
+ return lb_addresses;
}
//
@@ -789,7 +829,8 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_);
} else {
// Dispose of the fallback.
- grpclb_policy->fallback_backend_addresses_.reset();
+ grpc_lb_addresses_destroy(grpclb_policy->fallback_backend_addresses_);
+ grpclb_policy->fallback_backend_addresses_ = nullptr;
if (grpclb_policy->fallback_timer_callback_pending_) {
grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
}
@@ -869,25 +910,31 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
// helper code for creating balancer channel
//
-ServerAddressList ExtractBalancerAddresses(const ServerAddressList& addresses) {
- ServerAddressList balancer_addresses;
- for (size_t i = 0; i < addresses.size(); ++i) {
- if (addresses[i].IsBalancer()) {
- // Strip out the is_balancer channel arg, since we don't want to
- // recursively use the grpclb policy in the channel used to talk to
- // the balancers. Note that we do NOT strip out the balancer_name
- // channel arg, since we need that to set the authority correctly
- // to talk to the balancers.
- static const char* args_to_remove[] = {
- GRPC_ARG_ADDRESS_IS_BALANCER,
- };
- balancer_addresses.emplace_back(
- addresses[i].address(),
- grpc_channel_args_copy_and_remove(addresses[i].args(), args_to_remove,
- GPR_ARRAY_SIZE(args_to_remove)));
+grpc_lb_addresses* ExtractBalancerAddresses(
+ const grpc_lb_addresses* addresses) {
+ size_t num_grpclb_addrs = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
+ }
+ // There must be at least one balancer address, or else the
+ // client_channel would not have chosen this LB policy.
+ GPR_ASSERT(num_grpclb_addrs > 0);
+ grpc_lb_addresses* lb_addresses =
+ grpc_lb_addresses_create(num_grpclb_addrs, nullptr);
+ size_t lb_addresses_idx = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (!addresses->addresses[i].is_balancer) continue;
+ if (GPR_UNLIKELY(addresses->addresses[i].user_data != nullptr)) {
+ gpr_log(GPR_ERROR,
+ "This LB policy doesn't support user data. It will be ignored");
}
+ grpc_lb_addresses_set_address(
+ lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr,
+ addresses->addresses[i].address.len, false /* is balancer */,
+ addresses->addresses[i].balancer_name, nullptr /* user data */);
}
- return balancer_addresses;
+ GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx);
+ return lb_addresses;
}
/* Returns the channel args for the LB channel, used to create a bidirectional
@@ -899,10 +946,10 @@ ServerAddressList ExtractBalancerAddresses(const ServerAddressList& addresses) {
* above the grpclb policy.
* - \a args: other args inherited from the grpclb policy. */
grpc_channel_args* BuildBalancerChannelArgs(
- const ServerAddressList& addresses,
+ const grpc_lb_addresses* addresses,
FakeResolverResponseGenerator* response_generator,
const grpc_channel_args* args) {
- ServerAddressList balancer_addresses = ExtractBalancerAddresses(addresses);
+ grpc_lb_addresses* lb_addresses = ExtractBalancerAddresses(addresses);
// Channel args to remove.
static const char* args_to_remove[] = {
// LB policy name, since we want to use the default (pick_first) in
@@ -920,7 +967,7 @@ grpc_channel_args* BuildBalancerChannelArgs(
// is_balancer=true. We need the LB channel to return addresses with
// is_balancer=false so that it does not wind up recursively using the
// grpclb LB policy, as per the special case logic in client_channel.c.
- GRPC_ARG_SERVER_ADDRESS_LIST,
+ GRPC_ARG_LB_ADDRESSES,
// The fake resolver response generator, because we are replacing it
// with the one from the grpclb policy, used to propagate updates to
// the LB channel.
@@ -936,10 +983,10 @@ grpc_channel_args* BuildBalancerChannelArgs(
};
// Channel args to add.
const grpc_arg args_to_add[] = {
- // New address list.
+ // New LB addresses.
// Note that we pass these in both when creating the LB channel
// and via the fake resolver. The latter is what actually gets used.
- CreateServerAddressListChannelArg(&balancer_addresses),
+ grpc_lb_addresses_create_channel_arg(lb_addresses),
// The fake resolver response generator, which we use to inject
// address updates into the LB channel.
grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
@@ -957,14 +1004,18 @@ grpc_channel_args* BuildBalancerChannelArgs(
args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add,
GPR_ARRAY_SIZE(args_to_add));
// Make any necessary modifications for security.
- return grpc_lb_policy_grpclb_modify_lb_channel_args(new_args);
+ new_args = grpc_lb_policy_grpclb_modify_lb_channel_args(new_args);
+ // Clean up.
+ grpc_lb_addresses_destroy(lb_addresses);
+ return new_args;
}
//
// ctor and dtor
//
-GrpcLb::GrpcLb(const LoadBalancingPolicy::Args& args)
+GrpcLb::GrpcLb(const grpc_lb_addresses* addresses,
+ const LoadBalancingPolicy::Args& args)
: LoadBalancingPolicy(args),
response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
lb_call_backoff_(
@@ -1021,6 +1072,9 @@ GrpcLb::~GrpcLb() {
if (serverlist_ != nullptr) {
grpc_grpclb_destroy_serverlist(serverlist_);
}
+ if (fallback_backend_addresses_ != nullptr) {
+ grpc_lb_addresses_destroy(fallback_backend_addresses_);
+ }
grpc_subchannel_index_unref();
}
@@ -1068,6 +1122,7 @@ void GrpcLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
while ((pp = pending_picks_) != nullptr) {
pending_picks_ = pp->next;
pp->pick->on_complete = pp->original_on_complete;
+ pp->pick->user_data = nullptr;
grpc_error* error = GRPC_ERROR_NONE;
if (new_policy->PickLocked(pp->pick, &error)) {
// Synchronous return; schedule closure.
@@ -1221,27 +1276,9 @@ void GrpcLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
notify);
}
-// Returns the backend addresses extracted from the given addresses.
-UniquePtr<ServerAddressList> ExtractBackendAddresses(
- const ServerAddressList& addresses) {
- void* lb_token = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
- grpc_arg arg = grpc_channel_arg_pointer_create(
- const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), lb_token,
- &lb_token_arg_vtable);
- auto backend_addresses = MakeUnique<ServerAddressList>();
- for (size_t i = 0; i < addresses.size(); ++i) {
- if (!addresses[i].IsBalancer()) {
- backend_addresses->emplace_back(
- addresses[i].address(),
- grpc_channel_args_copy_and_add(addresses[i].args(), &arg, 1));
- }
- }
- return backend_addresses;
-}
-
void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
- const ServerAddressList* addresses = FindServerAddressListChannelArg(&args);
- if (addresses == nullptr) {
+ const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
+ if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
// Ignore this update.
gpr_log(
GPR_ERROR,
@@ -1249,8 +1286,13 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
this);
return;
}
+ const grpc_lb_addresses* addresses =
+ static_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
// Update fallback address list.
- fallback_backend_addresses_ = ExtractBackendAddresses(*addresses);
+ if (fallback_backend_addresses_ != nullptr) {
+ grpc_lb_addresses_destroy(fallback_backend_addresses_);
+ }
+ fallback_backend_addresses_ = ExtractBackendAddresses(addresses);
// Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
// since we use this to trigger the client_load_reporting filter.
static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
@@ -1261,7 +1303,7 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
&args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
// Construct args for balancer channel.
grpc_channel_args* lb_channel_args =
- BuildBalancerChannelArgs(*addresses, response_generator_.get(), &args);
+ BuildBalancerChannelArgs(addresses, response_generator_.get(), &args);
// Create balancer channel if needed.
if (lb_channel_ == nullptr) {
char* uri_str;
@@ -1467,17 +1509,12 @@ void DestroyClientStats(void* arg) {
}
void GrpcLb::PendingPickSetMetadataAndContext(PendingPick* pp) {
- // If connected_subchannel is nullptr, no pick has been made by the RR
- // policy (e.g., all addresses failed to connect). There won't be any
- // LB token available.
+ /* if connected_subchannel is nullptr, no pick has been made by the RR
+ * policy (e.g., all addresses failed to connect). There won't be any
+ * user_data/token available */
if (pp->pick->connected_subchannel != nullptr) {
- const grpc_arg* arg =
- grpc_channel_args_find(pp->pick->connected_subchannel->args(),
- GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN);
- if (arg != nullptr) {
- grpc_mdelem lb_token = {
- reinterpret_cast<uintptr_t>(arg->value.pointer.p)};
- AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(lb_token),
+ if (GPR_LIKELY(!GRPC_MDISNULL(pp->lb_token))) {
+ AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(pp->lb_token),
&pp->pick->lb_token_mdelem_storage,
pp->pick->initial_metadata);
} else {
@@ -1561,10 +1598,12 @@ bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
return true;
}
}
- // Set client_stats.
+ // Set client_stats and user_data.
if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
pp->client_stats = lb_calld_->client_stats()->Ref();
}
+ GPR_ASSERT(pp->pick->user_data == nullptr);
+ pp->pick->user_data = (void**)&pp->lb_token;
// Pick via the RR policy.
bool pick_done = rr_policy_->PickLocked(pp->pick, error);
if (pick_done) {
@@ -1629,11 +1668,10 @@ void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) {
}
grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
- ServerAddressList tmp_addresses;
- ServerAddressList* addresses = &tmp_addresses;
+ grpc_lb_addresses* addresses;
bool is_backend_from_grpclb_load_balancer = false;
if (serverlist_ != nullptr) {
- tmp_addresses = ProcessServerlist(serverlist_);
+ addresses = ProcessServerlist(serverlist_);
is_backend_from_grpclb_load_balancer = true;
} else {
// If CreateOrUpdateRoundRobinPolicyLocked() is invoked when we haven't
@@ -1642,14 +1680,14 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
// empty, in which case the new round_robin policy will keep the requested
// picks pending.
GPR_ASSERT(fallback_backend_addresses_ != nullptr);
- addresses = fallback_backend_addresses_.get();
+ addresses = grpc_lb_addresses_copy(fallback_backend_addresses_);
}
GPR_ASSERT(addresses != nullptr);
- // Replace the server address list in the channel args that we pass down to
+ // Replace the LB addresses in the channel args that we pass down to
// the subchannel.
- static const char* keys_to_remove[] = {GRPC_ARG_SERVER_ADDRESS_LIST};
+ static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
grpc_arg args_to_add[3] = {
- CreateServerAddressListChannelArg(addresses),
+ grpc_lb_addresses_create_channel_arg(addresses),
// A channel arg indicating if the target is a backend inferred from a
// grpclb load balancer.
grpc_channel_arg_integer_create(
@@ -1666,6 +1704,7 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove(
args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add,
num_args_to_add);
+ grpc_lb_addresses_destroy(addresses);
return args;
}
@@ -1798,18 +1837,19 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory {
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
const LoadBalancingPolicy::Args& args) const override {
/* Count the number of gRPC-LB addresses. There must be at least one. */
- const ServerAddressList* addresses =
- FindServerAddressListChannelArg(args.args);
- if (addresses == nullptr) return nullptr;
- bool found_balancer = false;
- for (size_t i = 0; i < addresses->size(); ++i) {
- if ((*addresses)[i].IsBalancer()) {
- found_balancer = true;
- break;
- }
+ const grpc_arg* arg =
+ grpc_channel_args_find(args.args, GRPC_ARG_LB_ADDRESSES);
+ if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
+ return nullptr;
+ }
+ grpc_lb_addresses* addresses =
+ static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
+ size_t num_grpclb_addrs = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
}
- if (!found_balancer) return nullptr;
- return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(args));
+ if (num_grpclb_addrs == 0) return nullptr;
+ return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(addresses, args));
}
const char* name() const override { return "grpclb"; }
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
index 3b2dc370eb..825065a9c3 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
@@ -21,7 +21,7 @@
#include <grpc/support/port_platform.h>
-#include <grpc/impl/codegen/grpc_types.h>
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
/// Makes any necessary modifications to \a args for use in the grpclb
/// balancer channel.
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc
index 6e8fbdcab7..441efd5e23 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc
@@ -26,7 +26,6 @@
#include <grpc/support/string_util.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
-#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@@ -43,23 +42,22 @@ int BalancerNameCmp(const grpc_core::UniquePtr<char>& a,
}
RefCountedPtr<TargetAuthorityTable> CreateTargetAuthorityTable(
- const ServerAddressList& addresses) {
+ grpc_lb_addresses* addresses) {
TargetAuthorityTable::Entry* target_authority_entries =
- static_cast<TargetAuthorityTable::Entry*>(
- gpr_zalloc(sizeof(*target_authority_entries) * addresses.size()));
- for (size_t i = 0; i < addresses.size(); ++i) {
+ static_cast<TargetAuthorityTable::Entry*>(gpr_zalloc(
+ sizeof(*target_authority_entries) * addresses->num_addresses));
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
char* addr_str;
- GPR_ASSERT(
- grpc_sockaddr_to_string(&addr_str, &addresses[i].address(), true) > 0);
+ GPR_ASSERT(grpc_sockaddr_to_string(
+ &addr_str, &addresses->addresses[i].address, true) > 0);
target_authority_entries[i].key = grpc_slice_from_copied_string(addr_str);
+ target_authority_entries[i].value.reset(
+ gpr_strdup(addresses->addresses[i].balancer_name));
gpr_free(addr_str);
- char* balancer_name = grpc_channel_arg_get_string(grpc_channel_args_find(
- addresses[i].args(), GRPC_ARG_ADDRESS_BALANCER_NAME));
- target_authority_entries[i].value.reset(gpr_strdup(balancer_name));
}
RefCountedPtr<TargetAuthorityTable> target_authority_table =
- TargetAuthorityTable::Create(addresses.size(), target_authority_entries,
- BalancerNameCmp);
+ TargetAuthorityTable::Create(addresses->num_addresses,
+ target_authority_entries, BalancerNameCmp);
gpr_free(target_authority_entries);
return target_authority_table;
}
@@ -74,12 +72,13 @@ grpc_channel_args* grpc_lb_policy_grpclb_modify_lb_channel_args(
grpc_arg args_to_add[2];
size_t num_args_to_add = 0;
// Add arg for targets info table.
- grpc_core::ServerAddressList* addresses =
- grpc_core::FindServerAddressListChannelArg(args);
- GPR_ASSERT(addresses != nullptr);
+ const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_LB_ADDRESSES);
+ GPR_ASSERT(arg != nullptr);
+ GPR_ASSERT(arg->type == GRPC_ARG_POINTER);
+ grpc_lb_addresses* addresses =
+ static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
grpc_core::RefCountedPtr<grpc_core::TargetAuthorityTable>
- target_authority_table =
- grpc_core::CreateTargetAuthorityTable(*addresses);
+ target_authority_table = grpc_core::CreateTargetAuthorityTable(addresses);
args_to_add[num_args_to_add++] =
grpc_core::CreateTargetAuthorityTableChannelArg(
target_authority_table.get());
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
index 71d371c880..9ca7b28d8e 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
@@ -25,7 +25,7 @@
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#define GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH 128
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 74c17612a2..d1a05f1255 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -24,7 +24,6 @@
#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
-#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/channel/channel_args.h"
@@ -76,9 +75,11 @@ class PickFirst : public LoadBalancingPolicy {
PickFirstSubchannelData(
SubchannelList<PickFirstSubchannelList, PickFirstSubchannelData>*
subchannel_list,
- const ServerAddress& address, grpc_subchannel* subchannel,
+ const grpc_lb_user_data_vtable* user_data_vtable,
+ const grpc_lb_address& address, grpc_subchannel* subchannel,
grpc_combiner* combiner)
- : SubchannelData(subchannel_list, address, subchannel, combiner) {}
+ : SubchannelData(subchannel_list, user_data_vtable, address, subchannel,
+ combiner) {}
void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state, grpc_error* error) override;
@@ -94,7 +95,7 @@ class PickFirst : public LoadBalancingPolicy {
PickFirstSubchannelData> {
public:
PickFirstSubchannelList(PickFirst* policy, TraceFlag* tracer,
- const ServerAddressList& addresses,
+ const grpc_lb_addresses* addresses,
grpc_combiner* combiner,
grpc_client_channel_factory* client_channel_factory,
const grpc_channel_args& args)
@@ -336,8 +337,8 @@ void PickFirst::UpdateChildRefsLocked() {
void PickFirst::UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) {
AutoChildRefsUpdater guard(this);
- const ServerAddressList* addresses = FindServerAddressListChannelArg(&args);
- if (addresses == nullptr) {
+ const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
+ if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
if (subchannel_list_ == nullptr) {
// If we don't have a current subchannel list, go into TRANSIENT FAILURE.
grpc_connectivity_state_set(
@@ -353,17 +354,19 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args,
}
return;
}
+ const grpc_lb_addresses* addresses =
+ static_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Pick First %p received update with %" PRIuPTR " addresses", this,
- addresses->size());
+ addresses->num_addresses);
}
grpc_arg new_arg = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1);
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add(&args, &new_arg, 1);
auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>(
- this, &grpc_lb_pick_first_trace, *addresses, combiner(),
+ this, &grpc_lb_pick_first_trace, addresses, combiner(),
client_channel_factory(), *new_args);
grpc_channel_args_destroy(new_args);
if (subchannel_list->num_subchannels() == 0) {
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 63089afbd7..2a16975131 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -82,6 +82,8 @@ class RoundRobin : public LoadBalancingPolicy {
// Data for a particular subchannel in a subchannel list.
// This subclass adds the following functionality:
+ // - Tracks user_data associated with each address, which will be
+ // returned along with picks that select the subchannel.
// - Tracks the previous connectivity state of the subchannel, so that
// we know how many subchannels are in each state.
class RoundRobinSubchannelData
@@ -91,9 +93,26 @@ class RoundRobin : public LoadBalancingPolicy {
RoundRobinSubchannelData(
SubchannelList<RoundRobinSubchannelList, RoundRobinSubchannelData>*
subchannel_list,
- const ServerAddress& address, grpc_subchannel* subchannel,
+ const grpc_lb_user_data_vtable* user_data_vtable,
+ const grpc_lb_address& address, grpc_subchannel* subchannel,
grpc_combiner* combiner)
- : SubchannelData(subchannel_list, address, subchannel, combiner) {}
+ : SubchannelData(subchannel_list, user_data_vtable, address, subchannel,
+ combiner),
+ user_data_vtable_(user_data_vtable),
+ user_data_(user_data_vtable_ != nullptr
+ ? user_data_vtable_->copy(address.user_data)
+ : nullptr) {}
+
+ void UnrefSubchannelLocked(const char* reason) override {
+ SubchannelData::UnrefSubchannelLocked(reason);
+ if (user_data_ != nullptr) {
+ GPR_ASSERT(user_data_vtable_ != nullptr);
+ user_data_vtable_->destroy(user_data_);
+ user_data_ = nullptr;
+ }
+ }
+
+ void* user_data() const { return user_data_; }
grpc_connectivity_state connectivity_state() const {
return last_connectivity_state_;
@@ -106,6 +125,8 @@ class RoundRobin : public LoadBalancingPolicy {
void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state, grpc_error* error) override;
+ const grpc_lb_user_data_vtable* user_data_vtable_;
+ void* user_data_ = nullptr;
grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_IDLE;
};
@@ -116,7 +137,7 @@ class RoundRobin : public LoadBalancingPolicy {
public:
RoundRobinSubchannelList(
RoundRobin* policy, TraceFlag* tracer,
- const ServerAddressList& addresses, grpc_combiner* combiner,
+ const grpc_lb_addresses* addresses, grpc_combiner* combiner,
grpc_client_channel_factory* client_channel_factory,
const grpc_channel_args& args)
: SubchannelList(policy, tracer, addresses, combiner,
@@ -333,6 +354,9 @@ bool RoundRobin::DoPickLocked(PickState* pick) {
subchannel_list_->subchannel(next_ready_index);
GPR_ASSERT(sd->connected_subchannel() != nullptr);
pick->connected_subchannel = sd->connected_subchannel()->Ref();
+ if (pick->user_data != nullptr) {
+ *pick->user_data = sd->user_data();
+ }
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO,
"[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, "
@@ -643,9 +667,9 @@ void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
void RoundRobin::UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) {
+ const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
AutoChildRefsUpdater guard(this);
- const ServerAddressList* addresses = FindServerAddressListChannelArg(&args);
- if (addresses == nullptr) {
+ if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this);
// If we don't have a current subchannel list, go into TRANSIENT_FAILURE.
// Otherwise, keep using the current subchannel list (ignore this update).
@@ -657,9 +681,11 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args,
}
return;
}
+ grpc_lb_addresses* addresses =
+ static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses",
- this, addresses->size());
+ this, addresses->num_addresses);
}
// Replace latest_pending_subchannel_list_.
if (latest_pending_subchannel_list_ != nullptr) {
@@ -670,7 +696,7 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args,
}
}
latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>(
- this, &grpc_lb_round_robin_trace, *addresses, combiner(),
+ this, &grpc_lb_round_robin_trace, addresses, combiner(),
client_channel_factory(), args);
// If we haven't started picking yet or the new list is empty,
// immediately promote the new list to the current list.
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index 6f31a643c1..f31401502c 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -26,7 +26,6 @@
#include <grpc/support/alloc.h>
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
-#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
@@ -142,7 +141,8 @@ class SubchannelData {
protected:
SubchannelData(
SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
- const ServerAddress& address, grpc_subchannel* subchannel,
+ const grpc_lb_user_data_vtable* user_data_vtable,
+ const grpc_lb_address& address, grpc_subchannel* subchannel,
grpc_combiner* combiner);
virtual ~SubchannelData();
@@ -156,8 +156,9 @@ class SubchannelData {
grpc_connectivity_state connectivity_state,
grpc_error* error) GRPC_ABSTRACT;
- // Unrefs the subchannel.
- void UnrefSubchannelLocked(const char* reason);
+ // Unrefs the subchannel. May be overridden by subclasses that need
+ // to perform extra cleanup when unreffing the subchannel.
+ virtual void UnrefSubchannelLocked(const char* reason);
private:
// Updates connected_subchannel_ based on pending_connectivity_state_unsafe_.
@@ -231,7 +232,7 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
protected:
SubchannelList(LoadBalancingPolicy* policy, TraceFlag* tracer,
- const ServerAddressList& addresses, grpc_combiner* combiner,
+ const grpc_lb_addresses* addresses, grpc_combiner* combiner,
grpc_client_channel_factory* client_channel_factory,
const grpc_channel_args& args);
@@ -276,7 +277,8 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
template <typename SubchannelListType, typename SubchannelDataType>
SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData(
SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
- const ServerAddress& address, grpc_subchannel* subchannel,
+ const grpc_lb_user_data_vtable* user_data_vtable,
+ const grpc_lb_address& address, grpc_subchannel* subchannel,
grpc_combiner* combiner)
: subchannel_list_(subchannel_list),
subchannel_(subchannel),
@@ -486,7 +488,7 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
template <typename SubchannelListType, typename SubchannelDataType>
SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
LoadBalancingPolicy* policy, TraceFlag* tracer,
- const ServerAddressList& addresses, grpc_combiner* combiner,
+ const grpc_lb_addresses* addresses, grpc_combiner* combiner,
grpc_client_channel_factory* client_channel_factory,
const grpc_channel_args& args)
: InternallyRefCounted<SubchannelListType>(tracer),
@@ -496,9 +498,9 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
if (tracer_->enabled()) {
gpr_log(GPR_INFO,
"[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
- tracer_->name(), policy, this, addresses.size());
+ tracer_->name(), policy, this, addresses->num_addresses);
}
- subchannels_.reserve(addresses.size());
+ subchannels_.reserve(addresses->num_addresses);
// We need to remove the LB addresses in order to be able to compare the
// subchannel keys of subchannels from a different batch of addresses.
// We also remove the inhibit-health-checking arg, since we are
@@ -506,27 +508,19 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
inhibit_health_checking_ = grpc_channel_arg_get_bool(
grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false);
static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
- GRPC_ARG_SERVER_ADDRESS_LIST,
+ GRPC_ARG_LB_ADDRESSES,
GRPC_ARG_INHIBIT_HEALTH_CHECKING};
// Create a subchannel for each address.
grpc_subchannel_args sc_args;
- for (size_t i = 0; i < addresses.size(); i++) {
- // If there were any balancer addresses, we would have chosen grpclb
- // policy, which does not use a SubchannelList.
- GPR_ASSERT(!addresses[i].IsBalancer());
+ for (size_t i = 0; i < addresses->num_addresses; i++) {
+ // If there were any balancer, we would have chosen grpclb policy instead.
+ GPR_ASSERT(!addresses->addresses[i].is_balancer);
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
- InlinedVector<grpc_arg, 4> args_to_add;
- args_to_add.emplace_back(
- grpc_create_subchannel_address_arg(&addresses[i].address()));
- if (addresses[i].args() != nullptr) {
- for (size_t j = 0; j < addresses[i].args()->num_args; ++j) {
- args_to_add.emplace_back(addresses[i].args()->args[j]);
- }
- }
+ grpc_arg addr_arg =
+ grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
- &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove),
- args_to_add.data(), args_to_add.size());
- gpr_free(args_to_add[0].value.string);
+ &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, 1);
+ gpr_free(addr_arg.value.string);
sc_args.args = new_args;
grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel(
client_channel_factory, &sc_args);
@@ -534,7 +528,8 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
if (subchannel == nullptr) {
// Subchannel could not be created.
if (tracer_->enabled()) {
- char* address_uri = grpc_sockaddr_to_uri(&addresses[i].address());
+ char* address_uri =
+ grpc_sockaddr_to_uri(&addresses->addresses[i].address);
gpr_log(GPR_INFO,
"[%s %p] could not create subchannel for address uri %s, "
"ignoring",
@@ -544,7 +539,8 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
continue;
}
if (tracer_->enabled()) {
- char* address_uri = grpc_sockaddr_to_uri(&addresses[i].address());
+ char* address_uri =
+ grpc_sockaddr_to_uri(&addresses->addresses[i].address);
gpr_log(GPR_INFO,
"[%s %p] subchannel list %p index %" PRIuPTR
": Created subchannel %p for address uri %s",
@@ -552,7 +548,8 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
address_uri);
gpr_free(address_uri);
}
- subchannels_.emplace_back(this, addresses[i], subchannel, combiner);
+ subchannels_.emplace_back(this, addresses->user_data_vtable,
+ addresses->addresses[i], subchannel, combiner);
}
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
index 3c25de2386..faedc0a919 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
@@ -79,7 +79,6 @@
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
-#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
@@ -117,7 +116,7 @@ namespace {
class XdsLb : public LoadBalancingPolicy {
public:
- explicit XdsLb(const Args& args);
+ XdsLb(const grpc_lb_addresses* addresses, const Args& args);
void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) override;
@@ -157,6 +156,9 @@ class XdsLb : public LoadBalancingPolicy {
// Our on_complete closure and the original one.
grpc_closure on_complete;
grpc_closure* original_on_complete;
+ // The LB token associated with the pick. This is set via user_data in
+ // the pick.
+ grpc_mdelem lb_token;
// Stats for client-side load reporting.
RefCountedPtr<XdsLbClientStats> client_stats;
// Next pending pick.
@@ -254,7 +256,7 @@ class XdsLb : public LoadBalancingPolicy {
grpc_error* error);
// Pending pick methods.
- static void PendingPickCleanup(PendingPick* pp);
+ static void PendingPickSetMetadataAndContext(PendingPick* pp);
PendingPick* PendingPickCreate(PickState* pick);
void AddPendingPick(PendingPick* pp);
static void OnPendingPickComplete(void* arg, grpc_error* error);
@@ -317,7 +319,7 @@ class XdsLb : public LoadBalancingPolicy {
// 0 means not using fallback.
int lb_fallback_timeout_ms_ = 0;
// The backend addresses from the resolver.
- UniquePtr<ServerAddressList> fallback_backend_addresses_;
+ grpc_lb_addresses* fallback_backend_addresses_ = nullptr;
// Fallback timer.
bool fallback_timer_callback_pending_ = false;
grpc_timer lb_fallback_timer_;
@@ -337,15 +339,47 @@ class XdsLb : public LoadBalancingPolicy {
// serverlist parsing code
//
+// vtable for LB tokens in grpc_lb_addresses
+void* lb_token_copy(void* token) {
+ return token == nullptr
+ ? nullptr
+ : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload;
+}
+void lb_token_destroy(void* token) {
+ if (token != nullptr) {
+ GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token});
+ }
+}
+int lb_token_cmp(void* token1, void* token2) {
+ if (token1 > token2) return 1;
+ if (token1 < token2) return -1;
+ return 0;
+}
+const grpc_lb_user_data_vtable lb_token_vtable = {
+ lb_token_copy, lb_token_destroy, lb_token_cmp};
+
// Returns the backend addresses extracted from the given addresses.
-UniquePtr<ServerAddressList> ExtractBackendAddresses(
- const ServerAddressList& addresses) {
- auto backend_addresses = MakeUnique<ServerAddressList>();
- for (size_t i = 0; i < addresses.size(); ++i) {
- if (!addresses[i].IsBalancer()) {
- backend_addresses->emplace_back(addresses[i]);
+grpc_lb_addresses* ExtractBackendAddresses(const grpc_lb_addresses* addresses) {
+ // First pass: count the number of backend addresses.
+ size_t num_backends = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (!addresses->addresses[i].is_balancer) {
+ ++num_backends;
}
}
+ // Second pass: actually populate the addresses and (empty) LB tokens.
+ grpc_lb_addresses* backend_addresses =
+ grpc_lb_addresses_create(num_backends, &lb_token_vtable);
+ size_t num_copied = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (addresses->addresses[i].is_balancer) continue;
+ const grpc_resolved_address* addr = &addresses->addresses[i].address;
+ grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr,
+ addr->len, false /* is_balancer */,
+ nullptr /* balancer_name */,
+ (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload);
+ ++num_copied;
+ }
return backend_addresses;
}
@@ -395,17 +429,56 @@ void ParseServer(const xds_grpclb_server* server, grpc_resolved_address* addr) {
}
// Returns addresses extracted from \a serverlist.
-UniquePtr<ServerAddressList> ProcessServerlist(
- const xds_grpclb_serverlist* serverlist) {
- auto addresses = MakeUnique<ServerAddressList>();
+grpc_lb_addresses* ProcessServerlist(const xds_grpclb_serverlist* serverlist) {
+ size_t num_valid = 0;
+ /* first pass: count how many are valid in order to allocate the necessary
+ * memory in a single block */
for (size_t i = 0; i < serverlist->num_servers; ++i) {
- const xds_grpclb_server* server = serverlist->servers[i];
- if (!IsServerValid(serverlist->servers[i], i, false)) continue;
+ if (IsServerValid(serverlist->servers[i], i, true)) ++num_valid;
+ }
+ grpc_lb_addresses* lb_addresses =
+ grpc_lb_addresses_create(num_valid, &lb_token_vtable);
+ /* second pass: actually populate the addresses and LB tokens (aka user data
+ * to the outside world) to be read by the child policy during its creation.
+ * Given that the validity tests are very cheap, they are performed again
+ * instead of marking the valid ones during the first pass, as this would
+ * incurr in an allocation due to the arbitrary number of server */
+ size_t addr_idx = 0;
+ for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
+ const xds_grpclb_server* server = serverlist->servers[sl_idx];
+ if (!IsServerValid(serverlist->servers[sl_idx], sl_idx, false)) continue;
+ GPR_ASSERT(addr_idx < num_valid);
+ /* address processing */
grpc_resolved_address addr;
ParseServer(server, &addr);
- addresses->emplace_back(addr, nullptr);
+ /* lb token processing */
+ void* user_data;
+ if (server->has_load_balance_token) {
+ const size_t lb_token_max_length =
+ GPR_ARRAY_SIZE(server->load_balance_token);
+ const size_t lb_token_length =
+ strnlen(server->load_balance_token, lb_token_max_length);
+ grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
+ server->load_balance_token, lb_token_length);
+ user_data =
+ (void*)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr)
+ .payload;
+ } else {
+ char* uri = grpc_sockaddr_to_uri(&addr);
+ gpr_log(GPR_INFO,
+ "Missing LB token for backend address '%s'. The empty token will "
+ "be used instead",
+ uri);
+ gpr_free(uri);
+ user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
+ }
+ grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
+ false /* is_balancer */,
+ nullptr /* balancer_name */, user_data);
+ ++addr_idx;
}
- return addresses;
+ GPR_ASSERT(addr_idx == num_valid);
+ return lb_addresses;
}
//
@@ -716,7 +789,8 @@ void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked(
xds_grpclb_destroy_serverlist(xdslb_policy->serverlist_);
} else {
/* or dispose of the fallback */
- xdslb_policy->fallback_backend_addresses_.reset();
+ grpc_lb_addresses_destroy(xdslb_policy->fallback_backend_addresses_);
+ xdslb_policy->fallback_backend_addresses_ = nullptr;
if (xdslb_policy->fallback_timer_callback_pending_) {
grpc_timer_cancel(&xdslb_policy->lb_fallback_timer_);
}
@@ -802,15 +876,31 @@ void XdsLb::BalancerCallState::OnBalancerStatusReceivedLocked(
// helper code for creating balancer channel
//
-UniquePtr<ServerAddressList> ExtractBalancerAddresses(
- const ServerAddressList& addresses) {
- auto balancer_addresses = MakeUnique<ServerAddressList>();
- for (size_t i = 0; i < addresses.size(); ++i) {
- if (addresses[i].IsBalancer()) {
- balancer_addresses->emplace_back(addresses[i]);
+grpc_lb_addresses* ExtractBalancerAddresses(
+ const grpc_lb_addresses* addresses) {
+ size_t num_grpclb_addrs = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
+ }
+ // There must be at least one balancer address, or else the
+ // client_channel would not have chosen this LB policy.
+ GPR_ASSERT(num_grpclb_addrs > 0);
+ grpc_lb_addresses* lb_addresses =
+ grpc_lb_addresses_create(num_grpclb_addrs, nullptr);
+ size_t lb_addresses_idx = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (!addresses->addresses[i].is_balancer) continue;
+ if (GPR_UNLIKELY(addresses->addresses[i].user_data != nullptr)) {
+ gpr_log(GPR_ERROR,
+ "This LB policy doesn't support user data. It will be ignored");
}
+ grpc_lb_addresses_set_address(
+ lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr,
+ addresses->addresses[i].address.len, false /* is balancer */,
+ addresses->addresses[i].balancer_name, nullptr /* user data */);
}
- return balancer_addresses;
+ GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx);
+ return lb_addresses;
}
/* Returns the channel args for the LB channel, used to create a bidirectional
@@ -822,11 +912,10 @@ UniquePtr<ServerAddressList> ExtractBalancerAddresses(
* above the grpclb policy.
* - \a args: other args inherited from the xds policy. */
grpc_channel_args* BuildBalancerChannelArgs(
- const ServerAddressList& addresses,
+ const grpc_lb_addresses* addresses,
FakeResolverResponseGenerator* response_generator,
const grpc_channel_args* args) {
- UniquePtr<ServerAddressList> balancer_addresses =
- ExtractBalancerAddresses(addresses);
+ grpc_lb_addresses* lb_addresses = ExtractBalancerAddresses(addresses);
// Channel args to remove.
static const char* args_to_remove[] = {
// LB policy name, since we want to use the default (pick_first) in
@@ -844,7 +933,7 @@ grpc_channel_args* BuildBalancerChannelArgs(
// is_balancer=true. We need the LB channel to return addresses with
// is_balancer=false so that it does not wind up recursively using the
// xds LB policy, as per the special case logic in client_channel.c.
- GRPC_ARG_SERVER_ADDRESS_LIST,
+ GRPC_ARG_LB_ADDRESSES,
// The fake resolver response generator, because we are replacing it
// with the one from the xds policy, used to propagate updates to
// the LB channel.
@@ -860,10 +949,10 @@ grpc_channel_args* BuildBalancerChannelArgs(
};
// Channel args to add.
const grpc_arg args_to_add[] = {
- // New server address list.
+ // New LB addresses.
// Note that we pass these in both when creating the LB channel
// and via the fake resolver. The latter is what actually gets used.
- CreateServerAddressListChannelArg(balancer_addresses.get()),
+ grpc_lb_addresses_create_channel_arg(lb_addresses),
// The fake resolver response generator, which we use to inject
// address updates into the LB channel.
grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
@@ -881,7 +970,10 @@ grpc_channel_args* BuildBalancerChannelArgs(
args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add,
GPR_ARRAY_SIZE(args_to_add));
// Make any necessary modifications for security.
- return grpc_lb_policy_xds_modify_lb_channel_args(new_args);
+ new_args = grpc_lb_policy_xds_modify_lb_channel_args(new_args);
+ // Clean up.
+ grpc_lb_addresses_destroy(lb_addresses);
+ return new_args;
}
//
@@ -889,7 +981,8 @@ grpc_channel_args* BuildBalancerChannelArgs(
//
// TODO(vishalpowar): Use lb_config in args to configure LB policy.
-XdsLb::XdsLb(const LoadBalancingPolicy::Args& args)
+XdsLb::XdsLb(const grpc_lb_addresses* addresses,
+ const LoadBalancingPolicy::Args& args)
: LoadBalancingPolicy(args),
response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
lb_call_backoff_(
@@ -945,6 +1038,9 @@ XdsLb::~XdsLb() {
if (serverlist_ != nullptr) {
xds_grpclb_destroy_serverlist(serverlist_);
}
+ if (fallback_backend_addresses_ != nullptr) {
+ grpc_lb_addresses_destroy(fallback_backend_addresses_);
+ }
grpc_subchannel_index_unref();
}
@@ -992,6 +1088,7 @@ void XdsLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
while ((pp = pending_picks_) != nullptr) {
pending_picks_ = pp->next;
pp->pick->on_complete = pp->original_on_complete;
+ pp->pick->user_data = nullptr;
grpc_error* error = GRPC_ERROR_NONE;
if (new_policy->PickLocked(pp->pick, &error)) {
// Synchronous return; schedule closure.
@@ -1144,16 +1241,21 @@ void XdsLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
}
void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
- const ServerAddressList* addresses = FindServerAddressListChannelArg(&args);
- if (addresses == nullptr) {
+ const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
+ if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
// Ignore this update.
gpr_log(GPR_ERROR,
"[xdslb %p] No valid LB addresses channel arg in update, ignoring.",
this);
return;
}
+ const grpc_lb_addresses* addresses =
+ static_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
// Update fallback address list.
- fallback_backend_addresses_ = ExtractBackendAddresses(*addresses);
+ if (fallback_backend_addresses_ != nullptr) {
+ grpc_lb_addresses_destroy(fallback_backend_addresses_);
+ }
+ fallback_backend_addresses_ = ExtractBackendAddresses(addresses);
// Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
// since we use this to trigger the client_load_reporting filter.
static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
@@ -1164,7 +1266,7 @@ void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
&args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
// Construct args for balancer channel.
grpc_channel_args* lb_channel_args =
- BuildBalancerChannelArgs(*addresses, response_generator_.get(), &args);
+ BuildBalancerChannelArgs(addresses, response_generator_.get(), &args);
// Create balancer channel if needed.
if (lb_channel_ == nullptr) {
char* uri_str;
@@ -1355,15 +1457,37 @@ void XdsLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
// PendingPick
//
+// Adds lb_token of selected subchannel (address) to the call's initial
+// metadata.
+grpc_error* AddLbTokenToInitialMetadata(
+ grpc_mdelem lb_token, grpc_linked_mdelem* lb_token_mdelem_storage,
+ grpc_metadata_batch* initial_metadata) {
+ GPR_ASSERT(lb_token_mdelem_storage != nullptr);
+ GPR_ASSERT(!GRPC_MDISNULL(lb_token));
+ return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
+ lb_token);
+}
+
// Destroy function used when embedding client stats in call context.
void DestroyClientStats(void* arg) {
static_cast<XdsLbClientStats*>(arg)->Unref();
}
-void XdsLb::PendingPickCleanup(PendingPick* pp) {
- // If connected_subchannel is nullptr, no pick has been made by the
- // child policy (e.g., all addresses failed to connect).
+void XdsLb::PendingPickSetMetadataAndContext(PendingPick* pp) {
+ /* if connected_subchannel is nullptr, no pick has been made by the
+ * child policy (e.g., all addresses failed to connect). There won't be any
+ * user_data/token available */
if (pp->pick->connected_subchannel != nullptr) {
+ if (GPR_LIKELY(!GRPC_MDISNULL(pp->lb_token))) {
+ AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(pp->lb_token),
+ &pp->pick->lb_token_mdelem_storage,
+ pp->pick->initial_metadata);
+ } else {
+ gpr_log(GPR_ERROR,
+ "[xdslb %p] No LB token for connected subchannel pick %p",
+ pp->xdslb_policy, pp->pick);
+ abort();
+ }
// Pass on client stats via context. Passes ownership of the reference.
if (pp->client_stats != nullptr) {
pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
@@ -1381,7 +1505,7 @@ void XdsLb::PendingPickCleanup(PendingPick* pp) {
* order to unref the child policy instance upon its invocation */
void XdsLb::OnPendingPickComplete(void* arg, grpc_error* error) {
PendingPick* pp = static_cast<PendingPick*>(arg);
- PendingPickCleanup(pp);
+ PendingPickSetMetadataAndContext(pp);
GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error));
Delete(pp);
}
@@ -1413,14 +1537,16 @@ void XdsLb::AddPendingPick(PendingPick* pp) {
// completion callback even if the pick is available immediately.
bool XdsLb::PickFromChildPolicyLocked(bool force_async, PendingPick* pp,
grpc_error** error) {
- // Set client_stats.
+ // Set client_stats and user_data.
if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
pp->client_stats = lb_calld_->client_stats()->Ref();
}
+ GPR_ASSERT(pp->pick->user_data == nullptr);
+ pp->pick->user_data = (void**)&pp->lb_token;
// Pick via the child policy.
bool pick_done = child_policy_->PickLocked(pp->pick, error);
if (pick_done) {
- PendingPickCleanup(pp);
+ PendingPickSetMetadataAndContext(pp);
if (force_async) {
GRPC_CLOSURE_SCHED(pp->original_on_complete, *error);
*error = GRPC_ERROR_NONE;
@@ -1482,19 +1608,20 @@ void XdsLb::CreateChildPolicyLocked(const Args& args) {
}
grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() {
+ grpc_lb_addresses* addresses;
bool is_backend_from_grpclb_load_balancer = false;
// This should never be invoked if we do not have serverlist_, as fallback
// mode is disabled for xDS plugin.
GPR_ASSERT(serverlist_ != nullptr);
GPR_ASSERT(serverlist_->num_servers > 0);
- UniquePtr<ServerAddressList> addresses = ProcessServerlist(serverlist_);
- GPR_ASSERT(addresses != nullptr);
+ addresses = ProcessServerlist(serverlist_);
is_backend_from_grpclb_load_balancer = true;
- // Replace the server address list in the channel args that we pass down to
+ GPR_ASSERT(addresses != nullptr);
+ // Replace the LB addresses in the channel args that we pass down to
// the subchannel.
- static const char* keys_to_remove[] = {GRPC_ARG_SERVER_ADDRESS_LIST};
+ static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
const grpc_arg args_to_add[] = {
- CreateServerAddressListChannelArg(addresses.get()),
+ grpc_lb_addresses_create_channel_arg(addresses),
// A channel arg indicating if the target is a backend inferred from a
// grpclb load balancer.
grpc_channel_arg_integer_create(
@@ -1504,6 +1631,7 @@ grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() {
grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove(
args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add,
GPR_ARRAY_SIZE(args_to_add));
+ grpc_lb_addresses_destroy(addresses);
return args;
}
@@ -1637,18 +1765,19 @@ class XdsFactory : public LoadBalancingPolicyFactory {
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
const LoadBalancingPolicy::Args& args) const override {
/* Count the number of gRPC-LB addresses. There must be at least one. */
- const ServerAddressList* addresses =
- FindServerAddressListChannelArg(args.args);
- if (addresses == nullptr) return nullptr;
- bool found_balancer_address = false;
- for (size_t i = 0; i < addresses->size(); ++i) {
- if ((*addresses)[i].IsBalancer()) {
- found_balancer_address = true;
- break;
- }
+ const grpc_arg* arg =
+ grpc_channel_args_find(args.args, GRPC_ARG_LB_ADDRESSES);
+ if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
+ return nullptr;
+ }
+ grpc_lb_addresses* addresses =
+ static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
+ size_t num_grpclb_addrs = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
}
- if (!found_balancer_address) return nullptr;
- return OrphanablePtr<LoadBalancingPolicy>(New<XdsLb>(args));
+ if (num_grpclb_addrs == 0) return nullptr;
+ return OrphanablePtr<LoadBalancingPolicy>(New<XdsLb>(addresses, args));
}
const char* name() const override { return "xds_experimental"; }
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h
index f713b7f563..32c4acc8a3 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h
@@ -21,7 +21,7 @@
#include <grpc/support/port_platform.h>
-#include <grpc/impl/codegen/grpc_types.h>
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
/// Makes any necessary modifications to \a args for use in the xds
/// balancer channel.
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc
index 9a11f8e39f..5ab72efce4 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc
@@ -25,7 +25,6 @@
#include <string.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
-#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@@ -42,23 +41,22 @@ int BalancerNameCmp(const grpc_core::UniquePtr<char>& a,
}
RefCountedPtr<TargetAuthorityTable> CreateTargetAuthorityTable(
- const ServerAddressList& addresses) {
+ grpc_lb_addresses* addresses) {
TargetAuthorityTable::Entry* target_authority_entries =
- static_cast<TargetAuthorityTable::Entry*>(
- gpr_zalloc(sizeof(*target_authority_entries) * addresses.size()));
- for (size_t i = 0; i < addresses.size(); ++i) {
+ static_cast<TargetAuthorityTable::Entry*>(gpr_zalloc(
+ sizeof(*target_authority_entries) * addresses->num_addresses));
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
char* addr_str;
- GPR_ASSERT(
- grpc_sockaddr_to_string(&addr_str, &addresses[i].address(), true) > 0);
+ GPR_ASSERT(grpc_sockaddr_to_string(
+ &addr_str, &addresses->addresses[i].address, true) > 0);
target_authority_entries[i].key = grpc_slice_from_copied_string(addr_str);
+ target_authority_entries[i].value.reset(
+ gpr_strdup(addresses->addresses[i].balancer_name));
gpr_free(addr_str);
- char* balancer_name = grpc_channel_arg_get_string(grpc_channel_args_find(
- addresses[i].args(), GRPC_ARG_ADDRESS_BALANCER_NAME));
- target_authority_entries[i].value.reset(gpr_strdup(balancer_name));
}
RefCountedPtr<TargetAuthorityTable> target_authority_table =
- TargetAuthorityTable::Create(addresses.size(), target_authority_entries,
- BalancerNameCmp);
+ TargetAuthorityTable::Create(addresses->num_addresses,
+ target_authority_entries, BalancerNameCmp);
gpr_free(target_authority_entries);
return target_authority_table;
}
@@ -73,12 +71,13 @@ grpc_channel_args* grpc_lb_policy_xds_modify_lb_channel_args(
grpc_arg args_to_add[2];
size_t num_args_to_add = 0;
// Add arg for targets info table.
- grpc_core::ServerAddressList* addresses =
- grpc_core::FindServerAddressListChannelArg(args);
- GPR_ASSERT(addresses != nullptr);
+ const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_LB_ADDRESSES);
+ GPR_ASSERT(arg != nullptr);
+ GPR_ASSERT(arg->type == GRPC_ARG_POINTER);
+ grpc_lb_addresses* addresses =
+ static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
grpc_core::RefCountedPtr<grpc_core::TargetAuthorityTable>
- target_authority_table =
- grpc_core::CreateTargetAuthorityTable(*addresses);
+ target_authority_table = grpc_core::CreateTargetAuthorityTable(addresses);
args_to_add[num_args_to_add++] =
grpc_core::CreateTargetAuthorityTableChannelArg(
target_authority_table.get());
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h
index 6704995641..9d08defa7e 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h
@@ -25,7 +25,7 @@
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#define XDS_SERVICE_NAME_MAX_LENGTH 128
diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.cc b/src/core/ext/filters/client_channel/lb_policy_factory.cc
new file mode 100644
index 0000000000..5c6363d295
--- /dev/null
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.cc
@@ -0,0 +1,163 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/channel/channel_args.h"
+
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
+#include "src/core/ext/filters/client_channel/parse_address.h"
+
+grpc_lb_addresses* grpc_lb_addresses_create(
+ size_t num_addresses, const grpc_lb_user_data_vtable* user_data_vtable) {
+ grpc_lb_addresses* addresses =
+ static_cast<grpc_lb_addresses*>(gpr_zalloc(sizeof(grpc_lb_addresses)));
+ addresses->num_addresses = num_addresses;
+ addresses->user_data_vtable = user_data_vtable;
+ const size_t addresses_size = sizeof(grpc_lb_address) * num_addresses;
+ addresses->addresses =
+ static_cast<grpc_lb_address*>(gpr_zalloc(addresses_size));
+ return addresses;
+}
+
+grpc_lb_addresses* grpc_lb_addresses_copy(const grpc_lb_addresses* addresses) {
+ grpc_lb_addresses* new_addresses = grpc_lb_addresses_create(
+ addresses->num_addresses, addresses->user_data_vtable);
+ memcpy(new_addresses->addresses, addresses->addresses,
+ sizeof(grpc_lb_address) * addresses->num_addresses);
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (new_addresses->addresses[i].balancer_name != nullptr) {
+ new_addresses->addresses[i].balancer_name =
+ gpr_strdup(new_addresses->addresses[i].balancer_name);
+ }
+ if (new_addresses->addresses[i].user_data != nullptr) {
+ new_addresses->addresses[i].user_data = addresses->user_data_vtable->copy(
+ new_addresses->addresses[i].user_data);
+ }
+ }
+ return new_addresses;
+}
+
+void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index,
+ const void* address, size_t address_len,
+ bool is_balancer, const char* balancer_name,
+ void* user_data) {
+ GPR_ASSERT(index < addresses->num_addresses);
+ if (user_data != nullptr) GPR_ASSERT(addresses->user_data_vtable != nullptr);
+ grpc_lb_address* target = &addresses->addresses[index];
+ memcpy(target->address.addr, address, address_len);
+ target->address.len = static_cast<socklen_t>(address_len);
+ target->is_balancer = is_balancer;
+ target->balancer_name = gpr_strdup(balancer_name);
+ target->user_data = user_data;
+}
+
+bool grpc_lb_addresses_set_address_from_uri(grpc_lb_addresses* addresses,
+ size_t index, const grpc_uri* uri,
+ bool is_balancer,
+ const char* balancer_name,
+ void* user_data) {
+ grpc_resolved_address address;
+ if (!grpc_parse_uri(uri, &address)) return false;
+ grpc_lb_addresses_set_address(addresses, index, address.addr, address.len,
+ is_balancer, balancer_name, user_data);
+ return true;
+}
+
+int grpc_lb_addresses_cmp(const grpc_lb_addresses* addresses1,
+ const grpc_lb_addresses* addresses2) {
+ if (addresses1->num_addresses > addresses2->num_addresses) return 1;
+ if (addresses1->num_addresses < addresses2->num_addresses) return -1;
+ if (addresses1->user_data_vtable > addresses2->user_data_vtable) return 1;
+ if (addresses1->user_data_vtable < addresses2->user_data_vtable) return -1;
+ for (size_t i = 0; i < addresses1->num_addresses; ++i) {
+ const grpc_lb_address* target1 = &addresses1->addresses[i];
+ const grpc_lb_address* target2 = &addresses2->addresses[i];
+ if (target1->address.len > target2->address.len) return 1;
+ if (target1->address.len < target2->address.len) return -1;
+ int retval = memcmp(target1->address.addr, target2->address.addr,
+ target1->address.len);
+ if (retval != 0) return retval;
+ if (target1->is_balancer > target2->is_balancer) return 1;
+ if (target1->is_balancer < target2->is_balancer) return -1;
+ const char* balancer_name1 =
+ target1->balancer_name != nullptr ? target1->balancer_name : "";
+ const char* balancer_name2 =
+ target2->balancer_name != nullptr ? target2->balancer_name : "";
+ retval = strcmp(balancer_name1, balancer_name2);
+ if (retval != 0) return retval;
+ if (addresses1->user_data_vtable != nullptr) {
+ retval = addresses1->user_data_vtable->cmp(target1->user_data,
+ target2->user_data);
+ if (retval != 0) return retval;
+ }
+ }
+ return 0;
+}
+
+void grpc_lb_addresses_destroy(grpc_lb_addresses* addresses) {
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ gpr_free(addresses->addresses[i].balancer_name);
+ if (addresses->addresses[i].user_data != nullptr) {
+ addresses->user_data_vtable->destroy(addresses->addresses[i].user_data);
+ }
+ }
+ gpr_free(addresses->addresses);
+ gpr_free(addresses);
+}
+
+static void* lb_addresses_copy(void* addresses) {
+ return grpc_lb_addresses_copy(static_cast<grpc_lb_addresses*>(addresses));
+}
+static void lb_addresses_destroy(void* addresses) {
+ grpc_lb_addresses_destroy(static_cast<grpc_lb_addresses*>(addresses));
+}
+static int lb_addresses_cmp(void* addresses1, void* addresses2) {
+ return grpc_lb_addresses_cmp(static_cast<grpc_lb_addresses*>(addresses1),
+ static_cast<grpc_lb_addresses*>(addresses2));
+}
+static const grpc_arg_pointer_vtable lb_addresses_arg_vtable = {
+ lb_addresses_copy, lb_addresses_destroy, lb_addresses_cmp};
+
+grpc_arg grpc_lb_addresses_create_channel_arg(
+ const grpc_lb_addresses* addresses) {
+ return grpc_channel_arg_pointer_create(
+ (char*)GRPC_ARG_LB_ADDRESSES, (void*)addresses, &lb_addresses_arg_vtable);
+}
+
+grpc_lb_addresses* grpc_lb_addresses_find_channel_arg(
+ const grpc_channel_args* channel_args) {
+ const grpc_arg* lb_addresses_arg =
+ grpc_channel_args_find(channel_args, GRPC_ARG_LB_ADDRESSES);
+ if (lb_addresses_arg == nullptr || lb_addresses_arg->type != GRPC_ARG_POINTER)
+ return nullptr;
+ return static_cast<grpc_lb_addresses*>(lb_addresses_arg->value.pointer.p);
+}
+
+bool grpc_lb_addresses_contains_balancer_address(
+ const grpc_lb_addresses& addresses) {
+ for (size_t i = 0; i < addresses.num_addresses; ++i) {
+ if (addresses.addresses[i].is_balancer) return true;
+ }
+ return false;
+}
diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.h b/src/core/ext/filters/client_channel/lb_policy_factory.h
index a165ebafab..a59deadb26 100644
--- a/src/core/ext/filters/client_channel/lb_policy_factory.h
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.h
@@ -21,9 +21,91 @@
#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/resolve_address.h"
+
+#include "src/core/ext/filters/client_channel/client_channel_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy.h"
-#include "src/core/lib/gprpp/abstract.h"
-#include "src/core/lib/gprpp/orphanable.h"
+#include "src/core/lib/uri/uri_parser.h"
+
+//
+// representation of an LB address
+//
+
+// Channel arg key for grpc_lb_addresses.
+#define GRPC_ARG_LB_ADDRESSES "grpc.lb_addresses"
+
+/** A resolved address alongside any LB related information associated with it.
+ * \a user_data, if not NULL, contains opaque data meant to be consumed by the
+ * gRPC LB policy. Note that no all LB policies support \a user_data as input.
+ * Those who don't will simply ignore it and will correspondingly return NULL in
+ * their namesake pick() output argument. */
+// TODO(roth): Once we figure out a better way of handling user_data in
+// LB policies, convert these structs to C++ classes.
+typedef struct grpc_lb_address {
+ grpc_resolved_address address;
+ bool is_balancer;
+ char* balancer_name; /* For secure naming. */
+ void* user_data;
+} grpc_lb_address;
+
+typedef struct grpc_lb_user_data_vtable {
+ void* (*copy)(void*);
+ void (*destroy)(void*);
+ int (*cmp)(void*, void*);
+} grpc_lb_user_data_vtable;
+
+typedef struct grpc_lb_addresses {
+ size_t num_addresses;
+ grpc_lb_address* addresses;
+ const grpc_lb_user_data_vtable* user_data_vtable;
+} grpc_lb_addresses;
+
+/** Returns a grpc_addresses struct with enough space for
+ \a num_addresses addresses. The \a user_data_vtable argument may be
+ NULL if no user data will be added. */
+grpc_lb_addresses* grpc_lb_addresses_create(
+ size_t num_addresses, const grpc_lb_user_data_vtable* user_data_vtable);
+
+/** Creates a copy of \a addresses. */
+grpc_lb_addresses* grpc_lb_addresses_copy(const grpc_lb_addresses* addresses);
+
+/** Sets the value of the address at index \a index of \a addresses.
+ * \a address is a socket address of length \a address_len. */
+void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index,
+ const void* address, size_t address_len,
+ bool is_balancer, const char* balancer_name,
+ void* user_data);
+
+/** Sets the value of the address at index \a index of \a addresses from \a uri.
+ * Returns true upon success, false otherwise. */
+bool grpc_lb_addresses_set_address_from_uri(grpc_lb_addresses* addresses,
+ size_t index, const grpc_uri* uri,
+ bool is_balancer,
+ const char* balancer_name,
+ void* user_data);
+
+/** Compares \a addresses1 and \a addresses2. */
+int grpc_lb_addresses_cmp(const grpc_lb_addresses* addresses1,
+ const grpc_lb_addresses* addresses2);
+
+/** Destroys \a addresses. */
+void grpc_lb_addresses_destroy(grpc_lb_addresses* addresses);
+
+/** Returns a channel arg containing \a addresses. */
+grpc_arg grpc_lb_addresses_create_channel_arg(
+ const grpc_lb_addresses* addresses);
+
+/** Returns the \a grpc_lb_addresses instance in \a channel_args or NULL */
+grpc_lb_addresses* grpc_lb_addresses_find_channel_arg(
+ const grpc_channel_args* channel_args);
+
+// Returns true if addresses contains at least one balancer address.
+bool grpc_lb_addresses_contains_balancer_address(
+ const grpc_lb_addresses& addresses);
+
+//
+// LB policy factory
+//
namespace grpc_core {
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
index c8425ae336..4ebc2c8161 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
@@ -33,7 +33,6 @@
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
-#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/env.h"
@@ -118,7 +117,7 @@ class AresDnsResolver : public Resolver {
/// retry backoff state
BackOff backoff_;
/// currently resolving addresses
- UniquePtr<ServerAddressList> addresses_;
+ grpc_lb_addresses* lb_addresses_ = nullptr;
/// currently resolving service config
char* service_config_json_ = nullptr;
// has shutdown been initiated
@@ -315,13 +314,13 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
r->resolving_ = false;
gpr_free(r->pending_request_);
r->pending_request_ = nullptr;
- if (r->addresses_ != nullptr) {
+ if (r->lb_addresses_ != nullptr) {
static const char* args_to_remove[1];
size_t num_args_to_remove = 0;
grpc_arg args_to_add[2];
size_t num_args_to_add = 0;
args_to_add[num_args_to_add++] =
- CreateServerAddressListChannelArg(r->addresses_.get());
+ grpc_lb_addresses_create_channel_arg(r->lb_addresses_);
char* service_config_string = nullptr;
if (r->service_config_json_ != nullptr) {
service_config_string = ChooseServiceConfig(r->service_config_json_);
@@ -338,7 +337,7 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
r->channel_args_, args_to_remove, num_args_to_remove, args_to_add,
num_args_to_add);
gpr_free(service_config_string);
- r->addresses_.reset();
+ grpc_lb_addresses_destroy(r->lb_addresses_);
// Reset backoff state so that we start from the beginning when the
// next request gets triggered.
r->backoff_.Reset();
@@ -413,10 +412,11 @@ void AresDnsResolver::StartResolvingLocked() {
self.release();
GPR_ASSERT(!resolving_);
resolving_ = true;
+ lb_addresses_ = nullptr;
service_config_json_ = nullptr;
pending_request_ = grpc_dns_lookup_ares_locked(
dns_server_, name_to_resolve_, kDefaultPort, interested_parties_,
- &on_resolved_, &addresses_, true /* check_grpclb */,
+ &on_resolved_, &lb_addresses_, true /* check_grpclb */,
request_service_config_ ? &service_config_json_ : nullptr,
query_timeout_ms_, combiner());
last_resolution_timestamp_ = grpc_core::ExecCtx::Get()->Now();
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc
index 8abc34c6ed..f42b1e309d 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc
@@ -31,7 +31,6 @@
#include <grpc/support/time.h>
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
#include "src/core/lib/gpr/string.h"
-#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/timer.h"
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
index 1b1c2303da..55715869b6 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
@@ -37,16 +37,12 @@
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
-#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/nameser.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
-using grpc_core::ServerAddress;
-using grpc_core::ServerAddressList;
-
static gpr_once g_basic_init = GPR_ONCE_INIT;
static gpr_mu g_init_mu;
@@ -62,7 +58,7 @@ struct grpc_ares_request {
/** closure to call when the request completes */
grpc_closure* on_done;
/** the pointer to receive the resolved addresses */
- grpc_core::UniquePtr<grpc_core::ServerAddressList>* addresses_out;
+ grpc_lb_addresses** lb_addrs_out;
/** the pointer to receive the service config in JSON */
char** service_config_json_out;
/** the evernt driver used by this request */
@@ -91,11 +87,12 @@ typedef struct grpc_ares_hostbyname_request {
static void do_basic_init(void) { gpr_mu_init(&g_init_mu); }
-static void log_address_sorting_list(const ServerAddressList& addresses,
+static void log_address_sorting_list(grpc_lb_addresses* lb_addrs,
const char* input_output_str) {
- for (size_t i = 0; i < addresses.size(); i++) {
+ for (size_t i = 0; i < lb_addrs->num_addresses; i++) {
char* addr_str;
- if (grpc_sockaddr_to_string(&addr_str, &addresses[i].address(), true)) {
+ if (grpc_sockaddr_to_string(&addr_str, &lb_addrs->addresses[i].address,
+ true)) {
gpr_log(GPR_DEBUG, "c-ares address sorting: %s[%" PRIuPTR "]=%s",
input_output_str, i, addr_str);
gpr_free(addr_str);
@@ -107,28 +104,29 @@ static void log_address_sorting_list(const ServerAddressList& addresses,
}
}
-void grpc_cares_wrapper_address_sorting_sort(ServerAddressList* addresses) {
+void grpc_cares_wrapper_address_sorting_sort(grpc_lb_addresses* lb_addrs) {
if (grpc_trace_cares_address_sorting.enabled()) {
- log_address_sorting_list(*addresses, "input");
+ log_address_sorting_list(lb_addrs, "input");
}
address_sorting_sortable* sortables = (address_sorting_sortable*)gpr_zalloc(
- sizeof(address_sorting_sortable) * addresses->size());
- for (size_t i = 0; i < addresses->size(); ++i) {
- sortables[i].user_data = &(*addresses)[i];
- memcpy(&sortables[i].dest_addr.addr, &(*addresses)[i].address().addr,
- (*addresses)[i].address().len);
- sortables[i].dest_addr.len = (*addresses)[i].address().len;
+ sizeof(address_sorting_sortable) * lb_addrs->num_addresses);
+ for (size_t i = 0; i < lb_addrs->num_addresses; i++) {
+ sortables[i].user_data = &lb_addrs->addresses[i];
+ memcpy(&sortables[i].dest_addr.addr, &lb_addrs->addresses[i].address.addr,
+ lb_addrs->addresses[i].address.len);
+ sortables[i].dest_addr.len = lb_addrs->addresses[i].address.len;
}
- address_sorting_rfc_6724_sort(sortables, addresses->size());
- ServerAddressList sorted;
- sorted.reserve(addresses->size());
- for (size_t i = 0; i < addresses->size(); ++i) {
- sorted.emplace_back(*static_cast<ServerAddress*>(sortables[i].user_data));
+ address_sorting_rfc_6724_sort(sortables, lb_addrs->num_addresses);
+ grpc_lb_address* sorted_lb_addrs = (grpc_lb_address*)gpr_zalloc(
+ sizeof(grpc_lb_address) * lb_addrs->num_addresses);
+ for (size_t i = 0; i < lb_addrs->num_addresses; i++) {
+ sorted_lb_addrs[i] = *(grpc_lb_address*)sortables[i].user_data;
}
gpr_free(sortables);
- *addresses = std::move(sorted);
+ gpr_free(lb_addrs->addresses);
+ lb_addrs->addresses = sorted_lb_addrs;
if (grpc_trace_cares_address_sorting.enabled()) {
- log_address_sorting_list(*addresses, "output");
+ log_address_sorting_list(lb_addrs, "output");
}
}
@@ -147,9 +145,9 @@ void grpc_ares_complete_request_locked(grpc_ares_request* r) {
/* Invoke on_done callback and destroy the
request */
r->ev_driver = nullptr;
- ServerAddressList* addresses = r->addresses_out->get();
- if (addresses != nullptr) {
- grpc_cares_wrapper_address_sorting_sort(addresses);
+ grpc_lb_addresses* lb_addrs = *(r->lb_addrs_out);
+ if (lb_addrs != nullptr) {
+ grpc_cares_wrapper_address_sorting_sort(lb_addrs);
}
GRPC_CLOSURE_SCHED(r->on_done, r->error);
}
@@ -183,30 +181,33 @@ static void on_hostbyname_done_locked(void* arg, int status, int timeouts,
GRPC_ERROR_UNREF(r->error);
r->error = GRPC_ERROR_NONE;
r->success = true;
- if (*r->addresses_out == nullptr) {
- *r->addresses_out = grpc_core::MakeUnique<ServerAddressList>();
+ grpc_lb_addresses** lb_addresses = r->lb_addrs_out;
+ if (*lb_addresses == nullptr) {
+ *lb_addresses = grpc_lb_addresses_create(0, nullptr);
}
- ServerAddressList& addresses = **r->addresses_out;
- for (size_t i = 0; hostent->h_addr_list[i] != nullptr; ++i) {
- grpc_core::InlinedVector<grpc_arg, 2> args_to_add;
- if (hr->is_balancer) {
- args_to_add.emplace_back(grpc_channel_arg_integer_create(
- const_cast<char*>(GRPC_ARG_ADDRESS_IS_BALANCER), 1));
- args_to_add.emplace_back(grpc_channel_arg_string_create(
- const_cast<char*>(GRPC_ARG_ADDRESS_BALANCER_NAME), hr->host));
- }
- grpc_channel_args* args = grpc_channel_args_copy_and_add(
- nullptr, args_to_add.data(), args_to_add.size());
+ size_t prev_naddr = (*lb_addresses)->num_addresses;
+ size_t i;
+ for (i = 0; hostent->h_addr_list[i] != nullptr; i++) {
+ }
+ (*lb_addresses)->num_addresses += i;
+ (*lb_addresses)->addresses = static_cast<grpc_lb_address*>(
+ gpr_realloc((*lb_addresses)->addresses,
+ sizeof(grpc_lb_address) * (*lb_addresses)->num_addresses));
+ for (i = prev_naddr; i < (*lb_addresses)->num_addresses; i++) {
switch (hostent->h_addrtype) {
case AF_INET6: {
size_t addr_len = sizeof(struct sockaddr_in6);
struct sockaddr_in6 addr;
memset(&addr, 0, addr_len);
- memcpy(&addr.sin6_addr, hostent->h_addr_list[i],
+ memcpy(&addr.sin6_addr, hostent->h_addr_list[i - prev_naddr],
sizeof(struct in6_addr));
addr.sin6_family = static_cast<unsigned char>(hostent->h_addrtype);
addr.sin6_port = hr->port;
- addresses.emplace_back(&addr, addr_len, args);
+ grpc_lb_addresses_set_address(
+ *lb_addresses, i, &addr, addr_len,
+ hr->is_balancer /* is_balancer */,
+ hr->is_balancer ? hr->host : nullptr /* balancer_name */,
+ nullptr /* user_data */);
char output[INET6_ADDRSTRLEN];
ares_inet_ntop(AF_INET6, &addr.sin6_addr, output, INET6_ADDRSTRLEN);
gpr_log(GPR_DEBUG,
@@ -219,11 +220,15 @@ static void on_hostbyname_done_locked(void* arg, int status, int timeouts,
size_t addr_len = sizeof(struct sockaddr_in);
struct sockaddr_in addr;
memset(&addr, 0, addr_len);
- memcpy(&addr.sin_addr, hostent->h_addr_list[i],
+ memcpy(&addr.sin_addr, hostent->h_addr_list[i - prev_naddr],
sizeof(struct in_addr));
addr.sin_family = static_cast<unsigned char>(hostent->h_addrtype);
addr.sin_port = hr->port;
- addresses.emplace_back(&addr, addr_len, args);
+ grpc_lb_addresses_set_address(
+ *lb_addresses, i, &addr, addr_len,
+ hr->is_balancer /* is_balancer */,
+ hr->is_balancer ? hr->host : nullptr /* balancer_name */,
+ nullptr /* user_data */);
char output[INET_ADDRSTRLEN];
ares_inet_ntop(AF_INET, &addr.sin_addr, output, INET_ADDRSTRLEN);
gpr_log(GPR_DEBUG,
@@ -462,10 +467,11 @@ error_cleanup:
gpr_free(port);
}
-static bool inner_resolve_as_ip_literal_locked(
- const char* name, const char* default_port,
- grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs, char** host,
- char** port, char** hostport) {
+static bool inner_resolve_as_ip_literal_locked(const char* name,
+ const char* default_port,
+ grpc_lb_addresses** addrs,
+ char** host, char** port,
+ char** hostport) {
gpr_split_host_port(name, host, port);
if (*host == nullptr) {
gpr_log(GPR_ERROR,
@@ -489,16 +495,18 @@ static bool inner_resolve_as_ip_literal_locked(
if (grpc_parse_ipv4_hostport(*hostport, &addr, false /* log errors */) ||
grpc_parse_ipv6_hostport(*hostport, &addr, false /* log errors */)) {
GPR_ASSERT(*addrs == nullptr);
- *addrs = grpc_core::MakeUnique<ServerAddressList>();
- (*addrs)->emplace_back(addr.addr, addr.len, nullptr /* args */);
+ *addrs = grpc_lb_addresses_create(1, nullptr);
+ grpc_lb_addresses_set_address(
+ *addrs, 0, addr.addr, addr.len, false /* is_balancer */,
+ nullptr /* balancer_name */, nullptr /* user_data */);
return true;
}
return false;
}
-static bool resolve_as_ip_literal_locked(
- const char* name, const char* default_port,
- grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs) {
+static bool resolve_as_ip_literal_locked(const char* name,
+ const char* default_port,
+ grpc_lb_addresses** addrs) {
char* host = nullptr;
char* port = nullptr;
char* hostport = nullptr;
@@ -513,14 +521,13 @@ static bool resolve_as_ip_literal_locked(
static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
- grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs,
- bool check_grpclb, char** service_config_json, int query_timeout_ms,
- grpc_combiner* combiner) {
+ grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
+ int query_timeout_ms, grpc_combiner* combiner) {
grpc_ares_request* r =
static_cast<grpc_ares_request*>(gpr_zalloc(sizeof(grpc_ares_request)));
r->ev_driver = nullptr;
r->on_done = on_done;
- r->addresses_out = addrs;
+ r->lb_addrs_out = addrs;
r->service_config_json_out = service_config_json;
r->success = false;
r->error = GRPC_ERROR_NONE;
@@ -546,8 +553,8 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
- grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs,
- bool check_grpclb, char** service_config_json, int query_timeout_ms,
+ grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
+ int query_timeout_ms,
grpc_combiner* combiner) = grpc_dns_lookup_ares_locked_impl;
static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) {
@@ -592,8 +599,8 @@ typedef struct grpc_resolve_address_ares_request {
grpc_combiner* combiner;
/** the pointer to receive the resolved addresses */
grpc_resolved_addresses** addrs_out;
- /** currently resolving addresses */
- grpc_core::UniquePtr<ServerAddressList> addresses;
+ /** currently resolving lb addresses */
+ grpc_lb_addresses* lb_addrs;
/** closure to call when the resolve_address_ares request completes */
grpc_closure* on_resolve_address_done;
/** a closure wrapping on_resolve_address_done, which should be invoked when
@@ -606,7 +613,7 @@ typedef struct grpc_resolve_address_ares_request {
/* pollset_set to be driven by */
grpc_pollset_set* interested_parties;
/* underlying ares_request that the query is performed on */
- grpc_ares_request* ares_request = nullptr;
+ grpc_ares_request* ares_request;
} grpc_resolve_address_ares_request;
static void on_dns_lookup_done_locked(void* arg, grpc_error* error) {
@@ -614,24 +621,25 @@ static void on_dns_lookup_done_locked(void* arg, grpc_error* error) {
static_cast<grpc_resolve_address_ares_request*>(arg);
gpr_free(r->ares_request);
grpc_resolved_addresses** resolved_addresses = r->addrs_out;
- if (r->addresses == nullptr || r->addresses->empty()) {
+ if (r->lb_addrs == nullptr || r->lb_addrs->num_addresses == 0) {
*resolved_addresses = nullptr;
} else {
*resolved_addresses = static_cast<grpc_resolved_addresses*>(
gpr_zalloc(sizeof(grpc_resolved_addresses)));
- (*resolved_addresses)->naddrs = r->addresses->size();
+ (*resolved_addresses)->naddrs = r->lb_addrs->num_addresses;
(*resolved_addresses)->addrs =
static_cast<grpc_resolved_address*>(gpr_zalloc(
sizeof(grpc_resolved_address) * (*resolved_addresses)->naddrs));
- for (size_t i = 0; i < (*resolved_addresses)->naddrs; ++i) {
- GPR_ASSERT(!(*r->addresses)[i].IsBalancer());
- memcpy(&(*resolved_addresses)->addrs[i], &(*r->addresses)[i].address(),
- sizeof(grpc_resolved_address));
+ for (size_t i = 0; i < (*resolved_addresses)->naddrs; i++) {
+ GPR_ASSERT(!r->lb_addrs->addresses[i].is_balancer);
+ memcpy(&(*resolved_addresses)->addrs[i],
+ &r->lb_addrs->addresses[i].address, sizeof(grpc_resolved_address));
}
}
GRPC_CLOSURE_SCHED(r->on_resolve_address_done, GRPC_ERROR_REF(error));
+ if (r->lb_addrs != nullptr) grpc_lb_addresses_destroy(r->lb_addrs);
GRPC_COMBINER_UNREF(r->combiner, "on_dns_lookup_done_cb");
- grpc_core::Delete(r);
+ gpr_free(r);
}
static void grpc_resolve_address_invoke_dns_lookup_ares_locked(
@@ -640,7 +648,7 @@ static void grpc_resolve_address_invoke_dns_lookup_ares_locked(
static_cast<grpc_resolve_address_ares_request*>(arg);
r->ares_request = grpc_dns_lookup_ares_locked(
nullptr /* dns_server */, r->name, r->default_port, r->interested_parties,
- &r->on_dns_lookup_done_locked, &r->addresses, false /* check_grpclb */,
+ &r->on_dns_lookup_done_locked, &r->lb_addrs, false /* check_grpclb */,
nullptr /* service_config_json */, GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS,
r->combiner);
}
@@ -651,7 +659,8 @@ static void grpc_resolve_address_ares_impl(const char* name,
grpc_closure* on_done,
grpc_resolved_addresses** addrs) {
grpc_resolve_address_ares_request* r =
- grpc_core::New<grpc_resolve_address_ares_request>();
+ static_cast<grpc_resolve_address_ares_request*>(
+ gpr_zalloc(sizeof(grpc_resolve_address_ares_request)));
r->combiner = grpc_combiner_create();
r->addrs_out = addrs;
r->on_resolve_address_done = on_done;
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
index 2808250456..9acef1d0ca 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
@@ -21,7 +21,7 @@
#include <grpc/support/port_platform.h>
-#include "src/core/ext/filters/client_channel/server_address.h"
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/resolve_address.h"
@@ -61,9 +61,8 @@ extern void (*grpc_resolve_address_ares)(const char* name,
extern grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
- grpc_core::UniquePtr<grpc_core::ServerAddressList>* addresses,
- bool check_grpclb, char** service_config_json, int query_timeout_ms,
- grpc_combiner* combiner);
+ grpc_lb_addresses** addresses, bool check_grpclb,
+ char** service_config_json, int query_timeout_ms, grpc_combiner* combiner);
/* Cancel the pending grpc_ares_request \a request */
extern void (*grpc_cancel_ares_request_locked)(grpc_ares_request* request);
@@ -90,12 +89,10 @@ bool grpc_ares_query_ipv6();
* Returns a bool indicating whether or not such an action was performed.
* See https://github.com/grpc/grpc/issues/15158. */
bool grpc_ares_maybe_resolve_localhost_manually_locked(
- const char* name, const char* default_port,
- grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs);
+ const char* name, const char* default_port, grpc_lb_addresses** addrs);
/* Sorts destinations in lb_addrs according to RFC 6724. */
-void grpc_cares_wrapper_address_sorting_sort(
- grpc_core::ServerAddressList* addresses);
+void grpc_cares_wrapper_address_sorting_sort(grpc_lb_addresses* lb_addrs);
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_WRAPPER_H \
*/
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc
index 1f4701c999..fc78b18304 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc
@@ -29,17 +29,16 @@ struct grpc_ares_request {
static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
- grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs,
- bool check_grpclb, char** service_config_json, int query_timeout_ms,
- grpc_combiner* combiner) {
+ grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
+ int query_timeout_ms, grpc_combiner* combiner) {
return NULL;
}
grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
- grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs,
- bool check_grpclb, char** service_config_json, int query_timeout_ms,
+ grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
+ int query_timeout_ms,
grpc_combiner* combiner) = grpc_dns_lookup_ares_locked_impl;
static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) {}
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc
index 028d844216..639eec2323 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc
@@ -27,8 +27,7 @@
bool grpc_ares_query_ipv6() { return grpc_ipv6_loopback_available(); }
bool grpc_ares_maybe_resolve_localhost_manually_locked(
- const char* name, const char* default_port,
- grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs) {
+ const char* name, const char* default_port, grpc_lb_addresses** addrs) {
return false;
}
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc
index 202452f1b2..7e34784691 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc
@@ -23,9 +23,9 @@
#include <grpc/support/string_util.h>
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
-#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/socket_windows.h"
@@ -33,9 +33,8 @@
bool grpc_ares_query_ipv6() { return grpc_ipv6_loopback_available(); }
static bool inner_maybe_resolve_localhost_manually_locked(
- const char* name, const char* default_port,
- grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs, char** host,
- char** port) {
+ const char* name, const char* default_port, grpc_lb_addresses** addrs,
+ char** host, char** port) {
gpr_split_host_port(name, host, port);
if (*host == nullptr) {
gpr_log(GPR_ERROR,
@@ -56,7 +55,7 @@ static bool inner_maybe_resolve_localhost_manually_locked(
}
if (gpr_stricmp(*host, "localhost") == 0) {
GPR_ASSERT(*addrs == nullptr);
- *addrs = grpc_core::MakeUnique<grpc_core::ServerAddressList>();
+ *addrs = grpc_lb_addresses_create(2, nullptr);
uint16_t numeric_port = grpc_strhtons(*port);
// Append the ipv6 loopback address.
struct sockaddr_in6 ipv6_loopback_addr;
@@ -64,8 +63,10 @@ static bool inner_maybe_resolve_localhost_manually_locked(
((char*)&ipv6_loopback_addr.sin6_addr)[15] = 1;
ipv6_loopback_addr.sin6_family = AF_INET6;
ipv6_loopback_addr.sin6_port = numeric_port;
- (*addrs)->emplace_back(&ipv6_loopback_addr, sizeof(ipv6_loopback_addr),
- nullptr /* args */);
+ grpc_lb_addresses_set_address(
+ *addrs, 0, &ipv6_loopback_addr, sizeof(ipv6_loopback_addr),
+ false /* is_balancer */, nullptr /* balancer_name */,
+ nullptr /* user_data */);
// Append the ipv4 loopback address.
struct sockaddr_in ipv4_loopback_addr;
memset(&ipv4_loopback_addr, 0, sizeof(ipv4_loopback_addr));
@@ -73,18 +74,19 @@ static bool inner_maybe_resolve_localhost_manually_locked(
((char*)&ipv4_loopback_addr.sin_addr)[3] = 0x01;
ipv4_loopback_addr.sin_family = AF_INET;
ipv4_loopback_addr.sin_port = numeric_port;
- (*addrs)->emplace_back(&ipv4_loopback_addr, sizeof(ipv4_loopback_addr),
- nullptr /* args */);
+ grpc_lb_addresses_set_address(
+ *addrs, 1, &ipv4_loopback_addr, sizeof(ipv4_loopback_addr),
+ false /* is_balancer */, nullptr /* balancer_name */,
+ nullptr /* user_data */);
// Let the address sorter figure out which one should be tried first.
- grpc_cares_wrapper_address_sorting_sort(addrs->get());
+ grpc_cares_wrapper_address_sorting_sort(*addrs);
return true;
}
return false;
}
bool grpc_ares_maybe_resolve_localhost_manually_locked(
- const char* name, const char* default_port,
- grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs) {
+ const char* name, const char* default_port, grpc_lb_addresses** addrs) {
char* host = nullptr;
char* port = nullptr;
bool out = inner_maybe_resolve_localhost_manually_locked(name, default_port,
diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
index c365f1abfd..65ff1ec1a5 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
@@ -26,8 +26,8 @@
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
+#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
-#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/env.h"
@@ -198,14 +198,18 @@ void NativeDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS,
grpc_slice_from_copied_string(r->name_to_resolve_));
if (r->addresses_ != nullptr) {
- ServerAddressList addresses;
+ grpc_lb_addresses* addresses = grpc_lb_addresses_create(
+ r->addresses_->naddrs, nullptr /* user_data_vtable */);
for (size_t i = 0; i < r->addresses_->naddrs; ++i) {
- addresses.emplace_back(&r->addresses_->addrs[i].addr,
- r->addresses_->addrs[i].len, nullptr /* args */);
+ grpc_lb_addresses_set_address(
+ addresses, i, &r->addresses_->addrs[i].addr,
+ r->addresses_->addrs[i].len, false /* is_balancer */,
+ nullptr /* balancer_name */, nullptr /* user_data */);
}
- grpc_arg new_arg = CreateServerAddressListChannelArg(&addresses);
+ grpc_arg new_arg = grpc_lb_addresses_create_channel_arg(addresses);
result = grpc_channel_args_copy_and_add(r->channel_args_, &new_arg, 1);
grpc_resolved_addresses_destroy(r->addresses_);
+ grpc_lb_addresses_destroy(addresses);
// Reset backoff state so that we start from the beginning when the
// next request gets triggered.
r->backoff_.Reset();
diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
index 258339491c..3aa690bea4 100644
--- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
@@ -28,13 +28,12 @@
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
-#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
-#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/resolve_address.h"
diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
index d86111c382..7f69059351 100644
--- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
@@ -19,9 +19,10 @@
#include <grpc/support/port_platform.h>
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/ref_counted.h"
-#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/uri/uri_parser.h"
#define GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR \
"grpc.fake_resolver.response_generator"
diff --git a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
index 1654747a79..801734764b 100644
--- a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
@@ -26,9 +26,9 @@
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
-#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
@@ -45,8 +45,7 @@ namespace {
class SockaddrResolver : public Resolver {
public:
/// Takes ownership of \a addresses.
- SockaddrResolver(const ResolverArgs& args,
- UniquePtr<ServerAddressList> addresses);
+ SockaddrResolver(const ResolverArgs& args, grpc_lb_addresses* addresses);
void NextLocked(grpc_channel_args** result,
grpc_closure* on_complete) override;
@@ -59,7 +58,7 @@ class SockaddrResolver : public Resolver {
void MaybeFinishNextLocked();
/// the addresses that we've "resolved"
- UniquePtr<ServerAddressList> addresses_;
+ grpc_lb_addresses* addresses_ = nullptr;
/// channel args
grpc_channel_args* channel_args_ = nullptr;
/// have we published?
@@ -71,12 +70,13 @@ class SockaddrResolver : public Resolver {
};
SockaddrResolver::SockaddrResolver(const ResolverArgs& args,
- UniquePtr<ServerAddressList> addresses)
+ grpc_lb_addresses* addresses)
: Resolver(args.combiner),
- addresses_(std::move(addresses)),
+ addresses_(addresses),
channel_args_(grpc_channel_args_copy(args.args)) {}
SockaddrResolver::~SockaddrResolver() {
+ grpc_lb_addresses_destroy(addresses_);
grpc_channel_args_destroy(channel_args_);
}
@@ -100,7 +100,7 @@ void SockaddrResolver::ShutdownLocked() {
void SockaddrResolver::MaybeFinishNextLocked() {
if (next_completion_ != nullptr && !published_) {
published_ = true;
- grpc_arg arg = CreateServerAddressListChannelArg(addresses_.get());
+ grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses_);
*target_result_ = grpc_channel_args_copy_and_add(channel_args_, &arg, 1);
GRPC_CLOSURE_SCHED(next_completion_, GRPC_ERROR_NONE);
next_completion_ = nullptr;
@@ -127,27 +127,27 @@ OrphanablePtr<Resolver> CreateSockaddrResolver(
grpc_slice_buffer path_parts;
grpc_slice_buffer_init(&path_parts);
grpc_slice_split(path_slice, ",", &path_parts);
- auto addresses = MakeUnique<ServerAddressList>();
+ grpc_lb_addresses* addresses = grpc_lb_addresses_create(
+ path_parts.count, nullptr /* user_data_vtable */);
bool errors_found = false;
- for (size_t i = 0; i < path_parts.count; i++) {
+ for (size_t i = 0; i < addresses->num_addresses; i++) {
grpc_uri ith_uri = *args.uri;
- UniquePtr<char> part_str(grpc_slice_to_c_string(path_parts.slices[i]));
- ith_uri.path = part_str.get();
- grpc_resolved_address addr;
- if (!parse(&ith_uri, &addr)) {
+ char* part_str = grpc_slice_to_c_string(path_parts.slices[i]);
+ ith_uri.path = part_str;
+ if (!parse(&ith_uri, &addresses->addresses[i].address)) {
errors_found = true; /* GPR_TRUE */
- break;
}
- addresses->emplace_back(addr, nullptr /* args */);
+ gpr_free(part_str);
+ if (errors_found) break;
}
grpc_slice_buffer_destroy_internal(&path_parts);
grpc_slice_unref_internal(path_slice);
if (errors_found) {
+ grpc_lb_addresses_destroy(addresses);
return OrphanablePtr<Resolver>(nullptr);
}
// Instantiate resolver.
- return OrphanablePtr<Resolver>(
- New<SockaddrResolver>(args, std::move(addresses)));
+ return OrphanablePtr<Resolver>(New<SockaddrResolver>(args, addresses));
}
class IPv4ResolverFactory : public ResolverFactory {
diff --git a/src/core/ext/filters/client_channel/resolver_result_parsing.cc b/src/core/ext/filters/client_channel/resolver_result_parsing.cc
index 22b06db45c..4f7fd6b424 100644
--- a/src/core/ext/filters/client_channel/resolver_result_parsing.cc
+++ b/src/core/ext/filters/client_channel/resolver_result_parsing.cc
@@ -30,11 +30,9 @@
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
-#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/memory.h"
-#include "src/core/lib/uri/uri_parser.h"
// As per the retry design, we do not allow more than 5 retry attempts.
#define MAX_MAX_RETRY_ATTEMPTS 5
@@ -101,18 +99,12 @@ void ProcessedResolverResult::ProcessLbPolicyName(
}
// Special case: If at least one balancer address is present, we use
// the grpclb policy, regardless of what the resolver has returned.
- const ServerAddressList* addresses =
- FindServerAddressListChannelArg(resolver_result);
- if (addresses != nullptr) {
- bool found_balancer_address = false;
- for (size_t i = 0; i < addresses->size(); ++i) {
- const ServerAddress& address = (*addresses)[i];
- if (address.IsBalancer()) {
- found_balancer_address = true;
- break;
- }
- }
- if (found_balancer_address) {
+ const grpc_arg* channel_arg =
+ grpc_channel_args_find(resolver_result, GRPC_ARG_LB_ADDRESSES);
+ if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
+ grpc_lb_addresses* addresses =
+ static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p);
+ if (grpc_lb_addresses_contains_balancer_address(*addresses)) {
if (lb_policy_name_ != nullptr &&
strcmp(lb_policy_name_.get(), "grpclb") != 0) {
gpr_log(GPR_INFO,
diff --git a/src/core/ext/filters/client_channel/server_address.cc b/src/core/ext/filters/client_channel/server_address.cc
deleted file mode 100644
index ec33cbbd95..0000000000
--- a/src/core/ext/filters/client_channel/server_address.cc
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#include "src/core/ext/filters/client_channel/server_address.h"
-
-#include <string.h>
-
-namespace grpc_core {
-
-//
-// ServerAddress
-//
-
-ServerAddress::ServerAddress(const grpc_resolved_address& address,
- grpc_channel_args* args)
- : address_(address), args_(args) {}
-
-ServerAddress::ServerAddress(const void* address, size_t address_len,
- grpc_channel_args* args)
- : args_(args) {
- memcpy(address_.addr, address, address_len);
- address_.len = static_cast<socklen_t>(address_len);
-}
-
-int ServerAddress::Cmp(const ServerAddress& other) const {
- if (address_.len > other.address_.len) return 1;
- if (address_.len < other.address_.len) return -1;
- int retval = memcmp(address_.addr, other.address_.addr, address_.len);
- if (retval != 0) return retval;
- return grpc_channel_args_compare(args_, other.args_);
-}
-
-bool ServerAddress::IsBalancer() const {
- return grpc_channel_arg_get_bool(
- grpc_channel_args_find(args_, GRPC_ARG_ADDRESS_IS_BALANCER), false);
-}
-
-//
-// ServerAddressList
-//
-
-namespace {
-
-void* ServerAddressListCopy(void* addresses) {
- ServerAddressList* a = static_cast<ServerAddressList*>(addresses);
- return New<ServerAddressList>(*a);
-}
-
-void ServerAddressListDestroy(void* addresses) {
- ServerAddressList* a = static_cast<ServerAddressList*>(addresses);
- Delete(a);
-}
-
-int ServerAddressListCompare(void* addresses1, void* addresses2) {
- ServerAddressList* a1 = static_cast<ServerAddressList*>(addresses1);
- ServerAddressList* a2 = static_cast<ServerAddressList*>(addresses2);
- if (a1->size() > a2->size()) return 1;
- if (a1->size() < a2->size()) return -1;
- for (size_t i = 0; i < a1->size(); ++i) {
- int retval = (*a1)[i].Cmp((*a2)[i]);
- if (retval != 0) return retval;
- }
- return 0;
-}
-
-const grpc_arg_pointer_vtable server_addresses_arg_vtable = {
- ServerAddressListCopy, ServerAddressListDestroy, ServerAddressListCompare};
-
-} // namespace
-
-grpc_arg CreateServerAddressListChannelArg(const ServerAddressList* addresses) {
- return grpc_channel_arg_pointer_create(
- const_cast<char*>(GRPC_ARG_SERVER_ADDRESS_LIST),
- const_cast<ServerAddressList*>(addresses), &server_addresses_arg_vtable);
-}
-
-ServerAddressList* FindServerAddressListChannelArg(
- const grpc_channel_args* channel_args) {
- const grpc_arg* lb_addresses_arg =
- grpc_channel_args_find(channel_args, GRPC_ARG_SERVER_ADDRESS_LIST);
- if (lb_addresses_arg == nullptr || lb_addresses_arg->type != GRPC_ARG_POINTER)
- return nullptr;
- return static_cast<ServerAddressList*>(lb_addresses_arg->value.pointer.p);
-}
-
-} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/server_address.h b/src/core/ext/filters/client_channel/server_address.h
deleted file mode 100644
index 3a1bf1df67..0000000000
--- a/src/core/ext/filters/client_channel/server_address.h
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SERVER_ADDRESS_H
-#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SERVER_ADDRESS_H
-
-#include <grpc/support/port_platform.h>
-
-#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/gprpp/inlined_vector.h"
-#include "src/core/lib/iomgr/resolve_address.h"
-#include "src/core/lib/uri/uri_parser.h"
-
-// Channel arg key for ServerAddressList.
-#define GRPC_ARG_SERVER_ADDRESS_LIST "grpc.server_address_list"
-
-// Channel arg key for a bool indicating whether an address is a grpclb
-// load balancer (as opposed to a backend).
-#define GRPC_ARG_ADDRESS_IS_BALANCER "grpc.address_is_balancer"
-
-// Channel arg key for a string indicating an address's balancer name.
-#define GRPC_ARG_ADDRESS_BALANCER_NAME "grpc.address_balancer_name"
-
-namespace grpc_core {
-
-//
-// ServerAddress
-//
-
-// A server address is a grpc_resolved_address with an associated set of
-// channel args. Any args present here will be merged into the channel
-// args when a subchannel is created for this address.
-class ServerAddress {
- public:
- // Takes ownership of args.
- ServerAddress(const grpc_resolved_address& address, grpc_channel_args* args);
- ServerAddress(const void* address, size_t address_len,
- grpc_channel_args* args);
-
- ~ServerAddress() { grpc_channel_args_destroy(args_); }
-
- // Copyable.
- ServerAddress(const ServerAddress& other)
- : address_(other.address_), args_(grpc_channel_args_copy(other.args_)) {}
- ServerAddress& operator=(const ServerAddress& other) {
- address_ = other.address_;
- grpc_channel_args_destroy(args_);
- args_ = grpc_channel_args_copy(other.args_);
- return *this;
- }
-
- // Movable.
- ServerAddress(ServerAddress&& other)
- : address_(other.address_), args_(other.args_) {
- other.args_ = nullptr;
- }
- ServerAddress& operator=(ServerAddress&& other) {
- address_ = other.address_;
- args_ = other.args_;
- other.args_ = nullptr;
- return *this;
- }
-
- bool operator==(const ServerAddress& other) const { return Cmp(other) == 0; }
-
- int Cmp(const ServerAddress& other) const;
-
- const grpc_resolved_address& address() const { return address_; }
- const grpc_channel_args* args() const { return args_; }
-
- bool IsBalancer() const;
-
- private:
- grpc_resolved_address address_;
- grpc_channel_args* args_;
-};
-
-//
-// ServerAddressList
-//
-
-typedef InlinedVector<ServerAddress, 1> ServerAddressList;
-
-// Returns a channel arg containing \a addresses.
-grpc_arg CreateServerAddressListChannelArg(const ServerAddressList* addresses);
-
-// Returns the ServerListAddress instance in channel_args or NULL.
-ServerAddressList* FindServerAddressListChannelArg(
- const grpc_channel_args* channel_args);
-
-} // namespace grpc_core
-
-#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SERVER_ADDRESS_H */
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index 9077aa9753..af55f7710e 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -837,7 +837,7 @@ static bool publish_transport_locked(grpc_subchannel* c) {
/* publish */
c->connected_subchannel.reset(grpc_core::New<grpc_core::ConnectedSubchannel>(
- stk, c->args, c->channelz_subchannel, socket_uuid));
+ stk, c->channelz_subchannel, socket_uuid));
gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p",
c->connected_subchannel.get(), c);
@@ -1068,18 +1068,16 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) {
namespace grpc_core {
ConnectedSubchannel::ConnectedSubchannel(
- grpc_channel_stack* channel_stack, const grpc_channel_args* args,
+ grpc_channel_stack* channel_stack,
grpc_core::RefCountedPtr<grpc_core::channelz::SubchannelNode>
channelz_subchannel,
intptr_t socket_uuid)
: RefCounted<ConnectedSubchannel>(&grpc_trace_stream_refcount),
channel_stack_(channel_stack),
- args_(grpc_channel_args_copy(args)),
channelz_subchannel_(std::move(channelz_subchannel)),
socket_uuid_(socket_uuid) {}
ConnectedSubchannel::~ConnectedSubchannel() {
- grpc_channel_args_destroy(args_);
GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");
}
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index 14f87f2c68..69c2456ec2 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -85,31 +85,28 @@ class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
size_t parent_data_size;
};
- ConnectedSubchannel(
- grpc_channel_stack* channel_stack, const grpc_channel_args* args,
+ explicit ConnectedSubchannel(
+ grpc_channel_stack* channel_stack,
grpc_core::RefCountedPtr<grpc_core::channelz::SubchannelNode>
channelz_subchannel,
intptr_t socket_uuid);
~ConnectedSubchannel();
+ grpc_channel_stack* channel_stack() { return channel_stack_; }
void NotifyOnStateChange(grpc_pollset_set* interested_parties,
grpc_connectivity_state* state,
grpc_closure* closure);
void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
grpc_error* CreateCall(const CallArgs& args, grpc_subchannel_call** call);
-
- grpc_channel_stack* channel_stack() const { return channel_stack_; }
- const grpc_channel_args* args() const { return args_; }
- channelz::SubchannelNode* channelz_subchannel() const {
+ channelz::SubchannelNode* channelz_subchannel() {
return channelz_subchannel_.get();
}
- intptr_t socket_uuid() const { return socket_uuid_; }
+ intptr_t socket_uuid() { return socket_uuid_; }
size_t GetInitialCallSizeEstimate(size_t parent_data_size) const;
private:
grpc_channel_stack* channel_stack_;
- grpc_channel_args* args_;
// ref counted pointer to the channelz node in this connected subchannel's
// owning subchannel.
grpc_core::RefCountedPtr<grpc_core::channelz::SubchannelNode>