diff options
Diffstat (limited to 'src/core/channel/connected_channel.c')
-rw-r--r-- | src/core/channel/connected_channel.c | 107 |
1 files changed, 20 insertions, 87 deletions
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 14dda88698..34d07de519 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -61,42 +61,27 @@ typedef struct connected_channel_call_data { void *unused; } call_data; /* Intercept a call operation and either push it directly up or translate it into transport stream operations */ -static void con_start_transport_op(grpc_call_element *elem, - grpc_transport_op *op) { +static void con_start_transport_stream_op(grpc_call_element *elem, + grpc_transport_stream_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - grpc_transport_perform_op(chand->transport, - TRANSPORT_STREAM_FROM_CALL_DATA(calld), op); + grpc_transport_perform_stream_op(chand->transport, + TRANSPORT_STREAM_FROM_CALL_DATA(calld), op); } -/* Currently we assume all channel operations should just be pushed up. */ -static void channel_op(grpc_channel_element *elem, - grpc_channel_element *from_elem, grpc_channel_op *op) { +static void con_start_transport_op(grpc_channel_element *elem, + grpc_transport_op *op) { channel_data *chand = elem->channel_data; - GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - - switch (op->type) { - case GRPC_CHANNEL_GOAWAY: - grpc_transport_goaway(chand->transport, op->data.goaway.status, - op->data.goaway.message); - break; - case GRPC_CHANNEL_DISCONNECT: - grpc_transport_close(chand->transport); - break; - default: - GPR_ASSERT(op->dir == GRPC_CALL_UP); - grpc_channel_next_op(elem, op); - break; - } + grpc_transport_perform_op(chand->transport, op); } /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, const void *server_transport_data, - grpc_transport_op *initial_op) { + grpc_transport_stream_op *initial_op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; int r; @@ -118,11 +103,10 @@ static void destroy_call_elem(grpc_call_element *elem) { } /* Constructor for channel_data */ -static void init_channel_elem(grpc_channel_element *elem, +static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { channel_data *cd = (channel_data *)elem->channel_data; - GPR_ASSERT(!is_first); GPR_ASSERT(is_last); GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); cd->transport = NULL; @@ -136,70 +120,23 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_connected_channel_filter = { - con_start_transport_op, channel_op, sizeof(call_data), init_call_elem, - destroy_call_elem, sizeof(channel_data), init_channel_elem, - destroy_channel_elem, "connected", + con_start_transport_stream_op, + con_start_transport_op, + sizeof(call_data), + init_call_elem, + destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + "connected", }; -/* Transport callback to accept a new stream... calls up to handle it */ -static void accept_stream(void *user_data, grpc_transport *transport, - const void *transport_server_data) { - grpc_channel_element *elem = user_data; - channel_data *chand = elem->channel_data; - grpc_channel_op op; - - GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - GPR_ASSERT(chand->transport == transport); - - op.type = GRPC_ACCEPT_CALL; - op.dir = GRPC_CALL_UP; - op.data.accept_call.transport = transport; - op.data.accept_call.transport_server_data = transport_server_data; - channel_op(elem, NULL, &op); -} - -static void transport_goaway(void *user_data, grpc_transport *transport, - grpc_status_code status, gpr_slice debug) { - /* transport got goaway ==> call up and handle it */ - grpc_channel_element *elem = user_data; - channel_data *chand = elem->channel_data; - grpc_channel_op op; - - GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - GPR_ASSERT(chand->transport == transport); - - op.type = GRPC_TRANSPORT_GOAWAY; - op.dir = GRPC_CALL_UP; - op.data.goaway.status = status; - op.data.goaway.message = debug; - channel_op(elem, NULL, &op); -} - -static void transport_closed(void *user_data, grpc_transport *transport) { - /* transport was closed ==> call up and handle it */ - grpc_channel_element *elem = user_data; - channel_data *chand = elem->channel_data; - grpc_channel_op op; - - GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - GPR_ASSERT(chand->transport == transport); - - op.type = GRPC_TRANSPORT_CLOSED; - op.dir = GRPC_CALL_UP; - channel_op(elem, NULL, &op); -} - -const grpc_transport_callbacks connected_channel_transport_callbacks = { - accept_stream, transport_goaway, transport_closed, -}; - -grpc_transport_setup_result grpc_connected_channel_bind_transport( - grpc_channel_stack *channel_stack, grpc_transport *transport) { +void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack, + grpc_transport *transport) { /* Assumes that the connected channel filter is always the last filter in a channel stack */ grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack); channel_data *cd = (channel_data *)elem->channel_data; - grpc_transport_setup_result ret; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); GPR_ASSERT(cd->transport == NULL); cd->transport = transport; @@ -211,8 +148,4 @@ grpc_transport_setup_result grpc_connected_channel_bind_transport( the last call element, and the last call element MUST be the connected channel. */ channel_stack->call_stack_size += grpc_transport_stream_size(transport); - - ret.user_data = elem; - ret.callbacks = &connected_channel_transport_callbacks; - return ret; } |