From 7b76c85daf293f903682f26733918b7dbfde740a Mon Sep 17 00:00:00 2001 From: Rasmus Munk Larsen Date: Tue, 5 May 2020 00:19:43 +0000 Subject: Vectorize and parallelize TensorScanOp. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit TensorScanOp is used in TensorFlow for a number of operations, such as cumulative log-sum-exp (logsumexp) reduction and cumulative sum and product reductions. The benchmark numbers below are for cumulative row and column reductions of NxN matrices. name old time/op new time/op delta BM_cumSumRowReduction_1T/4 [using 1 threads ] 25.1ns ± 1% 35.2ns ± 1% +40.45% BM_cumSumRowReduction_1T/8 [using 1 threads ] 73.4ns ± 0% 82.7ns ± 3% +12.74% BM_cumSumRowReduction_1T/32 [using 1 threads ] 988ns ± 0% 832ns ± 0% -15.77% BM_cumSumRowReduction_1T/64 [using 1 threads ] 4.07µs ± 2% 3.47µs ± 0% -14.70% BM_cumSumRowReduction_1T/128 [using 1 threads ] 18.0µs ± 0% 16.8µs ± 0% -6.58% BM_cumSumRowReduction_1T/512 [using 1 threads ] 287µs ± 0% 281µs ± 0% -2.22% BM_cumSumRowReduction_1T/2k [using 1 threads ] 4.78ms ± 1% 4.78ms ± 2% ~ BM_cumSumRowReduction_1T/10k [using 1 threads ] 117ms ± 1% 117ms ± 1% ~ BM_cumSumRowReduction_8T/4 [using 8 threads ] 25.0ns ± 0% 35.2ns ± 0% +40.82% BM_cumSumRowReduction_8T/8 [using 8 threads ] 77.2ns ±16% 81.3ns ± 0% ~ BM_cumSumRowReduction_8T/32 [using 8 threads ] 988ns ± 0% 833ns ± 0% -15.67% BM_cumSumRowReduction_8T/64 [using 8 threads ] 4.08µs ± 2% 3.47µs ± 0% -14.95% BM_cumSumRowReduction_8T/128 [using 8 threads ] 18.0µs ± 0% 17.3µs ±10% ~ BM_cumSumRowReduction_8T/512 [using 8 threads ] 287µs ± 0% 58µs ± 6% -79.92% BM_cumSumRowReduction_8T/2k [using 8 threads ] 4.79ms ± 1% 0.64ms ± 1% -86.58% BM_cumSumRowReduction_8T/10k [using 8 threads ] 117ms ± 1% 18ms ± 6% -84.50% BM_cumSumColReduction_1T/4 [using 1 threads ] 23.9ns ± 0% 33.4ns ± 1% +39.68% BM_cumSumColReduction_1T/8 [using 1 threads ] 71.6ns ± 1% 49.1ns ± 3% -31.40% BM_cumSumColReduction_1T/32 [using 1 threads ] 973ns ± 0% 165ns ± 2% -83.10% 
BM_cumSumColReduction_1T/64 [using 1 threads ] 4.06µs ± 1% 0.57µs ± 1% -85.94% BM_cumSumColReduction_1T/128 [using 1 threads ] 33.4µs ± 1% 4.1µs ± 1% -87.67% BM_cumSumColReduction_1T/512 [using 1 threads ] 1.72ms ± 4% 0.21ms ± 5% -87.91% BM_cumSumColReduction_1T/2k [using 1 threads ] 119ms ±53% 11ms ±35% -90.42% BM_cumSumColReduction_1T/10k [using 1 threads ] 1.59s ±67% 0.35s ±49% -77.96% BM_cumSumColReduction_8T/4 [using 8 threads ] 23.8ns ± 0% 33.3ns ± 0% +40.06% BM_cumSumColReduction_8T/8 [using 8 threads ] 71.6ns ± 1% 49.2ns ± 5% -31.33% BM_cumSumColReduction_8T/32 [using 8 threads ] 1.01µs ±12% 0.17µs ± 3% -82.93% BM_cumSumColReduction_8T/64 [using 8 threads ] 4.15µs ± 4% 0.58µs ± 1% -86.09% BM_cumSumColReduction_8T/128 [using 8 threads ] 33.5µs ± 0% 4.1µs ± 4% -87.65% BM_cumSumColReduction_8T/512 [using 8 threads ] 1.71ms ± 3% 0.06ms ±16% -96.21% BM_cumSumColReduction_8T/2k [using 8 threads ] 97.1ms ±14% 3.0ms ±23% -96.88% BM_cumSumColReduction_8T/10k [using 8 threads ] 1.97s ± 8% 0.06s ± 2% -96.74% --- .../CXX11/src/Tensor/TensorDeviceThreadPool.h | 6 +- unsupported/Eigen/CXX11/src/Tensor/TensorScan.h | 329 ++++++++++++++++----- 2 files changed, 255 insertions(+), 80 deletions(-) (limited to 'unsupported/Eigen/CXX11') diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h index cee46634c..e524b535a 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h @@ -190,9 +190,11 @@ struct ThreadPoolDevice { void parallelFor(Index n, const TensorOpCost& cost, std::function block_align, std::function f) const { + if (EIGEN_PREDICT_FALSE(n <= 0)){ + return; // Compute small problems directly in the caller thread. 
- if (n <= 1 || numThreads() == 1 || - CostModel::numThreads(n, cost, static_cast(numThreads())) == 1) { + } else if (n == 1 || numThreads() == 1 || + CostModel::numThreads(n, cost, static_cast(numThreads())) == 1) { f(0, n); return; } diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorScan.h b/unsupported/Eigen/CXX11/src/Tensor/TensorScan.h index ee465dd0f..833a3200c 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorScan.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorScan.h @@ -77,8 +77,256 @@ protected: const bool m_exclusive; }; -template -struct ScanLauncher; +template +inline void ReduceScalar(Self& self, Index offset, + typename Self::CoeffReturnType* data) { + // Compute the scan along the axis, starting at the given offset + typename Self::CoeffReturnType accum = self.accumulator().initialize(); + if (self.stride() == 1) { + if (self.exclusive()) { + for (Index curr = offset; curr < offset + self.size(); ++curr) { + data[curr] = self.accumulator().finalize(accum); + self.accumulator().reduce(self.inner().coeff(curr), &accum); + } + } else { + for (Index curr = offset; curr < offset + self.size(); ++curr) { + self.accumulator().reduce(self.inner().coeff(curr), &accum); + data[curr] = self.accumulator().finalize(accum); + } + } + } else { + if (self.exclusive()) { + for (Index idx3 = 0; idx3 < self.size(); idx3++) { + Index curr = offset + idx3 * self.stride(); + data[curr] = self.accumulator().finalize(accum); + self.accumulator().reduce(self.inner().coeff(curr), &accum); + } + } else { + for (Index idx3 = 0; idx3 < self.size(); idx3++) { + Index curr = offset + idx3 * self.stride(); + self.accumulator().reduce(self.inner().coeff(curr), &accum); + data[curr] = self.accumulator().finalize(accum); + } + } + } +} + +template +inline void ReducePacket(Self& self, Index offset, typename Self::CoeffReturnType* data) { + using Scalar = typename Self::CoeffReturnType; + using Packet = typename Self::PacketReturnType; + // Compute the scan along the axis, 
starting at the calculated offset + Packet accum = self.accumulator().template initializePacket(); + if (self.stride() == 1) { + if (self.exclusive()) { + for (Index curr = offset; curr < offset + self.size(); ++curr) { + internal::pstoreu(data + curr, self.accumulator().finalizePacket(accum)); + self.accumulator().reducePacket(self.inner().template packet(curr), &accum); + } + } else { + for (Index curr = offset; curr < offset + self.size(); ++curr) { + self.accumulator().reducePacket(self.inner().template packet(curr), &accum); + internal::pstoreu(data + curr, self.accumulator().finalizePacket(accum)); + } + } + } else { + if (self.exclusive()) { + for (Index idx3 = 0; idx3 < self.size(); idx3++) { + const Index curr = offset + idx3 * self.stride(); + internal::pstoreu(data + curr, self.accumulator().finalizePacket(accum)); + self.accumulator().reducePacket(self.inner().template packet(curr), &accum); + } + } else { + for (Index idx3 = 0; idx3 < self.size(); idx3++) { + const Index curr = offset + idx3 * self.stride(); + self.accumulator().reducePacket(self.inner().template packet(curr), &accum); + internal::pstoreu(data + curr, self.accumulator().finalizePacket(accum)); + } + } + } +} + +template +struct ReduceBlock { + void operator()(Self& self, Index idx1, typename Self::CoeffReturnType* data) { + for (Index idx2 = 0; idx2 < self.stride(); idx2++) { + // Calculate the starting offset for the scan + Index offset = idx1 + idx2; + ReduceScalar(self, offset, data); + } + } +}; + +// Specialization for vectorized reduction. 
+template +struct ReduceBlock { + void operator()(Self& self, Index idx1, typename Self::CoeffReturnType* data) { + using Packet = typename Self::PacketReturnType; + const int PacketSize = internal::unpacket_traits::size; + Index idx2 = 0; + for (; idx2 + PacketSize <= self.stride(); idx2 += PacketSize) { + // Calculate the starting offset for the packet scan + Index offset = idx1 + idx2; + ReducePacket(self, offset, data); + } + for (; idx2 < self.stride(); idx2++) { + // Calculate the starting offset for the scan + Index offset = idx1 + idx2; + ReduceScalar(self, offset, data); + } + } +}; + +// CPU implementation of scan +template ::PacketAccess && + internal::reducer_traits::PacketAccess> +struct ScanLauncher { + void operator()(Self& self, typename Self::CoeffReturnType *data) { + Index total_size = internal::array_prod(self.dimensions()); + + // We fix the index along the scan axis to 0 and perform a + // scan per remaining entry. The iteration is split into two nested + // loops to avoid an integer division by keeping track of each idx1 and idx2. + for (Index idx1 = 0; idx1 < total_size; idx1 += self.stride() * self.size()) { + ReduceBlock block_reducer; + block_reducer(self, idx1, data); + } + } +}; + +#ifdef EIGEN_USE_THREADS +// Specialization for multi-threaded, vectorized execution. +template +struct ScanLauncher { + void operator()(Self& self, typename Self::CoeffReturnType* data) { + using Scalar = typename Self::CoeffReturnType; + using Packet = typename Self::PacketReturnType; + const int PacketSize = internal::unpacket_traits::size; + const Index total_size = internal::array_prod(self.dimensions()); + const Index inner_block_size = self.stride() * self.size(); + const Index num_outer_blocks = total_size / inner_block_size; + // Block alignment used to avoid false sharing of cachelines among threads. + // Currently set to twice the cache line size on Intel and ARM processors. 
+ EIGEN_CONSTEXPR Index kBlockAlignment = 128; + + if ((num_outer_blocks >= self.stride() && total_size <= 4096) || + (num_outer_blocks < self.stride() && self.stride() < PacketSize)) { + ScanLauncher launcher; + launcher(self, data); + return; + } + + if (num_outer_blocks >= self.stride()) { + // Parallelize over outer blocks. + self.device().parallelFor( + num_outer_blocks, + TensorOpCost(inner_block_size, inner_block_size, + 16 * PacketSize * inner_block_size, true, PacketSize), + // Make the shard size large enough that two neighboring threads won't + // write to the same cacheline of `data`. + [=](Index blk_size) { + const Index inner_blocks_cacheline = + numext::maxi(1, kBlockAlignment / (inner_block_size * sizeof(Scalar))); + return inner_blocks_cacheline * + divup(blk_size, inner_blocks_cacheline); + }, + [&](Index first, Index last) { + for (Index idx = first; idx < last; ++idx) { + ReduceBlock block_reducer; + block_reducer(self, idx * inner_block_size, data); + } + }); + } else { + // Parallelize over packets/scalars of the dimensions when the reduction + // axis is not an inner dimension. + const Index num_packets = self.stride() / PacketSize; + for (Index idx1 = 0; idx1 < total_size; + idx1 += self.stride() * self.size()) { + self.device().parallelFor( + num_packets, + TensorOpCost(PacketSize * self.size(), PacketSize * self.size(), + 16 * PacketSize * self.size(), true, PacketSize), + // Make the shard size large enough that two neighboring threads + // won't write to the same cacheline of `data`. 
+ [=](Index blk_size) { + const Index packets_per_cacheline = + numext::maxi(1, kBlockAlignment / (PacketSize * sizeof(Scalar))); + return packets_per_cacheline * + divup(blk_size, packets_per_cacheline); + }, + [&](Index first, Index last) { + for (Index packet = first; packet < last; ++packet) { + const Index idx2 = packet * PacketSize; + ReducePacket(self, idx1 + idx2, data); + } + }); + + const Index num_scalars = self.stride() - num_packets * PacketSize; + self.device().parallelFor( + num_scalars, + TensorOpCost(self.size(), self.size(), 16 * self.size()), + // Make the shard size large enough that two neighboring threads + // won't write to the same cacheline of `data`. + [=](Index blk_size) { + const Index scalars_per_cacheline = + numext::maxi(1, kBlockAlignment / sizeof(Scalar)); + return scalars_per_cacheline * + divup(blk_size, scalars_per_cacheline); + }, + [&](Index first, Index last) { + for (Index scalar = first; scalar < last; ++scalar) { + const Index idx2 = num_packets * PacketSize + scalar; + ReduceScalar(self, idx1 + idx2, data); + } + }); + } + } + } +}; +#endif // EIGEN_USE_THREADS + +#if defined(EIGEN_USE_GPU) && (defined(EIGEN_GPUCC)) + +// GPU implementation of scan +// TODO(ibab) This placeholder implementation performs multiple scans in +// parallel, but it would be better to use a parallel scan algorithm and +// optimize memory access. 
+template +__global__ void ScanKernel(Self self, Index total_size, typename Self::CoeffReturnType* data) { + // Compute offset as in the CPU version + Index val = threadIdx.x + blockIdx.x * blockDim.x; + Index offset = (val / self.stride()) * self.stride() * self.size() + val % self.stride(); + + if (offset + (self.size() - 1) * self.stride() < total_size) { + // Compute the scan along the axis, starting at the calculated offset + typename Self::CoeffReturnType accum = self.accumulator().initialize(); + for (Index idx = 0; idx < self.size(); idx++) { + Index curr = offset + idx * self.stride(); + if (self.exclusive()) { + data[curr] = self.accumulator().finalize(accum); + self.accumulator().reduce(self.inner().coeff(curr), &accum); + } else { + self.accumulator().reduce(self.inner().coeff(curr), &accum); + data[curr] = self.accumulator().finalize(accum); + } + } + } + __syncthreads(); + +} + +template +struct ScanLauncher { + void operator()(const Self& self, typename Self::CoeffReturnType* data) { + Index total_size = internal::array_prod(self.dimensions()); + Index num_blocks = (total_size / self.size() + 63) / 64; + Index block_size = 64; + + LAUNCH_GPU_KERNEL((ScanKernel), num_blocks, block_size, 0, self.device(), self, total_size, data); + } +}; +#endif // EIGEN_USE_GPU && (EIGEN_GPUCC) // Eval as rvalue template @@ -86,6 +334,7 @@ struct TensorEvaluator, Device> { typedef TensorScanOp XprType; typedef typename XprType::Index Index; + typedef const ArgType ChildTypeNoConst; typedef const ArgType ChildType; static const int NumDims = internal::array_size::Dimensions>::value; typedef DSizes Dimensions; @@ -232,82 +481,6 @@ protected: EvaluatorPointerType m_output; }; -// CPU implementation of scan -// TODO(ibab) This single-threaded implementation should be parallelized, -// at least by running multiple scans at the same time. 
-template -struct ScanLauncher { - void operator()(Self& self, typename Self::CoeffReturnType *data) { - Index total_size = internal::array_prod(self.dimensions()); - - // We fix the index along the scan axis to 0 and perform a - // scan per remaining entry. The iteration is split into two nested - // loops to avoid an integer division by keeping track of each idx1 and idx2. - for (Index idx1 = 0; idx1 < total_size; idx1 += self.stride() * self.size()) { - for (Index idx2 = 0; idx2 < self.stride(); idx2++) { - // Calculate the starting offset for the scan - Index offset = idx1 + idx2; - - // Compute the scan along the axis, starting at the calculated offset - typename Self::CoeffReturnType accum = self.accumulator().initialize(); - for (Index idx3 = 0; idx3 < self.size(); idx3++) { - Index curr = offset + idx3 * self.stride(); - - if (self.exclusive()) { - data[curr] = self.accumulator().finalize(accum); - self.accumulator().reduce(self.inner().coeff(curr), &accum); - } else { - self.accumulator().reduce(self.inner().coeff(curr), &accum); - data[curr] = self.accumulator().finalize(accum); - } - } - } - } - } -}; - -#if defined(EIGEN_USE_GPU) && (defined(EIGEN_GPUCC)) - -// GPU implementation of scan -// TODO(ibab) This placeholder implementation performs multiple scans in -// parallel, but it would be better to use a parallel scan algorithm and -// optimize memory access. 
-template -__global__ void ScanKernel(Self self, Index total_size, typename Self::CoeffReturnType* data) { - // Compute offset as in the CPU version - Index val = threadIdx.x + blockIdx.x * blockDim.x; - Index offset = (val / self.stride()) * self.stride() * self.size() + val % self.stride(); - - if (offset + (self.size() - 1) * self.stride() < total_size) { - // Compute the scan along the axis, starting at the calculated offset - typename Self::CoeffReturnType accum = self.accumulator().initialize(); - for (Index idx = 0; idx < self.size(); idx++) { - Index curr = offset + idx * self.stride(); - if (self.exclusive()) { - data[curr] = self.accumulator().finalize(accum); - self.accumulator().reduce(self.inner().coeff(curr), &accum); - } else { - self.accumulator().reduce(self.inner().coeff(curr), &accum); - data[curr] = self.accumulator().finalize(accum); - } - } - } - __syncthreads(); - -} - -template -struct ScanLauncher { - void operator()(const Self& self, typename Self::CoeffReturnType* data) { - Index total_size = internal::array_prod(self.dimensions()); - Index num_blocks = (total_size / self.size() + 63) / 64; - Index block_size = 64; - - LAUNCH_GPU_KERNEL((ScanKernel), num_blocks, block_size, 0, self.device(), self, total_size, data); - } -}; -#endif // EIGEN_USE_GPU && (EIGEN_GPUCC) - } // end namespace Eigen #endif // EIGEN_CXX11_TENSOR_TENSOR_SCAN_H -- cgit v1.2.3