From 41e4cedb7012a55376322b142d74eae5e86b95e3 Mon Sep 17 00:00:00 2001 From: Vizerai Date: Fri, 13 Apr 2018 18:19:21 -0700 Subject: Adding opencensus grpc plugin. Rebasing to merge commits. --- test/cpp/ext/filters/census/BUILD | 55 +++ .../ext/filters/census/grpc_plugin_benchmark.cc | 125 +++++++ .../filters/census/stats_plugin_end2end_test.cc | 376 +++++++++++++++++++++ 3 files changed, 556 insertions(+) create mode 100644 test/cpp/ext/filters/census/BUILD create mode 100644 test/cpp/ext/filters/census/grpc_plugin_benchmark.cc create mode 100644 test/cpp/ext/filters/census/stats_plugin_end2end_test.cc (limited to 'test/cpp/ext') diff --git a/test/cpp/ext/filters/census/BUILD b/test/cpp/ext/filters/census/BUILD new file mode 100644 index 0000000000..d6968d671b --- /dev/null +++ b/test/cpp/ext/filters/census/BUILD @@ -0,0 +1,55 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package") +load("//bazel:cc_grpc_library.bzl", "cc_grpc_library") + +licenses(["notice"]) # Apache v2 + +grpc_package(name = "test/core/ext/census") + +grpc_cc_test( + name = "grpc_opencensus_plugin_test", + srcs = [ + "stats_plugin_end2end_test.cc", + ], + language = "C++", + external_deps = [ + "gtest", + "gmock", + "opencensus-stats-test", + ], + deps = [ + "//:grpc++", + "//:grpc_opencensus_plugin", + "//src/proto/grpc/testing:echo_proto", + "//test/core/util:gpr_test_util", + "//test/core/util:grpc_test_util", + "//test/cpp/util:test_util", + "//test/cpp/util:test_config", + ], +) + +grpc_cc_test( + name = "grpc_opencensus_plugin_benchmark", + srcs = ["grpc_plugin_benchmark.cc"], + language = "C++", + deps = [ + "//:grpc_opencensus_plugin", + "//src/proto/grpc/testing:echo_proto", + ], + external_deps = [ + "benchmark", + ], +) diff --git a/test/cpp/ext/filters/census/grpc_plugin_benchmark.cc b/test/cpp/ext/filters/census/grpc_plugin_benchmark.cc new file mode 100644 index 0000000000..1f48f88710 --- /dev/null +++ b/test/cpp/ext/filters/census/grpc_plugin_benchmark.cc @@ -0,0 +1,125 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include +#include // NOLINT + +#include "absl/base/call_once.h" +#include "absl/strings/str_cat.h" +#include "benchmark/benchmark.h" +#include "include/grpc++/grpc++.h" +#include "opencensus/stats/stats.h" +#include "src/cpp/ext/filters/census/grpc_plugin.h" +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/cpp/microbenchmarks/helpers.h" + +namespace grpc { +namespace { + +absl::once_flag once; +void RegisterOnce() { absl::call_once(once, RegisterOpenCensusPlugin); } + +class EchoServer final : public testing::EchoTestService::Service { + ::grpc::Status Echo(::grpc::ServerContext* context, + const testing::EchoRequest* request, + testing::EchoResponse* response) override { + if (request->param().expected_error().code() == 0) { + response->set_message(request->message()); + return ::grpc::Status::OK; + } else { + return ::grpc::Status(static_cast<::grpc::StatusCode>( + request->param().expected_error().code()), + ""); + } + } +}; + +// An EchoServerThread object creates an EchoServer on a separate thread and +// shuts down the server and thread when it goes out of scope. +class EchoServerThread final { + public: + EchoServerThread() { + ::grpc::ServerBuilder builder; + int port; + builder.AddListeningPort("[::]:0", ::grpc::InsecureServerCredentials(), + &port); + builder.RegisterService(&service_); + server_ = builder.BuildAndStart(); + if (server_ == nullptr || port == 0) { + std::abort(); + } + server_address_ = absl::StrCat("[::]:", port); + server_thread_ = std::thread(&EchoServerThread::RunServerLoop, this); + } + + ~EchoServerThread() { + server_->Shutdown(); + server_thread_.join(); + } + + const std::string& address() { return server_address_; } + + private: + void RunServerLoop() { server_->Wait(); } + + std::string server_address_; + EchoServer service_; + std::unique_ptr server_; + std::thread server_thread_; +}; + +void BM_E2eLatencyCensusDisabled(benchmark::State& state) { + EchoServerThread server; + std::unique_ptr stub = + testing::EchoTestService::NewStub(::grpc::CreateChannel( + server.address(), ::grpc::InsecureChannelCredentials())); + + testing::EchoResponse response; + for (auto _ : state) { + testing::EchoRequest request; + ::grpc::ClientContext context; + ::grpc::Status status = stub->Echo(&context, request, &response); + } +} +BENCHMARK(BM_E2eLatencyCensusDisabled); + +void BM_E2eLatencyCensusEnabled(benchmark::State& state) { + RegisterOnce(); + // This we can safely repeat, and doing so clears accumulated data to avoid + // initialization costs varying between runs. + RegisterGrpcViewsForExport(); + + EchoServerThread server; + std::unique_ptr stub = + testing::EchoTestService::NewStub(::grpc::CreateChannel( + server.address(), ::grpc::InsecureChannelCredentials())); + + testing::EchoResponse response; + for (auto _ : state) { + testing::EchoRequest request; + ::grpc::ClientContext context; + ::grpc::Status status = stub->Echo(&context, request, &response); + } +} +BENCHMARK(BM_E2eLatencyCensusEnabled); + +} // namespace +} // namespace grpc + +BENCHMARK_MAIN(); diff --git a/test/cpp/ext/filters/census/stats_plugin_end2end_test.cc b/test/cpp/ext/filters/census/stats_plugin_end2end_test.cc new file mode 100644 index 0000000000..664504a090 --- /dev/null +++ b/test/cpp/ext/filters/census/stats_plugin_end2end_test.cc @@ -0,0 +1,376 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include // NOLINT +#include + +#include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" +#include "gmock/gmock.h" +#include "gtest/gtest.h" +#include "include/grpc++/grpc++.h" +#include "opencensus/stats/stats.h" +#include "opencensus/stats/testing/test_utils.h" +#include "src/cpp/ext/filters/census/grpc_plugin.h" +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/core/util/test_config.h" + +namespace grpc { +namespace testing { +namespace { + +using ::opencensus::stats::Aggregation; +using ::opencensus::stats::Distribution; +using ::opencensus::stats::View; +using ::opencensus::stats::ViewDescriptor; +using ::opencensus::stats::testing::TestUtils; + +class EchoServer final : public EchoTestService::Service { + ::grpc::Status Echo(::grpc::ServerContext* context, + const EchoRequest* request, + EchoResponse* response) override { + if (request->param().expected_error().code() == 0) { + response->set_message(request->message()); + return ::grpc::Status::OK; + } else { + return ::grpc::Status(static_cast<::grpc::StatusCode>( + request->param().expected_error().code()), + ""); + } + } +}; + +class StatsPluginEnd2EndTest : public ::testing::Test { + protected: + static void SetUpTestCase() { RegisterOpenCensusPlugin(); } + + void SetUp() { + // Set up a synchronous server on a different thread to avoid the asynch + // interface. + ::grpc::ServerBuilder builder; + int port; + // Use IPv4 here because it's less flaky than IPv6 ("[::]:0") on Travis. + builder.AddListeningPort("0.0.0.0:0", ::grpc::InsecureServerCredentials(), + &port); + builder.RegisterService(&service_); + server_ = builder.BuildAndStart(); + ASSERT_NE(nullptr, server_); + ASSERT_NE(0, port); + server_address_ = absl::StrCat("0.0.0.0:", port); + server_thread_ = std::thread(&StatsPluginEnd2EndTest::RunServerLoop, this); + + stub_ = EchoTestService::NewStub(::grpc::CreateChannel( + server_address_, ::grpc::InsecureChannelCredentials())); + } + + void TearDown() { + server_->Shutdown(); + server_thread_.join(); + } + + void RunServerLoop() { server_->Wait(); } + + const std::string client_method_name_ = "grpc.testing.EchoTestService/Echo"; + const std::string server_method_name_ = "grpc.testing.EchoTestService/Echo"; + + std::string server_address_; + EchoServer service_; + std::unique_ptr server_; + std::thread server_thread_; + + std::unique_ptr stub_; +}; + +TEST_F(StatsPluginEnd2EndTest, ErrorCount) { + const auto client_method_descriptor = + ViewDescriptor() + .set_measure(kRpcClientRoundtripLatencyMeasureName) + .set_name("client_method") + .set_aggregation(Aggregation::Count()) + .add_column(ClientMethodTagKey()); + View client_method_view(client_method_descriptor); + const auto server_method_descriptor = + ViewDescriptor() + .set_measure(kRpcServerServerLatencyMeasureName) + .set_name("server_method") + .set_aggregation(Aggregation::Count()) + .add_column(ServerMethodTagKey()); + View server_method_view(server_method_descriptor); + + const auto client_status_descriptor = + ViewDescriptor() + .set_measure(kRpcClientRoundtripLatencyMeasureName) + .set_name("client_status") + .set_aggregation(Aggregation::Count()) + .add_column(ClientStatusTagKey()); + View client_status_view(client_status_descriptor); + const auto server_status_descriptor = + ViewDescriptor() + .set_measure(kRpcServerServerLatencyMeasureName) + .set_name("server_status") + .set_aggregation(Aggregation::Count()) + .add_column(ServerStatusTagKey()); + View server_status_view(server_status_descriptor); + + // Cover all valid statuses. + for (int i = 0; i <= 16; ++i) { + EchoRequest request; + request.set_message("foo"); + request.mutable_param()->mutable_expected_error()->set_code(i); + EchoResponse response; + ::grpc::ClientContext context; + ::grpc::Status status = stub_->Echo(&context, request, &response); + } + absl::SleepFor(absl::Milliseconds(500)); + TestUtils::Flush(); + + EXPECT_THAT(client_method_view.GetData().int_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(client_method_name_), 17))); + EXPECT_THAT(server_method_view.GetData().int_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(server_method_name_), 17))); + + auto codes = { + ::testing::Pair(::testing::ElementsAre("OK"), 1), + ::testing::Pair(::testing::ElementsAre("CANCELLED"), 1), + ::testing::Pair(::testing::ElementsAre("UNKNOWN"), 1), + ::testing::Pair(::testing::ElementsAre("INVALID_ARGUMENT"), 1), + ::testing::Pair(::testing::ElementsAre("DEADLINE_EXCEEDED"), 1), + ::testing::Pair(::testing::ElementsAre("NOT_FOUND"), 1), + ::testing::Pair(::testing::ElementsAre("ALREADY_EXISTS"), 1), + ::testing::Pair(::testing::ElementsAre("PERMISSION_DENIED"), 1), + ::testing::Pair(::testing::ElementsAre("UNAUTHENTICATED"), 1), + ::testing::Pair(::testing::ElementsAre("RESOURCE_EXHAUSTED"), 1), + ::testing::Pair(::testing::ElementsAre("FAILED_PRECONDITION"), 1), + ::testing::Pair(::testing::ElementsAre("ABORTED"), 1), + ::testing::Pair(::testing::ElementsAre("OUT_OF_RANGE"), 1), + ::testing::Pair(::testing::ElementsAre("UNIMPLEMENTED"), 1), + ::testing::Pair(::testing::ElementsAre("INTERNAL"), 1), + ::testing::Pair(::testing::ElementsAre("UNAVAILABLE"), 1), + ::testing::Pair(::testing::ElementsAre("DATA_LOSS"), 1), + }; + + EXPECT_THAT(client_status_view.GetData().int_data(), + ::testing::UnorderedElementsAreArray(codes)); + EXPECT_THAT(server_status_view.GetData().int_data(), + ::testing::UnorderedElementsAreArray(codes)); +} + +TEST_F(StatsPluginEnd2EndTest, RequestReceivedBytesPerRpc) { + View client_sent_bytes_per_rpc_view(ClientSentBytesPerRpcCumulative()); + View client_received_bytes_per_rpc_view( + ClientReceivedBytesPerRpcCumulative()); + View server_sent_bytes_per_rpc_view(ServerSentBytesPerRpcCumulative()); + View server_received_bytes_per_rpc_view( + ServerReceivedBytesPerRpcCumulative()); + + { + EchoRequest request; + request.set_message("foo"); + EchoResponse response; + ::grpc::ClientContext context; + ::grpc::Status status = stub_->Echo(&context, request, &response); + ASSERT_TRUE(status.ok()); + EXPECT_EQ("foo", response.message()); + } + absl::SleepFor(absl::Milliseconds(500)); + TestUtils::Flush(); + + EXPECT_THAT(client_received_bytes_per_rpc_view.GetData().distribution_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(client_method_name_), + ::testing::AllOf(::testing::Property(&Distribution::count, 1), + ::testing::Property(&Distribution::mean, + ::testing::Gt(0.0)))))); + EXPECT_THAT(client_sent_bytes_per_rpc_view.GetData().distribution_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(client_method_name_), + ::testing::AllOf(::testing::Property(&Distribution::count, 1), + ::testing::Property(&Distribution::mean, + ::testing::Gt(0.0)))))); + EXPECT_THAT(server_received_bytes_per_rpc_view.GetData().distribution_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(server_method_name_), + ::testing::AllOf(::testing::Property(&Distribution::count, 1), + ::testing::Property(&Distribution::mean, + ::testing::Gt(0.0)))))); + EXPECT_THAT(server_sent_bytes_per_rpc_view.GetData().distribution_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(server_method_name_), + ::testing::AllOf(::testing::Property(&Distribution::count, 1), + ::testing::Property(&Distribution::mean, + ::testing::Gt(0.0)))))); +} + +TEST_F(StatsPluginEnd2EndTest, Latency) { + View client_latency_view(ClientRoundtripLatencyCumulative()); + View client_server_latency_view(ClientServerLatencyCumulative()); + View server_server_latency_view(ServerServerLatencyCumulative()); + + const absl::Time start_time = absl::Now(); + { + EchoRequest request; + request.set_message("foo"); + EchoResponse response; + ::grpc::ClientContext context; + ::grpc::Status status = stub_->Echo(&context, request, &response); + ASSERT_TRUE(status.ok()); + EXPECT_EQ("foo", response.message()); + } + // We do not know exact latency/elapsed time, but we know it is less than the + // entire time spent making the RPC. + const double max_time = absl::ToDoubleMilliseconds(absl::Now() - start_time); + + absl::SleepFor(absl::Milliseconds(500)); + TestUtils::Flush(); + + EXPECT_THAT( + client_latency_view.GetData().distribution_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(client_method_name_), + ::testing::AllOf( + ::testing::Property(&Distribution::count, 1), + ::testing::Property(&Distribution::mean, ::testing::Gt(0.0)), + ::testing::Property(&Distribution::mean, + ::testing::Lt(max_time)))))); + + // Elapsed time is a subinterval of total latency. + const auto client_latency = client_latency_view.GetData() + .distribution_data() + .find({client_method_name_}) + ->second.mean(); + EXPECT_THAT( + client_server_latency_view.GetData().distribution_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(client_method_name_), + ::testing::AllOf( + ::testing::Property(&Distribution::count, 1), + ::testing::Property(&Distribution::mean, ::testing::Gt(0.0)), + ::testing::Property(&Distribution::mean, + ::testing::Lt(client_latency)))))); + + // client server elapsed time should be the same value propagated to the + // client. + const auto client_elapsed_time = client_server_latency_view.GetData() + .distribution_data() + .find({client_method_name_}) + ->second.mean(); + EXPECT_THAT( + server_server_latency_view.GetData().distribution_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(server_method_name_), + ::testing::AllOf( + ::testing::Property(&Distribution::count, 1), + ::testing::Property(&Distribution::mean, + ::testing::DoubleEq(client_elapsed_time)))))); +} + +TEST_F(StatsPluginEnd2EndTest, CompletedRpcs) { + View client_completed_rpcs_view(ClientCompletedRpcsCumulative()); + View server_completed_rpcs_view(ServerCompletedRpcsCumulative()); + + EchoRequest request; + request.set_message("foo"); + EchoResponse response; + const int count = 5; + for (int i = 0; i < count; ++i) { + { + ::grpc::ClientContext context; + ::grpc::Status status = stub_->Echo(&context, request, &response); + ASSERT_TRUE(status.ok()); + EXPECT_EQ("foo", response.message()); + } + absl::SleepFor(absl::Milliseconds(500)); + TestUtils::Flush(); + + EXPECT_THAT(client_completed_rpcs_view.GetData().int_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(client_method_name_, "OK"), i + 1))); + EXPECT_THAT(server_completed_rpcs_view.GetData().int_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(server_method_name_, "OK"), i + 1))); + } +} + +TEST_F(StatsPluginEnd2EndTest, RequestReceivedMessagesPerRpc) { + // TODO: Use streaming RPCs. + View client_received_messages_per_rpc_view( + ClientSentMessagesPerRpcCumulative()); + View client_sent_messages_per_rpc_view( + ClientReceivedMessagesPerRpcCumulative()); + View server_received_messages_per_rpc_view( + ServerSentMessagesPerRpcCumulative()); + View server_sent_messages_per_rpc_view( + ServerReceivedMessagesPerRpcCumulative()); + + EchoRequest request; + request.set_message("foo"); + EchoResponse response; + const int count = 5; + for (int i = 0; i < count; ++i) { + { + ::grpc::ClientContext context; + ::grpc::Status status = stub_->Echo(&context, request, &response); + ASSERT_TRUE(status.ok()); + EXPECT_EQ("foo", response.message()); + } + absl::SleepFor(absl::Milliseconds(500)); + TestUtils::Flush(); + + EXPECT_THAT( + client_received_messages_per_rpc_view.GetData().distribution_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(client_method_name_), + ::testing::AllOf(::testing::Property(&Distribution::count, i + 1), + ::testing::Property(&Distribution::mean, + ::testing::DoubleEq(1.0)))))); + EXPECT_THAT( + client_sent_messages_per_rpc_view.GetData().distribution_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(client_method_name_), + ::testing::AllOf(::testing::Property(&Distribution::count, i + 1), + ::testing::Property(&Distribution::mean, + ::testing::DoubleEq(1.0)))))); + EXPECT_THAT( + server_received_messages_per_rpc_view.GetData().distribution_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(server_method_name_), + ::testing::AllOf(::testing::Property(&Distribution::count, i + 1), + ::testing::Property(&Distribution::mean, + ::testing::DoubleEq(1.0)))))); + EXPECT_THAT( + server_sent_messages_per_rpc_view.GetData().distribution_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(server_method_name_), + ::testing::AllOf(::testing::Property(&Distribution::count, i + 1), + ::testing::Property(&Distribution::mean, + ::testing::DoubleEq(1.0)))))); + } +} + +} // namespace +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc_test_init(argc, argv); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} -- cgit v1.2.3