diff options
Diffstat (limited to 'unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h')
-rw-r--r-- | unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h | 44 |
1 files changed, 36 insertions, 8 deletions
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h index b57863163..0e6a0bf8f 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h @@ -28,6 +28,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { blocked_(0), spinning_(0), done_(false), + cancelled_(false), ec_(waiters_) { waiters_.resize(num_threads); @@ -61,10 +62,19 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { ~NonBlockingThreadPoolTempl() { done_ = true; + // Now if all threads block without work, they will start exiting. // But note that threads can continue to work arbitrary long, // block, submit new work, unblock and otherwise live full life. - ec_.Notify(true); + if (!cancelled_) { + ec_.Notify(true); + } else { + // Since we were cancelled, there might be entries in the queues. + // Empty them to prevent their destructor from asserting. + for (size_t i = 0; i < queues_.size(); i++) { + queues_[i]->Flush(); + } + } // Join threads explicitly to avoid destruction order issues. for (size_t i = 0; i < threads_.size(); i++) delete threads_[i]; @@ -91,16 +101,25 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { // completes overall computations, which in turn leads to destruction of // this. We expect that such scenario is prevented by program, that is, // this is kept alive while any threads can potentially be in Schedule. - if (!t.f) + if (!t.f) { ec_.Notify(false); - else + } + else { env_.ExecuteTask(t); // Push failed, execute directly. + } } void Cancel() { + cancelled_ = true; + done_ = true; + + // Let each thread know it's been cancelled. for (size_t i = 0; i < threads_.size(); i++) { - threads_[i]->Cancel(); + threads_[i]->OnCancel(); } + + // Wake up the threads without work to let them exit on their own. + ec_.Notify(true); } int NumThreads() const final { @@ -135,6 +154,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { std::atomic<unsigned> blocked_; std::atomic<bool> spinning_; std::atomic<bool> done_; + std::atomic<bool> cancelled_; EventCount ec_; // Main worker thread loop. @@ -145,7 +165,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { pt->thread_id = thread_id; Queue* q = queues_[thread_id]; EventCount::Waiter* waiter = &waiters_[thread_id]; - for (;;) { + while (!cancelled_) { Task t = q->PopFront(); if (!t.f) { t = Steal(); @@ -158,7 +178,11 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { // pool. Consider a time based limit instead. if (!spinning_ && !spinning_.exchange(true)) { for (int i = 0; i < 1000 && !t.f; i++) { - t = Steal(); + if (!cancelled_.load(std::memory_order_relaxed)) { + t = Steal(); + } else { + return; + } } spinning_ = false; } @@ -207,8 +231,12 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { int victim = NonEmptyQueueIndex(); if (victim != -1) { ec_.CancelWait(waiter); - *t = queues_[victim]->PopBack(); - return true; + if (cancelled_) { + return false; + } else { + *t = queues_[victim]->PopBack(); + return true; + } } // Number of blocked threads is used as termination condition. // If we are shutting down and all worker threads blocked without work, |