aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server/health/default_health_check_service.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/server/health/default_health_check_service.h')
-rw-r--r--src/cpp/server/health/default_health_check_service.h242
1 files changed, 234 insertions, 8 deletions
diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h
index a1ce5aa64e..edad594936 100644
--- a/src/cpp/server/health/default_health_check_service.h
+++ b/src/cpp/server/health/default_health_check_service.h
@@ -19,42 +19,268 @@
#ifndef GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H
#define GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H
+#include <atomic>
#include <mutex>
+#include <set>
+#include <grpc/support/log.h>
+#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
+#include <grpcpp/impl/codegen/async_generic_service.h>
+#include <grpcpp/impl/codegen/async_unary_call.h>
#include <grpcpp/impl/codegen/service_type.h>
#include <grpcpp/support/byte_buffer.h>
+#include "src/core/lib/gprpp/thd.h"
+
namespace grpc {
// Default implementation of HealthCheckServiceInterface. Server will create and
// own it.
class DefaultHealthCheckService final : public HealthCheckServiceInterface {
public:
+ enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING };
+
// The service impl to register with the server.
class HealthCheckServiceImpl : public Service {
public:
- explicit HealthCheckServiceImpl(DefaultHealthCheckService* service);
+ // Base class for call handlers.
+ class CallHandler {
+ public:
+ virtual ~CallHandler() = default;
+ virtual void SendHealth(std::shared_ptr<CallHandler> self,
+ ServingStatus status) = 0;
+ };
- Status Check(ServerContext* context, const ByteBuffer* request,
- ByteBuffer* response);
+ HealthCheckServiceImpl(DefaultHealthCheckService* database,
+ std::unique_ptr<ServerCompletionQueue> cq);
+
+ ~HealthCheckServiceImpl();
+
+ void StartServingThread();
private:
- const DefaultHealthCheckService* const service_;
- internal::RpcServiceMethod* method_;
+ // A tag that can be called with a bool argument. It's tailored for
+ // CallHandler's use. Before being used, it should be constructed with a
+ // method of CallHandler and a shared pointer to the handler. The
+ // shared pointer will be moved to the invoked function and the function
+ // can only be invoked once. That makes ref counting of the handler easier,
+ // because the shared pointer is not bound to the function and can be gone
+ // once the invoked function returns (if not used any more).
+ class CallableTag {
+ public:
+ using HandlerFunction =
+ std::function<void(std::shared_ptr<CallHandler>, bool)>;
+
+ CallableTag() {}
+
+ CallableTag(HandlerFunction func, std::shared_ptr<CallHandler> handler)
+ : handler_function_(std::move(func)), handler_(std::move(handler)) {
+ GPR_ASSERT(handler_function_ != nullptr);
+ GPR_ASSERT(handler_ != nullptr);
+ }
+
+ // Runs the tag. This should be called only once. The handler is no
+ // longer owned by this tag after this method is invoked.
+ void Run(bool ok) {
+ GPR_ASSERT(handler_function_ != nullptr);
+ GPR_ASSERT(handler_ != nullptr);
+ handler_function_(std::move(handler_), ok);
+ }
+
+ // Releases and returns the shared pointer to the handler.
+ std::shared_ptr<CallHandler> ReleaseHandler() {
+ return std::move(handler_);
+ }
+
+ private:
+ HandlerFunction handler_function_ = nullptr;
+ std::shared_ptr<CallHandler> handler_;
+ };
+
+ // Call handler for Check method.
+ // Each handler takes care of one call. It contains per-call data and it
+ // will access the members of the parent class (i.e.,
+ // DefaultHealthCheckService) for per-service health data.
+ class CheckCallHandler : public CallHandler {
+ public:
+ // Instantiates a CheckCallHandler and requests the next health check
+ // call. The handler object will manage its own lifetime, so no action is
+ // needed from the caller any more regarding that object.
+ static void CreateAndStart(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service);
+
+ // This ctor is public because we want to use std::make_shared<> in
+ // CreateAndStart(). This ctor shouldn't be used elsewhere.
+ CheckCallHandler(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service);
+
+ // Not used for Check.
+ void SendHealth(std::shared_ptr<CallHandler> self,
+ ServingStatus status) override {}
+
+ private:
+ // Called when we receive a call.
+ // Spawns a new handler so that we can keep servicing future calls.
+ void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
+
+ // Called when Finish() is done.
+ void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
+
+ // The members passed down from HealthCheckServiceImpl.
+ ServerCompletionQueue* cq_;
+ DefaultHealthCheckService* database_;
+ HealthCheckServiceImpl* service_;
+
+ ByteBuffer request_;
+ GenericServerAsyncResponseWriter writer_;
+ ServerContext ctx_;
+
+ CallableTag next_;
+ };
+
+ // Call handler for Watch method.
+ // Each handler takes care of one call. It contains per-call data and it
+ // will access the members of the parent class (i.e.,
+ // DefaultHealthCheckService) for per-service health data.
+ class WatchCallHandler : public CallHandler {
+ public:
+ // Instantiates a WatchCallHandler and requests the next health check
+ // call. The handler object will manage its own lifetime, so no action is
+ // needed from the caller any more regarding that object.
+ static void CreateAndStart(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service);
+
+ // This ctor is public because we want to use std::make_shared<> in
+ // CreateAndStart(). This ctor shouldn't be used elsewhere.
+ WatchCallHandler(ServerCompletionQueue* cq,
+ DefaultHealthCheckService* database,
+ HealthCheckServiceImpl* service);
+
+ void SendHealth(std::shared_ptr<CallHandler> self,
+ ServingStatus status) override;
+
+ private:
+ // Called when we receive a call.
+ // Spawns a new handler so that we can keep servicing future calls.
+ void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
+
+ // Requires holding mu_.
+ void SendHealthLocked(std::shared_ptr<CallHandler> self,
+ ServingStatus status);
+
+ // When sending a health result finishes.
+ void OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok);
+
+ // Called when Finish() is done.
+ void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
+
+ // Called when AsyncNotifyWhenDone() notifies us.
+ void OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok);
+
+ void Shutdown(std::shared_ptr<CallHandler> self, const char* reason);
+
+ // The members passed down from HealthCheckServiceImpl.
+ ServerCompletionQueue* cq_;
+ DefaultHealthCheckService* database_;
+ HealthCheckServiceImpl* service_;
+
+ ByteBuffer request_;
+ grpc::string service_name_;
+ GenericServerAsyncWriter stream_;
+ ServerContext ctx_;
+
+ std::mutex mu_;
+ bool send_in_flight_ = false; // Guarded by mu_.
+ ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_.
+
+ // The state of the RPC progress.
+ enum CallState {
+ WAITING_FOR_CALL,
+ CALL_RECEIVED,
+ SEND_MESSAGE_PENDING,
+ FINISH_CALLED
+ } call_state_;
+
+ bool shutdown_ = false;
+ bool done_notified_ = false;
+ bool is_cancelled_ = false;
+ CallableTag next_;
+ CallableTag on_done_notified_;
+ CallableTag on_finish_done_;
+ };
+
+ // Handles the incoming requests and drives the completion queue in a loop.
+ static void Serve(void* arg);
+
+ // Returns true on success.
+ static bool DecodeRequest(const ByteBuffer& request,
+ grpc::string* service_name);
+ static bool EncodeResponse(ServingStatus status, ByteBuffer* response);
+
+ // Needed to appease Windows compilers, which don't seem to allow
+ // nested classes to access protected members in the parent's
+ // superclass.
+ using Service::RequestAsyncServerStreaming;
+ using Service::RequestAsyncUnary;
+
+ DefaultHealthCheckService* database_;
+ std::unique_ptr<ServerCompletionQueue> cq_;
+ internal::RpcServiceMethod* check_method_;
+ internal::RpcServiceMethod* watch_method_;
+
+ // To synchronize the operations related to shutdown state of cq_, so that
+ // we don't enqueue new tags into cq_ after it is already shut down.
+ std::mutex cq_shutdown_mu_;
+ std::atomic_bool shutdown_{false};
+ std::unique_ptr<::grpc_core::Thread> thread_;
};
DefaultHealthCheckService();
+
void SetServingStatus(const grpc::string& service_name,
bool serving) override;
void SetServingStatus(bool serving) override;
- enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING };
+
ServingStatus GetServingStatus(const grpc::string& service_name) const;
- HealthCheckServiceImpl* GetHealthCheckService();
+
+ HealthCheckServiceImpl* GetHealthCheckService(
+ std::unique_ptr<ServerCompletionQueue> cq);
private:
+ // Stores the current serving status of a service and any call
+ // handlers registered for updates when the service's status changes.
+ class ServiceData {
+ public:
+ void SetServingStatus(ServingStatus status);
+ ServingStatus GetServingStatus() const { return status_; }
+ void AddCallHandler(
+ std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
+ void RemoveCallHandler(
+ std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
+ bool Unused() const {
+ return call_handlers_.empty() && status_ == NOT_FOUND;
+ }
+
+ private:
+ ServingStatus status_ = NOT_FOUND;
+ std::set<std::shared_ptr<HealthCheckServiceImpl::CallHandler>>
+ call_handlers_;
+ };
+
+ void RegisterCallHandler(
+ const grpc::string& service_name,
+ std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
+
+ void UnregisterCallHandler(
+ const grpc::string& service_name,
+ std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
+
mutable std::mutex mu_;
- std::map<grpc::string, bool> services_map_;
+ std::map<grpc::string, ServiceData> services_map_; // Guarded by mu_.
std::unique_ptr<HealthCheckServiceImpl> impl_;
};