aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2018-11-29 13:14:34 -0800
committerGravatar GitHub <noreply@github.com>2018-11-29 13:14:34 -0800
commit714a54aa43db1f21dacd383217ef202b50bf99e0 (patch)
treebaeeb756464a25582c8a2c2d2cb37da889d402ee
parentf0b99dd6b98545287ebf30aba4d12fd3072fcff4 (diff)
parent575da5118a2f02a9e3473077d4c934eda9c8cd40 (diff)
Merge pull request #17331 from yashykt/timestamplength
Add the byte offset for the RPC that is traced in Timestamps
-rw-r--r--src/core/ext/transport/chttp2/transport/context_list.cc1
-rw-r--r--src/core/ext/transport/chttp2/transport/context_list.h2
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h2
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.cc7
-rw-r--r--src/core/lib/iomgr/buffer_list.h4
-rw-r--r--src/core/lib/iomgr/tcp_posix.cc2
-rw-r--r--test/core/transport/chttp2/context_list_test.cc12
7 files changed, 21 insertions, 9 deletions
diff --git a/src/core/ext/transport/chttp2/transport/context_list.cc b/src/core/ext/transport/chttp2/transport/context_list.cc
index 4acd0c9583..f30d41c332 100644
--- a/src/core/ext/transport/chttp2/transport/context_list.cc
+++ b/src/core/ext/transport/chttp2/transport/context_list.cc
@@ -32,6 +32,7 @@ void ContextList::Execute(void* arg, grpc_core::Timestamps* ts,
while (head != nullptr) {
if (error == GRPC_ERROR_NONE && ts != nullptr) {
if (write_timestamps_callback_g) {
+ ts->byte_offset = static_cast<uint32_t>(head->byte_offset_);
write_timestamps_callback_g(head->s_->context, ts);
}
}
diff --git a/src/core/ext/transport/chttp2/transport/context_list.h b/src/core/ext/transport/chttp2/transport/context_list.h
index 68d11e94d8..d870107749 100644
--- a/src/core/ext/transport/chttp2/transport/context_list.h
+++ b/src/core/ext/transport/chttp2/transport/context_list.h
@@ -50,6 +50,7 @@ class ContextList {
/* Create a new element in the list and add it at the front */
ContextList* elem = grpc_core::New<ContextList>();
elem->s_ = s;
+ elem->byte_offset_ = s->byte_counter;
elem->next_ = *head;
*head = elem;
}
@@ -61,6 +62,7 @@ class ContextList {
private:
grpc_chttp2_stream* s_ = nullptr;
ContextList* next_ = nullptr;
+ size_t byte_offset_ = 0;
};
void grpc_http2_set_write_timestamps_callback(
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index aeaa4935ad..6aa68f5d4a 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -646,6 +646,8 @@ struct grpc_chttp2_stream {
bool traced = false;
/** gRPC header bytes that are already decompressed */
size_t decompressed_header_bytes = 0;
+ /** Byte counter for number of bytes written */
+ size_t byte_counter = 0;
};
/** Transport writing call flow:
diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc
index 77320b496f..265d3365d3 100644
--- a/src/core/ext/transport/chttp2/transport/writing.cc
+++ b/src/core/ext/transport/chttp2/transport/writing.cc
@@ -363,6 +363,7 @@ class DataSendContext {
grpc_chttp2_encode_data(s_->id, &s_->compressed_data_buffer, send_bytes,
is_last_frame_, &s_->stats.outgoing, &t_->outbuf);
s_->flow_control->SentData(send_bytes);
+ s_->byte_counter += send_bytes;
if (s_->compressed_data_buffer.length == 0) {
s_->sending_bytes += s_->uncompressed_data_size;
}
@@ -488,9 +489,6 @@ class StreamWriteContext {
return; // early out: nothing to do
}
- if (s_->traced && grpc_endpoint_can_track_err(t_->ep)) {
- grpc_core::ContextList::Append(&t_->cl, s_);
- }
while ((s_->flow_controlled_buffer.length > 0 ||
s_->compressed_data_buffer.length > 0) &&
data_send_context.max_outgoing() > 0) {
@@ -500,6 +498,9 @@ class StreamWriteContext {
data_send_context.CompressMoreBytes();
}
}
+ if (s_->traced && grpc_endpoint_can_track_err(t_->ep)) {
+ grpc_core::ContextList::Append(&t_->cl, s_);
+ }
write_context_->ResetPingClock();
if (data_send_context.is_last_frame()) {
SentLastFrame();
diff --git a/src/core/lib/iomgr/buffer_list.h b/src/core/lib/iomgr/buffer_list.h
index 87d74f9ce2..627f1bde99 100644
--- a/src/core/lib/iomgr/buffer_list.h
+++ b/src/core/lib/iomgr/buffer_list.h
@@ -37,6 +37,8 @@ struct Timestamps {
gpr_timespec scheduled_time;
gpr_timespec sent_time;
gpr_timespec acked_time;
+
+ uint32_t byte_offset; /* byte offset relative to the start of the RPC */
};
/** TracedBuffer is a class to keep track of timestamps for a specific buffer in
@@ -73,7 +75,7 @@ class TracedBuffer {
private:
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
- TracedBuffer(int seq_no, void* arg)
+ TracedBuffer(uint32_t seq_no, void* arg)
: seq_no_(seq_no), arg_(arg), next_(nullptr) {}
uint32_t seq_no_; /* The sequence number for the last byte in the buffer */
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc
index 68c61b1201..cfcb190d60 100644
--- a/src/core/lib/iomgr/tcp_posix.cc
+++ b/src/core/lib/iomgr/tcp_posix.cc
@@ -634,7 +634,7 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
if (sending_length == static_cast<size_t>(length)) {
gpr_mu_lock(&tcp->tb_mu);
grpc_core::TracedBuffer::AddNewEntry(
- &tcp->tb_head, static_cast<int>(tcp->bytes_counter + length),
+ &tcp->tb_head, static_cast<uint32_t>(tcp->bytes_counter + length),
tcp->outgoing_buffer_arg);
gpr_mu_unlock(&tcp->tb_mu);
tcp->outgoing_buffer_arg = nullptr;
diff --git a/test/core/transport/chttp2/context_list_test.cc b/test/core/transport/chttp2/context_list_test.cc
index e2100899d3..e64f1532ec 100644
--- a/test/core/transport/chttp2/context_list_test.cc
+++ b/test/core/transport/chttp2/context_list_test.cc
@@ -33,8 +33,12 @@
namespace grpc_core {
namespace testing {
namespace {
+
+const uint32_t kByteOffset = 123;
+
void TestExecuteFlushesListVerifier(void* arg, grpc_core::Timestamps* ts) {
- GPR_ASSERT(arg != nullptr);
+ ASSERT_NE(arg, nullptr);
+ EXPECT_EQ(ts->byte_offset, kByteOffset);
gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
gpr_atm_rel_store(done, static_cast<gpr_atm>(1));
}
@@ -43,7 +47,7 @@ void discard_write(grpc_slice slice) {}
/** Tests that all ContextList elements in the list are flushed out on
* execute.
- * Also tests that arg is passed correctly.
+ * Also tests that arg and byte_counter are passed correctly.
*/
TEST(ContextList, ExecuteFlushesList) {
grpc_core::ContextList* list = nullptr;
@@ -68,14 +72,14 @@ TEST(ContextList, ExecuteFlushesList) {
reinterpret_cast<grpc_stream*>(s[i]), &ref,
nullptr, nullptr);
s[i]->context = &verifier_called[i];
+ s[i]->byte_counter = kByteOffset;
gpr_atm_rel_store(&verifier_called[i], static_cast<gpr_atm>(0));
grpc_core::ContextList::Append(&list, s[i]);
}
grpc_core::Timestamps ts;
grpc_core::ContextList::Execute(list, &ts, GRPC_ERROR_NONE);
for (auto i = 0; i < kNumElems; i++) {
- GPR_ASSERT(gpr_atm_acq_load(&verifier_called[i]) ==
- static_cast<gpr_atm>(1));
+ EXPECT_EQ(gpr_atm_acq_load(&verifier_called[i]), static_cast<gpr_atm>(1));
grpc_transport_destroy_stream(reinterpret_cast<grpc_transport*>(t),
reinterpret_cast<grpc_stream*>(s[i]),
nullptr);