diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/lib/surface/completion_queue.cc | 155 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.h | 21 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue_factory.cc | 17 |
3 files changed, 179 insertions, 14 deletions
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 7da9e6b74c..3ded712b70 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -184,7 +184,7 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = { typedef struct cq_vtable { grpc_cq_completion_type cq_completion_type; size_t data_size; - void (*init)(void* data); + void (*init)(void* data, grpc_core::CQCallbackInterface* shutdown_callback); void (*shutdown)(grpc_completion_queue* cq); void (*destroy)(void* data); bool (*begin_op)(grpc_completion_queue* cq, void* tag); @@ -253,6 +253,23 @@ typedef struct cq_pluck_data { plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; } cq_pluck_data; +typedef struct cq_callback_data { + /** No actual completed events queue, unlike other types */ + + /** Number of pending events (+1 if we're not shutdown) */ + gpr_atm pending_events; + + /** Counter of how many things have ever been queued on this completion queue + useful for avoiding locks to check the queue */ + gpr_atm things_queued_ever; + + /** 0 initially. 1 once we initiated shutdown */ + bool shutdown_called; + + /** A callback that gets invoked when the CQ completes shutdown */ + grpc_core::CQCallbackInterface* shutdown_callback; +} cq_callback_data; + /* Completion queue structure */ struct grpc_completion_queue { /** Once owning_refs drops to zero, we will destroy the cq */ @@ -276,12 +293,21 @@ struct grpc_completion_queue { /* Forward declarations */ static void cq_finish_shutdown_next(grpc_completion_queue* cq); static void cq_finish_shutdown_pluck(grpc_completion_queue* cq); +static void cq_finish_shutdown_callback(grpc_completion_queue* cq); static void cq_shutdown_next(grpc_completion_queue* cq); static void cq_shutdown_pluck(grpc_completion_queue* cq); +static void cq_shutdown_callback(grpc_completion_queue* cq); static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag); static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag); - +static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag); + +// A cq_end_op function is called when an operation on a given CQ with +// a given tag has completed. The storage argument is a reference to the +// space reserved for this completion as it is placed into the corresponding +// queue. The done argument is a callback that will be invoked when it is +// safe to free up that storage. The storage MUST NOT be freed until the +// done callback is invoked. static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, @@ -294,16 +320,28 @@ static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag, grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage); +static void cq_end_op_for_callback(grpc_completion_queue* cq, void* tag, + grpc_error* error, + void (*done)(void* done_arg, + grpc_cq_completion* storage), + void* done_arg, grpc_cq_completion* storage); + static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, void* reserved); static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, gpr_timespec deadline, void* reserved); -static void cq_init_next(void* data); -static void cq_init_pluck(void* data); +// Note that cq_init_next and cq_init_pluck do not use the shutdown_callback +static void cq_init_next(void* data, + grpc_core::CQCallbackInterface* shutdown_callback); +static void cq_init_pluck(void* data, + grpc_core::CQCallbackInterface* shutdown_callback); +static void cq_init_callback(void* data, + grpc_core::CQCallbackInterface* shutdown_callback); static void cq_destroy_next(void* data); static void cq_destroy_pluck(void* data); +static void cq_destroy_callback(void* data); /* Completion queue vtables based on the completion-type */ static const cq_vtable g_cq_vtable[] = { @@ -315,6 +353,10 @@ static const cq_vtable g_cq_vtable[] = { {GRPC_CQ_PLUCK, sizeof(cq_pluck_data), cq_init_pluck, cq_shutdown_pluck, cq_destroy_pluck, cq_begin_op_for_pluck, cq_end_op_for_pluck, nullptr, cq_pluck}, + /* GRPC_CQ_CALLBACK */ + {GRPC_CQ_CALLBACK, sizeof(cq_callback_data), cq_init_callback, + cq_shutdown_callback, cq_destroy_callback, cq_begin_op_for_callback, + cq_end_op_for_callback, nullptr, nullptr}, }; #define DATA_FROM_CQ(cq) ((void*)(cq + 1)) @@ -419,8 +461,8 @@ static long cq_event_queue_num_items(grpc_cq_event_queue* q) { } grpc_completion_queue* grpc_completion_queue_create_internal( - grpc_cq_completion_type completion_type, - grpc_cq_polling_type polling_type) { + grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type, + grpc_core::CQCallbackInterface* shutdown_callback) { GPR_TIMER_SCOPE("grpc_completion_queue_create_internal", 0); grpc_completion_queue* cq; @@ -448,14 +490,15 @@ grpc_completion_queue* grpc_completion_queue_create_internal( gpr_ref_init(&cq->owning_refs, 2); poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu); - vtable->init(DATA_FROM_CQ(cq)); + vtable->init(DATA_FROM_CQ(cq), shutdown_callback); GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq, grpc_schedule_on_exec_ctx); return cq; } -static void cq_init_next(void* ptr) { +static void cq_init_next(void* ptr, + grpc_core::CQCallbackInterface* shutdown_callback) { cq_next_data* cqd = static_cast<cq_next_data*>(ptr); /* Initial count is dropped by grpc_completion_queue_shutdown */ gpr_atm_no_barrier_store(&cqd->pending_events, 1); @@ -470,7 +513,8 @@ static void cq_destroy_next(void* ptr) { cq_event_queue_destroy(&cqd->queue); } -static void cq_init_pluck(void* ptr) { +static void cq_init_pluck(void* ptr, + grpc_core::CQCallbackInterface* shutdown_callback) { cq_pluck_data* cqd = static_cast<cq_pluck_data*>(ptr); /* Initial count is dropped by grpc_completion_queue_shutdown */ gpr_atm_no_barrier_store(&cqd->pending_events, 1); @@ -487,6 +531,18 @@ static void cq_destroy_pluck(void* ptr) { GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head); } +static void cq_init_callback( + void* ptr, grpc_core::CQCallbackInterface* shutdown_callback) { + cq_callback_data* cqd = static_cast<cq_callback_data*>(ptr); + /* Initial count is dropped by grpc_completion_queue_shutdown */ + gpr_atm_no_barrier_store(&cqd->pending_events, 1); + cqd->shutdown_called = false; + gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); + cqd->shutdown_callback = shutdown_callback; +} + +static void cq_destroy_callback(void* ptr) {} + grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) { return cq->vtable->cq_completion_type; } @@ -596,6 +652,11 @@ static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag) { return atm_inc_if_nonzero(&cqd->pending_events); } +static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag) { + cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq); + return atm_inc_if_nonzero(&cqd->pending_events); +} + bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) { #ifndef NDEBUG gpr_mu_lock(cq->mu); @@ -759,6 +820,48 @@ static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag, GRPC_ERROR_UNREF(error); } +/* Complete an event on a completion queue of type GRPC_CQ_CALLBACK */ +static void cq_end_op_for_callback( + grpc_completion_queue* cq, void* tag, grpc_error* error, + void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, + grpc_cq_completion* storage) { + GPR_TIMER_SCOPE("cq_end_op_for_callback", 0); + + cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq); + bool is_success = (error == GRPC_ERROR_NONE); + + if (grpc_api_trace.enabled() || + (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE)) { + const char* errmsg = grpc_error_string(error); + GRPC_API_TRACE( + "cq_end_op_for_callback(cq=%p, tag=%p, error=%s, " + "done=%p, done_arg=%p, storage=%p)", + 6, (cq, tag, errmsg, done, done_arg, storage)); + if (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); + } + } + + // The callback-based CQ isn't really a queue at all and thus has no need + // for reserved storage. Invoke the done callback right away to release it. + done(done_arg, storage); + + gpr_mu_lock(cq->mu); + cq_check_tag(cq, tag, false); /* Used in debug builds only */ + + gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); + if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + cq_finish_shutdown_callback(cq); + gpr_mu_unlock(cq->mu); + } else { + gpr_mu_unlock(cq->mu); + } + + GRPC_ERROR_UNREF(error); + + (static_cast<grpc_core::CQCallbackInterface*>(tag))->Run(is_success); +} + void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage) { @@ -1233,6 +1336,40 @@ static void cq_shutdown_pluck(grpc_completion_queue* cq) { GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)"); } +static void cq_finish_shutdown_callback(grpc_completion_queue* cq) { + cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq); + auto* callback = cqd->shutdown_callback; + + GPR_ASSERT(cqd->shutdown_called); + + cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); + callback->Run(true); +} + +static void cq_shutdown_callback(grpc_completion_queue* cq) { + cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq); + + /* Need an extra ref for cq here because: + * We call cq_finish_shutdown_callback() below, which calls pollset shutdown. + * Pollset shutdown decrements the cq ref count which can potentially destroy + * the cq (if that happens to be the last ref). + * Creating an extra ref here prevents the cq from getting destroyed while + * this function is still active */ + GRPC_CQ_INTERNAL_REF(cq, "shutting_down (callback cq)"); + gpr_mu_lock(cq->mu); + if (cqd->shutdown_called) { + gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)"); + return; + } + cqd->shutdown_called = true; + if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + cq_finish_shutdown_callback(cq); + } + gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)"); +} + /* Shutdown simply drops a ref that we reserved at creation time; if we drop to zero here, then enter shutdown mode and wake up any waiters */ void grpc_completion_queue_shutdown(grpc_completion_queue* cq) { diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 84446a4d92..a7c524d8e8 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -25,6 +25,7 @@ #include <grpc/grpc.h> #include "src/core/lib/debug/trace.h" +#include "src/core/lib/gprpp/abstract.h" #include "src/core/lib/iomgr/pollset.h" /* These trace flags default to 1. The corresponding lines are only traced @@ -47,6 +48,23 @@ typedef struct grpc_cq_completion { uintptr_t next; } grpc_cq_completion; +/// For callback CQs, the tag that is passed in for an operation must +/// actually be a pointer to an implementation of the following class. +/// When the operation completes, the tag will be typecasted from void* +/// to grpc_core::CQCallbackInterface* and then the Run method will be +/// invoked on it. In practice, the language binding (e.g., C++ API +/// implementation) is responsible for providing and using an implementation +/// of this abstract base class. +namespace grpc_core { +class CQCallbackInterface { + public: + virtual ~CQCallbackInterface() {} + virtual void Run(bool) GRPC_ABSTRACT; + + GRPC_ABSTRACT_BASE_CLASS +}; +} // namespace grpc_core + #ifndef NDEBUG void grpc_cq_internal_ref(grpc_completion_queue* cc, const char* reason, const char* file, int line); @@ -87,6 +105,7 @@ grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cc); int grpc_get_cq_poll_num(grpc_completion_queue* cc); grpc_completion_queue* grpc_completion_queue_create_internal( - grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type); + grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type, + grpc_core::CQCallbackInterface* shutdown_callback); #endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_H */ diff --git a/src/core/lib/surface/completion_queue_factory.cc b/src/core/lib/surface/completion_queue_factory.cc index 51c1183c5f..ed92dd7eba 100644 --- a/src/core/lib/surface/completion_queue_factory.cc +++ b/src/core/lib/surface/completion_queue_factory.cc @@ -30,8 +30,9 @@ static grpc_completion_queue* default_create( const grpc_completion_queue_factory* factory, const grpc_completion_queue_attributes* attr) { - return grpc_completion_queue_create_internal(attr->cq_completion_type, - attr->cq_polling_type); + return grpc_completion_queue_create_internal( + attr->cq_completion_type, attr->cq_polling_type, + static_cast<grpc_core::CQCallbackInterface*>(attr->cq_shutdown_cb)); } static grpc_completion_queue_factory_vtable default_vtable = {default_create}; @@ -60,14 +61,22 @@ const grpc_completion_queue_factory* grpc_completion_queue_factory_lookup( grpc_completion_queue* grpc_completion_queue_create_for_next(void* reserved) { GPR_ASSERT(!reserved); grpc_completion_queue_attributes attr = {1, GRPC_CQ_NEXT, - GRPC_CQ_DEFAULT_POLLING}; + GRPC_CQ_DEFAULT_POLLING, nullptr}; return g_default_cq_factory.vtable->create(&g_default_cq_factory, &attr); } grpc_completion_queue* grpc_completion_queue_create_for_pluck(void* reserved) { GPR_ASSERT(!reserved); grpc_completion_queue_attributes attr = {1, GRPC_CQ_PLUCK, - GRPC_CQ_DEFAULT_POLLING}; + GRPC_CQ_DEFAULT_POLLING, nullptr}; + return g_default_cq_factory.vtable->create(&g_default_cq_factory, &attr); +} + +grpc_completion_queue* grpc_completion_queue_create_for_callback( + void* shutdown_callback, void* reserved) { + GPR_ASSERT(!reserved); + grpc_completion_queue_attributes attr = { + 2, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, shutdown_callback}; return g_default_cq_factory.vtable->create(&g_default_cq_factory, &attr); } |