aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/end2end/thread_stress_test.cc
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/end2end/thread_stress_test.cc')
-rw-r--r--test/cpp/end2end/thread_stress_test.cc117
1 files changed, 56 insertions, 61 deletions
diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc
index ccf8400a87..1a5ed28a2c 100644
--- a/test/cpp/end2end/thread_stress_test.cc
+++ b/test/cpp/end2end/thread_stress_test.cc
@@ -16,6 +16,7 @@
*
*/
+#include <cinttypes>
#include <mutex>
#include <thread>
@@ -24,6 +25,7 @@
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
+#include <grpcpp/resource_quota.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
@@ -51,63 +53,13 @@ namespace testing {
class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
public:
- TestServiceImpl() : signal_client_(false) {}
+ TestServiceImpl() {}
Status Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) override {
response->set_message(request->message());
return Status::OK;
}
-
- // Unimplemented is left unimplemented to test the returned error.
-
- Status RequestStream(ServerContext* context,
- ServerReader<EchoRequest>* reader,
- EchoResponse* response) override {
- EchoRequest request;
- response->set_message("");
- while (reader->Read(&request)) {
- response->mutable_message()->append(request.message());
- }
- return Status::OK;
- }
-
- // Return 3 messages.
- // TODO(yangg) make it generic by adding a parameter into EchoRequest
- Status ResponseStream(ServerContext* context, const EchoRequest* request,
- ServerWriter<EchoResponse>* writer) override {
- EchoResponse response;
- response.set_message(request->message() + "0");
- writer->Write(response);
- response.set_message(request->message() + "1");
- writer->Write(response);
- response.set_message(request->message() + "2");
- writer->Write(response);
-
- return Status::OK;
- }
-
- Status BidiStream(
- ServerContext* context,
- ServerReaderWriter<EchoResponse, EchoRequest>* stream) override {
- EchoRequest request;
- EchoResponse response;
- while (stream->Read(&request)) {
- gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
- response.set_message(request.message());
- stream->Write(response);
- }
- return Status::OK;
- }
-
- bool signal_client() {
- std::unique_lock<std::mutex> lock(mu_);
- return signal_client_;
- }
-
- private:
- bool signal_client_;
- std::mutex mu_;
};
template <class Service>
@@ -118,6 +70,7 @@ class CommonStressTest {
virtual void SetUp() = 0;
virtual void TearDown() = 0;
virtual void ResetStub() = 0;
+ virtual bool AllowExhaustion() = 0;
grpc::testing::EchoTestService::Stub* GetStub() { return stub_.get(); }
protected:
@@ -146,6 +99,7 @@ class CommonStressTestInsecure : public CommonStressTest<Service> {
CreateChannel(server_address_.str(), InsecureChannelCredentials());
this->stub_ = grpc::testing::EchoTestService::NewStub(channel);
}
+ bool AllowExhaustion() override { return false; }
protected:
void SetUpStart(ServerBuilder* builder, Service* service) override {
@@ -161,7 +115,7 @@ class CommonStressTestInsecure : public CommonStressTest<Service> {
std::ostringstream server_address_;
};
-template <class Service>
+template <class Service, bool allow_resource_exhaustion>
class CommonStressTestInproc : public CommonStressTest<Service> {
public:
void ResetStub() override {
@@ -169,6 +123,7 @@ class CommonStressTestInproc : public CommonStressTest<Service> {
std::shared_ptr<Channel> channel = this->server_->InProcessChannel(args);
this->stub_ = grpc::testing::EchoTestService::NewStub(channel);
}
+ bool AllowExhaustion() override { return allow_resource_exhaustion; }
protected:
void SetUpStart(ServerBuilder* builder, Service* service) override {
@@ -194,6 +149,26 @@ class CommonStressTestSyncServer : public BaseClass {
};
template <class BaseClass>
+class CommonStressTestSyncServerLowThreadCount : public BaseClass {
+ public:
+ void SetUp() override {
+ ServerBuilder builder;
+ ResourceQuota quota;
+ this->SetUpStart(&builder, &service_);
+ quota.SetMaxThreads(4);
+ builder.SetResourceQuota(quota);
+ this->SetUpEnd(&builder);
+ }
+ void TearDown() override {
+ this->TearDownStart();
+ this->TearDownEnd();
+ }
+
+ private:
+ TestServiceImpl service_;
+};
+
+template <class BaseClass>
class CommonStressTestAsyncServer : public BaseClass {
public:
CommonStressTestAsyncServer() : contexts_(kNumAsyncServerThreads * 100) {}
@@ -293,7 +268,8 @@ class End2endTest : public ::testing::Test {
Common common_;
};
-static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) {
+static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,
+ bool allow_exhaustion, gpr_atm* errors) {
EchoRequest request;
EchoResponse response;
request.set_message("Hello");
@@ -301,34 +277,53 @@ static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) {
for (int i = 0; i < num_rpcs; ++i) {
ClientContext context;
Status s = stub->Echo(&context, request, &response);
- EXPECT_EQ(response.message(), request.message());
+ EXPECT_TRUE(s.ok() || (allow_exhaustion &&
+ s.error_code() == StatusCode::RESOURCE_EXHAUSTED));
if (!s.ok()) {
- gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(),
- s.error_message().c_str());
+ if (!(allow_exhaustion &&
+ s.error_code() == StatusCode::RESOURCE_EXHAUSTED)) {
+ gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(),
+ s.error_message().c_str());
+ }
+ gpr_atm_no_barrier_fetch_add(errors, static_cast<gpr_atm>(1));
+ } else {
+ EXPECT_EQ(response.message(), request.message());
}
- ASSERT_TRUE(s.ok());
}
}
typedef ::testing::Types<
CommonStressTestSyncServer<CommonStressTestInsecure<TestServiceImpl>>,
- CommonStressTestSyncServer<CommonStressTestInproc<TestServiceImpl>>,
+ CommonStressTestSyncServer<CommonStressTestInproc<TestServiceImpl, false>>,
+ CommonStressTestSyncServerLowThreadCount<
+ CommonStressTestInproc<TestServiceImpl, true>>,
CommonStressTestAsyncServer<
CommonStressTestInsecure<grpc::testing::EchoTestService::AsyncService>>,
- CommonStressTestAsyncServer<
- CommonStressTestInproc<grpc::testing::EchoTestService::AsyncService>>>
+ CommonStressTestAsyncServer<CommonStressTestInproc<
+ grpc::testing::EchoTestService::AsyncService, false>>>
CommonTypes;
TYPED_TEST_CASE(End2endTest, CommonTypes);
TYPED_TEST(End2endTest, ThreadStress) {
this->common_.ResetStub();
std::vector<std::thread> threads;
+ gpr_atm errors;
+ gpr_atm_rel_store(&errors, static_cast<gpr_atm>(0));
threads.reserve(kNumThreads);
for (int i = 0; i < kNumThreads; ++i) {
- threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs);
+ threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs,
+ this->common_.AllowExhaustion(), &errors);
}
for (int i = 0; i < kNumThreads; ++i) {
threads[i].join();
}
+ uint64_t error_cnt = static_cast<uint64_t>(gpr_atm_no_barrier_load(&errors));
+ if (error_cnt != 0) {
+ gpr_log(GPR_INFO, "RPC error count: %" PRIu64, error_cnt);
+ }
+ // If this test allows resource exhaustion, expect that it actually sees some
+ if (this->common_.AllowExhaustion()) {
+ EXPECT_GT(error_cnt, static_cast<uint64_t>(0));
+ }
}
template <class Common>