From 553caeb6a3bb545aef895f8fc9f219be44679017 Mon Sep 17 00:00:00 2001
From: Eugene Zhulenev
Date: Fri, 13 Sep 2019 12:14:44 -0700
Subject: Use ThreadLocal container in TensorContractionThreadPool

---
 .../CXX11/src/Tensor/TensorContractionThreadPool.h | 246 ++++++++++++++++++---
 1 file changed, 216 insertions(+), 30 deletions(-)

(limited to 'unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h')

diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
index 4adfeb560..6b1a6ba85 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
@@ -379,7 +379,8 @@ struct TensorEvaluator
 m_device),
 lhs_(self->m_leftImpl, self->m_left_nocontract_strides,
 self->m_i_strides, self->m_left_contracting_strides,
@@ -408,7 +409,20 @@ struct TensorEvaluator
 [nm_];
@@ -469,11 +483,11 @@ struct TensorEvaluator
 done_;
@@ -606,11 +622,185 @@ struct TensorEvaluator
-    std::vector<LhsBlock> thread_local_packed_lhs_;
-    std::vector<RhsBlock> thread_local_packed_rhs_;
+    // Handle for pre-allocated thread local memory buffers.
+    BlockMemHandle thread_local_pre_alocated_mem_;
+
+    // Only one of these will be initialized depending on shard_by_col value
+    // (the size will be `num_worker_threads * num_grains_in_the_sharding_dim`).
+    std::vector<LhsBlock> lhs_thread_local_pre_allocated_;
+    std::vector<RhsBlock> rhs_thread_local_pre_allocated_;
+
+    // How many thread local blocks were already allocated.
+    std::atomic<int> num_thread_local_allocations_;
+    const int thread_local_capacity;
+
+    // We will use pre-allocated Lhs/Rhs blocks defined above, if the number of
+    // unique threads in a system is below or equal to the number of threads in
+    // a thread pool. We will fallback on dynamic memory allocation after that.
+
+    // ThreadLocalBlocks is a container for Lhs or Rhs thread local buffers. Its
+    // size is equal to the grain size in Lhs/Rhs sharding dimension.
+    template <typename BlockType>
+    class ThreadLocalBlocks {
+     public:
+      ThreadLocalBlocks() = default;
+
+      ThreadLocalBlocks(BlockType* base, size_t grain_size)
+          : is_pre_allocated_(true),
+            thread_local_pre_allocated_base_(base),
+            grain_size_(grain_size) {}
+
+      ThreadLocalBlocks(BlockMemHandle mem_handle,
+                        std::vector<BlockType> blocks)
+          : is_pre_allocated_(false),
+            mem_handle_(std::move(mem_handle)),
+            blocks_(std::move(blocks)) {}
+
+      BlockType& block(int grain_index) {
+        eigen_assert(grain_index >= 0);
+        eigen_assert(static_cast<size_t>(grain_index) < size());
+        return is_pre_allocated_ ? thread_local_pre_allocated_base_[grain_index]
+                                 : blocks_[grain_index];
+      }
+
+      void Release(EvalParallelContext& ctx) const {
+        if (!is_pre_allocated_) {
+          ctx.kernel_.deallocate(ctx.device_, mem_handle_);
+        }
+      }
+
+      size_t size() const {
+        return is_pre_allocated_ ? grain_size_ : blocks_.size();
+      }
+
+     private:
+      bool is_pre_allocated_;
+
+      // Reuse pre-allocated thread local buffers.
+      BlockType* thread_local_pre_allocated_base_ = nullptr;
+      size_t grain_size_;
+
+      // These will be initialized only if `is_pre_allocated == false`.
+      BlockMemHandle mem_handle_;
+      std::vector<BlockType> blocks_;
+    };
+
+    // ThreadLocalBlocksInitialize callable does custom thread local blocks
+    // initialization, and will reuse pre-allocated buffers if possible, or will
+    // dynamically allocate new memory.
+    //
+    // Lhs/Rhs blocks might be of the same type, so we have to pass explicitly
+    // for what side do we plan to do block allocation.
+    template <typename BlockType, bool is_rhs>
+    class ThreadLocalBlocksInitialize {
+      static constexpr bool kIsLhs =
+          !is_rhs && std::is_same<BlockType, LhsBlock>::value;
+      static const bool kIsRhs =
+          is_rhs && std::is_same<BlockType, RhsBlock>::value;
+      static_assert(kIsLhs || kIsRhs, "Unkown block type");
+
+      using Blocks = ThreadLocalBlocks<BlockType>;
+
+     public:
+      ThreadLocalBlocksInitialize(EvalParallelContext& ctx)
+          : ctx_(ctx),
+            num_worker_threads_(ctx_.device_.numThreadsInPool()) {}
+
+      void operator()(Blocks& blocks) {
+        const int n = ctx_.num_thread_local_allocations_.fetch_add(
+            1, std::memory_order_relaxed);
+
+        if (n >= num_worker_threads_) {
+          ThreadLocalBlocksAllocator<is_rhs>::allocate(ctx_, blocks);
+        } else {
+          ThreadLocalBlocksAllocator<is_rhs>::reuse(ctx_, n, blocks);
+        }
+      }
+
+     private:
+      // NOTE(ezhulenev): Without 'if constexpr' we have to put calls to
+      // TensorContractionKernel::allocateSlices into template specializations.
+      // Also explicit specializations are not allowed at class scope in C++03,
+      // EvalCtx type parameter is just a workaround for that limitation.
+      template <bool pack_rhs, typename EvalCtx = EvalParallelContext>
+      struct ThreadLocalBlocksAllocator;
+
+      template <typename EvalCtx>
+      struct ThreadLocalBlocksAllocator</*pack_rhs=*/true, EvalCtx> {
+        static void allocate(EvalCtx& ctx, Blocks& blocks) {
+          std::vector<RhsBlock> rhs_blocks;
+          BlockMemHandle mem_handle = ctx.kernel_.allocateSlices(
+              ctx.device_,
+              /*num_lhs=*/0,
+              /*num_rhs=*/ctx.gn_,
+              /*num_slices=*/1,
+              /*lhs_blocks=*/nullptr, /*rhs_blocks=*/&rhs_blocks);
+
+          blocks = ThreadLocalBlocks<RhsBlock>(std::move(mem_handle),
+                                               std::move(rhs_blocks));
+        }
+
+        static void reuse(EvalCtx& ctx, int index, Blocks& blocks) {
+          RhsBlock* ptr = &ctx.rhs_thread_local_pre_allocated_[ctx.gn_ * index];
+          blocks = ThreadLocalBlocks<RhsBlock>(ptr, ctx.gn_);
+        }
+      };
+
+      template <typename EvalCtx>
+      struct ThreadLocalBlocksAllocator</*pack_rhs=*/false, EvalCtx> {
+        static void allocate(EvalCtx& ctx, Blocks& blocks) {
+          std::vector<LhsBlock> lhs_blocks;
+          BlockMemHandle mem_handle = ctx.kernel_.allocateSlices(
+              ctx.device_,
+              /*num_lhs=*/ctx.gm_,
+              /*num_rhs=*/0,
+              /*num_slices=*/1,
+              /*lhs_blocks=*/&lhs_blocks, /*rhs_blocks=*/nullptr);
+
+          blocks = ThreadLocalBlocks<LhsBlock>(std::move(mem_handle),
+                                               std::move(lhs_blocks));
+        }
+
+        static void reuse(EvalCtx& ctx, int index, Blocks& blocks) {
+          LhsBlock* ptr = &ctx.lhs_thread_local_pre_allocated_[ctx.gm_ * index];
+          blocks = ThreadLocalBlocks<LhsBlock>(ptr, ctx.gm_);
+        }
+      };
+
+      EvalParallelContext& ctx_;
+      const int num_worker_threads_;
+    };
+
+    template <typename BlockType>
+    class ThreadLocalBlocksRelease {
+     public:
+      using Blocks = ThreadLocalBlocks<BlockType>;
+      ThreadLocalBlocksRelease(EvalParallelContext& ctx) : ctx_(ctx) {}
+      void operator()(Blocks& blocks) { blocks.Release(ctx_); }
+
+     private:
+      EvalParallelContext& ctx_;
+    };
+
+    // ThreadLocalBlocks initialization callables.
+    using ThreadLocalLhsInit =
+        ThreadLocalBlocksInitialize<LhsBlock, /*is_rhs=*/false>;
+    using ThreadLocalRhsInit =
+        ThreadLocalBlocksInitialize<RhsBlock, /*is_rhs=*/true>;
+
+    // ThreadLocalBlocks release callables.
+    using ThreadLocalLhsRelease = ThreadLocalBlocksRelease<LhsBlock>;
+    using ThreadLocalRhsRelease = ThreadLocalBlocksRelease<RhsBlock>;
+
+    // Thread local containers for Lhs/Rhs block packs. In practice only one of
+    // them will be used, depending on the shard_by_col value.
+    Eigen::ThreadLocal<ThreadLocalBlocks<LhsBlock>, ThreadLocalLhsInit,
+                       ThreadLocalLhsRelease>
+        lhs_thread_local_blocks_;
+    Eigen::ThreadLocal<ThreadLocalBlocks<RhsBlock>, ThreadLocalRhsInit,
+                       ThreadLocalRhsRelease>
+        rhs_thread_local_blocks_;
 
     // After a particular shard for Kth slice missed thread local execution
     // opportunity (K-1 slice didn't complete kernels execution), we can no
@@ -630,12 +820,10 @@ struct TensorEvaluator
+        ThreadLocalBlocks<LhsBlock>& blocks = lhs_thread_local_blocks_.local();
-        Index base_idx = gm_ * device_.currentThreadId();
-        Index grain_idx = m1 - m * gm_;
-        Index block_idx = base_idx + grain_idx;
-
-        return thread_local_packed_lhs_[block_idx];
+        Index grain_index = m1 - m * gm_;
+        return blocks.block(grain_index);
       } else {
         return packed_lhs_[k % (P - 1)][m1];
       }
@@ -644,12 +832,10 @@ struct TensorEvaluator
+        ThreadLocalBlocks<RhsBlock>& blocks = rhs_thread_local_blocks_.local();
-        Index base_idx = gn_ * device_.currentThreadId();
-        Index grain_idx = n1 - n * gn_;
-        Index block_idx = base_idx + grain_idx;
-
-        return thread_local_packed_rhs_[block_idx];
+        Index grain_index = n1 - n * gn_;
+        return blocks.block(grain_index);
       } else {
         return packed_rhs_[k % (P - 1)][n1];
       }
@@ -877,11 +1063,11 @@ struct TensorEvaluator
-            (k > 0 || device_.currentThreadId() < 0);
+            (k > 0 || std::this_thread::get_id() == created_by_thread_id_);
 
         if (pack_async) {
           device_.enqueueNoNotification(
--
cgit v1.2.3
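
A note on the mechanism this patch relies on: instead of indexing one shared
thread_local_packed_lhs_/rhs_ vector by device_.currentThreadId(), each thread now
obtains its own ThreadLocalBlocks from an Eigen::ThreadLocal container. The initialize
callable gives the first num_worker_threads threads a slice of the pre-allocated arena
and falls back to a dynamic allocation for any extra (e.g. task-stealing) thread, with
the release callable returning those fallback blocks. The sketch below illustrates that
reuse-or-allocate pattern using only plain C++11 primitives; ThreadLocalBlocksDemo,
Blocks, the float arena, and the mutex-guarded map are illustrative stand-ins, not the
Eigen::ThreadLocal API or the patch's actual data structures.

// Minimal standalone sketch of the reuse-or-allocate thread local pattern.
// Assumed/hypothetical names throughout; not the Eigen implementation.
#include <atomic>
#include <cstdio>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <vector>

struct Blocks {
  float* base = nullptr;     // points into the shared arena when pre-allocated
  std::vector<float> owned;  // used on the dynamic-allocation fallback
  size_t grain = 0;
  float* block(size_t i) { return (base ? base : owned.data()) + i; }
};

class ThreadLocalBlocksDemo {
 public:
  ThreadLocalBlocksDemo(int num_threads, size_t grain)
      : grain_(grain), capacity_(num_threads), arena_(num_threads * grain) {}

  // Returns this thread's Blocks, initializing them on first use.
  Blocks& local() {
    std::lock_guard<std::mutex> lock(mu_);  // simple stand-in for a lock-free map
    auto it = per_thread_.find(std::this_thread::get_id());
    if (it != per_thread_.end()) return it->second;

    Blocks b;
    b.grain = grain_;
    const int n = next_slot_.fetch_add(1, std::memory_order_relaxed);
    if (n < capacity_) {
      b.base = arena_.data() + static_cast<size_t>(n) * grain_;  // reuse a pre-allocated slot
    } else {
      b.owned.resize(grain_);  // no slot left: allocate dynamically
    }
    return per_thread_.emplace(std::this_thread::get_id(), std::move(b))
        .first->second;
  }

 private:
  const size_t grain_;
  const int capacity_;
  std::vector<float> arena_;  // num_threads * grain pre-allocated scalars
  std::atomic<int> next_slot_{0};
  std::mutex mu_;
  std::unordered_map<std::thread::id, Blocks> per_thread_;
};

int main() {
  ThreadLocalBlocksDemo blocks(/*num_threads=*/2, /*grain=*/4);
  std::vector<std::thread> workers;
  for (int t = 0; t < 4; ++t) {
    workers.emplace_back([&blocks, t] {
      Blocks& b = blocks.local();
      *b.block(0) = static_cast<float>(t);
      std::printf("thread %d: %s storage\n", t,
                  b.base ? "pre-allocated" : "dynamically allocated");
    });
  }
  for (auto& w : workers) w.join();
  return 0;
}

In the patch itself this per-thread decision is made by the ThreadLocalBlocksInitialize
callable, the arena is carved out up front with TensorContractionKernel::allocateSlices,
and any dynamically allocated fallback blocks are freed through the matching
ThreadLocalBlocksRelease callable.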