diff options
-rw-r--r-- | src/core/transport/chttp2_transport.c | 42 |
1 files changed, 31 insertions, 11 deletions
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 4194d9f1bd..6d3a825f2e 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -60,6 +60,8 @@ #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024) #define MAX_WINDOW 0x7fffffffu +#define MAX_CLIENT_STREAM_ID 0x7fffffffu + #define CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" #define CLIENT_CONNECT_STRLEN 24 @@ -1019,16 +1021,36 @@ static void perform_write(transport *t, grpc_endpoint *ep) { } } +static void add_goaway(transport *t, gpr_uint32 goaway_error, gpr_slice goaway_text) { + if (t->num_pending_goaways == t->cap_pending_goaways) { + t->cap_pending_goaways = GPR_MAX(1, t->cap_pending_goaways * 2); + t->pending_goaways = + gpr_realloc(t->pending_goaways, + sizeof(pending_goaway) * t->cap_pending_goaways); + } + t->pending_goaways[t->num_pending_goaways].status = + grpc_chttp2_http2_error_to_grpc_status(goaway_error); + t->pending_goaways[t->num_pending_goaways].debug = goaway_text; + t->num_pending_goaways++; +} + + static void maybe_start_some_streams(transport *t) { + /* start streams where we have free stream ids and free concurrency */ while ( + t->next_stream_id <= MAX_CLIENT_STREAM_ID && grpc_chttp2_stream_map_size(&t->stream_map) < t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) { stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY); - if (!s) break; + if (!s) return; IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new stream %p to id %d", t->is_client ? "CLI" : "SVR", s, t->next_stream_id)); + if (t->next_stream_id == MAX_CLIENT_STREAM_ID) { + add_goaway(t, GRPC_CHTTP2_NO_ERROR, gpr_slice_from_copied_string("Exceeded sequence number limit")); + } + GPR_ASSERT(s->id == 0); s->id = t->next_stream_id; t->next_stream_id += 2; @@ -1039,6 +1061,13 @@ static void maybe_start_some_streams(transport *t) { grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); stream_list_join(t, s, WRITABLE); } + /* cancel out streams that will never be started */ + while (t->next_stream_id > MAX_CLIENT_STREAM_ID) { + stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY); + if (!s) return; + + cancel_stream(t, s, GRPC_STATUS_UNAVAILABLE, grpc_chttp2_grpc_status_to_http2_error(GRPC_STATUS_UNAVAILABLE), NULL, 0); + } } static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) { @@ -1594,16 +1623,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { grpc_chttp2_ping_create(1, t->simple_parsers.ping.opaque_8bytes)); } if (st.goaway) { - if (t->num_pending_goaways == t->cap_pending_goaways) { - t->cap_pending_goaways = GPR_MAX(1, t->cap_pending_goaways * 2); - t->pending_goaways = - gpr_realloc(t->pending_goaways, - sizeof(pending_goaway) * t->cap_pending_goaways); - } - t->pending_goaways[t->num_pending_goaways].status = - grpc_chttp2_http2_error_to_grpc_status(st.goaway_error); - t->pending_goaways[t->num_pending_goaways].debug = st.goaway_text; - t->num_pending_goaways++; + add_goaway(t, st.goaway_error, st.goaway_text); } if (st.process_ping_reply) { for (i = 0; i < t->ping_count; i++) { |