aboutsummaryrefslogtreecommitdiffhomepage
path: root/unsupported/Eigen/CXX11/src/ThreadPool
diff options
context:
space:
mode:
authorGravatar Benoit Steiner <benoit.steiner.goog@gmail.com>2016-12-09 13:05:14 -0800
committerGravatar Benoit Steiner <benoit.steiner.goog@gmail.com>2016-12-09 13:05:14 -0800
commit2f5b7a199b9cecc9649a1ebec19fc214353b1422 (patch)
tree57918e980e646174b4b1e75f3506d29228319332 /unsupported/Eigen/CXX11/src/ThreadPool
parent3d59a477201d4d4f34b4332fda699c21387cf726 (diff)
Reworked the threadpool cancellation mechanism to not depend on pthread_cancel since it turns out that pthread_cancel doesn't work properly on numerous platforms.
Diffstat (limited to 'unsupported/Eigen/CXX11/src/ThreadPool')
-rw-r--r--unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h44
-rw-r--r--unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h2
-rw-r--r--unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h3
-rw-r--r--unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h3
4 files changed, 41 insertions, 11 deletions
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
index b57863163..0e6a0bf8f 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
@@ -28,6 +28,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
blocked_(0),
spinning_(0),
done_(false),
+ cancelled_(false),
ec_(waiters_) {
waiters_.resize(num_threads);
@@ -61,10 +62,19 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
~NonBlockingThreadPoolTempl() {
done_ = true;
+
// Now if all threads block without work, they will start exiting.
// But note that threads can continue to work arbitrary long,
// block, submit new work, unblock and otherwise live full life.
- ec_.Notify(true);
+ if (!cancelled_) {
+ ec_.Notify(true);
+ } else {
+ // Since we were cancelled, there might be entries in the queues.
+ // Empty them to prevent their destructor from asserting.
+ for (size_t i = 0; i < queues_.size(); i++) {
+ queues_[i]->Flush();
+ }
+ }
// Join threads explicitly to avoid destruction order issues.
for (size_t i = 0; i < threads_.size(); i++) delete threads_[i];
@@ -91,16 +101,25 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
// completes overall computations, which in turn leads to destruction of
// this. We expect that such scenario is prevented by program, that is,
// this is kept alive while any threads can potentially be in Schedule.
- if (!t.f)
+ if (!t.f) {
ec_.Notify(false);
- else
+ }
+ else {
env_.ExecuteTask(t); // Push failed, execute directly.
+ }
}
void Cancel() {
+ cancelled_ = true;
+ done_ = true;
+
+ // Let each thread know it's been cancelled.
for (size_t i = 0; i < threads_.size(); i++) {
- threads_[i]->Cancel();
+ threads_[i]->OnCancel();
}
+
+ // Wake up the threads without work to let them exit on their own.
+ ec_.Notify(true);
}
int NumThreads() const final {
@@ -135,6 +154,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
std::atomic<unsigned> blocked_;
std::atomic<bool> spinning_;
std::atomic<bool> done_;
+ std::atomic<bool> cancelled_;
EventCount ec_;
// Main worker thread loop.
@@ -145,7 +165,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
pt->thread_id = thread_id;
Queue* q = queues_[thread_id];
EventCount::Waiter* waiter = &waiters_[thread_id];
- for (;;) {
+ while (!cancelled_) {
Task t = q->PopFront();
if (!t.f) {
t = Steal();
@@ -158,7 +178,11 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
// pool. Consider a time based limit instead.
if (!spinning_ && !spinning_.exchange(true)) {
for (int i = 0; i < 1000 && !t.f; i++) {
- t = Steal();
+ if (!cancelled_.load(std::memory_order_relaxed)) {
+ t = Steal();
+ } else {
+ return;
+ }
}
spinning_ = false;
}
@@ -207,8 +231,12 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
int victim = NonEmptyQueueIndex();
if (victim != -1) {
ec_.CancelWait(waiter);
- *t = queues_[victim]->PopBack();
- return true;
+ if (cancelled_) {
+ return false;
+ } else {
+ *t = queues_[victim]->PopBack();
+ return true;
+ }
}
// Number of blocked threads is used as termination condition.
// If we are shutting down and all worker threads blocked without work,
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h
index ab4f85fbf..fb08deb20 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h
@@ -71,7 +71,7 @@ class SimpleThreadPoolTempl : public ThreadPoolInterface {
void Cancel() {
for (size_t i = 0; i < threads_.size(); i++) {
- threads_[i]->Cancel();
+ threads_[i]->OnCancel();
}
}
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h
index b3c45057d..d94a06416 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h
@@ -23,7 +23,8 @@ struct StlThreadEnvironment {
public:
EnvThread(std::function<void()> f) : thr_(std::move(f)) {}
~EnvThread() { thr_.join(); }
- void Cancel() { EIGEN_THREAD_CANCEL(thr_); }
+ // This function is called when the threadpool is cancelled.
+ void OnCancel() { }
private:
std::thread thr_;
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h
index 5935b7cd8..5f2e1a013 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h
@@ -19,7 +19,8 @@ class ThreadPoolInterface {
// Submits a closure to be run by a thread in the pool.
virtual void Schedule(std::function<void()> fn) = 0;
- // Cancel all the threads in the pool.
+ // Stop processing the closures that have been enqueued.
+ // Currently running closures may still be processed.
virtual void Cancel() = 0;
// Returns the number of threads in the pool.