aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/lib/iomgr/combiner.c9
-rw-r--r--src/core/lib/iomgr/combiner.h2
-rw-r--r--src/core/lib/support/mpscq.c11
-rw-r--r--src/core/lib/support/mpscq.h4
-rw-r--r--test/core/iomgr/combiner_test.c3
5 files changed, 19 insertions, 10 deletions
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index eb5ad634bd..831bdb4aff 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -176,8 +176,10 @@ static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
gpr_log(GPR_DEBUG, "C:%p maybe_finish_one n=%p", lock, n));
GPR_ASSERT(exec_ctx->active_combiner == lock);
if (n == NULL) {
- // queue is in an inconsistant state: use this as a cue that we should
- // go off and do something else for a while (and come back later)
+ // Queue is in an transiently inconsistent state: a new item is being queued
+ // but is not visible to this thread yet.
+ // Use this as a cue that we should go off and do something else for a while
+ // (and come back later)
grpc_closure_init(&lock->continue_finishing, continue_finishing_mainline,
lock);
grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE,
@@ -204,6 +206,9 @@ static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
"C:%p finish[%d] old_state=%" PRIdPTR, lock,
loops, old_state));
switch (old_state) {
+ default:
+ // we have multiple queued work items: just continue executing them
+ break;
case 5: // we're down to one queued item: if it's the final list we
case 4: // should do that
if (!grpc_closure_list_empty(lock->final_list)) {
diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h
index 57df8f0ba8..08acbb7441 100644
--- a/src/core/lib/iomgr/combiner.h
+++ b/src/core/lib/iomgr/combiner.h
@@ -41,7 +41,7 @@
#include "src/core/lib/support/mpscq.h"
// Provides serialized access to some resource.
-// Each action queued on an aelock is executed serially in a borrowed thread.
+// Each action queued on a combiner is executed serially in a borrowed thread.
// The actual thread executing actions may change over time (but there will only
// every be one at a time).
diff --git a/src/core/lib/support/mpscq.c b/src/core/lib/support/mpscq.c
index 25b055b172..cdd6335f82 100644
--- a/src/core/lib/support/mpscq.c
+++ b/src/core/lib/support/mpscq.c
@@ -38,7 +38,7 @@
void gpr_mpscq_init(gpr_mpscq *q) {
gpr_atm_no_barrier_store(&q->head, (gpr_atm)&q->stub);
q->tail = &q->stub;
- gpr_atm_no_barrier_store(&q->stub.next, 0);
+ gpr_atm_no_barrier_store(&q->stub.next, (gpr_atm)NULL);
}
void gpr_mpscq_destroy(gpr_mpscq *q) {
@@ -47,16 +47,17 @@ void gpr_mpscq_destroy(gpr_mpscq *q) {
}
void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) {
- gpr_atm_no_barrier_store(&n->next, 0);
+ gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL);
gpr_mpscq_node *prev =
(gpr_mpscq_node *)gpr_atm_full_xchg(&q->head, (gpr_atm)n);
- gpr_atm_rel_store(&prev->next, (gpr_atm)n);
+ gpr_atm_no_barrier_store(&prev->next, (gpr_atm)n);
}
gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) {
gpr_mpscq_node *tail = q->tail;
gpr_mpscq_node *next = (gpr_mpscq_node *)gpr_atm_acq_load(&tail->next);
if (tail == &q->stub) {
+ // indicates the list is actually (ephemerally) empty
if (next == NULL) return NULL;
q->tail = next;
tail = next;
@@ -68,7 +69,8 @@ gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) {
}
gpr_mpscq_node *head = (gpr_mpscq_node *)gpr_atm_acq_load(&q->head);
if (tail != head) {
- return 0;
+ // indicates a retry is in order: we're still adding
+ return NULL;
}
gpr_mpscq_push(q, &q->stub);
next = (gpr_mpscq_node *)gpr_atm_acq_load(&tail->next);
@@ -76,5 +78,6 @@ gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) {
q->tail = next;
return tail;
}
+ // indicates a retry is in order: we're still adding
return NULL;
}
diff --git a/src/core/lib/support/mpscq.h b/src/core/lib/support/mpscq.h
index 1201edceb1..977a117952 100644
--- a/src/core/lib/support/mpscq.h
+++ b/src/core/lib/support/mpscq.h
@@ -41,8 +41,8 @@
// implementation from Dmitry Vyukov here:
// http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
-// List node (include this in a data structure and dangle the rest of the
-// interesting bits off the end)
+// List node (include this in a data structure at the top, and add application
+// fields after it - to simulate inheritance)
typedef struct gpr_mpscq_node { gpr_atm next; } gpr_mpscq_node;
// Actual queue type
diff --git a/test/core/iomgr/combiner_test.c b/test/core/iomgr/combiner_test.c
index 7cf016d82c..197998c1e5 100644
--- a/test/core/iomgr/combiner_test.c
+++ b/test/core/iomgr/combiner_test.c
@@ -80,7 +80,6 @@ typedef struct {
static void check_one(grpc_exec_ctx *exec_ctx, void *a, grpc_error *error) {
ex_args *args = a;
- // gpr_log(GPR_DEBUG, "*%p=%d; step %d", args->ctr, *args->ctr, args->value);
GPR_ASSERT(*args->ctr == args->value - 1);
*args->ctr = args->value;
gpr_free(a);
@@ -99,6 +98,8 @@ static void execute_many_loop(void *a) {
grpc_closure_create(check_one, c), GRPC_ERROR_NONE);
grpc_exec_ctx_flush(&exec_ctx);
}
+ // sleep for a little bit, to test a combiner draining and another thread
+ // picking it up
gpr_sleep_until(GRPC_TIMEOUT_MILLIS_TO_DEADLINE(100));
}
grpc_exec_ctx_finish(&exec_ctx);