diff options
author | Craig Tiller <ctiller@google.com> | 2017-06-01 16:08:58 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-06-01 16:08:58 -0700 |
commit | caf8ea984bc6dc9ccca9b6895d6233e0539aae02 (patch) | |
tree | 33dd77f2832f3e898e18013ffcb6237c49bc85ad /src/core/lib/surface/completion_queue.c | |
parent | 38338e9922ce1a819a0a977914fa402780b2213f (diff) |
Fix race condition
Diffstat (limited to 'src/core/lib/surface/completion_queue.c')
-rw-r--r-- | src/core/lib/surface/completion_queue.c | 105 |
1 files changed, 57 insertions, 48 deletions
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 3d3da15333..672ee935e4 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -239,15 +239,13 @@ typedef struct cq_next_data { /** Completed events for completion-queues of type GRPC_CQ_NEXT */ grpc_cq_event_queue queue; - /** Number of pending events (+1 if we're not shutdown) */ - gpr_refcount pending_events; - /** Counter of how many things have ever been queued on this completion queue useful for avoiding locks to check the queue */ gpr_atm things_queued_ever; - /** 0 initially, 1 once we've begun shutting down */ - gpr_atm shutdown; + /* Number of outstanding events (+1 if not shut down) */ + gpr_atm pending_events; + int shutdown_called; } cq_next_data; @@ -448,9 +446,8 @@ grpc_completion_queue *grpc_completion_queue_create_internal( static void cq_init_next(void *ptr) { cq_next_data *cqd = ptr; /* Initial ref is dropped by grpc_completion_queue_shutdown */ - gpr_ref_init(&cqd->pending_events, 1); - gpr_atm_no_barrier_store(&cqd->shutdown, 0); - cqd->shutdown_called = 0; + gpr_atm_no_barrier_store(&cqd->pending_events, 1); + cqd->shutdown_called = false; gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); cq_event_queue_init(&cqd->queue); } @@ -530,7 +527,7 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) { cq_next_data *cqd = DATA_FROM_CQ(cq); GPR_ASSERT(!cqd->shutdown_called); - gpr_ref(&cqd->pending_events); + gpr_atm_no_barrier_fetch_add(&cqd->pending_events, 1); } static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) { @@ -619,9 +616,10 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, /* Add the completion to the queue */ bool is_first = cq_event_queue_push(&cqd->queue, storage); gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); - bool shutdown = gpr_unref(&cqd->pending_events); + bool will_definitely_shutdown = + gpr_atm_no_barrier_load(&cqd->pending_events) == 1; - if (!shutdown) { + if (!will_definitely_shutdown) { /* Only kick if this is the first item queued */ if (is_first) { gpr_mu_lock(cq->mu); @@ -635,10 +633,20 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(kick_error); } } + if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); + gpr_mu_lock(cq->mu); + cq_finish_shutdown_next(exec_ctx, cq); + gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); + } } else { + GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); + gpr_atm_rel_store(&cqd->pending_events, 0); gpr_mu_lock(cq->mu); cq_finish_shutdown_next(exec_ctx, cq); gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); } GPR_TIMER_END("cq_end_op_for_next", 0); @@ -852,7 +860,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, } } - if (gpr_atm_no_barrier_load(&cqd->shutdown)) { + if (gpr_atm_no_barrier_load(&cqd->pending_events) == 0) { /* Before returning, check if the queue has any items left over (since gpr_mpscq_pop() can sometimes return NULL even if the queue is not empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */ @@ -903,7 +911,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, GPR_ASSERT(is_finished_arg.stolen_completion == NULL); if (cq_event_queue_num_items(&cqd->queue) > 0 && - gpr_atm_no_barrier_load(&cqd->shutdown) == 0) { + gpr_atm_no_barrier_load(&cqd->pending_events) > 0) { gpr_mu_lock(cq->mu); cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL); gpr_mu_unlock(cq->mu); @@ -914,6 +922,42 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, return ret; } +/* Finishes the completion queue shutdown. This means that there are no more + completion events / tags expected from the completion queue + - Must be called under completion queue lock + - Must be called only once in completion queue's lifetime + - grpc_completion_queue_shutdown() MUST have been called before calling + this function */ +static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq) { + cq_next_data *cqd = DATA_FROM_CQ(cq); + + GPR_ASSERT(cqd->shutdown_called); + GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0); + + cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq), + &cq->pollset_shutdown_done); +} + +static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq) { + cq_next_data *cqd = DATA_FROM_CQ(cq); + + GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); + gpr_mu_lock(cq->mu); + if (cqd->shutdown_called) { + gpr_mu_unlock(cq->mu); + GPR_TIMER_END("grpc_completion_queue_shutdown", 0); + return; + } + cqd->shutdown_called = 1; + if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + cq_finish_shutdown_next(exec_ctx, cq); + } + gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); +} + grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved) { return cq->vtable->next(cq, deadline, reserved); @@ -1106,24 +1150,6 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, return cq->vtable->pluck(cq, tag, deadline, reserved); } -/* Finishes the completion queue shutdown. This means that there are no more - completion events / tags expected from the completion queue - - Must be called under completion queue lock - - Must be called only once in completion queue's lifetime - - grpc_completion_queue_shutdown() MUST have been called before calling - this function */ -static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cq) { - cq_next_data *cqd = DATA_FROM_CQ(cq); - - GPR_ASSERT(cqd->shutdown_called); - GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); - gpr_atm_no_barrier_store(&cqd->shutdown, 1); - - cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq), - &cq->pollset_shutdown_done); -} - static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq) { cq_pluck_data *cqd = DATA_FROM_CQ(cq); @@ -1136,23 +1162,6 @@ static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx, &cq->pollset_shutdown_done); } -static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cq) { - cq_next_data *cqd = DATA_FROM_CQ(cq); - - gpr_mu_lock(cq->mu); - if (cqd->shutdown_called) { - gpr_mu_unlock(cq->mu); - GPR_TIMER_END("grpc_completion_queue_shutdown", 0); - return; - } - cqd->shutdown_called = 1; - if (gpr_unref(&cqd->pending_events)) { - cq_finish_shutdown_next(exec_ctx, cq); - } - gpr_mu_unlock(cq->mu); -} - static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq) { cq_pluck_data *cqd = DATA_FROM_CQ(cq); |