diff options
author | 2018-07-13 19:52:59 -0700 | |
---|---|---|
committer | 2018-07-13 19:52:59 -0700 | |
commit | be40b0d3a8cf2e37c80b2c248111051fa8bdf7bc (patch) | |
tree | 79384a0045d5bbc550666f047d088f9582755086 /test/cpp | |
parent | f9f5c67aff91e4ad26371b0a2482a5011ab45226 (diff) |
Add server load reporting service
Diffstat (limited to 'test/cpp')
-rw-r--r-- | test/cpp/end2end/BUILD | 16 | ||||
-rw-r--r-- | test/cpp/end2end/server_load_reporting_end2end_test.cc | 191 | ||||
-rw-r--r-- | test/cpp/server/load_reporter/BUILD | 1 | ||||
-rw-r--r-- | test/cpp/server/load_reporter/load_reporter_test.cc | 9 |
4 files changed, 216 insertions, 1 deletions
diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index 23dde69dd0..95bb7ed229 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -14,7 +14,7 @@ licenses(["notice"]) # Apache v2 -load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package", "grpc_cc_binary") +load("//bazel:grpc_build_system.bzl", "grpc_cc_binary", "grpc_cc_library", "grpc_cc_test", "grpc_package") grpc_package( name = "test/cpp/end2end", @@ -430,6 +430,20 @@ grpc_cc_binary( ) grpc_cc_test( + name = "server_load_reporting_end2end_test", + srcs = ["server_load_reporting_end2end_test.cc"], + external_deps = [ + "gtest", + "gmock", + ], + deps = [ + "//:grpcpp_server_load_reporting", + "//src/proto/grpc/testing:echo_proto", + "//test/cpp/util:test_util", + ], +) + +grpc_cc_test( name = "shutdown_test", srcs = ["shutdown_test.cc"], external_deps = [ diff --git a/test/cpp/end2end/server_load_reporting_end2end_test.cc b/test/cpp/end2end/server_load_reporting_end2end_test.cc new file mode 100644 index 0000000000..7bc9af2a6e --- /dev/null +++ b/test/cpp/end2end/server_load_reporting_end2end_test.cc @@ -0,0 +1,191 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include <thread> + +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include <grpc++/grpc++.h> +#include <grpc/grpc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpcpp/ext/server_load_reporting.h> +#include <grpcpp/server_builder.h> + +#include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h" +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/core/util/port.h" + +namespace grpc { +namespace testing { +namespace { + +constexpr double kMetricValue = 3.1415; +constexpr char kMetricName[] = "METRIC_PI"; + +// Different messages result in different response statuses. For simplicity in +// computing request bytes, the message sizes should be the same. +const char kOkMessage[] = "hello"; +const char kServerErrorMessage[] = "sverr"; +const char kClientErrorMessage[] = "clerr"; + +class EchoTestServiceImpl : public EchoTestService::Service { + public: + ~EchoTestServiceImpl() override {} + + Status Echo(ServerContext* context, const EchoRequest* request, + EchoResponse* response) override { + if (request->message() == kServerErrorMessage) { + return Status(StatusCode::UNKNOWN, "Server error requested"); + } + if (request->message() == kClientErrorMessage) { + return Status(StatusCode::FAILED_PRECONDITION, "Client error requested"); + } + response->set_message(request->message()); + ::grpc::load_reporter::experimental::AddLoadReportingCost( + context, kMetricName, kMetricValue); + return Status::OK; + } +}; + +class ServerLoadReportingEnd2endTest : public ::testing::Test { + protected: + void SetUp() override { + server_address_ = + "localhost:" + std::to_string(grpc_pick_unused_port_or_die()); + server_ = + ServerBuilder() + .AddListeningPort(server_address_, InsecureServerCredentials()) + .RegisterService(&echo_service_) + .SetOption(std::unique_ptr<::grpc::ServerBuilderOption>( + new ::grpc::load_reporter::experimental:: + LoadReportingServiceServerBuilderOption())) + .BuildAndStart(); + server_thread_ = + std::thread(&ServerLoadReportingEnd2endTest::RunServerLoop, this); + } + + void RunServerLoop() { server_->Wait(); } + + void TearDown() override { + server_->Shutdown(); + server_thread_.join(); + } + + void ClientMakeEchoCalls(const grpc::string& lb_id, + const grpc::string& lb_tag, + const grpc::string& message, size_t num_requests) { + auto stub = EchoTestService::NewStub( + CreateChannel(server_address_, InsecureChannelCredentials())); + grpc::string lb_token = lb_id + lb_tag; + for (int i = 0; i < num_requests; ++i) { + ClientContext ctx; + if (!lb_token.empty()) ctx.AddMetadata(GRPC_LB_TOKEN_MD_KEY, lb_token); + EchoRequest request; + EchoResponse response; + request.set_message(message); + Status status = stub->Echo(&ctx, request, &response); + if (message == kOkMessage) { + ASSERT_EQ(status.error_code(), StatusCode::OK); + ASSERT_EQ(request.message(), response.message()); + } else if (message == kServerErrorMessage) { + ASSERT_EQ(status.error_code(), StatusCode::UNKNOWN); + } else if (message == kClientErrorMessage) { + ASSERT_EQ(status.error_code(), StatusCode::FAILED_PRECONDITION); + } + } + } + + grpc::string server_address_; + std::unique_ptr<Server> server_; + std::thread server_thread_; + EchoTestServiceImpl echo_service_; +}; + +TEST_F(ServerLoadReportingEnd2endTest, NoCall) {} + +TEST_F(ServerLoadReportingEnd2endTest, BasicReport) { + auto channel = + grpc::CreateChannel(server_address_, InsecureChannelCredentials()); + auto stub = ::grpc::lb::v1::LoadReporter::NewStub(channel); + ClientContext ctx; + auto stream = stub->ReportLoad(&ctx); + ::grpc::lb::v1::LoadReportRequest request; + request.mutable_initial_request()->set_load_balanced_hostname( + server_address_); + request.mutable_initial_request()->set_load_key("LOAD_KEY"); + request.mutable_initial_request() + ->mutable_load_report_interval() + ->set_seconds(5); + stream->Write(request); + gpr_log(GPR_INFO, "Initial request sent."); + ::grpc::lb::v1::LoadReportResponse response; + stream->Read(&response); + const grpc::string& lb_id = response.initial_response().load_balancer_id(); + gpr_log(GPR_INFO, "Initial response received (lb_id: %s).", lb_id.c_str()); + ClientMakeEchoCalls(lb_id, "LB_TAG", kOkMessage, 1); + while (true) { + stream->Read(&response); + if (!response.load().empty()) { + ASSERT_EQ(response.load().size(), 3); + for (const auto& load : response.load()) { + if (load.in_progress_report_case()) { + // The special load record that reports the number of in-progress + // calls. + ASSERT_EQ(load.num_calls_in_progress(), 1); + } else if (load.orphaned_load_case()) { + // The call from the balancer doesn't have any valid LB token. + ASSERT_EQ(load.orphaned_load_case(), load.kLoadKeyUnknown); + ASSERT_EQ(load.num_calls_started(), 1); + ASSERT_EQ(load.num_calls_finished_without_error(), 0); + ASSERT_EQ(load.num_calls_finished_with_error(), 0); + } else { + // This corresponds to the calls from the client. + ASSERT_EQ(load.num_calls_started(), 1); + ASSERT_EQ(load.num_calls_finished_without_error(), 1); + ASSERT_EQ(load.num_calls_finished_with_error(), 0); + ASSERT_GE(load.total_bytes_received(), sizeof(kOkMessage)); + ASSERT_GE(load.total_bytes_sent(), sizeof(kOkMessage)); + ASSERT_EQ(load.metric_data().size(), 1); + ASSERT_EQ(load.metric_data().Get(0).metric_name(), kMetricName); + ASSERT_EQ(load.metric_data().Get(0).num_calls_finished_with_metric(), + 1); + ASSERT_EQ(load.metric_data().Get(0).total_metric_value(), + kMetricValue); + } + } + break; + } + } + stream->WritesDone(); + ASSERT_EQ(stream->Finish().error_code(), StatusCode::CANCELLED); +} + +// TODO(juanlishen): Add more tests. + +} // namespace +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/cpp/server/load_reporter/BUILD b/test/cpp/server/load_reporter/BUILD index ebfcfbb348..b7c4d29d71 100644 --- a/test/cpp/server/load_reporter/BUILD +++ b/test/cpp/server/load_reporter/BUILD @@ -42,6 +42,7 @@ grpc_cc_test( "//:gpr", "//:grpc", "//:lb_load_reporter", + "//:lb_server_load_reporting_filter", "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", ], diff --git a/test/cpp/server/load_reporter/load_reporter_test.cc b/test/cpp/server/load_reporter/load_reporter_test.cc index 3264dba134..719c3a67d9 100644 --- a/test/cpp/server/load_reporter/load_reporter_test.cc +++ b/test/cpp/server/load_reporter/load_reporter_test.cc @@ -25,6 +25,7 @@ #include <grpc/grpc.h> #include <gtest/gtest.h> +#include "src/core/ext/filters/load_reporting/registered_opencensus_objects.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/cpp/server/load_reporter/constants.h" #include "src/cpp/server/load_reporter/load_reporter.h" @@ -123,6 +124,14 @@ class LoadReporterTest : public ::testing::Test { private: void SetUp() override { + // Access the measures to make them valid. + ::grpc::load_reporter::MeasureStartCount(); + ::grpc::load_reporter::MeasureEndCount(); + ::grpc::load_reporter::MeasureEndBytesSent(); + ::grpc::load_reporter::MeasureEndBytesReceived(); + ::grpc::load_reporter::MeasureEndLatencyMs(); + ::grpc::load_reporter::MeasureOtherCallMetric(); + // Set up the load reporter. auto mock_cpu = new MockCpuStatsProvider(); auto mock_census = new MockCensusViewProvider(); // Prepare the initial CPU stats data. Note that the expectation should be |