diff options
Diffstat (limited to 'test/cpp/end2end/grpclb_end2end_test.cc')
-rw-r--r-- | test/cpp/end2end/grpclb_end2end_test.cc | 129 |
1 files changed, 123 insertions, 6 deletions
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index 8ebeba3522..30e1a1e0c9 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -147,12 +147,38 @@ grpc::string Ip4ToPackedString(const char* ip_str) { return grpc::string(reinterpret_cast<const char*>(&ip4), sizeof(ip4)); } +struct ClientStats { + size_t num_calls_started = 0; + size_t num_calls_finished = 0; + size_t num_calls_finished_with_drop_for_rate_limiting = 0; + size_t num_calls_finished_with_drop_for_load_balancing = 0; + size_t num_calls_finished_with_client_failed_to_send = 0; + size_t num_calls_finished_known_received = 0; + + ClientStats& operator+=(const ClientStats& other) { + num_calls_started += other.num_calls_started; + num_calls_finished += other.num_calls_finished; + num_calls_finished_with_drop_for_rate_limiting += + other.num_calls_finished_with_drop_for_rate_limiting; + num_calls_finished_with_drop_for_load_balancing += + other.num_calls_finished_with_drop_for_load_balancing; + num_calls_finished_with_client_failed_to_send += + other.num_calls_finished_with_client_failed_to_send; + num_calls_finished_known_received += + other.num_calls_finished_known_received; + return *this; + } +}; + class BalancerServiceImpl : public BalancerService { public: using Stream = ServerReaderWriter<LoadBalanceResponse, LoadBalanceRequest>; using ResponseDelayPair = std::pair<LoadBalanceResponse, int>; - BalancerServiceImpl() : shutdown_(false) {} + explicit BalancerServiceImpl(int client_load_reporting_interval_seconds) + : client_load_reporting_interval_seconds_( + client_load_reporting_interval_seconds), + shutdown_(false) {} Status BalanceLoad(ServerContext* context, Stream* stream) override { LoadBalanceRequest request; @@ -160,16 +186,49 @@ class BalancerServiceImpl : public BalancerService { IncreaseRequestCount(); gpr_log(GPR_INFO, "LB: recv msg '%s'", request.DebugString().c_str()); + if (client_load_reporting_interval_seconds_ > 0) { + LoadBalanceResponse initial_response; + initial_response.mutable_initial_response() + ->mutable_client_stats_report_interval() + ->set_seconds(client_load_reporting_interval_seconds_); + stream->Write(initial_response); + } + std::vector<ResponseDelayPair> responses_and_delays; { std::unique_lock<std::mutex> lock(mu_); responses_and_delays = responses_and_delays_; } - for (const auto& response_and_delay : responses_and_delays) { if (shutdown_) break; SendResponse(stream, response_and_delay.first, response_and_delay.second); } + + if (client_load_reporting_interval_seconds_ > 0) { + request.Clear(); + stream->Read(&request); + gpr_log(GPR_INFO, "LB: recv client load report msg: '%s'", + request.DebugString().c_str()); + GPR_ASSERT(request.has_client_stats()); + client_stats_.num_calls_started += + request.client_stats().num_calls_started(); + client_stats_.num_calls_finished += + request.client_stats().num_calls_finished(); + client_stats_.num_calls_finished_with_drop_for_rate_limiting += + request.client_stats() + .num_calls_finished_with_drop_for_rate_limiting(); + client_stats_.num_calls_finished_with_drop_for_load_balancing += + request.client_stats() + .num_calls_finished_with_drop_for_load_balancing(); + client_stats_.num_calls_finished_with_client_failed_to_send += + request.client_stats() + .num_calls_finished_with_client_failed_to_send(); + client_stats_.num_calls_finished_known_received += + request.client_stats().num_calls_finished_known_received(); + std::lock_guard<std::mutex> lock(mu_); + cond_.notify_one(); + } + return Status::OK; } @@ -194,6 +253,12 @@ class BalancerServiceImpl : public BalancerService { return response; } + const ClientStats& WaitForLoadReport() { + std::unique_lock<std::mutex> lock(mu_); + cond_.wait(lock); + return client_stats_; + } + private: void SendResponse(Stream* stream, const LoadBalanceResponse& response, int delay_ms) { @@ -206,16 +271,23 @@ class BalancerServiceImpl : public BalancerService { IncreaseResponseCount(); } + const int client_load_reporting_interval_seconds_; std::vector<ResponseDelayPair> responses_and_delays_; + std::mutex mu_; + std::condition_variable cond_; + ClientStats client_stats_; bool shutdown_; }; class GrpclbEnd2endTest : public ::testing::Test { protected: - GrpclbEnd2endTest(int num_backends, int num_balancers) + GrpclbEnd2endTest(int num_backends, int num_balancers, + int client_load_reporting_interval_seconds) : server_host_("localhost"), num_backends_(num_backends), - num_balancers_(num_balancers) {} + num_balancers_(num_balancers), + client_load_reporting_interval_seconds_( + client_load_reporting_interval_seconds) {} void SetUp() override { response_generator_ = grpc_fake_resolver_response_generator_create(); @@ -227,7 +299,8 @@ class GrpclbEnd2endTest : public ::testing::Test { } // Start the load balancers. for (size_t i = 0; i < num_balancers_; ++i) { - balancers_.emplace_back(new BalancerServiceImpl()); + balancers_.emplace_back( + new BalancerServiceImpl(client_load_reporting_interval_seconds_)); balancer_servers_.emplace_back(ServerThread<BalancerService>( "balancer", server_host_, balancers_.back().get())); } @@ -261,6 +334,14 @@ class GrpclbEnd2endTest : public ::testing::Test { stub_ = grpc::testing::EchoTestService::NewStub(channel_); } + ClientStats WaitForLoadReports() { + ClientStats client_stats; + for (const auto& balancer : balancers_) { + client_stats += balancer->WaitForLoadReport(); + } + return client_stats; + } + struct AddressData { int port; bool is_balancer; @@ -367,6 +448,7 @@ class GrpclbEnd2endTest : public ::testing::Test { const grpc::string server_host_; const size_t num_backends_; const size_t num_balancers_; + const int client_load_reporting_interval_seconds_; std::shared_ptr<Channel> channel_; std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; @@ -381,7 +463,7 @@ class GrpclbEnd2endTest : public ::testing::Test { class SingleBalancerTest : public GrpclbEnd2endTest { public: - SingleBalancerTest() : GrpclbEnd2endTest(4, 1) {} + SingleBalancerTest() : GrpclbEnd2endTest(4, 1, 0) {} }; TEST_F(SingleBalancerTest, Vanilla) { @@ -505,6 +587,41 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) { EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } +class SingleBalancerWithClientLoadReportingTest : public GrpclbEnd2endTest { + public: + SingleBalancerWithClientLoadReportingTest() : GrpclbEnd2endTest(4, 1, 2) {} +}; + +TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) { + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts()), 0); + // Start servers and send 100 RPCs per server. + const auto& statuses_and_responses = SendRpc(kMessage_, 100 * num_backends_); + + for (const auto& status_and_response : statuses_and_responses) { + EXPECT_TRUE(status_and_response.first.ok()); + EXPECT_EQ(status_and_response.second.message(), kMessage_); + } + + // Each backend should have gotten 100 requests. + for (size_t i = 0; i < backends_.size(); ++i) { + EXPECT_EQ(100, backend_servers_[i].service_->request_count()); + } + // The balancer got a single request. + EXPECT_EQ(1, balancer_servers_[0].service_->request_count()); + // and sent a single response. + EXPECT_EQ(1, balancer_servers_[0].service_->response_count()); + + const ClientStats client_stats = WaitForLoadReports(); + EXPECT_EQ(100 * num_backends_, client_stats.num_calls_started); + EXPECT_EQ(100 * num_backends_, client_stats.num_calls_finished); + EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_rate_limiting); + EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_load_balancing); + EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send); + EXPECT_EQ(100 * num_backends_, + client_stats.num_calls_finished_known_received); +} + } // namespace } // namespace testing } // namespace grpc |