diff options
Diffstat (limited to 'src/core/surface/completion_queue.c')
-rw-r--r-- | src/core/surface/completion_queue.c | 78 |
1 files changed, 42 insertions, 36 deletions
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 8c9ca48a05..57ecf365cc 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -59,9 +59,6 @@ typedef struct event { /* Completion queue structure */ struct grpc_completion_queue { - /* TODO(ctiller): see if this can be removed */ - int allow_polling; - /* When refs drops to zero, we are in shutdown mode, and will be destroyable once all queued events are drained */ gpr_refcount refs; @@ -83,33 +80,41 @@ grpc_completion_queue *grpc_completion_queue_create(void) { memset(cc, 0, sizeof(*cc)); /* Initial ref is dropped by grpc_completion_queue_shutdown */ gpr_ref_init(&cc->refs, 1); - gpr_ref_init(&cc->owning_refs, 1); + /* One for destroy(), one for pollset_shutdown */ + gpr_ref_init(&cc->owning_refs, 2); grpc_pollset_init(&cc->pollset); - cc->allow_polling = 1; return cc; } +#ifdef GRPC_CQ_REF_COUNT_DEBUG +void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason) { + gpr_log(GPR_DEBUG, "CQ:%p ref %d -> %d %s", cc, (int)cc->owning_refs.count, + (int)cc->owning_refs.count + 1, reason); +#else void grpc_cq_internal_ref(grpc_completion_queue *cc) { +#endif gpr_ref(&cc->owning_refs); } static void on_pollset_destroy_done(void *arg) { grpc_completion_queue *cc = arg; - grpc_pollset_destroy(&cc->pollset); - gpr_free(cc); + GRPC_CQ_INTERNAL_UNREF(cc, "pollset_destroy"); } +#ifdef GRPC_CQ_REF_COUNT_DEBUG +void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason) { + gpr_log(GPR_DEBUG, "CQ:%p unref %d -> %d %s", cc, (int)cc->owning_refs.count, + (int)cc->owning_refs.count - 1, reason); +#else void grpc_cq_internal_unref(grpc_completion_queue *cc) { +#endif if (gpr_unref(&cc->owning_refs)) { GPR_ASSERT(cc->queue == NULL); - grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc); + grpc_pollset_destroy(&cc->pollset); + gpr_free(cc); } } -void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) { - cc->allow_polling = 0; -} - /* Create and append an event to the queue. Returns the event so that its data members can be filled in. Requires GRPC_POLLSET_MU(&cc->pollset) locked. */ @@ -133,7 +138,6 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type, ev->bucket_prev = cc->buckets[bucket]->bucket_prev; ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev; } - gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset)); grpc_pollset_kick(&cc->pollset); return ev; } @@ -145,25 +149,24 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call) { /* Signal the end of an operation - if this is the last waiting-to-be-queued event, then enter shutdown mode */ -static void end_op_locked(grpc_completion_queue *cc, - grpc_completion_type type) { - if (gpr_unref(&cc->refs)) { - GPR_ASSERT(!cc->shutdown); - GPR_ASSERT(cc->shutdown_called); - cc->shutdown = 1; - gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset)); - } -} - void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call, int success) { event *ev; + int shutdown = 0; gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call); ev->base.success = success; - end_op_locked(cc, GRPC_OP_COMPLETE); + if (gpr_unref(&cc->refs)) { + GPR_ASSERT(!cc->shutdown); + GPR_ASSERT(cc->shutdown_called); + cc->shutdown = 1; + shutdown = 1; + } gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); if (call) GRPC_CALL_INTERNAL_UNREF(call, "cq", 0); + if (shutdown) { + grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc); + } } /* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */ @@ -179,6 +182,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, event *ev = NULL; grpc_event ret; + GRPC_CQ_INTERNAL_REF(cc, "next"); gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); for (;;) { if (cc->queue != NULL) { @@ -205,15 +209,12 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, ev = create_shutdown_event(); break; } - if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) { - continue; - } - if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset), - GRPC_POLLSET_MU(&cc->pollset), deadline)) { + if (!grpc_pollset_work(&cc->pollset, deadline)) { gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); + GRPC_CQ_INTERNAL_UNREF(cc, "next"); return ret; } } @@ -221,6 +222,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, ret = ev->base; gpr_free(ev); GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); + GRPC_CQ_INTERNAL_UNREF(cc, "next"); return ret; } @@ -258,6 +260,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, event *ev = NULL; grpc_event ret; + GRPC_CQ_INTERNAL_REF(cc, "pluck"); gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); for (;;) { if ((ev = pluck_event(cc, tag))) { @@ -267,15 +270,12 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, ev = create_shutdown_event(); break; } - if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) { - continue; - } - if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset), - GRPC_POLLSET_MU(&cc->pollset), deadline)) { + if (!grpc_pollset_work(&cc->pollset, deadline)) { gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); + GRPC_CQ_INTERNAL_UNREF(cc, "pluck"); return ret; } } @@ -283,6 +283,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, ret = ev->base; gpr_free(ev); GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); + GRPC_CQ_INTERNAL_UNREF(cc, "pluck"); return ret; } @@ -290,6 +291,10 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, to zero here, then enter shutdown mode and wake up any waiters */ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + if (cc->shutdown_called) { + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + return; + } cc->shutdown_called = 1; gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); @@ -297,13 +302,14 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; - gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset)); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc); } } void grpc_completion_queue_destroy(grpc_completion_queue *cc) { - grpc_cq_internal_unref(cc); + grpc_completion_queue_shutdown(cc); + GRPC_CQ_INTERNAL_UNREF(cc, "destroy"); } grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { |