aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/thread_manager
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-04-18 13:08:08 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-04-18 13:08:08 -0700
commita3e87894f2cc921d2c5bbdab250fa23a1ba8b52d (patch)
tree08ab88927183a4e8c1519ea3518037d493edc2cc /src/cpp/thread_manager
parent991c101de89ef8c3855c8503031e3b7423d59153 (diff)
Fix, restore draining
Diffstat (limited to 'src/cpp/thread_manager')
-rw-r--r--src/cpp/thread_manager/thread_manager.cc88
-rw-r--r--src/cpp/thread_manager/thread_manager.h6
2 files changed, 39 insertions, 55 deletions
diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc
index b6d5754511..39b9691b5f 100644
--- a/src/cpp/thread_manager/thread_manager.cc
+++ b/src/cpp/thread_manager/thread_manager.cc
@@ -100,6 +100,8 @@ void ThreadManager::MarkAsCompleted(WorkerThread* thd) {
void ThreadManager::CleanupCompletedThreads() {
std::list<WorkerThread*> completed_threads;
{
+ // swap out the completed threads list: allows other threads to clean up
+ // more quickly
std::unique_lock<std::mutex> lock(list_mu_);
completed_threads.swap(completed_threads_);
}
@@ -112,20 +114,6 @@ void ThreadManager::Initialize() {
}
}
-// If the number of pollers (i.e threads currently blocked in PollForWork()) is
-// less than max threshold (i.e max_pollers_) and the total number of threads is
-// below the maximum threshold, we can let the current thread continue as poller
-bool ThreadManager::MaybeContinueAsPoller(bool work_found) {
- std::unique_lock<std::mutex> lock(mu_);
- gpr_log(GPR_DEBUG, "s=%d wf=%d np=%d mp=%d", shutdown_, work_found, num_pollers_, max_pollers_);
- if (shutdown_ || (!work_found && num_pollers_ > max_pollers_)) {
- return false;
- }
-
- num_pollers_++;
- return true;
-}
-
// Create a new poller if the current number of pollers i.e num_pollers_ (i.e
// threads currently blocked in PollForWork()) is below the threshold (i.e
// min_pollers_) and the total number of threads is below the maximum threshold
@@ -143,48 +131,48 @@ void ThreadManager::MaybeCreatePoller() {
}
void ThreadManager::MainWorkLoop() {
- void* tag;
- bool ok;
-
- /*
- 1. Poll for work (i.e PollForWork())
- 2. After returning from PollForWork, reduce the number of pollers by 1. If
- PollForWork() returned a TIMEOUT, then it may indicate that we have more
- polling threads than needed. Check if the number of pollers is greater
- than min_pollers and if so, terminate the thread.
- 3. Since we are short of one poller now, see if a new poller has to be
- created (i.e see MaybeCreatePoller() for more details)
- 4. Do the actual work (DoWork())
- 5. After doing the work, see it this thread can resume polling work (i.e
- see MaybeContinueAsPoller() for more details) */
- WorkStatus work_status;
while (true) {
- bool done = false;
- work_status = PollForWork(&tag, &ok);
+ void* tag;
+ bool ok;
+ WorkStatus work_status = PollForWork(&tag, &ok);
std::unique_lock<std::mutex> lock(mu_);
+ // Reduce the number of pollers by 1 and check what happened with the poll
num_pollers_--;
- gpr_log(GPR_DEBUG, "%p: work_status:%d num_pollers:%d min_pollers:%d max_pollers:%d num_threads:%d shutdown:%d", this, work_status, num_pollers_, min_pollers_, max_pollers_, num_threads_, shutdown_);
+ bool done = false;
switch (work_status) {
- case TIMEOUT:
- if (shutdown_ || num_pollers_ >= max_pollers_) done = true;
- break;
- case SHUTDOWN: done = true; break;
- case WORK_FOUND:
- if (!shutdown_ && num_pollers_ < min_pollers_) {
- num_pollers_++;
- num_threads_++;
- lock.unlock();
- new WorkerThread(this);
- } else {
- lock.unlock();
- }
- DoWork(tag, ok);
- lock.lock();
- if (shutdown_) done = true;
- break;
+ case TIMEOUT:
+ // If we timed out and we have more pollers than we need (or we are
+ // shutdown), finish this thread
+ if (shutdown_ || num_pollers_ > max_pollers_) done = true;
+ break;
+ case SHUTDOWN:
+ // If the thread manager is shutdown, finish this thread
+ done = true;
+ break;
+ case WORK_FOUND:
+ // If we got work and there are now insufficient pollers, start a new
+ // one
+ if (!shutdown_ && num_pollers_ < min_pollers_) {
+ num_pollers_++;
+ num_threads_++;
+ lock.unlock();
+ new WorkerThread(this);
+ } else {
+ lock.unlock();
+ }
+ DoWork(tag, ok);
+ lock.lock();
+ // If we're shutdown, we should finish at this point.
+ // If not, there's a chance that we'll exceed the max poller count: that
+ // is explicitly ok - we'll decrease after one poll timeout, and prevent
+ // some thrashing starting up and shutting down threads
+ if (shutdown_) done = true;
+ break;
}
-if (done) break;
+ // If we decided to finish the thread, break out of the while loop
+ if (done) break;
+ // ... otherwise increase poller count and continue
num_pollers_++;
};
diff --git a/src/cpp/thread_manager/thread_manager.h b/src/cpp/thread_manager/thread_manager.h
index 7d832ad16a..c9435011f9 100644
--- a/src/cpp/thread_manager/thread_manager.h
+++ b/src/cpp/thread_manager/thread_manager.h
@@ -96,7 +96,7 @@ class ThreadManager {
// A blocking call that returns only after the ThreadManager has shutdown and
// all the threads have drained all the outstanding work
- void Wait();
+ virtual void Wait();
private:
// Helper wrapper class around std::thread. This takes a ThreadManager object
@@ -126,10 +126,6 @@ class ThreadManager {
// minimum number of pollers needed (i.e min_pollers).
void MaybeCreatePoller();
- // Returns true if the current thread can resume as a poller. i.e if the
- // current number of pollers is less than the max_pollers.
- bool MaybeContinueAsPoller(bool work_found);
-
void MarkAsCompleted(WorkerThread* thd);
void CleanupCompletedThreads();