From ba95e43ea25fd0c6066630ac121559c2e0ba7728 Mon Sep 17 00:00:00 2001
From: Benoit Steiner <benoit.steiner.goog@gmail.com>
Date: Mon, 9 May 2016 10:45:12 -0700
Subject: Added a new parallelFor api to the thread pool device.

---
 .../CXX11/src/Tensor/TensorDeviceThreadPool.h      | 98 ++++++++++++++++++++++
 1 file changed, 98 insertions(+)
 (limited to 'unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h')

diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
index c02891465..4df4cc220 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
@@ -172,7 +172,105 @@ struct ThreadPoolDevice {
     pool_->Schedule(func);
   }
 
+  // parallelFor executes f with [0, size) arguments in parallel and waits for
+  // completion. Block size is chosen between min_block_size and
+  // 2 * min_block_size to achieve the best parallel efficiency.
+  // If min_block_size == -1, parallelFor uses block size of 1.
+  // If hard_align > 0, block size is aligned to hard_align.
+  // If soft_align > hard_align, block size is aligned to soft_align provided
+  // that it does not increase block size too much.
+  void parallelFor(Index size, Index min_block_size, Index hard_align,
+                   Index soft_align,
+                   std::function<void(Index, Index)> f) const {
+    // Degenerate or too-small workloads (or a single-threaded pool) run
+    // inline on the calling thread.
+    if (size <= 1 || (min_block_size != -1 && size < min_block_size) ||
+        numThreads() == 1) {
+      f(0, size);
+      return;
+    }
+
+    Index block_size = 1;
+    Index block_count = size;
+    if (min_block_size != -1) {
+      // Calculate block size based on (1) estimated cost and (2) parallel
+      // efficiency. We want blocks to be not too small to mitigate
+      // parallelization overheads; not too large to mitigate tail effect and
+      // potential load imbalance and we also want number of blocks to be
+      // evenly dividable across threads.
+      min_block_size = numext::maxi<Index>(min_block_size, 1);
+      block_size = numext::mini(min_block_size, size);
+      // Upper bound on block size:
+      const Index max_block_size = numext::mini(min_block_size * 2, size);
+      block_size = numext::mini(
+          alignBlockSize(block_size, hard_align, soft_align), size);
+      block_count = divup(size, block_size);
+      // Calculate parallel efficiency as fraction of total CPU time used for
+      // computations:
+      double max_efficiency =
+          static_cast<double>(block_count) /
+          (divup<int>(block_count, numThreads()) * numThreads());
+      // Now try to increase block size up to max_block_size as long as it
+      // doesn't decrease parallel efficiency.
+      for (Index prev_block_count = block_count; prev_block_count > 1;) {
+        // This is the next block size that divides size into a smaller number
+        // of blocks than the current block_size.
+        Index coarser_block_size = divup(size, prev_block_count - 1);
+        coarser_block_size =
+            alignBlockSize(coarser_block_size, hard_align, soft_align);
+        if (coarser_block_size > max_block_size) {
+          break;  // Reached max block size. Stop.
+        }
+        // Recalculate parallel efficiency.
+        const Index coarser_block_count = divup(size, coarser_block_size);
+        eigen_assert(coarser_block_count < prev_block_count);
+        prev_block_count = coarser_block_count;
+        const double coarser_efficiency =
+            static_cast<double>(coarser_block_count) /
+            (divup<int>(coarser_block_count, numThreads()) * numThreads());
+        if (coarser_efficiency + 0.01 >= max_efficiency) {
+          // Taking it.
+          block_size = coarser_block_size;
+          block_count = coarser_block_count;
+          if (max_efficiency < coarser_efficiency) {
+            max_efficiency = coarser_efficiency;
+          }
+        }
+      }
+    }
+
+    // Recursively divide size into halves until we reach block_size.
+    // Division code rounds mid to block_size, so we are guaranteed to get
+    // block_count leaves that do actual computations.
+    // NOTE: handleRange, barrier and f are captured by reference; they stay
+    // alive until barrier.Wait() returns, after all block_count leaves have
+    // called barrier.Notify().
+    Barrier barrier(block_count);
+    std::function<void(Index, Index)> handleRange;
+    handleRange = [=, &handleRange, &barrier, &f](Index first, Index last) {
+      if (last - first <= block_size) {
+        // Single block or less, execute directly.
+        f(first, last);
+        barrier.Notify();
+        return;
+      }
+      // Split into halves and submit to the pool.
+      Index mid = first + divup((last - first) / 2, block_size) * block_size;
+      pool_->Schedule([=, &handleRange]() { handleRange(mid, last); });
+      pool_->Schedule([=, &handleRange]() { handleRange(first, mid); });
+    };
+    handleRange(0, size);
+    barrier.Wait();
+  }
+
  private:
+  // Rounds size up to a multiple of the requested alignment.
+  // hard_align and soft_align are assumed to be powers of two (the bit-mask
+  // rounding below relies on that) -- TODO confirm with callers.
+  static Index alignBlockSize(Index size, Index hard_align, Index soft_align) {
+    if (soft_align > hard_align && size >= 4 * soft_align) {
+      // Align to soft_align, if it won't increase size by more than 25%.
+      return (size + soft_align - 1) & ~(soft_align - 1);
+    }
+    if (hard_align > 0) {
+      return (size + hard_align - 1) & ~(hard_align - 1);
+    }
+    return size;
+  }
+
   ThreadPoolInterface* pool_;
   size_t num_threads_;
 };
-- 
cgit v1.2.3