path: root/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h
author    Eugene Zhulenev <ezhulenev@google.com>    2019-08-30 14:49:40 -0700
committer Eugene Zhulenev <ezhulenev@google.com>    2019-08-30 14:49:40 -0700
commit    66665e7e76d2ad5aa37775b3777e9a53c6d1c18c (patch)
tree      cb62a23e970d9125475abd95e4c9e68a02a04461 /unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h
parent    f6c51d9209ccc04d28c39f4c8059e7d3e74d6e07 (diff)
Asynchronous expression evaluation with TensorAsyncDevice
Diffstat (limited to 'unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h')
-rw-r--r--  unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h  271
1 file changed, 227 insertions, 44 deletions
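
For orientation, here is a minimal sketch of how the runAsync entry point added by this patch can be driven directly. It follows the hand-built TensorAssignOp pattern used by Eigen's executor tests; the thread-pool size and tensor shapes are arbitrary, and the user-facing path goes through the TensorAsyncDevice mentioned in the commit subject, which lives in other files of this commit.

#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>

void async_assign_example() {
  Eigen::ThreadPool pool(4);
  Eigen::ThreadPoolDevice device(&pool, 4);

  Eigen::Tensor<float, 2> dst(256, 256);
  Eigen::Tensor<float, 2> src(256, 256);
  src.setRandom();

  // Hand-built assignment expression: dst = src * 2.0f.
  auto expr = src * 2.0f;
  typedef Eigen::TensorAssignOp<decltype(dst), const decltype(expr)> Assign;

  // Non-blocking evaluation: the 'done' callback fires after all tasks finish.
  Eigen::Barrier done(1);
  Eigen::internal::TensorAsyncExecutor<
      const Assign, Eigen::ThreadPoolDevice,
      /*Vectorizable=*/true, /*Tileable=*/false>::
      runAsync(Assign(dst, expr), device, [&done]() { done.Notify(); });

  done.Wait();  // dst now holds src * 2.0f.
}
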
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h
index fddb90d77..18d9de9e6 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h
@@ -84,7 +84,7 @@ class TensorExecutor {
EIGEN_DEVICE_FUNC
static EIGEN_STRONG_INLINE void run(const Expression& expr,
- const Device& device = Device()) {
+ const Device& device = Device()) {
TensorEvaluator<Expression, Device> evaluator(expr, device);
const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL);
if (needs_assign) {
@@ -98,6 +98,14 @@ class TensorExecutor {
};
/**
+ * Default async execution strategy is not implemented. Currently it's only
+ * available for ThreadPoolDevice (see definition below).
+ */
+template <typename Expression, typename Device, bool Vectorizable,
+ bool Tileable>
+class TensorAsyncExecutor {};
+
+/**
* Process all the data with a single cpu thread, using vectorized instructions.
*/
template <typename Expression>
@@ -107,8 +115,8 @@ class TensorExecutor<Expression, DefaultDevice, /*Vectorizable*/ true,
typedef typename Expression::Index StorageIndex;
EIGEN_DEVICE_FUNC
- static EIGEN_STRONG_INLINE void run(const Expression& expr,
- const DefaultDevice& device = DefaultDevice()) {
+ static EIGEN_STRONG_INLINE void run(
+ const Expression& expr, const DefaultDevice& device = DefaultDevice()) {
TensorEvaluator<Expression, DefaultDevice> evaluator(expr, device);
const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL);
if (needs_assign) {
@@ -206,8 +214,81 @@ class TensorExecutor<Expression, DefaultDevice, Vectorizable,
/**
* Multicore strategy: the index space is partitioned and each partition is
* executed on a single core.
+ *
+ * (1) TensorExecutor will submit work to the ThreadPoolDevice managed thread
+ * pool, and will block the caller thread until all tasks are finished.
+ *
+ * (2) TensorAsyncExecutor is a non-blocking version that will submit work to
+ * the ThreadPoolDevice managed thread pool and return immediately. It will
+ * call the 'done' callback after all tasks are finished.
*/
#ifdef EIGEN_USE_THREADS
+
+template <typename TensorBlockMapper>
+struct TensorExecutorTilingContext {
+ typedef typename TensorBlockMapper::Block TensorBlock;
+
+ TensorExecutorTilingContext() : buffer(nullptr) {}
+ TensorExecutorTilingContext(const TensorBlockMapper& b_mapper,
+ const TensorOpCost& b_cost, void* b_buffer,
+ size_t b_aligned_size)
+ : block_mapper(b_mapper),
+ cost(b_cost),
+ buffer(b_buffer),
+ aligned_blocksize(b_aligned_size) {}
+
+ template <typename Scalar>
+ Scalar* GetCurrentThreadBuffer(const ThreadPoolDevice& device) const {
+ // ThreadPoolDevice::currentThreadId() returns -1 if called from a thread
+ // not in the thread pool, such as the main thread dispatching Eigen
+ // expressions.
+ const int thread_idx = device.currentThreadId();
+ eigen_assert(thread_idx >= -1 && thread_idx < device.numThreads());
+
+ const Index offset = aligned_blocksize * (thread_idx + 1);
+ return reinterpret_cast<Scalar*>(static_cast<char*>(buffer) + offset);
+ }
+
+ TensorBlockMapper block_mapper; // navigate through blocks
+ TensorOpCost cost; // cost of computing a single block
+ void* buffer; // temporary buffer for blocks
+ size_t aligned_blocksize; // block size after memory alignment
+};
+
+// Computes block evaluation parameters and allocates a temporary memory buffer
+// for blocks. See TensorExecutor/TensorAsyncExecutor (Tileable=true) below.
+template <typename Evaluator, typename TensorBlockMapper, bool Vectorizable>
+TensorExecutorTilingContext<TensorBlockMapper> GetTensorExecutorTilingContext(
+ const ThreadPoolDevice& device, const Evaluator& evaluator) {
+ // Prefer blocks skewed toward inner dimension.
+ TensorBlockShapeType block_shape = kSkewedInnerDims;
+ Index block_total_size = 0;
+
+ // Query expression tree for desired block size/shape.
+ std::vector<TensorOpResourceRequirements> resources;
+ evaluator.getResourceRequirements(&resources);
+ MergeResourceRequirements(resources, &block_shape, &block_total_size);
+ int num_threads = device.numThreads();
+
+ // Estimate minimum block size based on cost.
+ TensorOpCost cost = evaluator.costPerCoeff(Vectorizable);
+ double taskSize = TensorCostModel<ThreadPoolDevice>::taskSize(1, cost);
+ size_t block_size = static_cast<size_t>(1.0 / taskSize);
+
+ TensorBlockMapper block_mapper(
+ typename TensorBlockMapper::Dimensions(evaluator.dimensions()),
+ block_shape, block_size);
+
+ block_size = block_mapper.block_dims_total_size();
+ const size_t align = numext::maxi(EIGEN_MAX_ALIGN_BYTES, 1);
+ const size_t aligned_blocksize =
+ align *
+ divup<size_t>(block_size * sizeof(typename Evaluator::Scalar), align);
+ void* buf = device.allocate((num_threads + 1) * aligned_blocksize);
+
+ return {block_mapper, cost * block_size, buf, aligned_blocksize};
+}
+
template <typename Evaluator, typename StorageIndex, bool Vectorizable>
struct EvalRange {
static void run(Evaluator* evaluator_in, const StorageIndex firstIdx,
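
A note on the scratch-buffer layout above: GetTensorExecutorTilingContext allocates (num_threads + 1) slots, each rounded up to EIGEN_MAX_ALIGN_BYTES, and GetCurrentThreadBuffer maps the calling thread (currentThreadId() == -1) to slot 0 and pool thread k to slot k + 1. A small standalone sketch of that arithmetic, with ad hoc names:

#include <cassert>
#include <cstddef>

// Same rounding as `align * divup(block_size * sizeof(Scalar), align)` above.
inline std::size_t aligned_slot_bytes(std::size_t block_bytes, std::size_t align) {
  return align * ((block_bytes + align - 1) / align);
}

// thread_idx == -1 (caller thread) maps to slot 0, pool thread k to slot k + 1,
// which is why the allocation holds (num_threads + 1) slots.
inline char* thread_slot(char* buffer, std::size_t slot_bytes,
                         int thread_idx, int num_threads) {
  assert(thread_idx >= -1 && thread_idx < num_threads);
  return buffer + slot_bytes * static_cast<std::size_t>(thread_idx + 1);
}
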
@@ -274,7 +355,7 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable, Tileable> {
typedef EvalRange<Evaluator, StorageIndex, Vectorizable> EvalRange;
Evaluator evaluator(expr, device);
- const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL);
+ const bool needs_assign = evaluator.evalSubExprsIfNeeded(nullptr);
if (needs_assign) {
const StorageIndex size = array_prod(evaluator.dimensions());
device.parallelFor(size, evaluator.costPerCoeff(Vectorizable),
@@ -290,18 +371,18 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable, Tileable> {
template <typename Expression, bool Vectorizable>
class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable, /*Tileable*/ true> {
public:
+ typedef typename traits<Expression>::Index StorageIndex;
typedef typename traits<Expression>::Scalar Scalar;
typedef typename remove_const<Scalar>::type ScalarNoConst;
- typedef TensorEvaluator<Expression, ThreadPoolDevice> Evaluator;
- typedef typename traits<Expression>::Index StorageIndex;
-
static const int NumDims = traits<Expression>::NumDimensions;
+ typedef TensorEvaluator<Expression, ThreadPoolDevice> Evaluator;
+ typedef TensorBlockMapper<ScalarNoConst, StorageIndex, NumDims, Evaluator::Layout> BlockMapper;
+ typedef TensorExecutorTilingContext<BlockMapper> TilingContext;
+
static EIGEN_STRONG_INLINE void run(const Expression& expr,
const ThreadPoolDevice& device) {
- typedef TensorBlockMapper<ScalarNoConst, StorageIndex, NumDims, Evaluator::Layout> TensorBlockMapper;
-
Evaluator evaluator(expr, device);
Index total_size = array_prod(evaluator.dimensions());
Index cache_size = device.firstLevelCacheSize() / sizeof(Scalar);
@@ -315,50 +396,152 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable, /*Tileable*/ tr
return;
}
- const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL);
+ const bool needs_assign = evaluator.evalSubExprsIfNeeded(nullptr);
if (needs_assign) {
- TensorBlockShapeType block_shape = kSkewedInnerDims;
- Index block_total_size = 0;
- // Query expression tree for desired block size/shape.
- std::vector<internal::TensorOpResourceRequirements> resources;
- evaluator.getResourceRequirements(&resources);
- MergeResourceRequirements(resources, &block_shape, &block_total_size);
- int num_threads = device.numThreads();
+ const TilingContext tiling =
+ internal::GetTensorExecutorTilingContext<Evaluator, BlockMapper,
+ Vectorizable>(device, evaluator);
- // Estimate minimum block size based on cost.
- TensorOpCost cost = evaluator.costPerCoeff(Vectorizable);
- double taskSize = TensorCostModel<ThreadPoolDevice>::taskSize(1, cost);
- size_t block_size = static_cast<size_t>(1.0 / taskSize);
- TensorBlockMapper block_mapper(
- typename TensorBlockMapper::Dimensions(evaluator.dimensions()),
- block_shape, block_size);
- block_size = block_mapper.block_dims_total_size();
- const size_t align = numext::maxi(EIGEN_MAX_ALIGN_BYTES, 1);
- const size_t aligned_blocksize =
- align * divup<size_t>(block_size * sizeof(Scalar), align);
- void* buf = device.allocate((num_threads + 1) * aligned_blocksize);
device.parallelFor(
- block_mapper.total_block_count(), cost * block_size,
- [=, &device, &evaluator, &block_mapper](StorageIndex firstIdx,
- StorageIndex lastIdx) {
- // currentThreadId() returns -1 if called from a thread not in the
- // thread pool, such as the main thread dispatching Eigen
- // expressions.
- const int thread_idx = device.currentThreadId();
- eigen_assert(thread_idx >= -1 && thread_idx < num_threads);
- ScalarNoConst* thread_buf = reinterpret_cast<ScalarNoConst*>(
- static_cast<char*>(buf) + aligned_blocksize * (thread_idx + 1));
+ tiling.block_mapper.total_block_count(), tiling.cost,
+ [=, &device, &evaluator, &tiling](StorageIndex firstIdx,
+ StorageIndex lastIdx) {
+ ScalarNoConst* thread_buf =
+ tiling.template GetCurrentThreadBuffer<ScalarNoConst>(device);
for (StorageIndex i = firstIdx; i < lastIdx; ++i) {
- auto block = block_mapper.GetBlockForIndex(i, thread_buf);
+ auto block = tiling.block_mapper.GetBlockForIndex(i, thread_buf);
evaluator.evalBlock(&block);
}
});
- device.deallocate(buf);
+ device.deallocate(tiling.buffer);
}
evaluator.cleanup();
}
};
+template <typename Expression, bool Vectorizable, bool Tileable>
+class TensorAsyncExecutor<Expression, ThreadPoolDevice, Vectorizable, Tileable> {
+ public:
+ typedef typename Expression::Index StorageIndex;
+ typedef TensorEvaluator<Expression, ThreadPoolDevice> Evaluator;
+
+ static EIGEN_STRONG_INLINE void runAsync(const Expression& expr,
+ const ThreadPoolDevice& device,
+ std::function<void()> done) {
+ TensorAsyncExecutorContext* const ctx =
+ new TensorAsyncExecutorContext(expr, device, std::move(done));
+ // TODO(ezhulenev): This is a potentially blocking operation. Make it async!
+ const bool needs_assign = ctx->evaluator.evalSubExprsIfNeeded(nullptr);
+
+ typedef EvalRange<Evaluator, StorageIndex, Vectorizable> EvalRange;
+
+ if (needs_assign) {
+ const StorageIndex size = array_prod(ctx->evaluator.dimensions());
+ device.parallelForAsync(
+ size, ctx->evaluator.costPerCoeff(Vectorizable),
+ EvalRange::alignBlockSize,
+ [ctx](StorageIndex firstIdx, StorageIndex lastIdx) {
+ EvalRange::run(&ctx->evaluator, firstIdx, lastIdx);
+ },
+ [ctx]() { delete ctx; });
+ }
+ }
+
+ private:
+ struct TensorAsyncExecutorContext {
+ TensorAsyncExecutorContext(const Expression& expr,
+ const ThreadPoolDevice& thread_pool,
+ std::function<void()> done)
+ : evaluator(expr, thread_pool), on_done(std::move(done)) {}
+
+ ~TensorAsyncExecutorContext() {
+ on_done();
+ evaluator.cleanup();
+ }
+
+ Evaluator evaluator;
+
+ private:
+ std::function<void()> on_done;
+ };
+};
+
+template <typename Expression, bool Vectorizable>
+class TensorAsyncExecutor<Expression, ThreadPoolDevice, Vectorizable, /*Tileable*/ true> {
+ public:
+ typedef typename traits<Expression>::Index StorageIndex;
+ typedef typename traits<Expression>::Scalar Scalar;
+ typedef typename remove_const<Scalar>::type ScalarNoConst;
+
+ static const int NumDims = traits<Expression>::NumDimensions;
+
+ typedef TensorEvaluator<Expression, ThreadPoolDevice> Evaluator;
+ typedef TensorBlockMapper<ScalarNoConst, StorageIndex, NumDims, Evaluator::Layout> BlockMapper;
+ typedef TensorExecutorTilingContext<BlockMapper> TilingContext;
+
+ static EIGEN_STRONG_INLINE void runAsync(const Expression& expr,
+ const ThreadPoolDevice& device,
+ std::function<void()> done) {
+ TensorAsyncExecutorContext* const ctx =
+ new TensorAsyncExecutorContext(expr, device, std::move(done));
+
+ Index total_size = array_prod(ctx->evaluator.dimensions());
+ Index cache_size = device.firstLevelCacheSize() / sizeof(Scalar);
+
+ if (total_size < cache_size &&
+ !ExpressionHasTensorBroadcastingOp<Expression>::value) {
+ internal::TensorAsyncExecutor<Expression, ThreadPoolDevice, Vectorizable,
+ /*Tileable*/ false>::runAsync(
+ expr, device, [ctx]() { delete ctx; });
+ return;
+ }
+
+ // TODO(ezhulenev): This is a potentially blocking operation. Make it async!
+ const bool needs_assign = ctx->evaluator.evalSubExprsIfNeeded(nullptr);
+
+ if (needs_assign) {
+ ctx->tiling =
+ internal::GetTensorExecutorTilingContext<Evaluator, BlockMapper,
+ Vectorizable>(device, ctx->evaluator);
+
+ device.parallelForAsync(
+ ctx->tiling.block_mapper.total_block_count(), ctx->tiling.cost,
+ [ctx](StorageIndex firstIdx, StorageIndex lastIdx) {
+ ScalarNoConst* thread_buf =
+ ctx->tiling.template GetCurrentThreadBuffer<ScalarNoConst>(ctx->device);
+ for (StorageIndex i = firstIdx; i < lastIdx; ++i) {
+ auto block = ctx->tiling.block_mapper.GetBlockForIndex(i, thread_buf);
+ ctx->evaluator.evalBlock(&block);
+ }
+ },
+ [ctx]() { delete ctx; });
+ }
+ }
+
+ private:
+ struct TensorAsyncExecutorContext {
+ TensorAsyncExecutorContext(const Expression& expr,
+ const ThreadPoolDevice& thread_pool,
+ std::function<void()> done)
+ : device(thread_pool),
+ evaluator(expr, thread_pool),
+ on_done(std::move(done)) {}
+
+ ~TensorAsyncExecutorContext() {
+ on_done();
+ device.deallocate(tiling.buffer);
+ evaluator.cleanup();
+ }
+
+ const ThreadPoolDevice& device;
+ Evaluator evaluator;
+ TilingContext tiling;
+
+ private:
+ std::function<void()> on_done;
+ };
+};
+
#endif // EIGEN_USE_THREADS
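
Both TensorAsyncExecutor specializations above share one lifetime pattern: the context is heap-allocated, every lambda handed to parallelForAsync captures the raw pointer, and the completion callback deletes the context, whose destructor runs the user's 'done' callback and releases the evaluator (plus the tiling scratch buffer in the tiled case). A condensed sketch of that pattern, with ad hoc names standing in for the Eigen types:

#include <functional>
#include <utility>

struct FakeEvaluator {   // stands in for TensorEvaluator
  void evalRange(int, int) {}
  void cleanup() {}
};

struct AsyncContext {    // stands in for TensorAsyncExecutorContext
  explicit AsyncContext(std::function<void()> done) : on_done(std::move(done)) {}
  ~AsyncContext() {
    on_done();            // signal the caller,
    evaluator.cleanup();  // then release evaluator resources (same order as above).
  }
  FakeEvaluator evaluator;
 private:
  std::function<void()> on_done;
};

// 'parallel_for_async' stands in for ThreadPoolDevice::parallelForAsync: it takes
// a per-range task and a completion callback that runs after the last task.
template <typename ParallelForAsync>
void run_async(ParallelForAsync&& parallel_for_async, std::function<void()> done) {
  auto* ctx = new AsyncContext(std::move(done));
  parallel_for_async(
      [ctx](int first, int last) { ctx->evaluator.evalRange(first, last); },
      [ctx]() { delete ctx; });  // last task done: destroy ctx, firing 'done'.
}
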
@@ -419,7 +602,7 @@ template <typename Expression, bool Vectorizable, bool Tileable>
EIGEN_STRONG_INLINE void TensorExecutor<Expression, GpuDevice, Vectorizable, Tileable>::run(
const Expression& expr, const GpuDevice& device) {
TensorEvaluator<Expression, GpuDevice> evaluator(expr, device);
- const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL);
+ const bool needs_assign = evaluator.evalSubExprsIfNeeded(nullptr);
if (needs_assign) {
const int block_size = device.maxGpuThreadsPerBlock();
@@ -517,10 +700,10 @@ struct ExecExprFunctorKernel<Expr, false, Evaluator>
template <typename Expression, bool Vectorizable, bool Tileable>
class TensorExecutor<Expression, Eigen::SyclDevice, Vectorizable, Tileable> {
public:
- typedef typename Expression::Index Index;
+ typedef typename Expression::Index Index;
static EIGEN_STRONG_INLINE void run(const Expression &expr, const Eigen::SyclDevice &dev) {
Eigen::TensorEvaluator<Expression, Eigen::SyclDevice> evaluator(expr, dev);
- const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL);
+ const bool needs_assign = evaluator.evalSubExprsIfNeeded(nullptr);
if (needs_assign) {
Index range, GRange, tileSize;
Index total_size = ::Eigen::internal::array_prod(evaluator.dimensions());