From dd6d65898a9826bb07556f2c788e6c0757d27603 Mon Sep 17 00:00:00 2001 From: Rasmus Munk Larsen Date: Wed, 12 Dec 2018 14:45:31 -0800 Subject: Fix shorten-64-to-32 warning. Use regular memcpy if num_threads==0. --- unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h') diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h index 3b87b114d..e03735611 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h @@ -87,13 +87,13 @@ struct ThreadPoolDevice { const size_t kMinBlockSize = 32768; typedef TensorCostModel CostModel; const size_t num_threads = CostModel::numThreads(n, TensorOpCost(1.0, 1.0, 0), 4); - if (n <= kMinBlockSize || num_threads == 1) { + if (n <= kMinBlockSize || num_threads < 2) { ::memcpy(dst, src, n); } else { const char* src_ptr = static_cast(src); char* dst_ptr = static_cast(dst); const size_t blocksize = (n + (num_threads - 1)) / num_threads; - Barrier barrier(num_threads - 1); + Barrier barrier(static_cast(num_threads - 1)); // Launch the last 3 blocks on worker threads. for (size_t i = 1; i < num_threads; ++i) { enqueue_with_barrier(&barrier, [n, i, src_ptr, dst_ptr, blocksize] { -- cgit v1.2.3 From eb21bab769b11546d08f7db0b5bb78bfde6cdbae Mon Sep 17 00:00:00 2001 From: Eugene Zhulenev Date: Mon, 4 Feb 2019 10:43:16 -0800 Subject: Parallelize tensor contraction only by sharding dimension and use 'thread-local' memory for packing --- .../CXX11/src/Tensor/TensorContractionThreadPool.h | 223 +++++++++++++++++++-- .../CXX11/src/Tensor/TensorDeviceThreadPool.h | 6 + 2 files changed, 212 insertions(+), 17 deletions(-) (limited to 'unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h') diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h index e06099957..4932514c7 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h @@ -335,6 +335,47 @@ struct TensorEvaluator= device_.numThreadsInPool()) { + parallelize_by_sharding_dim_only_ = true; + + int num_worker_threads = device_.numThreadsInPool(); + + if (shard_by_col) { + can_use_thread_local_packed_ = new std::atomic[nn_]; + for (int i = 0; i < nn_; ++i) + can_use_thread_local_packed_[i].store(true, + std::memory_order_relaxed); + + Index num_blocks = num_worker_threads * gn_; + thread_local_packed_mem_ = device_.allocate(num_blocks * rhs_size); + mem = static_cast(thread_local_packed_mem_); + + thread_local_packed_rhs_.resize(num_blocks, nullptr); + for (Index i = 0; i < num_blocks; ++i) { + thread_local_packed_rhs_[i] = reinterpret_cast(mem); + mem += rhs_size; + } + } else { + can_use_thread_local_packed_ = new std::atomic[nm_]; + for (int i = 0; i < nm_; ++i) + can_use_thread_local_packed_[i].store(true, + std::memory_order_relaxed); + + Index num_blocks = num_worker_threads * gm_; + thread_local_packed_mem_ = device_.allocate(num_blocks * lhs_size); + mem = static_cast(thread_local_packed_mem_); + + thread_local_packed_lhs_.resize(num_blocks, nullptr); + for (Index i = 0; i < num_blocks; ++i) { + thread_local_packed_lhs_[i] = reinterpret_cast(mem); + mem += lhs_size; + } + } + } } ~Context() { @@ -343,6 +384,10 @@ struct TensorEvaluator packed_lhs_[P - 1]; std::vector packed_rhs_[P - 1]; + + // If there is enough concurrency in the sharding dimension, we choose not + // to paralellize by the other dimension, and execute all kernels in sync + // mode. This reduces parallelism from the nm_ x nn_ down to nn_ + // (shard_by_col==true) or nm_ (shard_by_col==false). + bool parallelize_by_sharding_dim_only_ = false; + + // If we choose to parallelize only by the sharding dimension, each thread + // will have it's own "thead local" (not a c++ thread local storage) memory + // for packed_lhs or packed_rhs (shard_by_col = false of true). This memory + // can't be passed to a kernel that might execute on a different thread. + // + // In practice when we are ready to pack memory for the sharding dimension + // (rhs if shard_by_col==true) of the K-th slice, all kernels for K-1 slice + // already computed (99% of the time), and we can pack data into the thread + // local storage, and guarantee that all the kernels will be executed + // immediately in the same thread. This significantly increases L1 cache hit + // ratio and reduces pressure on the memory bus. + // + // It's still possible that kernel for the K-th slice will be ready before + // completion of the K-1 kernel, so we have to allocate "global" packed_lhs_ + // and packed_rhs_ to allow kernels to be executed later on a thread + // different from the thread that was used for packing. + void* thread_local_packed_mem_; + + // Only one of these will beinitialized depending on shard_by_col value. + std::vector thread_local_packed_lhs_; + std::vector thread_local_packed_rhs_; + + // After a particular shard for Kth slice missed thread local execution + // opportunity (K-1 slice didn't complete kernels execution), we can no + // longer schedule K+1 and following slices in thread local mode, because + // there is no more guarantee that previous kernels were executed + // sequentially in the same thread (size is nn_ or nm_). + std::atomic* can_use_thread_local_packed_; + std::atomic** state_kernel_[P]; // state_switch_ is frequently modified by worker threads, while other // fields are read-only after constructor. Let's move it to a separate cache @@ -434,22 +515,96 @@ struct TensorEvaluator state_packing_ready_[P]; std::atomic state_switch_[P]; + LhsScalar* packed_lhs(Index m, Index k, Index m1, bool use_thread_local) { + if (use_thread_local) { + eigen_assert(!shard_by_col_); + + Index base_idx = gm_ * device_.currentThreadId(); + Index grain_idx = m1 - m * gm_; + Index block_idx = base_idx + grain_idx; + + return thread_local_packed_lhs_[block_idx]; + } else { + return packed_lhs_[k % (P - 1)][m1]; + } + } + + RhsScalar* packed_rhs(Index n, Index k, Index n1, bool use_thread_local) { + if (use_thread_local) { + eigen_assert(shard_by_col_); + + Index base_idx = gn_ * device_.currentThreadId(); + Index grain_idx = n1 - n * gn_; + Index block_idx = base_idx + grain_idx; + + return thread_local_packed_rhs_[block_idx]; + } else { + return packed_rhs_[k % (P - 1)][n1]; + } + } + + // In following two methods (pack_lhs and pack_rhs), if we know for sure + // that we'll be able to immediately call a kernel with packed data, and do + // not submit it to the thread pool, we can use thread local memory for + // packed data. + // + // We can only reliably check it if we are running all kernels in sync mode + // (parallelize only by sharding dim). If kernel for m==0 (n==0) is ready to + // run, it's guaranteed that all kernels with larger values of m (n) are + // also ready, because we execute them in the same order for all K slices. + void pack_lhs(Index m, Index k) { + bool use_thread_local = false; + + if (parallelize_by_sharding_dim_only_ && !shard_by_col_ && + can_use_thread_local_packed_[m].load(std::memory_order_relaxed)) { + if (state_kernel_[k % P][m][0].load(std::memory_order_relaxed) == 1) { + use_thread_local = true; + } else { + // If we can't guarantee that all kernels in `k` slice will be + // executed sequentially in current thread, it's no longer safe to use + // thread local memory in followig slices along the k dimensions. + eigen_assert(k > 0); + can_use_thread_local_packed_[m].store(false, + std::memory_order_relaxed); + } + } + const Index mend = m * gm_ + gm(m); for (Index m1 = m * gm_; m1 < mend; m1++) - TensorContractionKernel::packLhs(packed_lhs_[k % (P - 1)][m1], + TensorContractionKernel::packLhs(packed_lhs(m, k, m1, use_thread_local), lhs_.getSubMapper(m1 * bm_, k * bk_), bk(k), bm(m1)); if (!parallel_pack_ && shard_by_col_) { + assert(!use_thread_local); signal_packing(k); } else { signal_switch(k + 1); - for (Index n = nn_ - 1; n >= 0; n--) signal_kernel(m, n, k, n == 0); + for (Index n = nn_ - 1; n >= 0; n--) { + bool sync = parallelize_by_sharding_dim_only_ || n == 0; + signal_kernel(m, n, k, sync, use_thread_local); + } } } void pack_rhs(Index n, Index k) { + bool use_thread_local = false; + + if (parallelize_by_sharding_dim_only_ && shard_by_col_ && + can_use_thread_local_packed_[n].load(std::memory_order_relaxed)) { + if (state_kernel_[k % P][0][n].load(std::memory_order_relaxed) == 1) { + use_thread_local = true; + } else { + // If we can't guarantee that all kernels in `k` slice will be + // executed sequentially in current thread, it's no longer safe to use + // thread local memory in followig slices along the k dimensions. + eigen_assert(k > 0); + can_use_thread_local_packed_[n].store(false, + std::memory_order_relaxed); + } + } + const Index nend = n * gn_ + gn(n); for (Index n1 = n * gn_; n1 < nend; n1++) { if (k == 0) { @@ -462,20 +617,24 @@ struct TensorEvaluator= 0; m--) signal_kernel(m, n, k, m == 0); + for (Index m = nm_ - 1; m >= 0; m--) { + bool sync = parallelize_by_sharding_dim_only_ || m == 0; + signal_kernel(m, n, k, sync, use_thread_local); + } } else { + assert(!use_thread_local); signal_packing(k); } } - void kernel(Index m, Index n, Index k) { + void kernel(Index m, Index n, Index k, bool use_thread_local) { // Note: order of iteration matters here. Iteration over m is innermost // because we want to reuse the same packed rhs in consecutive tasks // (rhs fits into L2$ while lhs only into L3$). @@ -486,8 +645,10 @@ struct TensorEvaluator* state = &state_kernel_[k % P][m][n]; Index s = state->load(); eigen_assert(s > 0); - if (s != 1 && state->fetch_sub(1) != 1) return; + if (s != 1 && state->fetch_sub(1) != 1) { + eigen_assert(!use_thread_local); + return; + } state->store(parallel_pack_ ? 3 : 2, std::memory_order_relaxed); - if (sync) - kernel(m, n, k); - else - device_.enqueueNoNotification([=]() { kernel(m, n, k); }); + if (sync) { + kernel(m, n, k, use_thread_local); + } else { + eigen_assert(!use_thread_local); + device_.enqueueNoNotification( + [=]() { kernel(m, n, k, use_thread_local); }); + } } void signal_switch(Index k, Index v = 1) { @@ -589,7 +759,26 @@ struct TensorEvaluator 0 || device_.currentThreadId() < 0); + + if (pack_async) { + device_.enqueueNoNotification( + [=]() { enqueue_packing_helper(start, end, k, rhs); }); + } else { + enqueue_packing_helper(start, end, k, rhs); + } } } diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h index e03735611..fb34cd75e 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h @@ -122,6 +122,12 @@ struct ThreadPoolDevice { return num_threads_; } + // Number of theads available in the underlying thread pool. This number can + // be different from the value returned by numThreads(). + EIGEN_STRONG_INLINE int numThreadsInPool() const { + return pool_->NumThreads(); + } + EIGEN_STRONG_INLINE size_t firstLevelCacheSize() const { return l1CacheSize(); } -- cgit v1.2.3