aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/thread_manager/thread_manager.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/thread_manager/thread_manager.cc')
-rw-r--r--src/cpp/thread_manager/thread_manager.cc54
1 files changed, 13 insertions, 41 deletions
diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc
index 107c60f4eb..23264f1b5b 100644
--- a/src/cpp/thread_manager/thread_manager.cc
+++ b/src/cpp/thread_manager/thread_manager.cc
@@ -20,26 +20,18 @@
#include <climits>
#include <mutex>
+#include <thread>
#include <grpc/support/log.h>
-#include <grpc/support/thd.h>
namespace grpc {
-ThreadManager::WorkerThread::WorkerThread(ThreadManager* thd_mgr, bool* valid)
+ThreadManager::WorkerThread::WorkerThread(ThreadManager* thd_mgr)
: thd_mgr_(thd_mgr) {
- gpr_thd_options opt = gpr_thd_options_default();
- gpr_thd_options_set_joinable(&opt);
-
// Make thread creation exclusive with respect to its join happening in
// ~WorkerThread().
std::lock_guard<std::mutex> lock(wt_mu_);
- *valid = valid_ = thd_mgr->thread_creator_(
- &thd_, "worker thread",
- [](void* th) {
- reinterpret_cast<ThreadManager::WorkerThread*>(th)->Run();
- },
- this, &opt);
+ thd_ = std::thread(&ThreadManager::WorkerThread::Run, this);
}
void ThreadManager::WorkerThread::Run() {
@@ -50,24 +42,15 @@ void ThreadManager::WorkerThread::Run() {
ThreadManager::WorkerThread::~WorkerThread() {
// Don't join until the thread is fully constructed.
std::lock_guard<std::mutex> lock(wt_mu_);
- if (valid_) {
- thd_mgr_->thread_joiner_(thd_);
- }
+ thd_.join();
}
-ThreadManager::ThreadManager(
- int min_pollers, int max_pollers,
- std::function<int(gpr_thd_id*, const char*, void (*)(void*), void*,
- const gpr_thd_options*)>
- thread_creator,
- std::function<void(gpr_thd_id)> thread_joiner)
+ThreadManager::ThreadManager(int min_pollers, int max_pollers)
: shutdown_(false),
num_pollers_(0),
min_pollers_(min_pollers),
max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers),
- num_threads_(0),
- thread_creator_(thread_creator),
- thread_joiner_(thread_joiner) {}
+ num_threads_(0) {}
ThreadManager::~ThreadManager() {
{
@@ -128,9 +111,7 @@ void ThreadManager::Initialize() {
for (int i = 0; i < min_pollers_; i++) {
// Create a new thread (which ends up calling the MainWorkLoop() function
- bool valid;
- new WorkerThread(this, &valid);
- GPR_ASSERT(valid); // we need to have at least this minimum
+ new WorkerThread(this);
}
}
@@ -157,27 +138,18 @@ void ThreadManager::MainWorkLoop() {
case WORK_FOUND:
// If we got work and there are now insufficient pollers, start a new
// one
- bool resources;
if (!shutdown_ && num_pollers_ < min_pollers_) {
- bool valid;
+ num_pollers_++;
+ num_threads_++;
// Drop lock before spawning thread to avoid contention
lock.unlock();
- auto* th = new WorkerThread(this, &valid);
- lock.lock();
- if (valid) {
- num_pollers_++;
- num_threads_++;
- } else {
- delete th;
- }
- resources = (num_pollers_ > 0);
+ new WorkerThread(this);
} else {
- resources = true;
+ // Drop lock for consistency with above branch
+ lock.unlock();
}
- // Drop lock before any application work
- lock.unlock();
// Lock is always released at this point - do the application work
- DoWork(tag, ok, resources);
+ DoWork(tag, ok);
// Take the lock again to check post conditions
lock.lock();
// If we're shutdown, we should finish at this point.