diff options
author | 2018-09-10 14:06:52 -0700 | |
---|---|---|
committer | 2018-09-10 14:06:52 -0700 | |
commit | 42d9becd91bf1fa51a9775e8141ddece74ea82a6 (patch) | |
tree | 64f1075f4246b53862f2b0a11d206e6f45ad3b52 /src/cpp/server/health/default_health_check_service.h | |
parent | 107d10ea73f77dc9bb498c9b91e1fcd0188dfb45 (diff) |
Revert "Second attempt: Implement Watch method in health check service."
Diffstat (limited to 'src/cpp/server/health/default_health_check_service.h')
-rw-r--r-- | src/cpp/server/health/default_health_check_service.h | 242 |
1 files changed, 8 insertions, 234 deletions
diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h index edad594936..a1ce5aa64e 100644 --- a/src/cpp/server/health/default_health_check_service.h +++ b/src/cpp/server/health/default_health_check_service.h @@ -19,268 +19,42 @@ #ifndef GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H #define GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H -#include <atomic> #include <mutex> -#include <set> -#include <grpc/support/log.h> -#include <grpcpp/grpcpp.h> #include <grpcpp/health_check_service_interface.h> -#include <grpcpp/impl/codegen/async_generic_service.h> -#include <grpcpp/impl/codegen/async_unary_call.h> #include <grpcpp/impl/codegen/service_type.h> #include <grpcpp/support/byte_buffer.h> -#include "src/core/lib/gprpp/thd.h" - namespace grpc { // Default implementation of HealthCheckServiceInterface. Server will create and // own it. class DefaultHealthCheckService final : public HealthCheckServiceInterface { public: - enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING }; - // The service impl to register with the server. class HealthCheckServiceImpl : public Service { public: - // Base class for call handlers. - class CallHandler { - public: - virtual ~CallHandler() = default; - virtual void SendHealth(std::shared_ptr<CallHandler> self, - ServingStatus status) = 0; - }; + explicit HealthCheckServiceImpl(DefaultHealthCheckService* service); - HealthCheckServiceImpl(DefaultHealthCheckService* database, - std::unique_ptr<ServerCompletionQueue> cq); - - ~HealthCheckServiceImpl(); - - void StartServingThread(); + Status Check(ServerContext* context, const ByteBuffer* request, + ByteBuffer* response); private: - // A tag that can be called with a bool argument. It's tailored for - // CallHandler's use. Before being used, it should be constructed with a - // method of CallHandler and a shared pointer to the handler. The - // shared pointer will be moved to the invoked function and the function - // can only be invoked once. That makes ref counting of the handler easier, - // because the shared pointer is not bound to the function and can be gone - // once the invoked function returns (if not used any more). - class CallableTag { - public: - using HandlerFunction = - std::function<void(std::shared_ptr<CallHandler>, bool)>; - - CallableTag() {} - - CallableTag(HandlerFunction func, std::shared_ptr<CallHandler> handler) - : handler_function_(std::move(func)), handler_(std::move(handler)) { - GPR_ASSERT(handler_function_ != nullptr); - GPR_ASSERT(handler_ != nullptr); - } - - // Runs the tag. This should be called only once. The handler is no - // longer owned by this tag after this method is invoked. - void Run(bool ok) { - GPR_ASSERT(handler_function_ != nullptr); - GPR_ASSERT(handler_ != nullptr); - handler_function_(std::move(handler_), ok); - } - - // Releases and returns the shared pointer to the handler. - std::shared_ptr<CallHandler> ReleaseHandler() { - return std::move(handler_); - } - - private: - HandlerFunction handler_function_ = nullptr; - std::shared_ptr<CallHandler> handler_; - }; - - // Call handler for Check method. - // Each handler takes care of one call. It contains per-call data and it - // will access the members of the parent class (i.e., - // DefaultHealthCheckService) for per-service health data. - class CheckCallHandler : public CallHandler { - public: - // Instantiates a CheckCallHandler and requests the next health check - // call. The handler object will manage its own lifetime, so no action is - // needed from the caller any more regarding that object. - static void CreateAndStart(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service); - - // This ctor is public because we want to use std::make_shared<> in - // CreateAndStart(). This ctor shouldn't be used elsewhere. - CheckCallHandler(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service); - - // Not used for Check. - void SendHealth(std::shared_ptr<CallHandler> self, - ServingStatus status) override {} - - private: - // Called when we receive a call. - // Spawns a new handler so that we can keep servicing future calls. - void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok); - - // Called when Finish() is done. - void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok); - - // The members passed down from HealthCheckServiceImpl. - ServerCompletionQueue* cq_; - DefaultHealthCheckService* database_; - HealthCheckServiceImpl* service_; - - ByteBuffer request_; - GenericServerAsyncResponseWriter writer_; - ServerContext ctx_; - - CallableTag next_; - }; - - // Call handler for Watch method. - // Each handler takes care of one call. It contains per-call data and it - // will access the members of the parent class (i.e., - // DefaultHealthCheckService) for per-service health data. - class WatchCallHandler : public CallHandler { - public: - // Instantiates a WatchCallHandler and requests the next health check - // call. The handler object will manage its own lifetime, so no action is - // needed from the caller any more regarding that object. - static void CreateAndStart(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service); - - // This ctor is public because we want to use std::make_shared<> in - // CreateAndStart(). This ctor shouldn't be used elsewhere. - WatchCallHandler(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service); - - void SendHealth(std::shared_ptr<CallHandler> self, - ServingStatus status) override; - - private: - // Called when we receive a call. - // Spawns a new handler so that we can keep servicing future calls. - void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok); - - // Requires holding mu_. - void SendHealthLocked(std::shared_ptr<CallHandler> self, - ServingStatus status); - - // When sending a health result finishes. - void OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok); - - // Called when Finish() is done. - void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok); - - // Called when AsyncNotifyWhenDone() notifies us. - void OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok); - - void Shutdown(std::shared_ptr<CallHandler> self, const char* reason); - - // The members passed down from HealthCheckServiceImpl. - ServerCompletionQueue* cq_; - DefaultHealthCheckService* database_; - HealthCheckServiceImpl* service_; - - ByteBuffer request_; - grpc::string service_name_; - GenericServerAsyncWriter stream_; - ServerContext ctx_; - - std::mutex mu_; - bool send_in_flight_ = false; // Guarded by mu_. - ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_. - - // The state of the RPC progress. - enum CallState { - WAITING_FOR_CALL, - CALL_RECEIVED, - SEND_MESSAGE_PENDING, - FINISH_CALLED - } call_state_; - - bool shutdown_ = false; - bool done_notified_ = false; - bool is_cancelled_ = false; - CallableTag next_; - CallableTag on_done_notified_; - CallableTag on_finish_done_; - }; - - // Handles the incoming requests and drives the completion queue in a loop. - static void Serve(void* arg); - - // Returns true on success. - static bool DecodeRequest(const ByteBuffer& request, - grpc::string* service_name); - static bool EncodeResponse(ServingStatus status, ByteBuffer* response); - - // Needed to appease Windows compilers, which don't seem to allow - // nested classes to access protected members in the parent's - // superclass. - using Service::RequestAsyncServerStreaming; - using Service::RequestAsyncUnary; - - DefaultHealthCheckService* database_; - std::unique_ptr<ServerCompletionQueue> cq_; - internal::RpcServiceMethod* check_method_; - internal::RpcServiceMethod* watch_method_; - - // To synchronize the operations related to shutdown state of cq_, so that - // we don't enqueue new tags into cq_ after it is already shut down. - std::mutex cq_shutdown_mu_; - std::atomic_bool shutdown_{false}; - std::unique_ptr<::grpc_core::Thread> thread_; + const DefaultHealthCheckService* const service_; + internal::RpcServiceMethod* method_; }; DefaultHealthCheckService(); - void SetServingStatus(const grpc::string& service_name, bool serving) override; void SetServingStatus(bool serving) override; - + enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING }; ServingStatus GetServingStatus(const grpc::string& service_name) const; - - HealthCheckServiceImpl* GetHealthCheckService( - std::unique_ptr<ServerCompletionQueue> cq); + HealthCheckServiceImpl* GetHealthCheckService(); private: - // Stores the current serving status of a service and any call - // handlers registered for updates when the service's status changes. - class ServiceData { - public: - void SetServingStatus(ServingStatus status); - ServingStatus GetServingStatus() const { return status_; } - void AddCallHandler( - std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); - void RemoveCallHandler( - std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); - bool Unused() const { - return call_handlers_.empty() && status_ == NOT_FOUND; - } - - private: - ServingStatus status_ = NOT_FOUND; - std::set<std::shared_ptr<HealthCheckServiceImpl::CallHandler>> - call_handlers_; - }; - - void RegisterCallHandler( - const grpc::string& service_name, - std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); - - void UnregisterCallHandler( - const grpc::string& service_name, - std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); - mutable std::mutex mu_; - std::map<grpc::string, ServiceData> services_map_; // Guarded by mu_. + std::map<grpc::string, bool> services_map_; std::unique_ptr<HealthCheckServiceImpl> impl_; }; |