aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-05-24 10:50:40 -0700
committerGravatar Craig Tiller <ctiller@google.com>2016-05-24 10:50:40 -0700
commitc9d4b81dabf4dc2c262fc771c31790f2d60fe551 (patch)
tree421870f38c26dc012a2d9a07cb51dc169e675067 /src
parente2c1040f9c52d0fea341f390bce860ae2e3c85cc (diff)
Add the ability to run some action when the lock becomes idle
Diffstat (limited to 'src')
-rw-r--r--src/core/lib/iomgr/async_execution_lock.c94
-rw-r--r--src/core/lib/iomgr/async_execution_lock.h4
2 files changed, 80 insertions, 18 deletions
diff --git a/src/core/lib/iomgr/async_execution_lock.c b/src/core/lib/iomgr/async_execution_lock.c
index 96ba175a5a..c656e7c940 100644
--- a/src/core/lib/iomgr/async_execution_lock.c
+++ b/src/core/lib/iomgr/async_execution_lock.c
@@ -38,6 +38,9 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#define STATE_BIT_ALIVE 1
+#define STATE_BIT_REFS 2
+
typedef struct grpc_aelock_qnode {
gpr_mpscq_node mpscq_node;
grpc_aelock_action action;
@@ -50,17 +53,24 @@ struct grpc_aelock {
// state is:
// lower bit - zero if orphaned
// other bits - number of items queued on the lock
+ // see: STATE_BIT_xxx
gpr_atm state;
+ grpc_aelock_action before_idle_action;
+ void *before_idle_action_arg;
grpc_closure continue_finishing;
};
static void continue_finishing(grpc_exec_ctx *exec_ctx, void *arg,
bool success);
-grpc_aelock *grpc_aelock_create(grpc_workqueue *optional_workqueue) {
+grpc_aelock *grpc_aelock_create(grpc_workqueue *optional_workqueue,
+ grpc_aelock_action before_idle_action,
+ void *before_idle_action_arg) {
grpc_aelock *lock = gpr_malloc(sizeof(*lock));
+ lock->before_idle_action = before_idle_action;
+ lock->before_idle_action_arg = before_idle_action_arg;
lock->optional_workqueue = optional_workqueue;
- gpr_atm_no_barrier_store(&lock->state, 1);
+ gpr_atm_no_barrier_store(&lock->state, STATE_BIT_ALIVE);
gpr_mpscq_init(&lock->queue);
grpc_closure_init(&lock->continue_finishing, continue_finishing, lock);
return lock;
@@ -73,7 +83,8 @@ static void really_destroy(grpc_aelock *lock) {
}
void grpc_aelock_destroy(grpc_aelock *lock) {
- if (gpr_atm_full_fetch_add(&lock->state, -1) == 1) {
+ if (gpr_atm_full_fetch_add(&lock->state, -STATE_BIT_ALIVE) ==
+ STATE_BIT_ALIVE) {
really_destroy(lock);
}
}
@@ -81,10 +92,6 @@ void grpc_aelock_destroy(grpc_aelock *lock) {
static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_aelock *lock) {
gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue);
if (n == NULL) {
- // queue is in an inconsistant state: use this as a cue that we should
- // go off and do something else for a while (and come back later)
- grpc_exec_ctx_enqueue(exec_ctx, &lock->continue_finishing, true,
- lock->optional_workqueue);
return false;
}
grpc_aelock_qnode *ln = (grpc_aelock_qnode *)n;
@@ -94,36 +101,89 @@ static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_aelock *lock) {
}
static void finish(grpc_exec_ctx *exec_ctx, grpc_aelock *lock) {
- do {
- switch (gpr_atm_full_fetch_add(&lock->state, -2)) {
- case 3: // had one count, one unorphaned --> unlocked unorphaned
+ for (;;) {
+ gpr_atm last_state = gpr_atm_full_fetch_add(&lock->state, -STATE_BIT_REFS);
+ switch (last_state) {
+ default:
+ perform_one_step:
+ gpr_log(GPR_DEBUG, "ls=%d execute", last_state);
+ if (!maybe_finish_one(exec_ctx, lock)) {
+ // perform the idle action before going off to do something else
+ lock->before_idle_action(exec_ctx, lock->before_idle_action_arg);
+ // quick peek to see if we can immediately resume
+ if (!maybe_finish_one(exec_ctx, lock)) {
+ // queue is in an inconsistant state: use this as a cue that we
+ // should
+ // go off and do something else for a while (and come back later)
+ grpc_exec_ctx_enqueue(exec_ctx, &lock->continue_finishing, true,
+ lock->optional_workqueue);
+ return;
+ }
+ }
+ break;
+ case STATE_BIT_ALIVE | (2 * STATE_BIT_REFS):
+ gpr_log(GPR_DEBUG, "ls=%d final", last_state);
+ lock->before_idle_action(exec_ctx, lock->before_idle_action_arg);
+ switch (gpr_atm_full_fetch_add(&lock->state, -STATE_BIT_REFS)) {
+ case STATE_BIT_ALIVE | STATE_BIT_REFS:
+ return;
+ case STATE_BIT_REFS:
+ really_destroy(lock);
+ return;
+ default:
+ gpr_log(GPR_DEBUG, "retry");
+ // oops: did the before action, but something else came in
+ // better add another ref so we remember to do this again
+ gpr_atm_full_fetch_add(&lock->state, STATE_BIT_REFS);
+ goto perform_one_step;
+ }
+ break;
+ case STATE_BIT_ALIVE | STATE_BIT_REFS:
+ gpr_log(GPR_DEBUG, "ls=%d unlock", last_state);
return;
- case 2: // and one count, one orphaned --> unlocked and orphaned
+ case 2 * STATE_BIT_REFS:
+ gpr_log(GPR_DEBUG, "ls=%d idle", last_state);
+ lock->before_idle_action(exec_ctx, lock->before_idle_action_arg);
+ GPR_ASSERT(gpr_atm_full_fetch_add(&lock->state, -STATE_BIT_REFS) ==
+ STATE_BIT_REFS);
+ case STATE_BIT_REFS:
+ gpr_log(GPR_DEBUG, "ls=%d destroy", last_state);
really_destroy(lock);
return;
- case 1:
+ case STATE_BIT_ALIVE:
case 0:
// these values are illegal - representing an already unlocked or
// deleted lock
GPR_UNREACHABLE_CODE(return );
}
- } while (maybe_finish_one(exec_ctx, lock));
+ }
+
+ // while (maybe_finish_one(exec_ctx, lock));
}
static void continue_finishing(grpc_exec_ctx *exec_ctx, void *arg,
bool success) {
- if (maybe_finish_one(exec_ctx, arg)) finish(exec_ctx, arg);
+ grpc_aelock *lock = arg;
+ if (maybe_finish_one(exec_ctx, lock)) {
+ finish(exec_ctx, lock);
+ } else {
+ // queue is in an inconsistant state: use this as a cue that we should
+ // go off and do something else for a while (and come back later)
+ grpc_exec_ctx_enqueue(exec_ctx, &lock->continue_finishing, true,
+ lock->optional_workqueue);
+ }
}
void grpc_aelock_execute(grpc_exec_ctx *exec_ctx, grpc_aelock *lock,
grpc_aelock_action action, void *arg,
size_t sizeof_arg) {
- gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2);
- GPR_ASSERT(last & 1); // ensure lock has not been destroyed
- if (last == 1) {
+ gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2 * STATE_BIT_REFS);
+ GPR_ASSERT(last & STATE_BIT_ALIVE); // ensure lock has not been destroyed
+ if (last == STATE_BIT_ALIVE) {
action(exec_ctx, arg);
finish(exec_ctx, lock);
} else {
+ gpr_atm_full_fetch_add(&lock->state, -STATE_BIT_REFS);
grpc_aelock_qnode *n = gpr_malloc(sizeof(*n) + sizeof_arg);
n->action = action;
if (sizeof_arg > 0) {
diff --git a/src/core/lib/iomgr/async_execution_lock.h b/src/core/lib/iomgr/async_execution_lock.h
index e52762f443..ea7a322fae 100644
--- a/src/core/lib/iomgr/async_execution_lock.h
+++ b/src/core/lib/iomgr/async_execution_lock.h
@@ -51,7 +51,9 @@ typedef void (*grpc_aelock_action)(grpc_exec_ctx *exec_ctx, void *arg);
// Initialize the lock, with an optional workqueue to shift load to when
// necessary
-grpc_aelock *grpc_aelock_create(grpc_workqueue *optional_workqueue);
+grpc_aelock *grpc_aelock_create(grpc_workqueue *optional_workqueue,
+ grpc_aelock_action before_idle_action,
+ void *before_idle_action_arg);
// Destroy the lock
void grpc_aelock_destroy(grpc_aelock *lock);
// Execute \a action within the lock. \a arg is the argument to pass to \a