diff options
author | Vijay Pai <vpai@google.com> | 2018-02-21 10:24:15 -0800 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2018-02-21 10:24:15 -0800 |
commit | a8453a20d8565ad63ca9ebcdc1b981400a7b09a0 (patch) | |
tree | b554248cd31da09f9120ead2fe9e6eca23ccfa68 /test/cpp/end2end/grpclb_end2end_test.cc | |
parent | 57c52eebce27bd104ae130e72c3833f361b11919 (diff) | |
parent | 5262c087d88123c0f69603f42b6401a4b8fa9282 (diff) |
Merge branch 'master' into 2phase_thd
Diffstat (limited to 'test/cpp/end2end/grpclb_end2end_test.cc')
-rw-r--r-- | test/cpp/end2end/grpclb_end2end_test.cc | 159 |
1 files changed, 139 insertions, 20 deletions
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index 211389bfab..a4ecd6d996 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -186,11 +186,16 @@ class BalancerServiceImpl : public BalancerService { Status BalanceLoad(ServerContext* context, Stream* stream) override { gpr_log(GPR_INFO, "LB[%p]: BalanceLoad", this); LoadBalanceRequest request; - stream->Read(&request); + std::vector<ResponseDelayPair> responses_and_delays; + + if (!stream->Read(&request)) { + goto done; + } IncreaseRequestCount(); - gpr_log(GPR_INFO, "LB[%p]: recv msg '%s'", this, + gpr_log(GPR_INFO, "LB[%p]: received initial message '%s'", this, request.DebugString().c_str()); + // TODO(juanlishen): Initial response should always be the first response. if (client_load_reporting_interval_seconds_ > 0) { LoadBalanceResponse initial_response; initial_response.mutable_initial_response() @@ -199,7 +204,6 @@ class BalancerServiceImpl : public BalancerService { stream->Write(initial_response); } - std::vector<ResponseDelayPair> responses_and_delays; { std::unique_lock<std::mutex> lock(mu_); responses_and_delays = responses_and_delays_; @@ -215,14 +219,13 @@ class BalancerServiceImpl : public BalancerService { std::unique_lock<std::mutex> lock(mu_); if (shutdown_) goto done; serverlist_cond_.wait(lock, [this] { return serverlist_ready_; }); - serverlist_ready_ = false; } if (client_load_reporting_interval_seconds_ > 0) { request.Clear(); if (stream->Read(&request)) { - gpr_log(GPR_INFO, "LB[%p]: recv client load report msg: '%s'", this, - request.DebugString().c_str()); + gpr_log(GPR_INFO, "LB[%p]: received client load report message '%s'", + this, request.DebugString().c_str()); GPR_ASSERT(request.has_client_stats()); // We need to acquire the lock here in order to prevent the notify_one // below from firing before its corresponding wait is executed. @@ -295,7 +298,7 @@ class BalancerServiceImpl : public BalancerService { void NotifyDoneWithServerlists() { std::lock_guard<std::mutex> lock(mu_); serverlist_ready_ = true; - serverlist_cond_.notify_one(); + serverlist_cond_.notify_all(); } private: @@ -443,7 +446,7 @@ class GrpclbEnd2endTest : public ::testing::Test { void WaitForBackend(size_t backend_idx) { do { - CheckRpcSendOk(); + SendRpc(); } while (backends_[backend_idx]->request_count() == 0); ResetBackendCounters(); } @@ -662,9 +665,6 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); // and sent two responses. EXPECT_EQ(2U, balancer_servers_[0].service_->response_count()); - - // Check LB policy name for the channel. - EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } TEST_F(SingleBalancerTest, Fallback) { @@ -873,8 +873,6 @@ TEST_F(SingleBalancerTest, BackendsRestart) { // machinery to either update the LB responses "on the fly" or instruct // backends which ports to restart on. CheckRpcSendFailure(); - // Check LB policy name for the channel. - EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } class UpdatesTest : public GrpclbEnd2endTest { @@ -938,8 +936,6 @@ TEST_F(UpdatesTest, UpdateBalancers) { EXPECT_EQ(1U, balancer_servers_[1].service_->response_count()); EXPECT_EQ(0U, balancer_servers_[2].service_->request_count()); EXPECT_EQ(0U, balancer_servers_[2].service_->response_count()); - // Check LB policy name for the channel. - EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } // Send an update with the same set of LBs as the one in SetUp() in order to @@ -1011,9 +1007,6 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) { // doesn't assign the second backend. EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); balancers_[0]->NotifyDoneWithServerlists(); - - // Check LB policy name for the channel. - EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { @@ -1096,8 +1089,134 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { EXPECT_LE(balancer_servers_[1].service_->response_count(), 2U); EXPECT_EQ(0U, balancer_servers_[2].service_->request_count()); EXPECT_EQ(0U, balancer_servers_[2].service_->response_count()); - // Check LB policy name for the channel. - EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); +} + +TEST_F(UpdatesTest, ReresolveDeadBackend) { + ResetStub(500); + // The first resolution contains the addresses of a balancer that never + // responds, and a fallback backend. + std::vector<AddressData> addresses; + addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""}); + addresses.emplace_back(AddressData{backend_servers_[0].port_, false, ""}); + SetNextResolution(addresses); + // The re-resolution result will contain the addresses of the same balancer + // and a new fallback backend. + addresses.clear(); + addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""}); + addresses.emplace_back(AddressData{backend_servers_[1].port_, false, ""}); + SetNextReresolutionResponse(addresses); + + // Start servers and send 10 RPCs per server. + gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); + CheckRpcSendOk(10); + gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); + // All 10 requests should have gone to the fallback backend. + EXPECT_EQ(10U, backend_servers_[0].service_->request_count()); + + // Kill backend 0. + gpr_log(GPR_INFO, "********** ABOUT TO KILL BACKEND 0 *************"); + if (backends_[0]->Shutdown()) backend_servers_[0].Shutdown(); + gpr_log(GPR_INFO, "********** KILLED BACKEND 0 *************"); + + // Wait until re-resolution has finished, as signaled by the second backend + // receiving a request. + WaitForBackend(1); + + gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH =========="); + CheckRpcSendOk(10); + gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH =========="); + // All 10 requests should have gone to the second backend. + EXPECT_EQ(10U, backend_servers_[1].service_->request_count()); + + balancers_[0]->NotifyDoneWithServerlists(); + balancers_[1]->NotifyDoneWithServerlists(); + balancers_[2]->NotifyDoneWithServerlists(); + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[0].service_->response_count()); + EXPECT_EQ(0U, balancer_servers_[1].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[1].service_->response_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->response_count()); +} + +// TODO(juanlishen): Should be removed when the first response is always the +// initial response. Currently, if client load reporting is not enabled, the +// balancer doesn't send initial response. When the backend shuts down, an +// unexpected re-resolution will happen. This test configuration is a workaround +// for test ReresolveDeadBalancer. +class UpdatesWithClientLoadReportingTest : public GrpclbEnd2endTest { + public: + UpdatesWithClientLoadReportingTest() : GrpclbEnd2endTest(4, 3, 2) {} +}; + +TEST_F(UpdatesWithClientLoadReportingTest, ReresolveDeadBalancer) { + std::vector<AddressData> addresses; + addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""}); + SetNextResolution(addresses); + addresses.clear(); + addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""}); + SetNextReresolutionResponse(addresses); + const std::vector<int> first_backend{GetBackendPorts()[0]}; + const std::vector<int> second_backend{GetBackendPorts()[1]}; + + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0); + ScheduleResponseForBalancer( + 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0); + + // Start servers and send 10 RPCs per server. + gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); + CheckRpcSendOk(10); + gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); + // All 10 requests should have gone to the first backend. + EXPECT_EQ(10U, backend_servers_[0].service_->request_count()); + + // Kill backend 0. + gpr_log(GPR_INFO, "********** ABOUT TO KILL BACKEND 0 *************"); + if (backends_[0]->Shutdown()) backend_servers_[0].Shutdown(); + gpr_log(GPR_INFO, "********** KILLED BACKEND 0 *************"); + + CheckRpcSendFailure(); + + // Balancer 0 got a single request. + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); + // and sent a single response. + EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); + EXPECT_EQ(0U, balancer_servers_[1].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[1].service_->response_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->response_count()); + + // Kill balancer 0. + gpr_log(GPR_INFO, "********** ABOUT TO KILL BALANCER 0 *************"); + if (balancers_[0]->Shutdown()) balancer_servers_[0].Shutdown(); + gpr_log(GPR_INFO, "********** KILLED BALANCER 0 *************"); + + // Wait until re-resolution has finished, as signaled by the second backend + // receiving a request. + WaitForBackend(1); + + // This is serviced by the new serverlist. + gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH =========="); + CheckRpcSendOk(10); + gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH =========="); + // All 10 requests should have gone to the second backend. + EXPECT_EQ(10U, backend_servers_[1].service_->request_count()); + + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); + EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); + // After balancer 0 is killed, we restart an LB call immediately (because we + // disconnect to a previously connected balancer). Although we will cancel + // this call when the re-resolution update is done and another LB call restart + // is needed, this old call may still succeed reaching the LB server if + // re-resolution is slow. So balancer 1 may have received 2 requests and sent + // 2 responses. + EXPECT_GE(balancer_servers_[1].service_->request_count(), 1U); + EXPECT_GE(balancer_servers_[1].service_->response_count(), 1U); + EXPECT_LE(balancer_servers_[1].service_->request_count(), 2U); + EXPECT_LE(balancer_servers_[1].service_->response_count(), 2U); + EXPECT_EQ(0U, balancer_servers_[2].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->response_count()); } TEST_F(SingleBalancerTest, Drop) { |