Diffstat (limited to 'unsupported/Eigen/CXX11/src/ThreadPool')
-rw-r--r--  unsupported/Eigen/CXX11/src/ThreadPool/CMakeLists.txt          |   6
-rw-r--r--  unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h            |   9
-rw-r--r--  unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h | 206
-rw-r--r--  unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h              |   4
-rw-r--r--  unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h      |  37
-rw-r--r--  unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h     |   4
-rw-r--r--  unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h   |   7
7 files changed, 171 insertions(+), 102 deletions(-)
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/CMakeLists.txt b/unsupported/Eigen/CXX11/src/ThreadPool/CMakeLists.txt
deleted file mode 100644
index 88fef50c6..000000000
--- a/unsupported/Eigen/CXX11/src/ThreadPool/CMakeLists.txt
+++ /dev/null
@@ -1,6 +0,0 @@
-FILE(GLOB Eigen_CXX11_ThreadPool_SRCS "*.h")
-
-INSTALL(FILES
- ${Eigen_CXX11_ThreadPool_SRCS}
- DESTINATION ${INCLUDE_INSTALL_DIR}/unsupported/Eigen/CXX11/src/ThreadPool COMPONENT Devel
- )
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h b/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h
index 6dd64f185..71d55552d 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h
@@ -50,7 +50,7 @@ class EventCount {
public:
class Waiter;
- EventCount(std::vector<Waiter>& waiters) : waiters_(waiters) {
+ EventCount(MaxSizeVector<Waiter>& waiters) : waiters_(waiters) {
eigen_assert(waiters.size() < (1 << kWaiterBits) - 1);
// Initialize epoch to something close to overflow to test overflow.
state_ = kStackMask | (kEpochMask - kEpochInc * waiters.size() * 2);
@@ -169,7 +169,8 @@ class EventCount {
class Waiter {
friend class EventCount;
- std::atomic<Waiter*> next;
+ // Align to a 128-byte boundary to prevent false sharing with other Waiter objects in the same vector.
+ EIGEN_ALIGN_TO_BOUNDARY(128) std::atomic<Waiter*> next;
std::mutex mu;
std::condition_variable cv;
uint64_t epoch;
@@ -179,8 +180,6 @@ class EventCount {
kWaiting,
kSignaled,
};
- // Prevent false sharing with other Waiter objects in the same vector.
- char pad_[128];
};
private:
@@ -200,7 +199,7 @@ class EventCount {
static const uint64_t kEpochMask = ((1ull << kEpochBits) - 1) << kEpochShift;
static const uint64_t kEpochInc = 1ull << kEpochShift;
std::atomic<uint64_t> state_;
- std::vector<Waiter>& waiters_;
+ MaxSizeVector<Waiter>& waiters_;
void Park(Waiter* w) {
std::unique_lock<std::mutex> lock(w->mu);
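
An aside on the change above: it replaces a trailing char pad_[128] with 128-byte alignment on the first member. Once Waiter is over-aligned, sizeof(Waiter) rounds up to a multiple of 128, so adjacent elements of the waiters vector can no longer share a cache line. A minimal sketch of the same idea in standard C++ (the Slot type is illustrative, and 128 bytes is an assumption about the destructive-interference granularity, not a value quoted from Eigen's platform macros):

    #include <atomic>

    // alignas(128) forces sizeof(Slot) up to a multiple of 128, so two
    // adjacent Slot objects in an array never share a cache line and
    // writers on different threads do not invalidate each other's lines.
    struct alignas(128) Slot {
      std::atomic<int> value{0};
    };

    static_assert(sizeof(Slot) % 128 == 0, "array elements stay padded");
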
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
index 1c471a19f..354bce52a 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
@@ -23,18 +23,44 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
: env_(env),
threads_(num_threads),
queues_(num_threads),
+ coprimes_(num_threads),
waiters_(num_threads),
- blocked_(),
- spinning_(),
- done_(),
+ blocked_(0),
+ spinning_(0),
+ done_(false),
ec_(waiters_) {
- for (int i = 0; i < num_threads; i++) queues_.push_back(new Queue());
- for (int i = 0; i < num_threads; i++)
+ waiters_.resize(num_threads);
+
+ // Calculate coprimes of num_threads.
+ // Coprimes are used for a random walk over all threads in Steal
+ // and NonEmptyQueueIndex. Iteration is based on the fact that if we take
+ // a walk starting at thread index t and calculate num_threads - 1 subsequent
+ // indices as (t + coprime) % num_threads, we will cover all threads without
+ // repetitions (effectively getting a pseudo-random permutation of thread
+ // indices).
+ for (int i = 1; i <= num_threads; i++) {
+ unsigned a = i;
+ unsigned b = num_threads;
+ // If GCD(a, b) == 1, then a and b are coprimes.
+ while (b != 0) {
+ unsigned tmp = a;
+ a = b;
+ b = tmp % b;
+ }
+ if (a == 1) {
+ coprimes_.push_back(i);
+ }
+ }
+ for (int i = 0; i < num_threads; i++) {
+ queues_.push_back(new Queue());
+ }
+ for (int i = 0; i < num_threads; i++) {
threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); }));
+ }
}
~NonBlockingThreadPoolTempl() {
- done_.store(true, std::memory_order_relaxed);
+ done_ = true;
// Now if all threads block without work, they will start exiting.
// But note that threads can continue to work arbitrary long,
// block, submit new work, unblock and otherwise live full life.
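
An aside on the coprime walk set up in the constructor above: because inc is coprime with the number of queues, stepping by inc modulo that number visits every queue index exactly once before repeating. A standalone demo of the property (the concrete values 6, 5, and 2 are arbitrary examples):

    #include <cstdio>

    int main() {
      const unsigned n = 6;    // number of queues
      const unsigned inc = 5;  // gcd(5, 6) == 1, so 5 is a valid increment
      unsigned victim = 2;     // random starting index
      for (unsigned i = 0; i < n; i++) {
        std::printf("%u ", victim);    // prints "2 1 0 5 4 3": each index once
        victim += inc;
        if (victim >= n) victim -= n;  // same wrap-around as in Steal()
      }
      std::printf("\n");
      return 0;
    }
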
@@ -50,7 +76,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
PerThread* pt = GetPerThread();
if (pt->pool == this) {
// Worker thread of this pool, push onto the thread's queue.
- Queue* q = queues_[pt->index];
+ Queue* q = queues_[pt->thread_id];
t = q->PushFront(std::move(t));
} else {
// A free-standing thread (or worker of another pool), push onto a random
@@ -71,108 +97,111 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
env_.ExecuteTask(t); // Push failed, execute directly.
}
+ int NumThreads() const final {
+ return static_cast<int>(threads_.size());
+ }
+
+ int CurrentThreadId() const final {
+ const PerThread* pt =
+ const_cast<NonBlockingThreadPoolTempl*>(this)->GetPerThread();
+ if (pt->pool == this) {
+ return pt->thread_id;
+ } else {
+ return -1;
+ }
+ }
+
private:
typedef typename Environment::EnvThread Thread;
struct PerThread {
- bool inited;
+ constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) { }
NonBlockingThreadPoolTempl* pool; // Parent pool, or null for normal threads.
- unsigned index; // Worker thread index in pool.
- unsigned rand; // Random generator state.
+ uint64_t rand; // Random generator state.
+ int thread_id; // Worker thread index in pool.
};
Environment env_;
MaxSizeVector<Thread*> threads_;
MaxSizeVector<Queue*> queues_;
- std::vector<EventCount::Waiter> waiters_;
+ MaxSizeVector<unsigned> coprimes_;
+ MaxSizeVector<EventCount::Waiter> waiters_;
std::atomic<unsigned> blocked_;
std::atomic<bool> spinning_;
std::atomic<bool> done_;
EventCount ec_;
// Main worker thread loop.
- void WorkerLoop(unsigned index) {
+ void WorkerLoop(int thread_id) {
PerThread* pt = GetPerThread();
pt->pool = this;
- pt->index = index;
- Queue* q = queues_[index];
- EventCount::Waiter* waiter = &waiters_[index];
- std::vector<Task> stolen;
+ pt->rand = std::hash<std::thread::id>()(std::this_thread::get_id());
+ pt->thread_id = thread_id;
+ Queue* q = queues_[thread_id];
+ EventCount::Waiter* waiter = &waiters_[thread_id];
for (;;) {
- Task t;
- if (!stolen.empty()) {
- t = std::move(stolen.back());
- stolen.pop_back();
- }
- if (!t.f) t = q->PopFront();
+ Task t = q->PopFront();
if (!t.f) {
- if (Steal(&stolen)) {
- t = std::move(stolen.back());
- stolen.pop_back();
- while (stolen.size()) {
- Task t1 = q->PushFront(std::move(stolen.back()));
- stolen.pop_back();
- if (t1.f) {
- // There is not much we can do in this case. Just execute the
- // remaining directly.
- stolen.push_back(std::move(t1));
- break;
+ t = Steal();
+ if (!t.f) {
+ // Leave one thread spinning. This reduces latency.
+ // TODO(dvyukov): 1000 iterations is based on fair dice roll, tune it.
+ // Also, the time it takes to attempt to steal work 1000 times depends
+ // on the size of the thread pool. However, the speed at which the user
+ // of the thread pool submits tasks is independent of the size of the
+ // pool. Consider a time-based limit instead.
+ if (!spinning_ && !spinning_.exchange(true)) {
+ for (int i = 0; i < 1000 && !t.f; i++) {
+ t = Steal();
+ }
+ spinning_ = false;
+ }
+ if (!t.f) {
+ if (!WaitForWork(waiter, &t)) {
+ return;
}
}
}
}
if (t.f) {
env_.ExecuteTask(t);
- continue;
}
- // Leave one thread spinning. This reduces latency.
- if (!spinning_ && !spinning_.exchange(true)) {
- bool nowork = true;
- for (int i = 0; i < 1000; i++) {
- if (!OutOfWork()) {
- nowork = false;
- break;
- }
- }
- spinning_ = false;
- if (!nowork) continue;
- }
- if (!WaitForWork(waiter)) return;
}
}
// Steal tries to steal work from other worker threads in best-effort manner.
- bool Steal(std::vector<Task>* stolen) {
- if (queues_.size() == 1) return false;
+ Task Steal() {
PerThread* pt = GetPerThread();
- unsigned lastq = pt->index;
- for (unsigned i = queues_.size(); i > 0; i--) {
- unsigned victim = Rand(&pt->rand) % queues_.size();
- if (victim == lastq && queues_.size() > 2) {
- i++;
- continue;
+ const size_t size = queues_.size();
+ unsigned r = Rand(&pt->rand);
+ unsigned inc = coprimes_[r % coprimes_.size()];
+ unsigned victim = r % size;
+ for (unsigned i = 0; i < size; i++) {
+ Task t = queues_[victim]->PopBack();
+ if (t.f) {
+ return t;
+ }
+ victim += inc;
+ if (victim >= size) {
+ victim -= size;
}
- // Steal half of elements from a victim queue.
- // It is typical to steal just one element, but that assumes that work is
- // recursively subdivided in halves so that the stolen element is exactly
- // half of work. If work elements are equally-sized, then is makes sense
- // to steal half of elements at once and then work locally for a while.
- if (queues_[victim]->PopBackHalf(stolen)) return true;
- lastq = victim;
}
- // Just to make sure that we did not miss anything.
- for (unsigned i = queues_.size(); i > 0; i--)
- if (queues_[i - 1]->PopBackHalf(stolen)) return true;
- return false;
+ return Task();
}
- // WaitForWork blocks until new work is available, or if it is time to exit.
- bool WaitForWork(EventCount::Waiter* waiter) {
- // We already did best-effort emptiness check in Steal, so prepare blocking.
+ // WaitForWork blocks until new work is available (returns true) or it is
+ // time to exit (returns false). It can optionally return a task to execute
+ // in t (in which case t.f != nullptr on return).
+ bool WaitForWork(EventCount::Waiter* waiter, Task* t) {
+ eigen_assert(!t->f);
+ // We already did best-effort emptiness check in Steal, so prepare for
+ // blocking.
ec_.Prewait(waiter);
- // Now do reliable emptiness check.
- if (!OutOfWork()) {
+ // Now do a reliable emptiness check.
+ int victim = NonEmptyQueueIndex();
+ if (victim != -1) {
ec_.CancelWait(waiter);
+ *t = queues_[victim]->PopBack();
return true;
}
// Number of blocked threads is used as termination condition.
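
An aside on the spinning logic earlier in this hunk: the double check on spinning_ admits at most one spinner at a time. The plain load screens out the common case cheaply, and the exchange arbitrates between concurrent claimants. The pattern in isolation (a sketch, not Eigen's exact code; names are illustrative):

    #include <atomic>

    std::atomic<bool> spinning{false};

    // At most one thread wins: everyone who sees the flag already set, or
    // loses the exchange, proceeds straight to blocking instead.
    bool TryBecomeSpinner() {
      return !spinning && !spinning.exchange(true);
    }

    void DoneSpinning() { spinning = false; }
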
@@ -186,7 +215,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
// right after incrementing blocked_ above. Now a free-standing thread
// submits work and calls destructor (which sets done_). If we don't
// re-check queues, we will exit leaving the work unexecuted.
- if (!OutOfWork()) {
+ if (NonEmptyQueueIndex() != -1) {
// Note: we must not pop from queues before we decrement blocked_,
// otherwise the following scenario is possible. Consider that instead
// of checking for emptiness we popped the only element from queues.
@@ -205,23 +234,36 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
return true;
}
- bool OutOfWork() {
- for (unsigned i = 0; i < queues_.size(); i++)
- if (!queues_[i]->Empty()) return false;
- return true;
+ int NonEmptyQueueIndex() {
+ PerThread* pt = GetPerThread();
+ const size_t size = queues_.size();
+ unsigned r = Rand(&pt->rand);
+ unsigned inc = coprimes_[r % coprimes_.size()];
+ unsigned victim = r % size;
+ for (unsigned i = 0; i < size; i++) {
+ if (!queues_[victim]->Empty()) {
+ return victim;
+ }
+ victim += inc;
+ if (victim >= size) {
+ victim -= size;
+ }
+ }
+ return -1;
}
- PerThread* GetPerThread() {
+ static EIGEN_STRONG_INLINE PerThread* GetPerThread() {
EIGEN_THREAD_LOCAL PerThread per_thread_;
PerThread* pt = &per_thread_;
- if (pt->inited) return pt;
- pt->inited = true;
- pt->rand = std::hash<std::thread::id>()(std::this_thread::get_id());
return pt;
}
- static unsigned Rand(unsigned* state) {
- return *state = *state * 1103515245 + 12345;
+ static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) {
+ uint64_t current = *state;
+ // Update the internal state
+ *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
+ // Generate the random output (using the PCG-XSH-RS scheme)
+ return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61)));
}
};
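
The new Rand at the end of the file replaces the old 32-bit LCG, whose low output bits have very short periods, with 64 bits of LCG state and a PCG-XSH-RS output permutation. The same step as a standalone function (constants copied from the diff; the wrapper name is illustrative):

    #include <cstdint>

    // Advance a 64-bit LCG, then derive a 32-bit output from the previous
    // state: xorshift the high bits down, and take the final shift amount
    // (the "random shift") from the top three bits of that state.
    inline unsigned PcgXshRs(uint64_t* state) {
      const uint64_t current = *state;
      *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
      return static_cast<unsigned>((current ^ (current >> 22)) >>
                                   (22 + (current >> 61)));
    }
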
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
index 0544a6e15..05ed76cbe 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
@@ -38,7 +38,7 @@ namespace Eigen {
template <typename Work, unsigned kSize>
class RunQueue {
public:
- RunQueue() : front_(), back_() {
+ RunQueue() : front_(0), back_(0) {
// require power-of-two for fast masking
eigen_assert((kSize & (kSize - 1)) == 0);
eigen_assert(kSize > 2); // why would you do this?
@@ -100,7 +100,7 @@ class RunQueue {
// PopBack removes and returns the last elements in the queue.
// Can fail spuriously.
Work PopBack() {
- if (Empty()) return 0;
+ if (Empty()) return Work();
std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
if (!lock) return Work();
unsigned back = back_.load(std::memory_order_relaxed);
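
The power-of-two requirement asserted in the constructor context above exists so the queue can wrap positions with a mask rather than a division; a sketch of that equivalence, assuming indices wrap with pos & (kSize - 1) as in the full RunQueue implementation (kSize value arbitrary, Wrap is an illustrative name):

    #include <cassert>

    constexpr unsigned kSize = 1024;
    static_assert((kSize & (kSize - 1)) == 0, "kSize must be a power of two");

    // For a power-of-two kSize, masking equals the (slower) modulo.
    inline unsigned Wrap(unsigned pos) { return pos & (kSize - 1); }

    int main() {
      for (unsigned pos = 0; pos < 4 * kSize; pos += 37) {
        assert(Wrap(pos) == pos % kSize);
      }
      return 0;
    }
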
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h
index 17fd1658b..e75d0f467 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h
@@ -24,7 +24,7 @@ class SimpleThreadPoolTempl : public ThreadPoolInterface {
explicit SimpleThreadPoolTempl(int num_threads, Environment env = Environment())
: env_(env), threads_(num_threads), waiters_(num_threads) {
for (int i = 0; i < num_threads; i++) {
- threads_.push_back(env.CreateThread([this]() { WorkerLoop(); }));
+ threads_.push_back(env.CreateThread([this, i]() { WorkerLoop(i); }));
}
}
@@ -55,7 +55,7 @@ class SimpleThreadPoolTempl : public ThreadPoolInterface {
// Schedule fn() for execution in the pool of threads. The functions are
// executed in the order in which they are scheduled.
- void Schedule(std::function<void()> fn) {
+ void Schedule(std::function<void()> fn) final {
Task t = env_.CreateTask(std::move(fn));
std::unique_lock<std::mutex> l(mu_);
if (waiters_.empty()) {
@@ -69,9 +69,25 @@ class SimpleThreadPoolTempl : public ThreadPoolInterface {
}
}
+ int NumThreads() const final {
+ return static_cast<int>(threads_.size());
+ }
+
+ int CurrentThreadId() const final {
+ const PerThread* pt = this->GetPerThread();
+ if (pt->pool == this) {
+ return pt->thread_id;
+ } else {
+ return -1;
+ }
+ }
+
protected:
- void WorkerLoop() {
+ void WorkerLoop(int thread_id) {
std::unique_lock<std::mutex> l(mu_);
+ PerThread* pt = GetPerThread();
+ pt->pool = this;
+ pt->thread_id = thread_id;
Waiter w;
Task t;
while (!exiting_) {
@@ -111,13 +127,24 @@ class SimpleThreadPoolTempl : public ThreadPoolInterface {
bool ready;
};
+ struct PerThread {
+ constexpr PerThread() : pool(NULL), thread_id(-1) { }
+ SimpleThreadPoolTempl* pool; // Parent pool, or null for normal threads.
+ int thread_id; // Worker thread index in pool.
+ };
+
Environment env_;
std::mutex mu_;
MaxSizeVector<Thread*> threads_; // All threads
MaxSizeVector<Waiter*> waiters_; // Stack of waiting threads.
- std::deque<Task> pending_; // Queue of pending work
- std::condition_variable empty_; // Signaled on pending_.empty()
+ std::deque<Task> pending_; // Queue of pending work
+ std::condition_variable empty_; // Signaled on pending_.empty()
bool exiting_ = false;
+
+ PerThread* GetPerThread() const {
+ EIGEN_THREAD_LOCAL PerThread per_thread;
+ return &per_thread;
+ }
};
typedef SimpleThreadPoolTempl<StlThreadEnvironment> SimpleThreadPool;
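
GetPerThread above relies on a function-local EIGEN_THREAD_LOCAL variable; assuming that macro expands to C++11 thread_local (or a compiler-specific equivalent such as __thread), each calling thread lazily gets its own record with no registration step. The bare pattern, with a simplified illustrative PerThread:

    // Each thread that calls GetPerThread() gets its own default-initialized
    // record; here the pool and thread_id fields are filled in by WorkerLoop.
    struct PerThread {
      void* pool = nullptr;
      int thread_id = -1;
    };

    PerThread* GetPerThread() {
      static thread_local PerThread per_thread;
      return &per_thread;
    }
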
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h
index d2204ad5b..399f95cc1 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h
@@ -21,14 +21,14 @@ struct StlThreadEnvironment {
// destructor must join the thread.
class EnvThread {
public:
- EnvThread(std::function<void()> f) : thr_(f) {}
+ EnvThread(std::function<void()> f) : thr_(std::move(f)) {}
~EnvThread() { thr_.join(); }
private:
std::thread thr_;
};
- EnvThread* CreateThread(std::function<void()> f) { return new EnvThread(f); }
+ EnvThread* CreateThread(std::function<void()> f) { return new EnvThread(std::move(f)); }
Task CreateTask(std::function<void()> f) { return Task{std::move(f)}; }
void ExecuteTask(const Task& t) { t.f(); }
};
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h
index 38b40aceb..a65ee97c9 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h
@@ -18,6 +18,13 @@ class ThreadPoolInterface {
public:
virtual void Schedule(std::function<void()> fn) = 0;
+ // Returns the number of threads in the pool.
+ virtual int NumThreads() const = 0;
+
+ // Returns a logical thread index between 0 and NumThreads() - 1 if called
+ // from one of the threads in the pool. Returns -1 otherwise.
+ virtual int CurrentThreadId() const = 0;
+
virtual ~ThreadPoolInterface() {}
};
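
A usage sketch for the two new virtuals, assuming the CXX11 ThreadPool module header is included and a concrete pool such as the SimpleThreadPool from this patch is passed in:

    #include <cstdio>

    void Inspect(Eigen::ThreadPoolInterface* pool) {
      std::printf("pool size: %d\n", pool->NumThreads());
      pool->Schedule([pool]() {
        // Inside a worker: an id in [0, NumThreads() - 1].
        std::printf("worker id: %d\n", pool->CurrentThreadId());
      });
      // From this non-pool thread the contract says -1.
      std::printf("caller id: %d\n", pool->CurrentThreadId());
    }
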