path: root/unsupported
author	Benoit Steiner <benoit.steiner.goog@gmail.com>	2015-05-20 13:52:07 -0700
committer	Benoit Steiner <benoit.steiner.goog@gmail.com>	2015-05-20 13:52:07 -0700
commit	6b800744ce914cf243ac3169e136c5000253f52e (patch)
tree	8e383eec1b6ec168de3f22cedae0e35d7c880823 /unsupported
parent	48f6b274e2d5a59477a368795867baea40e40eed (diff)
Moved away from std::async and std::future as the underlying mechanism for the thread pool device. On several platforms, the functions passed to std::async are not scheduled in the order in which they are given to std::async, which leads to massive performance issues in the contraction code.
Instead, we now have a custom thread pool that ensures that the functions are picked up by the threads in the pool in the order in which they are enqueued in the pool.
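A minimal usage sketch of the new calling convention, mirroring the updated tests in cxx11_tensor_thread_pool.cpp below: the thread pool is now constructed explicitly by the caller and handed to the ThreadPoolDevice, instead of the device spawning work through std::async internally. The include path and the sizes are illustrative only.

#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>

int main() {
  Eigen::Tensor<float, 3> in1(2, 3, 7), in2(2, 3, 7), out(2, 3, 7);
  in1.setRandom();
  in2.setRandom();

  // Before this patch: Eigen::ThreadPoolDevice device(4);
  // After this patch: the pool is owned by the caller and passed to the device.
  Eigen::ThreadPool tp(4);
  Eigen::ThreadPoolDevice thread_pool_device(&tp, 4);

  // Work generated by this expression is enqueued on the pool, whose worker
  // threads pick it up in the order in which it was scheduled.
  out.device(thread_pool_device) = in1 + in2 * 3.14f;
  return 0;
}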
Diffstat (limited to 'unsupported')
-rw-r--r--	unsupported/Eigen/CXX11/Tensor	5
-rw-r--r--	unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h	67
-rw-r--r--	unsupported/Eigen/CXX11/src/Tensor/TensorDeviceType.h	162
-rw-r--r--	unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h	5
-rw-r--r--	unsupported/test/cxx11_tensor_thread_pool.cpp	21
5 files changed, 212 insertions, 48 deletions
diff --git a/unsupported/Eigen/CXX11/Tensor b/unsupported/Eigen/CXX11/Tensor
index 520da66bb..05c5127a1 100644
--- a/unsupported/Eigen/CXX11/Tensor
+++ b/unsupported/Eigen/CXX11/Tensor
@@ -35,7 +35,10 @@
#endif
#ifdef EIGEN_USE_THREADS
-#include <future>
+#include <condition_variable>
+#include <deque>
+#include <mutex>
+#include <thread>
#endif
#ifdef EIGEN_USE_GPU
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
index cb2fd53fe..ed87d3100 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
@@ -46,8 +46,8 @@ struct packRhsAndKernelArg {
const Index n_block_idx;
const Index m_blocks;
const Index n_blocks;
- std::vector<Promise>* kernel_promises;
- const std::vector<Future>* lhs_futures;
+ std::vector<Notification*>* kernel_notifications;
+ const std::vector<Notification*>* lhs_notifications;
const bool need_to_pack;
};
@@ -219,17 +219,13 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
blockBs.push_back(static_cast<RhsScalar *>(this->m_device.allocate(sizeB * sizeof(RhsScalar))));
}
- // lhs_futures starts with all null futures
- std::vector<Future> lhs_futures(num_threads);
+ // lhs_notifications starts with all null Notifications
+ std::vector<Notification*> lhs_notifications(num_threads, nullptr);
// this should really be numBlockAs * n_blocks;
- const Index num_kernel_promises = num_threads * n_blocks;
- std::vector<Promise> kernel_promises(num_kernel_promises);
- std::vector<Future> kernel_futures(num_kernel_promises);
- for (std::size_t i = 0; i < kernel_promises.size(); ++i) {
- kernel_promises[i].set_value();
- kernel_futures[i] = kernel_promises[i].get_future();
- }
+ const Index num_kernel_notifications = num_threads * n_blocks;
+ std::vector<Notification*> kernel_notifications(num_kernel_notifications,
+ nullptr);
for (Index k_block_idx = 0; k_block_idx < k_blocks; k_block_idx++) {
const Index k_start = k_block_idx * kc;
@@ -245,11 +241,16 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
eigen_assert(actual_mc > 0);
Index blockAId = (k_block_idx * m_blocks + mt_block_idx) % num_threads;
+
for (int i = 0; i < n_blocks; ++i) {
- Index future_id = (blockAId * n_blocks + i);
- wait_until_ready(&kernel_futures[future_id]);
- kernel_promises[future_id] = Promise();
- kernel_futures[future_id] = kernel_promises[future_id].get_future();
+ Index notification_id = (blockAId * n_blocks + i);
+ // Wait for any current kernels using this slot to complete
+ // before using it.
+ if (kernel_notifications[notification_id]) {
+ wait_until_ready(kernel_notifications[notification_id]);
+ delete kernel_notifications[notification_id];
+ }
+ kernel_notifications[notification_id] = new Notification();
}
const packLArg arg = {
blockAs[blockAId], // blockA
@@ -260,8 +261,12 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
actual_kc, // kc
};
- lhs_futures[blockAId] =
- this->m_device.enqueue(&Self::packLhs<packLArg, LhsPacker>, arg);
+ // Delete any existing notification since we may be
+ // replacing it. The algorithm should ensure that there are
+ // no existing waiters on this notification.
+ delete lhs_notifications[blockAId];
+ lhs_notifications[blockAId] =
+ this->m_device.enqueue(&Self::packLhs<packLArg, LhsPacker>, arg);
}
// now start kernels.
@@ -278,7 +283,7 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
for (Index i = num_blocks; i < num_threads; ++i) {
Index blockAId = (k_block_idx * m_blocks + i + m_block_idx) % num_threads;
Index future_id = (blockAId * n_blocks + n_block_idx);
- wait_until_ready(&kernel_futures[future_id]);
+ wait_until_ready(kernel_notifications[future_id]);
}
}
@@ -301,19 +306,29 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
n_block_idx, // n_block_idx
m_blocks, // m_blocks
n_blocks, // n_blocks
- &kernel_promises, // kernel_promises
- &lhs_futures, // lhs_futures
+ &kernel_notifications, // kernel notifications
+ &lhs_notifications, // lhs notifications
need_to_pack, // need_to_pack
};
- this->m_device.enqueueNoFuture(&Self::packRhsAndKernel<packRKArg, RhsPacker, GebpKernel>, arg);
+ // We asynchronously kick off this function, which ends up
+ // notifying the appropriate kernel_notifications objects,
+ // which this thread waits on before exiting.
+ this->m_device.enqueueNoNotification(&Self::packRhsAndKernel<packRKArg, RhsPacker, GebpKernel>, arg);
}
}
}
// Make sure all the kernels are done.
- for (size_t i = 0; i < kernel_futures.size(); ++i) {
- wait_until_ready(&kernel_futures[i]);
+ for (size_t i = 0; i < kernel_notifications.size(); ++i) {
+ wait_until_ready(kernel_notifications[i]);
+ delete kernel_notifications[i];
+ }
+
+ // No need to wait for lhs notifications since they should have
+ // already been waited on. Just clean them up.
+ for (size_t i = 0; i < lhs_notifications.size(); ++i) {
+ delete lhs_notifications[i];
}
// deallocate all of the memory for both A and B's
@@ -360,15 +375,15 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
const Index m_base_start = arg.m + arg.mc*mt_block_idx;
if (m_base_start < arg.max_m) {
Index blockAId = (arg.k_block_idx * arg.m_blocks + mt_block_idx + arg.m_block_idx) % arg.num_threads;
-
- wait_until_ready(&(*arg.lhs_futures)[blockAId]);
+ wait_until_ready((*arg.lhs_notifications)[blockAId]);
const Index actual_mc = (std::min)(m_base_start + arg.mc, arg.max_m) - m_base_start;
gebp(arg.output.getSubMapper(m_base_start, arg.n),
(*arg.blockAs)[blockAId], arg.blockB,
actual_mc, arg.kc, arg.nc, 1.0, -1, -1, 0, 0);
+ // Notify that the kernel is done.
const Index set_idx = blockAId * arg.n_blocks + arg.n_block_idx;
- (*arg.kernel_promises)[set_idx].set_value();
+ (*arg.kernel_notifications)[set_idx]->Notify();
}
}
}
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceType.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceType.h
index efd207507..1018395a1 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceType.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceType.h
@@ -38,19 +38,151 @@ struct DefaultDevice {
// We should really use a thread pool here but first we need to find a portable thread pool library.
#ifdef EIGEN_USE_THREADS
-typedef std::future<void> Future;
-typedef std::promise<void> Promise;
+// The implementation of the ThreadPool type ensures that the Schedule method
+// runs the functions it is provided in FIFO order when the scheduling is done
+// by a single thread.
+class ThreadPool {
+ public:
+ // Construct a pool that contains "num_threads" threads.
+ explicit ThreadPool(int num_threads) {
+ for (int i = 0; i < num_threads; i++) {
+ threads_.push_back(new std::thread([this]() { WorkerLoop(); }));
+ }
+ }
-static EIGEN_STRONG_INLINE void wait_until_ready(const Future* f) {
- f->wait();
-}
-static EIGEN_STRONG_INLINE void get_when_ready(Future* f) {
- f->get();
+ // Wait until all scheduled work has finished and then destroy the
+ // set of threads.
+ ~ThreadPool()
+ {
+ {
+ // Wait for all work to get done.
+ std::unique_lock<std::mutex> l(mu_);
+ empty_.wait(l, [this]() { return pending_.empty(); });
+ exiting_ = true;
+
+ // Wakeup all waiters.
+ for (auto w : waiters_) {
+ w->ready = true;
+ w->work = nullptr;
+ w->cv.notify_one();
+ }
+ }
+
+ // Wait for threads to finish.
+ for (auto t : threads_) {
+ t->join();
+ delete t;
+ }
+ }
+
+ // Schedule fn() for execution in the pool of threads. The functions are
+ // executed in the order in which they are scheduled.
+ void Schedule(std::function<void()> fn) {
+ std::unique_lock<std::mutex> l(mu_);
+ if (waiters_.empty()) {
+ pending_.push_back(fn);
+ } else {
+ Waiter* w = waiters_.back();
+ waiters_.pop_back();
+ w->ready = true;
+ w->work = fn;
+ w->cv.notify_one();
+ }
+ }
+
+ protected:
+ void WorkerLoop() {
+ std::unique_lock<std::mutex> l(mu_);
+ Waiter w;
+ while (!exiting_) {
+ std::function<void()> fn;
+ if (pending_.empty()) {
+ // Wait for work to be assigned to me
+ w.ready = false;
+ waiters_.push_back(&w);
+ w.cv.wait(l, [&w]() { return w.ready; });
+ fn = w.work;
+ w.work = nullptr;
+ } else {
+ // Pick up pending work
+ fn = pending_.front();
+ pending_.pop_front();
+ if (pending_.empty()) {
+ empty_.notify_all();
+ }
+ }
+ if (fn) {
+ mu_.unlock();
+ fn();
+ mu_.lock();
+ }
+ }
+ }
+
+ private:
+ struct Waiter {
+ std::condition_variable cv;
+ std::function<void()> work;
+ bool ready;
+ };
+
+ std::mutex mu_;
+ std::vector<std::thread*> threads_; // All threads
+ std::vector<Waiter*> waiters_; // Stack of waiting threads.
+ std::deque<std::function<void()>> pending_; // Queue of pending work
+ std::condition_variable empty_; // Signaled on pending_.empty()
+ bool exiting_ = false;
+};
+
+
+// Notification is an object that allows a user to wait for another
+// thread to signal a notification that an event has occurred.
+//
+// Multiple threads can wait on the same Notification object,
+// but only one caller must call Notify() on the object.
+class Notification {
+ public:
+ Notification() : notified_(false) {}
+ ~Notification() {}
+
+ void Notify() {
+ std::unique_lock<std::mutex> l(mu_);
+ eigen_assert(!notified_);
+ notified_ = true;
+ cv_.notify_all();
+ }
+
+ void WaitForNotification() {
+ std::unique_lock<std::mutex> l(mu_);
+ cv_.wait(l, [this]() { return notified_; } );
+ }
+
+ private:
+ std::mutex mu_;
+ std::condition_variable cv_;
+ bool notified_;
+};
+
+// Runs an arbitrary function and then calls Notify() on the passed in
+// Notification.
+template <typename Function, typename... Args> struct FunctionWrapper
+{
+ static void run(Notification* n, Function f, Args... args) {
+ f(args...);
+ n->Notify();
+ }
+};
+
+static EIGEN_STRONG_INLINE void wait_until_ready(Notification* n) {
+ if (n) {
+ n->WaitForNotification();
+ }
}
+// Build a thread pool device on top of an existing pool of threads.
struct ThreadPoolDevice {
- ThreadPoolDevice(size_t num_cores) : num_threads_(num_cores) { }
+ ThreadPoolDevice(ThreadPool* pool, size_t num_cores) : pool_(pool), num_threads_(num_cores) { }
EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
return internal::aligned_malloc(num_bytes);
@@ -73,15 +205,21 @@ struct ThreadPoolDevice {
}
template <class Function, class... Args>
- EIGEN_STRONG_INLINE Future enqueue(Function&& f, Args&&... args) const {
- return std::async(std::launch::async, f, args...);
+ EIGEN_STRONG_INLINE Notification* enqueue(Function&& f, Args&&... args) const {
+ Notification* n = new Notification();
+ std::function<void()> func =
+ std::bind(&FunctionWrapper<Function, Args...>::run, n, f, args...);
+ pool_->Schedule(func);
+ return n;
}
template <class Function, class... Args>
- EIGEN_STRONG_INLINE void enqueueNoFuture(Function&& f, Args&&... args) const {
- std::async(std::launch::async, f, args...);
+ EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... args) const {
+ std::function<void()> func = std::bind(f, args...);
+ pool_->Schedule(func);
}
private:
+ ThreadPool* pool_;
size_t num_threads_;
};
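For reference, a sketch (not part of the patch) of the enqueue()/Notification lifecycle introduced above, following the same pattern TensorExecutor.h uses below: enqueue() returns a heap-allocated Notification* that the caller waits on with wait_until_ready() and then deletes. The free function work() is a hypothetical task used only for illustration.

#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>
#include <vector>

static void work(int block) { /* process one block of the computation */ }

int main() {
  Eigen::ThreadPool tp(4);
  Eigen::ThreadPoolDevice device(&tp, 4);

  std::vector<Eigen::Notification*> done;
  for (int i = 0; i < 8; ++i) {
    // Each enqueue() schedules work(i) on the pool (picked up in FIFO order)
    // and returns a Notification that is signaled once work(i) has run.
    done.push_back(device.enqueue(&work, i));
  }
  for (Eigen::Notification* n : done) {
    Eigen::wait_until_ready(n);  // blocks until the corresponding task ran
    delete n;                    // the caller owns the Notification
  }
  return 0;
}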
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h
index 02e1667b9..6ea588e4b 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h
@@ -131,7 +131,7 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable>
const Index blocksize = std::max<Index>(PacketSize, (blocksz - (blocksz % PacketSize)));
const Index numblocks = size / blocksize;
- std::vector<Future> results;
+ std::vector<Notification*> results;
results.reserve(numblocks);
for (int i = 0; i < numblocks; ++i) {
results.push_back(device.enqueue(&EvalRange<Evaluator, Index>::run, evaluator, i*blocksize, (i+1)*blocksize));
@@ -142,7 +142,8 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable>
}
for (int i = 0; i < numblocks; ++i) {
- get_when_ready(&results[i]);
+ wait_until_ready(results[i]);
+ delete results[i];
}
}
diff --git a/unsupported/test/cxx11_tensor_thread_pool.cpp b/unsupported/test/cxx11_tensor_thread_pool.cpp
index 6fe65c7f9..05b55f706 100644
--- a/unsupported/test/cxx11_tensor_thread_pool.cpp
+++ b/unsupported/test/cxx11_tensor_thread_pool.cpp
@@ -26,7 +26,8 @@ static void test_multithread_elementwise()
in1.setRandom();
in2.setRandom();
- Eigen::ThreadPoolDevice thread_pool_device(internal::random<int>(3, 11));
+ Eigen::ThreadPool tp(internal::random<int>(3, 11));
+ Eigen::ThreadPoolDevice thread_pool_device(&tp, internal::random<int>(3, 11));
out.device(thread_pool_device) = in1 + in2 * 3.14f;
for (int i = 0; i < 2; ++i) {
@@ -48,7 +49,8 @@ static void test_multithread_compound_assignment()
in1.setRandom();
in2.setRandom();
- Eigen::ThreadPoolDevice thread_pool_device(internal::random<int>(3, 11));
+ Eigen::ThreadPool tp(internal::random<int>(3, 11));
+ Eigen::ThreadPoolDevice thread_pool_device(&tp, internal::random<int>(3, 11));
out.device(thread_pool_device) = in1;
out.device(thread_pool_device) += in2 * 3.14f;
@@ -80,7 +82,8 @@ static void test_multithread_contraction()
MapXf m_right(t_right.data(), 1147, 1400);
Matrix<float, Dynamic, Dynamic, DataLayout> m_result(1500, 1400);
- Eigen::ThreadPoolDevice thread_pool_device(4);
+ Eigen::ThreadPool tp(4);
+ Eigen::ThreadPoolDevice thread_pool_device(&tp, 4);
// compute results by separate methods
t_result.device(thread_pool_device) = t_left.contract(t_right, dims);
@@ -115,7 +118,8 @@ static void test_contraction_corner_cases()
MapXf m_right(t_right.data(), 32, 28*28);
Matrix<float, Dynamic, Dynamic, DataLayout> m_result(500, 28*28);
- Eigen::ThreadPoolDevice thread_pool_device(12);
+ Eigen::ThreadPool tp(12);
+ Eigen::ThreadPoolDevice thread_pool_device(&tp, 12);
// compute results by separate methods
t_result.device(thread_pool_device) = t_left.contract(t_right, dims);
@@ -204,7 +208,8 @@ static void test_multithread_contraction_agrees_with_singlethread() {
typedef Tensor<float, 1>::DimensionPair DimPair;
Eigen::array<DimPair, 1> dims({{DimPair(1, 2)}});
- Eigen::ThreadPoolDevice thread_pool_device(internal::random<int>(2, 11));
+ Eigen::ThreadPool tp(internal::random<int>(2, 11));
+ Eigen::ThreadPoolDevice thread_pool_device(&tp, internal::random<int>(2, 11));
Tensor<float, 5, DataLayout> st_result;
st_result = left.contract(right, dims);
@@ -227,7 +232,8 @@ static void test_memcpy() {
for (int i = 0; i < 5; ++i) {
const int num_threads = internal::random<int>(3, 11);
- Eigen::ThreadPoolDevice thread_pool_device(num_threads);
+ Eigen::ThreadPool tp(num_threads);
+ Eigen::ThreadPoolDevice thread_pool_device(&tp, num_threads);
const int size = internal::random<int>(13, 7632);
Tensor<float, 1> t1(size);
@@ -243,7 +249,8 @@ static void test_memcpy() {
static void test_multithread_random()
{
- Eigen::ThreadPoolDevice device(2);
+ Eigen::ThreadPool tp(2);
+ Eigen::ThreadPoolDevice device(&tp, 2);
Tensor<float, 1> t(1 << 20);
t.device(device) = t.random<Eigen::internal::NormalRandomGenerator<float>>();
}