From 4447c2c6fc51ad0b9cbb542338084c8000543c8e Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 16 Feb 2017 12:35:13 -0800 Subject: Lock free deadline filter --- src/core/lib/channel/deadline_filter.c | 90 ++++++++++++++++------------------ src/core/lib/channel/deadline_filter.h | 17 ++++--- 2 files changed, 53 insertions(+), 54 deletions(-) diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index 720cfb44e2..fcc08c53ac 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -43,6 +43,8 @@ #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/slice/slice_internal.h" +#define TOMBSTONE_TIMER 1 + // // grpc_deadline_state // @@ -52,9 +54,6 @@ static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { grpc_call_element* elem = arg; grpc_deadline_state* deadline_state = elem->call_data; - gpr_mu_lock(&deadline_state->timer_mu); - deadline_state->timer_pending = false; - gpr_mu_unlock(&deadline_state->timer_mu); if (error != GRPC_ERROR_CANCELLED) { grpc_call_element_signal_error( exec_ctx, elem, @@ -66,53 +65,54 @@ static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg, } // Starts the deadline timer. -static void start_timer_if_needed_locked(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, - gpr_timespec deadline) { - grpc_deadline_state* deadline_state = elem->call_data; - deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); - // Note: We do not start the timer if there is already a timer - // pending. This should be okay, because this is only called from two - // functions exported by this module: grpc_deadline_state_start(), which - // starts the initial timer, and grpc_deadline_state_reset(), which - // cancels any pre-existing timer before starting a new one. In - // particular, we want to ensure that if grpc_deadline_state_start() - // winds up trying to start the timer after grpc_deadline_state_reset() - // has already done so, we ignore the value from the former. - if (!deadline_state->timer_pending && - gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) { - // Take a reference to the call stack, to be owned by the timer. - GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer"); - deadline_state->timer_pending = true; - grpc_closure_init(&deadline_state->timer_callback, timer_callback, elem, - grpc_schedule_on_exec_ctx); - grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, - &deadline_state->timer_callback, - gpr_now(GPR_CLOCK_MONOTONIC)); - } -} static void start_timer_if_needed(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, gpr_timespec deadline) { + deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); + if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) { + return; + } grpc_deadline_state* deadline_state = elem->call_data; - gpr_mu_lock(&deadline_state->timer_mu); - start_timer_if_needed_locked(exec_ctx, elem, deadline); - gpr_mu_unlock(&deadline_state->timer_mu); + for (size_t i = 0; i < GPR_ARRAY_SIZE(deadline_state->timers); i++) { + if (gpr_atm_acq_load(&deadline_state->timers[i]) == 0) { + grpc_deadline_timer* timer = (i == 0 ? &deadline_state->inlined_timer + : gpr_malloc(sizeof(*timer))); + if (gpr_atm_rel_cas(&deadline_state->timers[i], 0, (gpr_atm)timer)) { + grpc_timer_init( + exec_ctx, &timer->timer, deadline, + grpc_closure_init(&timer->timer_callback, timer_callback, elem, + grpc_schedule_on_exec_ctx), + gpr_now(GPR_CLOCK_MONOTONIC)); + } else if (i != 0) { + gpr_free(timer); + } + } + } + GPR_UNREACHABLE_CODE(return;); } // Cancels the deadline timer. -static void cancel_timer_if_needed_locked(grpc_exec_ctx* exec_ctx, - grpc_deadline_state* deadline_state) { - if (deadline_state->timer_pending) { - grpc_timer_cancel(exec_ctx, &deadline_state->timer); - deadline_state->timer_pending = false; - } -} static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx, grpc_deadline_state* deadline_state) { - gpr_mu_lock(&deadline_state->timer_mu); - cancel_timer_if_needed_locked(exec_ctx, deadline_state); - gpr_mu_unlock(&deadline_state->timer_mu); + for (size_t i = 0; i < GPR_ARRAY_SIZE(deadline_state->timers); i++) { + gpr_atm timer_val; + timer_val = gpr_atm_acq_load(&deadline_state->timers[i]); + switch (timer_val) { + case 0: + break; + case TOMBSTONE_TIMER: + break; + default: + if (!gpr_atm_rel_cas(&deadline_state->timers[i], timer_val, + TOMBSTONE_TIMER)) { + break; // must have become a tombstone + } + grpc_deadline_timer* timer = (grpc_deadline_timer*)timer_val; + grpc_timer_cancel(exec_ctx, &timer->timer); + if (i != 0) gpr_free(timer); + break; + } + } } // Callback run when the call is complete. @@ -138,14 +138,12 @@ void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_deadline_state* deadline_state = elem->call_data; memset(deadline_state, 0, sizeof(*deadline_state)); deadline_state->call_stack = call_stack; - gpr_mu_init(&deadline_state->timer_mu); } void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, grpc_call_element* elem) { grpc_deadline_state* deadline_state = elem->call_data; cancel_timer_if_needed(exec_ctx, deadline_state); - gpr_mu_destroy(&deadline_state->timer_mu); } // Callback and associated state for starting the timer after call stack @@ -187,10 +185,8 @@ void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, gpr_timespec new_deadline) { grpc_deadline_state* deadline_state = elem->call_data; - gpr_mu_lock(&deadline_state->timer_mu); - cancel_timer_if_needed_locked(exec_ctx, deadline_state); - start_timer_if_needed_locked(exec_ctx, elem, new_deadline); - gpr_mu_unlock(&deadline_state->timer_mu); + cancel_timer_if_needed(exec_ctx, deadline_state); + start_timer_if_needed(exec_ctx, elem, new_deadline); } void grpc_deadline_state_client_start_transport_stream_op( diff --git a/src/core/lib/channel/deadline_filter.h b/src/core/lib/channel/deadline_filter.h index bd2b84f79e..45d524a830 100644 --- a/src/core/lib/channel/deadline_filter.h +++ b/src/core/lib/channel/deadline_filter.h @@ -35,18 +35,21 @@ #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/timer.h" +typedef struct grpc_deadline_timer { + grpc_timer timer; + grpc_closure timer_callback; +} grpc_deadline_timer; + // State used for filters that enforce call deadlines. // Must be the first field in the filter's call_data. typedef struct grpc_deadline_state { // We take a reference to the call stack for the timer callback. grpc_call_stack* call_stack; - // Guards access to timer_pending and timer. - gpr_mu timer_mu; - // True if the timer callback is currently pending. - bool timer_pending; - // The deadline timer. - grpc_timer timer; - grpc_closure timer_callback; + // We allow an initial timer and one reset.. these atomics point to the + // grpc_deadline_timer instances + gpr_atm timers[2]; + // Pre-allocated initial timer. + grpc_deadline_timer inlined_timer; // Closure to invoke when the call is complete. // We use this to cancel the timer. grpc_closure on_complete; -- cgit v1.2.3 From 0a77de87ad8c5d2de0998186026c93a99afa5914 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 16 Feb 2017 12:39:33 -0800 Subject: Fix conditional, optimize cancellation --- src/core/lib/channel/deadline_filter.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index fcc08c53ac..fbc858bccc 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -69,7 +69,7 @@ static void start_timer_if_needed(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, gpr_timespec deadline) { deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); - if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) { + if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) == 0) { return; } grpc_deadline_state* deadline_state = elem->call_data; @@ -99,7 +99,7 @@ static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx, timer_val = gpr_atm_acq_load(&deadline_state->timers[i]); switch (timer_val) { case 0: - break; + return; case TOMBSTONE_TIMER: break; default: -- cgit v1.2.3 From c84886b2754d77cd413313b1c3719e5f638b814e Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 16 Feb 2017 13:10:38 -0800 Subject: Better implementation, flip timer logic to make 0-init pre-triggered --- src/core/lib/channel/deadline_filter.c | 69 ++++++++++++++++++---------------- src/core/lib/channel/deadline_filter.h | 17 ++++----- src/core/lib/iomgr/timer_generic.c | 18 ++++----- src/core/lib/iomgr/timer_generic.h | 2 +- src/core/lib/iomgr/timer_uv.c | 12 +++--- src/core/lib/iomgr/timer_uv.h | 2 +- 6 files changed, 61 insertions(+), 59 deletions(-) diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index fbc858bccc..8a8c3cb830 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -73,45 +73,48 @@ static void start_timer_if_needed(grpc_exec_ctx* exec_ctx, return; } grpc_deadline_state* deadline_state = elem->call_data; - for (size_t i = 0; i < GPR_ARRAY_SIZE(deadline_state->timers); i++) { - if (gpr_atm_acq_load(&deadline_state->timers[i]) == 0) { - grpc_deadline_timer* timer = (i == 0 ? &deadline_state->inlined_timer - : gpr_malloc(sizeof(*timer))); - if (gpr_atm_rel_cas(&deadline_state->timers[i], 0, (gpr_atm)timer)) { - grpc_timer_init( - exec_ctx, &timer->timer, deadline, - grpc_closure_init(&timer->timer_callback, timer_callback, elem, - grpc_schedule_on_exec_ctx), - gpr_now(GPR_CLOCK_MONOTONIC)); - } else if (i != 0) { - gpr_free(timer); + grpc_deadline_timer_state cur_state; + grpc_closure* closure = NULL; +retry: + cur_state = + (grpc_deadline_timer_state)gpr_atm_acq_load(&deadline_state->timer_state); + switch (cur_state) { + case GRPC_DEADLINE_STATE_PENDING: + return; + case GRPC_DEADLINE_STATE_FINISHED: + if (gpr_atm_rel_cas(&deadline_state->timer_state, + GRPC_DEADLINE_STATE_FINISHED, + GRPC_DEADLINE_STATE_PENDING)) { + closure = grpc_closure_create(timer_callback, elem, + grpc_schedule_on_exec_ctx); + } else { + goto retry; } - } + break; + case GRPC_DEADLINE_STATE_INITIAL: + if (gpr_atm_rel_cas(&deadline_state->timer_state, + GRPC_DEADLINE_STATE_INITIAL, + GRPC_DEADLINE_STATE_PENDING)) { + closure = + grpc_closure_init(&deadline_state->timer_callback, timer_callback, + elem, grpc_schedule_on_exec_ctx); + } else { + goto retry; + } + break; } + GPR_ASSERT(closure); + grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, closure, + gpr_now(GPR_CLOCK_MONOTONIC)); GPR_UNREACHABLE_CODE(return;); } // Cancels the deadline timer. static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx, grpc_deadline_state* deadline_state) { - for (size_t i = 0; i < GPR_ARRAY_SIZE(deadline_state->timers); i++) { - gpr_atm timer_val; - timer_val = gpr_atm_acq_load(&deadline_state->timers[i]); - switch (timer_val) { - case 0: - return; - case TOMBSTONE_TIMER: - break; - default: - if (!gpr_atm_rel_cas(&deadline_state->timers[i], timer_val, - TOMBSTONE_TIMER)) { - break; // must have become a tombstone - } - grpc_deadline_timer* timer = (grpc_deadline_timer*)timer_val; - grpc_timer_cancel(exec_ctx, &timer->timer); - if (i != 0) gpr_free(timer); - break; - } + if (gpr_atm_acq_load(&deadline_state->timer_state) != + GRPC_DEADLINE_STATE_INITIAL) { + grpc_timer_cancel(exec_ctx, &deadline_state->timer); } } @@ -120,8 +123,8 @@ static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { grpc_deadline_state* deadline_state = arg; cancel_timer_if_needed(exec_ctx, deadline_state); // Invoke the next callback. - deadline_state->next_on_complete->cb( - exec_ctx, deadline_state->next_on_complete->cb_arg, error); + grpc_closure_run(exec_ctx, deadline_state->next_on_complete, + GRPC_ERROR_REF(error)); } // Inject our own on_complete callback into op. diff --git a/src/core/lib/channel/deadline_filter.h b/src/core/lib/channel/deadline_filter.h index 45d524a830..94717f6bc7 100644 --- a/src/core/lib/channel/deadline_filter.h +++ b/src/core/lib/channel/deadline_filter.h @@ -35,21 +35,20 @@ #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/timer.h" -typedef struct grpc_deadline_timer { - grpc_timer timer; - grpc_closure timer_callback; -} grpc_deadline_timer; +typedef enum grpc_deadline_timer_state { + GRPC_DEADLINE_STATE_INITIAL, + GRPC_DEADLINE_STATE_PENDING, + GRPC_DEADLINE_STATE_FINISHED +} grpc_deadline_timer_state; // State used for filters that enforce call deadlines. // Must be the first field in the filter's call_data. typedef struct grpc_deadline_state { // We take a reference to the call stack for the timer callback. grpc_call_stack* call_stack; - // We allow an initial timer and one reset.. these atomics point to the - // grpc_deadline_timer instances - gpr_atm timers[2]; - // Pre-allocated initial timer. - grpc_deadline_timer inlined_timer; + gpr_atm timer_state; + grpc_timer timer; + grpc_closure timer_callback; // Closure to invoke when the call is complete. // We use this to cancel the timer. grpc_closure on_complete; diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index 40c8351472..45a32f10da 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -186,25 +186,25 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, GPR_ASSERT(now.clock_type == g_clock_type); timer->closure = closure; timer->deadline = deadline; - timer->triggered = 0; if (!g_initialized) { - timer->triggered = 1; + timer->pending = false; grpc_closure_sched( exec_ctx, timer->closure, GRPC_ERROR_CREATE("Attempt to create timer before initialization")); return; } + gpr_mu_lock(&shard->mu); + timer->pending = true; if (gpr_time_cmp(deadline, now) <= 0) { - timer->triggered = 1; + timer->pending = false; grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_NONE); + gpr_mu_unlock(&shard->mu); + /* early out */ return; } - /* TODO(ctiller): check deadline expired */ - - gpr_mu_lock(&shard->mu); grpc_time_averaged_stats_add_sample(&shard->stats, ts_to_dbl(gpr_time_sub(deadline, now))); if (gpr_time_cmp(deadline, shard->queue_deadline_cap) < 0) { @@ -249,9 +249,9 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { shard_type *shard = &g_shards[shard_idx(timer)]; gpr_mu_lock(&shard->mu); - if (!timer->triggered) { + if (timer->pending) { grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED); - timer->triggered = 1; + timer->pending = false; if (timer->heap_index == INVALID_HEAP_INDEX) { list_remove(timer); } else { @@ -302,7 +302,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) { } timer = grpc_timer_heap_top(&shard->heap); if (gpr_time_cmp(timer->deadline, now) > 0) return NULL; - timer->triggered = 1; + timer->pending = false; grpc_timer_heap_pop(&shard->heap); return timer; } diff --git a/src/core/lib/iomgr/timer_generic.h b/src/core/lib/iomgr/timer_generic.h index 9d901c7e68..1608dce9fb 100644 --- a/src/core/lib/iomgr/timer_generic.h +++ b/src/core/lib/iomgr/timer_generic.h @@ -40,7 +40,7 @@ struct grpc_timer { gpr_timespec deadline; uint32_t heap_index; /* INVALID_HEAP_INDEX if not in heap */ - int triggered; + bool pending; struct grpc_timer *next; struct grpc_timer *prev; grpc_closure *closure; diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c index fa2cdee964..f28a14405d 100644 --- a/src/core/lib/iomgr/timer_uv.c +++ b/src/core/lib/iomgr/timer_uv.c @@ -53,8 +53,8 @@ static void stop_uv_timer(uv_timer_t *handle) { void run_expired_timer(uv_timer_t *handle) { grpc_timer *timer = (grpc_timer *)handle->data; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GPR_ASSERT(!timer->triggered); - timer->triggered = 1; + GPR_ASSERT(timer->pending); + timer->pending = 0; grpc_closure_sched(&exec_ctx, timer->closure, GRPC_ERROR_NONE); stop_uv_timer(handle); grpc_exec_ctx_finish(&exec_ctx); @@ -67,11 +67,11 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, uv_timer_t *uv_timer; timer->closure = closure; if (gpr_time_cmp(deadline, now) <= 0) { - timer->triggered = 1; + timer->pending = 0; grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_NONE); return; } - timer->triggered = 0; + timer->pending = 1; timeout = (uint64_t)gpr_time_to_millis(gpr_time_sub(deadline, now)); uv_timer = gpr_malloc(sizeof(uv_timer_t)); uv_timer_init(uv_default_loop(), uv_timer); @@ -81,8 +81,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, } void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { - if (!timer->triggered) { - timer->triggered = 1; + if (timer->pending) { + timer->pending = 0; grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED); stop_uv_timer((uv_timer_t *)timer->uv_timer); } diff --git a/src/core/lib/iomgr/timer_uv.h b/src/core/lib/iomgr/timer_uv.h index 13cf8bd4fa..9870cd4a5c 100644 --- a/src/core/lib/iomgr/timer_uv.h +++ b/src/core/lib/iomgr/timer_uv.h @@ -41,7 +41,7 @@ struct grpc_timer { /* This is actually a uv_timer_t*, but we want to keep platform-specific types out of headers */ void *uv_timer; - int triggered; + int pending; }; #endif /* GRPC_CORE_LIB_IOMGR_TIMER_UV_H */ -- cgit v1.2.3 From ac942f430f5219633111774d95b1d7a68aaf0dee Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 22 Feb 2017 09:13:14 -0800 Subject: Fix refcounting bug --- src/core/lib/channel/deadline_filter.c | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index 8a8c3cb830..f9668be0fa 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -43,8 +43,6 @@ #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/slice/slice_internal.h" -#define TOMBSTONE_TIMER 1 - // // grpc_deadline_state // @@ -80,11 +78,15 @@ retry: (grpc_deadline_timer_state)gpr_atm_acq_load(&deadline_state->timer_state); switch (cur_state) { case GRPC_DEADLINE_STATE_PENDING: + // Note: We do not start the timer if there is already a timer return; case GRPC_DEADLINE_STATE_FINISHED: if (gpr_atm_rel_cas(&deadline_state->timer_state, GRPC_DEADLINE_STATE_FINISHED, GRPC_DEADLINE_STATE_PENDING)) { + // If we've already created and destroyed a timer, we always create a + // new closure: we have no other guarantee that the inlined closure is + // not in use (it may hold a pending call to timer_callback) closure = grpc_closure_create(timer_callback, elem, grpc_schedule_on_exec_ctx); } else { @@ -104,17 +106,20 @@ retry: break; } GPR_ASSERT(closure); + GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer"); grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, closure, gpr_now(GPR_CLOCK_MONOTONIC)); - GPR_UNREACHABLE_CODE(return;); } // Cancels the deadline timer. static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx, grpc_deadline_state* deadline_state) { - if (gpr_atm_acq_load(&deadline_state->timer_state) != - GRPC_DEADLINE_STATE_INITIAL) { + if (gpr_atm_rel_cas(&deadline_state->timer_state, GRPC_DEADLINE_STATE_PENDING, + GRPC_DEADLINE_STATE_FINISHED)) { grpc_timer_cancel(exec_ctx, &deadline_state->timer); + } else { + // timer was either in STATE_INITAL (nothing to cancel) + // OR in STATE_FINISHED (again nothing to cancel) } } -- cgit v1.2.3