diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-05-06 07:42:43 -0700 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-05-06 07:42:43 -0700 |
commit | 0c23320c8a967acc035ed05a93c6e709aaf9662c (patch) | |
tree | ea557ca733a5da343053b945db01980ffe8529ae /test/cpp | |
parent | 0e17857ab50339f9724c75aa769eb3dee6c6667f (diff) |
Split thread stress from end2end to improve parallelism
Diffstat (limited to 'test/cpp')
-rw-r--r-- | test/cpp/end2end/end2end_test.cc | 12 | ||||
-rw-r--r-- | test/cpp/end2end/thread_stress_test.cc | 242 |
2 files changed, 242 insertions, 12 deletions
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 0945ed269d..f35b16fe55 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -577,18 +577,6 @@ TEST_F(End2endTest, ClientCancelsBidi) { EXPECT_EQ(grpc::StatusCode::CANCELLED, s.code()); } -TEST_F(End2endTest, ThreadStress) { - ResetStub(); - std::vector<std::thread*> threads; - for (int i = 0; i < 100; ++i) { - threads.push_back(new std::thread(SendRpc, stub_.get(), 1000)); - } - for (int i = 0; i < 100; ++i) { - threads[i]->join(); - delete threads[i]; - } -} - TEST_F(End2endTest, RpcMaxMessageSize) { ResetStub(); EchoRequest request; diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc new file mode 100644 index 0000000000..12656128c0 --- /dev/null +++ b/test/cpp/end2end/thread_stress_test.cc @@ -0,0 +1,242 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <thread> + +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" +#include "test/cpp/util/echo_duplicate.grpc.pb.h" +#include "test/cpp/util/echo.grpc.pb.h" +#include "src/cpp/server/thread_pool.h" +#include <grpc++/channel_arguments.h> +#include <grpc++/channel_interface.h> +#include <grpc++/client_context.h> +#include <grpc++/create_channel.h> +#include <grpc++/credentials.h> +#include <grpc++/server.h> +#include <grpc++/server_builder.h> +#include <grpc++/server_context.h> +#include <grpc++/server_credentials.h> +#include <grpc++/status.h> +#include <grpc++/stream.h> +#include <grpc++/time.h> +#include <gtest/gtest.h> + +#include <grpc/grpc.h> +#include <grpc/support/thd.h> +#include <grpc/support/time.h> + +using grpc::cpp::test::util::EchoRequest; +using grpc::cpp::test::util::EchoResponse; +using std::chrono::system_clock; + +namespace grpc { +namespace testing { + +namespace { + +// When echo_deadline is requested, deadline seen in the ServerContext is set in +// the response in seconds. +void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request, + EchoResponse* response) { + if (request->has_param() && request->param().echo_deadline()) { + gpr_timespec deadline = gpr_inf_future; + if (context->deadline() != system_clock::time_point::max()) { + Timepoint2Timespec(context->deadline(), &deadline); + } + response->mutable_param()->set_request_deadline(deadline.tv_sec); + } +} + +} // namespace + +class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service { + public: + TestServiceImpl() : signal_client_(false) {} + + Status Echo(ServerContext* context, const EchoRequest* request, + EchoResponse* response) GRPC_OVERRIDE { + response->set_message(request->message()); + MaybeEchoDeadline(context, request, response); + if (request->has_param() && request->param().client_cancel_after_us()) { + { + std::unique_lock<std::mutex> lock(mu_); + signal_client_ = true; + } + while (!context->IsCancelled()) { + std::this_thread::sleep_for(std::chrono::microseconds( + request->param().client_cancel_after_us())); + } + return Status::Cancelled; + } else if (request->has_param() && + request->param().server_cancel_after_us()) { + std::this_thread::sleep_for( + std::chrono::microseconds(request->param().server_cancel_after_us())); + return Status::Cancelled; + } else { + EXPECT_FALSE(context->IsCancelled()); + } + return Status::OK; + } + + // Unimplemented is left unimplemented to test the returned error. + + Status RequestStream(ServerContext* context, + ServerReader<EchoRequest>* reader, + EchoResponse* response) GRPC_OVERRIDE { + EchoRequest request; + response->set_message(""); + while (reader->Read(&request)) { + response->mutable_message()->append(request.message()); + } + return Status::OK; + } + + // Return 3 messages. + // TODO(yangg) make it generic by adding a parameter into EchoRequest + Status ResponseStream(ServerContext* context, const EchoRequest* request, + ServerWriter<EchoResponse>* writer) GRPC_OVERRIDE { + EchoResponse response; + response.set_message(request->message() + "0"); + writer->Write(response); + response.set_message(request->message() + "1"); + writer->Write(response); + response.set_message(request->message() + "2"); + writer->Write(response); + + return Status::OK; + } + + Status BidiStream(ServerContext* context, + ServerReaderWriter<EchoResponse, EchoRequest>* stream) + GRPC_OVERRIDE { + EchoRequest request; + EchoResponse response; + while (stream->Read(&request)) { + gpr_log(GPR_INFO, "recv msg %s", request.message().c_str()); + response.set_message(request.message()); + stream->Write(response); + } + return Status::OK; + } + + bool signal_client() { + std::unique_lock<std::mutex> lock(mu_); + return signal_client_; + } + + private: + bool signal_client_; + std::mutex mu_; +}; + +class TestServiceImplDupPkg + : public ::grpc::cpp::test::util::duplicate::TestService::Service { + public: + Status Echo(ServerContext* context, const EchoRequest* request, + EchoResponse* response) GRPC_OVERRIDE { + response->set_message("no package"); + return Status::OK; + } +}; + +class End2endTest : public ::testing::Test { + protected: + End2endTest() : kMaxMessageSize_(8192), thread_pool_(2) {} + + void SetUp() GRPC_OVERRIDE { + int port = grpc_pick_unused_port_or_die(); + server_address_ << "localhost:" << port; + // Setup server + ServerBuilder builder; + builder.AddListeningPort(server_address_.str(), + InsecureServerCredentials()); + builder.RegisterService(&service_); + builder.SetMaxMessageSize( + kMaxMessageSize_); // For testing max message size. + builder.RegisterService(&dup_pkg_service_); + builder.SetThreadPool(&thread_pool_); + server_ = builder.BuildAndStart(); + } + + void TearDown() GRPC_OVERRIDE { server_->Shutdown(); } + + void ResetStub() { + std::shared_ptr<ChannelInterface> channel = CreateChannel( + server_address_.str(), InsecureCredentials(), ChannelArguments()); + stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel)); + } + + std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_; + std::unique_ptr<Server> server_; + std::ostringstream server_address_; + const int kMaxMessageSize_; + TestServiceImpl service_; + TestServiceImplDupPkg dup_pkg_service_; + ThreadPool thread_pool_; +}; + +static void SendRpc(grpc::cpp::test::util::TestService::Stub* stub, + int num_rpcs) { + EchoRequest request; + EchoResponse response; + request.set_message("Hello"); + + for (int i = 0; i < num_rpcs; ++i) { + ClientContext context; + Status s = stub->Echo(&context, request, &response); + EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(s.IsOk()); + } +} + +TEST_F(End2endTest, ThreadStress) { + ResetStub(); + std::vector<std::thread*> threads; + for (int i = 0; i < 100; ++i) { + threads.push_back(new std::thread(SendRpc, stub_.get(), 1000)); + } + for (int i = 0; i < 100; ++i) { + threads[i]->join(); + delete threads[i]; + } +} + +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc_test_init(argc, argv); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} |