diff options
author | Craig Tiller <ctiller@google.com> | 2016-12-05 15:23:24 -0800 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-12-05 15:23:24 -0800 |
commit | 749ccd99df001c2b93e95fbd90c01a9dfce57d5a (patch) | |
tree | 1ae8ba1e3d1e57b022e56ba8021b4a31579865f6 /src/cpp | |
parent | 9c5318aba097dc8634626ba0b106fc8993ce32b5 (diff) | |
parent | 7e499aa3fbc9b7baa6571ed4a2c595f6c06e8faa (diff) |
Merge branch 'track_cq_finalize' of github.com:vjpai/grpc into fixit29
Diffstat (limited to 'src/cpp')
-rw-r--r-- | src/cpp/common/completion_queue_cc.cc | 14 | ||||
-rw-r--r-- | src/cpp/server/server_cc.cc | 5 |
2 files changed, 17 insertions, 2 deletions
diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc index 00cc102f92..0408a41085 100644 --- a/src/cpp/common/completion_queue_cc.cc +++ b/src/cpp/common/completion_queue_cc.cc @@ -43,11 +43,21 @@ namespace grpc { static internal::GrpcLibraryInitializer g_gli_initializer; -CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) {} +CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) { + InitialAvalanching(); +} void CompletionQueue::Shutdown() { g_gli_initializer.summon(); - grpc_completion_queue_shutdown(cq_); + CompleteAvalanching(); +} + +void CompletionQueue::CompleteAvalanching() { + // Check if this was the last avalanching operation + if (gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_, + static_cast<gpr_atm>(-1)) == 1) { + grpc_completion_queue_shutdown(cq_); + } } CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 3ec7faddad..817d85a81c 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -575,9 +575,14 @@ ServerInterface::BaseAsyncRequest::BaseAsyncRequest( tag_(tag), delete_on_finalize_(delete_on_finalize), call_(nullptr) { + call_cq_->RegisterAvalanching(); // This op will trigger more ops memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_)); } +ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() { + call_cq_->CompleteAvalanching(); +} + bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) { if (*status) { |