aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport/chttp2/writing.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/transport/chttp2/writing.c')
-rw-r--r--src/core/transport/chttp2/writing.c94
1 files changed, 53 insertions, 41 deletions
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index d8ec117aa5..d39b0c42f7 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -44,6 +44,7 @@ int grpc_chttp2_unlocking_check_writes(
grpc_chttp2_transport_writing *transport_writing) {
grpc_chttp2_stream_global *stream_global;
grpc_chttp2_stream_writing *stream_writing;
+ grpc_chttp2_stream_global *first_reinserted_stream = NULL;
gpr_uint32 window_delta;
/* simple writes are queued to qbuf, and flushed here */
@@ -64,50 +65,54 @@ int grpc_chttp2_unlocking_check_writes(
}
/* for each grpc_chttp2_stream that's become writable, frame it's data
- (according to
- available window sizes) and add to the output buffer */
- while (grpc_chttp2_list_pop_writable_stream(transport_global,
- transport_writing, &stream_global,
- &stream_writing)) {
+ (according to available window sizes) and add to the output buffer */
+ while (grpc_chttp2_list_pop_writable_stream(
+ transport_global, transport_writing, &stream_global, &stream_writing)) {
+ if (stream_global == first_reinserted_stream) {
+ /* prevent infinite loop */
+ grpc_chttp2_list_add_first_writable_stream(transport_global,
+ stream_global);
+ break;
+ }
+
stream_writing->id = stream_global->id;
- window_delta = grpc_chttp2_preencode(
- stream_global->outgoing_sopb->ops, &stream_global->outgoing_sopb->nops,
- GPR_MIN(transport_global->outgoing_window,
- stream_global->outgoing_window),
- &stream_writing->sopb);
- GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(
- "write", transport_global, outgoing_window, -(gpr_int64)window_delta);
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global,
- outgoing_window, -(gpr_int64)window_delta);
- transport_global->outgoing_window -= window_delta;
- stream_global->outgoing_window -= window_delta;
-
- if (stream_global->write_state == GRPC_WRITE_STATE_QUEUED_CLOSE &&
- stream_global->outgoing_sopb->nops == 0) {
- if (!transport_global->is_client && !stream_global->read_closed) {
- stream_writing->send_closed = GRPC_SEND_CLOSED_WITH_RST_STREAM;
- } else {
- stream_writing->send_closed = GRPC_SEND_CLOSED;
+ stream_writing->send_closed = GRPC_DONT_SEND_CLOSED;
+ GPR_ASSERT(!stream_global->writing_now);
+
+ if (stream_global->outgoing_sopb) {
+ window_delta =
+ grpc_chttp2_preencode(stream_global->outgoing_sopb->ops,
+ &stream_global->outgoing_sopb->nops,
+ GPR_MIN(transport_global->outgoing_window,
+ stream_global->outgoing_window),
+ &stream_writing->sopb);
+ GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(
+ "write", transport_global, outgoing_window, -(gpr_int64)window_delta);
+ GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global,
+ outgoing_window,
+ -(gpr_int64)window_delta);
+ transport_global->outgoing_window -= window_delta;
+ stream_global->outgoing_window -= window_delta;
+
+ if (stream_global->write_state == GRPC_WRITE_STATE_QUEUED_CLOSE &&
+ stream_global->outgoing_sopb->nops == 0) {
+ if (!transport_global->is_client && !stream_global->read_closed) {
+ stream_writing->send_closed = GRPC_SEND_CLOSED_WITH_RST_STREAM;
+ } else {
+ stream_writing->send_closed = GRPC_SEND_CLOSED;
+ }
}
- }
- if (stream_writing->sopb.nops > 0 ||
- stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
- grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
- }
- if (stream_global->outgoing_window > 0 &&
- stream_global->outgoing_sopb->nops != 0) {
- grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ if (stream_global->outgoing_window > 0 &&
+ stream_global->outgoing_sopb->nops != 0) {
+ grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ if (first_reinserted_stream == NULL &&
+ transport_global->outgoing_window == 0) {
+ first_reinserted_stream = stream_global;
+ }
+ }
}
- }
- /* for each grpc_chttp2_stream that wants to update its window, add that
- * window here */
- while (grpc_chttp2_list_pop_writable_window_update_stream(transport_global,
- transport_writing,
- &stream_global,
- &stream_writing)) {
- stream_writing->id = stream_global->id;
if (!stream_global->read_closed && stream_global->unannounced_incoming_window > 0) {
stream_writing->announce_window = stream_global->unannounced_incoming_window;
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global,
@@ -118,6 +123,11 @@ int grpc_chttp2_unlocking_check_writes(
stream_global->unannounced_incoming_window = 0;
grpc_chttp2_list_add_incoming_window_updated(transport_global,
stream_global);
+ stream_global->writing_now = 1;
+ grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
+ } else if (stream_writing->sopb.nops > 0 ||
+ stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
+ stream_global->writing_now = 1;
grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
}
}
@@ -205,6 +215,8 @@ void grpc_chttp2_cleanup_writing(
while (grpc_chttp2_list_pop_written_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
+ GPR_ASSERT(stream_global->writing_now);
+ stream_global->writing_now = 0;
if (stream_global->outgoing_sopb != NULL &&
stream_global->outgoing_sopb->nops == 0) {
stream_global->outgoing_sopb = NULL;
@@ -216,9 +228,9 @@ void grpc_chttp2_cleanup_writing(
if (!transport_global->is_client) {
stream_global->read_closed = 1;
}
- grpc_chttp2_list_add_read_write_state_changed(transport_global,
- stream_global);
}
+ grpc_chttp2_list_add_read_write_state_changed(transport_global,
+ stream_global);
}
transport_writing->outbuf.count = 0;
transport_writing->outbuf.length = 0;