aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface/completion_queue.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-06-01 16:08:58 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-06-01 16:08:58 -0700
commitcaf8ea984bc6dc9ccca9b6895d6233e0539aae02 (patch)
tree33dd77f2832f3e898e18013ffcb6237c49bc85ad /src/core/lib/surface/completion_queue.c
parent38338e9922ce1a819a0a977914fa402780b2213f (diff)
Fix race condition
Diffstat (limited to 'src/core/lib/surface/completion_queue.c')
-rw-r--r--src/core/lib/surface/completion_queue.c105
1 files changed, 57 insertions, 48 deletions
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 3d3da15333..672ee935e4 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -239,15 +239,13 @@ typedef struct cq_next_data {
/** Completed events for completion-queues of type GRPC_CQ_NEXT */
grpc_cq_event_queue queue;
- /** Number of pending events (+1 if we're not shutdown) */
- gpr_refcount pending_events;
-
/** Counter of how many things have ever been queued on this completion queue
useful for avoiding locks to check the queue */
gpr_atm things_queued_ever;
- /** 0 initially, 1 once we've begun shutting down */
- gpr_atm shutdown;
+ /* Number of outstanding events (+1 if not shut down) */
+ gpr_atm pending_events;
+
int shutdown_called;
} cq_next_data;
@@ -448,9 +446,8 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
static void cq_init_next(void *ptr) {
cq_next_data *cqd = ptr;
/* Initial ref is dropped by grpc_completion_queue_shutdown */
- gpr_ref_init(&cqd->pending_events, 1);
- gpr_atm_no_barrier_store(&cqd->shutdown, 0);
- cqd->shutdown_called = 0;
+ gpr_atm_no_barrier_store(&cqd->pending_events, 1);
+ cqd->shutdown_called = false;
gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
cq_event_queue_init(&cqd->queue);
}
@@ -530,7 +527,7 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx,
static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
cq_next_data *cqd = DATA_FROM_CQ(cq);
GPR_ASSERT(!cqd->shutdown_called);
- gpr_ref(&cqd->pending_events);
+ gpr_atm_no_barrier_fetch_add(&cqd->pending_events, 1);
}
static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
@@ -619,9 +616,10 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
/* Add the completion to the queue */
bool is_first = cq_event_queue_push(&cqd->queue, storage);
gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
- bool shutdown = gpr_unref(&cqd->pending_events);
+ bool will_definitely_shutdown =
+ gpr_atm_no_barrier_load(&cqd->pending_events) == 1;
- if (!shutdown) {
+ if (!will_definitely_shutdown) {
/* Only kick if this is the first item queued */
if (is_first) {
gpr_mu_lock(cq->mu);
@@ -635,10 +633,20 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(kick_error);
}
}
+ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+ GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
+ gpr_mu_lock(cq->mu);
+ cq_finish_shutdown_next(exec_ctx, cq);
+ gpr_mu_unlock(cq->mu);
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
+ }
} else {
+ GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
+ gpr_atm_rel_store(&cqd->pending_events, 0);
gpr_mu_lock(cq->mu);
cq_finish_shutdown_next(exec_ctx, cq);
gpr_mu_unlock(cq->mu);
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
}
GPR_TIMER_END("cq_end_op_for_next", 0);
@@ -852,7 +860,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
}
}
- if (gpr_atm_no_barrier_load(&cqd->shutdown)) {
+ if (gpr_atm_no_barrier_load(&cqd->pending_events) == 0) {
/* Before returning, check if the queue has any items left over (since
gpr_mpscq_pop() can sometimes return NULL even if the queue is not
empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */
@@ -903,7 +911,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
if (cq_event_queue_num_items(&cqd->queue) > 0 &&
- gpr_atm_no_barrier_load(&cqd->shutdown) == 0) {
+ gpr_atm_no_barrier_load(&cqd->pending_events) > 0) {
gpr_mu_lock(cq->mu);
cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL);
gpr_mu_unlock(cq->mu);
@@ -914,6 +922,42 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
return ret;
}
+/* Finishes the completion queue shutdown. This means that there are no more
+ completion events / tags expected from the completion queue
+ - Must be called under completion queue lock
+ - Must be called only once in completion queue's lifetime
+ - grpc_completion_queue_shutdown() MUST have been called before calling
+ this function */
+static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq) {
+ cq_next_data *cqd = DATA_FROM_CQ(cq);
+
+ GPR_ASSERT(cqd->shutdown_called);
+ GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0);
+
+ cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq),
+ &cq->pollset_shutdown_done);
+}
+
+static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq) {
+ cq_next_data *cqd = DATA_FROM_CQ(cq);
+
+ GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
+ gpr_mu_lock(cq->mu);
+ if (cqd->shutdown_called) {
+ gpr_mu_unlock(cq->mu);
+ GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
+ return;
+ }
+ cqd->shutdown_called = 1;
+ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+ cq_finish_shutdown_next(exec_ctx, cq);
+ }
+ gpr_mu_unlock(cq->mu);
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
+}
+
grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
gpr_timespec deadline, void *reserved) {
return cq->vtable->next(cq, deadline, reserved);
@@ -1106,24 +1150,6 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag,
return cq->vtable->pluck(cq, tag, deadline, reserved);
}
-/* Finishes the completion queue shutdown. This means that there are no more
- completion events / tags expected from the completion queue
- - Must be called under completion queue lock
- - Must be called only once in completion queue's lifetime
- - grpc_completion_queue_shutdown() MUST have been called before calling
- this function */
-static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq) {
- cq_next_data *cqd = DATA_FROM_CQ(cq);
-
- GPR_ASSERT(cqd->shutdown_called);
- GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown));
- gpr_atm_no_barrier_store(&cqd->shutdown, 1);
-
- cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq),
- &cq->pollset_shutdown_done);
-}
-
static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq) {
cq_pluck_data *cqd = DATA_FROM_CQ(cq);
@@ -1136,23 +1162,6 @@ static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx,
&cq->pollset_shutdown_done);
}
-static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq) {
- cq_next_data *cqd = DATA_FROM_CQ(cq);
-
- gpr_mu_lock(cq->mu);
- if (cqd->shutdown_called) {
- gpr_mu_unlock(cq->mu);
- GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
- return;
- }
- cqd->shutdown_called = 1;
- if (gpr_unref(&cqd->pending_events)) {
- cq_finish_shutdown_next(exec_ctx, cq);
- }
- gpr_mu_unlock(cq->mu);
-}
-
static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq) {
cq_pluck_data *cqd = DATA_FROM_CQ(cq);