diff options
author | 2016-10-13 15:12:55 -0700 | |
---|---|---|
committer | 2016-10-13 15:12:55 -0700 | |
commit | 8f7739bcd6f14e18e2f342cba8e940942f37a48b (patch) | |
tree | 807b13ab438322bcec31612fe434a69779b260d2 /src | |
parent | 96766195a6ed083e5fc239755aa76a2138cd1d7a (diff) |
Rename GrpcRpcManager -> ThreadManager
Diffstat (limited to 'src')
-rw-r--r-- | src/cpp/server/server_builder.cc | 2 | ||||
-rw-r--r-- | src/cpp/server/server_cc.cc | 30 | ||||
-rw-r--r-- | src/cpp/thread_manager/thread_manager.cc (renamed from src/cpp/rpcmanager/grpc_rpc_manager.cc) | 45 | ||||
-rw-r--r-- | src/cpp/thread_manager/thread_manager.h (renamed from src/cpp/rpcmanager/grpc_rpc_manager.h) | 62 |
4 files changed, 71 insertions, 68 deletions
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 6b4ff28972..7ab41ca1f8 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -247,7 +247,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { // server // 2. cqs_: Completion queues added via AddCompletionQueue() call - // All sync cqs (if any) are frequently polled by the GrpcRpcManager + // All sync cqs (if any) are frequently polled by ThreadManager int num_frequently_polled_cqs = sync_server_cqs->size(); for (auto it = sync_server_cqs->begin(); it != sync_server_cqs->end(); ++it) { diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 761f76fa12..3352aee822 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -242,12 +242,16 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { grpc_completion_queue* cq_; }; -class Server::SyncRequestManager : public GrpcRpcManager { +// Implementation of ThreadManager. Each instance of SyncRequestThreadManager +// manages a pool of threads that poll for incoming Sync RPCs and call the +// appropriate RPC handlers +class Server::SyncRequestThreadManager : public ThreadManager { public: - SyncRequestManager(Server* server, CompletionQueue* server_cq, - std::shared_ptr<GlobalCallbacks> global_callbacks, - int min_pollers, int max_pollers, int cq_timeout_msec) - : GrpcRpcManager(min_pollers, max_pollers), + SyncRequestThreadManager(Server* server, CompletionQueue* server_cq, + std::shared_ptr<GlobalCallbacks> global_callbacks, + int min_pollers, int max_pollers, + int cq_timeout_msec) + : ThreadManager(min_pollers, max_pollers), server_(server), server_cq_(server_cq), cq_timeout_msec_(cq_timeout_msec), @@ -333,7 +337,7 @@ class Server::SyncRequestManager : public GrpcRpcManager { m->Request(server_->c_server(), server_cq_->cq()); } - GrpcRpcManager::Initialize(); + ThreadManager::Initialize(); } } @@ -367,9 +371,9 @@ Server::Server( for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end(); it++) { - sync_req_mgrs_.emplace_back( - new SyncRequestManager(this, (*it).get(), global_callbacks_, - min_pollers, max_pollers, sync_cq_timeout_msec)); + sync_req_mgrs_.emplace_back(new SyncRequestThreadManager( + this, (*it).get(), global_callbacks_, min_pollers, max_pollers, + sync_cq_timeout_msec)); } grpc_channel_args channel_args; @@ -509,10 +513,10 @@ void Server::ShutdownInternal(gpr_timespec deadline) { ShutdownTag shutdown_tag; // Dummy shutdown tag grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag); - // Shutdown all RpcManagers. This will try to gracefully stop all the - // threads in the RpcManagers (once they process any inflight requests) + // Shutdown all ThreadManagers. This will try to gracefully stop all the + // threads in the ThreadManagers (once they process any inflight requests) for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { - (*it)->ShutdownRpcManager(); + (*it)->Shutdown(); } shutdown_cq.Shutdown(); @@ -530,7 +534,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) { // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has // successfully shutdown - // Wait for threads in all RpcManagers to terminate + // Wait for threads in all ThreadManagers to terminate for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { (*it)->Wait(); (*it)->ShutdownAndDrainCompletionQueue(); diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.cc b/src/cpp/thread_manager/thread_manager.cc index 2d791bb159..93ccfb4d98 100644 --- a/src/cpp/rpcmanager/grpc_rpc_manager.cc +++ b/src/cpp/thread_manager/thread_manager.cc @@ -40,29 +40,28 @@ namespace grpc { -GrpcRpcManager::GrpcRpcManagerThread::GrpcRpcManagerThread( - GrpcRpcManager* rpc_mgr) - : rpc_mgr_(rpc_mgr), - thd_(new std::thread(&GrpcRpcManager::GrpcRpcManagerThread::Run, this)) {} - -void GrpcRpcManager::GrpcRpcManagerThread::Run() { - rpc_mgr_->MainWorkLoop(); - rpc_mgr_->MarkAsCompleted(this); +ThreadManager::WorkerThread::WorkerThread(ThreadManager* thd_mgr) + : thd_mgr_(thd_mgr), + thd_(new std::thread(&ThreadManager::WorkerThread::Run, this)) {} + +void ThreadManager::WorkerThread::Run() { + thd_mgr_->MainWorkLoop(); + thd_mgr_->MarkAsCompleted(this); } -GrpcRpcManager::GrpcRpcManagerThread::~GrpcRpcManagerThread() { +ThreadManager::WorkerThread::~WorkerThread() { thd_->join(); thd_.reset(); } -GrpcRpcManager::GrpcRpcManager(int min_pollers, int max_pollers) +ThreadManager::ThreadManager(int min_pollers, int max_pollers) : shutdown_(false), num_pollers_(0), min_pollers_(min_pollers), max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers), num_threads_(0) {} -GrpcRpcManager::~GrpcRpcManager() { +ThreadManager::~ThreadManager() { { std::unique_lock<grpc::mutex> lock(mu_); GPR_ASSERT(num_threads_ == 0); @@ -71,24 +70,24 @@ GrpcRpcManager::~GrpcRpcManager() { CleanupCompletedThreads(); } -void GrpcRpcManager::Wait() { +void ThreadManager::Wait() { std::unique_lock<grpc::mutex> lock(mu_); while (num_threads_ != 0) { shutdown_cv_.wait(lock); } } -void GrpcRpcManager::ShutdownRpcManager() { +void ThreadManager::Shutdown() { std::unique_lock<grpc::mutex> lock(mu_); shutdown_ = true; } -bool GrpcRpcManager::IsShutdown() { +bool ThreadManager::IsShutdown() { std::unique_lock<grpc::mutex> lock(mu_); return shutdown_; } -void GrpcRpcManager::MarkAsCompleted(GrpcRpcManagerThread* thd) { +void ThreadManager::MarkAsCompleted(WorkerThread* thd) { { std::unique_lock<grpc::mutex> list_lock(list_mu_); completed_threads_.push_back(thd); @@ -101,7 +100,7 @@ void GrpcRpcManager::MarkAsCompleted(GrpcRpcManagerThread* thd) { } } -void GrpcRpcManager::CleanupCompletedThreads() { +void ThreadManager::CleanupCompletedThreads() { std::unique_lock<grpc::mutex> lock(list_mu_); for (auto thd = completed_threads_.begin(); thd != completed_threads_.end(); thd = completed_threads_.erase(thd)) { @@ -109,7 +108,7 @@ void GrpcRpcManager::CleanupCompletedThreads() { } } -void GrpcRpcManager::Initialize() { +void ThreadManager::Initialize() { for (int i = 0; i < min_pollers_; i++) { MaybeCreatePoller(); } @@ -118,7 +117,7 @@ void GrpcRpcManager::Initialize() { // If the number of pollers (i.e threads currently blocked in PollForWork()) is // less than max threshold (i.e max_pollers_) and the total number of threads is // below the maximum threshold, we can let the current thread continue as poller -bool GrpcRpcManager::MaybeContinueAsPoller() { +bool ThreadManager::MaybeContinueAsPoller() { std::unique_lock<grpc::mutex> lock(mu_); if (shutdown_ || num_pollers_ > max_pollers_) { return false; @@ -131,18 +130,18 @@ bool GrpcRpcManager::MaybeContinueAsPoller() { // Create a new poller if the current number of pollers i.e num_pollers_ (i.e // threads currently blocked in PollForWork()) is below the threshold (i.e // min_pollers_) and the total number of threads is below the maximum threshold -void GrpcRpcManager::MaybeCreatePoller() { +void ThreadManager::MaybeCreatePoller() { grpc::unique_lock<grpc::mutex> lock(mu_); if (!shutdown_ && num_pollers_ < min_pollers_) { num_pollers_++; num_threads_++; // Create a new thread (which ends up calling the MainWorkLoop() function - new GrpcRpcManagerThread(this); + new WorkerThread(this); } } -void GrpcRpcManager::MainWorkLoop() { +void ThreadManager::MainWorkLoop() { void* tag; bool ok; @@ -170,7 +169,7 @@ void GrpcRpcManager::MainWorkLoop() { } // Note that MaybeCreatePoller does check for shutdown and creates a new - // thread only if GrpcRpcManager is not shutdown + // thread only if ThreadManager is not shutdown if (work_status == WORK_FOUND) { MaybeCreatePoller(); DoWork(tag, ok); @@ -179,7 +178,7 @@ void GrpcRpcManager::MainWorkLoop() { CleanupCompletedThreads(); - // If we are here, either GrpcRpcManager is shutting down or it already has + // If we are here, either ThreadManager is shutting down or it already has // enough threads. } diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.h b/src/cpp/thread_manager/thread_manager.h index 77715c52fd..b667a645af 100644 --- a/src/cpp/rpcmanager/grpc_rpc_manager.h +++ b/src/cpp/thread_manager/thread_manager.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_INTERNAL_CPP_GRPC_RPC_MANAGER_H -#define GRPC_INTERNAL_CPP_GRPC_RPC_MANAGER_H +#ifndef GRPC_INTERNAL_CPP_THREAD_MANAGER_H +#define GRPC_INTERNAL_CPP_THREAD_MANAGER_H #include <list> #include <memory> @@ -42,10 +42,10 @@ namespace grpc { -class GrpcRpcManager { +class ThreadManager { public: - explicit GrpcRpcManager(int min_pollers, int max_pollers); - virtual ~GrpcRpcManager(); + explicit ThreadManager(int min_pollers, int max_pollers); + virtual ~ThreadManager(); // Initializes and Starts the Rpc Manager threads void Initialize(); @@ -60,17 +60,17 @@ class GrpcRpcManager { // - The implementaion MUST set the value of 'ok' to 'true' or 'false'. A // value of 'false' indicates some implemenation specific error (that is // neither SHUTDOWN nor TIMEOUT) - // - GrpcRpcManager does not interpret the values of 'tag' and 'ok' - // - GrpcRpcManager WILL call DoWork() and pass '*tag' and 'ok' as input to + // - ThreadManager does not interpret the values of 'tag' and 'ok' + // - ThreadManager WILL call DoWork() and pass '*tag' and 'ok' as input to // DoWork() // // If the return value is SHUTDOWN:, - // - GrpcManager WILL NOT call DoWork() and terminates the thead + // - ThreadManager WILL NOT call DoWork() and terminates the thead // // If the return value is TIMEOUT:, - // - GrpcManager WILL NOT call DoWork() - // - GrpcManager MAY terminate the thread depending on the current number of - // active poller threads and mix_pollers/max_pollers settings + // - ThreadManager WILL NOT call DoWork() + // - ThreadManager MAY terminate the thread depending on the current number + // of active poller threads and mix_pollers/max_pollers settings // - Also, the value of timeout is specific to the derived class // implementation virtual WorkStatus PollForWork(void** tag, bool* ok) = 0; @@ -84,40 +84,40 @@ class GrpcRpcManager { // actually finds some work virtual void DoWork(void* tag, bool ok) = 0; - // Mark the GrpcRpcManager as shutdown and begin draining the work. - // This is a non-blocking call and the caller should call Wait(), a blocking - // call which returns only once the shutdown is complete - void ShutdownRpcManager(); + // Mark the ThreadManager as shutdown and begin draining the work. This is a + // non-blocking call and the caller should call Wait(), a blocking call which + // returns only once the shutdown is complete + void Shutdown(); - // Has ShutdownRpcManager() been called + // Has Shutdown() been called bool IsShutdown(); - // A blocking call that returns only after the GrpcRpcManager has shutdown and + // A blocking call that returns only after the ThreadManager has shutdown and // all the threads have drained all the outstanding work void Wait(); private: - // Helper wrapper class around std::thread. This takes a GrpcRpcManager object + // Helper wrapper class around std::thread. This takes a ThreadManager object // and starts a new std::thread to calls the Run() function. // - // The Run() function calls GrpcManager::MainWorkLoop() function and once that - // completes, it marks the GrpcRpcManagerThread completed by calling - // GrpcRpcManager::MarkAsCompleted() - class GrpcRpcManagerThread { + // The Run() function calls ThreadManager::MainWorkLoop() function and once + // that completes, it marks the WorkerThread completed by calling + // ThreadManager::MarkAsCompleted() + class WorkerThread { public: - GrpcRpcManagerThread(GrpcRpcManager* rpc_mgr); - ~GrpcRpcManagerThread(); + WorkerThread(ThreadManager* thd_mgr); + ~WorkerThread(); private: - // Calls rpc_mgr_->MainWorkLoop() and once that completes, calls - // rpc_mgr_>MarkAsCompleted(this) to mark the thread as completed + // Calls thd_mgr_->MainWorkLoop() and once that completes, calls + // thd_mgr_>MarkAsCompleted(this) to mark the thread as completed void Run(); - GrpcRpcManager* rpc_mgr_; + ThreadManager* thd_mgr_; std::unique_ptr<grpc::thread> thd_; }; - // The main funtion in GrpcRpcManager + // The main funtion in ThreadManager void MainWorkLoop(); // Create a new poller if the number of current pollers is less than the @@ -128,7 +128,7 @@ class GrpcRpcManager { // current number of pollers is less than the max_pollers. bool MaybeContinueAsPoller(); - void MarkAsCompleted(GrpcRpcManagerThread* thd); + void MarkAsCompleted(WorkerThread* thd); void CleanupCompletedThreads(); // Protects shutdown_, num_pollers_ and num_threads_ @@ -150,9 +150,9 @@ class GrpcRpcManager { int num_threads_; grpc::mutex list_mu_; - std::list<GrpcRpcManagerThread*> completed_threads_; + std::list<WorkerThread*> completed_threads_; }; } // namespace grpc -#endif // GRPC_INTERNAL_CPP_GRPC_RPC_MANAGER_H +#endif // GRPC_INTERNAL_CPP_THREAD_MANAGER_H |