aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/end2end/thread_stress_test.cc
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@users.noreply.github.com>2018-01-12 10:16:22 +0100
committerGravatar GitHub <noreply@github.com>2018-01-12 10:16:22 +0100
commitc9ec2c0888271491eaf425721a72736392f85945 (patch)
tree8ee3fe7e6fe56bed7bbfa7c8537add5f24afe12a /test/cpp/end2end/thread_stress_test.cc
parentb0b4555f4ca720e626f42855f2257bf598e1bf74 (diff)
Revert "Stop using std::thread in C++ library since it can trigger exceptions"
Diffstat (limited to 'test/cpp/end2end/thread_stress_test.cc')
-rw-r--r--test/cpp/end2end/thread_stress_test.cc157
1 files changed, 61 insertions, 96 deletions
diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc
index fd43c8f584..90b2eddbbb 100644
--- a/test/cpp/end2end/thread_stress_test.cc
+++ b/test/cpp/end2end/thread_stress_test.cc
@@ -26,7 +26,6 @@
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc/grpc.h>
-#include <grpc/support/atm.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
@@ -53,13 +52,63 @@ namespace testing {
class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
public:
- TestServiceImpl() {}
+ TestServiceImpl() : signal_client_(false) {}
Status Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) override {
response->set_message(request->message());
return Status::OK;
}
+
+ // Unimplemented is left unimplemented to test the returned error.
+
+ Status RequestStream(ServerContext* context,
+ ServerReader<EchoRequest>* reader,
+ EchoResponse* response) override {
+ EchoRequest request;
+ response->set_message("");
+ while (reader->Read(&request)) {
+ response->mutable_message()->append(request.message());
+ }
+ return Status::OK;
+ }
+
+ // Return 3 messages.
+ // TODO(yangg) make it generic by adding a parameter into EchoRequest
+ Status ResponseStream(ServerContext* context, const EchoRequest* request,
+ ServerWriter<EchoResponse>* writer) override {
+ EchoResponse response;
+ response.set_message(request->message() + "0");
+ writer->Write(response);
+ response.set_message(request->message() + "1");
+ writer->Write(response);
+ response.set_message(request->message() + "2");
+ writer->Write(response);
+
+ return Status::OK;
+ }
+
+ Status BidiStream(
+ ServerContext* context,
+ ServerReaderWriter<EchoResponse, EchoRequest>* stream) override {
+ EchoRequest request;
+ EchoResponse response;
+ while (stream->Read(&request)) {
+ gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
+ response.set_message(request.message());
+ stream->Write(response);
+ }
+ return Status::OK;
+ }
+
+ bool signal_client() {
+ std::unique_lock<std::mutex> lock(mu_);
+ return signal_client_;
+ }
+
+ private:
+ bool signal_client_;
+ std::mutex mu_;
};
template <class Service>
@@ -70,15 +119,10 @@ class CommonStressTest {
virtual void SetUp() = 0;
virtual void TearDown() = 0;
virtual void ResetStub() = 0;
- virtual bool AllowExhaustion() = 0;
grpc::testing::EchoTestService::Stub* GetStub() { return stub_.get(); }
protected:
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
- // Some tests use a custom thread creator. This should be declared before the
- // server so that it's destructor happens after the server
- std::unique_ptr<ServerBuilderThreadCreatorOverrideTest> creator_;
-
std::unique_ptr<Server> server_;
virtual void SetUpStart(ServerBuilder* builder, Service* service) = 0;
@@ -103,7 +147,6 @@ class CommonStressTestInsecure : public CommonStressTest<Service> {
CreateChannel(server_address_.str(), InsecureChannelCredentials());
this->stub_ = grpc::testing::EchoTestService::NewStub(channel);
}
- bool AllowExhaustion() override { return false; }
protected:
void SetUpStart(ServerBuilder* builder, Service* service) override {
@@ -119,7 +162,7 @@ class CommonStressTestInsecure : public CommonStressTest<Service> {
std::ostringstream server_address_;
};
-template <class Service, bool allow_resource_exhaustion>
+template <class Service>
class CommonStressTestInproc : public CommonStressTest<Service> {
public:
void ResetStub() override {
@@ -127,7 +170,6 @@ class CommonStressTestInproc : public CommonStressTest<Service> {
std::shared_ptr<Channel> channel = this->server_->InProcessChannel(args);
this->stub_ = grpc::testing::EchoTestService::NewStub(channel);
}
- bool AllowExhaustion() override { return allow_resource_exhaustion; }
protected:
void SetUpStart(ServerBuilder* builder, Service* service) override {
@@ -152,67 +194,6 @@ class CommonStressTestSyncServer : public BaseClass {
TestServiceImpl service_;
};
-class ServerBuilderThreadCreatorOverrideTest {
- public:
- ServerBuilderThreadCreatorOverrideTest(ServerBuilder* builder, size_t limit)
- : limit_(limit), threads_(0) {
- builder->SetThreadFunctions(
- [this](gpr_thd_id* id, const char* name, void (*f)(void*), void* arg,
- const gpr_thd_options* options) -> int {
- std::unique_lock<std::mutex> l(mu_);
- if (threads_ < limit_) {
- l.unlock();
- if (gpr_thd_new(id, name, f, arg, options) != 0) {
- l.lock();
- threads_++;
- return 1;
- }
- }
- return 0;
- },
- [this](gpr_thd_id id) {
- gpr_thd_join(id);
- std::unique_lock<std::mutex> l(mu_);
- threads_--;
- if (threads_ == 0) {
- done_.notify_one();
- }
- });
- }
- ~ServerBuilderThreadCreatorOverrideTest() {
- // Don't allow destruction until all threads are really done and uncounted
- std::unique_lock<std::mutex> l(mu_);
- done_.wait(l, [this] { return (threads_ == 0); });
- }
-
- private:
- size_t limit_;
- size_t threads_;
- std::mutex mu_;
- std::condition_variable done_;
-};
-
-template <class BaseClass>
-class CommonStressTestSyncServerLowThreadCount : public BaseClass {
- public:
- void SetUp() override {
- ServerBuilder builder;
- this->SetUpStart(&builder, &service_);
- builder.SetSyncServerOption(ServerBuilder::SyncServerOption::MIN_POLLERS,
- 1);
- this->creator_.reset(
- new ServerBuilderThreadCreatorOverrideTest(&builder, 4));
- this->SetUpEnd(&builder);
- }
- void TearDown() override {
- this->TearDownStart();
- this->TearDownEnd();
- }
-
- private:
- TestServiceImpl service_;
-};
-
template <class BaseClass>
class CommonStressTestAsyncServer : public BaseClass {
public:
@@ -313,8 +294,7 @@ class End2endTest : public ::testing::Test {
Common common_;
};
-static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,
- bool allow_exhaustion, gpr_atm* errors) {
+static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) {
EchoRequest request;
EchoResponse response;
request.set_message("Hello");
@@ -322,48 +302,33 @@ static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,
for (int i = 0; i < num_rpcs; ++i) {
ClientContext context;
Status s = stub->Echo(&context, request, &response);
- EXPECT_TRUE(s.ok() || (allow_exhaustion &&
- s.error_code() == StatusCode::RESOURCE_EXHAUSTED));
+ EXPECT_EQ(response.message(), request.message());
if (!s.ok()) {
- if (!(allow_exhaustion &&
- s.error_code() == StatusCode::RESOURCE_EXHAUSTED)) {
- gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(),
- s.error_message().c_str());
- }
- gpr_atm_no_barrier_fetch_add(errors, static_cast<gpr_atm>(1));
- } else {
- EXPECT_EQ(response.message(), request.message());
+ gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(),
+ s.error_message().c_str());
}
+ ASSERT_TRUE(s.ok());
}
}
typedef ::testing::Types<
CommonStressTestSyncServer<CommonStressTestInsecure<TestServiceImpl>>,
- CommonStressTestSyncServer<CommonStressTestInproc<TestServiceImpl, false>>,
- CommonStressTestSyncServerLowThreadCount<
- CommonStressTestInproc<TestServiceImpl, true>>,
+ CommonStressTestSyncServer<CommonStressTestInproc<TestServiceImpl>>,
CommonStressTestAsyncServer<
CommonStressTestInsecure<grpc::testing::EchoTestService::AsyncService>>,
- CommonStressTestAsyncServer<CommonStressTestInproc<
- grpc::testing::EchoTestService::AsyncService, false>>>
+ CommonStressTestAsyncServer<
+ CommonStressTestInproc<grpc::testing::EchoTestService::AsyncService>>>
CommonTypes;
TYPED_TEST_CASE(End2endTest, CommonTypes);
TYPED_TEST(End2endTest, ThreadStress) {
this->common_.ResetStub();
std::vector<std::thread> threads;
- gpr_atm errors;
- gpr_atm_rel_store(&errors, static_cast<gpr_atm>(0));
for (int i = 0; i < kNumThreads; ++i) {
- threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs,
- this->common_.AllowExhaustion(), &errors);
+ threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs);
}
for (int i = 0; i < kNumThreads; ++i) {
threads[i].join();
}
- uint64_t error_cnt = static_cast<uint64_t>(gpr_atm_no_barrier_load(&errors));
- if (error_cnt != 0) {
- gpr_log(GPR_INFO, "RPC error count: %" PRIu64, error_cnt);
- }
}
template <class Common>