diff options
Diffstat (limited to 'src/core/lib')
-rw-r--r-- | src/core/lib/channel/deadline_filter.c | 84 | ||||
-rw-r--r-- | src/core/lib/channel/deadline_filter.h | 34 |
2 files changed, 80 insertions, 38 deletions
diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index 5216338833..d2ea5250f6 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -64,30 +64,49 @@ static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg, } // Starts the deadline timer. -static void start_timer_if_needed(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, - gpr_timespec deadline) { +static void start_timer_if_needed_locked(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, + gpr_timespec deadline) { grpc_deadline_state* deadline_state = elem->call_data; deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); - if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) { + // Note: We do not start the timer if there is already a timer + // pending. This should be okay, because this is only called from two + // functions exported by this module: grpc_deadline_state_start(), which + // starts the initial timer, and grpc_deadline_state_reset(), which + // cancels any pre-existing timer before starting a new one. In + // particular, we want to ensure that if grpc_deadline_state_start() + // winds up trying to start the timer after grpc_deadline_state_reset() + // has already done so, we ignore the value from the former. + if (!deadline_state->timer_pending && + gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) { // Take a reference to the call stack, to be owned by the timer. GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer"); - gpr_mu_lock(&deadline_state->timer_mu); deadline_state->timer_pending = true; grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, timer_callback, elem, gpr_now(GPR_CLOCK_MONOTONIC)); - gpr_mu_unlock(&deadline_state->timer_mu); } } +static void start_timer_if_needed(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, + gpr_timespec deadline) { + grpc_deadline_state* deadline_state = elem->call_data; + gpr_mu_lock(&deadline_state->timer_mu); + start_timer_if_needed_locked(exec_ctx, elem, deadline); + gpr_mu_unlock(&deadline_state->timer_mu); +} // Cancels the deadline timer. -static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx, - grpc_deadline_state* deadline_state) { - gpr_mu_lock(&deadline_state->timer_mu); +static void cancel_timer_if_needed_locked(grpc_exec_ctx* exec_ctx, + grpc_deadline_state* deadline_state) { if (deadline_state->timer_pending) { grpc_timer_cancel(exec_ctx, &deadline_state->timer); deadline_state->timer_pending = false; } +} +static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx, + grpc_deadline_state* deadline_state) { + gpr_mu_lock(&deadline_state->timer_mu); + cancel_timer_if_needed_locked(exec_ctx, deadline_state); gpr_mu_unlock(&deadline_state->timer_mu); } @@ -108,6 +127,21 @@ static void inject_on_complete_cb(grpc_deadline_state* deadline_state, op->on_complete = &deadline_state->on_complete; } +void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_call_stack* call_stack) { + grpc_deadline_state* deadline_state = elem->call_data; + memset(deadline_state, 0, sizeof(*deadline_state)); + deadline_state->call_stack = call_stack; + gpr_mu_init(&deadline_state->timer_mu); +} + +void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem) { + grpc_deadline_state* deadline_state = elem->call_data; + cancel_timer_if_needed(exec_ctx, deadline_state); + gpr_mu_destroy(&deadline_state->timer_mu); +} + // Callback and associated state for starting the timer after call stack // initialization has been completed. struct start_timer_after_init_state { @@ -122,24 +156,11 @@ static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, gpr_free(state); } -void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_call_element_args* args, - grpc_method_config* method_config) { - grpc_deadline_state* deadline_state = elem->call_data; - memset(deadline_state, 0, sizeof(*deadline_state)); - deadline_state->call_stack = args->call_stack; - gpr_mu_init(&deadline_state->timer_mu); +void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + gpr_timespec deadline) { // Deadline will always be infinite on servers, so the timer will only be // set on clients with a finite deadline. - gpr_timespec deadline = - gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC); - if (method_config != NULL) { - gpr_timespec* per_method_deadline = - grpc_method_config_get_timeout(method_config); - if (per_method_deadline != NULL) { - deadline = gpr_time_min(deadline, *per_method_deadline); - } - } + deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) { // When the deadline passes, we indicate the failure by sending down // an op with cancel_error set. However, we can't send down any ops @@ -156,11 +177,13 @@ void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, } } -void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem) { +void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + gpr_timespec new_deadline) { grpc_deadline_state* deadline_state = elem->call_data; - cancel_timer_if_needed(exec_ctx, deadline_state); - gpr_mu_destroy(&deadline_state->timer_mu); + gpr_mu_lock(&deadline_state->timer_mu); + cancel_timer_if_needed_locked(exec_ctx, deadline_state); + start_timer_if_needed_locked(exec_ctx, elem, new_deadline); + gpr_mu_unlock(&deadline_state->timer_mu); } void grpc_deadline_state_client_start_transport_stream_op( @@ -217,7 +240,8 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element_args* args) { // Note: size of call data is different between client and server. memset(elem->call_data, 0, elem->filter->sizeof_call_data); - grpc_deadline_state_init(exec_ctx, elem, args, NULL /* method_config */); + grpc_deadline_state_init(exec_ctx, elem, args->call_stack); + grpc_deadline_state_start(exec_ctx, elem, args->deadline); return GRPC_ERROR_NONE; } diff --git a/src/core/lib/channel/deadline_filter.h b/src/core/lib/channel/deadline_filter.h index ce6e8ea974..9fd3802721 100644 --- a/src/core/lib/channel/deadline_filter.h +++ b/src/core/lib/channel/deadline_filter.h @@ -56,19 +56,37 @@ typedef struct grpc_deadline_state { grpc_closure* next_on_complete; } grpc_deadline_state; -// To be used in a filter's init_call_elem(), destroy_call_elem(), and -// start_transport_stream_op() methods to enforce call deadlines. // -// REQUIRES: The first field in elem->call_data is a grpc_deadline_state. +// NOTE: All of these functions require that the first field in +// elem->call_data is a grpc_deadline_state. // -// For grpc_deadline_state_client_start_transport_stream_op(), it is the -// caller's responsibility to chain to the next filter if necessary -// after the function returns. + void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_call_element_args* args, - grpc_method_config* method_config); + grpc_call_stack* call_stack); void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, grpc_call_element* elem); + +// Starts the timer with the specified deadline. +// Should be called from the filter's init_call_elem() method. +void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + gpr_timespec deadline); + +// Cancels the existing timer and starts a new one with new_deadline. +// +// Note: It is generally safe to call this with an earlier deadline +// value than the current one, but not the reverse. No checks are done +// to ensure that the timer callback is not invoked while it is in the +// process of being reset, which means that attempting to increase the +// deadline may result in the timer being called twice. +void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + gpr_timespec new_deadline); + +// To be called from the client-side filter's start_transport_stream_op() +// method. Ensures that the deadline timer is cancelled when the call +// is completed. +// +// Note: It is the caller's responsibility to chain to the next filter if +// necessary after this function returns. void grpc_deadline_state_client_start_transport_stream_op( grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op* op); |