aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server/server_cc.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/server/server_cc.cc')
-rw-r--r--src/cpp/server/server_cc.cc57
1 files changed, 45 insertions, 12 deletions
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 0d77510e29..46872c85a1 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -47,6 +47,13 @@
namespace grpc {
namespace {
+// The default value for maximum number of threads that can be created in the
+// sync server. This value of INT_MAX is chosen to match the default behavior if
+// no ResourceQuota is set. To modify the max number of threads in a sync
+// server, pass a custom ResourceQuota object (with the desired number of
+// max-threads set) to the server builder.
+#define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX
+
class DefaultGlobalCallbacks final : public Server::GlobalCallbacks {
public:
~DefaultGlobalCallbacks() override {}
@@ -204,8 +211,10 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
call_(mrd->call_, server, &cq_, server->max_receive_message_size()),
ctx_(mrd->deadline_, &mrd->request_metadata_),
has_request_payload_(mrd->has_request_payload_),
- request_payload_(mrd->request_payload_),
- method_(mrd->method_) {
+ request_payload_(has_request_payload_ ? mrd->request_payload_
+ : nullptr),
+ method_(mrd->method_),
+ server_(server) {
ctx_.set_call(mrd->call_);
ctx_.cq_ = &cq_;
GPR_ASSERT(mrd->in_flight_);
@@ -219,10 +228,13 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
}
}
- void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks) {
+ void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks,
+ bool resources) {
ctx_.BeginCompletionOp(&call_);
global_callbacks->PreSynchronousRequest(&ctx_);
- method_->handler()->RunHandler(internal::MethodHandler::HandlerParameter(
+ auto* handler = resources ? method_->handler()
+ : server_->resource_exhausted_handler_.get();
+ handler->RunHandler(internal::MethodHandler::HandlerParameter(
&call_, &ctx_, request_payload_));
global_callbacks->PostSynchronousRequest(&ctx_);
request_payload_ = nullptr;
@@ -244,6 +256,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
const bool has_request_payload_;
grpc_byte_buffer* request_payload_;
internal::RpcServiceMethod* const method_;
+ Server* server_;
};
private:
@@ -266,9 +279,9 @@ class Server::SyncRequestThreadManager : public ThreadManager {
public:
SyncRequestThreadManager(Server* server, CompletionQueue* server_cq,
std::shared_ptr<GlobalCallbacks> global_callbacks,
- int min_pollers, int max_pollers,
- int cq_timeout_msec)
- : ThreadManager(min_pollers, max_pollers),
+ grpc_resource_quota* rq, int min_pollers,
+ int max_pollers, int cq_timeout_msec)
+ : ThreadManager("SyncServer", rq, min_pollers, max_pollers),
server_(server),
server_cq_(server_cq),
cq_timeout_msec_(cq_timeout_msec),
@@ -294,7 +307,7 @@ class Server::SyncRequestThreadManager : public ThreadManager {
GPR_UNREACHABLE_CODE(return TIMEOUT);
}
- void DoWork(void* tag, bool ok) override {
+ void DoWork(void* tag, bool ok, bool resources) override {
SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
if (!sync_req) {
@@ -314,7 +327,7 @@ class Server::SyncRequestThreadManager : public ThreadManager {
}
GPR_TIMER_SCOPE("cd.Run()", 0);
- cd.Run(global_callbacks_);
+ cd.Run(global_callbacks_, resources);
}
// TODO (sreek) If ok is false here (which it isn't in case of
// grpc_request_registered_call), we should still re-queue the request
@@ -376,7 +389,8 @@ Server::Server(
int max_receive_message_size, ChannelArguments* args,
std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
sync_server_cqs,
- int min_pollers, int max_pollers, int sync_cq_timeout_msec)
+ int min_pollers, int max_pollers, int sync_cq_timeout_msec,
+ grpc_resource_quota* server_rq)
: max_receive_message_size_(max_receive_message_size),
sync_server_cqs_(std::move(sync_server_cqs)),
started_(false),
@@ -392,10 +406,22 @@ Server::Server(
global_callbacks_->UpdateArguments(args);
if (sync_server_cqs_ != nullptr) {
+ bool default_rq_created = false;
+ if (server_rq == nullptr) {
+ server_rq = grpc_resource_quota_create("SyncServer-default-rq");
+ grpc_resource_quota_set_max_threads(server_rq,
+ DEFAULT_MAX_SYNC_SERVER_THREADS);
+ default_rq_created = true;
+ }
+
for (const auto& it : *sync_server_cqs_) {
sync_req_mgrs_.emplace_back(new SyncRequestThreadManager(
- this, it.get(), global_callbacks_, min_pollers, max_pollers,
- sync_cq_timeout_msec));
+ this, it.get(), global_callbacks_, server_rq, min_pollers,
+ max_pollers, sync_cq_timeout_msec));
+ }
+
+ if (default_rq_created) {
+ grpc_resource_quota_unref(server_rq);
}
}
@@ -559,6 +585,13 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
}
}
+ // If this server has any support for synchronous methods (has any sync
+ // server CQs), make sure that we have a ResourceExhausted handler
+ // to deal with the case of thread exhaustion
+ if (!sync_server_cqs_->empty()) {
+ resource_exhausted_handler_.reset(new internal::ResourceExhaustedHandler);
+ }
+
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Start();
}