author    Benoit Steiner <benoit.steiner.goog@gmail.com> 2015-05-20 13:52:07 -0700
committer Benoit Steiner <benoit.steiner.goog@gmail.com> 2015-05-20 13:52:07 -0700
commit    6b800744ce914cf243ac3169e136c5000253f52e (patch)
tree      8e383eec1b6ec168de3f22cedae0e35d7c880823
parent    48f6b274e2d5a59477a368795867baea40e40eed (diff)
Moved away from std::async and std::future as the underlying mechanism for the thread pool device. On several platforms, the functions passed to std::async are not scheduled in the order in which they are given to std::async, which leads to massive performance issues in the contraction code.
Instead we now have a custom thread pool that ensures the functions are picked up by the worker threads in the order in which they are enqueued.
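
For illustration, a minimal sketch of the ordering behavior the new pool provides when tasks are scheduled from a single thread. The task bodies, loop count, and include path are assumptions for the sketch, not part of this commit:

    #define EIGEN_USE_THREADS
    #include <unsupported/Eigen/CXX11/Tensor>
    #include <cstdio>

    int main() {
      // With a single worker, Schedule() guarantees the tasks run in the
      // order they were enqueued; std::async made no such promise on
      // several platforms, which is what this commit works around.
      Eigen::ThreadPool pool(1);
      for (int i = 0; i < 4; ++i) {
        pool.Schedule([i]() { std::printf("task %d\n", i); });
      }
      return 0;  // ~ThreadPool() drains pending work before joining.
    }
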
 unsupported/Eigen/CXX11/Tensor                                   |   5
 unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h |  67
 unsupported/Eigen/CXX11/src/Tensor/TensorDeviceType.h            | 162
 unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h              |   5
 unsupported/test/cxx11_tensor_thread_pool.cpp                    |  21
 5 files changed, 212 insertions(+), 48 deletions(-)
diff --git a/unsupported/Eigen/CXX11/Tensor b/unsupported/Eigen/CXX11/Tensor
index 520da66bb..05c5127a1 100644
--- a/unsupported/Eigen/CXX11/Tensor
+++ b/unsupported/Eigen/CXX11/Tensor
@@ -35,7 +35,10 @@
#endif
#ifdef EIGEN_USE_THREADS
-#include <future>
+#include <condition_variable>
+#include <deque>
+#include <mutex>
+#include <thread>
#endif
#ifdef EIGEN_USE_GPU
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
index cb2fd53fe..ed87d3100 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
@@ -46,8 +46,8 @@ struct packRhsAndKernelArg {
const Index n_block_idx;
const Index m_blocks;
const Index n_blocks;
- std::vector<Promise>* kernel_promises;
- const std::vector<Future>* lhs_futures;
+ std::vector<Notification*>* kernel_notifications;
+ const std::vector<Notification*>* lhs_notifications;
const bool need_to_pack;
};
@@ -219,17 +219,13 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
blockBs.push_back(static_cast<RhsScalar *>(this->m_device.allocate(sizeB * sizeof(RhsScalar))));
}
- // lhs_futures starts with all null futures
- std::vector<Future> lhs_futures(num_threads);
+ // lhs_notifications starts with all null Notifications
+ std::vector<Notification*> lhs_notifications(num_threads, nullptr);
// this should really be numBlockAs * n_blocks;
- const Index num_kernel_promises = num_threads * n_blocks;
- std::vector<Promise> kernel_promises(num_kernel_promises);
- std::vector<Future> kernel_futures(num_kernel_promises);
- for (std::size_t i = 0; i < kernel_promises.size(); ++i) {
- kernel_promises[i].set_value();
- kernel_futures[i] = kernel_promises[i].get_future();
- }
+ const Index num_kernel_notifications = num_threads * n_blocks;
+ std::vector<Notification*> kernel_notifications(num_kernel_notifications,
+ nullptr);
for (Index k_block_idx = 0; k_block_idx < k_blocks; k_block_idx++) {
const Index k_start = k_block_idx * kc;
@@ -245,11 +241,16 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
eigen_assert(actual_mc > 0);
Index blockAId = (k_block_idx * m_blocks + mt_block_idx) % num_threads;
+
for (int i = 0; i < n_blocks; ++i) {
- Index future_id = (blockAId * n_blocks + i);
- wait_until_ready(&kernel_futures[future_id]);
- kernel_promises[future_id] = Promise();
- kernel_futures[future_id] = kernel_promises[future_id].get_future();
+ Index notification_id = (blockAId * n_blocks + i);
+ // Wait for any current kernels using this slot to complete
+ // before using it.
+ if (kernel_notifications[notification_id]) {
+ wait_until_ready(kernel_notifications[notification_id]);
+ delete kernel_notifications[notification_id];
+ }
+ kernel_notifications[notification_id] = new Notification();
}
const packLArg arg = {
blockAs[blockAId], // blockA
@@ -260,8 +261,12 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
actual_kc, // kc
};
- lhs_futures[blockAId] =
- this->m_device.enqueue(&Self::packLhs<packLArg, LhsPacker>, arg);
+ // Delete any existing notification since we may be
+ // replacing it. The algorithm should ensure that there are
+ // no existing waiters on this notification.
+ delete lhs_notifications[blockAId];
+ lhs_notifications[blockAId] =
+ this->m_device.enqueue(&Self::packLhs<packLArg, LhsPacker>, arg);
}
// now start kernels.
@@ -278,7 +283,7 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
for (Index i = num_blocks; i < num_threads; ++i) {
Index blockAId = (k_block_idx * m_blocks + i + m_block_idx) % num_threads;
Index future_id = (blockAId * n_blocks + n_block_idx);
- wait_until_ready(&kernel_futures[future_id]);
+ wait_until_ready(kernel_notifications[future_id]);
}
}
@@ -301,19 +306,29 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
n_block_idx, // n_block_idx
m_blocks, // m_blocks
n_blocks, // n_blocks
- &kernel_promises, // kernel_promises
- &lhs_futures, // lhs_futures
+ &kernel_notifications, // kernel notifications
+ &lhs_notifications, // lhs notifications
need_to_pack, // need_to_pack
};
- this->m_device.enqueueNoFuture(&Self::packRhsAndKernel<packRKArg, RhsPacker, GebpKernel>, arg);
+ // We asynchronously kick off this function, which ends up
+ // notifying the appropriate kernel_notifications objects,
+ // which this thread waits on before exiting.
+ this->m_device.enqueueNoNotification(&Self::packRhsAndKernel<packRKArg, RhsPacker, GebpKernel>, arg);
}
}
}
// Make sure all the kernels are done.
- for (size_t i = 0; i < kernel_futures.size(); ++i) {
- wait_until_ready(&kernel_futures[i]);
+ for (size_t i = 0; i < kernel_notifications.size(); ++i) {
+ wait_until_ready(kernel_notifications[i]);
+ delete kernel_notifications[i];
+ }
+
+ // No need to wait for lhs notifications since they should have
+ // already been waited on. Just clean them up.
+ for (size_t i = 0; i < lhs_notifications.size(); ++i) {
+ delete lhs_notifications[i];
}
// deallocate all of the memory for both A and B's
@@ -360,15 +375,15 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
const Index m_base_start = arg.m + arg.mc*mt_block_idx;
if (m_base_start < arg.max_m) {
Index blockAId = (arg.k_block_idx * arg.m_blocks + mt_block_idx + arg.m_block_idx) % arg.num_threads;
-
- wait_until_ready(&(*arg.lhs_futures)[blockAId]);
+ wait_until_ready((*arg.lhs_notifications)[blockAId]);
const Index actual_mc = (std::min)(m_base_start + arg.mc, arg.max_m) - m_base_start;
gebp(arg.output.getSubMapper(m_base_start, arg.n),
(*arg.blockAs)[blockAId], arg.blockB,
actual_mc, arg.kc, arg.nc, 1.0, -1, -1, 0, 0);
+ // Notify that the kernel is done.
const Index set_idx = blockAId * arg.n_blocks + arg.n_block_idx;
- (*arg.kernel_promises)[set_idx].set_value();
+ (*arg.kernel_notifications)[set_idx]->Notify();
}
}
}
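
The wait/delete/replace lifecycle that the contraction code above applies to kernel_notifications and lhs_notifications can be shown in isolation. A sketch under assumptions: do_work(), the slot and round counts, and the wrapper function are hypothetical, not part of this commit:

    #define EIGEN_USE_THREADS
    #include <unsupported/Eigen/CXX11/Tensor>
    #include <vector>

    static void do_work(int round) { /* hypothetical task body */ }

    void reuse_notification_slots(const Eigen::ThreadPoolDevice& dev) {
      const int num_slots = 4;    // hypothetical
      const int num_rounds = 16;  // hypothetical
      // Slots start out null, like lhs_notifications/kernel_notifications.
      std::vector<Eigen::Notification*> slots(num_slots, nullptr);
      for (int round = 0; round < num_rounds; ++round) {
        const int s = round % num_slots;
        // Wait for the previous task that used this slot before reusing it.
        if (slots[s]) {
          Eigen::wait_until_ready(slots[s]);
          delete slots[s];
        }
        slots[s] = dev.enqueue(&do_work, round);
      }
      // Drain and free whatever is still in flight.
      for (Eigen::Notification* n : slots) {
        Eigen::wait_until_ready(n);  // wait_until_ready tolerates null
        delete n;
      }
    }
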
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceType.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceType.h
index efd207507..1018395a1 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceType.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceType.h
@@ -38,19 +38,151 @@ struct DefaultDevice {
// We should really use a thread pool here but first we need to find a portable thread pool library.
#ifdef EIGEN_USE_THREADS
-typedef std::future<void> Future;
-typedef std::promise<void> Promise;
+// The implementation of the ThreadPool type ensures that the Schedule method
+// runs the functions it is provided in FIFO order when the scheduling is done
+// by a single thread.
+class ThreadPool {
+ public:
+ // Construct a pool that contains "num_threads" threads.
+ explicit ThreadPool(int num_threads) {
+ for (int i = 0; i < num_threads; i++) {
+ threads_.push_back(new std::thread([this]() { WorkerLoop(); }));
+ }
+ }
-static EIGEN_STRONG_INLINE void wait_until_ready(const Future* f) {
- f->wait();
-}
-static EIGEN_STRONG_INLINE void get_when_ready(Future* f) {
- f->get();
+ // Wait until all scheduled work has finished and then destroy the
+ // set of threads.
+ ~ThreadPool()
+ {
+ {
+ // Wait for all work to get done.
+ std::unique_lock<std::mutex> l(mu_);
+ empty_.wait(l, [this]() { return pending_.empty(); });
+ exiting_ = true;
+
+ // Wakeup all waiters.
+ for (auto w : waiters_) {
+ w->ready = true;
+ w->work = nullptr;
+ w->cv.notify_one();
+ }
+ }
+
+ // Wait for threads to finish.
+ for (auto t : threads_) {
+ t->join();
+ delete t;
+ }
+ }
+
+ // Schedule fn() for execution in the pool of threads. The functions are
+ // executed in the order in which they are scheduled.
+ void Schedule(std::function<void()> fn) {
+ std::unique_lock<std::mutex> l(mu_);
+ if (waiters_.empty()) {
+ pending_.push_back(fn);
+ } else {
+ Waiter* w = waiters_.back();
+ waiters_.pop_back();
+ w->ready = true;
+ w->work = fn;
+ w->cv.notify_one();
+ }
+ }
+
+ protected:
+ void WorkerLoop() {
+ std::unique_lock<std::mutex> l(mu_);
+ Waiter w;
+ while (!exiting_) {
+ std::function<void()> fn;
+ if (pending_.empty()) {
+ // Wait for work to be assigned to me
+ w.ready = false;
+ waiters_.push_back(&w);
+ w.cv.wait(l, [&w]() { return w.ready; });
+ fn = w.work;
+ w.work = nullptr;
+ } else {
+ // Pick up pending work
+ fn = pending_.front();
+ pending_.pop_front();
+ if (pending_.empty()) {
+ empty_.notify_all();
+ }
+ }
+ if (fn) {
+ mu_.unlock();
+ fn();
+ mu_.lock();
+ }
+ }
+ }
+
+ private:
+ struct Waiter {
+ std::condition_variable cv;
+ std::function<void()> work;
+ bool ready;
+ };
+
+ std::mutex mu_;
+ std::vector<std::thread*> threads_; // All threads
+ std::vector<Waiter*> waiters_; // Stack of waiting threads.
+ std::deque<std::function<void()>> pending_; // Queue of pending work
+ std::condition_variable empty_; // Signaled on pending_.empty()
+ bool exiting_ = false;
+};
+
+
+// Notification is an object that allows a user to wait for another
+// thread to signal a notification that an event has occurred.
+//
+// Multiple threads can wait on the same Notification object,
+// but only one caller may call Notify() on the object.
+class Notification {
+ public:
+ Notification() : notified_(false) {}
+ ~Notification() {}
+
+ void Notify() {
+ std::unique_lock<std::mutex> l(mu_);
+ eigen_assert(!notified_);
+ notified_ = true;
+ cv_.notify_all();
+ }
+
+ void WaitForNotification() {
+ std::unique_lock<std::mutex> l(mu_);
+ cv_.wait(l, [this]() { return notified_; } );
+ }
+
+ private:
+ std::mutex mu_;
+ std::condition_variable cv_;
+ bool notified_;
+};
+
+// Runs an arbitrary function and then calls Notify() on the passed in
+// Notification.
+template <typename Function, typename... Args> struct FunctionWrapper
+{
+ static void run(Notification* n, Function f, Args... args) {
+ f(args...);
+ n->Notify();
+ }
+};
+
+static EIGEN_STRONG_INLINE void wait_until_ready(Notification* n) {
+ if (n) {
+ n->WaitForNotification();
+ }
}
+// Build a thread pool device on top of an existing pool of threads.
struct ThreadPoolDevice {
- ThreadPoolDevice(size_t num_cores) : num_threads_(num_cores) { }
+ ThreadPoolDevice(ThreadPool* pool, size_t num_cores) : pool_(pool), num_threads_(num_cores) { }
EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
return internal::aligned_malloc(num_bytes);
@@ -73,15 +205,21 @@ struct ThreadPoolDevice {
}
template <class Function, class... Args>
- EIGEN_STRONG_INLINE Future enqueue(Function&& f, Args&&... args) const {
- return std::async(std::launch::async, f, args...);
+ EIGEN_STRONG_INLINE Notification* enqueue(Function&& f, Args&&... args) const {
+ Notification* n = new Notification();
+ std::function<void()> func =
+ std::bind(&FunctionWrapper<Function, Args...>::run, n, f, args...);
+ pool_->Schedule(func);
+ return n;
}
template <class Function, class... Args>
- EIGEN_STRONG_INLINE void enqueueNoFuture(Function&& f, Args&&... args) const {
- std::async(std::launch::async, f, args...);
+ EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... args) const {
+ std::function<void()> func = std::bind(f, args...);
+ pool_->Schedule(func);
}
private:
+ ThreadPool* pool_;
size_t num_threads_;
};
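
Taken together, the primitives added in this file compose as follows. A minimal sketch: the lambdas and sizes are made up, and the ownership convention (the caller deletes the returned Notification) matches the call sites in the contraction code above:

    #define EIGEN_USE_THREADS
    #include <unsupported/Eigen/CXX11/Tensor>

    int main() {
      Eigen::ThreadPool pool(2);
      Eigen::ThreadPoolDevice dev(&pool, 2);

      // enqueue() wraps the callable in FunctionWrapper, so the returned
      // Notification fires once the task body has run.
      Eigen::Notification* done = dev.enqueue([]() { /* work */ });
      Eigen::wait_until_ready(done);
      delete done;  // the caller owns the Notification

      // enqueueNoNotification() is fire-and-forget; ~ThreadPool() still
      // drains the queue before joining its workers.
      dev.enqueueNoNotification([]() { /* more work */ });
      return 0;
    }
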
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h
index 02e1667b9..6ea588e4b 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h
@@ -131,7 +131,7 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable>
const Index blocksize = std::max<Index>(PacketSize, (blocksz - (blocksz % PacketSize)));
const Index numblocks = size / blocksize;
- std::vector<Future> results;
+ std::vector<Notification*> results;
results.reserve(numblocks);
for (int i = 0; i < numblocks; ++i) {
results.push_back(device.enqueue(&EvalRange<Evaluator, Index>::run, evaluator, i*blocksize, (i+1)*blocksize));
@@ -142,7 +142,8 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable>
}
for (int i = 0; i < numblocks; ++i) {
- get_when_ready(&results[i]);
+ wait_until_ready(results[i]);
+ delete results[i];
}
}
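
The executor's fan-out/join shape, restated outside Eigen's internals. A sketch under assumptions: process_block() and the fixed block size are hypothetical, and the inline tail handling is an assumption about executor context not shown in this hunk:

    #define EIGEN_USE_THREADS
    #include <unsupported/Eigen/CXX11/Tensor>
    #include <vector>

    static void process_block(float* data, int first, int last) {
      for (int i = first; i < last; ++i) data[i] *= 2.0f;  // hypothetical work
    }

    void parallel_apply(const Eigen::ThreadPoolDevice& dev, float* data, int size) {
      const int blocksize = 1024;  // hypothetical; the executor derives its own
      const int numblocks = size / blocksize;
      std::vector<Eigen::Notification*> results;
      results.reserve(numblocks);
      for (int i = 0; i < numblocks; ++i) {
        results.push_back(
            dev.enqueue(&process_block, data, i * blocksize, (i + 1) * blocksize));
      }
      // Process any remainder on the calling thread.
      if (numblocks * blocksize < size) {
        process_block(data, numblocks * blocksize, size);
      }
      // Wait for every block, then free the notifications, as above.
      for (Eigen::Notification* n : results) {
        Eigen::wait_until_ready(n);
        delete n;
      }
    }
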
diff --git a/unsupported/test/cxx11_tensor_thread_pool.cpp b/unsupported/test/cxx11_tensor_thread_pool.cpp
index 6fe65c7f9..05b55f706 100644
--- a/unsupported/test/cxx11_tensor_thread_pool.cpp
+++ b/unsupported/test/cxx11_tensor_thread_pool.cpp
@@ -26,7 +26,8 @@ static void test_multithread_elementwise()
in1.setRandom();
in2.setRandom();
- Eigen::ThreadPoolDevice thread_pool_device(internal::random<int>(3, 11));
+ Eigen::ThreadPool tp(internal::random<int>(3, 11));
+ Eigen::ThreadPoolDevice thread_pool_device(&tp, internal::random<int>(3, 11));
out.device(thread_pool_device) = in1 + in2 * 3.14f;
for (int i = 0; i < 2; ++i) {
@@ -48,7 +49,8 @@ static void test_multithread_compound_assignment()
in1.setRandom();
in2.setRandom();
- Eigen::ThreadPoolDevice thread_pool_device(internal::random<int>(3, 11));
+ Eigen::ThreadPool tp(internal::random<int>(3, 11));
+ Eigen::ThreadPoolDevice thread_pool_device(&tp, internal::random<int>(3, 11));
out.device(thread_pool_device) = in1;
out.device(thread_pool_device) += in2 * 3.14f;
@@ -80,7 +82,8 @@ static void test_multithread_contraction()
MapXf m_right(t_right.data(), 1147, 1400);
Matrix<float, Dynamic, Dynamic, DataLayout> m_result(1500, 1400);
- Eigen::ThreadPoolDevice thread_pool_device(4);
+ Eigen::ThreadPool tp(4);
+ Eigen::ThreadPoolDevice thread_pool_device(&tp, 4);
// compute results by separate methods
t_result.device(thread_pool_device) = t_left.contract(t_right, dims);
@@ -115,7 +118,8 @@ static void test_contraction_corner_cases()
MapXf m_right(t_right.data(), 32, 28*28);
Matrix<float, Dynamic, Dynamic, DataLayout> m_result(500, 28*28);
- Eigen::ThreadPoolDevice thread_pool_device(12);
+ Eigen::ThreadPool tp(12);
+ Eigen::ThreadPoolDevice thread_pool_device(&tp, 12);
// compute results by separate methods
t_result.device(thread_pool_device) = t_left.contract(t_right, dims);
@@ -204,7 +208,8 @@ static void test_multithread_contraction_agrees_with_singlethread() {
typedef Tensor<float, 1>::DimensionPair DimPair;
Eigen::array<DimPair, 1> dims({{DimPair(1, 2)}});
- Eigen::ThreadPoolDevice thread_pool_device(internal::random<int>(2, 11));
+ Eigen::ThreadPool tp(internal::random<int>(2, 11));
+ Eigen::ThreadPoolDevice thread_pool_device(&tp, internal::random<int>(2, 11));
Tensor<float, 5, DataLayout> st_result;
st_result = left.contract(right, dims);
@@ -227,7 +232,8 @@ static void test_memcpy() {
for (int i = 0; i < 5; ++i) {
const int num_threads = internal::random<int>(3, 11);
- Eigen::ThreadPoolDevice thread_pool_device(num_threads);
+ Eigen::ThreadPool tp(num_threads);
+ Eigen::ThreadPoolDevice thread_pool_device(&tp, num_threads);
const int size = internal::random<int>(13, 7632);
Tensor<float, 1> t1(size);
@@ -243,7 +249,8 @@ static void test_memcpy() {
static void test_multithread_random()
{
- Eigen::ThreadPoolDevice device(2);
+ Eigen::ThreadPool tp(2);
+ Eigen::ThreadPoolDevice device(&tp, 2);
Tensor<float, 1> t(1 << 20);
t.device(device) = t.random<Eigen::internal::NormalRandomGenerator<float>>();
}
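
As the updated tests illustrate, the pool is now constructed separately from the device, so a single ThreadPool can back several ThreadPoolDevice instances. A short sketch with arbitrary sizes:

    Eigen::ThreadPool tp(8);                // one shared set of worker threads
    Eigen::ThreadPoolDevice dev_a(&tp, 8);  // num_cores passed to the device may
    Eigen::ThreadPoolDevice dev_b(&tp, 4);  // differ from the pool size, as the tests show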