diff options
author | Eugene Zhulenev <ezhulenev@google.com> | 2019-08-30 14:49:40 -0700 |
---|---|---|
committer | Eugene Zhulenev <ezhulenev@google.com> | 2019-08-30 14:49:40 -0700 |
commit | 66665e7e76d2ad5aa37775b3777e9a53c6d1c18c (patch) | |
tree | cb62a23e970d9125475abd95e4c9e68a02a04461 /unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h | |
parent | f6c51d9209ccc04d28c39f4c8059e7d3e74d6e07 (diff) |
Asynchronous expression evaluation with TensorAsyncDevice
Diffstat (limited to 'unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h')
-rw-r--r-- | unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h | 271 |
1 file changed, 227 insertions, 44 deletions
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h index fddb90d77..18d9de9e6 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h @@ -84,7 +84,7 @@ class TensorExecutor { EIGEN_DEVICE_FUNC static EIGEN_STRONG_INLINE void run(const Expression& expr, - const Device& device = Device()) { + const Device& device = Device()) { TensorEvaluator<Expression, Device> evaluator(expr, device); const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL); if (needs_assign) { @@ -98,6 +98,14 @@ class TensorExecutor { }; /** + * Default async execution strategy is not implemented. Currently it's only + * available for ThreadPoolDevice (see definition below). + */ +template <typename Expression, typename Device, bool Vectorizable, + bool Tileable> +class TensorAsyncExecutor {}; + +/** * Process all the data with a single cpu thread, using vectorized instructions. */ template <typename Expression> @@ -107,8 +115,8 @@ class TensorExecutor<Expression, DefaultDevice, /*Vectorizable*/ true, typedef typename Expression::Index StorageIndex; EIGEN_DEVICE_FUNC - static EIGEN_STRONG_INLINE void run(const Expression& expr, - const DefaultDevice& device = DefaultDevice()) { + static EIGEN_STRONG_INLINE void run( + const Expression& expr, const DefaultDevice& device = DefaultDevice()) { TensorEvaluator<Expression, DefaultDevice> evaluator(expr, device); const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL); if (needs_assign) { @@ -206,8 +214,81 @@ class TensorExecutor<Expression, DefaultDevice, Vectorizable, /** * Multicore strategy: the index space is partitioned and each partition is * executed on a single core. + * + * (1) TensorExecutor will submit work to the ThreadPoolDevice managed thread + * pool, and will block the caller thread until all tasks are finished. 
+ * + * (2) TensorAsyncExecutor is a non-blocking version, that will submit work to + * the ThreadPoolDevice managed thread pool, and will return immediately. + * It will call 'done' callback after all tasks are finished. */ #ifdef EIGEN_USE_THREADS + +template <typename TensorBlockMapper> +struct TensorExecutorTilingContext { + typedef typename TensorBlockMapper::Block TensorBlock; + + TensorExecutorTilingContext() : buffer(nullptr) {} + TensorExecutorTilingContext(const TensorBlockMapper& b_mapper, + const TensorOpCost& b_cost, void* b_buffer, + size_t b_aligned_size) + : block_mapper(b_mapper), + cost(b_cost), + buffer(b_buffer), + aligned_blocksize(b_aligned_size) {} + + template <typename Scalar> + Scalar* GetCurrentThreadBuffer(const ThreadPoolDevice& device) const { + // ThreadPoolDevice::currentThreadId() returns -1 if called from a thread + // not in the thread pool, such as the main thread dispatching Eigen + // expressions. + const int thread_idx = device.currentThreadId(); + eigen_assert(thread_idx >= -1 && thread_idx < device.numThreads()); + + const Index offset = aligned_blocksize * (thread_idx + 1); + return reinterpret_cast<Scalar*>(static_cast<char*>(buffer) + offset); + } + + TensorBlockMapper block_mapper; // navigate through blocks + TensorOpCost cost; // cost of computing a single block + void* buffer; // temporary buffer for blocks + size_t aligned_blocksize; // block size after memory alignment +}; + +// Computes a block evaluation parameters, and allocates temporary memory buffer +// for blocks. See TensorExecutor/TensorAsyncExecutor (Tileable=true) below. +template <typename Evaluator, typename TensorBlockMapper, bool Vectorizable> +TensorExecutorTilingContext<TensorBlockMapper> GetTensorExecutorTilingContext( + const ThreadPoolDevice& device, const Evaluator& evaluator) { + // Prefer blocks skewed toward inner dimension. 
+ TensorBlockShapeType block_shape = kSkewedInnerDims; + Index block_total_size = 0; + + // Query expression tree for desired block size/shape. + std::vector<TensorOpResourceRequirements> resources; + evaluator.getResourceRequirements(&resources); + MergeResourceRequirements(resources, &block_shape, &block_total_size); + int num_threads = device.numThreads(); + + // Estimate minimum block size based on cost. + TensorOpCost cost = evaluator.costPerCoeff(Vectorizable); + double taskSize = TensorCostModel<ThreadPoolDevice>::taskSize(1, cost); + size_t block_size = static_cast<size_t>(1.0 / taskSize); + + TensorBlockMapper block_mapper( + typename TensorBlockMapper::Dimensions(evaluator.dimensions()), + block_shape, block_size); + + block_size = block_mapper.block_dims_total_size(); + const size_t align = numext::maxi(EIGEN_MAX_ALIGN_BYTES, 1); + const size_t aligned_blocksize = + align * + divup<size_t>(block_size * sizeof(typename Evaluator::Scalar), align); + void* buf = device.allocate((num_threads + 1) * aligned_blocksize); + + return {block_mapper, cost * block_size, buf, aligned_blocksize}; +} + template <typename Evaluator, typename StorageIndex, bool Vectorizable> struct EvalRange { static void run(Evaluator* evaluator_in, const StorageIndex firstIdx, @@ -274,7 +355,7 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable, Tileable> { typedef EvalRange<Evaluator, StorageIndex, Vectorizable> EvalRange; Evaluator evaluator(expr, device); - const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL); + const bool needs_assign = evaluator.evalSubExprsIfNeeded(nullptr); if (needs_assign) { const StorageIndex size = array_prod(evaluator.dimensions()); device.parallelFor(size, evaluator.costPerCoeff(Vectorizable), @@ -290,18 +371,18 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable, Tileable> { template <typename Expression, bool Vectorizable> class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable, /*Tileable*/ true> { public: 
+ typedef typename traits<Expression>::Index StorageIndex; typedef typename traits<Expression>::Scalar Scalar; typedef typename remove_const<Scalar>::type ScalarNoConst; - typedef TensorEvaluator<Expression, ThreadPoolDevice> Evaluator; - typedef typename traits<Expression>::Index StorageIndex; - static const int NumDims = traits<Expression>::NumDimensions; + typedef TensorEvaluator<Expression, ThreadPoolDevice> Evaluator; + typedef TensorBlockMapper<ScalarNoConst, StorageIndex, NumDims, Evaluator::Layout> BlockMapper; + typedef TensorExecutorTilingContext<BlockMapper> TilingContext; + static EIGEN_STRONG_INLINE void run(const Expression& expr, const ThreadPoolDevice& device) { - typedef TensorBlockMapper<ScalarNoConst, StorageIndex, NumDims, Evaluator::Layout> TensorBlockMapper; - Evaluator evaluator(expr, device); Index total_size = array_prod(evaluator.dimensions()); Index cache_size = device.firstLevelCacheSize() / sizeof(Scalar); @@ -315,50 +396,152 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable, /*Tileable*/ tr return; } - const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL); + const bool needs_assign = evaluator.evalSubExprsIfNeeded(nullptr); if (needs_assign) { - TensorBlockShapeType block_shape = kSkewedInnerDims; - Index block_total_size = 0; - // Query expression tree for desired block size/shape. - std::vector<internal::TensorOpResourceRequirements> resources; - evaluator.getResourceRequirements(&resources); - MergeResourceRequirements(resources, &block_shape, &block_total_size); - int num_threads = device.numThreads(); + const TilingContext tiling = + internal::GetTensorExecutorTilingContext<Evaluator, BlockMapper, + Vectorizable>(device, evaluator); - // Estimate minimum block size based on cost. 
- TensorOpCost cost = evaluator.costPerCoeff(Vectorizable); - double taskSize = TensorCostModel<ThreadPoolDevice>::taskSize(1, cost); - size_t block_size = static_cast<size_t>(1.0 / taskSize); - TensorBlockMapper block_mapper( - typename TensorBlockMapper::Dimensions(evaluator.dimensions()), - block_shape, block_size); - block_size = block_mapper.block_dims_total_size(); - const size_t align = numext::maxi(EIGEN_MAX_ALIGN_BYTES, 1); - const size_t aligned_blocksize = - align * divup<size_t>(block_size * sizeof(Scalar), align); - void* buf = device.allocate((num_threads + 1) * aligned_blocksize); device.parallelFor( - block_mapper.total_block_count(), cost * block_size, - [=, &device, &evaluator, &block_mapper](StorageIndex firstIdx, - StorageIndex lastIdx) { - // currentThreadId() returns -1 if called from a thread not in the - // thread pool, such as the main thread dispatching Eigen - // expressions. - const int thread_idx = device.currentThreadId(); - eigen_assert(thread_idx >= -1 && thread_idx < num_threads); - ScalarNoConst* thread_buf = reinterpret_cast<ScalarNoConst*>( - static_cast<char*>(buf) + aligned_blocksize * (thread_idx + 1)); + tiling.block_mapper.total_block_count(), tiling.cost, + [=, &device, &evaluator, &tiling](StorageIndex firstIdx, + StorageIndex lastIdx) { + ScalarNoConst* thread_buf = + tiling.template GetCurrentThreadBuffer<ScalarNoConst>(device); for (StorageIndex i = firstIdx; i < lastIdx; ++i) { - auto block = block_mapper.GetBlockForIndex(i, thread_buf); + auto block = tiling.block_mapper.GetBlockForIndex(i, thread_buf); evaluator.evalBlock(&block); } }); - device.deallocate(buf); + device.deallocate(tiling.buffer); } evaluator.cleanup(); } }; +template <typename Expression, bool Vectorizable, bool Tileable> +class TensorAsyncExecutor<Expression, ThreadPoolDevice, Vectorizable, Tileable> { + public: + typedef typename Expression::Index StorageIndex; + typedef TensorEvaluator<Expression, ThreadPoolDevice> Evaluator; + + static 
EIGEN_STRONG_INLINE void runAsync(const Expression& expr, + const ThreadPoolDevice& device, + std::function<void()> done) { + TensorAsyncExecutorContext* const ctx = + new TensorAsyncExecutorContext(expr, device, std::move(done)); + // TODO(ezhulenev): This is a potentially blocking operation. Make it async! + const bool needs_assign = ctx->evaluator.evalSubExprsIfNeeded(nullptr); + + typedef EvalRange<Evaluator, StorageIndex, Vectorizable> EvalRange; + + if (needs_assign) { + const StorageIndex size = array_prod(ctx->evaluator.dimensions()); + device.parallelForAsync( + size, ctx->evaluator.costPerCoeff(Vectorizable), + EvalRange::alignBlockSize, + [ctx](StorageIndex firstIdx, StorageIndex lastIdx) { + EvalRange::run(&ctx->evaluator, firstIdx, lastIdx); + }, + [ctx]() { delete ctx; }); + } + } + + private: + struct TensorAsyncExecutorContext { + TensorAsyncExecutorContext(const Expression& expr, + const ThreadPoolDevice& thread_pool, + std::function<void()> done) + : evaluator(expr, thread_pool), on_done(std::move(done)) {} + + ~TensorAsyncExecutorContext() { + on_done(); + evaluator.cleanup(); + } + + Evaluator evaluator; + + private: + std::function<void()> on_done; + }; +}; + +template <typename Expression, bool Vectorizable> +class TensorAsyncExecutor<Expression, ThreadPoolDevice, Vectorizable, /*Tileable*/ true> { + public: + typedef typename traits<Expression>::Index StorageIndex; + typedef typename traits<Expression>::Scalar Scalar; + typedef typename remove_const<Scalar>::type ScalarNoConst; + + static const int NumDims = traits<Expression>::NumDimensions; + + typedef TensorEvaluator<Expression, ThreadPoolDevice> Evaluator; + typedef TensorBlockMapper<ScalarNoConst, StorageIndex, NumDims, Evaluator::Layout> BlockMapper; + typedef TensorExecutorTilingContext<BlockMapper> TilingContext; + + static EIGEN_STRONG_INLINE void runAsync(const Expression& expr, + const ThreadPoolDevice& device, + std::function<void()> done) { + TensorAsyncExecutorContext* const ctx 
= + new TensorAsyncExecutorContext(expr, device, std::move(done)); + + Index total_size = array_prod(ctx->evaluator.dimensions()); + Index cache_size = device.firstLevelCacheSize() / sizeof(Scalar); + + if (total_size < cache_size && + !ExpressionHasTensorBroadcastingOp<Expression>::value) { + internal::TensorAsyncExecutor<Expression, ThreadPoolDevice, Vectorizable, + /*Tileable*/ false>::runAsync( + expr, device, [ctx]() { delete ctx; }); + return; + } + + // TODO(ezhulenev): This is a potentially blocking operation. Make it async! + const bool needs_assign = ctx->evaluator.evalSubExprsIfNeeded(nullptr); + + if (needs_assign) { + ctx->tiling = + internal::GetTensorExecutorTilingContext<Evaluator, BlockMapper, + Vectorizable>(device, ctx->evaluator); + + device.parallelForAsync( + ctx->tiling.block_mapper.total_block_count(), ctx->tiling.cost, + [ctx](StorageIndex firstIdx, StorageIndex lastIdx) { + ScalarNoConst* thread_buf = + ctx->tiling.template GetCurrentThreadBuffer<ScalarNoConst>(ctx->device); + for (StorageIndex i = firstIdx; i < lastIdx; ++i) { + auto block = ctx->tiling.block_mapper.GetBlockForIndex(i, thread_buf); + ctx->evaluator.evalBlock(&block); + } + }, + [ctx]() { delete ctx; }); + } + } + + private: + struct TensorAsyncExecutorContext { + TensorAsyncExecutorContext(const Expression& expr, + const ThreadPoolDevice& thread_pool, + std::function<void()> done) + : device(thread_pool), + evaluator(expr, thread_pool), + on_done(std::move(done)) {} + + ~TensorAsyncExecutorContext() { + on_done(); + device.deallocate(tiling.buffer); + evaluator.cleanup(); + } + + const ThreadPoolDevice& device; + Evaluator evaluator; + TilingContext tiling; + + private: + std::function<void()> on_done; + }; +}; + #endif // EIGEN_USE_THREADS @@ -419,7 +602,7 @@ template <typename Expression, bool Vectorizable, bool Tileable> EIGEN_STRONG_INLINE void TensorExecutor<Expression, GpuDevice, Vectorizable, Tileable>::run( const Expression& expr, const GpuDevice& device) { 
TensorEvaluator<Expression, GpuDevice> evaluator(expr, device); - const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL); + const bool needs_assign = evaluator.evalSubExprsIfNeeded(nullptr); if (needs_assign) { const int block_size = device.maxGpuThreadsPerBlock(); @@ -517,10 +700,10 @@ struct ExecExprFunctorKernel<Expr, false, Evaluator> template <typename Expression, bool Vectorizable, bool Tileable> class TensorExecutor<Expression, Eigen::SyclDevice, Vectorizable, Tileable> { public: - typedef typename Expression::Index Index; + typedef typename Expression::Index Index; static EIGEN_STRONG_INLINE void run(const Expression &expr, const Eigen::SyclDevice &dev) { Eigen::TensorEvaluator<Expression, Eigen::SyclDevice> evaluator(expr, dev); - const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL); + const bool needs_assign = evaluator.evalSubExprsIfNeeded(nullptr); if (needs_assign) { Index range, GRange, tileSize; Index total_size = ::Eigen::internal::array_prod(evaluator.dimensions()); |