diff options
author | Eugene Zhulenev <ezhulenev@google.com> | 2019-08-30 14:49:40 -0700 |
---|---|---|
committer | Eugene Zhulenev <ezhulenev@google.com> | 2019-08-30 14:49:40 -0700 |
commit | 66665e7e76d2ad5aa37775b3777e9a53c6d1c18c (patch) | |
tree | cb62a23e970d9125475abd95e4c9e68a02a04461 | |
parent | f6c51d9209ccc04d28c39f4c8059e7d3e74d6e07 (diff) |
Asynchronous expression evaluation with TensorAsyncDevice
-rw-r--r-- | unsupported/Eigen/CXX11/ThreadPool | 2 | ||||
-rw-r--r-- | unsupported/Eigen/CXX11/src/Tensor/TensorBase.h | 11 | ||||
-rw-r--r-- | unsupported/Eigen/CXX11/src/Tensor/TensorBlock.h | 1 | ||||
-rw-r--r-- | unsupported/Eigen/CXX11/src/Tensor/TensorDevice.h | 41 | ||||
-rw-r--r-- | unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h | 271 | ||||
-rw-r--r-- | unsupported/Eigen/CXX11/src/Tensor/TensorForwardDeclarations.h | 6 | ||||
-rw-r--r-- | unsupported/test/cxx11_tensor_executor.cpp | 107 | ||||
-rw-r--r-- | unsupported/test/cxx11_tensor_thread_pool.cpp | 39 |
8 files changed, 414 insertions, 64 deletions
diff --git a/unsupported/Eigen/CXX11/ThreadPool b/unsupported/Eigen/CXX11/ThreadPool index d046af9b2..613fdb57a 100644 --- a/unsupported/Eigen/CXX11/ThreadPool +++ b/unsupported/Eigen/CXX11/ThreadPool @@ -43,7 +43,7 @@ #include <mutex> #include <thread> #include <functional> -#include <memory> +#include <utility> #include "src/util/CXX11Meta.h" #include "src/util/MaxSizeVector.h" diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorBase.h b/unsupported/Eigen/CXX11/src/Tensor/TensorBase.h index dbacf494e..095c85dc4 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorBase.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorBase.h @@ -1063,6 +1063,17 @@ class TensorBase : public TensorBase<Derived, ReadOnlyAccessors> { return TensorDevice<Derived, DeviceType>(dev, derived()); } +#ifdef EIGEN_USE_THREADS + // Select the async device on which to evaluate the expression. + template <typename DeviceType> + typename internal::enable_if< + internal::is_same<DeviceType, ThreadPoolDevice>::value, + TensorAsyncDevice<Derived, DeviceType>>::type + device(const DeviceType& dev, std::function<void()> done) { + return TensorAsyncDevice<Derived, DeviceType>(dev, derived(), std::move(done)); + } +#endif // EIGEN_USE_THREADS + protected: EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE Derived& derived() { return *static_cast<Derived*>(this); } diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorBlock.h b/unsupported/Eigen/CXX11/src/Tensor/TensorBlock.h index 49fb21dc8..c8a8b16db 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorBlock.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorBlock.h @@ -932,6 +932,7 @@ class TensorBlockMapper { typedef TensorBlock<Scalar, StorageIndex, NumDims, Layout> Block; typedef DSizes<StorageIndex, NumDims> Dimensions; + TensorBlockMapper() {} TensorBlockMapper(const Dimensions& dims, const TensorBlockShapeType block_shape, Index min_target_size) diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDevice.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDevice.h index 29e50a3b2..5122b3623 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorDevice.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDevice.h @@ -63,6 +63,47 @@ template <typename ExpressionType, typename DeviceType> class TensorDevice { ExpressionType& m_expression; }; +#ifdef EIGEN_USE_THREADS + +/** \class TensorAsyncDevice + * \ingroup CXX11_Tensor_Module + * + * \brief Pseudo expression providing an operator = that will evaluate its + * argument asynchronously on the specified device (currently supports only + * ThreadPoolDevice). + * + * Example: + * std::function<void()> done = []() {}; + * C.device(EIGEN_THREAD_POOL, std::move(done)) = A + B; + */ + +template <typename ExpressionType, typename DeviceType> +class TensorAsyncDevice { + public: + TensorAsyncDevice(const DeviceType& device, ExpressionType& expression, + std::function<void()> done) + : m_device(device), m_expression(expression), m_done(std::move(done)) {} + + template <typename OtherDerived> + EIGEN_STRONG_INLINE TensorAsyncDevice& operator=(const OtherDerived& other) { + typedef TensorAssignOp<ExpressionType, const OtherDerived> Assign; + typedef internal::TensorAsyncExecutor<const Assign, DeviceType> Executor; + + // WARNING: After assignment 'm_done' callback will be in undefined state. + Assign assign(m_expression, other); + Executor::runAsync(assign, m_device, std::move(m_done)); + + return *this; + } + + protected: + const DeviceType& m_device; + ExpressionType& m_expression; + std::function<void()> m_done; +}; + +#endif // EIGEN_USE_THREADS + } // end namespace Eigen #endif // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_H diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h index fddb90d77..18d9de9e6 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h @@ -84,7 +84,7 @@ class TensorExecutor { EIGEN_DEVICE_FUNC static EIGEN_STRONG_INLINE void run(const Expression& expr, - const Device& device = Device()) { + const Device& device = Device()) { TensorEvaluator<Expression, Device> evaluator(expr, device); const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL); if (needs_assign) { @@ -98,6 +98,14 @@ class TensorExecutor { }; /** + * Default async execution strategy is not implemented. Currently it's only + * available for ThreadPoolDevice (see definition below). + */ +template <typename Expression, typename Device, bool Vectorizable, + bool Tileable> +class TensorAsyncExecutor {}; + +/** * Process all the data with a single cpu thread, using vectorized instructions. */ template <typename Expression> @@ -107,8 +115,8 @@ class TensorExecutor<Expression, DefaultDevice, /*Vectorizable*/ true, typedef typename Expression::Index StorageIndex; EIGEN_DEVICE_FUNC - static EIGEN_STRONG_INLINE void run(const Expression& expr, - const DefaultDevice& device = DefaultDevice()) { + static EIGEN_STRONG_INLINE void run( + const Expression& expr, const DefaultDevice& device = DefaultDevice()) { TensorEvaluator<Expression, DefaultDevice> evaluator(expr, device); const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL); if (needs_assign) { @@ -206,8 +214,81 @@ class TensorExecutor<Expression, DefaultDevice, Vectorizable, /** * Multicore strategy: the index space is partitioned and each partition is * executed on a single core. + * + * (1) TensorExecutor will submit work to the ThreadPoolDevice managed thread + * pool, and will block the caller thread until all tasks are finished. + * + * (2) TensorAsyncExecutor is a non-blocking version, that will submit work to + * the ThreadPoolDevice managed thread pool, and will return immediately. + * It will call 'done' callback after all tasks are finished. */ #ifdef EIGEN_USE_THREADS + +template <typename TensorBlockMapper> +struct TensorExecutorTilingContext { + typedef typename TensorBlockMapper::Block TensorBlock; + + TensorExecutorTilingContext() : buffer(nullptr) {} + TensorExecutorTilingContext(const TensorBlockMapper& b_mapper, + const TensorOpCost& b_cost, void* b_buffer, + size_t b_aligned_size) + : block_mapper(b_mapper), + cost(b_cost), + buffer(b_buffer), + aligned_blocksize(b_aligned_size) {} + + template <typename Scalar> + Scalar* GetCurrentThreadBuffer(const ThreadPoolDevice& device) const { + // ThreadPoolDevice::currentThreadId() returns -1 if called from a thread + // not in the thread pool, such as the main thread dispatching Eigen + // expressions. + const int thread_idx = device.currentThreadId(); + eigen_assert(thread_idx >= -1 && thread_idx < device.numThreads()); + + const Index offset = aligned_blocksize * (thread_idx + 1); + return reinterpret_cast<Scalar*>(static_cast<char*>(buffer) + offset); + } + + TensorBlockMapper block_mapper; // navigate through blocks + TensorOpCost cost; // cost of computing a single block + void* buffer; // temporary buffer for blocks + size_t aligned_blocksize; // block size after memory alignment +}; + +// Computes a block evaluation parameters, and allocates temporary memory buffer +// for blocks. See TensorExecutor/TensorAsyncExecutor (Tileable=true) below. +template <typename Evaluator, typename TensorBlockMapper, bool Vectorizable> +TensorExecutorTilingContext<TensorBlockMapper> GetTensorExecutorTilingContext( + const ThreadPoolDevice& device, const Evaluator& evaluator) { + // Prefer blocks skewed toward inner dimension. + TensorBlockShapeType block_shape = kSkewedInnerDims; + Index block_total_size = 0; + + // Query expression tree for desired block size/shape. + std::vector<TensorOpResourceRequirements> resources; + evaluator.getResourceRequirements(&resources); + MergeResourceRequirements(resources, &block_shape, &block_total_size); + int num_threads = device.numThreads(); + + // Estimate minimum block size based on cost. + TensorOpCost cost = evaluator.costPerCoeff(Vectorizable); + double taskSize = TensorCostModel<ThreadPoolDevice>::taskSize(1, cost); + size_t block_size = static_cast<size_t>(1.0 / taskSize); + + TensorBlockMapper block_mapper( + typename TensorBlockMapper::Dimensions(evaluator.dimensions()), + block_shape, block_size); + + block_size = block_mapper.block_dims_total_size(); + const size_t align = numext::maxi(EIGEN_MAX_ALIGN_BYTES, 1); + const size_t aligned_blocksize = + align * + divup<size_t>(block_size * sizeof(typename Evaluator::Scalar), align); + void* buf = device.allocate((num_threads + 1) * aligned_blocksize); + + return {block_mapper, cost * block_size, buf, aligned_blocksize}; +} + template <typename Evaluator, typename StorageIndex, bool Vectorizable> struct EvalRange { static void run(Evaluator* evaluator_in, const StorageIndex firstIdx, @@ -274,7 +355,7 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable, Tileable> { typedef EvalRange<Evaluator, StorageIndex, Vectorizable> EvalRange; Evaluator evaluator(expr, device); - const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL); + const bool needs_assign = evaluator.evalSubExprsIfNeeded(nullptr); if (needs_assign) { const StorageIndex size = array_prod(evaluator.dimensions()); device.parallelFor(size, evaluator.costPerCoeff(Vectorizable), @@ -290,18 +371,18 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable, Tileable> { template <typename Expression, bool Vectorizable> class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable, /*Tileable*/ true> { public: + typedef typename traits<Expression>::Index StorageIndex; typedef typename traits<Expression>::Scalar Scalar; typedef typename remove_const<Scalar>::type ScalarNoConst; - typedef TensorEvaluator<Expression, ThreadPoolDevice> Evaluator; - typedef typename traits<Expression>::Index StorageIndex; - static const int NumDims = traits<Expression>::NumDimensions; + typedef TensorEvaluator<Expression, ThreadPoolDevice> Evaluator; + typedef TensorBlockMapper<ScalarNoConst, StorageIndex, NumDims, Evaluator::Layout> BlockMapper; + typedef TensorExecutorTilingContext<BlockMapper> TilingContext; + static EIGEN_STRONG_INLINE void run(const Expression& expr, const ThreadPoolDevice& device) { - typedef TensorBlockMapper<ScalarNoConst, StorageIndex, NumDims, Evaluator::Layout> TensorBlockMapper; - Evaluator evaluator(expr, device); Index total_size = array_prod(evaluator.dimensions()); Index cache_size = device.firstLevelCacheSize() / sizeof(Scalar); @@ -315,50 +396,152 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable, /*Tileable*/ tr return; } - const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL); + const bool needs_assign = evaluator.evalSubExprsIfNeeded(nullptr); if (needs_assign) { - TensorBlockShapeType block_shape = kSkewedInnerDims; - Index block_total_size = 0; - // Query expression tree for desired block size/shape. - std::vector<internal::TensorOpResourceRequirements> resources; - evaluator.getResourceRequirements(&resources); - MergeResourceRequirements(resources, &block_shape, &block_total_size); - int num_threads = device.numThreads(); + const TilingContext tiling = + internal::GetTensorExecutorTilingContext<Evaluator, BlockMapper, + Vectorizable>(device, evaluator); - // Estimate minimum block size based on cost. - TensorOpCost cost = evaluator.costPerCoeff(Vectorizable); - double taskSize = TensorCostModel<ThreadPoolDevice>::taskSize(1, cost); - size_t block_size = static_cast<size_t>(1.0 / taskSize); - TensorBlockMapper block_mapper( - typename TensorBlockMapper::Dimensions(evaluator.dimensions()), - block_shape, block_size); - block_size = block_mapper.block_dims_total_size(); - const size_t align = numext::maxi(EIGEN_MAX_ALIGN_BYTES, 1); - const size_t aligned_blocksize = - align * divup<size_t>(block_size * sizeof(Scalar), align); - void* buf = device.allocate((num_threads + 1) * aligned_blocksize); device.parallelFor( - block_mapper.total_block_count(), cost * block_size, - [=, &device, &evaluator, &block_mapper](StorageIndex firstIdx, - StorageIndex lastIdx) { - // currentThreadId() returns -1 if called from a thread not in the - // thread pool, such as the main thread dispatching Eigen - // expressions. - const int thread_idx = device.currentThreadId(); - eigen_assert(thread_idx >= -1 && thread_idx < num_threads); - ScalarNoConst* thread_buf = reinterpret_cast<ScalarNoConst*>( - static_cast<char*>(buf) + aligned_blocksize * (thread_idx + 1)); + tiling.block_mapper.total_block_count(), tiling.cost, + [=, &device, &evaluator, &tiling](StorageIndex firstIdx, + StorageIndex lastIdx) { + ScalarNoConst* thread_buf = + tiling.template GetCurrentThreadBuffer<ScalarNoConst>(device); for (StorageIndex i = firstIdx; i < lastIdx; ++i) { - auto block = block_mapper.GetBlockForIndex(i, thread_buf); + auto block = tiling.block_mapper.GetBlockForIndex(i, thread_buf); evaluator.evalBlock(&block); } }); - device.deallocate(buf); + device.deallocate(tiling.buffer); } evaluator.cleanup(); } }; +template <typename Expression, bool Vectorizable, bool Tileable> +class TensorAsyncExecutor<Expression, ThreadPoolDevice, Vectorizable, Tileable> { + public: + typedef typename Expression::Index StorageIndex; + typedef TensorEvaluator<Expression, ThreadPoolDevice> Evaluator; + + static EIGEN_STRONG_INLINE void runAsync(const Expression& expr, + const ThreadPoolDevice& device, + std::function<void()> done) { + TensorAsyncExecutorContext* const ctx = + new TensorAsyncExecutorContext(expr, device, std::move(done)); + // TODO(ezhulenev): This is a potentially blocking operation. Make it async! + const bool needs_assign = ctx->evaluator.evalSubExprsIfNeeded(nullptr); + + typedef EvalRange<Evaluator, StorageIndex, Vectorizable> EvalRange; + + if (needs_assign) { + const StorageIndex size = array_prod(ctx->evaluator.dimensions()); + device.parallelForAsync( + size, ctx->evaluator.costPerCoeff(Vectorizable), + EvalRange::alignBlockSize, + [ctx](StorageIndex firstIdx, StorageIndex lastIdx) { + EvalRange::run(&ctx->evaluator, firstIdx, lastIdx); + }, + [ctx]() { delete ctx; }); + } + } + + private: + struct TensorAsyncExecutorContext { + TensorAsyncExecutorContext(const Expression& expr, + const ThreadPoolDevice& thread_pool, + std::function<void()> done) + : evaluator(expr, thread_pool), on_done(std::move(done)) {} + + ~TensorAsyncExecutorContext() { + on_done(); + evaluator.cleanup(); + } + + Evaluator evaluator; + + private: + std::function<void()> on_done; + }; +}; + +template <typename Expression, bool Vectorizable> +class TensorAsyncExecutor<Expression, ThreadPoolDevice, Vectorizable, /*Tileable*/ true> { + public: + typedef typename traits<Expression>::Index StorageIndex; + typedef typename traits<Expression>::Scalar Scalar; + typedef typename remove_const<Scalar>::type ScalarNoConst; + + static const int NumDims = traits<Expression>::NumDimensions; + + typedef TensorEvaluator<Expression, ThreadPoolDevice> Evaluator; + typedef TensorBlockMapper<ScalarNoConst, StorageIndex, NumDims, Evaluator::Layout> BlockMapper; + typedef TensorExecutorTilingContext<BlockMapper> TilingContext; + + static EIGEN_STRONG_INLINE void runAsync(const Expression& expr, + const ThreadPoolDevice& device, + std::function<void()> done) { + TensorAsyncExecutorContext* const ctx = + new TensorAsyncExecutorContext(expr, device, std::move(done)); + + Index total_size = array_prod(ctx->evaluator.dimensions()); + Index cache_size = device.firstLevelCacheSize() / sizeof(Scalar); + + if (total_size < cache_size && + !ExpressionHasTensorBroadcastingOp<Expression>::value) { + internal::TensorAsyncExecutor<Expression, ThreadPoolDevice, Vectorizable, + /*Tileable*/ false>::runAsync( + expr, device, [ctx]() { delete ctx; }); + return; + } + + // TODO(ezhulenev): This is a potentially blocking operation. Make it async! + const bool needs_assign = ctx->evaluator.evalSubExprsIfNeeded(nullptr); + + if (needs_assign) { + ctx->tiling = + internal::GetTensorExecutorTilingContext<Evaluator, BlockMapper, + Vectorizable>(device, ctx->evaluator); + + device.parallelForAsync( + ctx->tiling.block_mapper.total_block_count(), ctx->tiling.cost, + [ctx](StorageIndex firstIdx, StorageIndex lastIdx) { + ScalarNoConst* thread_buf = + ctx->tiling.template GetCurrentThreadBuffer<ScalarNoConst>(ctx->device); + for (StorageIndex i = firstIdx; i < lastIdx; ++i) { + auto block = ctx->tiling.block_mapper.GetBlockForIndex(i, thread_buf); + ctx->evaluator.evalBlock(&block); + } + }, + [ctx]() { delete ctx; }); + } + } + + private: + struct TensorAsyncExecutorContext { + TensorAsyncExecutorContext(const Expression& expr, + const ThreadPoolDevice& thread_pool, + std::function<void()> done) + : device(thread_pool), + evaluator(expr, thread_pool), + on_done(std::move(done)) {} + + ~TensorAsyncExecutorContext() { + on_done(); + device.deallocate(tiling.buffer); + evaluator.cleanup(); + } + + const ThreadPoolDevice& device; + Evaluator evaluator; + TilingContext tiling; + + private: + std::function<void()> on_done; + }; +}; + #endif // EIGEN_USE_THREADS @@ -419,7 +602,7 @@ template <typename Expression, bool Vectorizable, bool Tileable> EIGEN_STRONG_INLINE void TensorExecutor<Expression, GpuDevice, Vectorizable, Tileable>::run( const Expression& expr, const GpuDevice& device) { TensorEvaluator<Expression, GpuDevice> evaluator(expr, device); - const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL); + const bool needs_assign = evaluator.evalSubExprsIfNeeded(nullptr); if (needs_assign) { const int block_size = device.maxGpuThreadsPerBlock(); @@ -517,10 +700,10 @@ struct ExecExprFunctorKernel<Expr, false, Evaluator> template <typename Expression, bool Vectorizable, bool Tileable> class TensorExecutor<Expression, Eigen::SyclDevice, Vectorizable, Tileable> { public: - typedef typename Expression::Index Index; + typedef typename Expression::Index Index; static EIGEN_STRONG_INLINE void run(const Expression &expr, const Eigen::SyclDevice &dev) { Eigen::TensorEvaluator<Expression, Eigen::SyclDevice> evaluator(expr, dev); - const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL); + const bool needs_assign = evaluator.evalSubExprsIfNeeded(nullptr); if (needs_assign) { Index range, GRange, tileSize; Index total_size = ::Eigen::internal::array_prod(evaluator.dimensions()); diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorForwardDeclarations.h b/unsupported/Eigen/CXX11/src/Tensor/TensorForwardDeclarations.h index 3cca0c7e9..e823bd932 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorForwardDeclarations.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorForwardDeclarations.h @@ -94,6 +94,7 @@ template<typename XprType, template <class> class MakePointer_ = MakePointer> cl template<typename XprType> class TensorForcedEvalOp; template<typename ExpressionType, typename DeviceType> class TensorDevice; +template<typename ExpressionType, typename DeviceType> class TensorAsyncDevice; template<typename Derived, typename Device> struct TensorEvaluator; struct NoOpOutputKernel; @@ -167,6 +168,11 @@ template <typename Expression, typename Device, bool Tileable = IsTileable<Device, Expression>::value> class TensorExecutor; +template <typename Expression, typename Device, + bool Vectorizable = IsVectorizable<Device, Expression>::value, + bool Tileable = IsTileable<Device, Expression>::value> +class TensorAsyncExecutor; + } // end namespace internal } // end namespace Eigen diff --git a/unsupported/test/cxx11_tensor_executor.cpp b/unsupported/test/cxx11_tensor_executor.cpp index e9922a48d..f4d0401da 100644 --- a/unsupported/test/cxx11_tensor_executor.cpp +++ b/unsupported/test/cxx11_tensor_executor.cpp @@ -562,37 +562,112 @@ static void test_execute_reverse_rvalue(Device d) } } +template <typename T, int NumDims, typename Device, bool Vectorizable, + bool Tileable, int Layout> +static void test_async_execute_unary_expr(Device d) +{ + static constexpr int Options = 0 | Layout; + + // Pick a large enough tensor size to bypass small tensor block evaluation + // optimization. + auto dims = RandomDims<NumDims>(50 / NumDims, 100 / NumDims); + + Tensor<T, NumDims, Options, Index> src(dims); + Tensor<T, NumDims, Options, Index> dst(dims); + + src.setRandom(); + const auto expr = src.square(); + + using Assign = TensorAssignOp<decltype(dst), const decltype(expr)>; + using Executor = internal::TensorAsyncExecutor<const Assign, Device, + Vectorizable, Tileable>; + Eigen::Barrier done(1); + Executor::runAsync(Assign(dst, expr), d, [&done]() { done.Notify(); }); + done.Wait(); + + for (Index i = 0; i < dst.dimensions().TotalSize(); ++i) { + T square = src.coeff(i) * src.coeff(i); + VERIFY_IS_EQUAL(square, dst.coeff(i)); + } +} + +template <typename T, int NumDims, typename Device, bool Vectorizable, + bool Tileable, int Layout> +static void test_async_execute_binary_expr(Device d) +{ + static constexpr int Options = 0 | Layout; + + // Pick a large enough tensor size to bypass small tensor block evaluation + // optimization. + auto dims = RandomDims<NumDims>(50 / NumDims, 100 / NumDims); + + Tensor<T, NumDims, Options, Index> lhs(dims); + Tensor<T, NumDims, Options, Index> rhs(dims); + Tensor<T, NumDims, Options, Index> dst(dims); + + lhs.setRandom(); + rhs.setRandom(); + + const auto expr = lhs + rhs; + + using Assign = TensorAssignOp<decltype(dst), const decltype(expr)>; + using Executor = internal::TensorAsyncExecutor<const Assign, Device, + Vectorizable, Tileable>; + + Eigen::Barrier done(1); + Executor::runAsync(Assign(dst, expr), d, [&done]() { done.Notify(); }); + done.Wait(); + + for (Index i = 0; i < dst.dimensions().TotalSize(); ++i) { + T sum = lhs.coeff(i) + rhs.coeff(i); + VERIFY_IS_EQUAL(sum, dst.coeff(i)); + } +} + #ifdef EIGEN_DONT_VECTORIZE #define VECTORIZABLE(VAL) !EIGEN_DONT_VECTORIZE && VAL -#else +#else #define VECTORIZABLE(VAL) VAL #endif #define CALL_SUBTEST_PART(PART) \ CALL_SUBTEST_##PART -#define CALL_SUBTEST_COMBINATIONS(PART, NAME, T, NUM_DIMS) \ - CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, false, false, ColMajor>(default_device))); \ - CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, false, true, ColMajor>(default_device))); \ +#define CALL_SUBTEST_COMBINATIONS(PART, NAME, T, NUM_DIMS) \ + CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, false, false, ColMajor>(default_device))); \ + CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, false, true, ColMajor>(default_device))); \ CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, VECTORIZABLE(true), false, ColMajor>(default_device))); \ CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, VECTORIZABLE(true), true, ColMajor>(default_device))); \ - CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, false, false, RowMajor>(default_device))); \ - CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, false, true, RowMajor>(default_device))); \ + CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, false, false, RowMajor>(default_device))); \ + CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, false, true, RowMajor>(default_device))); \ CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, VECTORIZABLE(true), false, RowMajor>(default_device))); \ CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, VECTORIZABLE(true), true, RowMajor>(default_device))); \ - CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, false, ColMajor>(tp_device))); \ - CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, true, ColMajor>(tp_device))); \ + CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, false, ColMajor>(tp_device))); \ + CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, true, ColMajor>(tp_device))); \ CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, VECTORIZABLE(true), false, ColMajor>(tp_device))); \ CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, VECTORIZABLE(true), true, ColMajor>(tp_device))); \ - CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, false, RowMajor>(tp_device))); \ - CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, true, RowMajor>(tp_device))); \ + CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, false, RowMajor>(tp_device))); \ + CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, true, RowMajor>(tp_device))); \ CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, VECTORIZABLE(true), false, RowMajor>(tp_device))); \ CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, VECTORIZABLE(true), true, RowMajor>(tp_device))) +// NOTE: Currently only ThreadPoolDevice supports async expression evaluation. +#define CALL_ASYNC_SUBTEST_COMBINATIONS(PART, NAME, T, NUM_DIMS) \ + CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, false, ColMajor>(tp_device))); \ + CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, true, ColMajor>(tp_device))); \ + CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, VECTORIZABLE(true), false, ColMajor>(tp_device))); \ + CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, VECTORIZABLE(true), true, ColMajor>(tp_device))); \ + CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, false, RowMajor>(tp_device))); \ + CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, true, RowMajor>(tp_device))); \ + CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, VECTORIZABLE(true), false, RowMajor>(tp_device))); \ + CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, VECTORIZABLE(true), true, RowMajor>(tp_device))) + EIGEN_DECLARE_TEST(cxx11_tensor_executor) { Eigen::DefaultDevice default_device; + // Default device is unused in ASYNC tests. + EIGEN_UNUSED_VARIABLE(default_device); - const auto num_threads = internal::random<int>(1, 24); + const auto num_threads = internal::random<int>(20, 24); Eigen::ThreadPool tp(num_threads); Eigen::ThreadPoolDevice tp_device(&tp, num_threads); @@ -660,8 +735,16 @@ EIGEN_DECLARE_TEST(cxx11_tensor_executor) { CALL_SUBTEST_COMBINATIONS(14, test_execute_reverse_rvalue, float, 4); CALL_SUBTEST_COMBINATIONS(14, test_execute_reverse_rvalue, float, 5); + CALL_ASYNC_SUBTEST_COMBINATIONS(15, test_async_execute_unary_expr, float, 3); + CALL_ASYNC_SUBTEST_COMBINATIONS(15, test_async_execute_unary_expr, float, 4); + CALL_ASYNC_SUBTEST_COMBINATIONS(15, test_async_execute_unary_expr, float, 5); + + CALL_ASYNC_SUBTEST_COMBINATIONS(16, test_async_execute_binary_expr, float, 3); + CALL_ASYNC_SUBTEST_COMBINATIONS(16, test_async_execute_binary_expr, float, 4); + CALL_ASYNC_SUBTEST_COMBINATIONS(16, test_async_execute_binary_expr, float, 5); + // Force CMake to split this test. - // EIGEN_SUFFIXES;1;2;3;4;5;6;7;8;9;10;11;12;13;14 + // EIGEN_SUFFIXES;1;2;3;4;5;6;7;8;9;10;11;12;13;14;15;16 } #undef CALL_SUBTEST_COMBINATIONS diff --git a/unsupported/test/cxx11_tensor_thread_pool.cpp b/unsupported/test/cxx11_tensor_thread_pool.cpp index f8a7b3662..53b50d1ed 100644 --- a/unsupported/test/cxx11_tensor_thread_pool.cpp +++ b/unsupported/test/cxx11_tensor_thread_pool.cpp @@ -38,9 +38,9 @@ class TestAllocator : public Allocator { void test_multithread_elementwise() { - Tensor<float, 3> in1(2,3,7); - Tensor<float, 3> in2(2,3,7); - Tensor<float, 3> out(2,3,7); + Tensor<float, 3> in1(200, 30, 70); + Tensor<float, 3> in2(200, 30, 70); + Tensor<float, 3> out(200, 30, 70); in1.setRandom(); in2.setRandom(); @@ -49,15 +49,39 @@ void test_multithread_elementwise() Eigen::ThreadPoolDevice thread_pool_device(&tp, internal::random<int>(3, 11)); out.device(thread_pool_device) = in1 + in2 * 3.14f; - for (int i = 0; i < 2; ++i) { - for (int j = 0; j < 3; ++j) { - for (int k = 0; k < 7; ++k) { - VERIFY_IS_APPROX(out(i,j,k), in1(i,j,k) + in2(i,j,k) * 3.14f); + for (int i = 0; i < 200; ++i) { + for (int j = 0; j < 30; ++j) { + for (int k = 0; k < 70; ++k) { + VERIFY_IS_APPROX(out(i, j, k), in1(i, j, k) + in2(i, j, k) * 3.14f); } } } } +void test_async_multithread_elementwise() +{ + Tensor<float, 3> in1(200, 30, 70); + Tensor<float, 3> in2(200, 30, 70); + Tensor<float, 3> out(200, 30, 70); + + in1.setRandom(); + in2.setRandom(); + + Eigen::ThreadPool tp(internal::random<int>(3, 11)); + Eigen::ThreadPoolDevice thread_pool_device(&tp, internal::random<int>(3, 11)); + + Eigen::Barrier b(1); + out.device(thread_pool_device, [&b]() { b.Notify(); }) = in1 + in2 * 3.14f; + b.Wait(); + + for (int i = 0; i < 200; ++i) { + for (int j = 0; j < 30; ++j) { + for (int k = 0; k < 70; ++k) { + VERIFY_IS_APPROX(out(i, j, k), in1(i, j, k) + in2(i, j, k) * 3.14f); + } + } + } +} void test_multithread_compound_assignment() { @@ -516,6 +540,7 @@ void test_threadpool_allocate(TestAllocator* allocator) EIGEN_DECLARE_TEST(cxx11_tensor_thread_pool) { CALL_SUBTEST_1(test_multithread_elementwise()); + CALL_SUBTEST_1(test_async_multithread_elementwise()); CALL_SUBTEST_1(test_multithread_compound_assignment()); CALL_SUBTEST_2(test_multithread_contraction<ColMajor>()); |