diff options
Diffstat (limited to 'src/core/channel/connected_channel.c')
-rw-r--r-- | src/core/channel/connected_channel.c | 77 |
1 files changed, 27 insertions, 50 deletions
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 62611e08f3..711274bfe1 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -60,7 +60,6 @@ typedef struct connected_channel_call_data { gpr_uint32 max_message_length; gpr_uint32 incoming_message_length; gpr_uint8 reading_message; - gpr_uint8 got_metadata_boundary; gpr_uint8 got_read_close; gpr_slice_buffer incoming_message; gpr_uint32 outgoing_buffer_length_estimate; @@ -120,27 +119,20 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + if (op->bind_pollset) { + grpc_transport_add_to_pollset(chand->transport, op->bind_pollset); + } + switch (op->type) { case GRPC_SEND_METADATA: grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata); - grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, - op->user_data); - break; - case GRPC_SEND_DEADLINE: - grpc_sopb_add_deadline(&calld->outgoing_sopb, op->data.deadline); - grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, - op->user_data); - break; - case GRPC_SEND_START: - grpc_transport_add_to_pollset(chand->transport, op->data.start.pollset); - grpc_sopb_add_metadata_boundary(&calld->outgoing_sopb); end_bufferable_op(op, chand, calld, 0); break; case GRPC_SEND_MESSAGE: grpc_sopb_add_begin_message(&calld->outgoing_sopb, grpc_byte_buffer_length(op->data.message), op->flags); - /* fall-through */ + /* fall-through */ case GRPC_SEND_PREFORMATTED_MESSAGE: copy_byte_buffer_to_stream_ops(op->data.message, &calld->outgoing_sopb); calld->outgoing_buffer_length_estimate += @@ -200,7 +192,6 @@ static void init_call_elem(grpc_call_element *elem, grpc_sopb_init(&calld->outgoing_sopb); calld->reading_message = 0; - calld->got_metadata_boundary = 0; calld->got_read_close = 0; calld->outgoing_buffer_length_estimate = 0; calld->max_message_length = chand->max_message_length; @@ -259,9 +250,9 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_connected_channel_filter = { - call_op, channel_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "connected", }; + call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, + sizeof(channel_data), init_channel_elem, destroy_channel_elem, "connected", +}; static gpr_slice alloc_recv_buffer(void *user_data, grpc_transport *transport, grpc_stream *stream, size_t size_hint) { @@ -307,8 +298,8 @@ static void finish_message(channel_data *chand, call_data *calld) { call_op.type = GRPC_RECV_MESSAGE; call_op.done_cb = do_nothing; /* TODO(ctiller): this could be a lot faster if coded directly */ - call_op.data.message = grpc_byte_buffer_create( - calld->incoming_message.slices, calld->incoming_message.count); + call_op.data.message = grpc_byte_buffer_create(calld->incoming_message.slices, + calld->incoming_message.count); gpr_slice_buffer_reset_and_unref(&calld->incoming_message); /* disable window updates until we get a request more from above */ @@ -320,6 +311,19 @@ static void finish_message(channel_data *chand, call_data *calld) { grpc_call_next_op(elem, &call_op); } +static void got_metadata(grpc_call_element *elem, + grpc_metadata_batch metadata) { + grpc_call_op op; + op.type = GRPC_RECV_METADATA; + op.dir = GRPC_CALL_UP; + op.flags = 0; + op.data.metadata = metadata; + op.done_cb = do_nothing; + op.user_data = NULL; + + grpc_call_next_op(elem, &op); +} + /* Handle incoming stream ops from the transport, translating them into call_ops to pass up the call stack */ static void recv_batch(void *user_data, grpc_transport *transport, @@ -339,40 +343,12 @@ static void recv_batch(void *user_data, grpc_transport *transport, stream_op = ops + i; switch (stream_op->type) { case GRPC_OP_FLOW_CTL_CB: - gpr_log(GPR_ERROR, - "should not receive flow control ops from transport"); - abort(); + stream_op->data.flow_ctl_cb.cb(stream_op->data.flow_ctl_cb.arg, 1); break; case GRPC_NO_OP: break; case GRPC_OP_METADATA: - call_op.type = GRPC_RECV_METADATA; - call_op.dir = GRPC_CALL_UP; - call_op.flags = 0; - call_op.data.metadata = stream_op->data.metadata; - call_op.done_cb = do_nothing; - call_op.user_data = NULL; - grpc_call_next_op(elem, &call_op); - break; - case GRPC_OP_DEADLINE: - call_op.type = GRPC_RECV_DEADLINE; - call_op.dir = GRPC_CALL_UP; - call_op.flags = 0; - call_op.data.deadline = stream_op->data.deadline; - call_op.done_cb = do_nothing; - call_op.user_data = NULL; - grpc_call_next_op(elem, &call_op); - break; - case GRPC_OP_METADATA_BOUNDARY: - if (!calld->got_metadata_boundary) { - calld->got_metadata_boundary = 1; - call_op.type = GRPC_RECV_END_OF_INITIAL_METADATA; - call_op.dir = GRPC_CALL_UP; - call_op.flags = 0; - call_op.done_cb = do_nothing; - call_op.user_data = NULL; - grpc_call_next_op(elem, &call_op); - } + got_metadata(elem, stream_op->data.metadata); break; case GRPC_OP_BEGIN_MESSAGE: /* can't begin a message when we're still reading a message */ @@ -495,7 +471,8 @@ static void transport_closed(void *user_data, grpc_transport *transport) { const grpc_transport_callbacks connected_channel_transport_callbacks = { alloc_recv_buffer, accept_stream, recv_batch, - transport_goaway, transport_closed, }; + transport_goaway, transport_closed, +}; grpc_transport_setup_result grpc_connected_channel_bind_transport( grpc_channel_stack *channel_stack, grpc_transport *transport) { |