author      Eugene Zhulenev <ezhulenev@google.com>  2018-09-27 12:05:06 -0700
committer   Eugene Zhulenev <ezhulenev@google.com>  2018-09-27 12:05:06 -0700
commit      a7a3e9f2b6dfa97887fd44b6d8f658c4928c799d (patch)
tree        df58dc8f6c1414abd73fc1d7887020ef47d38492 /unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
parent      9f4988959f1b0394ee027f474f49916543ad2f3c (diff)
parent      1e5750a5b896089b4455cf4940b4fe88d99b3293 (diff)
Merge with eigen/eigen default
Diffstat (limited to 'unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h')
-rw-r--r--  unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h  179
1 file changed, 166 insertions(+), 13 deletions(-)
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
index 8a464b073..4553c3785 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
@@ -147,6 +147,14 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
contractionCost(m, n, bm, bn, bk, shard_by_col, false);
int num_threads = TensorCostModel<ThreadPoolDevice>::numThreads(
static_cast<double>(n) * m, cost, this->m_device.numThreads());
+ int num_threads_by_k = numThreadsInnerDim(m, n, k);
+ if (shardByInnerDim(m, n, k, num_threads, num_threads_by_k)) {
+ // We are in the scenario where it is more effective to shard by the
+ // inner dimension.
+ this->template evalShardedByInnerDim<Alignment>(num_threads_by_k,
+ buffer);
+ return;
+ }
// TODO(dvyukov): this is a stop-gap to prevent regressions while the cost
// model is not tuned. Remove this when the cost model is tuned.
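For context on the new early exit above: sharding over the inner dimension k means each worker computes a full m x n partial product over its own k-slice, and the partial results are summed afterwards. A minimal standalone sketch of that decomposition, using plain loops and hypothetical names rather than the Eigen kernels:

    #include <cstddef>
    #include <vector>

    // Illustration only: C(m x n) += A(:, k0..k1) * B(k0..k1, :).
    // Summing this over disjoint [k0, k1) slices reproduces the full product,
    // which is why evalShardedByInnerDim needs a reduction step afterwards.
    void partial_gemm(const std::vector<double>& A, const std::vector<double>& B,
                      std::vector<double>& C, std::size_t m, std::size_t n,
                      std::size_t k, std::size_t k0, std::size_t k1) {
      for (std::size_t i = 0; i < m; ++i)
        for (std::size_t j = 0; j < n; ++j)
          for (std::size_t p = k0; p < k1; ++p)
            C[i * n + j] += A[i * k + p] * B[p * n + j];
    }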
@@ -706,20 +714,9 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
PacketType<RhsScalar, Device>::size);
const int output_packet_size = internal::unpacket_traits<PacketReturnType>::size;
const double kd = static_cast<double>(bk);
- // Peak VFMA bandwidth is 0.5. However if we have not enough data for
- // vectorization bandwidth drops. The 4.0 and 2.0 bandwidth is determined
- // experimentally.
- double computeBandwidth = bk == 1 ? 4.0 :
- (shard_by_col ? bn : bm) < Traits::nr ||
- (shard_by_col ? bm : bn) < Traits::mr ? 2.0 : 0.5;
-#ifndef EIGEN_VECTORIZE_FMA
- // Bandwidth of all of VFMA/MULPS/ADDPS is 0.5 on latest Intel processors.
- // However for MULPS/ADDPS we have dependent sequence of 2 such instructions,
- // so overall bandwidth is 1.0.
- if (computeBandwidth == 0.5) computeBandwidth = 1.0;
-#endif
+ double compute_bandwidth = computeBandwidth(false, bm, bn, bk);
// Computations.
- TensorOpCost cost = TensorOpCost(0, 0, kd * computeBandwidth, true, packed_size);
+ TensorOpCost cost = TensorOpCost(0, 0, kd * compute_bandwidth, true, packed_size);
// Output stores.
cost += TensorOpCost(0, sizeof(CoeffReturnType), 0, true, output_packet_size);
if (prepacked) {
@@ -740,6 +737,162 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
return cost + lhsCost + rhsCost;
}
+ template <int Alignment>
+ EIGEN_STRONG_INLINE void addToBuffer(size_t n, const Scalar* src_buf,
+ Scalar* tgt_buf) const {
+ const int output_packet_size = internal::unpacket_traits<PacketReturnType>::size;
+ size_t i = 0;
+ const size_t num_packets = n / output_packet_size;
+ for (; i < output_packet_size * num_packets; i += output_packet_size) {
+ const PacketReturnType src_val =
+ internal::pload<PacketReturnType>(src_buf + i);
+ const PacketReturnType tgt_val =
+ internal::ploadt<PacketReturnType, Alignment>(tgt_buf + i);
+ const PacketReturnType sum = internal::padd(src_val, tgt_val);
+ internal::pstoret<Scalar, PacketReturnType, Alignment>(tgt_buf + i, sum);
+ }
+ for (; i < n; ++i) {
+ tgt_buf[i] += src_buf[i];
+ }
+ }
+
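addToBuffer above is an elementwise "tgt += src": a packet main loop over whole output packets followed by a scalar tail. A rough scalar equivalent (hypothetical free function, assuming float data) of what it computes; with an 8-wide packet, n = 20 would be handled as two packet iterations over [0, 16) plus a scalar tail over [16, 20):

    #include <cstddef>

    // Scalar equivalent of addToBuffer: accumulate src into tgt elementwise.
    void add_to_buffer_scalar(std::size_t n, const float* src, float* tgt) {
      for (std::size_t i = 0; i < n; ++i) tgt[i] += src[i];
    }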
+ // Decide whether we want to shard m x k x n contraction over the inner
+ // (contraction) dimension (k).
+ static bool shardByInnerDim(Index m, Index n, Index k, int num_threads,
+ int num_threads_by_k) {
+ std::ptrdiff_t bufsize = m * n * sizeof(Scalar);
+ bool shard_by_k = false;
+ if (n == 1 || // If mat*vec or...
+ num_threads_by_k < 2 || // running single threaded or...
+ num_threads_by_k <
+ num_threads || // sharding by k gives less parallelism or...
+ bufsize > l3CacheSize() / num_threads_by_k || // need more buffer space
+ // than L3 cache or...
+ k / num_threads_by_k < 2 * Traits::nr) { // k per thread is tiny.
+ shard_by_k = false;
+ } else if (numext::maxi(m, n) / num_threads <
+ Traits::nr || // both other dimensions are tiny or...
+ // k per thread is not small and...
+ (k / num_threads_by_k > 8 * Traits::nr &&
+ // one of the outer dimensions is tiny or sharding by k offers
+ // more parallelism.
+ (numext::mini(m, n) < 2 * Traits::nr ||
+ num_threads_by_k > num_threads))) {
+ shard_by_k = true;
+ }
+ return shard_by_k;
+ }
+
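To illustrate the heuristic with made-up numbers (Scalar = float, Traits::nr = 4, num_threads = 4, num_threads_by_k = 8): for m = n = 4, k = 1000000 the partial-result buffer is only 64 bytes, k per thread is 125000 >= 2 * nr, and max(m, n) / num_threads = 1 < nr, so the second branch fires and the contraction is sharded by k. For m = n = 2048, k = 32, k per thread is 4 < 2 * nr (and the 16 MB partial buffer would likely exceed the per-thread L3 budget as well), so sharding by k is rejected.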
+ template <int Alignment>
+ void evalShardedByInnerDim(int num_threads, Scalar* result) const {
+ const Index m = this->m_i_size;
+ const Index n = this->m_j_size;
+ const Index k = this->m_k_size;
+ // The underlying GEMM kernel assumes that k is a multiple of 8 and
+ // subtle breakage occurs if this is violated.
+ Index block_size = 8 * divup<Index>(k, 8 * num_threads);
+ int num_blocks = divup<Index>(k, block_size);
+ // we use 'result' for the first block's partial result.
+ MaxSizeVector<Scalar*> block_buffers(num_blocks - 1);
+ Barrier barrier(num_blocks);
+ auto process_block = [=, &barrier](Scalar* buf, Index first, Index last) {
+ ::memset(buf, 0, m * n * sizeof(Scalar));
+ TENSOR_CONTRACTION_DISPATCH(
+ this->template evalGemmPartial, Alignment,
+ (buf, first, last, this->m_device.numThreads()));
+ barrier.Notify();
+ };
+ Index start = 0;
+ for (int blocks_left = num_blocks; blocks_left > 0; --blocks_left) {
+ // The underlying GEMM kernel assumes that k is a multiple of 8 and
+ // subtle breakage occurs if this is violated.
+ block_size = 8 * divup<Index>(k - start, 8 * blocks_left);
+ Scalar* buf;
+ if (start == 0) {
+ buf = result;
+ } else {
+ buf = static_cast<Scalar*>(
+ this->m_device.allocate(m * n * sizeof(Scalar)));
+ block_buffers.push_back(buf);
+ }
+ Index end = start + block_size;
+ if (end > k) {
+ end = k;
+ }
+ this->m_device.enqueueNoNotification(
+ [=, &process_block]() { process_block(buf, start, end); });
+ start = end;
+ }
+ barrier.Wait();
+
+ // Add other partial results into first partial result.
+ for (const auto& buf : block_buffers) {
+ addToBuffer<Alignment>(m * n, buf, result);
+ this->m_device.deallocate(buf);
+ }
+ }
+
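The block-splitting arithmetic above keeps the k slices aligned by re-rounding against the remaining range on every iteration. A small standalone sketch of just that splitting (illustrative only, with divup(a, b) = (a + b - 1) / b); for example, k = 1000 over 8 blocks gives slice lengths 128, 128, 128, 128, 128, 120, 120, 120, which sum back to k:

    #include <vector>

    // Mirror of the loop above: each block is rounded up to a multiple of 8 of
    // the remaining work, so every slice except the one that reaches k has a
    // length that is a multiple of 8, and slice starts stay multiples of 8.
    std::vector<long> split_inner_dim(long k, int num_blocks) {
      auto divup = [](long a, long b) { return (a + b - 1) / b; };
      std::vector<long> sizes;
      long start = 0;
      for (int blocks_left = num_blocks; blocks_left > 0; --blocks_left) {
        long block_size = 8 * divup(k - start, 8L * blocks_left);
        long end = start + block_size;
        if (end > k) end = k;
        sizes.push_back(end - start);
        start = end;
      }
      return sizes;  // k = 1000, 8 blocks -> {128, 128, 128, 128, 128, 120, 120, 120}
    }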
+ TensorOpCost contractionCostPerInnerDim(Index m, Index n, Index k) const {
+ // Compute cost.
+ const int output_packet_size = internal::unpacket_traits<PacketReturnType>::size;
+ TensorOpCost cost(0, 0, (computeBandwidth(true, m, n, k) * m) * n);
+ // Output stores.
+ cost += TensorOpCost(0, sizeof(CoeffReturnType), 0, true, output_packet_size);
+ TensorOpCost lhsCost = this->m_leftImpl.costPerCoeff(true) * m;
+ TensorOpCost rhsCost = this->m_rightImpl.costPerCoeff(true) * n;
+ // Since the inner gemm kernel is always sharded by column, the lhs
+ // load cost is negligible.
+ lhsCost.dropMemoryCost();
+ return cost + lhsCost + rhsCost;
+ }
+
+ int numThreadsInnerDim(Index m, Index n, Index k) const {
+ const int output_packet_size = internal::unpacket_traits<PacketReturnType>::size;
+ TensorOpCost cost = contractionCostPerInnerDim(m, n, k);
+ double total_parallel_cost =
+ TensorCostModel<ThreadPoolDevice>::totalCost(k, cost);
+ // Cost of reduction step accumulating the m*n per-thread buffers into the
+ // result.
+ double reduction_cost = TensorCostModel<ThreadPoolDevice>::totalCost(
+ m * n, TensorOpCost(2, 1, 1, true, output_packet_size));
+ Index num_threads = 1;
+ double min_cost = total_parallel_cost;
+ double kPerThreadOverHead = 4000;
+ double kFixedOverHead = 100000;
+ for (int nt = 2; nt <= this->m_device.numThreads(); nt++) {
+ double sequential_cost =
+ kFixedOverHead + nt * (reduction_cost + kPerThreadOverHead);
+ double parallel_cost = total_parallel_cost / nt + sequential_cost;
+ if (parallel_cost < min_cost) {
+ num_threads = nt;
+ min_cost = parallel_cost;
+ }
+ }
+ return num_threads;
+ }
+
+
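The selection loop above models running with nt threads as cost(nt) = total_parallel_cost / nt + kFixedOverHead + nt * (reduction_cost + kPerThreadOverHead), and compares it against the single-threaded baseline total_parallel_cost. With illustrative numbers total_parallel_cost = 2e7 and reduction_cost = 1e4: cost(2) = 1e7 + 1e5 + 2 * 1.4e4 ≈ 1.01e7, well under the 2e7 baseline, so at least two threads are used. For a small contraction with total_parallel_cost = 2e5, cost(2) = 1e5 + 1e5 + 2.8e4 = 2.28e5 exceeds the baseline, so num_threads stays 1.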
+ double computeBandwidth(bool shard_by_col, Index bm, Index bn,
+ Index bk) const {
+ // Peak VFMA bandwidth is 0.5. However if we have not enough data for
+ // vectorization bandwidth drops. The 4.0 and 2.0 bandwidth is determined
+ // experimentally.
+ double computeBandwidth =
+ bk == 1 ? 4.0
+ : (shard_by_col ? bn : bm) < Traits::nr ||
+ (shard_by_col ? bm : bn) < Traits::mr
+ ? 2.0
+ : 0.5;
+#ifndef EIGEN_VECTORIZE_FMA
+ // Bandwidth of all of VFMA/MULPS/ADDPS is 0.5 on latest Intel processors.
+ // However for MULPS/ADDPS we have dependent sequence of 2 such
+ // instructions,
+ // so overall bandwidth is 1.0.
+ if (computeBandwidth == 0.5) computeBandwidth = 1.0;
+#endif
+ return computeBandwidth;
+ }
+
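computeBandwidth is effectively a reciprocal throughput estimate (cost per packed multiply-add, so larger means slower). With illustrative kernel parameters nr = 8 and mr = 16, a well-shaped block such as (bm, bn, bk) = (48, 96, 256) yields 0.5 with FMA, or 1.0 without FMA because the dependent MULPS/ADDPS pair halves throughput; a block too narrow for the register tiling yields 2.0; and the degenerate bk == 1 case yields 4.0.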
#if defined(EIGEN_VECTORIZE_AVX) && defined(EIGEN_USE_LIBXSMM)
// TODO(ezhulenev): Add support for output kernels and LIBXSMM.
static_assert(std::is_same<OutputKernelType, const NoOpOutputKernel>::value,