aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2016-12-09 22:42:54 -0800
committerGravatar Sree Kuchibhotla <sreek@google.com>2016-12-09 22:42:54 -0800
commite33daabb8806de494bcaa9adf201025dadf5d102 (patch)
tree9fcf80078db52fca0bfff84618042414f4f8d4cc /src/cpp
parent762ce2744c4c3814d537705e8fdfb54f4a64f26a (diff)
parent48b1558823355016ffc852ca6b97d6b39f4c04eb (diff)
Merge branch 'master' into reduce_cq
Diffstat (limited to 'src/cpp')
-rw-r--r--src/cpp/common/completion_queue_cc.cc14
-rw-r--r--src/cpp/server/server_cc.cc17
2 files changed, 23 insertions, 8 deletions
diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc
index 00cc102f92..0408a41085 100644
--- a/src/cpp/common/completion_queue_cc.cc
+++ b/src/cpp/common/completion_queue_cc.cc
@@ -43,11 +43,21 @@ namespace grpc {
static internal::GrpcLibraryInitializer g_gli_initializer;
-CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) {}
+CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) {
+ InitialAvalanching();
+}
void CompletionQueue::Shutdown() {
g_gli_initializer.summon();
- grpc_completion_queue_shutdown(cq_);
+ CompleteAvalanching();
+}
+
+void CompletionQueue::CompleteAvalanching() {
+ // Check if this was the last avalanching operation
+ if (gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_,
+ static_cast<gpr_atm>(-1)) == 1) {
+ grpc_completion_queue_shutdown(cq_);
+ }
}
CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index b7cfd6dbf1..817d85a81c 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -510,12 +510,6 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
ShutdownTag shutdown_tag; // Dummy shutdown tag
grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
- // Shutdown all ThreadManagers. This will try to gracefully stop all the
- // threads in the ThreadManagers (once they process any inflight requests)
- for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
- (*it)->Shutdown(); // ThreadManager's Shutdown()
- }
-
shutdown_cq.Shutdown();
void* tag;
@@ -531,6 +525,12 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
// Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
// successfully shutdown
+ // Shutdown all ThreadManagers. This will try to gracefully stop all the
+ // threads in the ThreadManagers (once they process any inflight requests)
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->Shutdown(); // ThreadManager's Shutdown()
+ }
+
// Wait for threads in all ThreadManagers to terminate
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Wait();
@@ -575,9 +575,14 @@ ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
tag_(tag),
delete_on_finalize_(delete_on_finalize),
call_(nullptr) {
+ call_cq_->RegisterAvalanching(); // This op will trigger more ops
memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_));
}
+ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() {
+ call_cq_->CompleteAvalanching();
+}
+
bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
bool* status) {
if (*status) {