path: root/unsupported
diff options
authorGravatar Eugene Zhulenev <ezhulenev@google.com>2019-08-22 10:50:51 -0700
committerGravatar Eugene Zhulenev <ezhulenev@google.com>2019-08-22 10:50:51 -0700
commit6901788013b0148d62118b73ea5eca9c7140f0d7 (patch)
tree024c54697f332a9f19591945911f66dec46f5bfe /unsupported
parent2fb24384c9c4d4c9e0c1ab7923985a90f2f1d5e4 (diff)
Asynchronous parallelFor in Eigen ThreadPoolDevice
Diffstat (limited to 'unsupported')
1 files changed, 144 insertions, 53 deletions
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
index ef22a268a..ca2794cb5 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
@@ -75,9 +75,9 @@ struct ThreadPoolDevice {
EIGEN_STRONG_INLINE void deallocate_temp(void* buffer) const {
template<typename Type>
return data;
@@ -181,44 +181,173 @@ struct ThreadPoolDevice {
return pool_->CurrentThreadId();
- // parallelFor executes f with [0, n) arguments in parallel and waits for
- // completion. F accepts a half-open interval [first, last).
- // Block size is chosen based on the iteration cost and resulting parallel
+ // WARNING: This function is synchronous and will block the calling thread.
+ //
+ // Synchronous parallelFor executes f with [0, n) arguments in parallel and
+ // waits for completion. F accepts a half-open interval [first, last). Block
+ // size is chosen based on the iteration cost and resulting parallel
// efficiency. If block_align is not nullptr, it is called to round up the
// block size.
void parallelFor(Index n, const TensorOpCost& cost,
std::function<Index(Index)> block_align,
std::function<void(Index, Index)> f) const {
- typedef TensorCostModel<ThreadPoolDevice> CostModel;
+ // Compute small problems directly in the caller thread.
if (n <= 1 || numThreads() == 1 ||
CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
f(0, n);
- // Calculate block size based on (1) the iteration cost and (2) parallel
- // efficiency. We want blocks to be not too small to mitigate
- // parallelization overheads; not too large to mitigate tail
- // effect and potential load imbalance and we also want number
- // of blocks to be evenly dividable across threads.
+ // Compute block size and total count of blocks.
+ ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);
+ // Recursively divide size into halves until we reach block_size.
+ // Division code rounds mid to block_size, so we are guaranteed to get
+ // block_count leaves that do actual computations.
+ Barrier barrier(static_cast<unsigned int>(block.count));
+ std::function<void(Index, Index)> handleRange;
+ handleRange = [=, &handleRange, &barrier, &f](Index firstIdx,
+ Index lastIdx) {
+ while (lastIdx - firstIdx > block.size) {
+ // Split into halves and schedule the second half on a different thread.
+ const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
+ pool_->Schedule([=, &handleRange]() { handleRange(midIdx, lastIdx); });
+ lastIdx = midIdx;
+ }
+ // Single block or less, execute directly.
+ f(firstIdx, lastIdx);
+ barrier.Notify();
+ };
+ if (block.count <= numThreads()) {
+ // Avoid a thread hop by running the root of the tree and one block on the
+ // main thread.
+ handleRange(0, n);
+ } else {
+ // Execute the root in the thread pool to avoid running work on more than
+ // numThreads() threads.
+ pool_->Schedule([=, &handleRange]() { handleRange(0, n); });
+ }
+ barrier.Wait();
+ }
+ // Convenience wrapper for parallelFor that does not align blocks.
+ void parallelFor(Index n, const TensorOpCost& cost,
+ std::function<void(Index, Index)> f) const {
+ parallelFor(n, cost, NULL, std::move(f));
+ }
+ // WARNING: This function is asynchronous and will not block the calling thread.
+ //
+ // Asynchronous parallelFor executes f with [0, n) arguments in parallel
+ // without waiting for completion. When the last block finished, it will call
+ // 'done' callback. F accepts a half-open interval [first, last). Block size
+ // is chosen based on the iteration cost and resulting parallel efficiency. If
+ // block_align is not nullptr, it is called to round up the block size.
+ void parallelForAsync(Index n, const TensorOpCost& cost,
+ std::function<Index(Index)> block_align,
+ std::function<void(Index, Index)> f,
+ std::function<void()> done) const {
+ // Compute block size and total count of blocks.
+ ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);
+ ParallelForAsyncContext* const ctx =
+ new ParallelForAsyncContext(block.count, std::move(f), std::move(done));
+ // Recursively divide size into halves until we reach block_size.
+ // Division code rounds mid to block_size, so we are guaranteed to get
+ // block_count leaves that do actual computations.
+ ctx->handle_range = [this, ctx, block](Index firstIdx, Index lastIdx) {
+ while (lastIdx - firstIdx > block.size) {
+ // Split into halves and schedule the second half on a different thread.
+ const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
+ pool_->Schedule(
+ [ctx, midIdx, lastIdx]() { ctx->handle_range(midIdx, lastIdx); });
+ lastIdx = midIdx;
+ }
+ // Single block or less, execute directly.
+ ctx->f(firstIdx, lastIdx);
+ // Call 'done' callback if it was the last block.
+ if (ctx->count.fetch_sub(1) == 1) {
+ (ctx->done)();
+ // We can't delete ctx right now, because it will deallocate the closure
+ // we are currently in.
+ pool_->Schedule([ctx]() { delete ctx; });
+ }
+ };
+ // Execute the root in the thread pool.
+ pool_->Schedule([ctx, n]() { ctx->handle_range(0, n); });
+ }
+ // Convenience wrapper for parallelForAsync that does not align blocks.
+ void parallelForAsync(Index n, const TensorOpCost& cost,
+ std::function<void(Index, Index)> f,
+ std::function<void()> done) const {
+ parallelForAsync(n, cost, NULL, std::move(f), std::move(done));
+ }
+ // Thread pool accessor.
+ ThreadPoolInterface* getPool() const { return pool_; }
+ // Allocator accessor.
+ Allocator* allocator() const { return allocator_; }
- double block_size_f = 1.0 / CostModel::taskSize(1, cost);
+ private:
+ typedef TensorCostModel<ThreadPoolDevice> CostModel;
+ // For parallelForAsync we must keep passed in closures on the heap, and
+ // delete them only after `done` callback finished.
+ struct ParallelForAsyncContext {
+ ParallelForAsyncContext(Index count, std::function<void(Index, Index)> f,
+ std::function<void()> done)
+ : count(count), f(std::move(f)), done(std::move(done)) {}
+ std::atomic<Index> count;
+ std::function<void(Index, Index)> f;
+ std::function<void()> done;
+ std::function<void(Index, Index)> handle_range;
+ };
+ struct ParallelForBlock {
+ Index size; // block size
+ Index count; // number of blocks
+ };
+ // Calculates block size based on (1) the iteration cost and (2) parallel
+ // efficiency. We want blocks to be not too small to mitigate parallelization
+ // overheads; not too large to mitigate tail effect and potential load
+ // imbalance and we also want number of blocks to be evenly dividable across
+ // threads.
+ ParallelForBlock CalculateParallelForBlock(
+ const Index n, const TensorOpCost& cost,
+ std::function<Index(Index)> block_align) const {
+ const double block_size_f = 1.0 / CostModel::taskSize(1, cost);
const Index max_oversharding_factor = 4;
Index block_size = numext::mini(
- n, numext::maxi<Index>(divup<Index>(n, max_oversharding_factor * numThreads()),
- block_size_f));
+ n, numext::maxi<Index>(
+ divup<Index>(n, max_oversharding_factor * numThreads()),
+ block_size_f));
const Index max_block_size = numext::mini(n, 2 * block_size);
if (block_align) {
Index new_block_size = block_align(block_size);
eigen_assert(new_block_size >= block_size);
block_size = numext::mini(n, new_block_size);
Index block_count = divup(n, block_size);
// Calculate parallel efficiency as fraction of total CPU time used for
// computations:
double max_efficiency =
static_cast<double>(block_count) /
(divup<int>(block_count, numThreads()) * numThreads());
// Now try to increase block size up to max_block_size as long as it
// doesn't decrease parallel efficiency.
for (Index prev_block_count = block_count;
@@ -251,47 +380,9 @@ struct ThreadPoolDevice {
- // Recursively divide size into halves until we reach block_size.
- // Division code rounds mid to block_size, so we are guaranteed to get
- // block_count leaves that do actual computations.
- Barrier barrier(static_cast<unsigned int>(block_count));
- std::function<void(Index, Index)> handleRange;
- handleRange = [=, &handleRange, &barrier, &f](Index firstIdx, Index lastIdx) {
- while (lastIdx - firstIdx > block_size) {
- // Split into halves and schedule the second half on a different thread.
- const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block_size) * block_size;
- pool_->Schedule([=, &handleRange]() { handleRange(midIdx, lastIdx); });
- lastIdx = midIdx;
- }
- // Single block or less, execute directly.
- f(firstIdx, lastIdx);
- barrier.Notify();
- };
- if (block_count <= numThreads()) {
- // Avoid a thread hop by running the root of the tree and one block on the
- // main thread.
- handleRange(0, n);
- } else {
- // Execute the root in the thread pool to avoid running work on more than
- // numThreads() threads.
- pool_->Schedule([=, &handleRange]() { handleRange(0, n); });
- }
- barrier.Wait();
+ return {block_size, block_count};
- // Convenience wrapper for parallelFor that does not align blocks.
- void parallelFor(Index n, const TensorOpCost& cost,
- std::function<void(Index, Index)> f) const {
- parallelFor(n, cost, NULL, std::move(f));
- }
- // Thread pool accessor.
- ThreadPoolInterface* getPool() const { return pool_; }
- // Allocator accessor.
- Allocator* allocator() const { return allocator_; }
- private:
ThreadPoolInterface* pool_;
int num_threads_;
Allocator* allocator_;