diff options
author | Rasmus Munk Larsen <rmlarsen@google.com> | 2016-05-12 14:07:22 -0700 |
---|---|---|
committer | Rasmus Munk Larsen <rmlarsen@google.com> | 2016-05-12 14:07:22 -0700 |
commit | e55deb21c59109f3ed2ade858031116503b2c313 (patch) | |
tree | b369d3ded141916c24ad6464001b9c951bdb8d1d /unsupported/Eigen/CXX11/src | |
parent | ae9688f3139579b9e87560ad48e62d1205fb3eb3 (diff) |
Improvements to parallelFor.
Move some scalar functors from TensorFunctors. to Eigen core.
Diffstat (limited to 'unsupported/Eigen/CXX11/src')
3 files changed, 76 insertions, 143 deletions
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h index 4df4cc220..fab1e316a 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h @@ -172,67 +172,69 @@ struct ThreadPoolDevice { pool_->Schedule(func); } - // parallelFor executes f with [0, size) arguments in parallel and waits for - // completion. Block size is choosen between min_block_size and - // 2 * min_block_size to achieve the best parallel efficiency. - // If min_block_size == -1, parallelFor uses block size of 1. - // If hard_align > 0, block size is aligned to hard_align. - // If soft_align > hard_align, block size is aligned to soft_align provided - // that it does not increase block size too much. - void parallelFor(Index size, Index min_block_size, Index hard_align, - Index soft_align, + // parallelFor executes f with [0, n) arguments in parallel and waits for + // completion. F accepts a half-open interval [first, last). + // Block size is choosen based on the iteration cost and resulting parallel + // efficiency. If block_align is not nullptr, it is called to round up the + // block size. + void parallelFor(Index n, const TensorOpCost& cost, + std::function<Index(Index)> block_align, std::function<void(Index, Index)> f) const { - if (size <= 1 || (min_block_size != -1 && size < min_block_size) || - numThreads() == 1) { - f(0, size); + typedef TensorCostModel<ThreadPoolDevice> CostModel; + if (n <= 1 || numThreads() == 1 || + CostModel::numThreads(n, cost, numThreads()) == 1) { + f(0, n); return; } - Index block_size = 1; - Index block_count = size; - if (min_block_size != -1) { - // Calculate block size based on (1) estimated cost and (2) parallel - // efficiency. We want blocks to be not too small to mitigate - // parallelization overheads; not too large to mitigate tail effect and - // potential load imbalance and we also want number of blocks to be evenly - // dividable across threads. - min_block_size = numext::maxi<Index>(min_block_size, 1); - block_size = numext::mini(min_block_size, size); - // Upper bound on block size: - const Index max_block_size = numext::mini(min_block_size * 2, size); - block_size = numext::mini( - alignBlockSize(block_size, hard_align, soft_align), size); - block_count = divup(size, block_size); - // Calculate parallel efficiency as fraction of total CPU time used for - // computations: - double max_efficiency = - static_cast<double>(block_count) / - (divup<int>(block_count, numThreads()) * numThreads()); - // Now try to increase block size up to max_block_size as long as it - // doesn't decrease parallel efficiency. - for (Index prev_block_count = block_count; prev_block_count > 1;) { - // This is the next block size that divides size into a smaller number - // of blocks than the current block_size. - Index coarser_block_size = divup(size, prev_block_count - 1); - coarser_block_size = - alignBlockSize(coarser_block_size, hard_align, soft_align); - if (coarser_block_size > max_block_size) { - break; // Reached max block size. Stop. - } - // Recalculate parallel efficiency. - const Index coarser_block_count = divup(size, coarser_block_size); - eigen_assert(coarser_block_count < prev_block_count); - prev_block_count = coarser_block_count; - const double coarser_efficiency = - static_cast<double>(coarser_block_count) / - (divup<int>(coarser_block_count, numThreads()) * numThreads()); - if (coarser_efficiency + 0.01 >= max_efficiency) { - // Taking it. - block_size = coarser_block_size; - block_count = coarser_block_count; - if (max_efficiency < coarser_efficiency) { - max_efficiency = coarser_efficiency; - } + // Calculate block size based on (1) the iteration cost and (2) parallel + // efficiency. We want blocks to be not too small to mitigate + // parallelization overheads; not too large to mitigate tail + // effect and potential load imbalance and we also want number + // of blocks to be evenly dividable across threads. + + double block_size_f = 1.0 / CostModel::taskSize(1, cost); + Index block_size = numext::mini(n, numext::maxi<Index>(1, block_size_f)); + const Index max_block_size = + numext::mini(n, numext::maxi<Index>(1, 2 * block_size_f)); + if (block_align) { + Index new_block_size = block_align(block_size); + eigen_assert(new_block_size >= block_size); + block_size = numext::mini(n, new_block_size); + } + Index block_count = divup(n, block_size); + // Calculate parallel efficiency as fraction of total CPU time used for + // computations: + double max_efficiency = + static_cast<double>(block_count) / + (divup<int>(block_count, numThreads()) * numThreads()); + // Now try to increase block size up to max_block_size as long as it + // doesn't decrease parallel efficiency. + for (Index prev_block_count = block_count; prev_block_count > 1;) { + // This is the next block size that divides size into a smaller number + // of blocks than the current block_size. + Index coarser_block_size = divup(n, prev_block_count - 1); + if (block_align) { + Index new_block_size = block_align(coarser_block_size); + eigen_assert(new_block_size >= coarser_block_size); + coarser_block_size = numext::mini(n, new_block_size); + } + if (coarser_block_size > max_block_size) { + break; // Reached max block size. Stop. + } + // Recalculate parallel efficiency. + const Index coarser_block_count = divup(n, coarser_block_size); + eigen_assert(coarser_block_count < prev_block_count); + prev_block_count = coarser_block_count; + const double coarser_efficiency = + static_cast<double>(coarser_block_count) / + (divup<int>(coarser_block_count, numThreads()) * numThreads()); + if (coarser_efficiency + 0.01 >= max_efficiency) { + // Taking it. + block_size = coarser_block_size; + block_count = coarser_block_count; + if (max_efficiency < coarser_efficiency) { + max_efficiency = coarser_efficiency; } } } @@ -251,26 +253,20 @@ struct ThreadPoolDevice { } // Split into halves and submit to the pool. Index mid = first + divup((last - first) / 2, block_size) * block_size; - pool_->Schedule([=, &handleRange]() { handleRange(mid, last); }); - pool_->Schedule([=, &handleRange]() { handleRange(first, mid); }); + enqueue_func([=, &handleRange]() { handleRange(mid, last); }); + enqueue_func([=, &handleRange]() { handleRange(first, mid); }); }; - handleRange(0, size); + handleRange(0, n); barrier.Wait(); } - private: - static Index alignBlockSize(Index size, Index hard_align, Index soft_align) { - if (soft_align > hard_align && size >= 4 * soft_align) { - // Align to soft_align, if it won't increase size by more than 25%. - return (size + soft_align - 1) & ~(soft_align - 1); - } - if (hard_align > 0) { - return (size + hard_align - 1) & ~(hard_align - 1); - } - return size; + // Convinience wrapper for parallelFor that does not align blocks. + void parallelFor(Index n, const TensorOpCost& cost, + std::function<void(Index, Index)> f) const { + parallelFor(n, cost, nullptr, std::move(f)); } - + private: ThreadPoolInterface* pool_; size_t num_threads_; }; diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h index 1155354cd..e0df13e78 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h @@ -137,6 +137,13 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable> { { const Index PacketSize = Vectorizable ? unpacket_traits<typename Evaluator::PacketReturnType>::size : 1; const Index size = array_prod(evaluator.dimensions()); +#if defined(EIGEN_USE_NONBLOCKING_THREAD_POOL) && defined(EIGEN_USE_COST_MODEL) + device.parallelFor(size, evaluator.costPerCoeff(Vectorizable), + EvalRange::alignBlockSize, + [&evaluator](Index first, Index last) { + EvalRange::run(&evaluator, first, last); + }); +#else size_t num_threads = device.numThreads(); #ifdef EIGEN_USE_COST_MODEL if (num_threads > 1) { @@ -163,11 +170,12 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable> { } barrier.Wait(); } +#endif // EIGEN_USE_NONBLOCKING_THREAD_POOL } evaluator.cleanup(); } }; -#endif +#endif // EIGEN_USE_THREADS // GPU: the evaluation of the expression is offloaded to a GPU. diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorFunctors.h b/unsupported/Eigen/CXX11/src/Tensor/TensorFunctors.h index c674fcfe1..d07063444 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorFunctors.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorFunctors.h @@ -13,77 +13,6 @@ namespace Eigen { namespace internal { - -/** \internal - * \brief Template functor to compute the modulo between an array and a scalar. - */ -template <typename Scalar> -struct scalar_mod_op { - EIGEN_DEVICE_FUNC scalar_mod_op(const Scalar& divisor) : m_divisor(divisor) {} - EIGEN_DEVICE_FUNC inline Scalar operator() (const Scalar& a) const { return a % m_divisor; } - const Scalar m_divisor; -}; -template <typename Scalar> -struct functor_traits<scalar_mod_op<Scalar> > -{ enum { Cost = NumTraits<Scalar>::template Div<false>::Cost, PacketAccess = false }; }; - - -/** \internal - * \brief Template functor to compute the modulo between 2 arrays. - */ -template <typename Scalar> -struct scalar_mod2_op { - EIGEN_EMPTY_STRUCT_CTOR(scalar_mod2_op); - EIGEN_DEVICE_FUNC inline Scalar operator() (const Scalar& a, const Scalar& b) const { return a % b; } -}; -template <typename Scalar> -struct functor_traits<scalar_mod2_op<Scalar> > -{ enum { Cost = NumTraits<Scalar>::template Div<false>::Cost, PacketAccess = false }; }; - -template <typename Scalar> -struct scalar_fmod_op { - EIGEN_EMPTY_STRUCT_CTOR(scalar_fmod_op); - EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE Scalar - operator()(const Scalar& a, const Scalar& b) const { - return numext::fmod(a, b); - } -}; -template <typename Scalar> -struct functor_traits<scalar_fmod_op<Scalar> > { - enum { Cost = 13, // Reciprocal throughput of FPREM on Haswell. - PacketAccess = false }; -}; - - -/** \internal - * \brief Template functor to compute the sigmoid of a scalar - * \sa class CwiseUnaryOp, ArrayBase::sigmoid() - */ -template <typename T> -struct scalar_sigmoid_op { - EIGEN_EMPTY_STRUCT_CTOR(scalar_sigmoid_op) - EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE T operator()(const T& x) const { - const T one = T(1); - return one / (one + numext::exp(-x)); - } - - template <typename Packet> EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE - Packet packetOp(const Packet& x) const { - const Packet one = pset1<Packet>(T(1)); - return pdiv(one, padd(one, pexp(pnegate(x)))); - } -}; - -template <typename T> -struct functor_traits<scalar_sigmoid_op<T> > { - enum { - Cost = NumTraits<T>::AddCost * 2 + NumTraits<T>::MulCost * 6, - PacketAccess = packet_traits<T>::HasAdd && packet_traits<T>::HasDiv && - packet_traits<T>::HasNegate && packet_traits<T>::HasExp - }; -}; - - // Standard reduction functors template <typename T> struct SumReducer { |