aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/end2end/grpclb_end2end_test.cc
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/end2end/grpclb_end2end_test.cc')
-rw-r--r--test/cpp/end2end/grpclb_end2end_test.cc129
1 files changed, 123 insertions, 6 deletions
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
index 8ebeba3522..30e1a1e0c9 100644
--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -147,12 +147,38 @@ grpc::string Ip4ToPackedString(const char* ip_str) {
return grpc::string(reinterpret_cast<const char*>(&ip4), sizeof(ip4));
}
+struct ClientStats {
+ size_t num_calls_started = 0;
+ size_t num_calls_finished = 0;
+ size_t num_calls_finished_with_drop_for_rate_limiting = 0;
+ size_t num_calls_finished_with_drop_for_load_balancing = 0;
+ size_t num_calls_finished_with_client_failed_to_send = 0;
+ size_t num_calls_finished_known_received = 0;
+
+ ClientStats& operator+=(const ClientStats& other) {
+ num_calls_started += other.num_calls_started;
+ num_calls_finished += other.num_calls_finished;
+ num_calls_finished_with_drop_for_rate_limiting +=
+ other.num_calls_finished_with_drop_for_rate_limiting;
+ num_calls_finished_with_drop_for_load_balancing +=
+ other.num_calls_finished_with_drop_for_load_balancing;
+ num_calls_finished_with_client_failed_to_send +=
+ other.num_calls_finished_with_client_failed_to_send;
+ num_calls_finished_known_received +=
+ other.num_calls_finished_known_received;
+ return *this;
+ }
+};
+
class BalancerServiceImpl : public BalancerService {
public:
using Stream = ServerReaderWriter<LoadBalanceResponse, LoadBalanceRequest>;
using ResponseDelayPair = std::pair<LoadBalanceResponse, int>;
- BalancerServiceImpl() : shutdown_(false) {}
+ explicit BalancerServiceImpl(int client_load_reporting_interval_seconds)
+ : client_load_reporting_interval_seconds_(
+ client_load_reporting_interval_seconds),
+ shutdown_(false) {}
Status BalanceLoad(ServerContext* context, Stream* stream) override {
LoadBalanceRequest request;
@@ -160,16 +186,49 @@ class BalancerServiceImpl : public BalancerService {
IncreaseRequestCount();
gpr_log(GPR_INFO, "LB: recv msg '%s'", request.DebugString().c_str());
+ if (client_load_reporting_interval_seconds_ > 0) {
+ LoadBalanceResponse initial_response;
+ initial_response.mutable_initial_response()
+ ->mutable_client_stats_report_interval()
+ ->set_seconds(client_load_reporting_interval_seconds_);
+ stream->Write(initial_response);
+ }
+
std::vector<ResponseDelayPair> responses_and_delays;
{
std::unique_lock<std::mutex> lock(mu_);
responses_and_delays = responses_and_delays_;
}
-
for (const auto& response_and_delay : responses_and_delays) {
if (shutdown_) break;
SendResponse(stream, response_and_delay.first, response_and_delay.second);
}
+
+ if (client_load_reporting_interval_seconds_ > 0) {
+ request.Clear();
+ stream->Read(&request);
+ gpr_log(GPR_INFO, "LB: recv client load report msg: '%s'",
+ request.DebugString().c_str());
+ GPR_ASSERT(request.has_client_stats());
+ client_stats_.num_calls_started +=
+ request.client_stats().num_calls_started();
+ client_stats_.num_calls_finished +=
+ request.client_stats().num_calls_finished();
+ client_stats_.num_calls_finished_with_drop_for_rate_limiting +=
+ request.client_stats()
+ .num_calls_finished_with_drop_for_rate_limiting();
+ client_stats_.num_calls_finished_with_drop_for_load_balancing +=
+ request.client_stats()
+ .num_calls_finished_with_drop_for_load_balancing();
+ client_stats_.num_calls_finished_with_client_failed_to_send +=
+ request.client_stats()
+ .num_calls_finished_with_client_failed_to_send();
+ client_stats_.num_calls_finished_known_received +=
+ request.client_stats().num_calls_finished_known_received();
+ std::lock_guard<std::mutex> lock(mu_);
+ cond_.notify_one();
+ }
+
return Status::OK;
}
@@ -194,6 +253,12 @@ class BalancerServiceImpl : public BalancerService {
return response;
}
+ const ClientStats& WaitForLoadReport() {
+ std::unique_lock<std::mutex> lock(mu_);
+ cond_.wait(lock);
+ return client_stats_;
+ }
+
private:
void SendResponse(Stream* stream, const LoadBalanceResponse& response,
int delay_ms) {
@@ -206,16 +271,23 @@ class BalancerServiceImpl : public BalancerService {
IncreaseResponseCount();
}
+ const int client_load_reporting_interval_seconds_;
std::vector<ResponseDelayPair> responses_and_delays_;
+ std::mutex mu_;
+ std::condition_variable cond_;
+ ClientStats client_stats_;
bool shutdown_;
};
class GrpclbEnd2endTest : public ::testing::Test {
protected:
- GrpclbEnd2endTest(int num_backends, int num_balancers)
+ GrpclbEnd2endTest(int num_backends, int num_balancers,
+ int client_load_reporting_interval_seconds)
: server_host_("localhost"),
num_backends_(num_backends),
- num_balancers_(num_balancers) {}
+ num_balancers_(num_balancers),
+ client_load_reporting_interval_seconds_(
+ client_load_reporting_interval_seconds) {}
void SetUp() override {
response_generator_ = grpc_fake_resolver_response_generator_create();
@@ -227,7 +299,8 @@ class GrpclbEnd2endTest : public ::testing::Test {
}
// Start the load balancers.
for (size_t i = 0; i < num_balancers_; ++i) {
- balancers_.emplace_back(new BalancerServiceImpl());
+ balancers_.emplace_back(
+ new BalancerServiceImpl(client_load_reporting_interval_seconds_));
balancer_servers_.emplace_back(ServerThread<BalancerService>(
"balancer", server_host_, balancers_.back().get()));
}
@@ -261,6 +334,14 @@ class GrpclbEnd2endTest : public ::testing::Test {
stub_ = grpc::testing::EchoTestService::NewStub(channel_);
}
+ ClientStats WaitForLoadReports() {
+ ClientStats client_stats;
+ for (const auto& balancer : balancers_) {
+ client_stats += balancer->WaitForLoadReport();
+ }
+ return client_stats;
+ }
+
struct AddressData {
int port;
bool is_balancer;
@@ -367,6 +448,7 @@ class GrpclbEnd2endTest : public ::testing::Test {
const grpc::string server_host_;
const size_t num_backends_;
const size_t num_balancers_;
+ const int client_load_reporting_interval_seconds_;
std::shared_ptr<Channel> channel_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
@@ -381,7 +463,7 @@ class GrpclbEnd2endTest : public ::testing::Test {
class SingleBalancerTest : public GrpclbEnd2endTest {
public:
- SingleBalancerTest() : GrpclbEnd2endTest(4, 1) {}
+ SingleBalancerTest() : GrpclbEnd2endTest(4, 1, 0) {}
};
TEST_F(SingleBalancerTest, Vanilla) {
@@ -505,6 +587,41 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) {
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
+class SingleBalancerWithClientLoadReportingTest : public GrpclbEnd2endTest {
+ public:
+ SingleBalancerWithClientLoadReportingTest() : GrpclbEnd2endTest(4, 1, 2) {}
+};
+
+TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
+ ScheduleResponseForBalancer(
+ 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts()), 0);
+ // Start servers and send 100 RPCs per server.
+ const auto& statuses_and_responses = SendRpc(kMessage_, 100 * num_backends_);
+
+ for (const auto& status_and_response : statuses_and_responses) {
+ EXPECT_TRUE(status_and_response.first.ok());
+ EXPECT_EQ(status_and_response.second.message(), kMessage_);
+ }
+
+ // Each backend should have gotten 100 requests.
+ for (size_t i = 0; i < backends_.size(); ++i) {
+ EXPECT_EQ(100, backend_servers_[i].service_->request_count());
+ }
+ // The balancer got a single request.
+ EXPECT_EQ(1, balancer_servers_[0].service_->request_count());
+ // and sent a single response.
+ EXPECT_EQ(1, balancer_servers_[0].service_->response_count());
+
+ const ClientStats client_stats = WaitForLoadReports();
+ EXPECT_EQ(100 * num_backends_, client_stats.num_calls_started);
+ EXPECT_EQ(100 * num_backends_, client_stats.num_calls_finished);
+ EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_rate_limiting);
+ EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_load_balancing);
+ EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send);
+ EXPECT_EQ(100 * num_backends_,
+ client_stats.num_calls_finished_known_received);
+}
+
} // namespace
} // namespace testing
} // namespace grpc