diff options
Diffstat (limited to 'test/cpp/end2end')
-rw-r--r-- | test/cpp/end2end/async_end2end_test.cc | 7 | ||||
-rw-r--r-- | test/cpp/end2end/client_crash_test.cc | 2 | ||||
-rw-r--r-- | test/cpp/end2end/client_crash_test_server.cc | 1 | ||||
-rw-r--r-- | test/cpp/end2end/end2end_test.cc | 192 | ||||
-rw-r--r-- | test/cpp/end2end/generic_end2end_test.cc | 2 | ||||
-rw-r--r-- | test/cpp/end2end/mock_test.cc | 4 | ||||
-rw-r--r-- | test/cpp/end2end/server_crash_test.cc | 2 | ||||
-rw-r--r-- | test/cpp/end2end/server_crash_test_client.cc | 1 | ||||
-rw-r--r-- | test/cpp/end2end/shutdown_test.cc | 4 | ||||
-rw-r--r-- | test/cpp/end2end/streaming_throughput_test.cc | 189 | ||||
-rw-r--r-- | test/cpp/end2end/thread_stress_test.cc | 4 | ||||
-rw-r--r-- | test/cpp/end2end/zookeeper_test.cc | 2 |
12 files changed, 363 insertions, 47 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index bbcac9ba34..cfb6c21edc 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -39,11 +39,9 @@ #include <grpc++/channel.h> #include <grpc++/client_context.h> #include <grpc++/create_channel.h> -#include <grpc++/credentials.h> #include <grpc++/server.h> #include <grpc++/server_builder.h> #include <grpc++/server_context.h> -#include <grpc++/server_credentials.h> #include <gtest/gtest.h> #include "test/core/util/port.h" @@ -202,7 +200,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<bool> { void ResetStub() { std::shared_ptr<Channel> channel = CreateChannel(server_address_.str(), InsecureCredentials()); - stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel)); + stub_ = grpc::cpp::test::util::TestService::NewStub(channel); } void SendRpc(int num_rpcs) { @@ -753,8 +751,7 @@ TEST_P(AsyncEnd2endTest, UnimplementedRpc) { std::shared_ptr<Channel> channel = CreateChannel(server_address_.str(), InsecureCredentials()); std::unique_ptr<grpc::cpp::test::util::UnimplementedService::Stub> stub; - stub = - std::move(grpc::cpp::test::util::UnimplementedService::NewStub(channel)); + stub = grpc::cpp::test::util::UnimplementedService::NewStub(channel); EchoRequest send_request; EchoResponse recv_response; Status recv_status; diff --git a/test/cpp/end2end/client_crash_test.cc b/test/cpp/end2end/client_crash_test.cc index 3a6e55216a..058e696166 100644 --- a/test/cpp/end2end/client_crash_test.cc +++ b/test/cpp/end2end/client_crash_test.cc @@ -37,11 +37,9 @@ #include <grpc++/channel.h> #include <grpc++/client_context.h> #include <grpc++/create_channel.h> -#include <grpc++/credentials.h> #include <grpc++/server.h> #include <grpc++/server_builder.h> #include <grpc++/server_context.h> -#include <grpc++/server_credentials.h> #include <gtest/gtest.h> #include "test/core/util/port.h" diff --git a/test/cpp/end2end/client_crash_test_server.cc b/test/cpp/end2end/client_crash_test_server.cc index 79a7832874..7ffeecca70 100644 --- a/test/cpp/end2end/client_crash_test_server.cc +++ b/test/cpp/end2end/client_crash_test_server.cc @@ -39,7 +39,6 @@ #include <grpc++/server.h> #include <grpc++/server_builder.h> #include <grpc++/server_context.h> -#include <grpc++/server_credentials.h> #include "test/cpp/util/echo.grpc.pb.h" DEFINE_string(address, "", "Address to bind to"); diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 37a6e693e6..bd829d96e1 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -40,11 +40,12 @@ #include <grpc++/channel.h> #include <grpc++/client_context.h> #include <grpc++/create_channel.h> -#include <grpc++/credentials.h> +#include <grpc++/security/auth_metadata_processor.h> +#include <grpc++/security/credentials.h> +#include <grpc++/security/server_credentials.h> #include <grpc++/server.h> #include <grpc++/server_builder.h> #include <grpc++/server_context.h> -#include <grpc++/server_credentials.h> #include <gtest/gtest.h> #include "src/core/security/credentials.h" @@ -79,14 +80,23 @@ void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request, } } -void CheckServerAuthContext(const ServerContext* context) { +void CheckServerAuthContext(const ServerContext* context, + const grpc::string& expected_client_identity) { std::shared_ptr<const AuthContext> auth_ctx = context->auth_context(); std::vector<grpc::string_ref> ssl = auth_ctx->FindPropertyValues("transport_security_type"); EXPECT_EQ(1u, ssl.size()); EXPECT_EQ("ssl", ToString(ssl[0])); - EXPECT_TRUE(auth_ctx->GetPeerIdentityPropertyName().empty()); - EXPECT_TRUE(auth_ctx->GetPeerIdentity().empty()); + if (expected_client_identity.length() == 0) { + EXPECT_TRUE(auth_ctx->GetPeerIdentityPropertyName().empty()); + EXPECT_TRUE(auth_ctx->GetPeerIdentity().empty()); + EXPECT_FALSE(auth_ctx->IsPeerAuthenticated()); + } else { + auto identity = auth_ctx->GetPeerIdentity(); + EXPECT_TRUE(auth_ctx->IsPeerAuthenticated()); + EXPECT_EQ(1u, identity.size()); + EXPECT_EQ(expected_client_identity, identity[0]); + } } bool CheckIsLocalhost(const grpc::string& addr) { @@ -98,6 +108,54 @@ bool CheckIsLocalhost(const grpc::string& addr) { addr.substr(0, kIpv6.size()) == kIpv6; } +class TestAuthMetadataProcessor : public AuthMetadataProcessor { + public: + static const char kGoodGuy[]; + + TestAuthMetadataProcessor(bool is_blocking) : is_blocking_(is_blocking) {} + + std::shared_ptr<Credentials> GetCompatibleClientCreds() { + return AccessTokenCredentials(kGoodGuy); + } + std::shared_ptr<Credentials> GetIncompatibleClientCreds() { + return AccessTokenCredentials("Mr Hyde"); + } + + // Interface implementation + bool IsBlocking() const GRPC_OVERRIDE { return is_blocking_; } + + Status Process(const InputMetadata& auth_metadata, AuthContext* context, + OutputMetadata* consumed_auth_metadata, + OutputMetadata* response_metadata) GRPC_OVERRIDE { + EXPECT_TRUE(consumed_auth_metadata != nullptr); + EXPECT_TRUE(context != nullptr); + EXPECT_TRUE(response_metadata != nullptr); + auto auth_md = auth_metadata.find(GRPC_AUTHORIZATION_METADATA_KEY); + EXPECT_NE(auth_md, auth_metadata.end()); + string_ref auth_md_value = auth_md->second; + if (auth_md_value.ends_with(kGoodGuy)) { + context->AddProperty(kIdentityPropName, kGoodGuy); + context->SetPeerIdentityPropertyName(kIdentityPropName); + consumed_auth_metadata->insert( + std::make_pair(string(auth_md->first.data(), auth_md->first.length()), + auth_md->second)); + return Status::OK; + } else { + return Status(StatusCode::UNAUTHENTICATED, + string("Invalid principal: ") + + string(auth_md_value.data(), auth_md_value.length())); + } + } + + protected: + static const char kIdentityPropName[]; + bool is_blocking_; +}; + +const char TestAuthMetadataProcessor::kGoodGuy[] = "Dr Jekyll"; +const char TestAuthMetadataProcessor::kIdentityPropName[] = "novel identity"; + + } // namespace class Proxy : public ::grpc::cpp::test::util::TestService::Service { @@ -162,8 +220,10 @@ class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service { ToString(iter->second)); } } - if (request->has_param() && request->param().check_auth_context()) { - CheckServerAuthContext(context); + if (request->has_param() && + (request->param().expected_client_identity().length() > 0 || + request->param().check_auth_context())) { + CheckServerAuthContext(context, request->param().expected_client_identity()); } if (request->has_param() && request->param().response_message_length() > 0) { @@ -259,9 +319,18 @@ class TestServiceImplDupPkg class End2endTest : public ::testing::TestWithParam<bool> { protected: End2endTest() - : kMaxMessageSize_(8192), special_service_("special") {} + : is_server_started_(false), + kMaxMessageSize_(8192), + special_service_("special") {} + + void TearDown() GRPC_OVERRIDE { + if (is_server_started_) { + server_->Shutdown(); + if (proxy_server_) proxy_server_->Shutdown(); + } + } - void SetUp() GRPC_OVERRIDE { + void StartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) { int port = grpc_pick_unused_port_or_die(); server_address_ << "127.0.0.1:" << port; // Setup server @@ -271,22 +340,23 @@ class End2endTest : public ::testing::TestWithParam<bool> { SslServerCredentialsOptions ssl_opts; ssl_opts.pem_root_certs = ""; ssl_opts.pem_key_cert_pairs.push_back(pkcp); - builder.AddListeningPort(server_address_.str(), - SslServerCredentials(ssl_opts)); + auto server_creds = SslServerCredentials(ssl_opts); + server_creds->SetAuthMetadataProcessor(processor); + builder.AddListeningPort(server_address_.str(), server_creds); builder.RegisterService(&service_); builder.RegisterService("foo.test.youtube.com", &special_service_); builder.SetMaxMessageSize( kMaxMessageSize_); // For testing max message size. builder.RegisterService(&dup_pkg_service_); server_ = builder.BuildAndStart(); - } - - void TearDown() GRPC_OVERRIDE { - server_->Shutdown(); - if (proxy_server_) proxy_server_->Shutdown(); + is_server_started_ = true; } void ResetChannel() { + if (!is_server_started_) { + StartServer(std::shared_ptr<AuthMetadataProcessor>()); + } + EXPECT_TRUE(is_server_started_); SslCredentialsOptions ssl_opts = {test_root_cert, "", ""}; ChannelArguments args; args.SetSslTargetNameOverride("foo.test.google.fr"); @@ -310,9 +380,10 @@ class End2endTest : public ::testing::TestWithParam<bool> { channel_ = CreateChannel(proxyaddr.str(), InsecureCredentials()); } - stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel_)); + stub_ = grpc::cpp::test::util::TestService::NewStub(channel_); } + bool is_server_started_; std::shared_ptr<Channel> channel_; std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_; std::unique_ptr<Server> server_; @@ -562,7 +633,7 @@ TEST_F(End2endTest, DiffPackageServices) { // rpc and stream should fail on bad credentials. TEST_F(End2endTest, BadCredentials) { - std::shared_ptr<Credentials> bad_creds = ServiceAccountCredentials("", "", 1); + std::shared_ptr<Credentials> bad_creds = GoogleRefreshTokenCredentials(""); EXPECT_EQ(static_cast<Credentials*>(nullptr), bad_creds.get()); std::shared_ptr<Channel> channel = CreateChannel(server_address_.str(), bad_creds); @@ -743,7 +814,7 @@ TEST_F(End2endTest, SetPerCallCredentials) { EchoResponse response; ClientContext context; std::shared_ptr<Credentials> creds = - IAMCredentials("fake_token", "fake_selector"); + GoogleIAMCredentials("fake_token", "fake_selector"); context.set_credentials(creds); request.set_message("Hello"); request.mutable_param()->set_echo_metadata(true); @@ -780,10 +851,10 @@ TEST_F(End2endTest, OverridePerCallCredentials) { EchoResponse response; ClientContext context; std::shared_ptr<Credentials> creds1 = - IAMCredentials("fake_token1", "fake_selector1"); + GoogleIAMCredentials("fake_token1", "fake_selector1"); context.set_credentials(creds1); std::shared_ptr<Credentials> creds2 = - IAMCredentials("fake_token2", "fake_selector2"); + GoogleIAMCredentials("fake_token2", "fake_selector2"); context.set_credentials(creds2); request.set_message("Hello"); request.mutable_param()->set_echo_metadata(true); @@ -805,6 +876,82 @@ TEST_F(End2endTest, OverridePerCallCredentials) { EXPECT_TRUE(s.ok()); } +TEST_F(End2endTest, NonBlockingAuthMetadataProcessorSuccess) { + auto* processor = new TestAuthMetadataProcessor(false); + StartServer(std::shared_ptr<AuthMetadataProcessor>(processor)); + ResetStub(false); + EchoRequest request; + EchoResponse response; + ClientContext context; + context.set_credentials(processor->GetCompatibleClientCreds()); + request.set_message("Hello"); + request.mutable_param()->set_echo_metadata(true); + request.mutable_param()->set_expected_client_identity( + TestAuthMetadataProcessor::kGoodGuy); + + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(request.message(), response.message()); + EXPECT_TRUE(s.ok()); + + // Metadata should have been consumed by the processor. + EXPECT_FALSE(MetadataContains( + context.GetServerTrailingMetadata(), GRPC_AUTHORIZATION_METADATA_KEY, + grpc::string("Bearer ") + TestAuthMetadataProcessor::kGoodGuy)); +} + +TEST_F(End2endTest, NonBlockingAuthMetadataProcessorFailure) { + auto* processor = new TestAuthMetadataProcessor(false); + StartServer(std::shared_ptr<AuthMetadataProcessor>(processor)); + ResetStub(false); + EchoRequest request; + EchoResponse response; + ClientContext context; + context.set_credentials(processor->GetIncompatibleClientCreds()); + request.set_message("Hello"); + + Status s = stub_->Echo(&context, request, &response); + EXPECT_FALSE(s.ok()); + EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED); +} + +TEST_F(End2endTest, BlockingAuthMetadataProcessorSuccess) { + auto* processor = new TestAuthMetadataProcessor(true); + StartServer(std::shared_ptr<AuthMetadataProcessor>(processor)); + ResetStub(false); + EchoRequest request; + EchoResponse response; + ClientContext context; + context.set_credentials(processor->GetCompatibleClientCreds()); + request.set_message("Hello"); + request.mutable_param()->set_echo_metadata(true); + request.mutable_param()->set_expected_client_identity( + TestAuthMetadataProcessor::kGoodGuy); + + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(request.message(), response.message()); + EXPECT_TRUE(s.ok()); + + // Metadata should have been consumed by the processor. + EXPECT_FALSE(MetadataContains( + context.GetServerTrailingMetadata(), GRPC_AUTHORIZATION_METADATA_KEY, + grpc::string("Bearer ") + TestAuthMetadataProcessor::kGoodGuy)); +} + +TEST_F(End2endTest, BlockingAuthMetadataProcessorFailure) { + auto* processor = new TestAuthMetadataProcessor(true); + StartServer(std::shared_ptr<AuthMetadataProcessor>(processor)); + ResetStub(false); + EchoRequest request; + EchoResponse response; + ClientContext context; + context.set_credentials(processor->GetIncompatibleClientCreds()); + request.set_message("Hello"); + + Status s = stub_->Echo(&context, request, &response); + EXPECT_FALSE(s.ok()); + EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED); +} + // Client sends 20 requests and the server returns CANCELLED status after // reading 10 requests. TEST_F(End2endTest, RequestStreamServerEarlyCancelTest) { @@ -933,8 +1080,7 @@ TEST_F(End2endTest, ChannelState) { TEST_F(End2endTest, NonExistingService) { ResetChannel(); std::unique_ptr<grpc::cpp::test::util::UnimplementedService::Stub> stub; - stub = - std::move(grpc::cpp::test::util::UnimplementedService::NewStub(channel_)); + stub = grpc::cpp::test::util::UnimplementedService::NewStub(channel_); EchoRequest request; EchoResponse response; diff --git a/test/cpp/end2end/generic_end2end_test.cc b/test/cpp/end2end/generic_end2end_test.cc index 7acbc711fb..6a46916728 100644 --- a/test/cpp/end2end/generic_end2end_test.cc +++ b/test/cpp/end2end/generic_end2end_test.cc @@ -40,13 +40,11 @@ #include <grpc++/channel.h> #include <grpc++/client_context.h> #include <grpc++/create_channel.h> -#include <grpc++/credentials.h> #include <grpc++/generic/async_generic_service.h> #include <grpc++/generic/generic_stub.h> #include <grpc++/server.h> #include <grpc++/server_builder.h> #include <grpc++/server_context.h> -#include <grpc++/server_credentials.h> #include <grpc++/support/slice.h> #include <gtest/gtest.h> diff --git a/test/cpp/end2end/mock_test.cc b/test/cpp/end2end/mock_test.cc index 077d21aa72..9c35fede8f 100644 --- a/test/cpp/end2end/mock_test.cc +++ b/test/cpp/end2end/mock_test.cc @@ -39,11 +39,9 @@ #include <grpc++/channel.h> #include <grpc++/client_context.h> #include <grpc++/create_channel.h> -#include <grpc++/credentials.h> #include <grpc++/server.h> #include <grpc++/server_builder.h> #include <grpc++/server_context.h> -#include <grpc++/server_credentials.h> #include <gtest/gtest.h> #include "test/core/util/port.h" @@ -247,7 +245,7 @@ class MockTest : public ::testing::Test { void ResetStub() { std::shared_ptr<Channel> channel = CreateChannel(server_address_.str(), InsecureCredentials()); - stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel)); + stub_ = grpc::cpp::test::util::TestService::NewStub(channel); } std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_; diff --git a/test/cpp/end2end/server_crash_test.cc b/test/cpp/end2end/server_crash_test.cc index 1a0f04e22b..4b6793abe6 100644 --- a/test/cpp/end2end/server_crash_test.cc +++ b/test/cpp/end2end/server_crash_test.cc @@ -37,11 +37,9 @@ #include <grpc++/channel.h> #include <grpc++/client_context.h> #include <grpc++/create_channel.h> -#include <grpc++/credentials.h> #include <grpc++/server.h> #include <grpc++/server_builder.h> #include <grpc++/server_context.h> -#include <grpc++/server_credentials.h> #include <gtest/gtest.h> #include "test/core/util/port.h" diff --git a/test/cpp/end2end/server_crash_test_client.cc b/test/cpp/end2end/server_crash_test_client.cc index 6ff42fcb30..17869362c2 100644 --- a/test/cpp/end2end/server_crash_test_client.cc +++ b/test/cpp/end2end/server_crash_test_client.cc @@ -40,7 +40,6 @@ #include <grpc++/channel.h> #include <grpc++/client_context.h> #include <grpc++/create_channel.h> -#include <grpc++/credentials.h> #include "test/cpp/util/echo.grpc.pb.h" DEFINE_string(address, "", "Address to connect to"); diff --git a/test/cpp/end2end/shutdown_test.cc b/test/cpp/end2end/shutdown_test.cc index 59fec6ad40..0549bb8b5f 100644 --- a/test/cpp/end2end/shutdown_test.cc +++ b/test/cpp/end2end/shutdown_test.cc @@ -38,11 +38,9 @@ #include <grpc++/channel.h> #include <grpc++/client_context.h> #include <grpc++/create_channel.h> -#include <grpc++/credentials.h> #include <grpc++/server.h> #include <grpc++/server_builder.h> #include <grpc++/server_context.h> -#include <grpc++/server_credentials.h> #include <gtest/gtest.h> #include "src/core/support/env.h" @@ -96,7 +94,7 @@ class ShutdownTest : public ::testing::Test { void ResetStub() { string target = "dns:localhost:" + to_string(port_); channel_ = CreateChannel(target, InsecureCredentials()); - stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel_)); + stub_ = grpc::cpp::test::util::TestService::NewStub(channel_); } string to_string(const int number) { diff --git a/test/cpp/end2end/streaming_throughput_test.cc b/test/cpp/end2end/streaming_throughput_test.cc new file mode 100644 index 0000000000..63d49c0425 --- /dev/null +++ b/test/cpp/end2end/streaming_throughput_test.cc @@ -0,0 +1,189 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <atomic> +#include <mutex> +#include <thread> + +#include <grpc++/channel.h> +#include <grpc++/client_context.h> +#include <grpc++/create_channel.h> +#include <grpc++/security/credentials.h> +#include <grpc++/security/server_credentials.h> +#include <grpc++/server.h> +#include <grpc++/server_builder.h> +#include <grpc++/server_context.h> +#include <grpc/grpc.h> +#include <grpc/support/thd.h> +#include <grpc/support/time.h> +#include <gtest/gtest.h> + +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" +#include "test/cpp/util/echo_duplicate.grpc.pb.h" +#include "test/cpp/util/echo.grpc.pb.h" + +using grpc::cpp::test::util::EchoRequest; +using grpc::cpp::test::util::EchoResponse; +using std::chrono::system_clock; + +const char* kLargeString = "(" + "To be, or not to be- that is the question:" + "Whether 'tis nobler in the mind to suffer" + "The slings and arrows of outrageous fortune" + "Or to take arms against a sea of troubles," + "And by opposing end them. To die- to sleep-" + "No more; and by a sleep to say we end" + "The heartache, and the thousand natural shock" + "That flesh is heir to. 'Tis a consummation" + "Devoutly to be wish'd. To die- to sleep." + "To sleep- perchance to dream: ay, there's the rub!" + "For in that sleep of death what dreams may come" + "When we have shuffled off this mortal coil," + "Must give us pause. There's the respect" + "That makes calamity of so long life." + "For who would bear the whips and scorns of time," + "Th' oppressor's wrong, the proud man's contumely," + "The pangs of despis'd love, the law's delay," + "The insolence of office, and the spurns" + "That patient merit of th' unworthy takes," + "When he himself might his quietus make" + "With a bare bodkin? Who would these fardels bear," + "To grunt and sweat under a weary life," + "But that the dread of something after death-" + "The undiscover'd country, from whose bourn" + "No traveller returns- puzzles the will," + "And makes us rather bear those ills we have" + "Than fly to others that we know not of?" + "Thus conscience does make cowards of us all," + "And thus the native hue of resolution" + "Is sicklied o'er with the pale cast of thought," + "And enterprises of great pith and moment" + "With this regard their currents turn awry" + "And lose the name of action.- Soft you now!" + "The fair Ophelia!- Nymph, in thy orisons" + "Be all my sins rememb'red."; + +namespace grpc { +namespace testing { + +class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service { + public: + static void BidiStream_Sender(ServerReaderWriter<EchoResponse, EchoRequest>* stream, std::atomic<bool>* should_exit) { + EchoResponse response; + response.set_message(kLargeString); + while (!should_exit->load()) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + stream->Write(response); + } + } + + // Only implement the one method we will be calling for brevity. + Status BidiStream(ServerContext* context, + ServerReaderWriter<EchoResponse, EchoRequest>* stream) + GRPC_OVERRIDE { + EchoRequest request; + std::atomic<bool> should_exit(false); + std::thread sender(std::bind(&TestServiceImpl::BidiStream_Sender, stream, &should_exit)); + + while (stream->Read(&request)) { + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + } + should_exit.store(true); + sender.join(); + return Status::OK; + } +}; + +class End2endTest : public ::testing::Test { + protected: + void SetUp() GRPC_OVERRIDE { + int port = grpc_pick_unused_port_or_die(); + server_address_ << "localhost:" << port; + // Setup server + ServerBuilder builder; + builder.AddListeningPort(server_address_.str(), + InsecureServerCredentials()); + builder.RegisterService(&service_); + server_ = builder.BuildAndStart(); + } + + void TearDown() GRPC_OVERRIDE { server_->Shutdown(); } + + void ResetStub() { + std::shared_ptr<Channel> channel = CreateChannel( + server_address_.str(), InsecureCredentials()); + stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel)); + } + + std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_; + std::unique_ptr<Server> server_; + std::ostringstream server_address_; + TestServiceImpl service_; +}; + +static void Drainer(ClientReaderWriter<EchoRequest, EchoResponse>* reader) { + EchoResponse response; + while (reader->Read(&response)) { + // Just drain out the responses as fast as possible. + } +} + +TEST_F(End2endTest, StreamingThroughput) { + ResetStub(); + grpc::ClientContext context; + auto stream = stub_->BidiStream(&context); + + auto reader = stream.get(); + std::thread receiver(std::bind(Drainer, reader)); + + for (int i = 0; i < 10000; i++) { + EchoRequest request; + request.set_message(kLargeString); + ASSERT_TRUE(stream->Write(request)); + if (i % 1000 == 0) { + gpr_log(GPR_INFO, "Send count = %d", i); + } + } + stream->WritesDone(); + receiver.join(); +} + +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc_test_init(argc, argv); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc index 2a16481972..75a07d89c5 100644 --- a/test/cpp/end2end/thread_stress_test.cc +++ b/test/cpp/end2end/thread_stress_test.cc @@ -40,11 +40,9 @@ #include <grpc++/channel.h> #include <grpc++/client_context.h> #include <grpc++/create_channel.h> -#include <grpc++/credentials.h> #include <grpc++/server.h> #include <grpc++/server_builder.h> #include <grpc++/server_context.h> -#include <grpc++/server_credentials.h> #include <gtest/gtest.h> #include "test/core/util/port.h" @@ -193,7 +191,7 @@ class End2endTest : public ::testing::Test { void ResetStub() { std::shared_ptr<Channel> channel = CreateChannel(server_address_.str(), InsecureCredentials()); - stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel)); + stub_ = grpc::cpp::test::util::TestService::NewStub(channel); } std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_; diff --git a/test/cpp/end2end/zookeeper_test.cc b/test/cpp/end2end/zookeeper_test.cc index 931541ca34..d4c7f0489f 100644 --- a/test/cpp/end2end/zookeeper_test.cc +++ b/test/cpp/end2end/zookeeper_test.cc @@ -34,11 +34,9 @@ #include <grpc++/channel.h> #include <grpc++/client_context.h> #include <grpc++/create_channel.h> -#include <grpc++/credentials.h> #include <grpc++/server.h> #include <grpc++/server_builder.h> #include <grpc++/server_context.h> -#include <grpc++/server_credentials.h> #include <gtest/gtest.h> #include <grpc/grpc.h> #include <grpc/grpc_zookeeper.h> |