author     Eugene Zhulenev <ezhulenev@google.com>   2019-08-30 14:49:40 -0700
committer  Eugene Zhulenev <ezhulenev@google.com>   2019-08-30 14:49:40 -0700
commit     66665e7e76d2ad5aa37775b3777e9a53c6d1c18c (patch)
tree       cb62a23e970d9125475abd95e4c9e68a02a04461 /unsupported
parent     f6c51d9209ccc04d28c39f4c8059e7d3e74d6e07 (diff)
Asynchronous expression evaluation with TensorAsyncDevice
Diffstat (limited to 'unsupported')
-rw-r--r--  unsupported/Eigen/CXX11/ThreadPool                              |   2
-rw-r--r--  unsupported/Eigen/CXX11/src/Tensor/TensorBase.h                 |  11
-rw-r--r--  unsupported/Eigen/CXX11/src/Tensor/TensorBlock.h                |   1
-rw-r--r--  unsupported/Eigen/CXX11/src/Tensor/TensorDevice.h               |  41
-rw-r--r--  unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h             | 271
-rw-r--r--  unsupported/Eigen/CXX11/src/Tensor/TensorForwardDeclarations.h  |   6
-rw-r--r--  unsupported/test/cxx11_tensor_executor.cpp                      | 107
-rw-r--r--  unsupported/test/cxx11_tensor_thread_pool.cpp                   |  39
8 files changed, 414 insertions, 64 deletions
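
Before the diff itself, a minimal usage sketch of the new API: a tensor assignment runs asynchronously on a thread pool, and a completion callback signals when it is done. The tensor names and pool size below are illustrative, not part of the patch; the pattern follows the new tests added by this commit.

    #define EIGEN_USE_THREADS
    #include <unsupported/Eigen/CXX11/Tensor>

    int main() {
      Eigen::Tensor<float, 2> A(64, 64), B(64, 64), C(64, 64);
      A.setRandom();
      B.setRandom();

      Eigen::ThreadPool tp(4);
      Eigen::ThreadPoolDevice device(&tp, 4);

      // The callback fires after every task for this expression has finished;
      // a Barrier turns that into a blocking wait, as the new tests do.
      Eigen::Barrier done(1);
      C.device(device, [&done]() { done.Notify(); }) = A + B;
      done.Wait();
    }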
diff --git a/unsupported/Eigen/CXX11/ThreadPool b/unsupported/Eigen/CXX11/ThreadPool
index d046af9b2..613fdb57a 100644
--- a/unsupported/Eigen/CXX11/ThreadPool
+++ b/unsupported/Eigen/CXX11/ThreadPool
@@ -43,7 +43,7 @@
#include <mutex>
#include <thread>
#include <functional>
-#include <memory>
+#include <utility>
#include "src/util/CXX11Meta.h"
#include "src/util/MaxSizeVector.h"
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorBase.h b/unsupported/Eigen/CXX11/src/Tensor/TensorBase.h
index dbacf494e..095c85dc4 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorBase.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorBase.h
@@ -1063,6 +1063,17 @@ class TensorBase : public TensorBase<Derived, ReadOnlyAccessors> {
return TensorDevice<Derived, DeviceType>(dev, derived());
}
+#ifdef EIGEN_USE_THREADS
+ // Select the async device on which to evaluate the expression.
+ template <typename DeviceType>
+ typename internal::enable_if<
+ internal::is_same<DeviceType, ThreadPoolDevice>::value,
+ TensorAsyncDevice<Derived, DeviceType>>::type
+ device(const DeviceType& dev, std::function<void()> done) {
+ return TensorAsyncDevice<Derived, DeviceType>(dev, derived(), std::move(done));
+ }
+#endif // EIGEN_USE_THREADS
+
protected:
EIGEN_DEVICE_FUNC
EIGEN_STRONG_INLINE Derived& derived() { return *static_cast<Derived*>(this); }
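
The enable_if above keeps the two-argument device() overload out of overload resolution unless the device is a ThreadPoolDevice; with any other device the call fails to compile rather than silently falling back to synchronous evaluation. A reduced, standalone sketch of the same SFINAE gating (not Eigen code; all names here are illustrative):

    #include <functional>
    #include <type_traits>

    struct ThreadPoolDevice {};
    struct DefaultDevice {};

    // Enabled only when DeviceType is ThreadPoolDevice, mirroring the patch.
    template <typename DeviceType>
    typename std::enable_if<
        std::is_same<DeviceType, ThreadPoolDevice>::value, void>::type
    device(const DeviceType&, std::function<void()> done) {
      done();  // stand-in for scheduling asynchronous evaluation
    }

    int main() {
      device(ThreadPoolDevice{}, [] {});  // OK: overload participates
      // device(DefaultDevice{}, [] {});  // error: overload removed by SFINAE
    }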
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorBlock.h b/unsupported/Eigen/CXX11/src/Tensor/TensorBlock.h
index 49fb21dc8..c8a8b16db 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorBlock.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorBlock.h
@@ -932,6 +932,7 @@ class TensorBlockMapper {
typedef TensorBlock<Scalar, StorageIndex, NumDims, Layout> Block;
typedef DSizes<StorageIndex, NumDims> Dimensions;
+ TensorBlockMapper() {}
TensorBlockMapper(const Dimensions& dims,
const TensorBlockShapeType block_shape,
Index min_target_size)
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDevice.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDevice.h
index 29e50a3b2..5122b3623 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorDevice.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDevice.h
@@ -63,6 +63,47 @@ template <typename ExpressionType, typename DeviceType> class TensorDevice {
ExpressionType& m_expression;
};
+#ifdef EIGEN_USE_THREADS
+
+/** \class TensorAsyncDevice
+ * \ingroup CXX11_Tensor_Module
+ *
+ * \brief Pseudo expression providing an operator = that will evaluate its
+ * argument asynchronously on the specified device (currently supports only
+ * ThreadPoolDevice).
+ *
+ * Example:
+ * std::function<void()> done = []() {};
+ * C.device(EIGEN_THREAD_POOL, std::move(done)) = A + B;
+ */
+
+template <typename ExpressionType, typename DeviceType>
+class TensorAsyncDevice {
+ public:
+ TensorAsyncDevice(const DeviceType& device, ExpressionType& expression,
+ std::function<void()> done)
+ : m_device(device), m_expression(expression), m_done(std::move(done)) {}
+
+ template <typename OtherDerived>
+ EIGEN_STRONG_INLINE TensorAsyncDevice& operator=(const OtherDerived& other) {
+ typedef TensorAssignOp<ExpressionType, const OtherDerived> Assign;
+ typedef internal::TensorAsyncExecutor<const Assign, DeviceType> Executor;
+
+    // WARNING: After assignment, the 'm_done' callback is left in a
+    // moved-from (undefined) state.
+ Assign assign(m_expression, other);
+ Executor::runAsync(assign, m_device, std::move(m_done));
+
+ return *this;
+ }
+
+ protected:
+ const DeviceType& m_device;
+ ExpressionType& m_expression;
+ std::function<void()> m_done;
+};
+
+#endif // EIGEN_USE_THREADS
+
} // end namespace Eigen
#endif // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_H
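
Two practical notes on TensorAsyncDevice. Because operator= moves m_done into the executor, each TensorAsyncDevice supports exactly one assignment (that is what the WARNING comment is about); and since the caller returns before evaluation finishes, every tensor in the expression must outlive the computation. A hedged continuation-style sketch, with illustrative names:

    #define EIGEN_USE_THREADS
    #include <cstdio>
    #include <unsupported/Eigen/CXX11/Tensor>

    int main() {
      Eigen::ThreadPool tp(4);
      Eigen::ThreadPoolDevice dev(&tp, 4);

      Eigen::Tensor<float, 1> a(1024), b(1024), c(1024);
      a.setRandom();
      b.setRandom();

      // Run follow-up work in the callback instead of blocking right away.
      Eigen::Barrier finished(1);
      c.device(dev, [&finished]() {
        std::printf("async evaluation finished\n");
        finished.Notify();
      }) = a * b;

      // ... the caller is free to do other work here ...

      finished.Wait();  // keeps a, b, c alive until evaluation completes
    }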
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h
index fddb90d77..18d9de9e6 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h
@@ -84,7 +84,7 @@ class TensorExecutor {
EIGEN_DEVICE_FUNC
static EIGEN_STRONG_INLINE void run(const Expression& expr,
- const Device& device = Device()) {
+ const Device& device = Device()) {
TensorEvaluator<Expression, Device> evaluator(expr, device);
const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL);
if (needs_assign) {
@@ -98,6 +98,14 @@ class TensorExecutor {
};
/**
+ * The default async execution strategy is not implemented; async evaluation
+ * is currently available only for ThreadPoolDevice (see definition below).
+ */
+template <typename Expression, typename Device, bool Vectorizable,
+ bool Tileable>
+class TensorAsyncExecutor {};
+
+/**
* Process all the data with a single cpu thread, using vectorized instructions.
*/
template <typename Expression>
@@ -107,8 +115,8 @@ class TensorExecutor<Expression, DefaultDevice, /*Vectorizable*/ true,
typedef typename Expression::Index StorageIndex;
EIGEN_DEVICE_FUNC
- static EIGEN_STRONG_INLINE void run(const Expression& expr,
- const DefaultDevice& device = DefaultDevice()) {
+ static EIGEN_STRONG_INLINE void run(
+ const Expression& expr, const DefaultDevice& device = DefaultDevice()) {
TensorEvaluator<Expression, DefaultDevice> evaluator(expr, device);
const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL);
if (needs_assign) {
@@ -206,8 +214,81 @@ class TensorExecutor<Expression, DefaultDevice, Vectorizable,
/**
* Multicore strategy: the index space is partitioned and each partition is
* executed on a single core.
+ *
+ * (1) TensorExecutor submits work to the thread pool managed by the
+ * ThreadPoolDevice and blocks the caller thread until all tasks finish.
+ *
+ * (2) TensorAsyncExecutor is a non-blocking version that submits work to
+ * the same thread pool and returns immediately. It invokes the 'done'
+ * callback after all tasks have finished.
*/
#ifdef EIGEN_USE_THREADS
+
+template <typename TensorBlockMapper>
+struct TensorExecutorTilingContext {
+ typedef typename TensorBlockMapper::Block TensorBlock;
+
+ TensorExecutorTilingContext() : buffer(nullptr) {}
+ TensorExecutorTilingContext(const TensorBlockMapper& b_mapper,
+ const TensorOpCost& b_cost, void* b_buffer,
+ size_t b_aligned_size)
+ : block_mapper(b_mapper),
+ cost(b_cost),
+ buffer(b_buffer),
+ aligned_blocksize(b_aligned_size) {}
+
+ template <typename Scalar>
+ Scalar* GetCurrentThreadBuffer(const ThreadPoolDevice& device) const {
+ // ThreadPoolDevice::currentThreadId() returns -1 if called from a thread
+ // not in the thread pool, such as the main thread dispatching Eigen
+ // expressions.
+ const int thread_idx = device.currentThreadId();
+ eigen_assert(thread_idx >= -1 && thread_idx < device.numThreads());
+
+ const Index offset = aligned_blocksize * (thread_idx + 1);
+ return reinterpret_cast<Scalar*>(static_cast<char*>(buffer) + offset);
+ }
+
+ TensorBlockMapper block_mapper; // navigate through blocks
+ TensorOpCost cost; // cost of computing a single block
+ void* buffer; // temporary buffer for blocks
+ size_t aligned_blocksize; // block size after memory alignment
+};
+
+// Computes block evaluation parameters and allocates a temporary memory
+// buffer for blocks. See TensorExecutor/TensorAsyncExecutor (Tileable=true) below.
+template <typename Evaluator, typename TensorBlockMapper, bool Vectorizable>
+TensorExecutorTilingContext<TensorBlockMapper> GetTensorExecutorTilingContext(
+ const ThreadPoolDevice& device, const Evaluator& evaluator) {
+ // Prefer blocks skewed toward inner dimension.
+ TensorBlockShapeType block_shape = kSkewedInnerDims;
+ Index block_total_size = 0;
+
+ // Query expression tree for desired block size/shape.
+ std::vector<TensorOpResourceRequirements> resources;
+ evaluator.getResourceRequirements(&resources);
+ MergeResourceRequirements(resources, &block_shape, &block_total_size);
+ int num_threads = device.numThreads();
+
+ // Estimate minimum block size based on cost.
+ TensorOpCost cost = evaluator.costPerCoeff(Vectorizable);
+ double taskSize = TensorCostModel<ThreadPoolDevice>::taskSize(1, cost);
+ size_t block_size = static_cast<size_t>(1.0 / taskSize);
+
+ TensorBlockMapper block_mapper(
+ typename TensorBlockMapper::Dimensions(evaluator.dimensions()),
+ block_shape, block_size);
+
+ block_size = block_mapper.block_dims_total_size();
+ const size_t align = numext::maxi(EIGEN_MAX_ALIGN_BYTES, 1);
+ const size_t aligned_blocksize =
+ align *
+ divup<size_t>(block_size * sizeof(typename Evaluator::Scalar), align);
+ void* buf = device.allocate((num_threads + 1) * aligned_blocksize);
+
+ return {block_mapper, cost * block_size, buf, aligned_blocksize};
+}
+
template <typename Evaluator, typename StorageIndex, bool Vectorizable>
struct EvalRange {
static void run(Evaluator* evaluator_in, const StorageIndex firstIdx,
@@ -274,7 +355,7 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable, Tileable> {
typedef EvalRange<Evaluator, StorageIndex, Vectorizable> EvalRange;
Evaluator evaluator(expr, device);
- const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL);
+ const bool needs_assign = evaluator.evalSubExprsIfNeeded(nullptr);
if (needs_assign) {
const StorageIndex size = array_prod(evaluator.dimensions());
device.parallelFor(size, evaluator.costPerCoeff(Vectorizable),
@@ -290,18 +371,18 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable, Tileable> {
template <typename Expression, bool Vectorizable>
class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable, /*Tileable*/ true> {
public:
+ typedef typename traits<Expression>::Index StorageIndex;
typedef typename traits<Expression>::Scalar Scalar;
typedef typename remove_const<Scalar>::type ScalarNoConst;
- typedef TensorEvaluator<Expression, ThreadPoolDevice> Evaluator;
- typedef typename traits<Expression>::Index StorageIndex;
-
static const int NumDims = traits<Expression>::NumDimensions;
+ typedef TensorEvaluator<Expression, ThreadPoolDevice> Evaluator;
+ typedef TensorBlockMapper<ScalarNoConst, StorageIndex, NumDims, Evaluator::Layout> BlockMapper;
+ typedef TensorExecutorTilingContext<BlockMapper> TilingContext;
+
static EIGEN_STRONG_INLINE void run(const Expression& expr,
const ThreadPoolDevice& device) {
- typedef TensorBlockMapper<ScalarNoConst, StorageIndex, NumDims, Evaluator::Layout> TensorBlockMapper;
-
Evaluator evaluator(expr, device);
Index total_size = array_prod(evaluator.dimensions());
Index cache_size = device.firstLevelCacheSize() / sizeof(Scalar);
@@ -315,50 +396,152 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable, /*Tileable*/ tr
return;
}
- const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL);
+ const bool needs_assign = evaluator.evalSubExprsIfNeeded(nullptr);
if (needs_assign) {
- TensorBlockShapeType block_shape = kSkewedInnerDims;
- Index block_total_size = 0;
- // Query expression tree for desired block size/shape.
- std::vector<internal::TensorOpResourceRequirements> resources;
- evaluator.getResourceRequirements(&resources);
- MergeResourceRequirements(resources, &block_shape, &block_total_size);
- int num_threads = device.numThreads();
+ const TilingContext tiling =
+ internal::GetTensorExecutorTilingContext<Evaluator, BlockMapper,
+ Vectorizable>(device, evaluator);
- // Estimate minimum block size based on cost.
- TensorOpCost cost = evaluator.costPerCoeff(Vectorizable);
- double taskSize = TensorCostModel<ThreadPoolDevice>::taskSize(1, cost);
- size_t block_size = static_cast<size_t>(1.0 / taskSize);
- TensorBlockMapper block_mapper(
- typename TensorBlockMapper::Dimensions(evaluator.dimensions()),
- block_shape, block_size);
- block_size = block_mapper.block_dims_total_size();
- const size_t align = numext::maxi(EIGEN_MAX_ALIGN_BYTES, 1);
- const size_t aligned_blocksize =
- align * divup<size_t>(block_size * sizeof(Scalar), align);
- void* buf = device.allocate((num_threads + 1) * aligned_blocksize);
device.parallelFor(
- block_mapper.total_block_count(), cost * block_size,
- [=, &device, &evaluator, &block_mapper](StorageIndex firstIdx,
- StorageIndex lastIdx) {
- // currentThreadId() returns -1 if called from a thread not in the
- // thread pool, such as the main thread dispatching Eigen
- // expressions.
- const int thread_idx = device.currentThreadId();
- eigen_assert(thread_idx >= -1 && thread_idx < num_threads);
- ScalarNoConst* thread_buf = reinterpret_cast<ScalarNoConst*>(
- static_cast<char*>(buf) + aligned_blocksize * (thread_idx + 1));
+ tiling.block_mapper.total_block_count(), tiling.cost,
+ [=, &device, &evaluator, &tiling](StorageIndex firstIdx,
+ StorageIndex lastIdx) {
+ ScalarNoConst* thread_buf =
+ tiling.template GetCurrentThreadBuffer<ScalarNoConst>(device);
for (StorageIndex i = firstIdx; i < lastIdx; ++i) {
- auto block = block_mapper.GetBlockForIndex(i, thread_buf);
+ auto block = tiling.block_mapper.GetBlockForIndex(i, thread_buf);
evaluator.evalBlock(&block);
}
});
- device.deallocate(buf);
+ device.deallocate(tiling.buffer);
}
evaluator.cleanup();
}
};
+template <typename Expression, bool Vectorizable, bool Tileable>
+class TensorAsyncExecutor<Expression, ThreadPoolDevice, Vectorizable, Tileable> {
+ public:
+ typedef typename Expression::Index StorageIndex;
+ typedef TensorEvaluator<Expression, ThreadPoolDevice> Evaluator;
+
+ static EIGEN_STRONG_INLINE void runAsync(const Expression& expr,
+ const ThreadPoolDevice& device,
+ std::function<void()> done) {
+ TensorAsyncExecutorContext* const ctx =
+ new TensorAsyncExecutorContext(expr, device, std::move(done));
+ // TODO(ezhulenev): This is a potentially blocking operation. Make it async!
+ const bool needs_assign = ctx->evaluator.evalSubExprsIfNeeded(nullptr);
+
+ typedef EvalRange<Evaluator, StorageIndex, Vectorizable> EvalRange;
+
+ if (needs_assign) {
+ const StorageIndex size = array_prod(ctx->evaluator.dimensions());
+ device.parallelForAsync(
+ size, ctx->evaluator.costPerCoeff(Vectorizable),
+ EvalRange::alignBlockSize,
+ [ctx](StorageIndex firstIdx, StorageIndex lastIdx) {
+ EvalRange::run(&ctx->evaluator, firstIdx, lastIdx);
+ },
+ [ctx]() { delete ctx; });
+ }
+ }
+
+ private:
+ struct TensorAsyncExecutorContext {
+ TensorAsyncExecutorContext(const Expression& expr,
+ const ThreadPoolDevice& thread_pool,
+ std::function<void()> done)
+ : evaluator(expr, thread_pool), on_done(std::move(done)) {}
+
+ ~TensorAsyncExecutorContext() {
+ on_done();
+ evaluator.cleanup();
+ }
+
+ Evaluator evaluator;
+
+ private:
+ std::function<void()> on_done;
+ };
+};
+
+template <typename Expression, bool Vectorizable>
+class TensorAsyncExecutor<Expression, ThreadPoolDevice, Vectorizable, /*Tileable*/ true> {
+ public:
+ typedef typename traits<Expression>::Index StorageIndex;
+ typedef typename traits<Expression>::Scalar Scalar;
+ typedef typename remove_const<Scalar>::type ScalarNoConst;
+
+ static const int NumDims = traits<Expression>::NumDimensions;
+
+ typedef TensorEvaluator<Expression, ThreadPoolDevice> Evaluator;
+ typedef TensorBlockMapper<ScalarNoConst, StorageIndex, NumDims, Evaluator::Layout> BlockMapper;
+ typedef TensorExecutorTilingContext<BlockMapper> TilingContext;
+
+ static EIGEN_STRONG_INLINE void runAsync(const Expression& expr,
+ const ThreadPoolDevice& device,
+ std::function<void()> done) {
+ TensorAsyncExecutorContext* const ctx =
+ new TensorAsyncExecutorContext(expr, device, std::move(done));
+
+ Index total_size = array_prod(ctx->evaluator.dimensions());
+ Index cache_size = device.firstLevelCacheSize() / sizeof(Scalar);
+
+ if (total_size < cache_size &&
+ !ExpressionHasTensorBroadcastingOp<Expression>::value) {
+ internal::TensorAsyncExecutor<Expression, ThreadPoolDevice, Vectorizable,
+ /*Tileable*/ false>::runAsync(
+ expr, device, [ctx]() { delete ctx; });
+ return;
+ }
+
+ // TODO(ezhulenev): This is a potentially blocking operation. Make it async!
+ const bool needs_assign = ctx->evaluator.evalSubExprsIfNeeded(nullptr);
+
+ if (needs_assign) {
+ ctx->tiling =
+ internal::GetTensorExecutorTilingContext<Evaluator, BlockMapper,
+ Vectorizable>(device, ctx->evaluator);
+
+ device.parallelForAsync(
+ ctx->tiling.block_mapper.total_block_count(), ctx->tiling.cost,
+ [ctx](StorageIndex firstIdx, StorageIndex lastIdx) {
+ ScalarNoConst* thread_buf =
+ ctx->tiling.template GetCurrentThreadBuffer<ScalarNoConst>(ctx->device);
+ for (StorageIndex i = firstIdx; i < lastIdx; ++i) {
+ auto block = ctx->tiling.block_mapper.GetBlockForIndex(i, thread_buf);
+ ctx->evaluator.evalBlock(&block);
+ }
+ },
+ [ctx]() { delete ctx; });
+ }
+ }
+
+ private:
+ struct TensorAsyncExecutorContext {
+ TensorAsyncExecutorContext(const Expression& expr,
+ const ThreadPoolDevice& thread_pool,
+ std::function<void()> done)
+ : device(thread_pool),
+ evaluator(expr, thread_pool),
+ on_done(std::move(done)) {}
+
+ ~TensorAsyncExecutorContext() {
+ on_done();
+ device.deallocate(tiling.buffer);
+ evaluator.cleanup();
+ }
+
+ const ThreadPoolDevice& device;
+ Evaluator evaluator;
+ TilingContext tiling;
+
+ private:
+ std::function<void()> on_done;
+ };
+};
+
#endif // EIGEN_USE_THREADS
@@ -419,7 +602,7 @@ template <typename Expression, bool Vectorizable, bool Tileable>
EIGEN_STRONG_INLINE void TensorExecutor<Expression, GpuDevice, Vectorizable, Tileable>::run(
const Expression& expr, const GpuDevice& device) {
TensorEvaluator<Expression, GpuDevice> evaluator(expr, device);
- const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL);
+ const bool needs_assign = evaluator.evalSubExprsIfNeeded(nullptr);
if (needs_assign) {
const int block_size = device.maxGpuThreadsPerBlock();
@@ -517,10 +700,10 @@ struct ExecExprFunctorKernel<Expr, false, Evaluator>
template <typename Expression, bool Vectorizable, bool Tileable>
class TensorExecutor<Expression, Eigen::SyclDevice, Vectorizable, Tileable> {
public:
- typedef typename Expression::Index Index;
+ typedef typename Expression::Index Index;
static EIGEN_STRONG_INLINE void run(const Expression &expr, const Eigen::SyclDevice &dev) {
Eigen::TensorEvaluator<Expression, Eigen::SyclDevice> evaluator(expr, dev);
- const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL);
+ const bool needs_assign = evaluator.evalSubExprsIfNeeded(nullptr);
if (needs_assign) {
Index range, GRange, tileSize;
Index total_size = ::Eigen::internal::array_prod(evaluator.dimensions());
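
The new tests (below) drive TensorAsyncExecutor directly rather than through TensorAsyncDevice. The pattern, extracted as a sketch (assuming a ThreadPoolDevice d is already in scope; tensor names are illustrative):

    Eigen::Tensor<float, 3> src(32, 32, 32), dst(32, 32, 32);
    src.setRandom();
    const auto expr = src.square();

    using Assign = Eigen::TensorAssignOp<decltype(dst), const decltype(expr)>;
    using Executor = Eigen::internal::TensorAsyncExecutor<
        const Assign, Eigen::ThreadPoolDevice,
        /*Vectorizable=*/true, /*Tileable=*/true>;

    Eigen::Barrier done(1);
    Executor::runAsync(Assign(dst, expr), d, [&done]() { done.Notify(); });
    done.Wait();

Note also the (num_threads + 1) block buffers allocated by GetTensorExecutorTilingContext: slot 0 (thread_idx == -1) serves a caller thread that is not part of the pool, and slots 1..num_threads serve the pool threads, spaced aligned_blocksize bytes apart.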
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorForwardDeclarations.h b/unsupported/Eigen/CXX11/src/Tensor/TensorForwardDeclarations.h
index 3cca0c7e9..e823bd932 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorForwardDeclarations.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorForwardDeclarations.h
@@ -94,6 +94,7 @@ template<typename XprType, template <class> class MakePointer_ = MakePointer> cl
template<typename XprType> class TensorForcedEvalOp;
template<typename ExpressionType, typename DeviceType> class TensorDevice;
+template<typename ExpressionType, typename DeviceType> class TensorAsyncDevice;
template<typename Derived, typename Device> struct TensorEvaluator;
struct NoOpOutputKernel;
@@ -167,6 +168,11 @@ template <typename Expression, typename Device,
bool Tileable = IsTileable<Device, Expression>::value>
class TensorExecutor;
+template <typename Expression, typename Device,
+ bool Vectorizable = IsVectorizable<Device, Expression>::value,
+ bool Tileable = IsTileable<Device, Expression>::value>
+class TensorAsyncExecutor;
+
} // end namespace internal
} // end namespace Eigen
diff --git a/unsupported/test/cxx11_tensor_executor.cpp b/unsupported/test/cxx11_tensor_executor.cpp
index e9922a48d..f4d0401da 100644
--- a/unsupported/test/cxx11_tensor_executor.cpp
+++ b/unsupported/test/cxx11_tensor_executor.cpp
@@ -562,37 +562,112 @@ static void test_execute_reverse_rvalue(Device d)
}
}
+template <typename T, int NumDims, typename Device, bool Vectorizable,
+ bool Tileable, int Layout>
+static void test_async_execute_unary_expr(Device d)
+{
+ static constexpr int Options = 0 | Layout;
+
+ // Pick a large enough tensor size to bypass small tensor block evaluation
+ // optimization.
+ auto dims = RandomDims<NumDims>(50 / NumDims, 100 / NumDims);
+
+ Tensor<T, NumDims, Options, Index> src(dims);
+ Tensor<T, NumDims, Options, Index> dst(dims);
+
+ src.setRandom();
+ const auto expr = src.square();
+
+ using Assign = TensorAssignOp<decltype(dst), const decltype(expr)>;
+ using Executor = internal::TensorAsyncExecutor<const Assign, Device,
+ Vectorizable, Tileable>;
+ Eigen::Barrier done(1);
+ Executor::runAsync(Assign(dst, expr), d, [&done]() { done.Notify(); });
+ done.Wait();
+
+ for (Index i = 0; i < dst.dimensions().TotalSize(); ++i) {
+ T square = src.coeff(i) * src.coeff(i);
+ VERIFY_IS_EQUAL(square, dst.coeff(i));
+ }
+}
+
+template <typename T, int NumDims, typename Device, bool Vectorizable,
+ bool Tileable, int Layout>
+static void test_async_execute_binary_expr(Device d)
+{
+ static constexpr int Options = 0 | Layout;
+
+ // Pick a large enough tensor size to bypass small tensor block evaluation
+ // optimization.
+ auto dims = RandomDims<NumDims>(50 / NumDims, 100 / NumDims);
+
+ Tensor<T, NumDims, Options, Index> lhs(dims);
+ Tensor<T, NumDims, Options, Index> rhs(dims);
+ Tensor<T, NumDims, Options, Index> dst(dims);
+
+ lhs.setRandom();
+ rhs.setRandom();
+
+ const auto expr = lhs + rhs;
+
+ using Assign = TensorAssignOp<decltype(dst), const decltype(expr)>;
+ using Executor = internal::TensorAsyncExecutor<const Assign, Device,
+ Vectorizable, Tileable>;
+
+ Eigen::Barrier done(1);
+ Executor::runAsync(Assign(dst, expr), d, [&done]() { done.Notify(); });
+ done.Wait();
+
+ for (Index i = 0; i < dst.dimensions().TotalSize(); ++i) {
+ T sum = lhs.coeff(i) + rhs.coeff(i);
+ VERIFY_IS_EQUAL(sum, dst.coeff(i));
+ }
+}
+
#ifdef EIGEN_DONT_VECTORIZE
#define VECTORIZABLE(VAL) !EIGEN_DONT_VECTORIZE && VAL
-#else
+#else
#define VECTORIZABLE(VAL) VAL
#endif
#define CALL_SUBTEST_PART(PART) \
CALL_SUBTEST_##PART
-#define CALL_SUBTEST_COMBINATIONS(PART, NAME, T, NUM_DIMS) \
- CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, false, false, ColMajor>(default_device))); \
- CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, false, true, ColMajor>(default_device))); \
+#define CALL_SUBTEST_COMBINATIONS(PART, NAME, T, NUM_DIMS) \
+ CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, false, false, ColMajor>(default_device))); \
+ CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, false, true, ColMajor>(default_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, VECTORIZABLE(true), false, ColMajor>(default_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, VECTORIZABLE(true), true, ColMajor>(default_device))); \
- CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, false, false, RowMajor>(default_device))); \
- CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, false, true, RowMajor>(default_device))); \
+ CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, false, false, RowMajor>(default_device))); \
+ CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, false, true, RowMajor>(default_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, VECTORIZABLE(true), false, RowMajor>(default_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, VECTORIZABLE(true), true, RowMajor>(default_device))); \
- CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, false, ColMajor>(tp_device))); \
- CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, true, ColMajor>(tp_device))); \
+ CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, false, ColMajor>(tp_device))); \
+ CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, true, ColMajor>(tp_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, VECTORIZABLE(true), false, ColMajor>(tp_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, VECTORIZABLE(true), true, ColMajor>(tp_device))); \
- CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, false, RowMajor>(tp_device))); \
- CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, true, RowMajor>(tp_device))); \
+ CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, false, RowMajor>(tp_device))); \
+ CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, true, RowMajor>(tp_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, VECTORIZABLE(true), false, RowMajor>(tp_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, VECTORIZABLE(true), true, RowMajor>(tp_device)))
+// NOTE: Currently only ThreadPoolDevice supports async expression evaluation.
+#define CALL_ASYNC_SUBTEST_COMBINATIONS(PART, NAME, T, NUM_DIMS) \
+ CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, false, ColMajor>(tp_device))); \
+ CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, true, ColMajor>(tp_device))); \
+ CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, VECTORIZABLE(true), false, ColMajor>(tp_device))); \
+ CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, VECTORIZABLE(true), true, ColMajor>(tp_device))); \
+ CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, false, RowMajor>(tp_device))); \
+ CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, true, RowMajor>(tp_device))); \
+ CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, VECTORIZABLE(true), false, RowMajor>(tp_device))); \
+ CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, VECTORIZABLE(true), true, RowMajor>(tp_device)))
+
EIGEN_DECLARE_TEST(cxx11_tensor_executor) {
Eigen::DefaultDevice default_device;
+ // Default device is unused in ASYNC tests.
+ EIGEN_UNUSED_VARIABLE(default_device);
- const auto num_threads = internal::random<int>(1, 24);
+ const auto num_threads = internal::random<int>(20, 24);
Eigen::ThreadPool tp(num_threads);
Eigen::ThreadPoolDevice tp_device(&tp, num_threads);
@@ -660,8 +735,16 @@ EIGEN_DECLARE_TEST(cxx11_tensor_executor) {
CALL_SUBTEST_COMBINATIONS(14, test_execute_reverse_rvalue, float, 4);
CALL_SUBTEST_COMBINATIONS(14, test_execute_reverse_rvalue, float, 5);
+ CALL_ASYNC_SUBTEST_COMBINATIONS(15, test_async_execute_unary_expr, float, 3);
+ CALL_ASYNC_SUBTEST_COMBINATIONS(15, test_async_execute_unary_expr, float, 4);
+ CALL_ASYNC_SUBTEST_COMBINATIONS(15, test_async_execute_unary_expr, float, 5);
+
+ CALL_ASYNC_SUBTEST_COMBINATIONS(16, test_async_execute_binary_expr, float, 3);
+ CALL_ASYNC_SUBTEST_COMBINATIONS(16, test_async_execute_binary_expr, float, 4);
+ CALL_ASYNC_SUBTEST_COMBINATIONS(16, test_async_execute_binary_expr, float, 5);
+
// Force CMake to split this test.
- // EIGEN_SUFFIXES;1;2;3;4;5;6;7;8;9;10;11;12;13;14
+ // EIGEN_SUFFIXES;1;2;3;4;5;6;7;8;9;10;11;12;13;14;15;16
}
#undef CALL_SUBTEST_COMBINATIONS
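
A closing note on the Barrier pattern used throughout these tests: Eigen::Barrier(n) blocks Wait() until Notify() has been called n times, so several independent async expressions can be joined at once. A hedged sketch (c1, c2, a, b, and tp_device are illustrative, assumed set up as above):

    // Launch two independent async evaluations and join them on one barrier.
    Eigen::Barrier done(2);
    auto notify = [&done]() { done.Notify(); };

    c1.device(tp_device, notify) = a + b;
    c2.device(tp_device, notify) = a * b;

    done.Wait();  // returns only after both expressions have been evaluated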
diff --git a/unsupported/test/cxx11_tensor_thread_pool.cpp b/unsupported/test/cxx11_tensor_thread_pool.cpp
index f8a7b3662..53b50d1ed 100644
--- a/unsupported/test/cxx11_tensor_thread_pool.cpp
+++ b/unsupported/test/cxx11_tensor_thread_pool.cpp
@@ -38,9 +38,9 @@ class TestAllocator : public Allocator {
void test_multithread_elementwise()
{
- Tensor<float, 3> in1(2,3,7);
- Tensor<float, 3> in2(2,3,7);
- Tensor<float, 3> out(2,3,7);
+ Tensor<float, 3> in1(200, 30, 70);
+ Tensor<float, 3> in2(200, 30, 70);
+ Tensor<float, 3> out(200, 30, 70);
in1.setRandom();
in2.setRandom();
@@ -49,15 +49,39 @@ void test_multithread_elementwise()
Eigen::ThreadPoolDevice thread_pool_device(&tp, internal::random<int>(3, 11));
out.device(thread_pool_device) = in1 + in2 * 3.14f;
- for (int i = 0; i < 2; ++i) {
- for (int j = 0; j < 3; ++j) {
- for (int k = 0; k < 7; ++k) {
- VERIFY_IS_APPROX(out(i,j,k), in1(i,j,k) + in2(i,j,k) * 3.14f);
+ for (int i = 0; i < 200; ++i) {
+ for (int j = 0; j < 30; ++j) {
+ for (int k = 0; k < 70; ++k) {
+ VERIFY_IS_APPROX(out(i, j, k), in1(i, j, k) + in2(i, j, k) * 3.14f);
}
}
}
}
+void test_async_multithread_elementwise()
+{
+ Tensor<float, 3> in1(200, 30, 70);
+ Tensor<float, 3> in2(200, 30, 70);
+ Tensor<float, 3> out(200, 30, 70);
+
+ in1.setRandom();
+ in2.setRandom();
+
+ Eigen::ThreadPool tp(internal::random<int>(3, 11));
+ Eigen::ThreadPoolDevice thread_pool_device(&tp, internal::random<int>(3, 11));
+
+ Eigen::Barrier b(1);
+ out.device(thread_pool_device, [&b]() { b.Notify(); }) = in1 + in2 * 3.14f;
+ b.Wait();
+
+ for (int i = 0; i < 200; ++i) {
+ for (int j = 0; j < 30; ++j) {
+ for (int k = 0; k < 70; ++k) {
+ VERIFY_IS_APPROX(out(i, j, k), in1(i, j, k) + in2(i, j, k) * 3.14f);
+ }
+ }
+ }
+}
void test_multithread_compound_assignment()
{
@@ -516,6 +540,7 @@ void test_threadpool_allocate(TestAllocator* allocator)
EIGEN_DECLARE_TEST(cxx11_tensor_thread_pool)
{
CALL_SUBTEST_1(test_multithread_elementwise());
+ CALL_SUBTEST_1(test_async_multithread_elementwise());
CALL_SUBTEST_1(test_multithread_compound_assignment());
CALL_SUBTEST_2(test_multithread_contraction<ColMajor>());