diff options
author | 2018-08-14 15:04:35 -0700 | |
---|---|---|
committer | 2018-08-14 16:23:21 -0700 | |
commit | 14ad82a76de99de39460d901cf44767308859ae0 (patch) | |
tree | 5b5651416c040123b77b65cfe978258a347472c3 | |
parent | 8165c4c0aaa0fc020f1c74a67d4e50dff6c5e9b1 (diff) |
Create a new method handler for resource exhaustion and tie into thread mgr
-rw-r--r-- | include/grpcpp/impl/codegen/byte_buffer.h | 4 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/completion_queue.h | 6 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/method_handler_impl.h | 17 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/server_context.h | 6 | ||||
-rw-r--r-- | include/grpcpp/server.h | 3 | ||||
-rw-r--r-- | src/cpp/server/server_cc.cc | 22 | ||||
-rw-r--r-- | src/cpp/thread_manager/thread_manager.cc | 35 | ||||
-rw-r--r-- | src/cpp/thread_manager/thread_manager.h | 2 | ||||
-rw-r--r-- | test/cpp/end2end/thread_stress_test.cc | 113 | ||||
-rw-r--r-- | test/cpp/thread_manager/thread_manager_test.cc | 4 |
10 files changed, 125 insertions, 87 deletions
diff --git a/include/grpcpp/impl/codegen/byte_buffer.h b/include/grpcpp/impl/codegen/byte_buffer.h index 86c047ebe7..8cc5158115 100644 --- a/include/grpcpp/impl/codegen/byte_buffer.h +++ b/include/grpcpp/impl/codegen/byte_buffer.h @@ -45,6 +45,8 @@ template <class ServiceType, class RequestType, class ResponseType> class RpcMethodHandler; template <class ServiceType, class RequestType, class ResponseType> class ServerStreamingHandler; +template <StatusCode code> +class ErrorMethodHandler; template <class R> class DeserializeFuncType; class GrpcByteBufferPeer; @@ -144,6 +146,8 @@ class ByteBuffer final { friend class internal::RpcMethodHandler; template <class ServiceType, class RequestType, class ResponseType> friend class internal::ServerStreamingHandler; + template <StatusCode code> + friend class internal::ErrorMethodHandler; template <class R> friend class internal::DeserializeFuncType; friend class ProtoBufferReader; diff --git a/include/grpcpp/impl/codegen/completion_queue.h b/include/grpcpp/impl/codegen/completion_queue.h index 272575dac2..3f7d4fb765 100644 --- a/include/grpcpp/impl/codegen/completion_queue.h +++ b/include/grpcpp/impl/codegen/completion_queue.h @@ -78,9 +78,10 @@ template <class ServiceType, class RequestType, class ResponseType> class ServerStreamingHandler; template <class ServiceType, class RequestType, class ResponseType> class BidiStreamingHandler; -class UnknownMethodHandler; template <class Streamer, bool WriteNeeded> class TemplatedBidiStreamingHandler; +template <StatusCode code> +class ErrorMethodHandler; template <class InputMessage, class OutputMessage> class BlockingUnaryCallImpl; } // namespace internal @@ -265,7 +266,8 @@ class CompletionQueue : private GrpcLibraryCodegen { friend class ::grpc::internal::ServerStreamingHandler; template <class Streamer, bool WriteNeeded> friend class ::grpc::internal::TemplatedBidiStreamingHandler; - friend class ::grpc::internal::UnknownMethodHandler; + template <StatusCode code> + friend class ::grpc::internal::ErrorMethodHandler; friend class ::grpc::Server; friend class ::grpc::ServerContext; friend class ::grpc::ServerInterface; diff --git a/include/grpcpp/impl/codegen/method_handler_impl.h b/include/grpcpp/impl/codegen/method_handler_impl.h index 851aa2a024..53117f941b 100644 --- a/include/grpcpp/impl/codegen/method_handler_impl.h +++ b/include/grpcpp/impl/codegen/method_handler_impl.h @@ -272,12 +272,14 @@ class SplitServerStreamingHandler ServerSplitStreamer<RequestType, ResponseType>, false>(func) {} }; -/// Handle unknown method by returning UNIMPLEMENTED error. -class UnknownMethodHandler : public MethodHandler { +/// General method handler class for errors that prevent real method use +/// e.g., handle unknown method by returning UNIMPLEMENTED error. +template <StatusCode code> +class ErrorMethodHandler : public MethodHandler { public: template <class T> static void FillOps(ServerContext* context, T* ops) { - Status status(StatusCode::UNIMPLEMENTED, ""); + Status status(code, ""); if (!context->sent_initial_metadata_) { ops->SendInitialMetadata(context->initial_metadata_, context->initial_metadata_flags()); @@ -294,9 +296,18 @@ class UnknownMethodHandler : public MethodHandler { FillOps(param.server_context, &ops); param.call->PerformOps(&ops); param.call->cq()->Pluck(&ops); + // We also have to destroy any request payload in the handler parameter + ByteBuffer* payload = param.request.bbuf_ptr(); + if (payload != nullptr) { + payload->Clear(); + } } }; +typedef ErrorMethodHandler<StatusCode::UNIMPLEMENTED> UnknownMethodHandler; +typedef ErrorMethodHandler<StatusCode::RESOURCE_EXHAUSTED> + ResourceExhaustedHandler; + } // namespace internal } // namespace grpc diff --git a/include/grpcpp/impl/codegen/server_context.h b/include/grpcpp/impl/codegen/server_context.h index 153b404d9e..10372de129 100644 --- a/include/grpcpp/impl/codegen/server_context.h +++ b/include/grpcpp/impl/codegen/server_context.h @@ -63,9 +63,10 @@ template <class ServiceType, class RequestType, class ResponseType> class ServerStreamingHandler; template <class ServiceType, class RequestType, class ResponseType> class BidiStreamingHandler; -class UnknownMethodHandler; template <class Streamer, bool WriteNeeded> class TemplatedBidiStreamingHandler; +template <StatusCode code> +class ErrorMethodHandler; class Call; } // namespace internal @@ -262,7 +263,8 @@ class ServerContext { friend class ::grpc::internal::ServerStreamingHandler; template <class Streamer, bool WriteNeeded> friend class ::grpc::internal::TemplatedBidiStreamingHandler; - friend class ::grpc::internal::UnknownMethodHandler; + template <StatusCode code> + friend class internal::ErrorMethodHandler; friend class ::grpc::ClientContext; /// Prevent copying. diff --git a/include/grpcpp/server.h b/include/grpcpp/server.h index 189d8bec22..72544c0f0b 100644 --- a/include/grpcpp/server.h +++ b/include/grpcpp/server.h @@ -223,6 +223,9 @@ class Server : public ServerInterface, private GrpcLibraryCodegen { std::unique_ptr<HealthCheckServiceInterface> health_check_service_; bool health_check_service_disabled_; + + // A special handler for resource exhausted in sync case + std::unique_ptr<internal::MethodHandler> resource_exhausted_handler_; }; } // namespace grpc diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index d32d6b4904..66d432c910 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -210,8 +210,10 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { call_(mrd->call_, server, &cq_, server->max_receive_message_size()), ctx_(mrd->deadline_, &mrd->request_metadata_), has_request_payload_(mrd->has_request_payload_), - request_payload_(mrd->request_payload_), - method_(mrd->method_) { + request_payload_(has_request_payload_ ? mrd->request_payload_ + : nullptr), + method_(mrd->method_), + server_(server) { ctx_.set_call(mrd->call_); ctx_.cq_ = &cq_; GPR_ASSERT(mrd->in_flight_); @@ -225,10 +227,13 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { } } - void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks) { + void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks, + bool resources) { ctx_.BeginCompletionOp(&call_); global_callbacks->PreSynchronousRequest(&ctx_); - method_->handler()->RunHandler(internal::MethodHandler::HandlerParameter( + auto* handler = resources ? method_->handler() + : server_->resource_exhausted_handler_.get(); + handler->RunHandler(internal::MethodHandler::HandlerParameter( &call_, &ctx_, request_payload_)); global_callbacks->PostSynchronousRequest(&ctx_); request_payload_ = nullptr; @@ -250,6 +255,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { const bool has_request_payload_; grpc_byte_buffer* request_payload_; internal::RpcServiceMethod* const method_; + Server* server_; }; private: @@ -300,7 +306,7 @@ class Server::SyncRequestThreadManager : public ThreadManager { GPR_UNREACHABLE_CODE(return TIMEOUT); } - void DoWork(void* tag, bool ok) override { + void DoWork(void* tag, bool ok, bool resources) override { SyncRequest* sync_req = static_cast<SyncRequest*>(tag); if (!sync_req) { @@ -320,7 +326,7 @@ class Server::SyncRequestThreadManager : public ThreadManager { } GPR_TIMER_SCOPE("cd.Run()", 0); - cd.Run(global_callbacks_); + cd.Run(global_callbacks_, resources); } // TODO (sreek) If ok is false here (which it isn't in case of // grpc_request_registered_call), we should still re-queue the request @@ -578,6 +584,10 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { } } + if (!sync_server_cqs_->empty()) { + resource_exhausted_handler_.reset(new internal::ResourceExhaustedHandler); + } + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { (*it)->Start(); } diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc index fa9eec5f9b..e48bf951ea 100644 --- a/src/cpp/thread_manager/thread_manager.cc +++ b/src/cpp/thread_manager/thread_manager.cc @@ -166,22 +166,37 @@ void ThreadManager::MainWorkLoop() { case WORK_FOUND: // If we got work and there are now insufficient pollers and there is // quota available to create a new thread, start a new poller thread - if (!shutdown_ && num_pollers_ < min_pollers_ && - grpc_resource_user_allocate_threads(resource_user_, 1)) { - num_pollers_++; - num_threads_++; - if (num_threads_ > max_active_threads_sofar_) { - max_active_threads_sofar_ = num_threads_; + bool got_thread; + if (!shutdown_ && num_pollers_ < min_pollers_) { + if (grpc_resource_user_allocate_threads(resource_user_, 1)) { + num_pollers_++; + num_threads_++; + if (num_threads_ > max_active_threads_sofar_) { + max_active_threads_sofar_ = num_threads_; + } + // Drop lock before spawning thread to avoid contention + lock.unlock(); + new WorkerThread(this); + got_thread = true; + } else if (num_pollers_ > 0) { + // There is still at least some thread polling, so we can go on + // even though we couldn't allocate a new thread + lock.unlock(); + got_thread = true; + } else { + // There are no pollers to spare and we couldn't allocate + // a new thread, so resources are exhausted! + lock.unlock(); + got_thread = false; } - // Drop lock before spawning thread to avoid contention - lock.unlock(); - new WorkerThread(this); } else { // Drop lock for consistency with above branch lock.unlock(); + got_thread = true; } // Lock is always released at this point - do the application work - DoWork(tag, ok); + // or return resource exhausted + DoWork(tag, ok, got_thread); // Take the lock again to check post conditions lock.lock(); // If we're shutdown, we should finish at this point. diff --git a/src/cpp/thread_manager/thread_manager.h b/src/cpp/thread_manager/thread_manager.h index 01043edb31..352f80baf4 100644 --- a/src/cpp/thread_manager/thread_manager.h +++ b/src/cpp/thread_manager/thread_manager.h @@ -72,7 +72,7 @@ class ThreadManager { // The implementation of DoWork() should also do any setup needed to ensure // that the next call to PollForWork() (not necessarily by the current thread) // actually finds some work - virtual void DoWork(void* tag, bool ok) = 0; + virtual void DoWork(void* tag, bool ok, bool resources) = 0; // Mark the ThreadManager as shutdown and begin draining the work. This is a // non-blocking call and the caller should call Wait(), a blocking call which diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc index ccf8400a87..94ad684fe7 100644 --- a/test/cpp/end2end/thread_stress_test.cc +++ b/test/cpp/end2end/thread_stress_test.cc @@ -16,6 +16,7 @@ * */ +#include <cinttypes> #include <mutex> #include <thread> @@ -24,6 +25,7 @@ #include <grpcpp/channel.h> #include <grpcpp/client_context.h> #include <grpcpp/create_channel.h> +#include <grpcpp/resource_quota.h> #include <grpcpp/server.h> #include <grpcpp/server_builder.h> #include <grpcpp/server_context.h> @@ -51,63 +53,13 @@ namespace testing { class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { public: - TestServiceImpl() : signal_client_(false) {} + TestServiceImpl() {} Status Echo(ServerContext* context, const EchoRequest* request, EchoResponse* response) override { response->set_message(request->message()); return Status::OK; } - - // Unimplemented is left unimplemented to test the returned error. - - Status RequestStream(ServerContext* context, - ServerReader<EchoRequest>* reader, - EchoResponse* response) override { - EchoRequest request; - response->set_message(""); - while (reader->Read(&request)) { - response->mutable_message()->append(request.message()); - } - return Status::OK; - } - - // Return 3 messages. - // TODO(yangg) make it generic by adding a parameter into EchoRequest - Status ResponseStream(ServerContext* context, const EchoRequest* request, - ServerWriter<EchoResponse>* writer) override { - EchoResponse response; - response.set_message(request->message() + "0"); - writer->Write(response); - response.set_message(request->message() + "1"); - writer->Write(response); - response.set_message(request->message() + "2"); - writer->Write(response); - - return Status::OK; - } - - Status BidiStream( - ServerContext* context, - ServerReaderWriter<EchoResponse, EchoRequest>* stream) override { - EchoRequest request; - EchoResponse response; - while (stream->Read(&request)) { - gpr_log(GPR_INFO, "recv msg %s", request.message().c_str()); - response.set_message(request.message()); - stream->Write(response); - } - return Status::OK; - } - - bool signal_client() { - std::unique_lock<std::mutex> lock(mu_); - return signal_client_; - } - - private: - bool signal_client_; - std::mutex mu_; }; template <class Service> @@ -118,6 +70,7 @@ class CommonStressTest { virtual void SetUp() = 0; virtual void TearDown() = 0; virtual void ResetStub() = 0; + virtual bool AllowExhaustion() = 0; grpc::testing::EchoTestService::Stub* GetStub() { return stub_.get(); } protected: @@ -146,6 +99,7 @@ class CommonStressTestInsecure : public CommonStressTest<Service> { CreateChannel(server_address_.str(), InsecureChannelCredentials()); this->stub_ = grpc::testing::EchoTestService::NewStub(channel); } + bool AllowExhaustion() override { return false; } protected: void SetUpStart(ServerBuilder* builder, Service* service) override { @@ -161,7 +115,7 @@ class CommonStressTestInsecure : public CommonStressTest<Service> { std::ostringstream server_address_; }; -template <class Service> +template <class Service, bool allow_resource_exhaustion> class CommonStressTestInproc : public CommonStressTest<Service> { public: void ResetStub() override { @@ -169,6 +123,7 @@ class CommonStressTestInproc : public CommonStressTest<Service> { std::shared_ptr<Channel> channel = this->server_->InProcessChannel(args); this->stub_ = grpc::testing::EchoTestService::NewStub(channel); } + bool AllowExhaustion() override { return allow_resource_exhaustion; } protected: void SetUpStart(ServerBuilder* builder, Service* service) override { @@ -194,6 +149,26 @@ class CommonStressTestSyncServer : public BaseClass { }; template <class BaseClass> +class CommonStressTestSyncServerLowThreadCount : public BaseClass { + public: + void SetUp() override { + ServerBuilder builder; + ResourceQuota quota; + this->SetUpStart(&builder, &service_); + quota.SetMaxThreads(4); + builder.SetResourceQuota(quota); + this->SetUpEnd(&builder); + } + void TearDown() override { + this->TearDownStart(); + this->TearDownEnd(); + } + + private: + TestServiceImpl service_; +}; + +template <class BaseClass> class CommonStressTestAsyncServer : public BaseClass { public: CommonStressTestAsyncServer() : contexts_(kNumAsyncServerThreads * 100) {} @@ -293,7 +268,8 @@ class End2endTest : public ::testing::Test { Common common_; }; -static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) { +static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs, + bool allow_exhaustion, gpr_atm* errors) { EchoRequest request; EchoResponse response; request.set_message("Hello"); @@ -301,34 +277,49 @@ static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) { for (int i = 0; i < num_rpcs; ++i) { ClientContext context; Status s = stub->Echo(&context, request, &response); - EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(s.ok() || (allow_exhaustion && + s.error_code() == StatusCode::RESOURCE_EXHAUSTED)); if (!s.ok()) { - gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(), - s.error_message().c_str()); + if (!(allow_exhaustion && + s.error_code() == StatusCode::RESOURCE_EXHAUSTED)) { + gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(), + s.error_message().c_str()); + } + gpr_atm_no_barrier_fetch_add(errors, static_cast<gpr_atm>(1)); + } else { + EXPECT_EQ(response.message(), request.message()); } - ASSERT_TRUE(s.ok()); } } typedef ::testing::Types< CommonStressTestSyncServer<CommonStressTestInsecure<TestServiceImpl>>, - CommonStressTestSyncServer<CommonStressTestInproc<TestServiceImpl>>, + CommonStressTestSyncServer<CommonStressTestInproc<TestServiceImpl, false>>, + CommonStressTestSyncServerLowThreadCount< + CommonStressTestInproc<TestServiceImpl, true>>, CommonStressTestAsyncServer< CommonStressTestInsecure<grpc::testing::EchoTestService::AsyncService>>, - CommonStressTestAsyncServer< - CommonStressTestInproc<grpc::testing::EchoTestService::AsyncService>>> + CommonStressTestAsyncServer<CommonStressTestInproc< + grpc::testing::EchoTestService::AsyncService, false>>> CommonTypes; TYPED_TEST_CASE(End2endTest, CommonTypes); TYPED_TEST(End2endTest, ThreadStress) { this->common_.ResetStub(); std::vector<std::thread> threads; + gpr_atm errors; + gpr_atm_rel_store(&errors, static_cast<gpr_atm>(0)); threads.reserve(kNumThreads); for (int i = 0; i < kNumThreads; ++i) { - threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs); + threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs, + this->common_.AllowExhaustion(), &errors); } for (int i = 0; i < kNumThreads; ++i) { threads[i].join(); } + uint64_t error_cnt = static_cast<uint64_t>(gpr_atm_no_barrier_load(&errors)); + if (error_cnt != 0) { + gpr_log(GPR_INFO, "RPC error count: %" PRIu64, error_cnt); + } } template <class Common> diff --git a/test/cpp/thread_manager/thread_manager_test.cc b/test/cpp/thread_manager/thread_manager_test.cc index 838f5f72ad..99de5a3e01 100644 --- a/test/cpp/thread_manager/thread_manager_test.cc +++ b/test/cpp/thread_manager/thread_manager_test.cc @@ -55,7 +55,7 @@ class ThreadManagerTest final : public grpc::ThreadManager { num_work_found_(0) {} grpc::ThreadManager::WorkStatus PollForWork(void** tag, bool* ok) override; - void DoWork(void* tag, bool ok) override; + void DoWork(void* tag, bool ok, bool resources) override; // Get number of times PollForWork() returned WORK_FOUND int GetNumWorkFound(); @@ -102,7 +102,7 @@ grpc::ThreadManager::WorkStatus ThreadManagerTest::PollForWork(void** tag, return WORK_FOUND; } -void ThreadManagerTest::DoWork(void* tag, bool ok) { +void ThreadManagerTest::DoWork(void* tag, bool ok, bool resources) { gpr_atm_no_barrier_fetch_add(&num_do_work_, 1); SleepForMs(settings_.work_duration_ms); // Simulate work by sleeping } |