diff options
author | Benoit Steiner <benoit.steiner.goog@gmail.com> | 2015-05-20 13:52:07 -0700 |
---|---|---|
committer | Benoit Steiner <benoit.steiner.goog@gmail.com> | 2015-05-20 13:52:07 -0700 |
commit | 6b800744ce914cf243ac3169e136c5000253f52e (patch) | |
tree | 8e383eec1b6ec168de3f22cedae0e35d7c880823 | |
parent | 48f6b274e2d5a59477a368795867baea40e40eed (diff) |
Moved away from std::async and std::future as the underlying mechnism for the thread pool device. On several platforms, the functions passed to std::async are not scheduled in the order in which they are given to std::async, which leads to massive performance issues in the contraction code.
Instead we now have a custom thread pool that ensures that the functions are picked up by the threads in the pool in the order in which they are enqueued in the pool.
-rw-r--r-- | unsupported/Eigen/CXX11/Tensor | 5 | ||||
-rw-r--r-- | unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h | 67 | ||||
-rw-r--r-- | unsupported/Eigen/CXX11/src/Tensor/TensorDeviceType.h | 162 | ||||
-rw-r--r-- | unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h | 5 | ||||
-rw-r--r-- | unsupported/test/cxx11_tensor_thread_pool.cpp | 21 |
5 files changed, 212 insertions, 48 deletions
diff --git a/unsupported/Eigen/CXX11/Tensor b/unsupported/Eigen/CXX11/Tensor index 520da66bb..05c5127a1 100644 --- a/unsupported/Eigen/CXX11/Tensor +++ b/unsupported/Eigen/CXX11/Tensor @@ -35,7 +35,10 @@ #endif #ifdef EIGEN_USE_THREADS -#include <future> +#include <condition_variable> +#include <deque> +#include <mutex> +#include <thread> #endif #ifdef EIGEN_USE_GPU diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h index cb2fd53fe..ed87d3100 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h @@ -46,8 +46,8 @@ struct packRhsAndKernelArg { const Index n_block_idx; const Index m_blocks; const Index n_blocks; - std::vector<Promise>* kernel_promises; - const std::vector<Future>* lhs_futures; + std::vector<Notification*>* kernel_notifications; + const std::vector<Notification*>* lhs_notifications; const bool need_to_pack; }; @@ -219,17 +219,13 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT blockBs.push_back(static_cast<RhsScalar *>(this->m_device.allocate(sizeB * sizeof(RhsScalar)))); } - // lhs_futures starts with all null futures - std::vector<Future> lhs_futures(num_threads); + // lhs_notifications starts with all null Notifications + std::vector<Notification*> lhs_notifications(num_threads, nullptr); // this should really be numBlockAs * n_blocks; - const Index num_kernel_promises = num_threads * n_blocks; - std::vector<Promise> kernel_promises(num_kernel_promises); - std::vector<Future> kernel_futures(num_kernel_promises); - for (std::size_t i = 0; i < kernel_promises.size(); ++i) { - kernel_promises[i].set_value(); - kernel_futures[i] = kernel_promises[i].get_future(); - } + const Index num_kernel_notifications = num_threads * n_blocks; + std::vector<Notification*> kernel_notifications(num_kernel_notifications, + nullptr); for (Index k_block_idx = 0; k_block_idx < k_blocks; k_block_idx++) { const Index k_start = k_block_idx * kc; @@ -245,11 +241,16 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT eigen_assert(actual_mc > 0); Index blockAId = (k_block_idx * m_blocks + mt_block_idx) % num_threads; + for (int i = 0; i < n_blocks; ++i) { - Index future_id = (blockAId * n_blocks + i); - wait_until_ready(&kernel_futures[future_id]); - kernel_promises[future_id] = Promise(); - kernel_futures[future_id] = kernel_promises[future_id].get_future(); + Index notification_id = (blockAId * n_blocks + i); + // Wait for any current kernels using this slot to complete + // before using it. + if (kernel_notifications[notification_id]) { + wait_until_ready(kernel_notifications[notification_id]); + delete kernel_notifications[notification_id]; + } + kernel_notifications[notification_id] = new Notification(); } const packLArg arg = { blockAs[blockAId], // blockA @@ -260,8 +261,12 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT actual_kc, // kc }; - lhs_futures[blockAId] = - this->m_device.enqueue(&Self::packLhs<packLArg, LhsPacker>, arg); + // Delete any existing notification since we may be + // replacing it. The algorithm should ensure that there are + // no existing waiters on this notification. + delete lhs_notifications[blockAId]; + lhs_notifications[blockAId] = + this->m_device.enqueue(&Self::packLhs<packLArg, LhsPacker>, arg); } // now start kernels. @@ -278,7 +283,7 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT for (Index i = num_blocks; i < num_threads; ++i) { Index blockAId = (k_block_idx * m_blocks + i + m_block_idx) % num_threads; Index future_id = (blockAId * n_blocks + n_block_idx); - wait_until_ready(&kernel_futures[future_id]); + wait_until_ready(kernel_notifications[future_id]); } } @@ -301,19 +306,29 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT n_block_idx, // n_block_idx m_blocks, // m_blocks n_blocks, // n_blocks - &kernel_promises, // kernel_promises - &lhs_futures, // lhs_futures + &kernel_notifications, // kernel notifications + &lhs_notifications, // lhs notifications need_to_pack, // need_to_pack }; - this->m_device.enqueueNoFuture(&Self::packRhsAndKernel<packRKArg, RhsPacker, GebpKernel>, arg); + // We asynchronously kick off this function, which ends up + // notifying the appropriate kernel_notifications objects, + // which this thread waits on before exiting. + this->m_device.enqueueNoNotification(&Self::packRhsAndKernel<packRKArg, RhsPacker, GebpKernel>, arg); } } } // Make sure all the kernels are done. - for (size_t i = 0; i < kernel_futures.size(); ++i) { - wait_until_ready(&kernel_futures[i]); + for (size_t i = 0; i < kernel_notifications.size(); ++i) { + wait_until_ready(kernel_notifications[i]); + delete kernel_notifications[i]; + } + + // No need to wait for lhs notifications since they should have + // already been waited on. Just clean them up. + for (size_t i = 0; i < lhs_notifications.size(); ++i) { + delete lhs_notifications[i]; } // deallocate all of the memory for both A and B's @@ -360,15 +375,15 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT const Index m_base_start = arg.m + arg.mc*mt_block_idx; if (m_base_start < arg.max_m) { Index blockAId = (arg.k_block_idx * arg.m_blocks + mt_block_idx + arg.m_block_idx) % arg.num_threads; - - wait_until_ready(&(*arg.lhs_futures)[blockAId]); + wait_until_ready((*arg.lhs_notifications)[blockAId]); const Index actual_mc = (std::min)(m_base_start + arg.mc, arg.max_m) - m_base_start; gebp(arg.output.getSubMapper(m_base_start, arg.n), (*arg.blockAs)[blockAId], arg.blockB, actual_mc, arg.kc, arg.nc, 1.0, -1, -1, 0, 0); + // Notify that the kernel is done. const Index set_idx = blockAId * arg.n_blocks + arg.n_block_idx; - (*arg.kernel_promises)[set_idx].set_value(); + (*arg.kernel_notifications)[set_idx]->Notify(); } } } diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceType.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceType.h index efd207507..1018395a1 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceType.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceType.h @@ -38,19 +38,151 @@ struct DefaultDevice { // We should really use a thread pool here but first we need to find a portable thread pool library. #ifdef EIGEN_USE_THREADS -typedef std::future<void> Future; -typedef std::promise<void> Promise; +// The implementation of the ThreadPool type ensures that the Schedule method +// runs the functions it is provided in FIFO order when the scheduling is done +// by a single thread. +class ThreadPool { + public: + // Construct a pool that contains "num_threads" threads. + explicit ThreadPool(int num_threads) { + for (int i = 0; i < num_threads; i++) { + threads_.push_back(new std::thread([this]() { WorkerLoop(); })); + } + } -static EIGEN_STRONG_INLINE void wait_until_ready(const Future* f) { - f->wait(); -} -static EIGEN_STRONG_INLINE void get_when_ready(Future* f) { - f->get(); + // Wait until all scheduled work has finished and then destroy the + // set of threads. + ~ThreadPool() + { + { + // Wait for all work to get done. + std::unique_lock<std::mutex> l(mu_); + empty_.wait(l, [this]() { return pending_.empty(); }); + exiting_ = true; + + // Wakeup all waiters. + for (auto w : waiters_) { + w->ready = true; + w->work = nullptr; + w->cv.notify_one(); + } + } + + // Wait for threads to finish. + for (auto t : threads_) { + t->join(); + delete t; + } + } + + // Schedule fn() for execution in the pool of threads. The functions are + // executed in the order in which they are scheduled. + void Schedule(std::function<void()> fn) { + std::unique_lock<std::mutex> l(mu_); + if (waiters_.empty()) { + pending_.push_back(fn); + } else { + Waiter* w = waiters_.back(); + waiters_.pop_back(); + w->ready = true; + w->work = fn; + w->cv.notify_one(); + } + } + + protected: + void WorkerLoop() { + std::unique_lock<std::mutex> l(mu_); + Waiter w; + while (!exiting_) { + std::function<void()> fn; + if (pending_.empty()) { + // Wait for work to be assigned to me + w.ready = false; + waiters_.push_back(&w); + w.cv.wait(l, [&w]() { return w.ready; }); + fn = w.work; + w.work = nullptr; + } else { + // Pick up pending work + fn = pending_.front(); + pending_.pop_front(); + if (pending_.empty()) { + empty_.notify_all(); + } + } + if (fn) { + mu_.unlock(); + fn(); + mu_.lock(); + } + } + } + + private: + struct Waiter { + std::condition_variable cv; + std::function<void()> work; + bool ready; + }; + + std::mutex mu_; + std::vector<std::thread*> threads_; // All threads + std::vector<Waiter*> waiters_; // Stack of waiting threads. + std::deque<std::function<void()>> pending_; // Queue of pending work + std::condition_variable empty_; // Signaled on pending_.empty() + bool exiting_ = false; +}; + + +// Notification is an object that allows a user to to wait for another +// thread to signal a notification that an event has occurred. +// +// Multiple threads can wait on the same Notification object. +// but only one caller must call Notify() on the object. +class Notification { + public: + Notification() : notified_(false) {} + ~Notification() {} + + void Notify() { + std::unique_lock<std::mutex> l(mu_); + eigen_assert(!notified_); + notified_ = true; + cv_.notify_all(); + } + + void WaitForNotification() { + std::unique_lock<std::mutex> l(mu_); + cv_.wait(l, [this]() { return notified_; } ); + } + + private: + std::mutex mu_; + std::condition_variable cv_; + bool notified_; +}; + +// Runs an arbitrary function and then calls Notify() on the passed in +// Notification. +template <typename Function, typename... Args> struct FunctionWrapper +{ + static void run(Notification* n, Function f, Args... args) { + f(args...); + n->Notify(); + } +}; + +static EIGEN_STRONG_INLINE void wait_until_ready(Notification* n) { + if (n) { + n->WaitForNotification(); + } } +// Build a thread pool device on top the an existing pool of threads. struct ThreadPoolDevice { - ThreadPoolDevice(size_t num_cores) : num_threads_(num_cores) { } + ThreadPoolDevice(ThreadPool* pool, size_t num_cores) : pool_(pool), num_threads_(num_cores) { } EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const { return internal::aligned_malloc(num_bytes); @@ -73,15 +205,21 @@ struct ThreadPoolDevice { } template <class Function, class... Args> - EIGEN_STRONG_INLINE Future enqueue(Function&& f, Args&&... args) const { - return std::async(std::launch::async, f, args...); + EIGEN_STRONG_INLINE Notification* enqueue(Function&& f, Args&&... args) const { + Notification* n = new Notification(); + std::function<void()> func = + std::bind(&FunctionWrapper<Function, Args...>::run, n, f, args...); + pool_->Schedule(func); + return n; } template <class Function, class... Args> - EIGEN_STRONG_INLINE void enqueueNoFuture(Function&& f, Args&&... args) const { - std::async(std::launch::async, f, args...); + EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... args) const { + std::function<void()> func = std::bind(f, args...); + pool_->Schedule(func); } private: + ThreadPool* pool_; size_t num_threads_; }; diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h index 02e1667b9..6ea588e4b 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h @@ -131,7 +131,7 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable> const Index blocksize = std::max<Index>(PacketSize, (blocksz - (blocksz % PacketSize))); const Index numblocks = size / blocksize; - std::vector<Future> results; + std::vector<Notification*> results; results.reserve(numblocks); for (int i = 0; i < numblocks; ++i) { results.push_back(device.enqueue(&EvalRange<Evaluator, Index>::run, evaluator, i*blocksize, (i+1)*blocksize)); @@ -142,7 +142,8 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable> } for (int i = 0; i < numblocks; ++i) { - get_when_ready(&results[i]); + wait_until_ready(results[i]); + delete results[i]; } } diff --git a/unsupported/test/cxx11_tensor_thread_pool.cpp b/unsupported/test/cxx11_tensor_thread_pool.cpp index 6fe65c7f9..05b55f706 100644 --- a/unsupported/test/cxx11_tensor_thread_pool.cpp +++ b/unsupported/test/cxx11_tensor_thread_pool.cpp @@ -26,7 +26,8 @@ static void test_multithread_elementwise() in1.setRandom(); in2.setRandom(); - Eigen::ThreadPoolDevice thread_pool_device(internal::random<int>(3, 11)); + Eigen::ThreadPool tp(internal::random<int>(3, 11)); + Eigen::ThreadPoolDevice thread_pool_device(&tp, internal::random<int>(3, 11)); out.device(thread_pool_device) = in1 + in2 * 3.14f; for (int i = 0; i < 2; ++i) { @@ -48,7 +49,8 @@ static void test_multithread_compound_assignment() in1.setRandom(); in2.setRandom(); - Eigen::ThreadPoolDevice thread_pool_device(internal::random<int>(3, 11)); + Eigen::ThreadPool tp(internal::random<int>(3, 11)); + Eigen::ThreadPoolDevice thread_pool_device(&tp, internal::random<int>(3, 11)); out.device(thread_pool_device) = in1; out.device(thread_pool_device) += in2 * 3.14f; @@ -80,7 +82,8 @@ static void test_multithread_contraction() MapXf m_right(t_right.data(), 1147, 1400); Matrix<float, Dynamic, Dynamic, DataLayout> m_result(1500, 1400); - Eigen::ThreadPoolDevice thread_pool_device(4); + Eigen::ThreadPool tp(4); + Eigen::ThreadPoolDevice thread_pool_device(&tp, 4); // compute results by separate methods t_result.device(thread_pool_device) = t_left.contract(t_right, dims); @@ -115,7 +118,8 @@ static void test_contraction_corner_cases() MapXf m_right(t_right.data(), 32, 28*28); Matrix<float, Dynamic, Dynamic, DataLayout> m_result(500, 28*28); - Eigen::ThreadPoolDevice thread_pool_device(12); + Eigen::ThreadPool tp(12); + Eigen::ThreadPoolDevice thread_pool_device(&tp, 12); // compute results by separate methods t_result.device(thread_pool_device) = t_left.contract(t_right, dims); @@ -204,7 +208,8 @@ static void test_multithread_contraction_agrees_with_singlethread() { typedef Tensor<float, 1>::DimensionPair DimPair; Eigen::array<DimPair, 1> dims({{DimPair(1, 2)}}); - Eigen::ThreadPoolDevice thread_pool_device(internal::random<int>(2, 11)); + Eigen::ThreadPool tp(internal::random<int>(2, 11)); + Eigen::ThreadPoolDevice thread_pool_device(&tp, internal::random<int>(2, 11)); Tensor<float, 5, DataLayout> st_result; st_result = left.contract(right, dims); @@ -227,7 +232,8 @@ static void test_memcpy() { for (int i = 0; i < 5; ++i) { const int num_threads = internal::random<int>(3, 11); - Eigen::ThreadPoolDevice thread_pool_device(num_threads); + Eigen::ThreadPool tp(num_threads); + Eigen::ThreadPoolDevice thread_pool_device(&tp, num_threads); const int size = internal::random<int>(13, 7632); Tensor<float, 1> t1(size); @@ -243,7 +249,8 @@ static void test_memcpy() { static void test_multithread_random() { - Eigen::ThreadPoolDevice device(2); + Eigen::ThreadPool tp(2); + Eigen::ThreadPoolDevice device(&tp, 2); Tensor<float, 1> t(1 << 20); t.device(device) = t.random<Eigen::internal::NormalRandomGenerator<float>>(); } |