diff options
Diffstat (limited to 'src/core/transport/chttp2')
-rw-r--r-- | src/core/transport/chttp2/frame_window_update.c | 4 | ||||
-rw-r--r-- | src/core/transport/chttp2/internal.h | 20 | ||||
-rw-r--r-- | src/core/transport/chttp2/parsing.c | 7 | ||||
-rw-r--r-- | src/core/transport/chttp2/stream_lists.c | 7 | ||||
-rw-r--r-- | src/core/transport/chttp2/writing.c | 49 |
5 files changed, 62 insertions, 25 deletions
diff --git a/src/core/transport/chttp2/frame_window_update.c b/src/core/transport/chttp2/frame_window_update.c index b817df7745..d624298ad2 100644 --- a/src/core/transport/chttp2/frame_window_update.c +++ b/src/core/transport/chttp2/frame_window_update.c @@ -94,8 +94,8 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse( } GPR_ASSERT(is_last); - if (transport_parsing->incoming_stream_id) { - if (stream_parsing) { + if (transport_parsing->incoming_stream_id != 0) { + if (stream_parsing != NULL) { GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("update", transport_parsing, stream_parsing, outgoing_window_update, p->amount); diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index bdd4b432eb..e5e6f445b7 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -353,7 +353,19 @@ typedef struct { /** window available for us to send to peer */ gpr_int64 outgoing_window; - /** window available for peer to send to us - updated after parse */ + /** The number of bytes the upper layers have offered to receive. + As the upper layer offers more bytes, this value increases. + As bytes are read, this value decreases. */ + gpr_uint32 max_recv_bytes; + /** The number of bytes the upper layer has offered to read but we have + not yet announced to HTTP2 flow control. + As the upper layers offer to read more bytes, this value increases. + As we advertise incoming flow control window, this value decreases. */ + gpr_uint32 unannounced_incoming_window; + /** The number of bytes of HTTP2 flow control we have advertised. + As we advertise incoming flow control window, this value increases. + As bytes are read, this value decreases. + Updated after parse. */ gpr_uint32 incoming_window; /** stream ops the transport user would like to send */ grpc_stream_op_buffer *outgoing_sopb; @@ -391,6 +403,8 @@ typedef struct { grpc_stream_op_buffer sopb; /** how strongly should we indicate closure with the next write */ grpc_chttp2_send_closed send_closed; + /** how much window should we announce? */ + gpr_uint32 announce_window; } grpc_chttp2_stream_writing; struct grpc_chttp2_stream_parsing { @@ -501,7 +515,9 @@ void grpc_chttp2_list_add_writable_window_update_stream( grpc_chttp2_stream_global *stream_global); int grpc_chttp2_list_pop_writable_window_update_stream( grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global **stream_global); + grpc_chttp2_transport_writing *transport_writing, + grpc_chttp2_stream_global **stream_global, + grpc_chttp2_stream_writing **stream_writing); void grpc_chttp2_list_remove_writable_window_update_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index 9597395aab..82362544d5 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -173,7 +173,14 @@ void grpc_chttp2_publish_reads( GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( "parsed", transport_parsing, stream_parsing, incoming_window_delta, -(gpr_int64)stream_parsing->incoming_window_delta); + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( + "parsed", transport_parsing, stream_global, max_recv_bytes, + -(gpr_int64)stream_parsing->incoming_window_delta); stream_global->incoming_window -= stream_parsing->incoming_window_delta; + GPR_ASSERT(stream_global->max_recv_bytes >= + stream_parsing->incoming_window_delta); + stream_global->max_recv_bytes -= + stream_parsing->incoming_window_delta; stream_parsing->incoming_window_delta = 0; grpc_chttp2_list_add_writable_window_update_stream(transport_global, stream_global); diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index 4fea058c19..590f6abfbc 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -139,6 +139,7 @@ static void stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s, void grpc_chttp2_list_add_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { + GPR_ASSERT(stream_global->id != 0); stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE); } @@ -204,6 +205,7 @@ int grpc_chttp2_list_pop_written_stream( void grpc_chttp2_list_add_writable_window_update_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { + GPR_ASSERT(stream_global->id != 0); stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE); @@ -211,11 +213,14 @@ void grpc_chttp2_list_add_writable_window_update_stream( int grpc_chttp2_list_pop_writable_window_update_stream( grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global **stream_global) { + grpc_chttp2_transport_writing *transport_writing, + grpc_chttp2_stream_global **stream_global, + grpc_chttp2_stream_writing **stream_writing) { grpc_chttp2_stream *stream; int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE); *stream_global = &stream->global; + *stream_writing = &stream->writing; return r; } diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index a78654334e..d8ec117aa5 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -66,11 +66,9 @@ int grpc_chttp2_unlocking_check_writes( /* for each grpc_chttp2_stream that's become writable, frame it's data (according to available window sizes) and add to the output buffer */ - while (transport_global->outgoing_window && - grpc_chttp2_list_pop_writable_stream(transport_global, + while (grpc_chttp2_list_pop_writable_stream(transport_global, transport_writing, &stream_global, - &stream_writing) && - stream_global->outgoing_window > 0) { + &stream_writing)) { stream_writing->id = stream_global->id; window_delta = grpc_chttp2_preencode( stream_global->outgoing_sopb->ops, &stream_global->outgoing_sopb->nops, @@ -106,20 +104,21 @@ int grpc_chttp2_unlocking_check_writes( /* for each grpc_chttp2_stream that wants to update its window, add that * window here */ while (grpc_chttp2_list_pop_writable_window_update_stream(transport_global, - &stream_global)) { - window_delta = - transport_global->settings[GRPC_LOCAL_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] - - stream_global->incoming_window; - if (!stream_global->read_closed && window_delta > 0) { - gpr_slice_buffer_add( - &transport_writing->outbuf, - grpc_chttp2_window_update_create(stream_global->id, window_delta)); + transport_writing, + &stream_global, + &stream_writing)) { + stream_writing->id = stream_global->id; + if (!stream_global->read_closed && stream_global->unannounced_incoming_window > 0) { + stream_writing->announce_window = stream_global->unannounced_incoming_window; GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global, - incoming_window, window_delta); - stream_global->incoming_window += window_delta; + incoming_window, stream_global->unannounced_incoming_window); + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global, + unannounced_incoming_window, -(gpr_int64)stream_global->unannounced_incoming_window); + stream_global->incoming_window += stream_global->unannounced_incoming_window; + stream_global->unannounced_incoming_window = 0; grpc_chttp2_list_add_incoming_window_updated(transport_global, stream_global); + grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); } } @@ -169,10 +168,19 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) { while ( grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) { - grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops, - stream_writing->send_closed != GRPC_DONT_SEND_CLOSED, - stream_writing->id, &transport_writing->hpack_compressor, - &transport_writing->outbuf); + if (stream_writing->sopb.nops > 0 || stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) { + grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops, + stream_writing->send_closed != GRPC_DONT_SEND_CLOSED, + stream_writing->id, &transport_writing->hpack_compressor, + &transport_writing->outbuf); + } + if (stream_writing->announce_window > 0) { + gpr_slice_buffer_add( + &transport_writing->outbuf, + grpc_chttp2_window_update_create( + stream_writing->id, stream_writing->announce_window)); + stream_writing->announce_window = 0; + } stream_writing->sopb.nops = 0; if (stream_writing->send_closed == GRPC_SEND_CLOSED_WITH_RST_STREAM) { gpr_slice_buffer_add(&transport_writing->outbuf, @@ -197,7 +205,8 @@ void grpc_chttp2_cleanup_writing( while (grpc_chttp2_list_pop_written_stream( transport_global, transport_writing, &stream_global, &stream_writing)) { - if (stream_global->outgoing_sopb->nops == 0) { + if (stream_global->outgoing_sopb != NULL && + stream_global->outgoing_sopb->nops == 0) { stream_global->outgoing_sopb = NULL; grpc_chttp2_schedule_closure(transport_global, stream_global->send_done_closure, 1); |