Diffstat (limited to 'unsupported/Eigen/CXX11/src')
-rw-r--r--   unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h |   6
-rw-r--r--   unsupported/Eigen/CXX11/src/Tensor/TensorScan.h             | 329
2 files changed, 255 insertions, 80 deletions
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
index cee46634c..e524b535a 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
@@ -190,9 +190,11 @@ struct ThreadPoolDevice {
   void parallelFor(Index n, const TensorOpCost& cost,
                    std::function<Index(Index)> block_align,
                    std::function<void(Index, Index)> f) const {
+    if (EIGEN_PREDICT_FALSE(n <= 0)){
+      return;
     // Compute small problems directly in the caller thread.
-    if (n <= 1 || numThreads() == 1 ||
-        CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
+    } else if (n == 1 || numThreads() == 1 ||
+               CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
       f(0, n);
       return;
     }
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorScan.h b/unsupported/Eigen/CXX11/src/Tensor/TensorScan.h
index ee465dd0f..833a3200c 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorScan.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorScan.h
@@ -77,8 +77,256 @@ protected:
   const bool m_exclusive;
 };
 
-template <typename Self, typename Reducer, typename Device>
-struct ScanLauncher;
+template <typename Self>
+inline void ReduceScalar(Self& self, Index offset,
+                         typename Self::CoeffReturnType* data) {
+  // Compute the scan along the axis, starting at the given offset
+  typename Self::CoeffReturnType accum = self.accumulator().initialize();
+  if (self.stride() == 1) {
+    if (self.exclusive()) {
+      for (Index curr = offset; curr < offset + self.size(); ++curr) {
+        data[curr] = self.accumulator().finalize(accum);
+        self.accumulator().reduce(self.inner().coeff(curr), &accum);
+      }
+    } else {
+      for (Index curr = offset; curr < offset + self.size(); ++curr) {
+        self.accumulator().reduce(self.inner().coeff(curr), &accum);
+        data[curr] = self.accumulator().finalize(accum);
+      }
+    }
+  } else {
+    if (self.exclusive()) {
+      for (Index idx3 = 0; idx3 < self.size(); idx3++) {
+        Index curr = offset + idx3 * self.stride();
+        data[curr] = self.accumulator().finalize(accum);
+        self.accumulator().reduce(self.inner().coeff(curr), &accum);
+      }
+    } else {
+      for (Index idx3 = 0; idx3 < self.size(); idx3++) {
+        Index curr = offset + idx3 * self.stride();
+        self.accumulator().reduce(self.inner().coeff(curr), &accum);
+        data[curr] = self.accumulator().finalize(accum);
+      }
+    }
+  }
+}
+
+template <typename Self>
+inline void ReducePacket(Self& self, Index offset, typename Self::CoeffReturnType* data) {
+  using Scalar = typename Self::CoeffReturnType;
+  using Packet = typename Self::PacketReturnType;
+  // Compute the scan along the axis, starting at the calculated offset
+  Packet accum = self.accumulator().template initializePacket<Packet>();
+  if (self.stride() == 1) {
+    if (self.exclusive()) {
+      for (Index curr = offset; curr < offset + self.size(); ++curr) {
+        internal::pstoreu<Scalar, Packet>(data + curr, self.accumulator().finalizePacket(accum));
+        self.accumulator().reducePacket(self.inner().template packet<Unaligned>(curr), &accum);
+      }
+    } else {
+      for (Index curr = offset; curr < offset + self.size(); ++curr) {
+        self.accumulator().reducePacket(self.inner().template packet<Unaligned>(curr), &accum);
+        internal::pstoreu<Scalar, Packet>(data + curr, self.accumulator().finalizePacket(accum));
+      }
+    }
+  } else {
+    if (self.exclusive()) {
+      for (Index idx3 = 0; idx3 < self.size(); idx3++) {
+        const Index curr = offset + idx3 * self.stride();
+        internal::pstoreu<Scalar, Packet>(data + curr,
+                                          self.accumulator().finalizePacket(accum));
+        self.accumulator().reducePacket(self.inner().template packet<Unaligned>(curr), &accum);
+      }
+    } else {
+      for (Index idx3 = 0; idx3 < self.size(); idx3++) {
+        const Index curr = offset + idx3 * self.stride();
+        self.accumulator().reducePacket(self.inner().template packet<Unaligned>(curr), &accum);
+        internal::pstoreu<Scalar, Packet>(data + curr, self.accumulator().finalizePacket(accum));
+      }
+    }
+  }
+}
+
+template <typename Self, bool Vectorized>
+struct ReduceBlock {
+  void operator()(Self& self, Index idx1, typename Self::CoeffReturnType* data) {
+    for (Index idx2 = 0; idx2 < self.stride(); idx2++) {
+      // Calculate the starting offset for the scan
+      Index offset = idx1 + idx2;
+      ReduceScalar(self, offset, data);
+    }
+  }
+};
+
+// Specialization for vectorized reduction.
+template <typename Self>
+struct ReduceBlock<Self, true> {
+  void operator()(Self& self, Index idx1, typename Self::CoeffReturnType* data) {
+    using Packet = typename Self::PacketReturnType;
+    const int PacketSize = internal::unpacket_traits<Packet>::size;
+    Index idx2 = 0;
+    for (; idx2 + PacketSize <= self.stride(); idx2 += PacketSize) {
+      // Calculate the starting offset for the packet scan
+      Index offset = idx1 + idx2;
+      ReducePacket(self, offset, data);
+    }
+    for (; idx2 < self.stride(); idx2++) {
+      // Calculate the starting offset for the scan
+      Index offset = idx1 + idx2;
+      ReduceScalar(self, offset, data);
+    }
+  }
+};
+
+// CPU implementation of scan
+template <typename Self, typename Reducer, typename Device, bool Vectorized =
+          TensorEvaluator<typename Self::ChildTypeNoConst, Device>::PacketAccess &&
+          internal::reducer_traits<Reducer, Device>::PacketAccess>
+struct ScanLauncher {
+  void operator()(Self& self, typename Self::CoeffReturnType *data) {
+    Index total_size = internal::array_prod(self.dimensions());
+
+    // We fix the index along the scan axis to 0 and perform a
+    // scan per remaining entry. The iteration is split into two nested
+    // loops to avoid an integer division by keeping track of each idx1 and idx2.
+    for (Index idx1 = 0; idx1 < total_size; idx1 += self.stride() * self.size()) {
+      ReduceBlock<Self, Vectorized> block_reducer;
+      block_reducer(self, idx1, data);
+    }
+  }
+};
+
+#ifdef EIGEN_USE_THREADS
+// Specialization for multi-threaded, vectorized execution.
+template <typename Self, typename Reducer>
+struct ScanLauncher<Self, Reducer, ThreadPoolDevice, true> {
+  void operator()(Self& self, typename Self::CoeffReturnType* data) {
+    using Scalar = typename Self::CoeffReturnType;
+    using Packet = typename Self::PacketReturnType;
+    const int PacketSize = internal::unpacket_traits<Packet>::size;
+    const Index total_size = internal::array_prod(self.dimensions());
+    const Index inner_block_size = self.stride() * self.size();
+    const Index num_outer_blocks = total_size / inner_block_size;
+    // Block alignment used to avoid false sharing of cachelines among threads.
+    // Currently set to twice the cache line size on Intel and ARM processors.
+    EIGEN_CONSTEXPR Index kBlockAlignment = 128;
+
+    if ((num_outer_blocks >= self.stride() && total_size <= 4096) ||
+        (num_outer_blocks < self.stride() && self.stride() < PacketSize)) {
+      ScanLauncher<Self, Reducer, DefaultDevice, true> launcher;
+      launcher(self, data);
+      return;
+    }
+
+    if (num_outer_blocks >= self.stride()) {
+      // Parallelize over outer blocks.
+      self.device().parallelFor(
+          num_outer_blocks,
+          TensorOpCost(inner_block_size, inner_block_size,
+                       16 * PacketSize * inner_block_size, true, PacketSize),
+          // Make the shard size large enough that two neighboring threads won't
+          // write to the same cacheline of `data`.
+          [=](Index blk_size) {
+            const Index inner_blocks_cacheline =
+                numext::maxi<Index>(1, kBlockAlignment / (inner_block_size * sizeof(Scalar)));
+            return inner_blocks_cacheline *
+                   divup(blk_size, inner_blocks_cacheline);
+          },
+          [&](Index first, Index last) {
+            for (Index idx = first; idx < last; ++idx) {
+              ReduceBlock<Self, true> block_reducer;
+              block_reducer(self, idx * inner_block_size, data);
+            }
+          });
+    } else {
+      // Parallelize over packets/scalars of the dimensions when the reduction
+      // axis is not an inner dimension.
+      const Index num_packets = self.stride() / PacketSize;
+      for (Index idx1 = 0; idx1 < total_size;
+           idx1 += self.stride() * self.size()) {
+        self.device().parallelFor(
+            num_packets,
+            TensorOpCost(PacketSize * self.size(), PacketSize * self.size(),
+                         16 * PacketSize * self.size(), true, PacketSize),
+            // Make the shard size large enough that two neighboring threads
+            // won't write to the same cacheline of `data`.
+            [=](Index blk_size) {
+              const Index packets_per_cacheline =
+                  numext::maxi<Index>(1, kBlockAlignment / (PacketSize * sizeof(Scalar)));
+              return packets_per_cacheline *
+                     divup(blk_size, packets_per_cacheline);
+            },
+            [&](Index first, Index last) {
+              for (Index packet = first; packet < last; ++packet) {
+                const Index idx2 = packet * PacketSize;
+                ReducePacket(self, idx1 + idx2, data);
+              }
+            });
+
+        const Index num_scalars = self.stride() - num_packets * PacketSize;
+        self.device().parallelFor(
+            num_scalars,
+            TensorOpCost(self.size(), self.size(), 16 * self.size()),
+            // Make the shard size large enough that two neighboring threads
+            // won't write to the same cacheline of `data`.
+            [=](Index blk_size) {
+              const Index scalars_per_cacheline =
+                  numext::maxi<Index>(1, kBlockAlignment / sizeof(Scalar));
+              return scalars_per_cacheline *
+                     divup(blk_size, scalars_per_cacheline);
+            },
+            [&](Index first, Index last) {
+              for (Index scalar = first; scalar < last; ++scalar) {
+                const Index idx2 = num_packets * PacketSize + scalar;
+                ReduceScalar(self, idx1 + idx2, data);
+              }
+            });
+      }
+    }
+  }
+};
+#endif  // EIGEN_USE_THREADS
+
+#if defined(EIGEN_USE_GPU) && (defined(EIGEN_GPUCC))
+
+// GPU implementation of scan
+// TODO(ibab) This placeholder implementation performs multiple scans in
+// parallel, but it would be better to use a parallel scan algorithm and
+// optimize memory access.
+template <typename Self, typename Reducer>
+__global__ void ScanKernel(Self self, Index total_size, typename Self::CoeffReturnType* data) {
+  // Compute offset as in the CPU version
+  Index val = threadIdx.x + blockIdx.x * blockDim.x;
+  Index offset = (val / self.stride()) * self.stride() * self.size() + val % self.stride();
+
+  if (offset + (self.size() - 1) * self.stride() < total_size) {
+    // Compute the scan along the axis, starting at the calculated offset
+    typename Self::CoeffReturnType accum = self.accumulator().initialize();
+    for (Index idx = 0; idx < self.size(); idx++) {
+      Index curr = offset + idx * self.stride();
+      if (self.exclusive()) {
+        data[curr] = self.accumulator().finalize(accum);
+        self.accumulator().reduce(self.inner().coeff(curr), &accum);
+      } else {
+        self.accumulator().reduce(self.inner().coeff(curr), &accum);
+        data[curr] = self.accumulator().finalize(accum);
+      }
+    }
+  }
+  __syncthreads();
+
+}
+
+template <typename Self, typename Reducer>
+struct ScanLauncher<Self, Reducer, GpuDevice, false> {
+  void operator()(const Self& self, typename Self::CoeffReturnType* data) {
+    Index total_size = internal::array_prod(self.dimensions());
+    Index num_blocks = (total_size / self.size() + 63) / 64;
+    Index block_size = 64;
+
+    LAUNCH_GPU_KERNEL((ScanKernel<Self, Reducer>), num_blocks, block_size, 0, self.device(), self, total_size, data);
+  }
+};
+#endif  // EIGEN_USE_GPU && (EIGEN_GPUCC)
 
 // Eval as rvalue
 template <typename Op, typename ArgType, typename Device>
@@ -86,6 +334,7 @@ struct TensorEvaluator<const TensorScanOp<Op, ArgType>, Device> {
   typedef TensorScanOp<Op, ArgType> XprType;
   typedef typename XprType::Index Index;
 
+  typedef const ArgType ChildTypeNoConst;
   typedef const ArgType ChildType;
   static const int NumDims = internal::array_size<typename TensorEvaluator<ArgType, Device>::Dimensions>::value;
   typedef DSizes<Index, NumDims> Dimensions;
@@ -232,82 +481,6 @@ protected:
   EvaluatorPointerType m_output;
 };
 
-// CPU implementation of scan
-// TODO(ibab) This single-threaded implementation should be parallelized,
-// at least by running multiple scans at the same time.
-template <typename Self, typename Reducer, typename Device>
-struct ScanLauncher {
-  void operator()(Self& self, typename Self::CoeffReturnType *data) {
-    Index total_size = internal::array_prod(self.dimensions());
-
-    // We fix the index along the scan axis to 0 and perform a
-    // scan per remaining entry. The iteration is split into two nested
-    // loops to avoid an integer division by keeping track of each idx1 and idx2.
-    for (Index idx1 = 0; idx1 < total_size; idx1 += self.stride() * self.size()) {
-      for (Index idx2 = 0; idx2 < self.stride(); idx2++) {
-        // Calculate the starting offset for the scan
-        Index offset = idx1 + idx2;
-
-        // Compute the scan along the axis, starting at the calculated offset
-        typename Self::CoeffReturnType accum = self.accumulator().initialize();
-        for (Index idx3 = 0; idx3 < self.size(); idx3++) {
-          Index curr = offset + idx3 * self.stride();
-
-          if (self.exclusive()) {
-            data[curr] = self.accumulator().finalize(accum);
-            self.accumulator().reduce(self.inner().coeff(curr), &accum);
-          } else {
-            self.accumulator().reduce(self.inner().coeff(curr), &accum);
-            data[curr] = self.accumulator().finalize(accum);
-          }
-        }
-      }
-    }
-  }
-};
-
-#if defined(EIGEN_USE_GPU) && (defined(EIGEN_GPUCC))
-
-// GPU implementation of scan
-// TODO(ibab) This placeholder implementation performs multiple scans in
-// parallel, but it would be better to use a parallel scan algorithm and
-// optimize memory access.
-template <typename Self, typename Reducer>
-__global__ void ScanKernel(Self self, Index total_size, typename Self::CoeffReturnType* data) {
-  // Compute offset as in the CPU version
-  Index val = threadIdx.x + blockIdx.x * blockDim.x;
-  Index offset = (val / self.stride()) * self.stride() * self.size() + val % self.stride();
-
-  if (offset + (self.size() - 1) * self.stride() < total_size) {
-    // Compute the scan along the axis, starting at the calculated offset
-    typename Self::CoeffReturnType accum = self.accumulator().initialize();
-    for (Index idx = 0; idx < self.size(); idx++) {
-      Index curr = offset + idx * self.stride();
-      if (self.exclusive()) {
-        data[curr] = self.accumulator().finalize(accum);
-        self.accumulator().reduce(self.inner().coeff(curr), &accum);
-      } else {
-        self.accumulator().reduce(self.inner().coeff(curr), &accum);
-        data[curr] = self.accumulator().finalize(accum);
-      }
-    }
-  }
-  __syncthreads();
-
-}
-
-template <typename Self, typename Reducer>
-struct ScanLauncher<Self, Reducer, GpuDevice> {
-  void operator()(const Self& self, typename Self::CoeffReturnType* data) {
-    Index total_size = internal::array_prod(self.dimensions());
-    Index num_blocks = (total_size / self.size() + 63) / 64;
-    Index block_size = 64;
-
-    LAUNCH_GPU_KERNEL((ScanKernel<Self, Reducer>), num_blocks, block_size, 0, self.device(), self, total_size, data);
-  }
-};
-#endif  // EIGEN_USE_GPU && (EIGEN_GPUCC)
-
 }  // end namespace Eigen
 
 #endif  // EIGEN_CXX11_TENSOR_TENSOR_SCAN_H
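For context, here is a minimal sketch (not part of the patch; pool size, tensor shape, and variable names are arbitrary placeholders) of the kind of expression that reaches the new multi-threaded ScanLauncher specialization: a cumulative sum evaluated on a ThreadPoolDevice with EIGEN_USE_THREADS defined.

// Sketch: exercising the parallel scan path added by this change.
#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>

int main() {
  Eigen::ThreadPool pool(4);                  // hypothetical 4-thread pool
  Eigen::ThreadPoolDevice device(&pool, 4);

  Eigen::Tensor<float, 2> input(256, 1024);
  input.setRandom();
  Eigen::Tensor<float, 2> output(256, 1024);

  // cumsum lowers to TensorScanOp; evaluating it on a ThreadPoolDevice selects
  // ScanLauncher<Self, Reducer, ThreadPoolDevice, true> when both the input
  // evaluator and the reducer support packet access.
  output.device(device) = input.cumsum(/*axis=*/0);
  return 0;
}

An exclusive scan (input.cumsum(0, /*exclusive=*/true)) follows the same path; whether the launcher parallelizes over outer blocks or over packets/scalars of the non-scan dimensions is decided by the heuristic on num_outer_blocks and stride() shown in the patch.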