Diffstat (limited to 'test/cpp/end2end/thread_stress_test.cc')
-rw-r--r--  test/cpp/end2end/thread_stress_test.cc | 91
1 file changed, 43 insertions(+), 48 deletions(-)
diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc
index 0b9d4cda9f..d353f9894b 100644
--- a/test/cpp/end2end/thread_stress_test.cc
+++ b/test/cpp/end2end/thread_stress_test.cc
@@ -86,12 +86,12 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
TestServiceImpl() : signal_client_(false) {}
Status Echo(ServerContext* context, const EchoRequest* request,
- EchoResponse* response) GRPC_OVERRIDE {
+ EchoResponse* response) override {
response->set_message(request->message());
MaybeEchoDeadline(context, request, response);
if (request->has_param() && request->param().client_cancel_after_us()) {
{
- unique_lock<mutex> lock(mu_);
+ std::unique_lock<std::mutex> lock(mu_);
signal_client_ = true;
}
while (!context->IsCancelled()) {
@@ -118,7 +118,7 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
Status RequestStream(ServerContext* context,
ServerReader<EchoRequest>* reader,
- EchoResponse* response) GRPC_OVERRIDE {
+ EchoResponse* response) override {
EchoRequest request;
response->set_message("");
while (reader->Read(&request)) {
@@ -130,7 +130,7 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
// Return 3 messages.
// TODO(yangg) make it generic by adding a parameter into EchoRequest
Status ResponseStream(ServerContext* context, const EchoRequest* request,
- ServerWriter<EchoResponse>* writer) GRPC_OVERRIDE {
+ ServerWriter<EchoResponse>* writer) override {
EchoResponse response;
response.set_message(request->message() + "0");
writer->Write(response);
@@ -142,9 +142,9 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
return Status::OK;
}
- Status BidiStream(ServerContext* context,
- ServerReaderWriter<EchoResponse, EchoRequest>* stream)
- GRPC_OVERRIDE {
+ Status BidiStream(
+ ServerContext* context,
+ ServerReaderWriter<EchoResponse, EchoRequest>* stream) override {
EchoRequest request;
EchoResponse response;
while (stream->Read(&request)) {
@@ -156,20 +156,20 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
}
bool signal_client() {
- unique_lock<mutex> lock(mu_);
+ std::unique_lock<std::mutex> lock(mu_);
return signal_client_;
}
private:
bool signal_client_;
- mutex mu_;
+ std::mutex mu_;
};
class TestServiceImplDupPkg
: public ::grpc::testing::duplicate::EchoTestService::Service {
public:
Status Echo(ServerContext* context, const EchoRequest* request,
- EchoResponse* response) GRPC_OVERRIDE {
+ EchoResponse* response) override {
response->set_message("no package");
return Status::OK;
}
@@ -215,12 +215,12 @@ class CommonStressTest {
class CommonStressTestSyncServer : public CommonStressTest<TestServiceImpl> {
public:
- void SetUp() GRPC_OVERRIDE {
+ void SetUp() override {
ServerBuilder builder;
SetUpStart(&builder, &service_);
SetUpEnd(&builder);
}
- void TearDown() GRPC_OVERRIDE {
+ void TearDown() override {
TearDownStart();
TearDownEnd();
}
@@ -232,32 +232,31 @@ class CommonStressTestSyncServer : public CommonStressTest<TestServiceImpl> {
class CommonStressTestAsyncServer
: public CommonStressTest<grpc::testing::EchoTestService::AsyncService> {
public:
- void SetUp() GRPC_OVERRIDE {
+ CommonStressTestAsyncServer() : contexts_(kNumAsyncServerThreads * 100) {}
+ void SetUp() override {
shutting_down_ = false;
ServerBuilder builder;
SetUpStart(&builder, &service_);
cq_ = builder.AddCompletionQueue();
SetUpEnd(&builder);
- contexts_ = new Context[kNumAsyncServerThreads * 100];
for (int i = 0; i < kNumAsyncServerThreads * 100; i++) {
RefreshContext(i);
}
for (int i = 0; i < kNumAsyncServerThreads; i++) {
- server_threads_.push_back(
- new std::thread(&CommonStressTestAsyncServer::ProcessRpcs, this));
+ server_threads_.emplace_back(&CommonStressTestAsyncServer::ProcessRpcs,
+ this);
}
}
- void TearDown() GRPC_OVERRIDE {
+ void TearDown() override {
{
- unique_lock<mutex> l(mu_);
+ std::unique_lock<std::mutex> l(mu_);
TearDownStart();
shutting_down_ = true;
cq_->Shutdown();
}
for (int i = 0; i < kNumAsyncServerThreads; i++) {
- server_threads_[i]->join();
- delete server_threads_[i];
+ server_threads_[i].join();
}
void* ignored_tag;
@@ -265,7 +264,6 @@ class CommonStressTestAsyncServer
while (cq_->Next(&ignored_tag, &ignored_ok))
;
TearDownEnd();
- delete[] contexts_;
}
private:
@@ -292,7 +290,7 @@ class CommonStressTestAsyncServer
}
}
void RefreshContext(int i) {
- unique_lock<mutex> l(mu_);
+ std::unique_lock<std::mutex> l(mu_);
if (!shutting_down_) {
contexts_[i].state = Context::READY;
contexts_[i].srv_ctx.reset(new ServerContext);
@@ -311,20 +309,21 @@ class CommonStressTestAsyncServer
response_writer;
EchoRequest recv_request;
enum { READY, DONE } state;
- } * contexts_;
+ };
+ std::vector<Context> contexts_;
::grpc::testing::EchoTestService::AsyncService service_;
std::unique_ptr<ServerCompletionQueue> cq_;
bool shutting_down_;
- mutex mu_;
- std::vector<std::thread*> server_threads_;
+ std::mutex mu_;
+ std::vector<std::thread> server_threads_;
};
template <class Common>
class End2endTest : public ::testing::Test {
protected:
End2endTest() {}
- void SetUp() GRPC_OVERRIDE { common_.SetUp(); }
- void TearDown() GRPC_OVERRIDE { common_.TearDown(); }
+ void SetUp() override { common_.SetUp(); }
+ void TearDown() override { common_.TearDown(); }
void ResetStub() { common_.ResetStub(); }
Common common_;
@@ -353,14 +352,12 @@ typedef ::testing::Types<CommonStressTestSyncServer,
TYPED_TEST_CASE(End2endTest, CommonTypes);
TYPED_TEST(End2endTest, ThreadStress) {
this->common_.ResetStub();
- std::vector<std::thread*> threads;
+ std::vector<std::thread> threads;
for (int i = 0; i < kNumThreads; ++i) {
- threads.push_back(
- new std::thread(SendRpc, this->common_.GetStub(), kNumRpcs));
+ threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs);
}
for (int i = 0; i < kNumThreads; ++i) {
- threads[i]->join();
- delete threads[i];
+ threads[i].join();
}
}
@@ -369,8 +366,8 @@ class AsyncClientEnd2endTest : public ::testing::Test {
protected:
AsyncClientEnd2endTest() : rpcs_outstanding_(0) {}
- void SetUp() GRPC_OVERRIDE { common_.SetUp(); }
- void TearDown() GRPC_OVERRIDE {
+ void SetUp() override { common_.SetUp(); }
+ void TearDown() override {
void* ignored_tag;
bool ignored_ok;
while (cq_.Next(&ignored_tag, &ignored_ok))
@@ -379,7 +376,7 @@ class AsyncClientEnd2endTest : public ::testing::Test {
}
void Wait() {
- unique_lock<mutex> l(mu_);
+ std::unique_lock<std::mutex> l(mu_);
while (rpcs_outstanding_ != 0) {
cv_.wait(l);
}
@@ -404,7 +401,7 @@ class AsyncClientEnd2endTest : public ::testing::Test {
call->response_reader->Finish(&call->response, &call->status,
(void*)call);
- unique_lock<mutex> l(mu_);
+ std::unique_lock<std::mutex> l(mu_);
rpcs_outstanding_++;
}
}
@@ -422,7 +419,7 @@ class AsyncClientEnd2endTest : public ::testing::Test {
bool notify;
{
- unique_lock<mutex> l(mu_);
+ std::unique_lock<std::mutex> l(mu_);
rpcs_outstanding_--;
notify = (rpcs_outstanding_ == 0);
}
@@ -434,34 +431,32 @@ class AsyncClientEnd2endTest : public ::testing::Test {
Common common_;
CompletionQueue cq_;
- mutex mu_;
- condition_variable cv_;
+ std::mutex mu_;
+ std::condition_variable cv_;
int rpcs_outstanding_;
};
TYPED_TEST_CASE(AsyncClientEnd2endTest, CommonTypes);
TYPED_TEST(AsyncClientEnd2endTest, ThreadStress) {
this->common_.ResetStub();
- std::vector<std::thread *> send_threads, completion_threads;
+ std::vector<std::thread> send_threads, completion_threads;
for (int i = 0; i < kNumAsyncReceiveThreads; ++i) {
- completion_threads.push_back(new std::thread(
+ completion_threads.emplace_back(
&AsyncClientEnd2endTest_ThreadStress_Test<TypeParam>::AsyncCompleteRpc,
- this));
+ this);
}
for (int i = 0; i < kNumAsyncSendThreads; ++i) {
- send_threads.push_back(new std::thread(
+ send_threads.emplace_back(
&AsyncClientEnd2endTest_ThreadStress_Test<TypeParam>::AsyncSendRpc,
- this, kNumRpcs));
+ this, kNumRpcs);
}
for (int i = 0; i < kNumAsyncSendThreads; ++i) {
- send_threads[i]->join();
- delete send_threads[i];
+ send_threads[i].join();
}
this->Wait();
for (int i = 0; i < kNumAsyncReceiveThreads; ++i) {
- completion_threads[i]->join();
- delete completion_threads[i];
+ completion_threads[i].join();
}
}
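
The recurring pattern in this diff is replacing manually managed std::thread* objects (new, join, delete) with threads owned by value in a std::vector<std::thread>, constructed in place with emplace_back and joined before the vector is destroyed. A minimal standalone sketch of that pattern follows; it is not code from the test itself, and SendRpc, kNumThreads, and kNumRpcs are stand-ins for the test's worker function and constants.

    #include <cstdio>
    #include <mutex>
    #include <thread>
    #include <vector>

    namespace {
    constexpr int kNumThreads = 4;  // stand-in for the test's thread count
    constexpr int kNumRpcs = 100;   // stand-in for the per-thread work count

    std::mutex mu;
    int total = 0;

    // Stand-in for the test's SendRpc(stub, num_rpcs) worker.
    void SendRpc(int num_rpcs) {
      std::lock_guard<std::mutex> lock(mu);
      total += num_rpcs;
    }
    }  // namespace

    int main() {
      // Owning container: no new/delete, threads are destroyed with the vector.
      std::vector<std::thread> threads;
      threads.reserve(kNumThreads);
      for (int i = 0; i < kNumThreads; ++i) {
        // emplace_back constructs the std::thread in place from the callable
        // and its arguments, as the diff does for the server and client threads.
        threads.emplace_back(SendRpc, kNumRpcs);
      }
      for (auto& t : threads) {
        t.join();  // every thread must be joined before the vector goes away
      }
      std::printf("total = %d\n", total);
      return 0;
    }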