diff options
Diffstat (limited to 'src/core/channel/connected_channel.c')
-rw-r--r-- | src/core/channel/connected_channel.c | 305 |
1 files changed, 12 insertions, 293 deletions
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 711274bfe1..14dda88698 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -45,25 +45,12 @@ #include <grpc/support/slice_buffer.h> #define MAX_BUFFER_LENGTH 8192 -/* the protobuf library will (by default) start warning at 100megs */ -#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024) typedef struct connected_channel_channel_data { grpc_transport *transport; - gpr_uint32 max_message_length; } channel_data; -typedef struct connected_channel_call_data { - grpc_call_element *elem; - grpc_stream_op_buffer outgoing_sopb; - - gpr_uint32 max_message_length; - gpr_uint32 incoming_message_length; - gpr_uint8 reading_message; - gpr_uint8 got_read_close; - gpr_slice_buffer incoming_message; - gpr_uint32 outgoing_buffer_length_estimate; -} call_data; +typedef struct connected_channel_call_data { void *unused; } call_data; /* We perform a small hack to locate transport data alongside the connected channel data in call allocations, to allow everything to be pulled in minimal @@ -72,91 +59,17 @@ typedef struct connected_channel_call_data { #define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) \ (((call_data *)(transport_stream)) - 1) -/* Copy the contents of a byte buffer into stream ops */ -static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer, - grpc_stream_op_buffer *sopb) { - size_t i; - - switch (byte_buffer->type) { - case GRPC_BB_SLICE_BUFFER: - for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) { - gpr_slice slice = byte_buffer->data.slice_buffer.slices[i]; - gpr_slice_ref(slice); - grpc_sopb_add_slice(sopb, slice); - } - break; - } -} - -/* Flush queued stream operations onto the transport */ -static void end_bufferable_op(grpc_call_op *op, channel_data *chand, - call_data *calld, int is_last) { - size_t nops; - - if (op->flags & GRPC_WRITE_BUFFER_HINT) { - if (calld->outgoing_buffer_length_estimate < MAX_BUFFER_LENGTH) { - op->done_cb(op->user_data, GRPC_OP_OK); - return; - } - } - - calld->outgoing_buffer_length_estimate = 0; - grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, op->user_data); - - nops = calld->outgoing_sopb.nops; - calld->outgoing_sopb.nops = 0; - grpc_transport_send_batch(chand->transport, - TRANSPORT_STREAM_FROM_CALL_DATA(calld), - calld->outgoing_sopb.ops, nops, is_last); -} - /* Intercept a call operation and either push it directly up or translate it into transport stream operations */ -static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { +static void con_start_transport_op(grpc_call_element *elem, + grpc_transport_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - if (op->bind_pollset) { - grpc_transport_add_to_pollset(chand->transport, op->bind_pollset); - } - - switch (op->type) { - case GRPC_SEND_METADATA: - grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata); - end_bufferable_op(op, chand, calld, 0); - break; - case GRPC_SEND_MESSAGE: - grpc_sopb_add_begin_message(&calld->outgoing_sopb, - grpc_byte_buffer_length(op->data.message), - op->flags); - /* fall-through */ - case GRPC_SEND_PREFORMATTED_MESSAGE: - copy_byte_buffer_to_stream_ops(op->data.message, &calld->outgoing_sopb); - calld->outgoing_buffer_length_estimate += - (5 + grpc_byte_buffer_length(op->data.message)); - end_bufferable_op(op, chand, calld, 0); - break; - case GRPC_SEND_FINISH: - end_bufferable_op(op, chand, calld, 1); - break; - case GRPC_REQUEST_DATA: - /* re-arm window updates if they were disarmed by finish_message */ - grpc_transport_set_allow_window_updates( - chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 1); - break; - case GRPC_CANCEL_OP: - grpc_transport_abort_stream(chand->transport, - TRANSPORT_STREAM_FROM_CALL_DATA(calld), - GRPC_STATUS_CANCELLED); - break; - default: - GPR_ASSERT(op->dir == GRPC_CALL_UP); - grpc_call_next_op(elem, op); - break; - } + grpc_transport_perform_op(chand->transport, + TRANSPORT_STREAM_FROM_CALL_DATA(calld), op); } /* Currently we assume all channel operations should just be pushed up. */ @@ -182,23 +95,16 @@ static void channel_op(grpc_channel_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data) { + const void *server_transport_data, + grpc_transport_op *initial_op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; int r; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - calld->elem = elem; - grpc_sopb_init(&calld->outgoing_sopb); - - calld->reading_message = 0; - calld->got_read_close = 0; - calld->outgoing_buffer_length_estimate = 0; - calld->max_message_length = chand->max_message_length; - gpr_slice_buffer_init(&calld->incoming_message); r = grpc_transport_init_stream(chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), - server_transport_data); + server_transport_data, initial_op); GPR_ASSERT(r == 0); } @@ -207,8 +113,6 @@ static void destroy_call_elem(grpc_call_element *elem) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - grpc_sopb_destroy(&calld->outgoing_sopb); - gpr_slice_buffer_destroy(&calld->incoming_message); grpc_transport_destroy_stream(chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld)); } @@ -218,28 +122,10 @@ static void init_channel_elem(grpc_channel_element *elem, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { channel_data *cd = (channel_data *)elem->channel_data; - size_t i; GPR_ASSERT(!is_first); GPR_ASSERT(is_last); GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); cd->transport = NULL; - - cd->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH; - if (args) { - for (i = 0; i < args->num_args; i++) { - if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) { - if (args->args[i].type != GRPC_ARG_INTEGER) { - gpr_log(GPR_ERROR, "%s ignored: it must be an integer", - GRPC_ARG_MAX_MESSAGE_LENGTH); - } else if (args->args[i].value.integer < 0) { - gpr_log(GPR_ERROR, "%s ignored: it must be >= 0", - GRPC_ARG_MAX_MESSAGE_LENGTH); - } else { - cd->max_message_length = args->args[i].value.integer; - } - } - } - } } /* Destructor for channel_data */ @@ -250,15 +136,11 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_connected_channel_filter = { - call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, - sizeof(channel_data), init_channel_elem, destroy_channel_elem, "connected", + con_start_transport_op, channel_op, sizeof(call_data), init_call_elem, + destroy_call_elem, sizeof(channel_data), init_channel_elem, + destroy_channel_elem, "connected", }; -static gpr_slice alloc_recv_buffer(void *user_data, grpc_transport *transport, - grpc_stream *stream, size_t size_hint) { - return gpr_slice_malloc(size_hint); -} - /* Transport callback to accept a new stream... calls up to handle it */ static void accept_stream(void *user_data, grpc_transport *transport, const void *transport_server_data) { @@ -276,168 +158,6 @@ static void accept_stream(void *user_data, grpc_transport *transport, channel_op(elem, NULL, &op); } -static void recv_error(channel_data *chand, call_data *calld, int line, - const char *message) { - gpr_log_message(__FILE__, line, GPR_LOG_SEVERITY_ERROR, message); - - if (chand->transport) { - grpc_transport_abort_stream(chand->transport, - TRANSPORT_STREAM_FROM_CALL_DATA(calld), - GRPC_STATUS_INVALID_ARGUMENT); - } -} - -static void do_nothing(void *calldata, grpc_op_error error) {} - -static void finish_message(channel_data *chand, call_data *calld) { - grpc_call_element *elem = calld->elem; - grpc_call_op call_op; - call_op.dir = GRPC_CALL_UP; - call_op.flags = 0; - /* if we got all the bytes for this message, call up the stack */ - call_op.type = GRPC_RECV_MESSAGE; - call_op.done_cb = do_nothing; - /* TODO(ctiller): this could be a lot faster if coded directly */ - call_op.data.message = grpc_byte_buffer_create(calld->incoming_message.slices, - calld->incoming_message.count); - gpr_slice_buffer_reset_and_unref(&calld->incoming_message); - - /* disable window updates until we get a request more from above */ - grpc_transport_set_allow_window_updates( - chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 0); - - GPR_ASSERT(calld->incoming_message.count == 0); - calld->reading_message = 0; - grpc_call_next_op(elem, &call_op); -} - -static void got_metadata(grpc_call_element *elem, - grpc_metadata_batch metadata) { - grpc_call_op op; - op.type = GRPC_RECV_METADATA; - op.dir = GRPC_CALL_UP; - op.flags = 0; - op.data.metadata = metadata; - op.done_cb = do_nothing; - op.user_data = NULL; - - grpc_call_next_op(elem, &op); -} - -/* Handle incoming stream ops from the transport, translating them into - call_ops to pass up the call stack */ -static void recv_batch(void *user_data, grpc_transport *transport, - grpc_stream *stream, grpc_stream_op *ops, - size_t ops_count, grpc_stream_state final_state) { - call_data *calld = CALL_DATA_FROM_TRANSPORT_STREAM(stream); - grpc_call_element *elem = calld->elem; - channel_data *chand = elem->channel_data; - grpc_stream_op *stream_op; - grpc_call_op call_op; - size_t i; - gpr_uint32 length; - - GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - - for (i = 0; i < ops_count; i++) { - stream_op = ops + i; - switch (stream_op->type) { - case GRPC_OP_FLOW_CTL_CB: - stream_op->data.flow_ctl_cb.cb(stream_op->data.flow_ctl_cb.arg, 1); - break; - case GRPC_NO_OP: - break; - case GRPC_OP_METADATA: - got_metadata(elem, stream_op->data.metadata); - break; - case GRPC_OP_BEGIN_MESSAGE: - /* can't begin a message when we're still reading a message */ - if (calld->reading_message) { - char *message = NULL; - gpr_asprintf(&message, - "Message terminated early; read %d bytes, expected %d", - (int)calld->incoming_message.length, - (int)calld->incoming_message_length); - recv_error(chand, calld, __LINE__, message); - gpr_free(message); - return; - } - /* stash away parameters, and prepare for incoming slices */ - length = stream_op->data.begin_message.length; - if (length > calld->max_message_length) { - char *message = NULL; - gpr_asprintf( - &message, - "Maximum message length of %d exceeded by a message of length %d", - calld->max_message_length, length); - recv_error(chand, calld, __LINE__, message); - gpr_free(message); - } else if (length > 0) { - calld->reading_message = 1; - calld->incoming_message_length = length; - } else { - finish_message(chand, calld); - } - break; - case GRPC_OP_SLICE: - if (GPR_SLICE_LENGTH(stream_op->data.slice) == 0) { - gpr_slice_unref(stream_op->data.slice); - break; - } - /* we have to be reading a message to know what to do here */ - if (!calld->reading_message) { - recv_error(chand, calld, __LINE__, - "Received payload data while not reading a message"); - return; - } - /* append the slice to the incoming buffer */ - gpr_slice_buffer_add(&calld->incoming_message, stream_op->data.slice); - if (calld->incoming_message.length > calld->incoming_message_length) { - /* if we got too many bytes, complain */ - char *message = NULL; - gpr_asprintf(&message, - "Receiving message overflow; read %d bytes, expected %d", - (int)calld->incoming_message.length, - (int)calld->incoming_message_length); - recv_error(chand, calld, __LINE__, message); - gpr_free(message); - return; - } else if (calld->incoming_message.length == - calld->incoming_message_length) { - finish_message(chand, calld); - } - } - } - /* if the stream closed, then call up the stack to let it know */ - if (!calld->got_read_close && (final_state == GRPC_STREAM_RECV_CLOSED || - final_state == GRPC_STREAM_CLOSED)) { - calld->got_read_close = 1; - if (calld->reading_message) { - char *message = NULL; - gpr_asprintf(&message, - "Last message truncated; read %d bytes, expected %d", - (int)calld->incoming_message.length, - (int)calld->incoming_message_length); - recv_error(chand, calld, __LINE__, message); - gpr_free(message); - } - call_op.type = GRPC_RECV_HALF_CLOSE; - call_op.dir = GRPC_CALL_UP; - call_op.flags = 0; - call_op.done_cb = do_nothing; - call_op.user_data = NULL; - grpc_call_next_op(elem, &call_op); - } - if (final_state == GRPC_STREAM_CLOSED) { - call_op.type = GRPC_RECV_FINISH; - call_op.dir = GRPC_CALL_UP; - call_op.flags = 0; - call_op.done_cb = do_nothing; - call_op.user_data = NULL; - grpc_call_next_op(elem, &call_op); - } -} - static void transport_goaway(void *user_data, grpc_transport *transport, grpc_status_code status, gpr_slice debug) { /* transport got goaway ==> call up and handle it */ @@ -470,8 +190,7 @@ static void transport_closed(void *user_data, grpc_transport *transport) { } const grpc_transport_callbacks connected_channel_transport_callbacks = { - alloc_recv_buffer, accept_stream, recv_batch, - transport_goaway, transport_closed, + accept_stream, transport_goaway, transport_closed, }; grpc_transport_setup_result grpc_connected_channel_bind_transport( |