From bb5519f5a52aeb23d32ec6ca817e008a65fdfa30 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Tue, 19 Jul 2016 11:00:39 -0700 Subject: More changes --- include/grpc++/server.h | 10 +++- src/cpp/rpcmanager/grpc_rpc_manager.cc | 19 +++---- src/cpp/rpcmanager/grpc_rpc_manager.h | 3 +- src/cpp/server/server.cc | 93 +++++++++++++++++++++++++--------- 4 files changed, 87 insertions(+), 38 deletions(-) diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 6876961e21..03c9778468 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -50,6 +50,8 @@ #include #include +#include "src/cpp/rpcmanager/grpc_rpc_manager.h" + struct grpc_server; namespace grpc { @@ -64,7 +66,9 @@ class ThreadPoolInterface; /// Models a gRPC server. /// /// Servers are configured and started via \a grpc::ServerBuilder. -class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen { +class Server GRPC_FINAL : public ServerInterface, + private GrpcLibraryCodegen, + public GrpcRpcManager { public: ~Server(); @@ -99,6 +103,10 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen { // Returns a \em raw pointer to the underlying CompletionQueue. CompletionQueue* completion_queue(); + /// GRPC RPC Manager functions + void PollForWork(bool& is_work_found, void** tag) GRPC_OVERRIDE; + void DoWork(void* tag) GRPC_OVERRIDE; + private: friend class AsyncGenericService; friend class ServerBuilder; diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.cc b/src/cpp/rpcmanager/grpc_rpc_manager.cc index 1c7d5adeaf..7cffb23858 100644 --- a/src/cpp/rpcmanager/grpc_rpc_manager.cc +++ b/src/cpp/rpcmanager/grpc_rpc_manager.cc @@ -65,24 +65,20 @@ GrpcRpcManager::GrpcRpcManager(int min_pollers, int max_pollers, GrpcRpcManager::~GrpcRpcManager() { std::unique_lock lock(mu_); - - shutdown_ = true; - while (num_threads_ != 0) { - shutdown_cv_.wait(lock); - } + // ShutdownRpcManager() and Wait() must be called before destroying the object + GPR_ASSERT(shutdown_); + GPR_ASSERT(num_threads_ == 0); CleanupCompletedThreads(); } -// For testing only void GrpcRpcManager::Wait() { std::unique_lock lock(mu_); - while (!shutdown_) { + while (num_threads_ != 0) { shutdown_cv_.wait(lock); } } -// For testing only void GrpcRpcManager::ShutdownRpcManager() { std::unique_lock lock(mu_); shutdown_ = true; @@ -120,7 +116,8 @@ bool GrpcRpcManager::MaybeContinueAsPoller() { void GrpcRpcManager::MaybeCreatePoller() { grpc::unique_lock lock(mu_); - if (num_pollers_ < min_pollers_ && num_threads_ < max_threads_) { + if (!shutdown_ && num_pollers_ < min_pollers_ && + num_threads_ < max_threads_) { num_pollers_++; num_threads_++; @@ -131,7 +128,7 @@ void GrpcRpcManager::MaybeCreatePoller() { void GrpcRpcManager::MainWorkLoop() { bool is_work_found = false; - void *tag; + void* tag; do { PollForWork(is_work_found, &tag); @@ -159,7 +156,7 @@ void GrpcRpcManager::MainWorkLoop() { grpc::unique_lock lock(mu_); num_threads_--; if (num_threads_ == 0) { - shutdown_cv_.notify_all(); + shutdown_cv_.notify_one(); } } diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.h b/src/cpp/rpcmanager/grpc_rpc_manager.h index a8cc6eb80f..475ce97995 100644 --- a/src/cpp/rpcmanager/grpc_rpc_manager.h +++ b/src/cpp/rpcmanager/grpc_rpc_manager.h @@ -53,7 +53,6 @@ class GrpcRpcManager { virtual void PollForWork(bool& is_work_found, void **tag) = 0; virtual void DoWork(void *tag) = 0; - // Use the following two functions for testing purposes only void Wait(); void ShutdownRpcManager(); @@ -64,6 +63,8 @@ class GrpcRpcManager { // The Run() function calls GrpcManager::MainWorkLoop() function and once that // completes, it marks the GrpcRpcManagerThread completed by calling // GrpcRpcManager::MarkAsCompleted() + // TODO: sreek - Consider using a separate threadpool rather than implementing + // one in this class class GrpcRpcManagerThread { public: GrpcRpcManagerThread(GrpcRpcManager* rpc_mgr); diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index af04fd4ca6..732c20b2d2 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -278,7 +278,8 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { static internal::GrpcLibraryInitializer g_gli_initializer; Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, int max_message_size, ChannelArguments* args) - : max_message_size_(max_message_size), + : GrpcRpcManager(3, 5, 8), + max_message_size_(max_message_size), started_(false), shutdown_(false), shutdown_notified_(false), @@ -314,6 +315,7 @@ Server::~Server() { cq_.Shutdown(); } } + void* got_tag; bool ok; GPR_ASSERT(!cq_.Next(&got_tag, &ok)); @@ -429,7 +431,8 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { m->Request(server_, cq_.cq()); } - ScheduleCallback(); + GrpcRpcManager::Initialize(); + // ScheduleCallback(); } return true; @@ -442,6 +445,10 @@ void Server::ShutdownInternal(gpr_timespec deadline) { grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest()); cq_.Shutdown(); lock.unlock(); + + GrpcRpcManager::ShutdownRpcManager(); + GrpcRpcManager::Wait(); + // Spin, eating requests until the completion queue is completely shutdown. // If the deadline expires then cancel anything that's pending and keep // spinning forever until the work is actually drained. @@ -587,44 +594,80 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( request_->stream()->call_.PerformOps(this); } +// TODO: sreek - Remove this function void Server::ScheduleCallback() { + GPR_ASSERT(false); + /* { grpc::unique_lock lock(mu_); num_running_cb_++; } thread_pool_->Add(std::bind(&Server::RunRpc, this)); + */ } +// TODO: sreek - Remove this function void Server::RunRpc() { - // Wait for one more incoming rpc. - bool ok; - GPR_TIMER_SCOPE("Server::RunRpc", 0); - auto* mrd = SyncRequest::Wait(&cq_, &ok); - if (mrd) { - ScheduleCallback(); - if (ok) { - SyncRequest::CallData cd(this, mrd); - { - mrd->SetupRequest(); - grpc::unique_lock lock(mu_); - if (!shutdown_) { - mrd->Request(server_, cq_.cq()); - } else { - // destroy the structure that was created - mrd->TeardownRequest(); + GPR_ASSERT(false); + /* + // Wait for one more incoming rpc. + bool ok; + GPR_TIMER_SCOPE("Server::RunRpc", 0); + auto* mrd = SyncRequest::Wait(&cq_, &ok); + if (mrd) { + ScheduleCallback(); + if (ok) { + SyncRequest::CallData cd(this, mrd); + { + mrd->SetupRequest(); + grpc::unique_lock lock(mu_); + if (!shutdown_) { + mrd->Request(server_, cq_.cq()); + } else { + // destroy the structure that was created + mrd->TeardownRequest(); + } } + GPR_TIMER_SCOPE("cd.Run()", 0); + cd.Run(global_callbacks_); + } + } + + { + grpc::unique_lock lock(mu_); + num_running_cb_--; + if (shutdown_) { + callback_cv_.notify_all(); } - GPR_TIMER_SCOPE("cd.Run()", 0); - cd.Run(global_callbacks_); } + */ +} + +void Server::PollForWork(bool& is_work_found, void** tag) { + is_work_found = true; + *tag = nullptr; + auto* mrd = SyncRequest::Wait(&cq_, &is_work_found); + if (is_work_found) { + *tag = mrd; } +} - { - grpc::unique_lock lock(mu_); - num_running_cb_--; - if (shutdown_) { - callback_cv_.notify_all(); +void Server::DoWork(void* tag) { + auto* mrd = static_cast(tag); + if (mrd) { + SyncRequest::CallData cd(this, mrd); + { + mrd->SetupRequest(); + grpc::unique_lock lock(mu_); + if (!shutdown_) { + mrd->Request(server_, cq_.cq()); + } else { + // destroy the structure that was created + mrd->TeardownRequest(); + } } + GPR_TIMER_SCOPE("cd.Run()", 0); + cd.Run(global_callbacks_); } } -- cgit v1.2.3