aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel/connected_channel.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-04-21 16:02:05 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-04-21 16:02:05 -0700
commit83f88d90b9c61c94994ed00d03a6fa469359d559 (patch)
treef1320aa3cda8b54961de246437c1ea980459750c /src/core/channel/connected_channel.c
parent65582323ad6b934b4cbb3a4632bb4c985acc1bdc (diff)
stuff
Diffstat (limited to 'src/core/channel/connected_channel.c')
-rw-r--r--src/core/channel/connected_channel.c261
1 files changed, 9 insertions, 252 deletions
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index 711274bfe1..d0b834a10a 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -50,19 +50,10 @@
typedef struct connected_channel_channel_data {
grpc_transport *transport;
- gpr_uint32 max_message_length;
} channel_data;
typedef struct connected_channel_call_data {
- grpc_call_element *elem;
- grpc_stream_op_buffer outgoing_sopb;
-
- gpr_uint32 max_message_length;
- gpr_uint32 incoming_message_length;
- gpr_uint8 reading_message;
- gpr_uint8 got_read_close;
- gpr_slice_buffer incoming_message;
- gpr_uint32 outgoing_buffer_length_estimate;
+ void *unused;
} call_data;
/* We perform a small hack to locate transport data alongside the connected
@@ -72,6 +63,7 @@ typedef struct connected_channel_call_data {
#define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) \
(((call_data *)(transport_stream)) - 1)
+#if 0
/* Copy the contents of a byte buffer into stream ops */
static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
grpc_stream_op_buffer *sopb) {
@@ -87,76 +79,17 @@ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
break;
}
}
-
-/* Flush queued stream operations onto the transport */
-static void end_bufferable_op(grpc_call_op *op, channel_data *chand,
- call_data *calld, int is_last) {
- size_t nops;
-
- if (op->flags & GRPC_WRITE_BUFFER_HINT) {
- if (calld->outgoing_buffer_length_estimate < MAX_BUFFER_LENGTH) {
- op->done_cb(op->user_data, GRPC_OP_OK);
- return;
- }
- }
-
- calld->outgoing_buffer_length_estimate = 0;
- grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, op->user_data);
-
- nops = calld->outgoing_sopb.nops;
- calld->outgoing_sopb.nops = 0;
- grpc_transport_send_batch(chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- calld->outgoing_sopb.ops, nops, is_last);
-}
+#endif
/* Intercept a call operation and either push it directly up or translate it
into transport stream operations */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void con_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- if (op->bind_pollset) {
- grpc_transport_add_to_pollset(chand->transport, op->bind_pollset);
- }
-
- switch (op->type) {
- case GRPC_SEND_METADATA:
- grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata);
- end_bufferable_op(op, chand, calld, 0);
- break;
- case GRPC_SEND_MESSAGE:
- grpc_sopb_add_begin_message(&calld->outgoing_sopb,
- grpc_byte_buffer_length(op->data.message),
- op->flags);
- /* fall-through */
- case GRPC_SEND_PREFORMATTED_MESSAGE:
- copy_byte_buffer_to_stream_ops(op->data.message, &calld->outgoing_sopb);
- calld->outgoing_buffer_length_estimate +=
- (5 + grpc_byte_buffer_length(op->data.message));
- end_bufferable_op(op, chand, calld, 0);
- break;
- case GRPC_SEND_FINISH:
- end_bufferable_op(op, chand, calld, 1);
- break;
- case GRPC_REQUEST_DATA:
- /* re-arm window updates if they were disarmed by finish_message */
- grpc_transport_set_allow_window_updates(
- chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 1);
- break;
- case GRPC_CANCEL_OP:
- grpc_transport_abort_stream(chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- GRPC_STATUS_CANCELLED);
- break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_UP);
- grpc_call_next_op(elem, op);
- break;
- }
+ grpc_transport_perform_op(chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), op);
}
/* Currently we assume all channel operations should just be pushed up. */
@@ -188,14 +121,6 @@ static void init_call_elem(grpc_call_element *elem,
int r;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- calld->elem = elem;
- grpc_sopb_init(&calld->outgoing_sopb);
-
- calld->reading_message = 0;
- calld->got_read_close = 0;
- calld->outgoing_buffer_length_estimate = 0;
- calld->max_message_length = chand->max_message_length;
- gpr_slice_buffer_init(&calld->incoming_message);
r = grpc_transport_init_stream(chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld),
server_transport_data);
@@ -207,8 +132,6 @@ static void destroy_call_elem(grpc_call_element *elem) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- grpc_sopb_destroy(&calld->outgoing_sopb);
- gpr_slice_buffer_destroy(&calld->incoming_message);
grpc_transport_destroy_stream(chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld));
}
@@ -218,12 +141,12 @@ static void init_channel_elem(grpc_channel_element *elem,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
channel_data *cd = (channel_data *)elem->channel_data;
- size_t i;
GPR_ASSERT(!is_first);
GPR_ASSERT(is_last);
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
cd->transport = NULL;
+#if 0
cd->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH;
if (args) {
for (i = 0; i < args->num_args; i++) {
@@ -240,6 +163,7 @@ static void init_channel_elem(grpc_channel_element *elem,
}
}
}
+#endif
}
/* Destructor for channel_data */
@@ -250,15 +174,10 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_connected_channel_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ con_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem, "connected",
};
-static gpr_slice alloc_recv_buffer(void *user_data, grpc_transport *transport,
- grpc_stream *stream, size_t size_hint) {
- return gpr_slice_malloc(size_hint);
-}
-
/* Transport callback to accept a new stream... calls up to handle it */
static void accept_stream(void *user_data, grpc_transport *transport,
const void *transport_server_data) {
@@ -276,168 +195,6 @@ static void accept_stream(void *user_data, grpc_transport *transport,
channel_op(elem, NULL, &op);
}
-static void recv_error(channel_data *chand, call_data *calld, int line,
- const char *message) {
- gpr_log_message(__FILE__, line, GPR_LOG_SEVERITY_ERROR, message);
-
- if (chand->transport) {
- grpc_transport_abort_stream(chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- GRPC_STATUS_INVALID_ARGUMENT);
- }
-}
-
-static void do_nothing(void *calldata, grpc_op_error error) {}
-
-static void finish_message(channel_data *chand, call_data *calld) {
- grpc_call_element *elem = calld->elem;
- grpc_call_op call_op;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- /* if we got all the bytes for this message, call up the stack */
- call_op.type = GRPC_RECV_MESSAGE;
- call_op.done_cb = do_nothing;
- /* TODO(ctiller): this could be a lot faster if coded directly */
- call_op.data.message = grpc_byte_buffer_create(calld->incoming_message.slices,
- calld->incoming_message.count);
- gpr_slice_buffer_reset_and_unref(&calld->incoming_message);
-
- /* disable window updates until we get a request more from above */
- grpc_transport_set_allow_window_updates(
- chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 0);
-
- GPR_ASSERT(calld->incoming_message.count == 0);
- calld->reading_message = 0;
- grpc_call_next_op(elem, &call_op);
-}
-
-static void got_metadata(grpc_call_element *elem,
- grpc_metadata_batch metadata) {
- grpc_call_op op;
- op.type = GRPC_RECV_METADATA;
- op.dir = GRPC_CALL_UP;
- op.flags = 0;
- op.data.metadata = metadata;
- op.done_cb = do_nothing;
- op.user_data = NULL;
-
- grpc_call_next_op(elem, &op);
-}
-
-/* Handle incoming stream ops from the transport, translating them into
- call_ops to pass up the call stack */
-static void recv_batch(void *user_data, grpc_transport *transport,
- grpc_stream *stream, grpc_stream_op *ops,
- size_t ops_count, grpc_stream_state final_state) {
- call_data *calld = CALL_DATA_FROM_TRANSPORT_STREAM(stream);
- grpc_call_element *elem = calld->elem;
- channel_data *chand = elem->channel_data;
- grpc_stream_op *stream_op;
- grpc_call_op call_op;
- size_t i;
- gpr_uint32 length;
-
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
-
- for (i = 0; i < ops_count; i++) {
- stream_op = ops + i;
- switch (stream_op->type) {
- case GRPC_OP_FLOW_CTL_CB:
- stream_op->data.flow_ctl_cb.cb(stream_op->data.flow_ctl_cb.arg, 1);
- break;
- case GRPC_NO_OP:
- break;
- case GRPC_OP_METADATA:
- got_metadata(elem, stream_op->data.metadata);
- break;
- case GRPC_OP_BEGIN_MESSAGE:
- /* can't begin a message when we're still reading a message */
- if (calld->reading_message) {
- char *message = NULL;
- gpr_asprintf(&message,
- "Message terminated early; read %d bytes, expected %d",
- (int)calld->incoming_message.length,
- (int)calld->incoming_message_length);
- recv_error(chand, calld, __LINE__, message);
- gpr_free(message);
- return;
- }
- /* stash away parameters, and prepare for incoming slices */
- length = stream_op->data.begin_message.length;
- if (length > calld->max_message_length) {
- char *message = NULL;
- gpr_asprintf(
- &message,
- "Maximum message length of %d exceeded by a message of length %d",
- calld->max_message_length, length);
- recv_error(chand, calld, __LINE__, message);
- gpr_free(message);
- } else if (length > 0) {
- calld->reading_message = 1;
- calld->incoming_message_length = length;
- } else {
- finish_message(chand, calld);
- }
- break;
- case GRPC_OP_SLICE:
- if (GPR_SLICE_LENGTH(stream_op->data.slice) == 0) {
- gpr_slice_unref(stream_op->data.slice);
- break;
- }
- /* we have to be reading a message to know what to do here */
- if (!calld->reading_message) {
- recv_error(chand, calld, __LINE__,
- "Received payload data while not reading a message");
- return;
- }
- /* append the slice to the incoming buffer */
- gpr_slice_buffer_add(&calld->incoming_message, stream_op->data.slice);
- if (calld->incoming_message.length > calld->incoming_message_length) {
- /* if we got too many bytes, complain */
- char *message = NULL;
- gpr_asprintf(&message,
- "Receiving message overflow; read %d bytes, expected %d",
- (int)calld->incoming_message.length,
- (int)calld->incoming_message_length);
- recv_error(chand, calld, __LINE__, message);
- gpr_free(message);
- return;
- } else if (calld->incoming_message.length ==
- calld->incoming_message_length) {
- finish_message(chand, calld);
- }
- }
- }
- /* if the stream closed, then call up the stack to let it know */
- if (!calld->got_read_close && (final_state == GRPC_STREAM_RECV_CLOSED ||
- final_state == GRPC_STREAM_CLOSED)) {
- calld->got_read_close = 1;
- if (calld->reading_message) {
- char *message = NULL;
- gpr_asprintf(&message,
- "Last message truncated; read %d bytes, expected %d",
- (int)calld->incoming_message.length,
- (int)calld->incoming_message_length);
- recv_error(chand, calld, __LINE__, message);
- gpr_free(message);
- }
- call_op.type = GRPC_RECV_HALF_CLOSE;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- call_op.done_cb = do_nothing;
- call_op.user_data = NULL;
- grpc_call_next_op(elem, &call_op);
- }
- if (final_state == GRPC_STREAM_CLOSED) {
- call_op.type = GRPC_RECV_FINISH;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- call_op.done_cb = do_nothing;
- call_op.user_data = NULL;
- grpc_call_next_op(elem, &call_op);
- }
-}
-
static void transport_goaway(void *user_data, grpc_transport *transport,
grpc_status_code status, gpr_slice debug) {
/* transport got goaway ==> call up and handle it */
@@ -470,7 +227,7 @@ static void transport_closed(void *user_data, grpc_transport *transport) {
}
const grpc_transport_callbacks connected_channel_transport_callbacks = {
- alloc_recv_buffer, accept_stream, recv_batch,
+ accept_stream,
transport_goaway, transport_closed,
};