aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Nicolas Noble <nicolasnoble@users.noreply.github.com>2015-04-20 15:18:13 -0700
committerGravatar Nicolas Noble <nicolasnoble@users.noreply.github.com>2015-04-20 15:18:13 -0700
commitfe00787fec195499fbc4e95b0f6414616c347972 (patch)
treed65e2fd69a9fd8293bc2cbeefac6ae631eda126d
parent291800b78f73ad974a4e472555ac4845a2b32aa0 (diff)
parent42c15a35e54a4ae8722d2558cfaf1bfe7b5a624a (diff)
Merge pull request #1318 from yang-g/flowfix
Fix the flow control issue.
-rw-r--r--src/core/transport/chttp2/frame.h1
-rw-r--r--src/core/transport/chttp2/frame_settings.c8
-rw-r--r--src/core/transport/chttp2_transport.c24
3 files changed, 27 insertions, 6 deletions
diff --git a/src/core/transport/chttp2/frame.h b/src/core/transport/chttp2/frame.h
index fbb941969e..ac76c4cc9c 100644
--- a/src/core/transport/chttp2/frame.h
+++ b/src/core/transport/chttp2/frame.h
@@ -54,6 +54,7 @@ typedef struct {
gpr_uint8 process_ping_reply;
gpr_uint8 goaway;
+ gpr_int64 initial_window_update;
gpr_uint32 window_update;
gpr_uint32 goaway_last_stream_index;
gpr_uint32 goaway_error;
diff --git a/src/core/transport/chttp2/frame_settings.c b/src/core/transport/chttp2/frame_settings.c
index 8d3250c34f..2ffce730d5 100644
--- a/src/core/transport/chttp2/frame_settings.c
+++ b/src/core/transport/chttp2/frame_settings.c
@@ -218,6 +218,14 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse(
return GRPC_CHTTP2_CONNECTION_ERROR;
}
}
+ if (parser->id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE &&
+ parser->incoming_settings[parser->id] != parser->value) {
+ state->initial_window_update =
+ (gpr_int64)parser->value -
+ parser->incoming_settings[parser->id];
+ gpr_log(GPR_DEBUG, "adding %d for initial_window change",
+ (int)state->initial_window_update);
+ }
parser->incoming_settings[parser->id] = parser->value;
if (grpc_http_trace) {
gpr_log(GPR_DEBUG, "CHTTP2: got setting %d = %d", parser->id,
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 110a4b544f..995d64015a 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -276,8 +276,8 @@ struct transport {
struct stream {
gpr_uint32 id;
- gpr_uint32 outgoing_window;
gpr_uint32 incoming_window;
+ gpr_int64 outgoing_window;
/* when the application requests writes be closed, the write_closed is
'queued'; when the close is flow controlled into the send path, we are
'sending' it; when the write has been performed it is 'sent' */
@@ -852,7 +852,8 @@ static int prepare_write(transport *t) {
/* for each stream that's become writable, frame it's data (according to
available window sizes) and add to the output buffer */
- while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE))) {
+ while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE)) &&
+ s->outgoing_window > 0) {
window_delta = grpc_chttp2_preencode(
s->outgoing_sopb.ops, &s->outgoing_sopb.nops,
GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb);
@@ -867,7 +868,7 @@ static int prepare_write(transport *t) {
/* if there are still writes to do and the stream still has window
available, then schedule a further write */
- if (s->outgoing_sopb.nops && s->outgoing_window) {
+ if (s->outgoing_sopb.nops > 0 && s->outgoing_window > 0) {
GPR_ASSERT(!t->outgoing_window);
stream_list_add_tail(t, s, WRITABLE);
}
@@ -1430,8 +1431,8 @@ static int init_frame_parser(transport *t) {
}
}
-static int is_window_update_legal(gpr_uint32 window_update, gpr_uint32 window) {
- return window_update < MAX_WINDOW - window;
+static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) {
+ return window + window_update < MAX_WINDOW;
}
static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
@@ -1485,12 +1486,23 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
}
}
}
+ if (st.initial_window_update) {
+ for (i = 0; i < t->stream_map.count; i++) {
+ stream *s = (stream*)(t->stream_map.values[i]);
+ int was_window_empty = s->outgoing_window <= 0;
+ s->outgoing_window += st.initial_window_update;
+ if (was_window_empty && s->outgoing_window > 0 &&
+ s->outgoing_sopb.nops > 0) {
+ stream_list_join(t, s, WRITABLE);
+ }
+ }
+ }
if (st.window_update) {
if (t->incoming_stream_id) {
/* if there was a stream id, this is for some stream */
stream *s = lookup_stream(t, t->incoming_stream_id);
if (s) {
- int was_window_empty = s->outgoing_window == 0;
+ int was_window_empty = s->outgoing_window <= 0;
if (!is_window_update_legal(st.window_update, s->outgoing_window)) {
cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
GRPC_CHTTP2_FLOW_CONTROL_ERROR),