aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface/byte_buffer_queue.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-01-30 16:56:58 -0800
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-01-30 16:56:58 -0800
commitb8a318acd9c2d7c289b3fda532c22d486173d9e7 (patch)
tree18d8a3ab7629bb3df43ee0ede05afd19794ebfa5 /src/core/surface/byte_buffer_queue.c
parent4450db2ac982fbabcbc46c5e39bffd24370f2df4 (diff)
Finish bbq api
Modelled after old pending read queue code.
Diffstat (limited to 'src/core/surface/byte_buffer_queue.c')
-rw-r--r--src/core/surface/byte_buffer_queue.c63
1 files changed, 15 insertions, 48 deletions
diff --git a/src/core/surface/byte_buffer_queue.c b/src/core/surface/byte_buffer_queue.c
index bd5263b2f6..36c082f484 100644
--- a/src/core/surface/byte_buffer_queue.c
+++ b/src/core/surface/byte_buffer_queue.c
@@ -32,69 +32,40 @@
*/
#include "src/core/surface/byte_buffer_queue.h"
+#include <grpc/support/alloc.h>
-#define INITIAL_PENDING_READ_COUNT 4
-
-static void pra_init(pending_read_array *array) {
- array->data = gpr_malloc(sizeof(pending_read) * INITIAL_PENDING_READ_COUNT);
- array->count = 0;
- array->capacity = INITIAL_PENDING_READ_COUNT;
-}
-
-static void pra_destroy(pending_read_array *array,
- size_t finish_starting_from) {
- size_t i;
- for (i = finish_starting_from; i < array->count; i++) {
- array->data[i].on_finish(array->data[i].user_data, GRPC_OP_ERROR);
- }
+static void bba_destroy(grpc_bbq_array *array) {
gpr_free(array->data);
}
/* Append an operation to an array, expanding as needed */
-static void pra_push(pending_read_array *a, grpc_byte_buffer *buffer,
- void (*on_finish)(void *user_data, grpc_op_error error),
- void *user_data) {
+static void bba_push(grpc_bbq_array *a, grpc_byte_buffer *buffer) {
if (a->count == a->capacity) {
a->capacity *= 2;
- a->data = gpr_realloc(a->data, sizeof(pending_read) * a->capacity);
+ a->data = gpr_realloc(a->data, sizeof(grpc_byte_buffer*) * a->capacity);
}
- a->data[a->count].byte_buffer = buffer;
- a->data[a->count].user_data = user_data;
- a->data[a->count].on_finish = on_finish;
- a->count++;
-}
-
-static void prq_init(pending_read_queue *q) {
- q->drain_pos = 0;
- pra_init(&q->filling);
- pra_init(&q->draining);
+ a->data[a->count++] = buffer;
}
-static void prq_destroy(pending_read_queue *q) {
- pra_destroy(&q->filling, 0);
- pra_destroy(&q->draining, q->drain_pos);
+void grpc_bbq_destroy(grpc_byte_buffer_queue *q) {
+ bba_destroy(&q->filling);
+ bba_destroy(&q->draining);
}
-static int prq_is_empty(pending_read_queue *q) {
+int grpc_bbq_empty(grpc_byte_buffer_queue *q) {
return (q->drain_pos == q->draining.count && q->filling.count == 0);
}
-static void prq_push(pending_read_queue *q, grpc_byte_buffer *buffer,
- void (*on_finish)(void *user_data, grpc_op_error error),
- void *user_data) {
- pra_push(&q->filling, buffer, on_finish, user_data);
+void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *buffer) {
+ bba_push(&q->filling, buffer);
}
-/* Take the first queue element and move it to the completion queue. Do nothing
- if q is empty */
-static int prq_pop_to_cq(pending_read_queue *q, void *tag, grpc_call *call,
- grpc_completion_queue *cq) {
- pending_read_array temp_array;
- pending_read *pr;
+grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) {
+ grpc_bbq_array temp_array;
if (q->drain_pos == q->draining.count) {
if (q->filling.count == 0) {
- return 0;
+ return NULL;
}
q->draining.count = 0;
q->drain_pos = 0;
@@ -104,9 +75,5 @@ static int prq_pop_to_cq(pending_read_queue *q, void *tag, grpc_call *call,
q->draining = temp_array;
}
- pr = q->draining.data + q->drain_pos;
- q->drain_pos++;
- grpc_cq_end_read(cq, tag, call, pr->on_finish, pr->user_data,
- pr->byte_buffer);
- return 1;
+ return q->draining.data[q->drain_pos++];
}