diff options
author | Craig Tiller <ctiller@google.com> | 2016-09-16 12:11:11 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-09-16 12:11:11 -0700 |
commit | ee43d7bbb11a36c91c6f1ebff6bf3da70b7546b2 (patch) | |
tree | 46e7b607b14425dc575520f089f4e9373322f250 /src/core/lib/support | |
parent | d72df881d62a89c7bb55424740cea8dbb58c99f3 (diff) | |
parent | 57726ca5a9c08ba17c882a798b93e03d333edd9f (diff) |
Merge pull request #8069 from grpc/revert-8068-revert-7279-grand-unified-closures
Revert "Revert "Grand unified closures""
Diffstat (limited to 'src/core/lib/support')
-rw-r--r-- | src/core/lib/support/mpscq.c | 83 | ||||
-rw-r--r-- | src/core/lib/support/mpscq.h | 65 |
2 files changed, 148 insertions, 0 deletions
diff --git a/src/core/lib/support/mpscq.c b/src/core/lib/support/mpscq.c new file mode 100644 index 0000000000..5b9323275a --- /dev/null +++ b/src/core/lib/support/mpscq.c @@ -0,0 +1,83 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/support/mpscq.h" + +#include <grpc/support/log.h> + +void gpr_mpscq_init(gpr_mpscq *q) { + gpr_atm_no_barrier_store(&q->head, (gpr_atm)&q->stub); + q->tail = &q->stub; + gpr_atm_no_barrier_store(&q->stub.next, (gpr_atm)NULL); +} + +void gpr_mpscq_destroy(gpr_mpscq *q) { + GPR_ASSERT(gpr_atm_no_barrier_load(&q->head) == (gpr_atm)&q->stub); + GPR_ASSERT(q->tail == &q->stub); +} + +void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) { + gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL); + gpr_mpscq_node *prev = + (gpr_mpscq_node *)gpr_atm_full_xchg(&q->head, (gpr_atm)n); + gpr_atm_rel_store(&prev->next, (gpr_atm)n); +} + +gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) { + gpr_mpscq_node *tail = q->tail; + gpr_mpscq_node *next = (gpr_mpscq_node *)gpr_atm_acq_load(&tail->next); + if (tail == &q->stub) { + // indicates the list is actually (ephemerally) empty + if (next == NULL) return NULL; + q->tail = next; + tail = next; + next = (gpr_mpscq_node *)gpr_atm_acq_load(&tail->next); + } + if (next != NULL) { + q->tail = next; + return tail; + } + gpr_mpscq_node *head = (gpr_mpscq_node *)gpr_atm_acq_load(&q->head); + if (tail != head) { + // indicates a retry is in order: we're still adding + return NULL; + } + gpr_mpscq_push(q, &q->stub); + next = (gpr_mpscq_node *)gpr_atm_acq_load(&tail->next); + if (next != NULL) { + q->tail = next; + return tail; + } + // indicates a retry is in order: we're still adding + return NULL; +} diff --git a/src/core/lib/support/mpscq.h b/src/core/lib/support/mpscq.h new file mode 100644 index 0000000000..977a117952 --- /dev/null +++ b/src/core/lib/support/mpscq.h @@ -0,0 +1,65 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_SUPPORT_MPSCQ_H +#define GRPC_CORE_LIB_SUPPORT_MPSCQ_H + +#include <grpc/support/atm.h> +#include <stddef.h> + +// Multiple-producer single-consumer lock free queue, based upon the +// implementation from Dmitry Vyukov here: +// http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue + +// List node (include this in a data structure at the top, and add application +// fields after it - to simulate inheritance) +typedef struct gpr_mpscq_node { gpr_atm next; } gpr_mpscq_node; + +// Actual queue type +typedef struct gpr_mpscq { + gpr_atm head; + // make sure head & tail don't share a cacheline + char padding[GPR_CACHELINE_SIZE]; + gpr_mpscq_node *tail; + gpr_mpscq_node stub; +} gpr_mpscq; + +void gpr_mpscq_init(gpr_mpscq *q); +void gpr_mpscq_destroy(gpr_mpscq *q); +// Push a node +void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n); +// Pop a node (returns NULL if no node is ready - which doesn't indicate that +// the queue is empty!!) +gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q); + +#endif /* GRPC_CORE_LIB_SUPPORT_MPSCQ_H */ |