diff options
Diffstat (limited to 'src/core/transport/stream_op.c')
-rw-r--r-- | src/core/transport/stream_op.c | 186 |
1 files changed, 155 insertions, 31 deletions
diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c index c30e3a27f1..e1a75adcb6 100644 --- a/src/core/transport/stream_op.c +++ b/src/core/transport/stream_op.c @@ -33,11 +33,11 @@ #include "src/core/transport/stream_op.h" +#include <string.h> + #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <string.h> - /* Exponential growth function: Given x, return a larger x. Currently we grow by 1.5 times upon reallocation. */ #define GROW(x) (3 * (x) / 2) @@ -79,14 +79,9 @@ void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops) { gpr_slice_unref(ops[i].data.slice); break; case GRPC_OP_METADATA: - grpc_mdelem_unref(ops[i].data.metadata); - break; - case GRPC_OP_FLOW_CTL_CB: - ops[i].data.flow_ctl_cb.cb(ops[i].data.flow_ctl_cb.arg, GRPC_OP_ERROR); + grpc_metadata_batch_destroy(&ops[i].data.metadata); break; case GRPC_NO_OP: - case GRPC_OP_DEADLINE: - case GRPC_OP_METADATA_BOUNDARY: case GRPC_OP_BEGIN_MESSAGE: break; } @@ -106,6 +101,7 @@ static void expandto(grpc_stream_op_buffer *sopb, size_t new_capacity) { static grpc_stream_op *add(grpc_stream_op_buffer *sopb) { grpc_stream_op *out; + GPR_ASSERT(sopb->nops <= sopb->capacity); if (sopb->nops == sopb->capacity) { expandto(sopb, GROW(sopb->capacity)); } @@ -126,22 +122,11 @@ void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length, op->data.begin_message.flags = flags; } -void grpc_sopb_add_metadata_boundary(grpc_stream_op_buffer *sopb) { - grpc_stream_op *op = add(sopb); - op->type = GRPC_OP_METADATA_BOUNDARY; -} - -void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_mdelem *md) { +void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, + grpc_metadata_batch b) { grpc_stream_op *op = add(sopb); op->type = GRPC_OP_METADATA; - op->data.metadata = md; -} - -void grpc_sopb_add_deadline(grpc_stream_op_buffer *sopb, - gpr_timespec deadline) { - grpc_stream_op *op = add(sopb); - op->type = GRPC_OP_DEADLINE; - op->data.deadline = deadline; + op->data.metadata = b; } void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice) { @@ -150,15 +135,6 @@ void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice) { op->data.slice = slice; } -void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb, - void (*cb)(void *arg, grpc_op_error error), - void *arg) { - grpc_stream_op *op = add(sopb); - op->type = GRPC_OP_FLOW_CTL_CB; - op->data.flow_ctl_cb.cb = cb; - op->data.flow_ctl_cb.arg = arg; -} - void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops, size_t nops) { size_t orig_nops = sopb->nops; @@ -171,3 +147,151 @@ void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops, memcpy(sopb->ops + orig_nops, ops, sizeof(grpc_stream_op) * nops); sopb->nops = new_nops; } + +static void assert_valid_list(grpc_mdelem_list *list) { +#ifndef NDEBUG + grpc_linked_mdelem *l; + + GPR_ASSERT((list->head == NULL) == (list->tail == NULL)); + if (!list->head) return; + GPR_ASSERT(list->head->prev == NULL); + GPR_ASSERT(list->tail->next == NULL); + GPR_ASSERT((list->head == list->tail) == (list->head->next == NULL)); + + for (l = list->head; l; l = l->next) { + GPR_ASSERT(l->md); + GPR_ASSERT((l->prev == NULL) == (l == list->head)); + GPR_ASSERT((l->next == NULL) == (l == list->tail)); + if (l->next) GPR_ASSERT(l->next->prev == l); + if (l->prev) GPR_ASSERT(l->prev->next == l); + } +#endif /* NDEBUG */ +} + +#ifndef NDEBUG +void grpc_metadata_batch_assert_ok(grpc_metadata_batch *comd) { + assert_valid_list(&comd->list); + assert_valid_list(&comd->garbage); +} +#endif /* NDEBUG */ + +void grpc_metadata_batch_init(grpc_metadata_batch *comd) { + comd->list.head = comd->list.tail = comd->garbage.head = comd->garbage.tail = + NULL; + comd->deadline = gpr_inf_future; +} + +void grpc_metadata_batch_destroy(grpc_metadata_batch *comd) { + grpc_linked_mdelem *l; + for (l = comd->list.head; l; l = l->next) { + grpc_mdelem_unref(l->md); + } + for (l = comd->garbage.head; l; l = l->next) { + grpc_mdelem_unref(l->md); + } +} + +void grpc_metadata_batch_add_head(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage, + grpc_mdelem *elem_to_add) { + GPR_ASSERT(elem_to_add); + storage->md = elem_to_add; + grpc_metadata_batch_link_head(comd, storage); +} + +static void link_head(grpc_mdelem_list *list, grpc_linked_mdelem *storage) { + assert_valid_list(list); + GPR_ASSERT(storage->md); + storage->prev = NULL; + storage->next = list->head; + if (list->head != NULL) { + list->head->prev = storage; + } else { + list->tail = storage; + } + list->head = storage; + assert_valid_list(list); +} + +void grpc_metadata_batch_link_head(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage) { + link_head(&comd->list, storage); +} + +void grpc_metadata_batch_add_tail(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage, + grpc_mdelem *elem_to_add) { + GPR_ASSERT(elem_to_add); + storage->md = elem_to_add; + grpc_metadata_batch_link_tail(comd, storage); +} + +static void link_tail(grpc_mdelem_list *list, grpc_linked_mdelem *storage) { + assert_valid_list(list); + GPR_ASSERT(storage->md); + storage->prev = list->tail; + storage->next = NULL; + if (list->tail != NULL) { + list->tail->next = storage; + } else { + list->head = storage; + } + list->tail = storage; + assert_valid_list(list); +} + +void grpc_metadata_batch_link_tail(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage) { + link_tail(&comd->list, storage); +} + +void grpc_metadata_batch_merge(grpc_metadata_batch *target, + grpc_metadata_batch *add) { + grpc_linked_mdelem *l; + grpc_linked_mdelem *next; + for (l = add->list.head; l; l = next) { + next = l->next; + link_tail(&target->list, l); + } + for (l = add->garbage.head; l; l = next) { + next = l->next; + link_tail(&target->garbage, l); + } +} + +void grpc_metadata_batch_filter(grpc_metadata_batch *comd, + grpc_mdelem *(*filter)(void *user_data, + grpc_mdelem *elem), + void *user_data) { + grpc_linked_mdelem *l; + grpc_linked_mdelem *next; + + assert_valid_list(&comd->list); + assert_valid_list(&comd->garbage); + for (l = comd->list.head; l; l = next) { + grpc_mdelem *orig = l->md; + grpc_mdelem *filt = filter(user_data, orig); + next = l->next; + if (filt == NULL) { + if (l->prev) { + l->prev->next = l->next; + } + if (l->next) { + l->next->prev = l->prev; + } + if (comd->list.head == l) { + comd->list.head = l->next; + } + if (comd->list.tail == l) { + comd->list.tail = l->prev; + } + assert_valid_list(&comd->list); + link_head(&comd->garbage, l); + } else if (filt != orig) { + grpc_mdelem_unref(orig); + l->md = filt; + } + } + assert_valid_list(&comd->list); + assert_valid_list(&comd->garbage); +} |