diff options
Diffstat (limited to 'src/cpp')
-rw-r--r-- | src/cpp/server/health/default_health_check_service.cc | 169 | ||||
-rw-r--r-- | src/cpp/server/health/default_health_check_service.h | 85 | ||||
-rw-r--r-- | src/cpp/server/health/health.pb.c | 24 | ||||
-rw-r--r-- | src/cpp/server/health/health.pb.h | 72 | ||||
-rw-r--r-- | src/cpp/server/health/health_check_service.cc | 49 | ||||
-rw-r--r-- | src/cpp/server/health/health_check_service_server_builder_option.cc | 50 | ||||
-rw-r--r-- | src/cpp/server/server_cc.cc | 160 |
7 files changed, 606 insertions, 3 deletions
diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc new file mode 100644 index 0000000000..9743bd5775 --- /dev/null +++ b/src/cpp/server/health/default_health_check_service.cc @@ -0,0 +1,169 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <memory> +#include <mutex> + +#include <grpc++/impl/codegen/method_handler_impl.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +#include "src/cpp/server/health/default_health_check_service.h" +#include "src/cpp/server/health/health.pb.h" +#include "third_party/nanopb/pb_decode.h" +#include "third_party/nanopb/pb_encode.h" + +namespace grpc { +namespace { +const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check"; +} // namespace + +DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl( + DefaultHealthCheckService* service, bool sync) + : service_(service), method_(nullptr), sync_(sync) { + MethodHandler* handler = nullptr; + if (sync_) { + handler = + new RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer, ByteBuffer>( + std::mem_fn(&HealthCheckServiceImpl::Check), this); + } + method_ = new RpcServiceMethod(kHealthCheckMethodName, RpcMethod::NORMAL_RPC, + handler); + AddMethod(method_); +} + +Status DefaultHealthCheckService::HealthCheckServiceImpl::Check( + ServerContext* context, const ByteBuffer* request, ByteBuffer* response) { + // Decode request. + std::vector<Slice> slices; + request->Dump(&slices); + uint8_t* request_bytes = nullptr; + bool request_bytes_owned = false; + size_t request_size = 0; + grpc_health_v1_HealthCheckRequest request_struct; + if (slices.empty()) { + request_struct.has_service = false; + } else if (slices.size() == 1) { + request_bytes = const_cast<uint8_t*>(slices[0].begin()); + request_size = slices[0].size(); + } else { + request_bytes_owned = true; + request_bytes = static_cast<uint8_t*>(gpr_malloc(request->Length())); + uint8_t* copy_to = request_bytes; + for (size_t i = 0; i < slices.size(); i++) { + memcpy(copy_to, slices[i].begin(), slices[i].size()); + copy_to += slices[i].size(); + } + } + + if (request_bytes != nullptr) { + pb_istream_t istream = pb_istream_from_buffer(request_bytes, request_size); + bool decode_status = pb_decode( + &istream, grpc_health_v1_HealthCheckRequest_fields, &request_struct); + if (request_bytes_owned) { + gpr_free(request_bytes); + } + if (!decode_status) { + return Status(StatusCode::INVALID_ARGUMENT, ""); + } + } + + // Check status from the associated default health checking service. + DefaultHealthCheckService::ServingStatus serving_status = + service_->GetServingStatus( + request_struct.has_service ? request_struct.service : ""); + if (serving_status == DefaultHealthCheckService::NOT_FOUND) { + return Status(StatusCode::NOT_FOUND, ""); + } + + // Encode response + grpc_health_v1_HealthCheckResponse response_struct; + response_struct.has_status = true; + response_struct.status = + serving_status == DefaultHealthCheckService::SERVING + ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING + : grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING; + pb_ostream_t ostream; + memset(&ostream, 0, sizeof(ostream)); + pb_encode(&ostream, grpc_health_v1_HealthCheckResponse_fields, + &response_struct); + grpc_slice response_slice = grpc_slice_malloc(ostream.bytes_written); + ostream = pb_ostream_from_buffer(GRPC_SLICE_START_PTR(response_slice), + GRPC_SLICE_LENGTH(response_slice)); + bool encode_status = pb_encode( + &ostream, grpc_health_v1_HealthCheckResponse_fields, &response_struct); + if (!encode_status) { + return Status(StatusCode::INTERNAL, "Failed to encode response."); + } + Slice encoded_response(response_slice, Slice::STEAL_REF); + ByteBuffer response_buffer(&encoded_response, 1); + response->Swap(&response_buffer); + return Status::OK; +} + +DefaultHealthCheckService::DefaultHealthCheckService() { + services_map_.emplace("", true); +} + +void DefaultHealthCheckService::SetServingStatus( + const grpc::string& service_name, bool serving) { + std::lock_guard<std::mutex> lock(mu_); + services_map_[service_name] = serving; +} + +void DefaultHealthCheckService::SetServingStatus(bool serving) { + std::lock_guard<std::mutex> lock(mu_); + for (auto iter = services_map_.begin(); iter != services_map_.end(); ++iter) { + iter->second = serving; + } +} + +DefaultHealthCheckService::ServingStatus +DefaultHealthCheckService::GetServingStatus( + const grpc::string& service_name) const { + std::lock_guard<std::mutex> lock(mu_); + const auto& iter = services_map_.find(service_name); + if (iter == services_map_.end()) { + return NOT_FOUND; + } + return iter->second ? SERVING : NOT_SERVING; +} + +DefaultHealthCheckService::HealthCheckServiceImpl* +DefaultHealthCheckService::GetHealthCheckService(bool sync) { + GPR_ASSERT(impl_ == nullptr); + impl_.reset(new HealthCheckServiceImpl(this, sync)); + return impl_.get(); +} + +} // namespace grpc diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h new file mode 100644 index 0000000000..1ecb0a2ba9 --- /dev/null +++ b/src/cpp/server/health/default_health_check_service.h @@ -0,0 +1,85 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H +#define GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H + +#include <mutex> + +#include <grpc++/health_check_service_interface.h> +#include <grpc++/impl/codegen/service_type.h> +#include <grpc++/support/byte_buffer.h> + +namespace grpc { + +// Default implementation of HealthCheckServiceInterface. Server will create and +// own it. +class DefaultHealthCheckService final : public HealthCheckServiceInterface { + public: + // The service impl to register with the server. + class HealthCheckServiceImpl : public Service { + public: + HealthCheckServiceImpl(DefaultHealthCheckService* service, bool sync); + + Status Check(ServerContext* context, const ByteBuffer* request, + ByteBuffer* response); + + bool sync() { return sync_; } + + // This is only useful for the async mode. It should be called after + // RegisterService returns. + void* server_tag() const { return method_->server_tag(); } + + private: + const DefaultHealthCheckService* const service_; + RpcServiceMethod* method_; + const bool sync_; + }; + + DefaultHealthCheckService(); + void SetServingStatus(const grpc::string& service_name, + bool serving) override; + void SetServingStatus(bool serving) override; + enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING }; + ServingStatus GetServingStatus(const grpc::string& service_name) const; + HealthCheckServiceImpl* GetHealthCheckService(bool sync); + + private: + mutable std::mutex mu_; + std::map<grpc::string, bool> services_map_; + std::unique_ptr<HealthCheckServiceImpl> impl_; +}; + +} // namespace grpc + +#endif // GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H diff --git a/src/cpp/server/health/health.pb.c b/src/cpp/server/health/health.pb.c new file mode 100644 index 0000000000..09bd98a3d9 --- /dev/null +++ b/src/cpp/server/health/health.pb.c @@ -0,0 +1,24 @@ +/* Automatically generated nanopb constant definitions */ +/* Generated by nanopb-0.3.7-dev */ + +#include "src/cpp/server/health/health.pb.h" + +/* @@protoc_insertion_point(includes) */ +#if PB_PROTO_HEADER_VERSION != 30 +#error Regenerate this file with the current version of nanopb generator. +#endif + + + +const pb_field_t grpc_health_v1_HealthCheckRequest_fields[2] = { + PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_health_v1_HealthCheckRequest, service, service, 0), + PB_LAST_FIELD +}; + +const pb_field_t grpc_health_v1_HealthCheckResponse_fields[2] = { + PB_FIELD( 1, UENUM , OPTIONAL, STATIC , FIRST, grpc_health_v1_HealthCheckResponse, status, status, 0), + PB_LAST_FIELD +}; + + +/* @@protoc_insertion_point(eof) */ diff --git a/src/cpp/server/health/health.pb.h b/src/cpp/server/health/health.pb.h new file mode 100644 index 0000000000..7051b3260a --- /dev/null +++ b/src/cpp/server/health/health.pb.h @@ -0,0 +1,72 @@ +/* Automatically generated nanopb header */ +/* Generated by nanopb-0.3.7-dev */ + +#ifndef PB_GRPC_HEALTH_V1_HEALTH_PB_H_INCLUDED +#define PB_GRPC_HEALTH_V1_HEALTH_PB_H_INCLUDED +#include "third_party/nanopb/pb.h" +/* @@protoc_insertion_point(includes) */ +#if PB_PROTO_HEADER_VERSION != 30 +#error Regenerate this file with the current version of nanopb generator. +#endif + +#ifdef __cplusplus +extern "C" { +#endif + +/* Enum definitions */ +typedef enum _grpc_health_v1_HealthCheckResponse_ServingStatus { + grpc_health_v1_HealthCheckResponse_ServingStatus_UNKNOWN = 0, + grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING = 1, + grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING = 2 +} grpc_health_v1_HealthCheckResponse_ServingStatus; +#define _grpc_health_v1_HealthCheckResponse_ServingStatus_MIN grpc_health_v1_HealthCheckResponse_ServingStatus_UNKNOWN +#define _grpc_health_v1_HealthCheckResponse_ServingStatus_MAX grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING +#define _grpc_health_v1_HealthCheckResponse_ServingStatus_ARRAYSIZE ((grpc_health_v1_HealthCheckResponse_ServingStatus)(grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING+1)) + +/* Struct definitions */ +typedef struct _grpc_health_v1_HealthCheckRequest { + bool has_service; + char service[200]; +/* @@protoc_insertion_point(struct:grpc_health_v1_HealthCheckRequest) */ +} grpc_health_v1_HealthCheckRequest; + +typedef struct _grpc_health_v1_HealthCheckResponse { + bool has_status; + grpc_health_v1_HealthCheckResponse_ServingStatus status; +/* @@protoc_insertion_point(struct:grpc_health_v1_HealthCheckResponse) */ +} grpc_health_v1_HealthCheckResponse; + +/* Default values for struct fields */ + +/* Initializer values for message structs */ +#define grpc_health_v1_HealthCheckRequest_init_default {false, ""} +#define grpc_health_v1_HealthCheckResponse_init_default {false, (grpc_health_v1_HealthCheckResponse_ServingStatus)0} +#define grpc_health_v1_HealthCheckRequest_init_zero {false, ""} +#define grpc_health_v1_HealthCheckResponse_init_zero {false, (grpc_health_v1_HealthCheckResponse_ServingStatus)0} + +/* Field tags (for use in manual encoding/decoding) */ +#define grpc_health_v1_HealthCheckRequest_service_tag 1 +#define grpc_health_v1_HealthCheckResponse_status_tag 1 + +/* Struct field encoding specification for nanopb */ +extern const pb_field_t grpc_health_v1_HealthCheckRequest_fields[2]; +extern const pb_field_t grpc_health_v1_HealthCheckResponse_fields[2]; + +/* Maximum encoded size of messages (where known) */ +#define grpc_health_v1_HealthCheckRequest_size 203 +#define grpc_health_v1_HealthCheckResponse_size 2 + +/* Message IDs (where set with "msgid" option) */ +#ifdef PB_MSGID + +#define HEALTH_MESSAGES \ + + +#endif + +#ifdef __cplusplus +} /* extern "C" */ +#endif +/* @@protoc_insertion_point(eof) */ + +#endif diff --git a/src/cpp/server/health/health_check_service.cc b/src/cpp/server/health/health_check_service.cc new file mode 100644 index 0000000000..cca68c5549 --- /dev/null +++ b/src/cpp/server/health/health_check_service.cc @@ -0,0 +1,49 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/health_check_service_interface.h> + +namespace grpc { +namespace { +bool g_grpc_default_health_check_service_enabled = false; +} // namesapce + +bool DefaultHealthCheckServiceEnabled() { + return g_grpc_default_health_check_service_enabled; +} + +void EnableDefaultHealthCheckService(bool enable) { + g_grpc_default_health_check_service_enabled = enable; +} + +} // namespace grpc diff --git a/src/cpp/server/health/health_check_service_server_builder_option.cc b/src/cpp/server/health/health_check_service_server_builder_option.cc new file mode 100644 index 0000000000..24264204b3 --- /dev/null +++ b/src/cpp/server/health/health_check_service_server_builder_option.cc @@ -0,0 +1,50 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/ext/health_check_service_server_builder_option.h> + +namespace grpc { + +HealthCheckServiceServerBuilderOption::HealthCheckServiceServerBuilderOption( + std::unique_ptr<HealthCheckServiceInterface> hc) + : hc_(std::move(hc)) {} +// Hand over hc_ to the server. +void HealthCheckServiceServerBuilderOption::UpdateArguments( + ChannelArguments* args) { + args->SetPointer(kHealthCheckServiceInterfaceArg, hc_.release()); +} + +void HealthCheckServiceServerBuilderOption::UpdatePlugins( + std::vector<std::unique_ptr<ServerBuilderPlugin>>* plugins) {} + +} // namespace grpc diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 817d85a81c..00aea485db 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -37,6 +37,7 @@ #include <grpc++/completion_queue.h> #include <grpc++/generic/async_generic_service.h> +#include <grpc++/impl/codegen/async_unary_call.h> #include <grpc++/impl/codegen/completion_queue_tag.h> #include <grpc++/impl/grpc_library.h> #include <grpc++/impl/method_handler_impl.h> @@ -51,6 +52,7 @@ #include <grpc/support/log.h> #include "src/core/lib/profiling/timers.h" +#include "src/cpp/server/health/default_health_check_service.h" #include "src/cpp/thread_manager/thread_manager.h" namespace grpc { @@ -117,6 +119,79 @@ class Server::UnimplementedAsyncResponse final UnimplementedAsyncRequest* const request_; }; +// This is a dummy implementation of the interface so that +// HealthCheckAsyncRequest can get Call from RegisteredAsyncRequest. It does not +// do any reading or writing. +class HealthCheckAsyncResponseWriter final + : public ServerAsyncStreamingInterface { + public: + HealthCheckAsyncResponseWriter() : call_(nullptr, nullptr, nullptr) {} + void SendInitialMetadata(void* tag) override { + abort(); // should not be called. + } + void BindCall(Call* call) override { call_ = *call; } + Call* call() { return &call_; } + + private: + Call call_; +}; + +class Server::HealthCheckAsyncRequestContext { + protected: + ServerContext server_context_; + HealthCheckAsyncResponseWriter rpc_; +}; + +class Server::HealthCheckAsyncRequest final + : public HealthCheckAsyncRequestContext, + public RegisteredAsyncRequest { + public: + HealthCheckAsyncRequest( + DefaultHealthCheckService::HealthCheckServiceImpl* service, + Server* server, ServerCompletionQueue* cq) + : RegisteredAsyncRequest(server, &server_context_, &rpc_, cq, this, + false), + service_(service), + server_(server), + cq_(cq) { + IssueRequest(service->server_tag(), &payload_, cq); + } + + bool FinalizeResult(void** tag, bool* status) override; + Call* call() { return rpc_.call(); } + ByteBuffer* response() { return &response_; } + Status* status() { return &status_; } + ServerContext* server_context() { return &server_context_; } + + private: + DefaultHealthCheckService::HealthCheckServiceImpl* service_; + Server* const server_; + ServerCompletionQueue* const cq_; + grpc_byte_buffer* payload_; + ByteBuffer request_; + ByteBuffer response_; + Status status_; +}; + +typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> + HealthCheckAsyncResponseOp; +class Server::HealthCheckAsyncResponse final + : public HealthCheckAsyncResponseOp { + public: + HealthCheckAsyncResponse(HealthCheckAsyncRequest* request); + ~HealthCheckAsyncResponse() { delete request_; } + + bool FinalizeResult(void** tag, bool* status) override { + HealthCheckAsyncResponseOp::FinalizeResult(tag, status); + delete this; + return false; + } + + private: + HealthCheckAsyncRequest* const request_; +}; + class ShutdownTag : public CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) { return false; } @@ -342,6 +417,7 @@ class Server::SyncRequestThreadManager : public ThreadManager { int cq_timeout_msec_; std::vector<std::unique_ptr<SyncRequest>> sync_requests_; std::unique_ptr<RpcServiceMethod> unknown_method_; + std::unique_ptr<RpcServiceMethod> health_check_; std::shared_ptr<Server::GlobalCallbacks> global_callbacks_; }; @@ -358,7 +434,8 @@ Server::Server( shutdown_notified_(false), has_generic_service_(false), server_(nullptr), - server_initializer_(new ServerInitializer(this)) { + server_initializer_(new ServerInitializer(this)), + health_check_service_disabled_(false) { g_gli_initializer.summon(); gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks); global_callbacks_ = g_callbacks; @@ -374,6 +451,19 @@ Server::Server( grpc_channel_args channel_args; args->SetChannelArgs(&channel_args); + for (size_t i = 0; i < channel_args.num_args; i++) { + if (0 == + strcmp(channel_args.args[i].key, kHealthCheckServiceInterfaceArg)) { + if (channel_args.args[i].value.pointer.p == nullptr) { + health_check_service_disabled_ = true; + } else { + health_check_service_.reset(static_cast<HealthCheckServiceInterface*>( + channel_args.args[i].value.pointer.p)); + } + break; + } + } + server_ = grpc_server_create(&channel_args, nullptr); } @@ -479,6 +569,19 @@ int Server::AddListeningPort(const grpc::string& addr, bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { GPR_ASSERT(!started_); started_ = true; + + // Only create default health check service when user did not provide an + // explicit one. + DefaultHealthCheckService::HealthCheckServiceImpl* health_service = nullptr; + if (health_check_service_ == nullptr && !health_check_service_disabled_ && + DefaultHealthCheckServiceEnabled()) { + auto* default_hc_service = new DefaultHealthCheckService; + health_check_service_.reset(default_hc_service); + health_service = + default_hc_service->GetHealthCheckService(!sync_server_cqs_->empty()); + RegisterService(nullptr, health_service); + } + grpc_server_start(server_); if (!has_generic_service_) { @@ -493,6 +596,14 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { } } + if (health_service && !health_service->sync()) { + for (size_t i = 0; i < num_cqs; i++) { + if (cqs[i]->IsFrequentlyPolled()) { + new HealthCheckAsyncRequest(health_service, this, cqs[i]); + } + } + } + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { (*it)->Start(); } @@ -613,8 +724,10 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag) - : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {} + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag, + bool delete_on_finalize) + : BaseAsyncRequest(server, context, stream, call_cq, tag, + delete_on_finalize) {} void ServerInterface::RegisteredAsyncRequest::IssueRequest( void* registered_method, grpc_byte_buffer** payload, @@ -671,6 +784,47 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( request_->stream()->call_.PerformOps(this); } +bool Server::HealthCheckAsyncRequest::FinalizeResult(void** tag, bool* status) { + bool serialization_status = + *status && payload_ && + SerializationTraits<ByteBuffer>::Deserialize( + payload_, &request_, server_->max_receive_message_size()) + .ok(); + RegisteredAsyncRequest::FinalizeResult(tag, status); + *status = serialization_status && *status; + if (*status) { + new HealthCheckAsyncRequest(service_, server_, cq_); + status_ = service_->Check(&server_context_, &request_, &response_); + new HealthCheckAsyncResponse(this); + return false; + } else { + delete this; + return false; + } +} + +Server::HealthCheckAsyncResponse::HealthCheckAsyncResponse( + HealthCheckAsyncRequest* request) + : request_(request) { + ServerContext* context = request_->server_context(); + if (!context->sent_initial_metadata_) { + SendInitialMetadata(context->initial_metadata_, + context->initial_metadata_flags()); + if (context->compression_level_set()) { + set_compression_level(context->compression_level()); + } + context->sent_initial_metadata_ = true; + } + Status* status = request_->status(); + if (status->ok()) { + ServerSendStatus(context->trailing_metadata_, + SendMessage(*request_->response())); + } else { + ServerSendStatus(context->trailing_metadata_, *status); + } + request_->call()->PerformOps(this); +} + ServerInitializer* Server::initializer() { return server_initializer_.get(); } } // namespace grpc |