From 348f3a234fe6fbb74d2e340ac2df27d8c76b04ca Mon Sep 17 00:00:00 2001 From: yang-g Date: Wed, 27 Jan 2016 16:17:32 -0800 Subject: Use a separate list for streams stalled by transport in writing path --- src/core/transport/chttp2/internal.h | 12 +++++++++--- src/core/transport/chttp2/stream_lists.c | 18 ++++++++++++++++-- src/core/transport/chttp2/writing.c | 14 ++++++++------ 3 files changed, 33 insertions(+), 11 deletions(-) (limited to 'src/core/transport/chttp2') diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index a8262b7af2..ba7dcc65a5 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -67,6 +67,9 @@ typedef enum { GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING, GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_WRITING, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT, + /* streams waiting for the outgoing window in the writing path, they will be + * merged to the stalled list or writable list under transport lock. */ + GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT, /** streams that are waiting to start because there are too many concurrent streams on the connection */ GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY, @@ -504,11 +507,11 @@ void grpc_chttp2_publish_reads(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *global, grpc_chttp2_transport_parsing *parsing); -/** Get a writable stream - returns non-zero if there was a stream available */ void grpc_chttp2_list_add_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); +/** Get a writable stream + returns non-zero if there was a stream available */ int grpc_chttp2_list_pop_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing, @@ -560,9 +563,12 @@ int grpc_chttp2_list_pop_check_read_ops( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); -void grpc_chttp2_list_add_stalled_by_transport( +void grpc_chttp2_list_add_writing_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing); +void grpc_chttp2_list_flush_writing_stalled_by_transport( + grpc_chttp2_transport_writing *transport_writing); + int grpc_chttp2_list_pop_stalled_by_transport( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index 273a513e2f..41ddf8d291 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -313,12 +313,26 @@ int grpc_chttp2_list_pop_check_read_ops( return r; } -void grpc_chttp2_list_add_stalled_by_transport( +void grpc_chttp2_list_add_writing_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing) { stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), STREAM_FROM_WRITING(stream_writing), - GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); + GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT); +} + +void grpc_chttp2_list_flush_writing_stalled_by_transport( + grpc_chttp2_transport_writing *transport_writing) { + grpc_chttp2_stream *stream; + grpc_chttp2_transport *transport = TRANSPORT_FROM_WRITING(transport_writing); + while (stream_list_pop(transport, &stream, + GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT)) { + if (transport_writing->outgoing_window > 0) { + grpc_chttp2_list_add_writable_stream(&transport->global, &stream->global); + } else { + stream_list_add(transport, stream, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); + } + } } int grpc_chttp2_list_pop_stalled_by_transport( diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index fdad05b5fb..36916f10ce 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -130,8 +130,8 @@ int grpc_chttp2_unlocking_check_writes( GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); } } else { - grpc_chttp2_list_add_stalled_by_transport(transport_writing, - stream_writing); + grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing, + stream_writing); } } if (stream_global->send_trailing_metadata) { @@ -273,8 +273,8 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx, stream_writing->sent_message = 1; } } else if (transport_writing->outgoing_window == 0) { - grpc_chttp2_list_add_stalled_by_transport(transport_writing, - stream_writing); + grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing, + stream_writing); grpc_chttp2_list_add_written_stream(transport_writing, stream_writing); } } @@ -312,8 +312,8 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx, /* do nothing - already reffed */ } } else { - grpc_chttp2_list_add_stalled_by_transport(transport_writing, - stream_writing); + grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing, + stream_writing); grpc_chttp2_list_add_written_stream(transport_writing, stream_writing); } } else { @@ -330,6 +330,8 @@ void grpc_chttp2_cleanup_writing( grpc_chttp2_stream_writing *stream_writing; grpc_chttp2_stream_global *stream_global; + grpc_chttp2_list_flush_writing_stalled_by_transport(transport_writing); + while (grpc_chttp2_list_pop_written_stream( transport_global, transport_writing, &stream_global, &stream_writing)) { if (stream_writing->sent_initial_metadata) { -- cgit v1.2.3