diff options
Diffstat (limited to 'unsupported/Eigen/CXX11/src/ThreadPool')
7 files changed, 171 insertions, 102 deletions
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/CMakeLists.txt b/unsupported/Eigen/CXX11/src/ThreadPool/CMakeLists.txt deleted file mode 100644 index 88fef50c6..000000000 --- a/unsupported/Eigen/CXX11/src/ThreadPool/CMakeLists.txt +++ /dev/null @@ -1,6 +0,0 @@ -FILE(GLOB Eigen_CXX11_ThreadPool_SRCS "*.h") - -INSTALL(FILES - ${Eigen_CXX11_ThreadPool_SRCS} - DESTINATION ${INCLUDE_INSTALL_DIR}/unsupported/Eigen/CXX11/src/ThreadPool COMPONENT Devel - ) diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h b/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h index 6dd64f185..71d55552d 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h @@ -50,7 +50,7 @@ class EventCount { public: class Waiter; - EventCount(std::vector<Waiter>& waiters) : waiters_(waiters) { + EventCount(MaxSizeVector<Waiter>& waiters) : waiters_(waiters) { eigen_assert(waiters.size() < (1 << kWaiterBits) - 1); // Initialize epoch to something close to overflow to test overflow. state_ = kStackMask | (kEpochMask - kEpochInc * waiters.size() * 2); @@ -169,7 +169,8 @@ class EventCount { class Waiter { friend class EventCount; - std::atomic<Waiter*> next; + // Align to 128 byte boundary to prevent false sharing with other Waiter objects in the same vector. + EIGEN_ALIGN_TO_BOUNDARY(128) std::atomic<Waiter*> next; std::mutex mu; std::condition_variable cv; uint64_t epoch; @@ -179,8 +180,6 @@ class EventCount { kWaiting, kSignaled, }; - // Prevent false sharing with other Waiter objects in the same vector. - char pad_[128]; }; private: @@ -200,7 +199,7 @@ class EventCount { static const uint64_t kEpochMask = ((1ull << kEpochBits) - 1) << kEpochShift; static const uint64_t kEpochInc = 1ull << kEpochShift; std::atomic<uint64_t> state_; - std::vector<Waiter>& waiters_; + MaxSizeVector<Waiter>& waiters_; void Park(Waiter* w) { std::unique_lock<std::mutex> lock(w->mu); diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h index 1c471a19f..354bce52a 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h @@ -23,18 +23,44 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { : env_(env), threads_(num_threads), queues_(num_threads), + coprimes_(num_threads), waiters_(num_threads), - blocked_(), - spinning_(), - done_(), + blocked_(0), + spinning_(0), + done_(false), ec_(waiters_) { - for (int i = 0; i < num_threads; i++) queues_.push_back(new Queue()); - for (int i = 0; i < num_threads; i++) + waiters_.resize(num_threads); + + // Calculate coprimes of num_threads. + // Coprimes are used for a random walk over all threads in Steal + // and NonEmptyQueueIndex. Iteration is based on the fact that if we take + // a walk starting thread index t and calculate num_threads - 1 subsequent + // indices as (t + coprime) % num_threads, we will cover all threads without + // repetitions (effectively getting a presudo-random permutation of thread + // indices). + for (int i = 1; i <= num_threads; i++) { + unsigned a = i; + unsigned b = num_threads; + // If GCD(a, b) == 1, then a and b are coprimes. + while (b != 0) { + unsigned tmp = a; + a = b; + b = tmp % b; + } + if (a == 1) { + coprimes_.push_back(i); + } + } + for (int i = 0; i < num_threads; i++) { + queues_.push_back(new Queue()); + } + for (int i = 0; i < num_threads; i++) { threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); })); + } } ~NonBlockingThreadPoolTempl() { - done_.store(true, std::memory_order_relaxed); + done_ = true; // Now if all threads block without work, they will start exiting. // But note that threads can continue to work arbitrary long, // block, submit new work, unblock and otherwise live full life. @@ -50,7 +76,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { PerThread* pt = GetPerThread(); if (pt->pool == this) { // Worker thread of this pool, push onto the thread's queue. - Queue* q = queues_[pt->index]; + Queue* q = queues_[pt->thread_id]; t = q->PushFront(std::move(t)); } else { // A free-standing thread (or worker of another pool), push onto a random @@ -71,108 +97,111 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { env_.ExecuteTask(t); // Push failed, execute directly. } + int NumThreads() const final { + return static_cast<int>(threads_.size()); + } + + int CurrentThreadId() const final { + const PerThread* pt = + const_cast<NonBlockingThreadPoolTempl*>(this)->GetPerThread(); + if (pt->pool == this) { + return pt->thread_id; + } else { + return -1; + } + } + private: typedef typename Environment::EnvThread Thread; struct PerThread { - bool inited; + constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) { } NonBlockingThreadPoolTempl* pool; // Parent pool, or null for normal threads. - unsigned index; // Worker thread index in pool. - unsigned rand; // Random generator state. + uint64_t rand; // Random generator state. + int thread_id; // Worker thread index in pool. }; Environment env_; MaxSizeVector<Thread*> threads_; MaxSizeVector<Queue*> queues_; - std::vector<EventCount::Waiter> waiters_; + MaxSizeVector<unsigned> coprimes_; + MaxSizeVector<EventCount::Waiter> waiters_; std::atomic<unsigned> blocked_; std::atomic<bool> spinning_; std::atomic<bool> done_; EventCount ec_; // Main worker thread loop. - void WorkerLoop(unsigned index) { + void WorkerLoop(int thread_id) { PerThread* pt = GetPerThread(); pt->pool = this; - pt->index = index; - Queue* q = queues_[index]; - EventCount::Waiter* waiter = &waiters_[index]; - std::vector<Task> stolen; + pt->rand = std::hash<std::thread::id>()(std::this_thread::get_id()); + pt->thread_id = thread_id; + Queue* q = queues_[thread_id]; + EventCount::Waiter* waiter = &waiters_[thread_id]; for (;;) { - Task t; - if (!stolen.empty()) { - t = std::move(stolen.back()); - stolen.pop_back(); - } - if (!t.f) t = q->PopFront(); + Task t = q->PopFront(); if (!t.f) { - if (Steal(&stolen)) { - t = std::move(stolen.back()); - stolen.pop_back(); - while (stolen.size()) { - Task t1 = q->PushFront(std::move(stolen.back())); - stolen.pop_back(); - if (t1.f) { - // There is not much we can do in this case. Just execute the - // remaining directly. - stolen.push_back(std::move(t1)); - break; + t = Steal(); + if (!t.f) { + // Leave one thread spinning. This reduces latency. + // TODO(dvyukov): 1000 iterations is based on fair dice roll, tune it. + // Also, the time it takes to attempt to steal work 1000 times depends + // on the size of the thread pool. However the speed at which the user + // of the thread pool submit tasks is independent of the size of the + // pool. Consider a time based limit instead. + if (!spinning_ && !spinning_.exchange(true)) { + for (int i = 0; i < 1000 && !t.f; i++) { + t = Steal(); + } + spinning_ = false; + } + if (!t.f) { + if (!WaitForWork(waiter, &t)) { + return; } } } } if (t.f) { env_.ExecuteTask(t); - continue; } - // Leave one thread spinning. This reduces latency. - if (!spinning_ && !spinning_.exchange(true)) { - bool nowork = true; - for (int i = 0; i < 1000; i++) { - if (!OutOfWork()) { - nowork = false; - break; - } - } - spinning_ = false; - if (!nowork) continue; - } - if (!WaitForWork(waiter)) return; } } // Steal tries to steal work from other worker threads in best-effort manner. - bool Steal(std::vector<Task>* stolen) { - if (queues_.size() == 1) return false; + Task Steal() { PerThread* pt = GetPerThread(); - unsigned lastq = pt->index; - for (unsigned i = queues_.size(); i > 0; i--) { - unsigned victim = Rand(&pt->rand) % queues_.size(); - if (victim == lastq && queues_.size() > 2) { - i++; - continue; + const size_t size = queues_.size(); + unsigned r = Rand(&pt->rand); + unsigned inc = coprimes_[r % coprimes_.size()]; + unsigned victim = r % size; + for (unsigned i = 0; i < size; i++) { + Task t = queues_[victim]->PopBack(); + if (t.f) { + return t; + } + victim += inc; + if (victim >= size) { + victim -= size; } - // Steal half of elements from a victim queue. - // It is typical to steal just one element, but that assumes that work is - // recursively subdivided in halves so that the stolen element is exactly - // half of work. If work elements are equally-sized, then is makes sense - // to steal half of elements at once and then work locally for a while. - if (queues_[victim]->PopBackHalf(stolen)) return true; - lastq = victim; } - // Just to make sure that we did not miss anything. - for (unsigned i = queues_.size(); i > 0; i--) - if (queues_[i - 1]->PopBackHalf(stolen)) return true; - return false; + return Task(); } - // WaitForWork blocks until new work is available, or if it is time to exit. - bool WaitForWork(EventCount::Waiter* waiter) { - // We already did best-effort emptiness check in Steal, so prepare blocking. + // WaitForWork blocks until new work is available (returns true), or if it is + // time to exit (returns false). Can optionally return a task to execute in t + // (in such case t.f != nullptr on return). + bool WaitForWork(EventCount::Waiter* waiter, Task* t) { + eigen_assert(!t->f); + // We already did best-effort emptiness check in Steal, so prepare for + // blocking. ec_.Prewait(waiter); - // Now do reliable emptiness check. - if (!OutOfWork()) { + // Now do a reliable emptiness check. + int victim = NonEmptyQueueIndex(); + if (victim != -1) { ec_.CancelWait(waiter); + *t = queues_[victim]->PopBack(); return true; } // Number of blocked threads is used as termination condition. @@ -186,7 +215,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { // right after incrementing blocked_ above. Now a free-standing thread // submits work and calls destructor (which sets done_). If we don't // re-check queues, we will exit leaving the work unexecuted. - if (!OutOfWork()) { + if (NonEmptyQueueIndex() != -1) { // Note: we must not pop from queues before we decrement blocked_, // otherwise the following scenario is possible. Consider that instead // of checking for emptiness we popped the only element from queues. @@ -205,23 +234,36 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { return true; } - bool OutOfWork() { - for (unsigned i = 0; i < queues_.size(); i++) - if (!queues_[i]->Empty()) return false; - return true; + int NonEmptyQueueIndex() { + PerThread* pt = GetPerThread(); + const size_t size = queues_.size(); + unsigned r = Rand(&pt->rand); + unsigned inc = coprimes_[r % coprimes_.size()]; + unsigned victim = r % size; + for (unsigned i = 0; i < size; i++) { + if (!queues_[victim]->Empty()) { + return victim; + } + victim += inc; + if (victim >= size) { + victim -= size; + } + } + return -1; } - PerThread* GetPerThread() { + static EIGEN_STRONG_INLINE PerThread* GetPerThread() { EIGEN_THREAD_LOCAL PerThread per_thread_; PerThread* pt = &per_thread_; - if (pt->inited) return pt; - pt->inited = true; - pt->rand = std::hash<std::thread::id>()(std::this_thread::get_id()); return pt; } - static unsigned Rand(unsigned* state) { - return *state = *state * 1103515245 + 12345; + static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) { + uint64_t current = *state; + // Update the internal state + *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL; + // Generate the random output (using the PCG-XSH-RS scheme) + return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61))); } }; diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h index 0544a6e15..05ed76cbe 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h @@ -38,7 +38,7 @@ namespace Eigen { template <typename Work, unsigned kSize> class RunQueue { public: - RunQueue() : front_(), back_() { + RunQueue() : front_(0), back_(0) { // require power-of-two for fast masking eigen_assert((kSize & (kSize - 1)) == 0); eigen_assert(kSize > 2); // why would you do this? @@ -100,7 +100,7 @@ class RunQueue { // PopBack removes and returns the last elements in the queue. // Can fail spuriously. Work PopBack() { - if (Empty()) return 0; + if (Empty()) return Work(); std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock); if (!lock) return Work(); unsigned back = back_.load(std::memory_order_relaxed); diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h index 17fd1658b..e75d0f467 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h @@ -24,7 +24,7 @@ class SimpleThreadPoolTempl : public ThreadPoolInterface { explicit SimpleThreadPoolTempl(int num_threads, Environment env = Environment()) : env_(env), threads_(num_threads), waiters_(num_threads) { for (int i = 0; i < num_threads; i++) { - threads_.push_back(env.CreateThread([this]() { WorkerLoop(); })); + threads_.push_back(env.CreateThread([this, i]() { WorkerLoop(i); })); } } @@ -55,7 +55,7 @@ class SimpleThreadPoolTempl : public ThreadPoolInterface { // Schedule fn() for execution in the pool of threads. The functions are // executed in the order in which they are scheduled. - void Schedule(std::function<void()> fn) { + void Schedule(std::function<void()> fn) final { Task t = env_.CreateTask(std::move(fn)); std::unique_lock<std::mutex> l(mu_); if (waiters_.empty()) { @@ -69,9 +69,25 @@ class SimpleThreadPoolTempl : public ThreadPoolInterface { } } + int NumThreads() const final { + return static_cast<int>(threads_.size()); + } + + int CurrentThreadId() const final { + const PerThread* pt = this->GetPerThread(); + if (pt->pool == this) { + return pt->thread_id; + } else { + return -1; + } + } + protected: - void WorkerLoop() { + void WorkerLoop(int thread_id) { std::unique_lock<std::mutex> l(mu_); + PerThread* pt = GetPerThread(); + pt->pool = this; + pt->thread_id = thread_id; Waiter w; Task t; while (!exiting_) { @@ -111,13 +127,24 @@ class SimpleThreadPoolTempl : public ThreadPoolInterface { bool ready; }; + struct PerThread { + constexpr PerThread() : pool(NULL), thread_id(-1) { } + SimpleThreadPoolTempl* pool; // Parent pool, or null for normal threads. + int thread_id; // Worker thread index in pool. + }; + Environment env_; std::mutex mu_; MaxSizeVector<Thread*> threads_; // All threads MaxSizeVector<Waiter*> waiters_; // Stack of waiting threads. - std::deque<Task> pending_; // Queue of pending work - std::condition_variable empty_; // Signaled on pending_.empty() + std::deque<Task> pending_; // Queue of pending work + std::condition_variable empty_; // Signaled on pending_.empty() bool exiting_ = false; + + PerThread* GetPerThread() const { + EIGEN_THREAD_LOCAL PerThread per_thread; + return &per_thread; + } }; typedef SimpleThreadPoolTempl<StlThreadEnvironment> SimpleThreadPool; diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h index d2204ad5b..399f95cc1 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h @@ -21,14 +21,14 @@ struct StlThreadEnvironment { // destructor must join the thread. class EnvThread { public: - EnvThread(std::function<void()> f) : thr_(f) {} + EnvThread(std::function<void()> f) : thr_(std::move(f)) {} ~EnvThread() { thr_.join(); } private: std::thread thr_; }; - EnvThread* CreateThread(std::function<void()> f) { return new EnvThread(f); } + EnvThread* CreateThread(std::function<void()> f) { return new EnvThread(std::move(f)); } Task CreateTask(std::function<void()> f) { return Task{std::move(f)}; } void ExecuteTask(const Task& t) { t.f(); } }; diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h index 38b40aceb..a65ee97c9 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h @@ -18,6 +18,13 @@ class ThreadPoolInterface { public: virtual void Schedule(std::function<void()> fn) = 0; + // Returns the number of threads in the pool. + virtual int NumThreads() const = 0; + + // Returns a logical thread index between 0 and NumThreads() - 1 if called + // from one of the threads in the pool. Returns -1 otherwise. + virtual int CurrentThreadId() const = 0; + virtual ~ThreadPoolInterface() {} }; |