diff options
author | David G. Quintas <dgq@google.com> | 2017-12-19 14:46:33 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-12-19 14:46:33 -0800 |
commit | e9b0fd07a579b1c10e6acacb4d6a59b1ac6a12d6 (patch) | |
tree | 3808535876f814dc4d2df27060f7a9c6722ea03d /test/cpp/end2end | |
parent | 1c4d410c109157bfe924144bc777e57169d5c1e3 (diff) | |
parent | 4ef4c38275ce558ee608f95fde36195a62eb2389 (diff) |
Merge pull request #13494 from dgquintas/backoff_cpp
C++-ize backoff
Diffstat (limited to 'test/cpp/end2end')
-rw-r--r-- | test/cpp/end2end/client_lb_end2end_test.cc | 115 |
1 files changed, 96 insertions, 19 deletions
diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index e83a6e675e..c6e9577f0c 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -28,6 +28,7 @@ #include <grpc++/server_builder.h> #include <grpc/grpc.h> #include <grpc/support/alloc.h> +#include <grpc/support/atm.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> #include <grpc/support/thd.h> @@ -35,6 +36,7 @@ #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/ext/filters/client_channel/subchannel_index.h" +#include "src/core/lib/backoff/backoff.h" #include "src/core/lib/support/env.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" @@ -48,10 +50,33 @@ using grpc::testing::EchoRequest; using grpc::testing::EchoResponse; using std::chrono::system_clock; +// defined in tcp_client_posix.c +extern void (*grpc_tcp_client_connect_impl)( + grpc_closure* closure, grpc_endpoint** ep, + grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args, + const grpc_resolved_address* addr, grpc_millis deadline); + +const auto original_tcp_connect_fn = grpc_tcp_client_connect_impl; + namespace grpc { namespace testing { namespace { +gpr_atm g_connection_delay_ms; + +void tcp_client_connect_with_delay(grpc_closure* closure, grpc_endpoint** ep, + grpc_pollset_set* interested_parties, + const grpc_channel_args* channel_args, + const grpc_resolved_address* addr, + grpc_millis deadline) { + const int delay_ms = gpr_atm_acq_load(&g_connection_delay_ms); + if (delay_ms > 0) { + gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(delay_ms)); + } + original_tcp_connect_fn(closure, ep, interested_parties, channel_args, addr, + deadline + delay_ms); +} + // Subclass of TestServiceImpl that increments a request counter for // every call to the Echo RPC. class MyTestServiceImpl : public TestServiceImpl { @@ -136,22 +161,22 @@ class ClientLbEnd2endTest : public ::testing::Test { grpc_lb_addresses_destroy(addresses); } - void ResetStub(const grpc::string& lb_policy_name = "") { - ChannelArguments args; + std::vector<int> GetServersPorts() { + std::vector<int> ports; + for (const auto& server : servers_) ports.push_back(server->port_); + return ports; + } + + void ResetStub(const std::vector<int>& ports, + const grpc::string& lb_policy_name, + ChannelArguments args = ChannelArguments()) { if (lb_policy_name.size() > 0) { args.SetLoadBalancingPolicyName(lb_policy_name); } // else, default to pick first args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, response_generator_); - args.SetInt("grpc.testing.fixed_reconnect_backoff_ms", 2000); - std::ostringstream uri; - uri << "fake:///"; - for (size_t i = 0; i < servers_.size() - 1; ++i) { - uri << "127.0.0.1:" << servers_[i]->port_ << ","; - } - uri << "127.0.0.1:" << servers_[servers_.size() - 1]->port_; channel_ = - CreateCustomChannel(uri.str(), InsecureChannelCredentials(), args); + CreateCustomChannel("fake:///", InsecureChannelCredentials(), args); stub_ = grpc::testing::EchoTestService::NewStub(channel_); } @@ -266,7 +291,7 @@ TEST_F(ClientLbEnd2endTest, PickFirst) { // Start servers and send one RPC per server. const int kNumServers = 3; StartServers(kNumServers); - ResetStub(); // implicit pick first + ResetStub(GetServersPorts(), ""); // test that pick first is the default. std::vector<int> ports; for (size_t i = 0; i < servers_.size(); ++i) { ports.emplace_back(servers_[i]->port_); @@ -290,11 +315,63 @@ TEST_F(ClientLbEnd2endTest, PickFirst) { EXPECT_EQ("pick_first", channel_->GetLoadBalancingPolicyName()); } +TEST_F(ClientLbEnd2endTest, PickFirstBackOffInitialReconnect) { + ChannelArguments args; + constexpr int kInitialBackOffMs = 100; + args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs); + const std::vector<int> ports = {grpc_pick_unused_port_or_die()}; + const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC); + ResetStub(ports, "pick_first", args); + SetNextResolution(ports); + // The channel won't become connected (there's no server). + ASSERT_FALSE(channel_->WaitForConnected( + grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 2))); + // Bring up a server on the chosen port. + StartServers(1, ports); + // Now it will. + ASSERT_TRUE(channel_->WaitForConnected( + grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 2))); + const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC); + const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0)); + gpr_log(GPR_DEBUG, "Waited %ld milliseconds", waited_ms); + // We should have waited at least kInitialBackOffMs. We substract one to + // account for test and precision accuracy drift. + EXPECT_GE(waited_ms, kInitialBackOffMs - 1); + // But not much more. + EXPECT_GT( + gpr_time_cmp( + grpc_timeout_milliseconds_to_deadline(kInitialBackOffMs * 1.10), t1), + 0); +} + +TEST_F(ClientLbEnd2endTest, PickFirstBackOffMinReconnect) { + ChannelArguments args; + constexpr int kMinReconnectBackOffMs = 1000; + args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, kMinReconnectBackOffMs); + const std::vector<int> ports = {grpc_pick_unused_port_or_die()}; + ResetStub(ports, "pick_first", args); + SetNextResolution(ports); + // Make connection delay a 10% longer than it's willing to in order to make + // sure we are hitting the codepath that waits for the min reconnect backoff. + gpr_atm_rel_store(&g_connection_delay_ms, kMinReconnectBackOffMs * 1.10); + grpc_tcp_client_connect_impl = tcp_client_connect_with_delay; + const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC); + channel_->WaitForConnected( + grpc_timeout_milliseconds_to_deadline(kMinReconnectBackOffMs * 2)); + const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC); + const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0)); + gpr_log(GPR_DEBUG, "Waited %ld ms", waited_ms); + // We should have waited at least kMinReconnectBackOffMs. We substract one to + // account for test and precision accuracy drift. + EXPECT_GE(waited_ms, kMinReconnectBackOffMs - 1); + gpr_atm_rel_store(&g_connection_delay_ms, 0); +} + TEST_F(ClientLbEnd2endTest, PickFirstUpdates) { // Start servers and send one RPC per server. const int kNumServers = 3; StartServers(kNumServers); - ResetStub(); // implicit pick first + ResetStub(GetServersPorts(), "pick_first"); std::vector<int> ports; // Perform one RPC against the first server. @@ -340,7 +417,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) { // Start servers and send one RPC per server. const int kNumServers = 3; StartServers(kNumServers); - ResetStub(); // implicit pick first + ResetStub(GetServersPorts(), "pick_first"); std::vector<int> ports; // Perform one RPC against the first server. @@ -370,7 +447,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) { // Start servers and send one RPC per server. const int kNumServers = 3; StartServers(kNumServers); - ResetStub(); // implicit pick first + ResetStub(GetServersPorts(), "pick_first"); std::vector<int> ports; for (size_t i = 0; i < servers_.size(); ++i) { ports.emplace_back(servers_[i]->port_); @@ -392,7 +469,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobin) { // Start servers and send one RPC per server. const int kNumServers = 3; StartServers(kNumServers); - ResetStub("round_robin"); + ResetStub(GetServersPorts(), "round_robin"); std::vector<int> ports; for (const auto& server : servers_) { ports.emplace_back(server->port_); @@ -423,7 +500,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { // Start servers and send one RPC per server. const int kNumServers = 3; StartServers(kNumServers); - ResetStub("round_robin"); + ResetStub(GetServersPorts(), "round_robin"); std::vector<int> ports; // Start with a single server. @@ -506,7 +583,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { TEST_F(ClientLbEnd2endTest, RoundRobinUpdateInError) { const int kNumServers = 3; StartServers(kNumServers); - ResetStub("round_robin"); + ResetStub(GetServersPorts(), "round_robin"); std::vector<int> ports; // Start with a single server. @@ -538,7 +615,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) { // Start servers and send one RPC per server. const int kNumServers = 3; StartServers(kNumServers); - ResetStub("round_robin"); + ResetStub(GetServersPorts(), "round_robin"); std::vector<int> ports; for (size_t i = 0; i < servers_.size(); ++i) { ports.emplace_back(servers_[i]->port_); @@ -565,7 +642,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) { ports.push_back(grpc_pick_unused_port_or_die()); } StartServers(kNumServers, ports); - ResetStub("round_robin"); + ResetStub(GetServersPorts(), "round_robin"); SetNextResolution(ports); // Send a number of RPCs, which succeed. for (size_t i = 0; i < 100; ++i) { |