diff options
Diffstat (limited to 'test/cpp/end2end/grpclb_end2end_test.cc')
-rw-r--r-- | test/cpp/end2end/grpclb_end2end_test.cc | 310 |
1 files changed, 88 insertions, 222 deletions
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index b5cff664f6..570a3d1067 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -332,7 +332,8 @@ class GrpclbEnd2endTest : public ::testing::Test { num_backends_(num_backends), num_balancers_(num_balancers), client_load_reporting_interval_seconds_( - client_load_reporting_interval_seconds) {} + client_load_reporting_interval_seconds), + kRequestMessage_("Live long and prosper.") {} void SetUp() override { response_generator_ = grpc_fake_resolver_response_generator_create(); @@ -378,6 +379,10 @@ class GrpclbEnd2endTest : public ::testing::Test { stub_ = grpc::testing::EchoTestService::NewStub(channel_); } + void ResetBackendCounters() { + for (const auto& backend : backends_) backend->ResetCounters(); + } + ClientStats WaitForLoadReports() { ClientStats client_stats; for (const auto& balancer : balancers_) { @@ -386,6 +391,27 @@ class GrpclbEnd2endTest : public ::testing::Test { return client_stats; } + bool SeenAllBackends() { + for (const auto& backend : backends_) { + if (backend->request_count() == 0) return false; + } + return true; + } + + void WaitForAllBackends() { + while (!SeenAllBackends()) { + CheckRpcSendOk(); + } + ResetBackendCounters(); + } + + void WaitForBackend(size_t backend_idx) { + do { + CheckRpcSendOk(); + } while (backends_[backend_idx]->request_count() == 0); + ResetBackendCounters(); + } + struct AddressData { int port; bool is_balancer; @@ -429,20 +455,31 @@ class GrpclbEnd2endTest : public ::testing::Test { balancers_.at(i)->add_response(response, delay_ms); } - std::vector<std::pair<Status, EchoResponse>> SendRpc(const string& message, - int num_rpcs, - int timeout_ms = 1000) { - std::vector<std::pair<Status, EchoResponse>> results; + Status SendRpc(EchoResponse* response = nullptr, int timeout_ms = 1000) { + const bool local_response = (response == nullptr); + if (local_response) response = new EchoResponse; EchoRequest request; - EchoResponse response; - request.set_message(message); - for (int i = 0; i < num_rpcs; i++) { - ClientContext context; - context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms)); - Status status = stub_->Echo(&context, request, &response); - results.push_back(std::make_pair(status, response)); + request.set_message(kRequestMessage_); + ClientContext context; + context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms)); + Status status = stub_->Echo(&context, request, response); + if (local_response) delete response; + return status; + } + + void CheckRpcSendOk(const size_t times = 1) { + for (size_t i = 0; i < times; ++i) { + EchoResponse response; + const Status status = SendRpc(&response); + EXPECT_TRUE(status.ok()) << "code=" << status.error_code() + << " message=" << status.error_message(); + EXPECT_EQ(response.message(), kRequestMessage_); } - return results; + } + + void CheckRpcSendFailure() { + const Status status = SendRpc(); + EXPECT_FALSE(status.ok()); } template <typename T> @@ -499,14 +536,12 @@ class GrpclbEnd2endTest : public ::testing::Test { const int client_load_reporting_interval_seconds_; std::shared_ptr<Channel> channel_; std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; - std::vector<std::unique_ptr<BackendServiceImpl>> backends_; std::vector<std::unique_ptr<BalancerServiceImpl>> balancers_; - std::vector<ServerThread<BackendService>> backend_servers_; std::vector<ServerThread<BalancerService>> balancer_servers_; - grpc_fake_resolver_response_generator* response_generator_; + const grpc::string kRequestMessage_; }; class SingleBalancerTest : public GrpclbEnd2endTest { @@ -521,17 +556,12 @@ TEST_F(SingleBalancerTest, Vanilla) { 0); // Make sure that trying to connect works without a call. channel_->GetState(true /* try_to_connect */); - // Send 100 RPCs per server. - const auto& statuses_and_responses = - SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_); - - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } + + // We need to wait for all backends to come online. + WaitForAllBackends(); + + // Send kNumRpcsPerAddress RPCs per server. + CheckRpcSendOk(kNumRpcsPerAddress * num_backends_); // Each backend should have gotten 100 requests. for (size_t i = 0; i < backends_.size(); ++i) { @@ -561,8 +591,7 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { const auto t0 = system_clock::now(); // Client will block: LB will initially send empty serverlist. - const auto& statuses_and_responses = - SendRpc(kMessage_, num_backends_, kCallDeadlineMs); + CheckRpcSendOk(num_backends_); const auto ellapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>( system_clock::now() - t0); @@ -576,13 +605,6 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { for (size_t i = 0; i < backends_.size(); ++i) { EXPECT_EQ(1U, backend_servers_[i].service_->request_count()); } - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } balancers_[0]->NotifyDoneWithServerlists(); // The balancer got a single request. EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); @@ -593,70 +615,6 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } -TEST_F(SingleBalancerTest, RepeatedServerlist) { - constexpr int kServerlistDelayMs = 100; - - // Send a serverlist right away. - ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), - 0); - // ... and the same one a bit later. - ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), - kServerlistDelayMs); - - // Send num_backends/2 requests. - auto statuses_and_responses = SendRpc(kMessage_, num_backends_ / 2); - // only the first half of the backends will receive them. - for (size_t i = 0; i < backends_.size(); ++i) { - if (i < backends_.size() / 2) - EXPECT_EQ(1U, backend_servers_[i].service_->request_count()) - << "for backend #" << i; - else - EXPECT_EQ(0U, backend_servers_[i].service_->request_count()) - << "for backend #" << i; - } - EXPECT_EQ(statuses_and_responses.size(), num_backends_ / 2); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } - - // Wait for the (duplicated) serverlist update. - gpr_sleep_until(gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(kServerlistDelayMs * 1.1, GPR_TIMESPAN))); - - // Verify the LB has sent two responses. - EXPECT_EQ(2U, balancer_servers_[0].service_->response_count()); - - // Some more calls to complete the total number of backends. - statuses_and_responses = SendRpc( - kMessage_, - num_backends_ / 2 + (num_backends_ & 0x1) /* extra one if num_bes odd */); - // Because a duplicated serverlist should have no effect, all backends must - // have been hit once now. - for (size_t i = 0; i < backends_.size(); ++i) { - EXPECT_EQ(1U, backend_servers_[i].service_->request_count()); - } - EXPECT_EQ(statuses_and_responses.size(), num_backends_ / 2); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } - balancers_[0]->NotifyDoneWithServerlists(); - // The balancer got a single request. - EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); - // Check LB policy name for the channel. - EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); -} - TEST_F(SingleBalancerTest, BackendsRestart) { const size_t kNumRpcsPerAddress = 100; ScheduleResponseForBalancer( @@ -664,21 +622,8 @@ TEST_F(SingleBalancerTest, BackendsRestart) { 0); // Make sure that trying to connect works without a call. channel_->GetState(true /* try_to_connect */); - // Send 100 RPCs per server. - auto statuses_and_responses = - SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } - // Each backend should have gotten 100 requests. - for (size_t i = 0; i < backends_.size(); ++i) { - EXPECT_EQ(kNumRpcsPerAddress, - backend_servers_[i].service_->request_count()); - } + // Send kNumRpcsPerAddress RPCs per server. + CheckRpcSendOk(kNumRpcsPerAddress * num_backends_); balancers_[0]->NotifyDoneWithServerlists(); // The balancer got a single request. EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); @@ -687,11 +632,7 @@ TEST_F(SingleBalancerTest, BackendsRestart) { for (size_t i = 0; i < backends_.size(); ++i) { if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown(); } - statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - EXPECT_FALSE(status.ok()); - } + CheckRpcSendFailure(); for (size_t i = 0; i < num_backends_; ++i) { backends_.emplace_back(new BackendServiceImpl()); backend_servers_.emplace_back(ServerThread<BackendService>( @@ -703,11 +644,7 @@ TEST_F(SingleBalancerTest, BackendsRestart) { // TODO(dgq): implement the "backend restart" component as well. We need extra // machinery to either update the LB responses "on the fly" or instruct // backends which ports to restart on. - statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - EXPECT_FALSE(status.ok()); - } + CheckRpcSendFailure(); // Check LB policy name for the channel. EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } @@ -727,13 +664,9 @@ TEST_F(UpdatesTest, UpdateBalancers) { // Start servers and send 10 RPCs per server. gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); - auto statuses_and_responses = SendRpc(kMessage_, 10); + CheckRpcSendOk(10); gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } // All 10 requests should have gone to the first backend. EXPECT_EQ(10U, backend_servers_[0].service_->request_count()); @@ -758,22 +691,12 @@ TEST_F(UpdatesTest, UpdateBalancers) { // Wait until update has been processed, as signaled by the second backend // receiving a request. EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); - do { - auto statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } - } while (backend_servers_[1].service_->request_count() == 0); + WaitForBackend(1); backend_servers_[1].service_->ResetCounters(); gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH =========="); - statuses_and_responses = SendRpc(kMessage_, 10); + CheckRpcSendOk(10); gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH =========="); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } // All 10 requests should have gone to the second backend. EXPECT_EQ(10U, backend_servers_[1].service_->request_count()); @@ -804,13 +727,9 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) { // Start servers and send 10 RPCs per server. gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); - auto statuses_and_responses = SendRpc(kMessage_, 10); + CheckRpcSendOk(10); gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } // All 10 requests should have gone to the first backend. EXPECT_EQ(10U, backend_servers_[0].service_->request_count()); @@ -837,11 +756,7 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) { gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(10000, GPR_TIMESPAN)); // Send 10 seconds worth of RPCs do { - statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } + CheckRpcSendOk(); } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0); // grpclb continued using the original LB call to the first balancer, which // doesn't assign the second backend. @@ -860,11 +775,7 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) { gpr_time_from_millis(10000, GPR_TIMESPAN)); // Send 10 seconds worth of RPCs do { - statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } + CheckRpcSendOk(); } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0); // grpclb continued using the original LB call to the first balancer, which // doesn't assign the second backend. @@ -886,12 +797,8 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { // Start servers and send 10 RPCs per server. gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); - auto statuses_and_responses = SendRpc(kMessage_, 10); + CheckRpcSendOk(10); gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } // All 10 requests should have gone to the first backend. EXPECT_EQ(10U, backend_servers_[0].service_->request_count()); @@ -903,12 +810,8 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { // This is serviced by the existing RR policy gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH =========="); - statuses_and_responses = SendRpc(kMessage_, 10); + CheckRpcSendOk(10); gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH =========="); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } // All 10 requests should again have gone to the first backend. EXPECT_EQ(20U, backend_servers_[0].service_->request_count()); EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); @@ -935,23 +838,13 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { // receiving a request. In the meantime, the client continues to be serviced // (by the first backend) without interruption. EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); - do { - auto statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } - } while (backend_servers_[1].service_->request_count() == 0); + WaitForBackend(1); // This is serviced by the existing RR policy backend_servers_[1].service_->ResetCounters(); gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH =========="); - statuses_and_responses = SendRpc(kMessage_, 10); + CheckRpcSendOk(10); gpr_log(GPR_INFO, "========= DONE WITH THIRD BATCH =========="); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } // All 10 requests should have gone to the second backend. EXPECT_EQ(10U, backend_servers_[1].service_->request_count()); @@ -974,14 +867,11 @@ TEST_F(SingleBalancerTest, Drop) { 0, BalancerServiceImpl::BuildResponseForBackends( GetBackendPorts(), {{"rate_limiting", 1}, {"load_balancing", 2}}), 0); - // Send 100 RPCs for each server and drop address. - const auto& statuses_and_responses = - SendRpc(kMessage_, kNumRpcsPerAddress * (num_backends_ + 3)); - + // Send kNumRpcsPerAddress RPCs for each server and drop address. size_t num_drops = 0; - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; + for (size_t i = 0; i < kNumRpcsPerAddress * (num_backends_ + 3); ++i) { + EchoResponse response; + const Status status = SendRpc(&response); if (!status.ok() && status.error_message() == "Call dropped by load balancing policy") { ++num_drops; @@ -1010,12 +900,9 @@ TEST_F(SingleBalancerTest, DropAllFirst) { 0, BalancerServiceImpl::BuildResponseForBackends( {}, {{"rate_limiting", 1}, {"load_balancing", 1}}), 0); - const auto& statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - EXPECT_FALSE(status.ok()); - EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy"); - } + const Status status = SendRpc(); + EXPECT_FALSE(status.ok()); + EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy"); } TEST_F(SingleBalancerTest, DropAll) { @@ -1028,21 +915,13 @@ TEST_F(SingleBalancerTest, DropAll) { 1000); // First call succeeds. - auto statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } + CheckRpcSendOk(); // But eventually, the update with only dropped servers is processed and calls // fail. + Status status; do { - statuses_and_responses = SendRpc(kMessage_, 1); - ASSERT_EQ(statuses_and_responses.size(), 1UL); - } while (statuses_and_responses[0].first.ok()); - const Status& status = statuses_and_responses[0].first; + status = SendRpc(); + } while (status.ok()); EXPECT_FALSE(status.ok()); EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy"); } @@ -1057,18 +936,8 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) { ScheduleResponseForBalancer( 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), 0); - // Send 100 RPCs per server. - const auto& statuses_and_responses = - SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_); - - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } - + // Send kNumRpcsPerAddress RPCs per server. + CheckRpcSendOk(kNumRpcsPerAddress * num_backends_); // Each backend should have gotten 100 requests. for (size_t i = 0; i < backends_.size(); ++i) { EXPECT_EQ(kNumRpcsPerAddress, @@ -1096,14 +965,11 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) { 0, BalancerServiceImpl::BuildResponseForBackends( GetBackendPorts(), {{"rate_limiting", 2}, {"load_balancing", 1}}), 0); - // Send 100 RPCs for each server and drop address. - const auto& statuses_and_responses = - SendRpc(kMessage_, kNumRpcsPerAddress * (num_backends_ + 3)); size_t num_drops = 0; - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; + for (size_t i = 0; i < kNumRpcsPerAddress * (num_backends_ + 3); ++i) { + EchoResponse response; + const Status status = SendRpc(&response); if (!status.ok() && status.error_message() == "Call dropped by load balancing policy") { ++num_drops; |