diff options
Diffstat (limited to 'src/cpp/server/server_cc.cc')
-rw-r--r-- | src/cpp/server/server_cc.cc | 36 |
1 files changed, 25 insertions, 11 deletions
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 7c764f4bce..7aeddff643 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -380,7 +380,6 @@ class Server::SyncRequestThreadManager : public ThreadManager { int cq_timeout_msec_; std::vector<std::unique_ptr<SyncRequest>> sync_requests_; std::unique_ptr<internal::RpcServiceMethod> unknown_method_; - std::unique_ptr<internal::RpcServiceMethod> health_check_; std::shared_ptr<Server::GlobalCallbacks> global_callbacks_; }; @@ -573,16 +572,24 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { // Only create default health check service when user did not provide an // explicit one. + ServerCompletionQueue* health_check_cq = nullptr; + DefaultHealthCheckService::HealthCheckServiceImpl* + default_health_check_service_impl = nullptr; if (health_check_service_ == nullptr && !health_check_service_disabled_ && DefaultHealthCheckServiceEnabled()) { - if (sync_server_cqs_ == nullptr || sync_server_cqs_->empty()) { - gpr_log(GPR_INFO, - "Default health check service disabled at async-only server."); - } else { - auto* default_hc_service = new DefaultHealthCheckService; - health_check_service_.reset(default_hc_service); - RegisterService(nullptr, default_hc_service->GetHealthCheckService()); - } + auto* default_hc_service = new DefaultHealthCheckService; + health_check_service_.reset(default_hc_service); + // We create a non-polling CQ to avoid impacting application + // performance. This ensures that we don't introduce thread hops + // for application requests that wind up on this CQ, which is polled + // in its own thread. + health_check_cq = new ServerCompletionQueue(GRPC_CQ_NON_POLLING); + grpc_server_register_completion_queue(server_, health_check_cq->cq(), + nullptr); + default_health_check_service_impl = + default_hc_service->GetHealthCheckService( + std::unique_ptr<ServerCompletionQueue>(health_check_cq)); + RegisterService(nullptr, default_health_check_service_impl); } grpc_server_start(server_); @@ -597,6 +604,9 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { new UnimplementedAsyncRequest(this, cqs[i]); } } + if (health_check_cq != nullptr) { + new UnimplementedAsyncRequest(this, health_check_cq); + } } // If this server has any support for synchronous methods (has any sync @@ -609,6 +619,10 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { (*it)->Start(); } + + if (default_health_check_service_impl != nullptr) { + default_health_check_service_impl->StartServingThread(); + } } void Server::ShutdownInternal(gpr_timespec deadline) { @@ -671,8 +685,8 @@ void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops, size_t nops = 0; grpc_op cops[MAX_OPS]; ops->FillOps(call->call(), cops, &nops); - // TODO(vjpai): Use ops->cq_tag once this case supports callbacks - auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr); + auto result = + grpc_call_start_batch(call->call(), cops, nops, ops->cq_tag(), nullptr); if (result != GRPC_CALL_OK) { gpr_log(GPR_ERROR, "Fatal: grpc_call_start_batch returned %d", result); grpc_call_log_batch(__FILE__, __LINE__, GPR_LOG_SEVERITY_ERROR, |