From e50e5cbde2f77166b9f557938252de6792557f84 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 18 Aug 2015 12:44:57 -0700 Subject: Add a timeout to shutdown to forcefully end calls --- src/cpp/server/server.cc | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) (limited to 'src/cpp') diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index a70b555855..fca1e517b3 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -90,6 +90,26 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { return mrd; } + static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok, + gpr_timespec deadline) { + void* tag = nullptr; + *ok = false; + switch (cq->AsyncNext(&tag, ok, deadline)) { + case CompletionQueue::TIMEOUT: + *req = nullptr; + return true; + case CompletionQueue::SHUTDOWN: + *req = nullptr; + return false; + case CompletionQueue::GOT_EVENT: + *req = static_cast(tag); + GPR_ASSERT((*req)->in_flight_); + return true; + } + gpr_log(GPR_ERROR, "Should never reach here"); + abort(); + } + void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); } void TeardownRequest() { @@ -303,12 +323,21 @@ bool Server::Start() { return true; } -void Server::Shutdown() { +void Server::ShutdownInternal(gpr_timespec deadline) { grpc::unique_lock lock(mu_); if (started_ && !shutdown_) { shutdown_ = true; grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest()); cq_.Shutdown(); + SyncRequest* request; + bool ok; + while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) { + if (request == NULL) { // deadline expired + grpc_server_cancel_all_calls(server_); + } else if (ok) { + SyncRequest::CallData call_data(this, request); + } + } // Wait for running callbacks to finish. while (num_running_cb_ != 0) { -- cgit v1.2.3 From 9374ce819bff3c933f08b9512ded5c513527fd1f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 19 Aug 2015 10:15:44 -0700 Subject: Add comments, fix a subtle bug --- include/grpc++/server.h | 1 + src/cpp/server/server.cc | 4 ++++ 2 files changed, 5 insertions(+) (limited to 'src/cpp') diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 6a15dcb371..a2bc097c7f 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -69,6 +69,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { ShutdownInternal(TimePoint(deadline).raw_time()); } + // Shutdown the server, waiting for all rpc processing to finish. void Shutdown() { ShutdownInternal(gpr_inf_future(GPR_CLOCK_MONOTONIC)); } // Block waiting for all work to complete (the server must either diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index b27aa32276..8b21337529 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -329,11 +329,15 @@ void Server::ShutdownInternal(gpr_timespec deadline) { shutdown_ = true; grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest()); cq_.Shutdown(); + // Spin, eating requests until the completion queue is completely shutdown. + // If the deadline expires then cancel anything that's pending and keep + // spinning forever until the work is actually drained. SyncRequest* request; bool ok; while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) { if (request == NULL) { // deadline expired grpc_server_cancel_all_calls(server_); + deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); } else if (ok) { SyncRequest::CallData call_data(this, request); } -- cgit v1.2.3 From 681a291d12bb8fc7f8c238fdb674cd0f140294db Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 19 Aug 2015 11:31:25 -0700 Subject: Extend comment --- src/cpp/server/server.cc | 2 ++ 1 file changed, 2 insertions(+) (limited to 'src/cpp') diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 8b21337529..e039c07374 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -332,6 +332,8 @@ void Server::ShutdownInternal(gpr_timespec deadline) { // Spin, eating requests until the completion queue is completely shutdown. // If the deadline expires then cancel anything that's pending and keep // spinning forever until the work is actually drained. + // Since nothing else needs to touch state guarded by mu_, holding it + // through this loop is fine. SyncRequest* request; bool ok; while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) { -- cgit v1.2.3 From 2f543f205cdb42e7324974c8ab093e33055e4476 Mon Sep 17 00:00:00 2001 From: yang-g Date: Wed, 19 Aug 2015 15:14:17 -0700 Subject: Bug fix. Called c_str on a temp string --- src/cpp/client/channel.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/cpp') diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc index 9695a0f14b..17f31c22cb 100644 --- a/src/cpp/client/channel.cc +++ b/src/cpp/client/channel.cc @@ -71,7 +71,7 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, } else { const char* host_str = NULL; if (!context->authority().empty()) { - host_str = context->authority().c_str(); + host_str = context->authority_.c_str(); } else if (!host_.empty()) { host_str = host_.c_str(); } -- cgit v1.2.3