Diffstat (limited to 'unsupported/Eigen/CXX11')
-rw-r--r--  unsupported/Eigen/CXX11/src/Tensor/TensorScan.h | 195
1 file changed, 119 insertions(+), 76 deletions(-)
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorScan.h b/unsupported/Eigen/CXX11/src/Tensor/TensorScan.h
index 833a3200c..bef8d261f 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorScan.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorScan.h
@@ -77,9 +77,12 @@ protected:
const bool m_exclusive;
};
+
+namespace internal {
+
template <typename Self>
-inline void ReduceScalar(Self& self, Index offset,
- typename Self::CoeffReturnType* data) {
+EIGEN_STRONG_INLINE void ReduceScalar(Self& self, Index offset,
+ typename Self::CoeffReturnType* data) {
// Compute the scan along the axis, starting at the given offset
typename Self::CoeffReturnType accum = self.accumulator().initialize();
if (self.stride() == 1) {
@@ -112,7 +115,8 @@ inline void ReduceScalar(Self& self, Index offset,
}
template <typename Self>
-inline void ReducePacket(Self& self, Index offset, typename Self::CoeffReturnType* data) {
+EIGEN_STRONG_INLINE void ReducePacket(Self& self, Index offset,
+ typename Self::CoeffReturnType* data) {
using Scalar = typename Self::CoeffReturnType;
using Packet = typename Self::PacketReturnType;
// Compute the scan along the axis, starting at the calculated offset
@@ -146,9 +150,10 @@ inline void ReducePacket(Self& self, Index offset, typename Self::CoeffReturnTyp
}
}
-template <typename Self, bool Vectorized>
+template <typename Self, bool Vectorize, bool Parallel>
struct ReduceBlock {
- void operator()(Self& self, Index idx1, typename Self::CoeffReturnType* data) {
+ EIGEN_STRONG_INLINE void operator()(Self& self, Index idx1,
+ typename Self::CoeffReturnType* data) {
for (Index idx2 = 0; idx2 < self.stride(); idx2++) {
// Calculate the starting offset for the scan
Index offset = idx1 + idx2;
@@ -159,8 +164,9 @@ struct ReduceBlock {
// Specialization for vectorized reduction.
template <typename Self>
-struct ReduceBlock<Self, true> {
- void operator()(Self& self, Index idx1, typename Self::CoeffReturnType* data) {
+struct ReduceBlock<Self, /*Vectorize=*/true, /*Parallel=*/false> {
+ EIGEN_STRONG_INLINE void operator()(Self& self, Index idx1,
+ typename Self::CoeffReturnType* data) {
using Packet = typename Self::PacketReturnType;
const int PacketSize = internal::unpacket_traits<Packet>::size;
Index idx2 = 0;
@@ -177,109 +183,144 @@ struct ReduceBlock<Self, true> {
}
};
-// CPU implementation of scan
-template <typename Self, typename Reducer, typename Device, bool Vectorized =
- TensorEvaluator<typename Self::ChildTypeNoConst, Device>::PacketAccess &&
- internal::reducer_traits<Reducer, Device>::PacketAccess>
+// Single-threaded CPU implementation of scan
+template <typename Self, typename Reducer, typename Device,
+ bool Vectorize =
+ (TensorEvaluator<typename Self::ChildTypeNoConst, Device>::PacketAccess &&
+ internal::reducer_traits<Reducer, Device>::PacketAccess)>
struct ScanLauncher {
- void operator()(Self& self, typename Self::CoeffReturnType *data) {
+ void operator()(Self& self, typename Self::CoeffReturnType* data) {
Index total_size = internal::array_prod(self.dimensions());
// We fix the index along the scan axis to 0 and perform a
// scan per remaining entry. The iteration is split into two nested
- // loops to avoid an integer division by keeping track of each idx1 and idx2.
+ // loops to avoid an integer division by keeping track of each idx1 and
+ // idx2.
for (Index idx1 = 0; idx1 < total_size; idx1 += self.stride() * self.size()) {
- ReduceBlock<Self, Vectorized> block_reducer;
+ ReduceBlock<Self, Vectorize, /*Parallel=*/false> block_reducer;
block_reducer(self, idx1, data);
}
}
};
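
For illustration only (not part of the patch), the idx1/idx2 iteration in this launcher can be traced with a hypothetical shape; the stride, size, and total_size below are assumed values, not taken from the commit:

#include <iostream>

// Hypothetical shape: a 4 x 10 x 100 column-major tensor scanned along the
// middle axis gives stride = 4, size = 10, total_size = 4000. The two nested
// loops enumerate the starting offset of every independent scan without an
// integer division, mirroring the single-threaded launcher above.
int main() {
  const long stride = 4, size = 10, total_size = 4000;
  long num_scans = 0;
  for (long idx1 = 0; idx1 < total_size; idx1 += stride * size) {
    for (long idx2 = 0; idx2 < stride; ++idx2) {
      const long offset = idx1 + idx2;  // first element of one scan column
      (void)offset;                     // the scan itself visits offset + k * stride
      ++num_scans;
    }
  }
  std::cout << num_scans << " independent scans\n";  // prints 400
}
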
#ifdef EIGEN_USE_THREADS
-// Specialization for multi-threaded, vectorized execution.
-template <typename Self, typename Reducer>
-struct ScanLauncher<Self, Reducer, ThreadPoolDevice, true> {
+
+// Adjust block_size to avoid false sharing of cachelines among
+// threads. Currently set to twice the cache line size on Intel and ARM
+// processors.
+EIGEN_STRONG_INLINE Index AdjustBlockSize(Index item_size, Index block_size) {
+ EIGEN_CONSTEXPR Index kBlockAlignment = 128;
+ const Index items_per_cacheline =
+ numext::maxi<Index>(1, kBlockAlignment / item_size);
+ return items_per_cacheline * divup(block_size, items_per_cacheline);
+}
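
For illustration only (not part of the patch), the rounding performed by AdjustBlockSize can be reproduced in a standalone sketch; divup is re-implemented here so the snippet compiles without Eigen, and the element sizes are assumed examples:

#include <algorithm>
#include <cstddef>
#include <iostream>

// Eigen's divup is round-up integer division; reproduced here for the sketch.
static std::size_t divup(std::size_t a, std::size_t b) { return (a + b - 1) / b; }

static std::size_t adjust_block_size(std::size_t item_size, std::size_t block_size) {
  const std::size_t kBlockAlignment = 128;  // two 64-byte cache lines
  const std::size_t items_per_cacheline =
      std::max<std::size_t>(1, kBlockAlignment / item_size);
  return items_per_cacheline * divup(block_size, items_per_cacheline);
}

int main() {
  // With float scalars (4 bytes), 128 / 4 = 32 items fit in an aligned block:
  // a requested shard of 50 elements is rounded up to 64.
  std::cout << adjust_block_size(sizeof(float), 50) << "\n";      // prints 64
  // With an 8-float packet (32 bytes), 128 / 32 = 4 packets per block:
  // a requested shard of 50 packets is rounded up to 52.
  std::cout << adjust_block_size(8 * sizeof(float), 50) << "\n";  // prints 52
}
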
+
+template <typename Self>
+struct ReduceBlock<Self, /*Vectorize=*/true, /*Parallel=*/true> {
+ EIGEN_STRONG_INLINE void operator()(Self& self, Index idx1,
+ typename Self::CoeffReturnType* data) {
+ using Scalar = typename Self::CoeffReturnType;
+ using Packet = typename Self::PacketReturnType;
+ const int PacketSize = internal::unpacket_traits<Packet>::size;
+ Index num_scalars = self.stride();
+ Index num_packets = 0;
+ if (self.stride() >= PacketSize) {
+ num_packets = self.stride() / PacketSize;
+ self.device().parallelFor(
+ num_packets,
+ TensorOpCost(PacketSize * self.size(), PacketSize * self.size(),
+ 16 * PacketSize * self.size(), true, PacketSize),
+ // Make the shard size large enough that two neighboring threads
+ // won't write to the same cacheline of `data`.
+ [=](Index blk_size) {
+ return AdjustBlockSize(PacketSize * sizeof(Scalar), blk_size);
+ },
+ [&](Index first, Index last) {
+ for (Index packet = first; packet < last; ++packet) {
+ const Index idx2 = packet * PacketSize;
+ ReducePacket(self, idx1 + idx2, data);
+ }
+ });
+ num_scalars -= num_packets * PacketSize;
+ }
+ self.device().parallelFor(
+ num_scalars, TensorOpCost(self.size(), self.size(), 16 * self.size()),
+ // Make the shard size large enough that two neighboring threads
+ // won't write to the same cacheline of `data`.
+ [=](Index blk_size) {
+ return AdjustBlockSize(sizeof(Scalar), blk_size);
+ },
+ [&](Index first, Index last) {
+ for (Index scalar = first; scalar < last; ++scalar) {
+ const Index idx2 = num_packets * PacketSize + scalar;
+ ReduceScalar(self, idx1 + idx2, data);
+ }
+ });
+ }
+};
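
For illustration only (not part of the patch), the packet/scalar split in this specialization works out as follows for a hypothetical stride and packet width:

#include <iostream>

// Hypothetical sizes: a stride of 100 with an 8-wide packet type is split
// into 12 packet-wide scans handled by the first parallelFor and 4 leftover
// scalar scans handled by the second, matching the logic above.
int main() {
  const long stride = 100;       // assumed self.stride()
  const long packet_size = 8;    // assumed unpacket_traits<Packet>::size
  long num_scalars = stride;
  long num_packets = 0;
  if (stride >= packet_size) {
    num_packets = stride / packet_size;        // 12 packet scans
    num_scalars -= num_packets * packet_size;  // 4 remaining scalar scans
  }
  std::cout << num_packets << " packet scans, "
            << num_scalars << " scalar scans\n";
}
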
+
+template <typename Self>
+struct ReduceBlock<Self, /*Vectorize=*/false, /*Parallel=*/true> {
+ EIGEN_STRONG_INLINE void operator()(Self& self, Index idx1,
+ typename Self::CoeffReturnType* data) {
+ using Scalar = typename Self::CoeffReturnType;
+ self.device().parallelFor(
+ self.stride(), TensorOpCost(self.size(), self.size(), 16 * self.size()),
+ // Make the shard size large enough that two neighboring threads
+ // won't write to the same cacheline of `data`.
+ [=](Index blk_size) {
+ return AdjustBlockSize(sizeof(Scalar), blk_size);
+ },
+ [&](Index first, Index last) {
+ for (Index idx2 = first; idx2 < last; ++idx2) {
+ ReduceScalar(self, idx1 + idx2, data);
+ }
+ });
+ }
+};
+
+// Specialization for multi-threaded execution.
+template <typename Self, typename Reducer, bool Vectorize>
+struct ScanLauncher<Self, Reducer, ThreadPoolDevice, Vectorize> {
void operator()(Self& self, typename Self::CoeffReturnType* data) {
using Scalar = typename Self::CoeffReturnType;
using Packet = typename Self::PacketReturnType;
const int PacketSize = internal::unpacket_traits<Packet>::size;
const Index total_size = internal::array_prod(self.dimensions());
const Index inner_block_size = self.stride() * self.size();
- const Index num_outer_blocks = total_size / inner_block_size;
- // Block alignment used to avoid false sharing of cachelines among threads.
- // Currently set to twice the cache line size on Intel and ARM processors.
- EIGEN_CONSTEXPR Index kBlockAlignment = 128;
-
- if ((num_outer_blocks >= self.stride() && total_size <= 4096) ||
- (num_outer_blocks < self.stride() && self.stride() < PacketSize)) {
- ScanLauncher<Self, Reducer, DefaultDevice, true> launcher;
+ bool parallelize_by_outer_blocks = (total_size >= (self.stride() * inner_block_size));
+
+ if ((parallelize_by_outer_blocks && total_size <= 4096) ||
+ (!parallelize_by_outer_blocks && self.stride() < PacketSize)) {
+ ScanLauncher<Self, Reducer, DefaultDevice, Vectorize> launcher;
launcher(self, data);
return;
}
- if (num_outer_blocks >= self.stride()) {
+ if (parallelize_by_outer_blocks) {
// Parallelize over outer blocks.
+ const Index num_outer_blocks = total_size / inner_block_size;
self.device().parallelFor(
num_outer_blocks,
TensorOpCost(inner_block_size, inner_block_size,
- 16 * PacketSize * inner_block_size, true, PacketSize),
- // Make the shard size large enough that two neighboring threads won't
- // write to the same cacheline of `data`.
+ 16 * PacketSize * inner_block_size, Vectorize,
+ PacketSize),
[=](Index blk_size) {
- const Index inner_blocks_cacheline =
- numext::maxi<Index>(1, kBlockAlignment / (inner_block_size * sizeof(Scalar)));
- return inner_blocks_cacheline *
- divup(blk_size, inner_blocks_cacheline);
+ return AdjustBlockSize(inner_block_size * sizeof(Scalar), blk_size);
},
[&](Index first, Index last) {
- for (Index idx = first; idx < last; ++idx) {
- ReduceBlock<Self, true> block_reducer;
- block_reducer(self, idx * inner_block_size, data);
+ for (Index idx1 = first; idx1 < last; ++idx1) {
+ ReduceBlock<Self, Vectorize, /*Parallel=*/false> block_reducer;
+ block_reducer(self, idx1 * inner_block_size, data);
}
});
} else {
- // Parallelize over packets/scalars of the dimensions when the reduction
+ // Parallelize over the inner packets/scalars when the reduction
// axis is not an inner dimension.
- const Index num_packets = self.stride() / PacketSize;
+ ReduceBlock<Self, Vectorize, /*Parallel=*/true> block_reducer;
for (Index idx1 = 0; idx1 < total_size;
idx1 += self.stride() * self.size()) {
- self.device().parallelFor(
- num_packets,
- TensorOpCost(PacketSize * self.size(), PacketSize * self.size(),
- 16 * PacketSize * self.size(), true, PacketSize),
- // Make the shard size large enough that two neighboring threads
- // won't write to the same cacheline of `data`.
- [=](Index blk_size) {
- const Index packets_per_cacheline =
- numext::maxi<Index>(1, kBlockAlignment / (PacketSize * sizeof(Scalar)));
- return packets_per_cacheline *
- divup(blk_size, packets_per_cacheline);
- },
- [&](Index first, Index last) {
- for (Index packet = first; packet < last; ++packet) {
- const Index idx2 = packet * PacketSize;
- ReducePacket(self, idx1 + idx2, data);
- }
- });
-
- const Index num_scalars = self.stride() - num_packets * PacketSize;
- self.device().parallelFor(
- num_scalars,
- TensorOpCost(self.size(), self.size(), 16 * self.size()),
- // Make the shard size large enough that two neighboring threads
- // won't write to the same cacheline of `data`.
- [=](Index blk_size) {
- const Index scalars_per_cacheline =
- numext::maxi<Index>(1, kBlockAlignment / sizeof(Scalar));
- return scalars_per_cacheline *
- divup(blk_size, scalars_per_cacheline);
- },
- [&](Index first, Index last) {
- for (Index scalar = first; scalar < last; ++scalar) {
- const Index idx2 = num_packets * PacketSize + scalar;
- ReduceScalar(self, idx1 + idx2, data);
- }
- });
+ block_reducer(self, idx1, data);
}
}
}
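
For illustration only (not part of the patch), the dispatch above boils down to comparing the number of independent outer blocks with the stride: since total_size = num_outer_blocks * inner_block_size, the test total_size >= stride * inner_block_size is equivalent to num_outer_blocks >= stride, i.e., parallelize across whichever side offers more independent pieces of work. A standalone sketch with hypothetical shapes:

#include <iostream>

int main() {
  // Scan along the last axis of a 4 x 1024 column-major tensor:
  // stride = 4, size = 1024, num_outer_blocks = 1 -> parallelize inner dims.
  long stride = 4, size = 1024, outer = 1;
  std::cout << (outer * stride * size >= stride * stride * size) << "\n";  // prints 0

  // Scan along the first axis of a 1024 x 4096 tensor:
  // stride = 1, size = 1024, num_outer_blocks = 4096 -> parallelize outer blocks.
  stride = 1; size = 1024; outer = 4096;
  std::cout << (outer * stride * size >= stride * stride * size) << "\n";  // prints 1
}
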
@@ -328,6 +369,8 @@ struct ScanLauncher<Self, Reducer, GpuDevice, false> {
};
#endif // EIGEN_USE_GPU && (EIGEN_GPUCC)
+} // namespace internal
+
// Eval as rvalue
template <typename Op, typename ArgType, typename Device>
struct TensorEvaluator<const TensorScanOp<Op, ArgType>, Device> {
@@ -424,7 +467,7 @@ struct TensorEvaluator<const TensorScanOp<Op, ArgType>, Device> {
EIGEN_STRONG_INLINE bool evalSubExprsIfNeeded(EvaluatorPointerType data) {
m_impl.evalSubExprsIfNeeded(NULL);
- ScanLauncher<Self, Op, Device> launcher;
+ internal::ScanLauncher<Self, Op, Device> launcher;
if (data) {
launcher(*this, data);
return false;
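
For context, a minimal usage sketch of the code path that reaches this launcher, assuming the standard unsupported Tensor headers; the tensor shape and thread count are arbitrary:

#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>

int main() {
  // 512 independent columns of length 1024, scanned along dimension 0.
  Eigen::Tensor<float, 2> input(1024, 512);
  input.setRandom();

  Eigen::ThreadPool pool(4);                 // arbitrary pool size
  Eigen::ThreadPoolDevice device(&pool, 4);

  Eigen::Tensor<float, 2> result(1024, 512);
  // cumsum(0) builds a TensorScanOp; evaluating it on a ThreadPoolDevice
  // dispatches to the multi-threaded ScanLauncher specialization above.
  result.device(device) = input.cumsum(0);
  return 0;
}
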