From cbe159925005d6f71e891792d593b51263ac6b76 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 5 Dec 2016 13:56:29 -0800 Subject: Track requests that could cause other requests to be created, and don't do a real core shutdown of a CQ until such requests are done --- src/cpp/common/completion_queue_cc.cc | 15 +++++++++++---- src/cpp/server/server_cc.cc | 5 +++++ 2 files changed, 16 insertions(+), 4 deletions(-) (limited to 'src/cpp') diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc index 00cc102f92..558f47af84 100644 --- a/src/cpp/common/completion_queue_cc.cc +++ b/src/cpp/common/completion_queue_cc.cc @@ -43,11 +43,18 @@ namespace grpc { static internal::GrpcLibraryInitializer g_gli_initializer; -CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) {} +CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) { + RegisterAvalanching(); +} + +void CompletionQueue::Shutdown() { CompleteAvalanching(); } -void CompletionQueue::Shutdown() { - g_gli_initializer.summon(); - grpc_completion_queue_shutdown(cq_); +void CompletionQueue::CompleteAvalanching() { + // Check if this was the last avalanching operation + if (gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_, + static_cast(-1)) == 1) { + grpc_completion_queue_shutdown(cq_); + } } CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index b7cfd6dbf1..96820ff963 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -575,9 +575,14 @@ ServerInterface::BaseAsyncRequest::BaseAsyncRequest( tag_(tag), delete_on_finalize_(delete_on_finalize), call_(nullptr) { + call_cq_->RegisterAvalanching(); // This op will trigger more ops memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_)); } +ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() { + call_cq_->CompleteAvalanching(); +} + bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) { if (*status) { -- cgit v1.2.3 From bf24dd9e51f433aec60fbef24ee68496829277b1 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 5 Dec 2016 13:59:09 -0800 Subject: clang-format --- include/grpc++/impl/codegen/completion_queue.h | 2 +- src/cpp/server/server_cc.cc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'src/cpp') diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index a9bb98edd9..75e73ee1f4 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -102,7 +102,7 @@ class CompletionQueue : private GrpcLibraryCodegen { /// instance. CompletionQueue() { cq_ = g_core_codegen_interface->grpc_completion_queue_create(nullptr); - RegisterAvalanching(); // reserve this for the future shutdown + RegisterAvalanching(); // reserve this for the future shutdown } /// Wrap \a take, taking ownership of the instance. diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 96820ff963..1063c5d7f2 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -575,7 +575,7 @@ ServerInterface::BaseAsyncRequest::BaseAsyncRequest( tag_(tag), delete_on_finalize_(delete_on_finalize), call_(nullptr) { - call_cq_->RegisterAvalanching(); // This op will trigger more ops + call_cq_->RegisterAvalanching(); // This op will trigger more ops memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_)); } -- cgit v1.2.3 From 6510d47c8117d357dfaa289683aa5feca50d7c72 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 5 Dec 2016 14:16:20 -0800 Subject: gpr_atm isn't automatically initialized to 0. Thanks Obama. --- include/grpc++/impl/codegen/completion_queue.h | 5 ++++- src/cpp/common/completion_queue_cc.cc | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) (limited to 'src/cpp') diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index 75e73ee1f4..944f2c3919 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -102,7 +102,7 @@ class CompletionQueue : private GrpcLibraryCodegen { /// instance. CompletionQueue() { cq_ = g_core_codegen_interface->grpc_completion_queue_create(nullptr); - RegisterAvalanching(); // reserve this for the future shutdown + InitialAvalanching(); // reserve this for the future shutdown } /// Wrap \a take, taking ownership of the instance. @@ -174,6 +174,9 @@ class CompletionQueue : private GrpcLibraryCodegen { /// been finalized. Note that we maintain the requirement that an avalanche /// registration must take place before CQ shutdown (which must be maintained /// elsehwere) + void InitialAvalanching() { + gpr_atm_rel_store(&avalanches_in_flight_, static_cast(1)); + } void RegisterAvalanching() { gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_, static_cast(1)); diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc index 558f47af84..418baac5b2 100644 --- a/src/cpp/common/completion_queue_cc.cc +++ b/src/cpp/common/completion_queue_cc.cc @@ -44,7 +44,7 @@ namespace grpc { static internal::GrpcLibraryInitializer g_gli_initializer; CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) { - RegisterAvalanching(); + InitialAvalanching(); } void CompletionQueue::Shutdown() { CompleteAvalanching(); } -- cgit v1.2.3 From 7e499aa3fbc9b7baa6571ed4a2c595f6c06e8faa Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 5 Dec 2016 14:50:14 -0800 Subject: Bring back gli initializer summoning --- src/cpp/common/completion_queue_cc.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'src/cpp') diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc index 418baac5b2..0408a41085 100644 --- a/src/cpp/common/completion_queue_cc.cc +++ b/src/cpp/common/completion_queue_cc.cc @@ -47,7 +47,10 @@ CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) { InitialAvalanching(); } -void CompletionQueue::Shutdown() { CompleteAvalanching(); } +void CompletionQueue::Shutdown() { + g_gli_initializer.summon(); + CompleteAvalanching(); +} void CompletionQueue::CompleteAvalanching() { // Check if this was the last avalanching operation -- cgit v1.2.3 From 9c5318aba097dc8634626ba0b106fc8993ce32b5 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 5 Dec 2016 15:07:04 -0800 Subject: Fix shutdown problems with sync server --- src/core/lib/surface/call.c | 4 ++++ src/cpp/server/server_cc.cc | 12 ++++++------ 2 files changed, 10 insertions(+), 6 deletions(-) (limited to 'src/cpp') diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 1e0f3eeca5..8ca3cab9d5 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -1551,6 +1551,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } + /* IF this is a server, then GRPC_OP_RECV_INITIAL_METADATA *must* come + from server.c. In that case, it's coming from accept_stream, and in + that case we're not necessarily covered by a poller. */ + stream_op->covered_by_poller = call->is_client; call->received_initial_metadata = 1; call->buffered_metadata[0] = op->data.recv_initial_metadata; grpc_closure_init(&call->receiving_initial_metadata_ready, diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index b7cfd6dbf1..3ec7faddad 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -510,12 +510,6 @@ void Server::ShutdownInternal(gpr_timespec deadline) { ShutdownTag shutdown_tag; // Dummy shutdown tag grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag); - // Shutdown all ThreadManagers. This will try to gracefully stop all the - // threads in the ThreadManagers (once they process any inflight requests) - for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { - (*it)->Shutdown(); // ThreadManager's Shutdown() - } - shutdown_cq.Shutdown(); void* tag; @@ -531,6 +525,12 @@ void Server::ShutdownInternal(gpr_timespec deadline) { // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has // successfully shutdown + // Shutdown all ThreadManagers. This will try to gracefully stop all the + // threads in the ThreadManagers (once they process any inflight requests) + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { + (*it)->Shutdown(); // ThreadManager's Shutdown() + } + // Wait for threads in all ThreadManagers to terminate for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { (*it)->Wait(); -- cgit v1.2.3