aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/end2end/grpclb_end2end_test.cc
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/end2end/grpclb_end2end_test.cc')
-rw-r--r--test/cpp/end2end/grpclb_end2end_test.cc92
1 files changed, 71 insertions, 21 deletions
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
index 6ce0696114..f739ed032b 100644
--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -18,6 +18,7 @@
#include <memory>
#include <mutex>
+#include <set>
#include <sstream>
#include <thread>
@@ -32,7 +33,9 @@
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
+#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
+#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/sockaddr.h"
@@ -143,6 +146,7 @@ class BackendServiceImpl : public BackendService {
IncreaseRequestCount();
const auto status = TestServiceImpl::Echo(context, request, response);
IncreaseResponseCount();
+ AddClient(context->peer());
return status;
}
@@ -155,9 +159,21 @@ class BackendServiceImpl : public BackendService {
return prev;
}
+ std::set<grpc::string> clients() {
+ std::unique_lock<std::mutex> lock(clients_mu_);
+ return clients_;
+ }
+
private:
+ void AddClient(const grpc::string& client) {
+ std::unique_lock<std::mutex> lock(clients_mu_);
+ clients_.insert(client);
+ }
+
std::mutex mu_;
bool shutdown_ = false;
+ std::mutex clients_mu_;
+ std::set<grpc::string> clients_;
};
grpc::string Ip4ToPackedString(const char* ip_str) {
@@ -301,6 +317,11 @@ class BalancerServiceImpl : public BalancerService {
auto* server = response.mutable_server_list()->add_servers();
server->set_ip_address(Ip4ToPackedString("127.0.0.1"));
server->set_port(backend_port);
+ static int token_count = 0;
+ char* token;
+ gpr_asprintf(&token, "token%03d", ++token_count);
+ server->set_load_balance_token(token);
+ gpr_free(token);
}
return response;
}
@@ -412,8 +433,8 @@ class GrpclbEnd2endTest : public ::testing::Test {
std::shared_ptr<ChannelCredentials> creds(
new SecureChannelCredentials(grpc_composite_channel_credentials_create(
channel_creds, call_creds, nullptr)));
- grpc_call_credentials_unref(call_creds);
- grpc_channel_credentials_unref(channel_creds);
+ call_creds->Unref();
+ channel_creds->Unref();
channel_ = CreateCustomChannel(uri.str(), creds, args);
stub_ = grpc::testing::EchoTestService::NewStub(channel_);
}
@@ -486,18 +507,27 @@ class GrpclbEnd2endTest : public ::testing::Test {
grpc::string balancer_name;
};
- grpc_lb_addresses* CreateLbAddressesFromAddressDataList(
+ grpc_core::ServerAddressList CreateLbAddressesFromAddressDataList(
const std::vector<AddressData>& address_data) {
- grpc_lb_addresses* addresses =
- grpc_lb_addresses_create(address_data.size(), nullptr);
- for (size_t i = 0; i < address_data.size(); ++i) {
+ grpc_core::ServerAddressList addresses;
+ for (const auto& addr : address_data) {
char* lb_uri_str;
- gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", address_data[i].port);
+ gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", addr.port);
grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true);
GPR_ASSERT(lb_uri != nullptr);
- grpc_lb_addresses_set_address_from_uri(
- addresses, i, lb_uri, address_data[i].is_balancer,
- address_data[i].balancer_name.c_str(), nullptr);
+ grpc_resolved_address address;
+ GPR_ASSERT(grpc_parse_uri(lb_uri, &address));
+ std::vector<grpc_arg> args_to_add;
+ if (addr.is_balancer) {
+ args_to_add.emplace_back(grpc_channel_arg_integer_create(
+ const_cast<char*>(GRPC_ARG_ADDRESS_IS_BALANCER), 1));
+ args_to_add.emplace_back(grpc_channel_arg_string_create(
+ const_cast<char*>(GRPC_ARG_ADDRESS_BALANCER_NAME),
+ const_cast<char*>(addr.balancer_name.c_str())));
+ }
+ grpc_channel_args* args = grpc_channel_args_copy_and_add(
+ nullptr, args_to_add.data(), args_to_add.size());
+ addresses.emplace_back(address.addr, address.len, args);
grpc_uri_destroy(lb_uri);
gpr_free(lb_uri_str);
}
@@ -506,23 +536,21 @@ class GrpclbEnd2endTest : public ::testing::Test {
void SetNextResolution(const std::vector<AddressData>& address_data) {
grpc_core::ExecCtx exec_ctx;
- grpc_lb_addresses* addresses =
+ grpc_core::ServerAddressList addresses =
CreateLbAddressesFromAddressDataList(address_data);
- grpc_arg fake_addresses = grpc_lb_addresses_create_channel_arg(addresses);
+ grpc_arg fake_addresses = CreateServerAddressListChannelArg(&addresses);
grpc_channel_args fake_result = {1, &fake_addresses};
response_generator_->SetResponse(&fake_result);
- grpc_lb_addresses_destroy(addresses);
}
void SetNextReresolutionResponse(
const std::vector<AddressData>& address_data) {
grpc_core::ExecCtx exec_ctx;
- grpc_lb_addresses* addresses =
+ grpc_core::ServerAddressList addresses =
CreateLbAddressesFromAddressDataList(address_data);
- grpc_arg fake_addresses = grpc_lb_addresses_create_channel_arg(addresses);
+ grpc_arg fake_addresses = CreateServerAddressListChannelArg(&addresses);
grpc_channel_args fake_result = {1, &fake_addresses};
response_generator_->SetReresolutionResponse(&fake_result);
- grpc_lb_addresses_destroy(addresses);
}
const std::vector<int> GetBackendPorts(const size_t start_index = 0) const {
@@ -553,10 +581,11 @@ class GrpclbEnd2endTest : public ::testing::Test {
return status;
}
- void CheckRpcSendOk(const size_t times = 1, const int timeout_ms = 1000) {
+ void CheckRpcSendOk(const size_t times = 1, const int timeout_ms = 1000,
+ bool wait_for_ready = false) {
for (size_t i = 0; i < times; ++i) {
EchoResponse response;
- const Status status = SendRpc(&response, timeout_ms);
+ const Status status = SendRpc(&response, timeout_ms, wait_for_ready);
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
EXPECT_EQ(response.message(), kRequestMessage_);
@@ -665,6 +694,28 @@ TEST_F(SingleBalancerTest, Vanilla) {
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
+TEST_F(SingleBalancerTest, SameBackendListedMultipleTimes) {
+ SetNextResolutionAllBalancers();
+ // Same backend listed twice.
+ std::vector<int> ports;
+ ports.push_back(backend_servers_[0].port_);
+ ports.push_back(backend_servers_[0].port_);
+ const size_t kNumRpcsPerAddress = 10;
+ ScheduleResponseForBalancer(
+ 0, BalancerServiceImpl::BuildResponseForBackends(ports, {}), 0);
+ // We need to wait for the backend to come online.
+ WaitForBackend(0);
+ // Send kNumRpcsPerAddress RPCs per server.
+ CheckRpcSendOk(kNumRpcsPerAddress * ports.size());
+ // Backend should have gotten 20 requests.
+ EXPECT_EQ(kNumRpcsPerAddress * 2,
+ backend_servers_[0].service_->request_count());
+ // And they should have come from a single client port, because of
+ // subchannel sharing.
+ EXPECT_EQ(1UL, backends_[0]->clients().size());
+ balancers_[0]->NotifyDoneWithServerlists();
+}
+
TEST_F(SingleBalancerTest, SecureNaming) {
ResetStub(0, kApplicationTargetName_ + ";lb");
SetNextResolution({AddressData{balancer_servers_[0].port_, true, "lb"}});
@@ -717,10 +768,9 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
kServerlistDelayMs);
-
const auto t0 = system_clock::now();
// Client will block: LB will initially send empty serverlist.
- CheckRpcSendOk(1, kCallDeadlineMs);
+ CheckRpcSendOk(1, kCallDeadlineMs, true /* wait_for_ready */);
const auto ellapsed_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(
system_clock::now() - t0);
@@ -1518,7 +1568,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
int main(int argc, char** argv) {
grpc_init();
- grpc_test_init(argc, argv);
+ grpc::testing::TestEnvironment env(argc, argv);
::testing::InitGoogleTest(&argc, argv);
const auto result = RUN_ALL_TESTS();
grpc_shutdown();