aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server/server_cc.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/server/server_cc.cc')
-rw-r--r--src/cpp/server/server_cc.cc36
1 files changed, 25 insertions, 11 deletions
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 7c764f4bce..7aeddff643 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -380,7 +380,6 @@ class Server::SyncRequestThreadManager : public ThreadManager {
int cq_timeout_msec_;
std::vector<std::unique_ptr<SyncRequest>> sync_requests_;
std::unique_ptr<internal::RpcServiceMethod> unknown_method_;
- std::unique_ptr<internal::RpcServiceMethod> health_check_;
std::shared_ptr<Server::GlobalCallbacks> global_callbacks_;
};
@@ -573,16 +572,24 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
// Only create default health check service when user did not provide an
// explicit one.
+ ServerCompletionQueue* health_check_cq = nullptr;
+ DefaultHealthCheckService::HealthCheckServiceImpl*
+ default_health_check_service_impl = nullptr;
if (health_check_service_ == nullptr && !health_check_service_disabled_ &&
DefaultHealthCheckServiceEnabled()) {
- if (sync_server_cqs_ == nullptr || sync_server_cqs_->empty()) {
- gpr_log(GPR_INFO,
- "Default health check service disabled at async-only server.");
- } else {
- auto* default_hc_service = new DefaultHealthCheckService;
- health_check_service_.reset(default_hc_service);
- RegisterService(nullptr, default_hc_service->GetHealthCheckService());
- }
+ auto* default_hc_service = new DefaultHealthCheckService;
+ health_check_service_.reset(default_hc_service);
+ // We create a non-polling CQ to avoid impacting application
+ // performance. This ensures that we don't introduce thread hops
+ // for application requests that wind up on this CQ, which is polled
+ // in its own thread.
+ health_check_cq = new ServerCompletionQueue(GRPC_CQ_NON_POLLING);
+ grpc_server_register_completion_queue(server_, health_check_cq->cq(),
+ nullptr);
+ default_health_check_service_impl =
+ default_hc_service->GetHealthCheckService(
+ std::unique_ptr<ServerCompletionQueue>(health_check_cq));
+ RegisterService(nullptr, default_health_check_service_impl);
}
grpc_server_start(server_);
@@ -597,6 +604,9 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
new UnimplementedAsyncRequest(this, cqs[i]);
}
}
+ if (health_check_cq != nullptr) {
+ new UnimplementedAsyncRequest(this, health_check_cq);
+ }
}
// If this server has any support for synchronous methods (has any sync
@@ -609,6 +619,10 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Start();
}
+
+ if (default_health_check_service_impl != nullptr) {
+ default_health_check_service_impl->StartServingThread();
+ }
}
void Server::ShutdownInternal(gpr_timespec deadline) {
@@ -671,8 +685,8 @@ void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops,
size_t nops = 0;
grpc_op cops[MAX_OPS];
ops->FillOps(call->call(), cops, &nops);
- // TODO(vjpai): Use ops->cq_tag once this case supports callbacks
- auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr);
+ auto result =
+ grpc_call_start_batch(call->call(), cops, nops, ops->cq_tag(), nullptr);
if (result != GRPC_CALL_OK) {
gpr_log(GPR_ERROR, "Fatal: grpc_call_start_batch returned %d", result);
grpc_call_log_batch(__FILE__, __LINE__, GPR_LOG_SEVERITY_ERROR,