aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/end2end/grpclb_end2end_test.cc
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/end2end/grpclb_end2end_test.cc')
-rw-r--r--test/cpp/end2end/grpclb_end2end_test.cc310
1 files changed, 88 insertions, 222 deletions
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
index b5cff664f6..570a3d1067 100644
--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -332,7 +332,8 @@ class GrpclbEnd2endTest : public ::testing::Test {
num_backends_(num_backends),
num_balancers_(num_balancers),
client_load_reporting_interval_seconds_(
- client_load_reporting_interval_seconds) {}
+ client_load_reporting_interval_seconds),
+ kRequestMessage_("Live long and prosper.") {}
void SetUp() override {
response_generator_ = grpc_fake_resolver_response_generator_create();
@@ -378,6 +379,10 @@ class GrpclbEnd2endTest : public ::testing::Test {
stub_ = grpc::testing::EchoTestService::NewStub(channel_);
}
+ void ResetBackendCounters() {
+ for (const auto& backend : backends_) backend->ResetCounters();
+ }
+
ClientStats WaitForLoadReports() {
ClientStats client_stats;
for (const auto& balancer : balancers_) {
@@ -386,6 +391,27 @@ class GrpclbEnd2endTest : public ::testing::Test {
return client_stats;
}
+ bool SeenAllBackends() {
+ for (const auto& backend : backends_) {
+ if (backend->request_count() == 0) return false;
+ }
+ return true;
+ }
+
+ void WaitForAllBackends() {
+ while (!SeenAllBackends()) {
+ CheckRpcSendOk();
+ }
+ ResetBackendCounters();
+ }
+
+ void WaitForBackend(size_t backend_idx) {
+ do {
+ CheckRpcSendOk();
+ } while (backends_[backend_idx]->request_count() == 0);
+ ResetBackendCounters();
+ }
+
struct AddressData {
int port;
bool is_balancer;
@@ -429,20 +455,31 @@ class GrpclbEnd2endTest : public ::testing::Test {
balancers_.at(i)->add_response(response, delay_ms);
}
- std::vector<std::pair<Status, EchoResponse>> SendRpc(const string& message,
- int num_rpcs,
- int timeout_ms = 1000) {
- std::vector<std::pair<Status, EchoResponse>> results;
+ Status SendRpc(EchoResponse* response = nullptr, int timeout_ms = 1000) {
+ const bool local_response = (response == nullptr);
+ if (local_response) response = new EchoResponse;
EchoRequest request;
- EchoResponse response;
- request.set_message(message);
- for (int i = 0; i < num_rpcs; i++) {
- ClientContext context;
- context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
- Status status = stub_->Echo(&context, request, &response);
- results.push_back(std::make_pair(status, response));
+ request.set_message(kRequestMessage_);
+ ClientContext context;
+ context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
+ Status status = stub_->Echo(&context, request, response);
+ if (local_response) delete response;
+ return status;
+ }
+
+ void CheckRpcSendOk(const size_t times = 1) {
+ for (size_t i = 0; i < times; ++i) {
+ EchoResponse response;
+ const Status status = SendRpc(&response);
+ EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
+ << " message=" << status.error_message();
+ EXPECT_EQ(response.message(), kRequestMessage_);
}
- return results;
+ }
+
+ void CheckRpcSendFailure() {
+ const Status status = SendRpc();
+ EXPECT_FALSE(status.ok());
}
template <typename T>
@@ -499,14 +536,12 @@ class GrpclbEnd2endTest : public ::testing::Test {
const int client_load_reporting_interval_seconds_;
std::shared_ptr<Channel> channel_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
-
std::vector<std::unique_ptr<BackendServiceImpl>> backends_;
std::vector<std::unique_ptr<BalancerServiceImpl>> balancers_;
-
std::vector<ServerThread<BackendService>> backend_servers_;
std::vector<ServerThread<BalancerService>> balancer_servers_;
-
grpc_fake_resolver_response_generator* response_generator_;
+ const grpc::string kRequestMessage_;
};
class SingleBalancerTest : public GrpclbEnd2endTest {
@@ -521,17 +556,12 @@ TEST_F(SingleBalancerTest, Vanilla) {
0);
// Make sure that trying to connect works without a call.
channel_->GetState(true /* try_to_connect */);
- // Send 100 RPCs per server.
- const auto& statuses_and_responses =
- SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_);
-
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- const EchoResponse& response = status_and_response.second;
- EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
- << " message=" << status.error_message();
- EXPECT_EQ(response.message(), kMessage_);
- }
+
+ // We need to wait for all backends to come online.
+ WaitForAllBackends();
+
+ // Send kNumRpcsPerAddress RPCs per server.
+ CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
// Each backend should have gotten 100 requests.
for (size_t i = 0; i < backends_.size(); ++i) {
@@ -561,8 +591,7 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
const auto t0 = system_clock::now();
// Client will block: LB will initially send empty serverlist.
- const auto& statuses_and_responses =
- SendRpc(kMessage_, num_backends_, kCallDeadlineMs);
+ CheckRpcSendOk(num_backends_);
const auto ellapsed_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(
system_clock::now() - t0);
@@ -576,13 +605,6 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
for (size_t i = 0; i < backends_.size(); ++i) {
EXPECT_EQ(1U, backend_servers_[i].service_->request_count());
}
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- const EchoResponse& response = status_and_response.second;
- EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
- << " message=" << status.error_message();
- EXPECT_EQ(response.message(), kMessage_);
- }
balancers_[0]->NotifyDoneWithServerlists();
// The balancer got a single request.
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
@@ -593,70 +615,6 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
-TEST_F(SingleBalancerTest, RepeatedServerlist) {
- constexpr int kServerlistDelayMs = 100;
-
- // Send a serverlist right away.
- ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
- 0);
- // ... and the same one a bit later.
- ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
- kServerlistDelayMs);
-
- // Send num_backends/2 requests.
- auto statuses_and_responses = SendRpc(kMessage_, num_backends_ / 2);
- // only the first half of the backends will receive them.
- for (size_t i = 0; i < backends_.size(); ++i) {
- if (i < backends_.size() / 2)
- EXPECT_EQ(1U, backend_servers_[i].service_->request_count())
- << "for backend #" << i;
- else
- EXPECT_EQ(0U, backend_servers_[i].service_->request_count())
- << "for backend #" << i;
- }
- EXPECT_EQ(statuses_and_responses.size(), num_backends_ / 2);
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- const EchoResponse& response = status_and_response.second;
- EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
- << " message=" << status.error_message();
- EXPECT_EQ(response.message(), kMessage_);
- }
-
- // Wait for the (duplicated) serverlist update.
- gpr_sleep_until(gpr_time_add(
- gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_millis(kServerlistDelayMs * 1.1, GPR_TIMESPAN)));
-
- // Verify the LB has sent two responses.
- EXPECT_EQ(2U, balancer_servers_[0].service_->response_count());
-
- // Some more calls to complete the total number of backends.
- statuses_and_responses = SendRpc(
- kMessage_,
- num_backends_ / 2 + (num_backends_ & 0x1) /* extra one if num_bes odd */);
- // Because a duplicated serverlist should have no effect, all backends must
- // have been hit once now.
- for (size_t i = 0; i < backends_.size(); ++i) {
- EXPECT_EQ(1U, backend_servers_[i].service_->request_count());
- }
- EXPECT_EQ(statuses_and_responses.size(), num_backends_ / 2);
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- const EchoResponse& response = status_and_response.second;
- EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
- << " message=" << status.error_message();
- EXPECT_EQ(response.message(), kMessage_);
- }
- balancers_[0]->NotifyDoneWithServerlists();
- // The balancer got a single request.
- EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
- // Check LB policy name for the channel.
- EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
-}
-
TEST_F(SingleBalancerTest, BackendsRestart) {
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(
@@ -664,21 +622,8 @@ TEST_F(SingleBalancerTest, BackendsRestart) {
0);
// Make sure that trying to connect works without a call.
channel_->GetState(true /* try_to_connect */);
- // Send 100 RPCs per server.
- auto statuses_and_responses =
- SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_);
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- const EchoResponse& response = status_and_response.second;
- EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
- << " message=" << status.error_message();
- EXPECT_EQ(response.message(), kMessage_);
- }
- // Each backend should have gotten 100 requests.
- for (size_t i = 0; i < backends_.size(); ++i) {
- EXPECT_EQ(kNumRpcsPerAddress,
- backend_servers_[i].service_->request_count());
- }
+ // Send kNumRpcsPerAddress RPCs per server.
+ CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
balancers_[0]->NotifyDoneWithServerlists();
// The balancer got a single request.
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
@@ -687,11 +632,7 @@ TEST_F(SingleBalancerTest, BackendsRestart) {
for (size_t i = 0; i < backends_.size(); ++i) {
if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown();
}
- statuses_and_responses = SendRpc(kMessage_, 1);
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- EXPECT_FALSE(status.ok());
- }
+ CheckRpcSendFailure();
for (size_t i = 0; i < num_backends_; ++i) {
backends_.emplace_back(new BackendServiceImpl());
backend_servers_.emplace_back(ServerThread<BackendService>(
@@ -703,11 +644,7 @@ TEST_F(SingleBalancerTest, BackendsRestart) {
// TODO(dgq): implement the "backend restart" component as well. We need extra
// machinery to either update the LB responses "on the fly" or instruct
// backends which ports to restart on.
- statuses_and_responses = SendRpc(kMessage_, 1);
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- EXPECT_FALSE(status.ok());
- }
+ CheckRpcSendFailure();
// Check LB policy name for the channel.
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
@@ -727,13 +664,9 @@ TEST_F(UpdatesTest, UpdateBalancers) {
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
- auto statuses_and_responses = SendRpc(kMessage_, 10);
+ CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
// All 10 requests should have gone to the first backend.
EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
@@ -758,22 +691,12 @@ TEST_F(UpdatesTest, UpdateBalancers) {
// Wait until update has been processed, as signaled by the second backend
// receiving a request.
EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
- do {
- auto statuses_and_responses = SendRpc(kMessage_, 1);
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
- } while (backend_servers_[1].service_->request_count() == 0);
+ WaitForBackend(1);
backend_servers_[1].service_->ResetCounters();
gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
- statuses_and_responses = SendRpc(kMessage_, 10);
+ CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
// All 10 requests should have gone to the second backend.
EXPECT_EQ(10U, backend_servers_[1].service_->request_count());
@@ -804,13 +727,9 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) {
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
- auto statuses_and_responses = SendRpc(kMessage_, 10);
+ CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
// All 10 requests should have gone to the first backend.
EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
@@ -837,11 +756,7 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) {
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(10000, GPR_TIMESPAN));
// Send 10 seconds worth of RPCs
do {
- statuses_and_responses = SendRpc(kMessage_, 1);
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
+ CheckRpcSendOk();
} while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
// grpclb continued using the original LB call to the first balancer, which
// doesn't assign the second backend.
@@ -860,11 +775,7 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) {
gpr_time_from_millis(10000, GPR_TIMESPAN));
// Send 10 seconds worth of RPCs
do {
- statuses_and_responses = SendRpc(kMessage_, 1);
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
+ CheckRpcSendOk();
} while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
// grpclb continued using the original LB call to the first balancer, which
// doesn't assign the second backend.
@@ -886,12 +797,8 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
- auto statuses_and_responses = SendRpc(kMessage_, 10);
+ CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
// All 10 requests should have gone to the first backend.
EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
@@ -903,12 +810,8 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
// This is serviced by the existing RR policy
gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
- statuses_and_responses = SendRpc(kMessage_, 10);
+ CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
// All 10 requests should again have gone to the first backend.
EXPECT_EQ(20U, backend_servers_[0].service_->request_count());
EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
@@ -935,23 +838,13 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
// receiving a request. In the meantime, the client continues to be serviced
// (by the first backend) without interruption.
EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
- do {
- auto statuses_and_responses = SendRpc(kMessage_, 1);
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
- } while (backend_servers_[1].service_->request_count() == 0);
+ WaitForBackend(1);
// This is serviced by the existing RR policy
backend_servers_[1].service_->ResetCounters();
gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH ==========");
- statuses_and_responses = SendRpc(kMessage_, 10);
+ CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH THIRD BATCH ==========");
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
// All 10 requests should have gone to the second backend.
EXPECT_EQ(10U, backend_servers_[1].service_->request_count());
@@ -974,14 +867,11 @@ TEST_F(SingleBalancerTest, Drop) {
0, BalancerServiceImpl::BuildResponseForBackends(
GetBackendPorts(), {{"rate_limiting", 1}, {"load_balancing", 2}}),
0);
- // Send 100 RPCs for each server and drop address.
- const auto& statuses_and_responses =
- SendRpc(kMessage_, kNumRpcsPerAddress * (num_backends_ + 3));
-
+ // Send kNumRpcsPerAddress RPCs for each server and drop address.
size_t num_drops = 0;
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- const EchoResponse& response = status_and_response.second;
+ for (size_t i = 0; i < kNumRpcsPerAddress * (num_backends_ + 3); ++i) {
+ EchoResponse response;
+ const Status status = SendRpc(&response);
if (!status.ok() &&
status.error_message() == "Call dropped by load balancing policy") {
++num_drops;
@@ -1010,12 +900,9 @@ TEST_F(SingleBalancerTest, DropAllFirst) {
0, BalancerServiceImpl::BuildResponseForBackends(
{}, {{"rate_limiting", 1}, {"load_balancing", 1}}),
0);
- const auto& statuses_and_responses = SendRpc(kMessage_, 1);
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- EXPECT_FALSE(status.ok());
- EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
- }
+ const Status status = SendRpc();
+ EXPECT_FALSE(status.ok());
+ EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
}
TEST_F(SingleBalancerTest, DropAll) {
@@ -1028,21 +915,13 @@ TEST_F(SingleBalancerTest, DropAll) {
1000);
// First call succeeds.
- auto statuses_and_responses = SendRpc(kMessage_, 1);
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- const EchoResponse& response = status_and_response.second;
- EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
- << " message=" << status.error_message();
- EXPECT_EQ(response.message(), kMessage_);
- }
+ CheckRpcSendOk();
// But eventually, the update with only dropped servers is processed and calls
// fail.
+ Status status;
do {
- statuses_and_responses = SendRpc(kMessage_, 1);
- ASSERT_EQ(statuses_and_responses.size(), 1UL);
- } while (statuses_and_responses[0].first.ok());
- const Status& status = statuses_and_responses[0].first;
+ status = SendRpc();
+ } while (status.ok());
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
}
@@ -1057,18 +936,8 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
0);
- // Send 100 RPCs per server.
- const auto& statuses_and_responses =
- SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_);
-
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- const EchoResponse& response = status_and_response.second;
- EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
- << " message=" << status.error_message();
- EXPECT_EQ(response.message(), kMessage_);
- }
-
+ // Send kNumRpcsPerAddress RPCs per server.
+ CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
// Each backend should have gotten 100 requests.
for (size_t i = 0; i < backends_.size(); ++i) {
EXPECT_EQ(kNumRpcsPerAddress,
@@ -1096,14 +965,11 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
0, BalancerServiceImpl::BuildResponseForBackends(
GetBackendPorts(), {{"rate_limiting", 2}, {"load_balancing", 1}}),
0);
- // Send 100 RPCs for each server and drop address.
- const auto& statuses_and_responses =
- SendRpc(kMessage_, kNumRpcsPerAddress * (num_backends_ + 3));
size_t num_drops = 0;
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- const EchoResponse& response = status_and_response.second;
+ for (size_t i = 0; i < kNumRpcsPerAddress * (num_backends_ + 3); ++i) {
+ EchoResponse response;
+ const Status status = SendRpc(&response);
if (!status.ok() &&
status.error_message() == "Call dropped by load balancing policy") {
++num_drops;