diff options
Diffstat (limited to 'unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h')
-rw-r--r-- | unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h | 670 |
1 files changed, 665 insertions, 5 deletions
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h index 9044454fd..ee16cde9b 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h @@ -14,6 +14,8 @@ #ifdef EIGEN_USE_THREADS namespace Eigen { + +#ifdef EIGEN_USE_SIMPLE_THREAD_POOL namespace internal { template<typename LhsScalar, typename LhsMapper, typename Index> @@ -52,7 +54,7 @@ struct packRhsAndKernelArg { }; } // end namespace internal - +#endif // EIGEN_USE_SIMPLE_THREAD_POOL template<typename Indices, typename LeftArgType, typename RightArgType> struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgType>, ThreadPoolDevice> : @@ -92,10 +94,10 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT typedef array<Index, RDims> right_dim_mapper_t; typedef array<Index, ContractDims> contract_t; - typedef array<Index, max_n_1<LDims - ContractDims>::size> left_nocontract_t; - typedef array<Index, max_n_1<RDims - ContractDims>::size> right_nocontract_t; + typedef array<Index, LDims - ContractDims> left_nocontract_t; + typedef array<Index, RDims - ContractDims> right_nocontract_t; - static const int NumDims = max_n_1<LDims + RDims - 2 * ContractDims>::size; + static const int NumDims = LDims + RDims - 2 * ContractDims; typedef DSizes<Index, NumDims> Dimensions; @@ -110,6 +112,623 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT TensorEvaluator(const XprType& op, const Device& device) : Base(op, device) {} +#ifndef EIGEN_USE_SIMPLE_THREAD_POOL + template <bool lhs_inner_dim_contiguous, bool rhs_inner_dim_contiguous, + bool rhs_inner_dim_reordered, int Alignment> + void evalProduct(Scalar* buffer) const { + typedef + typename internal::remove_const<typename EvalLeftArgType::Scalar>::type + LhsScalar; + typedef + typename internal::remove_const<typename EvalRightArgType::Scalar>::type + RhsScalar; + typedef typename internal::gebp_traits<LhsScalar, RhsScalar> Traits; + typedef TensorEvaluator<EvalLeftArgType, Device> LeftEvaluator; + typedef TensorEvaluator<EvalRightArgType, Device> RightEvaluator; + typedef internal::TensorContractionInputMapper< + LhsScalar, Index, internal::Lhs, LeftEvaluator, left_nocontract_t, + contract_t, internal::packet_traits<LhsScalar>::size, + lhs_inner_dim_contiguous, false, Unaligned> + LhsMapper; + typedef internal::TensorContractionInputMapper< + RhsScalar, Index, internal::Rhs, RightEvaluator, right_nocontract_t, + contract_t, internal::packet_traits<RhsScalar>::size, + rhs_inner_dim_contiguous, rhs_inner_dim_reordered, Unaligned> + RhsMapper; + typedef internal::blas_data_mapper<Scalar, Index, ColMajor> OutputMapper; + typedef internal::gemm_pack_lhs<LhsScalar, Index, + typename LhsMapper::SubMapper, Traits::mr, + Traits::LhsProgress, ColMajor> + LhsPacker; + typedef internal::gemm_pack_rhs< + RhsScalar, Index, typename RhsMapper::SubMapper, Traits::nr, ColMajor> + RhsPacker; + typedef internal::gebp_kernel<LhsScalar, RhsScalar, Index, OutputMapper, + Traits::mr, Traits::nr, false, false> + GebpKernel; + + const Index m = this->m_i_size; + const Index n = this->m_j_size; + const Index k = this->m_k_size; + if (m == 0 || n == 0 || k == 0) return; + + // Compute a set of algorithm parameters: + // - kernel block sizes (bm, bn, bk) + // - task grain sizes (number of kernels executed per task: gm, gn) + // - number of threads + // - sharding by row/column + // - parallel packing or first lhs then rhs + // and some derived parameters: + // - number of tasks (nm, nn, nk) + // - number of kernels (nm0, nn0) + // Unfortunately, all these parameters are tightly interdependent. + // So in some cases we first compute approximate values, then compute other + // values based on these approximations and then refine the approximations. + + // There are lots of heuristics here. There is some reasoning behind them, + // but ultimately they are just tuned on contraction benchmarks for + // different input configurations, thread counts and instruction sets. + // So feel free to question any of them. + + // Compute whether we want to shard by row or by column. + // This is a first approximation, it will be refined later. Since we don't + // know number of threads yet we use 2, because what's we are most + // interested in at this point is whether it makes sense to use + // parallelization at all or not. + bool shard_by_col = shardByCol(m, n, 2); + + // First approximation of kernel blocking sizes. + // Again, we don't know number of threads yet, so we use 2. + Index bm, bn, bk; + if (shard_by_col) { + internal::TensorContractionBlocking<LhsMapper, RhsMapper, Index, + internal::ShardByCol> + blocking(k, m, n, 2); + bm = blocking.mc(); + bn = blocking.nc(); + bk = blocking.kc(); + } else { + internal::TensorContractionBlocking<LhsMapper, RhsMapper, Index, + internal::ShardByRow> + blocking(k, m, n, 2); + bm = blocking.mc(); + bn = blocking.nc(); + bk = blocking.kc(); + } + + // Compute optimal number of threads. + // Note: we use bk instead of k here because we are interested in amount of + // _parallelizable_ computations, and computations are not parallelizable + // across k dimension. + const TensorOpCost cost = + contractionCost(m, n, bm, bn, bk, shard_by_col, false); + int num_threads = TensorCostModel<ThreadPoolDevice>::numThreads( + static_cast<double>(n) * m, cost, this->m_device.numThreads()); + + // TODO(dvyukov): this is a stop-gap to prevent regressions while the cost + // model is not tuned. Remove this when the cost model is tuned. + if (n == 1) num_threads = 1; + + if (num_threads == 1) { + // The single-threaded algorithm should be faster in this case. + if (n == 1) + this->template evalGemv<lhs_inner_dim_contiguous, + rhs_inner_dim_contiguous, + rhs_inner_dim_reordered, Alignment>(buffer); + else + this->template evalGemm<lhs_inner_dim_contiguous, + rhs_inner_dim_contiguous, + rhs_inner_dim_reordered, Alignment>(buffer); + return; + } + + // Now that we know number of threads, recalculate sharding and blocking. + shard_by_col = shardByCol(m, n, num_threads); + if (shard_by_col) { + internal::TensorContractionBlocking<LhsMapper, RhsMapper, Index, + internal::ShardByCol> + blocking(k, m, n, num_threads); + bm = blocking.mc(); + bn = blocking.nc(); + bk = blocking.kc(); + } else { + internal::TensorContractionBlocking<LhsMapper, RhsMapper, Index, + internal::ShardByRow> + blocking(k, m, n, num_threads); + bm = blocking.mc(); + bn = blocking.nc(); + bk = blocking.kc(); + } + + // Number of kernels for each dimension. + Index nm0 = divup(m, bm); + Index nn0 = divup(n, bn); + Index nk = divup(k, bk); + + // Calculate task grain size (number of kernels executed per task). + // This task size coarsening serves two purposes: + // 1. It reduces per-task overheads including synchronization overheads. + // 2. It allows to use caches better (reuse the same packed rhs in several + // consecutive kernels). + Index gm = 1; + Index gn = 1; + // If we are sharding by column, then we prefer to reduce rows first. + if (shard_by_col) { + gm = coarsenM(m, n, bm, bn, bk, gn, num_threads, shard_by_col); + gn = coarsenN(m, n, bm, bn, bk, gm, num_threads, shard_by_col); + } else { + gn = coarsenN(m, n, bm, bn, bk, gm, num_threads, shard_by_col); + gm = coarsenM(m, n, bm, bn, bk, gn, num_threads, shard_by_col); + } + // Number of tasks in each dimension. + Index nm = divup(nm0, gm); + Index nn = divup(nn0, gn); + + // Last by not least, decide whether we want to issue both lhs and rhs + // packing in parallel; or issue lhs packing first, and then issue rhs + // packing when lhs packing completes (for !shard_by_col lhs and rhs are + // swapped). Parallel packing allows more parallelism (for both packing and + // kernels), while sequential packing provides better locality (once + // a thread finishes rhs packing it proceed to kernels with that rhs). + // First, we are interested in parallel packing if there are few tasks. + bool parallel_pack = num_threads >= nm * nn; + // Also do parallel packing if all data fits into L2$. + if (m * bk * Index(sizeof(LhsScalar)) + n * bk * Index(sizeof(RhsScalar)) <= + l2CacheSize() * num_threads) + parallel_pack = true; + // But don't do it if we will use each rhs only once. Locality seems to be + // more important in this case. + if ((shard_by_col ? nm : nn) == 1) parallel_pack = false; + + LhsMapper lhs(this->m_leftImpl, this->m_left_nocontract_strides, + this->m_i_strides, this->m_left_contracting_strides, + this->m_k_strides); + + RhsMapper rhs(this->m_rightImpl, this->m_right_nocontract_strides, + this->m_j_strides, this->m_right_contracting_strides, + this->m_k_strides); + + Context<LhsPacker, RhsPacker, GebpKernel, LhsMapper, RhsMapper, + OutputMapper>(this->m_device, num_threads, lhs, rhs, buffer, m, n, + k, bm, bn, bk, nm, nn, nk, gm, gn, nm0, nn0, + shard_by_col, parallel_pack) + .run(); + } + + // Context coordinates a single parallel gemm operation. + template <typename LhsPacker, typename RhsPacker, typename GebpKernel, + typename LhsMapper, typename RhsMapper, typename OutputMapper> + class Context { + public: + Context(const Device& device, int num_threads, LhsMapper& lhs, + RhsMapper& rhs, Scalar* buffer, Index tm, Index tn, Index tk, Index bm, + Index bn, Index bk, Index nm, Index nn, Index nk, Index gm, + Index gn, Index nm0, Index nn0, bool shard_by_col, + bool parallel_pack) + : device_(device), + lhs_(lhs), + rhs_(rhs), + buffer_(buffer), + output_(buffer, tm), + num_threads_(num_threads), + shard_by_col_(shard_by_col), + parallel_pack_(parallel_pack), + m_(tm), + n_(tn), + k_(tk), + bm_(bm), + bn_(bn), + bk_(bk), + nm_(nm), + nn_(nn), + nk_(nk), + gm_(gm), + gn_(gn), + nm0_(nm0), + nn0_(nn0) + { + for (Index x = 0; x < P; x++) { + // Normal number of notifications for k slice switch is + // nm_ + nn_ + nm_ * nn_. However, first P - 1 slices will receive only + // nm_ + nn_ notifications, because they will not receive notifications + // from preceeding kernels. + state_switch_[x] = + x == 0 + ? 1 + : (parallel_pack_ ? nn_ + nm_ : (shard_by_col_ ? nn_ : nm_)) + + (x == P - 1 ? nm_ * nn_ : 0); + state_packing_ready_[x] = + parallel_pack_ ? 0 : (shard_by_col_ ? nm_ : nn_); + state_kernel_[x] = new std::atomic<uint8_t>*[nm_]; + for (Index m = 0; m < nm_; m++) { + state_kernel_[x][m] = new std::atomic<uint8_t>[nn_]; + // Kernels generally receive 3 notifications (previous kernel + 2 + // packing), but the first slice won't get notifications from previous + // kernels. + for (Index n = 0; n < nn_; n++) + state_kernel_[x][m][n].store( + (x == 0 ? 0 : 1) + (parallel_pack_ ? 2 : 1), + std::memory_order_relaxed); + } + } + + // Allocate memory for packed rhs/lhs matrices. + size_t align = numext::maxi(EIGEN_MAX_ALIGN_BYTES, 1); + size_t lhs_size = + divup<size_t>(bm_ * bk_ * sizeof(LhsScalar), align) * align; + size_t rhs_size = + divup<size_t>(bn_ * bk_ * sizeof(RhsScalar), align) * align; + packed_mem_ = static_cast<char*>(internal::aligned_malloc( + (nm0_ * lhs_size + nn0_ * rhs_size) * std::min<size_t>(nk_, P - 1))); + char* mem = static_cast<char*>(packed_mem_); + for (Index x = 0; x < numext::mini<Index>(nk_, P - 1); x++) { + packed_lhs_[x].resize(nm0_); + for (Index m = 0; m < nm0_; m++) { + packed_lhs_[x][m] = reinterpret_cast<LhsScalar*>(mem); + mem += lhs_size; + } + packed_rhs_[x].resize(nn0_); + for (Index n = 0; n < nn0_; n++) { + packed_rhs_[x][n] = reinterpret_cast<RhsScalar*>(mem); + mem += rhs_size; + } + } + } + + ~Context() { + for (Index x = 0; x < P; x++) { + for (Index m = 0; m < nm_; m++) delete[] state_kernel_[x][m]; + delete[] state_kernel_[x]; + } + internal::aligned_free(packed_mem_); + } + + void run() { + // Kick off packing of the first slice. + signal_switch(0, 1); + // Wait for overall completion. + // TODO(dvyukov): this wait can lead to deadlock. + // If nthreads contractions are concurrently submitted from worker + // threads, this wait will block all worker threads and the system will + // deadlock. + done_.Wait(); + } + + private: + Notification done_; + const Device& device_; + LhsMapper& lhs_; + RhsMapper& rhs_; + Scalar* const buffer_; + OutputMapper output_; + const int num_threads_; + const bool shard_by_col_; + const bool parallel_pack_; + // Matrix sizes. + const Index m_; + const Index n_; + const Index k_; + // Block sizes. + const Index bm_; + const Index bn_; + const Index bk_; + // Number of tasks. + const Index nm_; + const Index nn_; + const Index nk_; + // Task grain sizes (number of kernels executed per task). + const Index gm_; + const Index gn_; + // Number of blocks (this is different from ni_/nn_ because of task size + // coarsening). + const Index nm0_; + const Index nn0_; + + // Parallelization strategy. + // + // Blocks related to the same k block can run in parallel because they write + // to different output blocks. So we parallelize within k slices, this + // gives us parallelism level of m x n. Before we can start any kernels + // related to k-th slice, we need to issue m lhs packing tasks and n rhs + // packing tasks. + // + // However, there is a bottleneck when we are finishing kernels for k-th + // slice (at the very end there is only 1 runnable kernel). To mitigate this + // bottleneck we allow kernels from k-th and k+1-th slices to run in + // parallel. Note that (m, n, k) and (m, n, k+1) kernels write to the same + // output block, so they must not run in parallel. + // + // This gives us the following dependency graph. + // On each k slice we have m x n kernel tasks, m lhs paking tasks and n rhs + // packing tasks. + // Kernel (m, n, k) can start when: + // - kernel (m, n, k-1) has finished + // - lhs packing (m, k) has finished + // - rhs packing (n, k) has finished + // Lhs/rhs packing can start when: + // - all k-1 packing has finished (artificially imposed to limit amount of + // parallel packing) + // + // On top of that we limit runnable tasks to two consecutive k slices. + // This is done to limit amount of memory we need for packed lhs/rhs + // (for each k slice we need m*bk + n*bk memory in packed_lhs_/packed_rhs_). + // + // state_switch_ tracks when we are ready to switch to the next k slice. + // state_kernel_[m][n] tracks when we are ready to kick off kernel (m, n). + // These variable are rolling over 3 consecutive k slices: first two we are + // actively executing + one to track completion of kernels in the second + // slice. + static const Index P = 3; + void* packed_mem_; + std::vector<LhsScalar*> packed_lhs_[P - 1]; + std::vector<RhsScalar*> packed_rhs_[P - 1]; + std::atomic<uint8_t>** state_kernel_[P]; + // state_switch_ is frequently modified by worker threads, while other + // fields are read-only after constructor. Let's move it to a separate cache + // line to reduce cache-coherency traffic. + char pad_[128]; + std::atomic<Index> state_packing_ready_[P]; + std::atomic<Index> state_switch_[P]; + + void pack_lhs(Index m, Index k) { + const Index mend = m * gm_ + gm(m); + for (Index m1 = m * gm_; m1 < mend; m1++) + LhsPacker()(packed_lhs_[k % (P - 1)][m1], + lhs_.getSubMapper(m1 * bm_, k * bk_), bk(k), bm(m1)); + + if (!parallel_pack_ && shard_by_col_) { + signal_packing(k); + } else { + signal_switch(k + 1); + for (Index n = nn_ - 1; n >= 0; n--) signal_kernel(m, n, k, n == 0); + } + } + + void pack_rhs(Index n, Index k) { + const Index nend = n * gn_ + gn(n); + for (Index n1 = n * gn_; n1 < nend; n1++) { + if (k == 0) { + // Zero the output memory in parallel. + // On 10000x2x10000 mm zeroing can easily take half of time. + // Zero (bn x m) row. Safe to do here because all kernels that will + // write to this memory depend on completion of this task. + // Note: don't call device_.memset() here. device_.memset() blocks on + // thread pool worker thread, which can lead to underutilization and + // deadlocks. + memset(buffer_ + n1 * bn_ * m_, 0, bn(n1) * m_ * sizeof(Scalar)); + } + RhsPacker()(packed_rhs_[k % (P - 1)][n1], + rhs_.getSubMapper(k * bk_, n1 * bn_), bk(k), bn(n1)); + } + + if (parallel_pack_ || shard_by_col_) { + signal_switch(k + 1); + for (Index m = nm_ - 1; m >= 0; m--) signal_kernel(m, n, k, m == 0); + } else { + signal_packing(k); + } + } + + void kernel(Index m, Index n, Index k) { + // Note: order of iteration matters here. Iteration over m is innermost + // because we want to reuse the same packed rhs in consequetive tasks + // (rhs fits into L2$ while lhs only into L3$). + const Index nend = n * gn_ + gn(n); + const Index mend = m * gm_ + gm(m); + if (shard_by_col_) { + for (Index n1 = n * gn_; n1 < nend; n1++) { + for (Index m1 = m * gm_; m1 < mend; m1++) + GebpKernel()(output_.getSubMapper(m1 * bm_, n1 * bn_), + packed_lhs_[k % (P - 1)][m1], + packed_rhs_[k % (P - 1)][n1], bm(m1), bk(k), bn(n1), + Scalar(1), -1, -1, 0, 0); + } + } else { + for (Index m1 = m * gm_; m1 < mend; m1++) + for (Index n1 = n * gn_; n1 < nend; n1++) { + GebpKernel()(output_.getSubMapper(m1 * bm_, n1 * bn_), + packed_lhs_[k % (P - 1)][m1], + packed_rhs_[k % (P - 1)][n1], bm(m1), bk(k), bn(n1), + Scalar(1), -1, -1, 0, 0); + } + } + signal_kernel(m, n, k + 1, false); + signal_switch(k + 2); + } + + void signal_packing(Index k) { + eigen_assert(!parallel_pack_); + Index s = state_packing_ready_[k % P].fetch_sub(1); + eigen_assert(s > 0); + if (s != 1) return; + state_packing_ready_[k % P] = shard_by_col_ ? nm_ : nn_; + enqueue_packing(k, shard_by_col_); + } + + void signal_kernel(Index m, Index n, Index k, bool sync) { + std::atomic<uint8_t>* state = &state_kernel_[k % P][m][n]; + Index s = state->load(); + eigen_assert(s > 0); + if (s != 1 && state->fetch_sub(1) != 1) return; + state->store(parallel_pack_ ? 3 : 2, std::memory_order_relaxed); + if (sync) + kernel(m, n, k); + else + device_.enqueueNoNotification([=]() { kernel(m, n, k); }); + } + + void signal_switch(Index k, Index v = 1) { + Index s = state_switch_[k % P].fetch_sub(v); + eigen_assert(s >= v); + if (s != v) return; + + // Ready to switch to the next k slice. + // Reset counter for the next iteration. + state_switch_[k % P] = + (parallel_pack_ ? nm_ + nn_ : (shard_by_col_ ? nn_ : nm_)) + + nm_ * nn_; + if (k < nk_) { + // Issue lhs/rhs packing. Their completion will in turn kick off + // kernels. + if (parallel_pack_) { + enqueue_packing(k, !shard_by_col_); + enqueue_packing(k, shard_by_col_); + } else if (shard_by_col_) { + enqueue_packing(k, false); + } else { + enqueue_packing(k, true); + } + + // Termination handling. + // Because kernel completion signals k + 2 switch, we need to finish nk + // + 2 slices without issuing any tasks on nk + 1 slice. So here we + // pretend that all nk + 1 packing tasks just finish instantly; so that + // nk + 2 switch only waits for completion of nk kernels. + } else if (k == nk_) { + signal_switch(k + 1, + parallel_pack_ ? nm_ + nn_ : (shard_by_col_ ? nn_ : nm_)); + } else { + done_.Notify(); + } + } + + // Enqueue all rhs/lhs packing for k-th slice. + void enqueue_packing(Index k, bool rhs) { + enqueue_packing_helper(0, rhs ? nn_ : nm_, k, rhs); + } + + void enqueue_packing_helper(Index start, Index end, Index k, bool rhs) { + if (end - start == 1) { + if (rhs) + pack_rhs(start, k); + else + pack_lhs(start, k); + } else { + Index mid = (start + end) / 2; + device_.enqueueNoNotification( + [=]() { enqueue_packing_helper(mid, end, k, rhs); }); + device_.enqueueNoNotification( + [=]() { enqueue_packing_helper(start, mid, k, rhs); }); + } + } + + // Block sizes with accounting for potentially incomplete last block. + Index bm(Index m) const { return m + 1 < nm0_ ? bm_ : m_ + bm_ - bm_ * nm0_; } + Index bn(Index n) const { return n + 1 < nn0_ ? bn_ : n_ + bn_ - bn_ * nn0_; } + Index bk(Index k) const { return k + 1 < nk_ ? bk_ : k_ + bk_ - bk_ * nk_; } + // Task grain sizes accounting for potentially incomplete last task. + Index gm(Index m) const { return m + 1 < nm_ ? gm_ : nm0_ + gm_ - gm_ * nm_; } + Index gn(Index n) const { return n + 1 < nn_ ? gn_ : nn0_ + gn_ - gn_ * nn_; } + + Context(const Context&) = delete; + void operator=(const Context&) = delete; + }; + + // Decide whether we want to shard m x n contraction by columns or by rows. + static bool shardByCol(Index m, Index n, Index num_threads) { + // Note: we are comparing both n and m against Traits::nr, it is not + // a mistake. We are trying to figure out how both n and m will fit into + // the main sharding dimension. + + // Sharding by column is the default + // ... unless there is enough data for vectorization over rows + if (m / num_threads >= Traits::nr && + // and not enough data for vectorization over columns + (n / num_threads < Traits::nr || + // ... or barely enough data for vectorization over columns, + // but it is not evenly dividable across threads + (n / num_threads < 4 * Traits::nr && + (n % (num_threads * Traits::nr)) != 0 && + // ... and it is evenly dividable across threads for rows + ((m % (num_threads * Traits::nr)) == 0 || + // .. or it is not evenly dividable for both dimensions but + // there is much more data over rows so that corner effects are + // mitigated. + (m / n >= 6))))) + return false; + // Wait, or if matrices are just substantially prolonged over the other + // dimension. + if (n / num_threads < 16 * Traits::nr && m > n * 32) return false; + return true; + } + + Index coarsenM(Index m, Index n, Index bm, Index bn, Index bk, Index gn, + int num_threads, bool shard_by_col) const { + Index gm = 1; + Index gm1 = 1; + Index nm0 = divup(m, bm); + Index nm1 = nm0; + for (;;) { + // Find the next candidate for m grain size. It needs to result in + // different number of blocks. E.g. if we have 10 kernels, we want to try + // 5 and 10, but not 6, 7, 8 and 9. + while (gm1 <= nm0 && nm1 == divup(nm0, gm1)) gm1++; + if (gm1 > nm0) break; + // Check the candidate. + int res = checkGrain(m, n, bm, bn, bk, gm1, gn, gm, gn, num_threads, + shard_by_col); + if (res < 0) break; + nm1 = divup(nm0, gm1); + if (res == 0) continue; + // Commit new grain size. + gm = gm1; + } + return gm; + } + + Index coarsenN(Index m, Index n, Index bm, Index bn, Index bk, Index gm, + int num_threads, bool shard_by_col) const { + Index gn = 1; + Index gn1 = 1; + Index nn0 = divup(n, bn); + Index nn1 = nn0; + for (;;) { + while (gn1 <= nn0 && nn1 == divup(nn0, gn1)) gn1++; + if (gn1 > nn0) break; + int res = checkGrain(m, n, bm, bn, bk, gm, gn1, gm, gn, num_threads, + shard_by_col); + if (res < 0) break; + nn1 = divup(nn0, gn1); + if (res == 0) continue; + gn = gn1; + } + return gn; + } + + // checkGrain checks whether grain (gm, gn) is suitable and is better than + // (oldgm, oldgn). + int checkGrain(Index m, Index n, Index bm, Index bn, Index bk, Index gm, + Index gn, Index oldgm, Index oldgn, int num_threads, + bool shard_by_col) const { + const TensorOpCost cost = + contractionCost(bm * gm, bn * gn, bm, bn, bk, shard_by_col, true); + double taskSize = TensorCostModel<ThreadPoolDevice>::taskSize( + static_cast<double>(bm) * gm * bn * gn, cost); + // If the task is too small, then we agree on it regardless of anything + // else. Otherwise synchronization overheads will dominate. + if (taskSize < 1) return 1; + // If it is too large, then we reject it and all larger tasks. + if (taskSize > 2) return -1; + // Now we are in presumably good task size range. + // The main deciding factor here is parallelism. Consider that we have 12 + // kernels and 4 threads. Grains of 2, 3 and 4 all yield good task sizes. + // But 2/4 yield 6/3 tasks, which gives us parallelism of 0.75 (at most 3/4 + // of cores will be busy). While grain size 3 gives us 4 tasks, which gives + // us parallelism of 1 (we can load all cores). + Index nm0 = divup(m, bm); + Index nn0 = divup(n, bn); + Index new_tasks = divup(nm0, gm) * divup(nn0, gn); + double new_parallelism = static_cast<double>(new_tasks) / + (divup<int>(new_tasks, num_threads) * num_threads); + Index old_tasks = divup(nm0, oldgm) * divup(nn0, oldgn); + double old_parallelism = static_cast<double>(old_tasks) / + (divup<int>(old_tasks, num_threads) * num_threads); + if (new_parallelism > old_parallelism || new_parallelism == 1) return 1; + return 0; + } + +#else // EIGEN_USE_SIMPLE_THREAD_POOL + template <bool lhs_inner_dim_contiguous, bool rhs_inner_dim_contiguous, bool rhs_inner_dim_reordered, int Alignment> void evalProduct(Scalar* buffer) const { if (this->m_j_size == 1) { @@ -376,7 +995,7 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT const Index actual_mc = numext::mini(m_base_start + arg.mc, arg.max_m) - m_base_start; gebp(arg.output.getSubMapper(m_base_start, arg.n), (*arg.blockAs)[blockAId], arg.blockB, - actual_mc, arg.kc, arg.nc, 1.0, -1, -1, 0, 0); + actual_mc, arg.kc, arg.nc, Scalar(1), -1, -1, 0, 0); // Notify that the kernel is done. const Index set_idx = blockAId * arg.n_blocks + arg.n_block_idx; @@ -384,6 +1003,47 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT } } } +#endif // EIGEN_USE_SIMPLE_THREAD_POOL + + TensorOpCost contractionCost(Index m, Index n, Index bm, Index bn, Index bk, + bool shard_by_col, bool prepacked) const { + const int packed_size = std::min<int>(PacketType<LhsScalar, Device>::size, + PacketType<RhsScalar, Device>::size); + const int output_packet_size = internal::unpacket_traits<PacketReturnType>::size; + const double kd = static_cast<double>(bk); + // Peak VFMA bandwidth is 0.5. However if we have not enough data for + // vectorization bandwidth drops. The 4.0 and 2.0 bandwidth is determined + // experimentally. + double computeBandwidth = bk == 1 ? 4.0 : + (shard_by_col ? bn : bm) < Traits::nr || + (shard_by_col ? bm : bn) < Traits::mr ? 2.0 : 0.5; +#ifndef EIGEN_VECTORIZE_FMA + // Bandwidth of all of VFMA/MULPS/ADDPS is 0.5 on latest Intel processors. + // However for MULPS/ADDPS we have dependent sequence of 2 such instructions, + // so overall bandwidth is 1.0. + if (computeBandwidth == 0.5) computeBandwidth = 1.0; +#endif + // Computations. + TensorOpCost cost = TensorOpCost(0, 0, kd * computeBandwidth, true, packed_size); + // Output stores. + cost += TensorOpCost(0, sizeof(CoeffReturnType), 0, true, output_packet_size); + if (prepacked) { + // Packing and kernels are executed in different tasks. When we calculate + // task grain size we look only at kernel cost assuming that kernel + // is more expensive than packing. + return cost; + } + // Lhs/rhs loads + computations. + TensorOpCost lhsCost = this->m_leftImpl.costPerCoeff(true) * (kd / n); + TensorOpCost rhsCost = this->m_rightImpl.costPerCoeff(true) * (kd / m); + // Lhs packing memory cost does not contribute considerably to overall + // execution time because lhs is prefetched early and accessed sequentially. + if (shard_by_col) + lhsCost.dropMemoryCost(); + else + rhsCost.dropMemoryCost(); + return cost + lhsCost + rhsCost; + } }; } // end namespace Eigen |