diff options
Diffstat (limited to 'src/core/surface/completion_queue.c')
-rw-r--r-- | src/core/surface/completion_queue.c | 51 |
1 files changed, 28 insertions, 23 deletions
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 029a4213c0..e0135d9fb9 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -67,6 +67,8 @@ struct grpc_completion_queue { /* When refs drops to zero, we are in shutdown mode, and will be destroyable once all queued events are drained */ gpr_refcount refs; + /* Once owning_refs drops to zero, we will destroy the cq */ + gpr_refcount owning_refs; /* the set of low level i/o things that concern this cq */ grpc_pollset pollset; /* 0 initially, 1 once we've begun shutting down */ @@ -91,11 +93,29 @@ grpc_completion_queue *grpc_completion_queue_create(void) { memset(cc, 0, sizeof(*cc)); /* Initial ref is dropped by grpc_completion_queue_shutdown */ gpr_ref_init(&cc->refs, 1); + gpr_ref_init(&cc->owning_refs, 1); grpc_pollset_init(&cc->pollset); cc->allow_polling = 1; return cc; } +void grpc_cq_internal_ref(grpc_completion_queue *cc) { + gpr_ref(&cc->owning_refs); +} + +static void on_pollset_destroy_done(void *arg) { + grpc_completion_queue *cc = arg; + grpc_pollset_destroy(&cc->pollset); + gpr_free(cc); +} + +void grpc_cq_internal_unref(grpc_completion_queue *cc) { + if (gpr_unref(&cc->owning_refs)) { + GPR_ASSERT(cc->queue == NULL); + grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc); + } +} + void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) { cc->allow_polling = 0; } @@ -132,22 +152,9 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type, return ev; } -void grpc_cq_internal_ref(grpc_completion_queue *cc) { - gpr_ref(&cc->refs); -} - -void grpc_cq_internal_unref(grpc_completion_queue *cc) { - if (gpr_unref(&cc->refs)) { - GPR_ASSERT(!cc->shutdown); - GPR_ASSERT(cc->shutdown_called); - cc->shutdown = 1; - gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset)); - } -} - void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call, grpc_completion_type type) { - grpc_cq_internal_ref(cc); + gpr_ref(&cc->refs); if (call) grpc_call_internal_ref(call); #ifndef NDEBUG gpr_atm_no_barrier_fetch_add(&cc->pending_op_count[type], 1); @@ -161,7 +168,12 @@ static void end_op_locked(grpc_completion_queue *cc, #ifndef NDEBUG GPR_ASSERT(gpr_atm_full_fetch_add(&cc->pending_op_count[type], -1) > 0); #endif - grpc_cq_internal_unref(cc); + if (gpr_unref(&cc->refs)) { + GPR_ASSERT(!cc->shutdown); + GPR_ASSERT(cc->shutdown_called); + cc->shutdown = 1; + gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset)); + } } void grpc_cq_end_server_shutdown(grpc_completion_queue *cc, void *tag) { @@ -402,15 +414,8 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { } } -static void on_pollset_destroy_done(void *arg) { - grpc_completion_queue *cc = arg; - grpc_pollset_destroy(&cc->pollset); - gpr_free(cc); -} - void grpc_completion_queue_destroy(grpc_completion_queue *cc) { - GPR_ASSERT(cc->queue == NULL); - grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc); + grpc_cq_internal_unref(cc); } void grpc_event_finish(grpc_event *base) { |