From 83f88d90b9c61c94994ed00d03a6fa469359d559 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 21 Apr 2015 16:02:05 -0700 Subject: stuff --- src/core/channel/connected_channel.c | 261 ++--------------------------------- 1 file changed, 9 insertions(+), 252 deletions(-) (limited to 'src/core/channel/connected_channel.c') diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 711274bfe1..d0b834a10a 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -50,19 +50,10 @@ typedef struct connected_channel_channel_data { grpc_transport *transport; - gpr_uint32 max_message_length; } channel_data; typedef struct connected_channel_call_data { - grpc_call_element *elem; - grpc_stream_op_buffer outgoing_sopb; - - gpr_uint32 max_message_length; - gpr_uint32 incoming_message_length; - gpr_uint8 reading_message; - gpr_uint8 got_read_close; - gpr_slice_buffer incoming_message; - gpr_uint32 outgoing_buffer_length_estimate; + void *unused; } call_data; /* We perform a small hack to locate transport data alongside the connected @@ -72,6 +63,7 @@ typedef struct connected_channel_call_data { #define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) \ (((call_data *)(transport_stream)) - 1) +#if 0 /* Copy the contents of a byte buffer into stream ops */ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer, grpc_stream_op_buffer *sopb) { @@ -87,76 +79,17 @@ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer, break; } } - -/* Flush queued stream operations onto the transport */ -static void end_bufferable_op(grpc_call_op *op, channel_data *chand, - call_data *calld, int is_last) { - size_t nops; - - if (op->flags & GRPC_WRITE_BUFFER_HINT) { - if (calld->outgoing_buffer_length_estimate < MAX_BUFFER_LENGTH) { - op->done_cb(op->user_data, GRPC_OP_OK); - return; - } - } - - calld->outgoing_buffer_length_estimate = 0; - grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, op->user_data); - - nops = calld->outgoing_sopb.nops; - calld->outgoing_sopb.nops = 0; - grpc_transport_send_batch(chand->transport, - TRANSPORT_STREAM_FROM_CALL_DATA(calld), - calld->outgoing_sopb.ops, nops, is_last); -} +#endif /* Intercept a call operation and either push it directly up or translate it into transport stream operations */ -static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { +static void con_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - if (op->bind_pollset) { - grpc_transport_add_to_pollset(chand->transport, op->bind_pollset); - } - - switch (op->type) { - case GRPC_SEND_METADATA: - grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata); - end_bufferable_op(op, chand, calld, 0); - break; - case GRPC_SEND_MESSAGE: - grpc_sopb_add_begin_message(&calld->outgoing_sopb, - grpc_byte_buffer_length(op->data.message), - op->flags); - /* fall-through */ - case GRPC_SEND_PREFORMATTED_MESSAGE: - copy_byte_buffer_to_stream_ops(op->data.message, &calld->outgoing_sopb); - calld->outgoing_buffer_length_estimate += - (5 + grpc_byte_buffer_length(op->data.message)); - end_bufferable_op(op, chand, calld, 0); - break; - case GRPC_SEND_FINISH: - end_bufferable_op(op, chand, calld, 1); - break; - case GRPC_REQUEST_DATA: - /* re-arm window updates if they were disarmed by finish_message */ - grpc_transport_set_allow_window_updates( - chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 1); - break; - case GRPC_CANCEL_OP: - grpc_transport_abort_stream(chand->transport, - TRANSPORT_STREAM_FROM_CALL_DATA(calld), - GRPC_STATUS_CANCELLED); - break; - default: - GPR_ASSERT(op->dir == GRPC_CALL_UP); - grpc_call_next_op(elem, op); - break; - } + grpc_transport_perform_op(chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), op); } /* Currently we assume all channel operations should just be pushed up. */ @@ -188,14 +121,6 @@ static void init_call_elem(grpc_call_element *elem, int r; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - calld->elem = elem; - grpc_sopb_init(&calld->outgoing_sopb); - - calld->reading_message = 0; - calld->got_read_close = 0; - calld->outgoing_buffer_length_estimate = 0; - calld->max_message_length = chand->max_message_length; - gpr_slice_buffer_init(&calld->incoming_message); r = grpc_transport_init_stream(chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), server_transport_data); @@ -207,8 +132,6 @@ static void destroy_call_elem(grpc_call_element *elem) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - grpc_sopb_destroy(&calld->outgoing_sopb); - gpr_slice_buffer_destroy(&calld->incoming_message); grpc_transport_destroy_stream(chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld)); } @@ -218,12 +141,12 @@ static void init_channel_elem(grpc_channel_element *elem, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { channel_data *cd = (channel_data *)elem->channel_data; - size_t i; GPR_ASSERT(!is_first); GPR_ASSERT(is_last); GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); cd->transport = NULL; +#if 0 cd->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH; if (args) { for (i = 0; i < args->num_args; i++) { @@ -240,6 +163,7 @@ static void init_channel_elem(grpc_channel_element *elem, } } } +#endif } /* Destructor for channel_data */ @@ -250,15 +174,10 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_connected_channel_filter = { - call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, + con_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, "connected", }; -static gpr_slice alloc_recv_buffer(void *user_data, grpc_transport *transport, - grpc_stream *stream, size_t size_hint) { - return gpr_slice_malloc(size_hint); -} - /* Transport callback to accept a new stream... calls up to handle it */ static void accept_stream(void *user_data, grpc_transport *transport, const void *transport_server_data) { @@ -276,168 +195,6 @@ static void accept_stream(void *user_data, grpc_transport *transport, channel_op(elem, NULL, &op); } -static void recv_error(channel_data *chand, call_data *calld, int line, - const char *message) { - gpr_log_message(__FILE__, line, GPR_LOG_SEVERITY_ERROR, message); - - if (chand->transport) { - grpc_transport_abort_stream(chand->transport, - TRANSPORT_STREAM_FROM_CALL_DATA(calld), - GRPC_STATUS_INVALID_ARGUMENT); - } -} - -static void do_nothing(void *calldata, grpc_op_error error) {} - -static void finish_message(channel_data *chand, call_data *calld) { - grpc_call_element *elem = calld->elem; - grpc_call_op call_op; - call_op.dir = GRPC_CALL_UP; - call_op.flags = 0; - /* if we got all the bytes for this message, call up the stack */ - call_op.type = GRPC_RECV_MESSAGE; - call_op.done_cb = do_nothing; - /* TODO(ctiller): this could be a lot faster if coded directly */ - call_op.data.message = grpc_byte_buffer_create(calld->incoming_message.slices, - calld->incoming_message.count); - gpr_slice_buffer_reset_and_unref(&calld->incoming_message); - - /* disable window updates until we get a request more from above */ - grpc_transport_set_allow_window_updates( - chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 0); - - GPR_ASSERT(calld->incoming_message.count == 0); - calld->reading_message = 0; - grpc_call_next_op(elem, &call_op); -} - -static void got_metadata(grpc_call_element *elem, - grpc_metadata_batch metadata) { - grpc_call_op op; - op.type = GRPC_RECV_METADATA; - op.dir = GRPC_CALL_UP; - op.flags = 0; - op.data.metadata = metadata; - op.done_cb = do_nothing; - op.user_data = NULL; - - grpc_call_next_op(elem, &op); -} - -/* Handle incoming stream ops from the transport, translating them into - call_ops to pass up the call stack */ -static void recv_batch(void *user_data, grpc_transport *transport, - grpc_stream *stream, grpc_stream_op *ops, - size_t ops_count, grpc_stream_state final_state) { - call_data *calld = CALL_DATA_FROM_TRANSPORT_STREAM(stream); - grpc_call_element *elem = calld->elem; - channel_data *chand = elem->channel_data; - grpc_stream_op *stream_op; - grpc_call_op call_op; - size_t i; - gpr_uint32 length; - - GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - - for (i = 0; i < ops_count; i++) { - stream_op = ops + i; - switch (stream_op->type) { - case GRPC_OP_FLOW_CTL_CB: - stream_op->data.flow_ctl_cb.cb(stream_op->data.flow_ctl_cb.arg, 1); - break; - case GRPC_NO_OP: - break; - case GRPC_OP_METADATA: - got_metadata(elem, stream_op->data.metadata); - break; - case GRPC_OP_BEGIN_MESSAGE: - /* can't begin a message when we're still reading a message */ - if (calld->reading_message) { - char *message = NULL; - gpr_asprintf(&message, - "Message terminated early; read %d bytes, expected %d", - (int)calld->incoming_message.length, - (int)calld->incoming_message_length); - recv_error(chand, calld, __LINE__, message); - gpr_free(message); - return; - } - /* stash away parameters, and prepare for incoming slices */ - length = stream_op->data.begin_message.length; - if (length > calld->max_message_length) { - char *message = NULL; - gpr_asprintf( - &message, - "Maximum message length of %d exceeded by a message of length %d", - calld->max_message_length, length); - recv_error(chand, calld, __LINE__, message); - gpr_free(message); - } else if (length > 0) { - calld->reading_message = 1; - calld->incoming_message_length = length; - } else { - finish_message(chand, calld); - } - break; - case GRPC_OP_SLICE: - if (GPR_SLICE_LENGTH(stream_op->data.slice) == 0) { - gpr_slice_unref(stream_op->data.slice); - break; - } - /* we have to be reading a message to know what to do here */ - if (!calld->reading_message) { - recv_error(chand, calld, __LINE__, - "Received payload data while not reading a message"); - return; - } - /* append the slice to the incoming buffer */ - gpr_slice_buffer_add(&calld->incoming_message, stream_op->data.slice); - if (calld->incoming_message.length > calld->incoming_message_length) { - /* if we got too many bytes, complain */ - char *message = NULL; - gpr_asprintf(&message, - "Receiving message overflow; read %d bytes, expected %d", - (int)calld->incoming_message.length, - (int)calld->incoming_message_length); - recv_error(chand, calld, __LINE__, message); - gpr_free(message); - return; - } else if (calld->incoming_message.length == - calld->incoming_message_length) { - finish_message(chand, calld); - } - } - } - /* if the stream closed, then call up the stack to let it know */ - if (!calld->got_read_close && (final_state == GRPC_STREAM_RECV_CLOSED || - final_state == GRPC_STREAM_CLOSED)) { - calld->got_read_close = 1; - if (calld->reading_message) { - char *message = NULL; - gpr_asprintf(&message, - "Last message truncated; read %d bytes, expected %d", - (int)calld->incoming_message.length, - (int)calld->incoming_message_length); - recv_error(chand, calld, __LINE__, message); - gpr_free(message); - } - call_op.type = GRPC_RECV_HALF_CLOSE; - call_op.dir = GRPC_CALL_UP; - call_op.flags = 0; - call_op.done_cb = do_nothing; - call_op.user_data = NULL; - grpc_call_next_op(elem, &call_op); - } - if (final_state == GRPC_STREAM_CLOSED) { - call_op.type = GRPC_RECV_FINISH; - call_op.dir = GRPC_CALL_UP; - call_op.flags = 0; - call_op.done_cb = do_nothing; - call_op.user_data = NULL; - grpc_call_next_op(elem, &call_op); - } -} - static void transport_goaway(void *user_data, grpc_transport *transport, grpc_status_code status, gpr_slice debug) { /* transport got goaway ==> call up and handle it */ @@ -470,7 +227,7 @@ static void transport_closed(void *user_data, grpc_transport *transport) { } const grpc_transport_callbacks connected_channel_transport_callbacks = { - alloc_recv_buffer, accept_stream, recv_batch, + accept_stream, transport_goaway, transport_closed, }; -- cgit v1.2.3