aboutsummaryrefslogtreecommitdiffhomepage
path: root/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
diff options
context:
space:
mode:
authorGravatar Eugene Zhulenev <ezhulenev@google.com>2019-09-13 12:14:44 -0700
committerGravatar Eugene Zhulenev <ezhulenev@google.com>2019-09-13 12:14:44 -0700
commit553caeb6a3bb545aef895f8fc9f219be44679017 (patch)
tree36550665f461e6e7f04ea45802b6394774e3ec65 /unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
parentfacdec5aa7d947d5462c9dbaefa7a50c4cabff3b (diff)
Use ThreadLocal container in TensorContractionThreadPool
Diffstat (limited to 'unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h')
-rw-r--r--unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h246
1 files changed, 216 insertions, 30 deletions
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
index 4adfeb560..6b1a6ba85 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
@@ -379,7 +379,8 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
bool parallel_pack,
bool parallelize_by_sharding_dim_only,
DoneCallback done)
- : done_(this, std::move(done)),
+ : created_by_thread_id_(std::this_thread::get_id()),
+ done_(this, std::move(done)),
device_(self->m_device),
lhs_(self->m_leftImpl, self->m_left_nocontract_strides,
self->m_i_strides, self->m_left_contracting_strides,
@@ -408,7 +409,20 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
gn_(gn),
nm0_(nm0),
nn0_(nn0),
- kernel_(m_, k_, n_, bm_, bk_, bn_) {
+ kernel_(m_, k_, n_, bm_, bk_, bn_),
+ num_thread_local_allocations_(0),
+ // We reserve 2X more capacity for a thread local values, than the
+ // number of threads in the pool to efficiently handle task stealing
+ // by threads that are not managed by the pool.
+ thread_local_capacity(2 * (parallelize_by_sharding_dim_only_
+ ? device_.numThreadsInPool()
+ : 0)),
+ // We will use only one of the Lhs/Rhs thread local storage depending
+ // on the shard_by_col value and we parallelize by sharding dim ONLY.
+ lhs_thread_local_blocks_(shard_by_col_ ? 0 : thread_local_capacity,
+ {*this}, {*this}),
+ rhs_thread_local_blocks_(shard_by_col_ ? thread_local_capacity : 0,
+ {*this}, {*this}) {
// These two options are mutually exclusive.
eigen_assert(!(parallel_pack && parallelize_by_sharding_dim_only));
@@ -455,12 +469,12 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
std::memory_order_relaxed);
Index num_blocks = num_worker_threads * gn_;
- thread_local_packed_mem_ = kernel_.allocateSlices( //
- device_, //
- /*num_lhs=*/0, //
- /*num_rhs=*/num_blocks, //
- /*num_slices=*/1, //
- /*lhs_blocks=*/nullptr, &thread_local_packed_rhs_);
+ thread_local_pre_alocated_mem_ = kernel_.allocateSlices( //
+ device_, //
+ /*num_lhs=*/0, //
+ /*num_rhs=*/num_blocks, //
+ /*num_slices=*/1, //
+ /*lhs_blocks=*/nullptr, &rhs_thread_local_pre_allocated_);
} else {
can_use_thread_local_packed_ = new std::atomic<bool>[nm_];
@@ -469,11 +483,11 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
std::memory_order_relaxed);
Index num_blocks = num_worker_threads * gm_;
- thread_local_packed_mem_ = kernel_.allocateSlices( //
- device_, //
- /*num_lhs=*/num_blocks, //
- /*num_rhs=*/0, //
- /*num_slices=*/1, &thread_local_packed_lhs_, //
+ thread_local_pre_alocated_mem_ = kernel_.allocateSlices( //
+ device_, //
+ /*num_lhs=*/num_blocks, //
+ /*num_rhs=*/0, //
+ /*num_slices=*/1, &lhs_thread_local_pre_allocated_, //
/*rhs_blocks=*/nullptr);
}
}
@@ -486,7 +500,7 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
}
kernel_.deallocate(device_, packed_mem_);
if (parallelize_by_sharding_dim_only_) {
- kernel_.deallocate(device_, thread_local_packed_mem_);
+ kernel_.deallocate(device_, thread_local_pre_alocated_mem_);
delete[] can_use_thread_local_packed_;
}
}
@@ -512,6 +526,8 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
}
private:
+ std::thread::id created_by_thread_id_;
+
// This notification is specialized on the type of DoneCallback and can be
// blocking or non-blocking.
EvalParallelNotification<DoneCallback, EvalParallelContext> done_;
@@ -606,11 +622,185 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
// completion of the K-1 kernel, so we have to allocate "global" packed_lhs_
// and packed_rhs_ to allow kernels to be executed later on a thread
// different from the thread that was used for packing.
- BlockMemHandle thread_local_packed_mem_;
- // Only one of these will be initialized depending on shard_by_col value.
- std::vector<LhsBlock> thread_local_packed_lhs_;
- std::vector<RhsBlock> thread_local_packed_rhs_;
+ // Handle for pre-allocated thread local memory buffers.
+ BlockMemHandle thread_local_pre_alocated_mem_;
+
+ // Only one of these will be initialized depending on shard_by_col value
+ // (the size will be `num_worker_threads * num_grains_in_the_sharding_dim`).
+ std::vector<LhsBlock> lhs_thread_local_pre_allocated_;
+ std::vector<RhsBlock> rhs_thread_local_pre_allocated_;
+
+ // How many thread local blocks were already allocated.
+ std::atomic<int> num_thread_local_allocations_;
+ const int thread_local_capacity;
+
+ // We will use pre-allocated Lhs/Rhs blocks defined above, if the number of
+ // unique threads in a system is below or equal to the number of threads in
+ // a thread pool. We will fallback on dynamic memory allocation after that.
+
+ // ThreadLocalBlocks is a container for Lhs or Rhs thread local buffers. Its
+ // size is equal to the grain size in Lhs/Rhs sharding dimension.
+ template <typename BlockType>
+ class ThreadLocalBlocks {
+ public:
+ ThreadLocalBlocks() = default;
+
+ ThreadLocalBlocks(BlockType* base, size_t grain_size)
+ : is_pre_allocated_(true),
+ thread_local_pre_allocated_base_(base),
+ grain_size_(grain_size) {}
+
+ ThreadLocalBlocks(BlockMemHandle mem_handle,
+ std::vector<BlockType> blocks)
+ : is_pre_allocated_(false),
+ mem_handle_(std::move(mem_handle)),
+ blocks_(std::move(blocks)) {}
+
+ BlockType& block(int grain_index) {
+ eigen_assert(grain_index >= 0);
+ eigen_assert(static_cast<size_t>(grain_index) < size());
+ return is_pre_allocated_ ? thread_local_pre_allocated_base_[grain_index]
+ : blocks_[grain_index];
+ }
+
+ void Release(EvalParallelContext& ctx) const {
+ if (!is_pre_allocated_) {
+ ctx.kernel_.deallocate(ctx.device_, mem_handle_);
+ }
+ }
+
+ size_t size() const {
+ return is_pre_allocated_ ? grain_size_ : blocks_.size();
+ }
+
+ private:
+ bool is_pre_allocated_;
+
+ // Reuse pre-allocated thread local buffers.
+ BlockType* thread_local_pre_allocated_base_ = nullptr;
+ size_t grain_size_;
+
+ // These will be initialized only if `is_pre_allocated == false`.
+ BlockMemHandle mem_handle_;
+ std::vector<BlockType> blocks_;
+ };
+
+ // ThreadLocalBlocksInitialize callable does custom thread local blocks
+ // initialization, and will reuse pre-allocated buffers if possible, or will
+ // dynamically allocate new memory.
+ //
+ // Lhs/Rhs blocks might be of the same type, so we have to pass explicitly
+ // for what side do we plan to do block allocation.
+ template <typename BlockType, bool is_rhs>
+ class ThreadLocalBlocksInitialize {
+ static constexpr bool kIsLhs =
+ !is_rhs && std::is_same<BlockType, LhsBlock>::value;
+ static const bool kIsRhs =
+ is_rhs && std::is_same<BlockType, RhsBlock>::value;
+ static_assert(kIsLhs || kIsRhs, "Unkown block type");
+
+ using Blocks = ThreadLocalBlocks<BlockType>;
+
+ public:
+ ThreadLocalBlocksInitialize(EvalParallelContext& ctx)
+ : ctx_(ctx),
+ num_worker_threads_(ctx_.device_.numThreadsInPool()) {}
+
+ void operator()(Blocks& blocks) {
+ const int n = ctx_.num_thread_local_allocations_.fetch_add(
+ 1, std::memory_order_relaxed);
+
+ if (n >= num_worker_threads_) {
+ ThreadLocalBlocksAllocator<is_rhs>::allocate(ctx_, blocks);
+ } else {
+ ThreadLocalBlocksAllocator<is_rhs>::reuse(ctx_, n, blocks);
+ }
+ }
+
+ private:
+ // NOTE(ezhulenev): Without 'if constexpr' we have to put calls to
+ // TensorContractionKernel::allocateSlices into template specializations.
+ // Also explicit specializations are not allowed at class scope in C++03,
+ // EvalCtx type parameter is just a workaround for that limitation.
+ template <bool pack_rhs, typename EvalCtx = EvalParallelContext>
+ struct ThreadLocalBlocksAllocator;
+
+ template <typename EvalCtx>
+ struct ThreadLocalBlocksAllocator</*pack_rhs=*/true, EvalCtx> {
+ static void allocate(EvalCtx& ctx, Blocks& blocks) {
+ std::vector<RhsBlock> rhs_blocks;
+ BlockMemHandle mem_handle = ctx.kernel_.allocateSlices(
+ ctx.device_,
+ /*num_lhs=*/0,
+ /*num_rhs=*/ctx.gn_,
+ /*num_slices=*/1,
+ /*lhs_blocks=*/nullptr, /*rhs_blocks=*/&rhs_blocks);
+
+ blocks = ThreadLocalBlocks<RhsBlock>(std::move(mem_handle),
+ std::move(rhs_blocks));
+ }
+
+ static void reuse(EvalCtx& ctx, int index, Blocks& blocks) {
+ RhsBlock* ptr = &ctx.rhs_thread_local_pre_allocated_[ctx.gn_ * index];
+ blocks = ThreadLocalBlocks<RhsBlock>(ptr, ctx.gn_);
+ }
+ };
+
+ template <typename EvalCtx>
+ struct ThreadLocalBlocksAllocator</*pack_rhs=*/false, EvalCtx> {
+ static void allocate(EvalCtx& ctx, Blocks& blocks) {
+ std::vector<RhsBlock> lhs_blocks;
+ BlockMemHandle mem_handle = ctx.kernel_.allocateSlices(
+ ctx.device_,
+ /*num_lhs=*/ctx.gm_,
+ /*num_rhs=*/0,
+ /*num_slices=*/1,
+ /*lhs_blocks=*/&lhs_blocks, /*rhs_blocks=*/nullptr);
+
+ blocks = ThreadLocalBlocks<RhsBlock>(std::move(mem_handle),
+ std::move(lhs_blocks));
+ }
+
+ static void reuse(EvalCtx& ctx, int index, Blocks& blocks) {
+ LhsBlock* ptr = &ctx.lhs_thread_local_pre_allocated_[ctx.gm_ * index];
+ blocks = ThreadLocalBlocks<LhsBlock>(ptr, ctx.gm_);
+ }
+ };
+
+ EvalParallelContext& ctx_;
+ const int num_worker_threads_;
+ };
+
+ template <typename BlockType>
+ class ThreadLocalBlocksRelease {
+ public:
+ using Blocks = ThreadLocalBlocks<BlockType>;
+ ThreadLocalBlocksRelease(EvalParallelContext& ctx) : ctx_(ctx) {}
+ void operator()(Blocks& blocks) { blocks.Release(ctx_); }
+
+ private:
+ EvalParallelContext& ctx_;
+ };
+
+ // ThreadLocalBlocks initialization callables.
+ using ThreadLocalLhsInit =
+ ThreadLocalBlocksInitialize<LhsBlock, /*is_rhs=*/false>;
+ using ThreadLocalRhsInit =
+ ThreadLocalBlocksInitialize<RhsBlock, /*is_rhs=*/true>;
+
+ // ThreadLocalBlocks release callables.
+ using ThreadLocalLhsRelease = ThreadLocalBlocksRelease<LhsBlock>;
+ using ThreadLocalRhsRelease = ThreadLocalBlocksRelease<RhsBlock>;
+
+ // Thread local containers for Lhs/Rhs block packs. In practice only one of
+ // them will be used, depending on the shard_by_col value.
+ Eigen::ThreadLocal<ThreadLocalBlocks<LhsBlock>, ThreadLocalLhsInit,
+ ThreadLocalLhsRelease>
+ lhs_thread_local_blocks_;
+ Eigen::ThreadLocal<ThreadLocalBlocks<RhsBlock>, ThreadLocalRhsInit,
+ ThreadLocalRhsRelease>
+ rhs_thread_local_blocks_;
// After a particular shard for Kth slice missed thread local execution
// opportunity (K-1 slice didn't complete kernels execution), we can no
@@ -630,12 +820,10 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
LhsBlock& packed_lhs(Index m, Index k, Index m1, bool use_thread_local) {
if (use_thread_local) {
eigen_assert(!shard_by_col_);
+ ThreadLocalBlocks<LhsBlock>& blocks = lhs_thread_local_blocks_.local();
- Index base_idx = gm_ * device_.currentThreadId();
- Index grain_idx = m1 - m * gm_;
- Index block_idx = base_idx + grain_idx;
-
- return thread_local_packed_lhs_[block_idx];
+ Index grain_index = m1 - m * gm_;
+ return blocks.block(grain_index);
} else {
return packed_lhs_[k % (P - 1)][m1];
}
@@ -644,12 +832,10 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
RhsBlock& packed_rhs(Index n, Index k, Index n1, bool use_thread_local) {
if (use_thread_local) {
eigen_assert(shard_by_col_);
+ ThreadLocalBlocks<RhsBlock>& blocks = rhs_thread_local_blocks_.local();
- Index base_idx = gn_ * device_.currentThreadId();
- Index grain_idx = n1 - n * gn_;
- Index block_idx = base_idx + grain_idx;
-
- return thread_local_packed_rhs_[block_idx];
+ Index grain_index = n1 - n * gn_;
+ return blocks.block(grain_index);
} else {
return packed_rhs_[k % (P - 1)][n1];
}
@@ -877,11 +1063,11 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
// to the execution of the first kernel of the k+1 slice, before
// completing a call to the last kernel of the k slice.
// (2) all pack tasks for sharded dim must be executed in a thread
- // pool.
+ // pool to get pre-allocated thead local buffers.
bool pack_async =
(start == 0) &&
(parallelize_by_sharding_dim_only_&& shard_by_col_ == rhs) &&
- (k > 0 || device_.currentThreadId() < 0);
+ (k > 0 || std::this_thread::get_id() == created_by_thread_id_);
if (pack_async) {
device_.enqueueNoNotification(