diff options
Diffstat (limited to 'src/core/ext/filters/deadline/deadline_filter.cc')
-rw-r--r-- | src/core/ext/filters/deadline/deadline_filter.cc | 112 |
1 files changed, 47 insertions, 65 deletions
diff --git a/src/core/ext/filters/deadline/deadline_filter.cc b/src/core/ext/filters/deadline/deadline_filter.cc index 5db7584a59..c430f3d2d4 100644 --- a/src/core/ext/filters/deadline/deadline_filter.cc +++ b/src/core/ext/filters/deadline/deadline_filter.cc @@ -36,18 +36,16 @@ // The on_complete callback used when sending a cancel_error batch down the // filter stack. Yields the call combiner when the batch returns. -static void yield_call_combiner(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* ignored) { +static void yield_call_combiner(void* arg, grpc_error* ignored) { grpc_deadline_state* deadline_state = (grpc_deadline_state*)arg; - GRPC_CALL_COMBINER_STOP(exec_ctx, deadline_state->call_combiner, + GRPC_CALL_COMBINER_STOP(deadline_state->call_combiner, "got on_complete from cancel_stream batch"); - GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer"); + GRPC_CALL_STACK_UNREF(deadline_state->call_stack, "deadline_timer"); } // This is called via the call combiner, so access to deadline_state is // synchronized. -static void send_cancel_op_in_call_combiner(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void send_cancel_op_in_call_combiner(void* arg, grpc_error* error) { grpc_call_element* elem = (grpc_call_element*)arg; grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; grpc_transport_stream_op_batch* batch = grpc_make_transport_stream_op( @@ -55,37 +53,34 @@ static void send_cancel_op_in_call_combiner(grpc_exec_ctx* exec_ctx, void* arg, deadline_state, grpc_schedule_on_exec_ctx)); batch->cancel_stream = true; batch->payload->cancel_stream.cancel_error = GRPC_ERROR_REF(error); - elem->filter->start_transport_stream_op_batch(exec_ctx, elem, batch); + elem->filter->start_transport_stream_op_batch(elem, batch); } // Timer callback. -static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void timer_callback(void* arg, grpc_error* error) { grpc_call_element* elem = (grpc_call_element*)arg; grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; if (error != GRPC_ERROR_CANCELLED) { error = grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Deadline Exceeded"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED); - grpc_call_combiner_cancel(exec_ctx, deadline_state->call_combiner, + grpc_call_combiner_cancel(deadline_state->call_combiner, GRPC_ERROR_REF(error)); GRPC_CLOSURE_INIT(&deadline_state->timer_callback, send_cancel_op_in_call_combiner, elem, grpc_schedule_on_exec_ctx); - GRPC_CALL_COMBINER_START(exec_ctx, deadline_state->call_combiner, + GRPC_CALL_COMBINER_START(deadline_state->call_combiner, &deadline_state->timer_callback, error, "deadline exceeded -- sending cancel_stream op"); } else { - GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, - "deadline_timer"); + GRPC_CALL_STACK_UNREF(deadline_state->call_stack, "deadline_timer"); } } // Starts the deadline timer. // This is called via the call combiner, so access to deadline_state is // synchronized. -static void start_timer_if_needed(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, +static void start_timer_if_needed(grpc_call_element* elem, grpc_millis deadline) { if (deadline == GRPC_MILLIS_INF_FUTURE) { return; @@ -113,17 +108,16 @@ static void start_timer_if_needed(grpc_exec_ctx* exec_ctx, } GPR_ASSERT(closure != nullptr); GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer"); - grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, closure); + grpc_timer_init(&deadline_state->timer, deadline, closure); } // Cancels the deadline timer. // This is called via the call combiner, so access to deadline_state is // synchronized. -static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx, - grpc_deadline_state* deadline_state) { +static void cancel_timer_if_needed(grpc_deadline_state* deadline_state) { if (deadline_state->timer_state == GRPC_DEADLINE_STATE_PENDING) { deadline_state->timer_state = GRPC_DEADLINE_STATE_FINISHED; - grpc_timer_cancel(exec_ctx, &deadline_state->timer); + grpc_timer_cancel(&deadline_state->timer); } else { // timer was either in STATE_INITAL (nothing to cancel) // OR in STATE_FINISHED (again nothing to cancel) @@ -131,12 +125,11 @@ static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx, } // Callback run when the call is complete. -static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { +static void on_complete(void* arg, grpc_error* error) { grpc_deadline_state* deadline_state = (grpc_deadline_state*)arg; - cancel_timer_if_needed(exec_ctx, deadline_state); + cancel_timer_if_needed(deadline_state); // Invoke the next callback. - GRPC_CLOSURE_RUN(exec_ctx, deadline_state->next_on_complete, - GRPC_ERROR_REF(error)); + GRPC_CLOSURE_RUN(deadline_state->next_on_complete, GRPC_ERROR_REF(error)); } // Inject our own on_complete callback into op. @@ -156,8 +149,7 @@ struct start_timer_after_init_state { grpc_millis deadline; grpc_closure closure; }; -static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void start_timer_after_init(void* arg, grpc_error* error) { struct start_timer_after_init_state* state = (struct start_timer_after_init_state*)arg; grpc_deadline_state* deadline_state = @@ -166,18 +158,18 @@ static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, // We are initially called without holding the call combiner, so we // need to bounce ourselves into it. state->in_call_combiner = true; - GRPC_CALL_COMBINER_START(exec_ctx, deadline_state->call_combiner, - &state->closure, GRPC_ERROR_REF(error), + GRPC_CALL_COMBINER_START(deadline_state->call_combiner, &state->closure, + GRPC_ERROR_REF(error), "scheduling deadline timer"); return; } - start_timer_if_needed(exec_ctx, state->elem, state->deadline); + start_timer_if_needed(state->elem, state->deadline); gpr_free(state); - GRPC_CALL_COMBINER_STOP(exec_ctx, deadline_state->call_combiner, + GRPC_CALL_COMBINER_STOP(deadline_state->call_combiner, "done scheduling deadline timer"); } -void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, +void grpc_deadline_state_init(grpc_call_element* elem, grpc_call_stack* call_stack, grpc_call_combiner* call_combiner, grpc_millis deadline) { @@ -200,29 +192,27 @@ void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, state->deadline = deadline; GRPC_CLOSURE_INIT(&state->closure, start_timer_after_init, state, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(exec_ctx, &state->closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&state->closure, GRPC_ERROR_NONE); } } -void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem) { +void grpc_deadline_state_destroy(grpc_call_element* elem) { grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; - cancel_timer_if_needed(exec_ctx, deadline_state); + cancel_timer_if_needed(deadline_state); } -void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, +void grpc_deadline_state_reset(grpc_call_element* elem, grpc_millis new_deadline) { grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; - cancel_timer_if_needed(exec_ctx, deadline_state); - start_timer_if_needed(exec_ctx, elem, new_deadline); + cancel_timer_if_needed(deadline_state); + start_timer_if_needed(elem, new_deadline); } void grpc_deadline_state_client_start_transport_stream_op_batch( - grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_transport_stream_op_batch* op) { + grpc_call_element* elem, grpc_transport_stream_op_batch* op) { grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; if (op->cancel_stream) { - cancel_timer_if_needed(exec_ctx, deadline_state); + cancel_timer_if_needed(deadline_state); } else { // Make sure we know when the call is complete, so that we can cancel // the timer. @@ -237,16 +227,14 @@ void grpc_deadline_state_client_start_transport_stream_op_batch( // // Constructor for channel_data. Used for both client and server filters. -static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem, +static grpc_error* init_channel_elem(grpc_channel_element* elem, grpc_channel_element_args* args) { GPR_ASSERT(!args->is_last); return GRPC_ERROR_NONE; } // Destructor for channel_data. Used for both client and server filters. -static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem) {} +static void destroy_channel_elem(grpc_channel_element* elem) {} // Call data used for both client and server filter. typedef struct base_call_data { @@ -266,50 +254,45 @@ typedef struct server_call_data { } server_call_data; // Constructor for call_data. Used for both client and server filters. -static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, +static grpc_error* init_call_elem(grpc_call_element* elem, const grpc_call_element_args* args) { - grpc_deadline_state_init(exec_ctx, elem, args->call_stack, - args->call_combiner, args->deadline); + grpc_deadline_state_init(elem, args->call_stack, args->call_combiner, + args->deadline); return GRPC_ERROR_NONE; } // Destructor for call_data. Used for both client and server filters. -static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, +static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) { - grpc_deadline_state_destroy(exec_ctx, elem); + grpc_deadline_state_destroy(elem); } // Method for starting a call op for client filter. static void client_start_transport_stream_op_batch( - grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_transport_stream_op_batch* op) { - grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem, - op); + grpc_call_element* elem, grpc_transport_stream_op_batch* op) { + grpc_deadline_state_client_start_transport_stream_op_batch(elem, op); // Chain to next filter. - grpc_call_next_op(exec_ctx, elem, op); + grpc_call_next_op(elem, op); } // Callback for receiving initial metadata on the server. -static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void recv_initial_metadata_ready(void* arg, grpc_error* error) { grpc_call_element* elem = (grpc_call_element*)arg; server_call_data* calld = (server_call_data*)elem->call_data; // Get deadline from metadata and start the timer if needed. - start_timer_if_needed(exec_ctx, elem, calld->recv_initial_metadata->deadline); + start_timer_if_needed(elem, calld->recv_initial_metadata->deadline); // Invoke the next callback. calld->next_recv_initial_metadata_ready->cb( - exec_ctx, calld->next_recv_initial_metadata_ready->cb_arg, error); + calld->next_recv_initial_metadata_ready->cb_arg, error); } // Method for starting a call op for server filter. static void server_start_transport_stream_op_batch( - grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_transport_stream_op_batch* op) { + grpc_call_element* elem, grpc_transport_stream_op_batch* op) { server_call_data* calld = (server_call_data*)elem->call_data; if (op->cancel_stream) { - cancel_timer_if_needed(exec_ctx, &calld->base.deadline_state); + cancel_timer_if_needed(&calld->base.deadline_state); } else { // If we're receiving initial metadata, we need to get the deadline // from the recv_initial_metadata_ready callback. So we inject our @@ -335,7 +318,7 @@ static void server_start_transport_stream_op_batch( } } // Chain to next filter. - grpc_call_next_op(exec_ctx, elem, op); + grpc_call_next_op(elem, op); } const grpc_channel_filter grpc_client_deadline_filter = { @@ -372,8 +355,7 @@ bool grpc_deadline_checking_enabled(const grpc_channel_args* channel_args) { !grpc_channel_args_want_minimal_stack(channel_args)); } -static bool maybe_add_deadline_filter(grpc_exec_ctx* exec_ctx, - grpc_channel_stack_builder* builder, +static bool maybe_add_deadline_filter(grpc_channel_stack_builder* builder, void* arg) { return grpc_deadline_checking_enabled( grpc_channel_stack_builder_get_channel_arguments(builder)) |