diff options
Diffstat (limited to 'unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h')
-rw-r--r-- | unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h | 206 |
1 files changed, 124 insertions, 82 deletions
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h index 1c471a19f..354bce52a 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h @@ -23,18 +23,44 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { : env_(env), threads_(num_threads), queues_(num_threads), + coprimes_(num_threads), waiters_(num_threads), - blocked_(), - spinning_(), - done_(), + blocked_(0), + spinning_(0), + done_(false), ec_(waiters_) { - for (int i = 0; i < num_threads; i++) queues_.push_back(new Queue()); - for (int i = 0; i < num_threads; i++) + waiters_.resize(num_threads); + + // Calculate coprimes of num_threads. + // Coprimes are used for a random walk over all threads in Steal + // and NonEmptyQueueIndex. Iteration is based on the fact that if we take + // a walk starting thread index t and calculate num_threads - 1 subsequent + // indices as (t + coprime) % num_threads, we will cover all threads without + // repetitions (effectively getting a pseudo-random permutation of thread + // indices). + for (int i = 1; i <= num_threads; i++) { + unsigned a = i; + unsigned b = num_threads; + // If GCD(a, b) == 1, then a and b are coprimes. + while (b != 0) { + unsigned tmp = a; + a = b; + b = tmp % b; + } + if (a == 1) { + coprimes_.push_back(i); + } + } + for (int i = 0; i < num_threads; i++) { + queues_.push_back(new Queue()); + } + for (int i = 0; i < num_threads; i++) { threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); })); + } } ~NonBlockingThreadPoolTempl() { - done_.store(true, std::memory_order_relaxed); + done_ = true; // Now if all threads block without work, they will start exiting. // But note that threads can continue to work arbitrary long, // block, submit new work, unblock and otherwise live full life. 
@@ -50,7 +76,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { PerThread* pt = GetPerThread(); if (pt->pool == this) { // Worker thread of this pool, push onto the thread's queue. - Queue* q = queues_[pt->index]; + Queue* q = queues_[pt->thread_id]; t = q->PushFront(std::move(t)); } else { // A free-standing thread (or worker of another pool), push onto a random @@ -71,108 +97,111 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { env_.ExecuteTask(t); // Push failed, execute directly. } + int NumThreads() const final { + return static_cast<int>(threads_.size()); + } + + int CurrentThreadId() const final { + const PerThread* pt = + const_cast<NonBlockingThreadPoolTempl*>(this)->GetPerThread(); + if (pt->pool == this) { + return pt->thread_id; + } else { + return -1; + } + } + private: typedef typename Environment::EnvThread Thread; struct PerThread { - bool inited; + constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) { } NonBlockingThreadPoolTempl* pool; // Parent pool, or null for normal threads. - unsigned index; // Worker thread index in pool. - unsigned rand; // Random generator state. + uint64_t rand; // Random generator state. + int thread_id; // Worker thread index in pool. }; Environment env_; MaxSizeVector<Thread*> threads_; MaxSizeVector<Queue*> queues_; - std::vector<EventCount::Waiter> waiters_; + MaxSizeVector<unsigned> coprimes_; + MaxSizeVector<EventCount::Waiter> waiters_; std::atomic<unsigned> blocked_; std::atomic<bool> spinning_; std::atomic<bool> done_; EventCount ec_; // Main worker thread loop. 
- void WorkerLoop(unsigned index) { + void WorkerLoop(int thread_id) { PerThread* pt = GetPerThread(); pt->pool = this; - pt->index = index; - Queue* q = queues_[index]; - EventCount::Waiter* waiter = &waiters_[index]; - std::vector<Task> stolen; + pt->rand = std::hash<std::thread::id>()(std::this_thread::get_id()); + pt->thread_id = thread_id; + Queue* q = queues_[thread_id]; + EventCount::Waiter* waiter = &waiters_[thread_id]; for (;;) { - Task t; - if (!stolen.empty()) { - t = std::move(stolen.back()); - stolen.pop_back(); - } - if (!t.f) t = q->PopFront(); + Task t = q->PopFront(); if (!t.f) { - if (Steal(&stolen)) { - t = std::move(stolen.back()); - stolen.pop_back(); - while (stolen.size()) { - Task t1 = q->PushFront(std::move(stolen.back())); - stolen.pop_back(); - if (t1.f) { - // There is not much we can do in this case. Just execute the - // remaining directly. - stolen.push_back(std::move(t1)); - break; + t = Steal(); + if (!t.f) { + // Leave one thread spinning. This reduces latency. + // TODO(dvyukov): 1000 iterations is based on fair dice roll, tune it. + // Also, the time it takes to attempt to steal work 1000 times depends + // on the size of the thread pool. However the speed at which the user + // of the thread pool submits tasks is independent of the size of the + // pool. Consider a time based limit instead. + if (!spinning_ && !spinning_.exchange(true)) { + for (int i = 0; i < 1000 && !t.f; i++) { + t = Steal(); + } + spinning_ = false; + } + if (!t.f) { + if (!WaitForWork(waiter, &t)) { + return; } } } } if (t.f) { env_.ExecuteTask(t); - continue; } - // Leave one thread spinning. This reduces latency. - if (!spinning_ && !spinning_.exchange(true)) { - bool nowork = true; - for (int i = 0; i < 1000; i++) { - if (!OutOfWork()) { - nowork = false; - break; - } - } - spinning_ = false; - if (!nowork) continue; - } - if (!WaitForWork(waiter)) return; } } // Steal tries to steal work from other worker threads in best-effort manner. 
- bool Steal(std::vector<Task>* stolen) { - if (queues_.size() == 1) return false; + Task Steal() { PerThread* pt = GetPerThread(); - unsigned lastq = pt->index; - for (unsigned i = queues_.size(); i > 0; i--) { - unsigned victim = Rand(&pt->rand) % queues_.size(); - if (victim == lastq && queues_.size() > 2) { - i++; - continue; + const size_t size = queues_.size(); + unsigned r = Rand(&pt->rand); + unsigned inc = coprimes_[r % coprimes_.size()]; + unsigned victim = r % size; + for (unsigned i = 0; i < size; i++) { + Task t = queues_[victim]->PopBack(); + if (t.f) { + return t; + } + victim += inc; + if (victim >= size) { + victim -= size; } - // Steal half of elements from a victim queue. - // It is typical to steal just one element, but that assumes that work is - // recursively subdivided in halves so that the stolen element is exactly - // half of work. If work elements are equally-sized, then is makes sense - // to steal half of elements at once and then work locally for a while. - if (queues_[victim]->PopBackHalf(stolen)) return true; - lastq = victim; } - // Just to make sure that we did not miss anything. - for (unsigned i = queues_.size(); i > 0; i--) - if (queues_[i - 1]->PopBackHalf(stolen)) return true; - return false; + return Task(); } - // WaitForWork blocks until new work is available, or if it is time to exit. - bool WaitForWork(EventCount::Waiter* waiter) { - // We already did best-effort emptiness check in Steal, so prepare blocking. + // WaitForWork blocks until new work is available (returns true), or if it is + // time to exit (returns false). Can optionally return a task to execute in t + // (in such case t.f != nullptr on return). + bool WaitForWork(EventCount::Waiter* waiter, Task* t) { + eigen_assert(!t->f); + // We already did best-effort emptiness check in Steal, so prepare for + // blocking. ec_.Prewait(waiter); - // Now do reliable emptiness check. - if (!OutOfWork()) { + // Now do a reliable emptiness check. 
+ int victim = NonEmptyQueueIndex(); + if (victim != -1) { ec_.CancelWait(waiter); + *t = queues_[victim]->PopBack(); return true; } // Number of blocked threads is used as termination condition. @@ -186,7 +215,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { // right after incrementing blocked_ above. Now a free-standing thread // submits work and calls destructor (which sets done_). If we don't // re-check queues, we will exit leaving the work unexecuted. - if (!OutOfWork()) { + if (NonEmptyQueueIndex() != -1) { // Note: we must not pop from queues before we decrement blocked_, // otherwise the following scenario is possible. Consider that instead // of checking for emptiness we popped the only element from queues. @@ -205,23 +234,36 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { return true; } - bool OutOfWork() { - for (unsigned i = 0; i < queues_.size(); i++) - if (!queues_[i]->Empty()) return false; - return true; + int NonEmptyQueueIndex() { + PerThread* pt = GetPerThread(); + const size_t size = queues_.size(); + unsigned r = Rand(&pt->rand); + unsigned inc = coprimes_[r % coprimes_.size()]; + unsigned victim = r % size; + for (unsigned i = 0; i < size; i++) { + if (!queues_[victim]->Empty()) { + return victim; + } + victim += inc; + if (victim >= size) { + victim -= size; + } + } + return -1; } - PerThread* GetPerThread() { + static EIGEN_STRONG_INLINE PerThread* GetPerThread() { EIGEN_THREAD_LOCAL PerThread per_thread_; PerThread* pt = &per_thread_; - if (pt->inited) return pt; - pt->inited = true; - pt->rand = std::hash<std::thread::id>()(std::this_thread::get_id()); return pt; } - static unsigned Rand(unsigned* state) { - return *state = *state * 1103515245 + 12345; + static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) { + uint64_t current = *state; + // Update the internal state + *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL; + // Generate the random output (using the 
PCG-XSH-RS scheme) + return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61))); } }; |