diff options
author | Rasmus Munk Larsen <rmlarsen@google.com> | 2019-10-22 12:42:44 -0700 |
---|---|---|
committer | Rasmus Munk Larsen <rmlarsen@google.com> | 2019-10-22 12:42:44 -0700 |
commit | 97c0c5d485ddec0369326825a41db48d8505cf4c (patch) | |
tree | 9072616f37eacc24f407061ac74954d67da8c5ee | |
parent | 668ab3fc474e54c7919eda4fbaf11f3a99246494 (diff) |
Add block evaluation V2 to TensorAsyncExecutor.
Add async evaluation to a number of ops.
9 files changed, 226 insertions, 66 deletions
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorBase.h b/unsupported/Eigen/CXX11/src/Tensor/TensorBase.h index a951bea6d..7aa98fac6 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorBase.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorBase.h @@ -1129,16 +1129,11 @@ class TensorBase : public TensorBase<Derived, ReadOnlyAccessors> { return TensorDevice<Derived, DeviceType>(dev, derived()); } -#ifdef EIGEN_USE_THREADS // Select the async device on which to evaluate the expression. template <typename DeviceType, typename DoneCallback> - typename internal::enable_if< - internal::is_same<DeviceType, ThreadPoolDevice>::value, - TensorAsyncDevice<Derived, DeviceType, DoneCallback>>::type - device(const DeviceType& dev, DoneCallback done) { + TensorAsyncDevice<Derived, DeviceType, DoneCallback> device(const DeviceType& dev, DoneCallback done) { return TensorAsyncDevice<Derived, DeviceType, DoneCallback>(dev, derived(), std::move(done)); } -#endif // EIGEN_USE_THREADS protected: EIGEN_DEVICE_FUNC diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDevice.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDevice.h index cc9c65702..804a16cc5 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorDevice.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDevice.h @@ -63,18 +63,18 @@ template <typename ExpressionType, typename DeviceType> class TensorDevice { ExpressionType& m_expression; }; -#ifdef EIGEN_USE_THREADS - /** \class TensorAsyncDevice - * \ingroup CXX11_Tensor_Module - * - * \brief Pseudo expression providing an operator = that will evaluate its - * argument asynchronously on the specified device (currently supports only - * ThreadPoolDevice). - * - * Example: - * auto done = []() { ... expression evaluation done ... }; - * C.device(EIGEN_THREAD_POOL, std::move(done)) = A + B; + * \ingroup CXX11_Tensor_Module + * + * \brief Pseudo expression providing an operator = that will evaluate its + * argument asynchronously on the specified device. Currently only + * ThreadPoolDevice implements proper asynchronous execution, while the default + * and GPU devices just run the expression synchronously and call m_done() on + * completion.. + * + * Example: + * auto done = []() { ... expression evaluation done ... }; + * C.device(thread_pool_device, std::move(done)) = A + B; */ template <typename ExpressionType, typename DeviceType, typename DoneCallback> @@ -87,11 +87,11 @@ class TensorAsyncDevice { template <typename OtherDerived> EIGEN_STRONG_INLINE TensorAsyncDevice& operator=(const OtherDerived& other) { typedef TensorAssignOp<ExpressionType, const OtherDerived> Assign; - typedef internal::TensorAsyncExecutor<const Assign, DeviceType, DoneCallback> Executor; + typedef internal::TensorExecutor<const Assign, DeviceType> Executor; - // WARNING: After assignment 'm_done' callback will be in undefined state. Assign assign(m_expression, other); - Executor::runAsync(assign, m_device, std::move(m_done)); + Executor::run(assign, m_device); + m_done(); return *this; } @@ -102,7 +102,33 @@ class TensorAsyncDevice { DoneCallback m_done; }; -#endif // EIGEN_USE_THREADS + +#ifdef EIGEN_USE_THREADS +template <typename ExpressionType, typename DoneCallback> +class TensorAsyncDevice<ExpressionType, ThreadPoolDevice, DoneCallback> { + public: + TensorAsyncDevice(const ThreadPoolDevice& device, ExpressionType& expression, + DoneCallback done) + : m_device(device), m_expression(expression), m_done(std::move(done)) {} + + template <typename OtherDerived> + EIGEN_STRONG_INLINE TensorAsyncDevice& operator=(const OtherDerived& other) { + typedef TensorAssignOp<ExpressionType, const OtherDerived> Assign; + typedef internal::TensorAsyncExecutor<const Assign, ThreadPoolDevice, DoneCallback> Executor; + + // WARNING: After assignment 'm_done' callback will be in undefined state. + Assign assign(m_expression, other); + Executor::runAsync(assign, m_device, std::move(m_done)); + + return *this; + } + + protected: + const ThreadPoolDevice& m_device; + ExpressionType& m_expression; + DoneCallback m_done; +}; +#endif } // end namespace Eigen diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorEvalTo.h b/unsupported/Eigen/CXX11/src/Tensor/TensorEvalTo.h index 22fc64c1f..cd1338c66 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorEvalTo.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorEvalTo.h @@ -151,6 +151,16 @@ struct TensorEvaluator<const TensorEvalToOp<ArgType, MakePointer_>, Device> return m_impl.evalSubExprsIfNeeded(m_buffer); } +#ifdef EIGEN_USE_THREADS + template <typename EvalSubExprsCallback> + EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void evalSubExprsIfNeededAsync( + EvaluatorPointerType scalar, EvalSubExprsCallback done) { + EIGEN_UNUSED_VARIABLE(scalar); + eigen_assert(scalar == NULL); + m_impl.evalSubExprsIfNeededAsync(m_buffer, std::move(done)); + } +#endif + EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void evalScalar(Index i) { m_buffer[i] = m_impl.coeff(i); } diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h index 11cec3d1c..f736c238f 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h @@ -102,7 +102,7 @@ class TensorExecutor { * available for ThreadPoolDevice (see definition below). */ template <typename Expression, typename Device, typename DoneCallback, - bool Vectorizable, bool Tileable> + bool Vectorizable, TiledEvaluation Tiling> class TensorAsyncExecutor {}; /** @@ -544,9 +544,9 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable, }; template <typename Expression, typename DoneCallback, bool Vectorizable, - bool Tileable> + TiledEvaluation Tiling> class TensorAsyncExecutor<Expression, ThreadPoolDevice, DoneCallback, - Vectorizable, Tileable> { + Vectorizable, Tiling> { public: typedef typename Expression::Index StorageIndex; typedef TensorEvaluator<Expression, ThreadPoolDevice> Evaluator; @@ -598,7 +598,7 @@ class TensorAsyncExecutor<Expression, ThreadPoolDevice, DoneCallback, template <typename Expression, typename DoneCallback, bool Vectorizable> class TensorAsyncExecutor<Expression, ThreadPoolDevice, DoneCallback, - Vectorizable, /*Tileable*/ true> { + Vectorizable, /*Tileable*/ TiledEvaluation::Legacy> { public: typedef typename traits<Expression>::Index StorageIndex; typedef typename traits<Expression>::Scalar Scalar; @@ -607,7 +607,9 @@ class TensorAsyncExecutor<Expression, ThreadPoolDevice, DoneCallback, static const int NumDims = traits<Expression>::NumDimensions; typedef TensorEvaluator<Expression, ThreadPoolDevice> Evaluator; - typedef TensorBlockMapper<ScalarNoConst, StorageIndex, NumDims, Evaluator::Layout> BlockMapper; + typedef TensorBlockMapper<ScalarNoConst, StorageIndex, NumDims, + Evaluator::Layout> + BlockMapper; typedef TensorExecutorTilingContext<BlockMapper> TilingContext; static EIGEN_STRONG_INLINE void runAsync(const Expression& expr, @@ -624,7 +626,7 @@ class TensorAsyncExecutor<Expression, ThreadPoolDevice, DoneCallback, auto delete_ctx = [ctx]() { delete ctx; }; internal::TensorAsyncExecutor< Expression, ThreadPoolDevice, decltype(delete_ctx), Vectorizable, - /*Tileable*/ false>::runAsync(expr, device, std::move(delete_ctx)); + /*Tileable*/ TiledEvaluation::Off>::runAsync(expr, device, std::move(delete_ctx)); return; } @@ -635,22 +637,102 @@ class TensorAsyncExecutor<Expression, ThreadPoolDevice, DoneCallback, } ctx->tiling = - GetTensorExecutorTilingContext<Evaluator, BlockMapper, - Vectorizable>(device, ctx->evaluator); + GetTensorExecutorTilingContext<Evaluator, BlockMapper, Vectorizable>( + device, ctx->evaluator); + + auto eval_block = [ctx](StorageIndex firstIdx, StorageIndex lastIdx) { + ScalarNoConst* thread_buf = + ctx->tiling.template GetCurrentThreadBuffer<ScalarNoConst>( + ctx->device); + for (StorageIndex i = firstIdx; i < lastIdx; ++i) { + auto block = ctx->tiling.block_mapper.GetBlockForIndex(i, thread_buf); + ctx->evaluator.evalBlock(&block); + } + }; + device.parallelForAsync(ctx->tiling.block_mapper.total_block_count(), + ctx->tiling.cost, eval_block, + [ctx]() { delete ctx; }); + }; - device.parallelForAsync( - ctx->tiling.block_mapper.total_block_count(), ctx->tiling.cost, - [ctx](StorageIndex firstIdx, StorageIndex lastIdx) { - ScalarNoConst* thread_buf = - ctx->tiling.template GetCurrentThreadBuffer<ScalarNoConst>( - ctx->device); - for (StorageIndex i = firstIdx; i < lastIdx; ++i) { - auto block = - ctx->tiling.block_mapper.GetBlockForIndex(i, thread_buf); - ctx->evaluator.evalBlock(&block); - } - }, - [ctx]() { delete ctx; }); + ctx->evaluator.evalSubExprsIfNeededAsync(nullptr, on_eval_subexprs); + } + + private: + struct TensorAsyncExecutorContext { + TensorAsyncExecutorContext(const Expression& expr, + const ThreadPoolDevice& thread_pool, + DoneCallback done) + : device(thread_pool), + evaluator(expr, thread_pool), + on_done(std::move(done)) {} + + ~TensorAsyncExecutorContext() { + on_done(); + device.deallocate(tiling.buffer); + evaluator.cleanup(); + } + + const ThreadPoolDevice& device; + Evaluator evaluator; + TilingContext tiling; + + private: + DoneCallback on_done; + }; +}; + +template <typename Expression, typename DoneCallback, bool Vectorizable> +class TensorAsyncExecutor<Expression, ThreadPoolDevice, DoneCallback, + Vectorizable, /*Tileable*/ TiledEvaluation::On> { + public: + typedef typename traits<Expression>::Index IndexType; + typedef typename traits<Expression>::Scalar Scalar; + typedef typename remove_const<Scalar>::type ScalarNoConst; + + static const int NumDims = traits<Expression>::NumDimensions; + + typedef TensorEvaluator<Expression, ThreadPoolDevice> Evaluator; + typedef TensorBlockMapper<ScalarNoConst, IndexType, NumDims, + Evaluator::Layout> + BlockMapper; + typedef TensorExecutorTilingContext<BlockMapper> TilingContext; + + typedef internal::TensorBlockDescriptor<NumDims, IndexType> TensorBlockDesc; + typedef internal::TensorBlockScratchAllocator<ThreadPoolDevice> + TensorBlockScratch; + + static EIGEN_STRONG_INLINE void runAsync(const Expression& expr, + const ThreadPoolDevice& device, + DoneCallback done) { + + TensorAsyncExecutorContext* const ctx = + new TensorAsyncExecutorContext(expr, device, std::move(done)); + + const auto on_eval_subexprs = [ctx](bool need_assign) -> void { + if (!need_assign) { + delete ctx; + return; + } + + ctx->tiling = + internal::GetTensorExecutorTilingContext<Evaluator, BlockMapper, + Vectorizable>( + ctx->device, ctx->evaluator, /*allocate_buffer=*/false); + + auto eval_block = [ctx](IndexType firstBlockIdx, IndexType lastBlockIdx) { + TensorBlockScratch scratch(ctx->device); + + for (IndexType block_idx = firstBlockIdx; block_idx < lastBlockIdx; + ++block_idx) { + auto block = + ctx->tiling.block_mapper.GetBlockForIndex(block_idx, nullptr); + TensorBlockDesc desc(block.first_coeff_index(), block.block_sizes()); + ctx->evaluator.evalBlockV2(desc, scratch); + scratch.reset(); + } + }; + ctx->device.parallelForAsync(ctx->tiling.block_mapper.total_block_count(), + ctx->tiling.cost, eval_block, [ctx]() { delete ctx; }); }; ctx->evaluator.evalSubExprsIfNeededAsync(nullptr, on_eval_subexprs); @@ -682,7 +764,6 @@ class TensorAsyncExecutor<Expression, ThreadPoolDevice, DoneCallback, #endif // EIGEN_USE_THREADS - // GPU: the evaluation of the expression is offloaded to a GPU. #if defined(EIGEN_USE_GPU) diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorForcedEval.h b/unsupported/Eigen/CXX11/src/Tensor/TensorForcedEval.h index 7d12e781e..e5b67a18c 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorForcedEval.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorForcedEval.h @@ -132,14 +132,6 @@ struct TensorEvaluator<const TensorForcedEvalOp<ArgType_>, Device> EIGEN_STRONG_INLINE bool evalSubExprsIfNeeded(EvaluatorPointerType) { const Index numValues = internal::array_prod(m_impl.dimensions()); m_buffer = m_device.get((CoeffReturnType*)m_device.allocate_temp(numValues * sizeof(CoeffReturnType))); - #ifndef EIGEN_USE_SYCL - // Should initialize the memory in case we're dealing with non POD types. - if (NumTraits<CoeffReturnType>::RequireInitialization) { - for (Index i = 0; i < numValues; ++i) { - new(m_buffer+i) CoeffReturnType(); - } - } - #endif typedef TensorEvalToOp< const typename internal::remove_const<ArgType>::type > EvalTo; EvalTo evalToTmp(m_device.get(m_buffer), m_op); @@ -151,6 +143,29 @@ struct TensorEvaluator<const TensorForcedEvalOp<ArgType_>, Device> return true; } + +#ifdef EIGEN_USE_THREADS + template <typename EvalSubExprsCallback> + EIGEN_STRONG_INLINE EIGEN_DEVICE_FUNC void evalSubExprsIfNeededAsync( + EvaluatorPointerType, EvalSubExprsCallback done) { + const Index numValues = internal::array_prod(m_impl.dimensions()); + m_buffer = m_device.get((CoeffReturnType*)m_device.allocate_temp( + numValues * sizeof(CoeffReturnType))); + typedef TensorEvalToOp<const typename internal::remove_const<ArgType>::type> + EvalTo; + EvalTo evalToTmp(m_device.get(m_buffer), m_op); + + auto on_done = std::bind([](EvalSubExprsCallback done) { done(true); }, + std::move(done)); + internal::TensorAsyncExecutor< + const EvalTo, typename internal::remove_const<Device>::type, + decltype(on_done), + /*Vectorizable=*/internal::IsVectorizable<Device, const ArgType>::value, + /*Tiling=*/internal::IsTileable<Device, const ArgType>::value>:: + runAsync(evalToTmp, m_device, std::move(on_done)); + } +#endif + EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void cleanup() { m_device.deallocate_temp(m_buffer); m_buffer = NULL; diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorForwardDeclarations.h b/unsupported/Eigen/CXX11/src/Tensor/TensorForwardDeclarations.h index 5549cbdb2..0da2d9e0d 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorForwardDeclarations.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorForwardDeclarations.h @@ -185,12 +185,12 @@ template <typename Expression, typename Device, TiledEvaluation Tiling = IsTileable<Device, Expression>::value> class TensorExecutor; -// TODO(ezhulenev): Add TiledEvaluation support to async executor. template <typename Expression, typename Device, typename DoneCallback, bool Vectorizable = IsVectorizable<Device, Expression>::value, - bool Tileable = IsTileable<Device, Expression>::BlockAccess> + TiledEvaluation Tiling = IsTileable<Device, Expression>::value> class TensorAsyncExecutor; + } // end namespace internal } // end namespace Eigen diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorMorphing.h b/unsupported/Eigen/CXX11/src/Tensor/TensorMorphing.h index 606d49a20..781f1d75b 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorMorphing.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorMorphing.h @@ -205,6 +205,14 @@ struct TensorEvaluator<const TensorReshapingOp<NewDimensions, ArgType>, Device> EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE const Dimensions& dimensions() const { return m_dimensions; } +#ifdef EIGEN_USE_THREADS + template <typename EvalSubExprsCallback> + EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void evalSubExprsIfNeededAsync( + EvaluatorPointerType data, EvalSubExprsCallback done) { + m_impl.evalSubExprsIfNeededAsync(data, std::move(done)); + } +#endif + EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE bool evalSubExprsIfNeeded(EvaluatorPointerType data) { return m_impl.evalSubExprsIfNeeded(data); } diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h b/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h index a5c293cf9..d826cfb7e 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h @@ -689,15 +689,14 @@ struct TensorReductionEvaluatorBase<const TensorReductionOp<Op, Dims, ArgType, M EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE const Dimensions& dimensions() const { return m_dimensions; } EIGEN_STRONG_INLINE - #if !defined(EIGEN_HIPCC) - // Marking this as EIGEN_DEVICE_FUNC for HIPCC requires also doing the same for all the functions - // being called within here, which then leads to proliferation of EIGEN_DEVICE_FUNC markings, one - // of which will eventually result in an NVCC error - EIGEN_DEVICE_FUNC - #endif - bool evalSubExprsIfNeeded(EvaluatorPointerType data) { - m_impl.evalSubExprsIfNeeded(NULL); - +#if !defined(EIGEN_HIPCC) + // Marking this as EIGEN_DEVICE_FUNC for HIPCC requires also doing the same + // for all the functions being called within here, which then leads to + // proliferation of EIGEN_DEVICE_FUNC markings, one of which will eventually + // result in an NVCC error + EIGEN_DEVICE_FUNC +#endif + bool evalSubExprsIfNeededCommon(EvaluatorPointerType data) { // Use the FullReducer if possible. if ((RunningFullReduction && RunningOnSycl) ||(RunningFullReduction && internal::FullReducer<Self, Op, Device>::HasOptimizedImplementation && @@ -802,6 +801,34 @@ struct TensorReductionEvaluatorBase<const TensorReductionOp<Op, Dims, ArgType, M return true; } +#ifdef EIGEN_USE_THREADS + template <typename EvalSubExprsCallback> + EIGEN_STRONG_INLINE +#if !defined(EIGEN_HIPCC) + EIGEN_DEVICE_FUNC +#endif + void + evalSubExprsIfNeededAsync(EvaluatorPointerType data, + EvalSubExprsCallback done) { + m_impl.evalSubExprsIfNeededAsync(NULL, [this, data, done](bool) { + done(evalSubExprsIfNeededCommon(data)); + }); + } +#endif + + EIGEN_STRONG_INLINE +#if !defined(EIGEN_HIPCC) + // Marking this as EIGEN_DEVICE_FUNC for HIPCC requires also doing the same + // for all the functions being called within here, which then leads to + // proliferation of EIGEN_DEVICE_FUNC markings, one of which will eventually + // result in an NVCC error + EIGEN_DEVICE_FUNC +#endif + bool evalSubExprsIfNeeded(EvaluatorPointerType data) { + m_impl.evalSubExprsIfNeeded(NULL); + return evalSubExprsIfNeededCommon(data); + } + EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void cleanup() { m_impl.cleanup(); if (m_result) { diff --git a/unsupported/test/cxx11_tensor_executor.cpp b/unsupported/test/cxx11_tensor_executor.cpp index dd68ddf17..0e70e1770 100644 --- a/unsupported/test/cxx11_tensor_executor.cpp +++ b/unsupported/test/cxx11_tensor_executor.cpp @@ -604,11 +604,10 @@ static void test_async_execute_unary_expr(Device d) Eigen::Barrier done(1); auto on_done = [&done]() { done.Notify(); }; - static const bool TilingOn = Tiling == TiledEvaluation::Off ? false : true; using Assign = TensorAssignOp<decltype(dst), const decltype(expr)>; using DoneCallback = decltype(on_done); using Executor = internal::TensorAsyncExecutor<const Assign, Device, DoneCallback, - Vectorizable, TilingOn>; + Vectorizable, Tiling>; Executor::runAsync(Assign(dst, expr), d, on_done); done.Wait(); @@ -641,11 +640,10 @@ static void test_async_execute_binary_expr(Device d) Eigen::Barrier done(1); auto on_done = [&done]() { done.Notify(); }; - static const bool TilingOn = Tiling == TiledEvaluation::Off ? false : true; using Assign = TensorAssignOp<decltype(dst), const decltype(expr)>; using DoneCallback = decltype(on_done); using Executor = internal::TensorAsyncExecutor<const Assign, Device, DoneCallback, - Vectorizable, TilingOn>; + Vectorizable, Tiling>; Executor::runAsync(Assign(dst, expr), d, on_done); done.Wait(); |