diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-01-30 16:56:58 -0800 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-01-30 16:56:58 -0800 |
commit | b8a318acd9c2d7c289b3fda532c22d486173d9e7 (patch) | |
tree | 18d8a3ab7629bb3df43ee0ede05afd19794ebfa5 /src/core/surface/byte_buffer_queue.c | |
parent | 4450db2ac982fbabcbc46c5e39bffd24370f2df4 (diff) |
Finish bbq api
Modelled after old pending read queue code.
Diffstat (limited to 'src/core/surface/byte_buffer_queue.c')
-rw-r--r-- | src/core/surface/byte_buffer_queue.c | 63 |
1 files changed, 15 insertions, 48 deletions
diff --git a/src/core/surface/byte_buffer_queue.c b/src/core/surface/byte_buffer_queue.c index bd5263b2f6..36c082f484 100644 --- a/src/core/surface/byte_buffer_queue.c +++ b/src/core/surface/byte_buffer_queue.c @@ -32,69 +32,40 @@ */ #include "src/core/surface/byte_buffer_queue.h" +#include <grpc/support/alloc.h> -#define INITIAL_PENDING_READ_COUNT 4 - -static void pra_init(pending_read_array *array) { - array->data = gpr_malloc(sizeof(pending_read) * INITIAL_PENDING_READ_COUNT); - array->count = 0; - array->capacity = INITIAL_PENDING_READ_COUNT; -} - -static void pra_destroy(pending_read_array *array, - size_t finish_starting_from) { - size_t i; - for (i = finish_starting_from; i < array->count; i++) { - array->data[i].on_finish(array->data[i].user_data, GRPC_OP_ERROR); - } +static void bba_destroy(grpc_bbq_array *array) { gpr_free(array->data); } /* Append an operation to an array, expanding as needed */ -static void pra_push(pending_read_array *a, grpc_byte_buffer *buffer, - void (*on_finish)(void *user_data, grpc_op_error error), - void *user_data) { +static void bba_push(grpc_bbq_array *a, grpc_byte_buffer *buffer) { if (a->count == a->capacity) { a->capacity *= 2; - a->data = gpr_realloc(a->data, sizeof(pending_read) * a->capacity); + a->data = gpr_realloc(a->data, sizeof(grpc_byte_buffer*) * a->capacity); } - a->data[a->count].byte_buffer = buffer; - a->data[a->count].user_data = user_data; - a->data[a->count].on_finish = on_finish; - a->count++; -} - -static void prq_init(pending_read_queue *q) { - q->drain_pos = 0; - pra_init(&q->filling); - pra_init(&q->draining); + a->data[a->count++] = buffer; } -static void prq_destroy(pending_read_queue *q) { - pra_destroy(&q->filling, 0); - pra_destroy(&q->draining, q->drain_pos); +void grpc_bbq_destroy(grpc_byte_buffer_queue *q) { + bba_destroy(&q->filling); + bba_destroy(&q->draining); } -static int prq_is_empty(pending_read_queue *q) { +int grpc_bbq_empty(grpc_byte_buffer_queue *q) { return (q->drain_pos == q->draining.count && q->filling.count == 0); } -static void prq_push(pending_read_queue *q, grpc_byte_buffer *buffer, - void (*on_finish)(void *user_data, grpc_op_error error), - void *user_data) { - pra_push(&q->filling, buffer, on_finish, user_data); +void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *buffer) { + bba_push(&q->filling, buffer); } -/* Take the first queue element and move it to the completion queue. Do nothing - if q is empty */ -static int prq_pop_to_cq(pending_read_queue *q, void *tag, grpc_call *call, - grpc_completion_queue *cq) { - pending_read_array temp_array; - pending_read *pr; +grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) { + grpc_bbq_array temp_array; if (q->drain_pos == q->draining.count) { if (q->filling.count == 0) { - return 0; + return NULL; } q->draining.count = 0; q->drain_pos = 0; @@ -104,9 +75,5 @@ static int prq_pop_to_cq(pending_read_queue *q, void *tag, grpc_call *call, q->draining = temp_array; } - pr = q->draining.data + q->drain_pos; - q->drain_pos++; - grpc_cq_end_read(cq, tag, call, pr->on_finish, pr->user_data, - pr->byte_buffer); - return 1; + return q->draining.data[q->drain_pos++]; } |