diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/surface/byte_buffer_queue.c | 8 | ||||
-rw-r--r-- | src/core/surface/byte_buffer_queue.h | 2 | ||||
-rw-r--r-- | src/core/surface/call.c | 13 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 43 | ||||
-rw-r--r-- | src/core/transport/transport.h | 6 | ||||
-rw-r--r-- | src/core/transport/transport_op_string.c | 3 |
6 files changed, 61 insertions, 14 deletions
diff --git a/src/core/surface/byte_buffer_queue.c b/src/core/surface/byte_buffer_queue.c index 7c31bfe5da..e47dc4f4ce 100644 --- a/src/core/surface/byte_buffer_queue.c +++ b/src/core/surface/byte_buffer_queue.c @@ -62,6 +62,7 @@ int grpc_bbq_empty(grpc_byte_buffer_queue *q) { } void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *buffer) { + q->bytes += grpc_byte_buffer_length(buffer); bba_push(&q->filling, buffer); } @@ -72,8 +73,11 @@ void grpc_bbq_flush(grpc_byte_buffer_queue *q) { } } +size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q) { return q->bytes; } + grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) { grpc_bbq_array temp_array; + grpc_byte_buffer *out; if (q->drain_pos == q->draining.count) { if (q->filling.count == 0) { @@ -87,5 +91,7 @@ grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) { q->draining = temp_array; } - return q->draining.data[q->drain_pos++]; + out = q->draining.data[q->drain_pos++]; + q->bytes -= grpc_byte_buffer_length(out); + return out; } diff --git a/src/core/surface/byte_buffer_queue.h b/src/core/surface/byte_buffer_queue.h index 32c57f8756..f01958984f 100644 --- a/src/core/surface/byte_buffer_queue.h +++ b/src/core/surface/byte_buffer_queue.h @@ -49,6 +49,7 @@ typedef struct { size_t drain_pos; grpc_bbq_array filling; grpc_bbq_array draining; + size_t bytes; } grpc_byte_buffer_queue; void grpc_bbq_destroy(grpc_byte_buffer_queue *q); @@ -56,5 +57,6 @@ grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q); void grpc_bbq_flush(grpc_byte_buffer_queue *q); int grpc_bbq_empty(grpc_byte_buffer_queue *q); void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *bb); +size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q); #endif /* GRPC_INTERNAL_CORE_SURFACE_BYTE_BUFFER_QUEUE_H */ diff --git a/src/core/surface/call.c b/src/core/surface/call.c index cf0a595147..9d8913f5b0 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -449,6 +449,8 @@ static void unlock(grpc_call *call) { int completing_requests = 0; int start_op = 0; int i; + const gpr_uint32 MAX_RECV_PEEK_AHEAD = 65536; + size_t buffered_bytes; memset(&op, 0, sizeof(op)); @@ -461,6 +463,17 @@ static void unlock(grpc_call *call) { op.recv_state = &call->recv_state; op.on_done_recv = call_on_done_recv; op.recv_user_data = call; + if (grpc_bbq_empty(&call->incoming_queue) && call->reading_message) { + op.max_recv_bytes = call->incoming_message_length - + call->incoming_message.length + MAX_RECV_PEEK_AHEAD; + } else { + buffered_bytes = grpc_bbq_bytes(&call->incoming_queue); + if (buffered_bytes > MAX_RECV_PEEK_AHEAD) { + op.max_recv_bytes = 0; + } else { + op.max_recv_bytes = MAX_RECV_PEEK_AHEAD - buffered_bytes; + } + } call->receiving = 1; GRPC_CALL_INTERNAL_REF(call, "receiving"); start_op = 1; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 1cd1dc822d..e10bb6a13e 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -317,6 +317,18 @@ struct transport { struct stream { gpr_uint32 id; + /** The number of bytes the upper layers have offered to receive. + As the upper layer offers more bytes, this value increases. + As bytes are read, this value decreases. */ + gpr_uint32 max_recv_bytes; + /** The number of bytes the upper layer has offered to read but we have + not yet announced to HTTP2 flow control. + As the upper layers offer to read more bytes, this value increases. + As we advertise incoming flow control window, this value decreases. */ + gpr_uint32 unannounced_incoming_window; + /** The number of bytes of HTTP2 flow control we have advertised. + As we advertise incoming flow control window, this value increases. + As bytes are read, this value decreases. */ gpr_uint32 incoming_window; gpr_int64 outgoing_window; /* when the application requests writes be closed, the write_closed is @@ -662,7 +674,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, s->id = (gpr_uint32)(gpr_uintptr)server_data; s->outgoing_window = t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - s->incoming_window = + s->max_recv_bytes = s->incoming_window = t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; t->incoming_stream = s; grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); @@ -978,14 +990,13 @@ static int prepare_write(transport *t) { /* for each stream that wants to update its window, add that window here */ while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) { - window_delta = - t->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] - - s->incoming_window; - if (!s->read_closed && window_delta) { - gpr_slice_buffer_add( - &t->outbuf, grpc_chttp2_window_update_create(s->id, window_delta)); - FLOWCTL_TRACE(t, s, incoming, s->id, window_delta); - s->incoming_window += window_delta; + if (!s->read_closed && s->unannounced_incoming_window > 0) { + gpr_slice_buffer_add(&t->outbuf, + grpc_chttp2_window_update_create( + s->id, s->unannounced_incoming_window)); + FLOWCTL_TRACE(t, s, incoming, s->id, s->unannounced_incoming_window); + s->incoming_window += s->unannounced_incoming_window; + s->unannounced_incoming_window = 0; } } @@ -1112,8 +1123,10 @@ static void maybe_start_some_streams(transport *t) { t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; s->incoming_window = t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + s->max_recv_bytes = GPR_MAX(s->incoming_window, s->max_recv_bytes); grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); stream_list_join(t, s, WRITABLE); + maybe_join_window_updates(t, s); } /* cancel out streams that will never be started */ while (t->next_stream_id > MAX_CLIENT_STREAM_ID) { @@ -1167,6 +1180,10 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) { s->incoming_sopb = op->recv_ops; s->incoming_sopb->nops = 0; s->publish_state = op->recv_state; + if (s->max_recv_bytes < op->max_recv_bytes) { + s->unannounced_incoming_window += op->max_recv_bytes - s->max_recv_bytes; + s->max_recv_bytes = op->max_recv_bytes; + } gpr_free(s->old_incoming_metadata); s->old_incoming_metadata = NULL; maybe_finish_read(t, s); @@ -1351,10 +1368,10 @@ static void maybe_finish_read(transport *t, stream *s) { static void maybe_join_window_updates(transport *t, stream *s) { if (s->incoming_sopb != NULL && - s->incoming_window < + s->unannounced_incoming_window > t->settings[LOCAL_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] * - 3 / 4) { + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] / + 4) { stream_list_join(t, s, WINDOW_UPDATE); } } @@ -1376,6 +1393,8 @@ static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) { FLOWCTL_TRACE(t, s, incoming, s->id, -(gpr_int64)t->incoming_frame_size); t->incoming_window -= t->incoming_frame_size; s->incoming_window -= t->incoming_frame_size; + GPR_ASSERT(s->max_recv_bytes > t->incoming_frame_size); + s->max_recv_bytes -= t->incoming_frame_size; /* if the stream incoming window is getting low, schedule an update */ maybe_join_window_updates(t, s); diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 7f60fdc037..98fcbe752e 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -74,6 +74,12 @@ typedef struct grpc_transport_op { grpc_stream_op_buffer *recv_ops; grpc_stream_state *recv_state; + /** The number of bytes this peer is currently prepared to receive. + + Bytes offered are used to replenish per-stream flow control windows. + Offers are not retractable: if 5 bytes are offered and no bytes are read, + a later offer of 3 bytes still implies that 5 have been offered. */ + gpr_uint32 max_recv_bytes; void (*on_done_recv)(void *user_data, int success); void *recv_user_data; diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c index 5c4edb006a..2420a54396 100644 --- a/src/core/transport/transport_op_string.c +++ b/src/core/transport/transport_op_string.c @@ -130,7 +130,8 @@ char *grpc_transport_op_string(grpc_transport_op *op) { if (op->recv_ops) { if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); first = 0; - gpr_strvec_add(&b, gpr_strdup("RECV")); + gpr_asprintf(&tmp, "RECV:max_recv_bytes=%d", op->max_recv_bytes); + gpr_strvec_add(&b, tmp); } if (op->bind_pollset) { |