diff options
author | 2017-08-31 10:54:11 -0700 | |
---|---|---|
committer | 2017-08-31 10:54:11 -0700 | |
commit | 8fd40d5ed9303c693865ff431b7565173a4b898e (patch) | |
tree | 5d3f6ce958de74e79a8bbaf71b7eb8bbd8acfecd /test/cpp/end2end | |
parent | c9c78ee96c5a73234513decd398a63d48f14aa89 (diff) | |
parent | 8d51e8d17e012f81ca8e94c18f525e1781130481 (diff) |
Merge github.com:grpc/grpc into wc
Diffstat (limited to 'test/cpp/end2end')
-rw-r--r-- | test/cpp/end2end/BUILD | 10 | ||||
-rw-r--r-- | test/cpp/end2end/client_lb_end2end_test.cc | 108 | ||||
-rw-r--r-- | test/cpp/end2end/end2end_test.cc | 8 | ||||
-rw-r--r-- | test/cpp/end2end/grpclb_end2end_test.cc | 223 |
4 files changed, 250 insertions, 99 deletions
diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index 27c5492c17..b8505c1ae7 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -14,15 +14,9 @@ licenses(["notice"]) # Apache v2 -load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test") +load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package") -package( - default_visibility=["//visibility:public"], # Allows external users to implement end2end tests. - features = [ - "-layering_check", - "-parse_headers", - ], -) +grpc_package(name = "test/cpp/end2end", visibility = "public") # Allows external users to implement end2end tests. grpc_cc_library( name = "test_service_impl", diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index e1160ecdc6..b588eda84f 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -85,7 +85,8 @@ class MyTestServiceImpl : public TestServiceImpl { class ClientLbEnd2endTest : public ::testing::Test { protected: - ClientLbEnd2endTest() : server_host_("localhost") {} + ClientLbEnd2endTest() + : server_host_("localhost"), kRequestMessage_("Live long and prosper.") {} void SetUp() override { response_generator_ = grpc_fake_resolver_response_generator_create(); @@ -139,6 +140,7 @@ class ClientLbEnd2endTest : public ::testing::Test { } // else, default to pick first args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, response_generator_); + args.SetInt("grpc.testing.fixed_reconnect_backoff_ms", 2000); std::ostringstream uri; uri << "fake:///"; for (size_t i = 0; i < servers_.size() - 1; ++i) { @@ -150,18 +152,27 @@ class ClientLbEnd2endTest : public ::testing::Test { stub_ = grpc::testing::EchoTestService::NewStub(channel_); } - void SendRpc(bool expect_ok = true) { + Status SendRpc(EchoResponse* response = nullptr) { + const bool local_response = (response == nullptr); + if (local_response) response = new EchoResponse; EchoRequest request; - EchoResponse response; - request.set_message("Live long and prosper."); + request.set_message(kRequestMessage_); ClientContext context; - Status status = stub_->Echo(&context, request, &response); - if (expect_ok) { - EXPECT_TRUE(status.ok()); - EXPECT_EQ(response.message(), request.message()); - } else { - EXPECT_FALSE(status.ok()); - } + Status status = stub_->Echo(&context, request, response); + if (local_response) delete response; + return status; + } + + void CheckRpcSendOk() { + EchoResponse response; + const Status status = SendRpc(&response); + EXPECT_TRUE(status.ok()); + EXPECT_EQ(response.message(), kRequestMessage_); + } + + void CheckRpcSendFailure() { + const Status status = SendRpc(); + EXPECT_FALSE(status.ok()); } struct ServerData { @@ -207,7 +218,7 @@ class ClientLbEnd2endTest : public ::testing::Test { void WaitForServer(size_t server_idx) { do { - SendRpc(); + CheckRpcSendOk(); } while (servers_[server_idx]->service_.request_count() == 0); ResetCounters(); } @@ -217,6 +228,7 @@ class ClientLbEnd2endTest : public ::testing::Test { std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; std::vector<std::unique_ptr<ServerData>> servers_; grpc_fake_resolver_response_generator* response_generator_; + const grpc::string kRequestMessage_; }; TEST_F(ClientLbEnd2endTest, PickFirst) { @@ -230,7 +242,7 @@ TEST_F(ClientLbEnd2endTest, PickFirst) { } SetNextResolution(ports); for (size_t i = 0; i < servers_.size(); ++i) { - SendRpc(); + CheckRpcSendOk(); } // All requests should have gone to a single server. bool found = false; @@ -258,7 +270,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdates) { ports.emplace_back(servers_[0]->port_); SetNextResolution(ports); gpr_log(GPR_INFO, "****** SET [0] *******"); - SendRpc(); + CheckRpcSendOk(); EXPECT_EQ(servers_[0]->service_.request_count(), 1); // An empty update will result in the channel going into TRANSIENT_FAILURE. @@ -304,7 +316,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) { ports.emplace_back(servers_[0]->port_); SetNextResolution(ports); gpr_log(GPR_INFO, "****** SET [0] *******"); - SendRpc(); + CheckRpcSendOk(); EXPECT_EQ(servers_[0]->service_.request_count(), 1); servers_[0]->service_.ResetCounters(); @@ -314,7 +326,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) { ports.emplace_back(servers_[0]->port_); SetNextResolution(ports); gpr_log(GPR_INFO, "****** SET superset *******"); - SendRpc(); + CheckRpcSendOk(); // We stick to the previously connected server. WaitForServer(0); EXPECT_EQ(0, servers_[1]->service_.request_count()); @@ -338,7 +350,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) { for (size_t i = 0; i < 1000; ++i) { std::random_shuffle(ports.begin(), ports.end()); SetNextResolution(ports); - if (i % 10 == 0) SendRpc(); + if (i % 10 == 0) CheckRpcSendOk(); } } // Check LB policy name for the channel. @@ -356,7 +368,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobin) { } SetNextResolution(ports); for (size_t i = 0; i < servers_.size(); ++i) { - SendRpc(); + CheckRpcSendOk(); } // One request should have gone to each server. for (size_t i = 0; i < servers_.size(); ++i) { @@ -378,7 +390,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { SetNextResolution(ports); WaitForServer(0); // Send RPCs. They should all go servers_[0] - for (size_t i = 0; i < 10; ++i) SendRpc(); + for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(); EXPECT_EQ(10, servers_[0]->service_.request_count()); EXPECT_EQ(0, servers_[1]->service_.request_count()); EXPECT_EQ(0, servers_[2]->service_.request_count()); @@ -394,7 +406,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { EXPECT_EQ(0, servers_[1]->service_.request_count()); WaitForServer(1); - for (size_t i = 0; i < 10; ++i) SendRpc(); + for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(); EXPECT_EQ(0, servers_[0]->service_.request_count()); EXPECT_EQ(10, servers_[1]->service_.request_count()); EXPECT_EQ(0, servers_[2]->service_.request_count()); @@ -406,7 +418,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { SetNextResolution(ports); WaitForServer(2); - for (size_t i = 0; i < 10; ++i) SendRpc(); + for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(); EXPECT_EQ(0, servers_[0]->service_.request_count()); EXPECT_EQ(0, servers_[1]->service_.request_count()); EXPECT_EQ(10, servers_[2]->service_.request_count()); @@ -423,7 +435,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { WaitForServer(2); // Send three RPCs, one per server. - for (size_t i = 0; i < 3; ++i) SendRpc(); + for (size_t i = 0; i < 3; ++i) CheckRpcSendOk(); EXPECT_EQ(1, servers_[0]->service_.request_count()); EXPECT_EQ(1, servers_[1]->service_.request_count()); EXPECT_EQ(1, servers_[2]->service_.request_count()); @@ -450,6 +462,37 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName()); } +TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) { + const int kNumServers = 3; + StartServers(kNumServers); + ResetStub("round_robin"); + std::vector<int> ports; + + // Start with a single server. + ports.emplace_back(servers_[0]->port_); + SetNextResolution(ports); + WaitForServer(0); + // Send RPCs. They should all go to servers_[0] + for (size_t i = 0; i < 10; ++i) SendRpc(); + EXPECT_EQ(10, servers_[0]->service_.request_count()); + EXPECT_EQ(0, servers_[1]->service_.request_count()); + EXPECT_EQ(0, servers_[2]->service_.request_count()); + servers_[0]->service_.ResetCounters(); + + // Shutdown one of the servers to be sent in the update. + servers_[1]->Shutdown(false); + ports.emplace_back(servers_[1]->port_); + ports.emplace_back(servers_[2]->port_); + SetNextResolution(ports); + WaitForServer(0); + WaitForServer(2); + + // Send three RPCs, one per server. + for (size_t i = 0; i < kNumServers; ++i) SendRpc(); + // The server in shutdown shouldn't receive any. + EXPECT_EQ(0, servers_[1]->service_.request_count()); +} + TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) { // Start servers and send one RPC per server. const int kNumServers = 3; @@ -462,7 +505,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) { for (size_t i = 0; i < 1000; ++i) { std::random_shuffle(ports.begin(), ports.end()); SetNextResolution(ports); - if (i % 10 == 0) SendRpc(); + if (i % 10 == 0) CheckRpcSendOk(); } // Check LB policy name for the channel. EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName()); @@ -473,11 +516,13 @@ TEST_F(ClientLbEnd2endTest, RoundRobinConcurrentUpdates) { // update provisions of RR. } -TEST_F(ClientLbEnd2endTest, RoundRobinReconnect) { +TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) { // Start servers and send one RPC per server. - const int kNumServers = 1; + const int kNumServers = 3; std::vector<int> ports; - ports.push_back(grpc_pick_unused_port_or_die()); + for (int i = 0; i < kNumServers; ++i) { + ports.push_back(grpc_pick_unused_port_or_die()); + } StartServers(kNumServers, ports); ResetStub("round_robin"); SetNextResolution(ports); @@ -486,24 +531,19 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReconnect) { // state READY in the order in which the addresses are specified, // which is only true because the backends are all local. for (size_t i = 0; i < servers_.size(); ++i) { - SendRpc(); + CheckRpcSendOk(); EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i; } - // Check LB policy name for the channel. - EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName()); - // Kill all servers for (size_t i = 0; i < servers_.size(); ++i) { servers_[i]->Shutdown(false); } // Client request should fail. - SendRpc(false); - + CheckRpcSendFailure(); // Bring servers back up on the same port (we aren't recreating the channel). StartServers(kNumServers, ports); - // Client request should succeed. - SendRpc(); + CheckRpcSendOk(); } } // namespace diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 8d12971bc1..8bada48a2b 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -1565,7 +1565,9 @@ TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginFailure) { Status s = stub_->Echo(&context, request, &response); EXPECT_FALSE(s.ok()); EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED); - EXPECT_EQ(s.error_message(), kTestCredsPluginErrorMsg); + EXPECT_EQ(s.error_message(), + grpc::string("Getting metadata from plugin failed with error: ") + + kTestCredsPluginErrorMsg); } TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorSuccess) { @@ -1624,7 +1626,9 @@ TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginFailure) { Status s = stub_->Echo(&context, request, &response); EXPECT_FALSE(s.ok()); EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED); - EXPECT_EQ(s.error_message(), kTestCredsPluginErrorMsg); + EXPECT_EQ(s.error_message(), + grpc::string("Getting metadata from plugin failed with error: ") + + kTestCredsPluginErrorMsg); } TEST_P(SecureEnd2endTest, ClientAuthContext) { diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index 86cce2d30d..b5cff664f6 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -45,6 +45,7 @@ extern "C" { #include "src/proto/grpc/lb/v1/load_balancer.grpc.pb.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" +#include <gmock/gmock.h> #include <gtest/gtest.h> // TODO(dgq): Other scenarios in need of testing: @@ -73,8 +74,8 @@ extern "C" { using std::chrono::system_clock; -using grpc::lb::v1::LoadBalanceResponse; using grpc::lb::v1::LoadBalanceRequest; +using grpc::lb::v1::LoadBalanceResponse; using grpc::lb::v1::LoadBalancer; namespace grpc { @@ -131,6 +132,19 @@ class BackendServiceImpl : public BackendService { IncreaseResponseCount(); return status; } + + // Returns true on its first invocation, false otherwise. + bool Shutdown() { + std::unique_lock<std::mutex> lock(mu_); + const bool prev = !shutdown_; + shutdown_ = true; + gpr_log(GPR_INFO, "Backend: shut down"); + return prev; + } + + private: + std::mutex mu_; + bool shutdown_ = false; }; grpc::string Ip4ToPackedString(const char* ip_str) { @@ -142,22 +156,20 @@ grpc::string Ip4ToPackedString(const char* ip_str) { struct ClientStats { size_t num_calls_started = 0; size_t num_calls_finished = 0; - size_t num_calls_finished_with_drop_for_rate_limiting = 0; - size_t num_calls_finished_with_drop_for_load_balancing = 0; size_t num_calls_finished_with_client_failed_to_send = 0; size_t num_calls_finished_known_received = 0; + std::map<grpc::string, size_t> drop_token_counts; ClientStats& operator+=(const ClientStats& other) { num_calls_started += other.num_calls_started; num_calls_finished += other.num_calls_finished; - num_calls_finished_with_drop_for_rate_limiting += - other.num_calls_finished_with_drop_for_rate_limiting; - num_calls_finished_with_drop_for_load_balancing += - other.num_calls_finished_with_drop_for_load_balancing; num_calls_finished_with_client_failed_to_send += other.num_calls_finished_with_client_failed_to_send; num_calls_finished_known_received += other.num_calls_finished_known_received; + for (const auto& p : other.drop_token_counts) { + drop_token_counts[p.first] += p.second; + } return *this; } }; @@ -173,11 +185,12 @@ class BalancerServiceImpl : public BalancerService { shutdown_(false) {} Status BalanceLoad(ServerContext* context, Stream* stream) override { - gpr_log(GPR_INFO, "LB: BalanceLoad"); + gpr_log(GPR_INFO, "LB[%p]: BalanceLoad", this); LoadBalanceRequest request; stream->Read(&request); IncreaseRequestCount(); - gpr_log(GPR_INFO, "LB: recv msg '%s'", request.DebugString().c_str()); + gpr_log(GPR_INFO, "LB[%p]: recv msg '%s'", this, + request.DebugString().c_str()); if (client_load_reporting_interval_seconds_ > 0) { LoadBalanceResponse initial_response; @@ -202,13 +215,14 @@ class BalancerServiceImpl : public BalancerService { { std::unique_lock<std::mutex> lock(mu_); if (shutdown_) goto done; - serverlist_cond_.wait(lock); + serverlist_cond_.wait(lock, [this] { return serverlist_ready_; }); + serverlist_ready_ = false; } if (client_load_reporting_interval_seconds_ > 0) { request.Clear(); stream->Read(&request); - gpr_log(GPR_INFO, "LB: recv client load report msg: '%s'", + gpr_log(GPR_INFO, "LB[%p]: recv client load report msg: '%s'", this, request.DebugString().c_str()); GPR_ASSERT(request.has_client_stats()); // We need to acquire the lock here in order to prevent the notify_one @@ -218,21 +232,22 @@ class BalancerServiceImpl : public BalancerService { request.client_stats().num_calls_started(); client_stats_.num_calls_finished += request.client_stats().num_calls_finished(); - client_stats_.num_calls_finished_with_drop_for_rate_limiting += - request.client_stats() - .num_calls_finished_with_drop_for_rate_limiting(); - client_stats_.num_calls_finished_with_drop_for_load_balancing += - request.client_stats() - .num_calls_finished_with_drop_for_load_balancing(); client_stats_.num_calls_finished_with_client_failed_to_send += request.client_stats() .num_calls_finished_with_client_failed_to_send(); client_stats_.num_calls_finished_known_received += request.client_stats().num_calls_finished_known_received(); + for (const auto& drop_token_count : + request.client_stats().calls_finished_with_drop()) { + client_stats_ + .drop_token_counts[drop_token_count.load_balance_token()] += + drop_token_count.num_calls(); + } + load_report_ready_ = true; load_report_cond_.notify_one(); } done: - gpr_log(GPR_INFO, "LB: done"); + gpr_log(GPR_INFO, "LB[%p]: done", this); return Status::OK; } @@ -247,21 +262,20 @@ class BalancerServiceImpl : public BalancerService { std::unique_lock<std::mutex> lock(mu_); const bool prev = !shutdown_; shutdown_ = true; - gpr_log(GPR_INFO, "LB: shut down"); + gpr_log(GPR_INFO, "LB[%p]: shut down", this); return prev; } static LoadBalanceResponse BuildResponseForBackends( - const std::vector<int>& backend_ports, int num_drops_for_rate_limiting, - int num_drops_for_load_balancing) { + const std::vector<int>& backend_ports, + const std::map<grpc::string, size_t>& drop_token_counts) { LoadBalanceResponse response; - for (int i = 0; i < num_drops_for_rate_limiting; ++i) { - auto* server = response.mutable_server_list()->add_servers(); - server->set_drop_for_rate_limiting(true); - } - for (int i = 0; i < num_drops_for_load_balancing; ++i) { - auto* server = response.mutable_server_list()->add_servers(); - server->set_drop_for_load_balancing(true); + for (const auto& drop_token_count : drop_token_counts) { + for (size_t i = 0; i < drop_token_count.second; ++i) { + auto* server = response.mutable_server_list()->add_servers(); + server->set_drop(true); + server->set_load_balance_token(drop_token_count.first); + } } for (const int& backend_port : backend_ports) { auto* server = response.mutable_server_list()->add_servers(); @@ -273,25 +287,27 @@ class BalancerServiceImpl : public BalancerService { const ClientStats& WaitForLoadReport() { std::unique_lock<std::mutex> lock(mu_); - load_report_cond_.wait(lock); + load_report_cond_.wait(lock, [this] { return load_report_ready_; }); + load_report_ready_ = false; return client_stats_; } void NotifyDoneWithServerlists() { std::lock_guard<std::mutex> lock(mu_); + serverlist_ready_ = true; serverlist_cond_.notify_one(); } private: void SendResponse(Stream* stream, const LoadBalanceResponse& response, int delay_ms) { - gpr_log(GPR_INFO, "LB: sleeping for %d ms...", delay_ms); + gpr_log(GPR_INFO, "LB[%p]: sleeping for %d ms...", this, delay_ms); if (delay_ms > 0) { gpr_sleep_until( gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(delay_ms, GPR_TIMESPAN))); } - gpr_log(GPR_INFO, "LB: Woke up! Sending response '%s'", + gpr_log(GPR_INFO, "LB[%p]: Woke up! Sending response '%s'", this, response.DebugString().c_str()); IncreaseResponseCount(); stream->Write(response); @@ -301,7 +317,9 @@ class BalancerServiceImpl : public BalancerService { std::vector<ResponseDelayPair> responses_and_delays_; std::mutex mu_; std::condition_variable load_report_cond_; + bool load_report_ready_ = false; std::condition_variable serverlist_cond_; + bool serverlist_ready_ = false; ClientStats client_stats_; bool shutdown_; }; @@ -341,7 +359,7 @@ class GrpclbEnd2endTest : public ::testing::Test { void TearDown() override { for (size_t i = 0; i < backends_.size(); ++i) { - backend_servers_[i].Shutdown(); + if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown(); } for (size_t i = 0; i < balancers_.size(); ++i) { if (balancers_[i]->Shutdown()) balancer_servers_[i].Shutdown(); @@ -499,7 +517,7 @@ class SingleBalancerTest : public GrpclbEnd2endTest { TEST_F(SingleBalancerTest, Vanilla) { const size_t kNumRpcsPerAddress = 100; ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0), + 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), 0); // Make sure that trying to connect works without a call. channel_->GetState(true /* try_to_connect */); @@ -538,7 +556,7 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { ScheduleResponseForBalancer(0, LoadBalanceResponse(), 0); // Send non-empty serverlist only after kServerlistDelayMs ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0), + 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), kServerlistDelayMs); const auto t0 = system_clock::now(); @@ -580,11 +598,11 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) { // Send a serverlist right away. ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0), + 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), 0); // ... and the same one a bit later. ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0), + 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), kServerlistDelayMs); // Send num_backends/2 requests. @@ -639,6 +657,61 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) { EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } +TEST_F(SingleBalancerTest, BackendsRestart) { + const size_t kNumRpcsPerAddress = 100; + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), + 0); + // Make sure that trying to connect works without a call. + channel_->GetState(true /* try_to_connect */); + // Send 100 RPCs per server. + auto statuses_and_responses = + SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_); + for (const auto& status_and_response : statuses_and_responses) { + const Status& status = status_and_response.first; + const EchoResponse& response = status_and_response.second; + EXPECT_TRUE(status.ok()) << "code=" << status.error_code() + << " message=" << status.error_message(); + EXPECT_EQ(response.message(), kMessage_); + } + // Each backend should have gotten 100 requests. + for (size_t i = 0; i < backends_.size(); ++i) { + EXPECT_EQ(kNumRpcsPerAddress, + backend_servers_[i].service_->request_count()); + } + balancers_[0]->NotifyDoneWithServerlists(); + // The balancer got a single request. + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); + // and sent a single response. + EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); + for (size_t i = 0; i < backends_.size(); ++i) { + if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown(); + } + statuses_and_responses = SendRpc(kMessage_, 1); + for (const auto& status_and_response : statuses_and_responses) { + const Status& status = status_and_response.first; + EXPECT_FALSE(status.ok()); + } + for (size_t i = 0; i < num_backends_; ++i) { + backends_.emplace_back(new BackendServiceImpl()); + backend_servers_.emplace_back(ServerThread<BackendService>( + "backend", server_host_, backends_.back().get())); + } + // The following RPC will fail due to the backend ports having changed. It + // will nonetheless exercise the grpclb-roundrobin handling of the RR policy + // having gone into shutdown. + // TODO(dgq): implement the "backend restart" component as well. We need extra + // machinery to either update the LB responses "on the fly" or instruct + // backends which ports to restart on. + statuses_and_responses = SendRpc(kMessage_, 1); + for (const auto& status_and_response : statuses_and_responses) { + const Status& status = status_and_response.first; + EXPECT_FALSE(status.ok()); + } + // Check LB policy name for the channel. + EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); +} + class UpdatesTest : public GrpclbEnd2endTest { public: UpdatesTest() : GrpclbEnd2endTest(4, 3, 0) {} @@ -647,12 +720,10 @@ class UpdatesTest : public GrpclbEnd2endTest { TEST_F(UpdatesTest, UpdateBalancers) { const std::vector<int> first_backend{GetBackendPorts()[0]}; const std::vector<int> second_backend{GetBackendPorts()[1]}; - ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0); + 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0); ScheduleResponseForBalancer( - 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0), - 0); + 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0); // Start servers and send 10 RPCs per server. gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); @@ -727,10 +798,9 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) { const std::vector<int> second_backend{GetBackendPorts()[0]}; ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0); + 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0); ScheduleResponseForBalancer( - 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0), - 0); + 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0); // Start servers and send 10 RPCs per server. gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); @@ -810,10 +880,9 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { const std::vector<int> second_backend{GetBackendPorts()[1]}; ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0); + 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0); ScheduleResponseForBalancer( - 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0), - 0); + 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0); // Start servers and send 10 RPCs per server. gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); @@ -902,7 +971,8 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { TEST_F(SingleBalancerTest, Drop) { const size_t kNumRpcsPerAddress = 100; ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 1, 2), + 0, BalancerServiceImpl::BuildResponseForBackends( + GetBackendPorts(), {{"rate_limiting", 1}, {"load_balancing", 2}}), 0); // Send 100 RPCs for each server and drop address. const auto& statuses_and_responses = @@ -934,6 +1004,49 @@ TEST_F(SingleBalancerTest, Drop) { EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); } +TEST_F(SingleBalancerTest, DropAllFirst) { + // All registered addresses are marked as "drop". + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends( + {}, {{"rate_limiting", 1}, {"load_balancing", 1}}), + 0); + const auto& statuses_and_responses = SendRpc(kMessage_, 1); + for (const auto& status_and_response : statuses_and_responses) { + const Status& status = status_and_response.first; + EXPECT_FALSE(status.ok()); + EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy"); + } +} + +TEST_F(SingleBalancerTest, DropAll) { + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), + 0); + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends( + {}, {{"rate_limiting", 1}, {"load_balancing", 1}}), + 1000); + + // First call succeeds. + auto statuses_and_responses = SendRpc(kMessage_, 1); + for (const auto& status_and_response : statuses_and_responses) { + const Status& status = status_and_response.first; + const EchoResponse& response = status_and_response.second; + EXPECT_TRUE(status.ok()) << "code=" << status.error_code() + << " message=" << status.error_message(); + EXPECT_EQ(response.message(), kMessage_); + } + // But eventually, the update with only dropped servers is processed and calls + // fail. + do { + statuses_and_responses = SendRpc(kMessage_, 1); + ASSERT_EQ(statuses_and_responses.size(), 1UL); + } while (statuses_and_responses[0].first.ok()); + const Status& status = statuses_and_responses[0].first; + EXPECT_FALSE(status.ok()); + EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy"); +} + class SingleBalancerWithClientLoadReportingTest : public GrpclbEnd2endTest { public: SingleBalancerWithClientLoadReportingTest() : GrpclbEnd2endTest(4, 1, 2) {} @@ -942,7 +1055,7 @@ class SingleBalancerWithClientLoadReportingTest : public GrpclbEnd2endTest { TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) { const size_t kNumRpcsPerAddress = 100; ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0), + 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), 0); // Send 100 RPCs per server. const auto& statuses_and_responses = @@ -971,17 +1084,17 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) { EXPECT_EQ(kNumRpcsPerAddress * num_backends_, client_stats.num_calls_started); EXPECT_EQ(kNumRpcsPerAddress * num_backends_, client_stats.num_calls_finished); - EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_rate_limiting); - EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_load_balancing); EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send); EXPECT_EQ(kNumRpcsPerAddress * num_backends_, client_stats.num_calls_finished_known_received); + EXPECT_THAT(client_stats.drop_token_counts, ::testing::ElementsAre()); } TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) { const size_t kNumRpcsPerAddress = 3; ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 2, 1), + 0, BalancerServiceImpl::BuildResponseForBackends( + GetBackendPorts(), {{"rate_limiting", 2}, {"load_balancing", 1}}), 0); // Send 100 RPCs for each server and drop address. const auto& statuses_and_responses = @@ -1018,13 +1131,13 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) { client_stats.num_calls_started); EXPECT_EQ(kNumRpcsPerAddress * (num_backends_ + 3), client_stats.num_calls_finished); - EXPECT_EQ(kNumRpcsPerAddress * 2, - client_stats.num_calls_finished_with_drop_for_rate_limiting); - EXPECT_EQ(kNumRpcsPerAddress, - client_stats.num_calls_finished_with_drop_for_load_balancing); EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send); EXPECT_EQ(kNumRpcsPerAddress * num_backends_, client_stats.num_calls_finished_known_received); + EXPECT_THAT(client_stats.drop_token_counts, + ::testing::ElementsAre( + ::testing::Pair("load_balancing", kNumRpcsPerAddress), + ::testing::Pair("rate_limiting", kNumRpcsPerAddress * 2))); } } // namespace |