aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server/server_cc.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/server/server_cc.cc')
-rw-r--r--src/cpp/server/server_cc.cc84
1 files changed, 64 insertions, 20 deletions
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 0d77510e29..3cadf65c80 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -47,6 +47,13 @@
namespace grpc {
namespace {
+// The default value for maximum number of threads that can be created in the
+// sync server. This value of INT_MAX is chosen to match the default behavior if
+// no ResourceQuota is set. To modify the max number of threads in a sync
+// server, pass a custom ResourceQuota object (with the desired number of
+// max-threads set) to the server builder.
+#define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX
+
class DefaultGlobalCallbacks final : public Server::GlobalCallbacks {
public:
~DefaultGlobalCallbacks() override {}
@@ -204,8 +211,10 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
call_(mrd->call_, server, &cq_, server->max_receive_message_size()),
ctx_(mrd->deadline_, &mrd->request_metadata_),
has_request_payload_(mrd->has_request_payload_),
- request_payload_(mrd->request_payload_),
- method_(mrd->method_) {
+ request_payload_(has_request_payload_ ? mrd->request_payload_
+ : nullptr),
+ method_(mrd->method_),
+ server_(server) {
ctx_.set_call(mrd->call_);
ctx_.cq_ = &cq_;
GPR_ASSERT(mrd->in_flight_);
@@ -219,10 +228,13 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
}
}
- void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks) {
+ void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks,
+ bool resources) {
ctx_.BeginCompletionOp(&call_);
global_callbacks->PreSynchronousRequest(&ctx_);
- method_->handler()->RunHandler(internal::MethodHandler::HandlerParameter(
+ auto* handler = resources ? method_->handler()
+ : server_->resource_exhausted_handler_.get();
+ handler->RunHandler(internal::MethodHandler::HandlerParameter(
&call_, &ctx_, request_payload_));
global_callbacks->PostSynchronousRequest(&ctx_);
request_payload_ = nullptr;
@@ -244,6 +256,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
const bool has_request_payload_;
grpc_byte_buffer* request_payload_;
internal::RpcServiceMethod* const method_;
+ Server* server_;
};
private:
@@ -266,9 +279,9 @@ class Server::SyncRequestThreadManager : public ThreadManager {
public:
SyncRequestThreadManager(Server* server, CompletionQueue* server_cq,
std::shared_ptr<GlobalCallbacks> global_callbacks,
- int min_pollers, int max_pollers,
- int cq_timeout_msec)
- : ThreadManager(min_pollers, max_pollers),
+ grpc_resource_quota* rq, int min_pollers,
+ int max_pollers, int cq_timeout_msec)
+ : ThreadManager("SyncServer", rq, min_pollers, max_pollers),
server_(server),
server_cq_(server_cq),
cq_timeout_msec_(cq_timeout_msec),
@@ -294,7 +307,7 @@ class Server::SyncRequestThreadManager : public ThreadManager {
GPR_UNREACHABLE_CODE(return TIMEOUT);
}
- void DoWork(void* tag, bool ok) override {
+ void DoWork(void* tag, bool ok, bool resources) override {
SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
if (!sync_req) {
@@ -314,7 +327,7 @@ class Server::SyncRequestThreadManager : public ThreadManager {
}
GPR_TIMER_SCOPE("cd.Run()", 0);
- cd.Run(global_callbacks_);
+ cd.Run(global_callbacks_, resources);
}
// TODO (sreek) If ok is false here (which it isn't in case of
// grpc_request_registered_call), we should still re-queue the request
@@ -376,7 +389,8 @@ Server::Server(
int max_receive_message_size, ChannelArguments* args,
std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
sync_server_cqs,
- int min_pollers, int max_pollers, int sync_cq_timeout_msec)
+ int min_pollers, int max_pollers, int sync_cq_timeout_msec,
+ grpc_resource_quota* server_rq)
: max_receive_message_size_(max_receive_message_size),
sync_server_cqs_(std::move(sync_server_cqs)),
started_(false),
@@ -392,10 +406,22 @@ Server::Server(
global_callbacks_->UpdateArguments(args);
if (sync_server_cqs_ != nullptr) {
+ bool default_rq_created = false;
+ if (server_rq == nullptr) {
+ server_rq = grpc_resource_quota_create("SyncServer-default-rq");
+ grpc_resource_quota_set_max_threads(server_rq,
+ DEFAULT_MAX_SYNC_SERVER_THREADS);
+ default_rq_created = true;
+ }
+
for (const auto& it : *sync_server_cqs_) {
sync_req_mgrs_.emplace_back(new SyncRequestThreadManager(
- this, it.get(), global_callbacks_, min_pollers, max_pollers,
- sync_cq_timeout_msec));
+ this, it.get(), global_callbacks_, server_rq, min_pollers,
+ max_pollers, sync_cq_timeout_msec));
+ }
+
+ if (default_rq_created) {
+ grpc_resource_quota_unref(server_rq);
}
}
@@ -533,16 +559,20 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
// Only create default health check service when user did not provide an
// explicit one.
+ ServerCompletionQueue* health_check_cq = nullptr;
+ DefaultHealthCheckService::HealthCheckServiceImpl*
+ default_health_check_service_impl = nullptr;
if (health_check_service_ == nullptr && !health_check_service_disabled_ &&
DefaultHealthCheckServiceEnabled()) {
- if (sync_server_cqs_ == nullptr || sync_server_cqs_->empty()) {
- gpr_log(GPR_INFO,
- "Default health check service disabled at async-only server.");
- } else {
- auto* default_hc_service = new DefaultHealthCheckService;
- health_check_service_.reset(default_hc_service);
- RegisterService(nullptr, default_hc_service->GetHealthCheckService());
- }
+ auto* default_hc_service = new DefaultHealthCheckService;
+ health_check_service_.reset(default_hc_service);
+ health_check_cq = new ServerCompletionQueue(GRPC_CQ_DEFAULT_POLLING);
+ grpc_server_register_completion_queue(server_, health_check_cq->cq(),
+ nullptr);
+ default_health_check_service_impl =
+ default_hc_service->GetHealthCheckService(
+ std::unique_ptr<ServerCompletionQueue>(health_check_cq));
+ RegisterService(nullptr, default_health_check_service_impl);
}
grpc_server_start(server_);
@@ -557,11 +587,25 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
new UnimplementedAsyncRequest(this, cqs[i]);
}
}
+ if (health_check_cq != nullptr) {
+ new UnimplementedAsyncRequest(this, health_check_cq);
+ }
+ }
+
+ // If this server has any support for synchronous methods (has any sync
+ // server CQs), make sure that we have a ResourceExhausted handler
+ // to deal with the case of thread exhaustion
+ if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) {
+ resource_exhausted_handler_.reset(new internal::ResourceExhaustedHandler);
}
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Start();
}
+
+ if (default_health_check_service_impl != nullptr) {
+ default_health_check_service_impl->StartServingThread();
+ }
}
void Server::ShutdownInternal(gpr_timespec deadline) {