aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/surface/byte_buffer_queue.c8
-rw-r--r--src/core/surface/byte_buffer_queue.h2
-rw-r--r--src/core/surface/call.c13
-rw-r--r--src/core/transport/chttp2_transport.c43
-rw-r--r--src/core/transport/transport.h6
-rw-r--r--src/core/transport/transport_op_string.c3
6 files changed, 61 insertions, 14 deletions
diff --git a/src/core/surface/byte_buffer_queue.c b/src/core/surface/byte_buffer_queue.c
index 7c31bfe5da..e47dc4f4ce 100644
--- a/src/core/surface/byte_buffer_queue.c
+++ b/src/core/surface/byte_buffer_queue.c
@@ -62,6 +62,7 @@ int grpc_bbq_empty(grpc_byte_buffer_queue *q) {
}
void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *buffer) {
+ q->bytes += grpc_byte_buffer_length(buffer);
bba_push(&q->filling, buffer);
}
@@ -72,8 +73,11 @@ void grpc_bbq_flush(grpc_byte_buffer_queue *q) {
}
}
+size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q) { return q->bytes; }
+
grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) {
grpc_bbq_array temp_array;
+ grpc_byte_buffer *out;
if (q->drain_pos == q->draining.count) {
if (q->filling.count == 0) {
@@ -87,5 +91,7 @@ grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) {
q->draining = temp_array;
}
- return q->draining.data[q->drain_pos++];
+ out = q->draining.data[q->drain_pos++];
+ q->bytes -= grpc_byte_buffer_length(out);
+ return out;
}
diff --git a/src/core/surface/byte_buffer_queue.h b/src/core/surface/byte_buffer_queue.h
index 32c57f8756..f01958984f 100644
--- a/src/core/surface/byte_buffer_queue.h
+++ b/src/core/surface/byte_buffer_queue.h
@@ -49,6 +49,7 @@ typedef struct {
size_t drain_pos;
grpc_bbq_array filling;
grpc_bbq_array draining;
+ size_t bytes;
} grpc_byte_buffer_queue;
void grpc_bbq_destroy(grpc_byte_buffer_queue *q);
@@ -56,5 +57,6 @@ grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q);
void grpc_bbq_flush(grpc_byte_buffer_queue *q);
int grpc_bbq_empty(grpc_byte_buffer_queue *q);
void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *bb);
+size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q);
#endif /* GRPC_INTERNAL_CORE_SURFACE_BYTE_BUFFER_QUEUE_H */
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index cf0a595147..9d8913f5b0 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -449,6 +449,8 @@ static void unlock(grpc_call *call) {
int completing_requests = 0;
int start_op = 0;
int i;
+ const gpr_uint32 MAX_RECV_PEEK_AHEAD = 65536;
+ size_t buffered_bytes;
memset(&op, 0, sizeof(op));
@@ -461,6 +463,17 @@ static void unlock(grpc_call *call) {
op.recv_state = &call->recv_state;
op.on_done_recv = call_on_done_recv;
op.recv_user_data = call;
+ if (grpc_bbq_empty(&call->incoming_queue) && call->reading_message) {
+ op.max_recv_bytes = call->incoming_message_length -
+ call->incoming_message.length + MAX_RECV_PEEK_AHEAD;
+ } else {
+ buffered_bytes = grpc_bbq_bytes(&call->incoming_queue);
+ if (buffered_bytes > MAX_RECV_PEEK_AHEAD) {
+ op.max_recv_bytes = 0;
+ } else {
+ op.max_recv_bytes = MAX_RECV_PEEK_AHEAD - buffered_bytes;
+ }
+ }
call->receiving = 1;
GRPC_CALL_INTERNAL_REF(call, "receiving");
start_op = 1;
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 1cd1dc822d..e10bb6a13e 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -317,6 +317,18 @@ struct transport {
struct stream {
gpr_uint32 id;
+ /** The number of bytes the upper layers have offered to receive.
+ As the upper layer offers more bytes, this value increases.
+ As bytes are read, this value decreases. */
+ gpr_uint32 max_recv_bytes;
+ /** The number of bytes the upper layer has offered to read but we have
+ not yet announced to HTTP2 flow control.
+ As the upper layers offer to read more bytes, this value increases.
+ As we advertise incoming flow control window, this value decreases. */
+ gpr_uint32 unannounced_incoming_window;
+ /** The number of bytes of HTTP2 flow control we have advertised.
+ As we advertise incoming flow control window, this value increases.
+ As bytes are read, this value decreases. */
gpr_uint32 incoming_window;
gpr_int64 outgoing_window;
/* when the application requests writes be closed, the write_closed is
@@ -662,7 +674,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
s->id = (gpr_uint32)(gpr_uintptr)server_data;
s->outgoing_window =
t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- s->incoming_window =
+ s->max_recv_bytes = s->incoming_window =
t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
t->incoming_stream = s;
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
@@ -978,14 +990,13 @@ static int prepare_write(transport *t) {
/* for each stream that wants to update its window, add that window here */
while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) {
- window_delta =
- t->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
- s->incoming_window;
- if (!s->read_closed && window_delta) {
- gpr_slice_buffer_add(
- &t->outbuf, grpc_chttp2_window_update_create(s->id, window_delta));
- FLOWCTL_TRACE(t, s, incoming, s->id, window_delta);
- s->incoming_window += window_delta;
+ if (!s->read_closed && s->unannounced_incoming_window > 0) {
+ gpr_slice_buffer_add(&t->outbuf,
+ grpc_chttp2_window_update_create(
+ s->id, s->unannounced_incoming_window));
+ FLOWCTL_TRACE(t, s, incoming, s->id, s->unannounced_incoming_window);
+ s->incoming_window += s->unannounced_incoming_window;
+ s->unannounced_incoming_window = 0;
}
}
@@ -1112,8 +1123,10 @@ static void maybe_start_some_streams(transport *t) {
t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
s->incoming_window =
t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ s->max_recv_bytes = GPR_MAX(s->incoming_window, s->max_recv_bytes);
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
stream_list_join(t, s, WRITABLE);
+ maybe_join_window_updates(t, s);
}
/* cancel out streams that will never be started */
while (t->next_stream_id > MAX_CLIENT_STREAM_ID) {
@@ -1167,6 +1180,10 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
s->incoming_sopb = op->recv_ops;
s->incoming_sopb->nops = 0;
s->publish_state = op->recv_state;
+ if (s->max_recv_bytes < op->max_recv_bytes) {
+ s->unannounced_incoming_window += op->max_recv_bytes - s->max_recv_bytes;
+ s->max_recv_bytes = op->max_recv_bytes;
+ }
gpr_free(s->old_incoming_metadata);
s->old_incoming_metadata = NULL;
maybe_finish_read(t, s);
@@ -1351,10 +1368,10 @@ static void maybe_finish_read(transport *t, stream *s) {
static void maybe_join_window_updates(transport *t, stream *s) {
if (s->incoming_sopb != NULL &&
- s->incoming_window <
+ s->unannounced_incoming_window >
t->settings[LOCAL_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] *
- 3 / 4) {
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] /
+ 4) {
stream_list_join(t, s, WINDOW_UPDATE);
}
}
@@ -1376,6 +1393,8 @@ static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) {
FLOWCTL_TRACE(t, s, incoming, s->id, -(gpr_int64)t->incoming_frame_size);
t->incoming_window -= t->incoming_frame_size;
s->incoming_window -= t->incoming_frame_size;
+ GPR_ASSERT(s->max_recv_bytes > t->incoming_frame_size);
+ s->max_recv_bytes -= t->incoming_frame_size;
/* if the stream incoming window is getting low, schedule an update */
maybe_join_window_updates(t, s);
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index 7f60fdc037..98fcbe752e 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -74,6 +74,12 @@ typedef struct grpc_transport_op {
grpc_stream_op_buffer *recv_ops;
grpc_stream_state *recv_state;
+ /** The number of bytes this peer is currently prepared to receive.
+
+ Bytes offered are used to replenish per-stream flow control windows.
+ Offers are not retractable: if 5 bytes are offered and no bytes are read,
+ a later offer of 3 bytes still implies that 5 have been offered. */
+ gpr_uint32 max_recv_bytes;
void (*on_done_recv)(void *user_data, int success);
void *recv_user_data;
diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c
index 5c4edb006a..2420a54396 100644
--- a/src/core/transport/transport_op_string.c
+++ b/src/core/transport/transport_op_string.c
@@ -130,7 +130,8 @@ char *grpc_transport_op_string(grpc_transport_op *op) {
if (op->recv_ops) {
if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
first = 0;
- gpr_strvec_add(&b, gpr_strdup("RECV"));
+ gpr_asprintf(&tmp, "RECV:max_recv_bytes=%d", op->max_recv_bytes);
+ gpr_strvec_add(&b, tmp);
}
if (op->bind_pollset) {