diff options
Diffstat (limited to 'src/core/lib/channel/connected_channel.cc')
-rw-r--r-- | src/core/lib/channel/connected_channel.cc | 57 |
1 files changed, 22 insertions, 35 deletions
diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc index af2f88ab2e..9d07cfff4e 100644 --- a/src/core/lib/channel/connected_channel.cc +++ b/src/core/lib/channel/connected_channel.cc @@ -51,17 +51,14 @@ typedef struct connected_channel_call_data { callback_state recv_message_ready; } call_data; -static void run_in_call_combiner(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void run_in_call_combiner(void* arg, grpc_error* error) { callback_state* state = (callback_state*)arg; - GRPC_CALL_COMBINER_START(exec_ctx, state->call_combiner, - state->original_closure, GRPC_ERROR_REF(error), - state->reason); + GRPC_CALL_COMBINER_START(state->call_combiner, state->original_closure, + GRPC_ERROR_REF(error), state->reason); } -static void run_cancel_in_call_combiner(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { - run_in_call_combiner(exec_ctx, arg, error); +static void run_cancel_in_call_combiner(void* arg, grpc_error* error) { + run_in_call_combiner(arg, error); gpr_free(arg); } @@ -98,8 +95,7 @@ static callback_state* get_state_for_batch( /* Intercept a call operation and either push it directly up or translate it into transport stream operations */ static void con_start_transport_stream_op_batch( - grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_transport_stream_op_batch* batch) { + grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; if (batch->recv_initial_metadata) { @@ -126,58 +122,52 @@ static void con_start_transport_stream_op_batch( callback_state* state = get_state_for_batch(calld, batch); intercept_callback(calld, state, false, "on_complete", &batch->on_complete); } - grpc_transport_perform_stream_op(exec_ctx, chand->transport, - TRANSPORT_STREAM_FROM_CALL_DATA(calld), - batch); - GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner, - "passed batch to transport"); + grpc_transport_perform_stream_op( + chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), batch); + GRPC_CALL_COMBINER_STOP(calld->call_combiner, "passed batch to transport"); } -static void con_start_transport_op(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem, +static void con_start_transport_op(grpc_channel_element* elem, grpc_transport_op* op) { channel_data* chand = (channel_data*)elem->channel_data; - grpc_transport_perform_op(exec_ctx, chand->transport, op); + grpc_transport_perform_op(chand->transport, op); } /* Constructor for call_data */ -static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, +static grpc_error* init_call_elem(grpc_call_element* elem, const grpc_call_element_args* args) { call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; calld->call_combiner = args->call_combiner; int r = grpc_transport_init_stream( - exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), + chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), &args->call_stack->refcount, args->server_transport_data, args->arena); return r == 0 ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE_FROM_STATIC_STRING( "transport stream initialization failed"); } -static void set_pollset_or_pollset_set(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, +static void set_pollset_or_pollset_set(grpc_call_element* elem, grpc_polling_entity* pollent) { call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; - grpc_transport_set_pops(exec_ctx, chand->transport, + grpc_transport_set_pops(chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), pollent); } /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, +static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* then_schedule_closure) { call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; - grpc_transport_destroy_stream(exec_ctx, chand->transport, + grpc_transport_destroy_stream(chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), then_schedule_closure); } /* Constructor for channel_data */ -static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem, +static grpc_error* init_channel_elem(grpc_channel_element* elem, grpc_channel_element_args* args) { channel_data* cd = (channel_data*)elem->channel_data; GPR_ASSERT(args->is_last); @@ -186,17 +176,15 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, } /* Destructor for channel_data */ -static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem) { +static void destroy_channel_elem(grpc_channel_element* elem) { channel_data* cd = (channel_data*)elem->channel_data; if (cd->transport) { - grpc_transport_destroy(exec_ctx, cd->transport); + grpc_transport_destroy(cd->transport); } } /* No-op. */ -static void con_get_channel_info(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem, +static void con_get_channel_info(grpc_channel_element* elem, const grpc_channel_info* channel_info) {} const grpc_channel_filter grpc_connected_filter = { @@ -230,8 +218,7 @@ static void bind_transport(grpc_channel_stack* channel_stack, grpc_transport_stream_size((grpc_transport*)t); } -bool grpc_add_connected_filter(grpc_exec_ctx* exec_ctx, - grpc_channel_stack_builder* builder, +bool grpc_add_connected_filter(grpc_channel_stack_builder* builder, void* arg_must_be_null) { GPR_ASSERT(arg_must_be_null == nullptr); grpc_transport* t = grpc_channel_stack_builder_get_transport(builder); |