aboutsummaryrefslogtreecommitdiffhomepage
path: root/unsupported/Eigen/CXX11/src
diff options
context:
space:
mode:
authorGravatar Deven Desai <deven.desai.amd@gmail.com>2019-03-19 16:52:38 -0400
committerGravatar Deven Desai <deven.desai.amd@gmail.com>2019-03-19 16:52:38 -0400
commit2dbea5510fe5cb64dbfdef9042c04a3a92b87f76 (patch)
treec187e7ec5e90a191e19466ff6084dd8f053dba7e /unsupported/Eigen/CXX11/src
parente7e6809e6b38a5928efc0b5ca9520258e4d1fb3a (diff)
parent5c93b38c5fca514a08084e32feb8a8fb27bf3665 (diff)
Merged eigen/eigen into default
Diffstat (limited to 'unsupported/Eigen/CXX11/src')
-rw-r--r--unsupported/Eigen/CXX11/src/Tensor/TensorBase.h23
-rw-r--r--unsupported/Eigen/CXX11/src/Tensor/TensorContraction.h2
-rw-r--r--unsupported/Eigen/CXX11/src/Tensor/TensorContractionBlocking.h4
-rw-r--r--unsupported/Eigen/CXX11/src/Tensor/TensorContractionGpu.h9
-rw-r--r--unsupported/Eigen/CXX11/src/Tensor/TensorContractionMapper.h31
-rw-r--r--unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h447
-rw-r--r--unsupported/Eigen/CXX11/src/Tensor/TensorConversion.h127
-rw-r--r--unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h10
-rw-r--r--unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h1
-rw-r--r--unsupported/Eigen/CXX11/src/Tensor/TensorForcedEval.h25
-rw-r--r--unsupported/Eigen/CXX11/src/Tensor/TensorGenerator.h98
-rw-r--r--unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h36
-rw-r--r--unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h187
-rw-r--r--unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h11
-rw-r--r--unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h66
-rw-r--r--unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h10
-rw-r--r--unsupported/Eigen/CXX11/src/util/CXX11Workarounds.h6
-rw-r--r--unsupported/Eigen/CXX11/src/util/EmulateArray.h20
-rw-r--r--unsupported/Eigen/CXX11/src/util/EmulateCXX11Meta.h16
19 files changed, 857 insertions, 272 deletions
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorBase.h b/unsupported/Eigen/CXX11/src/Tensor/TensorBase.h
index 7d9afa685..dbacf494e 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorBase.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorBase.h
@@ -244,9 +244,11 @@ class TensorBase<Derived, ReadOnlyAccessors>
}
EIGEN_DEVICE_FUNC
- EIGEN_STRONG_INLINE const TensorCwiseUnaryOp<internal::scalar_conjugate_op<Scalar>, const Derived>
+ EIGEN_STRONG_INLINE const typename internal::conditional<NumTraits<CoeffReturnType>::IsComplex,
+ TensorCwiseUnaryOp<internal::scalar_conjugate_op<Scalar>, const Derived>,
+ Derived>::type
conjugate() const {
- return unaryExpr(internal::scalar_conjugate_op<Scalar>());
+ return choose(Cond<NumTraits<CoeffReturnType>::IsComplex>(), unaryExpr(internal::scalar_conjugate_op<Scalar>()), derived());
}
EIGEN_DEVICE_FUNC
@@ -339,10 +341,13 @@ class TensorBase<Derived, ReadOnlyAccessors>
return cwiseMin(constant(threshold));
}
- template <typename NewType> EIGEN_DEVICE_FUNC
- EIGEN_STRONG_INLINE const TensorConversionOp<NewType, const Derived>
+ template<typename NewType>
+ EIGEN_DEVICE_FUNC
+ EIGEN_STRONG_INLINE const typename internal::conditional<internal::is_same<NewType, CoeffReturnType>::value,
+ Derived,
+ TensorConversionOp<NewType, const Derived> >::type
cast() const {
- return TensorConversionOp<NewType, const Derived>(derived());
+ return choose(Cond<internal::is_same<NewType, CoeffReturnType>::value>(), derived(), TensorConversionOp<NewType, const Derived>(derived()));
}
EIGEN_DEVICE_FUNC
@@ -628,26 +633,26 @@ class TensorBase<Derived, ReadOnlyAccessors>
}
template <typename Dims> EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE
- const TensorReductionOp<internal::AndReducer, const Dims, const TensorConversionOp<bool, const Derived> >
+ const TensorReductionOp<internal::AndReducer, const Dims, const typename internal::conditional<internal::is_same<bool, CoeffReturnType>::value, Derived, TensorConversionOp<bool, const Derived> >::type >
all(const Dims& dims) const {
return cast<bool>().reduce(dims, internal::AndReducer());
}
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE
- const TensorReductionOp<internal::AndReducer, const DimensionList<Index, NumDimensions>, const TensorConversionOp<bool, const Derived> >
+ const TensorReductionOp<internal::AndReducer, const DimensionList<Index, NumDimensions>, const typename internal::conditional<internal::is_same<bool, CoeffReturnType>::value, Derived, TensorConversionOp<bool, const Derived> >::type >
all() const {
DimensionList<Index, NumDimensions> in_dims;
return cast<bool>().reduce(in_dims, internal::AndReducer());
}
template <typename Dims> EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE
- const TensorReductionOp<internal::OrReducer, const Dims, const TensorConversionOp<bool, const Derived> >
+ const TensorReductionOp<internal::OrReducer, const Dims, const typename internal::conditional<internal::is_same<bool, CoeffReturnType>::value, Derived, TensorConversionOp<bool, const Derived> >::type >
any(const Dims& dims) const {
return cast<bool>().reduce(dims, internal::OrReducer());
}
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE
- const TensorReductionOp<internal::OrReducer, const DimensionList<Index, NumDimensions>, const TensorConversionOp<bool, const Derived> >
+ const TensorReductionOp<internal::OrReducer, const DimensionList<Index, NumDimensions>, const typename internal::conditional<internal::is_same<bool, CoeffReturnType>::value, Derived, TensorConversionOp<bool, const Derived> >::type >
any() const {
DimensionList<Index, NumDimensions> in_dims;
return cast<bool>().reduce(in_dims, internal::OrReducer());
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorContraction.h b/unsupported/Eigen/CXX11/src/Tensor/TensorContraction.h
index 61a4e1a3a..6ca881f27 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorContraction.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorContraction.h
@@ -102,7 +102,7 @@ struct traits<TensorContractionOp<Dimensions, LhsXprType, RhsXprType, OutputKern
typedef typename remove_reference<RhsNested>::type _RhsNested;
// From NumDims below.
- static const int NumDimensions = traits<RhsXprType>::NumDimensions + traits<RhsXprType>::NumDimensions - 2 * array_size<Dimensions>::value;
+ static const int NumDimensions = traits<LhsXprType>::NumDimensions + traits<RhsXprType>::NumDimensions - 2 * array_size<Dimensions>::value;
static const int Layout = traits<LhsXprType>::Layout;
typedef typename conditional<Pointer_type_promotion<typename LhsXprType::Scalar, Scalar>::val,
typename traits<LhsXprType>::PointerType, typename traits<RhsXprType>::PointerType>::type PointerType;
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionBlocking.h b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionBlocking.h
index 71fd19774..c51f3f8dd 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionBlocking.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionBlocking.h
@@ -51,6 +51,10 @@ class TensorContractionBlocking {
else {
computeProductBlockingSizes<LhsScalar, RhsScalar, 1>(kc_, nc_, mc_, num_threads);
}
+
+ const int rhs_packet_size = internal::packet_traits<RhsScalar>::size;
+ kc_ = (rhs_packet_size <= 8 || kc_ <= rhs_packet_size) ?
+ kc_ : (kc_ / rhs_packet_size) * rhs_packet_size;
}
EIGEN_DEVICE_FUNC EIGEN_ALWAYS_INLINE StorageIndex kc() const { return kc_; }
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionGpu.h b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionGpu.h
index 056665749..5d19652e6 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionGpu.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionGpu.h
@@ -1219,9 +1219,6 @@ template<typename Indices, typename LeftArgType, typename RightArgType, typename
struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgType, OutputKernelType>, GpuDevice> :
public TensorContractionEvaluatorBase<TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgType, OutputKernelType>, GpuDevice> > {
- static_assert(std::is_same<OutputKernelType, const NoOpOutputKernel>::value,
- "GPU tensor contraction does not support output kernels.");
-
typedef GpuDevice Device;
typedef TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgType, OutputKernelType>, Device> Self;
@@ -1274,7 +1271,11 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
typedef typename RightEvaluator::Dimensions RightDimensions;
EIGEN_DEVICE_FUNC TensorEvaluator(const XprType& op, const Device& device) :
- Base(op, device) {}
+ Base(op, device)
+ {
+ EIGEN_STATIC_ASSERT( (internal::is_same<OutputKernelType, const NoOpOutputKernel>::value),
+ GPU_TENSOR_CONTRACTION_DOES_NOT_SUPPORT_OUTPUT_KERNELS);
+ }
// We need to redefine this method to make nvcc happy
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE bool evalSubExprsIfNeeded(Scalar* data) {
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionMapper.h b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionMapper.h
index 2d3b69128..142492603 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionMapper.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionMapper.h
@@ -120,6 +120,7 @@ class SimpleTensorContractionMapper {
EIGEN_DEVICE_FUNC
EIGEN_STRONG_INLINE Index computeIndex(Index row, Index col) const {
const bool left = (side == Lhs);
+ EIGEN_UNUSED_VARIABLE(left); // annoying bug in g++8.1: https://gcc.gnu.org/bugzilla/show_bug.cgi?id=85963
Index nocontract_val = left ? row : col;
Index linidx = 0;
for (int i = static_cast<int>(array_size<nocontract_t>::value) - 1; i > 0; i--) {
@@ -158,6 +159,7 @@ class SimpleTensorContractionMapper {
EIGEN_DEVICE_FUNC
EIGEN_STRONG_INLINE IndexPair<Index> computeIndexPair(Index row, Index col, const Index distance) const {
const bool left = (side == Lhs);
+ EIGEN_UNUSED_VARIABLE(left); // annoying bug in g++8.1: https://gcc.gnu.org/bugzilla/show_bug.cgi?id=85963
Index nocontract_val[2] = {left ? row : col, left ? row + distance : col};
Index linidx[2] = {0, 0};
if (array_size<typename Tensor::Dimensions>::value > array_size<contract_t>::value) {
@@ -239,8 +241,10 @@ class BaseTensorContractionMapper : public SimpleTensorContractionMapper<Scalar,
ParentMapper(tensor, nocontract_strides, ij_strides, contract_strides, k_strides) { }
template <typename PacketT,int AlignmentType>
- EIGEN_DEVICE_FUNC
- EIGEN_STRONG_INLINE PacketT load(Index i, Index j) const {
+ EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE
+ typename internal::enable_if<internal::unpacket_traits<PacketT>::size==packet_size,PacketT>::type
+ load(Index i, Index j) const
+ {
// whole method makes column major assumption
// don't need to add offsets for now (because operator handles that)
@@ -282,6 +286,29 @@ class BaseTensorContractionMapper : public SimpleTensorContractionMapper<Scalar,
}
template <typename PacketT,int AlignmentType>
+ EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE
+ typename internal::enable_if<internal::unpacket_traits<PacketT>::size!=packet_size,PacketT>::type
+ load(Index i, Index j) const
+ {
+ const Index requested_packet_size = internal::unpacket_traits<PacketT>::size;
+ EIGEN_ALIGN_MAX Scalar data[requested_packet_size];
+
+ const IndexPair<Index> indexPair = this->computeIndexPair(i, j, requested_packet_size - 1);
+ const Index first = indexPair.first;
+ const Index lastIdx = indexPair.second;
+
+ data[0] = this->m_tensor.coeff(first);
+ for (Index k = 1; k < requested_packet_size - 1; k += 2) {
+ const IndexPair<Index> internal_pair = this->computeIndexPair(i + k, j, 1);
+ data[k] = this->m_tensor.coeff(internal_pair.first);
+ data[k + 1] = this->m_tensor.coeff(internal_pair.second);
+ }
+ data[requested_packet_size - 1] = this->m_tensor.coeff(lastIdx);
+
+ return pload<PacketT>(data);
+ }
+
+ template <typename PacketT,int AlignmentType>
EIGEN_DEVICE_FUNC
EIGEN_STRONG_INLINE PacketT loadPacket(Index i, Index j) const {
return this->load<PacketT,AlignmentType>(i,j);
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
index 24ba3e431..adf57c892 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
@@ -208,6 +208,26 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
Index nm = divup(nm0, gm);
Index nn = divup(nn0, gn);
+ // If there is enough concurrency in the sharding dimension, we choose not
+ // to paralellize by the other dimension, and execute all kernels in sync
+ // mode. This reduces parallelism from the nm x nn down to nn
+ // (shard_by_col==true) or nm (shard_by_col==false).
+ const Index sharding_dim_tasks = shard_by_col ? nn : nm;
+ const int num_worker_threads = this->m_device.numThreadsInPool();
+
+ // With small number of threads we want to make sure that we do not reduce
+ // parallelism too much. With large number of threads we trade maximum
+ // parallelism for better memory locality.
+ const float oversharding_factor =
+ num_worker_threads <= 4 ? 8.0 :
+ num_worker_threads <= 8 ? 4.0 :
+ num_worker_threads <= 16 ? 2.0 :
+ num_worker_threads <= 32 ? 1.0 :
+ num_worker_threads <= 64 ? 0.8 : /* num_worker_threads > 64 */ 0.6;
+
+ const bool parallelize_by_sharding_dim_only =
+ sharding_dim_tasks >= oversharding_factor * num_worker_threads;
+
// Last by not least, decide whether we want to issue both lhs and rhs
// packing in parallel; or issue lhs packing first, and then issue rhs
// packing when lhs packing completes (for !shard_by_col lhs and rhs are
@@ -223,10 +243,13 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
// But don't do it if we will use each rhs only once. Locality seems to be
// more important in this case.
if ((shard_by_col ? nm : nn) == 1) parallel_pack = false;
+ // Also don't get in the way of parallelize_by_sharding_dim_only
+ // optimization.
+ if (parallelize_by_sharding_dim_only) parallel_pack = false;
- #define CONTEXT_ARGS \
+#define CONTEXT_ARGS \
(this, num_threads, buffer, m, n, k, bm, bn, bk, nm, nn, nk, gm, gn, nm0, \
- nn0, shard_by_col, parallel_pack) \
+ nn0, shard_by_col, parallel_pack, parallelize_by_sharding_dim_only) \
.run()
TENSOR_CONTRACTION_DISPATCH(Context, Alignment, CONTEXT_ARGS);
@@ -260,7 +283,7 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
Context(const Self* self, int num_threads, Scalar* buffer, Index tm, Index tn,
Index tk, Index bm, Index bn, Index bk, Index nm, Index nn, Index nk,
Index gm, Index gn, Index nm0, Index nn0, bool shard_by_col,
- bool parallel_pack)
+ bool parallel_pack, bool parallelize_by_sharding_dim_only)
: device_(self->m_device),
lhs_(self->m_leftImpl, self->m_left_nocontract_strides,
self->m_i_strides, self->m_left_contracting_strides,
@@ -275,6 +298,7 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
num_threads_(num_threads),
shard_by_col_(shard_by_col),
parallel_pack_(parallel_pack),
+ parallelize_by_sharding_dim_only_(parallelize_by_sharding_dim_only),
m_(tm),
n_(tn),
k_(tk),
@@ -289,6 +313,9 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
nm0_(nm0),
nn0_(nn0)
{
+ // These two options are mutually exclusive.
+ eigen_assert(!(parallel_pack && parallelize_by_sharding_dim_only));
+
for (Index x = 0; x < P; x++) {
// Normal number of notifications for k slice switch is
// nm_ + nn_ + nm_ * nn_. However, first P - 1 slices will receive only
@@ -335,6 +362,42 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
mem += rhs_size;
}
}
+
+ if (parallelize_by_sharding_dim_only_) {
+ const int num_worker_threads = device_.numThreadsInPool();
+
+ if (shard_by_col) {
+ can_use_thread_local_packed_ = new std::atomic<bool>[nn_];
+ for (int i = 0; i < nn_; ++i)
+ can_use_thread_local_packed_[i].store(true,
+ std::memory_order_relaxed);
+
+ Index num_blocks = num_worker_threads * gn_;
+ thread_local_packed_mem_ = device_.allocate(num_blocks * rhs_size);
+ mem = static_cast<char*>(thread_local_packed_mem_);
+
+ thread_local_packed_rhs_.resize(num_blocks, nullptr);
+ for (Index i = 0; i < num_blocks; ++i) {
+ thread_local_packed_rhs_[i] = reinterpret_cast<RhsScalar*>(mem);
+ mem += rhs_size;
+ }
+ } else {
+ can_use_thread_local_packed_ = new std::atomic<bool>[nm_];
+ for (int i = 0; i < nm_; ++i)
+ can_use_thread_local_packed_[i].store(true,
+ std::memory_order_relaxed);
+
+ Index num_blocks = num_worker_threads * gm_;
+ thread_local_packed_mem_ = device_.allocate(num_blocks * lhs_size);
+ mem = static_cast<char*>(thread_local_packed_mem_);
+
+ thread_local_packed_lhs_.resize(num_blocks, nullptr);
+ for (Index i = 0; i < num_blocks; ++i) {
+ thread_local_packed_lhs_[i] = reinterpret_cast<LhsScalar*>(mem);
+ mem += lhs_size;
+ }
+ }
+ }
}
~Context() {
@@ -343,6 +406,10 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
delete[] state_kernel_[x];
}
device_.deallocate(packed_mem_);
+ if (parallelize_by_sharding_dim_only_) {
+ device_.deallocate(thread_local_packed_mem_);
+ delete[] can_use_thread_local_packed_;
+ }
}
void run() {
@@ -368,6 +435,7 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
const int num_threads_;
const bool shard_by_col_;
const bool parallel_pack_;
+ const bool parallelize_by_sharding_dim_only_;
// Matrix sizes.
const Index m_;
const Index n_;
@@ -426,6 +494,36 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
void* packed_mem_;
std::vector<LhsScalar*> packed_lhs_[P - 1];
std::vector<RhsScalar*> packed_rhs_[P - 1];
+
+ // If we choose to parallelize only by the sharding dimension, each thread
+ // will have it's own "thead local" (not a c++ thread local storage) memory
+ // for packed_lhs or packed_rhs (shard_by_col = false of true). This memory
+ // can't be passed to a kernel that might execute on a different thread.
+ //
+ // In practice when we are ready to pack memory for the sharding dimension
+ // (rhs if shard_by_col==true) of the K-th slice, all kernels for K-1 slice
+ // already computed (99% of the time), and we can pack data into the thread
+ // local storage, and guarantee that all the kernels will be executed
+ // immediately in the same thread. This significantly increases L1 cache hit
+ // ratio and reduces pressure on the memory bus.
+ //
+ // It's still possible that kernel for the K-th slice will be ready before
+ // completion of the K-1 kernel, so we have to allocate "global" packed_lhs_
+ // and packed_rhs_ to allow kernels to be executed later on a thread
+ // different from the thread that was used for packing.
+ void* thread_local_packed_mem_;
+
+ // Only one of these will beinitialized depending on shard_by_col value.
+ std::vector<LhsScalar*> thread_local_packed_lhs_;
+ std::vector<RhsScalar*> thread_local_packed_rhs_;
+
+ // After a particular shard for Kth slice missed thread local execution
+ // opportunity (K-1 slice didn't complete kernels execution), we can no
+ // longer schedule K+1 and following slices in thread local mode, because
+ // there is no more guarantee that previous kernels were executed
+ // sequentially in the same thread (size is nn_ or nm_).
+ std::atomic<bool>* can_use_thread_local_packed_;
+
std::atomic<uint8_t>** state_kernel_[P];
// state_switch_ is frequently modified by worker threads, while other
// fields are read-only after constructor. Let's move it to a separate cache
@@ -434,22 +532,96 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
std::atomic<Index> state_packing_ready_[P];
std::atomic<Index> state_switch_[P];
+ LhsScalar* packed_lhs(Index m, Index k, Index m1, bool use_thread_local) {
+ if (use_thread_local) {
+ eigen_assert(!shard_by_col_);
+
+ Index base_idx = gm_ * device_.currentThreadId();
+ Index grain_idx = m1 - m * gm_;
+ Index block_idx = base_idx + grain_idx;
+
+ return thread_local_packed_lhs_[block_idx];
+ } else {
+ return packed_lhs_[k % (P - 1)][m1];
+ }
+ }
+
+ RhsScalar* packed_rhs(Index n, Index k, Index n1, bool use_thread_local) {
+ if (use_thread_local) {
+ eigen_assert(shard_by_col_);
+
+ Index base_idx = gn_ * device_.currentThreadId();
+ Index grain_idx = n1 - n * gn_;
+ Index block_idx = base_idx + grain_idx;
+
+ return thread_local_packed_rhs_[block_idx];
+ } else {
+ return packed_rhs_[k % (P - 1)][n1];
+ }
+ }
+
+ // In following two methods (pack_lhs and pack_rhs), if we know for sure
+ // that we'll be able to immediately call a kernel with packed data, and do
+ // not submit it to the thread pool, we can use thread local memory for
+ // packed data.
+ //
+ // We can only reliably check it if we are running all kernels in sync mode
+ // (parallelize only by sharding dim). If kernel for m==0 (n==0) is ready to
+ // run, it's guaranteed that all kernels with larger values of m (n) are
+ // also ready, because we execute them in the same order for all K slices.
+
void pack_lhs(Index m, Index k) {
+ bool use_thread_local = false;
+
+ if (parallelize_by_sharding_dim_only_ && !shard_by_col_ &&
+ can_use_thread_local_packed_[m].load(std::memory_order_relaxed)) {
+ if (state_kernel_[k % P][m][0].load(std::memory_order_relaxed) == 1) {
+ use_thread_local = true;
+ } else {
+ // If we can't guarantee that all kernels in `k` slice will be
+ // executed sequentially in current thread, it's no longer safe to use
+ // thread local memory in followig slices along the k dimensions.
+ eigen_assert(k > 0);
+ can_use_thread_local_packed_[m].store(false,
+ std::memory_order_relaxed);
+ }
+ }
+
const Index mend = m * gm_ + gm(m);
for (Index m1 = m * gm_; m1 < mend; m1++)
- TensorContractionKernel::packLhs(packed_lhs_[k % (P - 1)][m1],
+ TensorContractionKernel::packLhs(packed_lhs(m, k, m1, use_thread_local),
lhs_.getSubMapper(m1 * bm_, k * bk_),
bk(k), bm(m1));
if (!parallel_pack_ && shard_by_col_) {
+ assert(!use_thread_local);
signal_packing(k);
} else {
signal_switch(k + 1);
- for (Index n = nn_ - 1; n >= 0; n--) signal_kernel(m, n, k, n == 0);
+ for (Index n = nn_ - 1; n >= 0; n--) {
+ bool sync = parallelize_by_sharding_dim_only_ || n == 0;
+ signal_kernel(m, n, k, sync, use_thread_local);
+ }
}
}
void pack_rhs(Index n, Index k) {
+ bool use_thread_local = false;
+
+ if (parallelize_by_sharding_dim_only_ && shard_by_col_ &&
+ can_use_thread_local_packed_[n].load(std::memory_order_relaxed)) {
+ if (state_kernel_[k % P][0][n].load(std::memory_order_relaxed) == 1) {
+ use_thread_local = true;
+ } else {
+ // If we can't guarantee that all kernels in `k` slice will be
+ // executed sequentially in current thread, it's no longer safe to use
+ // thread local memory in followig slices along the k dimensions.
+ eigen_assert(k > 0);
+ can_use_thread_local_packed_[n].store(false,
+ std::memory_order_relaxed);
+ }
+ }
+
const Index nend = n * gn_ + gn(n);
for (Index n1 = n * gn_; n1 < nend; n1++) {
if (k == 0) {
@@ -462,20 +634,24 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
// deadlocks.
memset(buffer_ + n1 * bn_ * m_, 0, bn(n1) * m_ * sizeof(Scalar));
}
- TensorContractionKernel::packRhs(packed_rhs_[k % (P - 1)][n1],
+ TensorContractionKernel::packRhs(packed_rhs(n, k, n1, use_thread_local),
rhs_.getSubMapper(k * bk_, n1 * bn_),
bk(k), bn(n1));
}
if (parallel_pack_ || shard_by_col_) {
signal_switch(k + 1);
- for (Index m = nm_ - 1; m >= 0; m--) signal_kernel(m, n, k, m == 0);
+ for (Index m = nm_ - 1; m >= 0; m--) {
+ bool sync = parallelize_by_sharding_dim_only_ || m == 0;
+ signal_kernel(m, n, k, sync, use_thread_local);
+ }
} else {
+ assert(!use_thread_local);
signal_packing(k);
}
}
- void kernel(Index m, Index n, Index k) {
+ void kernel(Index m, Index n, Index k, bool use_thread_local) {
// Note: order of iteration matters here. Iteration over m is innermost
// because we want to reuse the same packed rhs in consecutive tasks
// (rhs fits into L2$ while lhs only into L3$).
@@ -486,8 +662,10 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
for (Index m1 = m * gm_; m1 < mend; m1++) {
const auto output_mapper = output_.getSubMapper(m1 * bm_, n1 * bn_);
TensorContractionKernel::invoke(
- output_mapper, packed_lhs_[k % (P - 1)][m1],
- packed_rhs_[k % (P - 1)][n1], bm(m1), bk(k), bn(n1), Scalar(1));
+ output_mapper,
+ packed_lhs(m, k, m1, !shard_by_col_ && use_thread_local),
+ packed_rhs(n, k, n1, shard_by_col_ && use_thread_local), bm(m1),
+ bk(k), bn(n1), Scalar(1));
// We are done with the last task for the [m1, n1] block.
if (k + 1 == nk_) {
@@ -501,8 +679,10 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
for (Index n1 = n * gn_; n1 < nend; n1++) {
const auto output_mapper = output_.getSubMapper(m1 * bm_, n1 * bn_);
TensorContractionKernel::invoke(
- output_mapper, packed_lhs_[k % (P - 1)][m1],
- packed_rhs_[k % (P - 1)][n1], bm(m1), bk(k), bn(n1), Scalar(1));
+ output_mapper,
+ packed_lhs(m, k, m1, !shard_by_col_ && use_thread_local),
+ packed_rhs(n, k, n1, shard_by_col_ && use_thread_local), bm(m1),
+ bk(k), bn(n1), Scalar(1));
// We are done with the last task for the [m1, n1] block.
if (k + 1 == nk_) {
@@ -511,7 +691,7 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
}
}
}
- signal_kernel(m, n, k + 1, false);
+ signal_kernel(m, n, k + 1, /*sync=*/false, /*use_thread_local=*/false);
signal_switch(k + 2);
}
@@ -524,16 +704,23 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
enqueue_packing(k, shard_by_col_);
}
- void signal_kernel(Index m, Index n, Index k, bool sync) {
+ void signal_kernel(Index m, Index n, Index k, bool sync,
+ bool use_thread_local) {
std::atomic<uint8_t>* state = &state_kernel_[k % P][m][n];
Index s = state->load();
eigen_assert(s > 0);
- if (s != 1 && state->fetch_sub(1) != 1) return;
+ if (s != 1 && state->fetch_sub(1) != 1) {
+ eigen_assert(!use_thread_local);
+ return;
+ }
state->store(parallel_pack_ ? 3 : 2, std::memory_order_relaxed);
- if (sync)
- kernel(m, n, k);
- else
- device_.enqueueNoNotification([=]() { kernel(m, n, k); });
+ if (sync) {
+ kernel(m, n, k, use_thread_local);
+ } else {
+ eigen_assert(!use_thread_local);
+ device_.enqueueNoNotification(
+ [=]() { kernel(m, n, k, use_thread_local); });
+ }
}
void signal_switch(Index k, Index v = 1) {
@@ -589,7 +776,26 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
[=]() { enqueue_packing_helper(mid, end, k, rhs); });
end = mid;
}
- enqueue_packing_helper(start, end, k, rhs);
+
+ // Decide if we want to run first packing task (start == 0) in
+ // async mode if we parallelize only by sharding dim:
+ // (1) pack_lhs and pack_rhs call signal_switch before completing
+ // all calls to signal_kernel, which in sync mode might lead
+ // to the execution of the first kernel of the k+1 slice, before
+ // completing a call to the last kernel of the k slice.
+ // (2) all pack tasks for sharded dim must be executed in a thread
+ // pool.
+ bool pack_async =
+ (start == 0) &&
+ (parallelize_by_sharding_dim_only_&& shard_by_col_ == rhs) &&
+ (k > 0 || device_.currentThreadId() < 0);
+
+ if (pack_async) {
+ device_.enqueueNoNotification(
+ [=]() { enqueue_packing_helper(start, end, k, rhs); });
+ } else {
+ enqueue_packing_helper(start, end, k, rhs);
+ }
}
}
@@ -756,6 +962,36 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
}
}
+ template <int Alignment>
+ EIGEN_STRONG_INLINE void addAllToBuffer(size_t n, const Scalar* src_buf0,
+ const Scalar* src_buf1,
+ const Scalar* src_buf2,
+ Scalar* dst_buf) const {
+ using ::Eigen::internal::padd;
+ using ::Eigen::internal::pload;
+ using ::Eigen::internal::ploadt;
+ using ::Eigen::internal::pstoret;
+
+ const int output_packet_size =
+ internal::unpacket_traits<PacketReturnType>::size;
+
+ size_t i = 0;
+ const size_t num_packets = n / output_packet_size;
+ for (; i < output_packet_size * num_packets; i += output_packet_size) {
+ const auto src_val0 = pload<PacketReturnType>(src_buf0 + i);
+ const auto src_val1 = pload<PacketReturnType>(src_buf1 + i);
+ const auto src_val2 = pload<PacketReturnType>(src_buf2 + i);
+
+ const auto dst_val = ploadt<PacketReturnType, Alignment>(dst_buf + i);
+ const auto sum = padd(padd(dst_val, src_val0), padd(src_val1, src_val2));
+
+ pstoret<Scalar, PacketReturnType, Alignment>(dst_buf + i, sum);
+ }
+ for (; i < n; ++i) {
+ dst_buf[i] += src_buf0[i] + src_buf1[i] + src_buf2[i];
+ }
+ }
+
// Decide whether we want to shard m x k x n contraction over the inner
// (contraction) dimension (k).
static bool shardByInnerDim(Index m, Index n, Index k, int num_threads,
@@ -788,48 +1024,147 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
const Index m = this->m_i_size;
const Index n = this->m_j_size;
const Index k = this->m_k_size;
- // The underlying GEMM kernel assumes that k is a multiple of 8 and
- // subtle breakage occurs if this is violated.
- Index block_size = 8 * divup<Index>(k, 8 * num_threads);
- Index num_blocks = divup<Index>(k, block_size);
- // we use 'result' for the first block's partial result.
- MaxSizeVector<Scalar*> block_buffers(num_blocks - 1);
- Barrier barrier(internal::convert_index<int>(num_blocks));
- auto process_block = [=, &barrier](Scalar* buf, Index begin, Index end) {
- ::memset(buf, 0, m * n * sizeof(Scalar));
+
+ // We will compute partial results into the buffers of this size.
+ const Index buffer_size_bytes = m * n * sizeof(Scalar);
+
+ // The underlying GEMM kernel assumes that k is a multiple of
+ // the packet size and subtle breakage occurs if this is violated.
+ const Index packet_size = internal::packet_traits<RhsScalar>::size;
+
+ const auto round_up = [=](Index index) -> Index {
+ const Index kmultiple = packet_size <= 8 ? 8 : packet_size;
+ return divup<Index>(index, kmultiple) * kmultiple;
+ };
+
+ // Cost model doesn't capture well the cost associated with constructing
+ // tensor contraction mappers and computing loop bounds in gemm_pack_lhs and
+ // gemm_pack_rhs, so we specify minimum desired block size.
+ const Index target_block_size = round_up(divup<Index>(k, num_threads));
+ const Index desired_min_block_size = 12 * packet_size;
+
+ const Index block_size = numext::mini<Index>(
+ k, numext::maxi<Index>(desired_min_block_size, target_block_size));
+ const Index num_blocks = divup<Index>(k, block_size);
+
+ // Compute block size with accounting for potentially incomplete last block.
+ const auto actual_block_size = [=](Index block_idx) -> Index {
+ return block_idx + 1 < num_blocks
+ ? block_size
+ : k + block_size - block_size * num_blocks;
+ };
+
+ // We compute partial gemm results in parallel, and to get the final result
+ // we need to add them all together. For the large number of threads (>= 48)
+ // this adds a very expensive sequential step at the end.
+ //
+ // We split the [0, num_blocks) into small ranges, and when a task for the
+ // block finishes its partial gemm computation, it checks if it was the last
+ // gemm in the range, and if so, it will add all blocks of the range.
+ //
+ // After all tasks finihes, we need to add only these pre-aggregated blocks.
+
+ // Compute range size with accounting for potentially incomplete last range.
+ const auto actual_range_size = [=](Index num_ranges, Index range_size,
+ Index range_idx) -> Index {
+ eigen_assert(range_idx < num_ranges);
+ return range_idx + 1 < num_ranges
+ ? range_size
+ : num_blocks + range_size - range_size * num_ranges;
+ };
+
+ // For now we use just a single level of ranges to compute pre-aggregated
+ // partial sums, but in general we can use more layers to compute tree
+ // aggregation in parallel and reduce the size of the sequential step.
+ //
+ // TODO(ezhulenev): Add multilevel tree aggregation? Probably will make
+ // sense only if number of threads >= ~128?
+ static const Index l0_size = 4;
+ const Index l0_ranges = divup<Index>(num_blocks, l0_size);
+
+ // Keep count of pending gemm tasks for each l0 range.
+ MaxSizeVector<std::atomic<int>> l0_state(l0_ranges);
+ for (int i = 0; i < l0_ranges; ++i) {
+ const Index num_pending_tasks = actual_range_size(l0_ranges, l0_size, i);
+ l0_state.emplace_back(internal::convert_index<int>(num_pending_tasks));
+ }
+
+ MaxSizeVector<Scalar*> block_buffers(num_blocks);
+
+ auto process_block = [&, this](Index block_idx, Index begin, Index end) {
+ Scalar* buf = block_buffers[block_idx];
+ ::memset(buf, 0, buffer_size_bytes);
+
TENSOR_CONTRACTION_DISPATCH(
this->template evalGemmPartialWithoutOutputKernel, Alignment,
- (buf, begin, end, this->m_device.numThreads()));
- barrier.Notify();
- };
- Index start = 0;
- for (Index blocks_left = num_blocks; blocks_left > 0; --blocks_left) {
- // The underlying GEMM kernel assumes that k is a multiple of packet size
- // (currently largest packet size is 8) and subtle breakage occurs if
- // this is violated.
- block_size = 8 * divup<Index>(k - start, 8 * blocks_left);
- Scalar* buf;
- if (start == 0) {
- buf = result;
- } else {
- buf = static_cast<Scalar*>(
- this->m_device.allocate(m * n * sizeof(Scalar)));
- block_buffers.push_back(buf);
- }
- Index end = start + block_size;
- if (end > k) {
- end = k;
+ (buf, begin, end,
+ /*num_threads=*/internal::convert_index<int>(num_blocks)));
+
+ // Check if it was the last task in l0 range.
+ const Index l0_index = block_idx / l0_size;
+ const int v = l0_state[l0_index].fetch_sub(1);
+ eigen_assert(v >= 1);
+
+ // If we processed the last block of the range, we can aggregate all
+ // partial results into the first block of the range.
+ if (v == 1) {
+ const Index rng_size = actual_range_size(l0_ranges, l0_size, l0_index);
+ const Index dst_block_idx = l0_index * l0_size;
+
+ if (rng_size == l0_size) {
+ addAllToBuffer<Alignment>(
+ m * n,
+ /*src_buf0=*/block_buffers[dst_block_idx + 1],
+ /*src_buf1=*/block_buffers[dst_block_idx + 2],
+ /*src_buf2=*/block_buffers[dst_block_idx + 3],
+ /*dst_buf= */ block_buffers[dst_block_idx]);
+ } else {
+ // Aggregate blocks of potentially incomplete last range.
+ for (int i = 1; i < rng_size; ++i) {
+ addToBuffer<Alignment>(m * n,
+ /*src_buf=*/block_buffers[dst_block_idx + i],
+ /*dst_buf=*/block_buffers[dst_block_idx]);
+ }
+ }
}
- this->m_device.enqueueNoNotification(
- [=, &process_block]() { process_block(buf, start, end); });
- start = end;
+ };
+
+ Barrier barrier(internal::convert_index<int>(num_blocks));
+ for (Index block_idx = 0; block_idx < num_blocks; ++block_idx) {
+ Scalar* buf = block_idx == 0
+ ? result
+ : static_cast<Scalar*>(
+ this->m_device.allocate(buffer_size_bytes));
+ block_buffers.push_back(buf);
+
+ Index block_start = block_idx * block_size;
+ Index block_end = block_start + actual_block_size(block_idx);
+
+ this->m_device.enqueueNoNotification([=, &barrier, &process_block]() {
+ process_block(block_idx, block_start, block_end);
+ barrier.Notify();
+ });
}
barrier.Wait();
- // Add other partial results into first partial result.
- for (const auto& buf : block_buffers) {
- addToBuffer<Alignment>(m * n, buf, result);
- this->m_device.deallocate(buf);
+ // Aggregate partial sums from l0 ranges.
+ Index l0_index = 1;
+ for (; l0_index + 2 < l0_ranges; l0_index += 3) {
+ addAllToBuffer<Alignment>(
+ m * n,
+ /*src_buf0=*/block_buffers[(l0_index + 0) * l0_size],
+ /*src_buf1=*/block_buffers[(l0_index + 1) * l0_size],
+ /*src_buf2=*/block_buffers[(l0_index + 2) * l0_size],
+ /*dst_buf= */block_buffers[0]);
+ }
+ for (; l0_index < l0_ranges; ++l0_index) {
+ addToBuffer<Alignment>(m * n, block_buffers[l0_index * l0_size],
+ block_buffers[0]);
+ }
+
+ // Don't forget to deallocate ALL temporary buffers.
+ for (Index i = 1; i < num_blocks; ++i) {
+ this->m_device.deallocate(block_buffers[i]);
}
// Finally call output kernel with finalized output buffer.
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorConversion.h b/unsupported/Eigen/CXX11/src/Tensor/TensorConversion.h
index 1f613d3c7..938fd0f34 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorConversion.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorConversion.h
@@ -32,7 +32,7 @@ struct traits<TensorConversionOp<TargetType, XprType> >
static const int NumDimensions = traits<XprType>::NumDimensions;
static const int Layout = traits<XprType>::Layout;
enum { Flags = 0 };
- typedef typename TypeConversion<Scalar, typename traits<XprType>::PointerType>::type PointerType;
+ typedef typename TypeConversion<Scalar, typename traits<XprType>::PointerType>::type PointerType;
};
template<typename TargetType, typename XprType>
@@ -177,6 +177,81 @@ template <typename Eval, typename Scalar> struct ConversionSubExprEval<true, Eva
}
};
+namespace internal {
+
+template <typename SrcType, typename TargetType, bool IsSameT>
+struct CoeffConv {
+ template <typename ArgType, typename Device>
+ static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE TargetType run(const TensorEvaluator<ArgType, Device>& impl, Index index) {
+ internal::scalar_cast_op<SrcType, TargetType> converter;
+ return converter(impl.coeff(index));
+ }
+};
+
+template <typename SrcType, typename TargetType>
+struct CoeffConv<SrcType, TargetType, true> {
+ template <typename ArgType, typename Device>
+ static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE TargetType run(const TensorEvaluator<ArgType, Device>& impl, Index index) {
+ return impl.coeff(index);
+ }
+};
+
+template <typename SrcPacket, typename TargetPacket, int LoadMode, bool ActuallyVectorize, bool IsSameT>
+struct PacketConv {
+ typedef typename internal::unpacket_traits<SrcPacket>::type SrcType;
+ typedef typename internal::unpacket_traits<TargetPacket>::type TargetType;
+
+ static const int PacketSize = internal::unpacket_traits<TargetPacket>::size;
+
+ template <typename ArgType, typename Device>
+ static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE TargetPacket run(const TensorEvaluator<ArgType, Device>& impl, Index index) {
+ internal::scalar_cast_op<SrcType, TargetType> converter;
+ EIGEN_ALIGN_MAX typename internal::remove_const<TargetType>::type values[PacketSize];
+ for (int i = 0; i < PacketSize; ++i) {
+ values[i] = converter(impl.coeff(index+i));
+ }
+ TargetPacket rslt = internal::pload<TargetPacket>(values);
+ return rslt;
+ }
+};
+
+template <typename SrcPacket, typename TargetPacket, int LoadMode, bool IsSameT>
+struct PacketConv<SrcPacket, TargetPacket, LoadMode, true, IsSameT> {
+ typedef typename internal::unpacket_traits<SrcPacket>::type SrcType;
+ typedef typename internal::unpacket_traits<TargetPacket>::type TargetType;
+
+ template <typename ArgType, typename Device>
+ static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE TargetPacket run(const TensorEvaluator<ArgType, Device>& impl, Index index) {
+ const int SrcCoeffRatio = internal::type_casting_traits<SrcType, TargetType>::SrcCoeffRatio;
+ const int TgtCoeffRatio = internal::type_casting_traits<SrcType, TargetType>::TgtCoeffRatio;
+ PacketConverter<TensorEvaluator<ArgType, Device>, SrcPacket, TargetPacket,
+ SrcCoeffRatio, TgtCoeffRatio> converter(impl);
+ return converter.template packet<LoadMode>(index);
+ }
+};
+
+template <typename SrcPacket, typename TargetPacket, int LoadMode>
+struct PacketConv<SrcPacket, TargetPacket, LoadMode, /*ActuallyVectorize=*/false, /*IsSameT=*/true> {
+ typedef typename internal::unpacket_traits<TargetPacket>::type TargetType;
+ static const int PacketSize = internal::unpacket_traits<TargetPacket>::size;
+
+ template <typename ArgType, typename Device>
+ static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE TargetPacket run(const TensorEvaluator<ArgType, Device>& impl, Index index) {
+ EIGEN_ALIGN_MAX typename internal::remove_const<TargetType>::type values[PacketSize];
+ for (int i = 0; i < PacketSize; ++i) values[i] = impl.coeff(index+i);
+ return internal::pload<TargetPacket>(values);
+ }
+};
+
+template <typename SrcPacket, typename TargetPacket, int LoadMode>
+struct PacketConv<SrcPacket, TargetPacket, LoadMode, /*ActuallyVectorize=*/true, /*IsSameT=*/true> {
+ template <typename ArgType, typename Device>
+ static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE TargetPacket run(const TensorEvaluator<ArgType, Device>& impl, Index index) {
+ return impl.template packet<LoadMode>(index);
+ }
+};
+
+} // namespace internal
// Eval as rvalue
template<typename TargetType, typename ArgType, typename Device>
@@ -191,6 +266,7 @@ struct TensorEvaluator<const TensorConversionOp<TargetType, ArgType>, Device>
typedef typename PacketType<CoeffReturnType, Device>::type PacketReturnType;
typedef typename PacketType<SrcType, Device>::type PacketSourceType;
static const int PacketSize = PacketType<CoeffReturnType, Device>::size;
+ static const bool IsSameType = internal::is_same<TargetType, SrcType>::value;
enum {
IsAligned = false,
@@ -210,7 +286,7 @@ struct TensorEvaluator<const TensorConversionOp<TargetType, ArgType>, Device>
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE bool evalSubExprsIfNeeded(Scalar* data)
{
- return ConversionSubExprEval<internal::is_same<TargetType, SrcType>::value, TensorEvaluator<ArgType, Device>, Scalar>::run(m_impl, data);
+ return ConversionSubExprEval<IsSameType, TensorEvaluator<ArgType, Device>, Scalar>::run(m_impl, data);
}
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void cleanup()
@@ -220,16 +296,23 @@ struct TensorEvaluator<const TensorConversionOp<TargetType, ArgType>, Device>
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE CoeffReturnType coeff(Index index) const
{
- internal::scalar_cast_op<SrcType, TargetType> converter;
- return converter(m_impl.coeff(index));
+ return internal::CoeffConv<SrcType, TargetType, IsSameType>::run(m_impl,index);
}
template<int LoadMode>
- EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE PacketReturnType packet(Index index) const
- {
- const bool Vectorizable = TensorEvaluator<ArgType, Device>::PacketAccess &
- internal::type_casting_traits<SrcType, TargetType>::VectorizedCast;
- return PacketConv<LoadMode, Vectorizable>::run(m_impl, index);
+ EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE PacketReturnType
+ packet(Index index) const {
+ // If we are not going to do the cast, we just need to check that base
+ // TensorEvaluator has packet access. Otherwise we also need to make sure,
+ // that we have an implementation of vectorized cast.
+ const bool Vectorizable =
+ IsSameType
+ ? TensorEvaluator<ArgType, Device>::PacketAccess
+ : TensorEvaluator<ArgType, Device>::PacketAccess &
+ internal::type_casting_traits<SrcType, TargetType>::VectorizedCast;
+
+ return internal::PacketConv<PacketSourceType, PacketReturnType, LoadMode,
+ Vectorizable, IsSameType>::run(m_impl, index);
}
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE TensorOpCost
@@ -252,31 +335,7 @@ struct TensorEvaluator<const TensorConversionOp<TargetType, ArgType>, Device>
/// required by sycl in order to extract the sycl accessor
const TensorEvaluator<ArgType, Device>& impl() const { return m_impl; }
- protected:
- template <int LoadMode, bool ActuallyVectorize>
- struct PacketConv {
- static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE PacketReturnType run(const TensorEvaluator<ArgType, Device>& impl, Index index) {
- internal::scalar_cast_op<SrcType, TargetType> converter;
- EIGEN_ALIGN_MAX typename internal::remove_const<CoeffReturnType>::type values[PacketSize];
- for (int i = 0; i < PacketSize; ++i) {
- values[i] = converter(impl.coeff(index+i));
- }
- PacketReturnType rslt = internal::pload<PacketReturnType>(values);
- return rslt;
- }
- };
-
- template <int LoadMode>
- struct PacketConv<LoadMode, true> {
- static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE PacketReturnType run(const TensorEvaluator<ArgType, Device>& impl, Index index) {
- const int SrcCoeffRatio = internal::type_casting_traits<SrcType, TargetType>::SrcCoeffRatio;
- const int TgtCoeffRatio = internal::type_casting_traits<SrcType, TargetType>::TgtCoeffRatio;
- PacketConverter<TensorEvaluator<ArgType, Device>, PacketSourceType, PacketReturnType,
- SrcCoeffRatio, TgtCoeffRatio> converter(impl);
- return converter.template packet<LoadMode>(index);
- }
- };
-
+ protected:
TensorEvaluator<ArgType, Device> m_impl;
};
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
index bb330a77b..b43db40c8 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
@@ -87,13 +87,13 @@ struct ThreadPoolDevice {
const size_t kMinBlockSize = 32768;
typedef TensorCostModel<ThreadPoolDevice> CostModel;
const size_t num_threads = CostModel::numThreads(n, TensorOpCost(1.0, 1.0, 0), 4);
- if (n <= kMinBlockSize || num_threads == 1) {
+ if (n <= kMinBlockSize || num_threads < 2) {
::memcpy(dst, src, n);
} else {
const char* src_ptr = static_cast<const char*>(src);
char* dst_ptr = static_cast<char*>(dst);
const size_t blocksize = (n + (num_threads - 1)) / num_threads;
- Barrier barrier(num_threads - 1);
+ Barrier barrier(static_cast<int>(num_threads - 1));
// Launch the last 3 blocks on worker threads.
for (size_t i = 1; i < num_threads; ++i) {
enqueue_with_barrier(&barrier, [n, i, src_ptr, dst_ptr, blocksize] {
@@ -122,6 +122,12 @@ struct ThreadPoolDevice {
return num_threads_;
}
+ // Number of theads available in the underlying thread pool. This number can
+ // be different from the value returned by numThreads().
+ EIGEN_STRONG_INLINE int numThreadsInPool() const {
+ return pool_->NumThreads();
+ }
+
EIGEN_STRONG_INLINE size_t firstLevelCacheSize() const {
return l1CacheSize();
}
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h
index 1c44541bd..e2ff11129 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h
@@ -325,7 +325,6 @@ class TensorExecutor<Expression, GpuDevice, Vectorizable, Tileable> {
static void run(const Expression& expr, const GpuDevice& device);
};
-
#if defined(EIGEN_GPUCC)
template <typename Evaluator, typename StorageIndex, bool Vectorizable>
struct EigenMetaKernelEval {
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorForcedEval.h b/unsupported/Eigen/CXX11/src/Tensor/TensorForcedEval.h
index 78068be35..74b905329 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorForcedEval.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorForcedEval.h
@@ -90,14 +90,21 @@ struct TensorEvaluator<const TensorForcedEvalOp<ArgType>, Device>
static const int PacketSize = PacketType<CoeffReturnType, Device>::size;
enum {
- IsAligned = true,
- PacketAccess = (PacketType<CoeffReturnType, Device>::size > 1),
- BlockAccess = false,
+ IsAligned = true,
+ PacketAccess = (PacketType<CoeffReturnType, Device>::size > 1),
+ BlockAccess = internal::is_arithmetic<CoeffReturnType>::value,
PreferBlockAccess = false,
- Layout = TensorEvaluator<ArgType, Device>::Layout,
- RawAccess = true
+ Layout = TensorEvaluator<ArgType, Device>::Layout,
+ RawAccess = true
};
+ typedef typename internal::TensorBlock<
+ CoeffReturnType, Index, internal::traits<ArgType>::NumDimensions, Layout>
+ TensorBlock;
+ typedef typename internal::TensorBlockReader<
+ CoeffReturnType, Index, internal::traits<ArgType>::NumDimensions, Layout>
+ TensorBlockReader;
+
EIGEN_DEVICE_FUNC TensorEvaluator(const XprType& op, const Device& device)
/// op_ is used for sycl
: m_impl(op.expression(), device), m_op(op.expression()), m_device(device), m_buffer(NULL)
@@ -139,6 +146,14 @@ struct TensorEvaluator<const TensorForcedEvalOp<ArgType>, Device>
return internal::ploadt<PacketReturnType, LoadMode>(m_buffer + index);
}
+ EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void getResourceRequirements(
+ std::vector<internal::TensorOpResourceRequirements>*) const {}
+
+ EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void block(TensorBlock* block) const {
+ assert(m_buffer != NULL);
+ TensorBlockReader::Run(block, m_buffer);
+ }
+
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE TensorOpCost costPerCoeff(bool vectorized) const {
return TensorOpCost(sizeof(CoeffReturnType), 0, 0, vectorized, PacketSize);
}
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorGenerator.h b/unsupported/Eigen/CXX11/src/Tensor/TensorGenerator.h
index ac66f9cf1..204a6fd33 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorGenerator.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorGenerator.h
@@ -89,17 +89,22 @@ struct TensorEvaluator<const TensorGeneratorOp<Generator, ArgType>, Device>
typedef typename XprType::CoeffReturnType CoeffReturnType;
typedef typename PacketType<CoeffReturnType, Device>::type PacketReturnType;
enum {
- IsAligned = false,
- PacketAccess = (PacketType<CoeffReturnType, Device>::size > 1),
- BlockAccess = false,
- PreferBlockAccess = false,
- Layout = TensorEvaluator<ArgType, Device>::Layout,
- CoordAccess = false, // to be implemented
- RawAccess = false
+ IsAligned = false,
+ PacketAccess = (PacketType<CoeffReturnType, Device>::size > 1),
+ BlockAccess = true,
+ PreferBlockAccess = true,
+ Layout = TensorEvaluator<ArgType, Device>::Layout,
+ CoordAccess = false, // to be implemented
+ RawAccess = false
};
+ typedef internal::TensorIntDivisor<Index> IndexDivisor;
+
+ typedef internal::TensorBlock<CoeffReturnType, Index, NumDims, Layout>
+ TensorBlock;
+
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE TensorEvaluator(const XprType& op, const Device& device)
- : m_generator(op.generator())
+ : m_device(device), m_generator(op.generator())
#ifdef EIGEN_USE_SYCL
, m_argImpl(op.expression(), device)
#endif
@@ -111,11 +116,13 @@ struct TensorEvaluator<const TensorGeneratorOp<Generator, ArgType>, Device>
m_strides[0] = 1;
for (int i = 1; i < NumDims; ++i) {
m_strides[i] = m_strides[i - 1] * m_dimensions[i - 1];
+ if (m_strides[i] != 0) m_fast_strides[i] = IndexDivisor(m_strides[i]);
}
} else {
m_strides[NumDims - 1] = 1;
for (int i = NumDims - 2; i >= 0; --i) {
m_strides[i] = m_strides[i + 1] * m_dimensions[i + 1];
+ if (m_strides[i] != 0) m_fast_strides[i] = IndexDivisor(m_strides[i]);
}
}
}
@@ -150,6 +157,75 @@ struct TensorEvaluator<const TensorGeneratorOp<Generator, ArgType>, Device>
return rslt;
}
+ EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void getResourceRequirements(
+ std::vector<internal::TensorOpResourceRequirements>* resources) const {
+ Eigen::Index block_total_size_max = numext::maxi<Eigen::Index>(
+ 1, m_device.firstLevelCacheSize() / sizeof(Scalar));
+ resources->push_back(internal::TensorOpResourceRequirements(
+ internal::kSkewedInnerDims, block_total_size_max));
+ }
+
+ struct BlockIteratorState {
+ Index stride;
+ Index span;
+ Index size;
+ Index count;
+ };
+
+ EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void block(
+ TensorBlock* output_block) const {
+ if (NumDims <= 0) return;
+
+ static const bool is_col_major =
+ static_cast<int>(Layout) == static_cast<int>(ColMajor);
+
+ // Compute spatial coordinates for the first block element.
+ array<Index, NumDims> coords;
+ extract_coordinates(output_block->first_coeff_index(), coords);
+ array<Index, NumDims> initial_coords = coords;
+
+ CoeffReturnType* data = output_block->data();
+ Index offset = 0;
+
+ // Initialize output block iterator state. Dimension in this array are
+ // always in inner_most -> outer_most order (col major layout).
+ array<BlockIteratorState, NumDims> it;
+ for (Index i = 0; i < NumDims; ++i) {
+ const Index dim = is_col_major ? i : NumDims - 1 - i;
+ it[i].size = output_block->block_sizes()[dim];
+ it[i].stride = output_block->block_strides()[dim];
+ it[i].span = it[i].stride * (it[i].size - 1);
+ it[i].count = 0;
+ }
+ eigen_assert(it[0].stride == 1);
+
+ while (it[NumDims - 1].count < it[NumDims - 1].size) {
+ // Generate data for the inner-most dimension.
+ for (Index i = 0; i < it[0].size; ++i) {
+ *(data + offset + i) = m_generator(coords);
+ coords[is_col_major ? 0 : NumDims - 1]++;
+ }
+ coords[is_col_major ? 0 : NumDims - 1] =
+ initial_coords[is_col_major ? 0 : NumDims - 1];
+
+ // For the 1d tensor we need to generate only one inner-most dimension.
+ if (NumDims == 1) break;
+
+ // Update offset.
+ for (Index i = 1; i < NumDims; ++i) {
+ if (++it[i].count < it[i].size) {
+ offset += it[i].stride;
+ coords[is_col_major ? i : NumDims - 1 - i]++;
+ break;
+ }
+ if (i != NumDims - 1) it[i].count = 0;
+ coords[is_col_major ? i : NumDims - 1 - i] =
+ initial_coords[is_col_major ? i : NumDims - 1 - i];
+ offset -= it[i].span;
+ }
+ }
+ }
+
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE TensorOpCost
costPerCoeff(bool) const {
// TODO(rmlarsen): This is just a placeholder. Define interface to make
@@ -170,14 +246,14 @@ struct TensorEvaluator<const TensorGeneratorOp<Generator, ArgType>, Device>
void extract_coordinates(Index index, array<Index, NumDims>& coords) const {
if (static_cast<int>(Layout) == static_cast<int>(ColMajor)) {
for (int i = NumDims - 1; i > 0; --i) {
- const Index idx = index / m_strides[i];
+ const Index idx = index / m_fast_strides[i];
index -= idx * m_strides[i];
coords[i] = idx;
}
coords[0] = index;
} else {
for (int i = 0; i < NumDims - 1; ++i) {
- const Index idx = index / m_strides[i];
+ const Index idx = index / m_fast_strides[i];
index -= idx * m_strides[i];
coords[i] = idx;
}
@@ -185,8 +261,10 @@ struct TensorEvaluator<const TensorGeneratorOp<Generator, ArgType>, Device>
}
}
+ const Device& m_device;
Dimensions m_dimensions;
array<Index, NumDims> m_strides;
+ array<IndexDivisor, NumDims> m_fast_strides;
Generator m_generator;
#ifdef EIGEN_USE_SYCL
TensorEvaluator<ArgType, Device> m_argImpl;
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h b/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h
index 50fa0cb2e..bb63433fe 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h
@@ -402,25 +402,25 @@ struct OuterReducer {
#if defined(EIGEN_USE_GPU) && (defined(EIGEN_GPUCC))
-template <int B, int N, typename S, typename R, typename I>
-__global__ void FullReductionKernel(R, const S, I, typename S::CoeffReturnType*, unsigned int*);
+template <int B, int N, typename S, typename R, typename I_>
+__global__ void FullReductionKernel(R, const S, I_, typename S::CoeffReturnType*, unsigned int*);
#if defined(EIGEN_HAS_GPU_FP16)
-template <typename S, typename R, typename I>
-__global__ void ReductionInitFullReduxKernelHalfFloat(R, const S, I, half2*);
-template <int B, int N, typename S, typename R, typename I>
-__global__ void FullReductionKernelHalfFloat(R, const S, I, half*, half2*);
-template <int NPT, typename S, typename R, typename I>
-__global__ void InnerReductionKernelHalfFloat(R, const S, I, I, half*);
+template <typename S, typename R, typename I_>
+__global__ void ReductionInitFullReduxKernelHalfFloat(R, const S, I_, half2*);
+template <int B, int N, typename S, typename R, typename I_>
+__global__ void FullReductionKernelHalfFloat(R, const S, I_, half*, half2*);
+template <int NPT, typename S, typename R, typename I_>
+__global__ void InnerReductionKernelHalfFloat(R, const S, I_, I_, half*);
#endif
-template <int NPT, typename S, typename R, typename I>
-__global__ void InnerReductionKernel(R, const S, I, I, typename S::CoeffReturnType*);
+template <int NPT, typename S, typename R, typename I_>
+__global__ void InnerReductionKernel(R, const S, I_, I_, typename S::CoeffReturnType*);
-template <int NPT, typename S, typename R, typename I>
-__global__ void OuterReductionKernel(R, const S, I, I, typename S::CoeffReturnType*);
+template <int NPT, typename S, typename R, typename I_>
+__global__ void OuterReductionKernel(R, const S, I_, I_, typename S::CoeffReturnType*);
#endif
template <typename Self, typename Op,
@@ -1114,15 +1114,15 @@ struct TensorEvaluator<const TensorReductionOp<Op, Dims, ArgType, MakePointer_>,
template <typename S, typename O, bool V> friend struct internal::FullReducerShard;
#endif
#if defined(EIGEN_USE_GPU) && (defined(EIGEN_GPUCC))
- template <int B, int N, typename S, typename R, typename I> KERNEL_FRIEND void internal::FullReductionKernel(R, const S, I, typename S::CoeffReturnType*, unsigned int*);
+ template <int B, int N, typename S, typename R, typename I_> KERNEL_FRIEND void internal::FullReductionKernel(R, const S, I_, typename S::CoeffReturnType*, unsigned int*);
#if defined(EIGEN_HAS_GPU_FP16)
- template <typename S, typename R, typename I> KERNEL_FRIEND void internal::ReductionInitFullReduxKernelHalfFloat(R, const S, I, half2*);
- template <int B, int N, typename S, typename R, typename I> KERNEL_FRIEND void internal::FullReductionKernelHalfFloat(R, const S, I, half*, half2*);
- template <int NPT, typename S, typename R, typename I> KERNEL_FRIEND void internal::InnerReductionKernelHalfFloat(R, const S, I, I, half*);
+ template <typename S, typename R, typename I_> KERNEL_FRIEND void internal::ReductionInitFullReduxKernelHalfFloat(R, const S, I_, half2*);
+ template <int B, int N, typename S, typename R, typename I_> KERNEL_FRIEND void internal::FullReductionKernelHalfFloat(R, const S, I_, half*, half2*);
+ template <int NPT, typename S, typename R, typename I_> KERNEL_FRIEND void internal::InnerReductionKernelHalfFloat(R, const S, I_, I_, half*);
#endif
- template <int NPT, typename S, typename R, typename I> KERNEL_FRIEND void internal::InnerReductionKernel(R, const S, I, I, typename S::CoeffReturnType*);
+ template <int NPT, typename S, typename R, typename I_> KERNEL_FRIEND void internal::InnerReductionKernel(R, const S, I_, I_, typename S::CoeffReturnType*);
- template <int NPT, typename S, typename R, typename I> KERNEL_FRIEND void internal::OuterReductionKernel(R, const S, I, I, typename S::CoeffReturnType*);
+ template <int NPT, typename S, typename R, typename I_> KERNEL_FRIEND void internal::OuterReductionKernel(R, const S, I_, I_, typename S::CoeffReturnType*);
#endif
#if defined(EIGEN_USE_SYCL)
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h b/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h
index 7a9ebe40a..8b3b210b1 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h
@@ -20,7 +20,8 @@ namespace Eigen {
// if (predicate)
// return act();
// EventCount::Waiter& w = waiters[my_index];
-// ec.Prewait(&w);
+// if (!ec.Prewait(&w))
+// return act();
// if (predicate) {
// ec.CancelWait(&w);
// return act();
@@ -50,78 +51,78 @@ class EventCount {
public:
class Waiter;
- EventCount(MaxSizeVector<Waiter>& waiters) : waiters_(waiters) {
+ EventCount(MaxSizeVector<Waiter>& waiters)
+ : state_(kStackMask), waiters_(waiters) {
eigen_plain_assert(waiters.size() < (1 << kWaiterBits) - 1);
- // Initialize epoch to something close to overflow to test overflow.
- state_ = kStackMask | (kEpochMask - kEpochInc * waiters.size() * 2);
}
~EventCount() {
// Ensure there are no waiters.
- eigen_plain_assert((state_.load() & (kStackMask | kWaiterMask)) == kStackMask);
+ eigen_plain_assert(state_.load() == kStackMask);
}
// Prewait prepares for waiting.
- // After calling this function the thread must re-check the wait predicate
- // and call either CancelWait or CommitWait passing the same Waiter object.
- void Prewait(Waiter* w) {
- w->epoch = state_.fetch_add(kWaiterInc, std::memory_order_relaxed);
- std::atomic_thread_fence(std::memory_order_seq_cst);
+ // If Prewait returns true, the thread must re-check the wait predicate
+ // and then call either CancelWait or CommitWait.
+ // Otherwise, the thread should assume the predicate may be true
+ // and don't call CancelWait/CommitWait (there was a concurrent Notify call).
+ bool Prewait() {
+ uint64_t state = state_.load(std::memory_order_relaxed);
+ for (;;) {
+ CheckState(state);
+ uint64_t newstate = state + kWaiterInc;
+ if ((state & kSignalMask) != 0) {
+ // Consume the signal and cancel waiting.
+ newstate -= kSignalInc + kWaiterInc;
+ }
+ CheckState(newstate);
+ if (state_.compare_exchange_weak(state, newstate,
+ std::memory_order_seq_cst))
+ return (state & kSignalMask) == 0;
+ }
}
- // CommitWait commits waiting.
+ // CommitWait commits waiting after Prewait.
void CommitWait(Waiter* w) {
+ eigen_plain_assert((w->epoch & ~kEpochMask) == 0);
w->state = Waiter::kNotSignaled;
- // Modification epoch of this waiter.
- uint64_t epoch =
- (w->epoch & kEpochMask) +
- (((w->epoch & kWaiterMask) >> kWaiterShift) << kEpochShift);
+ const uint64_t me = (w - &waiters_[0]) | w->epoch;
uint64_t state = state_.load(std::memory_order_seq_cst);
for (;;) {
- if (int64_t((state & kEpochMask) - epoch) < 0) {
- // The preceding waiter has not decided on its fate. Wait until it
- // calls either CancelWait or CommitWait, or is notified.
- EIGEN_THREAD_YIELD();
- state = state_.load(std::memory_order_seq_cst);
- continue;
+ CheckState(state, true);
+ uint64_t newstate;
+ if ((state & kSignalMask) != 0) {
+ // Consume the signal and return immidiately.
+ newstate = state - kWaiterInc - kSignalInc;
+ } else {
+ // Remove this thread from pre-wait counter and add to the waiter stack.
+ newstate = ((state & kWaiterMask) - kWaiterInc) | me;
+ w->next.store(state & (kStackMask | kEpochMask),
+ std::memory_order_relaxed);
}
- // We've already been notified.
- if (int64_t((state & kEpochMask) - epoch) > 0) return;
- // Remove this thread from prewait counter and add it to the waiter list.
- eigen_plain_assert((state & kWaiterMask) != 0);
- uint64_t newstate = state - kWaiterInc + kEpochInc;
- newstate = (newstate & ~kStackMask) | (w - &waiters_[0]);
- if ((state & kStackMask) == kStackMask)
- w->next.store(nullptr, std::memory_order_relaxed);
- else
- w->next.store(&waiters_[state & kStackMask], std::memory_order_relaxed);
+ CheckState(newstate);
if (state_.compare_exchange_weak(state, newstate,
- std::memory_order_release))
- break;
+ std::memory_order_acq_rel)) {
+ if ((state & kSignalMask) == 0) {
+ w->epoch += kEpochInc;
+ Park(w);
+ }
+ return;
+ }
}
- Park(w);
}
// CancelWait cancels effects of the previous Prewait call.
- void CancelWait(Waiter* w) {
- uint64_t epoch =
- (w->epoch & kEpochMask) +
- (((w->epoch & kWaiterMask) >> kWaiterShift) << kEpochShift);
+ void CancelWait() {
uint64_t state = state_.load(std::memory_order_relaxed);
for (;;) {
- if (int64_t((state & kEpochMask) - epoch) < 0) {
- // The preceding waiter has not decided on its fate. Wait until it
- // calls either CancelWait or CommitWait, or is notified.
- EIGEN_THREAD_YIELD();
- state = state_.load(std::memory_order_relaxed);
- continue;
- }
- // We've already been notified.
- if (int64_t((state & kEpochMask) - epoch) > 0) return;
- // Remove this thread from prewait counter.
- eigen_plain_assert((state & kWaiterMask) != 0);
- if (state_.compare_exchange_weak(state, state - kWaiterInc + kEpochInc,
- std::memory_order_relaxed))
+ CheckState(state, true);
+ uint64_t newstate = state - kWaiterInc;
+ // Also take away a signal if any.
+ if ((state & kSignalMask) != 0) newstate -= kSignalInc;
+ CheckState(newstate);
+ if (state_.compare_exchange_weak(state, newstate,
+ std::memory_order_acq_rel))
return;
}
}
@@ -132,35 +133,33 @@ class EventCount {
std::atomic_thread_fence(std::memory_order_seq_cst);
uint64_t state = state_.load(std::memory_order_acquire);
for (;;) {
+ CheckState(state);
+ const uint64_t waiters = (state & kWaiterMask) >> kWaiterShift;
+ const uint64_t signals = (state & kSignalMask) >> kSignalShift;
// Easy case: no waiters.
- if ((state & kStackMask) == kStackMask && (state & kWaiterMask) == 0)
- return;
- uint64_t waiters = (state & kWaiterMask) >> kWaiterShift;
+ if ((state & kStackMask) == kStackMask && waiters == signals) return;
uint64_t newstate;
if (notifyAll) {
- // Reset prewait counter and empty wait list.
- newstate = (state & kEpochMask) + (kEpochInc * waiters) + kStackMask;
- } else if (waiters) {
+ // Empty wait stack and set signal to number of pre-wait threads.
+ newstate =
+ (state & kWaiterMask) | (waiters << kSignalShift) | kStackMask;
+ } else if (signals < waiters) {
// There is a thread in pre-wait state, unblock it.
- newstate = state + kEpochInc - kWaiterInc;
+ newstate = state + kSignalInc;
} else {
// Pop a waiter from list and unpark it.
Waiter* w = &waiters_[state & kStackMask];
- Waiter* wnext = w->next.load(std::memory_order_relaxed);
- uint64_t next = kStackMask;
- if (wnext != nullptr) next = wnext - &waiters_[0];
- // Note: we don't add kEpochInc here. ABA problem on the lock-free stack
- // can't happen because a waiter is re-pushed onto the stack only after
- // it was in the pre-wait state which inevitably leads to epoch
- // increment.
- newstate = (state & kEpochMask) + next;
+ uint64_t next = w->next.load(std::memory_order_relaxed);
+ newstate = (state & (kWaiterMask | kSignalMask)) | next;
}
+ CheckState(newstate);
if (state_.compare_exchange_weak(state, newstate,
- std::memory_order_acquire)) {
- if (!notifyAll && waiters) return; // unblocked pre-wait thread
+ std::memory_order_acq_rel)) {
+ if (!notifyAll && (signals < waiters))
+ return; // unblocked pre-wait thread
if ((state & kStackMask) == kStackMask) return;
Waiter* w = &waiters_[state & kStackMask];
- if (!notifyAll) w->next.store(nullptr, std::memory_order_relaxed);
+ if (!notifyAll) w->next.store(kStackMask, std::memory_order_relaxed);
Unpark(w);
return;
}
@@ -171,11 +170,11 @@ class EventCount {
friend class EventCount;
// Align to 128 byte boundary to prevent false sharing with other Waiter
// objects in the same vector.
- EIGEN_ALIGN_TO_BOUNDARY(128) std::atomic<Waiter*> next;
+ EIGEN_ALIGN_TO_BOUNDARY(128) std::atomic<uint64_t> next;
std::mutex mu;
std::condition_variable cv;
- uint64_t epoch;
- unsigned state;
+ uint64_t epoch = 0;
+ unsigned state = kNotSignaled;
enum {
kNotSignaled,
kWaiting,
@@ -185,23 +184,41 @@ class EventCount {
private:
// State_ layout:
- // - low kStackBits is a stack of waiters committed wait.
+ // - low kWaiterBits is a stack of waiters committed wait
+ // (indexes in waiters_ array are used as stack elements,
+ // kStackMask means empty stack).
// - next kWaiterBits is count of waiters in prewait state.
- // - next kEpochBits is modification counter.
- static const uint64_t kStackBits = 16;
- static const uint64_t kStackMask = (1ull << kStackBits) - 1;
- static const uint64_t kWaiterBits = 16;
- static const uint64_t kWaiterShift = 16;
+ // - next kWaiterBits is count of pending signals.
+ // - remaining bits are ABA counter for the stack.
+ // (stored in Waiter node and incremented on push).
+ static const uint64_t kWaiterBits = 14;
+ static const uint64_t kStackMask = (1ull << kWaiterBits) - 1;
+ static const uint64_t kWaiterShift = kWaiterBits;
static const uint64_t kWaiterMask = ((1ull << kWaiterBits) - 1)
<< kWaiterShift;
- static const uint64_t kWaiterInc = 1ull << kWaiterBits;
- static const uint64_t kEpochBits = 32;
- static const uint64_t kEpochShift = 32;
+ static const uint64_t kWaiterInc = 1ull << kWaiterShift;
+ static const uint64_t kSignalShift = 2 * kWaiterBits;
+ static const uint64_t kSignalMask = ((1ull << kWaiterBits) - 1)
+ << kSignalShift;
+ static const uint64_t kSignalInc = 1ull << kSignalShift;
+ static const uint64_t kEpochShift = 3 * kWaiterBits;
+ static const uint64_t kEpochBits = 64 - kEpochShift;
static const uint64_t kEpochMask = ((1ull << kEpochBits) - 1) << kEpochShift;
static const uint64_t kEpochInc = 1ull << kEpochShift;
std::atomic<uint64_t> state_;
MaxSizeVector<Waiter>& waiters_;
+ static void CheckState(uint64_t state, bool waiter = false) {
+ static_assert(kEpochBits >= 20, "not enough bits to prevent ABA problem");
+ const uint64_t waiters = (state & kWaiterMask) >> kWaiterShift;
+ const uint64_t signals = (state & kSignalMask) >> kSignalShift;
+ eigen_plain_assert(waiters >= signals);
+ eigen_plain_assert(waiters < (1 << kWaiterBits) - 1);
+ eigen_plain_assert(!waiter || waiters > 0);
+ (void)waiters;
+ (void)signals;
+ }
+
void Park(Waiter* w) {
std::unique_lock<std::mutex> lock(w->mu);
while (w->state != Waiter::kSignaled) {
@@ -210,10 +227,10 @@ class EventCount {
}
}
- void Unpark(Waiter* waiters) {
- Waiter* next = nullptr;
- for (Waiter* w = waiters; w; w = next) {
- next = w->next.load(std::memory_order_relaxed);
+ void Unpark(Waiter* w) {
+ for (Waiter* next; w; w = next) {
+ uint64_t wnext = w->next.load(std::memory_order_relaxed) & kStackMask;
+ next = wnext == kStackMask ? nullptr : &waiters_[wnext];
unsigned state;
{
std::unique_lock<std::mutex> lock(w->mu);
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
index 8fafcdab5..9e54254c1 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
@@ -29,6 +29,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
thread_data_(num_threads),
all_coprimes_(num_threads),
waiters_(num_threads),
+ global_steal_partition_(EncodePartition(0, num_threads_)),
blocked_(0),
spinning_(0),
done_(false),
@@ -237,6 +238,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
MaxSizeVector<ThreadData> thread_data_;
MaxSizeVector<MaxSizeVector<unsigned>> all_coprimes_;
MaxSizeVector<EventCount::Waiter> waiters_;
+ unsigned global_steal_partition_;
std::atomic<unsigned> blocked_;
std::atomic<bool> spinning_;
std::atomic<bool> done_;
@@ -354,6 +356,9 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
Task LocalSteal() {
PerThread* pt = GetPerThread();
unsigned partition = GetStealPartition(pt->thread_id);
+ // If thread steal partition is the same as global partition, there is no
+ // need to go through the steal loop twice.
+ if (global_steal_partition_ == partition) return Task();
unsigned start, limit;
DecodePartition(partition, &start, &limit);
AssertBounds(start, limit);
@@ -374,11 +379,11 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
eigen_plain_assert(!t->f);
// We already did best-effort emptiness check in Steal, so prepare for
// blocking.
- ec_.Prewait(waiter);
+ if (!ec_.Prewait()) return true;
// Now do a reliable emptiness check.
int victim = NonEmptyQueueIndex();
if (victim != -1) {
- ec_.CancelWait(waiter);
+ ec_.CancelWait();
if (cancelled_) {
return false;
} else {
@@ -392,7 +397,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
blocked_++;
// TODO is blocked_ required to be unsigned?
if (done_ && blocked_ == static_cast<unsigned>(num_threads_)) {
- ec_.CancelWait(waiter);
+ ec_.CancelWait();
// Almost done, but need to re-check queues.
// Consider that all queues are empty and all worker threads are preempted
// right after incrementing blocked_ above. Now a free-standing thread
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
index 73928c1d4..a9ae05fc6 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
@@ -150,30 +150,11 @@ class RunQueue {
// Size returns current queue size.
// Can be called by any thread at any time.
- unsigned Size() const {
- // Emptiness plays critical role in thread pool blocking. So we go to great
- // effort to not produce false positives (claim non-empty queue as empty).
- for (;;) {
- // Capture a consistent snapshot of front/tail.
- unsigned front = front_.load(std::memory_order_acquire);
- unsigned back = back_.load(std::memory_order_acquire);
- unsigned front1 = front_.load(std::memory_order_relaxed);
- if (front != front1) continue;
- int size = (front & kMask2) - (back & kMask2);
- // Fix overflow.
- if (size < 0) size += 2 * kSize;
- // Order of modification in push/pop is crafted to make the queue look
- // larger than it is during concurrent modifications. E.g. pop can
- // decrement size before the corresponding push has incremented it.
- // So the computed size can be up to kSize + 1, fix it.
- if (size > static_cast<int>(kSize)) size = kSize;
- return size;
- }
- }
+ unsigned Size() const { return SizeOrNotEmpty<true>(); }
// Empty tests whether container is empty.
// Can be called by any thread at any time.
- bool Empty() const { return Size() == 0; }
+ bool Empty() const { return SizeOrNotEmpty<false>() == 0; }
// Delete all the elements from the queue.
void Flush() {
@@ -206,6 +187,49 @@ class RunQueue {
std::atomic<unsigned> back_;
Elem array_[kSize];
+ // SizeOrNotEmpty returns current queue size; if NeedSizeEstimate is false,
+ // only whether the size is 0 is guaranteed to be correct.
+ // Can be called by any thread at any time.
+ template<bool NeedSizeEstimate>
+ unsigned SizeOrNotEmpty() const {
+ // Emptiness plays critical role in thread pool blocking. So we go to great
+ // effort to not produce false positives (claim non-empty queue as empty).
+ unsigned front = front_.load(std::memory_order_acquire);
+ for (;;) {
+ // Capture a consistent snapshot of front/tail.
+ unsigned back = back_.load(std::memory_order_acquire);
+ unsigned front1 = front_.load(std::memory_order_relaxed);
+ if (front != front1) {
+ front = front1;
+ std::atomic_thread_fence(std::memory_order_acquire);
+ continue;
+ }
+ if (NeedSizeEstimate) {
+ return CalculateSize(front, back);
+ } else {
+ // This value will be 0 if the queue is empty, and undefined otherwise.
+ unsigned maybe_zero = ((front ^ back) & kMask2);
+ // Queue size estimate must agree with maybe zero check on the queue
+ // empty/non-empty state.
+ eigen_assert((CalculateSize(front, back) == 0) == (maybe_zero == 0));
+ return maybe_zero;
+ }
+ }
+ }
+
+ EIGEN_ALWAYS_INLINE
+ unsigned CalculateSize(unsigned front, unsigned back) const {
+ int size = (front & kMask2) - (back & kMask2);
+ // Fix overflow.
+ if (size < 0) size += 2 * kSize;
+ // Order of modification in push/pop is crafted to make the queue look
+ // larger than it is during concurrent modifications. E.g. push can
+ // increment size before the corresponding pop has decremented it.
+ // So the computed size can be up to kSize + 1, fix it.
+ if (size > static_cast<int>(kSize)) size = kSize;
+ return static_cast<unsigned>(size);
+ }
+
RunQueue(const RunQueue&) = delete;
void operator=(const RunQueue&) = delete;
};
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h
index 7229839ac..696c2d03b 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h
@@ -10,6 +10,14 @@
#ifndef EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H
#define EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H
+#ifdef EIGEN_AVOID_THREAD_LOCAL
+
+#ifdef EIGEN_THREAD_LOCAL
+#undef EIGEN_THREAD_LOCAL
+#endif
+
+#else
+
#if EIGEN_MAX_CPP_VER >= 11 && \
((EIGEN_COMP_GNUC && EIGEN_GNUC_AT_LEAST(4, 8)) || \
__has_feature(cxx_thread_local) || \
@@ -52,4 +60,6 @@
#endif
#endif // defined(__ANDROID__) && defined(__clang__)
+#endif // EIGEN_AVOID_THREAD_LOCAL
+
#endif // EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H
diff --git a/unsupported/Eigen/CXX11/src/util/CXX11Workarounds.h b/unsupported/Eigen/CXX11/src/util/CXX11Workarounds.h
index fe4d22803..f1c0284ea 100644
--- a/unsupported/Eigen/CXX11/src/util/CXX11Workarounds.h
+++ b/unsupported/Eigen/CXX11/src/util/CXX11Workarounds.h
@@ -47,9 +47,9 @@ namespace internal {
*/
-template<std::size_t I, class T> constexpr inline T& array_get(std::vector<T>& a) { return a[I]; }
-template<std::size_t I, class T> constexpr inline T&& array_get(std::vector<T>&& a) { return a[I]; }
-template<std::size_t I, class T> constexpr inline T const& array_get(std::vector<T> const& a) { return a[I]; }
+template<std::size_t I_, class T> constexpr inline T& array_get(std::vector<T>& a) { return a[I_]; }
+template<std::size_t I_, class T> constexpr inline T&& array_get(std::vector<T>&& a) { return a[I_]; }
+template<std::size_t I_, class T> constexpr inline T const& array_get(std::vector<T> const& a) { return a[I_]; }
/* Suppose you have a template of the form
* template<typename T> struct X;
diff --git a/unsupported/Eigen/CXX11/src/util/EmulateArray.h b/unsupported/Eigen/CXX11/src/util/EmulateArray.h
index 39c255791..834b20b55 100644
--- a/unsupported/Eigen/CXX11/src/util/EmulateArray.h
+++ b/unsupported/Eigen/CXX11/src/util/EmulateArray.h
@@ -197,13 +197,13 @@ EIGEN_DEVICE_FUNC bool operator==(const array<T,N>& lhs, const array<T,N>& rhs)
namespace internal {
-template<std::size_t I, class T, std::size_t N>
+template<std::size_t I_, class T, std::size_t N>
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE T& array_get(array<T,N>& a) {
- return a[I];
+ return a[I_];
}
-template<std::size_t I, class T, std::size_t N>
+template<std::size_t I_, class T, std::size_t N>
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE const T& array_get(const array<T,N>& a) {
- return a[I];
+ return a[I_];
}
template<class T, std::size_t N> struct array_size<array<T,N> > {
@@ -240,16 +240,16 @@ namespace internal {
* this may not be constexpr
*/
#if defined(__GLIBCXX__) && __GLIBCXX__ < 20120322
-#define STD_GET_ARR_HACK a._M_instance[I]
+#define STD_GET_ARR_HACK a._M_instance[I_]
#elif defined(_LIBCPP_VERSION)
-#define STD_GET_ARR_HACK a.__elems_[I]
+#define STD_GET_ARR_HACK a.__elems_[I_]
#else
-#define STD_GET_ARR_HACK std::template get<I, T, N>(a)
+#define STD_GET_ARR_HACK std::template get<I_, T, N>(a)
#endif
-template<std::size_t I, class T, std::size_t N> constexpr inline T& array_get(std::array<T,N>& a) { return (T&) STD_GET_ARR_HACK; }
-template<std::size_t I, class T, std::size_t N> constexpr inline T&& array_get(std::array<T,N>&& a) { return (T&&) STD_GET_ARR_HACK; }
-template<std::size_t I, class T, std::size_t N> constexpr inline T const& array_get(std::array<T,N> const& a) { return (T const&) STD_GET_ARR_HACK; }
+template<std::size_t I_, class T, std::size_t N> constexpr inline T& array_get(std::array<T,N>& a) { return (T&) STD_GET_ARR_HACK; }
+template<std::size_t I_, class T, std::size_t N> constexpr inline T&& array_get(std::array<T,N>&& a) { return (T&&) STD_GET_ARR_HACK; }
+template<std::size_t I_, class T, std::size_t N> constexpr inline T const& array_get(std::array<T,N> const& a) { return (T const&) STD_GET_ARR_HACK; }
#undef STD_GET_ARR_HACK
diff --git a/unsupported/Eigen/CXX11/src/util/EmulateCXX11Meta.h b/unsupported/Eigen/CXX11/src/util/EmulateCXX11Meta.h
index 8a536faf6..d02d86f85 100644
--- a/unsupported/Eigen/CXX11/src/util/EmulateCXX11Meta.h
+++ b/unsupported/Eigen/CXX11/src/util/EmulateCXX11Meta.h
@@ -166,13 +166,13 @@ array<t, n> repeat(t v) {
return array;
}
-template<std::size_t I, class Head, class Tail>
+template<std::size_t I_, class Head, class Tail>
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE typename Head::type array_get(type_list<Head, Tail>&) {
- return get<I, type_list<Head, Tail> >::value;
+ return get<I_, type_list<Head, Tail> >::value;
}
-template<std::size_t I, class Head, class Tail>
+template<std::size_t I_, class Head, class Tail>
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE typename Head::type array_get(const type_list<Head, Tail>&) {
- return get<I, type_list<Head, Tail> >::value;
+ return get<I_, type_list<Head, Tail> >::value;
}
template <class NList>
@@ -200,13 +200,13 @@ EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE t array_prod(const std::vector<t>& a) {
}
-template<std::size_t I, class T>
+template<std::size_t I_, class T>
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE T& array_get(std::vector<T>& a) {
- return a[I];
+ return a[I_];
}
-template<std::size_t I, class T>
+template<std::size_t I_, class T>
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE const T& array_get(const std::vector<T>& a) {
- return a[I];
+ return a[I_];
}
struct sum_op {