aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/transport/chttp2_transport.c42
1 files changed, 31 insertions, 11 deletions
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 4194d9f1bd..6d3a825f2e 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -60,6 +60,8 @@
#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
#define MAX_WINDOW 0x7fffffffu
+#define MAX_CLIENT_STREAM_ID 0x7fffffffu
+
#define CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
#define CLIENT_CONNECT_STRLEN 24
@@ -1019,16 +1021,36 @@ static void perform_write(transport *t, grpc_endpoint *ep) {
}
}
+static void add_goaway(transport *t, gpr_uint32 goaway_error, gpr_slice goaway_text) {
+ if (t->num_pending_goaways == t->cap_pending_goaways) {
+ t->cap_pending_goaways = GPR_MAX(1, t->cap_pending_goaways * 2);
+ t->pending_goaways =
+ gpr_realloc(t->pending_goaways,
+ sizeof(pending_goaway) * t->cap_pending_goaways);
+ }
+ t->pending_goaways[t->num_pending_goaways].status =
+ grpc_chttp2_http2_error_to_grpc_status(goaway_error);
+ t->pending_goaways[t->num_pending_goaways].debug = goaway_text;
+ t->num_pending_goaways++;
+}
+
+
static void maybe_start_some_streams(transport *t) {
+ /* start streams where we have free stream ids and free concurrency */
while (
+ t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
grpc_chttp2_stream_map_size(&t->stream_map) <
t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) {
stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY);
- if (!s) break;
+ if (!s) return;
IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new stream %p to id %d",
t->is_client ? "CLI" : "SVR", s, t->next_stream_id));
+ if (t->next_stream_id == MAX_CLIENT_STREAM_ID) {
+ add_goaway(t, GRPC_CHTTP2_NO_ERROR, gpr_slice_from_copied_string("Exceeded sequence number limit"));
+ }
+
GPR_ASSERT(s->id == 0);
s->id = t->next_stream_id;
t->next_stream_id += 2;
@@ -1039,6 +1061,13 @@ static void maybe_start_some_streams(transport *t) {
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
stream_list_join(t, s, WRITABLE);
}
+ /* cancel out streams that will never be started */
+ while (t->next_stream_id > MAX_CLIENT_STREAM_ID) {
+ stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY);
+ if (!s) return;
+
+ cancel_stream(t, s, GRPC_STATUS_UNAVAILABLE, grpc_chttp2_grpc_status_to_http2_error(GRPC_STATUS_UNAVAILABLE), NULL, 0);
+ }
}
static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
@@ -1594,16 +1623,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
grpc_chttp2_ping_create(1, t->simple_parsers.ping.opaque_8bytes));
}
if (st.goaway) {
- if (t->num_pending_goaways == t->cap_pending_goaways) {
- t->cap_pending_goaways = GPR_MAX(1, t->cap_pending_goaways * 2);
- t->pending_goaways =
- gpr_realloc(t->pending_goaways,
- sizeof(pending_goaway) * t->cap_pending_goaways);
- }
- t->pending_goaways[t->num_pending_goaways].status =
- grpc_chttp2_http2_error_to_grpc_status(st.goaway_error);
- t->pending_goaways[t->num_pending_goaways].debug = st.goaway_text;
- t->num_pending_goaways++;
+ add_goaway(t, st.goaway_error, st.goaway_text);
}
if (st.process_ping_reply) {
for (i = 0; i < t->ping_count; i++) {