Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc')
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc  242
1 file changed, 141 insertions, 101 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index a9a5965ed1..a46579c7f7 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -84,7 +84,6 @@
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
-#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
@@ -114,8 +113,6 @@
#define GRPC_GRPCLB_RECONNECT_JITTER 0.2
#define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000
-#define GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN "grpc.grpclb_address_lb_token"
-
namespace grpc_core {
TraceFlag grpc_lb_glb_trace(false, "glb");
@@ -124,7 +121,7 @@ namespace {
class GrpcLb : public LoadBalancingPolicy {
public:
- explicit GrpcLb(const Args& args);
+ GrpcLb(const grpc_lb_addresses* addresses, const Args& args);
void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) override;
@@ -164,6 +161,9 @@ class GrpcLb : public LoadBalancingPolicy {
// Our on_complete closure and the original one.
grpc_closure on_complete;
grpc_closure* original_on_complete;
+ // The LB token associated with the pick. This is set via user_data in
+ // the pick.
+ grpc_mdelem lb_token;
// Stats for client-side load reporting.
RefCountedPtr<GrpcLbClientStats> client_stats;
// Next pending pick.
@@ -329,7 +329,7 @@ class GrpcLb : public LoadBalancingPolicy {
// 0 means not using fallback.
int lb_fallback_timeout_ms_ = 0;
// The backend addresses from the resolver.
- UniquePtr<ServerAddressList> fallback_backend_addresses_;
+ grpc_lb_addresses* fallback_backend_addresses_ = nullptr;
// Fallback timer.
bool fallback_timer_callback_pending_ = false;
grpc_timer lb_fallback_timer_;
@@ -349,7 +349,7 @@ class GrpcLb : public LoadBalancingPolicy {
// serverlist parsing code
//
-// vtable for LB token channel arg.
+// vtable for LB tokens in grpc_lb_addresses
void* lb_token_copy(void* token) {
return token == nullptr
? nullptr
@@ -361,11 +361,38 @@ void lb_token_destroy(void* token) {
}
}
int lb_token_cmp(void* token1, void* token2) {
- return GPR_ICMP(token1, token2);
+ if (token1 > token2) return 1;
+ if (token1 < token2) return -1;
+ return 0;
}
-const grpc_arg_pointer_vtable lb_token_arg_vtable = {
+const grpc_lb_user_data_vtable lb_token_vtable = {
lb_token_copy, lb_token_destroy, lb_token_cmp};
+// Returns the backend addresses extracted from the given addresses.
+grpc_lb_addresses* ExtractBackendAddresses(const grpc_lb_addresses* addresses) {
+ // First pass: count the number of backend addresses.
+ size_t num_backends = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (!addresses->addresses[i].is_balancer) {
+ ++num_backends;
+ }
+ }
+ // Second pass: actually populate the addresses and (empty) LB tokens.
+ grpc_lb_addresses* backend_addresses =
+ grpc_lb_addresses_create(num_backends, &lb_token_vtable);
+ size_t num_copied = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (addresses->addresses[i].is_balancer) continue;
+ const grpc_resolved_address* addr = &addresses->addresses[i].address;
+ grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr,
+ addr->len, false /* is_balancer */,
+ nullptr /* balancer_name */,
+ (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload);
+ ++num_copied;
+ }
+ return backend_addresses;
+}
+
bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) {
if (server->drop) return false;
const grpc_grpclb_ip_address* ip = &server->ip_address;
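The hunk above swaps the channel-arg vtable for the grpc_lb_addresses user-data vtable (copy/destroy/cmp) and rebuilds the backend list with empty LB tokens. A rough standalone sketch of that copy/destroy/cmp contract, using hypothetical UserDataVtable/AddressEntry types rather than the real gRPC ones (in the gRPC code the payload is a refcounted mdelem, so copy/destroy take and drop references):

#include <cstddef>
#include <cstdio>
#include <vector>

// Hypothetical stand-in for the user-data vtable restored above: the address
// list owns opaque per-address payloads and needs to copy, destroy, and
// three-way compare them.
struct UserDataVtable {
  void* (*copy)(void* user_data);
  void (*destroy)(void* user_data);
  int (*cmp)(void* a, void* b);
};

struct AddressEntry {
  bool is_balancer = false;
  void* user_data = nullptr;
};

// Copying a list goes through vtable->copy so refcounted payloads (the LB
// token mdelems in the diff) gain a reference on the copy.
std::vector<AddressEntry> CopyAddresses(const std::vector<AddressEntry>& src,
                                        const UserDataVtable* vtable) {
  std::vector<AddressEntry> dst = src;
  if (vtable != nullptr) {
    for (AddressEntry& a : dst) a.user_data = vtable->copy(a.user_data);
  }
  return dst;
}

// Destroying a list releases each payload exactly once.
void DestroyAddresses(std::vector<AddressEntry>* addrs,
                      const UserDataVtable* vtable) {
  if (vtable != nullptr) {
    for (AddressEntry& a : *addrs) vtable->destroy(a.user_data);
  }
  addrs->clear();
}

int main() {
  // Example vtable that treats user_data as an owned heap int.
  UserDataVtable vtable = {
      /*copy=*/[](void* p) -> void* { return new int(*static_cast<int*>(p)); },
      /*destroy=*/[](void* p) { delete static_cast<int*>(p); },
      /*cmp=*/[](void* a, void* b) {
        int x = *static_cast<int*>(a);
        int y = *static_cast<int*>(b);
        return (x > y) - (x < y);
      }};
  std::vector<AddressEntry> addrs(1);
  addrs[0].user_data = new int(7);
  std::vector<AddressEntry> copy = CopyAddresses(addrs, &vtable);
  std::printf("cmp = %d\n", vtable.cmp(addrs[0].user_data, copy[0].user_data));
  DestroyAddresses(&copy, &vtable);
  DestroyAddresses(&addrs, &vtable);
}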
@@ -413,16 +440,30 @@ void ParseServer(const grpc_grpclb_server* server,
}
// Returns addresses extracted from \a serverlist.
-ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
- ServerAddressList addresses;
+grpc_lb_addresses* ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
+ size_t num_valid = 0;
+ /* first pass: count how many are valid in order to allocate the necessary
+ * memory in a single block */
for (size_t i = 0; i < serverlist->num_servers; ++i) {
- const grpc_grpclb_server* server = serverlist->servers[i];
- if (!IsServerValid(serverlist->servers[i], i, false)) continue;
- // Address processing.
+ if (IsServerValid(serverlist->servers[i], i, true)) ++num_valid;
+ }
+ grpc_lb_addresses* lb_addresses =
+ grpc_lb_addresses_create(num_valid, &lb_token_vtable);
+ /* second pass: actually populate the addresses and LB tokens (aka user data
+ * to the outside world) to be read by the RR policy during its creation.
+ * Given that the validity tests are very cheap, they are performed again
+ * instead of marking the valid ones during the first pass, as this would
+ * incur an allocation due to the arbitrary number of servers */
+ size_t addr_idx = 0;
+ for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
+ const grpc_grpclb_server* server = serverlist->servers[sl_idx];
+ if (!IsServerValid(serverlist->servers[sl_idx], sl_idx, false)) continue;
+ GPR_ASSERT(addr_idx < num_valid);
+ /* address processing */
grpc_resolved_address addr;
ParseServer(server, &addr);
- // LB token processing.
- void* lb_token;
+ /* lb token processing */
+ void* user_data;
if (server->has_load_balance_token) {
const size_t lb_token_max_length =
GPR_ARRAY_SIZE(server->load_balance_token);
@@ -430,7 +471,7 @@ ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
strnlen(server->load_balance_token, lb_token_max_length);
grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
server->load_balance_token, lb_token_length);
- lb_token =
+ user_data =
(void*)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr)
.payload;
} else {
@@ -440,16 +481,15 @@ ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
"be used instead",
uri);
gpr_free(uri);
- lb_token = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
+ user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
}
- // Add address.
- grpc_arg arg = grpc_channel_arg_pointer_create(
- const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), lb_token,
- &lb_token_arg_vtable);
- grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1);
- addresses.emplace_back(addr, args);
- }
- return addresses;
+ grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
+ false /* is_balancer */,
+ nullptr /* balancer_name */, user_data);
+ ++addr_idx;
+ }
+ GPR_ASSERT(addr_idx == num_valid);
+ return lb_addresses;
}
//
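ProcessServerlist above restores the two-pass shape: count the valid servers, size the grpc_lb_addresses once, then fill it and check that the counts agree. A minimal sketch of the same pattern over a hypothetical Server entry, with standard C++ containers standing in for grpc_lb_addresses:

#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical serverlist entry; the real grpc_grpclb_server also carries an
// IP address and an LB token.
struct Server {
  bool drop = false;
  int port = 0;
};

bool IsValid(const Server& s) { return !s.drop && s.port > 0 && s.port <= 65535; }

std::vector<Server> ExtractValid(const std::vector<Server>& serverlist) {
  // First pass: count, so the output is sized with a single allocation.
  std::size_t num_valid = 0;
  for (const Server& s : serverlist) {
    if (IsValid(s)) ++num_valid;
  }
  std::vector<Server> out;
  out.reserve(num_valid);
  // Second pass: the validity test is cheap, so re-running it beats
  // remembering which entries passed (which would need its own allocation).
  for (const Server& s : serverlist) {
    if (IsValid(s)) out.push_back(s);
  }
  assert(out.size() == num_valid);
  return out;
}

int main() {
  std::vector<Server> list = {{false, 443}, {true, 443}, {false, 0}};
  return ExtractValid(list).size() == 1 ? 0 : 1;
}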
@@ -789,7 +829,8 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_);
} else {
// Dispose of the fallback.
- grpclb_policy->fallback_backend_addresses_.reset();
+ grpc_lb_addresses_destroy(grpclb_policy->fallback_backend_addresses_);
+ grpclb_policy->fallback_backend_addresses_ = nullptr;
if (grpclb_policy->fallback_timer_callback_pending_) {
grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
}
@@ -869,25 +910,31 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
// helper code for creating balancer channel
//
-ServerAddressList ExtractBalancerAddresses(const ServerAddressList& addresses) {
- ServerAddressList balancer_addresses;
- for (size_t i = 0; i < addresses.size(); ++i) {
- if (addresses[i].IsBalancer()) {
- // Strip out the is_balancer channel arg, since we don't want to
- // recursively use the grpclb policy in the channel used to talk to
- // the balancers. Note that we do NOT strip out the balancer_name
- // channel arg, since we need that to set the authority correctly
- // to talk to the balancers.
- static const char* args_to_remove[] = {
- GRPC_ARG_ADDRESS_IS_BALANCER,
- };
- balancer_addresses.emplace_back(
- addresses[i].address(),
- grpc_channel_args_copy_and_remove(addresses[i].args(), args_to_remove,
- GPR_ARRAY_SIZE(args_to_remove)));
+grpc_lb_addresses* ExtractBalancerAddresses(
+ const grpc_lb_addresses* addresses) {
+ size_t num_grpclb_addrs = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
+ }
+ // There must be at least one balancer address, or else the
+ // client_channel would not have chosen this LB policy.
+ GPR_ASSERT(num_grpclb_addrs > 0);
+ grpc_lb_addresses* lb_addresses =
+ grpc_lb_addresses_create(num_grpclb_addrs, nullptr);
+ size_t lb_addresses_idx = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (!addresses->addresses[i].is_balancer) continue;
+ if (GPR_UNLIKELY(addresses->addresses[i].user_data != nullptr)) {
+ gpr_log(GPR_ERROR,
+ "This LB policy doesn't support user data. It will be ignored");
}
+ grpc_lb_addresses_set_address(
+ lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr,
+ addresses->addresses[i].address.len, false /* is balancer */,
+ addresses->addresses[i].balancer_name, nullptr /* user data */);
}
- return balancer_addresses;
+ GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx);
+ return lb_addresses;
}
/* Returns the channel args for the LB channel, used to create a bidirectional
@@ -899,10 +946,10 @@ ServerAddressList ExtractBalancerAddresses(const ServerAddressList& addresses) {
* above the grpclb policy.
* - \a args: other args inherited from the grpclb policy. */
grpc_channel_args* BuildBalancerChannelArgs(
- const ServerAddressList& addresses,
+ const grpc_lb_addresses* addresses,
FakeResolverResponseGenerator* response_generator,
const grpc_channel_args* args) {
- ServerAddressList balancer_addresses = ExtractBalancerAddresses(addresses);
+ grpc_lb_addresses* lb_addresses = ExtractBalancerAddresses(addresses);
// Channel args to remove.
static const char* args_to_remove[] = {
// LB policy name, since we want to use the default (pick_first) in
@@ -920,7 +967,7 @@ grpc_channel_args* BuildBalancerChannelArgs(
// is_balancer=true. We need the LB channel to return addresses with
// is_balancer=false so that it does not wind up recursively using the
// grpclb LB policy, as per the special case logic in client_channel.c.
- GRPC_ARG_SERVER_ADDRESS_LIST,
+ GRPC_ARG_LB_ADDRESSES,
// The fake resolver response generator, because we are replacing it
// with the one from the grpclb policy, used to propagate updates to
// the LB channel.
@@ -936,10 +983,10 @@ grpc_channel_args* BuildBalancerChannelArgs(
};
// Channel args to add.
const grpc_arg args_to_add[] = {
- // New address list.
+ // New LB addresses.
// Note that we pass these in both when creating the LB channel
// and via the fake resolver. The latter is what actually gets used.
- CreateServerAddressListChannelArg(&balancer_addresses),
+ grpc_lb_addresses_create_channel_arg(lb_addresses),
// The fake resolver response generator, which we use to inject
// address updates into the LB channel.
grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
@@ -957,14 +1004,18 @@ grpc_channel_args* BuildBalancerChannelArgs(
args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add,
GPR_ARRAY_SIZE(args_to_add));
// Make any necessary modifications for security.
- return grpc_lb_policy_grpclb_modify_lb_channel_args(new_args);
+ new_args = grpc_lb_policy_grpclb_modify_lb_channel_args(new_args);
+ // Clean up.
+ grpc_lb_addresses_destroy(lb_addresses);
+ return new_args;
}
//
// ctor and dtor
//
-GrpcLb::GrpcLb(const LoadBalancingPolicy::Args& args)
+GrpcLb::GrpcLb(const grpc_lb_addresses* addresses,
+ const LoadBalancingPolicy::Args& args)
: LoadBalancingPolicy(args),
response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
lb_call_backoff_(
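BuildBalancerChannelArgs above is essentially a copy-and-add-and-remove over the inherited channel args: drop the keys that must not leak into the LB channel (the LB policy name, the address list, the old response generator), then append the replacements. A simplified sketch of that reshaping, with a hypothetical ChannelArg that carries only string values and illustrative key names:

#include <algorithm>
#include <string>
#include <vector>

struct ChannelArg {
  std::string key;
  std::string value;  // the real args also carry integer and pointer values
};

std::vector<ChannelArg> CopyAddAndRemove(
    const std::vector<ChannelArg>& inherited,
    const std::vector<std::string>& keys_to_remove,
    const std::vector<ChannelArg>& args_to_add) {
  std::vector<ChannelArg> out;
  for (const ChannelArg& arg : inherited) {
    bool removed = std::find(keys_to_remove.begin(), keys_to_remove.end(),
                             arg.key) != keys_to_remove.end();
    if (!removed) out.push_back(arg);
  }
  out.insert(out.end(), args_to_add.begin(), args_to_add.end());
  return out;
}

int main() {
  std::vector<ChannelArg> inherited = {{"grpc.lb_policy_name", "grpclb"},
                                       {"grpc.lb_addresses", "<old list>"}};
  std::vector<ChannelArg> rebuilt = CopyAddAndRemove(
      inherited, {"grpc.lb_policy_name", "grpc.lb_addresses"},
      {{"grpc.lb_addresses", "<balancer-only list>"}});
  (void)rebuilt;  // rebuilt keeps nothing from the removed keys, only the added arg
}

The real pointer-valued args are deep-copied through their vtables by grpc_channel_args_copy_and_add_and_remove, which is why the hunk above can destroy the temporary lb_addresses once new_args has been built.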
@@ -1021,6 +1072,9 @@ GrpcLb::~GrpcLb() {
if (serverlist_ != nullptr) {
grpc_grpclb_destroy_serverlist(serverlist_);
}
+ if (fallback_backend_addresses_ != nullptr) {
+ grpc_lb_addresses_destroy(fallback_backend_addresses_);
+ }
grpc_subchannel_index_unref();
}
@@ -1068,6 +1122,7 @@ void GrpcLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
while ((pp = pending_picks_) != nullptr) {
pending_picks_ = pp->next;
pp->pick->on_complete = pp->original_on_complete;
+ pp->pick->user_data = nullptr;
grpc_error* error = GRPC_ERROR_NONE;
if (new_policy->PickLocked(pp->pick, &error)) {
// Synchronous return; schedule closure.
@@ -1221,27 +1276,9 @@ void GrpcLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
notify);
}
-// Returns the backend addresses extracted from the given addresses.
-UniquePtr<ServerAddressList> ExtractBackendAddresses(
- const ServerAddressList& addresses) {
- void* lb_token = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
- grpc_arg arg = grpc_channel_arg_pointer_create(
- const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), lb_token,
- &lb_token_arg_vtable);
- auto backend_addresses = MakeUnique<ServerAddressList>();
- for (size_t i = 0; i < addresses.size(); ++i) {
- if (!addresses[i].IsBalancer()) {
- backend_addresses->emplace_back(
- addresses[i].address(),
- grpc_channel_args_copy_and_add(addresses[i].args(), &arg, 1));
- }
- }
- return backend_addresses;
-}
-
void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
- const ServerAddressList* addresses = FindServerAddressListChannelArg(&args);
- if (addresses == nullptr) {
+ const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
+ if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
// Ignore this update.
gpr_log(
GPR_ERROR,
@@ -1249,8 +1286,13 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
this);
return;
}
+ const grpc_lb_addresses* addresses =
+ static_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
// Update fallback address list.
- fallback_backend_addresses_ = ExtractBackendAddresses(*addresses);
+ if (fallback_backend_addresses_ != nullptr) {
+ grpc_lb_addresses_destroy(fallback_backend_addresses_);
+ }
+ fallback_backend_addresses_ = ExtractBackendAddresses(addresses);
// Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
// since we use this to trigger the client_load_reporting filter.
static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
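ProcessChannelArgsLocked above no longer receives a typed ServerAddressList; it looks up GRPC_ARG_LB_ADDRESSES by name and must verify the arg is pointer-typed before trusting the payload, ignoring the update otherwise. A small sketch of that defensive lookup with hypothetical Arg/FindArg helpers:

#include <cstddef>
#include <cstdio>
#include <cstring>

// Hypothetical simplified channel arg; the real grpc_arg is a tagged union.
enum class ArgType { kInteger, kString, kPointer };

struct Arg {
  const char* key;
  ArgType type;
  void* pointer;
};

const Arg* FindArg(const Arg* args, std::size_t num_args, const char* key) {
  for (std::size_t i = 0; i < num_args; ++i) {
    if (std::strcmp(args[i].key, key) == 0) return &args[i];
  }
  return nullptr;
}

// Returns the payload, or nullptr if the arg is missing or has the wrong
// type -- the caller then ignores the update, as the code above does.
void* GetPointerArg(const Arg* args, std::size_t num_args, const char* key) {
  const Arg* arg = FindArg(args, num_args, key);
  if (arg == nullptr || arg->type != ArgType::kPointer) {
    std::fprintf(stderr, "arg %s missing or not a pointer; ignoring update\n", key);
    return nullptr;
  }
  return arg->pointer;
}

int main() {
  int payload = 0;
  Arg args[] = {{"grpc.lb_addresses", ArgType::kPointer, &payload}};
  return GetPointerArg(args, 1, "grpc.lb_addresses") != nullptr ? 0 : 1;
}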
@@ -1261,7 +1303,7 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
&args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
// Construct args for balancer channel.
grpc_channel_args* lb_channel_args =
- BuildBalancerChannelArgs(*addresses, response_generator_.get(), &args);
+ BuildBalancerChannelArgs(addresses, response_generator_.get(), &args);
// Create balancer channel if needed.
if (lb_channel_ == nullptr) {
char* uri_str;
@@ -1467,17 +1509,12 @@ void DestroyClientStats(void* arg) {
}
void GrpcLb::PendingPickSetMetadataAndContext(PendingPick* pp) {
- // If connected_subchannel is nullptr, no pick has been made by the RR
- // policy (e.g., all addresses failed to connect). There won't be any
- // LB token available.
+ /* if connected_subchannel is nullptr, no pick has been made by the RR
+ * policy (e.g., all addresses failed to connect). There won't be any
+ * user_data/token available */
if (pp->pick->connected_subchannel != nullptr) {
- const grpc_arg* arg =
- grpc_channel_args_find(pp->pick->connected_subchannel->args(),
- GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN);
- if (arg != nullptr) {
- grpc_mdelem lb_token = {
- reinterpret_cast<uintptr_t>(arg->value.pointer.p)};
- AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(lb_token),
+ if (GPR_LIKELY(!GRPC_MDISNULL(pp->lb_token))) {
+ AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(pp->lb_token),
&pp->pick->lb_token_mdelem_storage,
pp->pick->initial_metadata);
} else {
@@ -1561,10 +1598,12 @@ bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
return true;
}
}
- // Set client_stats.
+ // Set client_stats and user_data.
if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
pp->client_stats = lb_calld_->client_stats()->Ref();
}
+ GPR_ASSERT(pp->pick->user_data == nullptr);
+ pp->pick->user_data = (void**)&pp->lb_token;
// Pick via the RR policy.
bool pick_done = rr_policy_->PickLocked(pp->pick, error);
if (pick_done) {
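The pick path above re-introduces the user_data out-parameter: before delegating to round_robin, the grpclb policy points pp->pick->user_data at its own pp->lb_token so the child policy can deposit the token of whichever address it picks, and that token is later attached to the call's initial metadata. A toy sketch of the hand-off, with hypothetical Pick/PendingPick types and a stub child picker:

#include <cstdio>
#include <vector>

// Hypothetical per-address entry: the opaque lb_token plays the role of the
// user_data / LB-token mdelem in the diff.
struct Address {
  const char* name;
  void* lb_token;
};

// Hypothetical pick request: user_data, when non-null, tells the child policy
// where to store the chosen address's token.
struct Pick {
  void** user_data = nullptr;
  const Address* chosen = nullptr;
};

// Stand-in for the round_robin child policy: it picks an address and, if
// asked, reports that address's user data through the out-parameter.
bool ChildPick(const std::vector<Address>& addrs, Pick* pick) {
  if (addrs.empty()) return false;
  pick->chosen = &addrs[0];
  if (pick->user_data != nullptr) *pick->user_data = addrs[0].lb_token;
  return true;
}

struct PendingPick {
  Pick pick;
  void* lb_token = nullptr;
};

// Stand-in for the grpclb wrapper: point user_data at our own storage before
// delegating, mirroring pp->pick->user_data = (void**)&pp->lb_token above.
bool ParentPick(const std::vector<Address>& addrs, PendingPick* pp) {
  pp->pick.user_data = &pp->lb_token;
  bool done = ChildPick(addrs, &pp->pick);
  if (done && pp->lb_token != nullptr) {
    // In the diff this is where the token is added to initial metadata.
    std::printf("attach token %p for %s\n", pp->lb_token, pp->pick.chosen->name);
  }
  return done;
}

int main() {
  int token = 42;
  std::vector<Address> addrs = {{"10.0.0.1:443", &token}};
  PendingPick pp;
  return ParentPick(addrs, &pp) ? 0 : 1;
}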
@@ -1629,11 +1668,10 @@ void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) {
}
grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
- ServerAddressList tmp_addresses;
- ServerAddressList* addresses = &tmp_addresses;
+ grpc_lb_addresses* addresses;
bool is_backend_from_grpclb_load_balancer = false;
if (serverlist_ != nullptr) {
- tmp_addresses = ProcessServerlist(serverlist_);
+ addresses = ProcessServerlist(serverlist_);
is_backend_from_grpclb_load_balancer = true;
} else {
// If CreateOrUpdateRoundRobinPolicyLocked() is invoked when we haven't
@@ -1642,14 +1680,14 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
// empty, in which case the new round_robin policy will keep the requested
// picks pending.
GPR_ASSERT(fallback_backend_addresses_ != nullptr);
- addresses = fallback_backend_addresses_.get();
+ addresses = grpc_lb_addresses_copy(fallback_backend_addresses_);
}
GPR_ASSERT(addresses != nullptr);
- // Replace the server address list in the channel args that we pass down to
+ // Replace the LB addresses in the channel args that we pass down to
// the subchannel.
- static const char* keys_to_remove[] = {GRPC_ARG_SERVER_ADDRESS_LIST};
+ static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
grpc_arg args_to_add[3] = {
- CreateServerAddressListChannelArg(addresses),
+ grpc_lb_addresses_create_channel_arg(addresses),
// A channel arg indicating if the target is a backend inferred from a
// grpclb load balancer.
grpc_channel_arg_integer_create(
@@ -1666,6 +1704,7 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove(
args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add,
num_args_to_add);
+ grpc_lb_addresses_destroy(addresses);
return args;
}
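CreateRoundRobinPolicyArgsLocked above owns whichever list it ends up with: ProcessServerlist builds a fresh one and the fallback branch takes a copy, so a single grpc_lb_addresses_destroy at the end covers both. A sketch of the same single-cleanup convention, with unique_ptr standing in for the manual destroy and a hypothetical AddressList alias:

#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for grpc_lb_addresses.
using AddressList = std::vector<std::string>;

// Stand-in for ProcessServerlist: builds a brand-new, owned list.
std::unique_ptr<AddressList> FromServerlist() {
  return std::make_unique<AddressList>(AddressList{"10.0.0.1:443"});
}

// Stand-in for grpc_lb_addresses_copy on the fallback list.
std::unique_ptr<AddressList> CopyFallback(const AddressList& fallback) {
  return std::make_unique<AddressList>(fallback);
}

std::vector<std::string> BuildChildArgs(const AddressList* serverlist,
                                        const AddressList& fallback) {
  // Both branches yield an owned list, so one cleanup path covers both; the
  // raw-pointer version in the diff calls grpc_lb_addresses_destroy here.
  std::unique_ptr<AddressList> addresses =
      serverlist != nullptr ? FromServerlist() : CopyFallback(fallback);
  return std::vector<std::string>(addresses->begin(), addresses->end());
}

int main() {
  AddressList fallback = {"192.0.2.1:80"};
  return BuildChildArgs(nullptr, fallback).size() == 1 ? 0 : 1;
}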
@@ -1798,18 +1837,19 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory {
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
const LoadBalancingPolicy::Args& args) const override {
/* Count the number of gRPC-LB addresses. There must be at least one. */
- const ServerAddressList* addresses =
- FindServerAddressListChannelArg(args.args);
- if (addresses == nullptr) return nullptr;
- bool found_balancer = false;
- for (size_t i = 0; i < addresses->size(); ++i) {
- if ((*addresses)[i].IsBalancer()) {
- found_balancer = true;
- break;
- }
+ const grpc_arg* arg =
+ grpc_channel_args_find(args.args, GRPC_ARG_LB_ADDRESSES);
+ if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
+ return nullptr;
+ }
+ grpc_lb_addresses* addresses =
+ static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
+ size_t num_grpclb_addrs = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
}
- if (!found_balancer) return nullptr;
- return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(args));
+ if (num_grpclb_addrs == 0) return nullptr;
+ return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(addresses, args));
}
const char* name() const override { return "grpclb"; }