diff options
author | Benoit Steiner <benoit.steiner.goog@gmail.com> | 2016-12-14 17:32:16 -0800 |
---|---|---|
committer | Benoit Steiner <benoit.steiner.goog@gmail.com> | 2016-12-14 17:32:16 -0800 |
commit | 9ff5d0f8214fb705aa315f686530c8b9f835d1f8 (patch) | |
tree | 5034af4159deb7b3cfeea50939dfa4f47ea8751b /unsupported/Eigen/CXX11 | |
parent | 730eb9fe1c0e0daa81aebbc4dbce52e185dda3dd (diff) | |
parent | 11b492e993f4272d86fc4019014b47b09a57a2ce (diff) |
Merged eigen/eigen into default
Diffstat (limited to 'unsupported/Eigen/CXX11')
10 files changed, 67 insertions, 29 deletions
diff --git a/unsupported/Eigen/CXX11/Tensor b/unsupported/Eigen/CXX11/Tensor index 8b36093f0..f98eb03bd 100644 --- a/unsupported/Eigen/CXX11/Tensor +++ b/unsupported/Eigen/CXX11/Tensor @@ -53,8 +53,10 @@ typedef __int32 int32_t; typedef unsigned __int32 uint32_t; typedef __int64 int64_t; typedef unsigned __int64 uint64_t; +#include <windows.h> #else #include <stdint.h> +#include <unistd.h> #endif #if __cplusplus > 199711 || EIGEN_COMP_MSVC >= 1900 diff --git a/unsupported/Eigen/CXX11/ThreadPool b/unsupported/Eigen/CXX11/ThreadPool index 141372f63..c34614194 100644 --- a/unsupported/Eigen/CXX11/ThreadPool +++ b/unsupported/Eigen/CXX11/ThreadPool @@ -58,6 +58,18 @@ #include "src/ThreadPool/SimpleThreadPool.h" #include "src/ThreadPool/NonBlockingThreadPool.h" + +// Use the more efficient NonBlockingThreadPool by default. +namespace Eigen { +#ifndef EIGEN_USE_SIMPLE_THREAD_POOL +template <typename Env> using ThreadPoolTempl = NonBlockingThreadPoolTempl<Env>; +typedef NonBlockingThreadPool ThreadPool; +#else +template <typename Env> using ThreadPoolTempl = SimpleThreadPoolTempl<Env>; +typedef SimpleThreadPool ThreadPool; +#endif +} // namespace Eigen + #endif #include <Eigen/src/Core/util/ReenableStupidWarnings.h> diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionMapper.h b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionMapper.h index 6a28024b6..ab320a50d 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionMapper.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionMapper.h @@ -418,7 +418,7 @@ class TensorContractionSubMapper { return m_base_mapper.template loadHalfPacket<Alignment>(i + m_vert_offset, m_horiz_offset); } - EIGEN_DEVICE_FUNC EIGEN_ALWAYS_INLINE void storePacket(Index i, Packet p) const { + EIGEN_DEVICE_FUNC EIGEN_ALWAYS_INLINE void storePacket(Index i, const Packet& p) const { if (UseDirectOffsets) { m_base_mapper.storePacket(i, 0, p); } diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceCuda.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceCuda.h index ec732f17d..e6cee11ef 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceCuda.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceCuda.h @@ -88,11 +88,7 @@ static void initializeDeviceProp() { #if __cplusplus >= 201103L std::atomic_thread_fence(std::memory_order_acquire); #endif -#if EIGEN_OS_WIN || EIGEN_OS_WIN64 - Sleep(1000); -#else - sleep(1); -#endif + EIGEN_SLEEP(1000); } } } diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h index 210ae1368..16180ca69 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h @@ -12,17 +12,6 @@ namespace Eigen { -// Use the SimpleThreadPool by default. We'll switch to the new non blocking -// thread pool later. -#ifndef EIGEN_USE_SIMPLE_THREAD_POOL -template <typename Env> using ThreadPoolTempl = NonBlockingThreadPoolTempl<Env>; -typedef NonBlockingThreadPool ThreadPool; -#else -template <typename Env> using ThreadPoolTempl = SimpleThreadPoolTempl<Env>; -typedef SimpleThreadPool ThreadPool; -#endif - - // Barrier is an object that allows one or more threads to wait until // Notify has been called a specified number of times. class Barrier { diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorMacros.h b/unsupported/Eigen/CXX11/src/Tensor/TensorMacros.h index ee0078bbc..f92e39d69 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorMacros.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorMacros.h @@ -51,4 +51,12 @@ #endif +#if EIGEN_OS_WIN || EIGEN_OS_WIN64 +#define EIGEN_SLEEP(n) Sleep(n) +#elif EIGEN_OS_GNULINUX +#define EIGEN_SLEEP(n) usleep(n * 1000); +#else +#define EIGEN_SLEEP(n) sleep(std::max<unsigned>(1, n/1000)) +#endif + #endif diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h index b57863163..0e6a0bf8f 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h @@ -28,6 +28,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { blocked_(0), spinning_(0), done_(false), + cancelled_(false), ec_(waiters_) { waiters_.resize(num_threads); @@ -61,10 +62,19 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { ~NonBlockingThreadPoolTempl() { done_ = true; + // Now if all threads block without work, they will start exiting. // But note that threads can continue to work arbitrary long, // block, submit new work, unblock and otherwise live full life. - ec_.Notify(true); + if (!cancelled_) { + ec_.Notify(true); + } else { + // Since we were cancelled, there might be entries in the queues. + // Empty them to prevent their destructor from asserting. + for (size_t i = 0; i < queues_.size(); i++) { + queues_[i]->Flush(); + } + } // Join threads explicitly to avoid destruction order issues. for (size_t i = 0; i < threads_.size(); i++) delete threads_[i]; @@ -91,16 +101,25 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { // completes overall computations, which in turn leads to destruction of // this. We expect that such scenario is prevented by program, that is, // this is kept alive while any threads can potentially be in Schedule. - if (!t.f) + if (!t.f) { ec_.Notify(false); - else + } + else { env_.ExecuteTask(t); // Push failed, execute directly. + } } void Cancel() { + cancelled_ = true; + done_ = true; + + // Let each thread know it's been cancelled. for (size_t i = 0; i < threads_.size(); i++) { - threads_[i]->Cancel(); + threads_[i]->OnCancel(); } + + // Wake up the threads without work to let them exit on their own. + ec_.Notify(true); } int NumThreads() const final { @@ -135,6 +154,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { std::atomic<unsigned> blocked_; std::atomic<bool> spinning_; std::atomic<bool> done_; + std::atomic<bool> cancelled_; EventCount ec_; // Main worker thread loop. @@ -145,7 +165,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { pt->thread_id = thread_id; Queue* q = queues_[thread_id]; EventCount::Waiter* waiter = &waiters_[thread_id]; - for (;;) { + while (!cancelled_) { Task t = q->PopFront(); if (!t.f) { t = Steal(); @@ -158,7 +178,11 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { // pool. Consider a time based limit instead. if (!spinning_ && !spinning_.exchange(true)) { for (int i = 0; i < 1000 && !t.f; i++) { - t = Steal(); + if (!cancelled_.load(std::memory_order_relaxed)) { + t = Steal(); + } else { + return; + } } spinning_ = false; } @@ -207,8 +231,12 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { int victim = NonEmptyQueueIndex(); if (victim != -1) { ec_.CancelWait(waiter); - *t = queues_[victim]->PopBack(); - return true; + if (cancelled_) { + return false; + } else { + *t = queues_[victim]->PopBack(); + return true; + } } // Number of blocked threads is used as termination condition. // If we are shutting down and all worker threads blocked without work, diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h index ab4f85fbf..fb08deb20 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h @@ -71,7 +71,7 @@ class SimpleThreadPoolTempl : public ThreadPoolInterface { void Cancel() { for (size_t i = 0; i < threads_.size(); i++) { - threads_[i]->Cancel(); + threads_[i]->OnCancel(); } } diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h index b3c45057d..d94a06416 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h @@ -23,7 +23,8 @@ struct StlThreadEnvironment { public: EnvThread(std::function<void()> f) : thr_(std::move(f)) {} ~EnvThread() { thr_.join(); } - void Cancel() { EIGEN_THREAD_CANCEL(thr_); } + // This function is called when the threadpool is cancelled. + void OnCancel() { } private: std::thread thr_; diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h index 5935b7cd8..84e1e6cc0 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h @@ -19,8 +19,10 @@ class ThreadPoolInterface { // Submits a closure to be run by a thread in the pool. virtual void Schedule(std::function<void()> fn) = 0; - // Cancel all the threads in the pool. - virtual void Cancel() = 0; + // If implemented, stop processing the closures that have been enqueued. + // Currently running closures may still be processed. + // If not implemented, does nothing. + virtual void Cancel() {} // Returns the number of threads in the pool. virtual int NumThreads() const = 0; |