aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel/connected_channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/channel/connected_channel.c')
-rw-r--r--src/core/channel/connected_channel.c107
1 files changed, 20 insertions, 87 deletions
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index 14dda88698..34d07de519 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -61,42 +61,27 @@ typedef struct connected_channel_call_data { void *unused; } call_data;
/* Intercept a call operation and either push it directly up or translate it
into transport stream operations */
-static void con_start_transport_op(grpc_call_element *elem,
- grpc_transport_op *op) {
+static void con_start_transport_stream_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- grpc_transport_perform_op(chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld), op);
+ grpc_transport_perform_stream_op(chand->transport,
+ TRANSPORT_STREAM_FROM_CALL_DATA(calld), op);
}
-/* Currently we assume all channel operations should just be pushed up. */
-static void channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem, grpc_channel_op *op) {
+static void con_start_transport_op(grpc_channel_element *elem,
+ grpc_transport_op *op) {
channel_data *chand = elem->channel_data;
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
-
- switch (op->type) {
- case GRPC_CHANNEL_GOAWAY:
- grpc_transport_goaway(chand->transport, op->data.goaway.status,
- op->data.goaway.message);
- break;
- case GRPC_CHANNEL_DISCONNECT:
- grpc_transport_close(chand->transport);
- break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_UP);
- grpc_channel_next_op(elem, op);
- break;
- }
+ grpc_transport_perform_op(chand->transport, op);
}
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
- grpc_transport_op *initial_op) {
+ grpc_transport_stream_op *initial_op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
int r;
@@ -118,11 +103,10 @@ static void destroy_call_elem(grpc_call_element *elem) {
}
/* Constructor for channel_data */
-static void init_channel_elem(grpc_channel_element *elem,
+static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
channel_data *cd = (channel_data *)elem->channel_data;
- GPR_ASSERT(!is_first);
GPR_ASSERT(is_last);
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
cd->transport = NULL;
@@ -136,70 +120,23 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_connected_channel_filter = {
- con_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
- destroy_call_elem, sizeof(channel_data), init_channel_elem,
- destroy_channel_elem, "connected",
+ con_start_transport_stream_op,
+ con_start_transport_op,
+ sizeof(call_data),
+ init_call_elem,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ "connected",
};
-/* Transport callback to accept a new stream... calls up to handle it */
-static void accept_stream(void *user_data, grpc_transport *transport,
- const void *transport_server_data) {
- grpc_channel_element *elem = user_data;
- channel_data *chand = elem->channel_data;
- grpc_channel_op op;
-
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- GPR_ASSERT(chand->transport == transport);
-
- op.type = GRPC_ACCEPT_CALL;
- op.dir = GRPC_CALL_UP;
- op.data.accept_call.transport = transport;
- op.data.accept_call.transport_server_data = transport_server_data;
- channel_op(elem, NULL, &op);
-}
-
-static void transport_goaway(void *user_data, grpc_transport *transport,
- grpc_status_code status, gpr_slice debug) {
- /* transport got goaway ==> call up and handle it */
- grpc_channel_element *elem = user_data;
- channel_data *chand = elem->channel_data;
- grpc_channel_op op;
-
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- GPR_ASSERT(chand->transport == transport);
-
- op.type = GRPC_TRANSPORT_GOAWAY;
- op.dir = GRPC_CALL_UP;
- op.data.goaway.status = status;
- op.data.goaway.message = debug;
- channel_op(elem, NULL, &op);
-}
-
-static void transport_closed(void *user_data, grpc_transport *transport) {
- /* transport was closed ==> call up and handle it */
- grpc_channel_element *elem = user_data;
- channel_data *chand = elem->channel_data;
- grpc_channel_op op;
-
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- GPR_ASSERT(chand->transport == transport);
-
- op.type = GRPC_TRANSPORT_CLOSED;
- op.dir = GRPC_CALL_UP;
- channel_op(elem, NULL, &op);
-}
-
-const grpc_transport_callbacks connected_channel_transport_callbacks = {
- accept_stream, transport_goaway, transport_closed,
-};
-
-grpc_transport_setup_result grpc_connected_channel_bind_transport(
- grpc_channel_stack *channel_stack, grpc_transport *transport) {
+void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack,
+ grpc_transport *transport) {
/* Assumes that the connected channel filter is always the last filter
in a channel stack */
grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
channel_data *cd = (channel_data *)elem->channel_data;
- grpc_transport_setup_result ret;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
GPR_ASSERT(cd->transport == NULL);
cd->transport = transport;
@@ -211,8 +148,4 @@ grpc_transport_setup_result grpc_connected_channel_bind_transport(
the last call element, and the last call element MUST be the connected
channel. */
channel_stack->call_stack_size += grpc_transport_stream_size(transport);
-
- ret.user_data = elem;
- ret.callbacks = &connected_channel_transport_callbacks;
- return ret;
}