path: root/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
author Eugene Zhulenev <ezhulenev@google.com> 2019-02-04 10:43:16 -0800
committer Eugene Zhulenev <ezhulenev@google.com> 2019-02-04 10:43:16 -0800
commit eb21bab769b11546d08f7db0b5bb78bfde6cdbae (patch)
tree 622589d6d83b8bc472f93ab4f18afe06db83f0e4 /unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
parent 871e2e5339476ae3f7efe63a0156507fd10c73d7 (diff)
Parallelize tensor contraction only by sharding dimension and use 'thread-local' memory for packing
Diffstat (limited to 'unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h')
-rw-r--r-- unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h | 223
1 file changed, 206 insertions(+), 17 deletions(-)
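
The patch below keeps packing and the kernels that consume it on the same worker thread whenever the sharding dimension alone offers at least one task per thread. A rough standalone sketch of the per-thread buffer layout it sets up, using illustrative names rather than Eigen's actual types:

    #include <cstddef>
    #include <vector>

    // One contiguous allocation split into num_threads * grain blocks. A worker
    // thread only ever packs into, and runs kernels from, its own blocks, so the
    // packed panels stay hot in that thread's private caches.
    class PerThreadPackedBuffers {
     public:
      PerThreadPackedBuffers(int num_threads, int grain, std::size_t block_bytes)
          : grain_(grain),
            storage_(static_cast<std::size_t>(num_threads) * grain * block_bytes) {
        char* mem = storage_.data();
        for (int i = 0; i < num_threads * grain; ++i, mem += block_bytes)
          blocks_.push_back(mem);
      }

      // Mirrors the base_idx/grain_idx arithmetic in the patch:
      // base_idx = grain * thread_id, block_idx = base_idx + grain_idx.
      char* block(int thread_id, int grain_idx) const {
        return blocks_[static_cast<std::size_t>(thread_id) * grain_ + grain_idx];
      }

     private:
      int grain_;
      std::vector<char> storage_;
      std::vector<char*> blocks_;
    };

In the patch itself the blocks hold packed LhsScalar or RhsScalar panels (thread_local_packed_lhs_ / thread_local_packed_rhs_), backed by the single thread_local_packed_mem_ allocation.
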
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
index e06099957..4932514c7 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
@@ -335,6 +335,47 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
mem += rhs_size;
}
}
+
+ // If there is enough available parallelism in the sharding dimension, we can
+ // call kernels in sync mode and use thread-local memory for packed data.
+ const Index sharding_dim_tasks = shard_by_col ? nn : nm;
+ if (!parallel_pack_ && sharding_dim_tasks >= device_.numThreadsInPool()) {
+ parallelize_by_sharding_dim_only_ = true;
+
+ int num_worker_threads = device_.numThreadsInPool();
+
+ if (shard_by_col) {
+ can_use_thread_local_packed_ = new std::atomic<bool>[nn_];
+ for (int i = 0; i < nn_; ++i)
+ can_use_thread_local_packed_[i].store(true,
+ std::memory_order_relaxed);
+
+ Index num_blocks = num_worker_threads * gn_;
+ thread_local_packed_mem_ = device_.allocate(num_blocks * rhs_size);
+ mem = static_cast<char*>(thread_local_packed_mem_);
+
+ thread_local_packed_rhs_.resize(num_blocks, nullptr);
+ for (Index i = 0; i < num_blocks; ++i) {
+ thread_local_packed_rhs_[i] = reinterpret_cast<RhsScalar*>(mem);
+ mem += rhs_size;
+ }
+ } else {
+ can_use_thread_local_packed_ = new std::atomic<bool>[nm_];
+ for (int i = 0; i < nm_; ++i)
+ can_use_thread_local_packed_[i].store(true,
+ std::memory_order_relaxed);
+
+ Index num_blocks = num_worker_threads * gm_;
+ thread_local_packed_mem_ = device_.allocate(num_blocks * lhs_size);
+ mem = static_cast<char*>(thread_local_packed_mem_);
+
+ thread_local_packed_lhs_.resize(num_blocks, nullptr);
+ for (Index i = 0; i < num_blocks; ++i) {
+ thread_local_packed_lhs_[i] = reinterpret_cast<LhsScalar*>(mem);
+ mem += lhs_size;
+ }
+ }
+ }
}
~Context() {
@@ -343,6 +384,10 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
delete[] state_kernel_[x];
}
device_.deallocate(packed_mem_);
+ if (parallelize_by_sharding_dim_only_) {
+ device_.deallocate(thread_local_packed_mem_);
+ delete[] can_use_thread_local_packed_;
+ }
}
void run() {
@@ -426,6 +471,42 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
void* packed_mem_;
std::vector<LhsScalar*> packed_lhs_[P - 1];
std::vector<RhsScalar*> packed_rhs_[P - 1];
+
+ // If there is enough concurrency in the sharding dimension, we choose not
+ // to parallelize by the other dimension, and execute all kernels in sync
+ // mode. This reduces parallelism from nm_ x nn_ down to nn_
+ // (shard_by_col==true) or nm_ (shard_by_col==false).
+ bool parallelize_by_sharding_dim_only_ = false;
+
+ // If we choose to parallelize only by the sharding dimension, each thread
+ // will have its own "thread local" (not C++ thread_local storage) memory
+ // for packed_lhs or packed_rhs (shard_by_col == false or true). This memory
+ // can't be passed to a kernel that might execute on a different thread.
+ //
+ // In practice, by the time we are ready to pack the sharding dimension
+ // (rhs if shard_by_col==true) of the K-th slice, all kernels for the (K-1)-th
+ // slice have already completed (99% of the time), so we can pack the data
+ // into thread-local storage and guarantee that all the kernels will be
+ // executed immediately on the same thread. This significantly increases the
+ // L1 cache hit ratio and reduces pressure on the memory bus.
+ //
+ // It's still possible that a kernel for the K-th slice will be ready before
+ // the (K-1)-th slice's kernels complete, so we still have to allocate the
+ // "global" packed_lhs_ and packed_rhs_ to allow kernels to be executed
+ // later on a thread different from the one that did the packing.
+ void* thread_local_packed_mem_;
+
+ // Only one of these will be initialized, depending on the shard_by_col value.
+ std::vector<LhsScalar*> thread_local_packed_lhs_;
+ std::vector<RhsScalar*> thread_local_packed_rhs_;
+
+ // Once a particular shard of the K-th slice has missed its thread-local
+ // execution opportunity (the (K-1)-th slice didn't finish its kernels in
+ // time), we can no longer schedule the (K+1)-th and following slices in
+ // thread-local mode, because there is no longer a guarantee that the
+ // previous kernels were executed sequentially on the same thread
+ // (array size is nn_ or nm_).
+ std::atomic<bool>* can_use_thread_local_packed_;
+
std::atomic<uint8_t>** state_kernel_[P];
// state_switch_ is frequently modified by worker threads, while other
// fields are read-only after constructor. Let's move it to a separate cache
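
A minimal sketch of the one-way fallback described above, with illustrative names: one relaxed atomic flag per sharding-dimension task, cleared the first time the previous slice's kernels are still in flight and never set back.

    #include <atomic>
    #include <memory>

    // One flag per sharding-dimension task (nn_ of them if shard_by_col,
    // otherwise nm_). Relaxed ordering is enough because the flag only gates an
    // optimization; the shared packed_lhs_/packed_rhs_ buffers keep the fallback
    // path correct.
    struct ThreadLocalEligibility {
      explicit ThreadLocalEligibility(long num_tasks)
          : flags(new std::atomic<bool>[num_tasks]) {
        for (long i = 0; i < num_tasks; ++i)
          flags[i].store(true, std::memory_order_relaxed);
      }

      bool can_use(long task) const {
        return flags[task].load(std::memory_order_relaxed);
      }

      // Called when slice k-1's kernels were not all done by the time we began
      // packing slice k: from now on the packed data for `task` may be consumed
      // on a different thread, so thread-local buffers stay off permanently.
      void disable(long task) {
        flags[task].store(false, std::memory_order_relaxed);
      }

      std::unique_ptr<std::atomic<bool>[]> flags;
    };
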
@@ -434,22 +515,96 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
std::atomic<Index> state_packing_ready_[P];
std::atomic<Index> state_switch_[P];
+ LhsScalar* packed_lhs(Index m, Index k, Index m1, bool use_thread_local) {
+ if (use_thread_local) {
+ eigen_assert(!shard_by_col_);
+
+ Index base_idx = gm_ * device_.currentThreadId();
+ Index grain_idx = m1 - m * gm_;
+ Index block_idx = base_idx + grain_idx;
+
+ return thread_local_packed_lhs_[block_idx];
+ } else {
+ return packed_lhs_[k % (P - 1)][m1];
+ }
+ }
+
+ RhsScalar* packed_rhs(Index n, Index k, Index n1, bool use_thread_local) {
+ if (use_thread_local) {
+ eigen_assert(shard_by_col_);
+
+ Index base_idx = gn_ * device_.currentThreadId();
+ Index grain_idx = n1 - n * gn_;
+ Index block_idx = base_idx + grain_idx;
+
+ return thread_local_packed_rhs_[block_idx];
+ } else {
+ return packed_rhs_[k % (P - 1)][n1];
+ }
+ }
+
+ // In the following two methods (pack_lhs and pack_rhs), if we know for sure
+ // that we'll be able to call a kernel with the packed data immediately,
+ // without submitting it to the thread pool, we can use thread-local memory
+ // for the packed data.
+ //
+ // We can only reliably check this if we are running all kernels in sync mode
+ // (parallelizing only by the sharding dim). If the kernel for m==0 (n==0) is
+ // ready to run, it's guaranteed that all kernels with larger values of m (n)
+ // are also ready, because we execute them in the same order for all K slices.
+
void pack_lhs(Index m, Index k) {
+ bool use_thread_local = false;
+
+ if (parallelize_by_sharding_dim_only_ && !shard_by_col_ &&
+ can_use_thread_local_packed_[m].load(std::memory_order_relaxed)) {
+ if (state_kernel_[k % P][m][0].load(std::memory_order_relaxed) == 1) {
+ use_thread_local = true;
+ } else {
+ // If we can't guarantee that all kernels in the `k` slice will be
+ // executed sequentially on the current thread, it's no longer safe to
+ // use thread-local memory in the following slices along the k dimension.
+ eigen_assert(k > 0);
+ can_use_thread_local_packed_[m].store(false,
+ std::memory_order_relaxed);
+ }
+ }
+
const Index mend = m * gm_ + gm(m);
for (Index m1 = m * gm_; m1 < mend; m1++)
- TensorContractionKernel::packLhs(packed_lhs_[k % (P - 1)][m1],
+ TensorContractionKernel::packLhs(packed_lhs(m, k, m1, use_thread_local),
lhs_.getSubMapper(m1 * bm_, k * bk_),
bk(k), bm(m1));
if (!parallel_pack_ && shard_by_col_) {
+ eigen_assert(!use_thread_local);
signal_packing(k);
} else {
signal_switch(k + 1);
- for (Index n = nn_ - 1; n >= 0; n--) signal_kernel(m, n, k, n == 0);
+ for (Index n = nn_ - 1; n >= 0; n--) {
+ bool sync = parallelize_by_sharding_dim_only_ || n == 0;
+ signal_kernel(m, n, k, sync, use_thread_local);
+ }
}
}
void pack_rhs(Index n, Index k) {
+ bool use_thread_local = false;
+
+ if (parallelize_by_sharding_dim_only_ && shard_by_col_ &&
+ can_use_thread_local_packed_[n].load(std::memory_order_relaxed)) {
+ if (state_kernel_[k % P][0][n].load(std::memory_order_relaxed) == 1) {
+ use_thread_local = true;
+ } else {
+ // If we can't guarantee that all kernels in the `k` slice will be
+ // executed sequentially on the current thread, it's no longer safe to
+ // use thread-local memory in the following slices along the k dimension.
+ eigen_assert(k > 0);
+ can_use_thread_local_packed_[n].store(false,
+ std::memory_order_relaxed);
+ }
+ }
+
const Index nend = n * gn_ + gn(n);
for (Index n1 = n * gn_; n1 < nend; n1++) {
if (k == 0) {
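
The check on state_kernel_[k % P][m][0] (or [0][n]) above is the whole eligibility test: if the first kernel of this slice is waiting only for the packing we are about to do, every kernel of the slice will run synchronously on this thread. A condensed sketch of that decision, with hypothetical names:

    #include <atomic>
    #include <cstdint>

    // Decide whether the packing we are about to do for sharding-dim task `task`
    // in slice `k` may target thread-local memory. `pending_signals` is the
    // dependency counter of the slice's first kernel for this task; a value of 1
    // means our own signal is the only thing it still waits for, so the kernels
    // will run synchronously on this thread right after packing.
    bool pack_into_thread_local(std::atomic<bool>* eligible,
                                const std::atomic<std::uint8_t>& pending_signals,
                                long task, long k) {
      if (!eligible[task].load(std::memory_order_relaxed)) return false;
      if (pending_signals.load(std::memory_order_relaxed) == 1) return true;
      // Kernels of slice k-1 are still in flight somewhere else; later slices
      // can no longer assume sequential same-thread execution for this task
      // (k must be > 0 here, matching the eigen_assert in the patch).
      eligible[task].store(false, std::memory_order_relaxed);
      return false;
    }
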
@@ -462,20 +617,24 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
// deadlocks.
memset(buffer_ + n1 * bn_ * m_, 0, bn(n1) * m_ * sizeof(Scalar));
}
- TensorContractionKernel::packRhs(packed_rhs_[k % (P - 1)][n1],
+ TensorContractionKernel::packRhs(packed_rhs(n, k, n1, use_thread_local),
rhs_.getSubMapper(k * bk_, n1 * bn_),
bk(k), bn(n1));
}
if (parallel_pack_ || shard_by_col_) {
signal_switch(k + 1);
- for (Index m = nm_ - 1; m >= 0; m--) signal_kernel(m, n, k, m == 0);
+ for (Index m = nm_ - 1; m >= 0; m--) {
+ bool sync = parallelize_by_sharding_dim_only_ || m == 0;
+ signal_kernel(m, n, k, sync, use_thread_local);
+ }
} else {
+ eigen_assert(!use_thread_local);
signal_packing(k);
}
}
- void kernel(Index m, Index n, Index k) {
+ void kernel(Index m, Index n, Index k, bool use_thread_local) {
// Note: order of iteration matters here. Iteration over m is innermost
// because we want to reuse the same packed rhs in consecutive tasks
// (rhs fits into L2$ while lhs only into L3$).
@@ -486,8 +645,10 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
for (Index m1 = m * gm_; m1 < mend; m1++) {
const auto output_mapper = output_.getSubMapper(m1 * bm_, n1 * bn_);
TensorContractionKernel::invoke(
- output_mapper, packed_lhs_[k % (P - 1)][m1],
- packed_rhs_[k % (P - 1)][n1], bm(m1), bk(k), bn(n1), Scalar(1));
+ output_mapper,
+ packed_lhs(m, k, m1, !shard_by_col_ && use_thread_local),
+ packed_rhs(n, k, n1, shard_by_col_ && use_thread_local), bm(m1),
+ bk(k), bn(n1), Scalar(1));
// We are done with the last task for the [m1, n1] block.
if (k + 1 == nk_) {
@@ -501,8 +662,10 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
for (Index n1 = n * gn_; n1 < nend; n1++) {
const auto output_mapper = output_.getSubMapper(m1 * bm_, n1 * bn_);
TensorContractionKernel::invoke(
- output_mapper, packed_lhs_[k % (P - 1)][m1],
- packed_rhs_[k % (P - 1)][n1], bm(m1), bk(k), bn(n1), Scalar(1));
+ output_mapper,
+ packed_lhs(m, k, m1, !shard_by_col_ && use_thread_local),
+ packed_rhs(n, k, n1, shard_by_col_ && use_thread_local), bm(m1),
+ bk(k), bn(n1), Scalar(1));
// We are done with the last task for the [m1, n1] block.
if (k + 1 == nk_) {
@@ -511,7 +674,7 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
}
}
}
- signal_kernel(m, n, k + 1, false);
+ signal_kernel(m, n, k + 1, /*sync=*/false, /*use_thread_local=*/false);
signal_switch(k + 2);
}
@@ -524,16 +687,23 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
enqueue_packing(k, shard_by_col_);
}
- void signal_kernel(Index m, Index n, Index k, bool sync) {
+ void signal_kernel(Index m, Index n, Index k, bool sync,
+ bool use_thread_local) {
std::atomic<uint8_t>* state = &state_kernel_[k % P][m][n];
Index s = state->load();
eigen_assert(s > 0);
- if (s != 1 && state->fetch_sub(1) != 1) return;
+ if (s != 1 && state->fetch_sub(1) != 1) {
+ eigen_assert(!use_thread_local);
+ return;
+ }
state->store(parallel_pack_ ? 3 : 2, std::memory_order_relaxed);
- if (sync)
- kernel(m, n, k);
- else
- device_.enqueueNoNotification([=]() { kernel(m, n, k); });
+ if (sync) {
+ kernel(m, n, k, use_thread_local);
+ } else {
+ eigen_assert(!use_thread_local);
+ device_.enqueueNoNotification(
+ [=]() { kernel(m, n, k, use_thread_local); });
+ }
}
void signal_switch(Index k, Index v = 1) {
@@ -589,7 +759,26 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
[=]() { enqueue_packing_helper(mid, end, k, rhs); });
end = mid;
}
- enqueue_packing_helper(start, end, k, rhs);
+
+ // Decide whether we want to run the first packing task (start == 0) in
+ // async mode when we parallelize only by the sharding dim:
+ // (1) pack_lhs and pack_rhs call signal_switch before completing
+ // all calls to signal_kernel, which in sync mode might lead
+ // to the execution of the first kernel of the k+1 slice before
+ // the last kernel of the k slice has completed.
+ // (2) all pack tasks for the sharding dim must be executed in a
+ // thread pool.
+ bool pack_async =
+ (start == 0) &&
+ (parallelize_by_sharding_dim_only_ && shard_by_col_ == rhs) &&
+ (k > 0 || device_.currentThreadId() < 0);
+
+ if (pack_async) {
+ device_.enqueueNoNotification(
+ [=]() { enqueue_packing_helper(start, end, k, rhs); });
+ } else {
+ enqueue_packing_helper(start, end, k, rhs);
+ }
}
}
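
A simplified sketch of the dispatch rule added above, with illustrative parameter names: the first sub-range of a sharding-dimension packing pass is pushed to the pool instead of being run inline, for the two reasons listed in the comment.

    // Returns true if the first packing sub-range (start == 0) should run
    // asynchronously. `packing_rhs` says which operand this pass packs, so
    // `shard_by_col == packing_rhs` means we are packing the sharding dimension.
    // A negative thread id means the caller is not a thread-pool thread.
    bool run_first_subrange_async(bool sharding_dim_only_mode, bool shard_by_col,
                                  bool packing_rhs, long k,
                                  int current_thread_id) {
      const bool packing_sharding_dim = (shard_by_col == packing_rhs);
      return sharding_dim_only_mode && packing_sharding_dim &&
             (k > 0 || current_thread_id < 0);
    }
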