diff options
Diffstat (limited to 'src/core/channel/connected_channel.c')
-rw-r--r-- | src/core/channel/connected_channel.c | 72 |
1 files changed, 7 insertions, 65 deletions
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index f7fed7cae9..17abba06be 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -63,11 +63,6 @@ typedef struct connected_channel_call_data { gpr_uint8 got_read_close; gpr_slice_buffer incoming_message; gpr_uint32 outgoing_buffer_length_estimate; - - grpc_linked_mdelem *incoming_metadata; - size_t incoming_metadata_count; - size_t incoming_metadata_capacity; - gpr_timespec deadline; } call_data; /* We perform a small hack to locate transport data alongside the connected @@ -120,26 +115,18 @@ static void end_bufferable_op(grpc_call_op *op, channel_data *chand, static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, grpc_call_op *op) { call_data *calld = elem->call_data; - grpc_linked_mdelem *m; channel_data *chand = elem->channel_data; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); switch (op->type) { case GRPC_SEND_METADATA: - for (m = op->data.metadata.list.head; m; m = m->next) { - grpc_sopb_add_metadata(&calld->outgoing_sopb, m->md); - } - if (gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future) != 0) { - grpc_sopb_add_deadline(&calld->outgoing_sopb, - op->data.metadata.deadline); - } + grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata); grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, op->user_data); break; case GRPC_SEND_START: grpc_transport_add_to_pollset(chand->transport, op->data.start.pollset); - grpc_sopb_add_metadata_boundary(&calld->outgoing_sopb); end_bufferable_op(op, chand, calld, 0); break; case GRPC_SEND_MESSAGE: @@ -209,9 +196,6 @@ static void init_call_elem(grpc_call_element *elem, calld->got_read_close = 0; calld->outgoing_buffer_length_estimate = 0; calld->max_message_length = chand->max_message_length; - calld->incoming_metadata = NULL; - calld->incoming_metadata_capacity = 0; - calld->incoming_metadata_count = 0; gpr_slice_buffer_init(&calld->incoming_message); r = grpc_transport_init_stream(chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), @@ -328,50 +312,16 @@ static void finish_message(channel_data *chand, call_data *calld) { grpc_call_next_op(elem, &call_op); } -static void metadata_done_cb(void *ptr, grpc_op_error error) { gpr_free(ptr); } - -static void add_incoming_metadata(call_data *calld, grpc_mdelem *elem) { - if (calld->incoming_metadata_count == calld->incoming_metadata_capacity) { - calld->incoming_metadata_capacity = - GPR_MAX(8, 2 * calld->incoming_metadata_capacity); - calld->incoming_metadata = gpr_realloc( - calld->incoming_metadata, - sizeof(*calld->incoming_metadata) * calld->incoming_metadata_capacity); - } - calld->incoming_metadata[calld->incoming_metadata_count++].md = elem; -} - -static void flush_metadata(grpc_call_element *elem) { +static void got_metadata(grpc_call_element *elem, grpc_metadata_batch metadata) { grpc_call_op op; - call_data *calld = elem->call_data; - size_t i; - - for (i = 1; i < calld->incoming_metadata_count; i++) { - calld->incoming_metadata[i].prev = &calld->incoming_metadata[i - 1]; - } - for (i = 0; i < calld->incoming_metadata_count - 1; i++) { - calld->incoming_metadata[i].next = &calld->incoming_metadata[i + 1]; - } - - calld->incoming_metadata[0].prev = - calld->incoming_metadata[calld->incoming_metadata_count - 1].next = NULL; - op.type = GRPC_RECV_METADATA; op.dir = GRPC_CALL_UP; op.flags = 0; - op.data.metadata.list.head = &calld->incoming_metadata[0]; - op.data.metadata.list.tail = - &calld->incoming_metadata[calld->incoming_metadata_count - 1]; - op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL; - op.data.metadata.deadline = calld->deadline; - op.done_cb = metadata_done_cb; - op.user_data = calld->incoming_metadata; + op.data.metadata = metadata; + op.done_cb = do_nothing; + op.user_data = NULL; grpc_call_next_op(elem, &op); - - calld->incoming_metadata = NULL; - calld->incoming_metadata_count = 0; - calld->incoming_metadata_capacity = 0; } /* Handle incoming stream ops from the transport, translating them into @@ -393,20 +343,12 @@ static void recv_batch(void *user_data, grpc_transport *transport, stream_op = ops + i; switch (stream_op->type) { case GRPC_OP_FLOW_CTL_CB: - gpr_log(GPR_ERROR, - "should not receive flow control ops from transport"); - abort(); + stream_op->data.flow_ctl_cb.cb(stream_op->data.flow_ctl_cb.arg, 1); break; case GRPC_NO_OP: break; case GRPC_OP_METADATA: - add_incoming_metadata(calld, stream_op->data.metadata); - break; - case GRPC_OP_DEADLINE: - calld->deadline = stream_op->data.deadline; - break; - case GRPC_OP_METADATA_BOUNDARY: - flush_metadata(elem); + got_metadata(elem, stream_op->data.metadata); break; case GRPC_OP_BEGIN_MESSAGE: /* can't begin a message when we're still reading a message */ |