aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-07-17 14:36:45 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-07-17 14:36:45 -0700
commitaf26143b6fc76559f0907aadfc9f23c4e2e60844 (patch)
tree2805cc3317954094eb8baf8a30898b1a93bbc00f /src/core/surface
parent232e04b6199090b5805b3e7148fcfce3b8ef9956 (diff)
parentfea28b79fb0a211b2ea8515dab25db19099ea595 (diff)
Merge github.com:grpc/grpc into i-want-to-wait-free
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/byte_buffer_queue.c8
-rw-r--r--src/core/surface/byte_buffer_queue.h2
-rw-r--r--src/core/surface/call.c13
-rw-r--r--src/core/surface/completion_queue.c6
4 files changed, 25 insertions, 4 deletions
diff --git a/src/core/surface/byte_buffer_queue.c b/src/core/surface/byte_buffer_queue.c
index 7c31bfe5da..e47dc4f4ce 100644
--- a/src/core/surface/byte_buffer_queue.c
+++ b/src/core/surface/byte_buffer_queue.c
@@ -62,6 +62,7 @@ int grpc_bbq_empty(grpc_byte_buffer_queue *q) {
}
void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *buffer) {
+ q->bytes += grpc_byte_buffer_length(buffer);
bba_push(&q->filling, buffer);
}
@@ -72,8 +73,11 @@ void grpc_bbq_flush(grpc_byte_buffer_queue *q) {
}
}
+size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q) { return q->bytes; }
+
grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) {
grpc_bbq_array temp_array;
+ grpc_byte_buffer *out;
if (q->drain_pos == q->draining.count) {
if (q->filling.count == 0) {
@@ -87,5 +91,7 @@ grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) {
q->draining = temp_array;
}
- return q->draining.data[q->drain_pos++];
+ out = q->draining.data[q->drain_pos++];
+ q->bytes -= grpc_byte_buffer_length(out);
+ return out;
}
diff --git a/src/core/surface/byte_buffer_queue.h b/src/core/surface/byte_buffer_queue.h
index 32c57f8756..f01958984f 100644
--- a/src/core/surface/byte_buffer_queue.h
+++ b/src/core/surface/byte_buffer_queue.h
@@ -49,6 +49,7 @@ typedef struct {
size_t drain_pos;
grpc_bbq_array filling;
grpc_bbq_array draining;
+ size_t bytes;
} grpc_byte_buffer_queue;
void grpc_bbq_destroy(grpc_byte_buffer_queue *q);
@@ -56,5 +57,6 @@ grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q);
void grpc_bbq_flush(grpc_byte_buffer_queue *q);
int grpc_bbq_empty(grpc_byte_buffer_queue *q);
void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *bb);
+size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q);
#endif /* GRPC_INTERNAL_CORE_SURFACE_BYTE_BUFFER_QUEUE_H */
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 0a551ac47f..71f4235571 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -513,6 +513,8 @@ static void unlock(grpc_call *call) {
int completing_requests = 0;
int start_op = 0;
int i;
+ const gpr_uint32 MAX_RECV_PEEK_AHEAD = 65536;
+ size_t buffered_bytes;
int cancel_alarm = 0;
memset(&op, 0, sizeof(op));
@@ -528,6 +530,17 @@ static void unlock(grpc_call *call) {
op.recv_ops = &call->recv_ops;
op.recv_state = &call->recv_state;
op.on_done_recv = &call->on_done_recv;
+ if (grpc_bbq_empty(&call->incoming_queue) && call->reading_message) {
+ op.max_recv_bytes = call->incoming_message_length -
+ call->incoming_message.length + MAX_RECV_PEEK_AHEAD;
+ } else {
+ buffered_bytes = grpc_bbq_bytes(&call->incoming_queue);
+ if (buffered_bytes > MAX_RECV_PEEK_AHEAD) {
+ op.max_recv_bytes = 0;
+ } else {
+ op.max_recv_bytes = MAX_RECV_PEEK_AHEAD - buffered_bytes;
+ }
+ }
call->receiving = 1;
GRPC_CALL_INTERNAL_REF(call, "receiving");
start_op = 1;
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index c67f75fc5c..f3630b31dd 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -116,7 +116,7 @@ void grpc_cq_begin_op(grpc_completion_queue *cc) {
void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
void (*done)(void *done_arg, grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage) {
- int shutdown = gpr_unref(&cc->pending_events);
+ int shutdown;
storage->tag = tag;
storage->done = done;
@@ -124,15 +124,15 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
storage->next =
((gpr_uintptr)&cc->completed_head) | ((gpr_uintptr)(success != 0));
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ shutdown = gpr_unref(&cc->pending_events);
if (!shutdown) {
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
cc->completed_tail->next =
((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
cc->completed_tail = storage;
grpc_pollset_kick(&cc->pollset);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
} else {
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
cc->completed_tail->next =
((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
cc->completed_tail = storage;