diff options
author | Craig Tiller <ctiller@google.com> | 2016-05-24 10:50:40 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-05-24 10:50:40 -0700 |
commit | c9d4b81dabf4dc2c262fc771c31790f2d60fe551 (patch) | |
tree | 421870f38c26dc012a2d9a07cb51dc169e675067 /src | |
parent | e2c1040f9c52d0fea341f390bce860ae2e3c85cc (diff) |
Add the ability to run some action when the lock becomes idle
Diffstat (limited to 'src')
-rw-r--r-- | src/core/lib/iomgr/async_execution_lock.c | 94 | ||||
-rw-r--r-- | src/core/lib/iomgr/async_execution_lock.h | 4 |
2 files changed, 80 insertions, 18 deletions
diff --git a/src/core/lib/iomgr/async_execution_lock.c b/src/core/lib/iomgr/async_execution_lock.c index 96ba175a5a..c656e7c940 100644 --- a/src/core/lib/iomgr/async_execution_lock.c +++ b/src/core/lib/iomgr/async_execution_lock.c @@ -38,6 +38,9 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#define STATE_BIT_ALIVE 1 +#define STATE_BIT_REFS 2 + typedef struct grpc_aelock_qnode { gpr_mpscq_node mpscq_node; grpc_aelock_action action; @@ -50,17 +53,24 @@ struct grpc_aelock { // state is: // lower bit - zero if orphaned // other bits - number of items queued on the lock + // see: STATE_BIT_xxx gpr_atm state; + grpc_aelock_action before_idle_action; + void *before_idle_action_arg; grpc_closure continue_finishing; }; static void continue_finishing(grpc_exec_ctx *exec_ctx, void *arg, bool success); -grpc_aelock *grpc_aelock_create(grpc_workqueue *optional_workqueue) { +grpc_aelock *grpc_aelock_create(grpc_workqueue *optional_workqueue, + grpc_aelock_action before_idle_action, + void *before_idle_action_arg) { grpc_aelock *lock = gpr_malloc(sizeof(*lock)); + lock->before_idle_action = before_idle_action; + lock->before_idle_action_arg = before_idle_action_arg; lock->optional_workqueue = optional_workqueue; - gpr_atm_no_barrier_store(&lock->state, 1); + gpr_atm_no_barrier_store(&lock->state, STATE_BIT_ALIVE); gpr_mpscq_init(&lock->queue); grpc_closure_init(&lock->continue_finishing, continue_finishing, lock); return lock; @@ -73,7 +83,8 @@ static void really_destroy(grpc_aelock *lock) { } void grpc_aelock_destroy(grpc_aelock *lock) { - if (gpr_atm_full_fetch_add(&lock->state, -1) == 1) { + if (gpr_atm_full_fetch_add(&lock->state, -STATE_BIT_ALIVE) == + STATE_BIT_ALIVE) { really_destroy(lock); } } @@ -81,10 +92,6 @@ void grpc_aelock_destroy(grpc_aelock *lock) { static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_aelock *lock) { gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue); if (n == NULL) { - // queue is in an inconsistant state: use this as a cue that we should - // go off and do something else for a while (and come back later) - grpc_exec_ctx_enqueue(exec_ctx, &lock->continue_finishing, true, - lock->optional_workqueue); return false; } grpc_aelock_qnode *ln = (grpc_aelock_qnode *)n; @@ -94,36 +101,89 @@ static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_aelock *lock) { } static void finish(grpc_exec_ctx *exec_ctx, grpc_aelock *lock) { - do { - switch (gpr_atm_full_fetch_add(&lock->state, -2)) { - case 3: // had one count, one unorphaned --> unlocked unorphaned + for (;;) { + gpr_atm last_state = gpr_atm_full_fetch_add(&lock->state, -STATE_BIT_REFS); + switch (last_state) { + default: + perform_one_step: + gpr_log(GPR_DEBUG, "ls=%d execute", last_state); + if (!maybe_finish_one(exec_ctx, lock)) { + // perform the idle action before going off to do something else + lock->before_idle_action(exec_ctx, lock->before_idle_action_arg); + // quick peek to see if we can immediately resume + if (!maybe_finish_one(exec_ctx, lock)) { + // queue is in an inconsistant state: use this as a cue that we + // should + // go off and do something else for a while (and come back later) + grpc_exec_ctx_enqueue(exec_ctx, &lock->continue_finishing, true, + lock->optional_workqueue); + return; + } + } + break; + case STATE_BIT_ALIVE | (2 * STATE_BIT_REFS): + gpr_log(GPR_DEBUG, "ls=%d final", last_state); + lock->before_idle_action(exec_ctx, lock->before_idle_action_arg); + switch (gpr_atm_full_fetch_add(&lock->state, -STATE_BIT_REFS)) { + case STATE_BIT_ALIVE | STATE_BIT_REFS: + return; + case STATE_BIT_REFS: + really_destroy(lock); + return; + default: + gpr_log(GPR_DEBUG, "retry"); + // oops: did the before action, but something else came in + // better add another ref so we remember to do this again + gpr_atm_full_fetch_add(&lock->state, STATE_BIT_REFS); + goto perform_one_step; + } + break; + case STATE_BIT_ALIVE | STATE_BIT_REFS: + gpr_log(GPR_DEBUG, "ls=%d unlock", last_state); return; - case 2: // and one count, one orphaned --> unlocked and orphaned + case 2 * STATE_BIT_REFS: + gpr_log(GPR_DEBUG, "ls=%d idle", last_state); + lock->before_idle_action(exec_ctx, lock->before_idle_action_arg); + GPR_ASSERT(gpr_atm_full_fetch_add(&lock->state, -STATE_BIT_REFS) == + STATE_BIT_REFS); + case STATE_BIT_REFS: + gpr_log(GPR_DEBUG, "ls=%d destroy", last_state); really_destroy(lock); return; - case 1: + case STATE_BIT_ALIVE: case 0: // these values are illegal - representing an already unlocked or // deleted lock GPR_UNREACHABLE_CODE(return ); } - } while (maybe_finish_one(exec_ctx, lock)); + } + + // while (maybe_finish_one(exec_ctx, lock)); } static void continue_finishing(grpc_exec_ctx *exec_ctx, void *arg, bool success) { - if (maybe_finish_one(exec_ctx, arg)) finish(exec_ctx, arg); + grpc_aelock *lock = arg; + if (maybe_finish_one(exec_ctx, lock)) { + finish(exec_ctx, lock); + } else { + // queue is in an inconsistant state: use this as a cue that we should + // go off and do something else for a while (and come back later) + grpc_exec_ctx_enqueue(exec_ctx, &lock->continue_finishing, true, + lock->optional_workqueue); + } } void grpc_aelock_execute(grpc_exec_ctx *exec_ctx, grpc_aelock *lock, grpc_aelock_action action, void *arg, size_t sizeof_arg) { - gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2); - GPR_ASSERT(last & 1); // ensure lock has not been destroyed - if (last == 1) { + gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2 * STATE_BIT_REFS); + GPR_ASSERT(last & STATE_BIT_ALIVE); // ensure lock has not been destroyed + if (last == STATE_BIT_ALIVE) { action(exec_ctx, arg); finish(exec_ctx, lock); } else { + gpr_atm_full_fetch_add(&lock->state, -STATE_BIT_REFS); grpc_aelock_qnode *n = gpr_malloc(sizeof(*n) + sizeof_arg); n->action = action; if (sizeof_arg > 0) { diff --git a/src/core/lib/iomgr/async_execution_lock.h b/src/core/lib/iomgr/async_execution_lock.h index e52762f443..ea7a322fae 100644 --- a/src/core/lib/iomgr/async_execution_lock.h +++ b/src/core/lib/iomgr/async_execution_lock.h @@ -51,7 +51,9 @@ typedef void (*grpc_aelock_action)(grpc_exec_ctx *exec_ctx, void *arg); // Initialize the lock, with an optional workqueue to shift load to when // necessary -grpc_aelock *grpc_aelock_create(grpc_workqueue *optional_workqueue); +grpc_aelock *grpc_aelock_create(grpc_workqueue *optional_workqueue, + grpc_aelock_action before_idle_action, + void *before_idle_action_arg); // Destroy the lock void grpc_aelock_destroy(grpc_aelock *lock); // Execute \a action within the lock. \a arg is the argument to pass to \a |