From 66665e7e76d2ad5aa37775b3777e9a53c6d1c18c Mon Sep 17 00:00:00 2001 From: Eugene Zhulenev Date: Fri, 30 Aug 2019 14:49:40 -0700 Subject: Asynchronous expression evaluation with TensorAsyncDevice --- .../Eigen/CXX11/src/Tensor/TensorExecutor.h | 271 +++++++++++++++++---- 1 file changed, 227 insertions(+), 44 deletions(-) (limited to 'unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h') diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h index fddb90d77..18d9de9e6 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h @@ -84,7 +84,7 @@ class TensorExecutor { EIGEN_DEVICE_FUNC static EIGEN_STRONG_INLINE void run(const Expression& expr, - const Device& device = Device()) { + const Device& device = Device()) { TensorEvaluator evaluator(expr, device); const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL); if (needs_assign) { @@ -97,6 +97,14 @@ class TensorExecutor { } }; +/** + * Default async execution strategy is not implemented. Currently it's only + * available for ThreadPoolDevice (see definition below). + */ +template +class TensorAsyncExecutor {}; + /** * Process all the data with a single cpu thread, using vectorized instructions. */ @@ -107,8 +115,8 @@ class TensorExecutor evaluator(expr, device); const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL); if (needs_assign) { @@ -206,8 +214,81 @@ class TensorExecutor +struct TensorExecutorTilingContext { + typedef typename TensorBlockMapper::Block TensorBlock; + + TensorExecutorTilingContext() : buffer(nullptr) {} + TensorExecutorTilingContext(const TensorBlockMapper& b_mapper, + const TensorOpCost& b_cost, void* b_buffer, + size_t b_aligned_size) + : block_mapper(b_mapper), + cost(b_cost), + buffer(b_buffer), + aligned_blocksize(b_aligned_size) {} + + template + Scalar* GetCurrentThreadBuffer(const ThreadPoolDevice& device) const { + // ThreadPoolDevice::currentThreadId() returns -1 if called from a thread + // not in the thread pool, such as the main thread dispatching Eigen + // expressions. + const int thread_idx = device.currentThreadId(); + eigen_assert(thread_idx >= -1 && thread_idx < device.numThreads()); + + const Index offset = aligned_blocksize * (thread_idx + 1); + return reinterpret_cast(static_cast(buffer) + offset); + } + + TensorBlockMapper block_mapper; // navigate through blocks + TensorOpCost cost; // cost of computing a single block + void* buffer; // temporary buffer for blocks + size_t aligned_blocksize; // block size after memory alignment +}; + +// Computes a block evaluation parameters, and allocates temporary memory buffer +// for blocks. See TensorExecutor/TensorAsyncExecutor (Tileable=true) below. +template +TensorExecutorTilingContext GetTensorExecutorTilingContext( + const ThreadPoolDevice& device, const Evaluator& evaluator) { + // Prefer blocks skewed toward inner dimension. + TensorBlockShapeType block_shape = kSkewedInnerDims; + Index block_total_size = 0; + + // Query expression tree for desired block size/shape. + std::vector resources; + evaluator.getResourceRequirements(&resources); + MergeResourceRequirements(resources, &block_shape, &block_total_size); + int num_threads = device.numThreads(); + + // Estimate minimum block size based on cost. + TensorOpCost cost = evaluator.costPerCoeff(Vectorizable); + double taskSize = TensorCostModel::taskSize(1, cost); + size_t block_size = static_cast(1.0 / taskSize); + + TensorBlockMapper block_mapper( + typename TensorBlockMapper::Dimensions(evaluator.dimensions()), + block_shape, block_size); + + block_size = block_mapper.block_dims_total_size(); + const size_t align = numext::maxi(EIGEN_MAX_ALIGN_BYTES, 1); + const size_t aligned_blocksize = + align * + divup(block_size * sizeof(typename Evaluator::Scalar), align); + void* buf = device.allocate((num_threads + 1) * aligned_blocksize); + + return {block_mapper, cost * block_size, buf, aligned_blocksize}; +} + template struct EvalRange { static void run(Evaluator* evaluator_in, const StorageIndex firstIdx, @@ -274,7 +355,7 @@ class TensorExecutor { typedef EvalRange EvalRange; Evaluator evaluator(expr, device); - const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL); + const bool needs_assign = evaluator.evalSubExprsIfNeeded(nullptr); if (needs_assign) { const StorageIndex size = array_prod(evaluator.dimensions()); device.parallelFor(size, evaluator.costPerCoeff(Vectorizable), @@ -290,18 +371,18 @@ class TensorExecutor { template class TensorExecutor { public: + typedef typename traits::Index StorageIndex; typedef typename traits::Scalar Scalar; typedef typename remove_const::type ScalarNoConst; - typedef TensorEvaluator Evaluator; - typedef typename traits::Index StorageIndex; - static const int NumDims = traits::NumDimensions; + typedef TensorEvaluator Evaluator; + typedef TensorBlockMapper BlockMapper; + typedef TensorExecutorTilingContext TilingContext; + static EIGEN_STRONG_INLINE void run(const Expression& expr, const ThreadPoolDevice& device) { - typedef TensorBlockMapper TensorBlockMapper; - Evaluator evaluator(expr, device); Index total_size = array_prod(evaluator.dimensions()); Index cache_size = device.firstLevelCacheSize() / sizeof(Scalar); @@ -315,50 +396,152 @@ class TensorExecutor resources; - evaluator.getResourceRequirements(&resources); - MergeResourceRequirements(resources, &block_shape, &block_total_size); - int num_threads = device.numThreads(); + const TilingContext tiling = + internal::GetTensorExecutorTilingContext(device, evaluator); - // Estimate minimum block size based on cost. - TensorOpCost cost = evaluator.costPerCoeff(Vectorizable); - double taskSize = TensorCostModel::taskSize(1, cost); - size_t block_size = static_cast(1.0 / taskSize); - TensorBlockMapper block_mapper( - typename TensorBlockMapper::Dimensions(evaluator.dimensions()), - block_shape, block_size); - block_size = block_mapper.block_dims_total_size(); - const size_t align = numext::maxi(EIGEN_MAX_ALIGN_BYTES, 1); - const size_t aligned_blocksize = - align * divup(block_size * sizeof(Scalar), align); - void* buf = device.allocate((num_threads + 1) * aligned_blocksize); device.parallelFor( - block_mapper.total_block_count(), cost * block_size, - [=, &device, &evaluator, &block_mapper](StorageIndex firstIdx, - StorageIndex lastIdx) { - // currentThreadId() returns -1 if called from a thread not in the - // thread pool, such as the main thread dispatching Eigen - // expressions. - const int thread_idx = device.currentThreadId(); - eigen_assert(thread_idx >= -1 && thread_idx < num_threads); - ScalarNoConst* thread_buf = reinterpret_cast( - static_cast(buf) + aligned_blocksize * (thread_idx + 1)); + tiling.block_mapper.total_block_count(), tiling.cost, + [=, &device, &evaluator, &tiling](StorageIndex firstIdx, + StorageIndex lastIdx) { + ScalarNoConst* thread_buf = + tiling.template GetCurrentThreadBuffer(device); for (StorageIndex i = firstIdx; i < lastIdx; ++i) { - auto block = block_mapper.GetBlockForIndex(i, thread_buf); + auto block = tiling.block_mapper.GetBlockForIndex(i, thread_buf); evaluator.evalBlock(&block); } }); - device.deallocate(buf); + device.deallocate(tiling.buffer); } evaluator.cleanup(); } }; +template +class TensorAsyncExecutor { + public: + typedef typename Expression::Index StorageIndex; + typedef TensorEvaluator Evaluator; + + static EIGEN_STRONG_INLINE void runAsync(const Expression& expr, + const ThreadPoolDevice& device, + std::function done) { + TensorAsyncExecutorContext* const ctx = + new TensorAsyncExecutorContext(expr, device, std::move(done)); + // TODO(ezhulenev): This is a potentially blocking operation. Make it async! + const bool needs_assign = ctx->evaluator.evalSubExprsIfNeeded(nullptr); + + typedef EvalRange EvalRange; + + if (needs_assign) { + const StorageIndex size = array_prod(ctx->evaluator.dimensions()); + device.parallelForAsync( + size, ctx->evaluator.costPerCoeff(Vectorizable), + EvalRange::alignBlockSize, + [ctx](StorageIndex firstIdx, StorageIndex lastIdx) { + EvalRange::run(&ctx->evaluator, firstIdx, lastIdx); + }, + [ctx]() { delete ctx; }); + } + } + + private: + struct TensorAsyncExecutorContext { + TensorAsyncExecutorContext(const Expression& expr, + const ThreadPoolDevice& thread_pool, + std::function done) + : evaluator(expr, thread_pool), on_done(std::move(done)) {} + + ~TensorAsyncExecutorContext() { + on_done(); + evaluator.cleanup(); + } + + Evaluator evaluator; + + private: + std::function on_done; + }; +}; + +template +class TensorAsyncExecutor { + public: + typedef typename traits::Index StorageIndex; + typedef typename traits::Scalar Scalar; + typedef typename remove_const::type ScalarNoConst; + + static const int NumDims = traits::NumDimensions; + + typedef TensorEvaluator Evaluator; + typedef TensorBlockMapper BlockMapper; + typedef TensorExecutorTilingContext TilingContext; + + static EIGEN_STRONG_INLINE void runAsync(const Expression& expr, + const ThreadPoolDevice& device, + std::function done) { + TensorAsyncExecutorContext* const ctx = + new TensorAsyncExecutorContext(expr, device, std::move(done)); + + Index total_size = array_prod(ctx->evaluator.dimensions()); + Index cache_size = device.firstLevelCacheSize() / sizeof(Scalar); + + if (total_size < cache_size && + !ExpressionHasTensorBroadcastingOp::value) { + internal::TensorAsyncExecutor::runAsync( + expr, device, [ctx]() { delete ctx; }); + return; + } + + // TODO(ezhulenev): This is a potentially blocking operation. Make it async! + const bool needs_assign = ctx->evaluator.evalSubExprsIfNeeded(nullptr); + + if (needs_assign) { + ctx->tiling = + internal::GetTensorExecutorTilingContext(device, ctx->evaluator); + + device.parallelForAsync( + ctx->tiling.block_mapper.total_block_count(), ctx->tiling.cost, + [ctx](StorageIndex firstIdx, StorageIndex lastIdx) { + ScalarNoConst* thread_buf = + ctx->tiling.template GetCurrentThreadBuffer(ctx->device); + for (StorageIndex i = firstIdx; i < lastIdx; ++i) { + auto block = ctx->tiling.block_mapper.GetBlockForIndex(i, thread_buf); + ctx->evaluator.evalBlock(&block); + } + }, + [ctx]() { delete ctx; }); + } + } + + private: + struct TensorAsyncExecutorContext { + TensorAsyncExecutorContext(const Expression& expr, + const ThreadPoolDevice& thread_pool, + std::function done) + : device(thread_pool), + evaluator(expr, thread_pool), + on_done(std::move(done)) {} + + ~TensorAsyncExecutorContext() { + on_done(); + device.deallocate(tiling.buffer); + evaluator.cleanup(); + } + + const ThreadPoolDevice& device; + Evaluator evaluator; + TilingContext tiling; + + private: + std::function on_done; + }; +}; + #endif // EIGEN_USE_THREADS @@ -419,7 +602,7 @@ template EIGEN_STRONG_INLINE void TensorExecutor::run( const Expression& expr, const GpuDevice& device) { TensorEvaluator evaluator(expr, device); - const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL); + const bool needs_assign = evaluator.evalSubExprsIfNeeded(nullptr); if (needs_assign) { const int block_size = device.maxGpuThreadsPerBlock(); @@ -517,10 +700,10 @@ struct ExecExprFunctorKernel template class TensorExecutor { public: - typedef typename Expression::Index Index; + typedef typename Expression::Index Index; static EIGEN_STRONG_INLINE void run(const Expression &expr, const Eigen::SyclDevice &dev) { Eigen::TensorEvaluator evaluator(expr, dev); - const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL); + const bool needs_assign = evaluator.evalSubExprsIfNeeded(nullptr); if (needs_assign) { Index range, GRange, tileSize; Index total_size = ::Eigen::internal::array_prod(evaluator.dimensions()); -- cgit v1.2.3