 unsupported/Eigen/CXX11/Tensor                              |  1
 unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h | 75
 unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h         | 10
 unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h        | 25
 4 files changed, 73 insertions(+), 38 deletions(-)
diff --git a/unsupported/Eigen/CXX11/Tensor b/unsupported/Eigen/CXX11/Tensor
index 969f25481..16132398d 100644
--- a/unsupported/Eigen/CXX11/Tensor
+++ b/unsupported/Eigen/CXX11/Tensor
@@ -51,6 +51,7 @@ typedef unsigned __int64 uint64_t;
#endif
#ifdef EIGEN_USE_THREADS
+#include <atomic>
#include <condition_variable>
#include <deque>
#include <mutex>
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
index dcbef5b03..e4165bbf8 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
@@ -118,47 +118,82 @@ class ThreadPool : public ThreadPoolInterface {
};
-// Notification is an object that allows a user to to wait for another
-// thread to signal a notification that an event has occurred.
-//
-// Multiple threads can wait on the same Notification object.
-// but only one caller must call Notify() on the object.
-class Notification {
+// Barrier is an object that allows one or more threads to wait until
+// Notify has been called a specified number of times.
+class Barrier {
public:
- Notification() : notified_(false) {}
- ~Notification() {}
+ Barrier(unsigned int count) : state_(count << 1), notified_(false) {
+ eigen_assert(((count << 1) >> 1) == count);
+ }
+ ~Barrier() {
+ eigen_assert((state_ >> 1) == 0);
+ }
void Notify() {
+ unsigned int v = state_.fetch_sub(2, std::memory_order_acq_rel) - 2;
+ if (v != 1) {
+ eigen_assert(((v + 2) & ~1) != 0);
+ return; // either count has not dropped to 0, or waiter is not waiting
+ }
std::unique_lock<std::mutex> l(mu_);
eigen_assert(!notified_);
notified_ = true;
cv_.notify_all();
}
- void WaitForNotification() {
+ void Wait() {
+ unsigned int v = state_.fetch_or(1, std::memory_order_acq_rel);
+ if ((v >> 1) == 0) return;
std::unique_lock<std::mutex> l(mu_);
- cv_.wait(l, [this]() { return notified_; } );
+ while (!notified_) {
+ cv_.wait(l);
+ }
}
private:
std::mutex mu_;
std::condition_variable cv_;
+ std::atomic<unsigned int> state_; // low bit is waiter flag
bool notified_;
};
+
+// Notification is an object that allows a user to wait for another
+// thread to signal a notification that an event has occurred.
+//
+// Multiple threads can wait on the same Notification object,
+// but only one caller must call Notify() on the object.
+struct Notification : Barrier {
+ Notification() : Barrier(1) {}
+};
+
+
// Runs an arbitrary function and then calls Notify() on the passed in
// Notification.
-template <typename Function, typename... Args> struct FunctionWrapper
+template <typename Function, typename... Args> struct FunctionWrapperWithNotification
{
static void run(Notification* n, Function f, Args... args) {
f(args...);
- n->Notify();
+ if (n) {
+ n->Notify();
+ }
+ }
+};
+
+template <typename Function, typename... Args> struct FunctionWrapperWithBarrier
+{
+ static void run(Barrier* b, Function f, Args... args) {
+ f(args...);
+ if (b) {
+ b->Notify();
+ }
}
};
-static EIGEN_STRONG_INLINE void wait_until_ready(Notification* n) {
+template <typename SyncType>
+static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
if (n) {
- n->WaitForNotification();
+ n->Wait();
}
}
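
For context, a minimal usage sketch of the new Barrier (illustrative, not part
of the patch; barrier_example, kTasks and the lambda body are assumptions).
state_ packs the outstanding Notify() count in its high bits and a waiter flag
in the low bit, so Barrier(4) starts at 4 << 1 = 8 and each Notify() subtracts
2; the final Notify() signals the condition variable only if a waiter has
already set the low bit, otherwise Wait() sees a zero count and returns
immediately:

#include <thread>
#include <vector>

void barrier_example() {
  const unsigned int kTasks = 4;   // state_ starts at 4 << 1 = 8
  Barrier barrier(kTasks);
  std::vector<std::thread> workers;
  for (unsigned int i = 0; i < kTasks; ++i) {
    workers.emplace_back([&barrier] {
      // ... per-task work ...
      barrier.Notify();  // fetch_sub(2): 8 -> 6 -> 4 -> 2 -> 0, or
                         // 9 -> 7 -> 5 -> 3 -> 1 once Wait() set the low bit
    });
  }
  barrier.Wait();  // returns only after all kTasks Notify() calls have landed
  for (std::thread& t : workers) t.join();
}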
@@ -203,10 +238,20 @@ struct ThreadPoolDevice {
EIGEN_STRONG_INLINE Notification* enqueue(Function&& f, Args&&... args) const {
Notification* n = new Notification();
std::function<void()> func =
- std::bind(&FunctionWrapper<Function, Args...>::run, n, f, args...);
+ std::bind(&FunctionWrapperWithNotification<Function, Args...>::run, n, f, args...);
pool_->Schedule(func);
return n;
}
+
+ template <class Function, class... Args>
+ EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b,
+ Function&& f,
+ Args&&... args) const {
+ std::function<void()> func = std::bind(
+ &FunctionWrapperWithBarrier<Function, Args...>::run, b, f, args...);
+ pool_->Schedule(func);
+ }
+
template <class Function, class... Args>
EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... args) const {
std::function<void()> func = std::bind(f, args...);
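
Taken together, these methods enable the fan-out/fan-in pattern used by the
executor and reducer changes below: one stack-allocated Barrier in place of a
heap-allocated Notification per task. A hypothetical helper (parallel_run and
its lambda are not in the patch) shows the shape:

void parallel_run(const ThreadPoolDevice& device, int numblocks) {
  Barrier barrier(numblocks);
  for (int i = 0; i < numblocks; ++i) {
    // Each task signals the shared barrier instead of its own Notification.
    device.enqueue_with_barrier(&barrier, [](int block) {
      // ... process block `block` ...
    }, i);
  }
  barrier.Wait();  // one wait, no per-task WaitForNotification() + delete
}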
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h
index 54da77bcf..6bbf235cc 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h
@@ -127,20 +127,16 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable>
const Index blocksize = numext::maxi<Index>(PacketSize, (blocksz - (blocksz % PacketSize)));
const Index numblocks = size / blocksize;
- MaxSizeVector<Notification*> results(numblocks);
+ Barrier barrier(numblocks);
for (int i = 0; i < numblocks; ++i) {
- results.push_back(device.enqueue(&EvalRange<Evaluator, Index, Vectorizable>::run, evaluator, i*blocksize, (i+1)*blocksize));
+ device.enqueue_with_barrier(&barrier, &EvalRange<Evaluator, Index, Vectorizable>::run, evaluator, i*blocksize, (i+1)*blocksize);
}
if (numblocks * blocksize < size) {
EvalRange<Evaluator, Index, Vectorizable>::run(evaluator, numblocks * blocksize, size);
}
- for (int i = 0; i < numblocks; ++i) {
- wait_until_ready(results[i]);
- delete results[i];
- }
-
+ barrier.Wait();
}
evaluator.cleanup();
}
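
To make the partitioning above concrete, a worked example with illustrative
numbers (the values of size, blocksz and PacketSize are assumptions, not from
the patch):

// Say size = 1000, PacketSize = 4, and the cost model yields blocksz = 37:
//   blocksize = max(4, 37 - 37 % 4) = 36  // rounded down to a packet multiple
//   numblocks = 1000 / 36 = 27            // 27 * 36 = 972 coefficients enqueued
// The remaining 1000 - 972 = 28 coefficients are evaluated inline on the
// calling thread, and barrier.Wait() then joins the 27 enqueued blocks.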
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h b/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h
index fe1dc22ee..489451215 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h
@@ -256,12 +256,11 @@ struct FullReducer<Self, Op, ThreadPoolDevice, false> {
const Index numblocks = blocksize > 0 ? num_coeffs / blocksize : 0;
eigen_assert(num_coeffs >= numblocks * blocksize);
- MaxSizeVector<Notification*> results(numblocks);
+ Barrier barrier(numblocks);
MaxSizeVector<typename Self::CoeffReturnType> shards(numblocks, reducer.initialize());
for (Index i = 0; i < numblocks; ++i) {
- results.push_back(
- device.enqueue(&FullReducerShard<Self, Op, false>::run, self,
- i * blocksize, blocksize, reducer, &shards[i]));
+ device.enqueue_with_barrier(&barrier, &FullReducerShard<Self, Op, false>::run, self,
+ i * blocksize, blocksize, reducer, &shards[i]);
}
typename Self::CoeffReturnType finalShard;
@@ -271,10 +270,7 @@ struct FullReducer<Self, Op, ThreadPoolDevice, false> {
} else {
finalShard = reducer.initialize();
}
- for (Index i = 0; i < numblocks; ++i) {
- wait_until_ready(results[i]);
- delete results[i];
- }
+ barrier.Wait();
for (Index i = 0; i < numblocks; ++i) {
reducer.reduce(shards[i], &finalShard);
}
@@ -307,12 +303,12 @@ struct FullReducer<Self, Op, ThreadPoolDevice, true> {
const Index numblocks = blocksize > 0 ? num_coeffs / blocksize : 0;
eigen_assert(num_coeffs >= numblocks * blocksize);
- MaxSizeVector<Notification*> results(numblocks);
+ Barrier barrier(numblocks);
MaxSizeVector<typename Self::CoeffReturnType> shards(numblocks, reducer.initialize());
for (Index i = 0; i < numblocks; ++i) {
- results.push_back(device.enqueue(&FullReducerShard<Self, Op, true>::run,
- self, i * blocksize, blocksize, reducer,
- &shards[i]));
+ device.enqueue_with_barrier(&barrier, &FullReducerShard<Self, Op, true>::run,
+ self, i * blocksize, blocksize, reducer,
+ &shards[i]);
}
typename Self::CoeffReturnType finalShard;
if (numblocks * blocksize < num_coeffs) {
@@ -322,10 +318,7 @@ struct FullReducer<Self, Op, ThreadPoolDevice, true> {
finalShard = reducer.initialize();
}
- for (Index i = 0; i < numblocks; ++i) {
- wait_until_ready(results[i]);
- delete results[i];
- }
+ barrier.Wait();
for (Index i = 0; i < numblocks; ++i) {
reducer.reduce(shards[i], &finalShard);
}
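
The reductions follow the same shard-then-merge shape. A self-contained sketch
of that pattern using plain std::thread in place of ThreadPoolDevice
(sharded_sum, kShards and the float data are assumptions, not from the patch):

#include <cstddef>
#include <numeric>
#include <thread>
#include <vector>

float sharded_sum(const std::vector<float>& data) {
  const unsigned int kShards = 4;
  const std::size_t blocksize = data.size() / kShards;
  std::vector<float> shards(kShards, 0.0f);  // one partial result per shard
  Barrier barrier(kShards);
  std::vector<std::thread> workers;
  for (unsigned int i = 0; i < kShards; ++i) {
    workers.emplace_back([&, i] {
      const std::size_t begin = i * blocksize;
      const std::size_t end =
          (i + 1 == kShards) ? data.size() : begin + blocksize;
      shards[i] = std::accumulate(data.begin() + begin, data.begin() + end, 0.0f);
      barrier.Notify();  // this shard's partial sum is ready
    });
  }
  barrier.Wait();  // all shards written; safe to merge serially
  const float total = std::accumulate(shards.begin(), shards.end(), 0.0f);
  for (std::thread& t : workers) t.join();
  return total;
}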