author    | Benoit Steiner <benoit.steiner.goog@gmail.com> | 2016-05-09 10:17:17 -0700
committer | Benoit Steiner <benoit.steiner.goog@gmail.com> | 2016-05-09 10:17:17 -0700
commit    | dc7dbc2df71e88615c4f179a2eded7f617fca7a9
tree      | e2a6c5d76adc94e0afb7511cbbcf7eb6751f3ebd /unsupported
parent    | 05c365fb16a6ce63688d465e5d30c98a8742407b
Optimized the non-blocking thread pool:

* Use a pseudo-random permutation of queue indices during random stealing. This ensures that all queues are considered (see the sketch below).
* Directly pop from a non-empty queue when we are waiting for work, instead of first noticing that there is a non-empty queue and then doing another round of random stealing to re-discover it.
* Steal only one task from a remote queue instead of half of the tasks.
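
As an illustration of the first bullet, here is a minimal standalone sketch of the coprime walk (not part of the patch; the pool size, start index, and increment choice are arbitrary illustrative values). It precomputes the increments that are coprime with the queue count using the same GCD loop the patch adds, then shows that a walk from any start visits every queue exactly once:

```cpp
#include <cstdio>
#include <vector>

// Standalone demo (illustrative values): walking i -> (i + inc) % size with
// an increment coprime to size visits every index exactly once.
int main() {
  const unsigned size = 10;  // stand-in for the number of worker queues
  // Collect all increments coprime with size, via the same GCD loop as the patch.
  std::vector<unsigned> coprimes;
  for (unsigned i = 1; i <= size; i++) {
    unsigned a = i, b = size;
    while (b != 0) {  // Euclid's algorithm: ends with a == GCD(i, size)
      unsigned tmp = a;
      a = b;
      b = tmp % b;
    }
    if (a == 1) coprimes.push_back(i);
  }
  // Pick an arbitrary start and coprime increment (the pool derives both from
  // a per-thread random number); the walk below prints 7 4 1 8 5 2 9 6 3 0,
  // i.e. each of the 10 queue indices exactly once.
  unsigned victim = 7;
  unsigned inc = coprimes[2];  // coprimes of 10 are {1, 3, 7, 9}, so inc == 7
  for (unsigned i = 0; i < size; i++) {
    std::printf("%u ", victim);
    victim += inc;
    if (victim >= size) victim -= size;
  }
  std::printf("\n");
  return 0;
}
```

Because the increment is coprime with `size`, the walk's period is exactly `size`, so no queue is skipped; randomizing both the start and the increment is what makes the resulting permutation pseudo-random.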
Diffstat (limited to 'unsupported')
-rw-r--r-- | unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h | 150
-rw-r--r-- | unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h              |   2
-rw-r--r-- | unsupported/test/cxx11_runqueue.cpp                            |   8
3 files changed, 96 insertions, 64 deletions
```diff
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
index 1c471a19f..f7e73aabe 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
@@ -23,14 +23,38 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
       : env_(env),
         threads_(num_threads),
         queues_(num_threads),
+        coprimes_(num_threads),
         waiters_(num_threads),
         blocked_(),
         spinning_(),
         done_(),
         ec_(waiters_) {
-    for (int i = 0; i < num_threads; i++) queues_.push_back(new Queue());
-    for (int i = 0; i < num_threads; i++)
+    // Calculate coprimes of num_threads.
+    // Coprimes are used for a random walk over all threads in Steal
+    // and NonEmptyQueueIndex. Iteration is based on the fact that if we take
+    // a walk starting thread index t and calculate num_threads - 1 subsequent
+    // indices as (t + coprime) % num_threads, we will cover all threads without
+    // repetitions (effectively getting a pseudo-random permutation of thread
+    // indices).
+    for (unsigned i = 1; i <= num_threads; i++) {
+      unsigned a = i;
+      unsigned b = num_threads;
+      // If GCD(a, b) == 1, then a and b are coprimes.
+      while (b != 0) {
+        unsigned tmp = a;
+        a = b;
+        b = tmp % b;
+      }
+      if (a == 1) {
+        coprimes_.push_back(i);
+      }
+    }
+    for (int i = 0; i < num_threads; i++) {
+      queues_.push_back(new Queue());
+    }
+    for (int i = 0; i < num_threads; i++) {
       threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); }));
+    }
   }

   ~NonBlockingThreadPoolTempl() {
@@ -84,6 +108,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
   Environment env_;
   MaxSizeVector<Thread*> threads_;
   MaxSizeVector<Queue*> queues_;
+  MaxSizeVector<unsigned> coprimes_;
   std::vector<EventCount::Waiter> waiters_;
   std::atomic<unsigned> blocked_;
   std::atomic<bool> spinning_;
@@ -97,82 +122,69 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
     pt->index = index;
     Queue* q = queues_[index];
     EventCount::Waiter* waiter = &waiters_[index];
-    std::vector<Task> stolen;
     for (;;) {
-      Task t;
-      if (!stolen.empty()) {
-        t = std::move(stolen.back());
-        stolen.pop_back();
-      }
-      if (!t.f) t = q->PopFront();
+      Task t = q->PopFront();
       if (!t.f) {
-        if (Steal(&stolen)) {
-          t = std::move(stolen.back());
-          stolen.pop_back();
-          while (stolen.size()) {
-            Task t1 = q->PushFront(std::move(stolen.back()));
-            stolen.pop_back();
-            if (t1.f) {
-              // There is not much we can do in this case. Just execute the
-              // remaining directly.
-              stolen.push_back(std::move(t1));
-              break;
+        t = Steal();
+        if (!t.f) {
+          // Leave one thread spinning. This reduces latency.
+          // TODO(dvyukov): 1000 iterations is based on fair dice roll, tune it.
+          // Also, the time it takes to attempt to steal work 1000 times depends
+          // on the size of the thread pool. However the speed at which the user
+          // of the thread pool submits tasks is independent of the size of the
+          // pool. Consider a time based limit instead.
+          if (!spinning_ && !spinning_.exchange(true)) {
+            for (int i = 0; i < 1000 && !t.f; i++) {
+              t = Steal();
+            }
+            spinning_ = false;
+          }
+          if (!t.f) {
+            if (!WaitForWork(waiter, &t)) {
+              return;
             }
           }
         }
       }
       if (t.f) {
         env_.ExecuteTask(t);
-        continue;
-      }
-      // Leave one thread spinning. This reduces latency.
-      if (!spinning_ && !spinning_.exchange(true)) {
-        bool nowork = true;
-        for (int i = 0; i < 1000; i++) {
-          if (!OutOfWork()) {
-            nowork = false;
-            break;
-          }
-        }
-        spinning_ = false;
-        if (!nowork) continue;
       }
-      if (!WaitForWork(waiter)) return;
     }
   }

   // Steal tries to steal work from other worker threads in best-effort manner.
-  bool Steal(std::vector<Task>* stolen) {
-    if (queues_.size() == 1) return false;
+  Task Steal() {
     PerThread* pt = GetPerThread();
-    unsigned lastq = pt->index;
-    for (unsigned i = queues_.size(); i > 0; i--) {
-      unsigned victim = Rand(&pt->rand) % queues_.size();
-      if (victim == lastq && queues_.size() > 2) {
-        i++;
-        continue;
+    unsigned size = queues_.size();
+    unsigned r = Rand(&pt->rand);
+    unsigned inc = coprimes_[r % coprimes_.size()];
+    unsigned victim = r % size;
+    for (unsigned i = 0; i < size; i++) {
+      Task t = queues_[victim]->PopBack();
+      if (t.f) {
+        return t;
+      }
+      victim += inc;
+      if (victim >= size) {
+        victim -= size;
       }
-      // Steal half of elements from a victim queue.
-      // It is typical to steal just one element, but that assumes that work is
-      // recursively subdivided in halves so that the stolen element is exactly
-      // half of work. If work elements are equally-sized, then it makes sense
-      // to steal half of elements at once and then work locally for a while.
-      if (queues_[victim]->PopBackHalf(stolen)) return true;
-      lastq = victim;
     }
-    // Just to make sure that we did not miss anything.
-    for (unsigned i = queues_.size(); i > 0; i--)
-      if (queues_[i - 1]->PopBackHalf(stolen)) return true;
-    return false;
+    return Task();
   }

-  // WaitForWork blocks until new work is available, or if it is time to exit.
-  bool WaitForWork(EventCount::Waiter* waiter) {
-    // We already did best-effort emptiness check in Steal, so prepare blocking.
+  // WaitForWork blocks until new work is available (returns true), or if it is
+  // time to exit (returns false). Can optionally return a task to execute in t
+  // (in such case t.f != nullptr on return).
+  bool WaitForWork(EventCount::Waiter* waiter, Task* t) {
+    eigen_assert(!t->f);
+    // We already did best-effort emptiness check in Steal, so prepare for
+    // blocking.
     ec_.Prewait(waiter);
-    // Now do reliable emptiness check.
-    if (!OutOfWork()) {
+    // Now do a reliable emptiness check.
+    int victim = NonEmptyQueueIndex();
+    if (victim != -1) {
       ec_.CancelWait(waiter);
+      *t = queues_[victim]->PopBack();
       return true;
     }
     // Number of blocked threads is used as termination condition.
@@ -186,7 +198,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
     // right after incrementing blocked_ above. Now a free-standing thread
     // submits work and calls destructor (which sets done_). If we don't
     // re-check queues, we will exit leaving the work unexecuted.
-    if (!OutOfWork()) {
+    if (NonEmptyQueueIndex() != -1) {
       // Note: we must not pop from queues before we decrement blocked_,
       // otherwise the following scenario is possible. Consider that instead
       // of checking for emptiness we popped the only element from queues.
@@ -205,10 +217,22 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
     return true;
   }

-  bool OutOfWork() {
-    for (unsigned i = 0; i < queues_.size(); i++)
-      if (!queues_[i]->Empty()) return false;
-    return true;
+  int NonEmptyQueueIndex() {
+    PerThread* pt = GetPerThread();
+    unsigned size = queues_.size();
+    unsigned r = Rand(&pt->rand);
+    unsigned inc = coprimes_[r % coprimes_.size()];
+    unsigned victim = r % size;
+    for (unsigned i = 0; i < size; i++) {
+      if (!queues_[victim]->Empty()) {
+        return victim;
+      }
+      victim += inc;
+      if (victim >= size) {
+        victim -= size;
+      }
+    }
+    return -1;
   }

   PerThread* GetPerThread() {
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
index 0544a6e15..48de960ae 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
@@ -100,7 +100,7 @@ class RunQueue {
   // PopBack removes and returns the last element in the queue.
   // Can fail spuriously.
   Work PopBack() {
-    if (Empty()) return 0;
+    if (Empty()) return Work();
     std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
     if (!lock) return Work();
     unsigned back = back_.load(std::memory_order_relaxed);
diff --git a/unsupported/test/cxx11_runqueue.cpp b/unsupported/test/cxx11_runqueue.cpp
index d20d87111..2594ff0c5 100644
--- a/unsupported/test/cxx11_runqueue.cpp
+++ b/unsupported/test/cxx11_runqueue.cpp
@@ -100,6 +100,14 @@ void test_basic_runqueue()
   // Empty again.
   VERIFY(q.Empty());
   VERIFY_IS_EQUAL(0u, q.Size());
+  VERIFY_IS_EQUAL(0, q.PushFront(1));
+  VERIFY_IS_EQUAL(0, q.PushFront(2));
+  VERIFY_IS_EQUAL(0, q.PushFront(3));
+  VERIFY_IS_EQUAL(1, q.PopBack());
+  VERIFY_IS_EQUAL(2, q.PopBack());
+  VERIFY_IS_EQUAL(3, q.PopBack());
+  VERIFY(q.Empty());
+  VERIFY_IS_EQUAL(0, q.Size());
 }

 // Empty tests that the queue is not claimed to be empty when it is in fact not.
```
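
The diff above also restructures the "leave one thread spinning" logic inside WorkerLoop. Below is a hedged, self-contained sketch of that pattern, not Eigen's code: the `Task` struct and the `Steal`/`WaitForWork` stubs are stand-ins for illustration only. At most one idle thread wins the `exchange()` and briefly spins on `Steal()`, while all other idle threads fall through to blocking:

```cpp
#include <atomic>
#include <cstdio>
#include <functional>

// Hypothetical stand-ins for the pool's task type and helpers.
struct Task { std::function<void()> f; };

std::atomic<bool> spinning{false};

Task Steal() { return Task(); }                       // stub: stealing always fails here
bool WaitForWork(Task* t) { (void)t; return false; }  // stub: pretend it is time to exit

// Sketch of WorkerLoop's idle path: try to steal once, let at most one
// thread spin on Steal() for a bounded number of attempts, then block.
bool AcquireTaskOrExit(Task* t) {
  *t = Steal();
  if (!t->f) {
    // exchange() elects a single spinner; all other threads skip to waiting.
    if (!spinning && !spinning.exchange(true)) {
      for (int i = 0; i < 1000 && !t->f; i++) *t = Steal();
      spinning = false;
    }
    if (!t->f) return WaitForWork(t);  // may hand back a task in *t
  }
  return true;
}

int main() {
  Task t;
  std::printf("%s\n", AcquireTaskOrExit(&t) ? "got work" : "exiting");
  return 0;
}
```

Electing a single spinner keeps wake-up latency low for freshly submitted tasks without letting every idle worker burn a core, which is why the patch bounds the spin at 1000 steal attempts and guards it with a single atomic flag.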