Diffstat (limited to 'src')
 src/core/lib/channel/channelz.cc                      |  38
 src/core/lib/channel/channelz.h                       |  26
 src/core/lib/channel/channelz_registry.cc             |  40
 src/core/lib/channel/channelz_registry.h              |   7
 src/core/lib/surface/call.cc                          |  38
 src/core/lib/surface/call.h                           |   1
 src/core/lib/surface/channel.cc                       |   7
 src/core/lib/surface/server.cc                        |  24
 src/core/lib/surface/server.h                         |   4
 src/cpp/server/channelz/channelz_service.cc           |   6
 src/cpp/server/health/default_health_check_service.cc | 484
 src/cpp/server/health/default_health_check_service.h  | 242
 src/cpp/server/health/health.pb.c                     |   1
 src/cpp/server/health/health.pb.h                     |   7
 src/cpp/server/server_cc.cc                           |  27
 src/proto/grpc/health/v1/health.proto                 |  20
 src/ruby/ext/grpc/rb_grpc_imports.generated.c         |   2
 src/ruby/ext/grpc/rb_grpc_imports.generated.h         |   3
 18 files changed, 256 insertions(+), 721 deletions(-)
diff --git a/src/core/lib/channel/channelz.cc b/src/core/lib/channel/channelz.cc
index 9f54850002..375cf25cc6 100644
--- a/src/core/lib/channel/channelz.cc
+++ b/src/core/lib/channel/channelz.cc
@@ -48,6 +48,7 @@ BaseNode::~BaseNode() { ChannelzRegistry::Unregister(uuid_); }
char* BaseNode::RenderJsonString() {
grpc_json* json = RenderJson();
+ GPR_ASSERT(json != nullptr);
char* json_str = grpc_json_dump_to_string(json, 0);
grpc_json_destroy(json);
return json_str;
@@ -146,5 +147,42 @@ RefCountedPtr<ChannelNode> ChannelNode::MakeChannelNode(
channel, channel_tracer_max_nodes, is_top_level_channel);
}
+ServerNode::ServerNode(size_t channel_tracer_max_nodes)
+ : BaseNode(EntityType::kServer), trace_(channel_tracer_max_nodes) {}
+
+ServerNode::~ServerNode() {}
+
+grpc_json* ServerNode::RenderJson() {
+ // We need to track these three json objects to build our response object
+ grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
+ grpc_json* json = top_level_json;
+ grpc_json* json_iterator = nullptr;
+ // create and fill the ref child
+ json_iterator = grpc_json_create_child(json_iterator, json, "ref", nullptr,
+ GRPC_JSON_OBJECT, false);
+ json = json_iterator;
+ json_iterator = nullptr;
+ json_iterator = grpc_json_add_number_string_child(json, json_iterator,
+ "serverId", uuid());
+ // reset json iterators to top level object
+ json = top_level_json;
+ json_iterator = nullptr;
+ // create and fill the data child.
+ grpc_json* data = grpc_json_create_child(json_iterator, json, "data", nullptr,
+ GRPC_JSON_OBJECT, false);
+ json = data;
+ json_iterator = nullptr;
+ // fill in the channel trace if applicable
+ grpc_json* trace_json = trace_.RenderJson();
+ if (trace_json != nullptr) {
+ trace_json->key = "trace"; // this object is named trace in channelz.proto
+ grpc_json_link_child(json, trace_json, nullptr);
+ }
+ // ask CallCountingHelper to populate call count data.
+ call_counter_.PopulateCallCounts(json);
+ json = top_level_json;
+ return top_level_json;
+}
+
} // namespace channelz
} // namespace grpc_core
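For reference, a minimal usage sketch (not part of this diff) of rendering the new ServerNode through the BaseNode::RenderJsonString() helper shown above. It assumes grpc_init() has already run so the ChannelzRegistry exists; the returned string follows the usual gpr_free() ownership convention:

    #include <grpc/support/alloc.h>
    #include <grpc/support/log.h>
    #include "src/core/lib/channel/channelz.h"

    void log_server_node_json() {
      grpc_core::RefCountedPtr<grpc_core::channelz::ServerNode> node =
          grpc_core::MakeRefCounted<grpc_core::channelz::ServerNode>(
              /*channel_tracer_max_nodes=*/10);
      // RenderJson() above yields {"ref":{"serverId":"<uuid>"},"data":{...}},
      // with "trace" and the call counts filled in under "data".
      char* json_str = node->RenderJsonString();  // caller owns the string
      gpr_log(GPR_DEBUG, "server node: %s", json_str);
      gpr_free(json_str);
    }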
diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h
index db5d05140d..9be256147b 100644
--- a/src/core/lib/channel/channelz.h
+++ b/src/core/lib/channel/channelz.h
@@ -170,12 +170,30 @@ class ChannelNode : public BaseNode {
};
// Handles channelz bookkeeping for servers
-// TODO(ncteisen): implement in subsequent PR.
class ServerNode : public BaseNode {
public:
- explicit ServerNode(size_t channel_tracer_max_nodes)
- : BaseNode(EntityType::kServer) {}
- ~ServerNode() override {}
+ explicit ServerNode(size_t channel_tracer_max_nodes);
+ ~ServerNode() override;
+
+ grpc_json* RenderJson() override;
+
+ // proxy methods to composed classes.
+ void AddTraceEvent(ChannelTrace::Severity severity, grpc_slice data) {
+ trace_.AddTraceEvent(severity, data);
+ }
+ void AddTraceEventWithReference(ChannelTrace::Severity severity,
+ grpc_slice data,
+ RefCountedPtr<BaseNode> referenced_channel) {
+ trace_.AddTraceEventWithReference(severity, data,
+ std::move(referenced_channel));
+ }
+ void RecordCallStarted() { call_counter_.RecordCallStarted(); }
+ void RecordCallFailed() { call_counter_.RecordCallFailed(); }
+ void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); }
+
+ private:
+ CallCountingHelper call_counter_;
+ ChannelTrace trace_;
};
// Handles channelz bookkeeping for sockets
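As a sketch of how the new proxy methods are meant to be driven (mirroring the set_final_status() change in call.cc below; the helper name is hypothetical, not from this diff):

    // Hypothetical helper: record a finished server-side call on its ServerNode.
    void RecordServerCallOutcome(grpc_core::channelz::ServerNode* node,
                                 bool cancelled) {
      if (node == nullptr) return;  // channelz may be disabled
      if (cancelled) {
        node->RecordCallFailed();
      } else {
        node->RecordCallSucceeded();
      }
    }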
diff --git a/src/core/lib/channel/channelz_registry.cc b/src/core/lib/channel/channelz_registry.cc
index 972c61c1d6..adc7b6ba44 100644
--- a/src/core/lib/channel/channelz_registry.cc
+++ b/src/core/lib/channel/channelz_registry.cc
@@ -112,6 +112,42 @@ char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) {
return json_str;
}
+char* ChannelzRegistry::InternalGetServers(intptr_t start_server_id) {
+ grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
+ grpc_json* json = top_level_json;
+ grpc_json* json_iterator = nullptr;
+ InlinedVector<BaseNode*, 10> servers;
+ // uuids index into entities off by one (idx 0 is really uuid 1, since 0 is
+ // reserved). However, we want to support requests coming in with
+ // start_server_id=0, which signifies "give me everything."
+ size_t start_idx = start_server_id == 0 ? 0 : start_server_id - 1;
+ for (size_t i = start_idx; i < entities_.size(); ++i) {
+ if (entities_[i] != nullptr &&
+ entities_[i]->type() ==
+ grpc_core::channelz::BaseNode::EntityType::kServer) {
+ servers.push_back(entities_[i]);
+ }
+ }
+ if (!servers.empty()) {
+ // create list of servers
+ grpc_json* array_parent = grpc_json_create_child(
+ nullptr, json, "server", nullptr, GRPC_JSON_ARRAY, false);
+ for (size_t i = 0; i < servers.size(); ++i) {
+ grpc_json* server_json = servers[i]->RenderJson();
+ json_iterator =
+ grpc_json_link_child(array_parent, server_json, json_iterator);
+ }
+ }
+ // For now we do not have any pagination rules. In the future we could
+ // pick a constant for the max number of servers sent per GetServers request.
+ // Tracking: https://github.com/grpc/grpc/issues/16019.
+ json_iterator = grpc_json_create_child(nullptr, json, "end", nullptr,
+ GRPC_JSON_TRUE, false);
+ char* json_str = grpc_json_dump_to_string(top_level_json, 0);
+ grpc_json_destroy(top_level_json);
+ return json_str;
+}
+
} // namespace channelz
} // namespace grpc_core
@@ -120,6 +156,10 @@ char* grpc_channelz_get_top_channels(intptr_t start_channel_id) {
start_channel_id);
}
+char* grpc_channelz_get_servers(intptr_t start_server_id) {
+ return grpc_core::channelz::ChannelzRegistry::GetServers(start_server_id);
+}
+
char* grpc_channelz_get_channel(intptr_t channel_id) {
grpc_core::channelz::BaseNode* channel_node =
grpc_core::channelz::ChannelzRegistry::Get(channel_id);
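A quick sketch of exercising the new C-core surface API end to end; start_server_id == 0 means "give me everything" per the comment above, and the caller frees the returned string:

    #include <grpc/grpc.h>
    #include <grpc/support/alloc.h>
    #include <grpc/support/log.h>

    int main() {
      grpc_init();
      char* servers_json = grpc_channelz_get_servers(/*start_server_id=*/0);
      gpr_log(GPR_INFO, "GetServersResponse: %s", servers_json);
      gpr_free(servers_json);
      grpc_shutdown();
      return 0;
    }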
diff --git a/src/core/lib/channel/channelz_registry.h b/src/core/lib/channel/channelz_registry.h
index 142c039220..d0d660600d 100644
--- a/src/core/lib/channel/channelz_registry.h
+++ b/src/core/lib/channel/channelz_registry.h
@@ -52,6 +52,12 @@ class ChannelzRegistry {
return Default()->InternalGetTopChannels(start_channel_id);
}
+ // Returns the allocated JSON string that represents the proto
+ // GetServersResponse as per channelz.proto.
+ static char* GetServers(intptr_t start_server_id) {
+ return Default()->InternalGetServers(start_server_id);
+ }
+
private:
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
@@ -74,6 +80,7 @@ class ChannelzRegistry {
BaseNode* InternalGet(intptr_t uuid);
char* InternalGetTopChannels(intptr_t start_channel_id);
+ char* InternalGetServers(intptr_t start_server_id);
// protects entities_ and uuid_
gpr_mu mu_;
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index b07c4d6c10..3e008e5606 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -48,6 +48,7 @@
#include "src/core/lib/surface/call_test_only.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/completion_queue.h"
+#include "src/core/lib/surface/server.h"
#include "src/core/lib/surface/validate_metadata.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata.h"
@@ -202,6 +203,8 @@ struct grpc_call {
} client;
struct {
int* cancelled;
+ // backpointer to owning server if this is a server side call.
+ grpc_server* server;
} server;
} final_op;
grpc_error* status_error;
@@ -311,14 +314,10 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
/* Always support no compression */
GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_MESSAGE_COMPRESS_NONE);
call->is_client = args->server_transport_data == nullptr;
- if (call->is_client) {
- GRPC_STATS_INC_CLIENT_CALLS_CREATED();
- } else {
- GRPC_STATS_INC_SERVER_CALLS_CREATED();
- }
call->stream_op_payload.context = call->context;
grpc_slice path = grpc_empty_slice();
if (call->is_client) {
+ GRPC_STATS_INC_CLIENT_CALLS_CREATED();
GPR_ASSERT(args->add_initial_metadata_count <
MAX_SEND_EXTRA_METADATA_COUNT);
for (i = 0; i < args->add_initial_metadata_count; i++) {
@@ -332,6 +331,8 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
call->send_extra_metadata_count =
static_cast<int>(args->add_initial_metadata_count);
} else {
+ GRPC_STATS_INC_SERVER_CALLS_CREATED();
+ call->final_op.server.server = args->server;
GPR_ASSERT(args->add_initial_metadata_count == 0);
call->send_extra_metadata_count = 0;
}
@@ -435,10 +436,18 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
&call->pollent);
}
- grpc_core::channelz::ChannelNode* channelz_channel =
- grpc_channel_get_channelz_node(call->channel);
- if (channelz_channel != nullptr) {
- channelz_channel->RecordCallStarted();
+ if (call->is_client) {
+ grpc_core::channelz::ChannelNode* channelz_channel =
+ grpc_channel_get_channelz_node(call->channel);
+ if (channelz_channel != nullptr) {
+ channelz_channel->RecordCallStarted();
+ }
+ } else {
+ grpc_core::channelz::ServerNode* channelz_server =
+ grpc_server_get_channelz_node(call->final_op.server.server);
+ if (channelz_server != nullptr) {
+ channelz_server->RecordCallStarted();
+ }
}
grpc_slice_unref_internal(path);
@@ -709,14 +718,15 @@ static void set_final_status(grpc_call* call, grpc_error* error) {
} else {
*call->final_op.server.cancelled =
error != GRPC_ERROR_NONE || call->status_error != GRPC_ERROR_NONE;
- /* TODO(ncteisen) : Update channelz handling for server
- if (channelz_channel != nullptr) {
+ grpc_core::channelz::ServerNode* channelz_server =
+ grpc_server_get_channelz_node(call->final_op.server.server);
+ if (channelz_server != nullptr) {
if (*call->final_op.server.cancelled) {
- channelz_channel->RecordCallFailed();
+ channelz_server->RecordCallFailed();
} else {
- channelz_channel->RecordCallSucceeded();
+ channelz_server->RecordCallSucceeded();
}
- } */
+ }
GRPC_ERROR_UNREF(error);
}
}
diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h
index b3b06059d4..b34260505a 100644
--- a/src/core/lib/surface/call.h
+++ b/src/core/lib/surface/call.h
@@ -33,6 +33,7 @@ typedef void (*grpc_ioreq_completion_func)(grpc_call* call, int success,
typedef struct grpc_call_create_args {
grpc_channel* channel;
+ grpc_server* server;
grpc_call* parent;
uint32_t propagation_mask;
diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc
index 895ecaf6c8..054fe105c3 100644
--- a/src/core/lib/surface/channel.cc
+++ b/src/core/lib/surface/channel.cc
@@ -165,10 +165,11 @@ grpc_channel* grpc_channel_create_with_builder(
}
grpc_channel_args_destroy(args);
- if (channelz_enabled) {
- bool is_top_level_channel = channel->is_client && !internal_channel;
+ // we only need to do the channelz bookkeeping for clients here. The channelz
+ // bookkeeping for server channels occurs in src/core/lib/surface/server.cc
+ if (channelz_enabled && channel->is_client) {
channel->channelz_channel = channel_node_create_func(
- channel, channel_tracer_max_nodes, is_top_level_channel);
+ channel, channel_tracer_max_nodes, !internal_channel);
channel->channelz_channel->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("Channel created"));
diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc
index 521825ca69..c0fae0f140 100644
--- a/src/core/lib/surface/server.cc
+++ b/src/core/lib/surface/server.cc
@@ -222,6 +222,8 @@ struct grpc_server {
/** when did we print the last shutdown progress message */
gpr_timespec last_shutdown_message_time;
+
+ grpc_core::RefCountedPtr<grpc_core::channelz::ServerNode> channelz_server;
};
#define SERVER_FROM_CALL_ELEM(elem) \
@@ -367,6 +369,7 @@ static void server_ref(grpc_server* server) {
static void server_delete(grpc_server* server) {
registered_method* rm;
size_t i;
+ server->channelz_server.reset();
grpc_channel_args_destroy(server->channel_args);
gpr_mu_destroy(&server->mu_global);
gpr_mu_destroy(&server->mu_call);
@@ -796,6 +799,7 @@ static void accept_stream(void* cd, grpc_transport* transport,
args.channel = chand->channel;
args.server_transport_data = transport_server_data;
args.send_deadline = GRPC_MILLIS_INF_FUTURE;
+ args.server = chand->server;
grpc_call* call;
grpc_error* error = grpc_call_create(&args, &call);
grpc_call_element* elem =
@@ -960,6 +964,7 @@ void grpc_server_register_completion_queue(grpc_server* server,
}
grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
+ grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
grpc_server* server =
@@ -976,6 +981,20 @@ grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
server->channel_args = grpc_channel_args_copy(args);
+ const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_ENABLE_CHANNELZ);
+ if (grpc_channel_arg_get_bool(arg, false)) {
+ arg = grpc_channel_args_find(args,
+ GRPC_ARG_MAX_CHANNEL_TRACE_EVENTS_PER_NODE);
+ size_t trace_events_per_node =
+ grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
+ server->channelz_server =
+ grpc_core::MakeRefCounted<grpc_core::channelz::ServerNode>(
+ trace_events_per_node);
+ server->channelz_server->AddTraceEvent(
+ grpc_core::channelz::ChannelTrace::Severity::Info,
+ grpc_slice_from_static_string("Server created"));
+ }
+
return server;
}
@@ -1478,3 +1497,8 @@ int grpc_server_has_open_connections(grpc_server* server) {
gpr_mu_unlock(&server->mu_global);
return r;
}
+
+grpc_core::channelz::ServerNode* grpc_server_get_channelz_node(
+ grpc_server* server) {
+ return server->channelz_server.get();
+}
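For a server to get a ServerNode at all, channelz has to be enabled via channel args at creation time. A sketch with illustrative values (the two keys are the ones read in grpc_server_create above):

    grpc_arg args_vec[2];
    args_vec[0].type = GRPC_ARG_INTEGER;
    args_vec[0].key = const_cast<char*>(GRPC_ARG_ENABLE_CHANNELZ);
    args_vec[0].value.integer = 1;
    args_vec[1].type = GRPC_ARG_INTEGER;
    args_vec[1].key =
        const_cast<char*>(GRPC_ARG_MAX_CHANNEL_TRACE_EVENTS_PER_NODE);
    args_vec[1].value.integer = 10;  // illustrative per-node trace event cap
    grpc_channel_args channel_args = {2, args_vec};
    grpc_server* server = grpc_server_create(&channel_args, nullptr);
    // grpc_server_get_channelz_node(server) now returns a live ServerNode.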
diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h
index c617cc223e..0196743ff9 100644
--- a/src/core/lib/surface/server.h
+++ b/src/core/lib/surface/server.h
@@ -23,6 +23,7 @@
#include <grpc/grpc.h>
#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/transport/transport.h"
@@ -46,6 +47,9 @@ void grpc_server_setup_transport(grpc_server* server, grpc_transport* transport,
grpc_pollset* accepting_pollset,
const grpc_channel_args* args);
+grpc_core::channelz::ServerNode* grpc_server_get_channelz_node(
+ grpc_server* server);
+
const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server);
int grpc_server_has_open_connections(grpc_server* server);
diff --git a/src/cpp/server/channelz/channelz_service.cc b/src/cpp/server/channelz/channelz_service.cc
index e0ef4acd58..c055a9b1ba 100644
--- a/src/cpp/server/channelz/channelz_service.cc
+++ b/src/cpp/server/channelz/channelz_service.cc
@@ -32,6 +32,9 @@ Status ChannelzService::GetTopChannels(
ServerContext* unused, const channelz::v1::GetTopChannelsRequest* request,
channelz::v1::GetTopChannelsResponse* response) {
char* json_str = grpc_channelz_get_top_channels(request->start_channel_id());
+ if (json_str == nullptr) {
+ return Status(INTERNAL, "grpc_channelz_get_top_channels returned null");
+ }
google::protobuf::util::Status s =
google::protobuf::util::JsonStringToMessage(json_str, response);
gpr_free(json_str);
@@ -45,6 +48,9 @@ Status ChannelzService::GetChannel(
ServerContext* unused, const channelz::v1::GetChannelRequest* request,
channelz::v1::GetChannelResponse* response) {
char* json_str = grpc_channelz_get_channel(request->channel_id());
+ if (json_str == nullptr) {
+ return Status(NOT_FOUND, "No object found for that ChannelId");
+ }
google::protobuf::util::Status s =
google::protobuf::util::JsonStringToMessage(json_str, response);
gpr_free(json_str);
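For context, a hedged client-side sketch of hitting one of these endpoints through the generated channelz.v1 stub (stub creation elided):

    grpc::channelz::v1::GetTopChannelsRequest request;
    request.set_start_channel_id(0);
    grpc::channelz::v1::GetTopChannelsResponse response;
    grpc::ClientContext context;
    grpc::Status status = stub->GetTopChannels(&context, request, &response);
    // With the null checks above, a null core result now surfaces as an
    // INTERNAL status instead of crashing in JsonStringToMessage.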
diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc
index fc3db1bba7..bfda67d086 100644
--- a/src/cpp/server/health/default_health_check_service.cc
+++ b/src/cpp/server/health/default_health_check_service.cc
@@ -30,162 +30,29 @@
#include "src/cpp/server/health/health.pb.h"
namespace grpc {
-
-//
-// DefaultHealthCheckService
-//
-
-DefaultHealthCheckService::DefaultHealthCheckService() {
- services_map_[""].SetServingStatus(SERVING);
-}
-
-void DefaultHealthCheckService::SetServingStatus(
- const grpc::string& service_name, bool serving) {
- std::unique_lock<std::mutex> lock(mu_);
- services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING);
-}
-
-void DefaultHealthCheckService::SetServingStatus(bool serving) {
- const ServingStatus status = serving ? SERVING : NOT_SERVING;
- std::unique_lock<std::mutex> lock(mu_);
- for (auto& p : services_map_) {
- ServiceData& service_data = p.second;
- service_data.SetServingStatus(status);
- }
-}
-
-DefaultHealthCheckService::ServingStatus
-DefaultHealthCheckService::GetServingStatus(
- const grpc::string& service_name) const {
- std::lock_guard<std::mutex> lock(mu_);
- auto it = services_map_.find(service_name);
- if (it == services_map_.end()) {
- return NOT_FOUND;
- }
- const ServiceData& service_data = it->second;
- return service_data.GetServingStatus();
-}
-
-void DefaultHealthCheckService::RegisterCallHandler(
- const grpc::string& service_name,
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
- std::unique_lock<std::mutex> lock(mu_);
- ServiceData& service_data = services_map_[service_name];
- service_data.AddCallHandler(handler /* copies ref */);
- handler->SendHealth(std::move(handler), service_data.GetServingStatus());
-}
-
-void DefaultHealthCheckService::UnregisterCallHandler(
- const grpc::string& service_name,
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
- std::unique_lock<std::mutex> lock(mu_);
- auto it = services_map_.find(service_name);
- if (it == services_map_.end()) return;
- ServiceData& service_data = it->second;
- service_data.RemoveCallHandler(std::move(handler));
- if (service_data.Unused()) {
- services_map_.erase(it);
- }
-}
-
-DefaultHealthCheckService::HealthCheckServiceImpl*
-DefaultHealthCheckService::GetHealthCheckService(
- std::unique_ptr<ServerCompletionQueue> cq) {
- GPR_ASSERT(impl_ == nullptr);
- impl_.reset(new HealthCheckServiceImpl(this, std::move(cq)));
- return impl_.get();
-}
-
-//
-// DefaultHealthCheckService::ServiceData
-//
-
-void DefaultHealthCheckService::ServiceData::SetServingStatus(
- ServingStatus status) {
- status_ = status;
- for (auto& call_handler : call_handlers_) {
- call_handler->SendHealth(call_handler /* copies ref */, status);
- }
-}
-
-void DefaultHealthCheckService::ServiceData::AddCallHandler(
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
- call_handlers_.insert(std::move(handler));
-}
-
-void DefaultHealthCheckService::ServiceData::RemoveCallHandler(
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
- call_handlers_.erase(std::move(handler));
-}
-
-//
-// DefaultHealthCheckService::HealthCheckServiceImpl
-//
-
namespace {
const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
-const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch";
} // namespace
DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
- DefaultHealthCheckService* database,
- std::unique_ptr<ServerCompletionQueue> cq)
- : database_(database), cq_(std::move(cq)) {
- // Add Check() method.
- check_method_ = new internal::RpcServiceMethod(
- kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr);
- AddMethod(check_method_);
- // Add Watch() method.
- watch_method_ = new internal::RpcServiceMethod(
- kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr);
- AddMethod(watch_method_);
- // Create serving thread.
- thread_ = std::unique_ptr<::grpc_core::Thread>(
- new ::grpc_core::Thread("grpc_health_check_service", Serve, this));
-}
-
-DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
- // We will reach here after the server starts shutting down.
- shutdown_ = true;
- {
- std::unique_lock<std::mutex> lock(cq_shutdown_mu_);
- cq_->Shutdown();
- }
- thread_->Join();
-}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() {
- thread_->Start();
-}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) {
- HealthCheckServiceImpl* service =
- reinterpret_cast<HealthCheckServiceImpl*>(arg);
- // TODO(juanlishen): This is a workaround to wait for the cq to be ready.
- // Need to figure out why cq is not ready after service starts.
- gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_time_from_seconds(1, GPR_TIMESPAN)));
- CheckCallHandler::CreateAndStart(service->cq_.get(), service->database_,
- service);
- WatchCallHandler::CreateAndStart(service->cq_.get(), service->database_,
- service);
- void* tag;
- bool ok;
- while (true) {
- if (!service->cq_->Next(&tag, &ok)) {
- // The completion queue is shutting down.
- GPR_ASSERT(service->shutdown_);
- break;
- }
- auto* next_step = static_cast<CallableTag*>(tag);
- next_step->Run(ok);
- }
-}
-
-bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
- const ByteBuffer& request, grpc::string* service_name) {
+ DefaultHealthCheckService* service)
+ : service_(service), method_(nullptr) {
+ internal::MethodHandler* handler =
+ new internal::RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer,
+ ByteBuffer>(
+ std::mem_fn(&HealthCheckServiceImpl::Check), this);
+ method_ = new internal::RpcServiceMethod(
+ kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, handler);
+ AddMethod(method_);
+}
+
+Status DefaultHealthCheckService::HealthCheckServiceImpl::Check(
+ ServerContext* context, const ByteBuffer* request, ByteBuffer* response) {
+ // Decode request.
std::vector<Slice> slices;
- if (!request.Dump(&slices).ok()) return false;
+ if (!request->Dump(&slices).ok()) {
+ return Status(StatusCode::INVALID_ARGUMENT, "");
+ }
uint8_t* request_bytes = nullptr;
bool request_bytes_owned = false;
size_t request_size = 0;
@@ -197,13 +64,14 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
request_size = slices[0].size();
} else {
request_bytes_owned = true;
- request_bytes = static_cast<uint8_t*>(gpr_malloc(request.Length()));
+ request_bytes = static_cast<uint8_t*>(gpr_malloc(request->Length()));
uint8_t* copy_to = request_bytes;
for (size_t i = 0; i < slices.size(); i++) {
memcpy(copy_to, slices[i].begin(), slices[i].size());
copy_to += slices[i].size();
}
}
+
if (request_bytes != nullptr) {
pb_istream_t istream = pb_istream_from_buffer(request_bytes, request_size);
bool decode_status = pb_decode(
@@ -211,22 +79,26 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
if (request_bytes_owned) {
gpr_free(request_bytes);
}
- if (!decode_status) return false;
+ if (!decode_status) {
+ return Status(StatusCode::INVALID_ARGUMENT, "");
+ }
}
- *service_name = request_struct.has_service ? request_struct.service : "";
- return true;
-}
-bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
- ServingStatus status, ByteBuffer* response) {
+ // Check status from the associated default health checking service.
+ DefaultHealthCheckService::ServingStatus serving_status =
+ service_->GetServingStatus(
+ request_struct.has_service ? request_struct.service : "");
+ if (serving_status == DefaultHealthCheckService::NOT_FOUND) {
+ return Status(StatusCode::NOT_FOUND, "");
+ }
+
+ // Encode response.
grpc_health_v1_HealthCheckResponse response_struct;
response_struct.has_status = true;
response_struct.status =
- status == NOT_FOUND
- ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN
- : status == SERVING
- ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING
- : grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING;
+ serving_status == DefaultHealthCheckService::SERVING
+ ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING
+ : grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING;
pb_ostream_t ostream;
memset(&ostream, 0, sizeof(ostream));
pb_encode(&ostream, grpc_health_v1_HealthCheckResponse_fields,
@@ -236,282 +108,48 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
GRPC_SLICE_LENGTH(response_slice));
bool encode_status = pb_encode(
&ostream, grpc_health_v1_HealthCheckResponse_fields, &response_struct);
- if (!encode_status) return false;
+ if (!encode_status) {
+ return Status(StatusCode::INTERNAL, "Failed to encode response.");
+ }
Slice encoded_response(response_slice, Slice::STEAL_REF);
ByteBuffer response_buffer(&encoded_response, 1);
response->Swap(&response_buffer);
- return true;
-}
-
-//
-// DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler
-//
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
- CreateAndStart(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service) {
- std::shared_ptr<CallHandler> self =
- std::make_shared<CheckCallHandler>(cq, database, service);
- CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get());
- {
- std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_);
- if (service->shutdown_) return;
- // Request a Check() call.
- handler->next_ =
- CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler,
- std::placeholders::_1, std::placeholders::_2),
- std::move(self));
- service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_,
- &handler->writer_, cq, cq, &handler->next_);
- }
-}
-
-DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
- CheckCallHandler(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service)
- : cq_(cq), database_(database), service_(service), writer_(&ctx_) {}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
- OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
- if (!ok) {
- // The value of ok being false means that the server is shutting down.
- return;
- }
- // Spawn a new handler instance to serve the next new client. Every handler
- // instance will deallocate itself when it's done.
- CreateAndStart(cq_, database_, service_);
- // Process request.
- gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_,
- this);
- grpc::string service_name;
- grpc::Status status = Status::OK;
- ByteBuffer response;
- if (!service_->DecodeRequest(request_, &service_name)) {
- status = Status(StatusCode::INVALID_ARGUMENT, "");
- } else {
- ServingStatus serving_status = database_->GetServingStatus(service_name);
- if (serving_status == NOT_FOUND) {
- status = Status(StatusCode::NOT_FOUND, "service name unknown");
- } else if (!service_->EncodeResponse(serving_status, &response)) {
- status = Status(StatusCode::INTERNAL, "");
- }
- }
- // Send response.
- {
- std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
- if (!service_->shutdown_) {
- next_ =
- CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this,
- std::placeholders::_1, std::placeholders::_2),
- std::move(self));
- if (status.ok()) {
- writer_.Finish(response, status, &next_);
- } else {
- writer_.FinishWithError(status, &next_);
- }
- }
- }
-}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
- OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
- if (ok) {
- gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p",
- service_, this);
- }
-}
-
-//
-// DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler
-//
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- CreateAndStart(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service) {
- std::shared_ptr<CallHandler> self =
- std::make_shared<WatchCallHandler>(cq, database, service);
- WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get());
- {
- std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_);
- if (service->shutdown_) return;
- // Request AsyncNotifyWhenDone().
- handler->on_done_notified_ =
- CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler,
- std::placeholders::_1, std::placeholders::_2),
- self /* copies ref */);
- handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_);
- // Request a Watch() call.
- handler->next_ =
- CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler,
- std::placeholders::_1, std::placeholders::_2),
- std::move(self));
- service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_,
- &handler->stream_, cq, cq,
- &handler->next_);
- }
+ return Status::OK;
}
-DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- WatchCallHandler(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service)
- : cq_(cq),
- database_(database),
- service_(service),
- stream_(&ctx_),
- call_state_(WAITING_FOR_CALL) {}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
- if (ok) {
- call_state_ = CALL_RECEIVED;
- } else {
- // AsyncNotifyWhenDone() needs to be called before the call starts, but the
- // tag will not pop out if the call never starts (
- // https://github.com/grpc/grpc/issues/10136). So we need to manually
- // release the ownership of the handler in this case.
- GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
- }
- if (!ok || shutdown_) {
- // The value of ok being false means that the server is shutting down.
- Shutdown(std::move(self), "OnCallReceived");
- return;
- }
- // Spawn a new handler instance to serve the next new client. Every handler
- // instance will deallocate itself when it's done.
- CreateAndStart(cq_, database_, service_);
- // Parse request.
- if (!service_->DecodeRequest(request_, &service_name_)) {
- on_finish_done_ =
- CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
- std::placeholders::_1, std::placeholders::_2),
- std::move(self));
- stream_.Finish(Status(StatusCode::INVALID_ARGUMENT, ""), &on_finish_done_);
- call_state_ = FINISH_CALLED;
- return;
- }
- // Register the call for updates to the service.
- gpr_log(GPR_DEBUG,
- "[HCS %p] Health check watch started for service \"%s\" "
- "(handler: %p)",
- service_, service_name_.c_str(), this);
- database_->RegisterCallHandler(service_name_, std::move(self));
-}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) {
- std::unique_lock<std::mutex> lock(mu_);
- // If there's already a send in flight, cache the new status, and
- // we'll start a new send for it when the one in flight completes.
- if (send_in_flight_) {
- pending_status_ = status;
- return;
- }
- // Start a send.
- SendHealthLocked(std::move(self), status);
-}
-
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- SendHealthLocked(std::shared_ptr<CallHandler> self, ServingStatus status) {
- std::unique_lock<std::mutex> cq_lock(service_->cq_shutdown_mu_);
- if (service_->shutdown_) {
- cq_lock.release()->unlock();
- Shutdown(std::move(self), "SendHealthLocked");
- return;
- }
- send_in_flight_ = true;
- call_state_ = SEND_MESSAGE_PENDING;
- // Construct response.
- ByteBuffer response;
- if (!service_->EncodeResponse(status, &response)) {
- on_finish_done_ =
- CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
- std::placeholders::_1, std::placeholders::_2),
- std::move(self));
- stream_.Finish(Status(StatusCode::INTERNAL, ""), &on_finish_done_);
- return;
- }
- next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this,
- std::placeholders::_1, std::placeholders::_2),
- std::move(self));
- stream_.Write(response, &next_);
+DefaultHealthCheckService::DefaultHealthCheckService() {
+ services_map_.emplace("", true);
}
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok) {
- if (!ok || shutdown_) {
- Shutdown(std::move(self), "OnSendHealthDone");
- return;
- }
- call_state_ = CALL_RECEIVED;
- {
- std::unique_lock<std::mutex> lock(mu_);
- send_in_flight_ = false;
- // If we got a new status since we started the last send, start a
- // new send for it.
- if (pending_status_ != NOT_FOUND) {
- auto status = pending_status_;
- pending_status_ = NOT_FOUND;
- SendHealthLocked(std::move(self), status);
- }
- }
+void DefaultHealthCheckService::SetServingStatus(
+ const grpc::string& service_name, bool serving) {
+ std::lock_guard<std::mutex> lock(mu_);
+ services_map_[service_name] = serving;
}
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) {
- GPR_ASSERT(ok);
- done_notified_ = true;
- if (ctx_.IsCancelled()) {
- is_cancelled_ = true;
+void DefaultHealthCheckService::SetServingStatus(bool serving) {
+ std::lock_guard<std::mutex> lock(mu_);
+ for (auto iter = services_map_.begin(); iter != services_map_.end(); ++iter) {
+ iter->second = serving;
}
- gpr_log(GPR_DEBUG,
- "[HCS %p] Healt check call is notified done (handler: %p, "
- "is_cancelled: %d).",
- service_, this, static_cast<int>(is_cancelled_));
- Shutdown(std::move(self), "OnDoneNotified");
}
-// TODO(roth): This method currently assumes that there will be only one
-// thread polling the cq and invoking the corresponding callbacks. If
-// that changes, we will need to add synchronization here.
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- Shutdown(std::shared_ptr<CallHandler> self, const char* reason) {
- if (!shutdown_) {
- gpr_log(GPR_DEBUG,
- "[HCS %p] Shutting down the handler (service_name: \"%s\", "
- "handler: %p, reason: %s).",
- service_, service_name_.c_str(), this, reason);
- shutdown_ = true;
- }
- // OnCallReceived() may be called after OnDoneNotified(), so we need to
- // try to Finish() every time we are in Shutdown().
- if (call_state_ >= CALL_RECEIVED && call_state_ < FINISH_CALLED) {
- std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
- if (!service_->shutdown_) {
- on_finish_done_ =
- CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
- std::placeholders::_1, std::placeholders::_2),
- std::move(self));
- // TODO(juanlishen): Maybe add a message proto for the client to
- // explicitly cancel the stream so that we can return OK status in such
- // cases.
- stream_.Finish(Status::CANCELLED, &on_finish_done_);
- call_state_ = FINISH_CALLED;
- }
+DefaultHealthCheckService::ServingStatus
+DefaultHealthCheckService::GetServingStatus(
+ const grpc::string& service_name) const {
+ std::lock_guard<std::mutex> lock(mu_);
+ const auto& iter = services_map_.find(service_name);
+ if (iter == services_map_.end()) {
+ return NOT_FOUND;
}
+ return iter->second ? SERVING : NOT_SERVING;
}
-void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
- OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
- if (ok) {
- gpr_log(GPR_DEBUG,
- "[HCS %p] Health check call finished (service_name: \"%s\", "
- "handler: %p).",
- service_, service_name_.c_str(), this);
- }
+DefaultHealthCheckService::HealthCheckServiceImpl*
+DefaultHealthCheckService::GetHealthCheckService() {
+ GPR_ASSERT(impl_ == nullptr);
+ impl_.reset(new HealthCheckServiceImpl(this));
+ return impl_.get();
}
} // namespace grpc
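A sketch of how an application drives this simplified service through the public interface (the service name is illustrative; Server::GetHealthCheckService() is the accessor on grpc::Server):

    grpc::HealthCheckServiceInterface* health = server->GetHealthCheckService();
    if (health != nullptr) {
      health->SetServingStatus("grpc.example.Echo", true);  // per-service entry
      health->SetServingStatus(false);  // flips every registered entry
    }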
diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h
index edad594936..a1ce5aa64e 100644
--- a/src/cpp/server/health/default_health_check_service.h
+++ b/src/cpp/server/health/default_health_check_service.h
@@ -19,268 +19,42 @@
#ifndef GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H
#define GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H
-#include <atomic>
#include <mutex>
-#include <set>
-#include <grpc/support/log.h>
-#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
-#include <grpcpp/impl/codegen/async_generic_service.h>
-#include <grpcpp/impl/codegen/async_unary_call.h>
#include <grpcpp/impl/codegen/service_type.h>
#include <grpcpp/support/byte_buffer.h>
-#include "src/core/lib/gprpp/thd.h"
-
namespace grpc {
// Default implementation of HealthCheckServiceInterface. Server will create and
// own it.
class DefaultHealthCheckService final : public HealthCheckServiceInterface {
public:
- enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING };
-
// The service impl to register with the server.
class HealthCheckServiceImpl : public Service {
public:
- // Base class for call handlers.
- class CallHandler {
- public:
- virtual ~CallHandler() = default;
- virtual void SendHealth(std::shared_ptr<CallHandler> self,
- ServingStatus status) = 0;
- };
+ explicit HealthCheckServiceImpl(DefaultHealthCheckService* service);
- HealthCheckServiceImpl(DefaultHealthCheckService* database,
- std::unique_ptr<ServerCompletionQueue> cq);
-
- ~HealthCheckServiceImpl();
-
- void StartServingThread();
+ Status Check(ServerContext* context, const ByteBuffer* request,
+ ByteBuffer* response);
private:
- // A tag that can be called with a bool argument. It's tailored for
- // CallHandler's use. Before being used, it should be constructed with a
- // method of CallHandler and a shared pointer to the handler. The
- // shared pointer will be moved to the invoked function and the function
- // can only be invoked once. That makes ref counting of the handler easier,
- // because the shared pointer is not bound to the function and can be gone
- // once the invoked function returns (if not used any more).
- class CallableTag {
- public:
- using HandlerFunction =
- std::function<void(std::shared_ptr<CallHandler>, bool)>;
-
- CallableTag() {}
-
- CallableTag(HandlerFunction func, std::shared_ptr<CallHandler> handler)
- : handler_function_(std::move(func)), handler_(std::move(handler)) {
- GPR_ASSERT(handler_function_ != nullptr);
- GPR_ASSERT(handler_ != nullptr);
- }
-
- // Runs the tag. This should be called only once. The handler is no
- // longer owned by this tag after this method is invoked.
- void Run(bool ok) {
- GPR_ASSERT(handler_function_ != nullptr);
- GPR_ASSERT(handler_ != nullptr);
- handler_function_(std::move(handler_), ok);
- }
-
- // Releases and returns the shared pointer to the handler.
- std::shared_ptr<CallHandler> ReleaseHandler() {
- return std::move(handler_);
- }
-
- private:
- HandlerFunction handler_function_ = nullptr;
- std::shared_ptr<CallHandler> handler_;
- };
-
- // Call handler for Check method.
- // Each handler takes care of one call. It contains per-call data and it
- // will access the members of the parent class (i.e.,
- // DefaultHealthCheckService) for per-service health data.
- class CheckCallHandler : public CallHandler {
- public:
- // Instantiates a CheckCallHandler and requests the next health check
- // call. The handler object will manage its own lifetime, so no action is
- // needed from the caller any more regarding that object.
- static void CreateAndStart(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service);
-
- // This ctor is public because we want to use std::make_shared<> in
- // CreateAndStart(). This ctor shouldn't be used elsewhere.
- CheckCallHandler(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service);
-
- // Not used for Check.
- void SendHealth(std::shared_ptr<CallHandler> self,
- ServingStatus status) override {}
-
- private:
- // Called when we receive a call.
- // Spawns a new handler so that we can keep servicing future calls.
- void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
-
- // Called when Finish() is done.
- void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
-
- // The members passed down from HealthCheckServiceImpl.
- ServerCompletionQueue* cq_;
- DefaultHealthCheckService* database_;
- HealthCheckServiceImpl* service_;
-
- ByteBuffer request_;
- GenericServerAsyncResponseWriter writer_;
- ServerContext ctx_;
-
- CallableTag next_;
- };
-
- // Call handler for Watch method.
- // Each handler takes care of one call. It contains per-call data and it
- // will access the members of the parent class (i.e.,
- // DefaultHealthCheckService) for per-service health data.
- class WatchCallHandler : public CallHandler {
- public:
- // Instantiates a WatchCallHandler and requests the next health check
- // call. The handler object will manage its own lifetime, so no action is
- // needed from the caller any more regarding that object.
- static void CreateAndStart(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service);
-
- // This ctor is public because we want to use std::make_shared<> in
- // CreateAndStart(). This ctor shouldn't be used elsewhere.
- WatchCallHandler(ServerCompletionQueue* cq,
- DefaultHealthCheckService* database,
- HealthCheckServiceImpl* service);
-
- void SendHealth(std::shared_ptr<CallHandler> self,
- ServingStatus status) override;
-
- private:
- // Called when we receive a call.
- // Spawns a new handler so that we can keep servicing future calls.
- void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
-
- // Requires holding mu_.
- void SendHealthLocked(std::shared_ptr<CallHandler> self,
- ServingStatus status);
-
- // When sending a health result finishes.
- void OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok);
-
- // Called when Finish() is done.
- void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
-
- // Called when AsyncNotifyWhenDone() notifies us.
- void OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok);
-
- void Shutdown(std::shared_ptr<CallHandler> self, const char* reason);
-
- // The members passed down from HealthCheckServiceImpl.
- ServerCompletionQueue* cq_;
- DefaultHealthCheckService* database_;
- HealthCheckServiceImpl* service_;
-
- ByteBuffer request_;
- grpc::string service_name_;
- GenericServerAsyncWriter stream_;
- ServerContext ctx_;
-
- std::mutex mu_;
- bool send_in_flight_ = false; // Guarded by mu_.
- ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_.
-
- // The state of the RPC progress.
- enum CallState {
- WAITING_FOR_CALL,
- CALL_RECEIVED,
- SEND_MESSAGE_PENDING,
- FINISH_CALLED
- } call_state_;
-
- bool shutdown_ = false;
- bool done_notified_ = false;
- bool is_cancelled_ = false;
- CallableTag next_;
- CallableTag on_done_notified_;
- CallableTag on_finish_done_;
- };
-
- // Handles the incoming requests and drives the completion queue in a loop.
- static void Serve(void* arg);
-
- // Returns true on success.
- static bool DecodeRequest(const ByteBuffer& request,
- grpc::string* service_name);
- static bool EncodeResponse(ServingStatus status, ByteBuffer* response);
-
- // Needed to appease Windows compilers, which don't seem to allow
- // nested classes to access protected members in the parent's
- // superclass.
- using Service::RequestAsyncServerStreaming;
- using Service::RequestAsyncUnary;
-
- DefaultHealthCheckService* database_;
- std::unique_ptr<ServerCompletionQueue> cq_;
- internal::RpcServiceMethod* check_method_;
- internal::RpcServiceMethod* watch_method_;
-
- // To synchronize the operations related to shutdown state of cq_, so that
- // we don't enqueue new tags into cq_ after it is already shut down.
- std::mutex cq_shutdown_mu_;
- std::atomic_bool shutdown_{false};
- std::unique_ptr<::grpc_core::Thread> thread_;
+ const DefaultHealthCheckService* const service_;
+ internal::RpcServiceMethod* method_;
};
DefaultHealthCheckService();
-
void SetServingStatus(const grpc::string& service_name,
bool serving) override;
void SetServingStatus(bool serving) override;
-
+ enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING };
ServingStatus GetServingStatus(const grpc::string& service_name) const;
-
- HealthCheckServiceImpl* GetHealthCheckService(
- std::unique_ptr<ServerCompletionQueue> cq);
+ HealthCheckServiceImpl* GetHealthCheckService();
private:
- // Stores the current serving status of a service and any call
- // handlers registered for updates when the service's status changes.
- class ServiceData {
- public:
- void SetServingStatus(ServingStatus status);
- ServingStatus GetServingStatus() const { return status_; }
- void AddCallHandler(
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
- void RemoveCallHandler(
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
- bool Unused() const {
- return call_handlers_.empty() && status_ == NOT_FOUND;
- }
-
- private:
- ServingStatus status_ = NOT_FOUND;
- std::set<std::shared_ptr<HealthCheckServiceImpl::CallHandler>>
- call_handlers_;
- };
-
- void RegisterCallHandler(
- const grpc::string& service_name,
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
-
- void UnregisterCallHandler(
- const grpc::string& service_name,
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
-
mutable std::mutex mu_;
- std::map<grpc::string, ServiceData> services_map_; // Guarded by mu_.
+ std::map<grpc::string, bool> services_map_;
std::unique_ptr<HealthCheckServiceImpl> impl_;
};
diff --git a/src/cpp/server/health/health.pb.c b/src/cpp/server/health/health.pb.c
index 5c214c7160..09bd98a3d9 100644
--- a/src/cpp/server/health/health.pb.c
+++ b/src/cpp/server/health/health.pb.c
@@ -2,6 +2,7 @@
/* Generated by nanopb-0.3.7-dev */
#include "src/cpp/server/health/health.pb.h"
+
/* @@protoc_insertion_point(includes) */
#if PB_PROTO_HEADER_VERSION != 30
#error Regenerate this file with the current version of nanopb generator.
diff --git a/src/cpp/server/health/health.pb.h b/src/cpp/server/health/health.pb.h
index 9d54ccd618..29e1f3bacb 100644
--- a/src/cpp/server/health/health.pb.h
+++ b/src/cpp/server/health/health.pb.h
@@ -17,12 +17,11 @@ extern "C" {
typedef enum _grpc_health_v1_HealthCheckResponse_ServingStatus {
grpc_health_v1_HealthCheckResponse_ServingStatus_UNKNOWN = 0,
grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING = 1,
- grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING = 2,
- grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN = 3
+ grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING = 2
} grpc_health_v1_HealthCheckResponse_ServingStatus;
#define _grpc_health_v1_HealthCheckResponse_ServingStatus_MIN grpc_health_v1_HealthCheckResponse_ServingStatus_UNKNOWN
-#define _grpc_health_v1_HealthCheckResponse_ServingStatus_MAX grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN
-#define _grpc_health_v1_HealthCheckResponse_ServingStatus_ARRAYSIZE ((grpc_health_v1_HealthCheckResponse_ServingStatus)(grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN+1))
+#define _grpc_health_v1_HealthCheckResponse_ServingStatus_MAX grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING
+#define _grpc_health_v1_HealthCheckResponse_ServingStatus_ARRAYSIZE ((grpc_health_v1_HealthCheckResponse_ServingStatus)(grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING+1))
/* Struct definitions */
typedef struct _grpc_health_v1_HealthCheckRequest {
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 48f0f110a6..b8ba7042d9 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -559,20 +559,16 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
// Only create default health check service when user did not provide an
// explicit one.
- ServerCompletionQueue* health_check_cq = nullptr;
- DefaultHealthCheckService::HealthCheckServiceImpl*
- default_health_check_service_impl = nullptr;
if (health_check_service_ == nullptr && !health_check_service_disabled_ &&
DefaultHealthCheckServiceEnabled()) {
- auto* default_hc_service = new DefaultHealthCheckService;
- health_check_service_.reset(default_hc_service);
- health_check_cq = new ServerCompletionQueue(GRPC_CQ_DEFAULT_POLLING);
- grpc_server_register_completion_queue(server_, health_check_cq->cq(),
- nullptr);
- default_health_check_service_impl =
- default_hc_service->GetHealthCheckService(
- std::unique_ptr<ServerCompletionQueue>(health_check_cq));
- RegisterService(nullptr, default_health_check_service_impl);
+ if (sync_server_cqs_ == nullptr || sync_server_cqs_->empty()) {
+ gpr_log(GPR_INFO,
+ "Default health check service disabled at async-only server.");
+ } else {
+ auto* default_hc_service = new DefaultHealthCheckService;
+ health_check_service_.reset(default_hc_service);
+ RegisterService(nullptr, default_hc_service->GetHealthCheckService());
+ }
}
grpc_server_start(server_);
@@ -587,9 +583,6 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
new UnimplementedAsyncRequest(this, cqs[i]);
}
}
- if (health_check_cq != nullptr) {
- new UnimplementedAsyncRequest(this, health_check_cq);
- }
}
// If this server has any support for synchronous methods (has any sync
@@ -602,10 +595,6 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Start();
}
-
- if (default_health_check_service_impl != nullptr) {
- default_health_check_service_impl->StartServingThread();
- }
}
void Server::ShutdownInternal(gpr_timespec deadline) {
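Since the default health check service is now sync-only, async-only servers that still want health checking should either register their own implementation or opt out of the default one explicitly; a sketch via the public grpcpp API:

    #include <grpcpp/health_check_service_interface.h>
    #include <grpcpp/server_builder.h>

    grpc::EnableDefaultHealthCheckService(false);  // opt out before building
    grpc::ServerBuilder builder;
    // ... register an application-provided health service instead.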
diff --git a/src/proto/grpc/health/v1/health.proto b/src/proto/grpc/health/v1/health.proto
index 38843ff1e7..4b4677b8a4 100644
--- a/src/proto/grpc/health/v1/health.proto
+++ b/src/proto/grpc/health/v1/health.proto
@@ -34,30 +34,10 @@ message HealthCheckResponse {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
- SERVICE_UNKNOWN = 3; // Used only by the Watch method.
}
ServingStatus status = 1;
}
service Health {
- // If the requested service is unknown, the call will fail with status
- // NOT_FOUND.
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
-
- // Performs a watch for the serving status of the requested service.
- // The server will immediately send back a message indicating the current
- // serving status. It will then subsequently send a new message whenever
- // the service's serving status changes.
- //
- // If the requested service is unknown when the call is received, the
- // server will send a message setting the serving status to
- // SERVICE_UNKNOWN but will *not* terminate the call. If at some
- // future point, the serving status of the service becomes known, the
- // server will send a new message with the service's serving status.
- //
- // If the call terminates with status UNIMPLEMENTED, then clients
- // should assume this method is not supported and should not retry the
- // call. If the call terminates with any other status (including OK),
- // clients should retry the call with appropriate exponential backoff.
- rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}
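With Watch removed, clients go back to polling the unary Check RPC; a hedged C++ sketch against the generated grpc.health.v1 stub (channel/stub setup elided):

    grpc::health::v1::HealthCheckRequest request;
    request.set_service("");  // empty name asks for overall server health
    grpc::health::v1::HealthCheckResponse response;
    grpc::ClientContext context;
    grpc::Status status = stub->Check(&context, request, &response);
    bool healthy = status.ok() &&
                   response.status() ==
                       grpc::health::v1::HealthCheckResponse::SERVING;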
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
index 25ba7aa275..0c46f6c85a 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
@@ -97,6 +97,7 @@ grpc_resource_quota_resize_type grpc_resource_quota_resize_import;
grpc_resource_quota_set_max_threads_type grpc_resource_quota_set_max_threads_import;
grpc_resource_quota_arg_vtable_type grpc_resource_quota_arg_vtable_import;
grpc_channelz_get_top_channels_type grpc_channelz_get_top_channels_import;
+grpc_channelz_get_servers_type grpc_channelz_get_servers_import;
grpc_channelz_get_channel_type grpc_channelz_get_channel_import;
grpc_channelz_get_subchannel_type grpc_channelz_get_subchannel_import;
grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_from_fd_import;
@@ -352,6 +353,7 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_resource_quota_set_max_threads_import = (grpc_resource_quota_set_max_threads_type) GetProcAddress(library, "grpc_resource_quota_set_max_threads");
grpc_resource_quota_arg_vtable_import = (grpc_resource_quota_arg_vtable_type) GetProcAddress(library, "grpc_resource_quota_arg_vtable");
grpc_channelz_get_top_channels_import = (grpc_channelz_get_top_channels_type) GetProcAddress(library, "grpc_channelz_get_top_channels");
+ grpc_channelz_get_servers_import = (grpc_channelz_get_servers_type) GetProcAddress(library, "grpc_channelz_get_servers");
grpc_channelz_get_channel_import = (grpc_channelz_get_channel_type) GetProcAddress(library, "grpc_channelz_get_channel");
grpc_channelz_get_subchannel_import = (grpc_channelz_get_subchannel_type) GetProcAddress(library, "grpc_channelz_get_subchannel");
grpc_insecure_channel_create_from_fd_import = (grpc_insecure_channel_create_from_fd_type) GetProcAddress(library, "grpc_insecure_channel_create_from_fd");
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index 30fcb79407..6adddb536c 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -266,6 +266,9 @@ extern grpc_resource_quota_arg_vtable_type grpc_resource_quota_arg_vtable_import
typedef char*(*grpc_channelz_get_top_channels_type)(intptr_t start_channel_id);
extern grpc_channelz_get_top_channels_type grpc_channelz_get_top_channels_import;
#define grpc_channelz_get_top_channels grpc_channelz_get_top_channels_import
+typedef char*(*grpc_channelz_get_servers_type)(intptr_t start_server_id);
+extern grpc_channelz_get_servers_type grpc_channelz_get_servers_import;
+#define grpc_channelz_get_servers grpc_channelz_get_servers_import
typedef char*(*grpc_channelz_get_channel_type)(intptr_t channel_id);
extern grpc_channelz_get_channel_type grpc_channelz_get_channel_import;
#define grpc_channelz_get_channel grpc_channelz_get_channel_import