aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2016-09-26 09:48:48 -0700
committerGravatar Sree Kuchibhotla <sreek@google.com>2016-09-26 09:48:48 -0700
commit862acb9f3a42cf4bacf75ba9dd831a539c93a4f1 (patch)
tree80a14ffc782848a86ea1b5c315e194daa13fe946 /src/cpp
parent18d3ace7dbb2e4f5ff0325802708de43db2bae71 (diff)
fix shutdown crash
Diffstat (limited to 'src/cpp')
-rw-r--r--src/cpp/rpcmanager/grpc_rpc_manager.cc2
-rw-r--r--src/cpp/server/server_cc.cc61
2 files changed, 31 insertions, 32 deletions
diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.cc b/src/cpp/rpcmanager/grpc_rpc_manager.cc
index c47f76b5af..58b337da63 100644
--- a/src/cpp/rpcmanager/grpc_rpc_manager.cc
+++ b/src/cpp/rpcmanager/grpc_rpc_manager.cc
@@ -64,8 +64,6 @@ GrpcRpcManager::GrpcRpcManager(int min_pollers, int max_pollers)
GrpcRpcManager::~GrpcRpcManager() {
std::unique_lock<grpc::mutex> lock(mu_);
- // ShutdownRpcManager() and Wait() must be called before destroying the object
- GPR_ASSERT(shutdown_);
GPR_ASSERT(num_threads_ == 0);
CleanupCompletedThreads();
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 4ab531df42..54ac25d76b 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -129,9 +129,7 @@ class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
class ShutdownTag : public CompletionQueueTag {
public:
- bool FinalizeResult(void** tag, bool *status) {
- return false;
- }
+ bool FinalizeResult(void** tag, bool* status) { return false; }
};
class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
@@ -196,9 +194,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
cq_ = nullptr;
}
- void ResetRequest() {
- in_flight_ = false;
- }
+ void ResetRequest() { in_flight_ = false; }
void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
GPR_ASSERT(cq_ && !in_flight_);
@@ -301,7 +297,7 @@ class Server::SyncRequestManager : public GrpcRpcManager {
server_cq_(server_cq),
global_callbacks_(global_callbacks) {}
- static const int kRpcPollingTimeoutMsec = 500;
+ static const int kRpcPollingTimeoutMsec = 10;
WorkStatus PollForWork(void** tag, bool* ok) GRPC_OVERRIDE {
*tag = nullptr;
@@ -368,6 +364,17 @@ class Server::SyncRequestManager : public GrpcRpcManager {
}
}
+ void ShutdownAndDrainCompletionQueue() {
+ server_cq_->Shutdown();
+
+ // Drain any pending items from the queue
+ void* tag;
+ bool ok;
+ while (server_cq_->Next(&tag, &ok)) {
+ // Nothing to be done here
+ }
+ }
+
void Start() {
if (!sync_methods_.empty()) {
for (auto m = sync_methods_.begin(); m != sync_methods_.end(); m++) {
@@ -420,23 +427,17 @@ Server::Server(
Server::~Server() {
{
+ // TODO (sreek) Check if we can just call Shutdown() even in case where
+ // started_ == false. This will make things much simpler
grpc::unique_lock<grpc::mutex> lock(mu_);
if (started_ && !shutdown_) {
lock.unlock();
Shutdown();
} else if (!started_) {
- // TODO (sreek): Check if we can just do this once in ~Server() (i.e
- // Do not 'shutdown' queues in Shutdown() function and do it here in the
- // destructor
- for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end();
- it++) {
- (*it)->Shutdown();
+ // Shutdown the completion queues
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->ShutdownAndDrainCompletionQueue();
}
-
- // TODO (sreek) Delete this
- /*
- cq_.Shutdown();
- */
}
}
@@ -571,8 +572,10 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
if (started_ && !shutdown_) {
shutdown_ = true;
+ /// The completion queue to use for server shutdown completion notification
+ CompletionQueue shutdown_cq;
ShutdownTag shutdown_tag; // Dummy shutdown tag
- grpc_server_shutdown_and_notify(server_, shutdown_cq_.cq(), &shutdown_tag);
+ grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
// Shutdown all RpcManagers. This will try to gracefully stop all the
// threads in the RpcManagers (once they process any inflight requests)
@@ -580,16 +583,15 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
(*it)->ShutdownRpcManager();
}
- shutdown_cq_.Shutdown();
+ shutdown_cq.Shutdown();
void* tag;
bool ok;
CompletionQueue::NextStatus status =
- shutdown_cq_.AsyncNext(&tag, &ok, deadline);
+ shutdown_cq.AsyncNext(&tag, &ok, deadline);
- // If this timed out, it means we are done with the grace-period for
- // a clean shutdown. We should force a shutdown now by cancelling all
- // inflight calls
+ // If this timed out, it means we are done with the grace period for a clean
+ // shutdown. We should force a shutdown now by cancelling all inflight calls
if (status == CompletionQueue::NextStatus::TIMEOUT) {
grpc_server_cancel_all_calls(server_);
}
@@ -599,14 +601,13 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
// Wait for threads in all RpcManagers to terminate
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Wait();
+ (*it)->ShutdownAndDrainCompletionQueue();
}
- // Shutdown the completion queues
- // TODO (sreek) Move this into SyncRequestManager (or move it to Server
- // destructor)
- for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end();
- it++) {
- (*it)->Shutdown();
+ // Drain the shutdown queue (if the previous call to AsyncNext() timed out
+ // and we didn't remove the tag from the queue yet)
+ while(shutdown_cq.Next(&tag, &ok)) {
+ // Nothing to be done here
}
/*