aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--include/grpcpp/impl/codegen/completion_queue.h1
-rw-r--r--src/cpp/server/health/default_health_check_service.cc484
-rw-r--r--src/cpp/server/health/default_health_check_service.h242
-rw-r--r--src/cpp/server/health/health.pb.c1
-rw-r--r--src/cpp/server/health/health.pb.h7
-rw-r--r--src/cpp/server/server_cc.cc27
-rw-r--r--src/proto/grpc/health/v1/health.proto20
-rw-r--r--test/cpp/end2end/health_service_end2end_test.cc76
-rwxr-xr-xtools/distrib/check_nanopb_output.sh18
9 files changed, 105 insertions, 771 deletions
diff --git a/include/grpcpp/impl/codegen/completion_queue.h b/include/grpcpp/impl/codegen/completion_queue.h
index 6c8428ebde..3f7d4fb765 100644
--- a/include/grpcpp/impl/codegen/completion_queue.h
+++ b/include/grpcpp/impl/codegen/completion_queue.h
@@ -384,7 +384,6 @@ class ServerCompletionQueue : public CompletionQueue {
grpc_cq_polling_type polling_type_;
friend class ServerBuilder;
- friend class Server;
};
} // namespace grpc
diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc
index 670da63a4a..bfda67d086 100644
--- a/src/cpp/server/health/default_health_check_service.cc
+++ b/src/cpp/server/health/default_health_check_service.cc
@@ -30,162 +30,29 @@
#include "src/cpp/server/health/health.pb.h"
namespace grpc {
-
-//
-// DefaultHealthCheckService
-//
-
-DefaultHealthCheckService::DefaultHealthCheckService() {
- services_map_[""].SetServingStatus(SERVING);
-}
-
-void DefaultHealthCheckService::SetServingStatus(
- const grpc::string& service_name, bool serving) {
- std::unique_lock<std::mutex> lock(mu_);
- services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING);
-}
-
-void DefaultHealthCheckService::SetServingStatus(bool serving) {
- const ServingStatus status = serving ? SERVING : NOT_SERVING;
- std::unique_lock<std::mutex> lock(mu_);
- for (auto& p : services_map_) {
- ServiceData& service_data = p.second;
- service_data.SetServingStatus(status);
- }
-}
-
-DefaultHealthCheckService::ServingStatus
-DefaultHealthCheckService::GetServingStatus(
- const grpc::string& service_name) const {
- std::lock_guard<std::mutex> lock(mu_);
- auto it = services_map_.find(service_name);
- if (it == services_map_.end()) {
- return NOT_FOUND;
- }
- const ServiceData& service_data = it->second;
- return service_data.GetServingStatus();
-}
-
-void DefaultHealthCheckService::RegisterCallHandler(
- const grpc::string& service_name,
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
- std::unique_lock<std::mutex> lock(mu_);
- ServiceData& service_data = services_map_[service_name];
- service_data.AddCallHandler(handler /* copies ref */);
- handler->SendHealth(std::move(handler), service_data.GetServingStatus());
-}
-
-void DefaultHealthCheckService::UnregisterCallHandler(
- const grpc::string& service_name,
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
- std::unique_lock<std::mutex> lock(mu_);
- auto it = services_map_.find(service_name);
- if (it == services_map_.end()) return;
- ServiceData& service_data = it->second;
- service_data.RemoveCallHandler(std::move(handler));
- if (service_data.Unused()) {
- services_map_.erase(it);
- }
-}
-
-DefaultHealthCheckService::HealthCheckServiceImpl*
-DefaultHealthCheckService::GetHealthCheckService(
- std::unique_ptr<ServerCompletionQueue> cq) {
- GPR_ASSERT(impl_ == nullptr);
- impl_.reset(new HealthCheckServiceImpl(this, std::move(cq)));
- return impl_.get();
-}
-
-//
-// DefaultHealthCheckService::ServiceData
-//
-
-void DefaultHealthCheckService::ServiceData::SetServingStatus(
- ServingStatus status) {
- status_ = status;
- for (auto& call_handler : call_handlers_) {
- call_handler->SendHealth(call_handler /* copies ref */, status);
- }
-}
-
-void DefaultHealthCheckService::ServiceData::AddCallHandler(
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
- call_handlers_.insert(std::move(handler));
-}
-
-void DefaultHealthCheckService::ServiceData::RemoveCallHandler(
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
- call_handlers_.erase(std::move(handler));
-}
-
-//
-// DefaultHealthCheckService::HealthCheckServiceImpl
-//
-
namespace {
const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
-const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch";
} // namespace
DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
- DefaultHealthCheckService* database,
- std::unique_ptr<ServerCompletionQueue> cq)
- : database_(database), cq_(std::move(cq)) {
- // Add Check() method.
- check_method_ = new internal::RpcServiceMethod(
- kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr);
- AddMethod(check_method_);
- // Add Watch() method.
- watch_method_ = new internal::RpcServiceMethod(
- kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr);
- AddMethod(watch_method_);
- // Create serving thread.
- thread_ = std::unique_ptr<::grpc_core::Thread>(
- new ::grpc_core::Thread("grpc_health_check_service", Serve, this));
-}
-
-DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
- // We will reach here after the server starts shutting down.
- shutdown_ = true;
- {
- std::unique_lock<std::mutex> lock(cq_shutdown_mu_);
- cq_->Shutdown();
- }
- thread_->Join();
-}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() {
- thread_->Start();
-}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) {
- HealthCheckServiceImpl* service =
- reinterpret_cast<HealthCheckServiceImpl*>(arg);
- // TODO(juanlishen): This is a workaround to wait for the cq to be ready.
- // Need to figure out why cq is not ready after service starts.
- gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_time_from_seconds(1, GPR_TIMESPAN)));
- CheckCallHandler::CreateAndStart(service->cq_.get(), service->database_,
- service);
- WatchCallHandler::CreateAndStart(service->cq_.get(), service->database_,
- service);
- void* tag;
- bool ok;
- while (true) {
- if (!service->cq_->Next(&tag, &ok)) {
- // The completion queue is shutting down.
- GPR_ASSERT(service->shutdown_);
- break;
- }
- auto* next_step = static_cast<CallableTag*>(tag);
- next_step->Run(ok);
- }
-}
-
-bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
- const ByteBuffer& request, grpc::string* service_name) {
+ DefaultHealthCheckService* service)
+ : service_(service), method_(nullptr) {
+ internal::MethodHandler* handler =
+ new internal::RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer,
+ ByteBuffer>(
+ std::mem_fn(&HealthCheckServiceImpl::Check), this);
+ method_ = new internal::RpcServiceMethod(
+ kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, handler);
+ AddMethod(method_);
+}
+
+Status DefaultHealthCheckService::HealthCheckServiceImpl::Check(
+ ServerContext* context, const ByteBuffer* request, ByteBuffer* response) {
+ // Decode request.
std::vector<Slice> slices;
- if (!request.Dump(&slices).ok()) return false;
+ if (!request->Dump(&slices).ok()) {
+ return Status(StatusCode::INVALID_ARGUMENT, "");
+ }
uint8_t* request_bytes = nullptr;
bool request_bytes_owned = false;
size_t request_size = 0;
@@ -197,13 +64,14 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
request_size = slices[0].size();
} else {
request_bytes_owned = true;
- request_bytes = static_cast<uint8_t*>(gpr_malloc(request.Length()));
+ request_bytes = static_cast<uint8_t*>(gpr_malloc(request->Length()));
uint8_t* copy_to = request_bytes;
for (size_t i = 0; i < slices.size(); i++) {
memcpy(copy_to, slices[i].begin(), slices[i].size());
copy_to += slices[i].size();
}
}
+
if (request_bytes != nullptr) {
pb_istream_t istream = pb_istream_from_buffer(request_bytes, request_size);
bool decode_status = pb_decode(
@@ -211,22 +79,26 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
if (request_bytes_owned) {
gpr_free(request_bytes);
}
- if (!decode_status) return false;
+ if (!decode_status) {
+ return Status(StatusCode::INVALID_ARGUMENT, "");
+ }
}
- *service_name = request_struct.has_service ? request_struct.service : "";
- return true;
-}
-bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
- ServingStatus status, ByteBuffer* response) {
+ // Check status from the associated default health checking service.
+ DefaultHealthCheckService::ServingStatus serving_status =
+ service_->GetServingStatus(
+ request_struct.has_service ? request_struct.service : "");
+ if (serving_status == DefaultHealthCheckService::NOT_FOUND) {
+ return Status(StatusCode::NOT_FOUND, "");
+ }
+
+ // Encode response
grpc_health_v1_HealthCheckResponse response_struct;
response_struct.has_status = true;
response_struct.status =
- status == NOT_FOUND
- ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN
- : status == SERVING
- ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING
- : grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING;
+ serving_status == DefaultHealthCheckService::SERVING
+ ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING
+ : grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING;
pb_ostream_t ostream;
memset(&ostream, 0, sizeof(ostream));
pb_encode(&ostream, grpc_health_v1_HealthCheckResponse_fields,
@@ -236,282 +108,48 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
GRPC_SLICE_LENGTH(response_slice));
bool encode_status = pb_encode(
&ostream, grpc_health_v1_HealthCheckResponse_fields, &response_struct);
- if (!encode_status) return false;
+ if (!encode_status) {
+ return Status(StatusCode::INTERNAL, "Failed to encode response.");
+ }
Slice encoded_response(response_slice, Slice::STEAL_REF);
ByteBuffer response_buffer(&encoded_response, 1);
response->Swap(&response_buffer);
- return true;
-}
-
-//
-// DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler
-//
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
- CreateAndStart(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service) {
- std::shared_ptr<CallHandler> self =
- std::make_shared<CheckCallHandler>(cq, database, service);
- CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get());
- {
- std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_);
- if (service->shutdown_) return;
- // Request a Check() call.
- handler->next_ =
- CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler,
- std::placeholders::_1, std::placeholders::_2),
- std::move(self));
- service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_,
- &handler->writer_, cq, cq, &handler->next_);
- }
-}
-
-DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
- CheckCallHandler(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service)
- : cq_(cq), database_(database), service_(service), writer_(&ctx_) {}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
- OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
- if (!ok) {
- // The value of ok being false means that the server is shutting down.
- return;
- }
- // Spawn a new handler instance to serve the next new client. Every handler
- // instance will deallocate itself when it's done.
- CreateAndStart(cq_, database_, service_);
- // Process request.
- gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_,
- this);
- grpc::string service_name;
- grpc::Status status = Status::OK;
- ByteBuffer response;
- if (!service_->DecodeRequest(request_, &service_name)) {
- status = Status(INVALID_ARGUMENT, "");
- } else {
- ServingStatus serving_status = database_->GetServingStatus(service_name);
- if (serving_status == NOT_FOUND) {
- status = Status(StatusCode::NOT_FOUND, "service name unknown");
- } else if (!service_->EncodeResponse(serving_status, &response)) {
- status = Status(INTERNAL, "");
- }
- }
- // Send response.
- {
- std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
- if (!service_->shutdown_) {
- next_ =
- CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this,
- std::placeholders::_1, std::placeholders::_2),
- std::move(self));
- if (status.ok()) {
- writer_.Finish(response, status, &next_);
- } else {
- writer_.FinishWithError(status, &next_);
- }
- }
- }
-}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
- OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
- if (ok) {
- gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p",
- service_, this);
- }
-}
-
-//
-// DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler
-//
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- CreateAndStart(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service) {
- std::shared_ptr<CallHandler> self =
- std::make_shared<WatchCallHandler>(cq, database, service);
- WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get());
- {
- std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_);
- if (service->shutdown_) return;
- // Request AsyncNotifyWhenDone().
- handler->on_done_notified_ =
- CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler,
- std::placeholders::_1, std::placeholders::_2),
- self /* copies ref */);
- handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_);
- // Request a Watch() call.
- handler->next_ =
- CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler,
- std::placeholders::_1, std::placeholders::_2),
- std::move(self));
- service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_,
- &handler->stream_, cq, cq,
- &handler->next_);
- }
+ return Status::OK;
}
-DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- WatchCallHandler(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service)
- : cq_(cq),
- database_(database),
- service_(service),
- stream_(&ctx_),
- call_state_(WAITING_FOR_CALL) {}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
- if (ok) {
- call_state_ = CALL_RECEIVED;
- } else {
- // AsyncNotifyWhenDone() needs to be called before the call starts, but the
- // tag will not pop out if the call never starts (
- // https://github.com/grpc/grpc/issues/10136). So we need to manually
- // release the ownership of the handler in this case.
- GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
- }
- if (!ok || shutdown_) {
- // The value of ok being false means that the server is shutting down.
- Shutdown(std::move(self), "OnCallReceived");
- return;
- }
- // Spawn a new handler instance to serve the next new client. Every handler
- // instance will deallocate itself when it's done.
- CreateAndStart(cq_, database_, service_);
- // Parse request.
- if (!service_->DecodeRequest(request_, &service_name_)) {
- on_finish_done_ =
- CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
- std::placeholders::_1, std::placeholders::_2),
- std::move(self));
- stream_.Finish(Status(INVALID_ARGUMENT, ""), &on_finish_done_);
- call_state_ = FINISH_CALLED;
- return;
- }
- // Register the call for updates to the service.
- gpr_log(GPR_DEBUG,
- "[HCS %p] Health check watch started for service \"%s\" "
- "(handler: %p)",
- service_, service_name_.c_str(), this);
- database_->RegisterCallHandler(service_name_, std::move(self));
-}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) {
- std::unique_lock<std::mutex> lock(mu_);
- // If there's already a send in flight, cache the new status, and
- // we'll start a new send for it when the one in flight completes.
- if (send_in_flight_) {
- pending_status_ = status;
- return;
- }
- // Start a send.
- SendHealthLocked(std::move(self), status);
-}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- SendHealthLocked(std::shared_ptr<CallHandler> self, ServingStatus status) {
- std::unique_lock<std::mutex> cq_lock(service_->cq_shutdown_mu_);
- if (service_->shutdown_) {
- cq_lock.release()->unlock();
- Shutdown(std::move(self), "SendHealthLocked");
- return;
- }
- send_in_flight_ = true;
- call_state_ = SEND_MESSAGE_PENDING;
- // Construct response.
- ByteBuffer response;
- if (!service_->EncodeResponse(status, &response)) {
- on_finish_done_ =
- CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
- std::placeholders::_1, std::placeholders::_2),
- std::move(self));
- stream_.Finish(Status(INTERNAL, ""), &on_finish_done_);
- return;
- }
- next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this,
- std::placeholders::_1, std::placeholders::_2),
- std::move(self));
- stream_.Write(response, &next_);
+DefaultHealthCheckService::DefaultHealthCheckService() {
+ services_map_.emplace("", true);
}
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok) {
- if (!ok || shutdown_) {
- Shutdown(std::move(self), "OnSendHealthDone");
- return;
- }
- call_state_ = CALL_RECEIVED;
- {
- std::unique_lock<std::mutex> lock(mu_);
- send_in_flight_ = false;
- // If we got a new status since we started the last send, start a
- // new send for it.
- if (pending_status_ != NOT_FOUND) {
- auto status = pending_status_;
- pending_status_ = NOT_FOUND;
- SendHealthLocked(std::move(self), status);
- }
- }
+void DefaultHealthCheckService::SetServingStatus(
+ const grpc::string& service_name, bool serving) {
+ std::lock_guard<std::mutex> lock(mu_);
+ services_map_[service_name] = serving;
}
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) {
- GPR_ASSERT(ok);
- done_notified_ = true;
- if (ctx_.IsCancelled()) {
- is_cancelled_ = true;
+void DefaultHealthCheckService::SetServingStatus(bool serving) {
+ std::lock_guard<std::mutex> lock(mu_);
+ for (auto iter = services_map_.begin(); iter != services_map_.end(); ++iter) {
+ iter->second = serving;
}
- gpr_log(GPR_DEBUG,
- "[HCS %p] Healt check call is notified done (handler: %p, "
- "is_cancelled: %d).",
- service_, this, static_cast<int>(is_cancelled_));
- Shutdown(std::move(self), "OnDoneNotified");
}
-// TODO(roth): This method currently assumes that there will be only one
-// thread polling the cq and invoking the corresponding callbacks. If
-// that changes, we will need to add synchronization here.
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- Shutdown(std::shared_ptr<CallHandler> self, const char* reason) {
- if (!shutdown_) {
- gpr_log(GPR_DEBUG,
- "[HCS %p] Shutting down the handler (service_name: \"%s\", "
- "handler: %p, reason: %s).",
- service_, service_name_.c_str(), this, reason);
- shutdown_ = true;
- }
- // OnCallReceived() may be called after OnDoneNotified(), so we need to
- // try to Finish() every time we are in Shutdown().
- if (call_state_ >= CALL_RECEIVED && call_state_ < FINISH_CALLED) {
- std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
- if (!service_->shutdown_) {
- on_finish_done_ =
- CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
- std::placeholders::_1, std::placeholders::_2),
- std::move(self));
- // TODO(juanlishen): Maybe add a message proto for the client to
- // explicitly cancel the stream so that we can return OK status in such
- // cases.
- stream_.Finish(Status::CANCELLED, &on_finish_done_);
- call_state_ = FINISH_CALLED;
- }
+DefaultHealthCheckService::ServingStatus
+DefaultHealthCheckService::GetServingStatus(
+ const grpc::string& service_name) const {
+ std::lock_guard<std::mutex> lock(mu_);
+ const auto& iter = services_map_.find(service_name);
+ if (iter == services_map_.end()) {
+ return NOT_FOUND;
}
+ return iter->second ? SERVING : NOT_SERVING;
}
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
- if (ok) {
- gpr_log(GPR_DEBUG,
- "[HCS %p] Health check call finished (service_name: \"%s\", "
- "handler: %p).",
- service_, service_name_.c_str(), this);
- }
+DefaultHealthCheckService::HealthCheckServiceImpl*
+DefaultHealthCheckService::GetHealthCheckService() {
+ GPR_ASSERT(impl_ == nullptr);
+ impl_.reset(new HealthCheckServiceImpl(this));
+ return impl_.get();
}
} // namespace grpc
diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h
index edad594936..a1ce5aa64e 100644
--- a/src/cpp/server/health/default_health_check_service.h
+++ b/src/cpp/server/health/default_health_check_service.h
@@ -19,268 +19,42 @@
#ifndef GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H
#define GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H
-#include <atomic>
#include <mutex>
-#include <set>
-#include <grpc/support/log.h>
-#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
-#include <grpcpp/impl/codegen/async_generic_service.h>
-#include <grpcpp/impl/codegen/async_unary_call.h>
#include <grpcpp/impl/codegen/service_type.h>
#include <grpcpp/support/byte_buffer.h>
-#include "src/core/lib/gprpp/thd.h"
-
namespace grpc {
// Default implementation of HealthCheckServiceInterface. Server will create and
// own it.
class DefaultHealthCheckService final : public HealthCheckServiceInterface {
public:
- enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING };
-
// The service impl to register with the server.
class HealthCheckServiceImpl : public Service {
public:
- // Base class for call handlers.
- class CallHandler {
- public:
- virtual ~CallHandler() = default;
- virtual void SendHealth(std::shared_ptr<CallHandler> self,
- ServingStatus status) = 0;
- };
+ explicit HealthCheckServiceImpl(DefaultHealthCheckService* service);
- HealthCheckServiceImpl(DefaultHealthCheckService* database,
- std::unique_ptr<ServerCompletionQueue> cq);
-
- ~HealthCheckServiceImpl();
-
- void StartServingThread();
+ Status Check(ServerContext* context, const ByteBuffer* request,
+ ByteBuffer* response);
private:
- // A tag that can be called with a bool argument. It's tailored for
- // CallHandler's use. Before being used, it should be constructed with a
- // method of CallHandler and a shared pointer to the handler. The
- // shared pointer will be moved to the invoked function and the function
- // can only be invoked once. That makes ref counting of the handler easier,
- // because the shared pointer is not bound to the function and can be gone
- // once the invoked function returns (if not used any more).
- class CallableTag {
- public:
- using HandlerFunction =
- std::function<void(std::shared_ptr<CallHandler>, bool)>;
-
- CallableTag() {}
-
- CallableTag(HandlerFunction func, std::shared_ptr<CallHandler> handler)
- : handler_function_(std::move(func)), handler_(std::move(handler)) {
- GPR_ASSERT(handler_function_ != nullptr);
- GPR_ASSERT(handler_ != nullptr);
- }
-
- // Runs the tag. This should be called only once. The handler is no
- // longer owned by this tag after this method is invoked.
- void Run(bool ok) {
- GPR_ASSERT(handler_function_ != nullptr);
- GPR_ASSERT(handler_ != nullptr);
- handler_function_(std::move(handler_), ok);
- }
-
- // Releases and returns the shared pointer to the handler.
- std::shared_ptr<CallHandler> ReleaseHandler() {
- return std::move(handler_);
- }
-
- private:
- HandlerFunction handler_function_ = nullptr;
- std::shared_ptr<CallHandler> handler_;
- };
-
- // Call handler for Check method.
- // Each handler takes care of one call. It contains per-call data and it
- // will access the members of the parent class (i.e.,
- // DefaultHealthCheckService) for per-service health data.
- class CheckCallHandler : public CallHandler {
- public:
- // Instantiates a CheckCallHandler and requests the next health check
- // call. The handler object will manage its own lifetime, so no action is
- // needed from the caller any more regarding that object.
- static void CreateAndStart(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service);
-
- // This ctor is public because we want to use std::make_shared<> in
- // CreateAndStart(). This ctor shouldn't be used elsewhere.
- CheckCallHandler(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service);
-
- // Not used for Check.
- void SendHealth(std::shared_ptr<CallHandler> self,
- ServingStatus status) override {}
-
- private:
- // Called when we receive a call.
- // Spawns a new handler so that we can keep servicing future calls.
- void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
-
- // Called when Finish() is done.
- void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
-
- // The members passed down from HealthCheckServiceImpl.
- ServerCompletionQueue* cq_;
- DefaultHealthCheckService* database_;
- HealthCheckServiceImpl* service_;
-
- ByteBuffer request_;
- GenericServerAsyncResponseWriter writer_;
- ServerContext ctx_;
-
- CallableTag next_;
- };
-
- // Call handler for Watch method.
- // Each handler takes care of one call. It contains per-call data and it
- // will access the members of the parent class (i.e.,
- // DefaultHealthCheckService) for per-service health data.
- class WatchCallHandler : public CallHandler {
- public:
- // Instantiates a WatchCallHandler and requests the next health check
- // call. The handler object will manage its own lifetime, so no action is
- // needed from the caller any more regarding that object.
- static void CreateAndStart(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service);
-
- // This ctor is public because we want to use std::make_shared<> in
- // CreateAndStart(). This ctor shouldn't be used elsewhere.
- WatchCallHandler(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service);
-
- void SendHealth(std::shared_ptr<CallHandler> self,
- ServingStatus status) override;
-
- private:
- // Called when we receive a call.
- // Spawns a new handler so that we can keep servicing future calls.
- void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
-
- // Requires holding mu_.
- void SendHealthLocked(std::shared_ptr<CallHandler> self,
- ServingStatus status);
-
- // When sending a health result finishes.
- void OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok);
-
- // Called when Finish() is done.
- void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
-
- // Called when AsyncNotifyWhenDone() notifies us.
- void OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok);
-
- void Shutdown(std::shared_ptr<CallHandler> self, const char* reason);
-
- // The members passed down from HealthCheckServiceImpl.
- ServerCompletionQueue* cq_;
- DefaultHealthCheckService* database_;
- HealthCheckServiceImpl* service_;
-
- ByteBuffer request_;
- grpc::string service_name_;
- GenericServerAsyncWriter stream_;
- ServerContext ctx_;
-
- std::mutex mu_;
- bool send_in_flight_ = false; // Guarded by mu_.
- ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_.
-
- // The state of the RPC progress.
- enum CallState {
- WAITING_FOR_CALL,
- CALL_RECEIVED,
- SEND_MESSAGE_PENDING,
- FINISH_CALLED
- } call_state_;
-
- bool shutdown_ = false;
- bool done_notified_ = false;
- bool is_cancelled_ = false;
- CallableTag next_;
- CallableTag on_done_notified_;
- CallableTag on_finish_done_;
- };
-
- // Handles the incoming requests and drives the completion queue in a loop.
- static void Serve(void* arg);
-
- // Returns true on success.
- static bool DecodeRequest(const ByteBuffer& request,
- grpc::string* service_name);
- static bool EncodeResponse(ServingStatus status, ByteBuffer* response);
-
- // Needed to appease Windows compilers, which don't seem to allow
- // nested classes to access protected members in the parent's
- // superclass.
- using Service::RequestAsyncServerStreaming;
- using Service::RequestAsyncUnary;
-
- DefaultHealthCheckService* database_;
- std::unique_ptr<ServerCompletionQueue> cq_;
- internal::RpcServiceMethod* check_method_;
- internal::RpcServiceMethod* watch_method_;
-
- // To synchronize the operations related to shutdown state of cq_, so that
- // we don't enqueue new tags into cq_ after it is already shut down.
- std::mutex cq_shutdown_mu_;
- std::atomic_bool shutdown_{false};
- std::unique_ptr<::grpc_core::Thread> thread_;
+ const DefaultHealthCheckService* const service_;
+ internal::RpcServiceMethod* method_;
};
DefaultHealthCheckService();
-
void SetServingStatus(const grpc::string& service_name,
bool serving) override;
void SetServingStatus(bool serving) override;
-
+ enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING };
ServingStatus GetServingStatus(const grpc::string& service_name) const;
-
- HealthCheckServiceImpl* GetHealthCheckService(
- std::unique_ptr<ServerCompletionQueue> cq);
+ HealthCheckServiceImpl* GetHealthCheckService();
private:
- // Stores the current serving status of a service and any call
- // handlers registered for updates when the service's status changes.
- class ServiceData {
- public:
- void SetServingStatus(ServingStatus status);
- ServingStatus GetServingStatus() const { return status_; }
- void AddCallHandler(
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
- void RemoveCallHandler(
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
- bool Unused() const {
- return call_handlers_.empty() && status_ == NOT_FOUND;
- }
-
- private:
- ServingStatus status_ = NOT_FOUND;
- std::set<std::shared_ptr<HealthCheckServiceImpl::CallHandler>>
- call_handlers_;
- };
-
- void RegisterCallHandler(
- const grpc::string& service_name,
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
-
- void UnregisterCallHandler(
- const grpc::string& service_name,
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
-
mutable std::mutex mu_;
- std::map<grpc::string, ServiceData> services_map_; // Guarded by mu_.
+ std::map<grpc::string, bool> services_map_;
std::unique_ptr<HealthCheckServiceImpl> impl_;
};
diff --git a/src/cpp/server/health/health.pb.c b/src/cpp/server/health/health.pb.c
index 5c214c7160..09bd98a3d9 100644
--- a/src/cpp/server/health/health.pb.c
+++ b/src/cpp/server/health/health.pb.c
@@ -2,6 +2,7 @@
/* Generated by nanopb-0.3.7-dev */
#include "src/cpp/server/health/health.pb.h"
+
/* @@protoc_insertion_point(includes) */
#if PB_PROTO_HEADER_VERSION != 30
#error Regenerate this file with the current version of nanopb generator.
diff --git a/src/cpp/server/health/health.pb.h b/src/cpp/server/health/health.pb.h
index 9d54ccd618..29e1f3bacb 100644
--- a/src/cpp/server/health/health.pb.h
+++ b/src/cpp/server/health/health.pb.h
@@ -17,12 +17,11 @@ extern "C" {
typedef enum _grpc_health_v1_HealthCheckResponse_ServingStatus {
grpc_health_v1_HealthCheckResponse_ServingStatus_UNKNOWN = 0,
grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING = 1,
- grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING = 2,
- grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN = 3
+ grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING = 2
} grpc_health_v1_HealthCheckResponse_ServingStatus;
#define _grpc_health_v1_HealthCheckResponse_ServingStatus_MIN grpc_health_v1_HealthCheckResponse_ServingStatus_UNKNOWN
-#define _grpc_health_v1_HealthCheckResponse_ServingStatus_MAX grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN
-#define _grpc_health_v1_HealthCheckResponse_ServingStatus_ARRAYSIZE ((grpc_health_v1_HealthCheckResponse_ServingStatus)(grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN+1))
+#define _grpc_health_v1_HealthCheckResponse_ServingStatus_MAX grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING
+#define _grpc_health_v1_HealthCheckResponse_ServingStatus_ARRAYSIZE ((grpc_health_v1_HealthCheckResponse_ServingStatus)(grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING+1))
/* Struct definitions */
typedef struct _grpc_health_v1_HealthCheckRequest {
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 3cadf65c80..36c709eb45 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -559,20 +559,16 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
// Only create default health check service when user did not provide an
// explicit one.
- ServerCompletionQueue* health_check_cq = nullptr;
- DefaultHealthCheckService::HealthCheckServiceImpl*
- default_health_check_service_impl = nullptr;
if (health_check_service_ == nullptr && !health_check_service_disabled_ &&
DefaultHealthCheckServiceEnabled()) {
- auto* default_hc_service = new DefaultHealthCheckService;
- health_check_service_.reset(default_hc_service);
- health_check_cq = new ServerCompletionQueue(GRPC_CQ_DEFAULT_POLLING);
- grpc_server_register_completion_queue(server_, health_check_cq->cq(),
- nullptr);
- default_health_check_service_impl =
- default_hc_service->GetHealthCheckService(
- std::unique_ptr<ServerCompletionQueue>(health_check_cq));
- RegisterService(nullptr, default_health_check_service_impl);
+ if (sync_server_cqs_ == nullptr || sync_server_cqs_->empty()) {
+ gpr_log(GPR_INFO,
+ "Default health check service disabled at async-only server.");
+ } else {
+ auto* default_hc_service = new DefaultHealthCheckService;
+ health_check_service_.reset(default_hc_service);
+ RegisterService(nullptr, default_hc_service->GetHealthCheckService());
+ }
}
grpc_server_start(server_);
@@ -587,9 +583,6 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
new UnimplementedAsyncRequest(this, cqs[i]);
}
}
- if (health_check_cq != nullptr) {
- new UnimplementedAsyncRequest(this, health_check_cq);
- }
}
// If this server has any support for synchronous methods (has any sync
@@ -602,10 +595,6 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Start();
}
-
- if (default_health_check_service_impl != nullptr) {
- default_health_check_service_impl->StartServingThread();
- }
}
void Server::ShutdownInternal(gpr_timespec deadline) {
diff --git a/src/proto/grpc/health/v1/health.proto b/src/proto/grpc/health/v1/health.proto
index 38843ff1e7..4b4677b8a4 100644
--- a/src/proto/grpc/health/v1/health.proto
+++ b/src/proto/grpc/health/v1/health.proto
@@ -34,30 +34,10 @@ message HealthCheckResponse {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
- SERVICE_UNKNOWN = 3; // Used only by the Watch method.
}
ServingStatus status = 1;
}
service Health {
- // If the requested service is unknown, the call will fail with status
- // NOT_FOUND.
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
-
- // Performs a watch for the serving status of the requested service.
- // The server will immediately send back a message indicating the current
- // serving status. It will then subsequently send a new message whenever
- // the service's serving status changes.
- //
- // If the requested service is unknown when the call is received, the
- // server will send a message setting the serving status to
- // SERVICE_UNKNOWN but will *not* terminate the call. If at some
- // future point, the serving status of the service becomes known, the
- // server will send a new message with the service's serving status.
- //
- // If the call terminates with status UNIMPLEMENTED, then clients
- // should assume this method is not supported and should not retry the
- // call. If the call terminates with any other status (including OK),
- // clients should retry the call with appropriate exponential backoff.
- rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}
diff --git a/test/cpp/end2end/health_service_end2end_test.cc b/test/cpp/end2end/health_service_end2end_test.cc
index fca65dfc13..1c48b9d151 100644
--- a/test/cpp/end2end/health_service_end2end_test.cc
+++ b/test/cpp/end2end/health_service_end2end_test.cc
@@ -64,29 +64,6 @@ class HealthCheckServiceImpl : public ::grpc::health::v1::Health::Service {
return Status::OK;
}
- Status Watch(ServerContext* context, const HealthCheckRequest* request,
- ::grpc::ServerWriter<HealthCheckResponse>* writer) override {
- auto last_state = HealthCheckResponse::UNKNOWN;
- while (!context->IsCancelled()) {
- {
- std::lock_guard<std::mutex> lock(mu_);
- HealthCheckResponse response;
- auto iter = status_map_.find(request->service());
- if (iter == status_map_.end()) {
- response.set_status(response.SERVICE_UNKNOWN);
- } else {
- response.set_status(iter->second);
- }
- if (response.status() != last_state) {
- writer->Write(response, ::grpc::WriteOptions());
- }
- }
- gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_time_from_millis(1000, GPR_TIMESPAN)));
- }
- return Status::OK;
- }
-
void SetStatus(const grpc::string& service_name,
HealthCheckResponse::ServingStatus status) {
std::lock_guard<std::mutex> lock(mu_);
@@ -129,6 +106,14 @@ class CustomHealthCheckService : public HealthCheckServiceInterface {
HealthCheckServiceImpl* impl_; // not owned
};
+void LoopCompletionQueue(ServerCompletionQueue* cq) {
+ void* tag;
+ bool ok;
+ while (cq->Next(&tag, &ok)) {
+ abort(); // Nothing should come out of the cq.
+ }
+}
+
class HealthServiceEnd2endTest : public ::testing::Test {
protected:
HealthServiceEnd2endTest() {}
@@ -233,33 +218,6 @@ class HealthServiceEnd2endTest : public ::testing::Test {
Status(StatusCode::NOT_FOUND, ""));
}
- void VerifyHealthCheckServiceStreaming() {
- const grpc::string kServiceName("service_name");
- HealthCheckServiceInterface* service = server_->GetHealthCheckService();
- // Start Watch for service.
- ClientContext context;
- HealthCheckRequest request;
- request.set_service(kServiceName);
- std::unique_ptr<::grpc::ClientReaderInterface<HealthCheckResponse>> reader =
- hc_stub_->Watch(&context, request);
- // Initial response will be SERVICE_UNKNOWN.
- HealthCheckResponse response;
- EXPECT_TRUE(reader->Read(&response));
- EXPECT_EQ(response.SERVICE_UNKNOWN, response.status());
- response.Clear();
- // Now set service to NOT_SERVING and make sure we get an update.
- service->SetServingStatus(kServiceName, false);
- EXPECT_TRUE(reader->Read(&response));
- EXPECT_EQ(response.NOT_SERVING, response.status());
- response.Clear();
- // Now set service to SERVING and make sure we get another update.
- service->SetServingStatus(kServiceName, true);
- EXPECT_TRUE(reader->Read(&response));
- EXPECT_EQ(response.SERVING, response.status());
- // Finish call.
- context.TryCancel();
- }
-
TestServiceImpl echo_test_service_;
HealthCheckServiceImpl health_check_service_impl_;
std::unique_ptr<Health::Stub> hc_stub_;
@@ -287,7 +245,6 @@ TEST_F(HealthServiceEnd2endTest, DefaultHealthService) {
EXPECT_TRUE(DefaultHealthCheckServiceEnabled());
SetUpServer(true, false, false, nullptr);
VerifyHealthCheckService();
- VerifyHealthCheckServiceStreaming();
// The default service has a size limit of the service name.
const grpc::string kTooLongServiceName(201, 'x');
@@ -295,6 +252,22 @@ TEST_F(HealthServiceEnd2endTest, DefaultHealthService) {
Status(StatusCode::INVALID_ARGUMENT, ""));
}
+// The server has no sync service.
+TEST_F(HealthServiceEnd2endTest, DefaultHealthServiceAsyncOnly) {
+ EnableDefaultHealthCheckService(true);
+ EXPECT_TRUE(DefaultHealthCheckServiceEnabled());
+ SetUpServer(false, true, false, nullptr);
+ cq_thread_ = std::thread(LoopCompletionQueue, cq_.get());
+
+ HealthCheckServiceInterface* default_service =
+ server_->GetHealthCheckService();
+ EXPECT_TRUE(default_service == nullptr);
+
+ ResetStubs();
+
+ SendHealthCheckRpc("", Status(StatusCode::UNIMPLEMENTED, ""));
+}
+
// Provide an empty service to disable the default service.
TEST_F(HealthServiceEnd2endTest, ExplicitlyDisableViaOverride) {
EnableDefaultHealthCheckService(true);
@@ -323,7 +296,6 @@ TEST_F(HealthServiceEnd2endTest, ExplicitlyOverride) {
ResetStubs();
VerifyHealthCheckService();
- VerifyHealthCheckServiceStreaming();
}
} // namespace
diff --git a/tools/distrib/check_nanopb_output.sh b/tools/distrib/check_nanopb_output.sh
index 1c2ef9b768..6b98619c32 100755
--- a/tools/distrib/check_nanopb_output.sh
+++ b/tools/distrib/check_nanopb_output.sh
@@ -16,7 +16,6 @@
set -ex
readonly NANOPB_ALTS_TMP_OUTPUT="$(mktemp -d)"
-readonly NANOPB_HEALTH_TMP_OUTPUT="$(mktemp -d)"
readonly NANOPB_TMP_OUTPUT="$(mktemp -d)"
readonly PROTOBUF_INSTALL_PREFIX="$(mktemp -d)"
@@ -69,23 +68,6 @@ if ! diff -r "$NANOPB_TMP_OUTPUT" src/core/ext/filters/client_channel/lb_policy/
fi
#
-# checks for health.proto
-#
-readonly HEALTH_GRPC_OUTPUT_PATH='src/cpp/server/health'
-# nanopb-compile the proto to a temp location
-./tools/codegen/core/gen_nano_proto.sh \
- src/proto/grpc/health/v1/health.proto \
- "$NANOPB_HEALTH_TMP_OUTPUT" \
- "$HEALTH_GRPC_OUTPUT_PATH"
-# compare outputs to checked compiled code
-for NANOPB_OUTPUT_FILE in $NANOPB_HEALTH_TMP_OUTPUT/*.pb.*; do
- if ! diff "$NANOPB_OUTPUT_FILE" "src/cpp/server/health/$(basename $NANOPB_OUTPUT_FILE)"; then
- echo "Outputs differ: $NANOPB_HEALTH_TMP_OUTPUT vs $HEALTH_GRPC_OUTPUT_PATH"
- exit 2
- fi
-done
-
-#
# Checks for handshaker.proto and transport_security_common.proto
#
readonly HANDSHAKER_GRPC_OUTPUT_PATH='src/core/tsi/alts/handshaker'