diff options
Diffstat (limited to 'test/cpp/end2end/grpclb_end2end_test.cc')
-rw-r--r-- | test/cpp/end2end/grpclb_end2end_test.cc | 358 |
1 files changed, 317 insertions, 41 deletions
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index 8f901247cc..a8ac631fbd 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -1,33 +1,18 @@ /* * - * Copyright 2017, Google Inc. - * All rights reserved. + * Copyright 2017 gRPC authors. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. + * http://www.apache.org/licenses/LICENSE-2.0 * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ @@ -49,8 +34,8 @@ #include <grpc/support/time.h> extern "C" { +#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/lib/iomgr/sockaddr.h" -#include "test/core/end2end/fake_resolver.h" } #include "test/core/util/port.h" @@ -118,6 +103,12 @@ class CountedService : public ServiceType { ++request_count_; } + void ResetCounters() { + std::unique_lock<std::mutex> lock(mu_); + request_count_ = 0; + response_count_ = 0; + } + protected: std::mutex mu_; @@ -182,6 +173,7 @@ class BalancerServiceImpl : public BalancerService { shutdown_(false) {} Status BalanceLoad(ServerContext* context, Stream* stream) override { + gpr_log(GPR_INFO, "LB: BalanceLoad"); LoadBalanceRequest request; stream->Read(&request); IncreaseRequestCount(); @@ -201,9 +193,16 @@ class BalancerServiceImpl : public BalancerService { responses_and_delays = responses_and_delays_; } for (const auto& response_and_delay : responses_and_delays) { - if (shutdown_) break; + { + std::unique_lock<std::mutex> lock(mu_); + if (shutdown_) break; + } SendResponse(stream, response_and_delay.first, response_and_delay.second); } + { + std::unique_lock<std::mutex> lock(mu_); + serverlist_cond_.wait(lock); + } if (client_load_reporting_interval_seconds_ > 0) { request.Clear(); @@ -227,9 +226,10 @@ class BalancerServiceImpl : public BalancerService { client_stats_.num_calls_finished_known_received += request.client_stats().num_calls_finished_known_received(); std::lock_guard<std::mutex> lock(mu_); - cond_.notify_one(); + load_report_cond_.notify_one(); } + gpr_log(GPR_INFO, "LB: done"); return Status::OK; } @@ -238,9 +238,14 @@ class BalancerServiceImpl : public BalancerService { responses_and_delays_.push_back(std::make_pair(response, send_after_ms)); } - void Shutdown() { + // Returns true on its first invocation, false otherwise. + bool Shutdown() { + NotifyDoneWithServerlists(); std::unique_lock<std::mutex> lock(mu_); + const bool prev = !shutdown_; shutdown_ = true; + gpr_log(GPR_INFO, "LB: shut down"); + return prev; } static LoadBalanceResponse BuildResponseForBackends( @@ -265,26 +270,35 @@ class BalancerServiceImpl : public BalancerService { const ClientStats& WaitForLoadReport() { std::unique_lock<std::mutex> lock(mu_); - cond_.wait(lock); + load_report_cond_.wait(lock); return client_stats_; } + void NotifyDoneWithServerlists() { + std::lock_guard<std::mutex> lock(mu_); + serverlist_cond_.notify_one(); + } + private: void SendResponse(Stream* stream, const LoadBalanceResponse& response, int delay_ms) { gpr_log(GPR_INFO, "LB: sleeping for %d ms...", delay_ms); - gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(delay_ms, GPR_TIMESPAN))); + if (delay_ms > 0) { + gpr_sleep_until( + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(delay_ms, GPR_TIMESPAN))); + } gpr_log(GPR_INFO, "LB: Woke up! Sending response '%s'", response.DebugString().c_str()); - stream->Write(response); IncreaseResponseCount(); + stream->Write(response); } const int client_load_reporting_interval_seconds_; std::vector<ResponseDelayPair> responses_and_delays_; std::mutex mu_; - std::condition_variable cond_; + std::condition_variable load_report_cond_; + std::condition_variable serverlist_cond_; ClientStats client_stats_; bool shutdown_; }; @@ -327,8 +341,7 @@ class GrpclbEnd2endTest : public ::testing::Test { backend_servers_[i].Shutdown(); } for (size_t i = 0; i < balancers_.size(); ++i) { - balancers_[i]->Shutdown(); - balancer_servers_[i].Shutdown(); + if (balancers_[i]->Shutdown()) balancer_servers_[i].Shutdown(); } grpc_fake_resolver_response_generator_unref(response_generator_); } @@ -337,8 +350,10 @@ class GrpclbEnd2endTest : public ::testing::Test { ChannelArguments args; args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, response_generator_); - channel_ = CreateCustomChannel("test:///not_used", - InsecureChannelCredentials(), args); + std::ostringstream uri; + uri << "fake:///servername_not_used"; + channel_ = + CreateCustomChannel(uri.str(), InsecureChannelCredentials(), args); stub_ = grpc::testing::EchoTestService::NewStub(channel_); } @@ -498,6 +513,7 @@ TEST_F(SingleBalancerTest, Vanilla) { EXPECT_EQ(kNumRpcsPerAddress, backend_servers_[i].service_->request_count()); } + balancers_[0]->NotifyDoneWithServerlists(); // The balancer got a single request. EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); // and sent a single response. @@ -542,7 +558,7 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { << " message=" << status.error_message(); EXPECT_EQ(response.message(), kMessage_); } - + balancers_[0]->NotifyDoneWithServerlists(); // The balancer got a single request. EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); // and sent two responses. @@ -609,13 +625,272 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) { << " message=" << status.error_message(); EXPECT_EQ(response.message(), kMessage_); } - + balancers_[0]->NotifyDoneWithServerlists(); // The balancer got a single request. EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); // Check LB policy name for the channel. EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } +class UpdatesTest : public GrpclbEnd2endTest { + public: + UpdatesTest() : GrpclbEnd2endTest(4, 3, 0) {} +}; + +TEST_F(UpdatesTest, UpdateBalancers) { + const std::vector<int> first_backend{GetBackendPorts()[0]}; + const std::vector<int> second_backend{GetBackendPorts()[1]}; + + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0); + ScheduleResponseForBalancer( + 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0), + 0); + + // Start servers and send 10 RPCs per server. + gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); + auto statuses_and_responses = SendRpc(kMessage_, 10); + gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); + + for (const auto& status_and_response : statuses_and_responses) { + EXPECT_TRUE(status_and_response.first.ok()); + EXPECT_EQ(status_and_response.second.message(), kMessage_); + } + // All 10 requests should have gone to the first backend. + EXPECT_EQ(10U, backend_servers_[0].service_->request_count()); + + balancers_[0]->NotifyDoneWithServerlists(); + balancers_[1]->NotifyDoneWithServerlists(); + balancers_[2]->NotifyDoneWithServerlists(); + // Balancer 0 got a single request. + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); + // and sent a single response. + EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); + EXPECT_EQ(0U, balancer_servers_[1].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[1].service_->response_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->response_count()); + + std::vector<AddressData> addresses; + addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""}); + gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 =========="); + SetNextResolution(addresses); + gpr_log(GPR_INFO, "========= UPDATE 1 DONE =========="); + + // Wait until update has been processed, as signaled by the second backend + // receiving a request. + EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); + do { + auto statuses_and_responses = SendRpc(kMessage_, 1); + for (const auto& status_and_response : statuses_and_responses) { + EXPECT_TRUE(status_and_response.first.ok()); + EXPECT_EQ(status_and_response.second.message(), kMessage_); + } + } while (backend_servers_[1].service_->request_count() == 0); + + backend_servers_[1].service_->ResetCounters(); + gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH =========="); + statuses_and_responses = SendRpc(kMessage_, 10); + gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH =========="); + for (const auto& status_and_response : statuses_and_responses) { + EXPECT_TRUE(status_and_response.first.ok()); + EXPECT_EQ(status_and_response.second.message(), kMessage_); + } + // All 10 requests should have gone to the second backend. + EXPECT_EQ(10U, backend_servers_[1].service_->request_count()); + + balancers_[0]->NotifyDoneWithServerlists(); + balancers_[1]->NotifyDoneWithServerlists(); + balancers_[2]->NotifyDoneWithServerlists(); + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); + EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); + EXPECT_EQ(1U, balancer_servers_[1].service_->request_count()); + EXPECT_EQ(1U, balancer_servers_[1].service_->response_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->response_count()); + // Check LB policy name for the channel. + EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); +} + +// Send an update with the same set of LBs as the one in SetUp() in order to +// verify that the LB channel inside grpclb keeps the initial connection (which +// by definition is also present in the update). +TEST_F(UpdatesTest, UpdateBalancersRepeated) { + const std::vector<int> first_backend{GetBackendPorts()[0]}; + const std::vector<int> second_backend{GetBackendPorts()[0]}; + + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0); + ScheduleResponseForBalancer( + 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0), + 0); + + // Start servers and send 10 RPCs per server. + gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); + auto statuses_and_responses = SendRpc(kMessage_, 10); + gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); + + for (const auto& status_and_response : statuses_and_responses) { + EXPECT_TRUE(status_and_response.first.ok()); + EXPECT_EQ(status_and_response.second.message(), kMessage_); + } + // All 10 requests should have gone to the first backend. + EXPECT_EQ(10U, backend_servers_[0].service_->request_count()); + + balancers_[0]->NotifyDoneWithServerlists(); + // Balancer 0 got a single request. + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); + // and sent a single response. + EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); + EXPECT_EQ(0U, balancer_servers_[1].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[1].service_->response_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->response_count()); + + std::vector<AddressData> addresses; + addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""}); + addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""}); + addresses.emplace_back(AddressData{balancer_servers_[2].port_, true, ""}); + gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 =========="); + SetNextResolution(addresses); + gpr_log(GPR_INFO, "========= UPDATE 1 DONE =========="); + + EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); + gpr_timespec deadline = gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(10000, GPR_TIMESPAN)); + // Send 10 seconds worth of RPCs + do { + statuses_and_responses = SendRpc(kMessage_, 1); + for (const auto& status_and_response : statuses_and_responses) { + EXPECT_TRUE(status_and_response.first.ok()); + EXPECT_EQ(status_and_response.second.message(), kMessage_); + } + } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0); + // grpclb continued using the original LB call to the first balancer, which + // doesn't assign the second backend. + EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); + balancers_[0]->NotifyDoneWithServerlists(); + + addresses.clear(); + addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""}); + addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""}); + gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 2 =========="); + SetNextResolution(addresses); + gpr_log(GPR_INFO, "========= UPDATE 2 DONE =========="); + + EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); + deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(10000, GPR_TIMESPAN)); + // Send 10 seconds worth of RPCs + do { + statuses_and_responses = SendRpc(kMessage_, 1); + for (const auto& status_and_response : statuses_and_responses) { + EXPECT_TRUE(status_and_response.first.ok()); + EXPECT_EQ(status_and_response.second.message(), kMessage_); + } + } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0); + // grpclb continued using the original LB call to the first balancer, which + // doesn't assign the second backend. + EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); + balancers_[0]->NotifyDoneWithServerlists(); + + // Check LB policy name for the channel. + EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); +} + +TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { + const std::vector<int> first_backend{GetBackendPorts()[0]}; + const std::vector<int> second_backend{GetBackendPorts()[1]}; + + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0); + ScheduleResponseForBalancer( + 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0), + 0); + + // Start servers and send 10 RPCs per server. + gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); + auto statuses_and_responses = SendRpc(kMessage_, 10); + gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); + for (const auto& status_and_response : statuses_and_responses) { + EXPECT_TRUE(status_and_response.first.ok()); + EXPECT_EQ(status_and_response.second.message(), kMessage_); + } + // All 10 requests should have gone to the first backend. + EXPECT_EQ(10U, backend_servers_[0].service_->request_count()); + + // Kill balancer 0 + gpr_log(GPR_INFO, "********** ABOUT TO KILL BALANCER 0 *************"); + if (balancers_[0]->Shutdown()) balancer_servers_[0].Shutdown(); + gpr_log(GPR_INFO, "********** KILLED BALANCER 0 *************"); + + // This is serviced by the existing RR policy + gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH =========="); + statuses_and_responses = SendRpc(kMessage_, 10); + gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH =========="); + for (const auto& status_and_response : statuses_and_responses) { + EXPECT_TRUE(status_and_response.first.ok()); + EXPECT_EQ(status_and_response.second.message(), kMessage_); + } + // All 10 requests should again have gone to the first backend. + EXPECT_EQ(20U, backend_servers_[0].service_->request_count()); + EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); + + balancers_[0]->NotifyDoneWithServerlists(); + balancers_[1]->NotifyDoneWithServerlists(); + balancers_[2]->NotifyDoneWithServerlists(); + // Balancer 0 got a single request. + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); + // and sent a single response. + EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); + EXPECT_EQ(0U, balancer_servers_[1].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[1].service_->response_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->response_count()); + + std::vector<AddressData> addresses; + addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""}); + gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 =========="); + SetNextResolution(addresses); + gpr_log(GPR_INFO, "========= UPDATE 1 DONE =========="); + + // Wait until update has been processed, as signaled by the second backend + // receiving a request. In the meantime, the client continues to be serviced + // (by the first backend) without interruption. + EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); + do { + auto statuses_and_responses = SendRpc(kMessage_, 1); + for (const auto& status_and_response : statuses_and_responses) { + EXPECT_TRUE(status_and_response.first.ok()); + EXPECT_EQ(status_and_response.second.message(), kMessage_); + } + } while (backend_servers_[1].service_->request_count() == 0); + + // This is serviced by the existing RR policy + backend_servers_[1].service_->ResetCounters(); + gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH =========="); + statuses_and_responses = SendRpc(kMessage_, 10); + gpr_log(GPR_INFO, "========= DONE WITH THIRD BATCH =========="); + for (const auto& status_and_response : statuses_and_responses) { + EXPECT_TRUE(status_and_response.first.ok()); + EXPECT_EQ(status_and_response.second.message(), kMessage_); + } + // All 10 requests should have gone to the second backend. + EXPECT_EQ(10U, backend_servers_[1].service_->request_count()); + + balancers_[0]->NotifyDoneWithServerlists(); + balancers_[1]->NotifyDoneWithServerlists(); + balancers_[2]->NotifyDoneWithServerlists(); + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); + EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); + EXPECT_EQ(1U, balancer_servers_[1].service_->request_count()); + EXPECT_EQ(1U, balancer_servers_[1].service_->response_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->response_count()); + // Check LB policy name for the channel. + EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); +} + TEST_F(SingleBalancerTest, Drop) { const size_t kNumRpcsPerAddress = 100; ScheduleResponseForBalancer( @@ -678,6 +953,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) { EXPECT_EQ(kNumRpcsPerAddress, backend_servers_[i].service_->request_count()); } + balancers_[0]->NotifyDoneWithServerlists(); // The balancer got a single request. EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); // and sent a single response. @@ -723,6 +999,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) { EXPECT_EQ(kNumRpcsPerAddress, backend_servers_[i].service_->request_count()); } + balancers_[0]->NotifyDoneWithServerlists(); // The balancer got a single request. EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); // and sent a single response. @@ -749,7 +1026,6 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) { int main(int argc, char** argv) { grpc_init(); grpc_test_init(argc, argv); - grpc_fake_resolver_init(); ::testing::InitGoogleTest(&argc, argv); const auto result = RUN_ALL_TESTS(); grpc_shutdown(); |