author    Benoit Steiner <benoit.steiner.goog@gmail.com>  2016-05-09 10:17:17 -0700
committer Benoit Steiner <benoit.steiner.goog@gmail.com>  2016-05-09 10:17:17 -0700
commit    dc7dbc2df71e88615c4f179a2eded7f617fca7a9 (patch)
tree      e2a6c5d76adc94e0afb7511cbbcf7eb6751f3ebd /unsupported
parent    05c365fb16a6ce63688d465e5d30c98a8742407b (diff)
Optimized the non-blocking thread pool:
* Use a pseudo-random permutation of queue indices during random stealing. This ensures that all the queues are considered.
* Directly pop from a non-empty queue when we are waiting for work, instead of first noticing that there is a non-empty queue and then doing another round of random stealing to re-discover the non-empty queue.
* Steal only one task from a remote queue instead of half of the tasks.
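The coprime trick this patch relies on works because stepping through indices by an increment that is coprime with the number of queues visits every index exactly once before repeating. The following is a minimal standalone sketch (not part of the patch; all names are illustrative) that computes the coprimes with Euclid's GCD, as the constructor in the diff below does, and checks that every walk covers all indices:

#include <cassert>
#include <cstdio>
#include <vector>

// Collect all values in [1, n] that are coprime with n, using Euclid's GCD,
// mirroring the loop added to the thread pool constructor.
static std::vector<unsigned> Coprimes(unsigned n) {
  std::vector<unsigned> result;
  for (unsigned i = 1; i <= n; i++) {
    unsigned a = i, b = n;
    while (b != 0) {  // Euclid's algorithm: after the loop, a == gcd(i, n)
      unsigned tmp = a;
      a = b;
      b = tmp % b;
    }
    if (a == 1) result.push_back(i);  // gcd == 1 means i and n are coprime
  }
  return result;
}

int main() {
  const unsigned n = 8;  // stand-in for num_threads
  for (unsigned inc : Coprimes(n)) {
    std::vector<bool> seen(n, false);
    unsigned victim = 3;  // arbitrary starting index
    for (unsigned i = 0; i < n; i++) {
      assert(!seen[victim]);  // no index is visited twice...
      seen[victim] = true;
      victim += inc;
      if (victim >= n) victim -= n;  // cheap modulo, as in Steal()
    }
    std::printf("inc=%u covers all %u indices\n", inc, n);  // ...so all are visited
  }
  return 0;
}

With n = 8 the coprimes are 1, 3, 5 and 7, so a random choice among them combined with a random starting index yields the pseudo-random permutations the commit message describes.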
Diffstat (limited to 'unsupported')
-rw-r--r--  unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h  150
-rw-r--r--  unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h                 2
-rw-r--r--  unsupported/test/cxx11_runqueue.cpp                               8
3 files changed, 96 insertions, 64 deletions
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
index 1c471a19f..f7e73aabe 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
@@ -23,14 +23,38 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
: env_(env),
threads_(num_threads),
queues_(num_threads),
+ coprimes_(num_threads),
waiters_(num_threads),
blocked_(),
spinning_(),
done_(),
ec_(waiters_) {
- for (int i = 0; i < num_threads; i++) queues_.push_back(new Queue());
- for (int i = 0; i < num_threads; i++)
+ // Calculate coprimes of num_threads.
+ // Coprimes are used for a random walk over all threads in Steal
+ // and NonEmptyQueueIndex. Iteration is based on the fact that if we take
+ // a walk starting at thread index t and calculate num_threads - 1 subsequent
+ // indices as (t + coprime) % num_threads, we will cover all threads without
+ // repetitions (effectively getting a pseudo-random permutation of thread
+ // indices).
+ for (unsigned i = 1; i <= num_threads; i++) {
+ unsigned a = i;
+ unsigned b = num_threads;
+ // If GCD(a, b) == 1, then a and b are coprimes.
+ while (b != 0) {
+ unsigned tmp = a;
+ a = b;
+ b = tmp % b;
+ }
+ if (a == 1) {
+ coprimes_.push_back(i);
+ }
+ }
+ for (int i = 0; i < num_threads; i++) {
+ queues_.push_back(new Queue());
+ }
+ for (int i = 0; i < num_threads; i++) {
threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); }));
+ }
}
~NonBlockingThreadPoolTempl() {
@@ -84,6 +108,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
Environment env_;
MaxSizeVector<Thread*> threads_;
MaxSizeVector<Queue*> queues_;
+ MaxSizeVector<unsigned> coprimes_;
std::vector<EventCount::Waiter> waiters_;
std::atomic<unsigned> blocked_;
std::atomic<bool> spinning_;
@@ -97,82 +122,69 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
pt->index = index;
Queue* q = queues_[index];
EventCount::Waiter* waiter = &waiters_[index];
- std::vector<Task> stolen;
for (;;) {
- Task t;
- if (!stolen.empty()) {
- t = std::move(stolen.back());
- stolen.pop_back();
- }
- if (!t.f) t = q->PopFront();
+ Task t = q->PopFront();
if (!t.f) {
- if (Steal(&stolen)) {
- t = std::move(stolen.back());
- stolen.pop_back();
- while (stolen.size()) {
- Task t1 = q->PushFront(std::move(stolen.back()));
- stolen.pop_back();
- if (t1.f) {
- // There is not much we can do in this case. Just execute the
- // remaining tasks directly.
- stolen.push_back(std::move(t1));
- break;
+ t = Steal();
+ if (!t.f) {
+ // Leave one thread spinning. This reduces latency.
+ // TODO(dvyukov): 1000 iterations is based on fair dice roll, tune it.
+ // Also, the time it takes to attempt to steal work 1000 times depends
+ // on the size of the thread pool. However, the speed at which the user
+ // of the thread pool submits tasks is independent of the size of the
+ // pool. Consider a time based limit instead.
+ if (!spinning_ && !spinning_.exchange(true)) {
+ for (int i = 0; i < 1000 && !t.f; i++) {
+ t = Steal();
+ }
+ spinning_ = false;
+ }
+ if (!t.f) {
+ if (!WaitForWork(waiter, &t)) {
+ return;
}
}
}
}
if (t.f) {
env_.ExecuteTask(t);
- continue;
- }
- // Leave one thread spinning. This reduces latency.
- if (!spinning_ && !spinning_.exchange(true)) {
- bool nowork = true;
- for (int i = 0; i < 1000; i++) {
- if (!OutOfWork()) {
- nowork = false;
- break;
- }
- }
- spinning_ = false;
- if (!nowork) continue;
}
- if (!WaitForWork(waiter)) return;
}
}
// Steal tries to steal work from other worker threads in a best-effort manner.
- bool Steal(std::vector<Task>* stolen) {
- if (queues_.size() == 1) return false;
+ Task Steal() {
PerThread* pt = GetPerThread();
- unsigned lastq = pt->index;
- for (unsigned i = queues_.size(); i > 0; i--) {
- unsigned victim = Rand(&pt->rand) % queues_.size();
- if (victim == lastq && queues_.size() > 2) {
- i++;
- continue;
+ unsigned size = queues_.size();
+ unsigned r = Rand(&pt->rand);
+ unsigned inc = coprimes_[r % coprimes_.size()];
+ unsigned victim = r % size;
+ for (unsigned i = 0; i < size; i++) {
+ Task t = queues_[victim]->PopBack();
+ if (t.f) {
+ return t;
+ }
+ victim += inc;
+ if (victim >= size) {
+ victim -= size;
}
- // Steal half of elements from a victim queue.
- // It is typical to steal just one element, but that assumes that work is
- // recursively subdivided in halves so that the stolen element is exactly
- // half of work. If work elements are equally-sized, then it makes sense
- // to steal half of elements at once and then work locally for a while.
- if (queues_[victim]->PopBackHalf(stolen)) return true;
- lastq = victim;
}
- // Just to make sure that we did not miss anything.
- for (unsigned i = queues_.size(); i > 0; i--)
- if (queues_[i - 1]->PopBackHalf(stolen)) return true;
- return false;
+ return Task();
}
- // WaitForWork blocks until new work is available, or if it is time to exit.
- bool WaitForWork(EventCount::Waiter* waiter) {
- // We already did best-effort emptiness check in Steal, so prepare blocking.
+ // WaitForWork blocks until new work is available (returns true), or until it
+ // is time to exit (returns false). It can optionally return a task to execute
+ // in t (in which case t.f != nullptr on return).
+ bool WaitForWork(EventCount::Waiter* waiter, Task* t) {
+ eigen_assert(!t->f);
+ // We already did best-effort emptiness check in Steal, so prepare for
+ // blocking.
ec_.Prewait(waiter);
- // Now do reliable emptiness check.
- if (!OutOfWork()) {
+ // Now do a reliable emptiness check.
+ int victim = NonEmptyQueueIndex();
+ if (victim != -1) {
ec_.CancelWait(waiter);
+ *t = queues_[victim]->PopBack();
return true;
}
// Number of blocked threads is used as termination condition.
@@ -186,7 +198,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
// right after incrementing blocked_ above. Now a free-standing thread
// submits work and calls destructor (which sets done_). If we don't
// re-check queues, we will exit leaving the work unexecuted.
- if (!OutOfWork()) {
+ if (NonEmptyQueueIndex() != -1) {
// Note: we must not pop from queues before we decrement blocked_,
// otherwise the following scenario is possible. Consider that instead
// of checking for emptiness we popped the only element from queues.
@@ -205,10 +217,22 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
return true;
}
- bool OutOfWork() {
- for (unsigned i = 0; i < queues_.size(); i++)
- if (!queues_[i]->Empty()) return false;
- return true;
+ int NonEmptyQueueIndex() {
+ PerThread* pt = GetPerThread();
+ unsigned size = queues_.size();
+ unsigned r = Rand(&pt->rand);
+ unsigned inc = coprimes_[r % coprimes_.size()];
+ unsigned victim = r % size;
+ for (unsigned i = 0; i < size; i++) {
+ if (!queues_[victim]->Empty()) {
+ return victim;
+ }
+ victim += inc;
+ if (victim >= size) {
+ victim -= size;
+ }
+ }
+ return -1;
}
PerThread* GetPerThread() {
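As context for the WorkerLoop changes above: the spinning guard keeps at most one idle thread busy-polling, which preserves low wakeup latency without every idle worker burning CPU. Below is a minimal sketch of that guard, with a stub Steal() and illustrative names standing in for the pool's real members (not Eigen's actual API):

#include <atomic>
#include <cstdio>

struct Task {
  void (*f)() = nullptr;  // empty f means "no work found"
};

static std::atomic<bool> spinning{false};
static std::atomic<int> pending{0};  // stand-in for work appearing in queues

static Task Steal() {  // stub: "finds" a task once pending > 0
  Task t;
  if (pending.load() > 0) {
    pending--;
    t.f = [] { std::puts("ran task"); };
  }
  return t;
}

static Task SpinForWork() {
  Task t;
  // Plain load first: if another thread already occupies the spinning slot,
  // we skip the contended read-modify-write entirely. The exchange then lets
  // at most one thread claim the slot.
  if (!spinning.load() && !spinning.exchange(true)) {
    for (int i = 0; i < 1000 && !t.f; i++) t = Steal();
    spinning = false;  // release the slot before blocking or running the task
  }
  return t;
}

int main() {
  pending = 1;
  Task t = SpinForWork();
  if (t.f) t.f();
  return 0;
}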
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
index 0544a6e15..48de960ae 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
@@ -100,7 +100,7 @@ class RunQueue {
// PopBack removes and returns the last element in the queue.
// Can fail spuriously.
Work PopBack() {
- if (Empty()) return 0;
+ if (Empty()) return Work();
std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
if (!lock) return Work();
unsigned back = back_.load(std::memory_order_relaxed);
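On the RunQueue change above: because the queue is templated on the work type, value-initialization via Work() compiles for any default-constructible type, whereas returning 0 requires an implicit conversion from int; it also matches the return Work() already used just below when try_to_lock fails. A small sketch with a hypothetical struct-valued work type (illustrative, not the actual Eigen definition) for which return 0 would not compile at all:

#include <functional>

// Hypothetical work type, similar in spirit to the pool's Task.
struct Work {
  std::function<void()> f;
};

Work PopBackStub(bool empty) {
  if (empty) return Work();  // value-initialized "no work" sentinel, f is empty
  Work w;
  w.f = [] {};
  return w;
}

int main() { return PopBackStub(true).f ? 1 : 0; }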
diff --git a/unsupported/test/cxx11_runqueue.cpp b/unsupported/test/cxx11_runqueue.cpp
index d20d87111..2594ff0c5 100644
--- a/unsupported/test/cxx11_runqueue.cpp
+++ b/unsupported/test/cxx11_runqueue.cpp
@@ -100,6 +100,14 @@ void test_basic_runqueue()
// Empty again.
VERIFY(q.Empty());
VERIFY_IS_EQUAL(0u, q.Size());
+ VERIFY_IS_EQUAL(0, q.PushFront(1));
+ VERIFY_IS_EQUAL(0, q.PushFront(2));
+ VERIFY_IS_EQUAL(0, q.PushFront(3));
+ VERIFY_IS_EQUAL(1, q.PopBack());
+ VERIFY_IS_EQUAL(2, q.PopBack());
+ VERIFY_IS_EQUAL(3, q.PopBack());
+ VERIFY(q.Empty());
+ VERIFY_IS_EQUAL(0, q.Size());
}
// Empty tests that the queue is not claimed to be empty when it is in fact not.