diff options
Diffstat (limited to 'src/core/lib/surface/completion_queue.cc')
-rw-r--r-- | src/core/lib/surface/completion_queue.cc | 47 |
1 files changed, 29 insertions, 18 deletions
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 0769d9e4f6..661022ec5f 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -79,6 +79,7 @@ typedef struct non_polling_worker { typedef struct { gpr_mu mu; + bool kicked_without_poller; non_polling_worker* root; grpc_closure* shutdown; } non_polling_poller; @@ -103,6 +104,10 @@ static grpc_error* non_polling_poller_work(grpc_pollset* pollset, grpc_millis deadline) { non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset); if (npp->shutdown) return GRPC_ERROR_NONE; + if (npp->kicked_without_poller) { + npp->kicked_without_poller = false; + return GRPC_ERROR_NONE; + } non_polling_worker w; gpr_cv_init(&w.cv); if (worker != nullptr) *worker = reinterpret_cast<grpc_pollset_worker*>(&w); @@ -148,6 +153,8 @@ static grpc_error* non_polling_poller_kick( w->kicked = true; gpr_cv_signal(&w->cv); } + } else { + p->kicked_without_poller = true; } return GRPC_ERROR_NONE; } @@ -184,7 +191,8 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = { typedef struct cq_vtable { grpc_cq_completion_type cq_completion_type; size_t data_size; - void (*init)(void* data, grpc_core::CQCallbackInterface* shutdown_callback); + void (*init)(void* data, + grpc_experimental_completion_queue_functor* shutdown_callback); void (*shutdown)(grpc_completion_queue* cq); void (*destroy)(void* data); bool (*begin_op)(grpc_completion_queue* cq, void* tag); @@ -267,7 +275,7 @@ typedef struct cq_callback_data { bool shutdown_called; /** A callback that gets invoked when the CQ completes shutdown */ - grpc_core::CQCallbackInterface* shutdown_callback; + grpc_experimental_completion_queue_functor* shutdown_callback; } cq_callback_data; /* Completion queue structure */ @@ -333,12 +341,12 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, gpr_timespec deadline, void* reserved); // Note that cq_init_next and cq_init_pluck do not use the shutdown_callback -static void cq_init_next(void* data, - grpc_core::CQCallbackInterface* shutdown_callback); -static void cq_init_pluck(void* data, - grpc_core::CQCallbackInterface* shutdown_callback); -static void cq_init_callback(void* data, - grpc_core::CQCallbackInterface* shutdown_callback); +static void cq_init_next( + void* data, grpc_experimental_completion_queue_functor* shutdown_callback); +static void cq_init_pluck( + void* data, grpc_experimental_completion_queue_functor* shutdown_callback); +static void cq_init_callback( + void* data, grpc_experimental_completion_queue_functor* shutdown_callback); static void cq_destroy_next(void* data); static void cq_destroy_pluck(void* data); static void cq_destroy_callback(void* data); @@ -462,7 +470,7 @@ static long cq_event_queue_num_items(grpc_cq_event_queue* q) { grpc_completion_queue* grpc_completion_queue_create_internal( grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type, - grpc_core::CQCallbackInterface* shutdown_callback) { + grpc_experimental_completion_queue_functor* shutdown_callback) { GPR_TIMER_SCOPE("grpc_completion_queue_create_internal", 0); grpc_completion_queue* cq; @@ -497,8 +505,8 @@ grpc_completion_queue* grpc_completion_queue_create_internal( return cq; } -static void cq_init_next(void* data, - grpc_core::CQCallbackInterface* shutdown_callback) { +static void cq_init_next( + void* data, grpc_experimental_completion_queue_functor* shutdown_callback) { cq_next_data* cqd = static_cast<cq_next_data*>(data); /* Initial count is dropped by grpc_completion_queue_shutdown */ gpr_atm_no_barrier_store(&cqd->pending_events, 1); @@ -513,8 +521,8 @@ static void cq_destroy_next(void* data) { cq_event_queue_destroy(&cqd->queue); } -static void cq_init_pluck(void* data, - grpc_core::CQCallbackInterface* shutdown_callback) { +static void cq_init_pluck( + void* data, grpc_experimental_completion_queue_functor* shutdown_callback) { cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data); /* Initial count is dropped by grpc_completion_queue_shutdown */ gpr_atm_no_barrier_store(&cqd->pending_events, 1); @@ -532,7 +540,7 @@ static void cq_destroy_pluck(void* data) { } static void cq_init_callback( - void* data, grpc_core::CQCallbackInterface* shutdown_callback) { + void* data, grpc_experimental_completion_queue_functor* shutdown_callback) { cq_callback_data* cqd = static_cast<cq_callback_data*>(data); /* Initial count is dropped by grpc_completion_queue_shutdown */ gpr_atm_no_barrier_store(&cqd->pending_events, 1); @@ -851,15 +859,16 @@ static void cq_end_op_for_callback( gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { - cq_finish_shutdown_callback(cq); gpr_mu_unlock(cq->mu); + cq_finish_shutdown_callback(cq); } else { gpr_mu_unlock(cq->mu); } GRPC_ERROR_UNREF(error); - (static_cast<grpc_core::CQCallbackInterface*>(tag))->Run(is_success); + auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag); + (*functor->functor_run)(functor, is_success); } void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, @@ -1343,7 +1352,7 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) { GPR_ASSERT(cqd->shutdown_called); cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); - callback->Run(true); + (*callback->functor_run)(callback, true); } static void cq_shutdown_callback(grpc_completion_queue* cq) { @@ -1364,9 +1373,11 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) { } cqd->shutdown_called = true; if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + gpr_mu_unlock(cq->mu); cq_finish_shutdown_callback(cq); + } else { + gpr_mu_unlock(cq->mu); } - gpr_mu_unlock(cq->mu); GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)"); } |