aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/deadline/deadline_filter.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/deadline/deadline_filter.c')
-rw-r--r--src/core/ext/filters/deadline/deadline_filter.c112
1 files changed, 74 insertions, 38 deletions
diff --git a/src/core/ext/filters/deadline/deadline_filter.c b/src/core/ext/filters/deadline/deadline_filter.c
index 6789903c95..565b0679dc 100644
--- a/src/core/ext/filters/deadline/deadline_filter.c
+++ b/src/core/ext/filters/deadline/deadline_filter.c
@@ -34,22 +34,56 @@
// grpc_deadline_state
//
+// The on_complete callback used when sending a cancel_error batch down the
+// filter stack. Yields the call combiner when the batch returns.
+static void yield_call_combiner(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* ignored) {
+ grpc_deadline_state* deadline_state = arg;
+ GRPC_CALL_COMBINER_STOP(exec_ctx, deadline_state->call_combiner,
+ "got on_complete from cancel_stream batch");
+ GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer");
+}
+
+// This is called via the call combiner, so access to deadline_state is
+// synchronized.
+static void send_cancel_op_in_call_combiner(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ grpc_call_element* elem = arg;
+ grpc_deadline_state* deadline_state = elem->call_data;
+ grpc_transport_stream_op_batch* batch = grpc_make_transport_stream_op(
+ GRPC_CLOSURE_INIT(&deadline_state->timer_callback, yield_call_combiner,
+ deadline_state, grpc_schedule_on_exec_ctx));
+ batch->cancel_stream = true;
+ batch->payload->cancel_stream.cancel_error = GRPC_ERROR_REF(error);
+ elem->filter->start_transport_stream_op_batch(exec_ctx, elem, batch);
+}
+
// Timer callback.
static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
grpc_call_element* elem = (grpc_call_element*)arg;
grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data;
if (error != GRPC_ERROR_CANCELLED) {
- grpc_call_element_signal_error(
- exec_ctx, elem,
- grpc_error_set_int(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Deadline Exceeded"),
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED));
+ error = grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Deadline Exceeded"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED);
+ grpc_call_combiner_cancel(exec_ctx, deadline_state->call_combiner,
+ GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_INIT(&deadline_state->timer_callback,
+ send_cancel_op_in_call_combiner, elem,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CALL_COMBINER_START(exec_ctx, deadline_state->call_combiner,
+ &deadline_state->timer_callback, error,
+ "deadline exceeded -- sending cancel_stream op");
+ } else {
+ GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack,
+ "deadline_timer");
}
- GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer");
}
// Starts the deadline timer.
+// This is called via the call combiner, so access to deadline_state is
+// synchronized.
static void start_timer_if_needed(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
gpr_timespec deadline) {
@@ -58,51 +92,39 @@ static void start_timer_if_needed(grpc_exec_ctx* exec_ctx,
return;
}
grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data;
- grpc_deadline_timer_state cur_state;
grpc_closure* closure = NULL;
-retry:
- cur_state =
- (grpc_deadline_timer_state)gpr_atm_acq_load(&deadline_state->timer_state);
- switch (cur_state) {
+ switch (deadline_state->timer_state) {
case GRPC_DEADLINE_STATE_PENDING:
// Note: We do not start the timer if there is already a timer
return;
case GRPC_DEADLINE_STATE_FINISHED:
- if (gpr_atm_rel_cas(&deadline_state->timer_state,
- GRPC_DEADLINE_STATE_FINISHED,
- GRPC_DEADLINE_STATE_PENDING)) {
- // If we've already created and destroyed a timer, we always create a
- // new closure: we have no other guarantee that the inlined closure is
- // not in use (it may hold a pending call to timer_callback)
- closure = GRPC_CLOSURE_CREATE(timer_callback, elem,
- grpc_schedule_on_exec_ctx);
- } else {
- goto retry;
- }
+ deadline_state->timer_state = GRPC_DEADLINE_STATE_PENDING;
+ // If we've already created and destroyed a timer, we always create a
+ // new closure: we have no other guarantee that the inlined closure is
+ // not in use (it may hold a pending call to timer_callback)
+ closure =
+ GRPC_CLOSURE_CREATE(timer_callback, elem, grpc_schedule_on_exec_ctx);
break;
case GRPC_DEADLINE_STATE_INITIAL:
- if (gpr_atm_rel_cas(&deadline_state->timer_state,
- GRPC_DEADLINE_STATE_INITIAL,
- GRPC_DEADLINE_STATE_PENDING)) {
- closure =
- GRPC_CLOSURE_INIT(&deadline_state->timer_callback, timer_callback,
- elem, grpc_schedule_on_exec_ctx);
- } else {
- goto retry;
- }
+ deadline_state->timer_state = GRPC_DEADLINE_STATE_PENDING;
+ closure =
+ GRPC_CLOSURE_INIT(&deadline_state->timer_callback, timer_callback,
+ elem, grpc_schedule_on_exec_ctx);
break;
}
- GPR_ASSERT(closure);
+ GPR_ASSERT(closure != NULL);
GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, closure,
gpr_now(GPR_CLOCK_MONOTONIC));
}
// Cancels the deadline timer.
+// This is called via the call combiner, so access to deadline_state is
+// synchronized.
static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx,
grpc_deadline_state* deadline_state) {
- if (gpr_atm_rel_cas(&deadline_state->timer_state, GRPC_DEADLINE_STATE_PENDING,
- GRPC_DEADLINE_STATE_FINISHED)) {
+ if (deadline_state->timer_state == GRPC_DEADLINE_STATE_PENDING) {
+ deadline_state->timer_state = GRPC_DEADLINE_STATE_FINISHED;
grpc_timer_cancel(exec_ctx, &deadline_state->timer);
} else {
// timer was either in STATE_INITAL (nothing to cancel)
@@ -131,6 +153,7 @@ static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
// Callback and associated state for starting the timer after call stack
// initialization has been completed.
struct start_timer_after_init_state {
+ bool in_call_combiner;
grpc_call_element* elem;
gpr_timespec deadline;
grpc_closure closure;
@@ -138,15 +161,29 @@ struct start_timer_after_init_state {
static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
struct start_timer_after_init_state* state = arg;
+ grpc_deadline_state* deadline_state = state->elem->call_data;
+ if (!state->in_call_combiner) {
+ // We are initially called without holding the call combiner, so we
+ // need to bounce ourselves into it.
+ state->in_call_combiner = true;
+ GRPC_CALL_COMBINER_START(exec_ctx, deadline_state->call_combiner,
+ &state->closure, GRPC_ERROR_REF(error),
+ "scheduling deadline timer");
+ return;
+ }
start_timer_if_needed(exec_ctx, state->elem, state->deadline);
gpr_free(state);
+ GRPC_CALL_COMBINER_STOP(exec_ctx, deadline_state->call_combiner,
+ "done scheduling deadline timer");
}
void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_call_stack* call_stack,
+ grpc_call_combiner* call_combiner,
gpr_timespec deadline) {
grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data;
deadline_state->call_stack = call_stack;
+ deadline_state->call_combiner = call_combiner;
// Deadline will always be infinite on servers, so the timer will only be
// set on clients with a finite deadline.
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@@ -158,7 +195,7 @@ void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
// call stack initialization is finished. To avoid that problem, we
// create a closure to start the timer, and we schedule that closure
// to be run after call stack initialization is done.
- struct start_timer_after_init_state* state = gpr_malloc(sizeof(*state));
+ struct start_timer_after_init_state* state = gpr_zalloc(sizeof(*state));
state->elem = elem;
state->deadline = deadline;
GRPC_CLOSURE_INIT(&state->closure, start_timer_after_init, state,
@@ -232,7 +269,8 @@ typedef struct server_call_data {
static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
const grpc_call_element_args* args) {
- grpc_deadline_state_init(exec_ctx, elem, args->call_stack, args->deadline);
+ grpc_deadline_state_init(exec_ctx, elem, args->call_stack,
+ args->call_combiner, args->deadline);
return GRPC_ERROR_NONE;
}
@@ -310,7 +348,6 @@ const grpc_channel_filter grpc_client_deadline_filter = {
0, // sizeof(channel_data)
init_channel_elem,
destroy_channel_elem,
- grpc_call_next_get_peer,
grpc_channel_next_get_info,
"deadline",
};
@@ -325,7 +362,6 @@ const grpc_channel_filter grpc_server_deadline_filter = {
0, // sizeof(channel_data)
init_channel_elem,
destroy_channel_elem,
- grpc_call_next_get_peer,
grpc_channel_next_get_info,
"deadline",
};