diff options
Diffstat (limited to 'src/cpp/server')
-rw-r--r-- | src/cpp/server/server_builder.cc | 2 | ||||
-rw-r--r-- | src/cpp/server/server_cc.cc | 57 |
2 files changed, 46 insertions, 13 deletions
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index e0b9b7a62b..8417c45e64 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -263,7 +263,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { std::unique_ptr<Server> server(new Server( max_receive_message_size_, &args, sync_server_cqs, sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, - sync_server_settings_.cq_timeout_msec)); + sync_server_settings_.cq_timeout_msec, resource_quota_)); if (has_sync_methods) { // This is a Sync server diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 0d77510e29..46872c85a1 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -47,6 +47,13 @@ namespace grpc { namespace { +// The default value for maximum number of threads that can be created in the +// sync server. This value of INT_MAX is chosen to match the default behavior if +// no ResourceQuota is set. To modify the max number of threads in a sync +// server, pass a custom ResourceQuota object (with the desired number of +// max-threads set) to the server builder. +#define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX + class DefaultGlobalCallbacks final : public Server::GlobalCallbacks { public: ~DefaultGlobalCallbacks() override {} @@ -204,8 +211,10 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { call_(mrd->call_, server, &cq_, server->max_receive_message_size()), ctx_(mrd->deadline_, &mrd->request_metadata_), has_request_payload_(mrd->has_request_payload_), - request_payload_(mrd->request_payload_), - method_(mrd->method_) { + request_payload_(has_request_payload_ ? mrd->request_payload_ + : nullptr), + method_(mrd->method_), + server_(server) { ctx_.set_call(mrd->call_); ctx_.cq_ = &cq_; GPR_ASSERT(mrd->in_flight_); @@ -219,10 +228,13 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { } } - void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks) { + void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks, + bool resources) { ctx_.BeginCompletionOp(&call_); global_callbacks->PreSynchronousRequest(&ctx_); - method_->handler()->RunHandler(internal::MethodHandler::HandlerParameter( + auto* handler = resources ? method_->handler() + : server_->resource_exhausted_handler_.get(); + handler->RunHandler(internal::MethodHandler::HandlerParameter( &call_, &ctx_, request_payload_)); global_callbacks->PostSynchronousRequest(&ctx_); request_payload_ = nullptr; @@ -244,6 +256,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { const bool has_request_payload_; grpc_byte_buffer* request_payload_; internal::RpcServiceMethod* const method_; + Server* server_; }; private: @@ -266,9 +279,9 @@ class Server::SyncRequestThreadManager : public ThreadManager { public: SyncRequestThreadManager(Server* server, CompletionQueue* server_cq, std::shared_ptr<GlobalCallbacks> global_callbacks, - int min_pollers, int max_pollers, - int cq_timeout_msec) - : ThreadManager(min_pollers, max_pollers), + grpc_resource_quota* rq, int min_pollers, + int max_pollers, int cq_timeout_msec) + : ThreadManager("SyncServer", rq, min_pollers, max_pollers), server_(server), server_cq_(server_cq), cq_timeout_msec_(cq_timeout_msec), @@ -294,7 +307,7 @@ class Server::SyncRequestThreadManager : public ThreadManager { GPR_UNREACHABLE_CODE(return TIMEOUT); } - void DoWork(void* tag, bool ok) override { + void DoWork(void* tag, bool ok, bool resources) override { SyncRequest* sync_req = static_cast<SyncRequest*>(tag); if (!sync_req) { @@ -314,7 +327,7 @@ class Server::SyncRequestThreadManager : public ThreadManager { } GPR_TIMER_SCOPE("cd.Run()", 0); - cd.Run(global_callbacks_); + cd.Run(global_callbacks_, resources); } // TODO (sreek) If ok is false here (which it isn't in case of // grpc_request_registered_call), we should still re-queue the request @@ -376,7 +389,8 @@ Server::Server( int max_receive_message_size, ChannelArguments* args, std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> sync_server_cqs, - int min_pollers, int max_pollers, int sync_cq_timeout_msec) + int min_pollers, int max_pollers, int sync_cq_timeout_msec, + grpc_resource_quota* server_rq) : max_receive_message_size_(max_receive_message_size), sync_server_cqs_(std::move(sync_server_cqs)), started_(false), @@ -392,10 +406,22 @@ Server::Server( global_callbacks_->UpdateArguments(args); if (sync_server_cqs_ != nullptr) { + bool default_rq_created = false; + if (server_rq == nullptr) { + server_rq = grpc_resource_quota_create("SyncServer-default-rq"); + grpc_resource_quota_set_max_threads(server_rq, + DEFAULT_MAX_SYNC_SERVER_THREADS); + default_rq_created = true; + } + for (const auto& it : *sync_server_cqs_) { sync_req_mgrs_.emplace_back(new SyncRequestThreadManager( - this, it.get(), global_callbacks_, min_pollers, max_pollers, - sync_cq_timeout_msec)); + this, it.get(), global_callbacks_, server_rq, min_pollers, + max_pollers, sync_cq_timeout_msec)); + } + + if (default_rq_created) { + grpc_resource_quota_unref(server_rq); } } @@ -559,6 +585,13 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { } } + // If this server has any support for synchronous methods (has any sync + // server CQs), make sure that we have a ResourceExhausted handler + // to deal with the case of thread exhaustion + if (!sync_server_cqs_->empty()) { + resource_exhausted_handler_.reset(new internal::ResourceExhaustedHandler); + } + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { (*it)->Start(); } |