aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/transport')
-rw-r--r--src/core/transport/chttp2/internal.h14
-rw-r--r--src/core/transport/chttp2/parsing.c7
-rw-r--r--src/core/transport/chttp2/writing.c16
-rw-r--r--src/core/transport/chttp2_transport.c17
-rw-r--r--src/core/transport/transport.h6
-rw-r--r--src/core/transport/transport_op_string.c3
6 files changed, 52 insertions, 11 deletions
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index bdd4b432eb..9d4bfc2651 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -353,7 +353,19 @@ typedef struct {
/** window available for us to send to peer */
gpr_int64 outgoing_window;
- /** window available for peer to send to us - updated after parse */
+ /** The number of bytes the upper layers have offered to receive.
+ As the upper layer offers more bytes, this value increases.
+ As bytes are read, this value decreases. */
+ gpr_uint32 max_recv_bytes;
+ /** The number of bytes the upper layer has offered to read but we have
+ not yet announced to HTTP2 flow control.
+ As the upper layers offer to read more bytes, this value increases.
+ As we advertise incoming flow control window, this value decreases. */
+ gpr_uint32 unannounced_incoming_window;
+ /** The number of bytes of HTTP2 flow control we have advertised.
+ As we advertise incoming flow control window, this value increases.
+ As bytes are read, this value decreases.
+ Updated after parse. */
gpr_uint32 incoming_window;
/** stream ops the transport user would like to send */
grpc_stream_op_buffer *outgoing_sopb;
diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c
index 130167f830..2da96be452 100644
--- a/src/core/transport/chttp2/parsing.c
+++ b/src/core/transport/chttp2/parsing.c
@@ -173,7 +173,14 @@ void grpc_chttp2_publish_reads(
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
"parsed", transport_parsing, stream_parsing, incoming_window_delta,
-(gpr_int64)stream_parsing->incoming_window_delta);
+ GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
+ "parsed", transport_parsing, stream_global, max_recv_bytes,
+ -(gpr_int64)stream_parsing->incoming_window_delta);
stream_global->incoming_window -= stream_parsing->incoming_window_delta;
+ GPR_ASSERT(stream_global->max_recv_bytes >=
+ stream_parsing->incoming_window_delta);
+ stream_global->max_recv_bytes -=
+ stream_parsing->incoming_window_delta;
stream_parsing->incoming_window_delta = 0;
grpc_chttp2_list_add_writable_window_update_stream(transport_global,
stream_global);
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index a78654334e..d49caa4870 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -107,17 +107,17 @@ int grpc_chttp2_unlocking_check_writes(
* window here */
while (grpc_chttp2_list_pop_writable_window_update_stream(transport_global,
&stream_global)) {
- window_delta =
- transport_global->settings[GRPC_LOCAL_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
- stream_global->incoming_window;
- if (!stream_global->read_closed && window_delta > 0) {
+ if (!stream_global->read_closed && stream_global->unannounced_incoming_window > 0) {
gpr_slice_buffer_add(
&transport_writing->outbuf,
- grpc_chttp2_window_update_create(stream_global->id, window_delta));
+ grpc_chttp2_window_update_create(
+ stream_global->id, stream_global->unannounced_incoming_window));
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global,
- incoming_window, window_delta);
- stream_global->incoming_window += window_delta;
+ incoming_window, stream_global->unannounced_incoming_window);
+ GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global,
+ unannounced_incoming_window, -(gpr_int64)stream_global->unannounced_incoming_window);
+ stream_global->incoming_window += stream_global->unannounced_incoming_window;
+ stream_global->unannounced_incoming_window = 0;
grpc_chttp2_list_add_incoming_window_updated(transport_global,
stream_global);
}
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 322ff39820..9c82c02c57 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -358,7 +358,9 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
s->global.outgoing_window =
t->global.settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- s->parsing.incoming_window = s->global.incoming_window =
+ s->global.max_recv_bytes =
+ s->parsing.incoming_window =
+ s->global.incoming_window =
t->global.settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
*t->accepting_stream = s;
@@ -562,6 +564,8 @@ static void maybe_start_some_streams(
stream_global->incoming_window =
transport_global->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ stream_global->max_recv_bytes =
+ GPR_MAX(stream_global->incoming_window, stream_global->max_recv_bytes);
grpc_chttp2_stream_map_add(
&TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map,
stream_global->id, STREAM_FROM_GLOBAL(stream_global));
@@ -570,6 +574,9 @@ static void maybe_start_some_streams(
grpc_chttp2_list_add_incoming_window_updated(transport_global,
stream_global);
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ grpc_chttp2_list_add_writable_window_update_stream(transport_global,
+ stream_global);
+
}
/* cancel out streams that will never be started */
while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID &&
@@ -620,6 +627,14 @@ static void perform_stream_op_locked(
stream_global->publish_sopb = op->recv_ops;
stream_global->publish_sopb->nops = 0;
stream_global->publish_state = op->recv_state;
+ if (stream_global->max_recv_bytes < op->max_recv_bytes) {
+ GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("op", transport_global, stream_global,
+ unannounced_incoming_window, op->max_recv_bytes - stream_global->max_recv_bytes);
+ GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("op", transport_global, stream_global,
+ max_recv_bytes, op->max_recv_bytes - stream_global->max_recv_bytes);
+ stream_global->unannounced_incoming_window += op->max_recv_bytes - stream_global->max_recv_bytes;
+ stream_global->max_recv_bytes = op->max_recv_bytes;
+ }
grpc_chttp2_incoming_metadata_live_op_buffer_end(
&stream_global->outstanding_metadata);
grpc_chttp2_list_add_read_write_state_changed(transport_global,
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index 1429737721..d5bec63f66 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -72,6 +72,12 @@ typedef struct grpc_transport_stream_op {
grpc_stream_op_buffer *recv_ops;
grpc_stream_state *recv_state;
+ /** The number of bytes this peer is currently prepared to receive.
+
+ Bytes offered are used to replenish per-stream flow control windows.
+ Offers are not retractable: if 5 bytes are offered and no bytes are read,
+ a later offer of 3 bytes still implies that 5 have been offered. */
+ gpr_uint32 max_recv_bytes;
grpc_iomgr_closure *on_done_recv;
grpc_pollset *bind_pollset;
diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c
index 0da396a320..862eb40c4b 100644
--- a/src/core/transport/transport_op_string.c
+++ b/src/core/transport/transport_op_string.c
@@ -128,7 +128,8 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) {
if (op->recv_ops) {
if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
first = 0;
- gpr_strvec_add(&b, gpr_strdup("RECV"));
+ gpr_asprintf(&tmp, "RECV:max_recv_bytes=%d", op->max_recv_bytes);
+ gpr_strvec_add(&b, tmp);
}
if (op->bind_pollset) {