diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/cpp/end2end/thread_stress_test.cc | 157 | ||||
-rw-r--r-- | test/cpp/thread_manager/BUILD | 31 | ||||
-rw-r--r-- | test/cpp/thread_manager/thread_manager_test.cc | 8 |
3 files changed, 131 insertions, 65 deletions
diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc index 90b2eddbbb..fd43c8f584 100644 --- a/test/cpp/end2end/thread_stress_test.cc +++ b/test/cpp/end2end/thread_stress_test.cc @@ -26,6 +26,7 @@ #include <grpc++/server_builder.h> #include <grpc++/server_context.h> #include <grpc/grpc.h> +#include <grpc/support/atm.h> #include <grpc/support/thd.h> #include <grpc/support/time.h> @@ -52,63 +53,13 @@ namespace testing { class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { public: - TestServiceImpl() : signal_client_(false) {} + TestServiceImpl() {} Status Echo(ServerContext* context, const EchoRequest* request, EchoResponse* response) override { response->set_message(request->message()); return Status::OK; } - - // Unimplemented is left unimplemented to test the returned error. - - Status RequestStream(ServerContext* context, - ServerReader<EchoRequest>* reader, - EchoResponse* response) override { - EchoRequest request; - response->set_message(""); - while (reader->Read(&request)) { - response->mutable_message()->append(request.message()); - } - return Status::OK; - } - - // Return 3 messages. - // TODO(yangg) make it generic by adding a parameter into EchoRequest - Status ResponseStream(ServerContext* context, const EchoRequest* request, - ServerWriter<EchoResponse>* writer) override { - EchoResponse response; - response.set_message(request->message() + "0"); - writer->Write(response); - response.set_message(request->message() + "1"); - writer->Write(response); - response.set_message(request->message() + "2"); - writer->Write(response); - - return Status::OK; - } - - Status BidiStream( - ServerContext* context, - ServerReaderWriter<EchoResponse, EchoRequest>* stream) override { - EchoRequest request; - EchoResponse response; - while (stream->Read(&request)) { - gpr_log(GPR_INFO, "recv msg %s", request.message().c_str()); - response.set_message(request.message()); - stream->Write(response); - } - return Status::OK; - } - - bool signal_client() { - std::unique_lock<std::mutex> lock(mu_); - return signal_client_; - } - - private: - bool signal_client_; - std::mutex mu_; }; template <class Service> @@ -119,10 +70,15 @@ class CommonStressTest { virtual void SetUp() = 0; virtual void TearDown() = 0; virtual void ResetStub() = 0; + virtual bool AllowExhaustion() = 0; grpc::testing::EchoTestService::Stub* GetStub() { return stub_.get(); } protected: std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; + // Some tests use a custom thread creator. This should be declared before the + // server so that it's destructor happens after the server + std::unique_ptr<ServerBuilderThreadCreatorOverrideTest> creator_; + std::unique_ptr<Server> server_; virtual void SetUpStart(ServerBuilder* builder, Service* service) = 0; @@ -147,6 +103,7 @@ class CommonStressTestInsecure : public CommonStressTest<Service> { CreateChannel(server_address_.str(), InsecureChannelCredentials()); this->stub_ = grpc::testing::EchoTestService::NewStub(channel); } + bool AllowExhaustion() override { return false; } protected: void SetUpStart(ServerBuilder* builder, Service* service) override { @@ -162,7 +119,7 @@ class CommonStressTestInsecure : public CommonStressTest<Service> { std::ostringstream server_address_; }; -template <class Service> +template <class Service, bool allow_resource_exhaustion> class CommonStressTestInproc : public CommonStressTest<Service> { public: void ResetStub() override { @@ -170,6 +127,7 @@ class CommonStressTestInproc : public CommonStressTest<Service> { std::shared_ptr<Channel> channel = this->server_->InProcessChannel(args); this->stub_ = grpc::testing::EchoTestService::NewStub(channel); } + bool AllowExhaustion() override { return allow_resource_exhaustion; } protected: void SetUpStart(ServerBuilder* builder, Service* service) override { @@ -194,6 +152,67 @@ class CommonStressTestSyncServer : public BaseClass { TestServiceImpl service_; }; +class ServerBuilderThreadCreatorOverrideTest { + public: + ServerBuilderThreadCreatorOverrideTest(ServerBuilder* builder, size_t limit) + : limit_(limit), threads_(0) { + builder->SetThreadFunctions( + [this](gpr_thd_id* id, const char* name, void (*f)(void*), void* arg, + const gpr_thd_options* options) -> int { + std::unique_lock<std::mutex> l(mu_); + if (threads_ < limit_) { + l.unlock(); + if (gpr_thd_new(id, name, f, arg, options) != 0) { + l.lock(); + threads_++; + return 1; + } + } + return 0; + }, + [this](gpr_thd_id id) { + gpr_thd_join(id); + std::unique_lock<std::mutex> l(mu_); + threads_--; + if (threads_ == 0) { + done_.notify_one(); + } + }); + } + ~ServerBuilderThreadCreatorOverrideTest() { + // Don't allow destruction until all threads are really done and uncounted + std::unique_lock<std::mutex> l(mu_); + done_.wait(l, [this] { return (threads_ == 0); }); + } + + private: + size_t limit_; + size_t threads_; + std::mutex mu_; + std::condition_variable done_; +}; + +template <class BaseClass> +class CommonStressTestSyncServerLowThreadCount : public BaseClass { + public: + void SetUp() override { + ServerBuilder builder; + this->SetUpStart(&builder, &service_); + builder.SetSyncServerOption(ServerBuilder::SyncServerOption::MIN_POLLERS, + 1); + this->creator_.reset( + new ServerBuilderThreadCreatorOverrideTest(&builder, 4)); + this->SetUpEnd(&builder); + } + void TearDown() override { + this->TearDownStart(); + this->TearDownEnd(); + } + + private: + TestServiceImpl service_; +}; + template <class BaseClass> class CommonStressTestAsyncServer : public BaseClass { public: @@ -294,7 +313,8 @@ class End2endTest : public ::testing::Test { Common common_; }; -static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) { +static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs, + bool allow_exhaustion, gpr_atm* errors) { EchoRequest request; EchoResponse response; request.set_message("Hello"); @@ -302,33 +322,48 @@ static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) { for (int i = 0; i < num_rpcs; ++i) { ClientContext context; Status s = stub->Echo(&context, request, &response); - EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(s.ok() || (allow_exhaustion && + s.error_code() == StatusCode::RESOURCE_EXHAUSTED)); if (!s.ok()) { - gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(), - s.error_message().c_str()); + if (!(allow_exhaustion && + s.error_code() == StatusCode::RESOURCE_EXHAUSTED)) { + gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(), + s.error_message().c_str()); + } + gpr_atm_no_barrier_fetch_add(errors, static_cast<gpr_atm>(1)); + } else { + EXPECT_EQ(response.message(), request.message()); } - ASSERT_TRUE(s.ok()); } } typedef ::testing::Types< CommonStressTestSyncServer<CommonStressTestInsecure<TestServiceImpl>>, - CommonStressTestSyncServer<CommonStressTestInproc<TestServiceImpl>>, + CommonStressTestSyncServer<CommonStressTestInproc<TestServiceImpl, false>>, + CommonStressTestSyncServerLowThreadCount< + CommonStressTestInproc<TestServiceImpl, true>>, CommonStressTestAsyncServer< CommonStressTestInsecure<grpc::testing::EchoTestService::AsyncService>>, - CommonStressTestAsyncServer< - CommonStressTestInproc<grpc::testing::EchoTestService::AsyncService>>> + CommonStressTestAsyncServer<CommonStressTestInproc< + grpc::testing::EchoTestService::AsyncService, false>>> CommonTypes; TYPED_TEST_CASE(End2endTest, CommonTypes); TYPED_TEST(End2endTest, ThreadStress) { this->common_.ResetStub(); std::vector<std::thread> threads; + gpr_atm errors; + gpr_atm_rel_store(&errors, static_cast<gpr_atm>(0)); for (int i = 0; i < kNumThreads; ++i) { - threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs); + threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs, + this->common_.AllowExhaustion(), &errors); } for (int i = 0; i < kNumThreads; ++i) { threads[i].join(); } + uint64_t error_cnt = static_cast<uint64_t>(gpr_atm_no_barrier_load(&errors)); + if (error_cnt != 0) { + gpr_log(GPR_INFO, "RPC error count: %" PRIu64, error_cnt); + } } template <class Common> diff --git a/test/cpp/thread_manager/BUILD b/test/cpp/thread_manager/BUILD new file mode 100644 index 0000000000..1f0878770b --- /dev/null +++ b/test/cpp/thread_manager/BUILD @@ -0,0 +1,31 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +licenses(["notice"]) # Apache v2 + +load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package") + +grpc_package(name = "test/cpp/thread_manager") + +grpc_cc_test( + name = "thread_manager_test", + srcs = ["thread_manager_test.cc"], + deps = [ + "//:gpr", + "//:grpc", + "//:grpc++", + "//test/cpp/util:test_config", + ], +) + diff --git a/test/cpp/thread_manager/thread_manager_test.cc b/test/cpp/thread_manager/thread_manager_test.cc index 8282d46694..d3d31f9dd9 100644 --- a/test/cpp/thread_manager/thread_manager_test.cc +++ b/test/cpp/thread_manager/thread_manager_test.cc @@ -20,10 +20,10 @@ #include <memory> #include <string> -#include <gflags/gflags.h> #include <grpc++/grpc++.h> #include <grpc/support/log.h> #include <grpc/support/port_platform.h> +#include <grpc/support/thd.h> #include "src/cpp/thread_manager/thread_manager.h" #include "test/cpp/util/test_config.h" @@ -32,13 +32,13 @@ namespace grpc { class ThreadManagerTest final : public grpc::ThreadManager { public: ThreadManagerTest() - : ThreadManager(kMinPollers, kMaxPollers), + : ThreadManager(kMinPollers, kMaxPollers, gpr_thd_new, gpr_thd_join), num_do_work_(0), num_poll_for_work_(0), num_work_found_(0) {} grpc::ThreadManager::WorkStatus PollForWork(void** tag, bool* ok) override; - void DoWork(void* tag, bool ok) override; + void DoWork(void* tag, bool ok, bool resources) override; void PerformTest(); private: @@ -89,7 +89,7 @@ grpc::ThreadManager::WorkStatus ThreadManagerTest::PollForWork(void** tag, } } -void ThreadManagerTest::DoWork(void* tag, bool ok) { +void ThreadManagerTest::DoWork(void* tag, bool ok, bool resources) { gpr_atm_no_barrier_fetch_add(&num_do_work_, 1); SleepForMs(kDoWorkDurationMsec); // Simulate doing work by sleeping } |