diff options
Diffstat (limited to 'src/cpp/thread_manager')
-rw-r--r-- | src/cpp/thread_manager/thread_manager.cc | 29 | ||||
-rw-r--r-- | src/cpp/thread_manager/thread_manager.h | 13 |
2 files changed, 22 insertions, 20 deletions
diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc index caae4c457d..1450d009e4 100644 --- a/src/cpp/thread_manager/thread_manager.cc +++ b/src/cpp/thread_manager/thread_manager.cc @@ -31,12 +31,13 @@ * */ -#include <grpc++/impl/sync.h> -#include <grpc++/impl/thd.h> -#include <grpc/support/log.h> +#include "src/cpp/thread_manager/thread_manager.h" + #include <climits> +#include <mutex> +#include <thread> -#include "src/cpp/thread_manager/thread_manager.h" +#include <grpc/support/log.h> namespace grpc { @@ -59,7 +60,7 @@ ThreadManager::ThreadManager(int min_pollers, int max_pollers) ThreadManager::~ThreadManager() { { - std::unique_lock<grpc::mutex> lock(mu_); + std::unique_lock<std::mutex> lock(mu_); GPR_ASSERT(num_threads_ == 0); } @@ -67,29 +68,29 @@ ThreadManager::~ThreadManager() { } void ThreadManager::Wait() { - std::unique_lock<grpc::mutex> lock(mu_); + std::unique_lock<std::mutex> lock(mu_); while (num_threads_ != 0) { shutdown_cv_.wait(lock); } } void ThreadManager::Shutdown() { - std::unique_lock<grpc::mutex> lock(mu_); + std::unique_lock<std::mutex> lock(mu_); shutdown_ = true; } bool ThreadManager::IsShutdown() { - std::unique_lock<grpc::mutex> lock(mu_); + std::unique_lock<std::mutex> lock(mu_); return shutdown_; } void ThreadManager::MarkAsCompleted(WorkerThread* thd) { { - std::unique_lock<grpc::mutex> list_lock(list_mu_); + std::unique_lock<std::mutex> list_lock(list_mu_); completed_threads_.push_back(thd); } - grpc::unique_lock<grpc::mutex> lock(mu_); + std::unique_lock<std::mutex> lock(mu_); num_threads_--; if (num_threads_ == 0) { shutdown_cv_.notify_one(); @@ -97,7 +98,7 @@ void ThreadManager::MarkAsCompleted(WorkerThread* thd) { } void ThreadManager::CleanupCompletedThreads() { - std::unique_lock<grpc::mutex> lock(list_mu_); + std::unique_lock<std::mutex> lock(list_mu_); for (auto thd = completed_threads_.begin(); thd != completed_threads_.end(); thd = completed_threads_.erase(thd)) { delete *thd; @@ -114,7 +115,7 @@ void ThreadManager::Initialize() { // less than max threshold (i.e max_pollers_) and the total number of threads is // below the maximum threshold, we can let the current thread continue as poller bool ThreadManager::MaybeContinueAsPoller() { - std::unique_lock<grpc::mutex> lock(mu_); + std::unique_lock<std::mutex> lock(mu_); if (shutdown_ || num_pollers_ > max_pollers_) { return false; } @@ -127,7 +128,7 @@ bool ThreadManager::MaybeContinueAsPoller() { // threads currently blocked in PollForWork()) is below the threshold (i.e // min_pollers_) and the total number of threads is below the maximum threshold void ThreadManager::MaybeCreatePoller() { - grpc::unique_lock<grpc::mutex> lock(mu_); + std::unique_lock<std::mutex> lock(mu_); if (!shutdown_ && num_pollers_ < min_pollers_) { num_pollers_++; num_threads_++; @@ -156,7 +157,7 @@ void ThreadManager::MainWorkLoop() { WorkStatus work_status = PollForWork(&tag, &ok); { - grpc::unique_lock<grpc::mutex> lock(mu_); + std::unique_lock<std::mutex> lock(mu_); num_pollers_--; if (work_status == TIMEOUT && num_pollers_ > min_pollers_) { diff --git a/src/cpp/thread_manager/thread_manager.h b/src/cpp/thread_manager/thread_manager.h index 9cfdb8af25..9c0569c62c 100644 --- a/src/cpp/thread_manager/thread_manager.h +++ b/src/cpp/thread_manager/thread_manager.h @@ -34,11 +34,12 @@ #ifndef GRPC_INTERNAL_CPP_THREAD_MANAGER_H #define GRPC_INTERNAL_CPP_THREAD_MANAGER_H +#include <condition_variable> #include <list> #include <memory> +#include <mutex> +#include <thread> -#include <grpc++/impl/sync.h> -#include <grpc++/impl/thd.h> #include <grpc++/support/config.h> namespace grpc { @@ -115,7 +116,7 @@ class ThreadManager { void Run(); ThreadManager* thd_mgr_; - grpc::thread thd_; + std::thread thd_; }; // The main funtion in ThreadManager @@ -134,10 +135,10 @@ class ThreadManager { // Protects shutdown_, num_pollers_ and num_threads_ // TODO: sreek - Change num_pollers and num_threads_ to atomics - grpc::mutex mu_; + std::mutex mu_; bool shutdown_; - grpc::condition_variable shutdown_cv_; + std::condition_variable shutdown_cv_; // Number of threads doing polling int num_pollers_; @@ -150,7 +151,7 @@ class ThreadManager { // currently polling i.e num_pollers_) int num_threads_; - grpc::mutex list_mu_; + std::mutex list_mu_; std::list<WorkerThread*> completed_threads_; }; |