aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/deadline
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2017-08-29 16:59:07 -0700
committerGravatar GitHub <noreply@github.com>2017-08-29 16:59:07 -0700
commitbf19961d0a49b43cb528392efeb4880eeebb9b5e (patch)
tree1d4c96db4d3bdc05c634e5d386c14a77845a1758 /src/core/ext/filters/deadline
parent9811915ba3fa1ccdf44b6a70fe1b1dd4782cd508 (diff)
Revert "Implement call combiner"
Diffstat (limited to 'src/core/ext/filters/deadline')
-rw-r--r--src/core/ext/filters/deadline/deadline_filter.c112
-rw-r--r--src/core/ext/filters/deadline/deadline_filter.h8
2 files changed, 39 insertions, 81 deletions
diff --git a/src/core/ext/filters/deadline/deadline_filter.c b/src/core/ext/filters/deadline/deadline_filter.c
index 565b0679dc..6789903c95 100644
--- a/src/core/ext/filters/deadline/deadline_filter.c
+++ b/src/core/ext/filters/deadline/deadline_filter.c
@@ -34,56 +34,22 @@
// grpc_deadline_state
//
-// The on_complete callback used when sending a cancel_error batch down the
-// filter stack. Yields the call combiner when the batch returns.
-static void yield_call_combiner(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* ignored) {
- grpc_deadline_state* deadline_state = arg;
- GRPC_CALL_COMBINER_STOP(exec_ctx, deadline_state->call_combiner,
- "got on_complete from cancel_stream batch");
- GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer");
-}
-
-// This is called via the call combiner, so access to deadline_state is
-// synchronized.
-static void send_cancel_op_in_call_combiner(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
- grpc_call_element* elem = arg;
- grpc_deadline_state* deadline_state = elem->call_data;
- grpc_transport_stream_op_batch* batch = grpc_make_transport_stream_op(
- GRPC_CLOSURE_INIT(&deadline_state->timer_callback, yield_call_combiner,
- deadline_state, grpc_schedule_on_exec_ctx));
- batch->cancel_stream = true;
- batch->payload->cancel_stream.cancel_error = GRPC_ERROR_REF(error);
- elem->filter->start_transport_stream_op_batch(exec_ctx, elem, batch);
-}
-
// Timer callback.
static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
grpc_call_element* elem = (grpc_call_element*)arg;
grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data;
if (error != GRPC_ERROR_CANCELLED) {
- error = grpc_error_set_int(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Deadline Exceeded"),
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED);
- grpc_call_combiner_cancel(exec_ctx, deadline_state->call_combiner,
- GRPC_ERROR_REF(error));
- GRPC_CLOSURE_INIT(&deadline_state->timer_callback,
- send_cancel_op_in_call_combiner, elem,
- grpc_schedule_on_exec_ctx);
- GRPC_CALL_COMBINER_START(exec_ctx, deadline_state->call_combiner,
- &deadline_state->timer_callback, error,
- "deadline exceeded -- sending cancel_stream op");
- } else {
- GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack,
- "deadline_timer");
+ grpc_call_element_signal_error(
+ exec_ctx, elem,
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Deadline Exceeded"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED));
}
+ GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer");
}
// Starts the deadline timer.
-// This is called via the call combiner, so access to deadline_state is
-// synchronized.
static void start_timer_if_needed(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
gpr_timespec deadline) {
@@ -92,39 +58,51 @@ static void start_timer_if_needed(grpc_exec_ctx* exec_ctx,
return;
}
grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data;
+ grpc_deadline_timer_state cur_state;
grpc_closure* closure = NULL;
- switch (deadline_state->timer_state) {
+retry:
+ cur_state =
+ (grpc_deadline_timer_state)gpr_atm_acq_load(&deadline_state->timer_state);
+ switch (cur_state) {
case GRPC_DEADLINE_STATE_PENDING:
// Note: We do not start the timer if there is already a timer
return;
case GRPC_DEADLINE_STATE_FINISHED:
- deadline_state->timer_state = GRPC_DEADLINE_STATE_PENDING;
- // If we've already created and destroyed a timer, we always create a
- // new closure: we have no other guarantee that the inlined closure is
- // not in use (it may hold a pending call to timer_callback)
- closure =
- GRPC_CLOSURE_CREATE(timer_callback, elem, grpc_schedule_on_exec_ctx);
+ if (gpr_atm_rel_cas(&deadline_state->timer_state,
+ GRPC_DEADLINE_STATE_FINISHED,
+ GRPC_DEADLINE_STATE_PENDING)) {
+ // If we've already created and destroyed a timer, we always create a
+ // new closure: we have no other guarantee that the inlined closure is
+ // not in use (it may hold a pending call to timer_callback)
+ closure = GRPC_CLOSURE_CREATE(timer_callback, elem,
+ grpc_schedule_on_exec_ctx);
+ } else {
+ goto retry;
+ }
break;
case GRPC_DEADLINE_STATE_INITIAL:
- deadline_state->timer_state = GRPC_DEADLINE_STATE_PENDING;
- closure =
- GRPC_CLOSURE_INIT(&deadline_state->timer_callback, timer_callback,
- elem, grpc_schedule_on_exec_ctx);
+ if (gpr_atm_rel_cas(&deadline_state->timer_state,
+ GRPC_DEADLINE_STATE_INITIAL,
+ GRPC_DEADLINE_STATE_PENDING)) {
+ closure =
+ GRPC_CLOSURE_INIT(&deadline_state->timer_callback, timer_callback,
+ elem, grpc_schedule_on_exec_ctx);
+ } else {
+ goto retry;
+ }
break;
}
- GPR_ASSERT(closure != NULL);
+ GPR_ASSERT(closure);
GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, closure,
gpr_now(GPR_CLOCK_MONOTONIC));
}
// Cancels the deadline timer.
-// This is called via the call combiner, so access to deadline_state is
-// synchronized.
static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx,
grpc_deadline_state* deadline_state) {
- if (deadline_state->timer_state == GRPC_DEADLINE_STATE_PENDING) {
- deadline_state->timer_state = GRPC_DEADLINE_STATE_FINISHED;
+ if (gpr_atm_rel_cas(&deadline_state->timer_state, GRPC_DEADLINE_STATE_PENDING,
+ GRPC_DEADLINE_STATE_FINISHED)) {
grpc_timer_cancel(exec_ctx, &deadline_state->timer);
} else {
// timer was either in STATE_INITAL (nothing to cancel)
@@ -153,7 +131,6 @@ static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
// Callback and associated state for starting the timer after call stack
// initialization has been completed.
struct start_timer_after_init_state {
- bool in_call_combiner;
grpc_call_element* elem;
gpr_timespec deadline;
grpc_closure closure;
@@ -161,29 +138,15 @@ struct start_timer_after_init_state {
static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
struct start_timer_after_init_state* state = arg;
- grpc_deadline_state* deadline_state = state->elem->call_data;
- if (!state->in_call_combiner) {
- // We are initially called without holding the call combiner, so we
- // need to bounce ourselves into it.
- state->in_call_combiner = true;
- GRPC_CALL_COMBINER_START(exec_ctx, deadline_state->call_combiner,
- &state->closure, GRPC_ERROR_REF(error),
- "scheduling deadline timer");
- return;
- }
start_timer_if_needed(exec_ctx, state->elem, state->deadline);
gpr_free(state);
- GRPC_CALL_COMBINER_STOP(exec_ctx, deadline_state->call_combiner,
- "done scheduling deadline timer");
}
void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_call_stack* call_stack,
- grpc_call_combiner* call_combiner,
gpr_timespec deadline) {
grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data;
deadline_state->call_stack = call_stack;
- deadline_state->call_combiner = call_combiner;
// Deadline will always be infinite on servers, so the timer will only be
// set on clients with a finite deadline.
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@@ -195,7 +158,7 @@ void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
// call stack initialization is finished. To avoid that problem, we
// create a closure to start the timer, and we schedule that closure
// to be run after call stack initialization is done.
- struct start_timer_after_init_state* state = gpr_zalloc(sizeof(*state));
+ struct start_timer_after_init_state* state = gpr_malloc(sizeof(*state));
state->elem = elem;
state->deadline = deadline;
GRPC_CLOSURE_INIT(&state->closure, start_timer_after_init, state,
@@ -269,8 +232,7 @@ typedef struct server_call_data {
static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
const grpc_call_element_args* args) {
- grpc_deadline_state_init(exec_ctx, elem, args->call_stack,
- args->call_combiner, args->deadline);
+ grpc_deadline_state_init(exec_ctx, elem, args->call_stack, args->deadline);
return GRPC_ERROR_NONE;
}
@@ -348,6 +310,7 @@ const grpc_channel_filter grpc_client_deadline_filter = {
0, // sizeof(channel_data)
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
grpc_channel_next_get_info,
"deadline",
};
@@ -362,6 +325,7 @@ const grpc_channel_filter grpc_server_deadline_filter = {
0, // sizeof(channel_data)
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
grpc_channel_next_get_info,
"deadline",
};
diff --git a/src/core/ext/filters/deadline/deadline_filter.h b/src/core/ext/filters/deadline/deadline_filter.h
index 3eb102ad28..420bf7065a 100644
--- a/src/core/ext/filters/deadline/deadline_filter.h
+++ b/src/core/ext/filters/deadline/deadline_filter.h
@@ -31,8 +31,7 @@ typedef enum grpc_deadline_timer_state {
typedef struct grpc_deadline_state {
// We take a reference to the call stack for the timer callback.
grpc_call_stack* call_stack;
- grpc_call_combiner* call_combiner;
- grpc_deadline_timer_state timer_state;
+ gpr_atm timer_state;
grpc_timer timer;
grpc_closure timer_callback;
// Closure to invoke when the call is complete.
@@ -51,7 +50,6 @@ typedef struct grpc_deadline_state {
// assumes elem->call_data is zero'd
void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_call_stack* call_stack,
- grpc_call_combiner* call_combiner,
gpr_timespec deadline);
void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem);
@@ -63,8 +61,6 @@ void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
// to ensure that the timer callback is not invoked while it is in the
// process of being reset, which means that attempting to increase the
// deadline may result in the timer being called twice.
-//
-// Note: Must be called while holding the call combiner.
void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
gpr_timespec new_deadline);
@@ -74,8 +70,6 @@ void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
//
// Note: It is the caller's responsibility to chain to the next filter if
// necessary after this function returns.
-//
-// Note: Must be called while holding the call combiner.
void grpc_deadline_state_client_start_transport_stream_op_batch(
grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_transport_stream_op_batch* op);