diff options
author | 2016-12-14 17:32:16 -0800 | |
---|---|---|
committer | 2016-12-14 17:32:16 -0800 | |
commit | 9ff5d0f8214fb705aa315f686530c8b9f835d1f8 (patch) | |
tree | 5034af4159deb7b3cfeea50939dfa4f47ea8751b /unsupported | |
parent | 730eb9fe1c0e0daa81aebbc4dbce52e185dda3dd (diff) | |
parent | 11b492e993f4272d86fc4019014b47b09a57a2ce (diff) |
Merged eigen/eigen into default
Diffstat (limited to 'unsupported')
12 files changed, 79 insertions, 58 deletions
diff --git a/unsupported/Eigen/CXX11/Tensor b/unsupported/Eigen/CXX11/Tensor index 8b36093f0..f98eb03bd 100644 --- a/unsupported/Eigen/CXX11/Tensor +++ b/unsupported/Eigen/CXX11/Tensor @@ -53,8 +53,10 @@ typedef __int32 int32_t; typedef unsigned __int32 uint32_t; typedef __int64 int64_t; typedef unsigned __int64 uint64_t; +#include <windows.h> #else #include <stdint.h> +#include <unistd.h> #endif #if __cplusplus > 199711 || EIGEN_COMP_MSVC >= 1900 diff --git a/unsupported/Eigen/CXX11/ThreadPool b/unsupported/Eigen/CXX11/ThreadPool index 141372f63..c34614194 100644 --- a/unsupported/Eigen/CXX11/ThreadPool +++ b/unsupported/Eigen/CXX11/ThreadPool @@ -58,6 +58,18 @@ #include "src/ThreadPool/SimpleThreadPool.h" #include "src/ThreadPool/NonBlockingThreadPool.h" + +// Use the more efficient NonBlockingThreadPool by default. +namespace Eigen { +#ifndef EIGEN_USE_SIMPLE_THREAD_POOL +template <typename Env> using ThreadPoolTempl = NonBlockingThreadPoolTempl<Env>; +typedef NonBlockingThreadPool ThreadPool; +#else +template <typename Env> using ThreadPoolTempl = SimpleThreadPoolTempl<Env>; +typedef SimpleThreadPool ThreadPool; +#endif +} // namespace Eigen + #endif #include <Eigen/src/Core/util/ReenableStupidWarnings.h> diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionMapper.h b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionMapper.h index 6a28024b6..ab320a50d 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionMapper.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionMapper.h @@ -418,7 +418,7 @@ class TensorContractionSubMapper { return m_base_mapper.template loadHalfPacket<Alignment>(i + m_vert_offset, m_horiz_offset); } - EIGEN_DEVICE_FUNC EIGEN_ALWAYS_INLINE void storePacket(Index i, Packet p) const { + EIGEN_DEVICE_FUNC EIGEN_ALWAYS_INLINE void storePacket(Index i, const Packet& p) const { if (UseDirectOffsets) { m_base_mapper.storePacket(i, 0, p); } diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceCuda.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceCuda.h index ec732f17d..e6cee11ef 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceCuda.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceCuda.h @@ -88,11 +88,7 @@ static void initializeDeviceProp() { #if __cplusplus >= 201103L std::atomic_thread_fence(std::memory_order_acquire); #endif -#if EIGEN_OS_WIN || EIGEN_OS_WIN64 - Sleep(1000); -#else - sleep(1); -#endif + EIGEN_SLEEP(1000); } } } diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h index 210ae1368..16180ca69 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h @@ -12,17 +12,6 @@ namespace Eigen { -// Use the SimpleThreadPool by default. We'll switch to the new non blocking -// thread pool later. -#ifndef EIGEN_USE_SIMPLE_THREAD_POOL -template <typename Env> using ThreadPoolTempl = NonBlockingThreadPoolTempl<Env>; -typedef NonBlockingThreadPool ThreadPool; -#else -template <typename Env> using ThreadPoolTempl = SimpleThreadPoolTempl<Env>; -typedef SimpleThreadPool ThreadPool; -#endif - - // Barrier is an object that allows one or more threads to wait until // Notify has been called a specified number of times. class Barrier { diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorMacros.h b/unsupported/Eigen/CXX11/src/Tensor/TensorMacros.h index ee0078bbc..f92e39d69 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorMacros.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorMacros.h @@ -51,4 +51,12 @@ #endif +#if EIGEN_OS_WIN || EIGEN_OS_WIN64 +#define EIGEN_SLEEP(n) Sleep(n) +#elif EIGEN_OS_GNULINUX +#define EIGEN_SLEEP(n) usleep(n * 1000); +#else +#define EIGEN_SLEEP(n) sleep(std::max<unsigned>(1, n/1000)) +#endif + #endif diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h index b57863163..0e6a0bf8f 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h @@ -28,6 +28,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { blocked_(0), spinning_(0), done_(false), + cancelled_(false), ec_(waiters_) { waiters_.resize(num_threads); @@ -61,10 +62,19 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { ~NonBlockingThreadPoolTempl() { done_ = true; + // Now if all threads block without work, they will start exiting. // But note that threads can continue to work arbitrary long, // block, submit new work, unblock and otherwise live full life. - ec_.Notify(true); + if (!cancelled_) { + ec_.Notify(true); + } else { + // Since we were cancelled, there might be entries in the queues. + // Empty them to prevent their destructor from asserting. + for (size_t i = 0; i < queues_.size(); i++) { + queues_[i]->Flush(); + } + } // Join threads explicitly to avoid destruction order issues. for (size_t i = 0; i < threads_.size(); i++) delete threads_[i]; @@ -91,16 +101,25 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { // completes overall computations, which in turn leads to destruction of // this. We expect that such scenario is prevented by program, that is, // this is kept alive while any threads can potentially be in Schedule. - if (!t.f) + if (!t.f) { ec_.Notify(false); - else + } + else { env_.ExecuteTask(t); // Push failed, execute directly. + } } void Cancel() { + cancelled_ = true; + done_ = true; + + // Let each thread know it's been cancelled. for (size_t i = 0; i < threads_.size(); i++) { - threads_[i]->Cancel(); + threads_[i]->OnCancel(); } + + // Wake up the threads without work to let them exit on their own. + ec_.Notify(true); } int NumThreads() const final { @@ -135,6 +154,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { std::atomic<unsigned> blocked_; std::atomic<bool> spinning_; std::atomic<bool> done_; + std::atomic<bool> cancelled_; EventCount ec_; // Main worker thread loop. @@ -145,7 +165,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { pt->thread_id = thread_id; Queue* q = queues_[thread_id]; EventCount::Waiter* waiter = &waiters_[thread_id]; - for (;;) { + while (!cancelled_) { Task t = q->PopFront(); if (!t.f) { t = Steal(); @@ -158,7 +178,11 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { // pool. Consider a time based limit instead. if (!spinning_ && !spinning_.exchange(true)) { for (int i = 0; i < 1000 && !t.f; i++) { - t = Steal(); + if (!cancelled_.load(std::memory_order_relaxed)) { + t = Steal(); + } else { + return; + } } spinning_ = false; } @@ -207,8 +231,12 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { int victim = NonEmptyQueueIndex(); if (victim != -1) { ec_.CancelWait(waiter); - *t = queues_[victim]->PopBack(); - return true; + if (cancelled_) { + return false; + } else { + *t = queues_[victim]->PopBack(); + return true; + } } // Number of blocked threads is used as termination condition. // If we are shutting down and all worker threads blocked without work, diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h index ab4f85fbf..fb08deb20 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h @@ -71,7 +71,7 @@ class SimpleThreadPoolTempl : public ThreadPoolInterface { void Cancel() { for (size_t i = 0; i < threads_.size(); i++) { - threads_[i]->Cancel(); + threads_[i]->OnCancel(); } } diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h index b3c45057d..d94a06416 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h @@ -23,7 +23,8 @@ struct StlThreadEnvironment { public: EnvThread(std::function<void()> f) : thr_(std::move(f)) {} ~EnvThread() { thr_.join(); } - void Cancel() { EIGEN_THREAD_CANCEL(thr_); } + // This function is called when the threadpool is cancelled. + void OnCancel() { } private: std::thread thr_; diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h index 5935b7cd8..84e1e6cc0 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h @@ -19,8 +19,10 @@ class ThreadPoolInterface { // Submits a closure to be run by a thread in the pool. virtual void Schedule(std::function<void()> fn) = 0; - // Cancel all the threads in the pool. - virtual void Cancel() = 0; + // If implemented, stop processing the closures that have been enqueued. + // Currently running closures may still be processed. + // If not implemented, does nothing. + virtual void Cancel() {} // Returns the number of threads in the pool. virtual int NumThreads() const = 0; diff --git a/unsupported/test/cxx11_non_blocking_thread_pool.cpp b/unsupported/test/cxx11_non_blocking_thread_pool.cpp index fe30551ce..2c5765ce4 100644 --- a/unsupported/test/cxx11_non_blocking_thread_pool.cpp +++ b/unsupported/test/cxx11_non_blocking_thread_pool.cpp @@ -10,8 +10,8 @@ #define EIGEN_USE_THREADS #include "main.h" -#include <unistd.h> #include "Eigen/CXX11/ThreadPool" +#include "Eigen/CXX11/Tensor" static void test_create_destroy_empty_pool() { @@ -104,23 +104,15 @@ static void test_parallelism() static void test_cancel() { - NonBlockingThreadPool tp(4); + NonBlockingThreadPool tp(2); -#ifdef EIGEN_SUPPORTS_THREAD_CANCELLATION - std::cout << "Thread cancellation is supported on this platform" << std::endl; - - // Put 2 threads to sleep for much longer than the default test timeout. - tp.Schedule([]() { sleep(3600); } ); - tp.Schedule([]() { sleep(3600 * 24); } ); -#else - std::cout << "Thread cancellation is a no-op on this platform" << std::endl; - - // Make 2 threads sleep for a short period of time - tp.Schedule([]() { sleep(1); } ); - tp.Schedule([]() { sleep(2); } ); -#endif + // Schedule a large number of closure that each sleeps for one second. This + // will keep the thread pool busy for much longer than the default test timeout. + for (int i = 0; i < 1000; ++i) { + tp.Schedule([]() { EIGEN_SLEEP(2000); }); + } - // Call cancel: + // Cancel the processing of all the closures that are still pending. tp.Cancel(); } diff --git a/unsupported/test/cxx11_tensor_notification.cpp b/unsupported/test/cxx11_tensor_notification.cpp index c946007b8..183ef02c1 100644 --- a/unsupported/test/cxx11_tensor_notification.cpp +++ b/unsupported/test/cxx11_tensor_notification.cpp @@ -13,15 +13,6 @@ #include "main.h" #include <Eigen/CXX11/Tensor> -#if EIGEN_OS_WIN || EIGEN_OS_WIN64 -#include <windows.h> -void sleep(int seconds) { - Sleep(seconds*1000); -} -#else -#include <unistd.h> -#endif - namespace { @@ -40,7 +31,7 @@ static void test_notification_single() Eigen::Notification n; std::function<void()> func = std::bind(&WaitAndAdd, &n, &counter); thread_pool.Schedule(func); - sleep(1); + EIGEN_SLEEP(1000); // The thread should be waiting for the notification. VERIFY_IS_EQUAL(counter, 0); @@ -48,7 +39,7 @@ static void test_notification_single() // Unblock the thread n.Notify(); - sleep(1); + EIGEN_SLEEP(1000); // Verify the counter has been incremented VERIFY_IS_EQUAL(counter, 1); @@ -67,10 +58,10 @@ static void test_notification_multiple() thread_pool.Schedule(func); thread_pool.Schedule(func); thread_pool.Schedule(func); - sleep(1); + EIGEN_SLEEP(1000); VERIFY_IS_EQUAL(counter, 0); n.Notify(); - sleep(1); + EIGEN_SLEEP(1000); VERIFY_IS_EQUAL(counter, 4); } |