diff options
-rw-r--r-- | src/core/channel/call_op_string.c | 2 | ||||
-rw-r--r-- | src/core/channel/channel_stack.c | 113 | ||||
-rw-r--r-- | src/core/channel/channel_stack.h | 41 | ||||
-rw-r--r-- | src/core/channel/client_channel.c | 8 | ||||
-rw-r--r-- | src/core/channel/http_client_filter.c | 10 | ||||
-rw-r--r-- | src/core/channel/http_server_filter.c | 4 | ||||
-rw-r--r-- | src/core/security/auth.c | 2 | ||||
-rw-r--r-- | src/core/surface/call.c | 8 | ||||
-rw-r--r-- | src/core/surface/call.h | 2 | ||||
-rw-r--r-- | src/core/surface/server.c | 2 | ||||
-rw-r--r-- | src/core/transport/stream_op.c | 114 | ||||
-rw-r--r-- | src/core/transport/stream_op.h | 39 |
12 files changed, 173 insertions, 172 deletions
diff --git a/src/core/channel/call_op_string.c b/src/core/channel/call_op_string.c index 534ee29bac..3e30d822f4 100644 --- a/src/core/channel/call_op_string.c +++ b/src/core/channel/call_op_string.c @@ -53,7 +53,7 @@ static void put_metadata(gpr_strvec *b, grpc_mdelem *md) { GPR_HEXDUMP_PLAINTEXT)); } -static void put_metadata_list(gpr_strvec *b, grpc_call_op_metadata md) { +static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) { grpc_linked_mdelem *m; for (m = md.list.head; m; m = m->next) { put_metadata(b, m->md); diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index 7fc492fcea..cd3496b42b 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -229,116 +229,3 @@ void grpc_call_element_recv_status(grpc_call_element *cur_elem, const char *message) { abort(); } - -static void assert_valid_list(grpc_mdelem_list *list) { - grpc_linked_mdelem *l; - - GPR_ASSERT((list->head == NULL) == (list->tail == NULL)); - if (!list->head) return; - GPR_ASSERT(list->head->prev == NULL); - GPR_ASSERT(list->tail->next == NULL); - GPR_ASSERT((list->head == list->tail) == (list->head->next == NULL)); - - for (l = list->head; l; l = l->next) { - GPR_ASSERT((l->prev == NULL) == (l == list->head)); - GPR_ASSERT((l->next == NULL) == (l == list->tail)); - if (l->next) GPR_ASSERT(l->next->prev == l); - if (l->prev) GPR_ASSERT(l->prev->next == l); - } -} - -void grpc_call_op_metadata_init(grpc_call_op_metadata *comd) { abort(); } - -void grpc_call_op_metadata_destroy(grpc_call_op_metadata *comd) { abort(); } - -void grpc_call_op_metadata_merge(grpc_call_op_metadata *target, - grpc_call_op_metadata *add) { - abort(); -} - -void grpc_call_op_metadata_add_head(grpc_call_op_metadata *comd, - grpc_linked_mdelem *storage, - grpc_mdelem *elem_to_add) { - storage->md = elem_to_add; - grpc_call_op_metadata_link_head(comd, storage); -} - -static void link_head(grpc_mdelem_list *list, grpc_linked_mdelem *storage) { - assert_valid_list(list); - storage->prev = NULL; - storage->next = list->head; - if (list->head != NULL) { - list->head->prev = storage; - } else { - list->tail = storage; - } - list->head = storage; - assert_valid_list(list); -} - -void grpc_call_op_metadata_link_head(grpc_call_op_metadata *comd, - grpc_linked_mdelem *storage) { - link_head(&comd->list, storage); -} - -void grpc_call_op_metadata_add_tail(grpc_call_op_metadata *comd, - grpc_linked_mdelem *storage, - grpc_mdelem *elem_to_add) { - storage->md = elem_to_add; - grpc_call_op_metadata_link_tail(comd, storage); -} - -static void link_tail(grpc_mdelem_list *list, grpc_linked_mdelem *storage) { - assert_valid_list(list); - storage->prev = list->tail; - storage->next = NULL; - if (list->tail != NULL) { - list->tail->next = storage; - } else { - list->head = storage; - } - list->tail = storage; - assert_valid_list(list); -} - -void grpc_call_op_metadata_link_tail(grpc_call_op_metadata *comd, - grpc_linked_mdelem *storage) { - link_tail(&comd->list, storage); -} - -void grpc_call_op_metadata_filter(grpc_call_op_metadata *comd, - grpc_mdelem *(*filter)(void *user_data, - grpc_mdelem *elem), - void *user_data) { - grpc_linked_mdelem *l; - grpc_linked_mdelem *next; - - assert_valid_list(&comd->list); - assert_valid_list(&comd->garbage); - for (l = comd->list.head; l; l = next) { - grpc_mdelem *orig = l->md; - grpc_mdelem *filt = filter(user_data, orig); - next = l->next; - if (filt == NULL) { - if (l->prev) { - l->prev->next = l->next; - } - if (l->next) { - l->next->prev = l->prev; - } - if (comd->list.head == l) { - comd->list.head = l->next; - } - if (comd->list.tail == l) { - comd->list.tail = l->prev; - } - assert_valid_list(&comd->list); - link_head(&comd->garbage, l); - } else if (filt != orig) { - grpc_mdelem_unref(orig); - l->md = filt; - } - } - assert_valid_list(&comd->list); - assert_valid_list(&comd->garbage); -} diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index 4cd73ab6bf..cc1eab7cf2 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -89,45 +89,6 @@ typedef enum { or decrement a pointer to find the next element to call */ typedef enum { GRPC_CALL_DOWN = 1, GRPC_CALL_UP = -1 } grpc_call_dir; -typedef struct grpc_linked_mdelem { - grpc_mdelem *md; - struct grpc_linked_mdelem *next; - struct grpc_linked_mdelem *prev; -} grpc_linked_mdelem; - -typedef struct grpc_mdelem_list { - grpc_linked_mdelem *head; - grpc_linked_mdelem *tail; -} grpc_mdelem_list; - -typedef struct grpc_call_op_metadata { - grpc_mdelem_list list; - grpc_mdelem_list garbage; - gpr_timespec deadline; -} grpc_call_op_metadata; - -void grpc_call_op_metadata_init(grpc_call_op_metadata *comd); -void grpc_call_op_metadata_destroy(grpc_call_op_metadata *comd); -void grpc_call_op_metadata_merge(grpc_call_op_metadata *target, - grpc_call_op_metadata *add); - -void grpc_call_op_metadata_link_head(grpc_call_op_metadata *comd, - grpc_linked_mdelem *storage); -void grpc_call_op_metadata_link_tail(grpc_call_op_metadata *comd, - grpc_linked_mdelem *storage); - -void grpc_call_op_metadata_add_head(grpc_call_op_metadata *comd, - grpc_linked_mdelem *storage, - grpc_mdelem *elem_to_add); -void grpc_call_op_metadata_add_tail(grpc_call_op_metadata *comd, - grpc_linked_mdelem *storage, - grpc_mdelem *elem_to_add); - -void grpc_call_op_metadata_filter(grpc_call_op_metadata *comd, - grpc_mdelem *(*filter)(void *user_data, - grpc_mdelem *elem), - void *user_data); - /* A single filterable operation to be performed on a call */ typedef struct { /* The type of operation we're performing */ @@ -146,7 +107,7 @@ typedef struct { grpc_pollset *pollset; } start; grpc_byte_buffer *message; - grpc_call_op_metadata metadata; + grpc_metadata_batch metadata; } data; /* Must be called when processing of this call-op is complete. diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index e66c5bf366..018228b4e6 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -83,7 +83,7 @@ struct call_data { grpc_call_element *elem; call_state state; - grpc_call_op_metadata pending_metadata; + grpc_metadata_batch pending_metadata; gpr_uint32 pending_metadata_flags; gpr_timespec deadline; union { @@ -257,7 +257,7 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, switch (op->type) { case GRPC_SEND_METADATA: - grpc_call_op_metadata_merge(&calld->pending_metadata, &op->data.metadata); + grpc_metadata_batch_merge(&calld->pending_metadata, &op->data.metadata); op->done_cb(op->user_data, GRPC_OP_OK); break; case GRPC_SEND_START: @@ -383,7 +383,7 @@ static void init_call_elem(grpc_call_element *elem, calld->deadline = gpr_inf_future; calld->s.waiting.on_complete = error_bad_on_complete; calld->s.waiting.on_complete_user_data = NULL; - grpc_call_op_metadata_init(&calld->pending_metadata); + grpc_metadata_batch_init(&calld->pending_metadata); } /* Destructor for call_data */ @@ -391,7 +391,7 @@ static void destroy_call_elem(grpc_call_element *elem) { call_data *calld = elem->call_data; /* if the metadata buffer is not flushed, destroy it here. */ - grpc_call_op_metadata_destroy(&calld->pending_metadata); + grpc_metadata_batch_destroy(&calld->pending_metadata); /* if the call got activated, we need to destroy the child stack also, and remove it from the in-flight requests tracked by the child_entry we picked */ diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index fd3a15d9df..bc014b15ff 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -80,18 +80,18 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, case GRPC_SEND_METADATA: /* Send : prefixed headers, which have to be before any application * layer headers. */ - grpc_call_op_metadata_add_head(&op->data.metadata, &calld->method, + grpc_metadata_batch_add_head(&op->data.metadata, &calld->method, grpc_mdelem_ref(channeld->method)); - grpc_call_op_metadata_add_head(&op->data.metadata, &calld->scheme, + grpc_metadata_batch_add_head(&op->data.metadata, &calld->scheme, grpc_mdelem_ref(channeld->scheme)); - grpc_call_op_metadata_add_tail(&op->data.metadata, &calld->te_trailers, + grpc_metadata_batch_add_tail(&op->data.metadata, &calld->te_trailers, grpc_mdelem_ref(channeld->te_trailers)); - grpc_call_op_metadata_add_tail(&op->data.metadata, &calld->content_type, + grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type, grpc_mdelem_ref(channeld->content_type)); grpc_call_next_op(elem, op); break; case GRPC_RECV_METADATA: - grpc_call_op_metadata_filter(&op->data.metadata, client_filter, elem); + grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem); grpc_call_next_op(elem, op); break; default: diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index 432b41908a..0e50f408b5 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -158,7 +158,7 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, switch (op->type) { case GRPC_RECV_METADATA: - grpc_call_op_metadata_filter(&op->data.metadata, server_filter, elem); + grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem); if (!calld->got_initial_metadata) { calld->got_initial_metadata = 1; /* Have we seen the required http2 transport headers? @@ -188,7 +188,7 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, case GRPC_SEND_METADATA: /* If we haven't sent status 200 yet, we need to so so because it needs to come before any non : prefixed metadata. */ - grpc_call_op_metadata_add_head(&op->data.metadata, &calld->status, + grpc_metadata_batch_add_head(&op->data.metadata, &calld->status, grpc_mdelem_ref(channeld->status_ok)); grpc_call_next_op(elem, op); break; diff --git a/src/core/security/auth.c b/src/core/security/auth.c index 6c87aab737..f61287ad3f 100644 --- a/src/core/security/auth.c +++ b/src/core/security/auth.c @@ -79,7 +79,7 @@ static void on_credentials_metadata(void *user_data, grpc_mdelem **md_elems, size_t i; GPR_ASSERT(num_md <= MAX_CREDENTIAL_METADATA_COUNT); for (i = 0; i < num_md; i++) { - grpc_call_op_metadata_add_tail(&op.data.metadata, &calld->md_links[i], + grpc_metadata_batch_add_tail(&op.data.metadata, &calld->md_links[i], grpc_mdelem_ref(md_elems[i])); } grpc_call_next_op(elem, &op); diff --git a/src/core/surface/call.c b/src/core/surface/call.c index bd3374297e..5facaa503d 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -657,7 +657,7 @@ static void enact_send_action(grpc_call *call, send_action sa) { op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL; op.data.metadata.deadline = call->send_deadline; for (i = 0; i < call->send_initial_metadata_count; i++) { - grpc_call_op_metadata_link_head(&op.data.metadata, + grpc_metadata_batch_link_head(&op.data.metadata, &call->send_initial_metadata[i]); } op.done_cb = do_nothing; @@ -698,14 +698,14 @@ static void enact_send_action(grpc_call *call, send_action sa) { /* TODO(ctiller): cache common status values */ data = call->request_data[GRPC_IOREQ_SEND_STATUS]; gpr_ltoa(data.send_status.code, status_str); - grpc_call_op_metadata_add_tail( + grpc_metadata_batch_add_tail( &op.data.metadata, &call->status_link, grpc_mdelem_from_metadata_strings( call->metadata_context, grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)), grpc_mdstr_from_string(call->metadata_context, status_str))); if (data.send_status.details) { - grpc_call_op_metadata_add_tail( + grpc_metadata_batch_add_tail( &op.data.metadata, &call->details_link, grpc_mdelem_from_metadata_strings( call->metadata_context, @@ -984,7 +984,7 @@ void grpc_call_recv_message(grpc_call_element *elem, } int grpc_call_recv_metadata(grpc_call_element *elem, - grpc_call_op_metadata *md) { + grpc_metadata_batch *md) { grpc_call *call = CALL_FROM_TOP_ELEM(elem); grpc_linked_mdelem *l; grpc_metadata_array *dest; diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 93c5e8e3f3..2848ccc5a1 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -100,7 +100,7 @@ void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion); the completion queue/surface layer */ /* receive metadata - returns 1 if this was initial metadata */ int grpc_call_recv_metadata(grpc_call_element *surface_element, - grpc_call_op_metadata *md); + grpc_metadata_batch *md); void grpc_call_recv_message(grpc_call_element *surface_element, grpc_byte_buffer *message); void grpc_call_read_closed(grpc_call_element *surface_element); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index f5ff9409f3..e771929870 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -431,7 +431,7 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn, GRPC_CALL_LOG_OP(GPR_INFO, elem, op); switch (op->type) { case GRPC_RECV_METADATA: - grpc_call_op_metadata_filter(&op->data.metadata, server_filter, elem); + grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem); if (grpc_call_recv_metadata(elem, &op->data.metadata)) { calld->deadline = op->data.metadata.deadline; start_new_rpc(elem); diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c index c30e3a27f1..38b4235931 100644 --- a/src/core/transport/stream_op.c +++ b/src/core/transport/stream_op.c @@ -171,3 +171,117 @@ void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops, memcpy(sopb->ops + orig_nops, ops, sizeof(grpc_stream_op) * nops); sopb->nops = new_nops; } + + +static void assert_valid_list(grpc_mdelem_list *list) { + grpc_linked_mdelem *l; + + GPR_ASSERT((list->head == NULL) == (list->tail == NULL)); + if (!list->head) return; + GPR_ASSERT(list->head->prev == NULL); + GPR_ASSERT(list->tail->next == NULL); + GPR_ASSERT((list->head == list->tail) == (list->head->next == NULL)); + + for (l = list->head; l; l = l->next) { + GPR_ASSERT((l->prev == NULL) == (l == list->head)); + GPR_ASSERT((l->next == NULL) == (l == list->tail)); + if (l->next) GPR_ASSERT(l->next->prev == l); + if (l->prev) GPR_ASSERT(l->prev->next == l); + } +} + +void grpc_metadata_batch_init(grpc_metadata_batch *comd) { abort(); } + +void grpc_metadata_batch_destroy(grpc_metadata_batch *comd) { abort(); } + +void grpc_metadata_batch_merge(grpc_metadata_batch *target, + grpc_metadata_batch *add) { + abort(); +} + +void grpc_metadata_batch_add_head(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage, + grpc_mdelem *elem_to_add) { + storage->md = elem_to_add; + grpc_metadata_batch_link_head(comd, storage); +} + +static void link_head(grpc_mdelem_list *list, grpc_linked_mdelem *storage) { + assert_valid_list(list); + storage->prev = NULL; + storage->next = list->head; + if (list->head != NULL) { + list->head->prev = storage; + } else { + list->tail = storage; + } + list->head = storage; + assert_valid_list(list); +} + +void grpc_metadata_batch_link_head(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage) { + link_head(&comd->list, storage); +} + +void grpc_metadata_batch_add_tail(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage, + grpc_mdelem *elem_to_add) { + storage->md = elem_to_add; + grpc_metadata_batch_link_tail(comd, storage); +} + +static void link_tail(grpc_mdelem_list *list, grpc_linked_mdelem *storage) { + assert_valid_list(list); + storage->prev = list->tail; + storage->next = NULL; + if (list->tail != NULL) { + list->tail->next = storage; + } else { + list->head = storage; + } + list->tail = storage; + assert_valid_list(list); +} + +void grpc_metadata_batch_link_tail(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage) { + link_tail(&comd->list, storage); +} + +void grpc_metadata_batch_filter(grpc_metadata_batch *comd, + grpc_mdelem *(*filter)(void *user_data, + grpc_mdelem *elem), + void *user_data) { + grpc_linked_mdelem *l; + grpc_linked_mdelem *next; + + assert_valid_list(&comd->list); + assert_valid_list(&comd->garbage); + for (l = comd->list.head; l; l = next) { + grpc_mdelem *orig = l->md; + grpc_mdelem *filt = filter(user_data, orig); + next = l->next; + if (filt == NULL) { + if (l->prev) { + l->prev->next = l->next; + } + if (l->next) { + l->next->prev = l->prev; + } + if (comd->list.head == l) { + comd->list.head = l->next; + } + if (comd->list.tail == l) { + comd->list.tail = l->prev; + } + assert_valid_list(&comd->list); + link_head(&comd->garbage, l); + } else if (filt != orig) { + grpc_mdelem_unref(orig); + l->md = filt; + } + } + assert_valid_list(&comd->list); + assert_valid_list(&comd->garbage); +} diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h index 2ffbcce87b..81d08b60fb 100644 --- a/src/core/transport/stream_op.h +++ b/src/core/transport/stream_op.h @@ -76,6 +76,45 @@ typedef struct grpc_flow_ctl_cb { void *arg; } grpc_flow_ctl_cb; +typedef struct grpc_linked_mdelem { + grpc_mdelem *md; + struct grpc_linked_mdelem *next; + struct grpc_linked_mdelem *prev; +} grpc_linked_mdelem; + +typedef struct grpc_mdelem_list { + grpc_linked_mdelem *head; + grpc_linked_mdelem *tail; +} grpc_mdelem_list; + +typedef struct grpc_metadata_batch { + grpc_mdelem_list list; + grpc_mdelem_list garbage; + gpr_timespec deadline; +} grpc_metadata_batch; + +void grpc_metadata_batch_init(grpc_metadata_batch *comd); +void grpc_metadata_batch_destroy(grpc_metadata_batch *comd); +void grpc_metadata_batch_merge(grpc_metadata_batch *target, + grpc_metadata_batch *add); + +void grpc_metadata_batch_link_head(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage); +void grpc_metadata_batch_link_tail(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage); + +void grpc_metadata_batch_add_head(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage, + grpc_mdelem *elem_to_add); +void grpc_metadata_batch_add_tail(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage, + grpc_mdelem *elem_to_add); + +void grpc_metadata_batch_filter(grpc_metadata_batch *comd, + grpc_mdelem *(*filter)(void *user_data, + grpc_mdelem *elem), + void *user_data); + /* Represents a single operation performed on a stream/transport */ typedef struct grpc_stream_op { /* the operation to be applied */ |