aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2018-08-14 00:58:33 -0700
committerGravatar Vijay Pai <vpai@google.com>2018-08-14 00:58:33 -0700
commitfe7f79189be8c774a3cc210d4b86191179742ca6 (patch)
tree679a3af39f7bf4b72ce97ad1a1cd1973cb4655ad /src/core/lib
parentbbb97318e8db89ee57506bdfb835e897ed9ffa7b (diff)
Address reviewer comments
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/surface/completion_queue.cc39
-rw-r--r--src/core/lib/surface/completion_queue.h9
2 files changed, 31 insertions, 17 deletions
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc
index fd33ce044c..9086578f7c 100644
--- a/src/core/lib/surface/completion_queue.cc
+++ b/src/core/lib/surface/completion_queue.cc
@@ -263,12 +263,6 @@ typedef struct cq_callback_data {
useful for avoiding locks to check the queue */
gpr_atm things_queued_ever;
- /** 0 initially. 1 once we completed shutting */
- /* TODO: (sreek) This is not needed since (shutdown == 1) if and only if
- * (pending_events == 0). So consider removing this in future and use
- * pending_events */
- gpr_atm shutdown;
-
/** 0 initially. 1 once we initiated shutdown */
bool shutdown_called;
@@ -308,6 +302,12 @@ static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag);
static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag);
static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag);
+// A cq_end_op function is called when an operation on a given CQ with
+// a given tag has completed. The storage argument is a reference to the
+// space reserved for this completion as it is placed into the corresponding
+// queue. The done argument is a callback that will be invoked when it is
+// safe to free up that storage. The storage MUST NOT be freed until the
+// done callback is invoked.
static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
grpc_error* error,
void (*done)(void* done_arg,
@@ -332,8 +332,11 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
gpr_timespec deadline, void* reserved);
-static void cq_init_next(void* data, grpc_core::CQCallbackInterface*);
-static void cq_init_pluck(void* data, grpc_core::CQCallbackInterface*);
+// Note that cq_init_next and cq_init_pluck do not use the shutdown_callback
+static void cq_init_next(void* data,
+ grpc_core::CQCallbackInterface* shutdown_callback);
+static void cq_init_pluck(void* data,
+ grpc_core::CQCallbackInterface* shutdown_callback);
static void cq_init_callback(void* data,
grpc_core::CQCallbackInterface* shutdown_callback);
static void cq_destroy_next(void* data);
@@ -494,7 +497,11 @@ grpc_completion_queue* grpc_completion_queue_create_internal(
return cq;
}
-static void cq_init_next(void* ptr, grpc_core::CQCallbackInterface*) {
+static void cq_init_next(void* ptr,
+ grpc_core::CQCallbackInterface* shutdown_callback) {
+ // shutdown_callback should not be provided to this CQ variant
+ GPR_ASSERT(shutdown_callback == nullptr);
+
cq_next_data* cqd = static_cast<cq_next_data*>(ptr);
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
@@ -509,7 +516,11 @@ static void cq_destroy_next(void* ptr) {
cq_event_queue_destroy(&cqd->queue);
}
-static void cq_init_pluck(void* ptr, grpc_core::CQCallbackInterface*) {
+static void cq_init_pluck(void* ptr,
+ grpc_core::CQCallbackInterface* shutdown_callback) {
+ // shutdown_callback should not be provided to this CQ variant
+ GPR_ASSERT(shutdown_callback == nullptr);
+
cq_pluck_data* cqd = static_cast<cq_pluck_data*>(ptr);
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
@@ -531,7 +542,6 @@ static void cq_init_callback(
cq_callback_data* cqd = static_cast<cq_callback_data*>(ptr);
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
- gpr_atm_no_barrier_store(&cqd->shutdown, 0);
cqd->shutdown_called = false;
gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
cqd->shutdown_callback = shutdown_callback;
@@ -838,7 +848,8 @@ static void cq_end_op_for_callback(
}
}
- /* We don't care for the storage content */
+ // The callback-based CQ isn't really a queue at all and thus has no need
+ // for reserved storage. Invoke the done callback right away to release it.
done(done_arg, storage);
gpr_mu_lock(cq->mu);
@@ -1336,8 +1347,6 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
auto* callback = cqd->shutdown_callback;
GPR_ASSERT(cqd->shutdown_called);
- GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown));
- gpr_atm_no_barrier_store(&cqd->shutdown, 1);
cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
callback->Run(true);
@@ -1347,7 +1356,7 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) {
cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
/* Need an extra ref for cq here because:
- * We call cq_finish_shutdown_pluck() below, that would call pollset shutdown.
+ * We call cq_finish_shutdown_callback() below, which calls pollset shutdown.
* Pollset shutdown decrements the cq ref count which can potentially destroy
* the cq (if that happens to be the last ref).
* Creating an extra ref here prevents the cq from getting destroyed while
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index 6d8c6c9b06..5aa54682e0 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -47,8 +47,13 @@ typedef struct grpc_cq_completion {
uintptr_t next;
} grpc_cq_completion;
-/// For callback CQs, the following is what is actually intended by
-/// the tag.
+/// For callback CQs, the tag that is passed in for an operation must
+/// actually be a pointer to an implementation of the following class.
+/// When the operation completes, the tag will be typecasted from void*
+/// to grpc_core::CQCallbackInterface* and then the Run method will be
+/// invoked on it. In practice, the language binding (e.g., C++ API
+/// implementation) is responsible for providing and using an implementation
+/// of this abstract base class.
namespace grpc_core {
class CQCallbackInterface {
public: