aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc18
-rw-r--r--test/cpp/end2end/grpclb_end2end_test.cc41
2 files changed, 51 insertions, 8 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index a9a5965ed1..49a1b2d692 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -361,7 +361,9 @@ void lb_token_destroy(void* token) {
}
}
int lb_token_cmp(void* token1, void* token2) {
- return GPR_ICMP(token1, token2);
+ // Always indicate a match, since we don't want this channel arg to
+ // affect the subchannel's key in the index.
+ return 0;
}
const grpc_arg_pointer_vtable lb_token_arg_vtable = {
lb_token_copy, lb_token_destroy, lb_token_cmp};
@@ -422,7 +424,7 @@ ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
grpc_resolved_address addr;
ParseServer(server, &addr);
// LB token processing.
- void* lb_token;
+ grpc_mdelem lb_token;
if (server->has_load_balance_token) {
const size_t lb_token_max_length =
GPR_ARRAY_SIZE(server->load_balance_token);
@@ -430,9 +432,7 @@ ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
strnlen(server->load_balance_token, lb_token_max_length);
grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
server->load_balance_token, lb_token_length);
- lb_token =
- (void*)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr)
- .payload;
+ lb_token = grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr);
} else {
char* uri = grpc_sockaddr_to_uri(&addr);
gpr_log(GPR_INFO,
@@ -440,14 +440,16 @@ ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
"be used instead",
uri);
gpr_free(uri);
- lb_token = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
+ lb_token = GRPC_MDELEM_LB_TOKEN_EMPTY;
}
// Add address.
grpc_arg arg = grpc_channel_arg_pointer_create(
- const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), lb_token,
- &lb_token_arg_vtable);
+ const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN),
+ (void*)lb_token.payload, &lb_token_arg_vtable);
grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1);
addresses.emplace_back(addr, args);
+ // Clean up.
+ GRPC_MDELEM_UNREF(lb_token);
}
return addresses;
}
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
index 2eaacd429d..f739ed032b 100644
--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -18,6 +18,7 @@
#include <memory>
#include <mutex>
+#include <set>
#include <sstream>
#include <thread>
@@ -145,6 +146,7 @@ class BackendServiceImpl : public BackendService {
IncreaseRequestCount();
const auto status = TestServiceImpl::Echo(context, request, response);
IncreaseResponseCount();
+ AddClient(context->peer());
return status;
}
@@ -157,9 +159,21 @@ class BackendServiceImpl : public BackendService {
return prev;
}
+ std::set<grpc::string> clients() {
+ std::unique_lock<std::mutex> lock(clients_mu_);
+ return clients_;
+ }
+
private:
+ void AddClient(const grpc::string& client) {
+ std::unique_lock<std::mutex> lock(clients_mu_);
+ clients_.insert(client);
+ }
+
std::mutex mu_;
bool shutdown_ = false;
+ std::mutex clients_mu_;
+ std::set<grpc::string> clients_;
};
grpc::string Ip4ToPackedString(const char* ip_str) {
@@ -303,6 +317,11 @@ class BalancerServiceImpl : public BalancerService {
auto* server = response.mutable_server_list()->add_servers();
server->set_ip_address(Ip4ToPackedString("127.0.0.1"));
server->set_port(backend_port);
+ static int token_count = 0;
+ char* token;
+ gpr_asprintf(&token, "token%03d", ++token_count);
+ server->set_load_balance_token(token);
+ gpr_free(token);
}
return response;
}
@@ -675,6 +694,28 @@ TEST_F(SingleBalancerTest, Vanilla) {
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
+TEST_F(SingleBalancerTest, SameBackendListedMultipleTimes) {
+ SetNextResolutionAllBalancers();
+ // Same backend listed twice.
+ std::vector<int> ports;
+ ports.push_back(backend_servers_[0].port_);
+ ports.push_back(backend_servers_[0].port_);
+ const size_t kNumRpcsPerAddress = 10;
+ ScheduleResponseForBalancer(
+ 0, BalancerServiceImpl::BuildResponseForBackends(ports, {}), 0);
+ // We need to wait for the backend to come online.
+ WaitForBackend(0);
+ // Send kNumRpcsPerAddress RPCs per server.
+ CheckRpcSendOk(kNumRpcsPerAddress * ports.size());
+ // Backend should have gotten 20 requests.
+ EXPECT_EQ(kNumRpcsPerAddress * 2,
+ backend_servers_[0].service_->request_count());
+ // And they should have come from a single client port, because of
+ // subchannel sharing.
+ EXPECT_EQ(1UL, backends_[0]->clients().size());
+ balancers_[0]->NotifyDoneWithServerlists();
+}
+
TEST_F(SingleBalancerTest, SecureNaming) {
ResetStub(0, kApplicationTargetName_ + ";lb");
SetNextResolution({AddressData{balancer_servers_[0].port_, true, "lb"}});