diff options
author | 2016-09-14 15:18:40 -0700 | |
---|---|---|
committer | 2016-09-14 15:18:40 -0700 | |
commit | f28763c68c9b3caf539f4d38ff123ae5de69e6d8 (patch) | |
tree | c5e7ccae33214c71f50fc006a0662bebc9e9787b /src/core/lib/channel/deadline_filter.c | |
parent | a99e02cc0d165a226bf57eb49a290f3a626168d7 (diff) |
Pass deadline into filters via grpc_call_element_args, so that we can
start the timer before the first op is sent down.
Diffstat (limited to 'src/core/lib/channel/deadline_filter.c')
-rw-r--r-- | src/core/lib/channel/deadline_filter.c | 58 |
1 files changed, 43 insertions, 15 deletions
diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index 010fedd7b7..079b98a2f8 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -34,10 +34,12 @@ #include <stdbool.h> #include <string.h> +#include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/timer.h" // @@ -106,15 +108,49 @@ static void inject_on_complete_cb(grpc_deadline_state* deadline_state, op->on_complete = &deadline_state->on_complete; } -void grpc_deadline_state_init(grpc_deadline_state* deadline_state, - grpc_call_stack* call_stack) { +// Callback and associated state for starting the timer after call stack +// initialization has been completed. +struct start_timer_after_init_state { + grpc_call_element* elem; + gpr_timespec deadline; + grpc_closure closure; +}; +static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + struct start_timer_after_init_state* state = arg; + start_timer_if_needed(exec_ctx, state->elem, state->deadline); + gpr_free(state); +} + +void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_call_element_args* args) { + grpc_deadline_state* deadline_state = elem->call_data; memset(deadline_state, 0, sizeof(*deadline_state)); - deadline_state->call_stack = call_stack; + deadline_state->call_stack = args->call_stack; gpr_mu_init(&deadline_state->timer_mu); + // Deadline will always be infinite on servers, so the timer will only be + // set on clients with a finite deadline. + const gpr_timespec deadline = + gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC); + if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) { + // When the deadline passes, we indicate the failure by sending down + // an op with cancel_error set. However, we can't send down any ops + // until after the call stack is fully initialized. If we start the + // timer here, we have no guarantee that the timer won't pop before + // call stack initialization is finished. To avoid that problem, we + // create a closure to start the timer, and we schedule that closure + // to be run after call stack initialization is done. + struct start_timer_after_init_state* state = gpr_malloc(sizeof(*state)); + state->elem = elem; + state->deadline = deadline; + grpc_closure_init(&state->closure, start_timer_after_init, state); + grpc_exec_ctx_sched(exec_ctx, &state->closure, GRPC_ERROR_NONE, NULL); + } } void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, - grpc_deadline_state* deadline_state) { + grpc_call_element* elem) { + grpc_deadline_state* deadline_state = elem->call_data; cancel_timer_if_needed(exec_ctx, deadline_state); gpr_mu_destroy(&deadline_state->timer_mu); } @@ -127,12 +163,6 @@ void grpc_deadline_state_client_start_transport_stream_op( op->close_error != GRPC_ERROR_NONE) { cancel_timer_if_needed(exec_ctx, deadline_state); } else { - // If we're sending initial metadata, get the deadline from the metadata - // and start the timer if needed. - if (op->send_initial_metadata != NULL) { - start_timer_if_needed(exec_ctx, elem, - op->send_initial_metadata->deadline); - } // Make sure we know when the call is complete, so that we can cancel // the timer. if (op->recv_trailing_metadata != NULL) { @@ -177,10 +207,9 @@ typedef struct server_call_data { static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_call_element_args* args) { - base_call_data* calld = elem->call_data; // Note: size of call data is different between client and server. - memset(calld, 0, elem->filter->sizeof_call_data); - grpc_deadline_state_init(&calld->deadline_state, args->call_stack); + memset(elem->call_data, 0, elem->filter->sizeof_call_data); + grpc_deadline_state_init(exec_ctx, elem, args); return GRPC_ERROR_NONE; } @@ -188,8 +217,7 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_final_info* final_info, void* and_free_memory) { - base_call_data* calld = elem->call_data; - grpc_deadline_state_destroy(exec_ctx, &calld->deadline_state); + grpc_deadline_state_destroy(exec_ctx, elem); } // Method for starting a call op for client filter. |