diff options
author | Mark D. Roth <roth@google.com> | 2016-08-26 11:18:00 -0700 |
---|---|---|
committer | Mark D. Roth <roth@google.com> | 2016-08-26 11:18:00 -0700 |
commit | d2b4533df27c54fc782be88b0b0ea581066fac40 (patch) | |
tree | 010cab69dffe506a404266f35d3051c64373fab6 | |
parent | 14c072ccc0ee33c26c55999344a501d2f1f2d93d (diff) |
Cancel deadline timer from on_complete instead of destroy_call_elem().
-rw-r--r-- | src/core/lib/channel/deadline_filter.c | 49 |
1 files changed, 35 insertions, 14 deletions
diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index 032dea0221..852fbaf003 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -45,9 +45,18 @@ typedef struct channel_data { // Call data used for both client and server filter. typedef struct base_call_data { + // We take a reference to the call stack for the timer callback. grpc_call_stack* call_stack; + // True if the timer callback is currently pending. bool timer_pending; + // The deadline timer. grpc_timer timer; + // Closure to invoke when the call is complete. + // We use this to cancel the timer. + grpc_closure on_complete; + // The original on_complete closure, which we chain to after our own + // closure is invoked. + grpc_closure* next_on_complete; } base_call_data; // Additional call data used only for the server filter. @@ -89,16 +98,6 @@ static grpc_error *init_call_elem(grpc_exec_ctx* exec_ctx, static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_final_info* final_info, void* and_free_memory) { - base_call_data* calld = elem->call_data; -gpr_log(GPR_INFO, "==> destroy_call_elem()"); -// FIXME: this is not working -- timer holds a ref, so we won't get -// called until after timer pops - if (calld->timer_pending) -{ -gpr_log(GPR_INFO, "CANCELLING TIMER"); - grpc_timer_cancel(exec_ctx, &calld->timer); -} - } // Timer callback. @@ -108,13 +107,10 @@ static void timer_callback(grpc_exec_ctx *exec_ctx, void *arg, base_call_data* calld = elem->call_data; calld->timer_pending = false; if (error != GRPC_ERROR_CANCELLED) { -gpr_log(GPR_INFO, "DEADLINE EXCEEDED"); gpr_slice message = gpr_slice_from_static_string("Deadline Exceeded"); grpc_call_element_send_cancel_with_message( exec_ctx, elem, GRPC_STATUS_DEADLINE_EXCEEDED, &message); } -else gpr_log(GPR_INFO, "TIMER CANCELLED"); -gpr_log(GPR_INFO, "UNREF"); GRPC_CALL_STACK_UNREF(exec_ctx, calld->call_stack, "deadline"); } @@ -126,7 +122,6 @@ static void start_timer_if_needed(grpc_exec_ctx *exec_ctx, deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) { // Take a reference to the call stack, to be owned by the timer. -gpr_log(GPR_INFO, "REF"); GRPC_CALL_STACK_REF(calld->call_stack, "deadline"); grpc_timer_init(exec_ctx, &calld->timer, deadline, timer_callback, elem, gpr_now(GPR_CLOCK_MONOTONIC)); @@ -134,16 +129,35 @@ gpr_log(GPR_INFO, "REF"); } } +// Callback run when the call is complete. +static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + base_call_data* calld = arg; + if (calld->timer_pending) { + grpc_timer_cancel(exec_ctx, &calld->timer); + calld->timer_pending = false; + } + // Invoke the next callback. + calld->next_on_complete->cb(exec_ctx, calld->next_on_complete->cb_arg, error); +} + // Method for starting a call op for client filter. static void client_start_transport_stream_op(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op* op) { + base_call_data* calld = elem->call_data; // If we're sending initial metadata, get the deadline from the metadata // and start the timer if needed. if (op->send_initial_metadata != NULL) { start_timer_if_needed(exec_ctx, elem, op->send_initial_metadata->deadline); } + // Make sure we know when the call is complete, so that we can cancel + // the timer. + if (op->recv_trailing_metadata != NULL) { + calld->next_on_complete = op->on_complete; + grpc_closure_init(&calld->on_complete, on_complete, calld); + op->on_complete = &calld->on_complete; + } // Chain to next filter. grpc_call_next_op(exec_ctx, elem, op); } @@ -176,6 +190,13 @@ static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx, recv_initial_metadata_ready, elem); op->recv_initial_metadata_ready = &calld->recv_initial_metadata_ready; } + // Make sure we know when the call is complete, so that we can cancel + // the timer. + if (op->send_trailing_metadata != NULL) { + calld->base.next_on_complete = op->on_complete; + grpc_closure_init(&calld->base.on_complete, on_complete, calld); + op->on_complete = &calld->base.on_complete; + } // Chain to next filter. grpc_call_next_op(exec_ctx, elem, op); } |