diff options
author | 2018-09-12 11:34:00 -0700 | |
---|---|---|
committer | 2018-09-18 14:57:02 -0700 | |
commit | bd9d97a2003322f523d54f7bc9fa89dff3e36ab5 (patch) | |
tree | d28d72744d0c31208e9db9a651479f804265ae91 /src/core/ext | |
parent | e914bbac72e866db04600d123e37e0087a08d5f7 (diff) |
Channelz socket support
Diffstat (limited to 'src/core/ext')
5 files changed, 39 insertions, 0 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 26cad2cc9a..38032d56a6 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -335,6 +335,10 @@ static bool read_channel_args(grpc_chttp2_transport* t, GRPC_ARG_OPTIMIZATION_TARGET, channel_args->args[i].value.string); } + } else if (0 == + strcmp(channel_args->args[i].key, GRPC_ARG_ENABLE_CHANNELZ)) { + t->channelz_socket = + grpc_core::MakeRefCounted<grpc_core::channelz::SocketNode>(); } else { static const struct { const char* channel_arg_name; @@ -720,6 +724,14 @@ static void destroy_stream_locked(void* sp, grpc_error* error) { grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp); grpc_chttp2_transport* t = s->t; + if (t->channelz_socket != nullptr) { + if ((t->is_client && s->eos_received) || (!t->is_client && s->eos_sent)) { + t->channelz_socket->RecordStreamSucceeded(); + } else { + t->channelz_socket->RecordStreamFailed(); + } + } + GPR_ASSERT((s->write_closed && s->read_closed) || s->id == 0); if (s->id != 0) { GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, s->id) == nullptr); @@ -1407,6 +1419,9 @@ static void perform_stream_op_locked(void* stream_op, } if (op->send_initial_metadata) { + if (t->is_client && t->channelz_socket != nullptr) { + t->channelz_socket->RecordStreamStartedFromLocal(); + } GRPC_STATS_INC_HTTP2_OP_SEND_INITIAL_METADATA(); GPR_ASSERT(s->send_initial_metadata_finished == nullptr); on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; @@ -1492,6 +1507,9 @@ static void perform_stream_op_locked(void* stream_op, if (op->send_message) { GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE(); + if (t->channelz_socket != nullptr) { + t->channelz_socket->RecordMessageSent(); + } GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE( op->payload->send_message.send_message->length()); on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; @@ -1609,6 +1627,9 @@ static void perform_stream_op_locked(void* stream_op, if (op->recv_message) { GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE(); + if (t->channelz_socket != nullptr) { + t->channelz_socket->RecordMessageRecieved(); + } size_t before = 0; GPR_ASSERT(s->recv_message_ready == nullptr); GPR_ASSERT(!s->pending_byte_stream); @@ -2707,6 +2728,9 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) { if (error != GRPC_ERROR_NONE) { return; } + if (t->channelz_socket != nullptr) { + t->channelz_socket->RecordKeepaliveSent(); + } GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog"); grpc_timer_init(&t->keepalive_watchdog_timer, grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout, diff --git a/src/core/ext/transport/chttp2/transport/frame_data.cc b/src/core/ext/transport/chttp2/transport/frame_data.cc index f8f06f6789..15de879528 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.cc +++ b/src/core/ext/transport/chttp2/transport/frame_data.cc @@ -62,6 +62,7 @@ grpc_error* grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser* parser, if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) { s->received_last_frame = true; + s->eos_received = true; } else { s->received_last_frame = false; } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 6b5309bab4..bf0dfa98af 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -36,6 +36,7 @@ #include "src/core/ext/transport/chttp2/transport/hpack_parser.h" #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h" #include "src/core/ext/transport/chttp2/transport/stream_map.h" +#include "src/core/lib/channel/channelz.h" #include "src/core/lib/compression/stream_compression.h" #include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/combiner.h" @@ -471,6 +472,8 @@ struct grpc_chttp2_transport { bool keepalive_permit_without_calls; /** keep-alive state machine state */ grpc_chttp2_keepalive_state keepalive_state; + + grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> channelz_socket; }; typedef enum { @@ -534,6 +537,10 @@ struct grpc_chttp2_stream { /** Has trailing metadata been received. */ bool received_trailing_metadata; + /* have we sent or received the EOS bit? */ + bool eos_received; + bool eos_sent; + /** the error that resulted in this stream being read-closed */ grpc_error* read_closed_error; /** the error that resulted in this stream being write-closed */ diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc index 1e491d2ef8..0694ad743b 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.cc +++ b/src/core/ext/transport/chttp2/transport/parsing.cc @@ -598,6 +598,9 @@ static grpc_error* init_header_frame_parser(grpc_chttp2_transport* t, gpr_log(GPR_ERROR, "grpc_chttp2_stream not accepted")); return init_skip_frame_parser(t, 1); } + if (t->channelz_socket != nullptr) { + t->channelz_socket->RecordStreamStartedFromRemote(); + } } else { t->incoming_stream = s; } @@ -611,6 +614,9 @@ static grpc_error* init_header_frame_parser(grpc_chttp2_transport* t, } t->parser = grpc_chttp2_header_parser_parse; t->parser_data = &t->hpack_parser; + if (t->header_eof) { + s->eos_received = true; + } switch (s->header_frames_received) { case 0: if (t->is_client && t->header_eof) { diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index 8b73b01dea..5beaf5491e 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -569,6 +569,7 @@ class StreamWriteContext { void SentLastFrame() { s_->send_trailing_metadata = nullptr; s_->sent_trailing_metadata = true; + s_->eos_sent = true; if (!t_->is_client && !s_->read_closed) { grpc_slice_buffer_add( |