diff options
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/byte_buffer_queue.c | 8 | ||||
-rw-r--r-- | src/core/surface/byte_buffer_queue.h | 2 | ||||
-rw-r--r-- | src/core/surface/call.c | 13 |
3 files changed, 22 insertions, 1 deletions
diff --git a/src/core/surface/byte_buffer_queue.c b/src/core/surface/byte_buffer_queue.c index 7c31bfe5da..e47dc4f4ce 100644 --- a/src/core/surface/byte_buffer_queue.c +++ b/src/core/surface/byte_buffer_queue.c @@ -62,6 +62,7 @@ int grpc_bbq_empty(grpc_byte_buffer_queue *q) { } void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *buffer) { + q->bytes += grpc_byte_buffer_length(buffer); bba_push(&q->filling, buffer); } @@ -72,8 +73,11 @@ void grpc_bbq_flush(grpc_byte_buffer_queue *q) { } } +size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q) { return q->bytes; } + grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) { grpc_bbq_array temp_array; + grpc_byte_buffer *out; if (q->drain_pos == q->draining.count) { if (q->filling.count == 0) { @@ -87,5 +91,7 @@ grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) { q->draining = temp_array; } - return q->draining.data[q->drain_pos++]; + out = q->draining.data[q->drain_pos++]; + q->bytes -= grpc_byte_buffer_length(out); + return out; } diff --git a/src/core/surface/byte_buffer_queue.h b/src/core/surface/byte_buffer_queue.h index 32c57f8756..f01958984f 100644 --- a/src/core/surface/byte_buffer_queue.h +++ b/src/core/surface/byte_buffer_queue.h @@ -49,6 +49,7 @@ typedef struct { size_t drain_pos; grpc_bbq_array filling; grpc_bbq_array draining; + size_t bytes; } grpc_byte_buffer_queue; void grpc_bbq_destroy(grpc_byte_buffer_queue *q); @@ -56,5 +57,6 @@ grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q); void grpc_bbq_flush(grpc_byte_buffer_queue *q); int grpc_bbq_empty(grpc_byte_buffer_queue *q); void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *bb); +size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q); #endif /* GRPC_INTERNAL_CORE_SURFACE_BYTE_BUFFER_QUEUE_H */ diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 0a551ac47f..71f4235571 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -513,6 +513,8 @@ static void unlock(grpc_call *call) { int completing_requests = 0; int start_op = 0; int i; + const gpr_uint32 MAX_RECV_PEEK_AHEAD = 65536; + size_t buffered_bytes; int cancel_alarm = 0; memset(&op, 0, sizeof(op)); @@ -528,6 +530,17 @@ static void unlock(grpc_call *call) { op.recv_ops = &call->recv_ops; op.recv_state = &call->recv_state; op.on_done_recv = &call->on_done_recv; + if (grpc_bbq_empty(&call->incoming_queue) && call->reading_message) { + op.max_recv_bytes = call->incoming_message_length - + call->incoming_message.length + MAX_RECV_PEEK_AHEAD; + } else { + buffered_bytes = grpc_bbq_bytes(&call->incoming_queue); + if (buffered_bytes > MAX_RECV_PEEK_AHEAD) { + op.max_recv_bytes = 0; + } else { + op.max_recv_bytes = MAX_RECV_PEEK_AHEAD - buffered_bytes; + } + } call->receiving = 1; GRPC_CALL_INTERNAL_REF(call, "receiving"); start_op = 1; |