diff options
Diffstat (limited to 'src/cpp/server/health/default_health_check_service.h')
-rw-r--r-- | src/cpp/server/health/default_health_check_service.h | 234 |
1 files changed, 226 insertions, 8 deletions
diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h index a1ce5aa64e..3bab76b6b0 100644 --- a/src/cpp/server/health/default_health_check_service.h +++ b/src/cpp/server/health/default_health_check_service.h @@ -19,42 +19,260 @@ #ifndef GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H #define GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H +#include <atomic> #include <mutex> +#include <set> +#include <grpc/support/log.h> +#include <grpcpp/grpcpp.h> #include <grpcpp/health_check_service_interface.h> +#include <grpcpp/impl/codegen/async_generic_service.h> +#include <grpcpp/impl/codegen/async_unary_call.h> #include <grpcpp/impl/codegen/service_type.h> #include <grpcpp/support/byte_buffer.h> +#include "src/core/lib/gprpp/thd.h" + namespace grpc { // Default implementation of HealthCheckServiceInterface. Server will create and // own it. class DefaultHealthCheckService final : public HealthCheckServiceInterface { public: + enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING }; + // The service impl to register with the server. class HealthCheckServiceImpl : public Service { public: - explicit HealthCheckServiceImpl(DefaultHealthCheckService* service); + // Base class for call handlers. + class CallHandler { + public: + virtual ~CallHandler() = default; + virtual void SendHealth(std::shared_ptr<CallHandler> self, + ServingStatus status) = 0; + }; - Status Check(ServerContext* context, const ByteBuffer* request, - ByteBuffer* response); + HealthCheckServiceImpl(DefaultHealthCheckService* database, + std::unique_ptr<ServerCompletionQueue> cq); + + ~HealthCheckServiceImpl(); + + void StartServingThread(); private: - const DefaultHealthCheckService* const service_; - internal::RpcServiceMethod* method_; + // A tag that can be called with a bool argument. It's tailored for + // CallHandler's use. Before being used, it should be constructed with a + // method of CallHandler and a shared pointer to the handler. The + // shared pointer will be moved to the invoked function and the function + // can only be invoked once. That makes ref counting of the handler easier, + // because the shared pointer is not bound to the function and can be gone + // once the invoked function returns (if not used any more). + class CallableTag { + public: + using HandlerFunction = + std::function<void(std::shared_ptr<CallHandler>, bool)>; + + CallableTag() {} + + CallableTag(HandlerFunction func, std::shared_ptr<CallHandler> handler) + : handler_function_(std::move(func)), handler_(std::move(handler)) { + GPR_ASSERT(handler_function_ != nullptr); + GPR_ASSERT(handler_ != nullptr); + } + + // Runs the tag. This should be called only once. The handler is no + // longer owned by this tag after this method is invoked. + void Run(bool ok) { + GPR_ASSERT(handler_function_ != nullptr); + GPR_ASSERT(handler_ != nullptr); + handler_function_(std::move(handler_), ok); + } + + // Releases and returns the shared pointer to the handler. + std::shared_ptr<CallHandler> ReleaseHandler() { + return std::move(handler_); + } + + private: + HandlerFunction handler_function_ = nullptr; + std::shared_ptr<CallHandler> handler_; + }; + + // Call handler for Check method. + // Each handler takes care of one call. It contains per-call data and it + // will access the members of the parent class (i.e., + // DefaultHealthCheckService) for per-service health data. + class CheckCallHandler : public CallHandler { + public: + // Instantiates a CheckCallHandler and requests the next health check + // call. The handler object will manage its own lifetime, so no action is + // needed from the caller any more regarding that object. + static void CreateAndStart(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service); + + // This ctor is public because we want to use std::make_shared<> in + // CreateAndStart(). This ctor shouldn't be used elsewhere. + CheckCallHandler(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service); + + // Not used for Check. + void SendHealth(std::shared_ptr<CallHandler> self, + ServingStatus status) override {} + + private: + // Called when we receive a call. + // Spawns a new handler so that we can keep servicing future calls. + void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok); + + // Called when Finish() is done. + void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok); + + // The members passed down from HealthCheckServiceImpl. + ServerCompletionQueue* cq_; + DefaultHealthCheckService* database_; + HealthCheckServiceImpl* service_; + + ByteBuffer request_; + GenericServerAsyncResponseWriter writer_; + ServerContext ctx_; + + CallableTag next_; + }; + + // Call handler for Watch method. + // Each handler takes care of one call. It contains per-call data and it + // will access the members of the parent class (i.e., + // DefaultHealthCheckService) for per-service health data. + class WatchCallHandler : public CallHandler { + public: + // Instantiates a WatchCallHandler and requests the next health check + // call. The handler object will manage its own lifetime, so no action is + // needed from the caller any more regarding that object. + static void CreateAndStart(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service); + + // This ctor is public because we want to use std::make_shared<> in + // CreateAndStart(). This ctor shouldn't be used elsewhere. + WatchCallHandler(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service); + + void SendHealth(std::shared_ptr<CallHandler> self, + ServingStatus status) override; + + private: + // Called when we receive a call. + // Spawns a new handler so that we can keep servicing future calls. + void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok); + + // Requires holding send_mu_. + void SendHealthLocked(std::shared_ptr<CallHandler> self, + ServingStatus status); + + // When sending a health result finishes. + void OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok); + + void SendFinish(std::shared_ptr<CallHandler> self, const Status& status); + + // Requires holding service_->cq_shutdown_mu_. + void SendFinishLocked(std::shared_ptr<CallHandler> self, + const Status& status); + + // Called when Finish() is done. + void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok); + + // Called when AsyncNotifyWhenDone() notifies us. + void OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok); + + // The members passed down from HealthCheckServiceImpl. + ServerCompletionQueue* cq_; + DefaultHealthCheckService* database_; + HealthCheckServiceImpl* service_; + + ByteBuffer request_; + grpc::string service_name_; + GenericServerAsyncWriter stream_; + ServerContext ctx_; + + std::mutex send_mu_; + bool send_in_flight_ = false; // Guarded by mu_. + ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_. + + bool finish_called_ = false; + CallableTag next_; + CallableTag on_done_notified_; + CallableTag on_finish_done_; + }; + + // Handles the incoming requests and drives the completion queue in a loop. + static void Serve(void* arg); + + // Returns true on success. + static bool DecodeRequest(const ByteBuffer& request, + grpc::string* service_name); + static bool EncodeResponse(ServingStatus status, ByteBuffer* response); + + // Needed to appease Windows compilers, which don't seem to allow + // nested classes to access protected members in the parent's + // superclass. + using Service::RequestAsyncServerStreaming; + using Service::RequestAsyncUnary; + + DefaultHealthCheckService* database_; + std::unique_ptr<ServerCompletionQueue> cq_; + + // To synchronize the operations related to shutdown state of cq_, so that + // we don't enqueue new tags into cq_ after it is already shut down. + std::mutex cq_shutdown_mu_; + std::atomic_bool shutdown_{false}; + std::unique_ptr<::grpc_core::Thread> thread_; }; DefaultHealthCheckService(); + void SetServingStatus(const grpc::string& service_name, bool serving) override; void SetServingStatus(bool serving) override; - enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING }; + ServingStatus GetServingStatus(const grpc::string& service_name) const; - HealthCheckServiceImpl* GetHealthCheckService(); + + HealthCheckServiceImpl* GetHealthCheckService( + std::unique_ptr<ServerCompletionQueue> cq); private: + // Stores the current serving status of a service and any call + // handlers registered for updates when the service's status changes. + class ServiceData { + public: + void SetServingStatus(ServingStatus status); + ServingStatus GetServingStatus() const { return status_; } + void AddCallHandler( + std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); + void RemoveCallHandler( + std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); + bool Unused() const { + return call_handlers_.empty() && status_ == NOT_FOUND; + } + + private: + ServingStatus status_ = NOT_FOUND; + std::set<std::shared_ptr<HealthCheckServiceImpl::CallHandler>> + call_handlers_; + }; + + void RegisterCallHandler( + const grpc::string& service_name, + std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); + + void UnregisterCallHandler( + const grpc::string& service_name, + std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); + mutable std::mutex mu_; - std::map<grpc::string, bool> services_map_; + std::map<grpc::string, ServiceData> services_map_; // Guarded by mu_. std::unique_ptr<HealthCheckServiceImpl> impl_; }; |