diff options
Diffstat (limited to 'src/core/lib/channel/connected_channel.c')
-rw-r--r-- | src/core/lib/channel/connected_channel.c | 92 |
1 files changed, 11 insertions, 81 deletions
diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c index 8285226fc4..af06ca802e 100644 --- a/src/core/lib/channel/connected_channel.c +++ b/src/core/lib/channel/connected_channel.c @@ -36,57 +36,7 @@ typedef struct connected_channel_channel_data { grpc_transport *transport; } channel_data; -typedef struct { - grpc_closure closure; - grpc_closure *original_closure; - grpc_call_combiner *call_combiner; - const char *reason; -} callback_state; - -typedef struct connected_channel_call_data { - grpc_call_combiner *call_combiner; - // Closures used for returning results on the call combiner. - callback_state on_complete[6]; // Max number of pending batches. - callback_state recv_initial_metadata_ready; - callback_state recv_message_ready; -} call_data; - -static void run_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - callback_state *state = (callback_state *)arg; - GRPC_CALL_COMBINER_START(exec_ctx, state->call_combiner, - state->original_closure, GRPC_ERROR_REF(error), - state->reason); -} - -static void run_cancel_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - run_in_call_combiner(exec_ctx, arg, error); - gpr_free(arg); -} - -static void intercept_callback(call_data *calld, callback_state *state, - bool free_when_done, const char *reason, - grpc_closure **original_closure) { - state->original_closure = *original_closure; - state->call_combiner = calld->call_combiner; - state->reason = reason; - *original_closure = GRPC_CLOSURE_INIT( - &state->closure, - free_when_done ? run_cancel_in_call_combiner : run_in_call_combiner, - state, grpc_schedule_on_exec_ctx); -} - -static callback_state *get_state_for_batch( - call_data *calld, grpc_transport_stream_op_batch *batch) { - if (batch->send_initial_metadata) return &calld->on_complete[0]; - if (batch->send_message) return &calld->on_complete[1]; - if (batch->send_trailing_metadata) return &calld->on_complete[2]; - if (batch->recv_initial_metadata) return &calld->on_complete[3]; - if (batch->recv_message) return &calld->on_complete[4]; - if (batch->recv_trailing_metadata) return &calld->on_complete[5]; - GPR_UNREACHABLE_CODE(return NULL); -} +typedef struct connected_channel_call_data { void *unused; } call_data; /* We perform a small hack to locate transport data alongside the connected channel data in call allocations, to allow everything to be pulled in minimal @@ -99,38 +49,13 @@ static callback_state *get_state_for_batch( into transport stream operations */ static void con_start_transport_stream_op_batch( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op_batch *batch) { + grpc_transport_stream_op_batch *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - if (batch->recv_initial_metadata) { - callback_state *state = &calld->recv_initial_metadata_ready; - intercept_callback( - calld, state, false, "recv_initial_metadata_ready", - &batch->payload->recv_initial_metadata.recv_initial_metadata_ready); - } - if (batch->recv_message) { - callback_state *state = &calld->recv_message_ready; - intercept_callback(calld, state, false, "recv_message_ready", - &batch->payload->recv_message.recv_message_ready); - } - if (batch->cancel_stream) { - // There can be more than one cancellation batch in flight at any - // given time, so we can't just pick out a fixed index into - // calld->on_complete like we can for the other ops. However, - // cancellation isn't in the fast path, so we just allocate a new - // closure for each one. - callback_state *state = (callback_state *)gpr_malloc(sizeof(*state)); - intercept_callback(calld, state, true, "on_complete (cancel_stream)", - &batch->on_complete); - } else { - callback_state *state = get_state_for_batch(calld, batch); - intercept_callback(calld, state, false, "on_complete", &batch->on_complete); - } + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + grpc_transport_perform_stream_op(exec_ctx, chand->transport, - TRANSPORT_STREAM_FROM_CALL_DATA(calld), - batch); - GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner, - "passed batch to transport"); + TRANSPORT_STREAM_FROM_CALL_DATA(calld), op); } static void con_start_transport_op(grpc_exec_ctx *exec_ctx, @@ -146,7 +71,6 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, const grpc_call_element_args *args) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - calld->call_combiner = args->call_combiner; int r = grpc_transport_init_stream( exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), &args->call_stack->refcount, args->server_transport_data, args->arena); @@ -194,6 +118,11 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, } } +static char *con_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { + channel_data *chand = elem->channel_data; + return grpc_transport_get_peer(exec_ctx, chand->transport); +} + /* No-op. */ static void con_get_channel_info(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, @@ -209,6 +138,7 @@ const grpc_channel_filter grpc_connected_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + con_get_peer, con_get_channel_info, "connected", }; |