aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2016-10-13 15:12:55 -0700
committerGravatar Sree Kuchibhotla <sreek@google.com>2016-10-13 15:12:55 -0700
commit8f7739bcd6f14e18e2f342cba8e940942f37a48b (patch)
tree807b13ab438322bcec31612fe434a69779b260d2 /src
parent96766195a6ed083e5fc239755aa76a2138cd1d7a (diff)
Rename GrpcRpcManager -> ThreadManager
Diffstat (limited to 'src')
-rw-r--r--src/cpp/server/server_builder.cc2
-rw-r--r--src/cpp/server/server_cc.cc30
-rw-r--r--src/cpp/thread_manager/thread_manager.cc (renamed from src/cpp/rpcmanager/grpc_rpc_manager.cc)45
-rw-r--r--src/cpp/thread_manager/thread_manager.h (renamed from src/cpp/rpcmanager/grpc_rpc_manager.h)62
4 files changed, 71 insertions, 68 deletions
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 6b4ff28972..7ab41ca1f8 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -247,7 +247,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
// server
// 2. cqs_: Completion queues added via AddCompletionQueue() call
- // All sync cqs (if any) are frequently polled by the GrpcRpcManager
+ // All sync cqs (if any) are frequently polled by ThreadManager
int num_frequently_polled_cqs = sync_server_cqs->size();
for (auto it = sync_server_cqs->begin(); it != sync_server_cqs->end(); ++it) {
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 761f76fa12..3352aee822 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -242,12 +242,16 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
grpc_completion_queue* cq_;
};
-class Server::SyncRequestManager : public GrpcRpcManager {
+// Implementation of ThreadManager. Each instance of SyncRequestThreadManager
+// manages a pool of threads that poll for incoming Sync RPCs and call the
+// appropriate RPC handlers
+class Server::SyncRequestThreadManager : public ThreadManager {
public:
- SyncRequestManager(Server* server, CompletionQueue* server_cq,
- std::shared_ptr<GlobalCallbacks> global_callbacks,
- int min_pollers, int max_pollers, int cq_timeout_msec)
- : GrpcRpcManager(min_pollers, max_pollers),
+ SyncRequestThreadManager(Server* server, CompletionQueue* server_cq,
+ std::shared_ptr<GlobalCallbacks> global_callbacks,
+ int min_pollers, int max_pollers,
+ int cq_timeout_msec)
+ : ThreadManager(min_pollers, max_pollers),
server_(server),
server_cq_(server_cq),
cq_timeout_msec_(cq_timeout_msec),
@@ -333,7 +337,7 @@ class Server::SyncRequestManager : public GrpcRpcManager {
m->Request(server_->c_server(), server_cq_->cq());
}
- GrpcRpcManager::Initialize();
+ ThreadManager::Initialize();
}
}
@@ -367,9 +371,9 @@ Server::Server(
for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end();
it++) {
- sync_req_mgrs_.emplace_back(
- new SyncRequestManager(this, (*it).get(), global_callbacks_,
- min_pollers, max_pollers, sync_cq_timeout_msec));
+ sync_req_mgrs_.emplace_back(new SyncRequestThreadManager(
+ this, (*it).get(), global_callbacks_, min_pollers, max_pollers,
+ sync_cq_timeout_msec));
}
grpc_channel_args channel_args;
@@ -509,10 +513,10 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
ShutdownTag shutdown_tag; // Dummy shutdown tag
grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
- // Shutdown all RpcManagers. This will try to gracefully stop all the
- // threads in the RpcManagers (once they process any inflight requests)
+ // Shutdown all ThreadManagers. This will try to gracefully stop all the
+ // threads in the ThreadManagers (once they process any inflight requests)
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
- (*it)->ShutdownRpcManager();
+ (*it)->Shutdown();
}
shutdown_cq.Shutdown();
@@ -530,7 +534,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
// Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
// successfully shutdown
- // Wait for threads in all RpcManagers to terminate
+ // Wait for threads in all ThreadManagers to terminate
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Wait();
(*it)->ShutdownAndDrainCompletionQueue();
diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.cc b/src/cpp/thread_manager/thread_manager.cc
index 2d791bb159..93ccfb4d98 100644
--- a/src/cpp/rpcmanager/grpc_rpc_manager.cc
+++ b/src/cpp/thread_manager/thread_manager.cc
@@ -40,29 +40,28 @@
namespace grpc {
-GrpcRpcManager::GrpcRpcManagerThread::GrpcRpcManagerThread(
- GrpcRpcManager* rpc_mgr)
- : rpc_mgr_(rpc_mgr),
- thd_(new std::thread(&GrpcRpcManager::GrpcRpcManagerThread::Run, this)) {}
-
-void GrpcRpcManager::GrpcRpcManagerThread::Run() {
- rpc_mgr_->MainWorkLoop();
- rpc_mgr_->MarkAsCompleted(this);
+ThreadManager::WorkerThread::WorkerThread(ThreadManager* thd_mgr)
+ : thd_mgr_(thd_mgr),
+ thd_(new std::thread(&ThreadManager::WorkerThread::Run, this)) {}
+
+void ThreadManager::WorkerThread::Run() {
+ thd_mgr_->MainWorkLoop();
+ thd_mgr_->MarkAsCompleted(this);
}
-GrpcRpcManager::GrpcRpcManagerThread::~GrpcRpcManagerThread() {
+ThreadManager::WorkerThread::~WorkerThread() {
thd_->join();
thd_.reset();
}
-GrpcRpcManager::GrpcRpcManager(int min_pollers, int max_pollers)
+ThreadManager::ThreadManager(int min_pollers, int max_pollers)
: shutdown_(false),
num_pollers_(0),
min_pollers_(min_pollers),
max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers),
num_threads_(0) {}
-GrpcRpcManager::~GrpcRpcManager() {
+ThreadManager::~ThreadManager() {
{
std::unique_lock<grpc::mutex> lock(mu_);
GPR_ASSERT(num_threads_ == 0);
@@ -71,24 +70,24 @@ GrpcRpcManager::~GrpcRpcManager() {
CleanupCompletedThreads();
}
-void GrpcRpcManager::Wait() {
+void ThreadManager::Wait() {
std::unique_lock<grpc::mutex> lock(mu_);
while (num_threads_ != 0) {
shutdown_cv_.wait(lock);
}
}
-void GrpcRpcManager::ShutdownRpcManager() {
+void ThreadManager::Shutdown() {
std::unique_lock<grpc::mutex> lock(mu_);
shutdown_ = true;
}
-bool GrpcRpcManager::IsShutdown() {
+bool ThreadManager::IsShutdown() {
std::unique_lock<grpc::mutex> lock(mu_);
return shutdown_;
}
-void GrpcRpcManager::MarkAsCompleted(GrpcRpcManagerThread* thd) {
+void ThreadManager::MarkAsCompleted(WorkerThread* thd) {
{
std::unique_lock<grpc::mutex> list_lock(list_mu_);
completed_threads_.push_back(thd);
@@ -101,7 +100,7 @@ void GrpcRpcManager::MarkAsCompleted(GrpcRpcManagerThread* thd) {
}
}
-void GrpcRpcManager::CleanupCompletedThreads() {
+void ThreadManager::CleanupCompletedThreads() {
std::unique_lock<grpc::mutex> lock(list_mu_);
for (auto thd = completed_threads_.begin(); thd != completed_threads_.end();
thd = completed_threads_.erase(thd)) {
@@ -109,7 +108,7 @@ void GrpcRpcManager::CleanupCompletedThreads() {
}
}
-void GrpcRpcManager::Initialize() {
+void ThreadManager::Initialize() {
for (int i = 0; i < min_pollers_; i++) {
MaybeCreatePoller();
}
@@ -118,7 +117,7 @@ void GrpcRpcManager::Initialize() {
// If the number of pollers (i.e threads currently blocked in PollForWork()) is
// less than max threshold (i.e max_pollers_) and the total number of threads is
// below the maximum threshold, we can let the current thread continue as poller
-bool GrpcRpcManager::MaybeContinueAsPoller() {
+bool ThreadManager::MaybeContinueAsPoller() {
std::unique_lock<grpc::mutex> lock(mu_);
if (shutdown_ || num_pollers_ > max_pollers_) {
return false;
@@ -131,18 +130,18 @@ bool GrpcRpcManager::MaybeContinueAsPoller() {
// Create a new poller if the current number of pollers i.e num_pollers_ (i.e
// threads currently blocked in PollForWork()) is below the threshold (i.e
// min_pollers_) and the total number of threads is below the maximum threshold
-void GrpcRpcManager::MaybeCreatePoller() {
+void ThreadManager::MaybeCreatePoller() {
grpc::unique_lock<grpc::mutex> lock(mu_);
if (!shutdown_ && num_pollers_ < min_pollers_) {
num_pollers_++;
num_threads_++;
// Create a new thread (which ends up calling the MainWorkLoop() function
- new GrpcRpcManagerThread(this);
+ new WorkerThread(this);
}
}
-void GrpcRpcManager::MainWorkLoop() {
+void ThreadManager::MainWorkLoop() {
void* tag;
bool ok;
@@ -170,7 +169,7 @@ void GrpcRpcManager::MainWorkLoop() {
}
// Note that MaybeCreatePoller does check for shutdown and creates a new
- // thread only if GrpcRpcManager is not shutdown
+ // thread only if ThreadManager is not shutdown
if (work_status == WORK_FOUND) {
MaybeCreatePoller();
DoWork(tag, ok);
@@ -179,7 +178,7 @@ void GrpcRpcManager::MainWorkLoop() {
CleanupCompletedThreads();
- // If we are here, either GrpcRpcManager is shutting down or it already has
+ // If we are here, either ThreadManager is shutting down or it already has
// enough threads.
}
diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.h b/src/cpp/thread_manager/thread_manager.h
index 77715c52fd..b667a645af 100644
--- a/src/cpp/rpcmanager/grpc_rpc_manager.h
+++ b/src/cpp/thread_manager/thread_manager.h
@@ -31,8 +31,8 @@
*
*/
-#ifndef GRPC_INTERNAL_CPP_GRPC_RPC_MANAGER_H
-#define GRPC_INTERNAL_CPP_GRPC_RPC_MANAGER_H
+#ifndef GRPC_INTERNAL_CPP_THREAD_MANAGER_H
+#define GRPC_INTERNAL_CPP_THREAD_MANAGER_H
#include <list>
#include <memory>
@@ -42,10 +42,10 @@
namespace grpc {
-class GrpcRpcManager {
+class ThreadManager {
public:
- explicit GrpcRpcManager(int min_pollers, int max_pollers);
- virtual ~GrpcRpcManager();
+ explicit ThreadManager(int min_pollers, int max_pollers);
+ virtual ~ThreadManager();
// Initializes and Starts the Rpc Manager threads
void Initialize();
@@ -60,17 +60,17 @@ class GrpcRpcManager {
// - The implementaion MUST set the value of 'ok' to 'true' or 'false'. A
// value of 'false' indicates some implemenation specific error (that is
// neither SHUTDOWN nor TIMEOUT)
- // - GrpcRpcManager does not interpret the values of 'tag' and 'ok'
- // - GrpcRpcManager WILL call DoWork() and pass '*tag' and 'ok' as input to
+ // - ThreadManager does not interpret the values of 'tag' and 'ok'
+ // - ThreadManager WILL call DoWork() and pass '*tag' and 'ok' as input to
// DoWork()
//
// If the return value is SHUTDOWN:,
- // - GrpcManager WILL NOT call DoWork() and terminates the thead
+ // - ThreadManager WILL NOT call DoWork() and terminates the thead
//
// If the return value is TIMEOUT:,
- // - GrpcManager WILL NOT call DoWork()
- // - GrpcManager MAY terminate the thread depending on the current number of
- // active poller threads and mix_pollers/max_pollers settings
+ // - ThreadManager WILL NOT call DoWork()
+ // - ThreadManager MAY terminate the thread depending on the current number
+ // of active poller threads and mix_pollers/max_pollers settings
// - Also, the value of timeout is specific to the derived class
// implementation
virtual WorkStatus PollForWork(void** tag, bool* ok) = 0;
@@ -84,40 +84,40 @@ class GrpcRpcManager {
// actually finds some work
virtual void DoWork(void* tag, bool ok) = 0;
- // Mark the GrpcRpcManager as shutdown and begin draining the work.
- // This is a non-blocking call and the caller should call Wait(), a blocking
- // call which returns only once the shutdown is complete
- void ShutdownRpcManager();
+ // Mark the ThreadManager as shutdown and begin draining the work. This is a
+ // non-blocking call and the caller should call Wait(), a blocking call which
+ // returns only once the shutdown is complete
+ void Shutdown();
- // Has ShutdownRpcManager() been called
+ // Has Shutdown() been called
bool IsShutdown();
- // A blocking call that returns only after the GrpcRpcManager has shutdown and
+ // A blocking call that returns only after the ThreadManager has shutdown and
// all the threads have drained all the outstanding work
void Wait();
private:
- // Helper wrapper class around std::thread. This takes a GrpcRpcManager object
+ // Helper wrapper class around std::thread. This takes a ThreadManager object
// and starts a new std::thread to calls the Run() function.
//
- // The Run() function calls GrpcManager::MainWorkLoop() function and once that
- // completes, it marks the GrpcRpcManagerThread completed by calling
- // GrpcRpcManager::MarkAsCompleted()
- class GrpcRpcManagerThread {
+ // The Run() function calls ThreadManager::MainWorkLoop() function and once
+ // that completes, it marks the WorkerThread completed by calling
+ // ThreadManager::MarkAsCompleted()
+ class WorkerThread {
public:
- GrpcRpcManagerThread(GrpcRpcManager* rpc_mgr);
- ~GrpcRpcManagerThread();
+ WorkerThread(ThreadManager* thd_mgr);
+ ~WorkerThread();
private:
- // Calls rpc_mgr_->MainWorkLoop() and once that completes, calls
- // rpc_mgr_>MarkAsCompleted(this) to mark the thread as completed
+ // Calls thd_mgr_->MainWorkLoop() and once that completes, calls
+ // thd_mgr_>MarkAsCompleted(this) to mark the thread as completed
void Run();
- GrpcRpcManager* rpc_mgr_;
+ ThreadManager* thd_mgr_;
std::unique_ptr<grpc::thread> thd_;
};
- // The main funtion in GrpcRpcManager
+ // The main funtion in ThreadManager
void MainWorkLoop();
// Create a new poller if the number of current pollers is less than the
@@ -128,7 +128,7 @@ class GrpcRpcManager {
// current number of pollers is less than the max_pollers.
bool MaybeContinueAsPoller();
- void MarkAsCompleted(GrpcRpcManagerThread* thd);
+ void MarkAsCompleted(WorkerThread* thd);
void CleanupCompletedThreads();
// Protects shutdown_, num_pollers_ and num_threads_
@@ -150,9 +150,9 @@ class GrpcRpcManager {
int num_threads_;
grpc::mutex list_mu_;
- std::list<GrpcRpcManagerThread*> completed_threads_;
+ std::list<WorkerThread*> completed_threads_;
};
} // namespace grpc
-#endif // GRPC_INTERNAL_CPP_GRPC_RPC_MANAGER_H
+#endif // GRPC_INTERNAL_CPP_THREAD_MANAGER_H