From 0ee7574732a06e8cace4e099a678f4bd5dbff679 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Fri, 13 Oct 2017 16:07:13 -0700 Subject: Removing instances of exec_ctx being passed around in functions in src/core. exec_ctx is now a thread_local pointer of type ExecCtx instead of grpc_exec_ctx which is initialized whenever ExecCtx is instantiated. ExecCtx also keeps track of the previous exec_ctx so that nesting of exec_ctx is allowed. This means that there is only one exec_ctx being used at any time. Also, grpc_exec_ctx_finish is called in the destructor of the object, and the previous exec_ctx is restored to avoid breaking current functionality. The code still explicitly calls grpc_exec_ctx_finish because removing all such instances causes the code to break. --- src/core/lib/channel/connected_channel.cc | 57 ++++++++++++------------------- 1 file changed, 22 insertions(+), 35 deletions(-) (limited to 'src/core/lib/channel/connected_channel.cc') diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc index 4f37908958..460e8b4c65 100644 --- a/src/core/lib/channel/connected_channel.cc +++ b/src/core/lib/channel/connected_channel.cc @@ -51,17 +51,14 @@ typedef struct connected_channel_call_data { callback_state recv_message_ready; } call_data; -static void run_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void run_in_call_combiner(void *arg, grpc_error *error) { callback_state *state = (callback_state *)arg; - GRPC_CALL_COMBINER_START(exec_ctx, state->call_combiner, - state->original_closure, GRPC_ERROR_REF(error), - state->reason); + GRPC_CALL_COMBINER_START(state->call_combiner, state->original_closure, + GRPC_ERROR_REF(error), state->reason); } -static void run_cancel_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - run_in_call_combiner(exec_ctx, arg, error); +static void run_cancel_in_call_combiner(void *arg, grpc_error *error) { + run_in_call_combiner(arg, error); gpr_free(arg); } @@ -98,8 +95,7 @@ static callback_state *get_state_for_batch( /* Intercept a call operation and either push it directly up or translate it into transport stream operations */ static void con_start_transport_stream_op_batch( - grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op_batch *batch) { + grpc_call_element *elem, grpc_transport_stream_op_batch *batch) { call_data *calld = (call_data *)elem->call_data; channel_data *chand = (channel_data *)elem->channel_data; if (batch->recv_initial_metadata) { @@ -126,58 +122,52 @@ static void con_start_transport_stream_op_batch( callback_state *state = get_state_for_batch(calld, batch); intercept_callback(calld, state, false, "on_complete", &batch->on_complete); } - grpc_transport_perform_stream_op(exec_ctx, chand->transport, - TRANSPORT_STREAM_FROM_CALL_DATA(calld), - batch); - GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner, - "passed batch to transport"); + grpc_transport_perform_stream_op( + chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), batch); + GRPC_CALL_COMBINER_STOP(calld->call_combiner, "passed batch to transport"); } -static void con_start_transport_op(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, +static void con_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op) { channel_data *chand = (channel_data *)elem->channel_data; - grpc_transport_perform_op(exec_ctx, chand->transport, op); + grpc_transport_perform_op(chand->transport, op); } /* Constructor for call_data */ -static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, +static grpc_error *init_call_elem(grpc_call_element *elem, const grpc_call_element_args *args) { call_data *calld = (call_data *)elem->call_data; channel_data *chand = (channel_data *)elem->channel_data; calld->call_combiner = args->call_combiner; int r = grpc_transport_init_stream( - exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), + chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), &args->call_stack->refcount, args->server_transport_data, args->arena); return r == 0 ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE_FROM_STATIC_STRING( "transport stream initialization failed"); } -static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, +static void set_pollset_or_pollset_set(grpc_call_element *elem, grpc_polling_entity *pollent) { call_data *calld = (call_data *)elem->call_data; channel_data *chand = (channel_data *)elem->channel_data; - grpc_transport_set_pops(exec_ctx, chand->transport, + grpc_transport_set_pops(chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), pollent); } /* Destructor for call_data */ -static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, +static void destroy_call_elem(grpc_call_element *elem, const grpc_call_final_info *final_info, grpc_closure *then_schedule_closure) { call_data *calld = (call_data *)elem->call_data; channel_data *chand = (channel_data *)elem->channel_data; - grpc_transport_destroy_stream(exec_ctx, chand->transport, + grpc_transport_destroy_stream(chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), then_schedule_closure); } /* Constructor for channel_data */ -static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, +static grpc_error *init_channel_elem(grpc_channel_element *elem, grpc_channel_element_args *args) { channel_data *cd = (channel_data *)elem->channel_data; GPR_ASSERT(args->is_last); @@ -186,17 +176,15 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, } /* Destructor for channel_data */ -static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem) { +static void destroy_channel_elem(grpc_channel_element *elem) { channel_data *cd = (channel_data *)elem->channel_data; if (cd->transport) { - grpc_transport_destroy(exec_ctx, cd->transport); + grpc_transport_destroy(cd->transport); } } /* No-op. */ -static void con_get_channel_info(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, +static void con_get_channel_info(grpc_channel_element *elem, const grpc_channel_info *channel_info) {} const grpc_channel_filter grpc_connected_filter = { @@ -230,8 +218,7 @@ static void bind_transport(grpc_channel_stack *channel_stack, grpc_transport_stream_size((grpc_transport *)t); } -bool grpc_add_connected_filter(grpc_exec_ctx *exec_ctx, - grpc_channel_stack_builder *builder, +bool grpc_add_connected_filter(grpc_channel_stack_builder *builder, void *arg_must_be_null) { GPR_ASSERT(arg_must_be_null == NULL); grpc_transport *t = grpc_channel_stack_builder_get_transport(builder); -- cgit v1.2.3