diff options
author | Sree Kuchibhotla <sreek@google.com> | 2016-09-26 09:48:48 -0700 |
---|---|---|
committer | Sree Kuchibhotla <sreek@google.com> | 2016-09-26 09:48:48 -0700 |
commit | 862acb9f3a42cf4bacf75ba9dd831a539c93a4f1 (patch) | |
tree | 80a14ffc782848a86ea1b5c315e194daa13fe946 /src/cpp/server/server_cc.cc | |
parent | 18d3ace7dbb2e4f5ff0325802708de43db2bae71 (diff) |
fix shutdown crash
Diffstat (limited to 'src/cpp/server/server_cc.cc')
-rw-r--r-- | src/cpp/server/server_cc.cc | 61 |
1 files changed, 31 insertions, 30 deletions
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 4ab531df42..54ac25d76b 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -129,9 +129,7 @@ class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag { class ShutdownTag : public CompletionQueueTag { public: - bool FinalizeResult(void** tag, bool *status) { - return false; - } + bool FinalizeResult(void** tag, bool* status) { return false; } }; class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { @@ -196,9 +194,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { cq_ = nullptr; } - void ResetRequest() { - in_flight_ = false; - } + void ResetRequest() { in_flight_ = false; } void Request(grpc_server* server, grpc_completion_queue* notify_cq) { GPR_ASSERT(cq_ && !in_flight_); @@ -301,7 +297,7 @@ class Server::SyncRequestManager : public GrpcRpcManager { server_cq_(server_cq), global_callbacks_(global_callbacks) {} - static const int kRpcPollingTimeoutMsec = 500; + static const int kRpcPollingTimeoutMsec = 10; WorkStatus PollForWork(void** tag, bool* ok) GRPC_OVERRIDE { *tag = nullptr; @@ -368,6 +364,17 @@ class Server::SyncRequestManager : public GrpcRpcManager { } } + void ShutdownAndDrainCompletionQueue() { + server_cq_->Shutdown(); + + // Drain any pending items from the queue + void* tag; + bool ok; + while (server_cq_->Next(&tag, &ok)) { + // Nothing to be done here + } + } + void Start() { if (!sync_methods_.empty()) { for (auto m = sync_methods_.begin(); m != sync_methods_.end(); m++) { @@ -420,23 +427,17 @@ Server::Server( Server::~Server() { { + // TODO (sreek) Check if we can just call Shutdown() even in case where + // started_ == false. This will make things much simpler grpc::unique_lock<grpc::mutex> lock(mu_); if (started_ && !shutdown_) { lock.unlock(); Shutdown(); } else if (!started_) { - // TODO (sreek): Check if we can just do this once in ~Server() (i.e - // Do not 'shutdown' queues in Shutdown() function and do it here in the - // destructor - for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end(); - it++) { - (*it)->Shutdown(); + // Shutdown the completion queues + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { + (*it)->ShutdownAndDrainCompletionQueue(); } - - // TODO (sreek) Delete this - /* - cq_.Shutdown(); - */ } } @@ -571,8 +572,10 @@ void Server::ShutdownInternal(gpr_timespec deadline) { if (started_ && !shutdown_) { shutdown_ = true; + /// The completion queue to use for server shutdown completion notification + CompletionQueue shutdown_cq; ShutdownTag shutdown_tag; // Dummy shutdown tag - grpc_server_shutdown_and_notify(server_, shutdown_cq_.cq(), &shutdown_tag); + grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag); // Shutdown all RpcManagers. This will try to gracefully stop all the // threads in the RpcManagers (once they process any inflight requests) @@ -580,16 +583,15 @@ void Server::ShutdownInternal(gpr_timespec deadline) { (*it)->ShutdownRpcManager(); } - shutdown_cq_.Shutdown(); + shutdown_cq.Shutdown(); void* tag; bool ok; CompletionQueue::NextStatus status = - shutdown_cq_.AsyncNext(&tag, &ok, deadline); + shutdown_cq.AsyncNext(&tag, &ok, deadline); - // If this timed out, it means we are done with the grace-period for - // a clean shutdown. We should force a shutdown now by cancelling all - // inflight calls + // If this timed out, it means we are done with the grace period for a clean + // shutdown. We should force a shutdown now by cancelling all inflight calls if (status == CompletionQueue::NextStatus::TIMEOUT) { grpc_server_cancel_all_calls(server_); } @@ -599,14 +601,13 @@ void Server::ShutdownInternal(gpr_timespec deadline) { // Wait for threads in all RpcManagers to terminate for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { (*it)->Wait(); + (*it)->ShutdownAndDrainCompletionQueue(); } - // Shutdown the completion queues - // TODO (sreek) Move this into SyncRequestManager (or move it to Server - // destructor) - for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end(); - it++) { - (*it)->Shutdown(); + // Drain the shutdown queue (if the previous call to AsyncNext() timed out + // and we didn't remove the tag from the queue yet) + while(shutdown_cq.Next(&tag, &ok)) { + // Nothing to be done here } /* |