aboutsummaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2017-11-14 19:04:02 -0800
committerGravatar Vijay Pai <vpai@google.com>2018-01-08 10:02:38 -0800
commit5dd32268be62114e8a7c81d60c0dc2633fb83081 (patch)
tree5d97aa70dfc6ea09df7da9e7955866d7574cb1e3 /test
parent669900c7de64d5992c92a838e23097b27e09d0b5 (diff)
Switch C++ sync server to use gpr_thd rather than std::thread and provide resource exhaustion mechanism
Diffstat (limited to 'test')
-rw-r--r--test/cpp/end2end/thread_stress_test.cc157
-rw-r--r--test/cpp/thread_manager/BUILD31
-rw-r--r--test/cpp/thread_manager/thread_manager_test.cc8
3 files changed, 131 insertions, 65 deletions
diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc
index 90b2eddbbb..fd43c8f584 100644
--- a/test/cpp/end2end/thread_stress_test.cc
+++ b/test/cpp/end2end/thread_stress_test.cc
@@ -26,6 +26,7 @@
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc/grpc.h>
+#include <grpc/support/atm.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
@@ -52,63 +53,13 @@ namespace testing {
class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
public:
- TestServiceImpl() : signal_client_(false) {}
+ TestServiceImpl() {}
Status Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) override {
response->set_message(request->message());
return Status::OK;
}
-
- // Unimplemented is left unimplemented to test the returned error.
-
- Status RequestStream(ServerContext* context,
- ServerReader<EchoRequest>* reader,
- EchoResponse* response) override {
- EchoRequest request;
- response->set_message("");
- while (reader->Read(&request)) {
- response->mutable_message()->append(request.message());
- }
- return Status::OK;
- }
-
- // Return 3 messages.
- // TODO(yangg) make it generic by adding a parameter into EchoRequest
- Status ResponseStream(ServerContext* context, const EchoRequest* request,
- ServerWriter<EchoResponse>* writer) override {
- EchoResponse response;
- response.set_message(request->message() + "0");
- writer->Write(response);
- response.set_message(request->message() + "1");
- writer->Write(response);
- response.set_message(request->message() + "2");
- writer->Write(response);
-
- return Status::OK;
- }
-
- Status BidiStream(
- ServerContext* context,
- ServerReaderWriter<EchoResponse, EchoRequest>* stream) override {
- EchoRequest request;
- EchoResponse response;
- while (stream->Read(&request)) {
- gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
- response.set_message(request.message());
- stream->Write(response);
- }
- return Status::OK;
- }
-
- bool signal_client() {
- std::unique_lock<std::mutex> lock(mu_);
- return signal_client_;
- }
-
- private:
- bool signal_client_;
- std::mutex mu_;
};
template <class Service>
@@ -119,10 +70,15 @@ class CommonStressTest {
virtual void SetUp() = 0;
virtual void TearDown() = 0;
virtual void ResetStub() = 0;
+ virtual bool AllowExhaustion() = 0;
grpc::testing::EchoTestService::Stub* GetStub() { return stub_.get(); }
protected:
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
+ // Some tests use a custom thread creator. This should be declared before the
+ // server so that it's destructor happens after the server
+ std::unique_ptr<ServerBuilderThreadCreatorOverrideTest> creator_;
+
std::unique_ptr<Server> server_;
virtual void SetUpStart(ServerBuilder* builder, Service* service) = 0;
@@ -147,6 +103,7 @@ class CommonStressTestInsecure : public CommonStressTest<Service> {
CreateChannel(server_address_.str(), InsecureChannelCredentials());
this->stub_ = grpc::testing::EchoTestService::NewStub(channel);
}
+ bool AllowExhaustion() override { return false; }
protected:
void SetUpStart(ServerBuilder* builder, Service* service) override {
@@ -162,7 +119,7 @@ class CommonStressTestInsecure : public CommonStressTest<Service> {
std::ostringstream server_address_;
};
-template <class Service>
+template <class Service, bool allow_resource_exhaustion>
class CommonStressTestInproc : public CommonStressTest<Service> {
public:
void ResetStub() override {
@@ -170,6 +127,7 @@ class CommonStressTestInproc : public CommonStressTest<Service> {
std::shared_ptr<Channel> channel = this->server_->InProcessChannel(args);
this->stub_ = grpc::testing::EchoTestService::NewStub(channel);
}
+ bool AllowExhaustion() override { return allow_resource_exhaustion; }
protected:
void SetUpStart(ServerBuilder* builder, Service* service) override {
@@ -194,6 +152,67 @@ class CommonStressTestSyncServer : public BaseClass {
TestServiceImpl service_;
};
+class ServerBuilderThreadCreatorOverrideTest {
+ public:
+ ServerBuilderThreadCreatorOverrideTest(ServerBuilder* builder, size_t limit)
+ : limit_(limit), threads_(0) {
+ builder->SetThreadFunctions(
+ [this](gpr_thd_id* id, const char* name, void (*f)(void*), void* arg,
+ const gpr_thd_options* options) -> int {
+ std::unique_lock<std::mutex> l(mu_);
+ if (threads_ < limit_) {
+ l.unlock();
+ if (gpr_thd_new(id, name, f, arg, options) != 0) {
+ l.lock();
+ threads_++;
+ return 1;
+ }
+ }
+ return 0;
+ },
+ [this](gpr_thd_id id) {
+ gpr_thd_join(id);
+ std::unique_lock<std::mutex> l(mu_);
+ threads_--;
+ if (threads_ == 0) {
+ done_.notify_one();
+ }
+ });
+ }
+ ~ServerBuilderThreadCreatorOverrideTest() {
+ // Don't allow destruction until all threads are really done and uncounted
+ std::unique_lock<std::mutex> l(mu_);
+ done_.wait(l, [this] { return (threads_ == 0); });
+ }
+
+ private:
+ size_t limit_;
+ size_t threads_;
+ std::mutex mu_;
+ std::condition_variable done_;
+};
+
+template <class BaseClass>
+class CommonStressTestSyncServerLowThreadCount : public BaseClass {
+ public:
+ void SetUp() override {
+ ServerBuilder builder;
+ this->SetUpStart(&builder, &service_);
+ builder.SetSyncServerOption(ServerBuilder::SyncServerOption::MIN_POLLERS,
+ 1);
+ this->creator_.reset(
+ new ServerBuilderThreadCreatorOverrideTest(&builder, 4));
+ this->SetUpEnd(&builder);
+ }
+ void TearDown() override {
+ this->TearDownStart();
+ this->TearDownEnd();
+ }
+
+ private:
+ TestServiceImpl service_;
+};
+
template <class BaseClass>
class CommonStressTestAsyncServer : public BaseClass {
public:
@@ -294,7 +313,8 @@ class End2endTest : public ::testing::Test {
Common common_;
};
-static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) {
+static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,
+ bool allow_exhaustion, gpr_atm* errors) {
EchoRequest request;
EchoResponse response;
request.set_message("Hello");
@@ -302,33 +322,48 @@ static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) {
for (int i = 0; i < num_rpcs; ++i) {
ClientContext context;
Status s = stub->Echo(&context, request, &response);
- EXPECT_EQ(response.message(), request.message());
+ EXPECT_TRUE(s.ok() || (allow_exhaustion &&
+ s.error_code() == StatusCode::RESOURCE_EXHAUSTED));
if (!s.ok()) {
- gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(),
- s.error_message().c_str());
+ if (!(allow_exhaustion &&
+ s.error_code() == StatusCode::RESOURCE_EXHAUSTED)) {
+ gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(),
+ s.error_message().c_str());
+ }
+ gpr_atm_no_barrier_fetch_add(errors, static_cast<gpr_atm>(1));
+ } else {
+ EXPECT_EQ(response.message(), request.message());
}
- ASSERT_TRUE(s.ok());
}
}
typedef ::testing::Types<
CommonStressTestSyncServer<CommonStressTestInsecure<TestServiceImpl>>,
- CommonStressTestSyncServer<CommonStressTestInproc<TestServiceImpl>>,
+ CommonStressTestSyncServer<CommonStressTestInproc<TestServiceImpl, false>>,
+ CommonStressTestSyncServerLowThreadCount<
+ CommonStressTestInproc<TestServiceImpl, true>>,
CommonStressTestAsyncServer<
CommonStressTestInsecure<grpc::testing::EchoTestService::AsyncService>>,
- CommonStressTestAsyncServer<
- CommonStressTestInproc<grpc::testing::EchoTestService::AsyncService>>>
+ CommonStressTestAsyncServer<CommonStressTestInproc<
+ grpc::testing::EchoTestService::AsyncService, false>>>
CommonTypes;
TYPED_TEST_CASE(End2endTest, CommonTypes);
TYPED_TEST(End2endTest, ThreadStress) {
this->common_.ResetStub();
std::vector<std::thread> threads;
+ gpr_atm errors;
+ gpr_atm_rel_store(&errors, static_cast<gpr_atm>(0));
for (int i = 0; i < kNumThreads; ++i) {
- threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs);
+ threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs,
+ this->common_.AllowExhaustion(), &errors);
}
for (int i = 0; i < kNumThreads; ++i) {
threads[i].join();
}
+ uint64_t error_cnt = static_cast<uint64_t>(gpr_atm_no_barrier_load(&errors));
+ if (error_cnt != 0) {
+ gpr_log(GPR_INFO, "RPC error count: %" PRIu64, error_cnt);
+ }
}
template <class Common>
diff --git a/test/cpp/thread_manager/BUILD b/test/cpp/thread_manager/BUILD
new file mode 100644
index 0000000000..1f0878770b
--- /dev/null
+++ b/test/cpp/thread_manager/BUILD
@@ -0,0 +1,31 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+licenses(["notice"]) # Apache v2
+
+load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package")
+
+grpc_package(name = "test/cpp/thread_manager")
+
+grpc_cc_test(
+ name = "thread_manager_test",
+ srcs = ["thread_manager_test.cc"],
+ deps = [
+ "//:gpr",
+ "//:grpc",
+ "//:grpc++",
+ "//test/cpp/util:test_config",
+ ],
+)
+
diff --git a/test/cpp/thread_manager/thread_manager_test.cc b/test/cpp/thread_manager/thread_manager_test.cc
index 8282d46694..d3d31f9dd9 100644
--- a/test/cpp/thread_manager/thread_manager_test.cc
+++ b/test/cpp/thread_manager/thread_manager_test.cc
@@ -20,10 +20,10 @@
#include <memory>
#include <string>
-#include <gflags/gflags.h>
#include <grpc++/grpc++.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
+#include <grpc/support/thd.h>
#include "src/cpp/thread_manager/thread_manager.h"
#include "test/cpp/util/test_config.h"
@@ -32,13 +32,13 @@ namespace grpc {
class ThreadManagerTest final : public grpc::ThreadManager {
public:
ThreadManagerTest()
- : ThreadManager(kMinPollers, kMaxPollers),
+ : ThreadManager(kMinPollers, kMaxPollers, gpr_thd_new, gpr_thd_join),
num_do_work_(0),
num_poll_for_work_(0),
num_work_found_(0) {}
grpc::ThreadManager::WorkStatus PollForWork(void** tag, bool* ok) override;
- void DoWork(void* tag, bool ok) override;
+ void DoWork(void* tag, bool ok, bool resources) override;
void PerformTest();
private:
@@ -89,7 +89,7 @@ grpc::ThreadManager::WorkStatus ThreadManagerTest::PollForWork(void** tag,
}
}
-void ThreadManagerTest::DoWork(void* tag, bool ok) {
+void ThreadManagerTest::DoWork(void* tag, bool ok, bool resources) {
gpr_atm_no_barrier_fetch_add(&num_do_work_, 1);
SleepForMs(kDoWorkDurationMsec); // Simulate doing work by sleeping
}