/* * * Copyright 2016 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #ifndef GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H #define GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H #include #include #include #include #include #include #include #include #include #include #include "src/core/lib/gprpp/thd.h" namespace grpc { // Default implementation of HealthCheckServiceInterface. Server will create and // own it. class DefaultHealthCheckService final : public HealthCheckServiceInterface { public: enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING }; // The service impl to register with the server. class HealthCheckServiceImpl : public Service { public: // Base class for call handlers. class CallHandler { public: virtual ~CallHandler() = default; virtual void SendHealth(std::shared_ptr self, ServingStatus status) = 0; }; HealthCheckServiceImpl(DefaultHealthCheckService* database, std::unique_ptr cq); ~HealthCheckServiceImpl(); void StartServingThread(); private: // A tag that can be called with a bool argument. It's tailored for // CallHandler's use. Before being used, it should be constructed with a // method of CallHandler and a shared pointer to the handler. The // shared pointer will be moved to the invoked function and the function // can only be invoked once. That makes ref counting of the handler easier, // because the shared pointer is not bound to the function and can be gone // once the invoked function returns (if not used any more). class CallableTag { public: using HandlerFunction = std::function, bool)>; CallableTag() {} CallableTag(HandlerFunction func, std::shared_ptr handler) : handler_function_(std::move(func)), handler_(std::move(handler)) { GPR_ASSERT(handler_function_ != nullptr); GPR_ASSERT(handler_ != nullptr); } // Runs the tag. This should be called only once. The handler is no // longer owned by this tag after this method is invoked. void Run(bool ok) { GPR_ASSERT(handler_function_ != nullptr); GPR_ASSERT(handler_ != nullptr); handler_function_(std::move(handler_), ok); } // Releases and returns the shared pointer to the handler. std::shared_ptr ReleaseHandler() { return std::move(handler_); } private: HandlerFunction handler_function_ = nullptr; std::shared_ptr handler_; }; // Call handler for Check method. // Each handler takes care of one call. It contains per-call data and it // will access the members of the parent class (i.e., // DefaultHealthCheckService) for per-service health data. class CheckCallHandler : public CallHandler { public: // Instantiates a CheckCallHandler and requests the next health check // call. The handler object will manage its own lifetime, so no action is // needed from the caller any more regarding that object. static void CreateAndStart(ServerCompletionQueue* cq, DefaultHealthCheckService* database, HealthCheckServiceImpl* service); // This ctor is public because we want to use std::make_shared<> in // CreateAndStart(). This ctor shouldn't be used elsewhere. CheckCallHandler(ServerCompletionQueue* cq, DefaultHealthCheckService* database, HealthCheckServiceImpl* service); // Not used for Check. void SendHealth(std::shared_ptr self, ServingStatus status) override {} private: // Called when we receive a call. // Spawns a new handler so that we can keep servicing future calls. void OnCallReceived(std::shared_ptr self, bool ok); // Called when Finish() is done. void OnFinishDone(std::shared_ptr self, bool ok); // The members passed down from HealthCheckServiceImpl. ServerCompletionQueue* cq_; DefaultHealthCheckService* database_; HealthCheckServiceImpl* service_; ByteBuffer request_; GenericServerAsyncResponseWriter writer_; ServerContext ctx_; CallableTag next_; }; // Call handler for Watch method. // Each handler takes care of one call. It contains per-call data and it // will access the members of the parent class (i.e., // DefaultHealthCheckService) for per-service health data. class WatchCallHandler : public CallHandler { public: // Instantiates a WatchCallHandler and requests the next health check // call. The handler object will manage its own lifetime, so no action is // needed from the caller any more regarding that object. static void CreateAndStart(ServerCompletionQueue* cq, DefaultHealthCheckService* database, HealthCheckServiceImpl* service); // This ctor is public because we want to use std::make_shared<> in // CreateAndStart(). This ctor shouldn't be used elsewhere. WatchCallHandler(ServerCompletionQueue* cq, DefaultHealthCheckService* database, HealthCheckServiceImpl* service); void SendHealth(std::shared_ptr self, ServingStatus status) override; private: // Called when we receive a call. // Spawns a new handler so that we can keep servicing future calls. void OnCallReceived(std::shared_ptr self, bool ok); // Requires holding mu_. void SendHealthLocked(std::shared_ptr self, ServingStatus status); // When sending a health result finishes. void OnSendHealthDone(std::shared_ptr self, bool ok); // Called when Finish() is done. void OnFinishDone(std::shared_ptr self, bool ok); // Called when AsyncNotifyWhenDone() notifies us. void OnDoneNotified(std::shared_ptr self, bool ok); void Shutdown(std::shared_ptr self, const char* reason); // The members passed down from HealthCheckServiceImpl. ServerCompletionQueue* cq_; DefaultHealthCheckService* database_; HealthCheckServiceImpl* service_; ByteBuffer request_; grpc::string service_name_; GenericServerAsyncWriter stream_; ServerContext ctx_; std::mutex mu_; bool send_in_flight_ = false; // Guarded by mu_. ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_. // The state of the RPC progress. enum CallState { WAITING_FOR_CALL, CALL_RECEIVED, SEND_MESSAGE_PENDING, FINISH_CALLED } call_state_; bool shutdown_ = false; bool done_notified_ = false; bool is_cancelled_ = false; CallableTag next_; CallableTag on_done_notified_; CallableTag on_finish_done_; }; // Handles the incoming requests and drives the completion queue in a loop. static void Serve(void* arg); // Returns true on success. static bool DecodeRequest(const ByteBuffer& request, grpc::string* service_name); static bool EncodeResponse(ServingStatus status, ByteBuffer* response); // Needed to appease Windows compilers, which don't seem to allow // nested classes to access protected members in the parent's // superclass. using Service::RequestAsyncServerStreaming; using Service::RequestAsyncUnary; DefaultHealthCheckService* database_; std::unique_ptr cq_; internal::RpcServiceMethod* check_method_; internal::RpcServiceMethod* watch_method_; // To synchronize the operations related to shutdown state of cq_, so that // we don't enqueue new tags into cq_ after it is already shut down. std::mutex cq_shutdown_mu_; std::atomic_bool shutdown_{false}; std::unique_ptr<::grpc_core::Thread> thread_; }; DefaultHealthCheckService(); void SetServingStatus(const grpc::string& service_name, bool serving) override; void SetServingStatus(bool serving) override; ServingStatus GetServingStatus(const grpc::string& service_name) const; HealthCheckServiceImpl* GetHealthCheckService( std::unique_ptr cq); private: // Stores the current serving status of a service and any call // handlers registered for updates when the service's status changes. class ServiceData { public: void SetServingStatus(ServingStatus status); ServingStatus GetServingStatus() const { return status_; } void AddCallHandler( std::shared_ptr handler); void RemoveCallHandler( std::shared_ptr handler); bool Unused() const { return call_handlers_.empty() && status_ == NOT_FOUND; } private: ServingStatus status_ = NOT_FOUND; std::set> call_handlers_; }; void RegisterCallHandler( const grpc::string& service_name, std::shared_ptr handler); void UnregisterCallHandler( const grpc::string& service_name, std::shared_ptr handler); mutable std::mutex mu_; std::map services_map_; // Guarded by mu_. std::unique_ptr impl_; }; } // namespace grpc #endif // GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H