aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server/health/default_health_check_service.h
blob: edad594936263d5586f87433703008efe7b01315 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
/*
 *
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#ifndef GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H
#define GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H

#include <atomic>
#include <mutex>
#include <set>

#include <grpc/support/log.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/impl/codegen/async_generic_service.h>
#include <grpcpp/impl/codegen/async_unary_call.h>
#include <grpcpp/impl/codegen/service_type.h>
#include <grpcpp/support/byte_buffer.h>

#include "src/core/lib/gprpp/thd.h"

namespace grpc {

// Default implementation of HealthCheckServiceInterface. Server will create and
// own it.
class DefaultHealthCheckService final : public HealthCheckServiceInterface {
 public:
  enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING };

  // The service impl to register with the server.
  class HealthCheckServiceImpl : public Service {
   public:
    // Base class for call handlers.
    class CallHandler {
     public:
      virtual ~CallHandler() = default;
      virtual void SendHealth(std::shared_ptr<CallHandler> self,
                              ServingStatus status) = 0;
    };

    HealthCheckServiceImpl(DefaultHealthCheckService* database,
                           std::unique_ptr<ServerCompletionQueue> cq);

    ~HealthCheckServiceImpl();

    void StartServingThread();

   private:
    // A tag that can be called with a bool argument. It's tailored for
    // CallHandler's use. Before being used, it should be constructed with a
    // method of CallHandler and a shared pointer to the handler. The
    // shared pointer will be moved to the invoked function and the function
    // can only be invoked once. That makes ref counting of the handler easier,
    // because the shared pointer is not bound to the function and can be gone
    // once the invoked function returns (if not used any more).
    class CallableTag {
     public:
      using HandlerFunction =
          std::function<void(std::shared_ptr<CallHandler>, bool)>;

      CallableTag() {}

      CallableTag(HandlerFunction func, std::shared_ptr<CallHandler> handler)
          : handler_function_(std::move(func)), handler_(std::move(handler)) {
        GPR_ASSERT(handler_function_ != nullptr);
        GPR_ASSERT(handler_ != nullptr);
      }

      // Runs the tag. This should be called only once. The handler is no
      // longer owned by this tag after this method is invoked.
      void Run(bool ok) {
        GPR_ASSERT(handler_function_ != nullptr);
        GPR_ASSERT(handler_ != nullptr);
        handler_function_(std::move(handler_), ok);
      }

      // Releases and returns the shared pointer to the handler.
      std::shared_ptr<CallHandler> ReleaseHandler() {
        return std::move(handler_);
      }

     private:
      HandlerFunction handler_function_ = nullptr;
      std::shared_ptr<CallHandler> handler_;
    };

    // Call handler for Check method.
    // Each handler takes care of one call. It contains per-call data and it
    // will access the members of the parent class (i.e.,
    // DefaultHealthCheckService) for per-service health data.
    class CheckCallHandler : public CallHandler {
     public:
      // Instantiates a CheckCallHandler and requests the next health check
      // call. The handler object will manage its own lifetime, so no action is
      // needed from the caller any more regarding that object.
      static void CreateAndStart(ServerCompletionQueue* cq,
                                 DefaultHealthCheckService* database,
                                 HealthCheckServiceImpl* service);

      // This ctor is public because we want to use std::make_shared<> in
      // CreateAndStart(). This ctor shouldn't be used elsewhere.
      CheckCallHandler(ServerCompletionQueue* cq,
                       DefaultHealthCheckService* database,
                       HealthCheckServiceImpl* service);

      // Not used for Check.
      void SendHealth(std::shared_ptr<CallHandler> self,
                      ServingStatus status) override {}

     private:
      // Called when we receive a call.
      // Spawns a new handler so that we can keep servicing future calls.
      void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);

      // Called when Finish() is done.
      void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);

      // The members passed down from HealthCheckServiceImpl.
      ServerCompletionQueue* cq_;
      DefaultHealthCheckService* database_;
      HealthCheckServiceImpl* service_;

      ByteBuffer request_;
      GenericServerAsyncResponseWriter writer_;
      ServerContext ctx_;

      CallableTag next_;
    };

    // Call handler for Watch method.
    // Each handler takes care of one call. It contains per-call data and it
    // will access the members of the parent class (i.e.,
    // DefaultHealthCheckService) for per-service health data.
    class WatchCallHandler : public CallHandler {
     public:
      // Instantiates a WatchCallHandler and requests the next health check
      // call. The handler object will manage its own lifetime, so no action is
      // needed from the caller any more regarding that object.
      static void CreateAndStart(ServerCompletionQueue* cq,
                                 DefaultHealthCheckService* database,
                                 HealthCheckServiceImpl* service);

      // This ctor is public because we want to use std::make_shared<> in
      // CreateAndStart(). This ctor shouldn't be used elsewhere.
      WatchCallHandler(ServerCompletionQueue* cq,
                       DefaultHealthCheckService* database,
                       HealthCheckServiceImpl* service);

      void SendHealth(std::shared_ptr<CallHandler> self,
                      ServingStatus status) override;

     private:
      // Called when we receive a call.
      // Spawns a new handler so that we can keep servicing future calls.
      void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);

      // Requires holding mu_.
      void SendHealthLocked(std::shared_ptr<CallHandler> self,
                            ServingStatus status);

      // When sending a health result finishes.
      void OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok);

      // Called when Finish() is done.
      void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);

      // Called when AsyncNotifyWhenDone() notifies us.
      void OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok);

      void Shutdown(std::shared_ptr<CallHandler> self, const char* reason);

      // The members passed down from HealthCheckServiceImpl.
      ServerCompletionQueue* cq_;
      DefaultHealthCheckService* database_;
      HealthCheckServiceImpl* service_;

      ByteBuffer request_;
      grpc::string service_name_;
      GenericServerAsyncWriter stream_;
      ServerContext ctx_;

      std::mutex mu_;
      bool send_in_flight_ = false;               // Guarded by mu_.
      ServingStatus pending_status_ = NOT_FOUND;  // Guarded by mu_.

      // The state of the RPC progress.
      enum CallState {
        WAITING_FOR_CALL,
        CALL_RECEIVED,
        SEND_MESSAGE_PENDING,
        FINISH_CALLED
      } call_state_;

      bool shutdown_ = false;
      bool done_notified_ = false;
      bool is_cancelled_ = false;
      CallableTag next_;
      CallableTag on_done_notified_;
      CallableTag on_finish_done_;
    };

    // Handles the incoming requests and drives the completion queue in a loop.
    static void Serve(void* arg);

    // Returns true on success.
    static bool DecodeRequest(const ByteBuffer& request,
                              grpc::string* service_name);
    static bool EncodeResponse(ServingStatus status, ByteBuffer* response);

    // Needed to appease Windows compilers, which don't seem to allow
    // nested classes to access protected members in the parent's
    // superclass.
    using Service::RequestAsyncServerStreaming;
    using Service::RequestAsyncUnary;

    DefaultHealthCheckService* database_;
    std::unique_ptr<ServerCompletionQueue> cq_;
    internal::RpcServiceMethod* check_method_;
    internal::RpcServiceMethod* watch_method_;

    // To synchronize the operations related to shutdown state of cq_, so that
    // we don't enqueue new tags into cq_ after it is already shut down.
    std::mutex cq_shutdown_mu_;
    std::atomic_bool shutdown_{false};
    std::unique_ptr<::grpc_core::Thread> thread_;
  };

  DefaultHealthCheckService();

  void SetServingStatus(const grpc::string& service_name,
                        bool serving) override;
  void SetServingStatus(bool serving) override;

  ServingStatus GetServingStatus(const grpc::string& service_name) const;

  HealthCheckServiceImpl* GetHealthCheckService(
      std::unique_ptr<ServerCompletionQueue> cq);

 private:
  // Stores the current serving status of a service and any call
  // handlers registered for updates when the service's status changes.
  class ServiceData {
   public:
    void SetServingStatus(ServingStatus status);
    ServingStatus GetServingStatus() const { return status_; }
    void AddCallHandler(
        std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
    void RemoveCallHandler(
        std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
    bool Unused() const {
      return call_handlers_.empty() && status_ == NOT_FOUND;
    }

   private:
    ServingStatus status_ = NOT_FOUND;
    std::set<std::shared_ptr<HealthCheckServiceImpl::CallHandler>>
        call_handlers_;
  };

  void RegisterCallHandler(
      const grpc::string& service_name,
      std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);

  void UnregisterCallHandler(
      const grpc::string& service_name,
      std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);

  mutable std::mutex mu_;
  std::map<grpc::string, ServiceData> services_map_;  // Guarded by mu_.
  std::unique_ptr<HealthCheckServiceImpl> impl_;
};

}  // namespace grpc

#endif  // GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H