diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/channel/call_op_string.c | 51 | ||||
-rw-r--r-- | src/core/channel/census_filter.c | 24 | ||||
-rw-r--r-- | src/core/channel/channel_stack.c | 57 | ||||
-rw-r--r-- | src/core/channel/channel_stack.h | 31 | ||||
-rw-r--r-- | src/core/channel/client_channel.c | 99 | ||||
-rw-r--r-- | src/core/channel/connected_channel.c | 77 | ||||
-rw-r--r-- | src/core/channel/http_client_filter.c | 77 | ||||
-rw-r--r-- | src/core/channel/http_server_filter.c | 240 | ||||
-rw-r--r-- | src/core/channel/metadata_buffer.c | 149 | ||||
-rw-r--r-- | src/core/channel/metadata_buffer.h | 70 | ||||
-rw-r--r-- | src/core/security/auth.c | 61 | ||||
-rw-r--r-- | src/core/surface/call.c | 225 | ||||
-rw-r--r-- | src/core/surface/call.h | 25 | ||||
-rw-r--r-- | src/core/surface/channel.c | 52 | ||||
-rw-r--r-- | src/core/surface/client.c | 28 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 42 | ||||
-rw-r--r-- | src/core/surface/server.c | 40 | ||||
-rw-r--r-- | src/core/transport/chttp2/stream_encoder.c | 42 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 106 | ||||
-rw-r--r-- | src/core/transport/metadata.c | 2 | ||||
-rw-r--r-- | src/core/transport/stream_op.c | 198 | ||||
-rw-r--r-- | src/core/transport/stream_op.h | 54 |
22 files changed, 834 insertions, 916 deletions
diff --git a/src/core/channel/call_op_string.c b/src/core/channel/call_op_string.c index 08f2e95deb..5f7e1be268 100644 --- a/src/core/channel/call_op_string.c +++ b/src/core/channel/call_op_string.c @@ -43,12 +43,27 @@ static void put_metadata(gpr_strvec *b, grpc_mdelem *md) { gpr_strvec_add(b, gpr_strdup(" key=")); - gpr_strvec_add(b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->key->slice), - GPR_SLICE_LENGTH(md->key->slice), GPR_HEXDUMP_PLAINTEXT)); + gpr_strvec_add( + b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->key->slice), + GPR_SLICE_LENGTH(md->key->slice), GPR_HEXDUMP_PLAINTEXT)); gpr_strvec_add(b, gpr_strdup(" value=")); gpr_strvec_add(b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->value->slice), - GPR_SLICE_LENGTH(md->value->slice), GPR_HEXDUMP_PLAINTEXT)); + GPR_SLICE_LENGTH(md->value->slice), + GPR_HEXDUMP_PLAINTEXT)); +} + +static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) { + grpc_linked_mdelem *m; + for (m = md.list.head; m != NULL; m = m->next) { + put_metadata(b, m->md); + } + if (gpr_time_cmp(md.deadline, gpr_inf_future) != 0) { + char *tmp; + gpr_asprintf(&tmp, " deadline=%d.%09d", md.deadline.tv_sec, + md.deadline.tv_nsec); + gpr_strvec_add(b, tmp); + } } char *grpc_call_op_string(grpc_call_op *op) { @@ -69,16 +84,7 @@ char *grpc_call_op_string(grpc_call_op *op) { switch (op->type) { case GRPC_SEND_METADATA: gpr_strvec_add(&b, gpr_strdup("SEND_METADATA")); - put_metadata(&b, op->data.metadata); - break; - case GRPC_SEND_DEADLINE: - gpr_asprintf(&tmp, "SEND_DEADLINE %d.%09d", op->data.deadline.tv_sec, - op->data.deadline.tv_nsec); - gpr_strvec_add(&b, tmp); - break; - case GRPC_SEND_START: - gpr_asprintf(&tmp, "SEND_START pollset=%p", op->data.start.pollset); - gpr_strvec_add(&b, tmp); + put_metadata_list(&b, op->data.metadata); break; case GRPC_SEND_MESSAGE: gpr_strvec_add(&b, gpr_strdup("SEND_MESSAGE")); @@ -94,15 +100,7 @@ char *grpc_call_op_string(grpc_call_op *op) { break; case GRPC_RECV_METADATA: gpr_strvec_add(&b, gpr_strdup("RECV_METADATA")); - put_metadata(&b, op->data.metadata); - break; - case GRPC_RECV_DEADLINE: - gpr_asprintf(&tmp, "RECV_DEADLINE %d.%09d", op->data.deadline.tv_sec, - op->data.deadline.tv_nsec); - gpr_strvec_add(&b, tmp); - break; - case GRPC_RECV_END_OF_INITIAL_METADATA: - gpr_strvec_add(&b, gpr_strdup("RECV_END_OF_INITIAL_METADATA")); + put_metadata_list(&b, op->data.metadata); break; case GRPC_RECV_MESSAGE: gpr_strvec_add(&b, gpr_strdup("RECV_MESSAGE")); @@ -113,12 +111,21 @@ char *grpc_call_op_string(grpc_call_op *op) { case GRPC_RECV_FINISH: gpr_strvec_add(&b, gpr_strdup("RECV_FINISH")); break; + case GRPC_RECV_SYNTHETIC_STATUS: + gpr_asprintf(&tmp, "RECV_SYNTHETIC_STATUS status=%d message='%s'", + op->data.synthetic_status.status, + op->data.synthetic_status.message); + gpr_strvec_add(&b, tmp); + break; case GRPC_CANCEL_OP: gpr_strvec_add(&b, gpr_strdup("CANCEL_OP")); break; } gpr_asprintf(&tmp, " flags=0x%08x", op->flags); gpr_strvec_add(&b, tmp); + if (op->bind_pollset) { + gpr_strvec_add(&b, gpr_strdup("bind_pollset")); + } out = gpr_strvec_flatten(&b, NULL); gpr_strvec_destroy(&b); diff --git a/src/core/channel/census_filter.c b/src/core/channel/census_filter.c index ba7b7ba59c..9c0c20af22 100644 --- a/src/core/channel/census_filter.c +++ b/src/core/channel/census_filter.c @@ -62,11 +62,13 @@ static void init_rpc_stats(census_rpc_stats* stats) { static void extract_and_annotate_method_tag(grpc_call_op* op, call_data* calld, channel_data* chand) { - if (op->data.metadata->key == chand->path_str) { - gpr_log(GPR_DEBUG, - (const char*)GPR_SLICE_START_PTR(op->data.metadata->value->slice)); - census_add_method_tag(calld->op_id, (const char*)GPR_SLICE_START_PTR( - op->data.metadata->value->slice)); + grpc_linked_mdelem* m; + for (m = op->data.metadata.list.head; m != NULL; m = m->next) { + if (m->md->key == chand->path_str) { + gpr_log(GPR_DEBUG, "%s", (const char*)GPR_SLICE_START_PTR(m->md->value->slice)); + census_add_method_tag( + calld->op_id, (const char*)GPR_SLICE_START_PTR(m->md->value->slice)); + } } } @@ -178,11 +180,11 @@ static void destroy_channel_elem(grpc_channel_element* elem) { } const grpc_channel_filter grpc_client_census_filter = { - client_call_op, channel_op, sizeof(call_data), - client_init_call_elem, client_destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "census-client"}; + client_call_op, channel_op, sizeof(call_data), client_init_call_elem, + client_destroy_call_elem, sizeof(channel_data), init_channel_elem, + destroy_channel_elem, "census-client"}; const grpc_channel_filter grpc_server_census_filter = { - server_call_op, channel_op, sizeof(call_data), - server_init_call_elem, server_destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "census-server"}; + server_call_op, channel_op, sizeof(call_data), server_init_call_elem, + server_destroy_call_elem, sizeof(channel_data), init_channel_elem, + destroy_channel_elem, "census-server"}; diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index 21df9771ce..3a3a3a75b7 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -77,9 +77,9 @@ size_t grpc_channel_stack_size(const grpc_channel_filter **filters, return size; } -#define CHANNEL_ELEMS_FROM_STACK(stk) \ - ((grpc_channel_element *)( \ - (char *)(stk) + ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_channel_stack)))) +#define CHANNEL_ELEMS_FROM_STACK(stk) \ + ((grpc_channel_element *)((char *)(stk) + ROUND_UP_TO_ALIGNMENT_SIZE( \ + sizeof(grpc_channel_stack)))) #define CALL_ELEMS_FROM_STACK(stk) \ ((grpc_call_element *)((char *)(stk) + \ @@ -183,6 +183,9 @@ void grpc_call_stack_destroy(grpc_call_stack *stack) { void grpc_call_next_op(grpc_call_element *elem, grpc_call_op *op) { grpc_call_element *next_elem = elem + op->dir; + if (op->type == GRPC_SEND_METADATA || op->type == GRPC_RECV_METADATA) { + grpc_metadata_batch_assert_ok(&op->data.metadata); + } next_elem->filter->call_op(next_elem, elem, op); } @@ -193,42 +196,17 @@ void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op) { grpc_channel_stack *grpc_channel_stack_from_top_element( grpc_channel_element *elem) { - return (grpc_channel_stack *)((char *)(elem) - - ROUND_UP_TO_ALIGNMENT_SIZE( - sizeof(grpc_channel_stack))); + return (grpc_channel_stack *)((char *)(elem)-ROUND_UP_TO_ALIGNMENT_SIZE( + sizeof(grpc_channel_stack))); } grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) { - return (grpc_call_stack *)((char *)(elem) - ROUND_UP_TO_ALIGNMENT_SIZE( - sizeof(grpc_call_stack))); + return (grpc_call_stack *)((char *)(elem)-ROUND_UP_TO_ALIGNMENT_SIZE( + sizeof(grpc_call_stack))); } static void do_nothing(void *user_data, grpc_op_error error) {} -void grpc_call_element_recv_metadata(grpc_call_element *cur_elem, - grpc_mdelem *mdelem) { - grpc_call_op metadata_op; - metadata_op.type = GRPC_RECV_METADATA; - metadata_op.dir = GRPC_CALL_UP; - metadata_op.done_cb = do_nothing; - metadata_op.user_data = NULL; - metadata_op.flags = 0; - metadata_op.data.metadata = mdelem; - grpc_call_next_op(cur_elem, &metadata_op); -} - -void grpc_call_element_send_metadata(grpc_call_element *cur_elem, - grpc_mdelem *mdelem) { - grpc_call_op metadata_op; - metadata_op.type = GRPC_SEND_METADATA; - metadata_op.dir = GRPC_CALL_DOWN; - metadata_op.done_cb = do_nothing; - metadata_op.user_data = NULL; - metadata_op.flags = 0; - metadata_op.data.metadata = mdelem; - grpc_call_next_op(cur_elem, &metadata_op); -} - void grpc_call_element_send_cancel(grpc_call_element *cur_elem) { grpc_call_op cancel_op; cancel_op.type = GRPC_CANCEL_OP; @@ -236,6 +214,7 @@ void grpc_call_element_send_cancel(grpc_call_element *cur_elem) { cancel_op.done_cb = do_nothing; cancel_op.user_data = NULL; cancel_op.flags = 0; + cancel_op.bind_pollset = NULL; grpc_call_next_op(cur_elem, &cancel_op); } @@ -246,5 +225,19 @@ void grpc_call_element_send_finish(grpc_call_element *cur_elem) { finish_op.done_cb = do_nothing; finish_op.user_data = NULL; finish_op.flags = 0; + finish_op.bind_pollset = NULL; grpc_call_next_op(cur_elem, &finish_op); } + +void grpc_call_element_recv_status(grpc_call_element *cur_elem, + grpc_status_code status, + const char *message) { + grpc_call_op op; + op.type = GRPC_RECV_SYNTHETIC_STATUS; + op.dir = GRPC_CALL_UP; + op.done_cb = do_nothing; + op.user_data = NULL; + op.data.synthetic_status.status = status; + op.data.synthetic_status.message = message; + grpc_call_next_op(cur_elem, &op); +} diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index ef1da7b33b..addc92b272 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -62,10 +62,6 @@ typedef struct grpc_call_element grpc_call_element; typedef enum { /* send metadata to the channels peer */ GRPC_SEND_METADATA, - /* send a deadline */ - GRPC_SEND_DEADLINE, - /* start a connection (corresponds to start_invoke/accept) */ - GRPC_SEND_START, /* send a message to the channels peer */ GRPC_SEND_MESSAGE, /* send a pre-formatted message to the channels peer */ @@ -76,16 +72,14 @@ typedef enum { GRPC_REQUEST_DATA, /* metadata was received from the channels peer */ GRPC_RECV_METADATA, - /* receive a deadline */ - GRPC_RECV_DEADLINE, - /* the end of the first batch of metadata was received */ - GRPC_RECV_END_OF_INITIAL_METADATA, /* a message was received from the channels peer */ GRPC_RECV_MESSAGE, /* half-close was received from the channels peer */ GRPC_RECV_HALF_CLOSE, /* full close was received from the channels peer */ GRPC_RECV_FINISH, + /* a status has been sythesized locally */ + GRPC_RECV_SYNTHETIC_STATUS, /* the call has been abnormally terminated */ GRPC_CANCEL_OP } grpc_call_op_type; @@ -109,14 +103,16 @@ typedef struct { /* Argument data, matching up with grpc_call_op_type names */ union { - struct { - grpc_pollset *pollset; - } start; grpc_byte_buffer *message; - grpc_mdelem *metadata; - gpr_timespec deadline; + grpc_metadata_batch metadata; + struct { + grpc_status_code status; + const char *message; + } synthetic_status; } data; + grpc_pollset *bind_pollset; + /* Must be called when processing of this call-op is complete. Signature chosen to match transport flow control callbacks */ void (*done_cb)(void *user_data, grpc_op_error error); @@ -291,16 +287,15 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem); void grpc_call_log_op(char *file, int line, gpr_log_severity severity, grpc_call_element *elem, grpc_call_op *op); -void grpc_call_element_send_metadata(grpc_call_element *cur_elem, - grpc_mdelem *elem); -void grpc_call_element_recv_metadata(grpc_call_element *cur_elem, - grpc_mdelem *elem); void grpc_call_element_send_cancel(grpc_call_element *cur_elem); void grpc_call_element_send_finish(grpc_call_element *cur_elem); +void grpc_call_element_recv_status(grpc_call_element *cur_elem, + grpc_status_code status, + const char *message); extern int grpc_trace_channel; #define GRPC_CALL_LOG_OP(sev, elem, op) \ if (grpc_trace_channel) grpc_call_log_op(sev, elem, op) -#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_STACK_H */ +#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_STACK_H */ diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 9791f98be8..bc481e59ca 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -38,7 +38,6 @@ #include "src/core/channel/channel_args.h" #include "src/core/channel/child_channel.h" #include "src/core/channel/connected_channel.h" -#include "src/core/channel/metadata_buffer.h" #include "src/core/iomgr/iomgr.h" #include "src/core/support/string.h" #include <grpc/support/alloc.h> @@ -70,9 +69,6 @@ typedef struct { int transport_setup_initiated; grpc_channel_args *args; - - /* metadata cache */ - grpc_mdelem *cancel_status; } channel_data; typedef enum { @@ -86,20 +82,16 @@ struct call_data { /* owning element */ grpc_call_element *elem; + gpr_uint8 got_first_send; + call_state state; - grpc_metadata_buffer pending_metadata; gpr_timespec deadline; union { struct { /* our child call stack */ grpc_child_call *child_call; } active; - struct { - void (*on_complete)(void *user_data, grpc_op_error error); - void *on_complete_user_data; - gpr_uint32 start_flags; - grpc_pollset *pollset; - } waiting; + grpc_call_op waiting_op; } s; }; @@ -127,20 +119,6 @@ static void complete_activate(grpc_call_element *elem, grpc_call_op *op) { GPR_ASSERT(calld->state == CALL_ACTIVE); - /* sending buffered metadata down the stack before the start call */ - grpc_metadata_buffer_flush(&calld->pending_metadata, child_elem); - - if (gpr_time_cmp(calld->deadline, gpr_inf_future) != 0) { - grpc_call_op dop; - dop.type = GRPC_SEND_DEADLINE; - dop.dir = GRPC_CALL_DOWN; - dop.flags = 0; - dop.data.deadline = calld->deadline; - dop.done_cb = do_nothing; - dop.user_data = NULL; - child_elem->filter->call_op(child_elem, elem, &dop); - } - /* continue the start call down the stack, this nees to happen after metadata are flushed*/ child_elem->filter->call_op(child_elem, elem, op); @@ -152,6 +130,7 @@ static void start_rpc(grpc_call_element *elem, grpc_call_op *op) { gpr_mu_lock(&chand->mu); if (calld->state == CALL_CANCELLED) { gpr_mu_unlock(&chand->mu); + grpc_metadata_batch_destroy(&op->data.metadata); op->done_cb(op->user_data, GRPC_OP_ERROR); return; } @@ -184,10 +163,7 @@ static void start_rpc(grpc_call_element *elem, grpc_call_op *op) { gpr_realloc(chand->waiting_children, chand->waiting_child_capacity * sizeof(call_data *)); } - calld->s.waiting.on_complete = op->done_cb; - calld->s.waiting.on_complete_user_data = op->user_data; - calld->s.waiting.start_flags = op->flags; - calld->s.waiting.pollset = op->data.start.pollset; + calld->s.waiting_op = *op; chand->waiting_children[chand->waiting_child_count++] = calld; gpr_mu_unlock(&chand->mu); @@ -212,15 +188,8 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) { static void send_up_cancelled_ops(grpc_call_element *elem) { grpc_call_op finish_op; - channel_data *chand = elem->channel_data; /* send up a synthesized status */ - finish_op.type = GRPC_RECV_METADATA; - finish_op.dir = GRPC_CALL_UP; - finish_op.flags = 0; - finish_op.data.metadata = grpc_mdelem_ref(chand->cancel_status); - finish_op.done_cb = do_nothing; - finish_op.user_data = NULL; - grpc_call_next_op(elem, &finish_op); + grpc_call_element_recv_status(elem, GRPC_STATUS_CANCELLED, "Cancelled"); /* send up a finish */ finish_op.type = GRPC_RECV_FINISH; finish_op.dir = GRPC_CALL_UP; @@ -243,12 +212,12 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) { child_elem->filter->call_op(child_elem, elem, op); return; /* early out */ case CALL_WAITING: + grpc_metadata_batch_destroy(&calld->s.waiting_op.data.metadata); remove_waiting_child(chand, calld); calld->state = CALL_CANCELLED; gpr_mu_unlock(&chand->mu); send_up_cancelled_ops(elem); - calld->s.waiting.on_complete(calld->s.waiting.on_complete_user_data, - GRPC_OP_ERROR); + calld->s.waiting_op.done_cb(calld->s.waiting_op.user_data, GRPC_OP_ERROR); return; /* early out */ case CALL_CREATED: calld->state = CALL_CANCELLED; @@ -271,15 +240,13 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, switch (op->type) { case GRPC_SEND_METADATA: - grpc_metadata_buffer_queue(&calld->pending_metadata, op); - break; - case GRPC_SEND_DEADLINE: - calld->deadline = op->data.deadline; - op->done_cb(op->user_data, GRPC_OP_OK); - break; - case GRPC_SEND_START: - /* filter out the start event to find which child to send on */ - start_rpc(elem, op); + if (!calld->got_first_send) { + /* filter out the start event to find which child to send on */ + calld->got_first_send = 1; + start_rpc(elem, op); + } else { + grpc_call_next_op(elem, op); + } break; case GRPC_CANCEL_OP: cancel_rpc(elem, op); @@ -382,12 +349,6 @@ static void channel_op(grpc_channel_element *elem, } } -static void error_bad_on_complete(void *arg, grpc_op_error error) { - gpr_log(GPR_ERROR, - "Waiting finished but not started? Bad on_complete callback"); - abort(); -} - /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, const void *server_transport_data) { @@ -398,23 +359,22 @@ static void init_call_elem(grpc_call_element *elem, calld->elem = elem; calld->state = CALL_CREATED; calld->deadline = gpr_inf_future; - calld->s.waiting.on_complete = error_bad_on_complete; - calld->s.waiting.on_complete_user_data = NULL; - grpc_metadata_buffer_init(&calld->pending_metadata); + calld->got_first_send = 0; } /* Destructor for call_data */ static void destroy_call_elem(grpc_call_element *elem) { call_data *calld = elem->call_data; - /* if the metadata buffer is not flushed, destroy it here. */ - grpc_metadata_buffer_destroy(&calld->pending_metadata, GRPC_OP_OK); /* if the call got activated, we need to destroy the child stack also, and remove it from the in-flight requests tracked by the child_entry we picked */ if (calld->state == CALL_ACTIVE) { grpc_child_call_destroy(calld->s.active.child_call); } + if (calld->state == CALL_WAITING) { + grpc_metadata_batch_destroy(&calld->s.waiting_op.data.metadata); + } } /* Constructor for channel_data */ @@ -423,7 +383,6 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_mdctx *metadata_context, int is_first, int is_last) { channel_data *chand = elem->channel_data; - char temp[GPR_LTOA_MIN_BUFSIZE]; GPR_ASSERT(!is_first); GPR_ASSERT(is_last); @@ -437,10 +396,6 @@ static void init_channel_elem(grpc_channel_element *elem, chand->transport_setup = NULL; chand->transport_setup_initiated = 0; chand->args = grpc_channel_args_copy(args); - - gpr_ltoa(GRPC_STATUS_CANCELLED, temp); - chand->cancel_status = - grpc_mdelem_from_strings(metadata_context, "grpc-status", temp); } /* Destructor for channel_data */ @@ -455,7 +410,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } grpc_channel_args_destroy(chand->args); - grpc_mdelem_unref(chand->cancel_status); gpr_mu_destroy(&chand->mu); GPR_ASSERT(chand->waiting_child_count == 0); @@ -463,9 +417,10 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_client_channel_filter = { - call_op, channel_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "client-channel", }; + call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, + sizeof(channel_data), init_channel_elem, destroy_channel_elem, + "client-channel", +}; grpc_transport_setup_result grpc_client_channel_transport_setup_complete( grpc_channel_stack *channel_stack, grpc_transport *transport, @@ -520,13 +475,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete( call_ops = gpr_malloc(sizeof(grpc_call_op) * waiting_child_count); for (i = 0; i < waiting_child_count; i++) { - call_ops[i].type = GRPC_SEND_START; - call_ops[i].dir = GRPC_CALL_DOWN; - call_ops[i].flags = waiting_children[i]->s.waiting.start_flags; - call_ops[i].done_cb = waiting_children[i]->s.waiting.on_complete; - call_ops[i].user_data = - waiting_children[i]->s.waiting.on_complete_user_data; - call_ops[i].data.start.pollset = waiting_children[i]->s.waiting.pollset; + call_ops[i] = waiting_children[i]->s.waiting_op; if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) { waiting_children[i] = NULL; call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR); diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 62611e08f3..711274bfe1 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -60,7 +60,6 @@ typedef struct connected_channel_call_data { gpr_uint32 max_message_length; gpr_uint32 incoming_message_length; gpr_uint8 reading_message; - gpr_uint8 got_metadata_boundary; gpr_uint8 got_read_close; gpr_slice_buffer incoming_message; gpr_uint32 outgoing_buffer_length_estimate; @@ -120,27 +119,20 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + if (op->bind_pollset) { + grpc_transport_add_to_pollset(chand->transport, op->bind_pollset); + } + switch (op->type) { case GRPC_SEND_METADATA: grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata); - grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, - op->user_data); - break; - case GRPC_SEND_DEADLINE: - grpc_sopb_add_deadline(&calld->outgoing_sopb, op->data.deadline); - grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, - op->user_data); - break; - case GRPC_SEND_START: - grpc_transport_add_to_pollset(chand->transport, op->data.start.pollset); - grpc_sopb_add_metadata_boundary(&calld->outgoing_sopb); end_bufferable_op(op, chand, calld, 0); break; case GRPC_SEND_MESSAGE: grpc_sopb_add_begin_message(&calld->outgoing_sopb, grpc_byte_buffer_length(op->data.message), op->flags); - /* fall-through */ + /* fall-through */ case GRPC_SEND_PREFORMATTED_MESSAGE: copy_byte_buffer_to_stream_ops(op->data.message, &calld->outgoing_sopb); calld->outgoing_buffer_length_estimate += @@ -200,7 +192,6 @@ static void init_call_elem(grpc_call_element *elem, grpc_sopb_init(&calld->outgoing_sopb); calld->reading_message = 0; - calld->got_metadata_boundary = 0; calld->got_read_close = 0; calld->outgoing_buffer_length_estimate = 0; calld->max_message_length = chand->max_message_length; @@ -259,9 +250,9 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_connected_channel_filter = { - call_op, channel_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "connected", }; + call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, + sizeof(channel_data), init_channel_elem, destroy_channel_elem, "connected", +}; static gpr_slice alloc_recv_buffer(void *user_data, grpc_transport *transport, grpc_stream *stream, size_t size_hint) { @@ -307,8 +298,8 @@ static void finish_message(channel_data *chand, call_data *calld) { call_op.type = GRPC_RECV_MESSAGE; call_op.done_cb = do_nothing; /* TODO(ctiller): this could be a lot faster if coded directly */ - call_op.data.message = grpc_byte_buffer_create( - calld->incoming_message.slices, calld->incoming_message.count); + call_op.data.message = grpc_byte_buffer_create(calld->incoming_message.slices, + calld->incoming_message.count); gpr_slice_buffer_reset_and_unref(&calld->incoming_message); /* disable window updates until we get a request more from above */ @@ -320,6 +311,19 @@ static void finish_message(channel_data *chand, call_data *calld) { grpc_call_next_op(elem, &call_op); } +static void got_metadata(grpc_call_element *elem, + grpc_metadata_batch metadata) { + grpc_call_op op; + op.type = GRPC_RECV_METADATA; + op.dir = GRPC_CALL_UP; + op.flags = 0; + op.data.metadata = metadata; + op.done_cb = do_nothing; + op.user_data = NULL; + + grpc_call_next_op(elem, &op); +} + /* Handle incoming stream ops from the transport, translating them into call_ops to pass up the call stack */ static void recv_batch(void *user_data, grpc_transport *transport, @@ -339,40 +343,12 @@ static void recv_batch(void *user_data, grpc_transport *transport, stream_op = ops + i; switch (stream_op->type) { case GRPC_OP_FLOW_CTL_CB: - gpr_log(GPR_ERROR, - "should not receive flow control ops from transport"); - abort(); + stream_op->data.flow_ctl_cb.cb(stream_op->data.flow_ctl_cb.arg, 1); break; case GRPC_NO_OP: break; case GRPC_OP_METADATA: - call_op.type = GRPC_RECV_METADATA; - call_op.dir = GRPC_CALL_UP; - call_op.flags = 0; - call_op.data.metadata = stream_op->data.metadata; - call_op.done_cb = do_nothing; - call_op.user_data = NULL; - grpc_call_next_op(elem, &call_op); - break; - case GRPC_OP_DEADLINE: - call_op.type = GRPC_RECV_DEADLINE; - call_op.dir = GRPC_CALL_UP; - call_op.flags = 0; - call_op.data.deadline = stream_op->data.deadline; - call_op.done_cb = do_nothing; - call_op.user_data = NULL; - grpc_call_next_op(elem, &call_op); - break; - case GRPC_OP_METADATA_BOUNDARY: - if (!calld->got_metadata_boundary) { - calld->got_metadata_boundary = 1; - call_op.type = GRPC_RECV_END_OF_INITIAL_METADATA; - call_op.dir = GRPC_CALL_UP; - call_op.flags = 0; - call_op.done_cb = do_nothing; - call_op.user_data = NULL; - grpc_call_next_op(elem, &call_op); - } + got_metadata(elem, stream_op->data.metadata); break; case GRPC_OP_BEGIN_MESSAGE: /* can't begin a message when we're still reading a message */ @@ -495,7 +471,8 @@ static void transport_closed(void *user_data, grpc_transport *transport) { const grpc_transport_callbacks connected_channel_transport_callbacks = { alloc_recv_buffer, accept_stream, recv_batch, - transport_goaway, transport_closed, }; + transport_goaway, transport_closed, +}; grpc_transport_setup_result grpc_connected_channel_bind_transport( grpc_channel_stack *channel_stack, grpc_transport *transport) { diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 3ccc39b717..56e12342d7 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -35,7 +35,10 @@ #include <grpc/support/log.h> typedef struct call_data { - int sent_headers; + grpc_linked_mdelem method; + grpc_linked_mdelem scheme; + grpc_linked_mdelem te_trailers; + grpc_linked_mdelem content_type; } call_data; typedef struct channel_data { @@ -49,6 +52,18 @@ typedef struct channel_data { /* used to silence 'variable not used' warnings */ static void ignore_unused(void *ignored) {} +static grpc_mdelem *client_filter(void *user_data, grpc_mdelem *md) { + grpc_call_element *elem = user_data; + channel_data *channeld = elem->channel_data; + if (md == channeld->status) { + return NULL; + } else if (md->key == channeld->status->key) { + grpc_call_element_send_cancel(elem); + return NULL; + } + return md; +} + /* Called either: - in response to an API call (or similar) from above, to send something - a network event (or similar) from below, to receive something @@ -61,42 +76,23 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, channel_data *channeld = elem->channel_data; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - ignore_unused(calld); - switch (op->type) { case GRPC_SEND_METADATA: - if (!calld->sent_headers) { - /* Send : prefixed headers, which have to be before any application - * layer headers. */ - calld->sent_headers = 1; - grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->method)); - grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->scheme)); - } - grpc_call_next_op(elem, op); - break; - case GRPC_SEND_START: - if (!calld->sent_headers) { - /* Send : prefixed headers, if we haven't already */ - calld->sent_headers = 1; - grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->method)); - grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->scheme)); - } - /* Send non : prefixed headers */ - grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->te_trailers)); - grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->content_type)); + /* Send : prefixed headers, which have to be before any application + * layer headers. */ + grpc_metadata_batch_add_head(&op->data.metadata, &calld->method, + grpc_mdelem_ref(channeld->method)); + grpc_metadata_batch_add_head(&op->data.metadata, &calld->scheme, + grpc_mdelem_ref(channeld->scheme)); + grpc_metadata_batch_add_tail(&op->data.metadata, &calld->te_trailers, + grpc_mdelem_ref(channeld->te_trailers)); + grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type, + grpc_mdelem_ref(channeld->content_type)); grpc_call_next_op(elem, op); break; case GRPC_RECV_METADATA: - if (op->data.metadata == channeld->status) { - grpc_mdelem_unref(op->data.metadata); - op->done_cb(op->user_data, GRPC_OP_OK); - } else if (op->data.metadata->key == channeld->status->key) { - grpc_mdelem_unref(op->data.metadata); - op->done_cb(op->user_data, GRPC_OP_OK); - grpc_call_element_send_cancel(elem); - } else { - grpc_call_next_op(elem, op); - } + grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem); + grpc_call_next_op(elem, op); break; default: /* pass control up or down the stack depending on op->dir */ @@ -124,16 +120,7 @@ static void channel_op(grpc_channel_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data) { - /* grab pointers to our data from the call element */ - call_data *calld = elem->call_data; - channel_data *channeld = elem->channel_data; - - ignore_unused(channeld); - - /* initialize members */ - calld->sent_headers = 0; -} + const void *server_transport_data) {} /* Destructor for call_data */ static void destroy_call_elem(grpc_call_element *elem) { @@ -194,6 +181,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_http_client_filter = { - call_op, channel_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "http-client"}; + call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, + sizeof(channel_data), init_channel_elem, destroy_channel_elem, + "http-client"}; diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index 9da8b333ca..0bfe2f2e30 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -38,8 +38,6 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> -typedef enum { NOT_RECEIVED, POST, GET } known_method_type; - typedef struct { grpc_mdelem *path; grpc_mdelem *content_type; @@ -47,16 +45,17 @@ typedef struct { } gettable; typedef struct call_data { - known_method_type seen_method; + gpr_uint8 got_initial_metadata; + gpr_uint8 seen_path; + gpr_uint8 seen_post; gpr_uint8 sent_status; gpr_uint8 seen_scheme; gpr_uint8 seen_te_trailers; - grpc_mdelem *path; + grpc_linked_mdelem status; } call_data; typedef struct channel_data { grpc_mdelem *te_trailers; - grpc_mdelem *method_get; grpc_mdelem *method_post; grpc_mdelem *http_scheme; grpc_mdelem *https_scheme; @@ -78,38 +77,70 @@ typedef struct channel_data { /* used to silence 'variable not used' warnings */ static void ignore_unused(void *ignored) {} -/* Handle 'GET': not technically grpc, so probably a web browser hitting - us */ -static void payload_done(void *elem, grpc_op_error error) { - if (error == GRPC_OP_OK) { - grpc_call_element_send_finish(elem); - } -} - -static void handle_get(grpc_call_element *elem) { +static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { + grpc_call_element *elem = user_data; channel_data *channeld = elem->channel_data; call_data *calld = elem->call_data; - grpc_call_op op; - size_t i; - for (i = 0; i < channeld->gettable_count; i++) { - if (channeld->gettables[i].path == calld->path) { - grpc_call_element_send_metadata(elem, - grpc_mdelem_ref(channeld->status_ok)); - grpc_call_element_send_metadata( - elem, grpc_mdelem_ref(channeld->gettables[i].content_type)); - op.type = GRPC_SEND_PREFORMATTED_MESSAGE; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.data.message = channeld->gettables[i].content; - op.done_cb = payload_done; - op.user_data = elem; - grpc_call_next_op(elem, &op); + /* Check if it is one of the headers we care about. */ + if (md == channeld->te_trailers || md == channeld->method_post || + md == channeld->http_scheme || md == channeld->https_scheme || + md == channeld->grpc_scheme || md == channeld->content_type) { + /* swallow it */ + if (md == channeld->method_post) { + calld->seen_post = 1; + } else if (md->key == channeld->http_scheme->key) { + calld->seen_scheme = 1; + } else if (md == channeld->te_trailers) { + calld->seen_te_trailers = 1; } + /* TODO(klempner): Track that we've seen all the headers we should + require */ + return NULL; + } else if (md->key == channeld->content_type->key) { + if (strncmp(grpc_mdstr_as_c_string(md->value), "application/grpc+", 17) == + 0) { + /* Although the C implementation doesn't (currently) generate them, + any custom +-suffix is explicitly valid. */ + /* TODO(klempner): We should consider preallocating common values such + as +proto or +json, or at least stashing them if we see them. */ + /* TODO(klempner): Should we be surfacing this to application code? */ + } else { + /* TODO(klempner): We're currently allowing this, but we shouldn't + see it without a proxy so log for now. */ + gpr_log(GPR_INFO, "Unexpected content-type %s", + channeld->content_type->key); + } + return NULL; + } else if (md->key == channeld->te_trailers->key || + md->key == channeld->method_post->key || + md->key == channeld->http_scheme->key || + md->key == channeld->content_type->key) { + gpr_log(GPR_ERROR, "Invalid %s: header: '%s'", + grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)); + /* swallow it and error everything out. */ + /* TODO(klempner): We ought to generate more descriptive error messages + on the wire here. */ + grpc_call_element_send_cancel(elem); + return NULL; + } else if (md->key == channeld->path_key) { + if (calld->seen_path) { + gpr_log(GPR_ERROR, "Received :path twice"); + return NULL; + } + calld->seen_path = 1; + return md; + } else if (md->key == channeld->host_key) { + /* translate host to :authority since :authority may be + omitted */ + grpc_mdelem *authority = grpc_mdelem_from_metadata_strings( + channeld->mdctx, grpc_mdstr_ref(channeld->authority_key), + grpc_mdstr_ref(md->value)); + grpc_mdelem_unref(md); + return authority; + } else { + return md; } - grpc_call_element_send_metadata(elem, - grpc_mdelem_ref(channeld->status_not_found)); - grpc_call_element_send_finish(elem); } /* Called either: @@ -126,115 +157,44 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, switch (op->type) { case GRPC_RECV_METADATA: - /* Check if it is one of the headers we care about. */ - if (op->data.metadata == channeld->te_trailers || - op->data.metadata == channeld->method_get || - op->data.metadata == channeld->method_post || - op->data.metadata == channeld->http_scheme || - op->data.metadata == channeld->https_scheme || - op->data.metadata == channeld->grpc_scheme || - op->data.metadata == channeld->content_type) { - /* swallow it */ - if (op->data.metadata == channeld->method_get) { - calld->seen_method = GET; - } else if (op->data.metadata == channeld->method_post) { - calld->seen_method = POST; - } else if (op->data.metadata->key == channeld->http_scheme->key) { - calld->seen_scheme = 1; - } else if (op->data.metadata == channeld->te_trailers) { - calld->seen_te_trailers = 1; - } - /* TODO(klempner): Track that we've seen all the headers we should - require */ - grpc_mdelem_unref(op->data.metadata); - op->done_cb(op->user_data, GRPC_OP_OK); - } else if (op->data.metadata->key == channeld->content_type->key) { - if (strncmp(grpc_mdstr_as_c_string(op->data.metadata->value), - "application/grpc+", 17) == 0) { - /* Although the C implementation doesn't (currently) generate them, - any - custom +-suffix is explicitly valid. */ - /* TODO(klempner): We should consider preallocating common values such - as +proto or +json, or at least stashing them if we see them. */ - /* TODO(klempner): Should we be surfacing this to application code? */ + grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem); + if (!calld->got_initial_metadata) { + calld->got_initial_metadata = 1; + /* Have we seen the required http2 transport headers? + (:method, :scheme, content-type, with :path and :authority covered + at the channel level right now) */ + if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers && + calld->seen_path) { + grpc_call_next_op(elem, op); } else { - /* TODO(klempner): We're currently allowing this, but we shouldn't - see it without a proxy so log for now. */ - gpr_log(GPR_INFO, "Unexpected content-type %s", - channeld->content_type->key); - } - grpc_mdelem_unref(op->data.metadata); - op->done_cb(op->user_data, GRPC_OP_OK); - } else if (op->data.metadata->key == channeld->te_trailers->key || - op->data.metadata->key == channeld->method_post->key || - op->data.metadata->key == channeld->http_scheme->key || - op->data.metadata->key == channeld->content_type->key) { - gpr_log(GPR_ERROR, "Invalid %s: header: '%s'", - grpc_mdstr_as_c_string(op->data.metadata->key), - grpc_mdstr_as_c_string(op->data.metadata->value)); - /* swallow it and error everything out. */ - /* TODO(klempner): We ought to generate more descriptive error messages - on the wire here. */ - grpc_mdelem_unref(op->data.metadata); - op->done_cb(op->user_data, GRPC_OP_OK); - grpc_call_element_send_cancel(elem); - } else if (op->data.metadata->key == channeld->path_key) { - if (calld->path != NULL) { - gpr_log(GPR_ERROR, "Received :path twice"); - grpc_mdelem_unref(calld->path); + if (!calld->seen_path) { + gpr_log(GPR_ERROR, "Missing :path header"); + } + if (!calld->seen_post) { + gpr_log(GPR_ERROR, "Missing :method header"); + } + if (!calld->seen_scheme) { + gpr_log(GPR_ERROR, "Missing :scheme header"); + } + if (!calld->seen_te_trailers) { + gpr_log(GPR_ERROR, "Missing te trailers header"); + } + /* Error this call out */ + grpc_metadata_batch_destroy(&op->data.metadata); + op->done_cb(op->user_data, GRPC_OP_OK); + grpc_call_element_send_cancel(elem); } - calld->path = op->data.metadata; - op->done_cb(op->user_data, GRPC_OP_OK); - } else if (op->data.metadata->key == channeld->host_key) { - /* translate host to :authority since :authority may be - omitted */ - grpc_mdelem *authority = grpc_mdelem_from_metadata_strings( - channeld->mdctx, grpc_mdstr_ref(channeld->authority_key), - grpc_mdstr_ref(op->data.metadata->value)); - grpc_mdelem_unref(op->data.metadata); - op->data.metadata = authority; - /* pass the event up */ - grpc_call_next_op(elem, op); } else { - /* pass the event up */ grpc_call_next_op(elem, op); } break; - case GRPC_RECV_END_OF_INITIAL_METADATA: - /* Have we seen the required http2 transport headers? - (:method, :scheme, content-type, with :path and :authority covered - at the channel level right now) */ - if (calld->seen_method == POST && calld->seen_scheme && - calld->seen_te_trailers && calld->path) { - grpc_call_element_recv_metadata(elem, calld->path); - calld->path = NULL; - grpc_call_next_op(elem, op); - } else if (calld->seen_method == GET) { - handle_get(elem); - } else { - if (calld->seen_method == NOT_RECEIVED) { - gpr_log(GPR_ERROR, "Missing :method header"); - } - if (!calld->seen_scheme) { - gpr_log(GPR_ERROR, "Missing :scheme header"); - } - if (!calld->seen_te_trailers) { - gpr_log(GPR_ERROR, "Missing te trailers header"); - } - /* Error this call out */ - op->done_cb(op->user_data, GRPC_OP_OK); - grpc_call_element_send_cancel(elem); - } - break; - case GRPC_SEND_START: case GRPC_SEND_METADATA: /* If we haven't sent status 200 yet, we need to so so because it needs to come before any non : prefixed metadata. */ if (!calld->sent_status) { calld->sent_status = 1; - /* status is reffed by grpc_call_element_send_metadata */ - grpc_call_element_send_metadata(elem, - grpc_mdelem_ref(channeld->status_ok)); + grpc_metadata_batch_add_head(&op->data.metadata, &calld->status, + grpc_mdelem_ref(channeld->status_ok)); } grpc_call_next_op(elem, op); break; @@ -272,25 +232,11 @@ static void init_call_elem(grpc_call_element *elem, ignore_unused(channeld); /* initialize members */ - calld->path = NULL; - calld->sent_status = 0; - calld->seen_scheme = 0; - calld->seen_method = NOT_RECEIVED; - calld->seen_te_trailers = 0; + memset(calld, 0, sizeof(*calld)); } /* Destructor for call_data */ -static void destroy_call_elem(grpc_call_element *elem) { - /* grab pointers to our data from the call element */ - call_data *calld = elem->call_data; - channel_data *channeld = elem->channel_data; - - ignore_unused(channeld); - - if (calld->path) { - grpc_mdelem_unref(calld->path); - } -} +static void destroy_call_elem(grpc_call_element *elem) {} /* Constructor for channel_data */ static void init_channel_elem(grpc_channel_element *elem, @@ -314,7 +260,6 @@ static void init_channel_elem(grpc_channel_element *elem, channeld->status_not_found = grpc_mdelem_from_strings(mdctx, ":status", "404"); channeld->method_post = grpc_mdelem_from_strings(mdctx, ":method", "POST"); - channeld->method_get = grpc_mdelem_from_strings(mdctx, ":method", "GET"); channeld->http_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "http"); channeld->https_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "https"); channeld->grpc_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "grpc"); @@ -369,7 +314,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) { grpc_mdelem_unref(channeld->status_ok); grpc_mdelem_unref(channeld->status_not_found); grpc_mdelem_unref(channeld->method_post); - grpc_mdelem_unref(channeld->method_get); grpc_mdelem_unref(channeld->http_scheme); grpc_mdelem_unref(channeld->https_scheme); grpc_mdelem_unref(channeld->grpc_scheme); diff --git a/src/core/channel/metadata_buffer.c b/src/core/channel/metadata_buffer.c deleted file mode 100644 index eac852e4a4..0000000000 --- a/src/core/channel/metadata_buffer.c +++ /dev/null @@ -1,149 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/channel/metadata_buffer.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/useful.h> - -#include <string.h> - -#define INITIAL_ELEM_CAP 8 - -/* One queued call; we track offsets to string data in a shared buffer to - reduce allocations. See grpc_metadata_buffer_impl for the memory use - strategy */ -typedef struct { - grpc_mdelem *md; - void (*cb)(void *user_data, grpc_op_error error); - void *user_data; - gpr_uint32 flags; -} qelem; - -/* Memory layout: - - grpc_metadata_buffer_impl - followed by an array of qelem */ -struct grpc_metadata_buffer_impl { - /* number of elements in q */ - size_t elems; - /* capacity of q */ - size_t elem_cap; -}; - -#define ELEMS(buffer) ((qelem *)((buffer) + 1)) - -void grpc_metadata_buffer_init(grpc_metadata_buffer *buffer) { - /* start buffer as NULL, indicating no elements */ - *buffer = NULL; -} - -void grpc_metadata_buffer_destroy(grpc_metadata_buffer *buffer, - grpc_op_error error) { - size_t i; - qelem *qe; - if (*buffer) { - for (i = 0; i < (*buffer)->elems; i++) { - qe = &ELEMS(*buffer)[i]; - grpc_mdelem_unref(qe->md); - qe->cb(qe->user_data, error); - } - gpr_free(*buffer); - } -} - -void grpc_metadata_buffer_queue(grpc_metadata_buffer *buffer, - grpc_call_op *op) { - grpc_metadata_buffer_impl *impl = *buffer; - qelem *qe; - size_t bytes; - - GPR_ASSERT(op->type == GRPC_SEND_METADATA || op->type == GRPC_RECV_METADATA); - - if (!impl) { - /* this is the first element: allocate enough space to hold the - header object and the initial element capacity of qelems */ - bytes = - sizeof(grpc_metadata_buffer_impl) + INITIAL_ELEM_CAP * sizeof(qelem); - impl = gpr_malloc(bytes); - /* initialize the header object */ - impl->elems = 0; - impl->elem_cap = INITIAL_ELEM_CAP; - } else if (impl->elems == impl->elem_cap) { - /* more qelems than what we can deal with: grow by doubling size */ - impl->elem_cap *= 2; - bytes = sizeof(grpc_metadata_buffer_impl) + impl->elem_cap * sizeof(qelem); - impl = gpr_realloc(impl, bytes); - } - - /* append an element to the queue */ - qe = &ELEMS(impl)[impl->elems]; - impl->elems++; - - qe->md = op->data.metadata; - qe->cb = op->done_cb; - qe->user_data = op->user_data; - qe->flags = op->flags; - - /* header object may have changed location: store it back */ - *buffer = impl; -} - -void grpc_metadata_buffer_flush(grpc_metadata_buffer *buffer, - grpc_call_element *elem) { - grpc_metadata_buffer_impl *impl = *buffer; - grpc_call_op op; - qelem *qe; - size_t i; - - if (!impl) { - /* nothing to send */ - return; - } - - /* construct call_op's, and push them down the stack */ - op.type = GRPC_SEND_METADATA; - op.dir = GRPC_CALL_DOWN; - for (i = 0; i < impl->elems; i++) { - qe = &ELEMS(impl)[i]; - op.done_cb = qe->cb; - op.user_data = qe->user_data; - op.flags = qe->flags; - op.data.metadata = qe->md; - grpc_call_next_op(elem, &op); - } - - /* free data structures and reset to NULL: we can only flush once */ - gpr_free(impl); - *buffer = NULL; -} diff --git a/src/core/channel/metadata_buffer.h b/src/core/channel/metadata_buffer.h deleted file mode 100644 index b7cc5170d1..0000000000 --- a/src/core/channel/metadata_buffer.h +++ /dev/null @@ -1,70 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_INTERNAL_CORE_CHANNEL_METADATA_BUFFER_H -#define GRPC_INTERNAL_CORE_CHANNEL_METADATA_BUFFER_H - -#include "src/core/channel/channel_stack.h" - -/* Utility code to buffer GRPC_SEND_METADATA calls and pass them down the stack - all at once at some otherwise-determined time. Useful for implementing - filters that want to queue metadata until a START event chooses some - underlying filter stack to send an rpc on. */ - -/* Clients should declare a member of grpc_metadata_buffer. This may at some - point become a typedef for a struct, but for now a pointer suffices */ -typedef struct grpc_metadata_buffer_impl grpc_metadata_buffer_impl; -typedef grpc_metadata_buffer_impl *grpc_metadata_buffer; - -/* Initializes the metadata buffer. Allocates no memory. */ -void grpc_metadata_buffer_init(grpc_metadata_buffer *buffer); -/* Destroy the metadata buffer. */ -void grpc_metadata_buffer_destroy(grpc_metadata_buffer *buffer, - grpc_op_error error); -/* Append a call to the end of a metadata buffer: may allocate memory */ -void grpc_metadata_buffer_queue(grpc_metadata_buffer *buffer, grpc_call_op *op); -/* Flush all queued operations from the metadata buffer to the element below - self */ -void grpc_metadata_buffer_flush(grpc_metadata_buffer *buffer, - grpc_call_element *self); -/* Count the number of queued elements in the buffer. */ -size_t grpc_metadata_buffer_count(const grpc_metadata_buffer *buffer); -/* Extract elements as a grpc_metadata*, for presentation to applications. - The returned buffer must be freed with - grpc_metadata_buffer_cleanup_elements. - Clears the metadata buffer (this is a one-shot operation) */ -grpc_metadata *grpc_metadata_buffer_extract_elements( - grpc_metadata_buffer *buffer); -void grpc_metadata_buffer_cleanup_elements(void *elements, grpc_op_error error); - -#endif /* GRPC_INTERNAL_CORE_CHANNEL_METADATA_BUFFER_H */ diff --git a/src/core/security/auth.c b/src/core/security/auth.c index 130698c11b..4af2c67d83 100644 --- a/src/core/security/auth.c +++ b/src/core/security/auth.c @@ -44,12 +44,15 @@ #include "src/core/security/credentials.h" #include "src/core/surface/call.h" +#define MAX_CREDENTIALS_METADATA_COUNT 4 + /* We can have a per-call credentials. */ typedef struct { grpc_credentials *creds; grpc_mdstr *host; grpc_mdstr *method; grpc_call_op op; + grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT]; } call_data; /* We can have a per-channel credentials. */ @@ -62,30 +65,8 @@ typedef struct { grpc_mdstr *status_key; } channel_data; -static void do_nothing(void *ignored, grpc_op_error error) {} - static void bubbleup_error(grpc_call_element *elem, const char *error_msg) { - grpc_call_op finish_op; - channel_data *channeld = elem->channel_data; - char status[GPR_LTOA_MIN_BUFSIZE]; - - gpr_log(GPR_ERROR, "%s", error_msg); - finish_op.type = GRPC_RECV_METADATA; - finish_op.dir = GRPC_CALL_UP; - finish_op.flags = 0; - finish_op.data.metadata = grpc_mdelem_from_metadata_strings( - channeld->md_ctx, grpc_mdstr_ref(channeld->error_msg_key), - grpc_mdstr_from_string(channeld->md_ctx, error_msg)); - finish_op.done_cb = do_nothing; - finish_op.user_data = NULL; - grpc_call_next_op(elem, &finish_op); - - gpr_ltoa(GRPC_STATUS_UNAUTHENTICATED, status); - finish_op.data.metadata = grpc_mdelem_from_metadata_strings( - channeld->md_ctx, grpc_mdstr_ref(channeld->status_key), - grpc_mdstr_from_string(channeld->md_ctx, status)); - grpc_call_next_op(elem, &finish_op); - + grpc_call_element_recv_status(elem, GRPC_STATUS_UNAUTHENTICATED, error_msg); grpc_call_element_send_cancel(elem); } @@ -93,11 +74,15 @@ static void on_credentials_metadata(void *user_data, grpc_mdelem **md_elems, size_t num_md, grpc_credentials_status status) { grpc_call_element *elem = (grpc_call_element *)user_data; + call_data *calld = elem->call_data; + grpc_call_op op = calld->op; size_t i; + GPR_ASSERT(num_md <= MAX_CREDENTIALS_METADATA_COUNT); for (i = 0; i < num_md; i++) { - grpc_call_element_send_metadata(elem, grpc_mdelem_ref(md_elems[i])); + grpc_metadata_batch_add_tail(&op.data.metadata, &calld->md_links[i], + grpc_mdelem_ref(md_elems[i])); } - grpc_call_next_op(elem, &((call_data *)elem->call_data)->op); + grpc_call_next_op(elem, &op); } static char *build_service_url(const char *url_scheme, call_data *calld) { @@ -159,6 +144,7 @@ static void on_host_checked(void *user_data, grpc_security_status status) { gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.", grpc_mdstr_as_c_string(calld->host)); bubbleup_error(elem, error_msg); + grpc_metadata_batch_destroy(&calld->op.data.metadata); gpr_free(error_msg); calld->op.done_cb(calld->op.user_data, GRPC_OP_ERROR); } @@ -174,21 +160,22 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; + grpc_linked_mdelem *l; switch (op->type) { case GRPC_SEND_METADATA: - /* Pointer comparison is OK for md_elems created from the same context. */ - if (op->data.metadata->key == channeld->authority_string) { - if (calld->host != NULL) grpc_mdstr_unref(calld->host); - calld->host = grpc_mdstr_ref(op->data.metadata->value); - } else if (op->data.metadata->key == channeld->path_string) { - if (calld->method != NULL) grpc_mdstr_unref(calld->method); - calld->method = grpc_mdstr_ref(op->data.metadata->value); + for (l = op->data.metadata.list.head; l != NULL; l = l->next) { + grpc_mdelem *md = l->md; + /* Pointer comparison is OK for md_elems created from the same context. + */ + if (md->key == channeld->authority_string) { + if (calld->host != NULL) grpc_mdstr_unref(calld->host); + calld->host = grpc_mdstr_ref(md->value); + } else if (md->key == channeld->path_string) { + if (calld->method != NULL) grpc_mdstr_unref(calld->method); + calld->method = grpc_mdstr_ref(md->value); + } } - grpc_call_next_op(elem, op); - break; - - case GRPC_SEND_START: if (calld->host != NULL) { grpc_security_status status; const char *call_host = grpc_mdstr_as_c_string(calld->host); @@ -202,6 +189,7 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, "Invalid host %s set in :authority metadata.", call_host); bubbleup_error(elem, error_msg); + grpc_metadata_batch_destroy(&calld->op.data.metadata); gpr_free(error_msg); op->done_cb(op->user_data, GRPC_OP_ERROR); } @@ -210,7 +198,6 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, } send_security_metadata(elem, op); break; - default: /* pass control up or down the stack depending on op->dir */ grpc_call_next_op(elem, op); diff --git a/src/core/surface/call.c b/src/core/surface/call.c index dba63058b8..2949805622 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -33,7 +33,6 @@ #include "src/core/surface/call.h" #include "src/core/channel/channel_stack.h" -#include "src/core/channel/metadata_buffer.h" #include "src/core/iomgr/alarm.h" #include "src/core/support/string.h" #include "src/core/surface/byte_buffer_queue.h" @@ -41,6 +40,7 @@ #include "src/core/surface/completion_queue.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <assert.h> #include <stdio.h> #include <stdlib.h> @@ -68,8 +68,10 @@ typedef struct { } completed_request; /* See request_set in grpc_call below for a description */ -#define REQSET_EMPTY 255 -#define REQSET_DONE 254 +#define REQSET_EMPTY 'X' +#define REQSET_DONE 'Y' + +#define MAX_SEND_INITIAL_METADATA_COUNT 3 typedef struct { /* Overall status of the operation: starts OK, may degrade to @@ -92,6 +94,8 @@ typedef enum { /* Status came from the application layer overriding whatever the wire says */ STATUS_FROM_API_OVERRIDE = 0, + /* Status was created by some internal channel stack operation */ + STATUS_FROM_CORE, /* Status came from 'the wire' - or somewhere below the surface layer */ STATUS_FROM_WIRE, @@ -204,12 +208,18 @@ struct grpc_call { /* Call refcount - to keep the call alive during asynchronous operations */ gpr_refcount internal_refcount; + grpc_linked_mdelem send_initial_metadata[MAX_SEND_INITIAL_METADATA_COUNT]; + grpc_linked_mdelem status_link; + grpc_linked_mdelem details_link; + size_t send_initial_metadata_count; + gpr_timespec send_deadline; + /* Data that the legacy api needs to track. To be deleted at some point soon */ legacy_state *legacy_state; }; -#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call)+1)) +#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) #define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1) #define CALL_ELEM_FROM_CALL(call, idx) \ grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx) @@ -226,9 +236,13 @@ struct grpc_call { static void do_nothing(void *ignored, grpc_op_error also_ignored) {} static send_action choose_send_action(grpc_call *call); static void enact_send_action(grpc_call *call, send_action sa); +static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline); grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, - const void *server_transport_data) { + const void *server_transport_data, + grpc_mdelem **add_initial_metadata, + size_t add_initial_metadata_count, + gpr_timespec send_deadline) { size_t i; grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel); grpc_call *call = @@ -245,6 +259,12 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE; call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE; } + GPR_ASSERT(add_initial_metadata_count < MAX_SEND_INITIAL_METADATA_COUNT); + for (i = 0; i < add_initial_metadata_count; i++) { + call->send_initial_metadata[i].md = add_initial_metadata[i]; + } + call->send_initial_metadata_count = add_initial_metadata_count; + call->send_deadline = send_deadline; grpc_channel_internal_ref(channel); call->metadata_context = grpc_channel_get_metadata_context(channel); /* one ref is dropped in response to destroy, the other in @@ -252,6 +272,9 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, gpr_ref_init(&call->internal_refcount, 2); grpc_call_stack_init(channel_stack, server_transport_data, CALL_STACK_FROM_CALL(call)); + if (gpr_time_cmp(send_deadline, gpr_inf_future) != 0) { + set_deadline_alarm(call, send_deadline); + } return call; } @@ -284,6 +307,9 @@ static void destroy_call(void *call, int ignored_success) { for (i = 0; i < GPR_ARRAY_SIZE(c->buffered_metadata); i++) { gpr_free(c->buffered_metadata[i].metadata); } + for (i = 0; i < c->send_initial_metadata_count; i++) { + grpc_mdelem_unref(c->send_initial_metadata[i].md); + } if (c->legacy_state) { destroy_legacy_state(c->legacy_state); } @@ -342,6 +368,7 @@ static void request_more_data(grpc_call *call) { op.flags = 0; op.done_cb = do_nothing; op.user_data = NULL; + op.bind_pollset = NULL; grpc_call_execute_op(call, &op); } @@ -587,15 +614,29 @@ static send_action choose_send_action(grpc_call *call) { return SEND_NOTHING; } -static void send_metadata(grpc_call *call, grpc_mdelem *elem) { - grpc_call_op op; - op.type = GRPC_SEND_METADATA; - op.dir = GRPC_CALL_DOWN; - op.flags = GRPC_WRITE_BUFFER_HINT; - op.data.metadata = elem; - op.done_cb = do_nothing; - op.user_data = NULL; - grpc_call_execute_op(call, &op); +static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count, + grpc_metadata *metadata) { + size_t i; + grpc_mdelem_list out; + if (count == 0) { + out.head = out.tail = NULL; + return out; + } + for (i = 0; i < count; i++) { + grpc_metadata *md = &metadata[i]; + grpc_metadata *next_md = (i == count - 1) ? NULL : &metadata[i + 1]; + grpc_metadata *prev_md = (i == 0) ? NULL : &metadata[i - 1]; + grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; + GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data)); + l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key, + (const gpr_uint8 *)md->value, + md->value_length); + l->next = next_md ? (grpc_linked_mdelem *)&next_md->internal_data : NULL; + l->prev = prev_md ? (grpc_linked_mdelem *)&prev_md->internal_data : NULL; + } + out.head = (grpc_linked_mdelem *)&(metadata[0].internal_data); + out.tail = (grpc_linked_mdelem *)&(metadata[count - 1].internal_data); + return out; } static void enact_send_action(grpc_call *call, send_action sa) { @@ -614,19 +655,21 @@ static void enact_send_action(grpc_call *call, send_action sa) { /* fallthrough */ case SEND_INITIAL_METADATA: data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA]; - for (i = 0; i < data.send_metadata.count; i++) { - const grpc_metadata *md = &data.send_metadata.metadata[i]; - send_metadata(call, - grpc_mdelem_from_string_and_buffer( - call->metadata_context, md->key, - (const gpr_uint8 *)md->value, md->value_length)); - } - op.type = GRPC_SEND_START; + op.type = GRPC_SEND_METADATA; op.dir = GRPC_CALL_DOWN; op.flags = flags; - op.data.start.pollset = grpc_cq_pollset(call->cq); + op.data.metadata.list = chain_metadata_from_app( + call, data.send_metadata.count, data.send_metadata.metadata); + op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL; + op.data.metadata.deadline = call->send_deadline; + for (i = 0; i < call->send_initial_metadata_count; i++) { + grpc_metadata_batch_link_head(&op.data.metadata, + &call->send_initial_metadata[i]); + } + call->send_initial_metadata_count = 0; op.done_cb = finish_start_step; op.user_data = call; + op.bind_pollset = grpc_cq_pollset(call->cq); grpc_call_execute_op(call, &op); break; case SEND_BUFFERED_MESSAGE: @@ -640,37 +683,42 @@ static void enact_send_action(grpc_call *call, send_action sa) { op.data.message = data.send_message; op.done_cb = finish_write_step; op.user_data = call; + op.bind_pollset = NULL; grpc_call_execute_op(call, &op); break; case SEND_TRAILING_METADATA_AND_FINISH: /* send trailing metadata */ data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA]; - for (i = 0; i < data.send_metadata.count; i++) { - const grpc_metadata *md = &data.send_metadata.metadata[i]; - send_metadata(call, - grpc_mdelem_from_string_and_buffer( - call->metadata_context, md->key, - (const gpr_uint8 *)md->value, md->value_length)); - } + op.type = GRPC_SEND_METADATA; + op.dir = GRPC_CALL_DOWN; + op.flags = flags; + op.data.metadata.list = chain_metadata_from_app( + call, data.send_metadata.count, data.send_metadata.metadata); + op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL; + op.data.metadata.deadline = call->send_deadline; + op.bind_pollset = NULL; /* send status */ /* TODO(ctiller): cache common status values */ data = call->request_data[GRPC_IOREQ_SEND_STATUS]; gpr_ltoa(data.send_status.code, status_str); - send_metadata( - call, + grpc_metadata_batch_add_tail( + &op.data.metadata, &call->status_link, grpc_mdelem_from_metadata_strings( call->metadata_context, grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)), grpc_mdstr_from_string(call->metadata_context, status_str))); if (data.send_status.details) { - send_metadata( - call, + grpc_metadata_batch_add_tail( + &op.data.metadata, &call->details_link, grpc_mdelem_from_metadata_strings( call->metadata_context, grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)), grpc_mdstr_from_string(call->metadata_context, data.send_status.details))); } + op.done_cb = do_nothing; + op.user_data = NULL; + grpc_call_execute_op(call, &op); /* fallthrough: see choose_send_action for details */ case SEND_FINISH: op.type = GRPC_SEND_FINISH; @@ -678,6 +726,7 @@ static void enact_send_action(grpc_call *call, send_action sa) { op.flags = 0; op.done_cb = finish_finish_step; op.user_data = call; + op.bind_pollset = NULL; grpc_call_execute_op(call, &op); break; } @@ -831,6 +880,7 @@ grpc_call_error grpc_call_cancel(grpc_call *c) { op.flags = 0; op.done_cb = do_nothing; op.user_data = NULL; + op.bind_pollset = NULL; elem = CALL_ELEM_FROM_CALL(c, 0); elem->filter->call_op(elem, NULL, &op); @@ -875,9 +925,7 @@ static void call_alarm(void *arg, int success) { grpc_call_internal_unref(call, 1); } -void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - +static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) { if (call->have_alarm) { gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice"); } @@ -886,11 +934,15 @@ void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) { grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now()); } -static void set_read_state(grpc_call *call, read_state state) { - lock(call); +static void set_read_state_locked(grpc_call *call, read_state state) { GPR_ASSERT(call->read_state < state); call->read_state = state; finish_read_ops(call); +} + +static void set_read_state(grpc_call *call, read_state state) { + lock(call); + set_read_state_locked(call, state); unlock(call); } @@ -914,7 +966,7 @@ static gpr_uint32 decode_status(grpc_mdelem *md) { gpr_uint32 status; void *user_data = grpc_mdelem_get_user_data(md, destroy_status); if (user_data) { - status = ((gpr_uint32)(gpr_intptr) user_data) - STATUS_OFFSET; + status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET; } else { if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), GPR_SLICE_LENGTH(md->value->slice), @@ -936,52 +988,81 @@ void grpc_call_recv_message(grpc_call_element *elem, unlock(call); } -void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) { +void grpc_call_recv_synthetic_status(grpc_call_element *elem, + grpc_status_code status, + const char *message) { grpc_call *call = CALL_FROM_TOP_ELEM(elem); - grpc_mdstr *key = md->key; + lock(call); + set_status_code(call, STATUS_FROM_CORE, status); + set_status_details(call, STATUS_FROM_CORE, + grpc_mdstr_from_string(call->metadata_context, message)); + unlock(call); +} + +int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) { + grpc_call *call = CALL_FROM_TOP_ELEM(elem); + grpc_linked_mdelem *l; grpc_metadata_array *dest; grpc_metadata *mdusr; + int is_trailing; + grpc_mdctx *mdctx = call->metadata_context; lock(call); - if (key == grpc_channel_get_status_string(call->channel)) { - set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); - grpc_mdelem_unref(md); - } else if (key == grpc_channel_get_message_string(call->channel)) { - set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); - grpc_mdelem_unref(md); - } else { - dest = &call->buffered_metadata[call->read_state >= - READ_STATE_GOT_INITIAL_METADATA]; - if (dest->count == dest->capacity) { - dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2); - dest->metadata = - gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity); - } - mdusr = &dest->metadata[dest->count++]; - mdusr->key = grpc_mdstr_as_c_string(md->key); - mdusr->value = grpc_mdstr_as_c_string(md->value); - mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice); - if (call->owned_metadata_count == call->owned_metadata_capacity) { - call->owned_metadata_capacity = GPR_MAX( - call->owned_metadata_capacity + 8, call->owned_metadata_capacity * 2); - call->owned_metadata = - gpr_realloc(call->owned_metadata, - sizeof(grpc_mdelem *) * call->owned_metadata_capacity); + is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA; + for (l = md->list.head; l != NULL; l = l->next) { + grpc_mdelem *md = l->md; + grpc_mdstr *key = md->key; + if (key == grpc_channel_get_status_string(call->channel)) { + set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); + } else if (key == grpc_channel_get_message_string(call->channel)) { + set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); + } else { + dest = &call->buffered_metadata[is_trailing]; + if (dest->count == dest->capacity) { + dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2); + dest->metadata = + gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity); + } + mdusr = &dest->metadata[dest->count++]; + mdusr->key = grpc_mdstr_as_c_string(md->key); + mdusr->value = grpc_mdstr_as_c_string(md->value); + mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice); + if (call->owned_metadata_count == call->owned_metadata_capacity) { + call->owned_metadata_capacity = + GPR_MAX(call->owned_metadata_capacity + 8, + call->owned_metadata_capacity * 2); + call->owned_metadata = + gpr_realloc(call->owned_metadata, + sizeof(grpc_mdelem *) * call->owned_metadata_capacity); + } + call->owned_metadata[call->owned_metadata_count++] = md; + l->md = 0; } - call->owned_metadata[call->owned_metadata_count++] = md; + } + if (gpr_time_cmp(md->deadline, gpr_inf_future) != 0) { + set_deadline_alarm(call, md->deadline); + } + if (!is_trailing) { + set_read_state_locked(call, READ_STATE_GOT_INITIAL_METADATA); } unlock(call); + + grpc_mdctx_lock(mdctx); + for (l = md->list.head; l; l = l->next) { + if (l->md) grpc_mdctx_locked_mdelem_unref(mdctx, l->md); + } + for (l = md->garbage.head; l; l = l->next) { + grpc_mdctx_locked_mdelem_unref(mdctx, l->md); + } + grpc_mdctx_unlock(mdctx); + + return !is_trailing; } grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) { return CALL_STACK_FROM_CALL(call); } -void grpc_call_initial_metadata_complete(grpc_call_element *surface_element) { - grpc_call *call = grpc_call_from_top_element(surface_element); - set_read_state(call, READ_STATE_GOT_INITIAL_METADATA); -} - /* * BATCH API IMPLEMENTATION */ diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 06434f87ac..f8d0915349 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -35,7 +35,6 @@ #define GRPC_INTERNAL_CORE_SURFACE_CALL_H #include "src/core/channel/channel_stack.h" -#include "src/core/channel/metadata_buffer.h" #include <grpc/grpc.h> /* Primitive operation types - grpc_op's get rewritten into these */ @@ -67,7 +66,7 @@ typedef union { } recv_status_details; struct { size_t count; - const grpc_metadata *metadata; + grpc_metadata *metadata; } send_metadata; grpc_byte_buffer *send_message; struct { @@ -86,7 +85,10 @@ typedef void (*grpc_ioreq_completion_func)(grpc_call *call, void *user_data); grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, - const void *server_transport_data); + const void *server_transport_data, + grpc_mdelem **add_initial_metadata, + size_t add_initial_metadata_count, + gpr_timespec send_deadline); void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq); grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call); @@ -96,8 +98,9 @@ void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion); /* Helpers for grpc_client, grpc_server filters to publish received data to the completion queue/surface layer */ -void grpc_call_recv_metadata(grpc_call_element *surface_element, - grpc_mdelem *md); +/* receive metadata - returns 1 if this was initial metadata */ +int grpc_call_recv_metadata(grpc_call_element *surface_element, + grpc_metadata_batch *md); void grpc_call_recv_message(grpc_call_element *surface_element, grpc_byte_buffer *message); void grpc_call_read_closed(grpc_call_element *surface_element); @@ -108,14 +111,12 @@ grpc_call_error grpc_call_start_ioreq_and_call_back( grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, grpc_ioreq_completion_func on_complete, void *user_data); -/* Called when it's known that the initial batch of metadata is complete */ -void grpc_call_initial_metadata_complete(grpc_call_element *surface_element); - -void grpc_call_set_deadline(grpc_call_element *surface_element, - gpr_timespec deadline); - grpc_call_stack *grpc_call_get_call_stack(grpc_call *call); +void grpc_call_recv_synthetic_status(grpc_call_element *elem, + grpc_status_code status, + const char *message); + /* Given the top call_element, get the call object. */ grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element); @@ -128,4 +129,4 @@ void grpc_call_log_batch(char *file, int line, gpr_log_severity severity, #define GRPC_CALL_LOG_BATCH(sev, call, ops, nops, tag) \ if (grpc_trace_batch) grpc_call_log_batch(sev, call, ops, nops, tag) -#endif /* GRPC_INTERNAL_CORE_SURFACE_CALL_H */ +#endif /* GRPC_INTERNAL_CORE_SURFACE_CALL_H */ diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index d3962a00c4..29b042e7c1 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -62,7 +62,7 @@ struct grpc_channel { registered_call *registered_calls; }; -#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c)+1)) +#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1)) #define CHANNEL_FROM_CHANNEL_STACK(channel_stack) \ (((grpc_channel *)(channel_stack)) - 1) #define CHANNEL_FROM_TOP_ELEM(top_elem) \ @@ -91,44 +91,25 @@ grpc_channel *grpc_channel_create_from_filters( return channel; } -static void do_nothing(void *ignored, grpc_op_error error) {} - static grpc_call *grpc_channel_create_call_internal( grpc_channel *channel, grpc_completion_queue *cq, grpc_mdelem *path_mdelem, grpc_mdelem *authority_mdelem, gpr_timespec deadline) { - grpc_call *call; - grpc_call_op op; + grpc_mdelem *send_metadata[2]; - if (!channel->is_client) { - gpr_log(GPR_ERROR, "Cannot create a call on the server."); - return NULL; - } + GPR_ASSERT(channel->is_client); - call = grpc_call_create(channel, cq, NULL); + send_metadata[0] = path_mdelem; + send_metadata[1] = authority_mdelem; - /* Add :path and :authority headers. */ - op.type = GRPC_SEND_METADATA; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.data.metadata = path_mdelem; - op.done_cb = do_nothing; - op.user_data = NULL; - grpc_call_execute_op(call, &op); - - op.data.metadata = authority_mdelem; - grpc_call_execute_op(call, &op); - - if (0 != gpr_time_cmp(deadline, gpr_inf_future)) { - op.type = GRPC_SEND_DEADLINE; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.data.deadline = deadline; - op.done_cb = do_nothing; - op.user_data = NULL; - grpc_call_execute_op(call, &op); - } + return grpc_call_create(channel, cq, NULL, send_metadata, + GPR_ARRAY_SIZE(send_metadata), deadline); +} - return call; +grpc_call *grpc_channel_create_call_old(grpc_channel *channel, + const char *method, const char *host, + gpr_timespec absolute_deadline) { + return grpc_channel_create_call(channel, NULL, method, host, + absolute_deadline); } grpc_call *grpc_channel_create_call(grpc_channel *channel, @@ -146,13 +127,6 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel, deadline); } -grpc_call *grpc_channel_create_call_old(grpc_channel *channel, - const char *method, const char *host, - gpr_timespec absolute_deadline) { - return grpc_channel_create_call(channel, NULL, method, host, - absolute_deadline); -} - void *grpc_channel_register_call(grpc_channel *channel, const char *method, const char *host) { registered_call *rc = gpr_malloc(sizeof(registered_call)); diff --git a/src/core/surface/client.c b/src/core/surface/client.c index 4d54865d16..2f898ff7d7 100644 --- a/src/core/surface/client.c +++ b/src/core/surface/client.c @@ -39,28 +39,17 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> -typedef struct { - void *unused; -} call_data; +typedef struct { void *unused; } call_data; -typedef struct { - void *unused; -} channel_data; +typedef struct { void *unused; } channel_data; static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, grpc_call_op *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); switch (op->type) { - case GRPC_SEND_DEADLINE: - grpc_call_set_deadline(elem, op->data.deadline); - grpc_call_next_op(elem, op); - break; case GRPC_RECV_METADATA: - grpc_call_recv_metadata(elem, op->data.metadata); - break; - case GRPC_RECV_DEADLINE: - gpr_log(GPR_ERROR, "Deadline received by client (ignored)"); + grpc_call_recv_metadata(elem, &op->data.metadata); break; case GRPC_RECV_MESSAGE: grpc_call_recv_message(elem, op->data.message); @@ -72,8 +61,9 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, case GRPC_RECV_FINISH: grpc_call_stream_closed(elem); break; - case GRPC_RECV_END_OF_INITIAL_METADATA: - grpc_call_initial_metadata_complete(elem); + case GRPC_RECV_SYNTHETIC_STATUS: + grpc_call_recv_synthetic_status(elem, op->data.synthetic_status.status, + op->data.synthetic_status.message); break; default: GPR_ASSERT(op->dir == GRPC_CALL_DOWN); @@ -114,6 +104,6 @@ static void init_channel_elem(grpc_channel_element *elem, static void destroy_channel_elem(grpc_channel_element *elem) {} const grpc_channel_filter grpc_client_surface_filter = { - call_op, channel_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "client", }; + call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, + sizeof(channel_data), init_channel_elem, destroy_channel_elem, "client", +}; diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index b40c48381f..78170806f1 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -42,28 +42,20 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> -typedef struct { - void *unused; -} call_data; +typedef struct { void *unused; } call_data; -typedef struct { - grpc_mdelem *status; - grpc_mdelem *message; -} channel_data; +typedef struct { void *unused; } channel_data; static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, grpc_call_op *op) { - channel_data *channeld = elem->channel_data; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); switch (op->type) { - case GRPC_SEND_START: - grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->status)); - grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->message)); - grpc_call_stream_closed(elem); - break; case GRPC_SEND_METADATA: - grpc_mdelem_unref(op->data.metadata); + grpc_metadata_batch_destroy(&op->data.metadata); + grpc_call_recv_synthetic_status(elem, GRPC_STATUS_UNKNOWN, + "Rpc sent on a lame channel."); + grpc_call_stream_closed(elem); break; default: break; @@ -94,29 +86,17 @@ static void destroy_call_elem(grpc_call_element *elem) {} static void init_channel_elem(grpc_channel_element *elem, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { - channel_data *channeld = elem->channel_data; - char status[12]; - GPR_ASSERT(is_first); GPR_ASSERT(is_last); - - channeld->message = grpc_mdelem_from_strings(mdctx, "grpc-message", - "Rpc sent on a lame channel."); - gpr_ltoa(GRPC_STATUS_UNKNOWN, status); - channeld->status = grpc_mdelem_from_strings(mdctx, "grpc-status", status); } -static void destroy_channel_elem(grpc_channel_element *elem) { - channel_data *channeld = elem->channel_data; - - grpc_mdelem_unref(channeld->message); - grpc_mdelem_unref(channeld->status); -} +static void destroy_channel_elem(grpc_channel_element *elem) {} static const grpc_channel_filter lame_filter = { - call_op, channel_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "lame-client", }; + call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, + sizeof(channel_data), init_channel_elem, destroy_channel_elem, + "lame-client", +}; grpc_channel *grpc_lame_client_channel_create(void) { static const grpc_channel_filter *filters[] = {&lame_filter}; diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 17cba9a505..e771929870 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -411,29 +411,32 @@ static void read_closed(grpc_call_element *elem) { gpr_mu_unlock(&chand->server->mu); } +static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { + grpc_call_element *elem = user_data; + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + if (md->key == chand->path_key) { + calld->path = grpc_mdstr_ref(md->value); + return NULL; + } else if (md->key == chand->authority_key) { + calld->host = grpc_mdstr_ref(md->value); + return NULL; + } + return md; +} + static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn, grpc_call_op *op) { - channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - grpc_mdelem *md; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); switch (op->type) { case GRPC_RECV_METADATA: - md = op->data.metadata; - if (md->key == chand->path_key) { - calld->path = grpc_mdstr_ref(md->value); - grpc_mdelem_unref(md); - } else if (md->key == chand->authority_key) { - calld->host = grpc_mdstr_ref(md->value); - grpc_mdelem_unref(md); - } else { - grpc_call_recv_metadata(elem, md); + grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem); + if (grpc_call_recv_metadata(elem, &op->data.metadata)) { + calld->deadline = op->data.metadata.deadline; + start_new_rpc(elem); } break; - case GRPC_RECV_END_OF_INITIAL_METADATA: - start_new_rpc(elem); - grpc_call_initial_metadata_complete(elem); - break; case GRPC_RECV_MESSAGE: grpc_call_recv_message(elem, op->data.message); op->done_cb(op->user_data, GRPC_OP_OK); @@ -444,10 +447,6 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn, case GRPC_RECV_FINISH: stream_closed(elem); break; - case GRPC_RECV_DEADLINE: - grpc_call_set_deadline(elem, op->data.deadline); - ((call_data *)elem->call_data)->deadline = op->data.deadline; - break; default: GPR_ASSERT(op->dir == GRPC_CALL_DOWN); grpc_call_next_op(elem, op); @@ -464,7 +463,8 @@ static void channel_op(grpc_channel_element *elem, case GRPC_ACCEPT_CALL: /* create a call */ grpc_call_create(chand->channel, NULL, - op->data.accept_call.transport_server_data); + op->data.accept_call.transport_server_data, NULL, 0, + gpr_inf_future); break; case GRPC_TRANSPORT_CLOSED: /* if the transport is closed for a server channel, we destroy the diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index 708bb06c7f..5ca31d6bc7 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -43,7 +43,7 @@ #include "src/core/transport/chttp2/timeout_encoding.h" #include "src/core/transport/chttp2/varint.h" -#define HASH_FRAGMENT_1(x) ((x) & 255) +#define HASH_FRAGMENT_1(x) ((x)&255) #define HASH_FRAGMENT_2(x) ((x >> 8) & 255) #define HASH_FRAGMENT_3(x) ((x >> 16) & 255) #define HASH_FRAGMENT_4(x) ((x >> 24) & 255) @@ -479,10 +479,9 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count, /* skip */ curop++; break; - case GRPC_OP_FLOW_CTL_CB: - case GRPC_OP_DEADLINE: case GRPC_OP_METADATA: - case GRPC_OP_METADATA_BOUNDARY: + grpc_metadata_batch_assert_ok(&op->data.metadata); + case GRPC_OP_FLOW_CTL_CB: /* these just get copied as they don't impact the number of flow controlled bytes */ grpc_sopb_append(outops, op, 1); @@ -529,6 +528,12 @@ exit_loop: *inops_count -= curop; memmove(inops, inops + curop, *inops_count * sizeof(grpc_stream_op)); + for (curop = 0; curop < *inops_count; curop++) { + if (inops[curop].type == GRPC_OP_METADATA) { + grpc_metadata_batch_assert_ok(&inops[curop].data.metadata); + } + } + return flow_controlled_bytes_taken; } @@ -543,6 +548,7 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, gpr_uint32 curop = 0; gpr_uint32 unref_op; grpc_mdctx *mdctx = compressor->mdctx; + grpc_linked_mdelem *l; int need_unref = 0; GPR_ASSERT(stream_id != 0); @@ -566,19 +572,19 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, curop++; break; case GRPC_OP_METADATA: - /* Encode a metadata element; store the returned value, representing + /* Encode a metadata batch; store the returned values, representing a metadata element that needs to be unreffed back into the metadata slot. THIS MAY NOT BE THE SAME ELEMENT (if a decoder table slot got updated). After this loop, we'll do a batch unref of elements. */ - op->data.metadata = hpack_enc(compressor, op->data.metadata, &st); - need_unref |= op->data.metadata != NULL; - curop++; - break; - case GRPC_OP_DEADLINE: - deadline_enc(compressor, op->data.deadline, &st); - curop++; - break; - case GRPC_OP_METADATA_BOUNDARY: + need_unref |= op->data.metadata.garbage.head != NULL; + grpc_metadata_batch_assert_ok(&op->data.metadata); + for (l = op->data.metadata.list.head; l; l = l->next) { + l->md = hpack_enc(compressor, l->md, &st); + need_unref |= l->md != NULL; + } + if (gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future) != 0) { + deadline_enc(compressor, op->data.metadata.deadline, &st); + } ensure_frame_type(&st, HEADER, 0); finish_frame(&st, 1, 0); st.last_was_header = 0; /* force a new header frame */ @@ -614,8 +620,12 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, for (unref_op = 0; unref_op < curop; unref_op++) { op = &ops[unref_op]; if (op->type != GRPC_OP_METADATA) continue; - if (!op->data.metadata) continue; - grpc_mdctx_locked_mdelem_unref(mdctx, op->data.metadata); + for (l = op->data.metadata.list.head; l; l = l->next) { + if (l->md) grpc_mdctx_locked_mdelem_unref(mdctx, l->md); + } + for (l = op->data.metadata.garbage.head; l; l = l->next) { + grpc_mdctx_locked_mdelem_unref(mdctx, l->md); + } } grpc_mdctx_unlock(mdctx); } diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 995d64015a..e32ee284e0 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -68,10 +68,10 @@ int grpc_http_trace = 0; typedef struct transport transport; typedef struct stream stream; -#define IF_TRACING(stmt) \ - if (!(grpc_http_trace)) \ - ; \ - else \ +#define IF_TRACING(stmt) \ + if (!(grpc_http_trace)) \ + ; \ + else \ stmt /* streams are kept in various linked lists depending on what things need to @@ -292,6 +292,12 @@ struct stream { stream_link links[STREAM_LIST_COUNT]; gpr_uint8 included[STREAM_LIST_COUNT]; + /* incoming metadata */ + grpc_linked_mdelem *incoming_metadata; + size_t incoming_metadata_count; + size_t incoming_metadata_capacity; + gpr_timespec incoming_deadline; + /* sops from application */ grpc_stream_op_buffer outgoing_sopb; /* sops that have passed flow control to be written */ @@ -577,7 +583,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, lock(t); s->id = 0; } else { - s->id = (gpr_uint32)(gpr_uintptr) server_data; + s->id = (gpr_uint32)(gpr_uintptr)server_data; t->incoming_stream = s; grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); } @@ -593,6 +599,10 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, s->cancelled = 0; s->allow_window_updates = 0; s->published_close = 0; + s->incoming_metadata_count = 0; + s->incoming_metadata_capacity = 0; + s->incoming_metadata = NULL; + s->incoming_deadline = gpr_inf_future; memset(&s->links, 0, sizeof(s->links)); memset(&s->included, 0, sizeof(s->included)); grpc_sopb_init(&s->outgoing_sopb); @@ -698,7 +708,8 @@ static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) { } static void stream_list_join(transport *t, stream *s, stream_list_id id) { - if (id == PENDING_CALLBACKS) GPR_ASSERT(t->cb != NULL || t->error_state == ERROR_STATE_NONE); + if (id == PENDING_CALLBACKS) + GPR_ASSERT(t->cb != NULL || t->error_state == ERROR_STATE_NONE); if (s->included[id]) { return; } @@ -760,7 +771,7 @@ static void unlock(transport *t) { if (t->error_state == ERROR_STATE_SEEN && !t->writing) { call_closed = 1; t->calling_back = 1; - t->cb = NULL; /* no more callbacks */ + t->cb = NULL; /* no more callbacks */ t->error_state = ERROR_STATE_NOTIFIED; } if (t->num_pending_goaways) { @@ -782,8 +793,7 @@ static void unlock(transport *t) { /* perform some callbacks if necessary */ for (i = 0; i < num_goaways; i++) { - cb->goaway(t->cb_user_data, &t->base, goaways[i].status, - goaways[i].debug); + cb->goaway(t->cb_user_data, &t->base, goaways[i].status, goaways[i].debug); } if (perform_callbacks) { @@ -1058,6 +1068,17 @@ static void finalize_cancellations(transport *t) { } } +static void add_incoming_metadata(transport *t, stream *s, grpc_mdelem *elem) { + if (s->incoming_metadata_capacity == s->incoming_metadata_count) { + s->incoming_metadata_capacity = + GPR_MAX(8, 2 * s->incoming_metadata_capacity); + s->incoming_metadata = + gpr_realloc(s->incoming_metadata, sizeof(*s->incoming_metadata) * + s->incoming_metadata_capacity); + } + s->incoming_metadata[s->incoming_metadata_count++].md = elem; +} + static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, grpc_status_code local_status, grpc_chttp2_error_code error_code, @@ -1077,9 +1098,18 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, stream_list_join(t, s, CANCELLED); gpr_ltoa(local_status, buffer); - grpc_sopb_add_metadata( - &s->parser.incoming_sopb, + add_incoming_metadata( + t, s, grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer)); + switch (local_status) { + case GRPC_STATUS_CANCELLED: + add_incoming_metadata( + t, s, grpc_mdelem_from_strings(t->metadata_context, + "grpc-message", "Cancelled")); + break; + default: + break; + } stream_list_join(t, s, PENDING_CALLBACKS); } @@ -1255,11 +1285,10 @@ static void on_header(void *tp, grpc_mdelem *md) { } grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); } - grpc_sopb_add_deadline(&s->parser.incoming_sopb, - gpr_time_add(gpr_now(), *cached_timeout)); + s->incoming_deadline = gpr_time_add(gpr_now(), *cached_timeout); grpc_mdelem_unref(md); } else { - grpc_sopb_add_metadata(&s->parser.incoming_sopb, md); + add_incoming_metadata(t, s, md); } } @@ -1304,7 +1333,7 @@ static int init_header_frame_parser(transport *t, int is_continuation) { t->incoming_stream = NULL; /* if stream is accepted, we set incoming_stream in init_stream */ t->cb->accept_stream(t->cb_user_data, &t->base, - (void *)(gpr_uintptr) t->incoming_stream_id); + (void *)(gpr_uintptr)t->incoming_stream_id); s = t->incoming_stream; if (!s) { gpr_log(GPR_ERROR, "stream not accepted"); @@ -1435,6 +1464,35 @@ static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) { return window + window_update < MAX_WINDOW; } +static void free_md(void *p, grpc_op_error result) { gpr_free(p); } + +static void add_metadata_batch(transport *t, stream *s) { + grpc_metadata_batch b; + size_t i; + + b.list.head = &s->incoming_metadata[0]; + b.list.tail = &s->incoming_metadata[s->incoming_metadata_count - 1]; + b.garbage.head = b.garbage.tail = NULL; + b.deadline = s->incoming_deadline; + + for (i = 1; i < s->incoming_metadata_count; i++) { + s->incoming_metadata[i].prev = &s->incoming_metadata[i - 1]; + s->incoming_metadata[i - 1].next = &s->incoming_metadata[i]; + } + s->incoming_metadata[0].prev = NULL; + s->incoming_metadata[s->incoming_metadata_count - 1].next = NULL; + + grpc_sopb_add_metadata(&s->parser.incoming_sopb, b); + grpc_sopb_add_flow_ctl_cb(&s->parser.incoming_sopb, free_md, + s->incoming_metadata); + + /* reset */ + s->incoming_deadline = gpr_inf_future; + s->incoming_metadata = NULL; + s->incoming_metadata_count = 0; + s->incoming_metadata_capacity = 0; +} + static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { grpc_chttp2_parse_state st; size_t i; @@ -1449,8 +1507,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS); } if (st.metadata_boundary) { - grpc_sopb_add_metadata_boundary( - &t->incoming_stream->parser.incoming_sopb); + add_metadata_batch(t, t->incoming_stream); stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS); } if (st.ack_settings) { @@ -1580,8 +1637,8 @@ static int process_read(transport *t, gpr_slice slice) { "Connect string mismatch: expected '%c' (%d) got '%c' (%d) " "at byte %d", CLIENT_CONNECT_STRING[t->deframe_state], - (int)(gpr_uint8) CLIENT_CONNECT_STRING[t->deframe_state], - *cur, (int)*cur, t->deframe_state); + (int)(gpr_uint8)CLIENT_CONNECT_STRING[t->deframe_state], *cur, + (int)*cur, t->deframe_state); drop_connection(t); return 0; } @@ -1778,17 +1835,20 @@ static int prepare_callbacks(transport *t) { int n = 0; while ((s = stream_list_remove_head(t, PENDING_CALLBACKS))) { int execute = 1; - grpc_sopb_swap(&s->parser.incoming_sopb, &s->callback_sopb); s->callback_state = compute_state(s->sent_write_closed, s->read_closed); if (s->callback_state == GRPC_STREAM_CLOSED) { remove_from_stream_map(t, s); if (s->published_close) { execute = 0; + } else if (s->incoming_metadata_count) { + add_metadata_batch(t, s); } s->published_close = 1; } + grpc_sopb_swap(&s->parser.incoming_sopb, &s->callback_sopb); + if (execute) { stream_list_add_tail(t, s, EXECUTING_CALLBACKS); n = 1; @@ -1825,9 +1885,9 @@ static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) { */ static const grpc_transport_vtable vtable = { - sizeof(stream), init_stream, send_batch, set_allow_window_updates, - add_to_pollset, destroy_stream, abort_stream, goaway, - close_transport, send_ping, destroy_transport}; + sizeof(stream), init_stream, send_batch, set_allow_window_updates, + add_to_pollset, destroy_stream, abort_stream, goaway, close_transport, + send_ping, destroy_transport}; void grpc_create_chttp2_transport(grpc_transport_setup_callback setup, void *arg, diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c index 44f6591c95..74e94b2c24 100644 --- a/src/core/transport/metadata.c +++ b/src/core/transport/metadata.c @@ -120,7 +120,7 @@ static void unlock(grpc_mdctx *ctx) { if (ctx->refs == 0) { /* uncomment if you're having trouble diagnosing an mdelem leak to make things clearer (slows down destruction a lot, however) */ - /* gc_mdtab(ctx); */ + gc_mdtab(ctx); if (ctx->mdtab_count && ctx->mdtab_count == ctx->mdtab_free) { discard_metadata(ctx); } diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c index c30e3a27f1..882c078d51 100644 --- a/src/core/transport/stream_op.c +++ b/src/core/transport/stream_op.c @@ -33,11 +33,11 @@ #include "src/core/transport/stream_op.h" +#include <string.h> + #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <string.h> - /* Exponential growth function: Given x, return a larger x. Currently we grow by 1.5 times upon reallocation. */ #define GROW(x) (3 * (x) / 2) @@ -79,33 +79,46 @@ void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops) { gpr_slice_unref(ops[i].data.slice); break; case GRPC_OP_METADATA: - grpc_mdelem_unref(ops[i].data.metadata); + grpc_metadata_batch_destroy(&ops[i].data.metadata); break; case GRPC_OP_FLOW_CTL_CB: ops[i].data.flow_ctl_cb.cb(ops[i].data.flow_ctl_cb.arg, GRPC_OP_ERROR); break; case GRPC_NO_OP: - case GRPC_OP_DEADLINE: - case GRPC_OP_METADATA_BOUNDARY: case GRPC_OP_BEGIN_MESSAGE: break; } } } +static void assert_contained_metadata_ok(grpc_stream_op *ops, size_t nops) { +#ifndef NDEBUG + size_t i; + for (i = 0; i < nops; i++) { + if (ops[i].type == GRPC_OP_METADATA) { + grpc_metadata_batch_assert_ok(&ops[i].data.metadata); + } + } +#endif /* NDEBUG */ +} + static void expandto(grpc_stream_op_buffer *sopb, size_t new_capacity) { sopb->capacity = new_capacity; + assert_contained_metadata_ok(sopb->ops, sopb->nops); if (sopb->ops == sopb->inlined_ops) { sopb->ops = gpr_malloc(sizeof(grpc_stream_op) * new_capacity); memcpy(sopb->ops, sopb->inlined_ops, sopb->nops * sizeof(grpc_stream_op)); } else { sopb->ops = gpr_realloc(sopb->ops, sizeof(grpc_stream_op) * new_capacity); } + assert_contained_metadata_ok(sopb->ops, sopb->nops); } static grpc_stream_op *add(grpc_stream_op_buffer *sopb) { grpc_stream_op *out; + assert_contained_metadata_ok(sopb->ops, sopb->nops); + if (sopb->nops == sopb->capacity) { expandto(sopb, GROW(sopb->capacity)); } @@ -116,6 +129,7 @@ static grpc_stream_op *add(grpc_stream_op_buffer *sopb) { void grpc_sopb_add_no_op(grpc_stream_op_buffer *sopb) { add(sopb)->type = GRPC_NO_OP; + assert_contained_metadata_ok(sopb->ops, sopb->nops); } void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length, @@ -124,30 +138,24 @@ void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length, op->type = GRPC_OP_BEGIN_MESSAGE; op->data.begin_message.length = length; op->data.begin_message.flags = flags; + assert_contained_metadata_ok(sopb->ops, sopb->nops); } -void grpc_sopb_add_metadata_boundary(grpc_stream_op_buffer *sopb) { - grpc_stream_op *op = add(sopb); - op->type = GRPC_OP_METADATA_BOUNDARY; -} - -void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_mdelem *md) { +void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, + grpc_metadata_batch b) { grpc_stream_op *op = add(sopb); + grpc_metadata_batch_assert_ok(&b); op->type = GRPC_OP_METADATA; - op->data.metadata = md; -} - -void grpc_sopb_add_deadline(grpc_stream_op_buffer *sopb, - gpr_timespec deadline) { - grpc_stream_op *op = add(sopb); - op->type = GRPC_OP_DEADLINE; - op->data.deadline = deadline; + op->data.metadata = b; + grpc_metadata_batch_assert_ok(&op->data.metadata); + assert_contained_metadata_ok(sopb->ops, sopb->nops); } void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice) { grpc_stream_op *op = add(sopb); op->type = GRPC_OP_SLICE; op->data.slice = slice; + assert_contained_metadata_ok(sopb->ops, sopb->nops); } void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb, @@ -157,6 +165,7 @@ void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb, op->type = GRPC_OP_FLOW_CTL_CB; op->data.flow_ctl_cb.cb = cb; op->data.flow_ctl_cb.arg = arg; + assert_contained_metadata_ok(sopb->ops, sopb->nops); } void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops, @@ -164,10 +173,161 @@ void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops, size_t orig_nops = sopb->nops; size_t new_nops = orig_nops + nops; + assert_contained_metadata_ok(ops, nops); + assert_contained_metadata_ok(sopb->ops, sopb->nops); if (new_nops > sopb->capacity) { expandto(sopb, GPR_MAX(GROW(sopb->capacity), new_nops)); } memcpy(sopb->ops + orig_nops, ops, sizeof(grpc_stream_op) * nops); sopb->nops = new_nops; + assert_contained_metadata_ok(sopb->ops, sopb->nops); +} + +static void assert_valid_list(grpc_mdelem_list *list) { +#ifndef NDEBUG + grpc_linked_mdelem *l; + + GPR_ASSERT((list->head == NULL) == (list->tail == NULL)); + if (!list->head) return; + GPR_ASSERT(list->head->prev == NULL); + GPR_ASSERT(list->tail->next == NULL); + GPR_ASSERT((list->head == list->tail) == (list->head->next == NULL)); + + for (l = list->head; l; l = l->next) { + GPR_ASSERT(l->md); + GPR_ASSERT((l->prev == NULL) == (l == list->head)); + GPR_ASSERT((l->next == NULL) == (l == list->tail)); + if (l->next) GPR_ASSERT(l->next->prev == l); + if (l->prev) GPR_ASSERT(l->prev->next == l); + } +#endif /* NDEBUG */ +} + +#ifndef NDEBUG +void grpc_metadata_batch_assert_ok(grpc_metadata_batch *comd) { + assert_valid_list(&comd->list); + assert_valid_list(&comd->garbage); +} +#endif /* NDEBUG */ + +void grpc_metadata_batch_init(grpc_metadata_batch *comd) { + comd->list.head = comd->list.tail = comd->garbage.head = comd->garbage.tail = + NULL; + comd->deadline = gpr_inf_future; +} + +void grpc_metadata_batch_destroy(grpc_metadata_batch *comd) { + grpc_linked_mdelem *l; + for (l = comd->list.head; l; l = l->next) { + grpc_mdelem_unref(l->md); + } + for (l = comd->garbage.head; l; l = l->next) { + grpc_mdelem_unref(l->md); + } +} + +void grpc_metadata_batch_add_head(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage, + grpc_mdelem *elem_to_add) { + GPR_ASSERT(elem_to_add); + storage->md = elem_to_add; + grpc_metadata_batch_link_head(comd, storage); +} + +static void link_head(grpc_mdelem_list *list, grpc_linked_mdelem *storage) { + assert_valid_list(list); + GPR_ASSERT(storage->md); + storage->prev = NULL; + storage->next = list->head; + if (list->head != NULL) { + list->head->prev = storage; + } else { + list->tail = storage; + } + list->head = storage; + assert_valid_list(list); +} + +void grpc_metadata_batch_link_head(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage) { + link_head(&comd->list, storage); +} + +void grpc_metadata_batch_add_tail(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage, + grpc_mdelem *elem_to_add) { + GPR_ASSERT(elem_to_add); + storage->md = elem_to_add; + grpc_metadata_batch_link_tail(comd, storage); +} + +static void link_tail(grpc_mdelem_list *list, grpc_linked_mdelem *storage) { + assert_valid_list(list); + GPR_ASSERT(storage->md); + storage->prev = list->tail; + storage->next = NULL; + if (list->tail != NULL) { + list->tail->next = storage; + } else { + list->head = storage; + } + list->tail = storage; + assert_valid_list(list); +} + +void grpc_metadata_batch_link_tail(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage) { + link_tail(&comd->list, storage); +} + +void grpc_metadata_batch_merge(grpc_metadata_batch *target, + grpc_metadata_batch *add) { + grpc_linked_mdelem *l; + grpc_linked_mdelem *next; + for (l = add->list.head; l; l = next) { + next = l->next; + link_tail(&target->list, l); + } + for (l = add->garbage.head; l; l = next) { + next = l->next; + link_tail(&target->garbage, l); + } +} + +void grpc_metadata_batch_filter(grpc_metadata_batch *comd, + grpc_mdelem *(*filter)(void *user_data, + grpc_mdelem *elem), + void *user_data) { + grpc_linked_mdelem *l; + grpc_linked_mdelem *next; + + assert_valid_list(&comd->list); + assert_valid_list(&comd->garbage); + for (l = comd->list.head; l; l = next) { + grpc_mdelem *orig = l->md; + grpc_mdelem *filt = filter(user_data, orig); + next = l->next; + if (filt == NULL) { + if (l->prev) { + l->prev->next = l->next; + } + if (l->next) { + l->next->prev = l->prev; + } + if (comd->list.head == l) { + comd->list.head = l->next; + } + if (comd->list.tail == l) { + comd->list.tail = l->prev; + } + assert_valid_list(&comd->list); + link_head(&comd->garbage, l); + } else if (filt != orig) { + grpc_mdelem_unref(orig); + l->md = filt; + } + } + assert_valid_list(&comd->list); + assert_valid_list(&comd->garbage); } diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h index 2ffbcce87b..20146b9af2 100644 --- a/src/core/transport/stream_op.h +++ b/src/core/transport/stream_op.h @@ -50,8 +50,6 @@ typedef enum grpc_stream_op_code { Must be ignored by receivers */ GRPC_NO_OP, GRPC_OP_METADATA, - GRPC_OP_DEADLINE, - GRPC_OP_METADATA_BOUNDARY, /* Begin a message/metadata element/status - as defined by grpc_message_type. */ GRPC_OP_BEGIN_MESSAGE, @@ -76,6 +74,51 @@ typedef struct grpc_flow_ctl_cb { void *arg; } grpc_flow_ctl_cb; +typedef struct grpc_linked_mdelem { + grpc_mdelem *md; + struct grpc_linked_mdelem *next; + struct grpc_linked_mdelem *prev; +} grpc_linked_mdelem; + +typedef struct grpc_mdelem_list { + grpc_linked_mdelem *head; + grpc_linked_mdelem *tail; +} grpc_mdelem_list; + +typedef struct grpc_metadata_batch { + grpc_mdelem_list list; + grpc_mdelem_list garbage; + gpr_timespec deadline; +} grpc_metadata_batch; + +void grpc_metadata_batch_init(grpc_metadata_batch *comd); +void grpc_metadata_batch_destroy(grpc_metadata_batch *comd); +void grpc_metadata_batch_merge(grpc_metadata_batch *target, + grpc_metadata_batch *add); + +void grpc_metadata_batch_link_head(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage); +void grpc_metadata_batch_link_tail(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage); + +void grpc_metadata_batch_add_head(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage, + grpc_mdelem *elem_to_add); +void grpc_metadata_batch_add_tail(grpc_metadata_batch *comd, + grpc_linked_mdelem *storage, + grpc_mdelem *elem_to_add); + +void grpc_metadata_batch_filter(grpc_metadata_batch *comd, + grpc_mdelem *(*filter)(void *user_data, + grpc_mdelem *elem), + void *user_data); + +#ifndef NDEBUG +void grpc_metadata_batch_assert_ok(grpc_metadata_batch *comd); +#else +#define grpc_metadata_batch_assert_ok(comd) do {} while (0) +#endif + /* Represents a single operation performed on a stream/transport */ typedef struct grpc_stream_op { /* the operation to be applied */ @@ -84,8 +127,7 @@ typedef struct grpc_stream_op { associated op-code */ union { grpc_begin_message begin_message; - grpc_mdelem *metadata; - gpr_timespec deadline; + grpc_metadata_batch metadata; gpr_slice slice; grpc_flow_ctl_cb flow_ctl_cb; } data; @@ -118,9 +160,7 @@ void grpc_sopb_add_no_op(grpc_stream_op_buffer *sopb); /* Append a GRPC_OP_BEGIN to a buffer */ void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length, gpr_uint32 flags); -void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_mdelem *metadata); -void grpc_sopb_add_deadline(grpc_stream_op_buffer *sopb, gpr_timespec deadline); -void grpc_sopb_add_metadata_boundary(grpc_stream_op_buffer *sopb); +void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch metadata); /* Append a GRPC_SLICE to a buffer - does not ref/unref the slice */ void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice); /* Append a GRPC_OP_FLOW_CTL_CB to a buffer */ |