author | Eugene Zhulenev <ezhulenev@google.com> | 2019-09-13 12:14:44 -0700
committer | Eugene Zhulenev <ezhulenev@google.com> | 2019-09-13 12:14:44 -0700
commit | 553caeb6a3bb545aef895f8fc9f219be44679017 (patch)
tree | 36550665f461e6e7f04ea45802b6394774e3ec65 /unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
parent | facdec5aa7d947d5462c9dbaefa7a50c4cabff3b (diff)
Use ThreadLocal container in TensorContractionThreadPool
Diffstat (limited to 'unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h')
-rw-r--r-- | unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h | 246
1 file changed, 216 insertions(+), 30 deletions(-)
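For orientation before the diff: the commit replaces manual `thread_id -> block` indexing with Eigen's `ThreadLocal` container. The sketch below only mirrors how the diff uses that container (a value type plus Initialize/Release callables, a capacity hint, and a `local()` accessor); the umbrella header path, `PerThreadScratch`, and both callables are invented for illustration and are not part of this commit.

```cpp
#include <cstddef>
#include <vector>

// Assumed umbrella header; Eigen::ThreadLocal ships with the CXX11
// ThreadPool module at this revision.
#include <unsupported/Eigen/CXX11/ThreadPool>

// Illustrative per-thread value type (not from the commit).
struct PerThreadScratch {
  std::vector<float> data;
};

// Initialize runs at most once per thread, on that thread's first local().
struct InitScratch {
  size_t size;
  void operator()(PerThreadScratch& s) const { s.data.resize(size); }
};

// Release runs for every initialized value when the container is destroyed.
struct ReleaseScratch {
  void operator()(PerThreadScratch& s) const { s.data.clear(); }
};

// The capacity hint sizes the lock-free fast path; the commit passes
// 2 * numThreadsInPool() so that threads stealing tasks from outside the
// pool still find a free slot.
Eigen::ThreadLocal<PerThreadScratch, InitScratch, ReleaseScratch> scratch(
    /*capacity=*/16, InitScratch{1024}, ReleaseScratch{});

void Worker() {
  // Each calling thread gets its own lazily initialized instance.
  PerThreadScratch& s = scratch.local();
  s.data[0] = 1.0f;
}
```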
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
index 4adfeb560..6b1a6ba85 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
@@ -379,7 +379,8 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
                         bool parallel_pack,
                         bool parallelize_by_sharding_dim_only,
                         DoneCallback done)
-        : done_(this, std::move(done)),
+        : created_by_thread_id_(std::this_thread::get_id()),
+          done_(this, std::move(done)),
           device_(self->m_device),
           lhs_(self->m_leftImpl, self->m_left_nocontract_strides,
                self->m_i_strides, self->m_left_contracting_strides,
@@ -408,7 +409,20 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
           gn_(gn),
           nm0_(nm0),
           nn0_(nn0),
-          kernel_(m_, k_, n_, bm_, bk_, bn_) {
+          kernel_(m_, k_, n_, bm_, bk_, bn_),
+          num_thread_local_allocations_(0),
+          // We reserve 2X more capacity for thread local values than the
+          // number of threads in the pool, to efficiently handle task
+          // stealing by threads that are not managed by the pool.
+          thread_local_capacity(2 * (parallelize_by_sharding_dim_only_
+                                         ? device_.numThreadsInPool()
+                                         : 0)),
+          // Only one of the Lhs/Rhs thread local storages is used, depending
+          // on shard_by_col, and only when parallelizing by the sharding dim.
+          lhs_thread_local_blocks_(shard_by_col_ ? 0 : thread_local_capacity,
+                                   {*this}, {*this}),
+          rhs_thread_local_blocks_(shard_by_col_ ? thread_local_capacity : 0,
+                                   {*this}, {*this}) {
       // These two options are mutually exclusive.
       eigen_assert(!(parallel_pack && parallelize_by_sharding_dim_only));
 
@@ -455,12 +469,12 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
                                                 std::memory_order_relaxed);
 
         Index num_blocks = num_worker_threads * gn_;
-        thread_local_packed_mem_ = kernel_.allocateSlices(  //
-            device_,                                        //
-            /*num_lhs=*/0,                                  //
-            /*num_rhs=*/num_blocks,                         //
-            /*num_slices=*/1,                               //
-            /*lhs_blocks=*/nullptr, &thread_local_packed_rhs_);
+        thread_local_pre_allocated_mem_ = kernel_.allocateSlices(  //
+            device_,                                               //
+            /*num_lhs=*/0,                                         //
+            /*num_rhs=*/num_blocks,                                //
+            /*num_slices=*/1,                                      //
+            /*lhs_blocks=*/nullptr, &rhs_thread_local_pre_allocated_);
 
       } else {
         can_use_thread_local_packed_ = new std::atomic<bool>[nm_];
@@ -469,11 +483,11 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
                                                 std::memory_order_relaxed);
 
         Index num_blocks = num_worker_threads * gm_;
-        thread_local_packed_mem_ = kernel_.allocateSlices(  //
-            device_,                                        //
-            /*num_lhs=*/num_blocks,                         //
-            /*num_rhs=*/0,                                  //
-            /*num_slices=*/1, &thread_local_packed_lhs_,    //
+        thread_local_pre_allocated_mem_ = kernel_.allocateSlices(  //
+            device_,                                               //
+            /*num_lhs=*/num_blocks,                                //
+            /*num_rhs=*/0,                                         //
+            /*num_slices=*/1, &lhs_thread_local_pre_allocated_,    //
             /*rhs_blocks=*/nullptr);
       }
     }
@@ -486,7 +500,7 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
      }
      kernel_.deallocate(device_, packed_mem_);
      if (parallelize_by_sharding_dim_only_) {
-      kernel_.deallocate(device_, thread_local_packed_mem_);
+      kernel_.deallocate(device_, thread_local_pre_allocated_mem_);
        delete[] can_use_thread_local_packed_;
      }
    }
@@ -512,6 +526,8 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
    }
 
   private:
+   std::thread::id created_by_thread_id_;
+
    // This notification is specialized on the type of DoneCallback and can be
    // blocking or non-blocking.
    EvalParallelNotification<DoneCallback, EvalParallelContext> done_;
@@ -606,11 +622,185 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
    // completion of the K-1 kernel, so we have to allocate "global" packed_lhs_
    // and packed_rhs_ to allow kernels to be executed later on a thread
    // different from the thread that was used for packing.
-   BlockMemHandle thread_local_packed_mem_;
-   // Only one of these will be initialized depending on shard_by_col value.
-   std::vector<LhsBlock> thread_local_packed_lhs_;
-   std::vector<RhsBlock> thread_local_packed_rhs_;
+   // Handle for pre-allocated thread local memory buffers.
+   BlockMemHandle thread_local_pre_allocated_mem_;
+
+   // Only one of these will be initialized, depending on the shard_by_col
+   // value (the size will be `num_worker_threads * num_grains_in_the_sharding_dim`).
+   std::vector<LhsBlock> lhs_thread_local_pre_allocated_;
+   std::vector<RhsBlock> rhs_thread_local_pre_allocated_;
+
+   // How many thread local blocks were already allocated.
+   std::atomic<int> num_thread_local_allocations_;
+   const int thread_local_capacity;
+
+   // We will use the pre-allocated Lhs/Rhs blocks above if the number of
+   // unique threads in the system is below or equal to the number of threads
+   // in the pool; after that we fall back on dynamic memory allocation.
+
+   // ThreadLocalBlocks is a container for Lhs or Rhs thread local buffers. Its
+   // size is equal to the grain size in the Lhs/Rhs sharding dimension.
+   template <typename BlockType>
+   class ThreadLocalBlocks {
+    public:
+     ThreadLocalBlocks() = default;
+
+     ThreadLocalBlocks(BlockType* base, size_t grain_size)
+         : is_pre_allocated_(true),
+           thread_local_pre_allocated_base_(base),
+           grain_size_(grain_size) {}
+
+     ThreadLocalBlocks(BlockMemHandle mem_handle,
+                       std::vector<BlockType> blocks)
+         : is_pre_allocated_(false),
+           mem_handle_(std::move(mem_handle)),
+           blocks_(std::move(blocks)) {}
+
+     BlockType& block(int grain_index) {
+       eigen_assert(grain_index >= 0);
+       eigen_assert(static_cast<size_t>(grain_index) < size());
+       return is_pre_allocated_ ? thread_local_pre_allocated_base_[grain_index]
+                                : blocks_[grain_index];
+     }
+
+     void Release(EvalParallelContext& ctx) const {
+       if (!is_pre_allocated_) {
+         ctx.kernel_.deallocate(ctx.device_, mem_handle_);
+       }
+     }
+
+     size_t size() const {
+       return is_pre_allocated_ ? grain_size_ : blocks_.size();
+     }
+
+    private:
+     bool is_pre_allocated_;
+
+     // Reuse pre-allocated thread local buffers.
+     BlockType* thread_local_pre_allocated_base_ = nullptr;
+     size_t grain_size_;
+
+     // These will be initialized only if `is_pre_allocated_ == false`.
+     BlockMemHandle mem_handle_;
+     std::vector<BlockType> blocks_;
+   };
+
+   // The ThreadLocalBlocksInitialize callable does custom thread local blocks
+   // initialization: it reuses pre-allocated buffers when possible, and
+   // dynamically allocates new memory otherwise.
+   //
+   // Lhs/Rhs blocks might be of the same type, so we have to pass explicitly
+   // for which side we plan to do block allocation.
+   template <typename BlockType, bool is_rhs>
+   class ThreadLocalBlocksInitialize {
+     static constexpr bool kIsLhs =
+         !is_rhs && std::is_same<BlockType, LhsBlock>::value;
+     static constexpr bool kIsRhs =
+         is_rhs && std::is_same<BlockType, RhsBlock>::value;
+     static_assert(kIsLhs || kIsRhs, "Unknown block type");
+
+     using Blocks = ThreadLocalBlocks<BlockType>;
+
+    public:
+     ThreadLocalBlocksInitialize(EvalParallelContext& ctx)
+         : ctx_(ctx),
+           num_worker_threads_(ctx_.device_.numThreadsInPool()) {}
+
+     void operator()(Blocks& blocks) {
+       const int n = ctx_.num_thread_local_allocations_.fetch_add(
+           1, std::memory_order_relaxed);
+
+       if (n >= num_worker_threads_) {
+         ThreadLocalBlocksAllocator<is_rhs>::allocate(ctx_, blocks);
+       } else {
+         ThreadLocalBlocksAllocator<is_rhs>::reuse(ctx_, n, blocks);
+       }
+     }
+
+    private:
+     // NOTE(ezhulenev): Without 'if constexpr' we have to put the calls to
+     // TensorContractionKernel::allocateSlices into template specializations.
+     // Also, explicit specializations are not allowed at class scope in C++03;
+     // the EvalCtx type parameter is just a workaround for that limitation.
+     template <bool pack_rhs, typename EvalCtx = EvalParallelContext>
+     struct ThreadLocalBlocksAllocator;
+
+     template <typename EvalCtx>
+     struct ThreadLocalBlocksAllocator</*pack_rhs=*/true, EvalCtx> {
+       static void allocate(EvalCtx& ctx, Blocks& blocks) {
+         std::vector<RhsBlock> rhs_blocks;
+         BlockMemHandle mem_handle = ctx.kernel_.allocateSlices(
+             ctx.device_,
+             /*num_lhs=*/0,
+             /*num_rhs=*/ctx.gn_,
+             /*num_slices=*/1,
+             /*lhs_blocks=*/nullptr, /*rhs_blocks=*/&rhs_blocks);
+
+         blocks = ThreadLocalBlocks<RhsBlock>(std::move(mem_handle),
+                                              std::move(rhs_blocks));
+       }
+
+       static void reuse(EvalCtx& ctx, int index, Blocks& blocks) {
+         RhsBlock* ptr = &ctx.rhs_thread_local_pre_allocated_[ctx.gn_ * index];
+         blocks = ThreadLocalBlocks<RhsBlock>(ptr, ctx.gn_);
+       }
+     };
+
+     template <typename EvalCtx>
+     struct ThreadLocalBlocksAllocator</*pack_rhs=*/false, EvalCtx> {
+       static void allocate(EvalCtx& ctx, Blocks& blocks) {
+         std::vector<LhsBlock> lhs_blocks;
+         BlockMemHandle mem_handle = ctx.kernel_.allocateSlices(
+             ctx.device_,
+             /*num_lhs=*/ctx.gm_,
+             /*num_rhs=*/0,
+             /*num_slices=*/1,
+             /*lhs_blocks=*/&lhs_blocks, /*rhs_blocks=*/nullptr);
+
+         blocks = ThreadLocalBlocks<LhsBlock>(std::move(mem_handle),
+                                              std::move(lhs_blocks));
+       }
+
+       static void reuse(EvalCtx& ctx, int index, Blocks& blocks) {
+         LhsBlock* ptr = &ctx.lhs_thread_local_pre_allocated_[ctx.gm_ * index];
+         blocks = ThreadLocalBlocks<LhsBlock>(ptr, ctx.gm_);
+       }
+     };
+
+     EvalParallelContext& ctx_;
+     const int num_worker_threads_;
+   };
+
+   template <typename BlockType>
+   class ThreadLocalBlocksRelease {
+    public:
+     using Blocks = ThreadLocalBlocks<BlockType>;
+     ThreadLocalBlocksRelease(EvalParallelContext& ctx) : ctx_(ctx) {}
+     void operator()(Blocks& blocks) { blocks.Release(ctx_); }
+
+    private:
+     EvalParallelContext& ctx_;
+   };
+
+   // ThreadLocalBlocks initialization callables.
+   using ThreadLocalLhsInit =
+       ThreadLocalBlocksInitialize<LhsBlock, /*is_rhs=*/false>;
+   using ThreadLocalRhsInit =
+       ThreadLocalBlocksInitialize<RhsBlock, /*is_rhs=*/true>;
+
+   // ThreadLocalBlocks release callables.
+   using ThreadLocalLhsRelease = ThreadLocalBlocksRelease<LhsBlock>;
+   using ThreadLocalRhsRelease = ThreadLocalBlocksRelease<RhsBlock>;
+
+   // Thread local containers for Lhs/Rhs block packs. In practice only one of
+   // them will be used, depending on the shard_by_col value.
+   Eigen::ThreadLocal<ThreadLocalBlocks<LhsBlock>, ThreadLocalLhsInit,
+                      ThreadLocalLhsRelease>
+       lhs_thread_local_blocks_;
+   Eigen::ThreadLocal<ThreadLocalBlocks<RhsBlock>, ThreadLocalRhsInit,
+                      ThreadLocalRhsRelease>
+       rhs_thread_local_blocks_;
 
    // After a particular shard for Kth slice missed thread local execution
    // opportunity (K-1 slice didn't complete kernels execution), we can no
@@ -630,12 +820,10 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
    LhsBlock& packed_lhs(Index m, Index k, Index m1, bool use_thread_local) {
      if (use_thread_local) {
        eigen_assert(!shard_by_col_);
+      ThreadLocalBlocks<LhsBlock>& blocks = lhs_thread_local_blocks_.local();
-      Index base_idx = gm_ * device_.currentThreadId();
-      Index grain_idx = m1 - m * gm_;
-      Index block_idx = base_idx + grain_idx;
-
-      return thread_local_packed_lhs_[block_idx];
+      Index grain_index = m1 - m * gm_;
+      return blocks.block(grain_index);
      } else {
        return packed_lhs_[k % (P - 1)][m1];
      }
    }
@@ -644,12 +832,10 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
    RhsBlock& packed_rhs(Index n, Index k, Index n1, bool use_thread_local) {
      if (use_thread_local) {
        eigen_assert(shard_by_col_);
+      ThreadLocalBlocks<RhsBlock>& blocks = rhs_thread_local_blocks_.local();
-      Index base_idx = gn_ * device_.currentThreadId();
-      Index grain_idx = n1 - n * gn_;
-      Index block_idx = base_idx + grain_idx;
-
-      return thread_local_packed_rhs_[block_idx];
+      Index grain_index = n1 - n * gn_;
+      return blocks.block(grain_index);
      } else {
        return packed_rhs_[k % (P - 1)][n1];
      }
    }
@@ -877,11 +1063,11 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
        // to the execution of the first kernel of the k+1 slice, before
        // completing a call to the last kernel of the k slice.
        // (2) all pack tasks for sharded dim must be executed in a thread
-      //     pool.
+      //     pool to get pre-allocated thread local buffers.
        bool pack_async =
            (start == 0) &&
            (parallelize_by_sharding_dim_only_ && shard_by_col_ == rhs) &&
-          (k > 0 || device_.currentThreadId() < 0);
+          (k > 0 || std::this_thread::get_id() == created_by_thread_id_);
 
        if (pack_async) {
          device_.enqueueNoNotification(
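The heart of `ThreadLocalBlocksInitialize` is a lock-free slot-claiming scheme: an atomic counter hands disjoint slices of the flat pre-allocated buffer to the first `numThreadsInPool()` callers, and any later caller (typically a thread outside the pool that stole a packing task) pays for a private allocation instead. A self-contained toy version of that scheme, with all names invented for illustration:

```cpp
#include <atomic>
#include <cstdio>
#include <thread>
#include <vector>

constexpr int kWorkers = 4;  // "threads in the pool"
constexpr int kGrain = 3;    // blocks per thread in the sharding dimension

std::atomic<int> next_slot{0};
std::vector<int> preallocated(kWorkers * kGrain);  // flat pre-allocated pool

void claim_blocks() {
  // Relaxed ordering suffices: we only need a unique slot index.
  const int n = next_slot.fetch_add(1, std::memory_order_relaxed);
  if (n < kWorkers) {
    // Pool thread: take a disjoint slice of the shared buffer, no locking.
    int* base = &preallocated[n * kGrain];
    base[0] = n;
  } else {
    // Stealing/external thread: fall back to a private allocation.
    std::vector<int> own(kGrain);
    own[0] = n;
  }
}

int main() {
  std::vector<std::thread> ts;
  for (int i = 0; i < kWorkers + 2; ++i) ts.emplace_back(claim_blocks);
  for (auto& t : ts) t.join();
  std::printf("claimed %d slots\n", next_slot.load());
}
```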
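A second, easy-to-miss change in the last hunk: the "defer packing to the pool" test no longer asks `device_.currentThreadId() < 0` (am I any pool worker?) but compares against the thread id captured when the context was constructed. As the updated comment says, pack tasks for the sharding dimension must run inside the pool so they land on threads that own pre-allocated thread-local buffers. A minimal sketch of the recorded-creator check (types and names illustrative, not from the commit):

```cpp
#include <thread>

struct EvalContext {
  // Captured once, in the constructor, exactly as the diff does.
  std::thread::id created_by_thread_id_ = std::this_thread::get_id();

  // True when called from the thread that built the context; such calls
  // enqueue packing work into the pool instead of running it inline.
  bool OnCreatorThread() const {
    return std::this_thread::get_id() == created_by_thread_id_;
  }
};
```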