diff options
author | Benoit Steiner <benoit.steiner.goog@gmail.com> | 2016-03-22 15:24:23 -0700
committer | Benoit Steiner <benoit.steiner.goog@gmail.com> | 2016-03-22 15:24:23 -0700
commit | 002cf0d1c979857e057879d8c84b92439dbcc90d (patch) | |
tree | 0a317243eb0b125176826f704ddf7c68168ecb43 /unsupported | |
parent | 65a7113a36f70aeca34eac29f32b24ef865cb6e4 (diff) |
Use a single Barrier instead of a collection of Notifications to reduce the thread synchronization overhead
Diffstat (limited to 'unsupported')
-rw-r--r-- | unsupported/Eigen/CXX11/Tensor | 1
-rw-r--r-- | unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h | 75
-rw-r--r-- | unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h | 10
-rw-r--r-- | unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h | 25
4 files changed, 73 insertions, 38 deletions
diff --git a/unsupported/Eigen/CXX11/Tensor b/unsupported/Eigen/CXX11/Tensor index 969f25481..16132398d 100644 --- a/unsupported/Eigen/CXX11/Tensor +++ b/unsupported/Eigen/CXX11/Tensor @@ -51,6 +51,7 @@ typedef unsigned __int64 uint64_t; #endif #ifdef EIGEN_USE_THREADS +#include <atomic> #include <condition_variable> #include <deque> #include <mutex> diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h index dcbef5b03..e4165bbf8 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h @@ -118,47 +118,82 @@ class ThreadPool : public ThreadPoolInterface { }; -// Notification is an object that allows a user to to wait for another -// thread to signal a notification that an event has occurred. -// -// Multiple threads can wait on the same Notification object. -// but only one caller must call Notify() on the object. -class Notification { +// Barrier is an object that allows one or more threads to wait until +// Notify has been called a specified number of times. 
+class Barrier { public: - Notification() : notified_(false) {} - ~Notification() {} + Barrier(unsigned int count) : state_(count << 1), notified_(false) { + eigen_assert(((count << 1) >> 1) == count); + } + ~Barrier() { + eigen_assert((state_>>1) == 0); + } void Notify() { + unsigned int v = state_.fetch_sub(2, std::memory_order_acq_rel) - 2; + if (v != 1) { + eigen_assert(((v + 2) & ~1) != 0); + return; // either count has not dropped to 0, or waiter is not waiting + } std::unique_lock<std::mutex> l(mu_); eigen_assert(!notified_); notified_ = true; cv_.notify_all(); } - void WaitForNotification() { + void Wait() { + unsigned int v = state_.fetch_or(1, std::memory_order_acq_rel); + if ((v >> 1) == 0) return; std::unique_lock<std::mutex> l(mu_); - cv_.wait(l, [this]() { return notified_; } ); + while (!notified_) { + cv_.wait(l); + } } private: std::mutex mu_; std::condition_variable cv_; + std::atomic<unsigned int> state_; // low bit is waiter flag bool notified_; }; + +// Notification is an object that allows a user to wait for another +// thread to signal a notification that an event has occurred. +// +// Multiple threads can wait on the same Notification object, +// but only one caller must call Notify() on the object. +struct Notification : Barrier { + Notification() : Barrier(1) {}; +}; + + // Runs an arbitrary function and then calls Notify() on the passed in // Notification. -template <typename Function, typename... Args> struct FunctionWrapper +template <typename Function, typename... Args> struct FunctionWrapperWithNotification { static void run(Notification* n, Function f, Args... args) { f(args...); - n->Notify(); + if (n) { + n->Notify(); + } + } +}; + +template <typename Function, typename... Args> struct FunctionWrapperWithBarrier +{ + static void run(Barrier* b, Function f, Args... 
args) { + f(args...); + if (b) { + b->Notify(); + } } }; -static EIGEN_STRONG_INLINE void wait_until_ready(Notification* n) { +template <typename SyncType> +static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) { if (n) { - n->WaitForNotification(); + n->Wait(); } } @@ -203,10 +238,20 @@ struct ThreadPoolDevice { EIGEN_STRONG_INLINE Notification* enqueue(Function&& f, Args&&... args) const { Notification* n = new Notification(); std::function<void()> func = - std::bind(&FunctionWrapper<Function, Args...>::run, n, f, args...); + std::bind(&FunctionWrapperWithNotification<Function, Args...>::run, n, f, args...); pool_->Schedule(func); return n; } + + template <class Function, class... Args> + EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b, + Function&& f, + Args&&... args) const { + std::function<void()> func = std::bind( + &FunctionWrapperWithBarrier<Function, Args...>::run, b, f, args...); + pool_->Schedule(func); + } + template <class Function, class... Args> EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... 
args) const { std::function<void()> func = std::bind(f, args...); diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h index 54da77bcf..6bbf235cc 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h @@ -127,20 +127,16 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable> const Index blocksize = numext::maxi<Index>(PacketSize, (blocksz - (blocksz % PacketSize))); const Index numblocks = size / blocksize; - MaxSizeVector<Notification*> results(numblocks); + Barrier barrier(numblocks); for (int i = 0; i < numblocks; ++i) { - results.push_back(device.enqueue(&EvalRange<Evaluator, Index, Vectorizable>::run, evaluator, i*blocksize, (i+1)*blocksize)); + device.enqueue_with_barrier(&barrier, &EvalRange<Evaluator, Index, Vectorizable>::run, evaluator, i*blocksize, (i+1)*blocksize); } if (numblocks * blocksize < size) { EvalRange<Evaluator, Index, Vectorizable>::run(evaluator, numblocks * blocksize, size); } - for (int i = 0; i < numblocks; ++i) { - wait_until_ready(results[i]); - delete results[i]; - } - + barrier.Wait(); } evaluator.cleanup(); } diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h b/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h index fe1dc22ee..489451215 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h @@ -256,12 +256,11 @@ struct FullReducer<Self, Op, ThreadPoolDevice, false> { const Index numblocks = blocksize > 0 ? 
num_coeffs / blocksize : 0; eigen_assert(num_coeffs >= numblocks * blocksize); - MaxSizeVector<Notification*> results(numblocks); + Barrier barrier(numblocks); MaxSizeVector<typename Self::CoeffReturnType> shards(numblocks, reducer.initialize()); for (Index i = 0; i < numblocks; ++i) { - results.push_back( - device.enqueue(&FullReducerShard<Self, Op, false>::run, self, - i * blocksize, blocksize, reducer, &shards[i])); + device.enqueue_with_barrier(&barrier, &FullReducerShard<Self, Op, false>::run, self, + i * blocksize, blocksize, reducer, &shards[i]); } typename Self::CoeffReturnType finalShard; @@ -271,10 +270,7 @@ struct FullReducer<Self, Op, ThreadPoolDevice, false> { } else { finalShard = reducer.initialize(); } - for (Index i = 0; i < numblocks; ++i) { - wait_until_ready(results[i]); - delete results[i]; - } + barrier.Wait(); for (Index i = 0; i < numblocks; ++i) { reducer.reduce(shards[i], &finalShard); } @@ -307,12 +303,12 @@ struct FullReducer<Self, Op, ThreadPoolDevice, true> { const Index numblocks = blocksize > 0 ? 
num_coeffs / blocksize : 0; eigen_assert(num_coeffs >= numblocks * blocksize); - MaxSizeVector<Notification*> results(numblocks); + Barrier barrier(numblocks); MaxSizeVector<typename Self::CoeffReturnType> shards(numblocks, reducer.initialize()); for (Index i = 0; i < numblocks; ++i) { - results.push_back(device.enqueue(&FullReducerShard<Self, Op, true>::run, - self, i * blocksize, blocksize, reducer, - &shards[i])); + device.enqueue_with_barrier(&barrier, &FullReducerShard<Self, Op, true>::run, + self, i * blocksize, blocksize, reducer, + &shards[i]); } typename Self::CoeffReturnType finalShard; if (numblocks * blocksize < num_coeffs) { @@ -322,10 +318,7 @@ struct FullReducer<Self, Op, ThreadPoolDevice, true> { finalShard = reducer.initialize(); } - for (Index i = 0; i < numblocks; ++i) { - wait_until_ready(results[i]); - delete results[i]; - } + barrier.Wait(); for (Index i = 0; i < numblocks; ++i) { reducer.reduce(shards[i], &finalShard); } |