From db9dbbda3204d01bf1862df45b22fb561791156d Mon Sep 17 00:00:00 2001
From: Benoit Steiner
Date: Mon, 29 Jun 2015 14:06:32 -0700
Subject: Improved performance of full reduction by 2 orders of magnitude on
 CPU and 3 orders of magnitude on GPU

---
 .../Eigen/CXX11/src/Tensor/TensorDeviceType.h |  26 +-
 .../Eigen/CXX11/src/Tensor/TensorReduction.h  | 438 +++++++++++++++++++--
 2 files changed, 438 insertions(+), 26 deletions(-)

(limited to 'unsupported/Eigen/CXX11')

diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceType.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceType.h
index 1018395a1..76154d58c 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceType.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceType.h
@@ -28,8 +28,25 @@ struct DefaultDevice {
     ::memset(buffer, c, n);
   }
 
-  EIGEN_STRONG_INLINE size_t numThreads() const {
+  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE size_t numThreads() const {
+#ifndef __CUDA_ARCH__
+    // Running on the host CPU
+    return 1;
+#else
+    // Running on a CUDA device
+    return 32;
+#endif
+  }
+
+  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int majorDeviceVersion() const {
+#ifndef __CUDA_ARCH__
+    // Running single threaded on the host CPU
+    // Should return an enum that encodes the ISA supported by the CPU
     return 1;
+#else
+    // Running on a CUDA device
+    return __CUDA_ARCH__ / 100;
+#endif
   }
 };
 
@@ -204,6 +221,11 @@ struct ThreadPoolDevice {
     return num_threads_;
   }
 
+  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int majorDeviceVersion() const {
+    // Should return an enum that encodes the ISA supported by the CPU
+    return 1;
+  }
+
   template
   EIGEN_STRONG_INLINE Notification* enqueue(Function&& f, Args&&... args) const {
     Notification* n = new Notification();
@@ -308,6 +330,8 @@ struct GpuDevice {
     return 32;
   }
 
+  inline int majorDeviceVersion() const { return m_deviceProperties.major; }
+
   EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void synchronize() const {
     cudaStreamSynchronize(*stream_);
   }

diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h b/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h
index 95116aaee..f31db652d 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h
@@ -44,6 +44,38 @@ struct nested, 1, typename eval
+struct DimInitializer {
+  template EIGEN_DEVICE_FUNC
+  static void run(const InputDims& input_dims,
+                  const array::value>& reduced,
+                  OutputDims* output_dims, ReducedDims* reduced_dims) {
+    const int NumInputDims = internal::array_size::value;
+    int outputIndex = 0;
+    int reduceIndex = 0;
+    for (int i = 0; i < NumInputDims; ++i) {
+      if (reduced[i]) {
+        (*reduced_dims)[reduceIndex] = input_dims[i];
+        ++reduceIndex;
+      } else {
+        (*output_dims)[outputIndex] = input_dims[i];
+        ++outputIndex;
+      }
+    }
+  }
+};
+
+template <> struct DimInitializer > {
+  template EIGEN_DEVICE_FUNC
+  static void run(const InputDims& input_dims, const array& reduced,
+                  Sizes<1>* output_dims, array* reduced_dims) {
+    const int NumInputDims = internal::array_size::value;
+    for (int i = 0; i < NumInputDims; ++i) {
+      (*reduced_dims)[i] = input_dims[i];
+    }
+  }
+};
+
+
 template
 struct are_inner_most_dims {
   static const bool value = false;
 };

@@ -144,7 +176,7 @@ template
 struct InnerMostDimPreserver {
   static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void reduce(const Self& self, typename Self::Index firstIndex, Op& reducer, typename Self::PacketReturnType* accum) {
     EIGEN_STATIC_ASSERT(DimIndex > 0, YOU_MADE_A_PROGRAMMING_MISTAKE);
-    for (int j = 0; j < self.m_reducedDims[DimIndex]; ++j) {
+    for (typename Self::Index j = 0; j < self.m_reducedDims[DimIndex]; ++j) {
       const typename Self::Index input = firstIndex + j * self.m_reducedStrides[DimIndex];
       InnerMostDimPreserver::reduce(self, input, reducer, accum);
     }

@@ -154,13 +186,325 @@ struct InnerMostDimPreserver {
 template
 struct InnerMostDimPreserver<0, Self, Op, true> {
   static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void reduce(const Self& self, typename Self::Index firstIndex, Op& reducer, typename Self::PacketReturnType* accum) {
-    for (int j = 0; j < self.m_reducedDims[0]; ++j) {
+    for (typename Self::Index j = 0; j < self.m_reducedDims[0]; ++j) {
       const typename Self::Index input = firstIndex + j * self.m_reducedStrides[0];
       reducer.reducePacket(self.m_impl.template packet(input), accum);
     }
   }
 };
 
+// Default full reducer
+template
+struct FullReducer {
+  static const bool HasOptimizedImplementation = false;
+
+  static EIGEN_DEVICE_FUNC void run(const Self& self, Op& reducer, const Device&, typename Self::CoeffReturnType* output) {
+    const typename Self::Index num_coeffs = array_prod(self.m_impl.dimensions());
+    *output = InnerMostDimReducer::reduce(self, 0, num_coeffs, reducer);
+  }
+};
+
+
+#ifdef EIGEN_USE_THREADS
+// Multithreaded full reducers
+template
+struct FullReducerShard {
+  static void run(const Eval& eval, typename Eval::Index firstIndex, typename Eval::Index numValuesToReduce, Op& reducer, FullReducerShard* shard) {
+
+    shard->saccum = reducer.initialize();
+    for (typename Eval::Index j = 0; j < numValuesToReduce; ++j) {
+      reducer.reduce(eval.m_impl.coeff(firstIndex + j), &shard->saccum);
+    }
+  }
+
+  typename Eval::CoeffReturnType saccum;
+};
+
+template
+struct FullReducerShard {
+  static void run(const Eval& eval, typename Eval::Index firstIndex, typename Eval::Index numValuesToReduce, Op& reducer, FullReducerShard* shard) {
+
+    const int packetSize = internal::unpacket_traits::size;
+    const typename Eval::Index VectorizedSize = (numValuesToReduce / packetSize) * packetSize;
+
+    shard->paccum = reducer.template initializePacket();
+    for (typename Eval::Index j = 0; j < VectorizedSize; j += packetSize) {
+      reducer.reducePacket(eval.m_impl.template packet(firstIndex + j), &shard->paccum);
+    }
+    shard->saccum = reducer.initialize();
+    for (typename Eval::Index j = VectorizedSize; j < numValuesToReduce; ++j) {
+      reducer.reduce(eval.m_impl.coeff(firstIndex + j), &shard->saccum);
+    }
+  }
+
+  typename Eval::PacketReturnType paccum;
+  typename Eval::CoeffReturnType saccum;
+};
+
+
+template
+struct FullReducer {
+  static const bool HasOptimizedImplementation = !Op::IsStateful;
+
+  // launch one reducer per thread and accumulate the result.
+  static void run(const Self& self, Op& reducer, const ThreadPoolDevice& device, typename Self::CoeffReturnType* output) {
+    typedef typename Self::Index Index;
+    const Index num_coeffs = array_prod(self.m_impl.dimensions());
+    const Index blocksize = std::floor(static_cast(num_coeffs)/device.numThreads());
+    const Index numblocks = blocksize > 0 ? num_coeffs / blocksize : 0;
+    eigen_assert(num_coeffs >= numblocks * blocksize);
+
+    std::vector results;
+    results.reserve(numblocks);
+    std::vector > shards;
+    shards.resize(numblocks);
+    for (Index i = 0; i < numblocks; ++i) {
+      results.push_back(device.enqueue(&FullReducerShard::run, self, i*blocksize, blocksize, reducer, &shards[i]));
+    }
+
+    FullReducerShard finalShard;
+    if (numblocks * blocksize < num_coeffs) {
+      FullReducerShard::run(self, numblocks * blocksize, num_coeffs - numblocks * blocksize, reducer, &finalShard);
+    } else {
+      finalShard.saccum = reducer.initialize();
+    }
+
+    for (Index i = 0; i < numblocks; ++i) {
+      wait_until_ready(results[i]);
+      delete results[i];
+    }
+
+    for (Index i = 0; i < numblocks; ++i) {
+      reducer.reduce(shards[i].saccum, &finalShard.saccum);
+    }
+    *output = reducer.finalize(finalShard.saccum);
+  }
+};
+
+template
+struct FullReducer {
+  static const bool HasOptimizedImplementation = !Op::IsStateful;
+
+  // launch one reducer per thread and accumulate the result.
+  static void run(const Self& self, Op& reducer, const ThreadPoolDevice& device, typename Self::CoeffReturnType* output) {
+    typedef typename Self::Index Index;
+    const Index num_coeffs = array_prod(self.m_impl.dimensions());
+    const Index blocksize = std::floor(static_cast(num_coeffs)/device.numThreads());
+    const Index numblocks = blocksize > 0 ? num_coeffs / blocksize : 0;
+    eigen_assert(num_coeffs >= numblocks * blocksize);
+
+    std::vector results;
+    results.reserve(numblocks);
+    std::vector > shards;
+    shards.resize(numblocks);
+    for (Index i = 0; i < numblocks; ++i) {
+      results.push_back(device.enqueue(&FullReducerShard::run, self, i*blocksize, blocksize, reducer, &shards[i]));
+    }
+
+    FullReducerShard finalShard;
+    if (numblocks * blocksize < num_coeffs) {
+      FullReducerShard::run(self, numblocks * blocksize, num_coeffs - numblocks * blocksize, reducer, &finalShard);
+    } else {
+      finalShard.paccum = reducer.template initializePacket();
+      finalShard.saccum = reducer.initialize();
+    }
+
+    for (Index i = 0; i < numblocks; ++i) {
+      wait_until_ready(results[i]);
+      delete results[i];
+    }
+
+    for (Index i = 0; i < numblocks; ++i) {
+      reducer.reducePacket(shards[i].paccum, &finalShard.paccum);
+      reducer.reduce(shards[i].saccum, &finalShard.saccum);
+    }
+
+    *output = reducer.finalizeBoth(finalShard.saccum, finalShard.paccum);
+  }
+};
+#endif
+
+
+#if defined(EIGEN_USE_GPU) && defined(__CUDACC__)
+// Full reducers for GPU, don't vectorize for now
+
+// Reducer function that enables multiple cuda thread to safely accumulate at the same
+// output address. It basically reads the current value of the output variable, and
+// attempts to update it with the new value. If in the meantime another cuda thread
+// updated the content of the output address it will try again.
+template
+__device__ EIGEN_ALWAYS_INLINE void atomicReduce(T* output, T accum, R& reducer) {
+#if __CUDA_ARCH__ >= 300
+  if (sizeof(T) == 4)
+  {
+    unsigned int oldval = *reinterpret_cast(output);
+    unsigned int newval = oldval;
+    reducer.reduce(accum, reinterpret_cast(&newval));
+    if (newval == oldval) {
+      return;
+    }
+    unsigned int readback;
+    while ((readback = atomicCAS((unsigned int*)output, oldval, newval)) != oldval) {
+      oldval = readback;
+      newval = oldval;
+      reducer.reduce(accum, reinterpret_cast(&newval));
+      if (newval == oldval) {
+        return;
+      }
+    }
+  }
+  else if (sizeof(T) == 8) {
+    unsigned long long oldval = *reinterpret_cast(output);
+    unsigned long long newval = oldval;
+    reducer.reduce(accum, reinterpret_cast(&newval));
+    if (newval == oldval) {
+      return;
+    }
+    unsigned long long readback;
+    while ((readback = atomicCAS((unsigned long long*)output, oldval, newval)) != oldval) {
+      oldval = readback;
+      newval = oldval;
+      reducer.reduce(accum, reinterpret_cast(&newval));
+      if (newval == oldval) {
+        return;
+      }
+    }
+  }
+  else {
+    assert(0 && "Wordsize not supported");
+  }
+#else
+  assert(0 && "Shouldn't be called on unsupported device");
+#endif
+}
+
+template
+__device__ inline void atomicReduce(T* output, T accum, SumReducer&) {
+#if __CUDA_ARCH__ >= 300
+  atomicAdd(output, accum);
+#else
+  assert(0 && "Shouldn't be called on unsupported device");
+#endif
+}
+
+template
+__global__ void FullReductionKernel(Reducer reducer, const Self input, Index num_coeffs,
+                                    typename Self::CoeffReturnType* output) {
+  const Index first_index = blockIdx.x * BlockSize * NumPerThread + threadIdx.x;
+
+  if (first_index == 0) {
+    *output = reducer.initialize();
+  }
+
+  typename Self::CoeffReturnType accum = reducer.initialize();
+  for (Index i = 0; i < NumPerThread; ++i) {
+    const Index index = first_index + i * BlockSize;
+    if (index >= num_coeffs) {
+      break;
+    }
+    typename Self::CoeffReturnType val = input.m_impl.coeff(index);
+    reducer.reduce(val, &accum);
+  }
+
+  for (int offset = warpSize/2; offset > 0; offset /= 2) {
+    reducer.reduce(__shfl_down(accum, offset), &accum);
+  }
+
+  if ((threadIdx.x & (warpSize - 1)) == 0) {
+    atomicReduce(output, accum, reducer);
+  }
+}
+
+
+template
+struct FullReducer {
+  // Unfortunately nvidia doesn't support well exotic types such as complex,
+  // so reduce the scope of the optimized version of the code to the simple case
+  // of floats.
+  static const bool HasOptimizedImplementation = !Op::IsStateful &&
+                                                 internal::is_same::value;
+
+  template
+  static void run(const Self& self, Op& reducer, const GpuDevice& device, OutputType* output) {
+    assert(false && "Should only be called on floats");
+  }
+
+  static void run(const Self& self, Op& reducer, const GpuDevice& device, float* output) {
+    typedef typename Self::Index Index;
+
+    const Index num_coeffs = array_prod(self.m_impl.dimensions());
+    const int block_size = 256;
+    const int num_per_thread = 128;
+    const int num_blocks = std::ceil(static_cast(num_coeffs) / (block_size * num_per_thread));
+    LAUNCH_CUDA_KERNEL((FullReductionKernel),
+                       num_blocks, block_size, 0, device, reducer, self, num_coeffs, output);
+  }
+};
+
+#endif
+
+
+template
+class BlockReducer {
+ public:
+  typedef typename Self::Index Index;
+  typedef typename Self::Scalar Scalar;
+  typedef typename Self::CoeffReturnType CoeffReturnType;
+  explicit BlockReducer(const Op& reducer) : op_(reducer) {
+    accum_ = op_.initialize();
+  }
+  void Reduce(Index index, Index num_values_to_reduce, Scalar* data) {
+    for (Index i = 0; i < num_values_to_reduce; ++i) {
+      op_.reduce(data[index + i], &accum_);
+    }
+  }
+  CoeffReturnType Finalize() {
+    return op_.finalize(accum_);
+  }
+
+ private:
+  CoeffReturnType accum_;
+  Op op_;
+};
+
+
+template
+class BlockReducer {
+ public:
+  typedef typename Self::Index Index;
+  typedef typename Self::Scalar Scalar;
+  typedef typename Self::CoeffReturnType CoeffReturnType;
+  typedef typename Self::PacketReturnType PacketReturnType;
+  explicit BlockReducer(const Op& reducer) : op_(reducer) {
+    vaccum_ = op_.template initializePacket();
+    accum_ = op_.initialize();
+  }
+  void Reduce(Index index, Index num_values_to_reduce, Scalar* data) {
+    const int packet_size = internal::unpacket_traits::size;
+    const typename Self::Index vectorized_size = (num_values_to_reduce /
+                                                  packet_size) * packet_size;
+    for (typename Self::Index i = 0; i < vectorized_size; i += packet_size) {
+      op_.reducePacket(internal::ploadt(
+          &data[index + i]), &vaccum_);
+    }
+
+    for (typename Self::Index i = vectorized_size;
+         i < num_values_to_reduce; ++i) {
+      op_.reduce(data[index + i], &accum_);
+    }
+  }
+  typename Self::CoeffReturnType Finalize() {
+    return op_.finalizeBoth(accum_, vaccum_);
+  }
+
+ private:
+  typename Self::PacketReturnType vaccum_;
+  typename Self::CoeffReturnType accum_;
+  Op op_;
+};
+
 } // end namespace internal

@@ -179,6 +523,7 @@ class TensorReductionOp : public TensorBase
     EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE TensorReductionOp(const XprType& expr, const Dims& dims) : m_expr(expr), m_dims(dims) { }
+    EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE TensorReductionOp(const XprType& expr, const Dims& dims, const Op& reducer) : m_expr(expr), m_dims(dims), m_reducer(reducer) { }

@@ -186,6 +531,7 @@ class TensorReductionOp : public TensorBase
     const XprType& expression() const { return m_expr; }
     EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE const Dims& dims() const { return m_dims; }
+    EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE const Op& reducer() const { return m_reducer; }
 
   protected:

@@ -201,10 +547,11 @@ struct TensorEvaluator, Device>
 {
   typedef TensorReductionOp XprType;
   typedef typename XprType::Index Index;
-  static const int NumInputDims = internal::array_size::Dimensions>::value;
+  typedef typename TensorEvaluator::Dimensions InputDimensions;
+  static const int NumInputDims = internal::array_size::value;
   static const int NumReducedDims = internal::array_size::value;
   static const int NumOutputDims = (NumInputDims==NumReducedDims) ? 1 : NumInputDims - NumReducedDims;
-  typedef DSizes Dimensions;
+  typedef typename internal::conditional, DSizes >::type Dimensions;
   typedef typename XprType::Scalar Scalar;
   typedef TensorEvaluator, Device> Self;
   static const bool InputPacketAccess = TensorEvaluator::PacketAccess;

@@ -218,9 +565,10 @@ struct TensorEvaluator, Device>
   static const bool ReducingInnerMostDims = internal::are_inner_most_dims::value;
   static const bool PreservingInnerMostDims = internal::preserve_inner_most_dims::value;
+  static const bool RunningFullReduction = (NumInputDims==NumReducedDims);
 
   EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE TensorEvaluator(const XprType& op, const Device& device)
-    : m_impl(op.expression(), device), m_reducer(op.reducer())
+    : m_impl(op.expression(), device), m_reducer(op.reducer()), m_result(NULL), m_device(device)
   {
     EIGEN_STATIC_ASSERT(NumInputDims >= NumReducedDims, YOU_MADE_A_PROGRAMMING_MISTAKE);
     EIGEN_STATIC_ASSERT((!ReducingInnerMostDims | !PreservingInnerMostDims | (NumReducedDims == NumInputDims)),

@@ -238,17 +586,7 @@ struct TensorEvaluator, Device>
     }
 
     const typename TensorEvaluator::Dimensions& input_dims = m_impl.dimensions();
-    int outputIndex = 0;
-    int reduceIndex = 0;
-    for (int i = 0; i < NumInputDims; ++i) {
-      if (reduced[i]) {
-        m_reducedDims[reduceIndex] = input_dims[i];
-        ++reduceIndex;
-      } else {
-        m_dimensions[outputIndex] = input_dims[i];
-        ++outputIndex;
-      }
-    }
+    internal::DimInitializer::run(input_dims, reduced, &m_dimensions, &m_reducedDims);
 
     // Precompute output strides.
     if (static_cast(Layout) == static_cast(ColMajor)) {

@@ -277,8 +615,8 @@ struct TensorEvaluator, Device>
       }
     }
 
-    outputIndex = 0;
-    reduceIndex = 0;
+    int outputIndex = 0;
+    int reduceIndex = 0;
     for (int i = 0; i < NumInputDims; ++i) {
       if (reduced[i]) {
         m_reducedStrides[reduceIndex] = input_strides[i];
         ++reduceIndex;

@@ -291,27 +629,50 @@ struct TensorEvaluator, Device>
 
     // Special case for full reductions
     if (NumInputDims == NumReducedDims) {
-      m_dimensions[0] = 1;
+      eigen_assert(m_dimensions[0] == 1);
       m_preservedStrides[0] = internal::array_prod(input_dims);
     }
   }
 
   EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE const Dimensions& dimensions() const { return m_dimensions; }
 
-  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE bool evalSubExprsIfNeeded(Scalar* /*data*/) {
+  typedef typename internal::remove_const::type CoeffReturnType;
+  typedef typename internal::remove_const::type PacketReturnType;
+
+  EIGEN_STRONG_INLINE bool evalSubExprsIfNeeded(CoeffReturnType* data) {
     m_impl.evalSubExprsIfNeeded(NULL);
+
+    // Use the FullReducer if possible.
+    if (RunningFullReduction && internal::FullReducer::HasOptimizedImplementation &&
+        ((RunningOnGPU && (m_device.majorDeviceVersion() >= 3)) ||
+         (internal::array_prod(m_impl.dimensions()) > 1024 * 1024))) {
+
+      bool need_assign = false;
+      if (!data) {
+        m_result = static_cast(m_device.allocate(sizeof(CoeffReturnType)));
+        data = m_result;
+        need_assign = true;
+      }
+
+      Op reducer(m_reducer);
+      internal::FullReducer::run(*this, reducer, m_device, data);
+      return need_assign;
+    }
     return true;
   }
 
   EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void cleanup() {
     m_impl.cleanup();
+    if (m_result) {
+      m_device.deallocate(m_result);
+    }
   }
 
-  typedef typename internal::remove_const::type CoeffReturnType;
-  typedef typename internal::remove_const::type PacketReturnType;
-
   EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE CoeffReturnType coeff(Index index) const {
+    if (RunningFullReduction && m_result) {
+      return *m_result;
+    }
     Op reducer(m_reducer);
     if (ReducingInnerMostDims) {
       const Index num_values_to_reduce =

@@ -372,6 +733,13 @@ struct TensorEvaluator, Device>
   template friend struct internal::GenericDimReducer;
   template friend struct internal::InnerMostDimReducer;
   template friend struct internal::InnerMostDimPreserver;
+  template friend struct internal::FullReducer;
+#ifdef EIGEN_USE_THREADS
+  template friend struct internal::FullReducerShard;
+#endif
+#if defined(EIGEN_USE_GPU) && defined(__CUDACC__)
+  template friend void internal::FullReductionKernel(R, const S, I, typename S::CoeffReturnType*);
+#endif
 
   // Returns the Index in the input tensor of the first value that needs to be
   // used to compute the reduction at output index "index".

@@ -392,7 +760,12 @@ struct TensorEvaluator, Device>
         startInput += idx * m_preservedStrides[i];
         index -= idx * m_outputStrides[i];
       }
-      startInput += index * m_preservedStrides[0];
+      if (PreservingInnerMostDims) {
+        eigen_assert(m_preservedStrides[0] == 1);
+        startInput += index;
+      } else {
+        startInput += index * m_preservedStrides[0];
+      }
     } else {
       for (int i = 0; i < NumOutputDims - 1; ++i) {
         // This is index_i in the output tensor.

@@ -400,7 +773,12 @@ struct TensorEvaluator, Device>
         startInput += idx * m_preservedStrides[i];
         index -= idx * m_outputStrides[i];
       }
-      startInput += index * m_preservedStrides[NumOutputDims - 1];
+      if (PreservingInnerMostDims) {
+        eigen_assert(m_preservedStrides[NumOutputDims - 1] == 1);
+        startInput += index;
+      } else {
+        startInput += index * m_preservedStrides[NumOutputDims - 1];
+      }
     }
     return startInput;
   }

@@ -425,6 +803,16 @@ struct TensorEvaluator, Device>
 
   // Operation to apply for computing the reduction.
   Op m_reducer;
+
+  // For full reductions
+#if defined(EIGEN_USE_GPU) && defined(__CUDACC__)
+  static const bool RunningOnGPU = internal::is_same::value;
+#else
+  static const bool RunningOnGPU = false;
+#endif
+  CoeffReturnType* m_result;
+
+  const Device& m_device;
 };
 
 } // end namespace Eigen
-- 
cgit v1.2.3
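
The ThreadPoolDevice specialization of FullReducer above splits the flat index range into one equally sized block per thread, reduces each block into a per-shard accumulator, and folds the shard results plus the uneven tail on the calling thread. The following standalone sketch (not part of the patch; all names are illustrative) shows the same blocking scheme with std::thread and plain summation instead of Eigen's Notification-based enqueue and generic reducers:

#include <algorithm>
#include <cstddef>
#include <numeric>
#include <thread>
#include <vector>

// Sum [data, data + n) with one equally sized block per hardware thread,
// mirroring the blocksize / numblocks arithmetic of the patch.
float parallelSum(const float* data, std::size_t n) {
  const std::size_t num_threads =
      std::max<std::size_t>(1, std::thread::hardware_concurrency());
  const std::size_t blocksize = n / num_threads;                   // floor, like the patch
  const std::size_t numblocks = blocksize > 0 ? n / blocksize : 0;

  std::vector<float> shard(numblocks, 0.0f);
  std::vector<std::thread> workers;
  workers.reserve(numblocks);
  for (std::size_t b = 0; b < numblocks; ++b) {
    workers.emplace_back([&shard, data, blocksize, b] {
      const float* begin = data + b * blocksize;
      shard[b] = std::accumulate(begin, begin + blocksize, 0.0f);
    });
  }

  // The calling thread reduces whatever the equally sized blocks left over.
  float total = std::accumulate(data + numblocks * blocksize, data + n, 0.0f);

  for (std::thread& t : workers) t.join();
  for (float s : shard) total += s;
  return total;
}

As in the patch, blocksize is the floor of n divided by the thread count, so a remainder strictly smaller than one block is always left for the caller, and when n is smaller than the thread count the caller reduces everything itself.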
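
The atomicReduce helper in the patch generalizes atomicAdd to arbitrary reduction operators: it reads the output word, applies the reducer locally, and publishes the result with atomicCAS, retrying whenever another thread wrote in between. The sketch below (not part of the patch) shows the same retry loop for a 4-byte float, using __float_as_uint/__uint_as_float bit casts instead of the patch's pointer casts; the MaxReducer functor and the kernel are illustrative stand-ins, and the output word must be seeded (for example with -INFINITY) before the launch:

// Illustrative functor standing in for an Eigen reducer.
struct MaxReducer {
  __device__ void reduce(float v, float* accum) const { *accum = fmaxf(v, *accum); }
};

// Same read / reduce / compare-and-swap retry loop as atomicReduce, for a
// 4-byte payload.
__device__ void atomicMaxReduce(float* output, float value, MaxReducer reducer) {
  unsigned int* word = reinterpret_cast<unsigned int*>(output);
  unsigned int oldbits = *word;
  while (true) {
    float updated = __uint_as_float(oldbits);
    reducer.reduce(value, &updated);
    const unsigned int newbits = __float_as_uint(updated);
    if (newbits == oldbits) return;                  // nothing new to publish
    const unsigned int seen = atomicCAS(word, oldbits, newbits);
    if (seen == oldbits) return;                     // our update landed
    oldbits = seen;                                  // lost the race: retry
  }
}

// *result must be initialized (e.g. to -INFINITY) before this launch.
__global__ void blockMax(const float* data, int n, float* result) {
  const int i = blockIdx.x * blockDim.x + threadIdx.x;
  if (i < n) atomicMaxReduce(result, data[i], MaxReducer());
}

The SumReducer overload in the patch skips this loop entirely because the hardware already provides atomicAdd for floats on sm_30 and newer devices.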
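
FullReductionKernel lets each thread fold NumPerThread elements strided by the block size, combines the 32 lanes of each warp with register shuffles, and has lane 0 of every warp publish a single atomic update, so a multi-million-element reduction only issues a few thousand atomics. Below is a self-contained sum-reduction kernel with the same shape (not part of the patch); it uses __shfl_down_sync, the synchronized replacement for the __shfl_down intrinsic of the 2015 code, and the launch constants mirror the patch's block_size = 256 and num_per_thread = 128:

#include <cuda_runtime.h>

// One warp-synchronous partial sum per warp, published with a single atomicAdd.
template <int BlockSize, int NumPerThread>
__global__ void fullSumKernel(const float* in, int n, float* out) {
  const int first = blockIdx.x * BlockSize * NumPerThread + threadIdx.x;

  // Each thread folds up to NumPerThread elements spaced BlockSize apart.
  float accum = 0.0f;
  for (int i = 0; i < NumPerThread; ++i) {
    const int idx = first + i * BlockSize;
    if (idx >= n) break;
    accum += in[idx];
  }

  // Tree reduction across the 32 lanes of the warp, entirely in registers.
  for (int offset = warpSize / 2; offset > 0; offset /= 2) {
    accum += __shfl_down_sync(0xffffffff, accum, offset);
  }

  // Lane 0 of every warp folds its partial sum into the single output word,
  // which the host zero-initializes before the launch.
  if ((threadIdx.x & (warpSize - 1)) == 0) {
    atomicAdd(out, accum);
  }
}

void launchFullSum(const float* d_in, int n, float* d_out, cudaStream_t stream) {
  const int block_size = 256;       // same constants as the patch's GPU FullReducer
  const int num_per_thread = 128;
  const int num_blocks = (n + block_size * num_per_thread - 1) / (block_size * num_per_thread);
  cudaMemsetAsync(d_out, 0, sizeof(float), stream);
  fullSumKernel<block_size, num_per_thread><<<num_blocks, block_size, 0, stream>>>(d_in, n, d_out);
}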
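
The reworked evalSubExprsIfNeeded also encodes a small contract with the assignment code: when the optimized full reducer runs, the result either goes straight into the caller-provided buffer (return false, nothing left to do) or into an internally allocated m_result scratch value that later coeff() calls read back (return true). A stripped-down illustration of that control flow, with hypothetical names that are not Eigen's API:

#include <cstdlib>

// Hypothetical stand-in for the device's raw allocator.
struct ScratchDevice {
  void* allocate(std::size_t bytes) const { return std::malloc(bytes); }
  void deallocate(void* ptr) const { std::free(ptr); }
};

class FullReductionEvaluator {
 public:
  explicit FullReductionEvaluator(const ScratchDevice& device)
      : m_device(device), m_result(NULL) {}

  // Returns true when the caller still has to run its assignment loop
  // (reading the value back through coeff), false when the reduction was
  // already written into the caller-provided buffer.
  bool evalSubExprsIfNeeded(float* data) {
    bool need_assign = false;
    if (!data) {
      m_result = static_cast<float*>(m_device.allocate(sizeof(float)));
      data = m_result;
      need_assign = true;
    }
    *data = runFullReduction();  // stands in for internal::FullReducer<...>::run
    return need_assign;
  }

  // The real evaluator falls back to recomputing when no scratch buffer exists.
  float coeff(int /*index*/) const { return m_result ? *m_result : 0.0f; }

  void cleanup() {
    if (m_result) m_device.deallocate(m_result);
  }

 private:
  float runFullReduction() const { return 0.0f; }  // placeholder computation
  const ScratchDevice& m_device;
  float* m_result;
};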