diff options
author | Benoit Steiner <benoit.steiner.goog@gmail.com> | 2016-12-09 13:05:14 -0800 |
---|---|---|
committer | Benoit Steiner <benoit.steiner.goog@gmail.com> | 2016-12-09 13:05:14 -0800 |
commit | 2f5b7a199b9cecc9649a1ebec19fc214353b1422 (patch) | |
tree | 57918e980e646174b4b1e75f3506d29228319332 /unsupported | |
parent | 3d59a477201d4d4f34b4332fda699c21387cf726 (diff) |
Reworked the threadpool cancellation mechanism to not depend on pthread_cancel since it turns out that pthread_cancel doesn't work properly on numerous platforms.
Diffstat (limited to 'unsupported')
5 files changed, 48 insertions, 26 deletions
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h index b57863163..0e6a0bf8f 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h @@ -28,6 +28,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { blocked_(0), spinning_(0), done_(false), + cancelled_(false), ec_(waiters_) { waiters_.resize(num_threads); @@ -61,10 +62,19 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { ~NonBlockingThreadPoolTempl() { done_ = true; + // Now if all threads block without work, they will start exiting. // But note that threads can continue to work arbitrary long, // block, submit new work, unblock and otherwise live full life. - ec_.Notify(true); + if (!cancelled_) { + ec_.Notify(true); + } else { + // Since we were cancelled, there might be entries in the queues. + // Empty them to prevent their destructor from asserting. + for (size_t i = 0; i < queues_.size(); i++) { + queues_[i]->Flush(); + } + } // Join threads explicitly to avoid destruction order issues. for (size_t i = 0; i < threads_.size(); i++) delete threads_[i]; @@ -91,16 +101,25 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { // completes overall computations, which in turn leads to destruction of // this. We expect that such scenario is prevented by program, that is, // this is kept alive while any threads can potentially be in Schedule. - if (!t.f) + if (!t.f) { ec_.Notify(false); - else + } + else { env_.ExecuteTask(t); // Push failed, execute directly. + } } void Cancel() { + cancelled_ = true; + done_ = true; + + // Let each thread know it's been cancelled. for (size_t i = 0; i < threads_.size(); i++) { - threads_[i]->Cancel(); + threads_[i]->OnCancel(); } + + // Wake up the threads without work to let them exit on their own. + ec_.Notify(true); } int NumThreads() const final { @@ -135,6 +154,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { std::atomic<unsigned> blocked_; std::atomic<bool> spinning_; std::atomic<bool> done_; + std::atomic<bool> cancelled_; EventCount ec_; // Main worker thread loop. @@ -145,7 +165,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { pt->thread_id = thread_id; Queue* q = queues_[thread_id]; EventCount::Waiter* waiter = &waiters_[thread_id]; - for (;;) { + while (!cancelled_) { Task t = q->PopFront(); if (!t.f) { t = Steal(); @@ -158,7 +178,11 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { // pool. Consider a time based limit instead. if (!spinning_ && !spinning_.exchange(true)) { for (int i = 0; i < 1000 && !t.f; i++) { - t = Steal(); + if (!cancelled_.load(std::memory_order_relaxed)) { + t = Steal(); + } else { + return; + } } spinning_ = false; } @@ -207,8 +231,12 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { int victim = NonEmptyQueueIndex(); if (victim != -1) { ec_.CancelWait(waiter); - *t = queues_[victim]->PopBack(); - return true; + if (cancelled_) { + return false; + } else { + *t = queues_[victim]->PopBack(); + return true; + } } // Number of blocked threads is used as termination condition. // If we are shutting down and all worker threads blocked without work, diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h index ab4f85fbf..fb08deb20 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h @@ -71,7 +71,7 @@ class SimpleThreadPoolTempl : public ThreadPoolInterface { void Cancel() { for (size_t i = 0; i < threads_.size(); i++) { - threads_[i]->Cancel(); + threads_[i]->OnCancel(); } } diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h index b3c45057d..d94a06416 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h @@ -23,7 +23,8 @@ struct StlThreadEnvironment { public: EnvThread(std::function<void()> f) : thr_(std::move(f)) {} ~EnvThread() { thr_.join(); } - void Cancel() { EIGEN_THREAD_CANCEL(thr_); } + // This function is called when the threadpool is cancelled. + void OnCancel() { } private: std::thread thr_; diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h index 5935b7cd8..5f2e1a013 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h @@ -19,7 +19,8 @@ class ThreadPoolInterface { // Submits a closure to be run by a thread in the pool. virtual void Schedule(std::function<void()> fn) = 0; - // Cancel all the threads in the pool. + // Stop processing the closures that have been enqueued. + // Currently running closures may still be processed. virtual void Cancel() = 0; // Returns the number of threads in the pool. diff --git a/unsupported/test/cxx11_non_blocking_thread_pool.cpp b/unsupported/test/cxx11_non_blocking_thread_pool.cpp index fe30551ce..80d0ee080 100644 --- a/unsupported/test/cxx11_non_blocking_thread_pool.cpp +++ b/unsupported/test/cxx11_non_blocking_thread_pool.cpp @@ -104,23 +104,15 @@ static void test_parallelism() static void test_cancel() { - NonBlockingThreadPool tp(4); + NonBlockingThreadPool tp(2); -#ifdef EIGEN_SUPPORTS_THREAD_CANCELLATION - std::cout << "Thread cancellation is supported on this platform" << std::endl; - - // Put 2 threads to sleep for much longer than the default test timeout. - tp.Schedule([]() { sleep(3600); } ); - tp.Schedule([]() { sleep(3600 * 24); } ); -#else - std::cout << "Thread cancellation is a no-op on this platform" << std::endl; - - // Make 2 threads sleep for a short period of time - tp.Schedule([]() { sleep(1); } ); - tp.Schedule([]() { sleep(2); } ); -#endif + // Schedule a large number of closure that each sleeps for one second. This + // will keep the thread pool busy for much longer than the default test timeout. + for (int i = 0; i < 1000; ++i) { + tp.Schedule([]() { sleep(2); }); + } - // Call cancel: + // Cancel the processing of all the closures that are still pending. tp.Cancel(); } |