path: root/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
author Benoit Steiner <benoit.steiner.goog@gmail.com> 2016-05-09 10:45:12 -0700
committer Benoit Steiner <benoit.steiner.goog@gmail.com> 2016-05-09 10:45:12 -0700
commit ba95e43ea25fd0c6066630ac121559c2e0ba7728 (patch)
tree 6349c48e68eab1cafc0772423f46340b080f9cc6 /unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
parent dc7dbc2df71e88615c4f179a2eded7f617fca7a9 (diff)
Added a new parallelFor api to the thread pool device.
Diffstat (limited to 'unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h')
-rw-r--r--  unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h  98
1 files changed, 98 insertions, 0 deletions
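
The commit adds parallelFor to ThreadPoolDevice. A minimal usage sketch (illustrative only, not part of the patch): it assumes EIGEN_USE_THREADS is defined before including the Tensor header so that Eigen::ThreadPool and Eigen::ThreadPoolDevice are available, and the thread count, block size and workload below are arbitrary.

#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>
#include <vector>

int main() {
  const Eigen::Index n = 1 << 20;
  std::vector<float> x(n, 1.0f), y(n, 2.0f), z(n);

  Eigen::ThreadPool pool(4);                        // 4 worker threads
  Eigen::ThreadPoolDevice device(&pool, /*num_cores=*/4);

  // Run f(first, last) over [0, n) in parallel. min_block_size asks for
  // blocks of at least 1024 elements; both alignments are 0, so the block
  // size is left unaligned.
  device.parallelFor(
      n, /*min_block_size=*/1024, /*hard_align=*/0, /*soft_align=*/0,
      [&](Eigen::Index first, Eigen::Index last) {
        for (Eigen::Index i = first; i < last; ++i) z[i] = x[i] + y[i];
      });
  return 0;
}

Passing 0 for both alignment arguments makes alignBlockSize return the block size unchanged; passing min_block_size = -1 would instead fall back to a block size of 1.
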
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
index c02891465..4df4cc220 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
@@ -172,7 +172,105 @@ struct ThreadPoolDevice {
pool_->Schedule(func);
}
+ // parallelFor executes f over the range [0, size) in parallel and waits for
+ // completion. Block size is chosen between min_block_size and
+ // 2 * min_block_size to achieve the best parallel efficiency.
+ // If min_block_size == -1, parallelFor uses a block size of 1.
+ // If hard_align > 0, block size is aligned to hard_align.
+ // If soft_align > hard_align, block size is aligned to soft_align provided
+ // that it does not increase block size too much.
+ void parallelFor(Index size, Index min_block_size, Index hard_align,
+ Index soft_align,
+ std::function<void(Index, Index)> f) const {
+ if (size <= 1 || (min_block_size != -1 && size < min_block_size) ||
+ numThreads() == 1) {
+ f(0, size);
+ return;
+ }
+
+ Index block_size = 1;
+ Index block_count = size;
+ if (min_block_size != -1) {
+ // Calculate the block size based on (1) the estimated cost and (2) the
+ // parallel efficiency. We want blocks that are not too small, to amortize
+ // the parallelization overhead, and not too large, to limit the tail
+ // effect and potential load imbalance; we also want the number of blocks
+ // to divide evenly across the threads.
+ min_block_size = numext::maxi<Index>(min_block_size, 1);
+ block_size = numext::mini(min_block_size, size);
+ // Upper bound on block size:
+ const Index max_block_size = numext::mini(min_block_size * 2, size);
+ block_size = numext::mini(
+ alignBlockSize(block_size, hard_align, soft_align), size);
+ block_count = divup(size, block_size);
+ // Calculate parallel efficiency as the fraction of total CPU time used for
+ // computations:
+ double max_efficiency =
+ static_cast<double>(block_count) /
+ (divup<int>(block_count, numThreads()) * numThreads());
+ // Now try to increase block size up to max_block_size as long as it
+ // doesn't decrease parallel efficiency.
+ for (Index prev_block_count = block_count; prev_block_count > 1;) {
+ // This is the next block size that divides size into a smaller number
+ // of blocks than the current block_size.
+ Index coarser_block_size = divup(size, prev_block_count - 1);
+ coarser_block_size =
+ alignBlockSize(coarser_block_size, hard_align, soft_align);
+ if (coarser_block_size > max_block_size) {
+ break; // Reached max block size. Stop.
+ }
+ // Recalculate parallel efficiency.
+ const Index coarser_block_count = divup(size, coarser_block_size);
+ eigen_assert(coarser_block_count < prev_block_count);
+ prev_block_count = coarser_block_count;
+ const double coarser_efficiency =
+ static_cast<double>(coarser_block_count) /
+ (divup<int>(coarser_block_count, numThreads()) * numThreads());
+ if (coarser_efficiency + 0.01 >= max_efficiency) {
+ // Taking it.
+ block_size = coarser_block_size;
+ block_count = coarser_block_count;
+ if (max_efficiency < coarser_efficiency) {
+ max_efficiency = coarser_efficiency;
+ }
+ }
+ }
+ }
+
+ // Recursively divide size into halves until we reach block_size.
+ // The division code rounds mid up to a multiple of block_size, so we are
+ // guaranteed to get block_count leaves that do the actual computation.
+ Barrier barrier(block_count);
+ std::function<void(Index, Index)> handleRange;
+ handleRange = [=, &handleRange, &barrier, &f](Index first, Index last) {
+ if (last - first <= block_size) {
+ // Single block or less, execute directly.
+ f(first, last);
+ barrier.Notify();
+ return;
+ }
+ // Split into halves and submit to the pool.
+ Index mid = first + divup((last - first) / 2, block_size) * block_size;
+ pool_->Schedule([=, &handleRange]() { handleRange(mid, last); });
+ pool_->Schedule([=, &handleRange]() { handleRange(first, mid); });
+ };
+ handleRange(0, size);
+ barrier.Wait();
+ }
+
private:
+ static Index alignBlockSize(Index size, Index hard_align, Index soft_align) {
+ if (soft_align > hard_align && size >= 4 * soft_align) {
+ // Align to soft_align, if it won't increase size by more than 25%.
+ return (size + soft_align - 1) & ~(soft_align - 1);
+ }
+ if (hard_align > 0) {
+ return (size + hard_align - 1) & ~(hard_align - 1);
+ }
+ return size;
+ }
+
+
ThreadPoolInterface* pool_;
size_t num_threads_;
};
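
For reference, here is the block-size coarsening heuristic from parallelFor restated as a standalone snippet (illustrative: plain long instead of Eigen::Index, sample numbers, and the alignBlockSize step omitted):

#include <algorithm>
#include <cstdio>

int main() {
  // Sample numbers, chosen only for illustration.
  const long size = 1000, min_block_size = 16, num_threads = 12;
  const long max_block_size = std::min(2 * min_block_size, size);

  auto divup = [](long x, long y) { return (x + y - 1) / y; };
  // Efficiency = fraction of per-thread "slots" that actually receive a
  // block when block_count blocks are spread over num_threads threads.
  auto efficiency = [&](long block_count) {
    return static_cast<double>(block_count) /
           (divup(block_count, num_threads) * num_threads);
  };

  long block_size = std::min(min_block_size, size);
  long block_count = divup(size, block_size);
  double best = efficiency(block_count);

  // Coarsen the blocks (fewer, larger ones) as long as that does not drop
  // the parallel efficiency by more than ~1%, as the loop in parallelFor
  // does.
  for (long prev = block_count; prev > 1;) {
    const long coarser_size = divup(size, prev - 1);
    if (coarser_size > max_block_size) break;
    const long coarser_count = divup(size, coarser_size);
    prev = coarser_count;
    const double e = efficiency(coarser_count);
    if (e + 0.01 >= best) {
      block_size = coarser_size;
      block_count = coarser_count;
      best = std::max(best, e);
    }
  }
  std::printf("block_size=%ld block_count=%ld efficiency=%.3f\n",
              block_size, block_count, best);
  return 0;
}

With the sample numbers above the chosen block count ends up a multiple of the thread count, which is the "divide evenly across threads" goal stated in the comment; the 0.01 tolerance lets the search prefer larger blocks (less scheduling overhead) whenever the efficiency is essentially tied.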
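
The bit trick in alignBlockSize, (x + align - 1) & ~(align - 1), rounds up to a multiple of align only when align is a power of two. A small standalone check of the same rounding and of the soft/hard preference (names and numbers below are illustrative, not part of the patch):

#include <cassert>
#include <cstdio>

// Round x up to the next multiple of align; align must be a power of two,
// which is what the bit mask in alignBlockSize relies on.
static long roundUpPow2(long x, long align) {
  assert(align > 0 && (align & (align - 1)) == 0);
  return (x + align - 1) & ~(align - 1);
}

int main() {
  // Mirror the preference in alignBlockSize: use the larger soft alignment
  // only when the block is big enough (>= 4 * soft_align) that the padding
  // adds at most about 25%; otherwise fall back to the hard alignment.
  const long block = 130, hard_align = 8, soft_align = 32;
  const long aligned = (block >= 4 * soft_align)
                           ? roundUpPow2(block, soft_align)
                           : roundUpPow2(block, hard_align);
  std::printf("%ld -> %ld\n", block, aligned);  // prints: 130 -> 160
  return 0;
}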