aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/thread_manager
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-04-18 19:43:14 +0000
committerGravatar Craig Tiller <ctiller@google.com>2017-04-18 19:43:14 +0000
commit991c101de89ef8c3855c8503031e3b7423d59153 (patch)
tree7cbc51484177dd39d16ac93448666247ee98e7e4 /src/cpp/thread_manager
parent68f6a6732771e0265bb656d189de4b76d8268033 (diff)
Initial thread manager fixes
Diffstat (limited to 'src/cpp/thread_manager')
-rw-r--r--src/cpp/thread_manager/thread_manager.cc57
-rw-r--r--src/cpp/thread_manager/thread_manager.h4
2 files changed, 38 insertions, 23 deletions
diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc
index 1450d009e4..b6d5754511 100644
--- a/src/cpp/thread_manager/thread_manager.cc
+++ b/src/cpp/thread_manager/thread_manager.cc
@@ -98,11 +98,12 @@ void ThreadManager::MarkAsCompleted(WorkerThread* thd) {
}
void ThreadManager::CleanupCompletedThreads() {
- std::unique_lock<std::mutex> lock(list_mu_);
- for (auto thd = completed_threads_.begin(); thd != completed_threads_.end();
- thd = completed_threads_.erase(thd)) {
- delete *thd;
+ std::list<WorkerThread*> completed_threads;
+ {
+ std::unique_lock<std::mutex> lock(list_mu_);
+ completed_threads.swap(completed_threads_);
}
+ for (auto thd : completed_threads) delete thd;
}
void ThreadManager::Initialize() {
@@ -114,9 +115,10 @@ void ThreadManager::Initialize() {
// If the number of pollers (i.e threads currently blocked in PollForWork()) is
// less than max threshold (i.e max_pollers_) and the total number of threads is
// below the maximum threshold, we can let the current thread continue as poller
-bool ThreadManager::MaybeContinueAsPoller() {
+bool ThreadManager::MaybeContinueAsPoller(bool work_found) {
std::unique_lock<std::mutex> lock(mu_);
- if (shutdown_ || num_pollers_ > max_pollers_) {
+ gpr_log(GPR_DEBUG, "s=%d wf=%d np=%d mp=%d", shutdown_, work_found, num_pollers_, max_pollers_);
+ if (shutdown_ || (!work_found && num_pollers_ > max_pollers_)) {
return false;
}
@@ -133,6 +135,8 @@ void ThreadManager::MaybeCreatePoller() {
num_pollers_++;
num_threads_++;
+ lock.unlock();
+
// Create a new thread (which ends up calling the MainWorkLoop() function
new WorkerThread(this);
}
@@ -153,25 +157,36 @@ void ThreadManager::MainWorkLoop() {
4. Do the actual work (DoWork())
5. After doing the work, see it this thread can resume polling work (i.e
see MaybeContinueAsPoller() for more details) */
- do {
- WorkStatus work_status = PollForWork(&tag, &ok);
+ WorkStatus work_status;
+ while (true) {
+ bool done = false;
+ work_status = PollForWork(&tag, &ok);
- {
- std::unique_lock<std::mutex> lock(mu_);
- num_pollers_--;
-
- if (work_status == TIMEOUT && num_pollers_ > min_pollers_) {
- break;
+ std::unique_lock<std::mutex> lock(mu_);
+ num_pollers_--;
+ gpr_log(GPR_DEBUG, "%p: work_status:%d num_pollers:%d min_pollers:%d max_pollers:%d num_threads:%d shutdown:%d", this, work_status, num_pollers_, min_pollers_, max_pollers_, num_threads_, shutdown_);
+ switch (work_status) {
+ case TIMEOUT:
+ if (shutdown_ || num_pollers_ >= max_pollers_) done = true;
+ break;
+ case SHUTDOWN: done = true; break;
+ case WORK_FOUND:
+ if (!shutdown_ && num_pollers_ < min_pollers_) {
+ num_pollers_++;
+ num_threads_++;
+ lock.unlock();
+ new WorkerThread(this);
+ } else {
+ lock.unlock();
}
- }
-
- // Note that MaybeCreatePoller does check for shutdown and creates a new
- // thread only if ThreadManager is not shutdown
- if (work_status == WORK_FOUND) {
- MaybeCreatePoller();
DoWork(tag, ok);
+ lock.lock();
+ if (shutdown_) done = true;
+ break;
}
- } while (MaybeContinueAsPoller());
+if (done) break;
+ num_pollers_++;
+ };
CleanupCompletedThreads();
diff --git a/src/cpp/thread_manager/thread_manager.h b/src/cpp/thread_manager/thread_manager.h
index 9c0569c62c..7d832ad16a 100644
--- a/src/cpp/thread_manager/thread_manager.h
+++ b/src/cpp/thread_manager/thread_manager.h
@@ -89,7 +89,7 @@ class ThreadManager {
// Mark the ThreadManager as shutdown and begin draining the work. This is a
// non-blocking call and the caller should call Wait(), a blocking call which
// returns only once the shutdown is complete
- void Shutdown();
+ virtual void Shutdown();
// Has Shutdown() been called
bool IsShutdown();
@@ -128,7 +128,7 @@ class ThreadManager {
// Returns true if the current thread can resume as a poller. i.e if the
// current number of pollers is less than the max_pollers.
- bool MaybeContinueAsPoller();
+ bool MaybeContinueAsPoller(bool work_found);
void MarkAsCompleted(WorkerThread* thd);
void CleanupCompletedThreads();