aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
authorGravatar ncteisen <ncteisen@gmail.com>2018-09-12 11:34:00 -0700
committerGravatar ncteisen <ncteisen@gmail.com>2018-09-18 14:57:02 -0700
commitbd9d97a2003322f523d54f7bc9fa89dff3e36ab5 (patch)
treed28d72744d0c31208e9db9a651479f804265ae91 /src/core/ext
parente914bbac72e866db04600d123e37e0087a08d5f7 (diff)
Channelz socket support
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.cc24
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_data.cc1
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h7
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.cc6
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.cc1
5 files changed, 39 insertions, 0 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index 26cad2cc9a..38032d56a6 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -335,6 +335,10 @@ static bool read_channel_args(grpc_chttp2_transport* t,
GRPC_ARG_OPTIMIZATION_TARGET,
channel_args->args[i].value.string);
}
+ } else if (0 ==
+ strcmp(channel_args->args[i].key, GRPC_ARG_ENABLE_CHANNELZ)) {
+ t->channelz_socket =
+ grpc_core::MakeRefCounted<grpc_core::channelz::SocketNode>();
} else {
static const struct {
const char* channel_arg_name;
@@ -720,6 +724,14 @@ static void destroy_stream_locked(void* sp, grpc_error* error) {
grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
grpc_chttp2_transport* t = s->t;
+ if (t->channelz_socket != nullptr) {
+ if ((t->is_client && s->eos_received) || (!t->is_client && s->eos_sent)) {
+ t->channelz_socket->RecordStreamSucceeded();
+ } else {
+ t->channelz_socket->RecordStreamFailed();
+ }
+ }
+
GPR_ASSERT((s->write_closed && s->read_closed) || s->id == 0);
if (s->id != 0) {
GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, s->id) == nullptr);
@@ -1407,6 +1419,9 @@ static void perform_stream_op_locked(void* stream_op,
}
if (op->send_initial_metadata) {
+ if (t->is_client && t->channelz_socket != nullptr) {
+ t->channelz_socket->RecordStreamStartedFromLocal();
+ }
GRPC_STATS_INC_HTTP2_OP_SEND_INITIAL_METADATA();
GPR_ASSERT(s->send_initial_metadata_finished == nullptr);
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
@@ -1492,6 +1507,9 @@ static void perform_stream_op_locked(void* stream_op,
if (op->send_message) {
GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE();
+ if (t->channelz_socket != nullptr) {
+ t->channelz_socket->RecordMessageSent();
+ }
GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE(
op->payload->send_message.send_message->length());
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
@@ -1609,6 +1627,9 @@ static void perform_stream_op_locked(void* stream_op,
if (op->recv_message) {
GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE();
+ if (t->channelz_socket != nullptr) {
+ t->channelz_socket->RecordMessageRecieved();
+ }
size_t before = 0;
GPR_ASSERT(s->recv_message_ready == nullptr);
GPR_ASSERT(!s->pending_byte_stream);
@@ -2707,6 +2728,9 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) {
if (error != GRPC_ERROR_NONE) {
return;
}
+ if (t->channelz_socket != nullptr) {
+ t->channelz_socket->RecordKeepaliveSent();
+ }
GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog");
grpc_timer_init(&t->keepalive_watchdog_timer,
grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout,
diff --git a/src/core/ext/transport/chttp2/transport/frame_data.cc b/src/core/ext/transport/chttp2/transport/frame_data.cc
index f8f06f6789..15de879528 100644
--- a/src/core/ext/transport/chttp2/transport/frame_data.cc
+++ b/src/core/ext/transport/chttp2/transport/frame_data.cc
@@ -62,6 +62,7 @@ grpc_error* grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser* parser,
if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
s->received_last_frame = true;
+ s->eos_received = true;
} else {
s->received_last_frame = false;
}
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 6b5309bab4..bf0dfa98af 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -36,6 +36,7 @@
#include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
#include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
#include "src/core/ext/transport/chttp2/transport/stream_map.h"
+#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/compression/stream_compression.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/combiner.h"
@@ -471,6 +472,8 @@ struct grpc_chttp2_transport {
bool keepalive_permit_without_calls;
/** keep-alive state machine state */
grpc_chttp2_keepalive_state keepalive_state;
+
+ grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> channelz_socket;
};
typedef enum {
@@ -534,6 +537,10 @@ struct grpc_chttp2_stream {
/** Has trailing metadata been received. */
bool received_trailing_metadata;
+ /* have we sent or received the EOS bit? */
+ bool eos_received;
+ bool eos_sent;
+
/** the error that resulted in this stream being read-closed */
grpc_error* read_closed_error;
/** the error that resulted in this stream being write-closed */
diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc
index 1e491d2ef8..0694ad743b 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.cc
+++ b/src/core/ext/transport/chttp2/transport/parsing.cc
@@ -598,6 +598,9 @@ static grpc_error* init_header_frame_parser(grpc_chttp2_transport* t,
gpr_log(GPR_ERROR, "grpc_chttp2_stream not accepted"));
return init_skip_frame_parser(t, 1);
}
+ if (t->channelz_socket != nullptr) {
+ t->channelz_socket->RecordStreamStartedFromRemote();
+ }
} else {
t->incoming_stream = s;
}
@@ -611,6 +614,9 @@ static grpc_error* init_header_frame_parser(grpc_chttp2_transport* t,
}
t->parser = grpc_chttp2_header_parser_parse;
t->parser_data = &t->hpack_parser;
+ if (t->header_eof) {
+ s->eos_received = true;
+ }
switch (s->header_frames_received) {
case 0:
if (t->is_client && t->header_eof) {
diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc
index 8b73b01dea..5beaf5491e 100644
--- a/src/core/ext/transport/chttp2/transport/writing.cc
+++ b/src/core/ext/transport/chttp2/transport/writing.cc
@@ -569,6 +569,7 @@ class StreamWriteContext {
void SentLastFrame() {
s_->send_trailing_metadata = nullptr;
s_->sent_trailing_metadata = true;
+ s_->eos_sent = true;
if (!t_->is_client && !s_->read_closed) {
grpc_slice_buffer_add(