diff options
author | Craig Tiller <ctiller@google.com> | 2015-04-16 14:46:41 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-04-16 14:46:41 -0700 |
commit | 205aee1c82ab2182f37017d1b70de7c9eefb898b (patch) | |
tree | 245426e21b61c744361c1e807ac7b81eb0b0e6a0 /src/core/transport | |
parent | 87d5b19da6f8ce3104ec1a2bc93533f8455d5ac7 (diff) |
Move metadata batching to stream_op
Diffstat (limited to 'src/core/transport')
-rw-r--r-- | src/core/transport/stream_op.c | 114 | ||||
-rw-r--r-- | src/core/transport/stream_op.h | 39 |
2 files changed, 153 insertions, 0 deletions
diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c index c30e3a27f1..38b4235931 100644 --- a/src/core/transport/stream_op.c +++ b/src/core/transport/stream_op.c @@ -171,3 +171,117 @@ void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops, memcpy(sopb->ops + orig_nops, ops, sizeof(grpc_stream_op) * nops); sopb->nops = new_nops; } + + +static void assert_valid_list(grpc_mdelem_list *list) { + grpc_linked_mdelem *l; + + GPR_ASSERT((list->head == NULL) == (list->tail == NULL)); + if (!list->head) return; + GPR_ASSERT(list->head->prev == NULL); + GPR_ASSERT(list->tail->next == NULL); + GPR_ASSERT((list->head == list->tail) == (list->head->next == NULL)); + + for (l = list->head; l; l = l->next) { + GPR_ASSERT((l->prev == NULL) == (l == list->head)); + GPR_ASSERT((l->next == NULL) == (l == list->tail)); + if (l->next) GPR_ASSERT(l->next->prev == l); + if (l->prev) GPR_ASSERT(l->prev->next == l); + } +} + +void grpc_metadata_batch_init(grpc_metadata_batch *comd) { abort(); } + +void grpc_metadata_batch_destroy(grpc_metadata_batch *comd) { abort(); } + +void grpc_metadata_batch_merge(grpc_metadata_batch *target, + grpc_metadata_batch *add) { + abort(); +} + +void grpc_metadata_batch_add_head(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage, + grpc_mdelem *elem_to_add) { + storage->md = elem_to_add; + grpc_metadata_batch_link_head(comd, storage); +} + +static void link_head(grpc_mdelem_list *list, grpc_linked_mdelem *storage) { + assert_valid_list(list); + storage->prev = NULL; + storage->next = list->head; + if (list->head != NULL) { + list->head->prev = storage; + } else { + list->tail = storage; + } + list->head = storage; + assert_valid_list(list); +} + +void grpc_metadata_batch_link_head(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage) { + link_head(&comd->list, storage); +} + +void grpc_metadata_batch_add_tail(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage, + grpc_mdelem *elem_to_add) { + storage->md = elem_to_add; + grpc_metadata_batch_link_tail(comd, storage); +} + +static void link_tail(grpc_mdelem_list *list, grpc_linked_mdelem *storage) { + assert_valid_list(list); + storage->prev = list->tail; + storage->next = NULL; + if (list->tail != NULL) { + list->tail->next = storage; + } else { + list->head = storage; + } + list->tail = storage; + assert_valid_list(list); +} + +void grpc_metadata_batch_link_tail(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage) { + link_tail(&comd->list, storage); +} + +void grpc_metadata_batch_filter(grpc_metadata_batch *comd, + grpc_mdelem *(*filter)(void *user_data, + grpc_mdelem *elem), + void *user_data) { + grpc_linked_mdelem *l; + grpc_linked_mdelem *next; + + assert_valid_list(&comd->list); + assert_valid_list(&comd->garbage); + for (l = comd->list.head; l; l = next) { + grpc_mdelem *orig = l->md; + grpc_mdelem *filt = filter(user_data, orig); + next = l->next; + if (filt == NULL) { + if (l->prev) { + l->prev->next = l->next; + } + if (l->next) { + l->next->prev = l->prev; + } + if (comd->list.head == l) { + comd->list.head = l->next; + } + if (comd->list.tail == l) { + comd->list.tail = l->prev; + } + assert_valid_list(&comd->list); + link_head(&comd->garbage, l); + } else if (filt != orig) { + grpc_mdelem_unref(orig); + l->md = filt; + } + } + assert_valid_list(&comd->list); + assert_valid_list(&comd->garbage); +} diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h index 2ffbcce87b..81d08b60fb 100644 --- a/src/core/transport/stream_op.h +++ b/src/core/transport/stream_op.h @@ -76,6 +76,45 @@ typedef struct grpc_flow_ctl_cb { void *arg; } grpc_flow_ctl_cb; +typedef struct grpc_linked_mdelem { + grpc_mdelem *md; + struct grpc_linked_mdelem *next; + struct grpc_linked_mdelem *prev; +} grpc_linked_mdelem; + +typedef struct grpc_mdelem_list { + grpc_linked_mdelem *head; + grpc_linked_mdelem *tail; +} grpc_mdelem_list; + +typedef struct grpc_metadata_batch { + grpc_mdelem_list list; + grpc_mdelem_list garbage; + gpr_timespec deadline; +} grpc_metadata_batch; + +void grpc_metadata_batch_init(grpc_metadata_batch *comd); +void grpc_metadata_batch_destroy(grpc_metadata_batch *comd); +void grpc_metadata_batch_merge(grpc_metadata_batch *target, + grpc_metadata_batch *add); + +void grpc_metadata_batch_link_head(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage); +void grpc_metadata_batch_link_tail(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage); + +void grpc_metadata_batch_add_head(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage, + grpc_mdelem *elem_to_add); +void grpc_metadata_batch_add_tail(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage, + grpc_mdelem *elem_to_add); + +void grpc_metadata_batch_filter(grpc_metadata_batch *comd, + grpc_mdelem *(*filter)(void *user_data, + grpc_mdelem *elem), + void *user_data); + /* Represents a single operation performed on a stream/transport */ typedef struct grpc_stream_op { /* the operation to be applied */ |