aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2016-08-26 11:18:00 -0700
committerGravatar Mark D. Roth <roth@google.com>2016-08-26 11:18:00 -0700
commitd2b4533df27c54fc782be88b0b0ea581066fac40 (patch)
tree010cab69dffe506a404266f35d3051c64373fab6
parent14c072ccc0ee33c26c55999344a501d2f1f2d93d (diff)
Cancel deadline timer from on_complete instead of destroy_call_elem().
-rw-r--r--src/core/lib/channel/deadline_filter.c49
1 files changed, 35 insertions, 14 deletions
diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c
index 032dea0221..852fbaf003 100644
--- a/src/core/lib/channel/deadline_filter.c
+++ b/src/core/lib/channel/deadline_filter.c
@@ -45,9 +45,18 @@ typedef struct channel_data {
// Call data used for both client and server filter.
typedef struct base_call_data {
+ // We take a reference to the call stack for the timer callback.
grpc_call_stack* call_stack;
+ // True if the timer callback is currently pending.
bool timer_pending;
+ // The deadline timer.
grpc_timer timer;
+ // Closure to invoke when the call is complete.
+ // We use this to cancel the timer.
+ grpc_closure on_complete;
+ // The original on_complete closure, which we chain to after our own
+ // closure is invoked.
+ grpc_closure* next_on_complete;
} base_call_data;
// Additional call data used only for the server filter.
@@ -89,16 +98,6 @@ static grpc_error *init_call_elem(grpc_exec_ctx* exec_ctx,
static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
const grpc_call_final_info* final_info,
void* and_free_memory) {
- base_call_data* calld = elem->call_data;
-gpr_log(GPR_INFO, "==> destroy_call_elem()");
-// FIXME: this is not working -- timer holds a ref, so we won't get
-// called until after timer pops
- if (calld->timer_pending)
-{
-gpr_log(GPR_INFO, "CANCELLING TIMER");
- grpc_timer_cancel(exec_ctx, &calld->timer);
-}
-
}
// Timer callback.
@@ -108,13 +107,10 @@ static void timer_callback(grpc_exec_ctx *exec_ctx, void *arg,
base_call_data* calld = elem->call_data;
calld->timer_pending = false;
if (error != GRPC_ERROR_CANCELLED) {
-gpr_log(GPR_INFO, "DEADLINE EXCEEDED");
gpr_slice message = gpr_slice_from_static_string("Deadline Exceeded");
grpc_call_element_send_cancel_with_message(
exec_ctx, elem, GRPC_STATUS_DEADLINE_EXCEEDED, &message);
}
-else gpr_log(GPR_INFO, "TIMER CANCELLED");
-gpr_log(GPR_INFO, "UNREF");
GRPC_CALL_STACK_UNREF(exec_ctx, calld->call_stack, "deadline");
}
@@ -126,7 +122,6 @@ static void start_timer_if_needed(grpc_exec_ctx *exec_ctx,
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
// Take a reference to the call stack, to be owned by the timer.
-gpr_log(GPR_INFO, "REF");
GRPC_CALL_STACK_REF(calld->call_stack, "deadline");
grpc_timer_init(exec_ctx, &calld->timer, deadline, timer_callback, elem,
gpr_now(GPR_CLOCK_MONOTONIC));
@@ -134,16 +129,35 @@ gpr_log(GPR_INFO, "REF");
}
}
+// Callback run when the call is complete.
+static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ base_call_data* calld = arg;
+ if (calld->timer_pending) {
+ grpc_timer_cancel(exec_ctx, &calld->timer);
+ calld->timer_pending = false;
+ }
+ // Invoke the next callback.
+ calld->next_on_complete->cb(exec_ctx, calld->next_on_complete->cb_arg, error);
+}
+
// Method for starting a call op for client filter.
static void client_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
grpc_transport_stream_op* op) {
+ base_call_data* calld = elem->call_data;
// If we're sending initial metadata, get the deadline from the metadata
// and start the timer if needed.
if (op->send_initial_metadata != NULL) {
start_timer_if_needed(exec_ctx, elem,
op->send_initial_metadata->deadline);
}
+ // Make sure we know when the call is complete, so that we can cancel
+ // the timer.
+ if (op->recv_trailing_metadata != NULL) {
+ calld->next_on_complete = op->on_complete;
+ grpc_closure_init(&calld->on_complete, on_complete, calld);
+ op->on_complete = &calld->on_complete;
+ }
// Chain to next filter.
grpc_call_next_op(exec_ctx, elem, op);
}
@@ -176,6 +190,13 @@ static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
recv_initial_metadata_ready, elem);
op->recv_initial_metadata_ready = &calld->recv_initial_metadata_ready;
}
+ // Make sure we know when the call is complete, so that we can cancel
+ // the timer.
+ if (op->send_trailing_metadata != NULL) {
+ calld->base.next_on_complete = op->on_complete;
+ grpc_closure_init(&calld->base.on_complete, on_complete, calld);
+ op->on_complete = &calld->base.on_complete;
+ }
// Chain to next filter.
grpc_call_next_op(exec_ctx, elem, op);
}