aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp')
-rw-r--r--src/cpp/server/health/default_health_check_service.cc169
-rw-r--r--src/cpp/server/health/default_health_check_service.h85
-rw-r--r--src/cpp/server/health/health.pb.c24
-rw-r--r--src/cpp/server/health/health.pb.h72
-rw-r--r--src/cpp/server/health/health_check_service.cc49
-rw-r--r--src/cpp/server/health/health_check_service_server_builder_option.cc50
-rw-r--r--src/cpp/server/server_cc.cc160
7 files changed, 606 insertions, 3 deletions
diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc
new file mode 100644
index 0000000000..9743bd5775
--- /dev/null
+++ b/src/cpp/server/health/default_health_check_service.cc
@@ -0,0 +1,169 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <memory>
+#include <mutex>
+
+#include <grpc++/impl/codegen/method_handler_impl.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/cpp/server/health/default_health_check_service.h"
+#include "src/cpp/server/health/health.pb.h"
+#include "third_party/nanopb/pb_decode.h"
+#include "third_party/nanopb/pb_encode.h"
+
+namespace grpc {
+namespace {
+const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
+} // namespace
+
+DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
+ DefaultHealthCheckService* service, bool sync)
+ : service_(service), method_(nullptr), sync_(sync) {
+ MethodHandler* handler = nullptr;
+ if (sync_) {
+ handler =
+ new RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer, ByteBuffer>(
+ std::mem_fn(&HealthCheckServiceImpl::Check), this);
+ }
+ method_ = new RpcServiceMethod(kHealthCheckMethodName, RpcMethod::NORMAL_RPC,
+ handler);
+ AddMethod(method_);
+}
+
+Status DefaultHealthCheckService::HealthCheckServiceImpl::Check(
+ ServerContext* context, const ByteBuffer* request, ByteBuffer* response) {
+ // Decode request.
+ std::vector<Slice> slices;
+ request->Dump(&slices);
+ uint8_t* request_bytes = nullptr;
+ bool request_bytes_owned = false;
+ size_t request_size = 0;
+ grpc_health_v1_HealthCheckRequest request_struct;
+ if (slices.empty()) {
+ request_struct.has_service = false;
+ } else if (slices.size() == 1) {
+ request_bytes = const_cast<uint8_t*>(slices[0].begin());
+ request_size = slices[0].size();
+ } else {
+ request_bytes_owned = true;
+ request_bytes = static_cast<uint8_t*>(gpr_malloc(request->Length()));
+ uint8_t* copy_to = request_bytes;
+ for (size_t i = 0; i < slices.size(); i++) {
+ memcpy(copy_to, slices[i].begin(), slices[i].size());
+ copy_to += slices[i].size();
+ }
+ }
+
+ if (request_bytes != nullptr) {
+ pb_istream_t istream = pb_istream_from_buffer(request_bytes, request_size);
+ bool decode_status = pb_decode(
+ &istream, grpc_health_v1_HealthCheckRequest_fields, &request_struct);
+ if (request_bytes_owned) {
+ gpr_free(request_bytes);
+ }
+ if (!decode_status) {
+ return Status(StatusCode::INVALID_ARGUMENT, "");
+ }
+ }
+
+ // Check status from the associated default health checking service.
+ DefaultHealthCheckService::ServingStatus serving_status =
+ service_->GetServingStatus(
+ request_struct.has_service ? request_struct.service : "");
+ if (serving_status == DefaultHealthCheckService::NOT_FOUND) {
+ return Status(StatusCode::NOT_FOUND, "");
+ }
+
+ // Encode response
+ grpc_health_v1_HealthCheckResponse response_struct;
+ response_struct.has_status = true;
+ response_struct.status =
+ serving_status == DefaultHealthCheckService::SERVING
+ ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING
+ : grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING;
+ pb_ostream_t ostream;
+ memset(&ostream, 0, sizeof(ostream));
+ pb_encode(&ostream, grpc_health_v1_HealthCheckResponse_fields,
+ &response_struct);
+ grpc_slice response_slice = grpc_slice_malloc(ostream.bytes_written);
+ ostream = pb_ostream_from_buffer(GRPC_SLICE_START_PTR(response_slice),
+ GRPC_SLICE_LENGTH(response_slice));
+ bool encode_status = pb_encode(
+ &ostream, grpc_health_v1_HealthCheckResponse_fields, &response_struct);
+ if (!encode_status) {
+ return Status(StatusCode::INTERNAL, "Failed to encode response.");
+ }
+ Slice encoded_response(response_slice, Slice::STEAL_REF);
+ ByteBuffer response_buffer(&encoded_response, 1);
+ response->Swap(&response_buffer);
+ return Status::OK;
+}
+
+DefaultHealthCheckService::DefaultHealthCheckService() {
+ services_map_.emplace("", true);
+}
+
+void DefaultHealthCheckService::SetServingStatus(
+ const grpc::string& service_name, bool serving) {
+ std::lock_guard<std::mutex> lock(mu_);
+ services_map_[service_name] = serving;
+}
+
+void DefaultHealthCheckService::SetServingStatus(bool serving) {
+ std::lock_guard<std::mutex> lock(mu_);
+ for (auto iter = services_map_.begin(); iter != services_map_.end(); ++iter) {
+ iter->second = serving;
+ }
+}
+
+DefaultHealthCheckService::ServingStatus
+DefaultHealthCheckService::GetServingStatus(
+ const grpc::string& service_name) const {
+ std::lock_guard<std::mutex> lock(mu_);
+ const auto& iter = services_map_.find(service_name);
+ if (iter == services_map_.end()) {
+ return NOT_FOUND;
+ }
+ return iter->second ? SERVING : NOT_SERVING;
+}
+
+DefaultHealthCheckService::HealthCheckServiceImpl*
+DefaultHealthCheckService::GetHealthCheckService(bool sync) {
+ GPR_ASSERT(impl_ == nullptr);
+ impl_.reset(new HealthCheckServiceImpl(this, sync));
+ return impl_.get();
+}
+
+} // namespace grpc
diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h
new file mode 100644
index 0000000000..1ecb0a2ba9
--- /dev/null
+++ b/src/cpp/server/health/default_health_check_service.h
@@ -0,0 +1,85 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H
+#define GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H
+
+#include <mutex>
+
+#include <grpc++/health_check_service_interface.h>
+#include <grpc++/impl/codegen/service_type.h>
+#include <grpc++/support/byte_buffer.h>
+
+namespace grpc {
+
+// Default implementation of HealthCheckServiceInterface. Server will create and
+// own it.
+class DefaultHealthCheckService final : public HealthCheckServiceInterface {
+ public:
+ // The service impl to register with the server.
+ class HealthCheckServiceImpl : public Service {
+ public:
+ HealthCheckServiceImpl(DefaultHealthCheckService* service, bool sync);
+
+ Status Check(ServerContext* context, const ByteBuffer* request,
+ ByteBuffer* response);
+
+ bool sync() { return sync_; }
+
+ // This is only useful for the async mode. It should be called after
+ // RegisterService returns.
+ void* server_tag() const { return method_->server_tag(); }
+
+ private:
+ const DefaultHealthCheckService* const service_;
+ RpcServiceMethod* method_;
+ const bool sync_;
+ };
+
+ DefaultHealthCheckService();
+ void SetServingStatus(const grpc::string& service_name,
+ bool serving) override;
+ void SetServingStatus(bool serving) override;
+ enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING };
+ ServingStatus GetServingStatus(const grpc::string& service_name) const;
+ HealthCheckServiceImpl* GetHealthCheckService(bool sync);
+
+ private:
+ mutable std::mutex mu_;
+ std::map<grpc::string, bool> services_map_;
+ std::unique_ptr<HealthCheckServiceImpl> impl_;
+};
+
+} // namespace grpc
+
+#endif // GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H
diff --git a/src/cpp/server/health/health.pb.c b/src/cpp/server/health/health.pb.c
new file mode 100644
index 0000000000..09bd98a3d9
--- /dev/null
+++ b/src/cpp/server/health/health.pb.c
@@ -0,0 +1,24 @@
+/* Automatically generated nanopb constant definitions */
+/* Generated by nanopb-0.3.7-dev */
+
+#include "src/cpp/server/health/health.pb.h"
+
+/* @@protoc_insertion_point(includes) */
+#if PB_PROTO_HEADER_VERSION != 30
+#error Regenerate this file with the current version of nanopb generator.
+#endif
+
+
+
+const pb_field_t grpc_health_v1_HealthCheckRequest_fields[2] = {
+ PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_health_v1_HealthCheckRequest, service, service, 0),
+ PB_LAST_FIELD
+};
+
+const pb_field_t grpc_health_v1_HealthCheckResponse_fields[2] = {
+ PB_FIELD( 1, UENUM , OPTIONAL, STATIC , FIRST, grpc_health_v1_HealthCheckResponse, status, status, 0),
+ PB_LAST_FIELD
+};
+
+
+/* @@protoc_insertion_point(eof) */
diff --git a/src/cpp/server/health/health.pb.h b/src/cpp/server/health/health.pb.h
new file mode 100644
index 0000000000..7051b3260a
--- /dev/null
+++ b/src/cpp/server/health/health.pb.h
@@ -0,0 +1,72 @@
+/* Automatically generated nanopb header */
+/* Generated by nanopb-0.3.7-dev */
+
+#ifndef PB_GRPC_HEALTH_V1_HEALTH_PB_H_INCLUDED
+#define PB_GRPC_HEALTH_V1_HEALTH_PB_H_INCLUDED
+#include "third_party/nanopb/pb.h"
+/* @@protoc_insertion_point(includes) */
+#if PB_PROTO_HEADER_VERSION != 30
+#error Regenerate this file with the current version of nanopb generator.
+#endif
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* Enum definitions */
+typedef enum _grpc_health_v1_HealthCheckResponse_ServingStatus {
+ grpc_health_v1_HealthCheckResponse_ServingStatus_UNKNOWN = 0,
+ grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING = 1,
+ grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING = 2
+} grpc_health_v1_HealthCheckResponse_ServingStatus;
+#define _grpc_health_v1_HealthCheckResponse_ServingStatus_MIN grpc_health_v1_HealthCheckResponse_ServingStatus_UNKNOWN
+#define _grpc_health_v1_HealthCheckResponse_ServingStatus_MAX grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING
+#define _grpc_health_v1_HealthCheckResponse_ServingStatus_ARRAYSIZE ((grpc_health_v1_HealthCheckResponse_ServingStatus)(grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING+1))
+
+/* Struct definitions */
+typedef struct _grpc_health_v1_HealthCheckRequest {
+ bool has_service;
+ char service[200];
+/* @@protoc_insertion_point(struct:grpc_health_v1_HealthCheckRequest) */
+} grpc_health_v1_HealthCheckRequest;
+
+typedef struct _grpc_health_v1_HealthCheckResponse {
+ bool has_status;
+ grpc_health_v1_HealthCheckResponse_ServingStatus status;
+/* @@protoc_insertion_point(struct:grpc_health_v1_HealthCheckResponse) */
+} grpc_health_v1_HealthCheckResponse;
+
+/* Default values for struct fields */
+
+/* Initializer values for message structs */
+#define grpc_health_v1_HealthCheckRequest_init_default {false, ""}
+#define grpc_health_v1_HealthCheckResponse_init_default {false, (grpc_health_v1_HealthCheckResponse_ServingStatus)0}
+#define grpc_health_v1_HealthCheckRequest_init_zero {false, ""}
+#define grpc_health_v1_HealthCheckResponse_init_zero {false, (grpc_health_v1_HealthCheckResponse_ServingStatus)0}
+
+/* Field tags (for use in manual encoding/decoding) */
+#define grpc_health_v1_HealthCheckRequest_service_tag 1
+#define grpc_health_v1_HealthCheckResponse_status_tag 1
+
+/* Struct field encoding specification for nanopb */
+extern const pb_field_t grpc_health_v1_HealthCheckRequest_fields[2];
+extern const pb_field_t grpc_health_v1_HealthCheckResponse_fields[2];
+
+/* Maximum encoded size of messages (where known) */
+#define grpc_health_v1_HealthCheckRequest_size 203
+#define grpc_health_v1_HealthCheckResponse_size 2
+
+/* Message IDs (where set with "msgid" option) */
+#ifdef PB_MSGID
+
+#define HEALTH_MESSAGES \
+
+
+#endif
+
+#ifdef __cplusplus
+} /* extern "C" */
+#endif
+/* @@protoc_insertion_point(eof) */
+
+#endif
diff --git a/src/cpp/server/health/health_check_service.cc b/src/cpp/server/health/health_check_service.cc
new file mode 100644
index 0000000000..cca68c5549
--- /dev/null
+++ b/src/cpp/server/health/health_check_service.cc
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc++/health_check_service_interface.h>
+
+namespace grpc {
+namespace {
+bool g_grpc_default_health_check_service_enabled = false;
+} // namesapce
+
+bool DefaultHealthCheckServiceEnabled() {
+ return g_grpc_default_health_check_service_enabled;
+}
+
+void EnableDefaultHealthCheckService(bool enable) {
+ g_grpc_default_health_check_service_enabled = enable;
+}
+
+} // namespace grpc
diff --git a/src/cpp/server/health/health_check_service_server_builder_option.cc b/src/cpp/server/health/health_check_service_server_builder_option.cc
new file mode 100644
index 0000000000..24264204b3
--- /dev/null
+++ b/src/cpp/server/health/health_check_service_server_builder_option.cc
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc++/ext/health_check_service_server_builder_option.h>
+
+namespace grpc {
+
+HealthCheckServiceServerBuilderOption::HealthCheckServiceServerBuilderOption(
+ std::unique_ptr<HealthCheckServiceInterface> hc)
+ : hc_(std::move(hc)) {}
+// Hand over hc_ to the server.
+void HealthCheckServiceServerBuilderOption::UpdateArguments(
+ ChannelArguments* args) {
+ args->SetPointer(kHealthCheckServiceInterfaceArg, hc_.release());
+}
+
+void HealthCheckServiceServerBuilderOption::UpdatePlugins(
+ std::vector<std::unique_ptr<ServerBuilderPlugin>>* plugins) {}
+
+} // namespace grpc
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 817d85a81c..00aea485db 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -37,6 +37,7 @@
#include <grpc++/completion_queue.h>
#include <grpc++/generic/async_generic_service.h>
+#include <grpc++/impl/codegen/async_unary_call.h>
#include <grpc++/impl/codegen/completion_queue_tag.h>
#include <grpc++/impl/grpc_library.h>
#include <grpc++/impl/method_handler_impl.h>
@@ -51,6 +52,7 @@
#include <grpc/support/log.h>
#include "src/core/lib/profiling/timers.h"
+#include "src/cpp/server/health/default_health_check_service.h"
#include "src/cpp/thread_manager/thread_manager.h"
namespace grpc {
@@ -117,6 +119,79 @@ class Server::UnimplementedAsyncResponse final
UnimplementedAsyncRequest* const request_;
};
+// This is a dummy implementation of the interface so that
+// HealthCheckAsyncRequest can get Call from RegisteredAsyncRequest. It does not
+// do any reading or writing.
+class HealthCheckAsyncResponseWriter final
+ : public ServerAsyncStreamingInterface {
+ public:
+ HealthCheckAsyncResponseWriter() : call_(nullptr, nullptr, nullptr) {}
+ void SendInitialMetadata(void* tag) override {
+ abort(); // should not be called.
+ }
+ void BindCall(Call* call) override { call_ = *call; }
+ Call* call() { return &call_; }
+
+ private:
+ Call call_;
+};
+
+class Server::HealthCheckAsyncRequestContext {
+ protected:
+ ServerContext server_context_;
+ HealthCheckAsyncResponseWriter rpc_;
+};
+
+class Server::HealthCheckAsyncRequest final
+ : public HealthCheckAsyncRequestContext,
+ public RegisteredAsyncRequest {
+ public:
+ HealthCheckAsyncRequest(
+ DefaultHealthCheckService::HealthCheckServiceImpl* service,
+ Server* server, ServerCompletionQueue* cq)
+ : RegisteredAsyncRequest(server, &server_context_, &rpc_, cq, this,
+ false),
+ service_(service),
+ server_(server),
+ cq_(cq) {
+ IssueRequest(service->server_tag(), &payload_, cq);
+ }
+
+ bool FinalizeResult(void** tag, bool* status) override;
+ Call* call() { return rpc_.call(); }
+ ByteBuffer* response() { return &response_; }
+ Status* status() { return &status_; }
+ ServerContext* server_context() { return &server_context_; }
+
+ private:
+ DefaultHealthCheckService::HealthCheckServiceImpl* service_;
+ Server* const server_;
+ ServerCompletionQueue* const cq_;
+ grpc_byte_buffer* payload_;
+ ByteBuffer request_;
+ ByteBuffer response_;
+ Status status_;
+};
+
+typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus>
+ HealthCheckAsyncResponseOp;
+class Server::HealthCheckAsyncResponse final
+ : public HealthCheckAsyncResponseOp {
+ public:
+ HealthCheckAsyncResponse(HealthCheckAsyncRequest* request);
+ ~HealthCheckAsyncResponse() { delete request_; }
+
+ bool FinalizeResult(void** tag, bool* status) override {
+ HealthCheckAsyncResponseOp::FinalizeResult(tag, status);
+ delete this;
+ return false;
+ }
+
+ private:
+ HealthCheckAsyncRequest* const request_;
+};
+
class ShutdownTag : public CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) { return false; }
@@ -342,6 +417,7 @@ class Server::SyncRequestThreadManager : public ThreadManager {
int cq_timeout_msec_;
std::vector<std::unique_ptr<SyncRequest>> sync_requests_;
std::unique_ptr<RpcServiceMethod> unknown_method_;
+ std::unique_ptr<RpcServiceMethod> health_check_;
std::shared_ptr<Server::GlobalCallbacks> global_callbacks_;
};
@@ -358,7 +434,8 @@ Server::Server(
shutdown_notified_(false),
has_generic_service_(false),
server_(nullptr),
- server_initializer_(new ServerInitializer(this)) {
+ server_initializer_(new ServerInitializer(this)),
+ health_check_service_disabled_(false) {
g_gli_initializer.summon();
gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks);
global_callbacks_ = g_callbacks;
@@ -374,6 +451,19 @@ Server::Server(
grpc_channel_args channel_args;
args->SetChannelArgs(&channel_args);
+ for (size_t i = 0; i < channel_args.num_args; i++) {
+ if (0 ==
+ strcmp(channel_args.args[i].key, kHealthCheckServiceInterfaceArg)) {
+ if (channel_args.args[i].value.pointer.p == nullptr) {
+ health_check_service_disabled_ = true;
+ } else {
+ health_check_service_.reset(static_cast<HealthCheckServiceInterface*>(
+ channel_args.args[i].value.pointer.p));
+ }
+ break;
+ }
+ }
+
server_ = grpc_server_create(&channel_args, nullptr);
}
@@ -479,6 +569,19 @@ int Server::AddListeningPort(const grpc::string& addr,
bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
GPR_ASSERT(!started_);
started_ = true;
+
+ // Only create default health check service when user did not provide an
+ // explicit one.
+ DefaultHealthCheckService::HealthCheckServiceImpl* health_service = nullptr;
+ if (health_check_service_ == nullptr && !health_check_service_disabled_ &&
+ DefaultHealthCheckServiceEnabled()) {
+ auto* default_hc_service = new DefaultHealthCheckService;
+ health_check_service_.reset(default_hc_service);
+ health_service =
+ default_hc_service->GetHealthCheckService(!sync_server_cqs_->empty());
+ RegisterService(nullptr, health_service);
+ }
+
grpc_server_start(server_);
if (!has_generic_service_) {
@@ -493,6 +596,14 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
}
}
+ if (health_service && !health_service->sync()) {
+ for (size_t i = 0; i < num_cqs; i++) {
+ if (cqs[i]->IsFrequentlyPolled()) {
+ new HealthCheckAsyncRequest(health_service, this, cqs[i]);
+ }
+ }
+ }
+
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Start();
}
@@ -613,8 +724,10 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest(
ServerInterface* server, ServerContext* context,
- ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
- : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {}
+ ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag,
+ bool delete_on_finalize)
+ : BaseAsyncRequest(server, context, stream, call_cq, tag,
+ delete_on_finalize) {}
void ServerInterface::RegisteredAsyncRequest::IssueRequest(
void* registered_method, grpc_byte_buffer** payload,
@@ -671,6 +784,47 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
request_->stream()->call_.PerformOps(this);
}
+bool Server::HealthCheckAsyncRequest::FinalizeResult(void** tag, bool* status) {
+ bool serialization_status =
+ *status && payload_ &&
+ SerializationTraits<ByteBuffer>::Deserialize(
+ payload_, &request_, server_->max_receive_message_size())
+ .ok();
+ RegisteredAsyncRequest::FinalizeResult(tag, status);
+ *status = serialization_status && *status;
+ if (*status) {
+ new HealthCheckAsyncRequest(service_, server_, cq_);
+ status_ = service_->Check(&server_context_, &request_, &response_);
+ new HealthCheckAsyncResponse(this);
+ return false;
+ } else {
+ delete this;
+ return false;
+ }
+}
+
+Server::HealthCheckAsyncResponse::HealthCheckAsyncResponse(
+ HealthCheckAsyncRequest* request)
+ : request_(request) {
+ ServerContext* context = request_->server_context();
+ if (!context->sent_initial_metadata_) {
+ SendInitialMetadata(context->initial_metadata_,
+ context->initial_metadata_flags());
+ if (context->compression_level_set()) {
+ set_compression_level(context->compression_level());
+ }
+ context->sent_initial_metadata_ = true;
+ }
+ Status* status = request_->status();
+ if (status->ok()) {
+ ServerSendStatus(context->trailing_metadata_,
+ SendMessage(*request_->response()));
+ } else {
+ ServerSendStatus(context->trailing_metadata_, *status);
+ }
+ request_->call()->PerformOps(this);
+}
+
ServerInitializer* Server::initializer() { return server_initializer_.get(); }
} // namespace grpc