aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel/connected_channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/channel/connected_channel.c')
-rw-r--r--src/core/channel/connected_channel.c77
1 files changed, 27 insertions, 50 deletions
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index 62611e08f3..711274bfe1 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -60,7 +60,6 @@ typedef struct connected_channel_call_data {
gpr_uint32 max_message_length;
gpr_uint32 incoming_message_length;
gpr_uint8 reading_message;
- gpr_uint8 got_metadata_boundary;
gpr_uint8 got_read_close;
gpr_slice_buffer incoming_message;
gpr_uint32 outgoing_buffer_length_estimate;
@@ -120,27 +119,20 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ if (op->bind_pollset) {
+ grpc_transport_add_to_pollset(chand->transport, op->bind_pollset);
+ }
+
switch (op->type) {
case GRPC_SEND_METADATA:
grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata);
- grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb,
- op->user_data);
- break;
- case GRPC_SEND_DEADLINE:
- grpc_sopb_add_deadline(&calld->outgoing_sopb, op->data.deadline);
- grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb,
- op->user_data);
- break;
- case GRPC_SEND_START:
- grpc_transport_add_to_pollset(chand->transport, op->data.start.pollset);
- grpc_sopb_add_metadata_boundary(&calld->outgoing_sopb);
end_bufferable_op(op, chand, calld, 0);
break;
case GRPC_SEND_MESSAGE:
grpc_sopb_add_begin_message(&calld->outgoing_sopb,
grpc_byte_buffer_length(op->data.message),
op->flags);
- /* fall-through */
+ /* fall-through */
case GRPC_SEND_PREFORMATTED_MESSAGE:
copy_byte_buffer_to_stream_ops(op->data.message, &calld->outgoing_sopb);
calld->outgoing_buffer_length_estimate +=
@@ -200,7 +192,6 @@ static void init_call_elem(grpc_call_element *elem,
grpc_sopb_init(&calld->outgoing_sopb);
calld->reading_message = 0;
- calld->got_metadata_boundary = 0;
calld->got_read_close = 0;
calld->outgoing_buffer_length_estimate = 0;
calld->max_message_length = chand->max_message_length;
@@ -259,9 +250,9 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_connected_channel_filter = {
- call_op, channel_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "connected", };
+ call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ sizeof(channel_data), init_channel_elem, destroy_channel_elem, "connected",
+};
static gpr_slice alloc_recv_buffer(void *user_data, grpc_transport *transport,
grpc_stream *stream, size_t size_hint) {
@@ -307,8 +298,8 @@ static void finish_message(channel_data *chand, call_data *calld) {
call_op.type = GRPC_RECV_MESSAGE;
call_op.done_cb = do_nothing;
/* TODO(ctiller): this could be a lot faster if coded directly */
- call_op.data.message = grpc_byte_buffer_create(
- calld->incoming_message.slices, calld->incoming_message.count);
+ call_op.data.message = grpc_byte_buffer_create(calld->incoming_message.slices,
+ calld->incoming_message.count);
gpr_slice_buffer_reset_and_unref(&calld->incoming_message);
/* disable window updates until we get a request more from above */
@@ -320,6 +311,19 @@ static void finish_message(channel_data *chand, call_data *calld) {
grpc_call_next_op(elem, &call_op);
}
+static void got_metadata(grpc_call_element *elem,
+ grpc_metadata_batch metadata) {
+ grpc_call_op op;
+ op.type = GRPC_RECV_METADATA;
+ op.dir = GRPC_CALL_UP;
+ op.flags = 0;
+ op.data.metadata = metadata;
+ op.done_cb = do_nothing;
+ op.user_data = NULL;
+
+ grpc_call_next_op(elem, &op);
+}
+
/* Handle incoming stream ops from the transport, translating them into
call_ops to pass up the call stack */
static void recv_batch(void *user_data, grpc_transport *transport,
@@ -339,40 +343,12 @@ static void recv_batch(void *user_data, grpc_transport *transport,
stream_op = ops + i;
switch (stream_op->type) {
case GRPC_OP_FLOW_CTL_CB:
- gpr_log(GPR_ERROR,
- "should not receive flow control ops from transport");
- abort();
+ stream_op->data.flow_ctl_cb.cb(stream_op->data.flow_ctl_cb.arg, 1);
break;
case GRPC_NO_OP:
break;
case GRPC_OP_METADATA:
- call_op.type = GRPC_RECV_METADATA;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- call_op.data.metadata = stream_op->data.metadata;
- call_op.done_cb = do_nothing;
- call_op.user_data = NULL;
- grpc_call_next_op(elem, &call_op);
- break;
- case GRPC_OP_DEADLINE:
- call_op.type = GRPC_RECV_DEADLINE;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- call_op.data.deadline = stream_op->data.deadline;
- call_op.done_cb = do_nothing;
- call_op.user_data = NULL;
- grpc_call_next_op(elem, &call_op);
- break;
- case GRPC_OP_METADATA_BOUNDARY:
- if (!calld->got_metadata_boundary) {
- calld->got_metadata_boundary = 1;
- call_op.type = GRPC_RECV_END_OF_INITIAL_METADATA;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- call_op.done_cb = do_nothing;
- call_op.user_data = NULL;
- grpc_call_next_op(elem, &call_op);
- }
+ got_metadata(elem, stream_op->data.metadata);
break;
case GRPC_OP_BEGIN_MESSAGE:
/* can't begin a message when we're still reading a message */
@@ -495,7 +471,8 @@ static void transport_closed(void *user_data, grpc_transport *transport) {
const grpc_transport_callbacks connected_channel_transport_callbacks = {
alloc_recv_buffer, accept_stream, recv_batch,
- transport_goaway, transport_closed, };
+ transport_goaway, transport_closed,
+};
grpc_transport_setup_result grpc_connected_channel_bind_transport(
grpc_channel_stack *channel_stack, grpc_transport *transport) {