aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2018-11-29 01:26:45 -0800
committerGravatar Yash Tibrewal <yashkt@google.com>2018-11-29 01:26:45 -0800
commit9506d356740f8375f121e00e057a2eba41c97a98 (patch)
tree7ed16ae19b1478f2a13398205859ace41f47ef0c
parentfe4ef31ac28f702755c67cb0d79140bc9cbaa552 (diff)
Add a byte counter to chttp2_stream and use that for timestamps
-rw-r--r--src/core/ext/transport/chttp2/transport/context_list.cc1
-rw-r--r--src/core/ext/transport/chttp2/transport/context_list.h2
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h2
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.cc7
-rw-r--r--src/core/lib/iomgr/buffer_list.h2
-rw-r--r--test/core/transport/chttp2/context_list_test.cc12
6 files changed, 18 insertions, 8 deletions
diff --git a/src/core/ext/transport/chttp2/transport/context_list.cc b/src/core/ext/transport/chttp2/transport/context_list.cc
index 4acd0c9583..89e574ac67 100644
--- a/src/core/ext/transport/chttp2/transport/context_list.cc
+++ b/src/core/ext/transport/chttp2/transport/context_list.cc
@@ -32,6 +32,7 @@ void ContextList::Execute(void* arg, grpc_core::Timestamps* ts,
while (head != nullptr) {
if (error == GRPC_ERROR_NONE && ts != nullptr) {
if (write_timestamps_callback_g) {
+ ts->byte_offset = head->byte_offset_;
write_timestamps_callback_g(head->s_->context, ts);
}
}
diff --git a/src/core/ext/transport/chttp2/transport/context_list.h b/src/core/ext/transport/chttp2/transport/context_list.h
index 68d11e94d8..d870107749 100644
--- a/src/core/ext/transport/chttp2/transport/context_list.h
+++ b/src/core/ext/transport/chttp2/transport/context_list.h
@@ -50,6 +50,7 @@ class ContextList {
/* Create a new element in the list and add it at the front */
ContextList* elem = grpc_core::New<ContextList>();
elem->s_ = s;
+ elem->byte_offset_ = s->byte_counter;
elem->next_ = *head;
*head = elem;
}
@@ -61,6 +62,7 @@ class ContextList {
private:
grpc_chttp2_stream* s_ = nullptr;
ContextList* next_ = nullptr;
+ size_t byte_offset_ = 0;
};
void grpc_http2_set_write_timestamps_callback(
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index aeaa4935ad..6aa68f5d4a 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -646,6 +646,8 @@ struct grpc_chttp2_stream {
bool traced = false;
/** gRPC header bytes that are already decompressed */
size_t decompressed_header_bytes = 0;
+ /** Byte counter for number of bytes written */
+ size_t byte_counter = 0;
};
/** Transport writing call flow:
diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc
index 77320b496f..265d3365d3 100644
--- a/src/core/ext/transport/chttp2/transport/writing.cc
+++ b/src/core/ext/transport/chttp2/transport/writing.cc
@@ -363,6 +363,7 @@ class DataSendContext {
grpc_chttp2_encode_data(s_->id, &s_->compressed_data_buffer, send_bytes,
is_last_frame_, &s_->stats.outgoing, &t_->outbuf);
s_->flow_control->SentData(send_bytes);
+ s_->byte_counter += send_bytes;
if (s_->compressed_data_buffer.length == 0) {
s_->sending_bytes += s_->uncompressed_data_size;
}
@@ -488,9 +489,6 @@ class StreamWriteContext {
return; // early out: nothing to do
}
- if (s_->traced && grpc_endpoint_can_track_err(t_->ep)) {
- grpc_core::ContextList::Append(&t_->cl, s_);
- }
while ((s_->flow_controlled_buffer.length > 0 ||
s_->compressed_data_buffer.length > 0) &&
data_send_context.max_outgoing() > 0) {
@@ -500,6 +498,9 @@ class StreamWriteContext {
data_send_context.CompressMoreBytes();
}
}
+ if (s_->traced && grpc_endpoint_can_track_err(t_->ep)) {
+ grpc_core::ContextList::Append(&t_->cl, s_);
+ }
write_context_->ResetPingClock();
if (data_send_context.is_last_frame()) {
SentLastFrame();
diff --git a/src/core/lib/iomgr/buffer_list.h b/src/core/lib/iomgr/buffer_list.h
index 9f62d988cc..627f1bde99 100644
--- a/src/core/lib/iomgr/buffer_list.h
+++ b/src/core/lib/iomgr/buffer_list.h
@@ -38,7 +38,7 @@ struct Timestamps {
gpr_timespec sent_time;
gpr_timespec acked_time;
- uint32_t length; /* The length of the buffer traced */
+ uint32_t byte_offset; /* byte offset relative to the start of the RPC */
};
/** TracedBuffer is a class to keep track of timestamps for a specific buffer in
diff --git a/test/core/transport/chttp2/context_list_test.cc b/test/core/transport/chttp2/context_list_test.cc
index e2100899d3..d61f32985e 100644
--- a/test/core/transport/chttp2/context_list_test.cc
+++ b/test/core/transport/chttp2/context_list_test.cc
@@ -33,8 +33,12 @@
namespace grpc_core {
namespace testing {
namespace {
+
+const int kByteOffset = 123;
+
void TestExecuteFlushesListVerifier(void* arg, grpc_core::Timestamps* ts) {
- GPR_ASSERT(arg != nullptr);
+ ASSERT_NE(arg, nullptr);
+ EXPECT_EQ(ts->byte_offset, kByteOffset);
gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
gpr_atm_rel_store(done, static_cast<gpr_atm>(1));
}
@@ -43,7 +47,7 @@ void discard_write(grpc_slice slice) {}
/** Tests that all ContextList elements in the list are flushed out on
* execute.
- * Also tests that arg is passed correctly.
+ * Also tests that arg and byte_counter are passed correctly.
*/
TEST(ContextList, ExecuteFlushesList) {
grpc_core::ContextList* list = nullptr;
@@ -68,14 +72,14 @@ TEST(ContextList, ExecuteFlushesList) {
reinterpret_cast<grpc_stream*>(s[i]), &ref,
nullptr, nullptr);
s[i]->context = &verifier_called[i];
+ s[i]->byte_counter = kByteOffset;
gpr_atm_rel_store(&verifier_called[i], static_cast<gpr_atm>(0));
grpc_core::ContextList::Append(&list, s[i]);
}
grpc_core::Timestamps ts;
grpc_core::ContextList::Execute(list, &ts, GRPC_ERROR_NONE);
for (auto i = 0; i < kNumElems; i++) {
- GPR_ASSERT(gpr_atm_acq_load(&verifier_called[i]) ==
- static_cast<gpr_atm>(1));
+ EXPECT_EQ(gpr_atm_acq_load(&verifier_called[i]), static_cast<gpr_atm>(1));
grpc_transport_destroy_stream(reinterpret_cast<grpc_transport*>(t),
reinterpret_cast<grpc_stream*>(s[i]),
nullptr);