aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/byte_buffer_queue.c8
-rw-r--r--src/core/surface/byte_buffer_queue.h2
-rw-r--r--src/core/surface/call.c13
3 files changed, 22 insertions, 1 deletions
diff --git a/src/core/surface/byte_buffer_queue.c b/src/core/surface/byte_buffer_queue.c
index 7c31bfe5da..e47dc4f4ce 100644
--- a/src/core/surface/byte_buffer_queue.c
+++ b/src/core/surface/byte_buffer_queue.c
@@ -62,6 +62,7 @@ int grpc_bbq_empty(grpc_byte_buffer_queue *q) {
}
void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *buffer) {
+ q->bytes += grpc_byte_buffer_length(buffer);
bba_push(&q->filling, buffer);
}
@@ -72,8 +73,11 @@ void grpc_bbq_flush(grpc_byte_buffer_queue *q) {
}
}
+size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q) { return q->bytes; }
+
grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) {
grpc_bbq_array temp_array;
+ grpc_byte_buffer *out;
if (q->drain_pos == q->draining.count) {
if (q->filling.count == 0) {
@@ -87,5 +91,7 @@ grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) {
q->draining = temp_array;
}
- return q->draining.data[q->drain_pos++];
+ out = q->draining.data[q->drain_pos++];
+ q->bytes -= grpc_byte_buffer_length(out);
+ return out;
}
diff --git a/src/core/surface/byte_buffer_queue.h b/src/core/surface/byte_buffer_queue.h
index 32c57f8756..f01958984f 100644
--- a/src/core/surface/byte_buffer_queue.h
+++ b/src/core/surface/byte_buffer_queue.h
@@ -49,6 +49,7 @@ typedef struct {
size_t drain_pos;
grpc_bbq_array filling;
grpc_bbq_array draining;
+ size_t bytes;
} grpc_byte_buffer_queue;
void grpc_bbq_destroy(grpc_byte_buffer_queue *q);
@@ -56,5 +57,6 @@ grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q);
void grpc_bbq_flush(grpc_byte_buffer_queue *q);
int grpc_bbq_empty(grpc_byte_buffer_queue *q);
void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *bb);
+size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q);
#endif /* GRPC_INTERNAL_CORE_SURFACE_BYTE_BUFFER_QUEUE_H */
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 0a551ac47f..71f4235571 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -513,6 +513,8 @@ static void unlock(grpc_call *call) {
int completing_requests = 0;
int start_op = 0;
int i;
+ const gpr_uint32 MAX_RECV_PEEK_AHEAD = 65536;
+ size_t buffered_bytes;
int cancel_alarm = 0;
memset(&op, 0, sizeof(op));
@@ -528,6 +530,17 @@ static void unlock(grpc_call *call) {
op.recv_ops = &call->recv_ops;
op.recv_state = &call->recv_state;
op.on_done_recv = &call->on_done_recv;
+ if (grpc_bbq_empty(&call->incoming_queue) && call->reading_message) {
+ op.max_recv_bytes = call->incoming_message_length -
+ call->incoming_message.length + MAX_RECV_PEEK_AHEAD;
+ } else {
+ buffered_bytes = grpc_bbq_bytes(&call->incoming_queue);
+ if (buffered_bytes > MAX_RECV_PEEK_AHEAD) {
+ op.max_recv_bytes = 0;
+ } else {
+ op.max_recv_bytes = MAX_RECV_PEEK_AHEAD - buffered_bytes;
+ }
+ }
call->receiving = 1;
GRPC_CALL_INTERNAL_REF(call, "receiving");
start_op = 1;