aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/end2end
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-08-31 10:54:11 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-08-31 10:54:11 -0700
commit8fd40d5ed9303c693865ff431b7565173a4b898e (patch)
tree5d3f6ce958de74e79a8bbaf71b7eb8bbd8acfecd /test/cpp/end2end
parentc9c78ee96c5a73234513decd398a63d48f14aa89 (diff)
parent8d51e8d17e012f81ca8e94c18f525e1781130481 (diff)
Merge github.com:grpc/grpc into wc
Diffstat (limited to 'test/cpp/end2end')
-rw-r--r--test/cpp/end2end/BUILD10
-rw-r--r--test/cpp/end2end/client_lb_end2end_test.cc108
-rw-r--r--test/cpp/end2end/end2end_test.cc8
-rw-r--r--test/cpp/end2end/grpclb_end2end_test.cc223
4 files changed, 250 insertions, 99 deletions
diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD
index 27c5492c17..b8505c1ae7 100644
--- a/test/cpp/end2end/BUILD
+++ b/test/cpp/end2end/BUILD
@@ -14,15 +14,9 @@
licenses(["notice"]) # Apache v2
-load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test")
+load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package")
-package(
- default_visibility=["//visibility:public"], # Allows external users to implement end2end tests.
- features = [
- "-layering_check",
- "-parse_headers",
- ],
-)
+grpc_package(name = "test/cpp/end2end", visibility = "public") # Allows external users to implement end2end tests.
grpc_cc_library(
name = "test_service_impl",
diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc
index e1160ecdc6..b588eda84f 100644
--- a/test/cpp/end2end/client_lb_end2end_test.cc
+++ b/test/cpp/end2end/client_lb_end2end_test.cc
@@ -85,7 +85,8 @@ class MyTestServiceImpl : public TestServiceImpl {
class ClientLbEnd2endTest : public ::testing::Test {
protected:
- ClientLbEnd2endTest() : server_host_("localhost") {}
+ ClientLbEnd2endTest()
+ : server_host_("localhost"), kRequestMessage_("Live long and prosper.") {}
void SetUp() override {
response_generator_ = grpc_fake_resolver_response_generator_create();
@@ -139,6 +140,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
} // else, default to pick first
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
response_generator_);
+ args.SetInt("grpc.testing.fixed_reconnect_backoff_ms", 2000);
std::ostringstream uri;
uri << "fake:///";
for (size_t i = 0; i < servers_.size() - 1; ++i) {
@@ -150,18 +152,27 @@ class ClientLbEnd2endTest : public ::testing::Test {
stub_ = grpc::testing::EchoTestService::NewStub(channel_);
}
- void SendRpc(bool expect_ok = true) {
+ Status SendRpc(EchoResponse* response = nullptr) {
+ const bool local_response = (response == nullptr);
+ if (local_response) response = new EchoResponse;
EchoRequest request;
- EchoResponse response;
- request.set_message("Live long and prosper.");
+ request.set_message(kRequestMessage_);
ClientContext context;
- Status status = stub_->Echo(&context, request, &response);
- if (expect_ok) {
- EXPECT_TRUE(status.ok());
- EXPECT_EQ(response.message(), request.message());
- } else {
- EXPECT_FALSE(status.ok());
- }
+ Status status = stub_->Echo(&context, request, response);
+ if (local_response) delete response;
+ return status;
+ }
+
+ void CheckRpcSendOk() {
+ EchoResponse response;
+ const Status status = SendRpc(&response);
+ EXPECT_TRUE(status.ok());
+ EXPECT_EQ(response.message(), kRequestMessage_);
+ }
+
+ void CheckRpcSendFailure() {
+ const Status status = SendRpc();
+ EXPECT_FALSE(status.ok());
}
struct ServerData {
@@ -207,7 +218,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
void WaitForServer(size_t server_idx) {
do {
- SendRpc();
+ CheckRpcSendOk();
} while (servers_[server_idx]->service_.request_count() == 0);
ResetCounters();
}
@@ -217,6 +228,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::vector<std::unique_ptr<ServerData>> servers_;
grpc_fake_resolver_response_generator* response_generator_;
+ const grpc::string kRequestMessage_;
};
TEST_F(ClientLbEnd2endTest, PickFirst) {
@@ -230,7 +242,7 @@ TEST_F(ClientLbEnd2endTest, PickFirst) {
}
SetNextResolution(ports);
for (size_t i = 0; i < servers_.size(); ++i) {
- SendRpc();
+ CheckRpcSendOk();
}
// All requests should have gone to a single server.
bool found = false;
@@ -258,7 +270,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdates) {
ports.emplace_back(servers_[0]->port_);
SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET [0] *******");
- SendRpc();
+ CheckRpcSendOk();
EXPECT_EQ(servers_[0]->service_.request_count(), 1);
// An empty update will result in the channel going into TRANSIENT_FAILURE.
@@ -304,7 +316,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) {
ports.emplace_back(servers_[0]->port_);
SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET [0] *******");
- SendRpc();
+ CheckRpcSendOk();
EXPECT_EQ(servers_[0]->service_.request_count(), 1);
servers_[0]->service_.ResetCounters();
@@ -314,7 +326,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) {
ports.emplace_back(servers_[0]->port_);
SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET superset *******");
- SendRpc();
+ CheckRpcSendOk();
// We stick to the previously connected server.
WaitForServer(0);
EXPECT_EQ(0, servers_[1]->service_.request_count());
@@ -338,7 +350,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) {
for (size_t i = 0; i < 1000; ++i) {
std::random_shuffle(ports.begin(), ports.end());
SetNextResolution(ports);
- if (i % 10 == 0) SendRpc();
+ if (i % 10 == 0) CheckRpcSendOk();
}
}
// Check LB policy name for the channel.
@@ -356,7 +368,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobin) {
}
SetNextResolution(ports);
for (size_t i = 0; i < servers_.size(); ++i) {
- SendRpc();
+ CheckRpcSendOk();
}
// One request should have gone to each server.
for (size_t i = 0; i < servers_.size(); ++i) {
@@ -378,7 +390,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
SetNextResolution(ports);
WaitForServer(0);
  // Send RPCs. They should all go to servers_[0]
- for (size_t i = 0; i < 10; ++i) SendRpc();
+ for (size_t i = 0; i < 10; ++i) CheckRpcSendOk();
EXPECT_EQ(10, servers_[0]->service_.request_count());
EXPECT_EQ(0, servers_[1]->service_.request_count());
EXPECT_EQ(0, servers_[2]->service_.request_count());
@@ -394,7 +406,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
EXPECT_EQ(0, servers_[1]->service_.request_count());
WaitForServer(1);
- for (size_t i = 0; i < 10; ++i) SendRpc();
+ for (size_t i = 0; i < 10; ++i) CheckRpcSendOk();
EXPECT_EQ(0, servers_[0]->service_.request_count());
EXPECT_EQ(10, servers_[1]->service_.request_count());
EXPECT_EQ(0, servers_[2]->service_.request_count());
@@ -406,7 +418,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
SetNextResolution(ports);
WaitForServer(2);
- for (size_t i = 0; i < 10; ++i) SendRpc();
+ for (size_t i = 0; i < 10; ++i) CheckRpcSendOk();
EXPECT_EQ(0, servers_[0]->service_.request_count());
EXPECT_EQ(0, servers_[1]->service_.request_count());
EXPECT_EQ(10, servers_[2]->service_.request_count());
@@ -423,7 +435,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
WaitForServer(2);
// Send three RPCs, one per server.
- for (size_t i = 0; i < 3; ++i) SendRpc();
+ for (size_t i = 0; i < 3; ++i) CheckRpcSendOk();
EXPECT_EQ(1, servers_[0]->service_.request_count());
EXPECT_EQ(1, servers_[1]->service_.request_count());
EXPECT_EQ(1, servers_[2]->service_.request_count());
@@ -450,6 +462,37 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
}
+TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) {
+ const int kNumServers = 3;
+ StartServers(kNumServers);
+ ResetStub("round_robin");
+ std::vector<int> ports;
+
+ // Start with a single server.
+ ports.emplace_back(servers_[0]->port_);
+ SetNextResolution(ports);
+ WaitForServer(0);
+ // Send RPCs. They should all go to servers_[0]
+ for (size_t i = 0; i < 10; ++i) SendRpc();
+ EXPECT_EQ(10, servers_[0]->service_.request_count());
+ EXPECT_EQ(0, servers_[1]->service_.request_count());
+ EXPECT_EQ(0, servers_[2]->service_.request_count());
+ servers_[0]->service_.ResetCounters();
+
+ // Shutdown one of the servers to be sent in the update.
+ servers_[1]->Shutdown(false);
+ ports.emplace_back(servers_[1]->port_);
+ ports.emplace_back(servers_[2]->port_);
+ SetNextResolution(ports);
+ WaitForServer(0);
+ WaitForServer(2);
+
+ // Send three RPCs, one per server.
+ for (size_t i = 0; i < kNumServers; ++i) SendRpc();
+ // The server in shutdown shouldn't receive any.
+ EXPECT_EQ(0, servers_[1]->service_.request_count());
+}
+
TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
@@ -462,7 +505,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) {
for (size_t i = 0; i < 1000; ++i) {
std::random_shuffle(ports.begin(), ports.end());
SetNextResolution(ports);
- if (i % 10 == 0) SendRpc();
+ if (i % 10 == 0) CheckRpcSendOk();
}
// Check LB policy name for the channel.
EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
@@ -473,11 +516,13 @@ TEST_F(ClientLbEnd2endTest, RoundRobinConcurrentUpdates) {
// update provisions of RR.
}
-TEST_F(ClientLbEnd2endTest, RoundRobinReconnect) {
+TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
// Start servers and send one RPC per server.
- const int kNumServers = 1;
+ const int kNumServers = 3;
std::vector<int> ports;
- ports.push_back(grpc_pick_unused_port_or_die());
+ for (int i = 0; i < kNumServers; ++i) {
+ ports.push_back(grpc_pick_unused_port_or_die());
+ }
StartServers(kNumServers, ports);
ResetStub("round_robin");
SetNextResolution(ports);
@@ -486,24 +531,19 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReconnect) {
// state READY in the order in which the addresses are specified,
// which is only true because the backends are all local.
for (size_t i = 0; i < servers_.size(); ++i) {
- SendRpc();
+ CheckRpcSendOk();
EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i;
}
- // Check LB policy name for the channel.
- EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
-
// Kill all servers
for (size_t i = 0; i < servers_.size(); ++i) {
servers_[i]->Shutdown(false);
}
// Client request should fail.
- SendRpc(false);
-
+ CheckRpcSendFailure();
// Bring servers back up on the same port (we aren't recreating the channel).
StartServers(kNumServers, ports);
-
// Client request should succeed.
- SendRpc();
+ CheckRpcSendOk();
}
} // namespace
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 8d12971bc1..8bada48a2b 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -1565,7 +1565,9 @@ TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginFailure) {
Status s = stub_->Echo(&context, request, &response);
EXPECT_FALSE(s.ok());
EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED);
- EXPECT_EQ(s.error_message(), kTestCredsPluginErrorMsg);
+ EXPECT_EQ(s.error_message(),
+ grpc::string("Getting metadata from plugin failed with error: ") +
+ kTestCredsPluginErrorMsg);
}
TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorSuccess) {
@@ -1624,7 +1626,9 @@ TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginFailure) {
Status s = stub_->Echo(&context, request, &response);
EXPECT_FALSE(s.ok());
EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED);
- EXPECT_EQ(s.error_message(), kTestCredsPluginErrorMsg);
+ EXPECT_EQ(s.error_message(),
+ grpc::string("Getting metadata from plugin failed with error: ") +
+ kTestCredsPluginErrorMsg);
}
TEST_P(SecureEnd2endTest, ClientAuthContext) {
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
index 86cce2d30d..b5cff664f6 100644
--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -45,6 +45,7 @@ extern "C" {
#include "src/proto/grpc/lb/v1/load_balancer.grpc.pb.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
+#include <gmock/gmock.h>
#include <gtest/gtest.h>
// TODO(dgq): Other scenarios in need of testing:
@@ -73,8 +74,8 @@ extern "C" {
using std::chrono::system_clock;
-using grpc::lb::v1::LoadBalanceResponse;
using grpc::lb::v1::LoadBalanceRequest;
+using grpc::lb::v1::LoadBalanceResponse;
using grpc::lb::v1::LoadBalancer;
namespace grpc {
@@ -131,6 +132,19 @@ class BackendServiceImpl : public BackendService {
IncreaseResponseCount();
return status;
}
+
+ // Returns true on its first invocation, false otherwise.
+ bool Shutdown() {
+ std::unique_lock<std::mutex> lock(mu_);
+ const bool prev = !shutdown_;
+ shutdown_ = true;
+ gpr_log(GPR_INFO, "Backend: shut down");
+ return prev;
+ }
+
+ private:
+ std::mutex mu_;
+ bool shutdown_ = false;
};
grpc::string Ip4ToPackedString(const char* ip_str) {
@@ -142,22 +156,20 @@ grpc::string Ip4ToPackedString(const char* ip_str) {
struct ClientStats {
size_t num_calls_started = 0;
size_t num_calls_finished = 0;
- size_t num_calls_finished_with_drop_for_rate_limiting = 0;
- size_t num_calls_finished_with_drop_for_load_balancing = 0;
size_t num_calls_finished_with_client_failed_to_send = 0;
size_t num_calls_finished_known_received = 0;
+ std::map<grpc::string, size_t> drop_token_counts;
ClientStats& operator+=(const ClientStats& other) {
num_calls_started += other.num_calls_started;
num_calls_finished += other.num_calls_finished;
- num_calls_finished_with_drop_for_rate_limiting +=
- other.num_calls_finished_with_drop_for_rate_limiting;
- num_calls_finished_with_drop_for_load_balancing +=
- other.num_calls_finished_with_drop_for_load_balancing;
num_calls_finished_with_client_failed_to_send +=
other.num_calls_finished_with_client_failed_to_send;
num_calls_finished_known_received +=
other.num_calls_finished_known_received;
+ for (const auto& p : other.drop_token_counts) {
+ drop_token_counts[p.first] += p.second;
+ }
return *this;
}
};
@@ -173,11 +185,12 @@ class BalancerServiceImpl : public BalancerService {
shutdown_(false) {}
Status BalanceLoad(ServerContext* context, Stream* stream) override {
- gpr_log(GPR_INFO, "LB: BalanceLoad");
+ gpr_log(GPR_INFO, "LB[%p]: BalanceLoad", this);
LoadBalanceRequest request;
stream->Read(&request);
IncreaseRequestCount();
- gpr_log(GPR_INFO, "LB: recv msg '%s'", request.DebugString().c_str());
+ gpr_log(GPR_INFO, "LB[%p]: recv msg '%s'", this,
+ request.DebugString().c_str());
if (client_load_reporting_interval_seconds_ > 0) {
LoadBalanceResponse initial_response;
@@ -202,13 +215,14 @@ class BalancerServiceImpl : public BalancerService {
{
std::unique_lock<std::mutex> lock(mu_);
if (shutdown_) goto done;
- serverlist_cond_.wait(lock);
+ serverlist_cond_.wait(lock, [this] { return serverlist_ready_; });
+ serverlist_ready_ = false;
}
if (client_load_reporting_interval_seconds_ > 0) {
request.Clear();
stream->Read(&request);
- gpr_log(GPR_INFO, "LB: recv client load report msg: '%s'",
+ gpr_log(GPR_INFO, "LB[%p]: recv client load report msg: '%s'", this,
request.DebugString().c_str());
GPR_ASSERT(request.has_client_stats());
// We need to acquire the lock here in order to prevent the notify_one
@@ -218,21 +232,22 @@ class BalancerServiceImpl : public BalancerService {
request.client_stats().num_calls_started();
client_stats_.num_calls_finished +=
request.client_stats().num_calls_finished();
- client_stats_.num_calls_finished_with_drop_for_rate_limiting +=
- request.client_stats()
- .num_calls_finished_with_drop_for_rate_limiting();
- client_stats_.num_calls_finished_with_drop_for_load_balancing +=
- request.client_stats()
- .num_calls_finished_with_drop_for_load_balancing();
client_stats_.num_calls_finished_with_client_failed_to_send +=
request.client_stats()
.num_calls_finished_with_client_failed_to_send();
client_stats_.num_calls_finished_known_received +=
request.client_stats().num_calls_finished_known_received();
+ for (const auto& drop_token_count :
+ request.client_stats().calls_finished_with_drop()) {
+ client_stats_
+ .drop_token_counts[drop_token_count.load_balance_token()] +=
+ drop_token_count.num_calls();
+ }
+ load_report_ready_ = true;
load_report_cond_.notify_one();
}
done:
- gpr_log(GPR_INFO, "LB: done");
+ gpr_log(GPR_INFO, "LB[%p]: done", this);
return Status::OK;
}
@@ -247,21 +262,20 @@ class BalancerServiceImpl : public BalancerService {
std::unique_lock<std::mutex> lock(mu_);
const bool prev = !shutdown_;
shutdown_ = true;
- gpr_log(GPR_INFO, "LB: shut down");
+ gpr_log(GPR_INFO, "LB[%p]: shut down", this);
return prev;
}
static LoadBalanceResponse BuildResponseForBackends(
- const std::vector<int>& backend_ports, int num_drops_for_rate_limiting,
- int num_drops_for_load_balancing) {
+ const std::vector<int>& backend_ports,
+ const std::map<grpc::string, size_t>& drop_token_counts) {
LoadBalanceResponse response;
- for (int i = 0; i < num_drops_for_rate_limiting; ++i) {
- auto* server = response.mutable_server_list()->add_servers();
- server->set_drop_for_rate_limiting(true);
- }
- for (int i = 0; i < num_drops_for_load_balancing; ++i) {
- auto* server = response.mutable_server_list()->add_servers();
- server->set_drop_for_load_balancing(true);
+ for (const auto& drop_token_count : drop_token_counts) {
+ for (size_t i = 0; i < drop_token_count.second; ++i) {
+ auto* server = response.mutable_server_list()->add_servers();
+ server->set_drop(true);
+ server->set_load_balance_token(drop_token_count.first);
+ }
}
for (const int& backend_port : backend_ports) {
auto* server = response.mutable_server_list()->add_servers();
@@ -273,25 +287,27 @@ class BalancerServiceImpl : public BalancerService {
const ClientStats& WaitForLoadReport() {
std::unique_lock<std::mutex> lock(mu_);
- load_report_cond_.wait(lock);
+ load_report_cond_.wait(lock, [this] { return load_report_ready_; });
+ load_report_ready_ = false;
return client_stats_;
}
void NotifyDoneWithServerlists() {
std::lock_guard<std::mutex> lock(mu_);
+ serverlist_ready_ = true;
serverlist_cond_.notify_one();
}
private:
void SendResponse(Stream* stream, const LoadBalanceResponse& response,
int delay_ms) {
- gpr_log(GPR_INFO, "LB: sleeping for %d ms...", delay_ms);
+ gpr_log(GPR_INFO, "LB[%p]: sleeping for %d ms...", this, delay_ms);
if (delay_ms > 0) {
gpr_sleep_until(
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(delay_ms, GPR_TIMESPAN)));
}
- gpr_log(GPR_INFO, "LB: Woke up! Sending response '%s'",
+ gpr_log(GPR_INFO, "LB[%p]: Woke up! Sending response '%s'", this,
response.DebugString().c_str());
IncreaseResponseCount();
stream->Write(response);
@@ -301,7 +317,9 @@ class BalancerServiceImpl : public BalancerService {
std::vector<ResponseDelayPair> responses_and_delays_;
std::mutex mu_;
std::condition_variable load_report_cond_;
+ bool load_report_ready_ = false;
std::condition_variable serverlist_cond_;
+ bool serverlist_ready_ = false;
ClientStats client_stats_;
bool shutdown_;
};
@@ -341,7 +359,7 @@ class GrpclbEnd2endTest : public ::testing::Test {
void TearDown() override {
for (size_t i = 0; i < backends_.size(); ++i) {
- backend_servers_[i].Shutdown();
+ if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown();
}
for (size_t i = 0; i < balancers_.size(); ++i) {
if (balancers_[i]->Shutdown()) balancer_servers_[i].Shutdown();
@@ -499,7 +517,7 @@ class SingleBalancerTest : public GrpclbEnd2endTest {
TEST_F(SingleBalancerTest, Vanilla) {
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+ 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
0);
// Make sure that trying to connect works without a call.
channel_->GetState(true /* try_to_connect */);
@@ -538,7 +556,7 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
ScheduleResponseForBalancer(0, LoadBalanceResponse(), 0);
// Send non-empty serverlist only after kServerlistDelayMs
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+ 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
kServerlistDelayMs);
const auto t0 = system_clock::now();
@@ -580,11 +598,11 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) {
// Send a serverlist right away.
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+ 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
0);
// ... and the same one a bit later.
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+ 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
kServerlistDelayMs);
// Send num_backends/2 requests.
@@ -639,6 +657,61 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) {
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
+TEST_F(SingleBalancerTest, BackendsRestart) {
+ const size_t kNumRpcsPerAddress = 100;
+ ScheduleResponseForBalancer(
+ 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
+ 0);
+ // Make sure that trying to connect works without a call.
+ channel_->GetState(true /* try_to_connect */);
+ // Send 100 RPCs per server.
+ auto statuses_and_responses =
+ SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_);
+ for (const auto& status_and_response : statuses_and_responses) {
+ const Status& status = status_and_response.first;
+ const EchoResponse& response = status_and_response.second;
+ EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
+ << " message=" << status.error_message();
+ EXPECT_EQ(response.message(), kMessage_);
+ }
+ // Each backend should have gotten 100 requests.
+ for (size_t i = 0; i < backends_.size(); ++i) {
+ EXPECT_EQ(kNumRpcsPerAddress,
+ backend_servers_[i].service_->request_count());
+ }
+ balancers_[0]->NotifyDoneWithServerlists();
+ // The balancer got a single request.
+ EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+ // and sent a single response.
+ EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+ for (size_t i = 0; i < backends_.size(); ++i) {
+ if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown();
+ }
+ statuses_and_responses = SendRpc(kMessage_, 1);
+ for (const auto& status_and_response : statuses_and_responses) {
+ const Status& status = status_and_response.first;
+ EXPECT_FALSE(status.ok());
+ }
+ for (size_t i = 0; i < num_backends_; ++i) {
+ backends_.emplace_back(new BackendServiceImpl());
+ backend_servers_.emplace_back(ServerThread<BackendService>(
+ "backend", server_host_, backends_.back().get()));
+ }
+ // The following RPC will fail due to the backend ports having changed. It
+ // will nonetheless exercise the grpclb-roundrobin handling of the RR policy
+ // having gone into shutdown.
+ // TODO(dgq): implement the "backend restart" component as well. We need extra
+ // machinery to either update the LB responses "on the fly" or instruct
+ // backends which ports to restart on.
+ statuses_and_responses = SendRpc(kMessage_, 1);
+ for (const auto& status_and_response : statuses_and_responses) {
+ const Status& status = status_and_response.first;
+ EXPECT_FALSE(status.ok());
+ }
+ // Check LB policy name for the channel.
+ EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
+}
+
class UpdatesTest : public GrpclbEnd2endTest {
public:
UpdatesTest() : GrpclbEnd2endTest(4, 3, 0) {}
@@ -647,12 +720,10 @@ class UpdatesTest : public GrpclbEnd2endTest {
TEST_F(UpdatesTest, UpdateBalancers) {
const std::vector<int> first_backend{GetBackendPorts()[0]};
const std::vector<int> second_backend{GetBackendPorts()[1]};
-
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0);
+ 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
ScheduleResponseForBalancer(
- 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0),
- 0);
+ 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
@@ -727,10 +798,9 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) {
const std::vector<int> second_backend{GetBackendPorts()[0]};
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0);
+ 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
ScheduleResponseForBalancer(
- 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0),
- 0);
+ 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
@@ -810,10 +880,9 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
const std::vector<int> second_backend{GetBackendPorts()[1]};
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0);
+ 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
ScheduleResponseForBalancer(
- 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0),
- 0);
+ 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
@@ -902,7 +971,8 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
TEST_F(SingleBalancerTest, Drop) {
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 1, 2),
+ 0, BalancerServiceImpl::BuildResponseForBackends(
+ GetBackendPorts(), {{"rate_limiting", 1}, {"load_balancing", 2}}),
0);
// Send 100 RPCs for each server and drop address.
const auto& statuses_and_responses =
@@ -934,6 +1004,49 @@ TEST_F(SingleBalancerTest, Drop) {
EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
}
+TEST_F(SingleBalancerTest, DropAllFirst) {
+ // All registered addresses are marked as "drop".
+ ScheduleResponseForBalancer(
+ 0, BalancerServiceImpl::BuildResponseForBackends(
+ {}, {{"rate_limiting", 1}, {"load_balancing", 1}}),
+ 0);
+ const auto& statuses_and_responses = SendRpc(kMessage_, 1);
+ for (const auto& status_and_response : statuses_and_responses) {
+ const Status& status = status_and_response.first;
+ EXPECT_FALSE(status.ok());
+ EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
+ }
+}
+
+TEST_F(SingleBalancerTest, DropAll) {
+ ScheduleResponseForBalancer(
+ 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
+ 0);
+ ScheduleResponseForBalancer(
+ 0, BalancerServiceImpl::BuildResponseForBackends(
+ {}, {{"rate_limiting", 1}, {"load_balancing", 1}}),
+ 1000);
+
+ // First call succeeds.
+ auto statuses_and_responses = SendRpc(kMessage_, 1);
+ for (const auto& status_and_response : statuses_and_responses) {
+ const Status& status = status_and_response.first;
+ const EchoResponse& response = status_and_response.second;
+ EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
+ << " message=" << status.error_message();
+ EXPECT_EQ(response.message(), kMessage_);
+ }
+ // But eventually, the update with only dropped servers is processed and calls
+ // fail.
+ do {
+ statuses_and_responses = SendRpc(kMessage_, 1);
+ ASSERT_EQ(statuses_and_responses.size(), 1UL);
+ } while (statuses_and_responses[0].first.ok());
+ const Status& status = statuses_and_responses[0].first;
+ EXPECT_FALSE(status.ok());
+ EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
+}
+
class SingleBalancerWithClientLoadReportingTest : public GrpclbEnd2endTest {
public:
SingleBalancerWithClientLoadReportingTest() : GrpclbEnd2endTest(4, 1, 2) {}
@@ -942,7 +1055,7 @@ class SingleBalancerWithClientLoadReportingTest : public GrpclbEnd2endTest {
TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+ 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
0);
// Send 100 RPCs per server.
const auto& statuses_and_responses =
@@ -971,17 +1084,17 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
EXPECT_EQ(kNumRpcsPerAddress * num_backends_, client_stats.num_calls_started);
EXPECT_EQ(kNumRpcsPerAddress * num_backends_,
client_stats.num_calls_finished);
- EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_rate_limiting);
- EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_load_balancing);
EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send);
EXPECT_EQ(kNumRpcsPerAddress * num_backends_,
client_stats.num_calls_finished_known_received);
+ EXPECT_THAT(client_stats.drop_token_counts, ::testing::ElementsAre());
}
TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
const size_t kNumRpcsPerAddress = 3;
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 2, 1),
+ 0, BalancerServiceImpl::BuildResponseForBackends(
+ GetBackendPorts(), {{"rate_limiting", 2}, {"load_balancing", 1}}),
0);
// Send 100 RPCs for each server and drop address.
const auto& statuses_and_responses =
@@ -1018,13 +1131,13 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
client_stats.num_calls_started);
EXPECT_EQ(kNumRpcsPerAddress * (num_backends_ + 3),
client_stats.num_calls_finished);
- EXPECT_EQ(kNumRpcsPerAddress * 2,
- client_stats.num_calls_finished_with_drop_for_rate_limiting);
- EXPECT_EQ(kNumRpcsPerAddress,
- client_stats.num_calls_finished_with_drop_for_load_balancing);
EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send);
EXPECT_EQ(kNumRpcsPerAddress * num_backends_,
client_stats.num_calls_finished_known_received);
+ EXPECT_THAT(client_stats.drop_token_counts,
+ ::testing::ElementsAre(
+ ::testing::Pair("load_balancing", kNumRpcsPerAddress),
+ ::testing::Pair("rate_limiting", kNumRpcsPerAddress * 2)));
}
} // namespace