From 2fd8a5a08fece826d211a6f34d777bb65f6b4562 Mon Sep 17 00:00:00 2001
From: Rasmus Munk Larsen
Date: Wed, 6 May 2020 14:48:37 -0700
Subject: Add parallelization of TensorScanOp for types without packet ops.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Clean up the code a bit and do a few micro-optimizations to improve
performance for small tensors.

Benchmark numbers for Tensor:

name                                             old time/op  new time/op  delta
BM_cumSumRowReduction_1T/8    [using 1 threads]  76.5ns ± 0%  61.3ns ± 4%  -19.80%  (p=0.008 n=5+5)
BM_cumSumRowReduction_1T/64   [using 1 threads]  2.47µs ± 1%  2.40µs ± 1%   -2.77%  (p=0.008 n=5+5)
BM_cumSumRowReduction_1T/256  [using 1 threads]  39.8µs ± 0%  39.6µs ± 0%   -0.60%  (p=0.008 n=5+5)
BM_cumSumRowReduction_1T/4k   [using 1 threads]  13.9ms ± 0%  13.4ms ± 1%   -4.19%  (p=0.008 n=5+5)
BM_cumSumRowReduction_2T/8    [using 2 threads]  76.8ns ± 0%  59.1ns ± 0%  -23.09%  (p=0.016 n=5+4)
BM_cumSumRowReduction_2T/64   [using 2 threads]  2.47µs ± 1%  2.41µs ± 1%   -2.53%  (p=0.008 n=5+5)
BM_cumSumRowReduction_2T/256  [using 2 threads]  39.8µs ± 0%  34.7µs ± 6%  -12.74%  (p=0.008 n=5+5)
BM_cumSumRowReduction_2T/4k   [using 2 threads]  13.8ms ± 1%   7.2ms ± 6%  -47.74%  (p=0.008 n=5+5)
BM_cumSumRowReduction_8T/8    [using 8 threads]  76.4ns ± 0%  61.8ns ± 3%  -19.02%  (p=0.008 n=5+5)
BM_cumSumRowReduction_8T/64   [using 8 threads]  2.47µs ± 1%  2.40µs ± 1%   -2.84%  (p=0.008 n=5+5)
BM_cumSumRowReduction_8T/256  [using 8 threads]  39.8µs ± 0%  28.3µs ±11%  -28.75%  (p=0.008 n=5+5)
BM_cumSumRowReduction_8T/4k   [using 8 threads]  13.8ms ± 0%   2.7ms ± 5%  -80.39%  (p=0.008 n=5+5)
BM_cumSumColReduction_1T/8    [using 1 threads]  59.1ns ± 0%  80.3ns ± 0%  +35.94%  (p=0.029 n=4+4)
BM_cumSumColReduction_1T/64   [using 1 threads]  3.06µs ± 0%  3.08µs ± 1%      ~     (p=0.114 n=4+4)
BM_cumSumColReduction_1T/256  [using 1 threads]   175µs ± 0%   176µs ± 0%      ~     (p=0.190 n=4+5)
BM_cumSumColReduction_1T/4k   [using 1 threads]   824ms ± 1%   844ms ± 1%   +2.37%  (p=0.008 n=5+5)
BM_cumSumColReduction_2T/8    [using 2 threads]  59.0ns ± 0%  90.7ns ± 0%  +53.74%  (p=0.029 n=4+4)
BM_cumSumColReduction_2T/64   [using 2 threads]  3.06µs ± 0%  3.10µs ± 0%   +1.08%  (p=0.016 n=4+5)
BM_cumSumColReduction_2T/256  [using 2 threads]   176µs ± 0%   189µs ±18%      ~     (p=0.151 n=5+5)
BM_cumSumColReduction_2T/4k   [using 2 threads]   836ms ± 2%   611ms ±14%  -26.92%  (p=0.008 n=5+5)
BM_cumSumColReduction_8T/8    [using 8 threads]  59.3ns ± 2%  90.6ns ± 0%  +52.79%  (p=0.008 n=5+5)
BM_cumSumColReduction_8T/64   [using 8 threads]  3.07µs ± 0%  3.10µs ± 0%   +0.99%  (p=0.016 n=5+4)
BM_cumSumColReduction_8T/256  [using 8 threads]   176µs ± 0%    80µs ±19%  -54.51%  (p=0.008 n=5+5)
BM_cumSumColReduction_8T/4k   [using 8 threads]   827ms ± 2%   180ms ±14%  -78.24%  (p=0.008 n=5+5)
---
 unsupported/Eigen/CXX11/src/Tensor/TensorScan.h | 195 +++++++++++++++---------
 1 file changed, 119 insertions(+), 76 deletions(-)

(limited to 'unsupported/Eigen/CXX11')

diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorScan.h b/unsupported/Eigen/CXX11/src/Tensor/TensorScan.h
index 833a3200c..bef8d261f 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorScan.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorScan.h
@@ -77,9 +77,12 @@ protected:
   const bool m_exclusive;
 };
 
+
+namespace internal {
+
 template 
-inline void ReduceScalar(Self& self, Index offset,
-                         typename Self::CoeffReturnType* data) {
+EIGEN_STRONG_INLINE void ReduceScalar(Self& self, Index offset,
+                                      typename Self::CoeffReturnType* data) {
   // Compute the scan along the axis, starting at the given offset
   typename Self::CoeffReturnType accum = self.accumulator().initialize();
   if (self.stride() == 1) {
@@ -112,7 +115,8 @@ inline void ReduceScalar(Self& self, Index offset,
 }
 
 template 
-inline void ReducePacket(Self& self, Index offset, typename Self::CoeffReturnType* data) {
+EIGEN_STRONG_INLINE void ReducePacket(Self& self, Index offset,
+                                      typename Self::CoeffReturnType* data) {
   using Scalar = typename Self::CoeffReturnType;
   using Packet = typename Self::PacketReturnType;
   // Compute the scan along the axis, starting at the calculated offset
@@ -146,9 +150,10 @@ inline void ReducePacket(Self& self, Index offset, typename Self::CoeffReturnTyp
   }
 }
 
-template 
+template 
 struct ReduceBlock {
-  void operator()(Self& self, Index idx1, typename Self::CoeffReturnType* data) {
+  EIGEN_STRONG_INLINE void operator()(Self& self, Index idx1,
+                                      typename Self::CoeffReturnType* data) {
     for (Index idx2 = 0; idx2 < self.stride(); idx2++) {
       // Calculate the starting offset for the scan
       Index offset = idx1 + idx2;
@@ -159,8 +164,9 @@ struct ReduceBlock {
 
 // Specialization for vectorized reduction.
 template 
-struct ReduceBlock {
-  void operator()(Self& self, Index idx1, typename Self::CoeffReturnType* data) {
+struct ReduceBlock {
+  EIGEN_STRONG_INLINE void operator()(Self& self, Index idx1,
+                                      typename Self::CoeffReturnType* data) {
     using Packet = typename Self::PacketReturnType;
     const int PacketSize = internal::unpacket_traits::size;
     Index idx2 = 0;
@@ -177,109 +183,144 @@ struct ReduceBlock {
   }
 };
 
-// CPU implementation of scan
-template ::PacketAccess &&
-          internal::reducer_traits::PacketAccess>
+// Single-threaded CPU implementation of scan
+template ::PacketAccess &&
+          internal::reducer_traits::PacketAccess)>
 struct ScanLauncher {
-  void operator()(Self& self, typename Self::CoeffReturnType *data) {
+  void operator()(Self& self, typename Self::CoeffReturnType* data) {
     Index total_size = internal::array_prod(self.dimensions());
     // We fix the index along the scan axis to 0 and perform a
     // scan per remaining entry. The iteration is split into two nested
-    // loops to avoid an integer division by keeping track of each idx1 and idx2.
+    // loops to avoid an integer division by keeping track of each idx1 and
+    // idx2.
     for (Index idx1 = 0; idx1 < total_size; idx1 += self.stride() * self.size()) {
-      ReduceBlock block_reducer;
+      ReduceBlock block_reducer;
       block_reducer(self, idx1, data);
     }
   }
 };
 
 #ifdef EIGEN_USE_THREADS
-// Specialization for multi-threaded, vectorized execution.
-template 
-struct ScanLauncher {
+
+// Adjust block_size to avoid false sharing of cachelines among
+// threads. Currently set to twice the cache line size on Intel and ARM processors.
+EIGEN_STRONG_INLINE Index AdjustBlockSize(Index item_size, Index block_size) {
+  EIGEN_CONSTEXPR Index kBlockAlignment = 128;
+  const Index items_per_cacheline =
+      numext::maxi(1, kBlockAlignment / item_size);
+  return items_per_cacheline * divup(block_size, items_per_cacheline);
+}
+
+template 
+struct ReduceBlock {
+  EIGEN_STRONG_INLINE void operator()(Self& self, Index idx1,
+                                      typename Self::CoeffReturnType* data) {
+    using Scalar = typename Self::CoeffReturnType;
+    using Packet = typename Self::PacketReturnType;
+    const int PacketSize = internal::unpacket_traits::size;
+    Index num_scalars = self.stride();
+    Index num_packets = 0;
+    if (self.stride() >= PacketSize) {
+      num_packets = self.stride() / PacketSize;
+      self.device().parallelFor(
+          num_packets,
+          TensorOpCost(PacketSize * self.size(), PacketSize * self.size(),
+                       16 * PacketSize * self.size(), true, PacketSize),
+          // Make the shard size large enough that two neighboring threads
+          // won't write to the same cacheline of `data`.
+          [=](Index blk_size) {
+            return AdjustBlockSize(PacketSize * sizeof(Scalar), blk_size);
+          },
+          [&](Index first, Index last) {
+            for (Index packet = first; packet < last; ++packet) {
+              const Index idx2 = packet * PacketSize;
+              ReducePacket(self, idx1 + idx2, data);
+            }
+          });
+      num_scalars -= num_packets * PacketSize;
+    }
+    self.device().parallelFor(
+        num_scalars, TensorOpCost(self.size(), self.size(), 16 * self.size()),
+        // Make the shard size large enough that two neighboring threads
+        // won't write to the same cacheline of `data`.
+        [=](Index blk_size) {
+          return AdjustBlockSize(sizeof(Scalar), blk_size);
+        },
+        [&](Index first, Index last) {
+          for (Index scalar = first; scalar < last; ++scalar) {
+            const Index idx2 = num_packets * PacketSize + scalar;
+            ReduceScalar(self, idx1 + idx2, data);
+          }
+        });
+  }
+};
+
+template 
+struct ReduceBlock {
+  EIGEN_STRONG_INLINE void operator()(Self& self, Index idx1,
+                                      typename Self::CoeffReturnType* data) {
+    using Scalar = typename Self::CoeffReturnType;
+    self.device().parallelFor(
+        self.stride(), TensorOpCost(self.size(), self.size(), 16 * self.size()),
+        // Make the shard size large enough that two neighboring threads
+        // won't write to the same cacheline of `data`.
+        [=](Index blk_size) {
+          return AdjustBlockSize(sizeof(Scalar), blk_size);
+        },
+        [&](Index first, Index last) {
+          for (Index idx2 = first; idx2 < last; ++idx2) {
+            ReduceScalar(self, idx1 + idx2, data);
+          }
+        });
+  }
+};
+
+// Specialization for multi-threaded execution.
+template 
+struct ScanLauncher {
   void operator()(Self& self, typename Self::CoeffReturnType* data) {
     using Scalar = typename Self::CoeffReturnType;
     using Packet = typename Self::PacketReturnType;
     const int PacketSize = internal::unpacket_traits::size;
     const Index total_size = internal::array_prod(self.dimensions());
     const Index inner_block_size = self.stride() * self.size();
-    const Index num_outer_blocks = total_size / inner_block_size;
-    // Block alignment used to avoid false sharing of cachelines among threads.
-    // Currently set to twice the cache line size on Intel and ARM processors.
-    EIGEN_CONSTEXPR Index kBlockAlignment = 128;
-
-    if ((num_outer_blocks >= self.stride() && total_size <= 4096) ||
-        (num_outer_blocks < self.stride() && self.stride() < PacketSize)) {
-      ScanLauncher launcher;
+    bool parallelize_by_outer_blocks = (total_size >= (self.stride() * inner_block_size));
+
+    if ((parallelize_by_outer_blocks && total_size <= 4096) ||
+        (!parallelize_by_outer_blocks && self.stride() < PacketSize)) {
+      ScanLauncher launcher;
       launcher(self, data);
       return;
     }
 
-    if (num_outer_blocks >= self.stride()) {
+    if (parallelize_by_outer_blocks) {
       // Parallelize over outer blocks.
+      const Index num_outer_blocks = total_size / inner_block_size;
       self.device().parallelFor(
           num_outer_blocks,
           TensorOpCost(inner_block_size, inner_block_size,
-                       16 * PacketSize * inner_block_size, true, PacketSize),
-          // Make the shard size large enough that two neighboring threads won't
-          // write to the same cacheline of `data`.
+                       16 * PacketSize * inner_block_size, Vectorize,
+                       PacketSize),
           [=](Index blk_size) {
-            const Index inner_blocks_cacheline =
-                numext::maxi(1, kBlockAlignment / (inner_block_size * sizeof(Scalar)));
-            return inner_blocks_cacheline *
-                   divup(blk_size, inner_blocks_cacheline);
+            return AdjustBlockSize(inner_block_size * sizeof(Scalar), blk_size);
          },
          [&](Index first, Index last) {
-            for (Index idx = first; idx < last; ++idx) {
-              ReduceBlock block_reducer;
-              block_reducer(self, idx * inner_block_size, data);
+            for (Index idx1 = first; idx1 < last; ++idx1) {
+              ReduceBlock block_reducer;
+              block_reducer(self, idx1 * inner_block_size, data);
            }
          });
     } else {
-      // Parallelize over packets/scalars of the dimensions when the reduction
+      // Parallelize over inner packets/scalars dimensions when the reduction
       // axis is not an inner dimension.
-      const Index num_packets = self.stride() / PacketSize;
+      ReduceBlock block_reducer;
       for (Index idx1 = 0; idx1 < total_size; idx1 += self.stride() * self.size()) {
-        self.device().parallelFor(
-            num_packets,
-            TensorOpCost(PacketSize * self.size(), PacketSize * self.size(),
-                         16 * PacketSize * self.size(), true, PacketSize),
-            // Make the shard size large enough that two neighboring threads
-            // won't write to the same cacheline of `data`.
-            [=](Index blk_size) {
-              const Index packets_per_cacheline =
-                  numext::maxi(1, kBlockAlignment / (PacketSize * sizeof(Scalar)));
-              return packets_per_cacheline *
-                     divup(blk_size, packets_per_cacheline);
-            },
-            [&](Index first, Index last) {
-              for (Index packet = first; packet < last; ++packet) {
-                const Index idx2 = packet * PacketSize;
-                ReducePacket(self, idx1 + idx2, data);
-              }
-            });
-
-        const Index num_scalars = self.stride() - num_packets * PacketSize;
-        self.device().parallelFor(
-            num_scalars,
-            TensorOpCost(self.size(), self.size(), 16 * self.size()),
-            // Make the shard size large enough that two neighboring threads
-            // won't write to the same cacheline of `data`.
-            [=](Index blk_size) {
-              const Index scalars_per_cacheline =
-                  numext::maxi(1, kBlockAlignment / sizeof(Scalar));
-              return scalars_per_cacheline *
-                     divup(blk_size, scalars_per_cacheline);
-            },
-            [&](Index first, Index last) {
-              for (Index scalar = first; scalar < last; ++scalar) {
-                const Index idx2 = num_packets * PacketSize + scalar;
-                ReduceScalar(self, idx1 + idx2, data);
-              }
-            });
+        block_reducer(self, idx1, data);
       }
     }
   }
@@ -328,6 +369,8 @@ struct ScanLauncher {
 };
 #endif  // EIGEN_USE_GPU && (EIGEN_GPUCC)
 
+}  // namespace internal
+
 // Eval as rvalue
 template 
 struct TensorEvaluator, Device> {
@@ -424,7 +467,7 @@ struct TensorEvaluator, Device> {
   EIGEN_STRONG_INLINE bool evalSubExprsIfNeeded(EvaluatorPointerType data) {
     m_impl.evalSubExprsIfNeeded(NULL);
 
-    ScanLauncher launcher;
+    internal::ScanLauncher launcher;
     if (data) {
       launcher(*this, data);
       return false;
-- 
cgit v1.2.3
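
A concrete instance of the AdjustBlockSize rounding introduced in the patch: with
kBlockAlignment = 128 and 4-byte float items, items_per_cacheline is 32, so a
requested shard of, say, 100 items is rounded up to divup(100, 32) * 32 = 128
items, and two neighboring shards never write to the same cacheline of `data`.

The multi-threaded ScanLauncher path touched here is reached whenever a scan
expression such as cumsum is evaluated on a ThreadPoolDevice with
EIGEN_USE_THREADS defined. The following is a minimal usage sketch, not part
of the patch; the thread count and tensor sizes are illustrative (8 threads
echoes the *_8T benchmark configuration above):

    #define EIGEN_USE_THREADS
    #include <unsupported/Eigen/CXX11/Tensor>

    int main() {
      // Thread pool with 8 workers and a device that schedules work on it.
      Eigen::ThreadPool pool(8);
      Eigen::ThreadPoolDevice device(&pool, 8);

      Eigen::Tensor<float, 2> input(4096, 4096);
      input.setRandom();
      Eigen::Tensor<float, 2> output(4096, 4096);

      // Evaluating the cumulative sum on the device dispatches to the
      // ThreadPoolDevice ScanLauncher specialization modified by this patch.
      // cumsum(0) scans along dimension 0; cumsum(1) scans along dimension 1.
      output.device(device) = input.cumsum(0);
      return 0;
    }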