diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/channel/call_op_string.c | 29 | ||||
-rw-r--r-- | src/core/channel/census_filter.c | 13 | ||||
-rw-r--r-- | src/core/channel/channel_stack.c | 135 | ||||
-rw-r--r-- | src/core/channel/channel_stack.h | 43 | ||||
-rw-r--r-- | src/core/channel/client_channel.c | 51 | ||||
-rw-r--r-- | src/core/channel/connected_channel.c | 92 | ||||
-rw-r--r-- | src/core/channel/http_client_filter.c | 64 | ||||
-rw-r--r-- | src/core/channel/http_server_filter.c | 240 | ||||
-rw-r--r-- | src/core/channel/metadata_buffer.c | 149 | ||||
-rw-r--r-- | src/core/channel/metadata_buffer.h | 70 | ||||
-rw-r--r-- | src/core/security/auth.c | 52 | ||||
-rw-r--r-- | src/core/surface/call.c | 183 | ||||
-rw-r--r-- | src/core/surface/call.h | 17 | ||||
-rw-r--r-- | src/core/surface/channel.c | 52 | ||||
-rw-r--r-- | src/core/surface/client.c | 12 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 21 | ||||
-rw-r--r-- | src/core/surface/server.c | 39 |
17 files changed, 531 insertions, 731 deletions
diff --git a/src/core/channel/call_op_string.c b/src/core/channel/call_op_string.c index 08f2e95deb..23e22d93d1 100644 --- a/src/core/channel/call_op_string.c +++ b/src/core/channel/call_op_string.c @@ -51,6 +51,18 @@ static void put_metadata(gpr_strvec *b, grpc_mdelem *md) { GPR_SLICE_LENGTH(md->value->slice), GPR_HEXDUMP_PLAINTEXT)); } +static void put_metadata_list(gpr_strvec *b, grpc_call_op_metadata md) { + grpc_linked_mdelem *m; + for (m = md.list.head; m; m = m->next) { + put_metadata(b, m->md); + } + if (gpr_time_cmp(md.deadline, gpr_inf_future) != 0) { + char *tmp; + gpr_asprintf(&tmp, " deadline=%d.%09d", md.deadline.tv_sec, md.deadline.tv_nsec); + gpr_strvec_add(b, tmp); + } +} + char *grpc_call_op_string(grpc_call_op *op) { char *tmp; char *out; @@ -69,12 +81,7 @@ char *grpc_call_op_string(grpc_call_op *op) { switch (op->type) { case GRPC_SEND_METADATA: gpr_strvec_add(&b, gpr_strdup("SEND_METADATA")); - put_metadata(&b, op->data.metadata); - break; - case GRPC_SEND_DEADLINE: - gpr_asprintf(&tmp, "SEND_DEADLINE %d.%09d", op->data.deadline.tv_sec, - op->data.deadline.tv_nsec); - gpr_strvec_add(&b, tmp); + put_metadata_list(&b, op->data.metadata); break; case GRPC_SEND_START: gpr_asprintf(&tmp, "SEND_START pollset=%p", op->data.start.pollset); @@ -94,15 +101,7 @@ char *grpc_call_op_string(grpc_call_op *op) { break; case GRPC_RECV_METADATA: gpr_strvec_add(&b, gpr_strdup("RECV_METADATA")); - put_metadata(&b, op->data.metadata); - break; - case GRPC_RECV_DEADLINE: - gpr_asprintf(&tmp, "RECV_DEADLINE %d.%09d", op->data.deadline.tv_sec, - op->data.deadline.tv_nsec); - gpr_strvec_add(&b, tmp); - break; - case GRPC_RECV_END_OF_INITIAL_METADATA: - gpr_strvec_add(&b, gpr_strdup("RECV_END_OF_INITIAL_METADATA")); + put_metadata_list(&b, op->data.metadata); break; case GRPC_RECV_MESSAGE: gpr_strvec_add(&b, gpr_strdup("RECV_MESSAGE")); diff --git a/src/core/channel/census_filter.c b/src/core/channel/census_filter.c index ba7b7ba59c..7031e523da 100644 --- a/src/core/channel/census_filter.c +++ b/src/core/channel/census_filter.c @@ -62,11 +62,14 @@ static void init_rpc_stats(census_rpc_stats* stats) { static void extract_and_annotate_method_tag(grpc_call_op* op, call_data* calld, channel_data* chand) { - if (op->data.metadata->key == chand->path_str) { - gpr_log(GPR_DEBUG, - (const char*)GPR_SLICE_START_PTR(op->data.metadata->value->slice)); - census_add_method_tag(calld->op_id, (const char*)GPR_SLICE_START_PTR( - op->data.metadata->value->slice)); + grpc_linked_mdelem *m; + for (m = op->data.metadata.list.head; m; m = m->next) { + if (m->md->key == chand->path_str) { + gpr_log(GPR_DEBUG, + (const char*)GPR_SLICE_START_PTR(m->md->value->slice)); + census_add_method_tag(calld->op_id, (const char*)GPR_SLICE_START_PTR( + m->md->value->slice)); + } } } diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index 21df9771ce..874d6c54fe 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -205,30 +205,6 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) { static void do_nothing(void *user_data, grpc_op_error error) {} -void grpc_call_element_recv_metadata(grpc_call_element *cur_elem, - grpc_mdelem *mdelem) { - grpc_call_op metadata_op; - metadata_op.type = GRPC_RECV_METADATA; - metadata_op.dir = GRPC_CALL_UP; - metadata_op.done_cb = do_nothing; - metadata_op.user_data = NULL; - metadata_op.flags = 0; - metadata_op.data.metadata = mdelem; - grpc_call_next_op(cur_elem, &metadata_op); -} - -void grpc_call_element_send_metadata(grpc_call_element *cur_elem, - grpc_mdelem *mdelem) { - grpc_call_op metadata_op; - metadata_op.type = GRPC_SEND_METADATA; - metadata_op.dir = GRPC_CALL_DOWN; - metadata_op.done_cb = do_nothing; - metadata_op.user_data = NULL; - metadata_op.flags = 0; - metadata_op.data.metadata = mdelem; - grpc_call_next_op(cur_elem, &metadata_op); -} - void grpc_call_element_send_cancel(grpc_call_element *cur_elem) { grpc_call_op cancel_op; cancel_op.type = GRPC_CANCEL_OP; @@ -248,3 +224,114 @@ void grpc_call_element_send_finish(grpc_call_element *cur_elem) { finish_op.flags = 0; grpc_call_next_op(cur_elem, &finish_op); } + +void grpc_call_element_recv_status(grpc_call_element *cur_elem, grpc_status_code status, const char *message) { + abort(); +} + +static void assert_valid_list(grpc_mdelem_list *list) { + grpc_linked_mdelem *l; + + GPR_ASSERT((list->head == NULL) == (list->tail == NULL)); + if (!list->head) return; + GPR_ASSERT(list->head->prev == NULL); + GPR_ASSERT(list->tail->next == NULL); + GPR_ASSERT((list->head == list->tail) == (list->head->next == NULL)); + + for (l = list->head; l; l = l->next) { + GPR_ASSERT((l->prev == NULL) == (l == list->head)); + GPR_ASSERT((l->next == NULL) == (l == list->tail)); + if (l->next) GPR_ASSERT(l->next->prev == l); + if (l->prev) GPR_ASSERT(l->prev->next == l); + } +} + +void grpc_call_op_metadata_init(grpc_call_op_metadata *comd) { + abort(); +} + +void grpc_call_op_metadata_destroy(grpc_call_op_metadata *comd) { + abort(); +} + +void grpc_call_op_metadata_merge(grpc_call_op_metadata *target, grpc_call_op_metadata *add) { + abort(); +} + +void grpc_call_op_metadata_add_head(grpc_call_op_metadata *comd, grpc_linked_mdelem *storage, grpc_mdelem *elem_to_add) { + storage->md = elem_to_add; + grpc_call_op_metadata_link_head(comd, storage); +} + +static void link_head(grpc_mdelem_list *list, grpc_linked_mdelem *storage) { + assert_valid_list(list); + storage->prev = NULL; + storage->next = list->head; + if (list->head != NULL) { + list->head->prev = storage; + } else { + list->tail = storage; + } + list->head = storage; + assert_valid_list(list); +} + +void grpc_call_op_metadata_link_head(grpc_call_op_metadata *comd, grpc_linked_mdelem *storage) { + link_head(&comd->list, storage); +} + +void grpc_call_op_metadata_add_tail(grpc_call_op_metadata *comd, grpc_linked_mdelem *storage, grpc_mdelem *elem_to_add) { + storage->md = elem_to_add; + grpc_call_op_metadata_link_tail(comd, storage); +} + +static void link_tail(grpc_mdelem_list *list, grpc_linked_mdelem *storage) { + assert_valid_list(list); + storage->prev = list->tail; + storage->next = NULL; + if (list->tail != NULL) { + list->tail->next = storage; + } else { + list->head = storage; + } + list->tail = storage; + assert_valid_list(list); +} + +void grpc_call_op_metadata_link_tail(grpc_call_op_metadata *comd, grpc_linked_mdelem *storage) { + link_tail(&comd->list, storage); +} + +void grpc_call_op_metadata_filter(grpc_call_op_metadata *comd, grpc_mdelem *(*filter)(void *user_data, grpc_mdelem *elem), void *user_data) { + grpc_linked_mdelem *l; + grpc_linked_mdelem *next; + + assert_valid_list(&comd->list); + assert_valid_list(&comd->garbage); + for (l = comd->list.head; l; l = next) { + grpc_mdelem *orig = l->md; + grpc_mdelem *filt = filter(user_data, orig); + next = l->next; + if (filt == NULL) { + if (l->prev) { + l->prev->next = l->next; + } + if (l->next) { + l->next->prev = l->prev; + } + if (comd->list.head == l) { + comd->list.head = l->next; + } + if (comd->list.tail == l) { + comd->list.tail = l->prev; + } + assert_valid_list(&comd->list); + link_head(&comd->garbage, l); + } else if (filt != orig) { + grpc_mdelem_unref(orig); + l->md = filt; + } + } + assert_valid_list(&comd->list); + assert_valid_list(&comd->garbage); +} diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index ef1da7b33b..53143ac278 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -62,8 +62,6 @@ typedef struct grpc_call_element grpc_call_element; typedef enum { /* send metadata to the channels peer */ GRPC_SEND_METADATA, - /* send a deadline */ - GRPC_SEND_DEADLINE, /* start a connection (corresponds to start_invoke/accept) */ GRPC_SEND_START, /* send a message to the channels peer */ @@ -76,10 +74,6 @@ typedef enum { GRPC_REQUEST_DATA, /* metadata was received from the channels peer */ GRPC_RECV_METADATA, - /* receive a deadline */ - GRPC_RECV_DEADLINE, - /* the end of the first batch of metadata was received */ - GRPC_RECV_END_OF_INITIAL_METADATA, /* a message was received from the channels peer */ GRPC_RECV_MESSAGE, /* half-close was received from the channels peer */ @@ -95,6 +89,35 @@ typedef enum { or decrement a pointer to find the next element to call */ typedef enum { GRPC_CALL_DOWN = 1, GRPC_CALL_UP = -1 } grpc_call_dir; +typedef struct grpc_linked_mdelem { + grpc_mdelem *md; + struct grpc_linked_mdelem *next; + struct grpc_linked_mdelem *prev; +} grpc_linked_mdelem; + +typedef struct grpc_mdelem_list { + grpc_linked_mdelem *head; + grpc_linked_mdelem *tail; +} grpc_mdelem_list; + +typedef struct grpc_call_op_metadata { + grpc_mdelem_list list; + grpc_mdelem_list garbage; + gpr_timespec deadline; +} grpc_call_op_metadata; + +void grpc_call_op_metadata_init(grpc_call_op_metadata *comd); +void grpc_call_op_metadata_destroy(grpc_call_op_metadata *comd); +void grpc_call_op_metadata_merge(grpc_call_op_metadata *target, grpc_call_op_metadata *add); + +void grpc_call_op_metadata_link_head(grpc_call_op_metadata *comd, grpc_linked_mdelem *storage); +void grpc_call_op_metadata_link_tail(grpc_call_op_metadata *comd, grpc_linked_mdelem *storage); + +void grpc_call_op_metadata_add_head(grpc_call_op_metadata *comd, grpc_linked_mdelem *storage, grpc_mdelem *elem_to_add); +void grpc_call_op_metadata_add_tail(grpc_call_op_metadata *comd, grpc_linked_mdelem *storage, grpc_mdelem *elem_to_add); + +void grpc_call_op_metadata_filter(grpc_call_op_metadata *comd, grpc_mdelem *(*filter)(void *user_data, grpc_mdelem *elem), void *user_data); + /* A single filterable operation to be performed on a call */ typedef struct { /* The type of operation we're performing */ @@ -113,8 +136,7 @@ typedef struct { grpc_pollset *pollset; } start; grpc_byte_buffer *message; - grpc_mdelem *metadata; - gpr_timespec deadline; + grpc_call_op_metadata metadata; } data; /* Must be called when processing of this call-op is complete. @@ -291,12 +313,9 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem); void grpc_call_log_op(char *file, int line, gpr_log_severity severity, grpc_call_element *elem, grpc_call_op *op); -void grpc_call_element_send_metadata(grpc_call_element *cur_elem, - grpc_mdelem *elem); -void grpc_call_element_recv_metadata(grpc_call_element *cur_elem, - grpc_mdelem *elem); void grpc_call_element_send_cancel(grpc_call_element *cur_elem); void grpc_call_element_send_finish(grpc_call_element *cur_elem); +void grpc_call_element_recv_status(grpc_call_element *cur_elem, grpc_status_code status, const char *message); extern int grpc_trace_channel; diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 9791f98be8..7b9c080eff 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -38,7 +38,6 @@ #include "src/core/channel/channel_args.h" #include "src/core/channel/child_channel.h" #include "src/core/channel/connected_channel.h" -#include "src/core/channel/metadata_buffer.h" #include "src/core/iomgr/iomgr.h" #include "src/core/support/string.h" #include <grpc/support/alloc.h> @@ -70,9 +69,6 @@ typedef struct { int transport_setup_initiated; grpc_channel_args *args; - - /* metadata cache */ - grpc_mdelem *cancel_status; } channel_data; typedef enum { @@ -87,7 +83,8 @@ struct call_data { grpc_call_element *elem; call_state state; - grpc_metadata_buffer pending_metadata; + grpc_call_op_metadata pending_metadata; + gpr_uint32 pending_metadata_flags; gpr_timespec deadline; union { struct { @@ -124,22 +121,18 @@ static void complete_activate(grpc_call_element *elem, grpc_call_op *op) { call_data *calld = elem->call_data; grpc_call_element *child_elem = grpc_child_call_get_top_element(calld->s.active.child_call); + grpc_call_op mop; GPR_ASSERT(calld->state == CALL_ACTIVE); /* sending buffered metadata down the stack before the start call */ - grpc_metadata_buffer_flush(&calld->pending_metadata, child_elem); - - if (gpr_time_cmp(calld->deadline, gpr_inf_future) != 0) { - grpc_call_op dop; - dop.type = GRPC_SEND_DEADLINE; - dop.dir = GRPC_CALL_DOWN; - dop.flags = 0; - dop.data.deadline = calld->deadline; - dop.done_cb = do_nothing; - dop.user_data = NULL; - child_elem->filter->call_op(child_elem, elem, &dop); - } + mop.type = GRPC_SEND_METADATA; + mop.dir = GRPC_CALL_DOWN; + mop.flags = calld->pending_metadata_flags; + mop.data.metadata = calld->pending_metadata; + mop.done_cb = do_nothing; + mop.user_data = NULL; + child_elem->filter->call_op(child_elem, elem, &mop); /* continue the start call down the stack, this nees to happen after metadata are flushed*/ @@ -212,15 +205,8 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) { static void send_up_cancelled_ops(grpc_call_element *elem) { grpc_call_op finish_op; - channel_data *chand = elem->channel_data; /* send up a synthesized status */ - finish_op.type = GRPC_RECV_METADATA; - finish_op.dir = GRPC_CALL_UP; - finish_op.flags = 0; - finish_op.data.metadata = grpc_mdelem_ref(chand->cancel_status); - finish_op.done_cb = do_nothing; - finish_op.user_data = NULL; - grpc_call_next_op(elem, &finish_op); + grpc_call_element_recv_status(elem, GRPC_STATUS_CANCELLED, "Cancelled"); /* send up a finish */ finish_op.type = GRPC_RECV_FINISH; finish_op.dir = GRPC_CALL_UP; @@ -271,10 +257,7 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, switch (op->type) { case GRPC_SEND_METADATA: - grpc_metadata_buffer_queue(&calld->pending_metadata, op); - break; - case GRPC_SEND_DEADLINE: - calld->deadline = op->data.deadline; + grpc_call_op_metadata_merge(&calld->pending_metadata, &op->data.metadata); op->done_cb(op->user_data, GRPC_OP_OK); break; case GRPC_SEND_START: @@ -400,7 +383,7 @@ static void init_call_elem(grpc_call_element *elem, calld->deadline = gpr_inf_future; calld->s.waiting.on_complete = error_bad_on_complete; calld->s.waiting.on_complete_user_data = NULL; - grpc_metadata_buffer_init(&calld->pending_metadata); + grpc_call_op_metadata_init(&calld->pending_metadata); } /* Destructor for call_data */ @@ -408,7 +391,7 @@ static void destroy_call_elem(grpc_call_element *elem) { call_data *calld = elem->call_data; /* if the metadata buffer is not flushed, destroy it here. */ - grpc_metadata_buffer_destroy(&calld->pending_metadata, GRPC_OP_OK); + grpc_call_op_metadata_destroy(&calld->pending_metadata); /* if the call got activated, we need to destroy the child stack also, and remove it from the in-flight requests tracked by the child_entry we picked */ @@ -423,7 +406,6 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_mdctx *metadata_context, int is_first, int is_last) { channel_data *chand = elem->channel_data; - char temp[GPR_LTOA_MIN_BUFSIZE]; GPR_ASSERT(!is_first); GPR_ASSERT(is_last); @@ -437,10 +419,6 @@ static void init_channel_elem(grpc_channel_element *elem, chand->transport_setup = NULL; chand->transport_setup_initiated = 0; chand->args = grpc_channel_args_copy(args); - - gpr_ltoa(GRPC_STATUS_CANCELLED, temp); - chand->cancel_status = - grpc_mdelem_from_strings(metadata_context, "grpc-status", temp); } /* Destructor for channel_data */ @@ -455,7 +433,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } grpc_channel_args_destroy(chand->args); - grpc_mdelem_unref(chand->cancel_status); gpr_mu_destroy(&chand->mu); GPR_ASSERT(chand->waiting_child_count == 0); diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 62611e08f3..519f05cb66 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -60,10 +60,14 @@ typedef struct connected_channel_call_data { gpr_uint32 max_message_length; gpr_uint32 incoming_message_length; gpr_uint8 reading_message; - gpr_uint8 got_metadata_boundary; gpr_uint8 got_read_close; gpr_slice_buffer incoming_message; gpr_uint32 outgoing_buffer_length_estimate; + + grpc_linked_mdelem *incoming_metadata; + size_t incoming_metadata_count; + size_t incoming_metadata_capacity; + gpr_timespec deadline; } call_data; /* We perform a small hack to locate transport data alongside the connected @@ -116,18 +120,19 @@ static void end_bufferable_op(grpc_call_op *op, channel_data *chand, static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, grpc_call_op *op) { call_data *calld = elem->call_data; + grpc_linked_mdelem *m; channel_data *chand = elem->channel_data; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); switch (op->type) { case GRPC_SEND_METADATA: - grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata); - grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, - op->user_data); - break; - case GRPC_SEND_DEADLINE: - grpc_sopb_add_deadline(&calld->outgoing_sopb, op->data.deadline); + for (m = op->data.metadata.list.head; m; m = m->next) { + grpc_sopb_add_metadata(&calld->outgoing_sopb, m->md); + } + if (gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future) != 0) { + grpc_sopb_add_deadline(&calld->outgoing_sopb, op->data.metadata.deadline); + } grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, op->user_data); break; @@ -200,10 +205,12 @@ static void init_call_elem(grpc_call_element *elem, grpc_sopb_init(&calld->outgoing_sopb); calld->reading_message = 0; - calld->got_metadata_boundary = 0; calld->got_read_close = 0; calld->outgoing_buffer_length_estimate = 0; calld->max_message_length = chand->max_message_length; + calld->incoming_metadata = NULL; + calld->incoming_metadata_capacity = 0; + calld->incoming_metadata_count = 0; gpr_slice_buffer_init(&calld->incoming_message); r = grpc_transport_init_stream(chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), @@ -320,6 +327,49 @@ static void finish_message(channel_data *chand, call_data *calld) { grpc_call_next_op(elem, &call_op); } +static void metadata_done_cb(void *ptr, grpc_op_error error) { + gpr_free(ptr); +} + +static void add_incoming_metadata(call_data *calld, grpc_mdelem *elem) { + if (calld->incoming_metadata_count == calld->incoming_metadata_capacity) { + calld->incoming_metadata_capacity = GPR_MAX(8, 2 * calld->incoming_metadata_capacity); + calld->incoming_metadata = gpr_realloc(calld->incoming_metadata, sizeof(*calld->incoming_metadata) * calld->incoming_metadata_capacity); + } + calld->incoming_metadata[calld->incoming_metadata_count++].md = elem; +} + +static void flush_metadata(grpc_call_element *elem) { + grpc_call_op op; + call_data *calld = elem->call_data; + size_t i; + + for (i = 1; i < calld->incoming_metadata_count; i++) { + calld->incoming_metadata[i].prev = &calld->incoming_metadata[i-1]; + } + for (i = 0; i < calld->incoming_metadata_count - 1; i++) { + calld->incoming_metadata[i].next = &calld->incoming_metadata[i+1]; + } + + calld->incoming_metadata[0].prev = calld->incoming_metadata[calld->incoming_metadata_count-1].next = NULL; + + op.type = GRPC_RECV_METADATA; + op.dir = GRPC_CALL_UP; + op.flags = 0; + op.data.metadata.list.head = &calld->incoming_metadata[0]; + op.data.metadata.list.tail = &calld->incoming_metadata[calld->incoming_metadata_count - 1]; + op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL; + op.data.metadata.deadline = calld->deadline; + op.done_cb = metadata_done_cb; + op.user_data = calld->incoming_metadata; + + grpc_call_next_op(elem, &op); + + calld->incoming_metadata = NULL; + calld->incoming_metadata_count = 0; + calld->incoming_metadata_capacity = 0; +} + /* Handle incoming stream ops from the transport, translating them into call_ops to pass up the call stack */ static void recv_batch(void *user_data, grpc_transport *transport, @@ -346,33 +396,13 @@ static void recv_batch(void *user_data, grpc_transport *transport, case GRPC_NO_OP: break; case GRPC_OP_METADATA: - call_op.type = GRPC_RECV_METADATA; - call_op.dir = GRPC_CALL_UP; - call_op.flags = 0; - call_op.data.metadata = stream_op->data.metadata; - call_op.done_cb = do_nothing; - call_op.user_data = NULL; - grpc_call_next_op(elem, &call_op); + add_incoming_metadata(calld, stream_op->data.metadata); break; case GRPC_OP_DEADLINE: - call_op.type = GRPC_RECV_DEADLINE; - call_op.dir = GRPC_CALL_UP; - call_op.flags = 0; - call_op.data.deadline = stream_op->data.deadline; - call_op.done_cb = do_nothing; - call_op.user_data = NULL; - grpc_call_next_op(elem, &call_op); + calld->deadline = stream_op->data.deadline; break; case GRPC_OP_METADATA_BOUNDARY: - if (!calld->got_metadata_boundary) { - calld->got_metadata_boundary = 1; - call_op.type = GRPC_RECV_END_OF_INITIAL_METADATA; - call_op.dir = GRPC_CALL_UP; - call_op.flags = 0; - call_op.done_cb = do_nothing; - call_op.user_data = NULL; - grpc_call_next_op(elem, &call_op); - } + flush_metadata(elem); break; case GRPC_OP_BEGIN_MESSAGE: /* can't begin a message when we're still reading a message */ diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 3ccc39b717..5506fa180a 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -35,7 +35,10 @@ #include <grpc/support/log.h> typedef struct call_data { - int sent_headers; + grpc_linked_mdelem method; + grpc_linked_mdelem scheme; + grpc_linked_mdelem te_trailers; + grpc_linked_mdelem content_type; } call_data; typedef struct channel_data { @@ -49,6 +52,18 @@ typedef struct channel_data { /* used to silence 'variable not used' warnings */ static void ignore_unused(void *ignored) {} +static grpc_mdelem *client_filter(void *user_data, grpc_mdelem *md) { + grpc_call_element *elem = user_data; + channel_data *channeld = elem->channel_data; + if (md == channeld->status) { + return NULL; + } else if (md->key == channeld->status->key) { + grpc_call_element_send_cancel(elem); + return NULL; + } + return md; +} + /* Called either: - in response to an API call (or similar) from above, to send something - a network event (or similar) from below, to receive something @@ -61,42 +76,19 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, channel_data *channeld = elem->channel_data; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - ignore_unused(calld); - switch (op->type) { case GRPC_SEND_METADATA: - if (!calld->sent_headers) { - /* Send : prefixed headers, which have to be before any application - * layer headers. */ - calld->sent_headers = 1; - grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->method)); - grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->scheme)); - } - grpc_call_next_op(elem, op); - break; - case GRPC_SEND_START: - if (!calld->sent_headers) { - /* Send : prefixed headers, if we haven't already */ - calld->sent_headers = 1; - grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->method)); - grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->scheme)); - } - /* Send non : prefixed headers */ - grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->te_trailers)); - grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->content_type)); + /* Send : prefixed headers, which have to be before any application + * layer headers. */ + grpc_call_op_metadata_add_head(&op->data.metadata, &calld->method, grpc_mdelem_ref(channeld->method)); + grpc_call_op_metadata_add_head(&op->data.metadata, &calld->scheme, grpc_mdelem_ref(channeld->scheme)); + grpc_call_op_metadata_add_tail(&op->data.metadata, &calld->te_trailers, grpc_mdelem_ref(channeld->te_trailers)); + grpc_call_op_metadata_add_tail(&op->data.metadata, &calld->content_type, grpc_mdelem_ref(channeld->content_type)); grpc_call_next_op(elem, op); break; case GRPC_RECV_METADATA: - if (op->data.metadata == channeld->status) { - grpc_mdelem_unref(op->data.metadata); - op->done_cb(op->user_data, GRPC_OP_OK); - } else if (op->data.metadata->key == channeld->status->key) { - grpc_mdelem_unref(op->data.metadata); - op->done_cb(op->user_data, GRPC_OP_OK); - grpc_call_element_send_cancel(elem); - } else { - grpc_call_next_op(elem, op); - } + grpc_call_op_metadata_filter(&op->data.metadata, client_filter, elem); + grpc_call_next_op(elem, op); break; default: /* pass control up or down the stack depending on op->dir */ @@ -125,14 +117,6 @@ static void channel_op(grpc_channel_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, const void *server_transport_data) { - /* grab pointers to our data from the call element */ - call_data *calld = elem->call_data; - channel_data *channeld = elem->channel_data; - - ignore_unused(channeld); - - /* initialize members */ - calld->sent_headers = 0; } /* Destructor for call_data */ diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index 9da8b333ca..535bcfdca7 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -38,8 +38,6 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> -typedef enum { NOT_RECEIVED, POST, GET } known_method_type; - typedef struct { grpc_mdelem *path; grpc_mdelem *content_type; @@ -47,16 +45,17 @@ typedef struct { } gettable; typedef struct call_data { - known_method_type seen_method; + gpr_uint8 got_initial_metadata; + gpr_uint8 seen_path; + gpr_uint8 seen_post; gpr_uint8 sent_status; gpr_uint8 seen_scheme; gpr_uint8 seen_te_trailers; - grpc_mdelem *path; + grpc_linked_mdelem status; } call_data; typedef struct channel_data { grpc_mdelem *te_trailers; - grpc_mdelem *method_get; grpc_mdelem *method_post; grpc_mdelem *http_scheme; grpc_mdelem *https_scheme; @@ -78,38 +77,75 @@ typedef struct channel_data { /* used to silence 'variable not used' warnings */ static void ignore_unused(void *ignored) {} -/* Handle 'GET': not technically grpc, so probably a web browser hitting - us */ -static void payload_done(void *elem, grpc_op_error error) { - if (error == GRPC_OP_OK) { - grpc_call_element_send_finish(elem); - } -} - -static void handle_get(grpc_call_element *elem) { +static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { + grpc_call_element *elem = user_data; channel_data *channeld = elem->channel_data; call_data *calld = elem->call_data; - grpc_call_op op; - size_t i; - for (i = 0; i < channeld->gettable_count; i++) { - if (channeld->gettables[i].path == calld->path) { - grpc_call_element_send_metadata(elem, - grpc_mdelem_ref(channeld->status_ok)); - grpc_call_element_send_metadata( - elem, grpc_mdelem_ref(channeld->gettables[i].content_type)); - op.type = GRPC_SEND_PREFORMATTED_MESSAGE; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.data.message = channeld->gettables[i].content; - op.done_cb = payload_done; - op.user_data = elem; - grpc_call_next_op(elem, &op); + /* Check if it is one of the headers we care about. */ + if (md == channeld->te_trailers || + md == channeld->method_post || + md == channeld->http_scheme || + md == channeld->https_scheme || + md == channeld->grpc_scheme || + md == channeld->content_type) { + /* swallow it */ + if (md == channeld->method_post) { + calld->seen_post = 1; + } else if (md->key == channeld->http_scheme->key) { + calld->seen_scheme = 1; + } else if (md == channeld->te_trailers) { + calld->seen_te_trailers = 1; } + /* TODO(klempner): Track that we've seen all the headers we should + require */ + return NULL; + } else if (md->key == channeld->content_type->key) { + if (strncmp(grpc_mdstr_as_c_string(md->value), + "application/grpc+", 17) == 0) { + /* Although the C implementation doesn't (currently) generate them, + any + custom +-suffix is explicitly valid. */ + /* TODO(klempner): We should consider preallocating common values such + as +proto or +json, or at least stashing them if we see them. */ + /* TODO(klempner): Should we be surfacing this to application code? */ + } else { + /* TODO(klempner): We're currently allowing this, but we shouldn't + see it without a proxy so log for now. */ + gpr_log(GPR_INFO, "Unexpected content-type %s", + channeld->content_type->key); + } + return NULL; + } else if (md->key == channeld->te_trailers->key || + md->key == channeld->method_post->key || + md->key == channeld->http_scheme->key || + md->key == channeld->content_type->key) { + gpr_log(GPR_ERROR, "Invalid %s: header: '%s'", + grpc_mdstr_as_c_string(md->key), + grpc_mdstr_as_c_string(md->value)); + /* swallow it and error everything out. */ + /* TODO(klempner): We ought to generate more descriptive error messages + on the wire here. */ + grpc_call_element_send_cancel(elem); + return NULL; + } else if (md->key == channeld->path_key) { + if (calld->seen_path) { + gpr_log(GPR_ERROR, "Received :path twice"); + return NULL; + } + calld->seen_path = 1; + return md; + } else if (md->key == channeld->host_key) { + /* translate host to :authority since :authority may be + omitted */ + grpc_mdelem *authority = grpc_mdelem_from_metadata_strings( + channeld->mdctx, grpc_mdstr_ref(channeld->authority_key), + grpc_mdstr_ref(md->value)); + grpc_mdelem_unref(md); + return authority; + } else { + return md; } - grpc_call_element_send_metadata(elem, - grpc_mdelem_ref(channeld->status_not_found)); - grpc_call_element_send_finish(elem); } /* Called either: @@ -126,116 +162,37 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, switch (op->type) { case GRPC_RECV_METADATA: - /* Check if it is one of the headers we care about. */ - if (op->data.metadata == channeld->te_trailers || - op->data.metadata == channeld->method_get || - op->data.metadata == channeld->method_post || - op->data.metadata == channeld->http_scheme || - op->data.metadata == channeld->https_scheme || - op->data.metadata == channeld->grpc_scheme || - op->data.metadata == channeld->content_type) { - /* swallow it */ - if (op->data.metadata == channeld->method_get) { - calld->seen_method = GET; - } else if (op->data.metadata == channeld->method_post) { - calld->seen_method = POST; - } else if (op->data.metadata->key == channeld->http_scheme->key) { - calld->seen_scheme = 1; - } else if (op->data.metadata == channeld->te_trailers) { - calld->seen_te_trailers = 1; - } - /* TODO(klempner): Track that we've seen all the headers we should - require */ - grpc_mdelem_unref(op->data.metadata); - op->done_cb(op->user_data, GRPC_OP_OK); - } else if (op->data.metadata->key == channeld->content_type->key) { - if (strncmp(grpc_mdstr_as_c_string(op->data.metadata->value), - "application/grpc+", 17) == 0) { - /* Although the C implementation doesn't (currently) generate them, - any - custom +-suffix is explicitly valid. */ - /* TODO(klempner): We should consider preallocating common values such - as +proto or +json, or at least stashing them if we see them. */ - /* TODO(klempner): Should we be surfacing this to application code? */ + grpc_call_op_metadata_filter(&op->data.metadata, server_filter, elem); + if (!calld->got_initial_metadata) { + calld->got_initial_metadata = 1; + /* Have we seen the required http2 transport headers? + (:method, :scheme, content-type, with :path and :authority covered + at the channel level right now) */ + if (calld->seen_post && calld->seen_scheme && + calld->seen_te_trailers && calld->seen_path) { + grpc_call_next_op(elem, op); } else { - /* TODO(klempner): We're currently allowing this, but we shouldn't - see it without a proxy so log for now. */ - gpr_log(GPR_INFO, "Unexpected content-type %s", - channeld->content_type->key); - } - grpc_mdelem_unref(op->data.metadata); - op->done_cb(op->user_data, GRPC_OP_OK); - } else if (op->data.metadata->key == channeld->te_trailers->key || - op->data.metadata->key == channeld->method_post->key || - op->data.metadata->key == channeld->http_scheme->key || - op->data.metadata->key == channeld->content_type->key) { - gpr_log(GPR_ERROR, "Invalid %s: header: '%s'", - grpc_mdstr_as_c_string(op->data.metadata->key), - grpc_mdstr_as_c_string(op->data.metadata->value)); - /* swallow it and error everything out. */ - /* TODO(klempner): We ought to generate more descriptive error messages - on the wire here. */ - grpc_mdelem_unref(op->data.metadata); - op->done_cb(op->user_data, GRPC_OP_OK); - grpc_call_element_send_cancel(elem); - } else if (op->data.metadata->key == channeld->path_key) { - if (calld->path != NULL) { - gpr_log(GPR_ERROR, "Received :path twice"); - grpc_mdelem_unref(calld->path); + if (!calld->seen_post) { + gpr_log(GPR_ERROR, "Missing :method header"); + } + if (!calld->seen_scheme) { + gpr_log(GPR_ERROR, "Missing :scheme header"); + } + if (!calld->seen_te_trailers) { + gpr_log(GPR_ERROR, "Missing te trailers header"); + } + /* Error this call out */ + op->done_cb(op->user_data, GRPC_OP_OK); + grpc_call_element_send_cancel(elem); } - calld->path = op->data.metadata; - op->done_cb(op->user_data, GRPC_OP_OK); - } else if (op->data.metadata->key == channeld->host_key) { - /* translate host to :authority since :authority may be - omitted */ - grpc_mdelem *authority = grpc_mdelem_from_metadata_strings( - channeld->mdctx, grpc_mdstr_ref(channeld->authority_key), - grpc_mdstr_ref(op->data.metadata->value)); - grpc_mdelem_unref(op->data.metadata); - op->data.metadata = authority; - /* pass the event up */ - grpc_call_next_op(elem, op); } else { - /* pass the event up */ grpc_call_next_op(elem, op); } break; - case GRPC_RECV_END_OF_INITIAL_METADATA: - /* Have we seen the required http2 transport headers? - (:method, :scheme, content-type, with :path and :authority covered - at the channel level right now) */ - if (calld->seen_method == POST && calld->seen_scheme && - calld->seen_te_trailers && calld->path) { - grpc_call_element_recv_metadata(elem, calld->path); - calld->path = NULL; - grpc_call_next_op(elem, op); - } else if (calld->seen_method == GET) { - handle_get(elem); - } else { - if (calld->seen_method == NOT_RECEIVED) { - gpr_log(GPR_ERROR, "Missing :method header"); - } - if (!calld->seen_scheme) { - gpr_log(GPR_ERROR, "Missing :scheme header"); - } - if (!calld->seen_te_trailers) { - gpr_log(GPR_ERROR, "Missing te trailers header"); - } - /* Error this call out */ - op->done_cb(op->user_data, GRPC_OP_OK); - grpc_call_element_send_cancel(elem); - } - break; - case GRPC_SEND_START: case GRPC_SEND_METADATA: /* If we haven't sent status 200 yet, we need to so so because it needs to come before any non : prefixed metadata. */ - if (!calld->sent_status) { - calld->sent_status = 1; - /* status is reffed by grpc_call_element_send_metadata */ - grpc_call_element_send_metadata(elem, - grpc_mdelem_ref(channeld->status_ok)); - } + grpc_call_op_metadata_add_head(&op->data.metadata, &calld->status, grpc_mdelem_ref(channeld->status_ok)); grpc_call_next_op(elem, op); break; default: @@ -272,24 +229,11 @@ static void init_call_elem(grpc_call_element *elem, ignore_unused(channeld); /* initialize members */ - calld->path = NULL; - calld->sent_status = 0; - calld->seen_scheme = 0; - calld->seen_method = NOT_RECEIVED; - calld->seen_te_trailers = 0; + memset(calld, 0, sizeof(*calld)); } /* Destructor for call_data */ static void destroy_call_elem(grpc_call_element *elem) { - /* grab pointers to our data from the call element */ - call_data *calld = elem->call_data; - channel_data *channeld = elem->channel_data; - - ignore_unused(channeld); - - if (calld->path) { - grpc_mdelem_unref(calld->path); - } } /* Constructor for channel_data */ @@ -314,7 +258,6 @@ static void init_channel_elem(grpc_channel_element *elem, channeld->status_not_found = grpc_mdelem_from_strings(mdctx, ":status", "404"); channeld->method_post = grpc_mdelem_from_strings(mdctx, ":method", "POST"); - channeld->method_get = grpc_mdelem_from_strings(mdctx, ":method", "GET"); channeld->http_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "http"); channeld->https_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "https"); channeld->grpc_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "grpc"); @@ -369,7 +312,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) { grpc_mdelem_unref(channeld->status_ok); grpc_mdelem_unref(channeld->status_not_found); grpc_mdelem_unref(channeld->method_post); - grpc_mdelem_unref(channeld->method_get); grpc_mdelem_unref(channeld->http_scheme); grpc_mdelem_unref(channeld->https_scheme); grpc_mdelem_unref(channeld->grpc_scheme); diff --git a/src/core/channel/metadata_buffer.c b/src/core/channel/metadata_buffer.c deleted file mode 100644 index eac852e4a4..0000000000 --- a/src/core/channel/metadata_buffer.c +++ /dev/null @@ -1,149 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/channel/metadata_buffer.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/useful.h> - -#include <string.h> - -#define INITIAL_ELEM_CAP 8 - -/* One queued call; we track offsets to string data in a shared buffer to - reduce allocations. See grpc_metadata_buffer_impl for the memory use - strategy */ -typedef struct { - grpc_mdelem *md; - void (*cb)(void *user_data, grpc_op_error error); - void *user_data; - gpr_uint32 flags; -} qelem; - -/* Memory layout: - - grpc_metadata_buffer_impl - followed by an array of qelem */ -struct grpc_metadata_buffer_impl { - /* number of elements in q */ - size_t elems; - /* capacity of q */ - size_t elem_cap; -}; - -#define ELEMS(buffer) ((qelem *)((buffer) + 1)) - -void grpc_metadata_buffer_init(grpc_metadata_buffer *buffer) { - /* start buffer as NULL, indicating no elements */ - *buffer = NULL; -} - -void grpc_metadata_buffer_destroy(grpc_metadata_buffer *buffer, - grpc_op_error error) { - size_t i; - qelem *qe; - if (*buffer) { - for (i = 0; i < (*buffer)->elems; i++) { - qe = &ELEMS(*buffer)[i]; - grpc_mdelem_unref(qe->md); - qe->cb(qe->user_data, error); - } - gpr_free(*buffer); - } -} - -void grpc_metadata_buffer_queue(grpc_metadata_buffer *buffer, - grpc_call_op *op) { - grpc_metadata_buffer_impl *impl = *buffer; - qelem *qe; - size_t bytes; - - GPR_ASSERT(op->type == GRPC_SEND_METADATA || op->type == GRPC_RECV_METADATA); - - if (!impl) { - /* this is the first element: allocate enough space to hold the - header object and the initial element capacity of qelems */ - bytes = - sizeof(grpc_metadata_buffer_impl) + INITIAL_ELEM_CAP * sizeof(qelem); - impl = gpr_malloc(bytes); - /* initialize the header object */ - impl->elems = 0; - impl->elem_cap = INITIAL_ELEM_CAP; - } else if (impl->elems == impl->elem_cap) { - /* more qelems than what we can deal with: grow by doubling size */ - impl->elem_cap *= 2; - bytes = sizeof(grpc_metadata_buffer_impl) + impl->elem_cap * sizeof(qelem); - impl = gpr_realloc(impl, bytes); - } - - /* append an element to the queue */ - qe = &ELEMS(impl)[impl->elems]; - impl->elems++; - - qe->md = op->data.metadata; - qe->cb = op->done_cb; - qe->user_data = op->user_data; - qe->flags = op->flags; - - /* header object may have changed location: store it back */ - *buffer = impl; -} - -void grpc_metadata_buffer_flush(grpc_metadata_buffer *buffer, - grpc_call_element *elem) { - grpc_metadata_buffer_impl *impl = *buffer; - grpc_call_op op; - qelem *qe; - size_t i; - - if (!impl) { - /* nothing to send */ - return; - } - - /* construct call_op's, and push them down the stack */ - op.type = GRPC_SEND_METADATA; - op.dir = GRPC_CALL_DOWN; - for (i = 0; i < impl->elems; i++) { - qe = &ELEMS(impl)[i]; - op.done_cb = qe->cb; - op.user_data = qe->user_data; - op.flags = qe->flags; - op.data.metadata = qe->md; - grpc_call_next_op(elem, &op); - } - - /* free data structures and reset to NULL: we can only flush once */ - gpr_free(impl); - *buffer = NULL; -} diff --git a/src/core/channel/metadata_buffer.h b/src/core/channel/metadata_buffer.h deleted file mode 100644 index b7cc5170d1..0000000000 --- a/src/core/channel/metadata_buffer.h +++ /dev/null @@ -1,70 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_INTERNAL_CORE_CHANNEL_METADATA_BUFFER_H -#define GRPC_INTERNAL_CORE_CHANNEL_METADATA_BUFFER_H - -#include "src/core/channel/channel_stack.h" - -/* Utility code to buffer GRPC_SEND_METADATA calls and pass them down the stack - all at once at some otherwise-determined time. Useful for implementing - filters that want to queue metadata until a START event chooses some - underlying filter stack to send an rpc on. */ - -/* Clients should declare a member of grpc_metadata_buffer. This may at some - point become a typedef for a struct, but for now a pointer suffices */ -typedef struct grpc_metadata_buffer_impl grpc_metadata_buffer_impl; -typedef grpc_metadata_buffer_impl *grpc_metadata_buffer; - -/* Initializes the metadata buffer. Allocates no memory. */ -void grpc_metadata_buffer_init(grpc_metadata_buffer *buffer); -/* Destroy the metadata buffer. */ -void grpc_metadata_buffer_destroy(grpc_metadata_buffer *buffer, - grpc_op_error error); -/* Append a call to the end of a metadata buffer: may allocate memory */ -void grpc_metadata_buffer_queue(grpc_metadata_buffer *buffer, grpc_call_op *op); -/* Flush all queued operations from the metadata buffer to the element below - self */ -void grpc_metadata_buffer_flush(grpc_metadata_buffer *buffer, - grpc_call_element *self); -/* Count the number of queued elements in the buffer. */ -size_t grpc_metadata_buffer_count(const grpc_metadata_buffer *buffer); -/* Extract elements as a grpc_metadata*, for presentation to applications. - The returned buffer must be freed with - grpc_metadata_buffer_cleanup_elements. - Clears the metadata buffer (this is a one-shot operation) */ -grpc_metadata *grpc_metadata_buffer_extract_elements( - grpc_metadata_buffer *buffer); -void grpc_metadata_buffer_cleanup_elements(void *elements, grpc_op_error error); - -#endif /* GRPC_INTERNAL_CORE_CHANNEL_METADATA_BUFFER_H */ diff --git a/src/core/security/auth.c b/src/core/security/auth.c index 5fc6d2717f..14e06dd310 100644 --- a/src/core/security/auth.c +++ b/src/core/security/auth.c @@ -44,12 +44,15 @@ #include "src/core/security/credentials.h" #include "src/core/surface/call.h" +#define MAX_CREDENTIAL_METADATA_COUNT 4 + /* We can have a per-call credentials. */ typedef struct { grpc_credentials *creds; grpc_mdstr *host; grpc_mdstr *method; grpc_call_op op; + grpc_linked_mdelem md_links[MAX_CREDENTIAL_METADATA_COUNT]; } call_data; /* We can have a per-channel credentials. */ @@ -62,30 +65,8 @@ typedef struct { grpc_mdstr *status_key; } channel_data; -static void do_nothing(void *ignored, grpc_op_error error) {} - static void bubbleup_error(grpc_call_element *elem, const char *error_msg) { - grpc_call_op finish_op; - channel_data *channeld = elem->channel_data; - char status[GPR_LTOA_MIN_BUFSIZE]; - - gpr_log(GPR_ERROR, "%s", error_msg); - finish_op.type = GRPC_RECV_METADATA; - finish_op.dir = GRPC_CALL_UP; - finish_op.flags = 0; - finish_op.data.metadata = grpc_mdelem_from_metadata_strings( - channeld->md_ctx, grpc_mdstr_ref(channeld->error_msg_key), - grpc_mdstr_from_string(channeld->md_ctx, error_msg)); - finish_op.done_cb = do_nothing; - finish_op.user_data = NULL; - grpc_call_next_op(elem, &finish_op); - - gpr_ltoa(GRPC_STATUS_UNAUTHENTICATED, status); - finish_op.data.metadata = grpc_mdelem_from_metadata_strings( - channeld->md_ctx, grpc_mdstr_ref(channeld->status_key), - grpc_mdstr_from_string(channeld->md_ctx, status)); - grpc_call_next_op(elem, &finish_op); - + grpc_call_element_recv_status(elem, GRPC_STATUS_UNAUTHENTICATED, error_msg); grpc_call_element_send_cancel(elem); } @@ -93,11 +74,14 @@ static void on_credentials_metadata(void *user_data, grpc_mdelem **md_elems, size_t num_md, grpc_credentials_status status) { grpc_call_element *elem = (grpc_call_element *)user_data; + call_data *calld = elem->call_data; + grpc_call_op op = calld->op; size_t i; + GPR_ASSERT(num_md <= MAX_CREDENTIAL_METADATA_COUNT); for (i = 0; i < num_md; i++) { - grpc_call_element_send_metadata(elem, grpc_mdelem_ref(md_elems[i])); + grpc_call_op_metadata_add_tail(&op.data.metadata, &calld->md_links[i], grpc_mdelem_ref(md_elems[i])); } - grpc_call_next_op(elem, &((call_data *)elem->call_data)->op); + grpc_call_next_op(elem, &op); } static char *build_service_url(const char *url_scheme, call_data *calld) { @@ -174,16 +158,20 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; + grpc_linked_mdelem *l; switch (op->type) { case GRPC_SEND_METADATA: - /* Pointer comparison is OK for md_elems created from the same context. */ - if (op->data.metadata->key == channeld->authority_string) { - if (calld->host != NULL) grpc_mdstr_unref(calld->host); - calld->host = grpc_mdstr_ref(op->data.metadata->value); - } else if (op->data.metadata->key == channeld->path_string) { - if (calld->method != NULL) grpc_mdstr_unref(calld->method); - calld->method = grpc_mdstr_ref(op->data.metadata->value); + for (l = op->data.metadata.list.head; l; l = l->next) { + grpc_mdelem *md = l->md; + /* Pointer comparison is OK for md_elems created from the same context. */ + if (md->key == channeld->authority_string) { + if (calld->host != NULL) grpc_mdstr_unref(calld->host); + calld->host = grpc_mdstr_ref(md->value); + } else if (md->key == channeld->path_string) { + if (calld->method != NULL) grpc_mdstr_unref(calld->method); + calld->method = grpc_mdstr_ref(md->value); + } } grpc_call_next_op(elem, op); break; diff --git a/src/core/surface/call.c b/src/core/surface/call.c index dba63058b8..9c0c65e21c 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -33,7 +33,6 @@ #include "src/core/surface/call.h" #include "src/core/channel/channel_stack.h" -#include "src/core/channel/metadata_buffer.h" #include "src/core/iomgr/alarm.h" #include "src/core/support/string.h" #include "src/core/surface/byte_buffer_queue.h" @@ -41,6 +40,7 @@ #include "src/core/surface/completion_queue.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <assert.h> #include <stdio.h> #include <stdlib.h> @@ -68,8 +68,10 @@ typedef struct { } completed_request; /* See request_set in grpc_call below for a description */ -#define REQSET_EMPTY 255 -#define REQSET_DONE 254 +#define REQSET_EMPTY 'X' +#define REQSET_DONE 'Y' + +#define MAX_SEND_INITIAL_METADATA_COUNT 3 typedef struct { /* Overall status of the operation: starts OK, may degrade to @@ -204,6 +206,12 @@ struct grpc_call { /* Call refcount - to keep the call alive during asynchronous operations */ gpr_refcount internal_refcount; + grpc_linked_mdelem send_initial_metadata[MAX_SEND_INITIAL_METADATA_COUNT]; + grpc_linked_mdelem status_link; + grpc_linked_mdelem details_link; + size_t send_initial_metadata_count; + gpr_timespec send_deadline; + /* Data that the legacy api needs to track. To be deleted at some point soon */ legacy_state *legacy_state; @@ -226,9 +234,11 @@ struct grpc_call { static void do_nothing(void *ignored, grpc_op_error also_ignored) {} static send_action choose_send_action(grpc_call *call); static void enact_send_action(grpc_call *call, send_action sa); +static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline); grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, - const void *server_transport_data) { + const void *server_transport_data, grpc_mdelem **add_initial_metadata, + size_t add_initial_metadata_count, gpr_timespec send_deadline) { size_t i; grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel); grpc_call *call = @@ -245,6 +255,12 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE; call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE; } + GPR_ASSERT(add_initial_metadata_count < MAX_SEND_INITIAL_METADATA_COUNT); + for (i = 0; i < add_initial_metadata_count; i++) { + call->send_initial_metadata[i].md = add_initial_metadata[i]; + } + call->send_initial_metadata_count = add_initial_metadata_count; + call->send_deadline = send_deadline; grpc_channel_internal_ref(channel); call->metadata_context = grpc_channel_get_metadata_context(channel); /* one ref is dropped in response to destroy, the other in @@ -252,6 +268,9 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, gpr_ref_init(&call->internal_refcount, 2); grpc_call_stack_init(channel_stack, server_transport_data, CALL_STACK_FROM_CALL(call)); + if (gpr_time_cmp(send_deadline, gpr_inf_future) != 0) { + set_deadline_alarm(call, send_deadline); + } return call; } @@ -587,15 +606,28 @@ static send_action choose_send_action(grpc_call *call) { return SEND_NOTHING; } -static void send_metadata(grpc_call *call, grpc_mdelem *elem) { - grpc_call_op op; - op.type = GRPC_SEND_METADATA; - op.dir = GRPC_CALL_DOWN; - op.flags = GRPC_WRITE_BUFFER_HINT; - op.data.metadata = elem; - op.done_cb = do_nothing; - op.user_data = NULL; - grpc_call_execute_op(call, &op); +static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count, grpc_metadata *metadata) { + size_t i; + grpc_mdelem_list out; + if (count == 0) { + out.head = out.tail = NULL; + return out; + } + for (i = 0; i < count; i++) { + grpc_metadata *md = &metadata[i]; + grpc_metadata *next_md = (i == count-1) ? NULL : &metadata[i+1]; + grpc_metadata *prev_md = (i == 0) ? NULL : &metadata[i-1]; + grpc_linked_mdelem *l = (grpc_linked_mdelem*)&md->internal_data; + assert(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data)); + l->md = grpc_mdelem_from_string_and_buffer( + call->metadata_context, md->key, + (const gpr_uint8 *)md->value, md->value_length); + l->next = next_md ? (grpc_linked_mdelem*)&next_md->internal_data : NULL; + l->prev = prev_md ? (grpc_linked_mdelem*)&prev_md->internal_data : NULL; + } + out.head = (grpc_linked_mdelem*)&(metadata[0].internal_data); + out.tail = (grpc_linked_mdelem*)&(metadata[count-1].internal_data); + return out; } static void enact_send_action(grpc_call *call, send_action sa) { @@ -614,13 +646,18 @@ static void enact_send_action(grpc_call *call, send_action sa) { /* fallthrough */ case SEND_INITIAL_METADATA: data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA]; - for (i = 0; i < data.send_metadata.count; i++) { - const grpc_metadata *md = &data.send_metadata.metadata[i]; - send_metadata(call, - grpc_mdelem_from_string_and_buffer( - call->metadata_context, md->key, - (const gpr_uint8 *)md->value, md->value_length)); + op.type = GRPC_SEND_METADATA; + op.dir = GRPC_CALL_DOWN; + op.flags = flags; + op.data.metadata.list = chain_metadata_from_app(call, data.send_metadata.count, data.send_metadata.metadata); + op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL; + op.data.metadata.deadline = call->send_deadline; + for (i = 0; i < call->send_initial_metadata_count; i++) { + grpc_call_op_metadata_link_head(&op.data.metadata, &call->send_initial_metadata[i]); } + op.done_cb = do_nothing; + op.user_data = NULL; + grpc_call_execute_op(call, &op); op.type = GRPC_SEND_START; op.dir = GRPC_CALL_DOWN; op.flags = flags; @@ -645,32 +682,32 @@ static void enact_send_action(grpc_call *call, send_action sa) { case SEND_TRAILING_METADATA_AND_FINISH: /* send trailing metadata */ data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA]; - for (i = 0; i < data.send_metadata.count; i++) { - const grpc_metadata *md = &data.send_metadata.metadata[i]; - send_metadata(call, - grpc_mdelem_from_string_and_buffer( - call->metadata_context, md->key, - (const gpr_uint8 *)md->value, md->value_length)); - } + op.type = GRPC_SEND_METADATA; + op.dir = GRPC_CALL_DOWN; + op.flags = flags; + op.data.metadata.list = chain_metadata_from_app(call, data.send_metadata.count, data.send_metadata.metadata); + op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL; + op.data.metadata.deadline = call->send_deadline; /* send status */ /* TODO(ctiller): cache common status values */ data = call->request_data[GRPC_IOREQ_SEND_STATUS]; gpr_ltoa(data.send_status.code, status_str); - send_metadata( - call, - grpc_mdelem_from_metadata_strings( + grpc_call_op_metadata_add_tail(&op.data.metadata, &call->status_link, + grpc_mdelem_from_metadata_strings( call->metadata_context, grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)), grpc_mdstr_from_string(call->metadata_context, status_str))); if (data.send_status.details) { - send_metadata( - call, + grpc_call_op_metadata_add_tail(&op.data.metadata, &call->details_link, grpc_mdelem_from_metadata_strings( call->metadata_context, grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)), grpc_mdstr_from_string(call->metadata_context, data.send_status.details))); } + op.done_cb = do_nothing; + op.user_data = NULL; + grpc_call_execute_op(call, &op); /* fallthrough: see choose_send_action for details */ case SEND_FINISH: op.type = GRPC_SEND_FINISH; @@ -875,9 +912,7 @@ static void call_alarm(void *arg, int success) { grpc_call_internal_unref(call, 1); } -void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - +static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) { if (call->have_alarm) { gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice"); } @@ -886,11 +921,15 @@ void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) { grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now()); } -static void set_read_state(grpc_call *call, read_state state) { - lock(call); +static void set_read_state_locked(grpc_call *call, read_state state) { GPR_ASSERT(call->read_state < state); call->read_state = state; finish_read_ops(call); +} + +static void set_read_state(grpc_call *call, read_state state) { + lock(call); + set_read_state_locked(call, state); unlock(call); } @@ -936,52 +975,60 @@ void grpc_call_recv_message(grpc_call_element *elem, unlock(call); } -void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) { +int grpc_call_recv_metadata(grpc_call_element *elem, grpc_call_op_metadata *md) { grpc_call *call = CALL_FROM_TOP_ELEM(elem); - grpc_mdstr *key = md->key; + grpc_linked_mdelem *l; grpc_metadata_array *dest; grpc_metadata *mdusr; + int is_trailing; lock(call); - if (key == grpc_channel_get_status_string(call->channel)) { - set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); - grpc_mdelem_unref(md); - } else if (key == grpc_channel_get_message_string(call->channel)) { - set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); - grpc_mdelem_unref(md); - } else { - dest = &call->buffered_metadata[call->read_state >= - READ_STATE_GOT_INITIAL_METADATA]; - if (dest->count == dest->capacity) { - dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2); - dest->metadata = - gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity); - } - mdusr = &dest->metadata[dest->count++]; - mdusr->key = grpc_mdstr_as_c_string(md->key); - mdusr->value = grpc_mdstr_as_c_string(md->value); - mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice); - if (call->owned_metadata_count == call->owned_metadata_capacity) { - call->owned_metadata_capacity = GPR_MAX( - call->owned_metadata_capacity + 8, call->owned_metadata_capacity * 2); - call->owned_metadata = - gpr_realloc(call->owned_metadata, - sizeof(grpc_mdelem *) * call->owned_metadata_capacity); + is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA; + for (l = md->list.head; l; l = l->next) { + grpc_mdelem *md = l->md; + grpc_mdstr *key = md->key; + if (key == grpc_channel_get_status_string(call->channel)) { + set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); + grpc_mdelem_unref(md); + } else if (key == grpc_channel_get_message_string(call->channel)) { + set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); + grpc_mdelem_unref(md); + } else { + dest = &call->buffered_metadata[is_trailing]; + if (dest->count == dest->capacity) { + dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2); + dest->metadata = + gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity); + } + mdusr = &dest->metadata[dest->count++]; + mdusr->key = grpc_mdstr_as_c_string(md->key); + mdusr->value = grpc_mdstr_as_c_string(md->value); + mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice); + if (call->owned_metadata_count == call->owned_metadata_capacity) { + call->owned_metadata_capacity = GPR_MAX( + call->owned_metadata_capacity + 8, call->owned_metadata_capacity * 2); + call->owned_metadata = + gpr_realloc(call->owned_metadata, + sizeof(grpc_mdelem *) * call->owned_metadata_capacity); + } + call->owned_metadata[call->owned_metadata_count++] = md; } - call->owned_metadata[call->owned_metadata_count++] = md; + } + if (gpr_time_cmp(md->deadline, gpr_inf_future) != 0) { + set_deadline_alarm(call, md->deadline); + } + if (!is_trailing) { + set_read_state_locked(call, READ_STATE_GOT_INITIAL_METADATA); } unlock(call); + + return !is_trailing; } grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) { return CALL_STACK_FROM_CALL(call); } -void grpc_call_initial_metadata_complete(grpc_call_element *surface_element) { - grpc_call *call = grpc_call_from_top_element(surface_element); - set_read_state(call, READ_STATE_GOT_INITIAL_METADATA); -} - /* * BATCH API IMPLEMENTATION */ diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 06434f87ac..25b0fd3cc0 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -35,7 +35,6 @@ #define GRPC_INTERNAL_CORE_SURFACE_CALL_H #include "src/core/channel/channel_stack.h" -#include "src/core/channel/metadata_buffer.h" #include <grpc/grpc.h> /* Primitive operation types - grpc_op's get rewritten into these */ @@ -67,7 +66,7 @@ typedef union { } recv_status_details; struct { size_t count; - const grpc_metadata *metadata; + grpc_metadata *metadata; } send_metadata; grpc_byte_buffer *send_message; struct { @@ -86,7 +85,8 @@ typedef void (*grpc_ioreq_completion_func)(grpc_call *call, void *user_data); grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, - const void *server_transport_data); + const void *server_transport_data, grpc_mdelem **add_initial_metadata, + size_t add_initial_metadata_count, gpr_timespec send_deadline); void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq); grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call); @@ -96,8 +96,9 @@ void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion); /* Helpers for grpc_client, grpc_server filters to publish received data to the completion queue/surface layer */ -void grpc_call_recv_metadata(grpc_call_element *surface_element, - grpc_mdelem *md); +/* receive metadata - returns 1 if this was initial metadata */ +int grpc_call_recv_metadata(grpc_call_element *surface_element, + grpc_call_op_metadata *md); void grpc_call_recv_message(grpc_call_element *surface_element, grpc_byte_buffer *message); void grpc_call_read_closed(grpc_call_element *surface_element); @@ -108,12 +109,6 @@ grpc_call_error grpc_call_start_ioreq_and_call_back( grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, grpc_ioreq_completion_func on_complete, void *user_data); -/* Called when it's known that the initial batch of metadata is complete */ -void grpc_call_initial_metadata_complete(grpc_call_element *surface_element); - -void grpc_call_set_deadline(grpc_call_element *surface_element, - gpr_timespec deadline); - grpc_call_stack *grpc_call_get_call_stack(grpc_call *call); /* Given the top call_element, get the call object. */ diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index d3962a00c4..29b042e7c1 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -62,7 +62,7 @@ struct grpc_channel { registered_call *registered_calls; }; -#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c)+1)) +#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1)) #define CHANNEL_FROM_CHANNEL_STACK(channel_stack) \ (((grpc_channel *)(channel_stack)) - 1) #define CHANNEL_FROM_TOP_ELEM(top_elem) \ @@ -91,44 +91,25 @@ grpc_channel *grpc_channel_create_from_filters( return channel; } -static void do_nothing(void *ignored, grpc_op_error error) {} - static grpc_call *grpc_channel_create_call_internal( grpc_channel *channel, grpc_completion_queue *cq, grpc_mdelem *path_mdelem, grpc_mdelem *authority_mdelem, gpr_timespec deadline) { - grpc_call *call; - grpc_call_op op; + grpc_mdelem *send_metadata[2]; - if (!channel->is_client) { - gpr_log(GPR_ERROR, "Cannot create a call on the server."); - return NULL; - } + GPR_ASSERT(channel->is_client); - call = grpc_call_create(channel, cq, NULL); + send_metadata[0] = path_mdelem; + send_metadata[1] = authority_mdelem; - /* Add :path and :authority headers. */ - op.type = GRPC_SEND_METADATA; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.data.metadata = path_mdelem; - op.done_cb = do_nothing; - op.user_data = NULL; - grpc_call_execute_op(call, &op); - - op.data.metadata = authority_mdelem; - grpc_call_execute_op(call, &op); - - if (0 != gpr_time_cmp(deadline, gpr_inf_future)) { - op.type = GRPC_SEND_DEADLINE; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.data.deadline = deadline; - op.done_cb = do_nothing; - op.user_data = NULL; - grpc_call_execute_op(call, &op); - } + return grpc_call_create(channel, cq, NULL, send_metadata, + GPR_ARRAY_SIZE(send_metadata), deadline); +} - return call; +grpc_call *grpc_channel_create_call_old(grpc_channel *channel, + const char *method, const char *host, + gpr_timespec absolute_deadline) { + return grpc_channel_create_call(channel, NULL, method, host, + absolute_deadline); } grpc_call *grpc_channel_create_call(grpc_channel *channel, @@ -146,13 +127,6 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel, deadline); } -grpc_call *grpc_channel_create_call_old(grpc_channel *channel, - const char *method, const char *host, - gpr_timespec absolute_deadline) { - return grpc_channel_create_call(channel, NULL, method, host, - absolute_deadline); -} - void *grpc_channel_register_call(grpc_channel *channel, const char *method, const char *host) { registered_call *rc = gpr_malloc(sizeof(registered_call)); diff --git a/src/core/surface/client.c b/src/core/surface/client.c index 4d54865d16..71ca26e65d 100644 --- a/src/core/surface/client.c +++ b/src/core/surface/client.c @@ -52,15 +52,8 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, GRPC_CALL_LOG_OP(GPR_INFO, elem, op); switch (op->type) { - case GRPC_SEND_DEADLINE: - grpc_call_set_deadline(elem, op->data.deadline); - grpc_call_next_op(elem, op); - break; case GRPC_RECV_METADATA: - grpc_call_recv_metadata(elem, op->data.metadata); - break; - case GRPC_RECV_DEADLINE: - gpr_log(GPR_ERROR, "Deadline received by client (ignored)"); + grpc_call_recv_metadata(elem, &op->data.metadata); break; case GRPC_RECV_MESSAGE: grpc_call_recv_message(elem, op->data.message); @@ -72,9 +65,6 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, case GRPC_RECV_FINISH: grpc_call_stream_closed(elem); break; - case GRPC_RECV_END_OF_INITIAL_METADATA: - grpc_call_initial_metadata_complete(elem); - break; default: GPR_ASSERT(op->dir == GRPC_CALL_DOWN); grpc_call_next_op(elem, op); diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index b40c48381f..f2497e28e6 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -47,23 +47,20 @@ typedef struct { } call_data; typedef struct { - grpc_mdelem *status; - grpc_mdelem *message; + void *unused; } channel_data; static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, grpc_call_op *op) { - channel_data *channeld = elem->channel_data; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); switch (op->type) { case GRPC_SEND_START: - grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->status)); - grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->message)); + grpc_call_element_recv_status(elem, GRPC_STATUS_UNKNOWN, "Rpc sent on a lame channel."); grpc_call_stream_closed(elem); break; case GRPC_SEND_METADATA: - grpc_mdelem_unref(op->data.metadata); + abort(); break; default: break; @@ -94,23 +91,11 @@ static void destroy_call_elem(grpc_call_element *elem) {} static void init_channel_elem(grpc_channel_element *elem, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { - channel_data *channeld = elem->channel_data; - char status[12]; - GPR_ASSERT(is_first); GPR_ASSERT(is_last); - - channeld->message = grpc_mdelem_from_strings(mdctx, "grpc-message", - "Rpc sent on a lame channel."); - gpr_ltoa(GRPC_STATUS_UNKNOWN, status); - channeld->status = grpc_mdelem_from_strings(mdctx, "grpc-status", status); } static void destroy_channel_elem(grpc_channel_element *elem) { - channel_data *channeld = elem->channel_data; - - grpc_mdelem_unref(channeld->message); - grpc_mdelem_unref(channeld->status); } static const grpc_channel_filter lame_filter = { diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 17cba9a505..e0d7950dea 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -411,29 +411,32 @@ static void read_closed(grpc_call_element *elem) { gpr_mu_unlock(&chand->server->mu); } +static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { + grpc_call_element *elem = user_data; + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + if (md->key == chand->path_key) { + calld->path = grpc_mdstr_ref(md->value); + return NULL; + } else if (md->key == chand->authority_key) { + calld->host = grpc_mdstr_ref(md->value); + return NULL; + } + return md; +} + static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn, grpc_call_op *op) { - channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - grpc_mdelem *md; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); switch (op->type) { case GRPC_RECV_METADATA: - md = op->data.metadata; - if (md->key == chand->path_key) { - calld->path = grpc_mdstr_ref(md->value); - grpc_mdelem_unref(md); - } else if (md->key == chand->authority_key) { - calld->host = grpc_mdstr_ref(md->value); - grpc_mdelem_unref(md); - } else { - grpc_call_recv_metadata(elem, md); + grpc_call_op_metadata_filter(&op->data.metadata, server_filter, elem); + if (grpc_call_recv_metadata(elem, &op->data.metadata)) { + calld->deadline = op->data.metadata.deadline; + start_new_rpc(elem); } break; - case GRPC_RECV_END_OF_INITIAL_METADATA: - start_new_rpc(elem); - grpc_call_initial_metadata_complete(elem); - break; case GRPC_RECV_MESSAGE: grpc_call_recv_message(elem, op->data.message); op->done_cb(op->user_data, GRPC_OP_OK); @@ -444,10 +447,6 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn, case GRPC_RECV_FINISH: stream_closed(elem); break; - case GRPC_RECV_DEADLINE: - grpc_call_set_deadline(elem, op->data.deadline); - ((call_data *)elem->call_data)->deadline = op->data.deadline; - break; default: GPR_ASSERT(op->dir == GRPC_CALL_DOWN); grpc_call_next_op(elem, op); @@ -464,7 +463,7 @@ static void channel_op(grpc_channel_element *elem, case GRPC_ACCEPT_CALL: /* create a call */ grpc_call_create(chand->channel, NULL, - op->data.accept_call.transport_server_data); + op->data.accept_call.transport_server_data, NULL, 0, gpr_inf_future); break; case GRPC_TRANSPORT_CLOSED: /* if the transport is closed for a server channel, we destroy the |