From cbe159925005d6f71e891792d593b51263ac6b76 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 5 Dec 2016 13:56:29 -0800 Subject: Track requests that could cause other requests to be created, and don't do a real core shutdown of a CQ until such requests are done --- include/grpc++/impl/codegen/completion_queue.h | 19 ++++++++++++++++++- include/grpc++/impl/codegen/server_interface.h | 2 +- src/cpp/common/completion_queue_cc.cc | 15 +++++++++++---- src/cpp/server/server_cc.cc | 5 +++++ 4 files changed, 35 insertions(+), 6 deletions(-) diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index ef00163b7e..a9bb98edd9 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -52,6 +52,7 @@ #include #include #include +#include struct grpc_completion_queue; @@ -101,6 +102,7 @@ class CompletionQueue : private GrpcLibraryCodegen { /// instance. CompletionQueue() { cq_ = g_core_codegen_interface->grpc_completion_queue_create(nullptr); + RegisterAvalanching(); // reserve this for the future shutdown } /// Wrap \a take, taking ownership of the instance. @@ -151,7 +153,8 @@ class CompletionQueue : private GrpcLibraryCodegen { /// Request the shutdown of the queue. /// - /// \warning This method must be called at some point. Once invoked, \a Next + /// \warning This method must be called at some point if this completion queue + /// is accessed with Next or AsyncNext. Once invoked, \a Next /// will start to return false and \a AsyncNext will return \a /// NextStatus::SHUTDOWN. Only once either one of these methods does that /// (that is, once the queue has been \em drained) can an instance of this @@ -165,6 +168,18 @@ class CompletionQueue : private GrpcLibraryCodegen { /// owership is performed. grpc_completion_queue* cq() { return cq_; } + /// Manage state of avalanching operations : completion queue tags that + /// trigger other completion queue operations. The underlying core completion + /// queue should not really shutdown until all avalanching operations have + /// been finalized. Note that we maintain the requirement that an avalanche + /// registration must take place before CQ shutdown (which must be maintained + /// elsehwere) + void RegisterAvalanching() { + gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_, + static_cast(1)); + }; + void CompleteAvalanching(); + private: // Friend synchronous wrappers so that they can access Pluck(), which is // a semi-private API geared towards the synchronous implementation. @@ -229,6 +244,8 @@ class CompletionQueue : private GrpcLibraryCodegen { } grpc_completion_queue* cq_; // owned + + gpr_atm avalanches_in_flight_; }; /// A specific type of completion queue used by the processing of notifications diff --git a/include/grpc++/impl/codegen/server_interface.h b/include/grpc++/impl/codegen/server_interface.h index 41a64bead0..666b9ff66e 100644 --- a/include/grpc++/impl/codegen/server_interface.h +++ b/include/grpc++/impl/codegen/server_interface.h @@ -140,7 +140,7 @@ class ServerInterface : public CallHook { ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag, bool delete_on_finalize); - virtual ~BaseAsyncRequest() {} + virtual ~BaseAsyncRequest(); bool FinalizeResult(void** tag, bool* status) override; diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc index 00cc102f92..558f47af84 100644 --- a/src/cpp/common/completion_queue_cc.cc +++ b/src/cpp/common/completion_queue_cc.cc @@ -43,11 +43,18 @@ namespace grpc { static internal::GrpcLibraryInitializer g_gli_initializer; -CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) {} +CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) { + RegisterAvalanching(); +} + +void CompletionQueue::Shutdown() { CompleteAvalanching(); } -void CompletionQueue::Shutdown() { - g_gli_initializer.summon(); - grpc_completion_queue_shutdown(cq_); +void CompletionQueue::CompleteAvalanching() { + // Check if this was the last avalanching operation + if (gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_, + static_cast(-1)) == 1) { + grpc_completion_queue_shutdown(cq_); + } } CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index b7cfd6dbf1..96820ff963 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -575,9 +575,14 @@ ServerInterface::BaseAsyncRequest::BaseAsyncRequest( tag_(tag), delete_on_finalize_(delete_on_finalize), call_(nullptr) { + call_cq_->RegisterAvalanching(); // This op will trigger more ops memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_)); } +ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() { + call_cq_->CompleteAvalanching(); +} + bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) { if (*status) { -- cgit v1.2.3 From bf24dd9e51f433aec60fbef24ee68496829277b1 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 5 Dec 2016 13:59:09 -0800 Subject: clang-format --- include/grpc++/impl/codegen/completion_queue.h | 2 +- src/cpp/server/server_cc.cc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index a9bb98edd9..75e73ee1f4 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -102,7 +102,7 @@ class CompletionQueue : private GrpcLibraryCodegen { /// instance. CompletionQueue() { cq_ = g_core_codegen_interface->grpc_completion_queue_create(nullptr); - RegisterAvalanching(); // reserve this for the future shutdown + RegisterAvalanching(); // reserve this for the future shutdown } /// Wrap \a take, taking ownership of the instance. diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 96820ff963..1063c5d7f2 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -575,7 +575,7 @@ ServerInterface::BaseAsyncRequest::BaseAsyncRequest( tag_(tag), delete_on_finalize_(delete_on_finalize), call_(nullptr) { - call_cq_->RegisterAvalanching(); // This op will trigger more ops + call_cq_->RegisterAvalanching(); // This op will trigger more ops memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_)); } -- cgit v1.2.3 From 6510d47c8117d357dfaa289683aa5feca50d7c72 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 5 Dec 2016 14:16:20 -0800 Subject: gpr_atm isn't automatically initialized to 0. Thanks Obama. --- include/grpc++/impl/codegen/completion_queue.h | 5 ++++- src/cpp/common/completion_queue_cc.cc | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index 75e73ee1f4..944f2c3919 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -102,7 +102,7 @@ class CompletionQueue : private GrpcLibraryCodegen { /// instance. CompletionQueue() { cq_ = g_core_codegen_interface->grpc_completion_queue_create(nullptr); - RegisterAvalanching(); // reserve this for the future shutdown + InitialAvalanching(); // reserve this for the future shutdown } /// Wrap \a take, taking ownership of the instance. @@ -174,6 +174,9 @@ class CompletionQueue : private GrpcLibraryCodegen { /// been finalized. Note that we maintain the requirement that an avalanche /// registration must take place before CQ shutdown (which must be maintained /// elsehwere) + void InitialAvalanching() { + gpr_atm_rel_store(&avalanches_in_flight_, static_cast(1)); + } void RegisterAvalanching() { gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_, static_cast(1)); diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc index 558f47af84..418baac5b2 100644 --- a/src/cpp/common/completion_queue_cc.cc +++ b/src/cpp/common/completion_queue_cc.cc @@ -44,7 +44,7 @@ namespace grpc { static internal::GrpcLibraryInitializer g_gli_initializer; CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) { - RegisterAvalanching(); + InitialAvalanching(); } void CompletionQueue::Shutdown() { CompleteAvalanching(); } -- cgit v1.2.3 From 7e499aa3fbc9b7baa6571ed4a2c595f6c06e8faa Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 5 Dec 2016 14:50:14 -0800 Subject: Bring back gli initializer summoning --- src/cpp/common/completion_queue_cc.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc index 418baac5b2..0408a41085 100644 --- a/src/cpp/common/completion_queue_cc.cc +++ b/src/cpp/common/completion_queue_cc.cc @@ -47,7 +47,10 @@ CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) { InitialAvalanching(); } -void CompletionQueue::Shutdown() { CompleteAvalanching(); } +void CompletionQueue::Shutdown() { + g_gli_initializer.summon(); + CompleteAvalanching(); +} void CompletionQueue::CompleteAvalanching() { // Check if this was the last avalanching operation -- cgit v1.2.3