aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/end2end/client_lb_end2end_test.cc
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/end2end/client_lb_end2end_test.cc')
-rw-r--r--test/cpp/end2end/client_lb_end2end_test.cc238
1 files changed, 154 insertions, 84 deletions
diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc
index b85e286e5c..ee5adbc6fa 100644
--- a/test/cpp/end2end/client_lb_end2end_test.cc
+++ b/test/cpp/end2end/client_lb_end2end_test.cc
@@ -39,6 +39,7 @@
#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/gpr/env.h"
+#include "src/core/lib/gprpp/debug_location.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
@@ -162,44 +163,82 @@ class ClientLbEnd2endTest : public ::testing::Test {
grpc_lb_addresses_destroy(addresses);
}
+ void SetNextResolutionUponError(const std::vector<int>& ports) {
+ grpc_core::ExecCtx exec_ctx;
+ grpc_lb_addresses* addresses =
+ grpc_lb_addresses_create(ports.size(), nullptr);
+ for (size_t i = 0; i < ports.size(); ++i) {
+ char* lb_uri_str;
+ gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", ports[i]);
+ grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true);
+ GPR_ASSERT(lb_uri != nullptr);
+ grpc_lb_addresses_set_address_from_uri(addresses, i, lb_uri,
+ false /* is balancer */,
+ "" /* balancer name */, nullptr);
+ grpc_uri_destroy(lb_uri);
+ gpr_free(lb_uri_str);
+ }
+ const grpc_arg fake_addresses =
+ grpc_lb_addresses_create_channel_arg(addresses);
+ grpc_channel_args* fake_result =
+ grpc_channel_args_copy_and_add(nullptr, &fake_addresses, 1);
+ grpc_fake_resolver_response_generator_set_response_upon_error(
+ response_generator_, fake_result);
+ grpc_channel_args_destroy(fake_result);
+ grpc_lb_addresses_destroy(addresses);
+ }
+
std::vector<int> GetServersPorts() {
std::vector<int> ports;
for (const auto& server : servers_) ports.push_back(server->port_);
return ports;
}
- void ResetStub(const grpc::string& lb_policy_name,
- ChannelArguments args = ChannelArguments()) {
+ std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub(
+ const std::shared_ptr<Channel>& channel) {
+ return grpc::testing::EchoTestService::NewStub(channel);
+ }
+
+ std::shared_ptr<Channel> BuildChannel(
+ const grpc::string& lb_policy_name,
+ ChannelArguments args = ChannelArguments()) {
if (lb_policy_name.size() > 0) {
args.SetLoadBalancingPolicyName(lb_policy_name);
} // else, default to pick first
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
response_generator_);
- channel_ =
- CreateCustomChannel("fake:///", InsecureChannelCredentials(), args);
- stub_ = grpc::testing::EchoTestService::NewStub(channel_);
+ return CreateCustomChannel("fake:///", InsecureChannelCredentials(), args);
}
- bool SendRpc(EchoResponse* response = nullptr) {
+ bool SendRpc(
+ const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
+ EchoResponse* response = nullptr, int timeout_ms = 1000) {
const bool local_response = (response == nullptr);
if (local_response) response = new EchoResponse;
EchoRequest request;
request.set_message(kRequestMessage_);
ClientContext context;
- Status status = stub_->Echo(&context, request, response);
+ context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
+ Status status = stub->Echo(&context, request, response);
if (local_response) delete response;
return status.ok();
}
- void CheckRpcSendOk() {
+ void CheckRpcSendOk(
+ const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
+ const grpc_core::DebugLocation& location) {
EchoResponse response;
- const bool success = SendRpc(&response);
- EXPECT_TRUE(success);
- EXPECT_EQ(response.message(), kRequestMessage_);
+ const bool success = SendRpc(stub, &response);
+ if (!success) abort();
+ ASSERT_TRUE(success) << "From " << location.file() << ":"
+ << location.line();
+ ASSERT_EQ(response.message(), kRequestMessage_)
+ << "From " << location.file() << ":" << location.line();
}
- void CheckRpcSendFailure() {
- const bool success = SendRpc();
+ void CheckRpcSendFailure(
+ const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub) {
+ const bool success = SendRpc(stub);
EXPECT_FALSE(success);
}
@@ -238,7 +277,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
}
void Shutdown(bool join = true) {
- server_->Shutdown();
+ server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
if (join) thread_->join();
}
};
@@ -247,9 +286,11 @@ class ClientLbEnd2endTest : public ::testing::Test {
for (const auto& server : servers_) server->service_.ResetCounters();
}
- void WaitForServer(size_t server_idx) {
+ void WaitForServer(
+ const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
+ size_t server_idx, const grpc_core::DebugLocation& location) {
do {
- CheckRpcSendOk();
+ CheckRpcSendOk(stub, location);
} while (servers_[server_idx]->service_.request_count() == 0);
ResetCounters();
}
@@ -280,7 +321,6 @@ class ClientLbEnd2endTest : public ::testing::Test {
}
const grpc::string server_host_;
- std::shared_ptr<Channel> channel_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::vector<std::unique_ptr<ServerData>> servers_;
grpc_fake_resolver_response_generator* response_generator_;
@@ -291,14 +331,15 @@ TEST_F(ClientLbEnd2endTest, PickFirst) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
StartServers(kNumServers);
- ResetStub(""); // test that pick first is the default.
+ auto channel = BuildChannel(""); // test that pick first is the default.
+ auto stub = BuildStub(channel);
std::vector<int> ports;
for (size_t i = 0; i < servers_.size(); ++i) {
ports.emplace_back(servers_[i]->port_);
}
SetNextResolution(ports);
for (size_t i = 0; i < servers_.size(); ++i) {
- CheckRpcSendOk();
+ CheckRpcSendOk(stub, DEBUG_LOCATION);
}
// All requests should have gone to a single server.
bool found = false;
@@ -312,7 +353,7 @@ TEST_F(ClientLbEnd2endTest, PickFirst) {
}
EXPECT_TRUE(found);
// Check LB policy name for the channel.
- EXPECT_EQ("pick_first", channel_->GetLoadBalancingPolicyName());
+ EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
}
TEST_F(ClientLbEnd2endTest, PickFirstBackOffInitialReconnect) {
@@ -321,15 +362,16 @@ TEST_F(ClientLbEnd2endTest, PickFirstBackOffInitialReconnect) {
args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
- ResetStub("pick_first", args);
+ auto channel = BuildChannel("pick_first", args);
+ auto stub = BuildStub(channel);
SetNextResolution(ports);
// The channel won't become connected (there's no server).
- ASSERT_FALSE(channel_->WaitForConnected(
+ ASSERT_FALSE(channel->WaitForConnected(
grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 2)));
// Bring up a server on the chosen port.
StartServers(1, ports);
// Now it will.
- ASSERT_TRUE(channel_->WaitForConnected(
+ ASSERT_TRUE(channel->WaitForConnected(
grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 2)));
const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
@@ -349,14 +391,15 @@ TEST_F(ClientLbEnd2endTest, PickFirstBackOffMinReconnect) {
constexpr int kMinReconnectBackOffMs = 1000;
args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, kMinReconnectBackOffMs);
const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
- ResetStub("pick_first", args);
+ auto channel = BuildChannel("pick_first", args);
+ auto stub = BuildStub(channel);
SetNextResolution(ports);
// Make connection delay a 10% longer than it's willing to in order to make
// sure we are hitting the codepath that waits for the min reconnect backoff.
gpr_atm_rel_store(&g_connection_delay_ms, kMinReconnectBackOffMs * 1.10);
grpc_tcp_client_connect_impl = tcp_client_connect_with_delay;
const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
- channel_->WaitForConnected(
+ channel->WaitForConnected(
grpc_timeout_milliseconds_to_deadline(kMinReconnectBackOffMs * 2));
const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
@@ -371,14 +414,16 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdates) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
StartServers(kNumServers);
- ResetStub("pick_first");
+ auto channel = BuildChannel("pick_first");
+ auto stub = BuildStub(channel);
+
std::vector<int> ports;
// Perform one RPC against the first server.
ports.emplace_back(servers_[0]->port_);
SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET [0] *******");
- CheckRpcSendOk();
+ CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(servers_[0]->service_.request_count(), 1);
// An empty update will result in the channel going into TRANSIENT_FAILURE.
@@ -387,7 +432,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdates) {
gpr_log(GPR_INFO, "****** SET none *******");
grpc_connectivity_state channel_state;
do {
- channel_state = channel_->GetState(true /* try to connect */);
+ channel_state = channel->GetState(true /* try to connect */);
} while (channel_state == GRPC_CHANNEL_READY);
GPR_ASSERT(channel_state != GRPC_CHANNEL_READY);
servers_[0]->service_.ResetCounters();
@@ -397,7 +442,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdates) {
ports.emplace_back(servers_[1]->port_);
SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET [1] *******");
- WaitForServer(1);
+ WaitForServer(stub, 1, DEBUG_LOCATION);
EXPECT_EQ(servers_[0]->service_.request_count(), 0);
// And again for servers_[2]
@@ -405,26 +450,28 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdates) {
ports.emplace_back(servers_[2]->port_);
SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET [2] *******");
- WaitForServer(2);
+ WaitForServer(stub, 2, DEBUG_LOCATION);
EXPECT_EQ(servers_[0]->service_.request_count(), 0);
EXPECT_EQ(servers_[1]->service_.request_count(), 0);
// Check LB policy name for the channel.
- EXPECT_EQ("pick_first", channel_->GetLoadBalancingPolicyName());
+ EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
}
TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
StartServers(kNumServers);
- ResetStub("pick_first");
+ auto channel = BuildChannel("pick_first");
+ auto stub = BuildStub(channel);
+
std::vector<int> ports;
// Perform one RPC against the first server.
ports.emplace_back(servers_[0]->port_);
SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET [0] *******");
- CheckRpcSendOk();
+ CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(servers_[0]->service_.request_count(), 1);
servers_[0]->service_.ResetCounters();
@@ -434,20 +481,21 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) {
ports.emplace_back(servers_[0]->port_);
SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET superset *******");
- CheckRpcSendOk();
+ CheckRpcSendOk(stub, DEBUG_LOCATION);
// We stick to the previously connected server.
- WaitForServer(0);
+ WaitForServer(stub, 0, DEBUG_LOCATION);
EXPECT_EQ(0, servers_[1]->service_.request_count());
// Check LB policy name for the channel.
- EXPECT_EQ("pick_first", channel_->GetLoadBalancingPolicyName());
+ EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
}
TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
StartServers(kNumServers);
- ResetStub("pick_first");
+ auto channel = BuildChannel("pick_first");
+ auto stub = BuildStub(channel);
std::vector<int> ports;
for (size_t i = 0; i < servers_.size(); ++i) {
ports.emplace_back(servers_[i]->port_);
@@ -459,18 +507,19 @@ TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) {
std::shuffle(ports.begin(), ports.end(),
std::mt19937(std::random_device()()));
SetNextResolution(ports);
- if (i % 10 == 0) CheckRpcSendOk();
+ if (i % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION);
}
}
// Check LB policy name for the channel.
- EXPECT_EQ("pick_first", channel_->GetLoadBalancingPolicyName());
+ EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
}
TEST_F(ClientLbEnd2endTest, RoundRobin) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
StartServers(kNumServers);
- ResetStub("round_robin");
+ auto channel = BuildChannel("round_robin");
+ auto stub = BuildStub(channel);
std::vector<int> ports;
for (const auto& server : servers_) {
ports.emplace_back(server->port_);
@@ -478,15 +527,15 @@ TEST_F(ClientLbEnd2endTest, RoundRobin) {
SetNextResolution(ports);
// Wait until all backends are ready.
do {
- CheckRpcSendOk();
+ CheckRpcSendOk(stub, DEBUG_LOCATION);
} while (!SeenAllServers());
ResetCounters();
// "Sync" to the end of the list. Next sequence of picks will start at the
// first server (index 0).
- WaitForServer(servers_.size() - 1);
+ WaitForServer(stub, servers_.size() - 1, DEBUG_LOCATION);
std::vector<int> connection_order;
for (size_t i = 0; i < servers_.size(); ++i) {
- CheckRpcSendOk();
+ CheckRpcSendOk(stub, DEBUG_LOCATION);
UpdateConnectionOrder(servers_, &connection_order);
}
// Backends should be iterated over in the order in which the addresses were
@@ -494,22 +543,23 @@ TEST_F(ClientLbEnd2endTest, RoundRobin) {
const auto expected = std::vector<int>{0, 1, 2};
EXPECT_EQ(expected, connection_order);
// Check LB policy name for the channel.
- EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
+ EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
}
TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
StartServers(kNumServers);
- ResetStub("round_robin");
+ auto channel = BuildChannel("round_robin");
+ auto stub = BuildStub(channel);
std::vector<int> ports;
// Start with a single server.
ports.emplace_back(servers_[0]->port_);
SetNextResolution(ports);
- WaitForServer(0);
+ WaitForServer(stub, 0, DEBUG_LOCATION);
// Send RPCs. They should all go servers_[0]
- for (size_t i = 0; i < 10; ++i) CheckRpcSendOk();
+ for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(10, servers_[0]->service_.request_count());
EXPECT_EQ(0, servers_[1]->service_.request_count());
EXPECT_EQ(0, servers_[2]->service_.request_count());
@@ -523,9 +573,9 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
// Wait until update has been processed, as signaled by the second backend
// receiving a request.
EXPECT_EQ(0, servers_[1]->service_.request_count());
- WaitForServer(1);
+ WaitForServer(stub, 1, DEBUG_LOCATION);
- for (size_t i = 0; i < 10; ++i) CheckRpcSendOk();
+ for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(0, servers_[0]->service_.request_count());
EXPECT_EQ(10, servers_[1]->service_.request_count());
EXPECT_EQ(0, servers_[2]->service_.request_count());
@@ -535,9 +585,9 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
ports.clear();
ports.emplace_back(servers_[2]->port_);
SetNextResolution(ports);
- WaitForServer(2);
+ WaitForServer(stub, 2, DEBUG_LOCATION);
- for (size_t i = 0; i < 10; ++i) CheckRpcSendOk();
+ for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(0, servers_[0]->service_.request_count());
EXPECT_EQ(0, servers_[1]->service_.request_count());
EXPECT_EQ(10, servers_[2]->service_.request_count());
@@ -549,12 +599,12 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
ports.emplace_back(servers_[1]->port_);
ports.emplace_back(servers_[2]->port_);
SetNextResolution(ports);
- WaitForServer(0);
- WaitForServer(1);
- WaitForServer(2);
+ WaitForServer(stub, 0, DEBUG_LOCATION);
+ WaitForServer(stub, 1, DEBUG_LOCATION);
+ WaitForServer(stub, 2, DEBUG_LOCATION);
// Send three RPCs, one per server.
- for (size_t i = 0; i < 3; ++i) CheckRpcSendOk();
+ for (size_t i = 0; i < 3; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(1, servers_[0]->service_.request_count());
EXPECT_EQ(1, servers_[1]->service_.request_count());
EXPECT_EQ(1, servers_[2]->service_.request_count());
@@ -564,7 +614,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
SetNextResolution(ports);
grpc_connectivity_state channel_state;
do {
- channel_state = channel_->GetState(true /* try to connect */);
+ channel_state = channel->GetState(true /* try to connect */);
} while (channel_state == GRPC_CHANNEL_READY);
GPR_ASSERT(channel_state != GRPC_CHANNEL_READY);
servers_[0]->service_.ResetCounters();
@@ -573,26 +623,27 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
ports.clear();
ports.emplace_back(servers_[1]->port_);
SetNextResolution(ports);
- WaitForServer(1);
- channel_state = channel_->GetState(false /* try to connect */);
+ WaitForServer(stub, 1, DEBUG_LOCATION);
+ channel_state = channel->GetState(false /* try to connect */);
GPR_ASSERT(channel_state == GRPC_CHANNEL_READY);
// Check LB policy name for the channel.
- EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
+ EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
}
TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) {
const int kNumServers = 3;
StartServers(kNumServers);
- ResetStub("round_robin");
+ auto channel = BuildChannel("round_robin");
+ auto stub = BuildStub(channel);
std::vector<int> ports;
// Start with a single server.
ports.emplace_back(servers_[0]->port_);
SetNextResolution(ports);
- WaitForServer(0);
+ WaitForServer(stub, 0, DEBUG_LOCATION);
// Send RPCs. They should all go to servers_[0]
- for (size_t i = 0; i < 10; ++i) SendRpc();
+ for (size_t i = 0; i < 10; ++i) SendRpc(stub);
EXPECT_EQ(10, servers_[0]->service_.request_count());
EXPECT_EQ(0, servers_[1]->service_.request_count());
EXPECT_EQ(0, servers_[2]->service_.request_count());
@@ -603,11 +654,11 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) {
ports.emplace_back(servers_[1]->port_);
ports.emplace_back(servers_[2]->port_);
SetNextResolution(ports);
- WaitForServer(0);
- WaitForServer(2);
+ WaitForServer(stub, 0, DEBUG_LOCATION);
+ WaitForServer(stub, 2, DEBUG_LOCATION);
// Send three RPCs, one per server.
- for (size_t i = 0; i < kNumServers; ++i) SendRpc();
+ for (size_t i = 0; i < kNumServers; ++i) SendRpc(stub);
// The server in shutdown shouldn't receive any.
EXPECT_EQ(0, servers_[1]->service_.request_count());
}
@@ -616,7 +667,8 @@ TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
StartServers(kNumServers);
- ResetStub("round_robin");
+ auto channel = BuildChannel("round_robin");
+ auto stub = BuildStub(channel);
std::vector<int> ports;
for (size_t i = 0; i < servers_.size(); ++i) {
ports.emplace_back(servers_[i]->port_);
@@ -625,10 +677,10 @@ TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) {
std::shuffle(ports.begin(), ports.end(),
std::mt19937(std::random_device()()));
SetNextResolution(ports);
- if (i % 10 == 0) CheckRpcSendOk();
+ if (i % 10 == 0) CheckRpcSendOk(stub, DEBUG_LOCATION);
}
// Check LB policy name for the channel.
- EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
+ EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
}
TEST_F(ClientLbEnd2endTest, RoundRobinConcurrentUpdates) {
@@ -639,16 +691,21 @@ TEST_F(ClientLbEnd2endTest, RoundRobinConcurrentUpdates) {
TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
- std::vector<int> ports;
+ std::vector<int> first_ports;
+ std::vector<int> second_ports;
for (int i = 0; i < kNumServers; ++i) {
- ports.push_back(grpc_pick_unused_port_or_die());
+ first_ports.push_back(grpc_pick_unused_port_or_die());
}
- StartServers(kNumServers, ports);
- ResetStub("round_robin");
- SetNextResolution(ports);
+ for (int i = 0; i < kNumServers; ++i) {
+ second_ports.push_back(grpc_pick_unused_port_or_die());
+ }
+ StartServers(kNumServers, first_ports);
+ auto channel = BuildChannel("round_robin");
+ auto stub = BuildStub(channel);
+ SetNextResolution(first_ports);
// Send a number of RPCs, which succeed.
for (size_t i = 0; i < 100; ++i) {
- CheckRpcSendOk();
+ CheckRpcSendOk(stub, DEBUG_LOCATION);
}
// Kill all servers
gpr_log(GPR_INFO, "****** ABOUT TO KILL SERVERS *******");
@@ -658,18 +715,25 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
gpr_log(GPR_INFO, "****** SERVERS KILLED *******");
gpr_log(GPR_INFO, "****** SENDING DOOMED REQUESTS *******");
// Client requests should fail. Send enough to tickle all subchannels.
- for (size_t i = 0; i < servers_.size(); ++i) CheckRpcSendFailure();
+ for (size_t i = 0; i < servers_.size(); ++i) CheckRpcSendFailure(stub);
gpr_log(GPR_INFO, "****** DOOMED REQUESTS SENT *******");
- // Bring servers back up on the same port (we aren't recreating the channel).
+ // Bring servers back up on a different set of ports. We need to do this to be
+ // sure that the eventual success is *not* due to subchannel reconnection
+ // attempts and that an actual re-resolution has happened as a result of the
+ // RR policy going into transient failure when all its subchannels become
+ // unavailable (in transient failure as well).
gpr_log(GPR_INFO, "****** RESTARTING SERVERS *******");
- StartServers(kNumServers, ports);
+ StartServers(kNumServers, second_ports);
+ // Don't notify of the update. Wait for the LB policy's re-resolution to
+ // "pull" the new ports.
+ SetNextResolutionUponError(second_ports);
gpr_log(GPR_INFO, "****** SERVERS RESTARTED *******");
gpr_log(GPR_INFO, "****** SENDING REQUEST TO SUCCEED *******");
// Client request should eventually (but still fairly soon) succeed.
const gpr_timespec deadline = grpc_timeout_seconds_to_deadline(5);
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
while (gpr_time_cmp(deadline, now) > 0) {
- if (SendRpc()) break;
+ if (SendRpc(stub)) break;
now = gpr_now(GPR_CLOCK_MONOTONIC);
}
GPR_ASSERT(gpr_time_cmp(deadline, now) > 0);
@@ -679,11 +743,13 @@ TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) {
const int kNumServers = 3;
StartServers(kNumServers);
const auto ports = GetServersPorts();
- ResetStub("round_robin");
+ auto channel = BuildChannel("round_robin");
+ auto stub = BuildStub(channel);
SetNextResolution(ports);
- for (size_t i = 0; i < kNumServers; ++i) WaitForServer(i);
+ for (size_t i = 0; i < kNumServers; ++i)
+ WaitForServer(stub, i, DEBUG_LOCATION);
for (size_t i = 0; i < servers_.size(); ++i) {
- CheckRpcSendOk();
+ CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i;
}
// One request should have gone to each server.
@@ -695,10 +761,14 @@ TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) {
servers_[0]->Shutdown(true);
// Client request still succeed. May need retrying if RR had returned a pick
// before noticing the change in the server's connectivity.
- while (!SendRpc())
+ while (!SendRpc(stub)) {
; // Retry until success.
+ }
+ gpr_log(GPR_INFO, "------------------------------------------------------");
// Send a bunch of RPCs that should succeed.
- for (int i = 0; i < 10 * kNumServers; ++i) CheckRpcSendOk();
+ for (int i = 0; i < 10 * kNumServers; ++i) {
+ CheckRpcSendOk(stub, DEBUG_LOCATION);
+ }
const auto post_death = servers_[0]->service_.request_count();
// No requests have gone to the deceased server.
EXPECT_EQ(pre_death, post_death);
@@ -708,7 +778,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) {
// the server managed to start before the RR policy retried the subchannel) or
// after the subchannel retry delay otherwise (RR's subchannel retried before
// the server was fully back up).
- WaitForServer(0);
+ WaitForServer(stub, 0, DEBUG_LOCATION);
}
} // namespace