diff options
author | Nicolas Noble <nnoble@google.com> | 2014-11-26 16:33:03 -0800 |
---|---|---|
committer | Nicolas Noble <nnoble@google.com> | 2014-11-26 16:33:03 -0800 |
commit | b7ebd3b8c6fe39f99c40b10c1b563e4adb607b6c (patch) | |
tree | c1decf819492d455ec81cd471942c5516138f825 /src/core/channel/connected_channel.c | |
parent | 0e905e63db21bcdd85d3d1af051fcdc5bb5caa38 (diff) |
Initial import.
Diffstat (limited to 'src/core/channel/connected_channel.c')
-rw-r--r-- | src/core/channel/connected_channel.c | 501 |
1 files changed, 501 insertions, 0 deletions
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c new file mode 100644 index 0000000000..336472e740 --- /dev/null +++ b/src/core/channel/connected_channel.c @@ -0,0 +1,501 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/channel/connected_channel.h" + +#include <stdarg.h> +#include <stdio.h> +#include <string.h> + +#include "src/core/transport/transport.h" +#include <grpc/byte_buffer.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/slice_buffer.h> +#include <grpc/support/string.h> + +#define MAX_BUFFER_LENGTH 8192 +/* the protobuf library will (by default) start warning at 100megs */ +#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024) + +typedef struct { + grpc_transport *transport; + gpr_uint32 max_message_length; +} channel_data; + +typedef struct { + grpc_call_element *elem; + grpc_stream_op_buffer outgoing_sopb; + + gpr_uint32 max_message_length; + gpr_uint32 incoming_message_length; + gpr_uint8 reading_message; + gpr_uint8 got_metadata_boundary; + gpr_uint8 got_read_close; + gpr_slice_buffer incoming_message; + gpr_uint32 outgoing_buffer_length_estimate; +} call_data; + +/* We perform a small hack to locate transport data alongside the connected + channel data in call allocations, to allow everything to be pulled in minimal + cache line requests */ +#define TRANSPORT_STREAM_FROM_CALL_DATA(calld) ((grpc_stream *)((calld)+1)) +#define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) \ + (((call_data *)(transport_stream)) - 1) + +/* Copy the contents of a byte buffer into stream ops */ +static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer, + grpc_stream_op_buffer *sopb) { + size_t i; + + switch (byte_buffer->type) { + case GRPC_BB_SLICE_BUFFER: + for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) { + gpr_slice slice = byte_buffer->data.slice_buffer.slices[i]; + gpr_slice_ref(slice); + grpc_sopb_add_slice(sopb, slice); + } + break; + } +} + +/* Flush queued stream operations onto the transport */ +static void end_bufferable_op(grpc_call_op *op, channel_data *chand, + call_data *calld, int is_last) { + size_t nops; + + if (op->flags & GRPC_WRITE_BUFFER_HINT) { + if (calld->outgoing_buffer_length_estimate < MAX_BUFFER_LENGTH) { + op->done_cb(op->user_data, GRPC_OP_OK); + return; + } + } + + calld->outgoing_buffer_length_estimate = 0; + grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, op->user_data); + + nops = calld->outgoing_sopb.nops; + calld->outgoing_sopb.nops = 0; + grpc_transport_send_batch(chand->transport, + TRANSPORT_STREAM_FROM_CALL_DATA(calld), + calld->outgoing_sopb.ops, nops, is_last); +} + +/* Intercept a call operation and either push it directly up or translate it + into transport stream operations */ +static void call_op(grpc_call_element *elem, grpc_call_op *op) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + + switch (op->type) { + case GRPC_SEND_METADATA: + grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata); + grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, + op->user_data); + break; + case GRPC_SEND_DEADLINE: + grpc_sopb_add_deadline(&calld->outgoing_sopb, op->data.deadline); + grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, + op->user_data); + break; + case GRPC_SEND_START: + grpc_sopb_add_metadata_boundary(&calld->outgoing_sopb); + end_bufferable_op(op, chand, calld, 0); + break; + case GRPC_SEND_MESSAGE: + grpc_sopb_add_begin_message(&calld->outgoing_sopb, + grpc_byte_buffer_length(op->data.message), + op->flags); + copy_byte_buffer_to_stream_ops(op->data.message, &calld->outgoing_sopb); + calld->outgoing_buffer_length_estimate += + (5 + grpc_byte_buffer_length(op->data.message)); + end_bufferable_op(op, chand, calld, 0); + break; + case GRPC_SEND_FINISH: + end_bufferable_op(op, chand, calld, 1); + break; + case GRPC_REQUEST_DATA: + /* re-arm window updates if they were disarmed by finish_message */ + grpc_transport_set_allow_window_updates( + chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 1); + break; + case GRPC_CANCEL_OP: + grpc_transport_abort_stream(chand->transport, + TRANSPORT_STREAM_FROM_CALL_DATA(calld), + GRPC_STATUS_CANCELLED); + break; + default: + GPR_ASSERT(op->dir == GRPC_CALL_UP); + grpc_call_next_op(elem, op); + break; + } +} + +/* Currently we assume all channel operations should just be pushed up. */ +static void channel_op(grpc_channel_element *elem, grpc_channel_op *op) { + channel_data *chand = elem->channel_data; + GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); + + switch (op->type) { + case GRPC_CHANNEL_SHUTDOWN: + grpc_transport_close(chand->transport); + break; + default: + GPR_ASSERT(op->dir == GRPC_CALL_UP); + grpc_channel_next_op(elem, op); + break; + } +} + +/* Constructor for call_data */ +static void init_call_elem(grpc_call_element *elem, + const void *server_transport_data) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + int r; + + GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); + calld->elem = elem; + grpc_sopb_init(&calld->outgoing_sopb); + + calld->reading_message = 0; + calld->got_metadata_boundary = 0; + calld->got_read_close = 0; + calld->outgoing_buffer_length_estimate = 0; + calld->max_message_length = chand->max_message_length; + gpr_slice_buffer_init(&calld->incoming_message); + r = grpc_transport_init_stream(chand->transport, + TRANSPORT_STREAM_FROM_CALL_DATA(calld), + server_transport_data); + GPR_ASSERT(r == 0); +} + +/* Destructor for call_data */ +static void destroy_call_elem(grpc_call_element *elem) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); + grpc_sopb_destroy(&calld->outgoing_sopb); + gpr_slice_buffer_destroy(&calld->incoming_message); + grpc_transport_destroy_stream(chand->transport, + TRANSPORT_STREAM_FROM_CALL_DATA(calld)); +} + +/* Constructor for channel_data */ +static void init_channel_elem(grpc_channel_element *elem, + const grpc_channel_args *args, grpc_mdctx *mdctx, + int is_first, int is_last) { + channel_data *cd = (channel_data *)elem->channel_data; + size_t i; + GPR_ASSERT(!is_first); + GPR_ASSERT(is_last); + GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); + cd->transport = NULL; + + cd->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH; + if (args) { + for (i = 0; i < args->num_args; i++) { + if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) { + if (args->args[i].type != GRPC_ARG_INTEGER) { + gpr_log(GPR_ERROR, "%s ignored: it must be an integer", + GRPC_ARG_MAX_MESSAGE_LENGTH); + } else if (args->args[i].value.integer < 0) { + gpr_log(GPR_ERROR, "%s ignored: it must be >= 0", + GRPC_ARG_MAX_MESSAGE_LENGTH); + } else { + cd->max_message_length = args->args[i].value.integer; + } + } + } + } +} + +/* Destructor for channel_data */ +static void destroy_channel_elem(grpc_channel_element *elem) { + channel_data *cd = (channel_data *)elem->channel_data; + GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); + grpc_transport_destroy(cd->transport); +} + +const grpc_channel_filter grpc_connected_channel_filter = { + call_op, channel_op, + + sizeof(call_data), init_call_elem, destroy_call_elem, + + sizeof(channel_data), init_channel_elem, destroy_channel_elem, + + "connected", +}; + +static gpr_slice alloc_recv_buffer(void *user_data, grpc_transport *transport, + grpc_stream *stream, size_t size_hint) { + return gpr_slice_malloc(size_hint); +} + +/* Transport callback to accept a new stream... calls up to handle it */ +static void accept_stream(void *user_data, grpc_transport *transport, + const void *transport_server_data) { + grpc_channel_element *elem = user_data; + channel_data *chand = elem->channel_data; + grpc_channel_op op; + + GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); + GPR_ASSERT(chand->transport == transport); + + op.type = GRPC_ACCEPT_CALL; + op.dir = GRPC_CALL_UP; + op.data.accept_call.transport = transport; + op.data.accept_call.transport_server_data = transport_server_data; + channel_op(elem, &op); +} + +static void recv_error(channel_data *chand, call_data *calld, int line, + const char *fmt, ...) { + char msg[512]; + va_list a; + + va_start(a, fmt); + vsprintf(msg, fmt, a); + va_end(a); + + gpr_log(__FILE__, line, GPR_LOG_SEVERITY_ERROR, "%s", msg); + + if (chand->transport) { + grpc_transport_abort_stream(chand->transport, + TRANSPORT_STREAM_FROM_CALL_DATA(calld), + GRPC_STATUS_INVALID_ARGUMENT); + } +} + +static void do_nothing(void *calldata, grpc_op_error error) {} + +static void done_message(void *user_data, grpc_op_error error) { + grpc_byte_buffer_destroy(user_data); +} + +static void finish_message(channel_data *chand, call_data *calld) { + grpc_call_element *elem = calld->elem; + grpc_call_op call_op; + call_op.dir = GRPC_CALL_UP; + call_op.flags = 0; + /* if we got all the bytes for this message, call up the stack */ + call_op.type = GRPC_RECV_MESSAGE; + call_op.done_cb = done_message; + /* TODO(ctiller): this could be a lot faster if coded directly */ + call_op.user_data = call_op.data.message = grpc_byte_buffer_create( + calld->incoming_message.slices, calld->incoming_message.count); + gpr_slice_buffer_reset_and_unref(&calld->incoming_message); + + /* disable window updates until we get a request more from above */ + grpc_transport_set_allow_window_updates( + chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 0); + + GPR_ASSERT(calld->incoming_message.count == 0); + calld->reading_message = 0; + grpc_call_next_op(elem, &call_op); +} + +/* Handle incoming stream ops from the transport, translating them into + call_ops to pass up the call stack */ +static void recv_batch(void *user_data, grpc_transport *transport, + grpc_stream *stream, grpc_stream_op *ops, + size_t ops_count, grpc_stream_state final_state) { + call_data *calld = CALL_DATA_FROM_TRANSPORT_STREAM(stream); + grpc_call_element *elem = calld->elem; + channel_data *chand = elem->channel_data; + grpc_stream_op *stream_op; + grpc_call_op call_op; + size_t i; + gpr_uint32 length; + + GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); + + for (i = 0; i < ops_count; i++) { + stream_op = ops + i; + switch (stream_op->type) { + case GRPC_OP_FLOW_CTL_CB: + gpr_log(GPR_ERROR, + "should not receive flow control ops from transport"); + abort(); + break; + case GRPC_NO_OP: + break; + case GRPC_OP_METADATA: + call_op.type = GRPC_RECV_METADATA; + call_op.dir = GRPC_CALL_UP; + call_op.flags = 0; + call_op.data.metadata = stream_op->data.metadata; + call_op.done_cb = do_nothing; + call_op.user_data = NULL; + grpc_call_next_op(elem, &call_op); + break; + case GRPC_OP_DEADLINE: + call_op.type = GRPC_RECV_DEADLINE; + call_op.dir = GRPC_CALL_UP; + call_op.flags = 0; + call_op.data.deadline = stream_op->data.deadline; + call_op.done_cb = do_nothing; + call_op.user_data = NULL; + grpc_call_next_op(elem, &call_op); + break; + case GRPC_OP_METADATA_BOUNDARY: + if (!calld->got_metadata_boundary) { + calld->got_metadata_boundary = 1; + call_op.type = GRPC_RECV_END_OF_INITIAL_METADATA; + call_op.dir = GRPC_CALL_UP; + call_op.flags = 0; + call_op.done_cb = do_nothing; + call_op.user_data = NULL; + grpc_call_next_op(elem, &call_op); + } + break; + case GRPC_OP_BEGIN_MESSAGE: + /* can't begin a message when we're still reading a message */ + if (calld->reading_message) { + recv_error(chand, calld, __LINE__, + "Message terminated early; read %d bytes, expected %d", + calld->incoming_message.length, + calld->incoming_message_length); + return; + } + /* stash away parameters, and prepare for incoming slices */ + length = stream_op->data.begin_message.length; + if (length > calld->max_message_length) { + recv_error( + chand, calld, __LINE__, + "Maximum message length of %d exceeded by a message of length %d", + calld->max_message_length, length); + } else if (length > 0) { + calld->reading_message = 1; + calld->incoming_message_length = length; + } else { + finish_message(chand, calld); + } + break; + case GRPC_OP_SLICE: + if (GPR_SLICE_LENGTH(stream_op->data.slice) == 0) { + gpr_slice_unref(stream_op->data.slice); + break; + } + /* we have to be reading a message to know what to do here */ + if (!calld->reading_message) { + recv_error(chand, calld, __LINE__, + "Received payload data while not reading a message"); + return; + } + /* append the slice to the incoming buffer */ + gpr_slice_buffer_add(&calld->incoming_message, stream_op->data.slice); + if (calld->incoming_message.length > calld->incoming_message_length) { + /* if we got too many bytes, complain */ + recv_error(chand, calld, __LINE__, + "Receiving message overflow; read %d bytes, expected %d", + calld->incoming_message.length, + calld->incoming_message_length); + return; + } else if (calld->incoming_message.length == + calld->incoming_message_length) { + finish_message(chand, calld); + } + } + } + /* if the stream closed, then call up the stack to let it know */ + if (!calld->got_read_close && (final_state == GRPC_STREAM_RECV_CLOSED || + final_state == GRPC_STREAM_CLOSED)) { + calld->got_read_close = 1; + if (calld->reading_message) { + recv_error(chand, calld, __LINE__, + "Last message truncated; read %d bytes, expected %d", + calld->incoming_message.length, + calld->incoming_message_length); + return; + } + call_op.type = GRPC_RECV_HALF_CLOSE; + call_op.dir = GRPC_CALL_UP; + call_op.flags = 0; + call_op.done_cb = do_nothing; + call_op.user_data = NULL; + grpc_call_next_op(elem, &call_op); + } + if (final_state == GRPC_STREAM_CLOSED) { + call_op.type = GRPC_RECV_FINISH; + call_op.dir = GRPC_CALL_UP; + call_op.flags = 0; + call_op.done_cb = do_nothing; + call_op.user_data = NULL; + grpc_call_next_op(elem, &call_op); + } +} + +static void transport_closed(void *user_data, grpc_transport *transport) { + /* transport was closed ==> call up and handle it */ + grpc_channel_element *elem = user_data; + channel_data *chand = elem->channel_data; + grpc_channel_op op; + + GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); + GPR_ASSERT(chand->transport == transport); + + op.type = GRPC_TRANSPORT_CLOSED; + op.dir = GRPC_CALL_UP; + channel_op(elem, &op); +} + +const grpc_transport_callbacks connected_channel_transport_callbacks = { + alloc_recv_buffer, accept_stream, recv_batch, transport_closed, +}; + +grpc_transport_setup_result grpc_connected_channel_bind_transport( + grpc_channel_stack *channel_stack, grpc_transport *transport) { + /* Assumes that the connected channel filter is always the last filter + in a channel stack */ + grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack); + channel_data *cd = (channel_data *)elem->channel_data; + grpc_transport_setup_result ret; + GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); + GPR_ASSERT(cd->transport == NULL); + cd->transport = transport; + + /* HACK(ctiller): increase call stack size for the channel to make space + for channel data. We need a cleaner (but performant) way to do this, + and I'm not sure what that is yet. + This is only "safe" because call stacks place no additional data after + the last call element, and the last call element MUST be the connected + channel. */ + channel_stack->call_stack_size += grpc_transport_stream_size(transport); + + ret.user_data = elem; + ret.callbacks = &connected_channel_transport_callbacks; + return ret; +} |