diff options
author | 2018-01-12 10:16:22 +0100 | |
---|---|---|
committer | 2018-01-12 10:16:22 +0100 | |
commit | c9ec2c0888271491eaf425721a72736392f85945 (patch) | |
tree | 8ee3fe7e6fe56bed7bbfa7c8537add5f24afe12a /test/cpp/end2end/thread_stress_test.cc | |
parent | b0b4555f4ca720e626f42855f2257bf598e1bf74 (diff) |
Revert "Stop using std::thread in C++ library since it can trigger exceptions"
Diffstat (limited to 'test/cpp/end2end/thread_stress_test.cc')
-rw-r--r-- | test/cpp/end2end/thread_stress_test.cc | 157 |
1 files changed, 61 insertions, 96 deletions
diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc index fd43c8f584..90b2eddbbb 100644 --- a/test/cpp/end2end/thread_stress_test.cc +++ b/test/cpp/end2end/thread_stress_test.cc @@ -26,7 +26,6 @@ #include <grpc++/server_builder.h> #include <grpc++/server_context.h> #include <grpc/grpc.h> -#include <grpc/support/atm.h> #include <grpc/support/thd.h> #include <grpc/support/time.h> @@ -53,13 +52,63 @@ namespace testing { class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { public: - TestServiceImpl() {} + TestServiceImpl() : signal_client_(false) {} Status Echo(ServerContext* context, const EchoRequest* request, EchoResponse* response) override { response->set_message(request->message()); return Status::OK; } + + // Unimplemented is left unimplemented to test the returned error. + + Status RequestStream(ServerContext* context, + ServerReader<EchoRequest>* reader, + EchoResponse* response) override { + EchoRequest request; + response->set_message(""); + while (reader->Read(&request)) { + response->mutable_message()->append(request.message()); + } + return Status::OK; + } + + // Return 3 messages. + // TODO(yangg) make it generic by adding a parameter into EchoRequest + Status ResponseStream(ServerContext* context, const EchoRequest* request, + ServerWriter<EchoResponse>* writer) override { + EchoResponse response; + response.set_message(request->message() + "0"); + writer->Write(response); + response.set_message(request->message() + "1"); + writer->Write(response); + response.set_message(request->message() + "2"); + writer->Write(response); + + return Status::OK; + } + + Status BidiStream( + ServerContext* context, + ServerReaderWriter<EchoResponse, EchoRequest>* stream) override { + EchoRequest request; + EchoResponse response; + while (stream->Read(&request)) { + gpr_log(GPR_INFO, "recv msg %s", request.message().c_str()); + response.set_message(request.message()); + stream->Write(response); + } + return Status::OK; + } + + bool signal_client() { + std::unique_lock<std::mutex> lock(mu_); + return signal_client_; + } + + private: + bool signal_client_; + std::mutex mu_; }; template <class Service> @@ -70,15 +119,10 @@ class CommonStressTest { virtual void SetUp() = 0; virtual void TearDown() = 0; virtual void ResetStub() = 0; - virtual bool AllowExhaustion() = 0; grpc::testing::EchoTestService::Stub* GetStub() { return stub_.get(); } protected: std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; - // Some tests use a custom thread creator. This should be declared before the - // server so that it's destructor happens after the server - std::unique_ptr<ServerBuilderThreadCreatorOverrideTest> creator_; - std::unique_ptr<Server> server_; virtual void SetUpStart(ServerBuilder* builder, Service* service) = 0; @@ -103,7 +147,6 @@ class CommonStressTestInsecure : public CommonStressTest<Service> { CreateChannel(server_address_.str(), InsecureChannelCredentials()); this->stub_ = grpc::testing::EchoTestService::NewStub(channel); } - bool AllowExhaustion() override { return false; } protected: void SetUpStart(ServerBuilder* builder, Service* service) override { @@ -119,7 +162,7 @@ class CommonStressTestInsecure : public CommonStressTest<Service> { std::ostringstream server_address_; }; -template <class Service, bool allow_resource_exhaustion> +template <class Service> class CommonStressTestInproc : public CommonStressTest<Service> { public: void ResetStub() override { @@ -127,7 +170,6 @@ class CommonStressTestInproc : public CommonStressTest<Service> { std::shared_ptr<Channel> channel = this->server_->InProcessChannel(args); this->stub_ = grpc::testing::EchoTestService::NewStub(channel); } - bool AllowExhaustion() override { return allow_resource_exhaustion; } protected: void SetUpStart(ServerBuilder* builder, Service* service) override { @@ -152,67 +194,6 @@ class CommonStressTestSyncServer : public BaseClass { TestServiceImpl service_; }; -class ServerBuilderThreadCreatorOverrideTest { - public: - ServerBuilderThreadCreatorOverrideTest(ServerBuilder* builder, size_t limit) - : limit_(limit), threads_(0) { - builder->SetThreadFunctions( - [this](gpr_thd_id* id, const char* name, void (*f)(void*), void* arg, - const gpr_thd_options* options) -> int { - std::unique_lock<std::mutex> l(mu_); - if (threads_ < limit_) { - l.unlock(); - if (gpr_thd_new(id, name, f, arg, options) != 0) { - l.lock(); - threads_++; - return 1; - } - } - return 0; - }, - [this](gpr_thd_id id) { - gpr_thd_join(id); - std::unique_lock<std::mutex> l(mu_); - threads_--; - if (threads_ == 0) { - done_.notify_one(); - } - }); - } - ~ServerBuilderThreadCreatorOverrideTest() { - // Don't allow destruction until all threads are really done and uncounted - std::unique_lock<std::mutex> l(mu_); - done_.wait(l, [this] { return (threads_ == 0); }); - } - - private: - size_t limit_; - size_t threads_; - std::mutex mu_; - std::condition_variable done_; -}; - -template <class BaseClass> -class CommonStressTestSyncServerLowThreadCount : public BaseClass { - public: - void SetUp() override { - ServerBuilder builder; - this->SetUpStart(&builder, &service_); - builder.SetSyncServerOption(ServerBuilder::SyncServerOption::MIN_POLLERS, - 1); - this->creator_.reset( - new ServerBuilderThreadCreatorOverrideTest(&builder, 4)); - this->SetUpEnd(&builder); - } - void TearDown() override { - this->TearDownStart(); - this->TearDownEnd(); - } - - private: - TestServiceImpl service_; -}; - template <class BaseClass> class CommonStressTestAsyncServer : public BaseClass { public: @@ -313,8 +294,7 @@ class End2endTest : public ::testing::Test { Common common_; }; -static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs, - bool allow_exhaustion, gpr_atm* errors) { +static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) { EchoRequest request; EchoResponse response; request.set_message("Hello"); @@ -322,48 +302,33 @@ static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs, for (int i = 0; i < num_rpcs; ++i) { ClientContext context; Status s = stub->Echo(&context, request, &response); - EXPECT_TRUE(s.ok() || (allow_exhaustion && - s.error_code() == StatusCode::RESOURCE_EXHAUSTED)); + EXPECT_EQ(response.message(), request.message()); if (!s.ok()) { - if (!(allow_exhaustion && - s.error_code() == StatusCode::RESOURCE_EXHAUSTED)) { - gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(), - s.error_message().c_str()); - } - gpr_atm_no_barrier_fetch_add(errors, static_cast<gpr_atm>(1)); - } else { - EXPECT_EQ(response.message(), request.message()); + gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(), + s.error_message().c_str()); } + ASSERT_TRUE(s.ok()); } } typedef ::testing::Types< CommonStressTestSyncServer<CommonStressTestInsecure<TestServiceImpl>>, - CommonStressTestSyncServer<CommonStressTestInproc<TestServiceImpl, false>>, - CommonStressTestSyncServerLowThreadCount< - CommonStressTestInproc<TestServiceImpl, true>>, + CommonStressTestSyncServer<CommonStressTestInproc<TestServiceImpl>>, CommonStressTestAsyncServer< CommonStressTestInsecure<grpc::testing::EchoTestService::AsyncService>>, - CommonStressTestAsyncServer<CommonStressTestInproc< - grpc::testing::EchoTestService::AsyncService, false>>> + CommonStressTestAsyncServer< + CommonStressTestInproc<grpc::testing::EchoTestService::AsyncService>>> CommonTypes; TYPED_TEST_CASE(End2endTest, CommonTypes); TYPED_TEST(End2endTest, ThreadStress) { this->common_.ResetStub(); std::vector<std::thread> threads; - gpr_atm errors; - gpr_atm_rel_store(&errors, static_cast<gpr_atm>(0)); for (int i = 0; i < kNumThreads; ++i) { - threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs, - this->common_.AllowExhaustion(), &errors); + threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs); } for (int i = 0; i < kNumThreads; ++i) { threads[i].join(); } - uint64_t error_cnt = static_cast<uint64_t>(gpr_atm_no_barrier_load(&errors)); - if (error_cnt != 0) { - gpr_log(GPR_INFO, "RPC error count: %" PRIu64, error_cnt); - } } template <class Common> |