diff options
Diffstat (limited to 'src/core/channel/connected_channel.c')
-rw-r--r-- | src/core/channel/connected_channel.c | 92 |
1 files changed, 61 insertions, 31 deletions
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 62611e08f3..519f05cb66 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -60,10 +60,14 @@ typedef struct connected_channel_call_data { gpr_uint32 max_message_length; gpr_uint32 incoming_message_length; gpr_uint8 reading_message; - gpr_uint8 got_metadata_boundary; gpr_uint8 got_read_close; gpr_slice_buffer incoming_message; gpr_uint32 outgoing_buffer_length_estimate; + + grpc_linked_mdelem *incoming_metadata; + size_t incoming_metadata_count; + size_t incoming_metadata_capacity; + gpr_timespec deadline; } call_data; /* We perform a small hack to locate transport data alongside the connected @@ -116,18 +120,19 @@ static void end_bufferable_op(grpc_call_op *op, channel_data *chand, static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, grpc_call_op *op) { call_data *calld = elem->call_data; + grpc_linked_mdelem *m; channel_data *chand = elem->channel_data; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); switch (op->type) { case GRPC_SEND_METADATA: - grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata); - grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, - op->user_data); - break; - case GRPC_SEND_DEADLINE: - grpc_sopb_add_deadline(&calld->outgoing_sopb, op->data.deadline); + for (m = op->data.metadata.list.head; m; m = m->next) { + grpc_sopb_add_metadata(&calld->outgoing_sopb, m->md); + } + if (gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future) != 0) { + grpc_sopb_add_deadline(&calld->outgoing_sopb, op->data.metadata.deadline); + } grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, op->user_data); break; @@ -200,10 +205,12 @@ static void init_call_elem(grpc_call_element *elem, grpc_sopb_init(&calld->outgoing_sopb); calld->reading_message = 0; - calld->got_metadata_boundary = 0; calld->got_read_close = 0; calld->outgoing_buffer_length_estimate = 0; calld->max_message_length = chand->max_message_length; + calld->incoming_metadata = NULL; + calld->incoming_metadata_capacity = 0; + calld->incoming_metadata_count = 0; gpr_slice_buffer_init(&calld->incoming_message); r = grpc_transport_init_stream(chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), @@ -320,6 +327,49 @@ static void finish_message(channel_data *chand, call_data *calld) { grpc_call_next_op(elem, &call_op); } +static void metadata_done_cb(void *ptr, grpc_op_error error) { + gpr_free(ptr); +} + +static void add_incoming_metadata(call_data *calld, grpc_mdelem *elem) { + if (calld->incoming_metadata_count == calld->incoming_metadata_capacity) { + calld->incoming_metadata_capacity = GPR_MAX(8, 2 * calld->incoming_metadata_capacity); + calld->incoming_metadata = gpr_realloc(calld->incoming_metadata, sizeof(*calld->incoming_metadata) * calld->incoming_metadata_capacity); + } + calld->incoming_metadata[calld->incoming_metadata_count++].md = elem; +} + +static void flush_metadata(grpc_call_element *elem) { + grpc_call_op op; + call_data *calld = elem->call_data; + size_t i; + + for (i = 1; i < calld->incoming_metadata_count; i++) { + calld->incoming_metadata[i].prev = &calld->incoming_metadata[i-1]; + } + for (i = 0; i < calld->incoming_metadata_count - 1; i++) { + calld->incoming_metadata[i].next = &calld->incoming_metadata[i+1]; + } + + calld->incoming_metadata[0].prev = calld->incoming_metadata[calld->incoming_metadata_count-1].next = NULL; + + op.type = GRPC_RECV_METADATA; + op.dir = GRPC_CALL_UP; + op.flags = 0; + op.data.metadata.list.head = &calld->incoming_metadata[0]; + op.data.metadata.list.tail = &calld->incoming_metadata[calld->incoming_metadata_count - 1]; + op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL; + op.data.metadata.deadline = calld->deadline; + op.done_cb = metadata_done_cb; + op.user_data = calld->incoming_metadata; + + grpc_call_next_op(elem, &op); + + calld->incoming_metadata = NULL; + calld->incoming_metadata_count = 0; + calld->incoming_metadata_capacity = 0; +} + /* Handle incoming stream ops from the transport, translating them into call_ops to pass up the call stack */ static void recv_batch(void *user_data, grpc_transport *transport, @@ -346,33 +396,13 @@ static void recv_batch(void *user_data, grpc_transport *transport, case GRPC_NO_OP: break; case GRPC_OP_METADATA: - call_op.type = GRPC_RECV_METADATA; - call_op.dir = GRPC_CALL_UP; - call_op.flags = 0; - call_op.data.metadata = stream_op->data.metadata; - call_op.done_cb = do_nothing; - call_op.user_data = NULL; - grpc_call_next_op(elem, &call_op); + add_incoming_metadata(calld, stream_op->data.metadata); break; case GRPC_OP_DEADLINE: - call_op.type = GRPC_RECV_DEADLINE; - call_op.dir = GRPC_CALL_UP; - call_op.flags = 0; - call_op.data.deadline = stream_op->data.deadline; - call_op.done_cb = do_nothing; - call_op.user_data = NULL; - grpc_call_next_op(elem, &call_op); + calld->deadline = stream_op->data.deadline; break; case GRPC_OP_METADATA_BOUNDARY: - if (!calld->got_metadata_boundary) { - calld->got_metadata_boundary = 1; - call_op.type = GRPC_RECV_END_OF_INITIAL_METADATA; - call_op.dir = GRPC_CALL_UP; - call_op.flags = 0; - call_op.done_cb = do_nothing; - call_op.user_data = NULL; - grpc_call_next_op(elem, &call_op); - } + flush_metadata(elem); break; case GRPC_OP_BEGIN_MESSAGE: /* can't begin a message when we're still reading a message */ |