3 files changed, 222 insertions, 62 deletions
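Note on the technique: the constructor below now precomputes coprimes for every range size in [1, num_threads], not just for num_threads itself, because Steal() walks arbitrary sub-ranges rather than the whole pool. A minimal standalone sketch of why the coprime walk works (illustrative, not part of the patch): since inc is coprime with size, (victim + inc) % size visits every index exactly once before repeating.

#include <cassert>
#include <numeric>  // std::gcd (C++17)
#include <set>

int main() {
  const unsigned size = 6;  // number of queues in the stealing range
  const unsigned inc = 5;   // any increment coprime with size
  assert(std::gcd(inc, size) == 1u);
  unsigned victim = 3;      // in the pool this comes from a per-thread RNG
  std::set<unsigned> visited;
  for (unsigned i = 0; i < size; i++) {
    visited.insert(victim);
    victim += inc;
    if (victim >= size) victim -= size;
  }
  assert(visited.size() == size);  // all queues covered, no repetitions
  return 0;
}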
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
index 60a0c9fb6..2130625bc 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
@@ -26,9 +26,8 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
       : env_(env),
         num_threads_(num_threads),
         allow_spinning_(allow_spinning),
-        threads_(num_threads),
-        queues_(num_threads),
-        coprimes_(num_threads),
+        thread_data_(num_threads),
+        all_coprimes_(num_threads),
         waiters_(num_threads),
         blocked_(0),
         spinning_(0),
@@ -36,33 +35,26 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
         cancelled_(false),
         ec_(waiters_) {
     waiters_.resize(num_threads_);
-
-    // Calculate coprimes of num_threads_.
-    // Coprimes are used for a random walk over all threads in Steal
+    // Calculate coprimes of all numbers [1, num_threads].
+    // Coprimes are used for random walks over all threads in Steal
     // and NonEmptyQueueIndex. Iteration is based on the fact that if we take
-    // a walk starting thread index t and calculate num_threads - 1 subsequent
+    // a random starting thread index t and calculate num_threads - 1 subsequent
     // indices as (t + coprime) % num_threads, we will cover all threads without
     // repetitions (effectively getting a pseudo-random permutation of thread
     // indices).
-    for (int i = 1; i <= num_threads_; i++) {
-      unsigned a = i;
-      unsigned b = num_threads_;
-      // If GCD(a, b) == 1, then a and b are coprimes.
-      while (b != 0) {
-        unsigned tmp = a;
-        a = b;
-        b = tmp % b;
-      }
-      if (a == 1) {
-        coprimes_.push_back(i);
-      }
+    eigen_assert(num_threads_ < kMaxThreads);
+    for (int i = 1; i <= num_threads_; ++i) {
+      all_coprimes_.emplace_back(i);
+      ComputeCoprimes(i, &all_coprimes_.back());
     }
-    queues_.resize(num_threads_);
 #ifndef EIGEN_THREAD_LOCAL
     init_barrier_.reset(new Barrier(num_threads_));
 #endif
+    thread_data_.resize(num_threads_);
     for (int i = 0; i < num_threads_; i++) {
-      threads_.emplace_back(env_.CreateThread([this, i]() { WorkerLoop(i); }));
+      SetStealPartition(i, EncodePartition(0, num_threads_));
+      thread_data_[i].thread.reset(
+          env_.CreateThread([this, i]() { WorkerLoop(i); }));
     }
 #ifndef EIGEN_THREAD_LOCAL
     // Wait for workers to initialize per_thread_map_. Otherwise we might race
@@ -82,27 +74,51 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
     } else {
       // Since we were cancelled, there might be entries in the queues.
       // Empty them to prevent their destructor from asserting.
-      for (size_t i = 0; i < queues_.size(); i++) {
-        queues_[i].Flush();
+      for (size_t i = 0; i < thread_data_.size(); i++) {
+        thread_data_[i].queue.Flush();
       }
     }
+    // Join threads explicitly (by destroying) to avoid destruction order
+    // issues within this class.
+    for (size_t i = 0; i < thread_data_.size(); ++i)
+      thread_data_[i].thread.reset();
+  }
 
-    // Join threads explicitly to avoid destruction order issues.
-    threads_.resize(0);
-    queues_.resize(0);
+  void SetStealPartitions(
+      const std::vector<std::pair<unsigned, unsigned>>& partitions) {
+    int num_partitions = partitions.size();
+    eigen_assert(num_partitions == num_threads_);
+
+    // Pass this information to each thread queue.
+    for (int i = 0; i < num_threads_; i++) {
+      const auto& pair = partitions[i];
+      unsigned start = pair.first, end = pair.second;
+      AssertBounds(start, end);
+      unsigned val = EncodePartition(start, end);
+      SetStealPartition(i, val);
+    }
   }
 
   void Schedule(std::function<void()> fn) {
+    ScheduleWithHint(std::move(fn), 0, num_threads_);
+  }
+
+  void ScheduleWithHint(std::function<void()> fn, int start,
+                        int limit) override {
     Task t = env_.CreateTask(std::move(fn));
     PerThread* pt = GetPerThread();
     if (pt->pool == this) {
       // Worker thread of this pool, push onto the thread's queue.
-      Queue& q = queues_[pt->thread_id];
+      Queue& q = thread_data_[pt->thread_id].queue;
       t = q.PushFront(std::move(t));
     } else {
       // A free-standing thread (or worker of another pool), push onto a random
       // queue.
-      Queue& q = queues_[Rand(&pt->rand) % queues_.size()];
+      eigen_assert(start < limit);
+      eigen_assert(limit <= num_threads_);
+      int num_queues = limit - start;
+      int rnd = Rand(&pt->rand) % num_queues;
+      eigen_assert(start + rnd < limit);
+      Queue& q = thread_data_[start + rnd].queue;
       t = q.PushBack(std::move(t));
     }
     // Note: below we touch this after making w available to worker threads.
@@ -125,8 +141,8 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
 
     // Let each thread know it's been cancelled.
 #ifdef EIGEN_THREAD_ENV_SUPPORTS_CANCELLATION
-    for (size_t i = 0; i < threads_.size(); i++) {
-      threads_[i]->OnCancel();
+    for (size_t i = 0; i < thread_data_.size(); i++) {
+      thread_data_[i].thread->OnCancel();
     }
 #endif
 
@@ -146,6 +162,56 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
   }
 
  private:
+  // Create a single atomic<unsigned> that encodes start and limit information
+  // for each thread.
+  // We expect num_threads_ < 65536, so we can store them in a single
+  // std::atomic<unsigned>.
+  // Exposed publicly as static functions so that external callers can reuse
+  // this encode/decode logic for maintaining their own thread-safe copies of
+  // scheduling and steal domain(s).
+  static const int kMaxPartitionBits = 16;
+  static const int kMaxThreads = 1 << kMaxPartitionBits;
+
+  inline unsigned EncodePartition(unsigned start, unsigned limit) {
+    return (start << kMaxPartitionBits) | limit;
+  }
+
+  inline void DecodePartition(unsigned val, unsigned* start, unsigned* limit) {
+    *limit = val & (kMaxThreads - 1);
+    val >>= kMaxPartitionBits;
+    *start = val;
+  }
+
+  void AssertBounds(int start, int end) {
+    eigen_assert(start >= 0);
+    eigen_assert(start < end);  // non-zero sized partition
+    eigen_assert(end <= num_threads_);
+  }
+
+  inline void SetStealPartition(size_t i, unsigned val) {
+    thread_data_[i].steal_partition.store(val, std::memory_order_relaxed);
+  }
+
+  inline unsigned GetStealPartition(int i) {
+    return thread_data_[i].steal_partition.load(std::memory_order_relaxed);
+  }
+
+  void ComputeCoprimes(int N, MaxSizeVector<unsigned>* coprimes) {
+    for (int i = 1; i <= N; i++) {
+      unsigned a = i;
+      unsigned b = N;
+      // If GCD(a, b) == 1, then a and b are coprimes.
+      while (b != 0) {
+        unsigned tmp = a;
+        a = b;
+        b = tmp % b;
+      }
+      if (a == 1) {
+        coprimes->push_back(i);
+      }
+    }
+  }
+
   typedef typename Environment::EnvThread Thread;
 
   struct PerThread {
@@ -159,12 +225,18 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
 #endif
   };
 
+  struct ThreadData {
+    constexpr ThreadData() : thread(), steal_partition(0), queue() {}
+    std::unique_ptr<Thread> thread;
+    std::atomic<unsigned> steal_partition;
+    Queue queue;
+  };
+
   Environment env_;
   const int num_threads_;
   const bool allow_spinning_;
-  MaxSizeVector<std::unique_ptr<Thread> > threads_;
-  MaxSizeVector<Queue> queues_;
-  MaxSizeVector<unsigned> coprimes_;
+  MaxSizeVector<ThreadData> thread_data_;
+  MaxSizeVector<MaxSizeVector<unsigned>> all_coprimes_;
   MaxSizeVector<EventCount::Waiter> waiters_;
   std::atomic<unsigned> blocked_;
   std::atomic<bool> spinning_;
@@ -191,19 +263,19 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
     pt->pool = this;
     pt->rand = GlobalThreadIdHash();
     pt->thread_id = thread_id;
-    Queue& q = queues_[thread_id];
+    Queue& q = thread_data_[thread_id].queue;
     EventCount::Waiter* waiter = &waiters_[thread_id];
-    // TODO(dvyukov,rmlarsen): The time spent in Steal() is proportional
-    // to num_threads_ and we assume that new work is scheduled at a
-    // constant rate, so we set spin_count to 5000 / num_threads_. The
+    // TODO(dvyukov,rmlarsen): The time spent in NonEmptyQueueIndex() is
+    // proportional to num_threads_ and we assume that new work is scheduled at
+    // a constant rate, so we set spin_count to 5000 / num_threads_. The
     // constant was picked based on a fair dice roll, tune it.
     const int spin_count =
         allow_spinning_ && num_threads_ > 0 ? 5000 / num_threads_ : 0;
     if (num_threads_ == 1) {
       // For num_threads_ == 1 there is no point in going through the expensive
-      // steal loop. Moreover, since Steal() calls PopBack() on the victim
-      // queues it might reverse the order in which ops are executed compared to
-      // the order in which they are scheduled, which tends to be
+      // steal loop. Moreover, since NonEmptyQueueIndex() calls PopBack() on the
+      // victim queues it might reverse the order in which ops are executed
+      // compared to the order in which they are scheduled, which tends to be
      // counter-productive for the types of I/O workloads the single thread
      // pools tend to be used for.
       while (!cancelled_) {
@@ -226,22 +298,25 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
       while (!cancelled_) {
         Task t = q.PopFront();
         if (!t.f) {
-          t = Steal();
+          t = LocalSteal();
           if (!t.f) {
-            // Leave one thread spinning. This reduces latency.
-            if (allow_spinning_ && !spinning_ && !spinning_.exchange(true)) {
-              for (int i = 0; i < spin_count && !t.f; i++) {
-                if (!cancelled_.load(std::memory_order_relaxed)) {
-                  t = Steal();
-                } else {
-                  return;
+            t = GlobalSteal();
+            if (!t.f) {
+              // Leave one thread spinning. This reduces latency.
+              if (allow_spinning_ && !spinning_ && !spinning_.exchange(true)) {
+                for (int i = 0; i < spin_count && !t.f; i++) {
+                  if (!cancelled_.load(std::memory_order_relaxed)) {
+                    t = GlobalSteal();
+                  } else {
+                    return;
+                  }
                 }
+                spinning_ = false;
               }
-              spinning_ = false;
-            }
-            if (!t.f) {
-              if (!WaitForWork(waiter, &t)) {
-                return;
+              if (!t.f) {
+                if (!WaitForWork(waiter, &t)) {
+                  return;
+                }
               }
             }
           }
@@ -253,15 +328,18 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
     }
   }
 
-  // Steal tries to steal work from other worker threads in best-effort manner.
-  Task Steal() {
+  // Steal tries to steal work from other worker threads in the range [start,
+  // limit) in a best-effort manner.
+  Task Steal(unsigned start, unsigned limit) {
     PerThread* pt = GetPerThread();
-    const size_t size = queues_.size();
+    const size_t size = limit - start;
     unsigned r = Rand(&pt->rand);
-    unsigned inc = coprimes_[r % coprimes_.size()];
     unsigned victim = r % size;
+    unsigned inc = all_coprimes_[size - 1][r % all_coprimes_[size - 1].size()];
+
     for (unsigned i = 0; i < size; i++) {
-      Task t = queues_[victim].PopBack();
+      eigen_assert(start + victim < limit);
+      Task t = thread_data_[start + victim].queue.PopBack();
       if (t.f) {
         return t;
       }
@@ -273,6 +351,23 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
     return Task();
   }
 
+  // Steals work within threads belonging to the partition.
+  Task LocalSteal() {
+    PerThread* pt = GetPerThread();
+    unsigned partition = GetStealPartition(pt->thread_id);
+    unsigned start, limit;
+    DecodePartition(partition, &start, &limit);
+    AssertBounds(start, limit);
+
+    return Steal(start, limit);
+  }
+
+  // Steals work from any other thread in the pool.
+  Task GlobalSteal() {
+    return Steal(0, num_threads_);
+  }
+
+
   // WaitForWork blocks until new work is available (returns true), or if it is
   // time to exit (returns false). Can optionally return a task to execute in t
   // (in such case t.f != nullptr on return).
@@ -288,7 +383,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
       if (cancelled_) {
         return false;
       } else {
-        *t = queues_[victim].PopBack();
+        *t = thread_data_[victim].queue.PopBack();
         return true;
       }
     }
@@ -325,12 +420,15 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
 
   int NonEmptyQueueIndex() {
     PerThread* pt = GetPerThread();
-    const size_t size = queues_.size();
+    // We intentionally design NonEmptyQueueIndex to steal work from
+    // anywhere in the pool so threads don't block in WaitForWork() forever
+    // when all threads in their partition go to sleep. Steal is still local.
+    const size_t size = thread_data_.size();
     unsigned r = Rand(&pt->rand);
-    unsigned inc = coprimes_[r % coprimes_.size()];
+    unsigned inc = all_coprimes_[size - 1][r % all_coprimes_[size - 1].size()];
     unsigned victim = r % size;
     for (unsigned i = 0; i < size; i++) {
-      if (!queues_[victim].Empty()) {
+      if (!thread_data_[victim].queue.Empty()) {
         return victim;
       }
       victim += inc;
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h
index 84e1e6cc0..d1fa4b23e 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h
@@ -19,6 +19,15 @@ class ThreadPoolInterface {
   // Submits a closure to be run by a thread in the pool.
   virtual void Schedule(std::function<void()> fn) = 0;
 
+  // Submits a closure to be run by threads in the range [start, end) in the
+  // pool.
+  virtual void ScheduleWithHint(std::function<void()> fn, int start,
+                                int end) {
+    // Just defer to Schedule in case sub-classes aren't interested in
+    // overriding this functionality.
+    Schedule(fn);
+  }
+
   // If implemented, stop processing the closures that have been enqueued.
   // Currently running closures may still be processed.
   // If not implemented, does nothing.
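Note on the partition encoding above: both bounds are packed into one unsigned so a worker can publish and read its steal partition with a single relaxed atomic operation. A sketch of the round trip, with the patch's member functions rewritten here as free functions for illustration:

#include <cassert>

static const int kMaxPartitionBits = 16;
static const int kMaxThreads = 1 << kMaxPartitionBits;  // 65536

// Upper 16 bits hold start, lower 16 bits hold limit; this is why the
// constructor asserts num_threads_ < kMaxThreads.
unsigned EncodePartition(unsigned start, unsigned limit) {
  return (start << kMaxPartitionBits) | limit;
}

void DecodePartition(unsigned val, unsigned* start, unsigned* limit) {
  *limit = val & (kMaxThreads - 1);
  *start = val >> kMaxPartitionBits;
}

int main() {
  unsigned start = 0, limit = 0;
  DecodePartition(EncodePartition(3, 8), &start, &limit);
  assert(start == 3 && limit == 8);  // both bounds survive the round trip
  return 0;
}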
diff --git a/unsupported/test/cxx11_non_blocking_thread_pool.cpp b/unsupported/test/cxx11_non_blocking_thread_pool.cpp
index e73a034b1..90b330fdc 100644
--- a/unsupported/test/cxx11_non_blocking_thread_pool.cpp
+++ b/unsupported/test/cxx11_non_blocking_thread_pool.cpp
@@ -116,10 +116,63 @@ static void test_cancel()
   tp.Cancel();
 }
 
+static void test_pool_partitions() {
+  const int kThreads = 2;
+  ThreadPool tp(kThreads);
+
+  // Assign each thread to its own partition, so that stealing work from other
+  // partitions only happens globally when a thread is idle.
+  std::vector<std::pair<unsigned, unsigned>> steal_partitions(kThreads);
+  for (int i = 0; i < kThreads; ++i) {
+    steal_partitions[i] = std::make_pair(i, i + 1);
+  }
+  tp.SetStealPartitions(steal_partitions);
+
+  std::atomic<int> running(0);
+  std::atomic<int> done(0);
+  std::atomic<int> phase(0);
+
+  // Schedule kThreads tasks and ensure that they are all running.
+  for (int i = 0; i < kThreads; ++i) {
+    tp.Schedule([&]() {
+      const int thread_id = tp.CurrentThreadId();
+      VERIFY_GE(thread_id, 0);
+      VERIFY_LE(thread_id, kThreads - 1);
+      ++running;
+      while (phase < 1) {
+      }
+      ++done;
+    });
+  }
+  while (running != kThreads) {
+  }
+  // Schedule each closure to run only on thread 'i' and verify that it does.
+  for (int i = 0; i < kThreads; ++i) {
+    tp.ScheduleWithHint(
+        [&, i]() {
+          ++running;
+          const int thread_id = tp.CurrentThreadId();
+          VERIFY_IS_EQUAL(thread_id, i);
+          while (phase < 2) {
+          }
+          ++done;
+        },
+        i, i + 1);
+  }
+  running = 0;
+  phase = 1;
+  while (running != kThreads) {
+  }
+  running = 0;
+  phase = 2;
+}
+
+
 EIGEN_DECLARE_TEST(cxx11_non_blocking_thread_pool)
 {
   CALL_SUBTEST(test_create_destroy_empty_pool());
   CALL_SUBTEST(test_parallelism(true));
   CALL_SUBTEST(test_parallelism(false));
   CALL_SUBTEST(test_cancel());
+  CALL_SUBTEST(test_pool_partitions());
 }
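Caller-side usage sketch (illustrative, not part of the patch; assumes the usual Eigen::ThreadPool typedef for ThreadPoolTempl and the unsupported ThreadPool header): split eight workers into two fixed steal domains and direct a closure at the second one.

#define EIGEN_USE_THREADS
#include <utility>
#include <vector>
#include <unsupported/Eigen/CXX11/ThreadPool>

int main() {
  Eigen::ThreadPool pool(8);
  // Threads 0-3 steal only from each other, as do threads 4-7. Idle threads
  // can still pick up work globally via NonEmptyQueueIndex().
  std::vector<std::pair<unsigned, unsigned>> partitions(8);
  for (int i = 0; i < 8; ++i) {
    partitions[i] = (i < 4) ? std::make_pair(0u, 4u) : std::make_pair(4u, 8u);
  }
  pool.SetStealPartitions(partitions);
  // Hint that this closure should be queued on (and stolen within) threads
  // [4, 8).
  pool.ScheduleWithHint([] { /* work */ }, 4, 8);
  return 0;
}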