diff options
author | Craig Tiller <ctiller@google.com> | 2017-04-18 19:43:14 +0000 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-04-18 19:43:14 +0000 |
commit | 991c101de89ef8c3855c8503031e3b7423d59153 (patch) | |
tree | 7cbc51484177dd39d16ac93448666247ee98e7e4 /src/cpp/thread_manager | |
parent | 68f6a6732771e0265bb656d189de4b76d8268033 (diff) |
Initial thread manager fixes
Diffstat (limited to 'src/cpp/thread_manager')
-rw-r--r-- | src/cpp/thread_manager/thread_manager.cc | 57 | ||||
-rw-r--r-- | src/cpp/thread_manager/thread_manager.h | 4 |
2 files changed, 38 insertions, 23 deletions
diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc index 1450d009e4..b6d5754511 100644 --- a/src/cpp/thread_manager/thread_manager.cc +++ b/src/cpp/thread_manager/thread_manager.cc @@ -98,11 +98,12 @@ void ThreadManager::MarkAsCompleted(WorkerThread* thd) { } void ThreadManager::CleanupCompletedThreads() { - std::unique_lock<std::mutex> lock(list_mu_); - for (auto thd = completed_threads_.begin(); thd != completed_threads_.end(); - thd = completed_threads_.erase(thd)) { - delete *thd; + std::list<WorkerThread*> completed_threads; + { + std::unique_lock<std::mutex> lock(list_mu_); + completed_threads.swap(completed_threads_); } + for (auto thd : completed_threads) delete thd; } void ThreadManager::Initialize() { @@ -114,9 +115,10 @@ void ThreadManager::Initialize() { // If the number of pollers (i.e threads currently blocked in PollForWork()) is // less than max threshold (i.e max_pollers_) and the total number of threads is // below the maximum threshold, we can let the current thread continue as poller -bool ThreadManager::MaybeContinueAsPoller() { +bool ThreadManager::MaybeContinueAsPoller(bool work_found) { std::unique_lock<std::mutex> lock(mu_); - if (shutdown_ || num_pollers_ > max_pollers_) { + gpr_log(GPR_DEBUG, "s=%d wf=%d np=%d mp=%d", shutdown_, work_found, num_pollers_, max_pollers_); + if (shutdown_ || (!work_found && num_pollers_ > max_pollers_)) { return false; } @@ -133,6 +135,8 @@ void ThreadManager::MaybeCreatePoller() { num_pollers_++; num_threads_++; + lock.unlock(); + // Create a new thread (which ends up calling the MainWorkLoop() function new WorkerThread(this); } @@ -153,25 +157,36 @@ void ThreadManager::MainWorkLoop() { 4. Do the actual work (DoWork()) 5. After doing the work, see it this thread can resume polling work (i.e see MaybeContinueAsPoller() for more details) */ - do { - WorkStatus work_status = PollForWork(&tag, &ok); + WorkStatus work_status; + while (true) { + bool done = false; + work_status = PollForWork(&tag, &ok); - { - std::unique_lock<std::mutex> lock(mu_); - num_pollers_--; - - if (work_status == TIMEOUT && num_pollers_ > min_pollers_) { - break; + std::unique_lock<std::mutex> lock(mu_); + num_pollers_--; + gpr_log(GPR_DEBUG, "%p: work_status:%d num_pollers:%d min_pollers:%d max_pollers:%d num_threads:%d shutdown:%d", this, work_status, num_pollers_, min_pollers_, max_pollers_, num_threads_, shutdown_); + switch (work_status) { + case TIMEOUT: + if (shutdown_ || num_pollers_ >= max_pollers_) done = true; + break; + case SHUTDOWN: done = true; break; + case WORK_FOUND: + if (!shutdown_ && num_pollers_ < min_pollers_) { + num_pollers_++; + num_threads_++; + lock.unlock(); + new WorkerThread(this); + } else { + lock.unlock(); } - } - - // Note that MaybeCreatePoller does check for shutdown and creates a new - // thread only if ThreadManager is not shutdown - if (work_status == WORK_FOUND) { - MaybeCreatePoller(); DoWork(tag, ok); + lock.lock(); + if (shutdown_) done = true; + break; } - } while (MaybeContinueAsPoller()); +if (done) break; + num_pollers_++; + }; CleanupCompletedThreads(); diff --git a/src/cpp/thread_manager/thread_manager.h b/src/cpp/thread_manager/thread_manager.h index 9c0569c62c..7d832ad16a 100644 --- a/src/cpp/thread_manager/thread_manager.h +++ b/src/cpp/thread_manager/thread_manager.h @@ -89,7 +89,7 @@ class ThreadManager { // Mark the ThreadManager as shutdown and begin draining the work. This is a // non-blocking call and the caller should call Wait(), a blocking call which // returns only once the shutdown is complete - void Shutdown(); + virtual void Shutdown(); // Has Shutdown() been called bool IsShutdown(); @@ -128,7 +128,7 @@ class ThreadManager { // Returns true if the current thread can resume as a poller. i.e if the // current number of pollers is less than the max_pollers. - bool MaybeContinueAsPoller(); + bool MaybeContinueAsPoller(bool work_found); void MarkAsCompleted(WorkerThread* thd); void CleanupCompletedThreads(); |