diff options
author | Sree Kuchibhotla <sreecha@users.noreply.github.com> | 2017-09-18 15:11:43 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-09-18 15:11:43 -0700 |
commit | a63a4f9685fbc357e495a9e0c482b761366888e0 (patch) | |
tree | 11ac48e03478f0d87683a69fbb779f8a01d72834 /src | |
parent | 1db64c0c60033f10b83b0e4201238fb16ed82608 (diff) | |
parent | 0dbfde67507abf83d46bb24098419ce5523cc653 (diff) |
Merge pull request #12564 from sreecha/exp-cq-fix
Fix TSAN failure in completion queue
Diffstat (limited to 'src')
-rw-r--r-- | src/core/lib/surface/completion_queue.c | 19 |
1 files changed, 13 insertions, 6 deletions
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 6452f0894d..468360fbde 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -565,13 +565,13 @@ static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {} * true if the increment was successful; false if the counter is zero */ static bool atm_inc_if_nonzero(gpr_atm *counter) { while (true) { - gpr_atm count = gpr_atm_no_barrier_load(counter); + gpr_atm count = gpr_atm_acq_load(counter); /* If zero, we are done. If not, we must to a CAS (instead of an atomic * increment) to maintain the contract: do not increment the counter if it * is zero. */ if (count == 0) { return false; - } else if (gpr_atm_no_barrier_cas(counter, count, count + 1)) { + } else if (gpr_atm_full_cas(counter, count, count + 1)) { break; } } @@ -643,8 +643,12 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, /* Add the completion to the queue */ bool is_first = cq_event_queue_push(&cqd->queue, storage); gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); - bool will_definitely_shutdown = - gpr_atm_no_barrier_load(&cqd->pending_events) == 1; + + /* Since we do not hold the cq lock here, it is important to do an 'acquire' + load here (instead of a 'no_barrier' load) to match with the release store + (done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next + */ + bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1; if (!will_definitely_shutdown) { /* Only kick if this is the first item queued */ @@ -888,7 +892,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, } } - if (gpr_atm_no_barrier_load(&cqd->pending_events) == 0) { + if (gpr_atm_acq_load(&cqd->pending_events) == 0) { /* Before returning, check if the queue has any items left over (since gpr_mpscq_pop() can sometimes return NULL even if the queue is not empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */ @@ -934,7 +938,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, } if (cq_event_queue_num_items(&cqd->queue) > 0 && - gpr_atm_no_barrier_load(&cqd->pending_events) > 0) { + gpr_atm_acq_load(&cqd->pending_events) > 0) { gpr_mu_lock(cq->mu); cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL); gpr_mu_unlock(cq->mu); @@ -985,6 +989,9 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, return; } cqd->shutdown_called = true; + /* Doing a full_fetch_add (i.e acq/release) here to match with + * cq_begin_op_for_next and and cq_end_op_for_next functions which read/write + * on this counter without necessarily holding a lock on cq */ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { cq_finish_shutdown_next(exec_ctx, cq); } |