diff options
author | Craig Tiller <ctiller@google.com> | 2017-02-23 06:45:04 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-02-23 06:45:04 -0800 |
commit | 4374fd3a908cd811ac51d9f71ca093cb64604096 (patch) | |
tree | e68a3317a402d93e791346ccd331fb4da0c05424 /src/core | |
parent | 473380e9f1516a9ba916738299751f486740e5ef (diff) | |
parent | 7e1d5178a1e5d9a7dc5a8ec593a4b87b36531ec3 (diff) |
Merge pull request #9750 from ctiller/deadline_mu
Eliminate mutex in deadline_filter
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/lib/channel/deadline_filter.c | 102 | ||||
-rw-r--r-- | src/core/lib/channel/deadline_filter.h | 12 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_generic.c | 18 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_generic.h | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_uv.c | 12 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_uv.h | 2 |
6 files changed, 77 insertions, 71 deletions
diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index 720cfb44e2..f9668be0fa 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -52,9 +52,6 @@ static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { grpc_call_element* elem = arg; grpc_deadline_state* deadline_state = elem->call_data; - gpr_mu_lock(&deadline_state->timer_mu); - deadline_state->timer_pending = false; - gpr_mu_unlock(&deadline_state->timer_mu); if (error != GRPC_ERROR_CANCELLED) { grpc_call_element_signal_error( exec_ctx, elem, @@ -66,53 +63,64 @@ static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg, } // Starts the deadline timer. -static void start_timer_if_needed_locked(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, - gpr_timespec deadline) { - grpc_deadline_state* deadline_state = elem->call_data; - deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); - // Note: We do not start the timer if there is already a timer - // pending. This should be okay, because this is only called from two - // functions exported by this module: grpc_deadline_state_start(), which - // starts the initial timer, and grpc_deadline_state_reset(), which - // cancels any pre-existing timer before starting a new one. In - // particular, we want to ensure that if grpc_deadline_state_start() - // winds up trying to start the timer after grpc_deadline_state_reset() - // has already done so, we ignore the value from the former. - if (!deadline_state->timer_pending && - gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) { - // Take a reference to the call stack, to be owned by the timer. - GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer"); - deadline_state->timer_pending = true; - grpc_closure_init(&deadline_state->timer_callback, timer_callback, elem, - grpc_schedule_on_exec_ctx); - grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, - &deadline_state->timer_callback, - gpr_now(GPR_CLOCK_MONOTONIC)); - } -} static void start_timer_if_needed(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, gpr_timespec deadline) { + deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); + if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) == 0) { + return; + } grpc_deadline_state* deadline_state = elem->call_data; - gpr_mu_lock(&deadline_state->timer_mu); - start_timer_if_needed_locked(exec_ctx, elem, deadline); - gpr_mu_unlock(&deadline_state->timer_mu); + grpc_deadline_timer_state cur_state; + grpc_closure* closure = NULL; +retry: + cur_state = + (grpc_deadline_timer_state)gpr_atm_acq_load(&deadline_state->timer_state); + switch (cur_state) { + case GRPC_DEADLINE_STATE_PENDING: + // Note: We do not start the timer if there is already a timer + return; + case GRPC_DEADLINE_STATE_FINISHED: + if (gpr_atm_rel_cas(&deadline_state->timer_state, + GRPC_DEADLINE_STATE_FINISHED, + GRPC_DEADLINE_STATE_PENDING)) { + // If we've already created and destroyed a timer, we always create a + // new closure: we have no other guarantee that the inlined closure is + // not in use (it may hold a pending call to timer_callback) + closure = grpc_closure_create(timer_callback, elem, + grpc_schedule_on_exec_ctx); + } else { + goto retry; + } + break; + case GRPC_DEADLINE_STATE_INITIAL: + if (gpr_atm_rel_cas(&deadline_state->timer_state, + GRPC_DEADLINE_STATE_INITIAL, + GRPC_DEADLINE_STATE_PENDING)) { + closure = + grpc_closure_init(&deadline_state->timer_callback, timer_callback, + elem, grpc_schedule_on_exec_ctx); + } else { + goto retry; + } + break; + } + GPR_ASSERT(closure); + GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer"); + grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, closure, + gpr_now(GPR_CLOCK_MONOTONIC)); } // Cancels the deadline timer. -static void cancel_timer_if_needed_locked(grpc_exec_ctx* exec_ctx, - grpc_deadline_state* deadline_state) { - if (deadline_state->timer_pending) { - grpc_timer_cancel(exec_ctx, &deadline_state->timer); - deadline_state->timer_pending = false; - } -} static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx, grpc_deadline_state* deadline_state) { - gpr_mu_lock(&deadline_state->timer_mu); - cancel_timer_if_needed_locked(exec_ctx, deadline_state); - gpr_mu_unlock(&deadline_state->timer_mu); + if (gpr_atm_rel_cas(&deadline_state->timer_state, GRPC_DEADLINE_STATE_PENDING, + GRPC_DEADLINE_STATE_FINISHED)) { + grpc_timer_cancel(exec_ctx, &deadline_state->timer); + } else { + // timer was either in STATE_INITAL (nothing to cancel) + // OR in STATE_FINISHED (again nothing to cancel) + } } // Callback run when the call is complete. @@ -120,8 +128,8 @@ static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { grpc_deadline_state* deadline_state = arg; cancel_timer_if_needed(exec_ctx, deadline_state); // Invoke the next callback. - deadline_state->next_on_complete->cb( - exec_ctx, deadline_state->next_on_complete->cb_arg, error); + grpc_closure_run(exec_ctx, deadline_state->next_on_complete, + GRPC_ERROR_REF(error)); } // Inject our own on_complete callback into op. @@ -138,14 +146,12 @@ void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_deadline_state* deadline_state = elem->call_data; memset(deadline_state, 0, sizeof(*deadline_state)); deadline_state->call_stack = call_stack; - gpr_mu_init(&deadline_state->timer_mu); } void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, grpc_call_element* elem) { grpc_deadline_state* deadline_state = elem->call_data; cancel_timer_if_needed(exec_ctx, deadline_state); - gpr_mu_destroy(&deadline_state->timer_mu); } // Callback and associated state for starting the timer after call stack @@ -187,10 +193,8 @@ void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, gpr_timespec new_deadline) { grpc_deadline_state* deadline_state = elem->call_data; - gpr_mu_lock(&deadline_state->timer_mu); - cancel_timer_if_needed_locked(exec_ctx, deadline_state); - start_timer_if_needed_locked(exec_ctx, elem, new_deadline); - gpr_mu_unlock(&deadline_state->timer_mu); + cancel_timer_if_needed(exec_ctx, deadline_state); + start_timer_if_needed(exec_ctx, elem, new_deadline); } void grpc_deadline_state_client_start_transport_stream_op( diff --git a/src/core/lib/channel/deadline_filter.h b/src/core/lib/channel/deadline_filter.h index bd2b84f79e..94717f6bc7 100644 --- a/src/core/lib/channel/deadline_filter.h +++ b/src/core/lib/channel/deadline_filter.h @@ -35,16 +35,18 @@ #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/timer.h" +typedef enum grpc_deadline_timer_state { + GRPC_DEADLINE_STATE_INITIAL, + GRPC_DEADLINE_STATE_PENDING, + GRPC_DEADLINE_STATE_FINISHED +} grpc_deadline_timer_state; + // State used for filters that enforce call deadlines. // Must be the first field in the filter's call_data. typedef struct grpc_deadline_state { // We take a reference to the call stack for the timer callback. grpc_call_stack* call_stack; - // Guards access to timer_pending and timer. - gpr_mu timer_mu; - // True if the timer callback is currently pending. - bool timer_pending; - // The deadline timer. + gpr_atm timer_state; grpc_timer timer; grpc_closure timer_callback; // Closure to invoke when the call is complete. diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index 8a5617e7c1..6d638bcbaa 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -180,25 +180,25 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, GPR_ASSERT(now.clock_type == g_clock_type); timer->closure = closure; timer->deadline = deadline; - timer->triggered = 0; if (!g_initialized) { - timer->triggered = 1; + timer->pending = false; grpc_closure_sched( exec_ctx, timer->closure, GRPC_ERROR_CREATE("Attempt to create timer before initialization")); return; } + gpr_mu_lock(&shard->mu); + timer->pending = true; if (gpr_time_cmp(deadline, now) <= 0) { - timer->triggered = 1; + timer->pending = false; grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_NONE); + gpr_mu_unlock(&shard->mu); + /* early out */ return; } - /* TODO(ctiller): check deadline expired */ - - gpr_mu_lock(&shard->mu); grpc_time_averaged_stats_add_sample(&shard->stats, ts_to_dbl(gpr_time_sub(deadline, now))); if (gpr_time_cmp(deadline, shard->queue_deadline_cap) < 0) { @@ -243,9 +243,9 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)]; gpr_mu_lock(&shard->mu); - if (!timer->triggered) { + if (timer->pending) { grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED); - timer->triggered = 1; + timer->pending = false; if (timer->heap_index == INVALID_HEAP_INDEX) { list_remove(timer); } else { @@ -296,7 +296,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) { } timer = grpc_timer_heap_top(&shard->heap); if (gpr_time_cmp(timer->deadline, now) > 0) return NULL; - timer->triggered = 1; + timer->pending = false; grpc_timer_heap_pop(&shard->heap); return timer; } diff --git a/src/core/lib/iomgr/timer_generic.h b/src/core/lib/iomgr/timer_generic.h index 9d901c7e68..1608dce9fb 100644 --- a/src/core/lib/iomgr/timer_generic.h +++ b/src/core/lib/iomgr/timer_generic.h @@ -40,7 +40,7 @@ struct grpc_timer { gpr_timespec deadline; uint32_t heap_index; /* INVALID_HEAP_INDEX if not in heap */ - int triggered; + bool pending; struct grpc_timer *next; struct grpc_timer *prev; grpc_closure *closure; diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c index fa2cdee964..f28a14405d 100644 --- a/src/core/lib/iomgr/timer_uv.c +++ b/src/core/lib/iomgr/timer_uv.c @@ -53,8 +53,8 @@ static void stop_uv_timer(uv_timer_t *handle) { void run_expired_timer(uv_timer_t *handle) { grpc_timer *timer = (grpc_timer *)handle->data; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GPR_ASSERT(!timer->triggered); - timer->triggered = 1; + GPR_ASSERT(timer->pending); + timer->pending = 0; grpc_closure_sched(&exec_ctx, timer->closure, GRPC_ERROR_NONE); stop_uv_timer(handle); grpc_exec_ctx_finish(&exec_ctx); @@ -67,11 +67,11 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, uv_timer_t *uv_timer; timer->closure = closure; if (gpr_time_cmp(deadline, now) <= 0) { - timer->triggered = 1; + timer->pending = 0; grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_NONE); return; } - timer->triggered = 0; + timer->pending = 1; timeout = (uint64_t)gpr_time_to_millis(gpr_time_sub(deadline, now)); uv_timer = gpr_malloc(sizeof(uv_timer_t)); uv_timer_init(uv_default_loop(), uv_timer); @@ -81,8 +81,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, } void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { - if (!timer->triggered) { - timer->triggered = 1; + if (timer->pending) { + timer->pending = 0; grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED); stop_uv_timer((uv_timer_t *)timer->uv_timer); } diff --git a/src/core/lib/iomgr/timer_uv.h b/src/core/lib/iomgr/timer_uv.h index 13cf8bd4fa..9870cd4a5c 100644 --- a/src/core/lib/iomgr/timer_uv.h +++ b/src/core/lib/iomgr/timer_uv.h @@ -41,7 +41,7 @@ struct grpc_timer { /* This is actually a uv_timer_t*, but we want to keep platform-specific types out of headers */ void *uv_timer; - int triggered; + int pending; }; #endif /* GRPC_CORE_LIB_IOMGR_TIMER_UV_H */ |