aboutsummaryrefslogtreecommitdiffhomepage
path: root/unsupported/Eigen/CXX11/src/ThreadPool
diff options
context:
space:
mode:
authorGravatar Benoit Steiner <benoit.steiner.goog@gmail.com>2016-04-14 15:23:10 -0700
committerGravatar Benoit Steiner <benoit.steiner.goog@gmail.com>2016-04-14 15:23:10 -0700
commit78a51abc123dff49d6e1b1a6dd5b193e92ae0817 (patch)
tree81b653f81785ed9276ed82de0c0ffc51e4e71873 /unsupported/Eigen/CXX11/src/ThreadPool
parent7718749fee835095f0671fa6ce5d257609f8e56b (diff)
Added a more scalable non blocking thread pool
Diffstat (limited to 'unsupported/Eigen/CXX11/src/ThreadPool')
-rw-r--r--unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h234
-rw-r--r--unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h232
-rw-r--r--unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h210
-rw-r--r--unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h127
-rw-r--r--unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h38
-rw-r--r--unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h26
6 files changed, 867 insertions, 0 deletions
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h b/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h
new file mode 100644
index 000000000..16eee1a41
--- /dev/null
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h
@@ -0,0 +1,234 @@
+// This file is part of Eigen, a lightweight C++ template library
+// for linear algebra.
+//
+// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla
+// Public License v. 2.0. If a copy of the MPL was not distributed
+// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef EIGEN_CXX11_THREADPOOL_EVENTCOUNT_H_
+#define EIGEN_CXX11_THREADPOOL_EVENTCOUNT_H_
+
+namespace Eigen {
+
+// EventCount allows to wait for arbitrary predicates in non-blocking
+// algorithms. Think of condition variable, but wait predicate does not need to
+// be protected by a mutex. Usage:
+// Waiting thread does:
+//
+// if (predicate)
+// return act();
+// EventCount::Waiter& w = waiters[my_index];
+// ec.Prewait(&w);
+// if (predicate) {
+// ec.CancelWait(&w);
+// return act();
+// }
+// ec.CommitWait(&w);
+//
+// Notifying thread does:
+//
+// predicate = true;
+// ec.Notify(true);
+//
+// Notify is cheap if there are no waiting threads. Prewait/CommitWait are not
+// cheap, but they are executed only if the preceding predicate check has
+// failed.
+//
+// Algorithm outline:
+// There are two main variables: predicate (managed by user) and state_.
+// Operation closely resembles Dekker's mutual exclusion algorithm:
+// https://en.wikipedia.org/wiki/Dekker%27s_algorithm
+// Waiting thread sets state_ then checks predicate, Notifying thread sets
+// predicate then checks state_. Due to seq_cst fences in between these
+// operations it is guaranteed that either waiter will see predicate change
+// and won't block, or notifying thread will see state_ change and will unblock
+// the waiter, or both. But it can't happen that both threads don't see each
+// other's changes, which would lead to deadlock.
+class EventCount {
+ public:
+ class Waiter;
+
+ EventCount(std::vector<Waiter>& waiters) : waiters_(waiters) {
+ eigen_assert(waiters.size() < (1 << kWaiterBits) - 1);
+ // Initialize epoch to something close to overflow to test overflow.
+ state_ = kStackMask | (kEpochMask - kEpochInc * waiters.size() * 2);
+ }
+
+ ~EventCount() {
+ // Ensure there are no waiters.
+ eigen_assert((state_.load() & (kStackMask | kWaiterMask)) == kStackMask);
+ }
+
+ // Prewait prepares for waiting.
+ // After calling this function the thread must re-check the wait predicate
+ // and call either CancelWait or CommitWait passing the same Waiter object.
+ void Prewait(Waiter* w) {
+ w->epoch = state_.fetch_add(kWaiterInc, std::memory_order_relaxed);
+ std::atomic_thread_fence(std::memory_order_seq_cst);
+ }
+
+ // CommitWait commits waiting: blocks in Park() unless already notified.
+ void CommitWait(Waiter* w) {
+ w->state = Waiter::kNotSignaled;
+ // Modification epoch of this waiter.
+ uint64_t epoch =
+ (w->epoch & kEpochMask) +
+ (((w->epoch & kWaiterMask) >> kWaiterShift) << kEpochShift);
+ uint64_t state = state_.load(std::memory_order_seq_cst);
+ for (;;) {
+ if (int64_t((state & kEpochMask) - epoch) < 0) {
+ // The preceding waiter has not decided on its fate. Wait until it
+ // calls either CancelWait or CommitWait, or is notified.
+ std::this_thread::yield();
+ state = state_.load(std::memory_order_seq_cst);
+ continue;
+ }
+ // We've already been notified.
+ if (int64_t((state & kEpochMask) - epoch) > 0) return;
+ // Remove this thread from prewait counter and add it to the waiter list.
+ eigen_assert((state & kWaiterMask) != 0);
+ uint64_t newstate = state - kWaiterInc + kEpochInc;
+ newstate = (newstate & ~kStackMask) | (w - &waiters_[0]);
+ if ((state & kStackMask) == kStackMask)
+ w->next.store(nullptr, std::memory_order_relaxed);
+ else
+ w->next.store(&waiters_[state & kStackMask], std::memory_order_relaxed);
+ if (state_.compare_exchange_weak(state, newstate,
+ std::memory_order_release))
+ break;
+ }
+ Park(w);
+ }
+
+ // CancelWait cancels effects of the previous Prewait call.
+ void CancelWait(Waiter* w) {
+ uint64_t epoch =
+ (w->epoch & kEpochMask) +
+ (((w->epoch & kWaiterMask) >> kWaiterShift) << kEpochShift);
+ uint64_t state = state_.load(std::memory_order_relaxed);
+ for (;;) {
+ if (int64_t((state & kEpochMask) - epoch) < 0) {
+ // The preceding waiter has not decided on its fate. Wait until it
+ // calls either CancelWait or CommitWait, or is notified.
+ std::this_thread::yield();
+ state = state_.load(std::memory_order_relaxed);
+ continue;
+ }
+ // We've already been notified.
+ if (int64_t((state & kEpochMask) - epoch) > 0) return;
+ // Remove this thread from prewait counter.
+ eigen_assert((state & kWaiterMask) != 0);
+ if (state_.compare_exchange_weak(state, state - kWaiterInc + kEpochInc,
+ std::memory_order_relaxed))
+ return;
+ }
+ }
+
+ // Notify wakes one or all waiting threads.
+ // Must be called after changing the associated wait predicate.
+ void Notify(bool all) {
+ std::atomic_thread_fence(std::memory_order_seq_cst);
+ uint64_t state = state_.load(std::memory_order_acquire);
+ for (;;) {
+ // Easy case: no waiters.
+ if ((state & kStackMask) == kStackMask && (state & kWaiterMask) == 0)
+ return;
+ uint64_t waiters = (state & kWaiterMask) >> kWaiterShift;
+ uint64_t newstate;
+ if (all) {
+ // Reset prewait counter and empty wait list.
+ newstate = (state & kEpochMask) + (kEpochInc * waiters) + kStackMask;
+ } else if (waiters) {
+ // There is a thread in pre-wait state, unblock it.
+ newstate = state + kEpochInc - kWaiterInc;
+ } else {
+ // Pop a waiter from list and unpark it.
+ Waiter* w = &waiters_[state & kStackMask];
+ Waiter* wnext = w->next.load(std::memory_order_relaxed);
+ uint64_t next = kStackMask;
+ if (wnext != nullptr) next = wnext - &waiters_[0];
+ // Note: we don't add kEpochInc here. ABA problem on the lock-free stack
+ // can't happen because a waiter is re-pushed onto the stack only after
+ // it was in the pre-wait state which inevitably leads to epoch
+ // increment.
+ newstate = (state & kEpochMask) + next;
+ }
+ if (state_.compare_exchange_weak(state, newstate,
+ std::memory_order_acquire)) {
+ if (!all && waiters) return; // unblocked pre-wait thread
+ if ((state & kStackMask) == kStackMask) return;
+ Waiter* w = &waiters_[state & kStackMask];
+ if (!all) w->next.store(nullptr, std::memory_order_relaxed);
+ Unpark(w);
+ return;
+ }
+ }
+ }
+
+ class Waiter {
+ friend class EventCount;
+ std::atomic<Waiter*> next;
+ std::mutex mu;
+ std::condition_variable cv;
+ uint64_t epoch;
+ unsigned state;
+ enum {
+ kNotSignaled,
+ kWaiting,
+ kSignaled,
+ };
+ // Prevent false sharing with other Waiter objects in the same vector.
+ char pad_[128];
+ };
+
+ private:
+ // State_ layout:
+ // - low kStackBits is a stack of waiters committed wait.
+ // - next kWaiterBits is count of waiters in prewait state.
+ // - next kEpochBits is modification counter.
+ static const uint64_t kStackBits = 16;
+ static const uint64_t kStackMask = (1ull << kStackBits) - 1;
+ static const uint64_t kWaiterBits = 16;
+ static const uint64_t kWaiterShift = 16;
+ static const uint64_t kWaiterMask = ((1ull << kWaiterBits) - 1)
+ << kWaiterShift;
+ static const uint64_t kWaiterInc = 1ull << kWaiterBits;
+ static const uint64_t kEpochBits = 32;
+ static const uint64_t kEpochShift = 32;
+ static const uint64_t kEpochMask = ((1ull << kEpochBits) - 1) << kEpochShift;
+ static const uint64_t kEpochInc = 1ull << kEpochShift;
+ std::atomic<uint64_t> state_;
+ std::vector<Waiter>& waiters_;
+
+ void Park(Waiter* w) {
+ std::unique_lock<std::mutex> lock(w->mu);
+ while (w->state != Waiter::kSignaled) {
+ w->state = Waiter::kWaiting;
+ w->cv.wait(lock);
+ }
+ }
+
+ void Unpark(Waiter* waiters) {
+ Waiter* next = nullptr;
+ for (Waiter* w = waiters; w; w = next) {
+ next = w->next.load(std::memory_order_relaxed);
+ unsigned state;
+ {
+ std::unique_lock<std::mutex> lock(w->mu);
+ state = w->state;
+ w->state = Waiter::kSignaled;
+ }
+ // Avoid notifying if it wasn't waiting.
+ if (state == Waiter::kWaiting) w->cv.notify_one();
+ }
+ }
+
+ EventCount(const EventCount&) = delete;
+ void operator=(const EventCount&) = delete;
+};
+
+} // namespace Eigen
+
+#endif // EIGEN_CXX11_THREADPOOL_EVENTCOUNT_H_
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
new file mode 100644
index 000000000..18dec5393
--- /dev/null
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
@@ -0,0 +1,232 @@
+// This file is part of Eigen, a lightweight C++ template library
+// for linear algebra.
+//
+// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla
+// Public License v. 2.0. If a copy of the MPL was not distributed
+// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
+#define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
+
+
+namespace Eigen {
+
+template <typename Environment>
+class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
+ public:
+ typedef typename Environment::Task Task;
+ typedef RunQueue<Task, 1024> Queue;
+
+ NonBlockingThreadPoolTempl(int num_threads, Environment env = Environment())
+ : env_(env),
+ threads_(num_threads),
+ queues_(num_threads),
+ waiters_(num_threads),
+ blocked_(),
+ spinning_(),
+ done_(),
+ ec_(waiters_) {
+ for (int i = 0; i < num_threads; i++) queues_.push_back(new Queue());
+ for (int i = 0; i < num_threads; i++)
+ threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); }));
+ }
+
+ ~NonBlockingThreadPoolTempl() {
+ done_.store(true, std::memory_order_relaxed);
+ // Now if all threads block without work, they will start exiting.
+ // But note that threads can continue to work arbitrary long,
+ // block, submit new work, unblock and otherwise live full life.
+ ec_.Notify(true);
+
+ // Join threads explicitly to avoid destruction order issues.
+ for (size_t i = 0; i < threads_.size(); i++) delete threads_[i];
+ for (size_t i = 0; i < threads_.size(); i++) delete queues_[i];
+ }
+
+ void Schedule(std::function<void()> fn) {
+ Task t = env_.CreateTask(std::move(fn));
+ PerThread* pt = GetPerThread();
+ if (pt->pool == this) {
+ // Worker thread of this pool, push onto the thread's queue.
+ Queue* q = queues_[pt->index];
+ t = q->PushFront(std::move(t));
+ } else {
+ // A free-standing thread (or worker of another pool), push onto a random
+ // queue.
+ Queue* q = queues_[Rand(&pt->rand) % queues_.size()];
+ t = q->PushBack(std::move(t));
+ }
+ // Note: below we touch this after making t available to worker threads.
+ // Strictly speaking, this can lead to a racy-use-after-free. Consider that
+ // Schedule is called from a thread that is neither main thread nor a worker
+ // thread of this pool. Then, execution of t directly or indirectly
+ // completes overall computations, which in turn leads to destruction of
+ // this. We expect that such scenario is prevented by program, that is,
+ // this is kept alive while any threads can potentially be in Schedule.
+ if (!t.f)
+ ec_.Notify(false);
+ else
+ env_.ExecuteTask(t); // Push failed, execute directly.
+ }
+
+ private:
+ typedef typename Environment::EnvThread Thread;
+
+ struct PerThread {
+ bool inited;
+ NonBlockingThreadPoolTempl* pool; // Parent pool, or null for normal threads.
+ unsigned index; // Worker thread index in pool.
+ unsigned rand; // Random generator state.
+ };
+
+ Environment env_;
+ MaxSizeVector<Thread*> threads_;
+ MaxSizeVector<Queue*> queues_;
+ std::vector<EventCount::Waiter> waiters_;
+ std::atomic<unsigned> blocked_;
+ std::atomic<bool> spinning_;
+ std::atomic<bool> done_;
+ EventCount ec_;
+
+ // Main worker thread loop.
+ void WorkerLoop(unsigned index) {
+ PerThread* pt = GetPerThread();
+ pt->pool = this;
+ pt->index = index;
+ Queue* q = queues_[index];
+ EventCount::Waiter* waiter = &waiters_[index];
+ std::vector<Task> stolen;
+ for (;;) {
+ Task t;
+ if (!stolen.empty()) {
+ t = std::move(stolen.back());
+ stolen.pop_back();
+ }
+ if (!t.f) t = q->PopFront();
+ if (!t.f) {
+ if (Steal(&stolen)) {
+ t = std::move(stolen.back());
+ stolen.pop_back();
+ while (stolen.size()) {
+ Task t1 = q->PushFront(std::move(stolen.back()));
+ stolen.pop_back();
+ if (t1.f) {
+ // There is not much we can do in this case. Just execute the
+ // remaining directly.
+ stolen.push_back(std::move(t1));
+ break;
+ }
+ }
+ }
+ }
+ if (t.f) {
+ env_.ExecuteTask(t);
+ continue;
+ }
+ // Leave one thread spinning. This reduces latency.
+ if (!spinning_ && !spinning_.exchange(true)) {
+ bool nowork = true;
+ for (int i = 0; i < 1000; i++) {
+ if (!OutOfWork()) {
+ nowork = false;
+ break;
+ }
+ }
+ spinning_ = false;
+ if (!nowork) continue;
+ }
+ if (!WaitForWork(waiter)) return;
+ }
+ }
+
+ // Steal tries to steal work from other worker threads in best-effort manner.
+ bool Steal(std::vector<Task>* stolen) {
+ if (queues_.size() == 1) return false;
+ PerThread* pt = GetPerThread();
+ unsigned lastq = pt->index;
+ for (unsigned i = queues_.size(); i > 0; i--) {
+ unsigned victim = Rand(&pt->rand) % queues_.size();
+ if (victim == lastq && queues_.size() > 2) {
+ i++;
+ continue;
+ }
+ // Steal half of elements from a victim queue.
+ // It is typical to steal just one element, but that assumes that work is
+ // recursively subdivided in halves so that the stolen element is exactly
+ // half of work. If work elements are equally-sized, then it makes sense
+ // to steal half of elements at once and then work locally for a while.
+ if (queues_[victim]->PopBackHalf(stolen)) return true;
+ lastq = victim;
+ }
+ // Just to make sure that we did not miss anything.
+ for (unsigned i = queues_.size(); i > 0; i--)
+ if (queues_[i - 1]->PopBackHalf(stolen)) return true;
+ return false;
+ }
+
+ // WaitForWork blocks until work may be available (true) or it is time to exit (false).
+ bool WaitForWork(EventCount::Waiter* waiter) {
+ // We already did best-effort emptiness check in Steal, so prepare blocking.
+ ec_.Prewait(waiter);
+ // Now do reliable emptiness check.
+ if (!OutOfWork()) {
+ ec_.CancelWait(waiter);
+ return true;
+ }
+ // Number of blocked threads is used as termination condition.
+ // If we are shutting down and all worker threads blocked without work,
+ // then we are done.
+ blocked_++;
+ if (done_ && blocked_ == threads_.size()) {
+ ec_.CancelWait(waiter);
+ // Almost done, but need to re-check queues.
+ // Consider that all queues are empty and all worker threads are preempted
+ // right after incrementing blocked_ above. Now a free-standing thread
+ // submits work and calls destructor (which sets done_). If we don't
+ // re-check queues, we will exit leaving the work unexecuted.
+ if (!OutOfWork()) {
+ // Note: we must not pop from queues before we decrement blocked_,
+ // otherwise the following scenario is possible. Consider that instead
+ // of checking for emptiness we popped the only element from queues.
+ // Now other worker threads can start exiting, which is bad if the
+ // work item submits other work. So we just check emptiness here,
+ // which ensures that all worker threads exit at the same time.
+ blocked_--;
+ return true;
+ }
+ // Reached stable termination state.
+ ec_.Notify(true);
+ return false;
+ }
+ ec_.CommitWait(waiter);
+ blocked_--;
+ return true;
+ }
+
+ bool OutOfWork() {
+ for (unsigned i = 0; i < queues_.size(); i++)
+ if (!queues_[i]->Empty()) return false;
+ return true;
+ }
+
+ PerThread* GetPerThread() {
+ static thread_local PerThread per_thread_;
+ PerThread* pt = &per_thread_;
+ if (pt->inited) return pt;
+ pt->inited = true;
+ pt->rand = std::hash<std::thread::id>()(std::this_thread::get_id());
+ return pt;
+ }
+
+ static unsigned Rand(unsigned* state) {
+ return *state = *state * 1103515245 + 12345;
+ }
+};
+
+typedef NonBlockingThreadPoolTempl<StlThreadEnvironment> NonBlockingThreadPool;
+
+} // namespace Eigen
+
+#endif // EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
new file mode 100644
index 000000000..aaa1d92c7
--- /dev/null
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
@@ -0,0 +1,210 @@
+// This file is part of Eigen, a lightweight C++ template library
+// for linear algebra.
+//
+// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla
+// Public License v. 2.0. If a copy of the MPL was not distributed
+// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
+#define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
+
+
+namespace Eigen {
+
+// RunQueue is a fixed-size, partially non-blocking deque of Work items.
+// Operations on front of the queue must be done by a single thread (owner),
+// operations on back of the queue can be done by multiple threads concurrently.
+//
+// Algorithm outline:
+// All remote threads operating on the queue back are serialized by a mutex.
+// This ensures that at most two threads access state: owner and one remote
+// thread (Size aside). The algorithm ensures that the occupied region of the
+// underlying array is logically contiguous (can wrap around, but no stray
+// occupied elements). Owner operates on one end of this region, remote thread
+// operates on the other end. Synchronization between these threads
+// (potential consumption of the last element and take up of the last empty
+// element) happens by means of state variable in each element. States are:
+// empty, busy (in process of insertion or removal) and ready. Threads claim
+// elements (empty->busy and ready->busy transitions) by means of a CAS
+// operation. The finishing transitions (busy->empty and busy->ready) are done
+// with plain store as the element is exclusively owned by the current thread.
+//
+// Note: we could permit only pointers as elements, then we would not need
+// separate state variable as null/non-null pointer value would serve as state,
+// but that would require malloc/free per operation for large, complex values
+// (and this is designed to store std::function<void()>).
+template <typename Work, unsigned kSize>
+class RunQueue {
+ public:
+ RunQueue() : front_(), back_() {
+ // require power-of-two for fast masking
+ eigen_assert((kSize & (kSize - 1)) == 0);
+ eigen_assert(kSize > 2); // why would you do this?
+ eigen_assert(kSize <= (64 << 10)); // leave enough space for counter
+ for (unsigned i = 0; i < kSize; i++)
+ array_[i].state.store(kEmpty, std::memory_order_relaxed);
+ }
+
+ ~RunQueue() { eigen_assert(Size() == 0); }
+
+ // PushFront inserts w at the beginning of the queue.
+ // If queue is full returns w, otherwise returns default-constructed Work.
+ Work PushFront(Work w) {
+ unsigned front = front_.load(std::memory_order_relaxed);
+ Elem* e = &array_[front & kMask];
+ uint8_t s = e->state.load(std::memory_order_relaxed);
+ if (s != kEmpty ||
+ !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
+ return w;
+ front_.store(front + 1 + (kSize << 1), std::memory_order_relaxed);
+ e->w = std::move(w);
+ e->state.store(kReady, std::memory_order_release);
+ return Work();
+ }
+
+ // PopFront removes and returns the first element in the queue.
+ // If the queue was empty returns default-constructed Work.
+ Work PopFront() {
+ unsigned front = front_.load(std::memory_order_relaxed);
+ Elem* e = &array_[(front - 1) & kMask];
+ uint8_t s = e->state.load(std::memory_order_relaxed);
+ if (s != kReady ||
+ !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
+ return Work();
+ Work w = std::move(e->w);
+ e->state.store(kEmpty, std::memory_order_release);
+ front = ((front - 1) & kMask2) | (front & ~kMask2);
+ front_.store(front, std::memory_order_relaxed);
+ return w;
+ }
+
+ // PushBack adds w at the end of the queue.
+ // If queue is full returns w, otherwise returns default-constructed Work.
+ Work PushBack(Work w) {
+ std::unique_lock<std::mutex> lock(mutex_);
+ unsigned back = back_.load(std::memory_order_relaxed);
+ Elem* e = &array_[(back - 1) & kMask];
+ uint8_t s = e->state.load(std::memory_order_relaxed);
+ if (s != kEmpty ||
+ !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
+ return w;
+ back = ((back - 1) & kMask2) | (back & ~kMask2);
+ back_.store(back, std::memory_order_relaxed);
+ e->w = std::move(w);
+ e->state.store(kReady, std::memory_order_release);
+ return Work();
+ }
+
+ // PopBack removes and returns the last element in the queue. Returns a
+ // default-constructed Work if the queue is empty. Can fail spuriously.
+ Work PopBack() {
+ if (Empty()) return Work();  // Work need not be constructible from 0.
+ std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
+ if (!lock) return Work();  // Spurious failure: mutex_ is currently held.
+ unsigned back = back_.load(std::memory_order_relaxed);
+ Elem* e = &array_[back & kMask];
+ uint8_t s = e->state.load(std::memory_order_relaxed);
+ if (s != kReady ||
+ !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
+ return Work();
+ Work w = std::move(e->w);
+ e->state.store(kEmpty, std::memory_order_release);
+ back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
+ return w;
+ }
+
+ // PopBackHalf moves about half of the elements at the back of the queue into
+ // result. Returns number of elements removed. But can also fail spuriously.
+ unsigned PopBackHalf(std::vector<Work>* result) {
+ if (Empty()) return 0;
+ std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
+ if (!lock) return 0;
+ unsigned back = back_.load(std::memory_order_relaxed);
+ unsigned size = Size();
+ unsigned mid = back;
+ if (size > 1) mid = back + (size - 1) / 2;
+ unsigned n = 0;
+ unsigned start = 0;
+ for (; static_cast<int>(mid - back) >= 0; mid--) {
+ Elem* e = &array_[mid & kMask];
+ uint8_t s = e->state.load(std::memory_order_relaxed);
+ if (n == 0) {
+ if (s != kReady ||
+ !e->state.compare_exchange_strong(s, kBusy,
+ std::memory_order_acquire))
+ continue;
+ start = mid;
+ } else {
+ // Note: no need to store temporary kBusy, we exclusively own these
+ // elements.
+ eigen_assert(s == kReady);
+ }
+ result->push_back(std::move(e->w));
+ e->state.store(kEmpty, std::memory_order_release);
+ n++;
+ }
+ if (n != 0)
+ back_.store(start + 1 + (kSize << 1), std::memory_order_relaxed);
+ return n;
+ }
+
+ // Size returns current queue size.
+ // Can be called by any thread at any time.
+ unsigned Size() const {
+ // Emptiness plays critical role in thread pool blocking. So we go to great
+ // effort to not produce false positives (claim non-empty queue as empty).
+ for (;;) {
+ // Capture a consistent snapshot of front/back.
+ unsigned front = front_.load(std::memory_order_acquire);
+ unsigned back = back_.load(std::memory_order_acquire);
+ unsigned front1 = front_.load(std::memory_order_relaxed);
+ if (front != front1) continue;
+ int size = (front & kMask2) - (back & kMask2);
+ // Fix overflow.
+ if (size < 0) size += 2 * kSize;
+ // Order of modification in push/pop is crafted to make the queue look
+ // larger than it is during concurrent modifications. E.g. pop can
+ // decrement size before the corresponding push has incremented it.
+ // So the computed size can be up to kSize + 1, fix it.
+ if (size > static_cast<int>(kSize)) size = static_cast<int>(kSize);
+ return static_cast<unsigned>(size);  // size is in [0, kSize] here.
+ }
+ }
+
+ // Empty tests whether container is empty.
+ // Can be called by any thread at any time.
+ bool Empty() const { return Size() == 0; }
+
+ private:
+ static const unsigned kMask = kSize - 1;
+ static const unsigned kMask2 = (kSize << 1) - 1;
+ struct Elem {
+ std::atomic<uint8_t> state;
+ Work w;
+ };
+ enum {
+ kEmpty,
+ kBusy,
+ kReady,
+ };
+ std::mutex mutex_;
+ // Low log(kSize) + 1 bits in front_ and back_ contain rolling index of
+ // front/back, respectively. The remaining bits contain modification counters
+ // that are incremented on Push operations. This allows us to (1) distinguish
+ // between empty and full conditions (if we would use log(kSize) bits for
+ // position, these conditions would be indistinguishable); (2) obtain
+ // consistent snapshot of front_/back_ for Size operation using the
+ // modification counters.
+ std::atomic<unsigned> front_;
+ std::atomic<unsigned> back_;
+ Elem array_[kSize];
+
+ RunQueue(const RunQueue&) = delete;
+ void operator=(const RunQueue&) = delete;
+};
+
+} // namespace Eigen
+
+#endif // EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h
new file mode 100644
index 000000000..17fd1658b
--- /dev/null
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h
@@ -0,0 +1,127 @@
+// This file is part of Eigen, a lightweight C++ template library
+// for linear algebra.
+//
+// Copyright (C) 2014 Benoit Steiner <benoit.steiner.goog@gmail.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla
+// Public License v. 2.0. If a copy of the MPL was not distributed
+// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef EIGEN_CXX11_THREADPOOL_SIMPLE_THREAD_POOL_H
+#define EIGEN_CXX11_THREADPOOL_SIMPLE_THREAD_POOL_H
+
+namespace Eigen {
+
+// The implementation of the ThreadPool type ensures that the Schedule method
+// runs the functions it is provided in FIFO order when the scheduling is done
+// by a single thread.
+// Environment provides a way to create threads and also allows to intercept
+// task submission and execution.
+template <typename Environment>
+class SimpleThreadPoolTempl : public ThreadPoolInterface {
+ public:
+ // Construct a pool of "num_threads" threads, created through the stored env_.
+ explicit SimpleThreadPoolTempl(int num_threads, Environment env = Environment())
+ : env_(env), threads_(num_threads), waiters_(num_threads) {
+ for (int i = 0; i < num_threads; i++) {
+ threads_.push_back(env_.CreateThread([this]() { WorkerLoop(); }));
+ }
+ }
+
+ // Wait until all scheduled work has finished and then destroy the
+ // set of threads.
+ ~SimpleThreadPoolTempl() {
+ {
+ // Wait for all work to get done.
+ std::unique_lock<std::mutex> l(mu_);
+ while (!pending_.empty()) {
+ empty_.wait(l);
+ }
+ exiting_ = true;
+
+ // Wakeup all waiters.
+ for (auto w : waiters_) {
+ w->ready = true;
+ w->task.f = nullptr;
+ w->cv.notify_one();
+ }
+ }
+
+ // Wait for threads to finish.
+ for (auto t : threads_) {
+ delete t;
+ }
+ }
+
+ // Schedule fn() for execution in the pool of threads. The functions are
+ // executed in the order in which they are scheduled.
+ void Schedule(std::function<void()> fn) {
+ Task t = env_.CreateTask(std::move(fn));
+ std::unique_lock<std::mutex> l(mu_);
+ if (waiters_.empty()) {
+ pending_.push_back(std::move(t));
+ } else {
+ Waiter* w = waiters_.back();
+ waiters_.pop_back();
+ w->ready = true;
+ w->task = std::move(t);
+ w->cv.notify_one();
+ }
+ }
+
+ protected:
+ void WorkerLoop() {  // Body of every pool thread; runs until exiting_ is set.
+ std::unique_lock<std::mutex> l(mu_);
+ Waiter w;
+ Task t;
+ while (!exiting_) {
+ if (pending_.empty()) {
+ // Wait for work to be assigned to me
+ w.ready = false;
+ waiters_.push_back(&w);
+ while (!w.ready) {
+ w.cv.wait(l);
+ }
+ t = std::move(w.task);  // Take the handed-over task without copying it.
+ w.task.f = nullptr;
+ } else {
+ // Pick up pending work
+ t = std::move(pending_.front());
+ pending_.pop_front();
+ if (pending_.empty()) {
+ empty_.notify_all();
+ }
+ }
+ if (t.f) {
+ l.unlock();  // Run the task unlocked; go through l so it tracks ownership.
+ env_.ExecuteTask(t);
+ t.f = nullptr;
+ l.lock();
+ }
+ }
+ }
+
+ private:
+ typedef typename Environment::Task Task;
+ typedef typename Environment::EnvThread Thread;
+
+ struct Waiter {
+ std::condition_variable cv;
+ Task task;
+ bool ready;
+ };
+
+ Environment env_;
+ std::mutex mu_;
+ MaxSizeVector<Thread*> threads_; // All threads
+ MaxSizeVector<Waiter*> waiters_; // Stack of waiting threads.
+ std::deque<Task> pending_; // Queue of pending work
+ std::condition_variable empty_; // Signaled on pending_.empty()
+ bool exiting_ = false;
+};
+
+typedef SimpleThreadPoolTempl<StlThreadEnvironment> SimpleThreadPool;
+
+} // namespace Eigen
+
+#endif // EIGEN_CXX11_THREADPOOL_SIMPLE_THREAD_POOL_H
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h
new file mode 100644
index 000000000..d2204ad5b
--- /dev/null
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h
@@ -0,0 +1,38 @@
+// This file is part of Eigen, a lightweight C++ template library
+// for linear algebra.
+//
+// Copyright (C) 2014 Benoit Steiner <benoit.steiner.goog@gmail.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla
+// Public License v. 2.0. If a copy of the MPL was not distributed
+// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef EIGEN_CXX11_THREADPOOL_THREAD_ENVIRONMENT_H
+#define EIGEN_CXX11_THREADPOOL_THREAD_ENVIRONMENT_H
+
+namespace Eigen {
+
+struct StlThreadEnvironment {
+ struct Task {
+ std::function<void()> f;
+ };
+
+ // EnvThread constructor must start the thread,
+ // destructor must join the thread.
+ class EnvThread {
+ public:
+ EnvThread(std::function<void()> f) : thr_(f) {}
+ ~EnvThread() { thr_.join(); }
+
+ private:
+ std::thread thr_;
+ };
+
+ EnvThread* CreateThread(std::function<void()> f) { return new EnvThread(f); }
+ Task CreateTask(std::function<void()> f) { return Task{std::move(f)}; }
+ void ExecuteTask(const Task& t) { t.f(); }
+};
+
+} // namespace Eigen
+
+#endif // EIGEN_CXX11_THREADPOOL_THREAD_ENVIRONMENT_H
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h
new file mode 100644
index 000000000..38b40aceb
--- /dev/null
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h
@@ -0,0 +1,26 @@
+// This file is part of Eigen, a lightweight C++ template library
+// for linear algebra.
+//
+// Copyright (C) 2014 Benoit Steiner <benoit.steiner.goog@gmail.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla
+// Public License v. 2.0. If a copy of the MPL was not distributed
+// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef EIGEN_CXX11_THREADPOOL_THREAD_POOL_INTERFACE_H
+#define EIGEN_CXX11_THREADPOOL_THREAD_POOL_INTERFACE_H
+
+namespace Eigen {
+
+// This defines an interface that ThreadPoolDevice can take to use
+// custom thread pools underneath.
+class ThreadPoolInterface {
+ public:
+ virtual void Schedule(std::function<void()> fn) = 0;
+
+ virtual ~ThreadPoolInterface() {}
+};
+
+} // namespace Eigen
+
+#endif // EIGEN_CXX11_THREADPOOL_THREAD_POOL_INTERFACE_H