path: root/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
author     Benoit Steiner <benoit.steiner.goog@gmail.com>  2015-05-20 13:52:07 -0700
committer  Benoit Steiner <benoit.steiner.goog@gmail.com>  2015-05-20 13:52:07 -0700
commit     6b800744ce914cf243ac3169e136c5000253f52e (patch)
tree       8e383eec1b6ec168de3f22cedae0e35d7c880823 /unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
parent     48f6b274e2d5a59477a368795867baea40e40eed (diff)
Moved away from std::async and std::future as the underlying mechanism for the thread pool device. On several platforms, the functions passed to std::async are not scheduled in the order in which they are given to std::async, which leads to massive performance issues in the contraction code.
Instead we now have a custom thread pool that ensures that the functions are picked up by the threads in the pool in the order in which they are enqueued in the pool.
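The patch below synchronizes the packing and kernel tasks with heap-allocated Notification objects that are waited on via wait_until_ready() and signalled via Notify(). As a rough illustration only, a one-shot notification primitive of this kind could be sketched with a mutex and a condition variable; this is an assumption about its shape, not Eigen's actual Notification class:

#include <condition_variable>
#include <mutex>

// Minimal sketch of a one-shot notification primitive, assuming a plain
// mutex + condition variable; Eigen's real Notification class may differ.
class Notification {
 public:
  // Called by the worker once its task (e.g. packing a block) is finished.
  void Notify() {
    std::lock_guard<std::mutex> lock(mu_);
    notified_ = true;
    cv_.notify_all();
  }
  // Called by consumers that need the task's result before proceeding.
  void Wait() {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return notified_; });
  }
 private:
  std::mutex mu_;
  std::condition_variable cv_;
  bool notified_ = false;
};

// Hypothetical free-function wrapper matching the call sites in the diff
// below; the real wait_until_ready lives elsewhere in Eigen.
inline void wait_until_ready(Notification* n) {
  if (n) n->Wait();
}

In the diff, a worker calls Notify() once a block is packed or a kernel has run, and any thread that needs that block simply blocks in wait_until_ready() until the notification fires.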
Diffstat (limited to 'unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h')
-rw-r--r--  unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h  |  67
1 file changed, 41 insertions(+), 26 deletions(-)
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
index cb2fd53fe..ed87d3100 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
@@ -46,8 +46,8 @@ struct packRhsAndKernelArg {
const Index n_block_idx;
const Index m_blocks;
const Index n_blocks;
- std::vector<Promise>* kernel_promises;
- const std::vector<Future>* lhs_futures;
+ std::vector<Notification*>* kernel_notifications;
+ const std::vector<Notification*>* lhs_notifications;
const bool need_to_pack;
};
@@ -219,17 +219,13 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
blockBs.push_back(static_cast<RhsScalar *>(this->m_device.allocate(sizeB * sizeof(RhsScalar))));
}
- // lhs_futures starts with all null futures
- std::vector<Future> lhs_futures(num_threads);
+ // lhs_notifications starts with all null Notifications
+ std::vector<Notification*> lhs_notifications(num_threads, nullptr);
// this should really be numBlockAs * n_blocks;
- const Index num_kernel_promises = num_threads * n_blocks;
- std::vector<Promise> kernel_promises(num_kernel_promises);
- std::vector<Future> kernel_futures(num_kernel_promises);
- for (std::size_t i = 0; i < kernel_promises.size(); ++i) {
- kernel_promises[i].set_value();
- kernel_futures[i] = kernel_promises[i].get_future();
- }
+ const Index num_kernel_notifications = num_threads * n_blocks;
+ std::vector<Notification*> kernel_notifications(num_kernel_notifications,
+ nullptr);
for (Index k_block_idx = 0; k_block_idx < k_blocks; k_block_idx++) {
const Index k_start = k_block_idx * kc;
@@ -245,11 +241,16 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
eigen_assert(actual_mc > 0);
Index blockAId = (k_block_idx * m_blocks + mt_block_idx) % num_threads;
+
for (int i = 0; i < n_blocks; ++i) {
- Index future_id = (blockAId * n_blocks + i);
- wait_until_ready(&kernel_futures[future_id]);
- kernel_promises[future_id] = Promise();
- kernel_futures[future_id] = kernel_promises[future_id].get_future();
+ Index notification_id = (blockAId * n_blocks + i);
+ // Wait for any current kernels using this slot to complete
+ // before using it.
+ if (kernel_notifications[notification_id]) {
+ wait_until_ready(kernel_notifications[notification_id]);
+ delete kernel_notifications[notification_id];
+ }
+ kernel_notifications[notification_id] = new Notification();
}
const packLArg arg = {
blockAs[blockAId], // blockA
@@ -260,8 +261,12 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
actual_kc, // kc
};
- lhs_futures[blockAId] =
- this->m_device.enqueue(&Self::packLhs<packLArg, LhsPacker>, arg);
+ // Delete any existing notification since we may be
+ // replacing it. The algorithm should ensure that there are
+ // no existing waiters on this notification.
+ delete lhs_notifications[blockAId];
+ lhs_notifications[blockAId] =
+ this->m_device.enqueue(&Self::packLhs<packLArg, LhsPacker>, arg);
}
// now start kernels.
@@ -278,7 +283,7 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
for (Index i = num_blocks; i < num_threads; ++i) {
Index blockAId = (k_block_idx * m_blocks + i + m_block_idx) % num_threads;
Index future_id = (blockAId * n_blocks + n_block_idx);
- wait_until_ready(&kernel_futures[future_id]);
+ wait_until_ready(kernel_notifications[future_id]);
}
}
@@ -301,19 +306,29 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
n_block_idx, // n_block_idx
m_blocks, // m_blocks
n_blocks, // n_blocks
- &kernel_promises, // kernel_promises
- &lhs_futures, // lhs_futures
+ &kernel_notifications, // kernel notifications
+ &lhs_notifications, // lhs notifications
need_to_pack, // need_to_pack
};
- this->m_device.enqueueNoFuture(&Self::packRhsAndKernel<packRKArg, RhsPacker, GebpKernel>, arg);
+ // We asynchronously kick off this function, which ends up
+ // notifying the appropriate kernel_notifications objects,
+ // which this thread waits on before exiting.
+ this->m_device.enqueueNoNotification(&Self::packRhsAndKernel<packRKArg, RhsPacker, GebpKernel>, arg);
}
}
}
// Make sure all the kernels are done.
- for (size_t i = 0; i < kernel_futures.size(); ++i) {
- wait_until_ready(&kernel_futures[i]);
+ for (size_t i = 0; i < kernel_notifications.size(); ++i) {
+ wait_until_ready(kernel_notifications[i]);
+ delete kernel_notifications[i];
+ }
+
+ // No need to wait for lhs notifications since they should have
+ // already been waited on. Just clean them up.
+ for (size_t i = 0; i < lhs_notifications.size(); ++i) {
+ delete lhs_notifications[i];
}
// deallocate all of the memory for both A and B's
@@ -360,15 +375,15 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
const Index m_base_start = arg.m + arg.mc*mt_block_idx;
if (m_base_start < arg.max_m) {
Index blockAId = (arg.k_block_idx * arg.m_blocks + mt_block_idx + arg.m_block_idx) % arg.num_threads;
-
- wait_until_ready(&(*arg.lhs_futures)[blockAId]);
+ wait_until_ready((*arg.lhs_notifications)[blockAId]);
const Index actual_mc = (std::min)(m_base_start + arg.mc, arg.max_m) - m_base_start;
gebp(arg.output.getSubMapper(m_base_start, arg.n),
(*arg.blockAs)[blockAId], arg.blockB,
actual_mc, arg.kc, arg.nc, 1.0, -1, -1, 0, 0);
+ // Notify that the kernel is done.
const Index set_idx = blockAId * arg.n_blocks + arg.n_block_idx;
- (*arg.kernel_promises)[set_idx].set_value();
+ (*arg.kernel_notifications)[set_idx]->Notify();
}
}
}
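To illustrate the ordering guarantee the commit message relies on, a custom pool can simply pop tasks from a FIFO queue, so they are picked up in exactly the order they were enqueued. The sketch below is hypothetical and is not Eigen's ThreadPoolDevice (whose enqueue/enqueueNoNotification methods appear in the diff above); it only demonstrates the FIFO scheduling idea:

#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Illustrative FIFO thread pool: tasks are popped in enqueue order, which is
// the ordering guarantee the commit message relies on. Not Eigen's actual
// ThreadPoolDevice implementation.
class SimpleThreadPool {
 public:
  explicit SimpleThreadPool(int num_threads) {
    for (int i = 0; i < num_threads; ++i) {
      workers_.emplace_back([this] {
        for (;;) {
          std::function<void()> task;
          {
            std::unique_lock<std::mutex> lock(mu_);
            cv_.wait(lock, [this] { return done_ || !tasks_.empty(); });
            if (done_ && tasks_.empty()) return;
            task = std::move(tasks_.front());  // strictly FIFO pickup
            tasks_.pop();
          }
          task();
        }
      });
    }
  }

  ~SimpleThreadPool() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      done_ = true;
    }
    cv_.notify_all();
    for (auto& w : workers_) w.join();
  }

  // Enqueue a task; workers pick tasks up in the order they were enqueued.
  void Enqueue(std::function<void()> fn) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      tasks_.push(std::move(fn));
    }
    cv_.notify_one();
  }

 private:
  std::vector<std::thread> workers_;
  std::queue<std::function<void()>> tasks_;
  std::mutex mu_;
  std::condition_variable cv_;
  bool done_ = false;
};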