diff options
Diffstat (limited to 'src/cpp/thread_manager/thread_manager.cc')
-rw-r--r-- | src/cpp/thread_manager/thread_manager.cc | 54 |
1 files changed, 13 insertions, 41 deletions
diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc index 107c60f4eb..23264f1b5b 100644 --- a/src/cpp/thread_manager/thread_manager.cc +++ b/src/cpp/thread_manager/thread_manager.cc @@ -20,26 +20,18 @@ #include <climits> #include <mutex> +#include <thread> #include <grpc/support/log.h> -#include <grpc/support/thd.h> namespace grpc { -ThreadManager::WorkerThread::WorkerThread(ThreadManager* thd_mgr, bool* valid) +ThreadManager::WorkerThread::WorkerThread(ThreadManager* thd_mgr) : thd_mgr_(thd_mgr) { - gpr_thd_options opt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&opt); - // Make thread creation exclusive with respect to its join happening in // ~WorkerThread(). std::lock_guard<std::mutex> lock(wt_mu_); - *valid = valid_ = thd_mgr->thread_creator_( - &thd_, "worker thread", - [](void* th) { - reinterpret_cast<ThreadManager::WorkerThread*>(th)->Run(); - }, - this, &opt); + thd_ = std::thread(&ThreadManager::WorkerThread::Run, this); } void ThreadManager::WorkerThread::Run() { @@ -50,24 +42,15 @@ void ThreadManager::WorkerThread::Run() { ThreadManager::WorkerThread::~WorkerThread() { // Don't join until the thread is fully constructed. std::lock_guard<std::mutex> lock(wt_mu_); - if (valid_) { - thd_mgr_->thread_joiner_(thd_); - } + thd_.join(); } -ThreadManager::ThreadManager( - int min_pollers, int max_pollers, - std::function<int(gpr_thd_id*, const char*, void (*)(void*), void*, - const gpr_thd_options*)> - thread_creator, - std::function<void(gpr_thd_id)> thread_joiner) +ThreadManager::ThreadManager(int min_pollers, int max_pollers) : shutdown_(false), num_pollers_(0), min_pollers_(min_pollers), max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers), - num_threads_(0), - thread_creator_(thread_creator), - thread_joiner_(thread_joiner) {} + num_threads_(0) {} ThreadManager::~ThreadManager() { { @@ -128,9 +111,7 @@ void ThreadManager::Initialize() { for (int i = 0; i < min_pollers_; i++) { // Create a new thread (which ends up calling the MainWorkLoop() function - bool valid; - new WorkerThread(this, &valid); - GPR_ASSERT(valid); // we need to have at least this minimum + new WorkerThread(this); } } @@ -157,27 +138,18 @@ void ThreadManager::MainWorkLoop() { case WORK_FOUND: // If we got work and there are now insufficient pollers, start a new // one - bool resources; if (!shutdown_ && num_pollers_ < min_pollers_) { - bool valid; + num_pollers_++; + num_threads_++; // Drop lock before spawning thread to avoid contention lock.unlock(); - auto* th = new WorkerThread(this, &valid); - lock.lock(); - if (valid) { - num_pollers_++; - num_threads_++; - } else { - delete th; - } - resources = (num_pollers_ > 0); + new WorkerThread(this); } else { - resources = true; + // Drop lock for consistency with above branch + lock.unlock(); } - // Drop lock before any application work - lock.unlock(); // Lock is always released at this point - do the application work - DoWork(tag, ok, resources); + DoWork(tag, ok); // Take the lock again to check post conditions lock.lock(); // If we're shutdown, we should finish at this point. |