diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/transport/chttp2/internal.h | 8 | ||||
-rw-r--r-- | src/core/transport/chttp2/parsing.c | 4 | ||||
-rw-r--r-- | src/core/transport/chttp2/stream_lists.c | 8 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 72 |
4 files changed, 48 insertions, 44 deletions
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 88eb790d12..7bd91334b4 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -62,7 +62,7 @@ typedef enum { GRPC_CHTTP2_LIST_WRITTEN, GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE, GRPC_CHTTP2_LIST_PARSING_SEEN, - GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_PARSING, + GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING, GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED, /** streams that are waiting to start because there are too many concurrent streams on the connection */ @@ -585,10 +585,10 @@ int grpc_chttp2_list_pop_waiting_for_concurrency( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); -void grpc_chttp2_list_add_cancelled_waiting_for_parsing( +void grpc_chttp2_list_add_closed_waiting_for_parsing( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); -int grpc_chttp2_list_pop_cancelled_waiting_for_parsing( +int grpc_chttp2_list_pop_closed_waiting_for_parsing( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); @@ -610,6 +610,8 @@ void grpc_chttp2_schedule_closure( grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream( grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id); +void grpc_chttp2_parsing_remove_stream( + grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id); grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id); diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index 1acf0a4b91..bd04acf60f 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -175,6 +175,10 @@ void grpc_chttp2_publish_reads( /* updating closed status */ if (stream_parsing->received_close) { stream_global->read_closed = 1; + if (stream_global->write_state != WRITE_STATE_OPEN) { + stream_global->in_stream_map = 0; + grpc_chttp2_parsing_remove_stream(transport_parsing, stream_parsing->id); + } grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global); } diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index 24d8737ceb..b70ac8c9f7 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -250,20 +250,20 @@ int grpc_chttp2_list_pop_waiting_for_concurrency( return r; } -void grpc_chttp2_list_add_cancelled_waiting_for_parsing( +void grpc_chttp2_list_add_closed_waiting_for_parsing( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), - GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_PARSING); + GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING); } -int grpc_chttp2_list_pop_cancelled_waiting_for_parsing( +int grpc_chttp2_list_pop_closed_waiting_for_parsing( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global) { grpc_chttp2_stream *stream; int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, - GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_PARSING); + GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING); *stream_global = &stream->global; return r; } diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index c4838cceb8..39df9fc340 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -78,9 +78,8 @@ static const grpc_transport_vtable vtable; static void lock(grpc_chttp2_transport *t); static void unlock(grpc_chttp2_transport *t); -static void unlock_check_cancellations(grpc_chttp2_transport *t); static void unlock_check_channel_callbacks(grpc_chttp2_transport *t); -static void unlock_check_reads(grpc_chttp2_transport *t); +static void unlock_check_read_write_state(grpc_chttp2_transport *t); /* forward declarations of various callbacks that we'll build closures around */ static void writing_action(void *t, int iomgr_success_ignored); @@ -198,6 +197,7 @@ static void init_transport(grpc_chttp2_transport *t, t->global.incoming_window = DEFAULT_WINDOW; t->global.connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET; t->global.ping_counter = 1; + t->global.pings.next = t->global.pings.prev = &t->global.pings; t->parsing.is_client = is_client; t->parsing.str_grpc_timeout = grpc_mdstr_from_string(t->metadata_context, "grpc-timeout"); @@ -382,6 +382,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED || s->global.id == 0); + GPR_ASSERT(!s->global.in_stream_map); grpc_chttp2_unregister_stream(t, s); gpr_mu_unlock(&t->mu); @@ -404,6 +405,12 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream( return s ? &s->parsing : NULL; } +void grpc_chttp2_parsing_remove_stream( + grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id) { + grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing); + grpc_chttp2_stream_map_delete(&t->parsing_stream_map, id); +} + grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id) { grpc_chttp2_stream *accepting; @@ -448,8 +455,7 @@ static void unlock(grpc_chttp2_transport *t) { ref_transport(t); grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1); } - unlock_check_cancellations(t); - unlock_check_reads(t); + unlock_check_read_write_state(t); /* unlock_check_parser(t); */ unlock_check_channel_callbacks(t); @@ -668,17 +674,28 @@ static void send_ping(grpc_transport *gt, grpc_iomgr_closure *on_recv) { * INPUT PROCESSING */ -static void unlock_check_cancellations(grpc_chttp2_transport *t) { +static grpc_stream_state compute_state(gpr_uint8 write_closed, + gpr_uint8 read_closed) { + if (write_closed && read_closed) return GRPC_STREAM_CLOSED; + if (write_closed) return GRPC_STREAM_SEND_CLOSED; + if (read_closed) return GRPC_STREAM_RECV_CLOSED; + return GRPC_STREAM_OPEN; +} + +static void unlock_check_read_write_state(grpc_chttp2_transport *t) { grpc_chttp2_transport_global *transport_global = &t->global; grpc_chttp2_stream_global *stream_global; + grpc_stream_state state; /* if a stream is in the stream map, and gets cancelled, we need to ensure we are not parsing before continuing the cancellation to keep things in a sane state */ if (!t->parsing_active) { - while (grpc_chttp2_list_pop_cancelled_waiting_for_parsing(transport_global, - &stream_global)) { + while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global, + &stream_global)) { GPR_ASSERT(stream_global->in_stream_map); + GPR_ASSERT(stream_global->write_state != WRITE_STATE_OPEN); + GPR_ASSERT(stream_global->read_closed); grpc_chttp2_stream_map_delete(&t->parsing_stream_map, stream_global->id); stream_global->in_stream_map = 0; grpc_chttp2_list_add_read_write_state_changed(transport_global, @@ -686,46 +703,27 @@ static void unlock_check_cancellations(grpc_chttp2_transport *t) { } } -#if 0 - grpc_chttp2_stream *s; - - if (t->writing_active) { - return; - } - - while ((s = stream_list_remove_head(t, CANCELLED))) { - s->global.read_closed = 1; - s->global.write_state = WRITE_STATE_SENT_CLOSE; - grpc_chttp2_list_add_read_write_state_changed(&t->global, &s->global); - } -#endif -} - -static grpc_stream_state compute_state(gpr_uint8 write_closed, - gpr_uint8 read_closed) { - if (write_closed && read_closed) return GRPC_STREAM_CLOSED; - if (write_closed) return GRPC_STREAM_SEND_CLOSED; - if (read_closed) return GRPC_STREAM_RECV_CLOSED; - return GRPC_STREAM_OPEN; -} - -static void unlock_check_reads(grpc_chttp2_transport *t) { - grpc_chttp2_stream_global *stream_global; - grpc_stream_state state; - - while (grpc_chttp2_list_pop_read_write_state_changed(&t->global, &stream_global)) { + while (grpc_chttp2_list_pop_read_write_state_changed(transport_global, &stream_global)) { if (!stream_global->publish_sopb) { continue; } + if (stream_global->write_state != WRITE_STATE_OPEN && stream_global->read_closed && stream_global->in_stream_map) { + if (t->parsing_active) { + grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global, stream_global); + } else { + grpc_chttp2_stream_map_delete(&t->parsing_stream_map, stream_global->id); + stream_global->in_stream_map = 0; + } + } state = compute_state(stream_global->write_state == WRITE_STATE_SENT_CLOSE, stream_global->read_closed && !stream_global->in_stream_map); - gpr_log(GPR_DEBUG, "ws:%d rc:%d ism:%d => st:%d", stream_global->write_state, stream_global->read_closed, stream_global->in_stream_map, state); + gpr_log(GPR_DEBUG, "cl:%d id:%d ws:%d rc:%d ism:%d => st:%d", t->global.is_client, stream_global->id, stream_global->write_state, stream_global->read_closed, stream_global->in_stream_map, state); if (stream_global->incoming_sopb.nops == 0 && state == stream_global->published_state) { continue; } grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op(&stream_global->incoming_metadata, &stream_global->incoming_sopb, &stream_global->outstanding_metadata); grpc_sopb_swap(stream_global->publish_sopb, &stream_global->incoming_sopb); stream_global->published_state = *stream_global->publish_state = state; - grpc_chttp2_schedule_closure(&t->global, stream_global->recv_done_closure, 1); + grpc_chttp2_schedule_closure(transport_global, stream_global->recv_done_closure, 1); stream_global->recv_done_closure = NULL; stream_global->publish_sopb = NULL; stream_global->publish_state = NULL; |