diff options
Diffstat (limited to 'src/core/lib')
-rw-r--r-- | src/core/lib/iomgr/async_execution_lock.c | 137 | ||||
-rw-r--r-- | src/core/lib/iomgr/async_execution_lock.h | 63 | ||||
-rw-r--r-- | src/core/lib/support/mpscq.c | 80 | ||||
-rw-r--r-- | src/core/lib/support/mpscq.h | 65 |
4 files changed, 345 insertions, 0 deletions
diff --git a/src/core/lib/iomgr/async_execution_lock.c b/src/core/lib/iomgr/async_execution_lock.c new file mode 100644 index 0000000000..96ba175a5a --- /dev/null +++ b/src/core/lib/iomgr/async_execution_lock.c @@ -0,0 +1,137 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/async_execution_lock.h" + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +typedef struct grpc_aelock_qnode { + gpr_mpscq_node mpscq_node; + grpc_aelock_action action; + void *arg; +} grpc_aelock_qnode; + +struct grpc_aelock { + grpc_workqueue *optional_workqueue; + gpr_mpscq queue; + // state is: + // lower bit - zero if orphaned + // other bits - number of items queued on the lock + gpr_atm state; + grpc_closure continue_finishing; +}; + +static void continue_finishing(grpc_exec_ctx *exec_ctx, void *arg, + bool success); + +grpc_aelock *grpc_aelock_create(grpc_workqueue *optional_workqueue) { + grpc_aelock *lock = gpr_malloc(sizeof(*lock)); + lock->optional_workqueue = optional_workqueue; + gpr_atm_no_barrier_store(&lock->state, 1); + gpr_mpscq_init(&lock->queue); + grpc_closure_init(&lock->continue_finishing, continue_finishing, lock); + return lock; +} + +static void really_destroy(grpc_aelock *lock) { + GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0); + gpr_mpscq_destroy(&lock->queue); + gpr_free(lock); +} + +void grpc_aelock_destroy(grpc_aelock *lock) { + if (gpr_atm_full_fetch_add(&lock->state, -1) == 1) { + really_destroy(lock); + } +} + +static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_aelock *lock) { + gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue); + if (n == NULL) { + // queue is in an inconsistant state: use this as a cue that we should + // go off and do something else for a while (and come back later) + grpc_exec_ctx_enqueue(exec_ctx, &lock->continue_finishing, true, + lock->optional_workqueue); + return false; + } + grpc_aelock_qnode *ln = (grpc_aelock_qnode *)n; + ln->action(exec_ctx, ln->arg); + gpr_free(ln); + return true; +} + +static void finish(grpc_exec_ctx *exec_ctx, grpc_aelock *lock) { + do { + switch (gpr_atm_full_fetch_add(&lock->state, -2)) { + case 3: // had one count, one unorphaned --> unlocked unorphaned + return; + case 2: // and one count, one orphaned --> unlocked and orphaned + really_destroy(lock); + return; + case 1: + case 0: + // these values are illegal - representing an already unlocked or + // deleted lock + GPR_UNREACHABLE_CODE(return ); + } + } while (maybe_finish_one(exec_ctx, lock)); +} + +static void continue_finishing(grpc_exec_ctx *exec_ctx, void *arg, + bool success) { + if (maybe_finish_one(exec_ctx, arg)) finish(exec_ctx, arg); +} + +void grpc_aelock_execute(grpc_exec_ctx *exec_ctx, grpc_aelock *lock, + grpc_aelock_action action, void *arg, + size_t sizeof_arg) { + gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2); + GPR_ASSERT(last & 1); // ensure lock has not been destroyed + if (last == 1) { + action(exec_ctx, arg); + finish(exec_ctx, lock); + } else { + grpc_aelock_qnode *n = gpr_malloc(sizeof(*n) + sizeof_arg); + n->action = action; + if (sizeof_arg > 0) { + memcpy(n + 1, arg, sizeof_arg); + n->arg = n + 1; + } else { + n->arg = arg; + } + gpr_mpscq_push(&lock->queue, &n->mpscq_node); + } +} diff --git a/src/core/lib/iomgr/async_execution_lock.h b/src/core/lib/iomgr/async_execution_lock.h new file mode 100644 index 0000000000..e52762f443 --- /dev/null +++ b/src/core/lib/iomgr/async_execution_lock.h @@ -0,0 +1,63 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_ASYNC_EXECUTION_LOCK_H +#define GRPC_CORE_LIB_IOMGR_ASYNC_EXECUTION_LOCK_H + +#include <stddef.h> + +#include <grpc/support/atm.h> +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/support/mpscq.h" + +typedef struct grpc_aelock grpc_aelock; + +// Provides serialized access to some resource. +// Each action queued on an aelock is executed serially in a borrowed thread. +// The actual thread executing actions may change over time (but there will only +// every be one at a time). + +typedef void (*grpc_aelock_action)(grpc_exec_ctx *exec_ctx, void *arg); + +// Initialize the lock, with an optional workqueue to shift load to when +// necessary +grpc_aelock *grpc_aelock_create(grpc_workqueue *optional_workqueue); +// Destroy the lock +void grpc_aelock_destroy(grpc_aelock *lock); +// Execute \a action within the lock. \a arg is the argument to pass to \a +// action and sizeof_arg is the sizeof(*arg), or 0 if arg is non-copyable. +void grpc_aelock_execute(grpc_exec_ctx *exec_ctx, grpc_aelock *lock, + grpc_aelock_action action, void *arg, + size_t sizeof_arg); + +#endif /* GRPC_CORE_LIB_IOMGR_ASYNC_EXECUTION_LOCK_H */ diff --git a/src/core/lib/support/mpscq.c b/src/core/lib/support/mpscq.c new file mode 100644 index 0000000000..25b055b172 --- /dev/null +++ b/src/core/lib/support/mpscq.c @@ -0,0 +1,80 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/support/mpscq.h" + +#include <grpc/support/log.h> + +void gpr_mpscq_init(gpr_mpscq *q) { + gpr_atm_no_barrier_store(&q->head, (gpr_atm)&q->stub); + q->tail = &q->stub; + gpr_atm_no_barrier_store(&q->stub.next, 0); +} + +void gpr_mpscq_destroy(gpr_mpscq *q) { + GPR_ASSERT(gpr_atm_no_barrier_load(&q->head) == (gpr_atm)&q->stub); + GPR_ASSERT(q->tail == &q->stub); +} + +void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) { + gpr_atm_no_barrier_store(&n->next, 0); + gpr_mpscq_node *prev = + (gpr_mpscq_node *)gpr_atm_full_xchg(&q->head, (gpr_atm)n); + gpr_atm_rel_store(&prev->next, (gpr_atm)n); +} + +gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) { + gpr_mpscq_node *tail = q->tail; + gpr_mpscq_node *next = (gpr_mpscq_node *)gpr_atm_acq_load(&tail->next); + if (tail == &q->stub) { + if (next == NULL) return NULL; + q->tail = next; + tail = next; + next = (gpr_mpscq_node *)gpr_atm_acq_load(&tail->next); + } + if (next != NULL) { + q->tail = next; + return tail; + } + gpr_mpscq_node *head = (gpr_mpscq_node *)gpr_atm_acq_load(&q->head); + if (tail != head) { + return 0; + } + gpr_mpscq_push(q, &q->stub); + next = (gpr_mpscq_node *)gpr_atm_acq_load(&tail->next); + if (next != NULL) { + q->tail = next; + return tail; + } + return NULL; +} diff --git a/src/core/lib/support/mpscq.h b/src/core/lib/support/mpscq.h new file mode 100644 index 0000000000..1201edceb1 --- /dev/null +++ b/src/core/lib/support/mpscq.h @@ -0,0 +1,65 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_SUPPORT_MPSCQ_H +#define GRPC_CORE_LIB_SUPPORT_MPSCQ_H + +#include <grpc/support/atm.h> +#include <stddef.h> + +// Multiple-producer single-consumer lock free queue, based upon the +// implementation from Dmitry Vyukov here: +// http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue + +// List node (include this in a data structure and dangle the rest of the +// interesting bits off the end) +typedef struct gpr_mpscq_node { gpr_atm next; } gpr_mpscq_node; + +// Actual queue type +typedef struct gpr_mpscq { + gpr_atm head; + // make sure head & tail don't share a cacheline + char padding[GPR_CACHELINE_SIZE]; + gpr_mpscq_node *tail; + gpr_mpscq_node stub; +} gpr_mpscq; + +void gpr_mpscq_init(gpr_mpscq *q); +void gpr_mpscq_destroy(gpr_mpscq *q); +// Push a node +void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n); +// Pop a node (returns NULL if no node is ready - which doesn't indicate that +// the queue is empty!!) +gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q); + +#endif /* GRPC_CORE_LIB_SUPPORT_MPSCQ_H */ |