diff options
author | Noah Eisen <ncteisen@gmail.com> | 2018-09-10 13:54:23 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-10 13:54:23 -0700 |
commit | 107d10ea73f77dc9bb498c9b91e1fcd0188dfb45 (patch) | |
tree | cd281257ca549ec7726aa3e6e00a826e183f8c2b /src | |
parent | cdb4ad333da599b0c6bf79ebd894662037162ca3 (diff) | |
parent | 722bc69966d1cf7149235fae6e6690d1a87fe28e (diff) |
Merge pull request #16346 from ncteisen/channelz-server
Channelz Part 5: Server Support
Diffstat (limited to 'src')
-rw-r--r-- | src/core/lib/channel/channelz.cc | 38 | ||||
-rw-r--r-- | src/core/lib/channel/channelz.h | 26 | ||||
-rw-r--r-- | src/core/lib/channel/channelz_registry.cc | 40 | ||||
-rw-r--r-- | src/core/lib/channel/channelz_registry.h | 7 | ||||
-rw-r--r-- | src/core/lib/surface/call.cc | 38 | ||||
-rw-r--r-- | src/core/lib/surface/call.h | 1 | ||||
-rw-r--r-- | src/core/lib/surface/channel.cc | 7 | ||||
-rw-r--r-- | src/core/lib/surface/server.cc | 24 | ||||
-rw-r--r-- | src/core/lib/surface/server.h | 4 | ||||
-rw-r--r-- | src/cpp/server/channelz/channelz_service.cc | 6 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_grpc_imports.generated.c | 2 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_grpc_imports.generated.h | 3 |
12 files changed, 175 insertions, 21 deletions
diff --git a/src/core/lib/channel/channelz.cc b/src/core/lib/channel/channelz.cc index 9f54850002..375cf25cc6 100644 --- a/src/core/lib/channel/channelz.cc +++ b/src/core/lib/channel/channelz.cc @@ -48,6 +48,7 @@ BaseNode::~BaseNode() { ChannelzRegistry::Unregister(uuid_); } char* BaseNode::RenderJsonString() { grpc_json* json = RenderJson(); + GPR_ASSERT(json != nullptr); char* json_str = grpc_json_dump_to_string(json, 0); grpc_json_destroy(json); return json_str; @@ -146,5 +147,42 @@ RefCountedPtr<ChannelNode> ChannelNode::MakeChannelNode( channel, channel_tracer_max_nodes, is_top_level_channel); } +ServerNode::ServerNode(size_t channel_tracer_max_nodes) + : BaseNode(EntityType::kServer), trace_(channel_tracer_max_nodes) {} + +ServerNode::~ServerNode() {} + +grpc_json* ServerNode::RenderJson() { + // We need to track these three json objects to build our object + grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); + grpc_json* json = top_level_json; + grpc_json* json_iterator = nullptr; + // create and fill the ref child + json_iterator = grpc_json_create_child(json_iterator, json, "ref", nullptr, + GRPC_JSON_OBJECT, false); + json = json_iterator; + json_iterator = nullptr; + json_iterator = grpc_json_add_number_string_child(json, json_iterator, + "serverId", uuid()); + // reset json iterators to top level object + json = top_level_json; + json_iterator = nullptr; + // create and fill the data child. + grpc_json* data = grpc_json_create_child(json_iterator, json, "data", nullptr, + GRPC_JSON_OBJECT, false); + json = data; + json_iterator = nullptr; + // fill in the channel trace if applicable + grpc_json* trace_json = trace_.RenderJson(); + if (trace_json != nullptr) { + trace_json->key = "trace"; // this object is named trace in channelz.proto + grpc_json_link_child(json, trace_json, nullptr); + } + // ask CallCountingHelper to populate trace and call count data. + call_counter_.PopulateCallCounts(json); + json = top_level_json; + return top_level_json; +} + } // namespace channelz } // namespace grpc_core diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h index db5d05140d..9be256147b 100644 --- a/src/core/lib/channel/channelz.h +++ b/src/core/lib/channel/channelz.h @@ -170,12 +170,30 @@ class ChannelNode : public BaseNode { }; // Handles channelz bookkeeping for servers -// TODO(ncteisen): implement in subsequent PR. class ServerNode : public BaseNode { public: - explicit ServerNode(size_t channel_tracer_max_nodes) - : BaseNode(EntityType::kServer) {} - ~ServerNode() override {} + explicit ServerNode(size_t channel_tracer_max_nodes); + ~ServerNode() override; + + grpc_json* RenderJson() override; + + // proxy methods to composed classes. + void AddTraceEvent(ChannelTrace::Severity severity, grpc_slice data) { + trace_.AddTraceEvent(severity, data); + } + void AddTraceEventWithReference(ChannelTrace::Severity severity, + grpc_slice data, + RefCountedPtr<BaseNode> referenced_channel) { + trace_.AddTraceEventWithReference(severity, data, + std::move(referenced_channel)); + } + void RecordCallStarted() { call_counter_.RecordCallStarted(); } + void RecordCallFailed() { call_counter_.RecordCallFailed(); } + void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); } + + private: + CallCountingHelper call_counter_; + ChannelTrace trace_; }; // Handles channelz bookkeeping for sockets diff --git a/src/core/lib/channel/channelz_registry.cc b/src/core/lib/channel/channelz_registry.cc index 972c61c1d6..adc7b6ba44 100644 --- a/src/core/lib/channel/channelz_registry.cc +++ b/src/core/lib/channel/channelz_registry.cc @@ -112,6 +112,42 @@ char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) { return json_str; } +char* ChannelzRegistry::InternalGetServers(intptr_t start_server_id) { + grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); + grpc_json* json = top_level_json; + grpc_json* json_iterator = nullptr; + InlinedVector<BaseNode*, 10> servers; + // uuids index into entities one-off (idx 0 is really uuid 1, since 0 is + // reserved). However, we want to support requests coming in with + // start_server_id=0, which signifies "give me everything." + size_t start_idx = start_server_id == 0 ? 0 : start_server_id - 1; + for (size_t i = start_idx; i < entities_.size(); ++i) { + if (entities_[i] != nullptr && + entities_[i]->type() == + grpc_core::channelz::BaseNode::EntityType::kServer) { + servers.push_back(entities_[i]); + } + } + if (!servers.empty()) { + // create list of servers + grpc_json* array_parent = grpc_json_create_child( + nullptr, json, "server", nullptr, GRPC_JSON_ARRAY, false); + for (size_t i = 0; i < servers.size(); ++i) { + grpc_json* server_json = servers[i]->RenderJson(); + json_iterator = + grpc_json_link_child(array_parent, server_json, json_iterator); + } + } + // For now we do not have any pagination rules. In the future we could + // pick a constant for max_channels_sent for a GetServers request. + // Tracking: https://github.com/grpc/grpc/issues/16019. + json_iterator = grpc_json_create_child(nullptr, json, "end", nullptr, + GRPC_JSON_TRUE, false); + char* json_str = grpc_json_dump_to_string(top_level_json, 0); + grpc_json_destroy(top_level_json); + return json_str; +} + } // namespace channelz } // namespace grpc_core @@ -120,6 +156,10 @@ char* grpc_channelz_get_top_channels(intptr_t start_channel_id) { start_channel_id); } +char* grpc_channelz_get_servers(intptr_t start_server_id) { + return grpc_core::channelz::ChannelzRegistry::GetServers(start_server_id); +} + char* grpc_channelz_get_channel(intptr_t channel_id) { grpc_core::channelz::BaseNode* channel_node = grpc_core::channelz::ChannelzRegistry::Get(channel_id); diff --git a/src/core/lib/channel/channelz_registry.h b/src/core/lib/channel/channelz_registry.h index 142c039220..d0d660600d 100644 --- a/src/core/lib/channel/channelz_registry.h +++ b/src/core/lib/channel/channelz_registry.h @@ -52,6 +52,12 @@ class ChannelzRegistry { return Default()->InternalGetTopChannels(start_channel_id); } + // Returns the allocated JSON string that represents the proto + // GetServersResponse as per channelz.proto. + static char* GetServers(intptr_t start_server_id) { + return Default()->InternalGetServers(start_server_id); + } + private: GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE @@ -74,6 +80,7 @@ class ChannelzRegistry { BaseNode* InternalGet(intptr_t uuid); char* InternalGetTopChannels(intptr_t start_channel_id); + char* InternalGetServers(intptr_t start_server_id); // protects entities_ and uuid_ gpr_mu mu_; diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index b07c4d6c10..3e008e5606 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -48,6 +48,7 @@ #include "src/core/lib/surface/call_test_only.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/completion_queue.h" +#include "src/core/lib/surface/server.h" #include "src/core/lib/surface/validate_metadata.h" #include "src/core/lib/transport/error_utils.h" #include "src/core/lib/transport/metadata.h" @@ -202,6 +203,8 @@ struct grpc_call { } client; struct { int* cancelled; + // backpointer to owning server if this is a server side call. + grpc_server* server; } server; } final_op; grpc_error* status_error; @@ -311,14 +314,10 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, /* Always support no compression */ GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_MESSAGE_COMPRESS_NONE); call->is_client = args->server_transport_data == nullptr; - if (call->is_client) { - GRPC_STATS_INC_CLIENT_CALLS_CREATED(); - } else { - GRPC_STATS_INC_SERVER_CALLS_CREATED(); - } call->stream_op_payload.context = call->context; grpc_slice path = grpc_empty_slice(); if (call->is_client) { + GRPC_STATS_INC_CLIENT_CALLS_CREATED(); GPR_ASSERT(args->add_initial_metadata_count < MAX_SEND_EXTRA_METADATA_COUNT); for (i = 0; i < args->add_initial_metadata_count; i++) { @@ -332,6 +331,8 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, call->send_extra_metadata_count = static_cast<int>(args->add_initial_metadata_count); } else { + GRPC_STATS_INC_SERVER_CALLS_CREATED(); + call->final_op.server.server = args->server; GPR_ASSERT(args->add_initial_metadata_count == 0); call->send_extra_metadata_count = 0; } @@ -435,10 +436,18 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, &call->pollent); } - grpc_core::channelz::ChannelNode* channelz_channel = - grpc_channel_get_channelz_node(call->channel); - if (channelz_channel != nullptr) { - channelz_channel->RecordCallStarted(); + if (call->is_client) { + grpc_core::channelz::ChannelNode* channelz_channel = + grpc_channel_get_channelz_node(call->channel); + if (channelz_channel != nullptr) { + channelz_channel->RecordCallStarted(); + } + } else { + grpc_core::channelz::ServerNode* channelz_server = + grpc_server_get_channelz_node(call->final_op.server.server); + if (channelz_server != nullptr) { + channelz_server->RecordCallStarted(); + } } grpc_slice_unref_internal(path); @@ -709,14 +718,15 @@ static void set_final_status(grpc_call* call, grpc_error* error) { } else { *call->final_op.server.cancelled = error != GRPC_ERROR_NONE || call->status_error != GRPC_ERROR_NONE; - /* TODO(ncteisen) : Update channelz handling for server - if (channelz_channel != nullptr) { + grpc_core::channelz::ServerNode* channelz_server = + grpc_server_get_channelz_node(call->final_op.server.server); + if (channelz_server != nullptr) { if (*call->final_op.server.cancelled) { - channelz_channel->RecordCallFailed(); + channelz_server->RecordCallFailed(); } else { - channelz_channel->RecordCallSucceeded(); + channelz_server->RecordCallSucceeded(); } - } */ + } GRPC_ERROR_UNREF(error); } } diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h index b3b06059d4..b34260505a 100644 --- a/src/core/lib/surface/call.h +++ b/src/core/lib/surface/call.h @@ -33,6 +33,7 @@ typedef void (*grpc_ioreq_completion_func)(grpc_call* call, int success, typedef struct grpc_call_create_args { grpc_channel* channel; + grpc_server* server; grpc_call* parent; uint32_t propagation_mask; diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc index 895ecaf6c8..054fe105c3 100644 --- a/src/core/lib/surface/channel.cc +++ b/src/core/lib/surface/channel.cc @@ -165,10 +165,11 @@ grpc_channel* grpc_channel_create_with_builder( } grpc_channel_args_destroy(args); - if (channelz_enabled) { - bool is_top_level_channel = channel->is_client && !internal_channel; + // we only need to do the channelz bookkeeping for clients here. The channelz + // bookkeeping for server channels occurs in src/core/lib/surface/server.cc + if (channelz_enabled && channel->is_client) { channel->channelz_channel = channel_node_create_func( - channel, channel_tracer_max_nodes, is_top_level_channel); + channel, channel_tracer_max_nodes, !internal_channel); channel->channelz_channel->AddTraceEvent( grpc_core::channelz::ChannelTrace::Severity::Info, grpc_slice_from_static_string("Channel created")); diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 521825ca69..c0fae0f140 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -222,6 +222,8 @@ struct grpc_server { /** when did we print the last shutdown progress message */ gpr_timespec last_shutdown_message_time; + + grpc_core::RefCountedPtr<grpc_core::channelz::ServerNode> channelz_server; }; #define SERVER_FROM_CALL_ELEM(elem) \ @@ -367,6 +369,7 @@ static void server_ref(grpc_server* server) { static void server_delete(grpc_server* server) { registered_method* rm; size_t i; + server->channelz_server.reset(); grpc_channel_args_destroy(server->channel_args); gpr_mu_destroy(&server->mu_global); gpr_mu_destroy(&server->mu_call); @@ -796,6 +799,7 @@ static void accept_stream(void* cd, grpc_transport* transport, args.channel = chand->channel; args.server_transport_data = transport_server_data; args.send_deadline = GRPC_MILLIS_INF_FUTURE; + args.server = chand->server; grpc_call* call; grpc_error* error = grpc_call_create(&args, &call); grpc_call_element* elem = @@ -960,6 +964,7 @@ void grpc_server_register_completion_queue(grpc_server* server, } grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) { + grpc_core::ExecCtx exec_ctx; GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved)); grpc_server* server = @@ -976,6 +981,20 @@ grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) { server->channel_args = grpc_channel_args_copy(args); + const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_ENABLE_CHANNELZ); + if (grpc_channel_arg_get_bool(arg, false)) { + arg = grpc_channel_args_find(args, + GRPC_ARG_MAX_CHANNEL_TRACE_EVENTS_PER_NODE); + size_t trace_events_per_node = + grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX}); + server->channelz_server = + grpc_core::MakeRefCounted<grpc_core::channelz::ServerNode>( + trace_events_per_node); + server->channelz_server->AddTraceEvent( + grpc_core::channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string("Server created")); + } + return server; } @@ -1478,3 +1497,8 @@ int grpc_server_has_open_connections(grpc_server* server) { gpr_mu_unlock(&server->mu_global); return r; } + +grpc_core::channelz::ServerNode* grpc_server_get_channelz_node( + grpc_server* server) { + return server->channelz_server.get(); +} diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h index c617cc223e..0196743ff9 100644 --- a/src/core/lib/surface/server.h +++ b/src/core/lib/surface/server.h @@ -23,6 +23,7 @@ #include <grpc/grpc.h> #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/channel/channelz.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/transport/transport.h" @@ -46,6 +47,9 @@ void grpc_server_setup_transport(grpc_server* server, grpc_transport* transport, grpc_pollset* accepting_pollset, const grpc_channel_args* args); +grpc_core::channelz::ServerNode* grpc_server_get_channelz_node( + grpc_server* server); + const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server); int grpc_server_has_open_connections(grpc_server* server); diff --git a/src/cpp/server/channelz/channelz_service.cc b/src/cpp/server/channelz/channelz_service.cc index e0ef4acd58..c055a9b1ba 100644 --- a/src/cpp/server/channelz/channelz_service.cc +++ b/src/cpp/server/channelz/channelz_service.cc @@ -32,6 +32,9 @@ Status ChannelzService::GetTopChannels( ServerContext* unused, const channelz::v1::GetTopChannelsRequest* request, channelz::v1::GetTopChannelsResponse* response) { char* json_str = grpc_channelz_get_top_channels(request->start_channel_id()); + if (json_str == nullptr) { + return Status(INTERNAL, "grpc_channelz_get_top_channels returned null"); + } google::protobuf::util::Status s = google::protobuf::util::JsonStringToMessage(json_str, response); gpr_free(json_str); @@ -45,6 +48,9 @@ Status ChannelzService::GetChannel( ServerContext* unused, const channelz::v1::GetChannelRequest* request, channelz::v1::GetChannelResponse* response) { char* json_str = grpc_channelz_get_channel(request->channel_id()); + if (json_str == nullptr) { + return Status(NOT_FOUND, "No object found for that ChannelId"); + } google::protobuf::util::Status s = google::protobuf::util::JsonStringToMessage(json_str, response); gpr_free(json_str); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index 25ba7aa275..0c46f6c85a 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -97,6 +97,7 @@ grpc_resource_quota_resize_type grpc_resource_quota_resize_import; grpc_resource_quota_set_max_threads_type grpc_resource_quota_set_max_threads_import; grpc_resource_quota_arg_vtable_type grpc_resource_quota_arg_vtable_import; grpc_channelz_get_top_channels_type grpc_channelz_get_top_channels_import; +grpc_channelz_get_servers_type grpc_channelz_get_servers_import; grpc_channelz_get_channel_type grpc_channelz_get_channel_import; grpc_channelz_get_subchannel_type grpc_channelz_get_subchannel_import; grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_from_fd_import; @@ -352,6 +353,7 @@ void grpc_rb_load_imports(HMODULE library) { grpc_resource_quota_set_max_threads_import = (grpc_resource_quota_set_max_threads_type) GetProcAddress(library, "grpc_resource_quota_set_max_threads"); grpc_resource_quota_arg_vtable_import = (grpc_resource_quota_arg_vtable_type) GetProcAddress(library, "grpc_resource_quota_arg_vtable"); grpc_channelz_get_top_channels_import = (grpc_channelz_get_top_channels_type) GetProcAddress(library, "grpc_channelz_get_top_channels"); + grpc_channelz_get_servers_import = (grpc_channelz_get_servers_type) GetProcAddress(library, "grpc_channelz_get_servers"); grpc_channelz_get_channel_import = (grpc_channelz_get_channel_type) GetProcAddress(library, "grpc_channelz_get_channel"); grpc_channelz_get_subchannel_import = (grpc_channelz_get_subchannel_type) GetProcAddress(library, "grpc_channelz_get_subchannel"); grpc_insecure_channel_create_from_fd_import = (grpc_insecure_channel_create_from_fd_type) GetProcAddress(library, "grpc_insecure_channel_create_from_fd"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index 30fcb79407..6adddb536c 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -266,6 +266,9 @@ extern grpc_resource_quota_arg_vtable_type grpc_resource_quota_arg_vtable_import typedef char*(*grpc_channelz_get_top_channels_type)(intptr_t start_channel_id); extern grpc_channelz_get_top_channels_type grpc_channelz_get_top_channels_import; #define grpc_channelz_get_top_channels grpc_channelz_get_top_channels_import +typedef char*(*grpc_channelz_get_servers_type)(intptr_t start_channel_id); +extern grpc_channelz_get_servers_type grpc_channelz_get_servers_import; +#define grpc_channelz_get_servers grpc_channelz_get_servers_import typedef char*(*grpc_channelz_get_channel_type)(intptr_t channel_id); extern grpc_channelz_get_channel_type grpc_channelz_get_channel_import; #define grpc_channelz_get_channel grpc_channelz_get_channel_import |