aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Noah Eisen <ncteisen@gmail.com>2018-09-10 13:54:23 -0700
committerGravatar GitHub <noreply@github.com>2018-09-10 13:54:23 -0700
commit107d10ea73f77dc9bb498c9b91e1fcd0188dfb45 (patch)
treecd281257ca549ec7726aa3e6e00a826e183f8c2b /src
parentcdb4ad333da599b0c6bf79ebd894662037162ca3 (diff)
parent722bc69966d1cf7149235fae6e6690d1a87fe28e (diff)
Merge pull request #16346 from ncteisen/channelz-server
Channelz Part 5: Server Support
Diffstat (limited to 'src')
-rw-r--r--src/core/lib/channel/channelz.cc38
-rw-r--r--src/core/lib/channel/channelz.h26
-rw-r--r--src/core/lib/channel/channelz_registry.cc40
-rw-r--r--src/core/lib/channel/channelz_registry.h7
-rw-r--r--src/core/lib/surface/call.cc38
-rw-r--r--src/core/lib/surface/call.h1
-rw-r--r--src/core/lib/surface/channel.cc7
-rw-r--r--src/core/lib/surface/server.cc24
-rw-r--r--src/core/lib/surface/server.h4
-rw-r--r--src/cpp/server/channelz/channelz_service.cc6
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.c2
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.h3
12 files changed, 175 insertions, 21 deletions
diff --git a/src/core/lib/channel/channelz.cc b/src/core/lib/channel/channelz.cc
index 9f54850002..375cf25cc6 100644
--- a/src/core/lib/channel/channelz.cc
+++ b/src/core/lib/channel/channelz.cc
@@ -48,6 +48,7 @@ BaseNode::~BaseNode() { ChannelzRegistry::Unregister(uuid_); }
char* BaseNode::RenderJsonString() {
grpc_json* json = RenderJson();
+ GPR_ASSERT(json != nullptr);
char* json_str = grpc_json_dump_to_string(json, 0);
grpc_json_destroy(json);
return json_str;
@@ -146,5 +147,42 @@ RefCountedPtr<ChannelNode> ChannelNode::MakeChannelNode(
channel, channel_tracer_max_nodes, is_top_level_channel);
}
+ServerNode::ServerNode(size_t channel_tracer_max_nodes)
+ : BaseNode(EntityType::kServer), trace_(channel_tracer_max_nodes) {}
+
+ServerNode::~ServerNode() {}
+
+grpc_json* ServerNode::RenderJson() {
+ // We need to track these three json objects to build our object
+ grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
+ grpc_json* json = top_level_json;
+ grpc_json* json_iterator = nullptr;
+ // create and fill the ref child
+ json_iterator = grpc_json_create_child(json_iterator, json, "ref", nullptr,
+ GRPC_JSON_OBJECT, false);
+ json = json_iterator;
+ json_iterator = nullptr;
+ json_iterator = grpc_json_add_number_string_child(json, json_iterator,
+ "serverId", uuid());
+ // reset json iterators to top level object
+ json = top_level_json;
+ json_iterator = nullptr;
+ // create and fill the data child.
+ grpc_json* data = grpc_json_create_child(json_iterator, json, "data", nullptr,
+ GRPC_JSON_OBJECT, false);
+ json = data;
+ json_iterator = nullptr;
+ // fill in the channel trace if applicable
+ grpc_json* trace_json = trace_.RenderJson();
+ if (trace_json != nullptr) {
+ trace_json->key = "trace"; // this object is named trace in channelz.proto
+ grpc_json_link_child(json, trace_json, nullptr);
+ }
+ // ask CallCountingHelper to populate trace and call count data.
+ call_counter_.PopulateCallCounts(json);
+ json = top_level_json;
+ return top_level_json;
+}
+
} // namespace channelz
} // namespace grpc_core
diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h
index db5d05140d..9be256147b 100644
--- a/src/core/lib/channel/channelz.h
+++ b/src/core/lib/channel/channelz.h
@@ -170,12 +170,30 @@ class ChannelNode : public BaseNode {
};
// Handles channelz bookkeeping for servers
-// TODO(ncteisen): implement in subsequent PR.
class ServerNode : public BaseNode {
public:
- explicit ServerNode(size_t channel_tracer_max_nodes)
- : BaseNode(EntityType::kServer) {}
- ~ServerNode() override {}
+ explicit ServerNode(size_t channel_tracer_max_nodes);
+ ~ServerNode() override;
+
+ grpc_json* RenderJson() override;
+
+ // proxy methods to composed classes.
+ void AddTraceEvent(ChannelTrace::Severity severity, grpc_slice data) {
+ trace_.AddTraceEvent(severity, data);
+ }
+ void AddTraceEventWithReference(ChannelTrace::Severity severity,
+ grpc_slice data,
+ RefCountedPtr<BaseNode> referenced_channel) {
+ trace_.AddTraceEventWithReference(severity, data,
+ std::move(referenced_channel));
+ }
+ void RecordCallStarted() { call_counter_.RecordCallStarted(); }
+ void RecordCallFailed() { call_counter_.RecordCallFailed(); }
+ void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); }
+
+ private:
+ CallCountingHelper call_counter_;
+ ChannelTrace trace_;
};
// Handles channelz bookkeeping for sockets
diff --git a/src/core/lib/channel/channelz_registry.cc b/src/core/lib/channel/channelz_registry.cc
index 972c61c1d6..adc7b6ba44 100644
--- a/src/core/lib/channel/channelz_registry.cc
+++ b/src/core/lib/channel/channelz_registry.cc
@@ -112,6 +112,42 @@ char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) {
return json_str;
}
+char* ChannelzRegistry::InternalGetServers(intptr_t start_server_id) {
+ grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
+ grpc_json* json = top_level_json;
+ grpc_json* json_iterator = nullptr;
+ InlinedVector<BaseNode*, 10> servers;
+ // uuids index into entities one-off (idx 0 is really uuid 1, since 0 is
+ // reserved). However, we want to support requests coming in with
+ // start_server_id=0, which signifies "give me everything."
+ size_t start_idx = start_server_id == 0 ? 0 : start_server_id - 1;
+ for (size_t i = start_idx; i < entities_.size(); ++i) {
+ if (entities_[i] != nullptr &&
+ entities_[i]->type() ==
+ grpc_core::channelz::BaseNode::EntityType::kServer) {
+ servers.push_back(entities_[i]);
+ }
+ }
+ if (!servers.empty()) {
+ // create list of servers
+ grpc_json* array_parent = grpc_json_create_child(
+ nullptr, json, "server", nullptr, GRPC_JSON_ARRAY, false);
+ for (size_t i = 0; i < servers.size(); ++i) {
+ grpc_json* server_json = servers[i]->RenderJson();
+ json_iterator =
+ grpc_json_link_child(array_parent, server_json, json_iterator);
+ }
+ }
+ // For now we do not have any pagination rules. In the future we could
+ // pick a constant for max_channels_sent for a GetServers request.
+ // Tracking: https://github.com/grpc/grpc/issues/16019.
+ json_iterator = grpc_json_create_child(nullptr, json, "end", nullptr,
+ GRPC_JSON_TRUE, false);
+ char* json_str = grpc_json_dump_to_string(top_level_json, 0);
+ grpc_json_destroy(top_level_json);
+ return json_str;
+}
+
} // namespace channelz
} // namespace grpc_core
@@ -120,6 +156,10 @@ char* grpc_channelz_get_top_channels(intptr_t start_channel_id) {
start_channel_id);
}
+char* grpc_channelz_get_servers(intptr_t start_server_id) {
+ return grpc_core::channelz::ChannelzRegistry::GetServers(start_server_id);
+}
+
char* grpc_channelz_get_channel(intptr_t channel_id) {
grpc_core::channelz::BaseNode* channel_node =
grpc_core::channelz::ChannelzRegistry::Get(channel_id);
diff --git a/src/core/lib/channel/channelz_registry.h b/src/core/lib/channel/channelz_registry.h
index 142c039220..d0d660600d 100644
--- a/src/core/lib/channel/channelz_registry.h
+++ b/src/core/lib/channel/channelz_registry.h
@@ -52,6 +52,12 @@ class ChannelzRegistry {
return Default()->InternalGetTopChannels(start_channel_id);
}
+ // Returns the allocated JSON string that represents the proto
+ // GetServersResponse as per channelz.proto.
+ static char* GetServers(intptr_t start_server_id) {
+ return Default()->InternalGetServers(start_server_id);
+ }
+
private:
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
@@ -74,6 +80,7 @@ class ChannelzRegistry {
BaseNode* InternalGet(intptr_t uuid);
char* InternalGetTopChannels(intptr_t start_channel_id);
+ char* InternalGetServers(intptr_t start_server_id);
// protects entities_ and uuid_
gpr_mu mu_;
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index b07c4d6c10..3e008e5606 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -48,6 +48,7 @@
#include "src/core/lib/surface/call_test_only.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/completion_queue.h"
+#include "src/core/lib/surface/server.h"
#include "src/core/lib/surface/validate_metadata.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata.h"
@@ -202,6 +203,8 @@ struct grpc_call {
} client;
struct {
int* cancelled;
+ // backpointer to owning server if this is a server side call.
+ grpc_server* server;
} server;
} final_op;
grpc_error* status_error;
@@ -311,14 +314,10 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
/* Always support no compression */
GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_MESSAGE_COMPRESS_NONE);
call->is_client = args->server_transport_data == nullptr;
- if (call->is_client) {
- GRPC_STATS_INC_CLIENT_CALLS_CREATED();
- } else {
- GRPC_STATS_INC_SERVER_CALLS_CREATED();
- }
call->stream_op_payload.context = call->context;
grpc_slice path = grpc_empty_slice();
if (call->is_client) {
+ GRPC_STATS_INC_CLIENT_CALLS_CREATED();
GPR_ASSERT(args->add_initial_metadata_count <
MAX_SEND_EXTRA_METADATA_COUNT);
for (i = 0; i < args->add_initial_metadata_count; i++) {
@@ -332,6 +331,8 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
call->send_extra_metadata_count =
static_cast<int>(args->add_initial_metadata_count);
} else {
+ GRPC_STATS_INC_SERVER_CALLS_CREATED();
+ call->final_op.server.server = args->server;
GPR_ASSERT(args->add_initial_metadata_count == 0);
call->send_extra_metadata_count = 0;
}
@@ -435,10 +436,18 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
&call->pollent);
}
- grpc_core::channelz::ChannelNode* channelz_channel =
- grpc_channel_get_channelz_node(call->channel);
- if (channelz_channel != nullptr) {
- channelz_channel->RecordCallStarted();
+ if (call->is_client) {
+ grpc_core::channelz::ChannelNode* channelz_channel =
+ grpc_channel_get_channelz_node(call->channel);
+ if (channelz_channel != nullptr) {
+ channelz_channel->RecordCallStarted();
+ }
+ } else {
+ grpc_core::channelz::ServerNode* channelz_server =
+ grpc_server_get_channelz_node(call->final_op.server.server);
+ if (channelz_server != nullptr) {
+ channelz_server->RecordCallStarted();
+ }
}
grpc_slice_unref_internal(path);
@@ -709,14 +718,15 @@ static void set_final_status(grpc_call* call, grpc_error* error) {
} else {
*call->final_op.server.cancelled =
error != GRPC_ERROR_NONE || call->status_error != GRPC_ERROR_NONE;
- /* TODO(ncteisen) : Update channelz handling for server
- if (channelz_channel != nullptr) {
+ grpc_core::channelz::ServerNode* channelz_server =
+ grpc_server_get_channelz_node(call->final_op.server.server);
+ if (channelz_server != nullptr) {
if (*call->final_op.server.cancelled) {
- channelz_channel->RecordCallFailed();
+ channelz_server->RecordCallFailed();
} else {
- channelz_channel->RecordCallSucceeded();
+ channelz_server->RecordCallSucceeded();
}
- } */
+ }
GRPC_ERROR_UNREF(error);
}
}
diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h
index b3b06059d4..b34260505a 100644
--- a/src/core/lib/surface/call.h
+++ b/src/core/lib/surface/call.h
@@ -33,6 +33,7 @@ typedef void (*grpc_ioreq_completion_func)(grpc_call* call, int success,
typedef struct grpc_call_create_args {
grpc_channel* channel;
+ grpc_server* server;
grpc_call* parent;
uint32_t propagation_mask;
diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc
index 895ecaf6c8..054fe105c3 100644
--- a/src/core/lib/surface/channel.cc
+++ b/src/core/lib/surface/channel.cc
@@ -165,10 +165,11 @@ grpc_channel* grpc_channel_create_with_builder(
}
grpc_channel_args_destroy(args);
- if (channelz_enabled) {
- bool is_top_level_channel = channel->is_client && !internal_channel;
+ // we only need to do the channelz bookkeeping for clients here. The channelz
+ // bookkeeping for server channels occurs in src/core/lib/surface/server.cc
+ if (channelz_enabled && channel->is_client) {
channel->channelz_channel = channel_node_create_func(
- channel, channel_tracer_max_nodes, is_top_level_channel);
+ channel, channel_tracer_max_nodes, !internal_channel);
channel->channelz_channel->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("Channel created"));
diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc
index 521825ca69..c0fae0f140 100644
--- a/src/core/lib/surface/server.cc
+++ b/src/core/lib/surface/server.cc
@@ -222,6 +222,8 @@ struct grpc_server {
/** when did we print the last shutdown progress message */
gpr_timespec last_shutdown_message_time;
+
+ grpc_core::RefCountedPtr<grpc_core::channelz::ServerNode> channelz_server;
};
#define SERVER_FROM_CALL_ELEM(elem) \
@@ -367,6 +369,7 @@ static void server_ref(grpc_server* server) {
static void server_delete(grpc_server* server) {
registered_method* rm;
size_t i;
+ server->channelz_server.reset();
grpc_channel_args_destroy(server->channel_args);
gpr_mu_destroy(&server->mu_global);
gpr_mu_destroy(&server->mu_call);
@@ -796,6 +799,7 @@ static void accept_stream(void* cd, grpc_transport* transport,
args.channel = chand->channel;
args.server_transport_data = transport_server_data;
args.send_deadline = GRPC_MILLIS_INF_FUTURE;
+ args.server = chand->server;
grpc_call* call;
grpc_error* error = grpc_call_create(&args, &call);
grpc_call_element* elem =
@@ -960,6 +964,7 @@ void grpc_server_register_completion_queue(grpc_server* server,
}
grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
+ grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
grpc_server* server =
@@ -976,6 +981,20 @@ grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
server->channel_args = grpc_channel_args_copy(args);
+ const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_ENABLE_CHANNELZ);
+ if (grpc_channel_arg_get_bool(arg, false)) {
+ arg = grpc_channel_args_find(args,
+ GRPC_ARG_MAX_CHANNEL_TRACE_EVENTS_PER_NODE);
+ size_t trace_events_per_node =
+ grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
+ server->channelz_server =
+ grpc_core::MakeRefCounted<grpc_core::channelz::ServerNode>(
+ trace_events_per_node);
+ server->channelz_server->AddTraceEvent(
+ grpc_core::channelz::ChannelTrace::Severity::Info,
+ grpc_slice_from_static_string("Server created"));
+ }
+
return server;
}
@@ -1478,3 +1497,8 @@ int grpc_server_has_open_connections(grpc_server* server) {
gpr_mu_unlock(&server->mu_global);
return r;
}
+
+grpc_core::channelz::ServerNode* grpc_server_get_channelz_node(
+ grpc_server* server) {
+ return server->channelz_server.get();
+}
diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h
index c617cc223e..0196743ff9 100644
--- a/src/core/lib/surface/server.h
+++ b/src/core/lib/surface/server.h
@@ -23,6 +23,7 @@
#include <grpc/grpc.h>
#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/transport/transport.h"
@@ -46,6 +47,9 @@ void grpc_server_setup_transport(grpc_server* server, grpc_transport* transport,
grpc_pollset* accepting_pollset,
const grpc_channel_args* args);
+grpc_core::channelz::ServerNode* grpc_server_get_channelz_node(
+ grpc_server* server);
+
const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server);
int grpc_server_has_open_connections(grpc_server* server);
diff --git a/src/cpp/server/channelz/channelz_service.cc b/src/cpp/server/channelz/channelz_service.cc
index e0ef4acd58..c055a9b1ba 100644
--- a/src/cpp/server/channelz/channelz_service.cc
+++ b/src/cpp/server/channelz/channelz_service.cc
@@ -32,6 +32,9 @@ Status ChannelzService::GetTopChannels(
ServerContext* unused, const channelz::v1::GetTopChannelsRequest* request,
channelz::v1::GetTopChannelsResponse* response) {
char* json_str = grpc_channelz_get_top_channels(request->start_channel_id());
+ if (json_str == nullptr) {
+ return Status(INTERNAL, "grpc_channelz_get_top_channels returned null");
+ }
google::protobuf::util::Status s =
google::protobuf::util::JsonStringToMessage(json_str, response);
gpr_free(json_str);
@@ -45,6 +48,9 @@ Status ChannelzService::GetChannel(
ServerContext* unused, const channelz::v1::GetChannelRequest* request,
channelz::v1::GetChannelResponse* response) {
char* json_str = grpc_channelz_get_channel(request->channel_id());
+ if (json_str == nullptr) {
+ return Status(NOT_FOUND, "No object found for that ChannelId");
+ }
google::protobuf::util::Status s =
google::protobuf::util::JsonStringToMessage(json_str, response);
gpr_free(json_str);
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
index 25ba7aa275..0c46f6c85a 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
@@ -97,6 +97,7 @@ grpc_resource_quota_resize_type grpc_resource_quota_resize_import;
grpc_resource_quota_set_max_threads_type grpc_resource_quota_set_max_threads_import;
grpc_resource_quota_arg_vtable_type grpc_resource_quota_arg_vtable_import;
grpc_channelz_get_top_channels_type grpc_channelz_get_top_channels_import;
+grpc_channelz_get_servers_type grpc_channelz_get_servers_import;
grpc_channelz_get_channel_type grpc_channelz_get_channel_import;
grpc_channelz_get_subchannel_type grpc_channelz_get_subchannel_import;
grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_from_fd_import;
@@ -352,6 +353,7 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_resource_quota_set_max_threads_import = (grpc_resource_quota_set_max_threads_type) GetProcAddress(library, "grpc_resource_quota_set_max_threads");
grpc_resource_quota_arg_vtable_import = (grpc_resource_quota_arg_vtable_type) GetProcAddress(library, "grpc_resource_quota_arg_vtable");
grpc_channelz_get_top_channels_import = (grpc_channelz_get_top_channels_type) GetProcAddress(library, "grpc_channelz_get_top_channels");
+ grpc_channelz_get_servers_import = (grpc_channelz_get_servers_type) GetProcAddress(library, "grpc_channelz_get_servers");
grpc_channelz_get_channel_import = (grpc_channelz_get_channel_type) GetProcAddress(library, "grpc_channelz_get_channel");
grpc_channelz_get_subchannel_import = (grpc_channelz_get_subchannel_type) GetProcAddress(library, "grpc_channelz_get_subchannel");
grpc_insecure_channel_create_from_fd_import = (grpc_insecure_channel_create_from_fd_type) GetProcAddress(library, "grpc_insecure_channel_create_from_fd");
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index 30fcb79407..6adddb536c 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -266,6 +266,9 @@ extern grpc_resource_quota_arg_vtable_type grpc_resource_quota_arg_vtable_import
typedef char*(*grpc_channelz_get_top_channels_type)(intptr_t start_channel_id);
extern grpc_channelz_get_top_channels_type grpc_channelz_get_top_channels_import;
#define grpc_channelz_get_top_channels grpc_channelz_get_top_channels_import
+typedef char*(*grpc_channelz_get_servers_type)(intptr_t start_channel_id);
+extern grpc_channelz_get_servers_type grpc_channelz_get_servers_import;
+#define grpc_channelz_get_servers grpc_channelz_get_servers_import
typedef char*(*grpc_channelz_get_channel_type)(intptr_t channel_id);
extern grpc_channelz_get_channel_type grpc_channelz_get_channel_import;
#define grpc_channelz_get_channel grpc_channelz_get_channel_import