diff options
author | 2018-09-12 11:34:00 -0700 | |
---|---|---|
committer | 2018-09-18 14:57:02 -0700 | |
commit | bd9d97a2003322f523d54f7bc9fa89dff3e36ab5 (patch) | |
tree | d28d72744d0c31208e9db9a651479f804265ae91 /src | |
parent | e914bbac72e866db04600d123e37e0087a08d5f7 (diff) |
Channelz socket support
Diffstat (limited to 'src')
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.cc | 24 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/frame_data.cc | 1 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/internal.h | 7 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/parsing.cc | 6 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/writing.cc | 1 | ||||
-rw-r--r-- | src/core/lib/channel/channelz.cc | 112 | ||||
-rw-r--r-- | src/core/lib/channel/channelz.h | 29 | ||||
-rw-r--r-- | src/core/lib/channel/channelz_registry.cc | 18 |
8 files changed, 192 insertions, 6 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 26cad2cc9a..38032d56a6 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -335,6 +335,10 @@ static bool read_channel_args(grpc_chttp2_transport* t, GRPC_ARG_OPTIMIZATION_TARGET, channel_args->args[i].value.string); } + } else if (0 == + strcmp(channel_args->args[i].key, GRPC_ARG_ENABLE_CHANNELZ)) { + t->channelz_socket = + grpc_core::MakeRefCounted<grpc_core::channelz::SocketNode>(); } else { static const struct { const char* channel_arg_name; @@ -720,6 +724,14 @@ static void destroy_stream_locked(void* sp, grpc_error* error) { grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp); grpc_chttp2_transport* t = s->t; + if (t->channelz_socket != nullptr) { + if ((t->is_client && s->eos_received) || (!t->is_client && s->eos_sent)) { + t->channelz_socket->RecordStreamSucceeded(); + } else { + t->channelz_socket->RecordStreamFailed(); + } + } + GPR_ASSERT((s->write_closed && s->read_closed) || s->id == 0); if (s->id != 0) { GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, s->id) == nullptr); @@ -1407,6 +1419,9 @@ static void perform_stream_op_locked(void* stream_op, } if (op->send_initial_metadata) { + if (t->is_client && t->channelz_socket != nullptr) { + t->channelz_socket->RecordStreamStartedFromLocal(); + } GRPC_STATS_INC_HTTP2_OP_SEND_INITIAL_METADATA(); GPR_ASSERT(s->send_initial_metadata_finished == nullptr); on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; @@ -1492,6 +1507,9 @@ static void perform_stream_op_locked(void* stream_op, if (op->send_message) { GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE(); + if (t->channelz_socket != nullptr) { + t->channelz_socket->RecordMessageSent(); + } GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE( op->payload->send_message.send_message->length()); on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; @@ -1609,6 +1627,9 @@ static void perform_stream_op_locked(void* stream_op, if (op->recv_message) { GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE(); + if (t->channelz_socket != nullptr) { + t->channelz_socket->RecordMessageRecieved(); + } size_t before = 0; GPR_ASSERT(s->recv_message_ready == nullptr); GPR_ASSERT(!s->pending_byte_stream); @@ -2707,6 +2728,9 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) { if (error != GRPC_ERROR_NONE) { return; } + if (t->channelz_socket != nullptr) { + t->channelz_socket->RecordKeepaliveSent(); + } GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog"); grpc_timer_init(&t->keepalive_watchdog_timer, grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout, diff --git a/src/core/ext/transport/chttp2/transport/frame_data.cc b/src/core/ext/transport/chttp2/transport/frame_data.cc index f8f06f6789..15de879528 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.cc +++ b/src/core/ext/transport/chttp2/transport/frame_data.cc @@ -62,6 +62,7 @@ grpc_error* grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser* parser, if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) { s->received_last_frame = true; + s->eos_received = true; } else { s->received_last_frame = false; } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 6b5309bab4..bf0dfa98af 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -36,6 +36,7 @@ #include "src/core/ext/transport/chttp2/transport/hpack_parser.h" #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h" #include "src/core/ext/transport/chttp2/transport/stream_map.h" +#include "src/core/lib/channel/channelz.h" #include "src/core/lib/compression/stream_compression.h" #include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/combiner.h" @@ -471,6 +472,8 @@ struct grpc_chttp2_transport { bool keepalive_permit_without_calls; /** keep-alive state machine state */ grpc_chttp2_keepalive_state keepalive_state; + + grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> channelz_socket; }; typedef enum { @@ -534,6 +537,10 @@ struct grpc_chttp2_stream { /** Has trailing metadata been received. */ bool received_trailing_metadata; + /* have we sent or received the EOS bit? */ + bool eos_received; + bool eos_sent; + /** the error that resulted in this stream being read-closed */ grpc_error* read_closed_error; /** the error that resulted in this stream being write-closed */ diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc index 1e491d2ef8..0694ad743b 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.cc +++ b/src/core/ext/transport/chttp2/transport/parsing.cc @@ -598,6 +598,9 @@ static grpc_error* init_header_frame_parser(grpc_chttp2_transport* t, gpr_log(GPR_ERROR, "grpc_chttp2_stream not accepted")); return init_skip_frame_parser(t, 1); } + if (t->channelz_socket != nullptr) { + t->channelz_socket->RecordStreamStartedFromRemote(); + } } else { t->incoming_stream = s; } @@ -611,6 +614,9 @@ static grpc_error* init_header_frame_parser(grpc_chttp2_transport* t, } t->parser = grpc_chttp2_header_parser_parse; t->parser_data = &t->hpack_parser; + if (t->header_eof) { + s->eos_received = true; + } switch (s->header_frames_received) { case 0: if (t->is_client && t->header_eof) { diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index 8b73b01dea..5beaf5491e 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -569,6 +569,7 @@ class StreamWriteContext { void SentLastFrame() { s_->send_trailing_metadata = nullptr; s_->sent_trailing_metadata = true; + s_->eos_sent = true; if (!t_->is_client && !s_->read_closed) { grpc_slice_buffer_add( diff --git a/src/core/lib/channel/channelz.cc b/src/core/lib/channel/channelz.cc index 375cf25cc6..e1ab2ead62 100644 --- a/src/core/lib/channel/channelz.cc +++ b/src/core/lib/channel/channelz.cc @@ -81,11 +81,13 @@ void CallCountingHelper::PopulateCallCounts(grpc_json* json) { json_iterator = grpc_json_add_number_string_child( json, json_iterator, "callsFailed", calls_failed_); } - gpr_timespec ts = - grpc_millis_to_timespec(last_call_started_millis_, GPR_CLOCK_REALTIME); - json_iterator = - grpc_json_create_child(json_iterator, json, "lastCallStartedTimestamp", - gpr_format_timespec(ts), GRPC_JSON_STRING, true); + if (calls_started_ != 0) { + gpr_timespec ts = + grpc_millis_to_timespec(last_call_started_millis_, GPR_CLOCK_REALTIME); + json_iterator = + grpc_json_create_child(json_iterator, json, "lastCallStartedTimestamp", + gpr_format_timespec(ts), GRPC_JSON_STRING, true); + } } ChannelNode::ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes, @@ -184,5 +186,105 @@ grpc_json* ServerNode::RenderJson() { return top_level_json; } +void SocketNode::RecordStreamStartedFromLocal() { + gpr_atm_no_barrier_fetch_add(&streams_started_, (gpr_atm)1); + gpr_atm_no_barrier_store(&last_local_stream_created_millis_, + (gpr_atm)ExecCtx::Get()->Now()); +} + +void SocketNode::RecordStreamStartedFromRemote() { + gpr_atm_no_barrier_fetch_add(&streams_started_, (gpr_atm)1); + gpr_atm_no_barrier_store(&last_remote_stream_created_millis_, + (gpr_atm)ExecCtx::Get()->Now()); +} + +void SocketNode::RecordMessageSent() { + gpr_atm_no_barrier_fetch_add(&messages_sent_, (gpr_atm)1); + gpr_atm_no_barrier_store(&last_message_sent_millis_, + (gpr_atm)ExecCtx::Get()->Now()); +} + +void SocketNode::RecordMessageRecieved() { + gpr_atm_no_barrier_fetch_add(&messages_recieved_, (gpr_atm)1); + gpr_atm_no_barrier_store(&last_message_recieved_millis_, + (gpr_atm)ExecCtx::Get()->Now()); +} + +grpc_json* SocketNode::RenderJson() { + // We need to track these three json objects to build our object + grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); + grpc_json* json = top_level_json; + grpc_json* json_iterator = nullptr; + // create and fill the ref child + json_iterator = grpc_json_create_child(json_iterator, json, "ref", nullptr, + GRPC_JSON_OBJECT, false); + json = json_iterator; + json_iterator = nullptr; + json_iterator = grpc_json_add_number_string_child(json, json_iterator, + "socketId", uuid()); + // reset json iterators to top level object + json = top_level_json; + json_iterator = nullptr; + // create and fill the data child. + grpc_json* data = grpc_json_create_child(json_iterator, json, "data", nullptr, + GRPC_JSON_OBJECT, false); + json = data; + json_iterator = nullptr; + if (streams_started_ != 0) { + json_iterator = grpc_json_add_number_string_child( + json, json_iterator, "streamsStarted", streams_started_); + } + if (streams_succeeded_ != 0) { + json_iterator = grpc_json_add_number_string_child( + json, json_iterator, "streamsSucceeded", streams_succeeded_); + } + if (streams_failed_) { + json_iterator = grpc_json_add_number_string_child( + json, json_iterator, "streamsFailed", streams_failed_); + } + if (messages_sent_ != 0) { + json_iterator = grpc_json_add_number_string_child( + json, json_iterator, "messagesSent", messages_sent_); + } + if (messages_recieved_ != 0) { + json_iterator = grpc_json_add_number_string_child( + json, json_iterator, "messagesRecieved", messages_recieved_); + } + if (keepalives_sent_ != 0) { + json_iterator = grpc_json_add_number_string_child( + json, json_iterator, "keepAlivesSent", keepalives_sent_); + } + gpr_timespec ts; + if (streams_started_ != 0 && last_local_stream_created_millis_ != 0) { + ts = grpc_millis_to_timespec(last_local_stream_created_millis_, + GPR_CLOCK_REALTIME); + json_iterator = grpc_json_create_child( + json_iterator, json, "lastLocalStreamCreatedTimestamp", + gpr_format_timespec(ts), GRPC_JSON_STRING, true); + } + if (streams_started_ != 0 && last_remote_stream_created_millis_ != 0) { + ts = grpc_millis_to_timespec(last_remote_stream_created_millis_, + GPR_CLOCK_REALTIME); + json_iterator = grpc_json_create_child( + json_iterator, json, "lastRemoteStreamCreatedTimestamp", + gpr_format_timespec(ts), GRPC_JSON_STRING, true); + } + if (messages_sent_ != 0) { + ts = grpc_millis_to_timespec(last_message_sent_millis_, GPR_CLOCK_REALTIME); + json_iterator = + grpc_json_create_child(json_iterator, json, "lastMessageSentTimestamp", + gpr_format_timespec(ts), GRPC_JSON_STRING, true); + } + if (messages_recieved_ != 0) { + ts = grpc_millis_to_timespec(last_message_recieved_millis_, + GPR_CLOCK_REALTIME); + json_iterator = grpc_json_create_child( + json_iterator, json, "lastMessageRecievedTimestamp", + gpr_format_timespec(ts), GRPC_JSON_STRING, true); + } + json = top_level_json; + return top_level_json; +} + } // namespace channelz } // namespace grpc_core diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h index 9be256147b..2486820863 100644 --- a/src/core/lib/channel/channelz.h +++ b/src/core/lib/channel/channelz.h @@ -197,11 +197,38 @@ class ServerNode : public BaseNode { }; // Handles channelz bookkeeping for sockets -// TODO(ncteisen): implement in subsequent PR. class SocketNode : public BaseNode { public: SocketNode() : BaseNode(EntityType::kSocket) {} ~SocketNode() override {} + + grpc_json* RenderJson() override; + + void RecordStreamStartedFromLocal(); + void RecordStreamStartedFromRemote(); + void RecordStreamSucceeded() { + gpr_atm_no_barrier_fetch_add(&streams_succeeded_, (gpr_atm(1))); + } + void RecordStreamFailed() { + gpr_atm_no_barrier_fetch_add(&streams_failed_, (gpr_atm(1))); + } + void RecordMessageSent(); + void RecordMessageRecieved(); + void RecordKeepaliveSent() { + gpr_atm_no_barrier_fetch_add(&keepalives_sent_, (gpr_atm(1))); + } + + private: + gpr_atm streams_started_ = 0; + gpr_atm streams_succeeded_ = 0; + gpr_atm streams_failed_ = 0; + gpr_atm messages_sent_ = 0; + gpr_atm messages_recieved_ = 0; + gpr_atm keepalives_sent_ = 0; + gpr_atm last_local_stream_created_millis_ = 0; + gpr_atm last_remote_stream_created_millis_ = 0; + gpr_atm last_message_sent_millis_ = 0; + gpr_atm last_message_recieved_millis_ = 0; }; // Creation functions diff --git a/src/core/lib/channel/channelz_registry.cc b/src/core/lib/channel/channelz_registry.cc index adc7b6ba44..841f1c6104 100644 --- a/src/core/lib/channel/channelz_registry.cc +++ b/src/core/lib/channel/channelz_registry.cc @@ -197,3 +197,21 @@ char* grpc_channelz_get_subchannel(intptr_t subchannel_id) { grpc_json_destroy(top_level_json); return json_str; } + +char* grpc_channelz_get_socket(intptr_t socket_id) { + grpc_core::channelz::BaseNode* socket_node = + grpc_core::channelz::ChannelzRegistry::Get(socket_id); + if (socket_node == nullptr || + socket_node->type() != + grpc_core::channelz::BaseNode::EntityType::kSocket) { + return nullptr; + } + grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); + grpc_json* json = top_level_json; + grpc_json* socket_json = socket_node->RenderJson(); + socket_json->key = "socket"; + grpc_json_link_child(json, socket_json, nullptr); + char* json_str = grpc_json_dump_to_string(top_level_json, 0); + grpc_json_destroy(top_level_json); + return json_str; +} |