diff options
Diffstat (limited to 'test/cpp/end2end/thread_stress_test.cc')
-rw-r--r-- | test/cpp/end2end/thread_stress_test.cc | 91 |
1 files changed, 43 insertions, 48 deletions
diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc index 0b9d4cda9f..d353f9894b 100644 --- a/test/cpp/end2end/thread_stress_test.cc +++ b/test/cpp/end2end/thread_stress_test.cc @@ -86,12 +86,12 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { TestServiceImpl() : signal_client_(false) {} Status Echo(ServerContext* context, const EchoRequest* request, - EchoResponse* response) GRPC_OVERRIDE { + EchoResponse* response) override { response->set_message(request->message()); MaybeEchoDeadline(context, request, response); if (request->has_param() && request->param().client_cancel_after_us()) { { - unique_lock<mutex> lock(mu_); + std::unique_lock<std::mutex> lock(mu_); signal_client_ = true; } while (!context->IsCancelled()) { @@ -118,7 +118,7 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { Status RequestStream(ServerContext* context, ServerReader<EchoRequest>* reader, - EchoResponse* response) GRPC_OVERRIDE { + EchoResponse* response) override { EchoRequest request; response->set_message(""); while (reader->Read(&request)) { @@ -130,7 +130,7 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { // Return 3 messages. // TODO(yangg) make it generic by adding a parameter into EchoRequest Status ResponseStream(ServerContext* context, const EchoRequest* request, - ServerWriter<EchoResponse>* writer) GRPC_OVERRIDE { + ServerWriter<EchoResponse>* writer) override { EchoResponse response; response.set_message(request->message() + "0"); writer->Write(response); @@ -142,9 +142,9 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { return Status::OK; } - Status BidiStream(ServerContext* context, - ServerReaderWriter<EchoResponse, EchoRequest>* stream) - GRPC_OVERRIDE { + Status BidiStream( + ServerContext* context, + ServerReaderWriter<EchoResponse, EchoRequest>* stream) override { EchoRequest request; EchoResponse response; while (stream->Read(&request)) { @@ -156,20 +156,20 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { } bool signal_client() { - unique_lock<mutex> lock(mu_); + std::unique_lock<std::mutex> lock(mu_); return signal_client_; } private: bool signal_client_; - mutex mu_; + std::mutex mu_; }; class TestServiceImplDupPkg : public ::grpc::testing::duplicate::EchoTestService::Service { public: Status Echo(ServerContext* context, const EchoRequest* request, - EchoResponse* response) GRPC_OVERRIDE { + EchoResponse* response) override { response->set_message("no package"); return Status::OK; } @@ -215,12 +215,12 @@ class CommonStressTest { class CommonStressTestSyncServer : public CommonStressTest<TestServiceImpl> { public: - void SetUp() GRPC_OVERRIDE { + void SetUp() override { ServerBuilder builder; SetUpStart(&builder, &service_); SetUpEnd(&builder); } - void TearDown() GRPC_OVERRIDE { + void TearDown() override { TearDownStart(); TearDownEnd(); } @@ -232,32 +232,31 @@ class CommonStressTestSyncServer : public CommonStressTest<TestServiceImpl> { class CommonStressTestAsyncServer : public CommonStressTest<grpc::testing::EchoTestService::AsyncService> { public: - void SetUp() GRPC_OVERRIDE { + CommonStressTestAsyncServer() : contexts_(kNumAsyncServerThreads * 100) {} + void SetUp() override { shutting_down_ = false; ServerBuilder builder; SetUpStart(&builder, &service_); cq_ = builder.AddCompletionQueue(); SetUpEnd(&builder); - contexts_ = new Context[kNumAsyncServerThreads * 100]; for (int i = 0; i < kNumAsyncServerThreads * 100; i++) { RefreshContext(i); } for (int i = 0; i < kNumAsyncServerThreads; i++) { - server_threads_.push_back( - new std::thread(&CommonStressTestAsyncServer::ProcessRpcs, this)); + server_threads_.emplace_back(&CommonStressTestAsyncServer::ProcessRpcs, + this); } } - void TearDown() GRPC_OVERRIDE { + void TearDown() override { { - unique_lock<mutex> l(mu_); + std::unique_lock<std::mutex> l(mu_); TearDownStart(); shutting_down_ = true; cq_->Shutdown(); } for (int i = 0; i < kNumAsyncServerThreads; i++) { - server_threads_[i]->join(); - delete server_threads_[i]; + server_threads_[i].join(); } void* ignored_tag; @@ -265,7 +264,6 @@ class CommonStressTestAsyncServer while (cq_->Next(&ignored_tag, &ignored_ok)) ; TearDownEnd(); - delete[] contexts_; } private: @@ -292,7 +290,7 @@ class CommonStressTestAsyncServer } } void RefreshContext(int i) { - unique_lock<mutex> l(mu_); + std::unique_lock<std::mutex> l(mu_); if (!shutting_down_) { contexts_[i].state = Context::READY; contexts_[i].srv_ctx.reset(new ServerContext); @@ -311,20 +309,21 @@ class CommonStressTestAsyncServer response_writer; EchoRequest recv_request; enum { READY, DONE } state; - } * contexts_; + }; + std::vector<Context> contexts_; ::grpc::testing::EchoTestService::AsyncService service_; std::unique_ptr<ServerCompletionQueue> cq_; bool shutting_down_; - mutex mu_; - std::vector<std::thread*> server_threads_; + std::mutex mu_; + std::vector<std::thread> server_threads_; }; template <class Common> class End2endTest : public ::testing::Test { protected: End2endTest() {} - void SetUp() GRPC_OVERRIDE { common_.SetUp(); } - void TearDown() GRPC_OVERRIDE { common_.TearDown(); } + void SetUp() override { common_.SetUp(); } + void TearDown() override { common_.TearDown(); } void ResetStub() { common_.ResetStub(); } Common common_; @@ -353,14 +352,12 @@ typedef ::testing::Types<CommonStressTestSyncServer, TYPED_TEST_CASE(End2endTest, CommonTypes); TYPED_TEST(End2endTest, ThreadStress) { this->common_.ResetStub(); - std::vector<std::thread*> threads; + std::vector<std::thread> threads; for (int i = 0; i < kNumThreads; ++i) { - threads.push_back( - new std::thread(SendRpc, this->common_.GetStub(), kNumRpcs)); + threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs); } for (int i = 0; i < kNumThreads; ++i) { - threads[i]->join(); - delete threads[i]; + threads[i].join(); } } @@ -369,8 +366,8 @@ class AsyncClientEnd2endTest : public ::testing::Test { protected: AsyncClientEnd2endTest() : rpcs_outstanding_(0) {} - void SetUp() GRPC_OVERRIDE { common_.SetUp(); } - void TearDown() GRPC_OVERRIDE { + void SetUp() override { common_.SetUp(); } + void TearDown() override { void* ignored_tag; bool ignored_ok; while (cq_.Next(&ignored_tag, &ignored_ok)) @@ -379,7 +376,7 @@ class AsyncClientEnd2endTest : public ::testing::Test { } void Wait() { - unique_lock<mutex> l(mu_); + std::unique_lock<std::mutex> l(mu_); while (rpcs_outstanding_ != 0) { cv_.wait(l); } @@ -404,7 +401,7 @@ class AsyncClientEnd2endTest : public ::testing::Test { call->response_reader->Finish(&call->response, &call->status, (void*)call); - unique_lock<mutex> l(mu_); + std::unique_lock<std::mutex> l(mu_); rpcs_outstanding_++; } } @@ -422,7 +419,7 @@ class AsyncClientEnd2endTest : public ::testing::Test { bool notify; { - unique_lock<mutex> l(mu_); + std::unique_lock<std::mutex> l(mu_); rpcs_outstanding_--; notify = (rpcs_outstanding_ == 0); } @@ -434,34 +431,32 @@ class AsyncClientEnd2endTest : public ::testing::Test { Common common_; CompletionQueue cq_; - mutex mu_; - condition_variable cv_; + std::mutex mu_; + std::condition_variable cv_; int rpcs_outstanding_; }; TYPED_TEST_CASE(AsyncClientEnd2endTest, CommonTypes); TYPED_TEST(AsyncClientEnd2endTest, ThreadStress) { this->common_.ResetStub(); - std::vector<std::thread *> send_threads, completion_threads; + std::vector<std::thread> send_threads, completion_threads; for (int i = 0; i < kNumAsyncReceiveThreads; ++i) { - completion_threads.push_back(new std::thread( + completion_threads.emplace_back( &AsyncClientEnd2endTest_ThreadStress_Test<TypeParam>::AsyncCompleteRpc, - this)); + this); } for (int i = 0; i < kNumAsyncSendThreads; ++i) { - send_threads.push_back(new std::thread( + send_threads.emplace_back( &AsyncClientEnd2endTest_ThreadStress_Test<TypeParam>::AsyncSendRpc, - this, kNumRpcs)); + this, kNumRpcs); } for (int i = 0; i < kNumAsyncSendThreads; ++i) { - send_threads[i]->join(); - delete send_threads[i]; + send_threads[i].join(); } this->Wait(); for (int i = 0; i < kNumAsyncReceiveThreads; ++i) { - completion_threads[i]->join(); - delete completion_threads[i]; + completion_threads[i].join(); } } |