aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface/completion_queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface/completion_queue.c')
-rw-r--r--src/core/surface/completion_queue.c35
1 files changed, 23 insertions, 12 deletions
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index f3c2453b5e..c1c97af337 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -67,6 +67,8 @@ struct grpc_completion_queue {
/* When refs drops to zero, we are in shutdown mode, and will be destroyable
once all queued events are drained */
gpr_refcount refs;
+ /* Once owning_refs drops to zero, we will destroy the cq */
+ gpr_refcount owning_refs;
/* the set of low level i/o things that concern this cq */
grpc_pollset pollset;
/* 0 initially, 1 once we've begun shutting down */
@@ -91,11 +93,29 @@ grpc_completion_queue *grpc_completion_queue_create(void) {
memset(cc, 0, sizeof(*cc));
/* Initial ref is dropped by grpc_completion_queue_shutdown */
gpr_ref_init(&cc->refs, 1);
+ gpr_ref_init(&cc->owning_refs, 1);
grpc_pollset_init(&cc->pollset);
cc->allow_polling = 1;
return cc;
}
+void grpc_cq_internal_ref(grpc_completion_queue *cc) {
+ gpr_ref(&cc->owning_refs);
+}
+
+static void on_pollset_destroy_done(void *arg) {
+ grpc_completion_queue *cc = arg;
+ grpc_pollset_destroy(&cc->pollset);
+ gpr_free(cc);
+}
+
+void grpc_cq_internal_unref(grpc_completion_queue *cc) {
+ if (gpr_unref(&cc->owning_refs)) {
+ GPR_ASSERT(cc->queue == NULL);
+ grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
+ }
+}
+
void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) {
cc->allow_polling = 0;
}
@@ -135,7 +155,7 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call,
grpc_completion_type type) {
gpr_ref(&cc->refs);
- if (call) grpc_call_internal_ref(call);
+ if (call) GRPC_CALL_INTERNAL_REF(call, "cq");
#ifndef NDEBUG
gpr_atm_no_barrier_fetch_add(&cc->pending_op_count[type], 1);
#endif
@@ -394,24 +414,15 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
}
}
-static void on_pollset_destroy_done(void *arg) {
- grpc_completion_queue *cc = arg;
- grpc_pollset_destroy(&cc->pollset);
- gpr_free(cc);
-}
-
void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
- GPR_ASSERT(cc->queue == NULL);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
- grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
+ grpc_cq_internal_unref(cc);
}
void grpc_event_finish(grpc_event *base) {
event *ev = (event *)base;
ev->on_finish(ev->on_finish_user_data, GRPC_OP_OK);
if (ev->base.call) {
- grpc_call_internal_unref(ev->base.call, 1);
+ GRPC_CALL_INTERNAL_UNREF(ev->base.call, "cq", 1);
}
gpr_free(ev);
}