diff options
author | 2018-08-14 00:58:33 -0700 | |
---|---|---|
committer | 2018-08-14 00:58:33 -0700 | |
commit | fe7f79189be8c774a3cc210d4b86191179742ca6 (patch) | |
tree | 679a3af39f7bf4b72ce97ad1a1cd1973cb4655ad /src/core/lib | |
parent | bbb97318e8db89ee57506bdfb835e897ed9ffa7b (diff) |
Address reviewer comments
Diffstat (limited to 'src/core/lib')
-rw-r--r-- | src/core/lib/surface/completion_queue.cc | 39 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.h | 9 |
2 files changed, 31 insertions, 17 deletions
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index fd33ce044c..9086578f7c 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -263,12 +263,6 @@ typedef struct cq_callback_data { useful for avoiding locks to check the queue */ gpr_atm things_queued_ever; - /** 0 initially. 1 once we completed shutting */ - /* TODO: (sreek) This is not needed since (shutdown == 1) if and only if - * (pending_events == 0). So consider removing this in future and use - * pending_events */ - gpr_atm shutdown; - /** 0 initially. 1 once we initiated shutdown */ bool shutdown_called; @@ -308,6 +302,12 @@ static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag); static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag); static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag); +// A cq_end_op function is called when an operation on a given CQ with +// a given tag has completed. The storage argument is a reference to the +// space reserved for this completion as it is placed into the corresponding +// queue. The done argument is a callback that will be invoked when it is +// safe to free up that storage. The storage MUST NOT be freed until the +// done callback is invoked. static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, @@ -332,8 +332,11 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, gpr_timespec deadline, void* reserved); -static void cq_init_next(void* data, grpc_core::CQCallbackInterface*); -static void cq_init_pluck(void* data, grpc_core::CQCallbackInterface*); +// Note that cq_init_next and cq_init_pluck do not use the shutdown_callback +static void cq_init_next(void* data, + grpc_core::CQCallbackInterface* shutdown_callback); +static void cq_init_pluck(void* data, + grpc_core::CQCallbackInterface* shutdown_callback); static void cq_init_callback(void* data, grpc_core::CQCallbackInterface* shutdown_callback); static void cq_destroy_next(void* data); @@ -494,7 +497,11 @@ grpc_completion_queue* grpc_completion_queue_create_internal( return cq; } -static void cq_init_next(void* ptr, grpc_core::CQCallbackInterface*) { +static void cq_init_next(void* ptr, + grpc_core::CQCallbackInterface* shutdown_callback) { + // shutdown_callback should not be provided to this CQ variant + GPR_ASSERT(shutdown_callback == nullptr); + cq_next_data* cqd = static_cast<cq_next_data*>(ptr); /* Initial count is dropped by grpc_completion_queue_shutdown */ gpr_atm_no_barrier_store(&cqd->pending_events, 1); @@ -509,7 +516,11 @@ static void cq_destroy_next(void* ptr) { cq_event_queue_destroy(&cqd->queue); } -static void cq_init_pluck(void* ptr, grpc_core::CQCallbackInterface*) { +static void cq_init_pluck(void* ptr, + grpc_core::CQCallbackInterface* shutdown_callback) { + // shutdown_callback should not be provided to this CQ variant + GPR_ASSERT(shutdown_callback == nullptr); + cq_pluck_data* cqd = static_cast<cq_pluck_data*>(ptr); /* Initial count is dropped by grpc_completion_queue_shutdown */ gpr_atm_no_barrier_store(&cqd->pending_events, 1); @@ -531,7 +542,6 @@ static void cq_init_callback( cq_callback_data* cqd = static_cast<cq_callback_data*>(ptr); /* Initial count is dropped by grpc_completion_queue_shutdown */ gpr_atm_no_barrier_store(&cqd->pending_events, 1); - gpr_atm_no_barrier_store(&cqd->shutdown, 0); cqd->shutdown_called = false; gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); cqd->shutdown_callback = shutdown_callback; @@ -838,7 +848,8 @@ static void cq_end_op_for_callback( } } - /* We don't care for the storage content */ + // The callback-based CQ isn't really a queue at all and thus has no need + // for reserved storage. Invoke the done callback right away to release it. done(done_arg, storage); gpr_mu_lock(cq->mu); @@ -1336,8 +1347,6 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) { auto* callback = cqd->shutdown_callback; GPR_ASSERT(cqd->shutdown_called); - GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); - gpr_atm_no_barrier_store(&cqd->shutdown, 1); cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); callback->Run(true); @@ -1347,7 +1356,7 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) { cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq); /* Need an extra ref for cq here because: - * We call cq_finish_shutdown_pluck() below, that would call pollset shutdown. + * We call cq_finish_shutdown_callback() below, which calls pollset shutdown. * Pollset shutdown decrements the cq ref count which can potentially destroy * the cq (if that happens to be the last ref). * Creating an extra ref here prevents the cq from getting destroyed while diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 6d8c6c9b06..5aa54682e0 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -47,8 +47,13 @@ typedef struct grpc_cq_completion { uintptr_t next; } grpc_cq_completion; -/// For callback CQs, the following is what is actually intended by -/// the tag. +/// For callback CQs, the tag that is passed in for an operation must +/// actually be a pointer to an implementation of the following class. +/// When the operation completes, the tag will be typecasted from void* +/// to grpc_core::CQCallbackInterface* and then the Run method will be +/// invoked on it. In practice, the language binding (e.g., C++ API +/// implementation) is responsible for providing and using an implementation +/// of this abstract base class. namespace grpc_core { class CQCallbackInterface { public: |