aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-12-05 17:50:27 -0800
committerGravatar GitHub <noreply@github.com>2016-12-05 17:50:27 -0800
commiteac8af73337485e4475ad6c543d3c16dfe5e255c (patch)
treef94b2203783ade45ea55ac335c2541943c8e10ce
parentbc74e1b2083ed16bfbc415b18aabbdde687e0b50 (diff)
parent7e499aa3fbc9b7baa6571ed4a2c595f6c06e8faa (diff)
Merge pull request #8959 from vjpai/track_cq_finalize
Don't allow core CQ shutdown whille C++ CQ tags may still trigger new ops
-rw-r--r--include/grpc++/impl/codegen/completion_queue.h22
-rw-r--r--include/grpc++/impl/codegen/server_interface.h2
-rw-r--r--src/cpp/common/completion_queue_cc.cc14
-rw-r--r--src/cpp/server/server_cc.cc5
4 files changed, 39 insertions, 4 deletions
diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h
index ef00163b7e..944f2c3919 100644
--- a/include/grpc++/impl/codegen/completion_queue.h
+++ b/include/grpc++/impl/codegen/completion_queue.h
@@ -52,6 +52,7 @@
#include <grpc++/impl/codegen/grpc_library.h>
#include <grpc++/impl/codegen/status.h>
#include <grpc++/impl/codegen/time.h>
+#include <grpc/impl/codegen/atm.h>
struct grpc_completion_queue;
@@ -101,6 +102,7 @@ class CompletionQueue : private GrpcLibraryCodegen {
/// instance.
CompletionQueue() {
cq_ = g_core_codegen_interface->grpc_completion_queue_create(nullptr);
+ InitialAvalanching(); // reserve this for the future shutdown
}
/// Wrap \a take, taking ownership of the instance.
@@ -151,7 +153,8 @@ class CompletionQueue : private GrpcLibraryCodegen {
/// Request the shutdown of the queue.
///
- /// \warning This method must be called at some point. Once invoked, \a Next
+ /// \warning This method must be called at some point if this completion queue
+ /// is accessed with Next or AsyncNext. Once invoked, \a Next
/// will start to return false and \a AsyncNext will return \a
/// NextStatus::SHUTDOWN. Only once either one of these methods does that
/// (that is, once the queue has been \em drained) can an instance of this
@@ -165,6 +168,21 @@ class CompletionQueue : private GrpcLibraryCodegen {
/// owership is performed.
grpc_completion_queue* cq() { return cq_; }
+ /// Manage state of avalanching operations : completion queue tags that
+ /// trigger other completion queue operations. The underlying core completion
+ /// queue should not really shutdown until all avalanching operations have
+ /// been finalized. Note that we maintain the requirement that an avalanche
+ /// registration must take place before CQ shutdown (which must be maintained
+ /// elsehwere)
+ void InitialAvalanching() {
+ gpr_atm_rel_store(&avalanches_in_flight_, static_cast<gpr_atm>(1));
+ }
+ void RegisterAvalanching() {
+ gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_,
+ static_cast<gpr_atm>(1));
+ };
+ void CompleteAvalanching();
+
private:
// Friend synchronous wrappers so that they can access Pluck(), which is
// a semi-private API geared towards the synchronous implementation.
@@ -229,6 +247,8 @@ class CompletionQueue : private GrpcLibraryCodegen {
}
grpc_completion_queue* cq_; // owned
+
+ gpr_atm avalanches_in_flight_;
};
/// A specific type of completion queue used by the processing of notifications
diff --git a/include/grpc++/impl/codegen/server_interface.h b/include/grpc++/impl/codegen/server_interface.h
index 41a64bead0..666b9ff66e 100644
--- a/include/grpc++/impl/codegen/server_interface.h
+++ b/include/grpc++/impl/codegen/server_interface.h
@@ -140,7 +140,7 @@ class ServerInterface : public CallHook {
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq, void* tag,
bool delete_on_finalize);
- virtual ~BaseAsyncRequest() {}
+ virtual ~BaseAsyncRequest();
bool FinalizeResult(void** tag, bool* status) override;
diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc
index 00cc102f92..0408a41085 100644
--- a/src/cpp/common/completion_queue_cc.cc
+++ b/src/cpp/common/completion_queue_cc.cc
@@ -43,11 +43,21 @@ namespace grpc {
static internal::GrpcLibraryInitializer g_gli_initializer;
-CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) {}
+CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) {
+ InitialAvalanching();
+}
void CompletionQueue::Shutdown() {
g_gli_initializer.summon();
- grpc_completion_queue_shutdown(cq_);
+ CompleteAvalanching();
+}
+
+void CompletionQueue::CompleteAvalanching() {
+ // Check if this was the last avalanching operation
+ if (gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_,
+ static_cast<gpr_atm>(-1)) == 1) {
+ grpc_completion_queue_shutdown(cq_);
+ }
}
CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index b7cfd6dbf1..1063c5d7f2 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -575,9 +575,14 @@ ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
tag_(tag),
delete_on_finalize_(delete_on_finalize),
call_(nullptr) {
+ call_cq_->RegisterAvalanching(); // This op will trigger more ops
memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_));
}
+ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() {
+ call_cq_->CompleteAvalanching();
+}
+
bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
bool* status) {
if (*status) {