aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2018-03-27 14:46:18 -0700
committerGravatar David Garcia Quintas <dgq@google.com>2018-03-27 14:46:18 -0700
commit158375a44007cfa3a07901d3a2035200a63c9e48 (patch)
tree43efac1051e80bda4e794cbb084f1928eefd5f92 /src/core/ext/transport
parent46511cd5d3488007b22480d64074feff0bf71010 (diff)
parentb984a27d90ab1d3ed206a5f7a32200f01e1f3a11 (diff)
Merge branch 'master' of github.com:grpc/grpc into authority_header
Diffstat (limited to 'src/core/ext/transport')
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.cc20
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h5
2 files changed, 21 insertions, 4 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index ab69cecf3a..dc4e002405 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -669,6 +669,7 @@ static int init_stream(grpc_transport* gt, grpc_stream* gs,
GRPC_CLOSURE_INIT(&s->complete_fetch_locked, complete_fetch_locked, s,
grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&s->unprocessed_incoming_frames_buffer);
+ s->unprocessed_incoming_frames_buffer_cached_length = 0;
grpc_slice_buffer_init(&s->frame_storage);
grpc_slice_buffer_init(&s->compressed_data_buffer);
grpc_slice_buffer_init(&s->decompressed_data_buffer);
@@ -1577,20 +1578,27 @@ static void perform_stream_op_locked(void* stream_op,
if (op->recv_message) {
GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE();
- size_t already_received;
+ size_t before = 0;
GPR_ASSERT(s->recv_message_ready == nullptr);
GPR_ASSERT(!s->pending_byte_stream);
s->recv_message_ready = op_payload->recv_message.recv_message_ready;
s->recv_message = op_payload->recv_message.recv_message;
if (s->id != 0) {
if (!s->read_closed) {
- already_received = s->frame_storage.length;
+ before = s->frame_storage.length +
+ s->unprocessed_incoming_frames_buffer.length;
+ }
+ }
+ grpc_chttp2_maybe_complete_recv_message(t, s);
+ if (s->id != 0) {
+ if (!s->read_closed && s->frame_storage.length == 0) {
+ size_t after = s->frame_storage.length +
+ s->unprocessed_incoming_frames_buffer_cached_length;
s->flow_control->IncomingByteStreamUpdate(GRPC_HEADER_SIZE_IN_BYTES,
- already_received);
+ before - after);
grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
}
}
- grpc_chttp2_maybe_complete_recv_message(t, s);
}
if (op->recv_trailing_metadata) {
@@ -1867,6 +1875,10 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
}
}
}
+ // save the length of the buffer before handing control back to application
+ // threads. Needed to support correct flow control bookkeeping
+ s->unprocessed_incoming_frames_buffer_cached_length =
+ s->unprocessed_incoming_frames_buffer.length;
if (error == GRPC_ERROR_NONE && *s->recv_message != nullptr) {
null_then_run_closure(&s->recv_message_ready, GRPC_ERROR_NONE);
} else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 6d11e5aa31..ca6e715978 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -550,6 +550,11 @@ struct grpc_chttp2_stream {
grpc_slice_buffer unprocessed_incoming_frames_buffer;
grpc_closure* on_next; /* protected by t combiner */
bool pending_byte_stream; /* protected by t combiner */
+ // cached length of buffer to be used by the transport thread in cases where
+ // stream->pending_byte_stream == true. The value is saved before
+ // application threads are allowed to modify
+ // unprocessed_incoming_frames_buffer
+ size_t unprocessed_incoming_frames_buffer_cached_length;
grpc_closure reset_byte_stream;
grpc_error* byte_stream_error; /* protected by t combiner */
bool received_last_frame; /* protected by t combiner */