diff options
Diffstat (limited to 'unsupported/Eigen/CXX11/src/ThreadPool')
5 files changed, 178 insertions, 49 deletions
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/Barrier.h b/unsupported/Eigen/CXX11/src/ThreadPool/Barrier.h new file mode 100644 index 000000000..ef5e9ff18 --- /dev/null +++ b/unsupported/Eigen/CXX11/src/ThreadPool/Barrier.h @@ -0,0 +1,64 @@ +// This file is part of Eigen, a lightweight C++ template library +// for linear algebra. +// +// Copyright (C) 2018 Rasmus Munk Larsen <rmlarsen@google.com> +// +// This Source Code Form is subject to the terms of the Mozilla +// Public License v. 2.0. If a copy of the MPL was not distributed +// with this file, You can obtain one at http://mozilla.org/MPL/2.0/. + +// Barrier is an object that allows one or more threads to wait until +// Notify has been called a specified number of times. + +#ifndef EIGEN_CXX11_THREADPOOL_BARRIER_H +#define EIGEN_CXX11_THREADPOOL_BARRIER_H + +namespace Eigen { + +class Barrier { + public: + Barrier(unsigned int count) : state_(count << 1), notified_(false) { + eigen_assert(((count << 1) >> 1) == count); + } + ~Barrier() { eigen_plain_assert((state_ >> 1) == 0); } + + void Notify() { + unsigned int v = state_.fetch_sub(2, std::memory_order_acq_rel) - 2; + if (v != 1) { + eigen_assert(((v + 2) & ~1) != 0); + return; // either count has not dropped to 0, or waiter is not waiting + } + std::unique_lock<std::mutex> l(mu_); + eigen_assert(!notified_); + notified_ = true; + cv_.notify_all(); + } + + void Wait() { + unsigned int v = state_.fetch_or(1, std::memory_order_acq_rel); + if ((v >> 1) == 0) return; + std::unique_lock<std::mutex> l(mu_); + while (!notified_) { + cv_.wait(l); + } + } + + private: + std::mutex mu_; + std::condition_variable cv_; + std::atomic<unsigned int> state_; // low bit is waiter flag + bool notified_; +}; + +// Notification is an object that allows a user to wait for another +// thread to signal a notification that an event has occurred. +// +// Multiple threads can wait on the same Notification object, +// but only one caller must call Notify() on the object. 
+struct Notification : Barrier { + Notification() : Barrier(1){}; +}; + +} // namespace Eigen + +#endif // EIGEN_CXX11_THREADPOOL_BARRIER_H diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h b/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h index 0a7181102..22c952ae1 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h @@ -58,7 +58,7 @@ class EventCount { ~EventCount() { // Ensure there are no waiters. - eigen_assert((state_.load() & (kStackMask | kWaiterMask)) == kStackMask); + eigen_plain_assert((state_.load() & (kStackMask | kWaiterMask)) == kStackMask); } // Prewait prepares for waiting. @@ -169,7 +169,8 @@ class EventCount { class Waiter { friend class EventCount; - // Align to 128 byte boundary to prevent false sharing with other Waiter objects in the same vector. + // Align to 128 byte boundary to prevent false sharing with other Waiter + // objects in the same vector. EIGEN_ALIGN_TO_BOUNDARY(128) std::atomic<Waiter*> next; std::mutex mu; std::condition_variable cv; diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h index ecd49f382..60a0c9fb6 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h @@ -10,7 +10,6 @@ #ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H #define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H - namespace Eigen { template <typename Environment> @@ -23,7 +22,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { : ThreadPoolTempl(num_threads, true, env) {} ThreadPoolTempl(int num_threads, bool allow_spinning, - Environment env = Environment()) + Environment env = Environment()) : env_(env), num_threads_(num_threads), allow_spinning_(allow_spinning), @@ -58,12 +57,18 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { coprimes_.push_back(i); } 
} + queues_.resize(num_threads_); +#ifndef EIGEN_THREAD_LOCAL + init_barrier_.reset(new Barrier(num_threads_)); +#endif for (int i = 0; i < num_threads_; i++) { - queues_.push_back(new Queue()); - } - for (int i = 0; i < num_threads_; i++) { - threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); })); + threads_.emplace_back(env_.CreateThread([this, i]() { WorkerLoop(i); })); } +#ifndef EIGEN_THREAD_LOCAL + // Wait for workers to initialize per_thread_map_. Otherwise we might race + // with them in Schedule or CurrentThreadId. + init_barrier_->Wait(); +#endif } ~ThreadPoolTempl() { @@ -78,13 +83,13 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { // Since we were cancelled, there might be entries in the queues. // Empty them to prevent their destructor from asserting. for (size_t i = 0; i < queues_.size(); i++) { - queues_[i]->Flush(); + queues_[i].Flush(); } } // Join threads explicitly to avoid destruction order issues. - for (size_t i = 0; i < num_threads_; i++) delete threads_[i]; - for (size_t i = 0; i < num_threads_; i++) delete queues_[i]; + threads_.resize(0); + queues_.resize(0); } void Schedule(std::function<void()> fn) { @@ -92,13 +97,13 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { PerThread* pt = GetPerThread(); if (pt->pool == this) { // Worker thread of this pool, push onto the thread's queue. - Queue* q = queues_[pt->thread_id]; - t = q->PushFront(std::move(t)); + Queue& q = queues_[pt->thread_id]; + t = q.PushFront(std::move(t)); } else { // A free-standing thread (or worker of another pool), push onto a random // queue. - Queue* q = queues_[Rand(&pt->rand) % queues_.size()]; - t = q->PushBack(std::move(t)); + Queue& q = queues_[Rand(&pt->rand) % queues_.size()]; + t = q.PushBack(std::move(t)); } // Note: below we touch this after making w available to worker threads. // Strictly speaking, this can lead to a racy-use-after-free. 
Consider that @@ -109,8 +114,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { // this is kept alive while any threads can potentially be in Schedule. if (!t.f) { ec_.Notify(false); - } - else { + } else { env_.ExecuteTask(t); // Push failed, execute directly. } } @@ -130,13 +134,10 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { ec_.Notify(true); } - int NumThreads() const final { - return num_threads_; - } + int NumThreads() const final { return num_threads_; } int CurrentThreadId() const final { - const PerThread* pt = - const_cast<ThreadPoolTempl*>(this)->GetPerThread(); + const PerThread* pt = const_cast<ThreadPoolTempl*>(this)->GetPerThread(); if (pt->pool == this) { return pt->thread_id; } else { @@ -148,17 +149,21 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { typedef typename Environment::EnvThread Thread; struct PerThread { - constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) { } + constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) {} ThreadPoolTempl* pool; // Parent pool, or null for normal threads. - uint64_t rand; // Random generator state. - int thread_id; // Worker thread index in pool. + uint64_t rand; // Random generator state. + int thread_id; // Worker thread index in pool. +#ifndef EIGEN_THREAD_LOCAL + // Prevent false sharing. + char pad_[128]; +#endif }; Environment env_; const int num_threads_; const bool allow_spinning_; - MaxSizeVector<Thread*> threads_; - MaxSizeVector<Queue*> queues_; + MaxSizeVector<std::unique_ptr<Thread> > threads_; + MaxSizeVector<Queue> queues_; MaxSizeVector<unsigned> coprimes_; MaxSizeVector<EventCount::Waiter> waiters_; std::atomic<unsigned> blocked_; @@ -166,14 +171,27 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { std::atomic<bool> done_; std::atomic<bool> cancelled_; EventCount ec_; +#ifndef EIGEN_THREAD_LOCAL + std::unique_ptr<Barrier> init_barrier_; + std::mutex per_thread_map_mutex_; // Protects per_thread_map_. 
+ std::unordered_map<uint64_t, std::unique_ptr<PerThread>> per_thread_map_; +#endif // Main worker thread loop. void WorkerLoop(int thread_id) { +#ifndef EIGEN_THREAD_LOCAL + std::unique_ptr<PerThread> new_pt(new PerThread()); + per_thread_map_mutex_.lock(); + eigen_assert(per_thread_map_.emplace(GlobalThreadIdHash(), std::move(new_pt)).second); + per_thread_map_mutex_.unlock(); + init_barrier_->Notify(); + init_barrier_->Wait(); +#endif PerThread* pt = GetPerThread(); pt->pool = this; - pt->rand = std::hash<std::thread::id>()(std::this_thread::get_id()); + pt->rand = GlobalThreadIdHash(); pt->thread_id = thread_id; - Queue* q = queues_[thread_id]; + Queue& q = queues_[thread_id]; EventCount::Waiter* waiter = &waiters_[thread_id]; // TODO(dvyukov,rmlarsen): The time spent in Steal() is proportional // to num_threads_ and we assume that new work is scheduled at a @@ -189,10 +207,10 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { // counter-productive for the types of I/O workloads the single thread // pools tend to be used for. 
while (!cancelled_) { - Task t = q->PopFront(); + Task t = q.PopFront(); for (int i = 0; i < spin_count && !t.f; i++) { if (!cancelled_.load(std::memory_order_relaxed)) { - t = q->PopFront(); + t = q.PopFront(); } } if (!t.f) { @@ -206,7 +224,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { } } else { while (!cancelled_) { - Task t = q->PopFront(); + Task t = q.PopFront(); if (!t.f) { t = Steal(); if (!t.f) { @@ -243,7 +261,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { unsigned inc = coprimes_[r % coprimes_.size()]; unsigned victim = r % size; for (unsigned i = 0; i < size; i++) { - Task t = queues_[victim]->PopBack(); + Task t = queues_[victim].PopBack(); if (t.f) { return t; } @@ -270,7 +288,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { if (cancelled_) { return false; } else { - *t = queues_[victim]->PopBack(); + *t = queues_[victim].PopBack(); return true; } } @@ -278,7 +296,8 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { // If we are shutting down and all worker threads blocked without work, // that's we are done. blocked_++; - if (done_ && blocked_ == num_threads_) { + // TODO is blocked_ required to be unsigned? + if (done_ && blocked_ == static_cast<unsigned>(num_threads_)) { ec_.CancelWait(waiter); // Almost done, but need to re-check queues. 
// Consider that all queues are empty and all worker threads are preempted @@ -311,7 +330,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { unsigned inc = coprimes_[r % coprimes_.size()]; unsigned victim = r % size; for (unsigned i = 0; i < size; i++) { - if (!queues_[victim]->Empty()) { + if (!queues_[victim].Empty()) { return victim; } victim += inc; @@ -322,10 +341,24 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { return -1; } - static EIGEN_STRONG_INLINE PerThread* GetPerThread() { + static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash() { + return std::hash<std::thread::id>()(std::this_thread::get_id()); + } + + EIGEN_STRONG_INLINE PerThread* GetPerThread() { +#ifndef EIGEN_THREAD_LOCAL + static PerThread dummy; + auto it = per_thread_map_.find(GlobalThreadIdHash()); + if (it == per_thread_map_.end()) { + return &dummy; + } else { + return it->second.get(); + } +#else EIGEN_THREAD_LOCAL PerThread per_thread_; PerThread* pt = &per_thread_; return pt; +#endif } static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) { @@ -333,7 +366,8 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { // Update the internal state *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL; // Generate the random output (using the PCG-XSH-RS scheme) - return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61))); + return static_cast<unsigned>((current ^ (current >> 22)) >> + (22 + (current >> 61))); } }; diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h index cb3690a2e..05c739aa1 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h @@ -10,7 +10,6 @@ #ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_ #define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_ - namespace Eigen { // RunQueue is a fixed-size, partially non-blocking deque or Work items. 
@@ -47,7 +46,7 @@ class RunQueue { array_[i].state.store(kEmpty, std::memory_order_relaxed); } - ~RunQueue() { eigen_assert(Size() == 0); } + ~RunQueue() { eigen_plain_assert(Size() == 0); } // PushFront inserts w at the beginning of the queue. // If queue is full returns w, otherwise returns default-constructed Work. @@ -131,9 +130,8 @@ class RunQueue { Elem* e = &array_[mid & kMask]; uint8_t s = e->state.load(std::memory_order_relaxed); if (n == 0) { - if (s != kReady || - !e->state.compare_exchange_strong(s, kBusy, - std::memory_order_acquire)) + if (s != kReady || !e->state.compare_exchange_strong( + s, kBusy, std::memory_order_acquire)) continue; start = mid; } else { diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h index cfa221732..a41731c34 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h @@ -10,13 +10,45 @@ #ifndef EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H #define EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H -// Try to come up with a portable implementation of thread local variables -#if EIGEN_COMP_GNUC && EIGEN_GNUC_AT_MOST(4, 7) -#define EIGEN_THREAD_LOCAL static __thread -#elif EIGEN_COMP_CLANG -#define EIGEN_THREAD_LOCAL static __thread -#else +#if EIGEN_MAX_CPP_VER >= 11 && \ + ((EIGEN_COMP_GNUC && EIGEN_GNUC_AT_LEAST(4, 8)) || \ + __has_feature(cxx_thread_local)) #define EIGEN_THREAD_LOCAL static thread_local #endif +// Disable TLS for Apple and Android builds with older toolchains. +#if defined(__APPLE__) +// Included for TARGET_OS_IPHONE, __IPHONE_OS_VERSION_MIN_REQUIRED, +// __IPHONE_8_0. +#include <Availability.h> +#include <TargetConditionals.h> +#endif +// Checks whether C++11's `thread_local` storage duration specifier is +// supported. 
+#if defined(__apple_build_version__) && \ + ((__apple_build_version__ < 8000042) || \ + (TARGET_OS_IPHONE && __IPHONE_OS_VERSION_MIN_REQUIRED < __IPHONE_9_0)) +// Notes: Xcode's clang did not support `thread_local` until version +// 8, and even then not for all iOS < 9.0. +#undef EIGEN_THREAD_LOCAL + +#elif defined(__ANDROID__) && EIGEN_COMP_CLANG +// There are platforms for which TLS should not be used even though the compiler +// makes it seem like it's supported (Android NDK < r12b for example). +// This is primarily because of linker problems and toolchain misconfiguration: +// TLS isn't supported until NDK r12b per +// https://developer.android.com/ndk/downloads/revision_history.html +// Since NDK r16, `__NDK_MAJOR__` and `__NDK_MINOR__` are defined in +// <android/ndk-version.h>. For NDK < r16, users should define these macros, +// e.g. `-D__NDK_MAJOR__=11 -D__NDK_MINOR__=0` for NDK r11. +#if __has_include(<android/ndk-version.h>) +#include <android/ndk-version.h> +#endif // __has_include(<android/ndk-version.h>) +#if defined(__ANDROID__) && defined(__clang__) && defined(__NDK_MAJOR__) && \ + defined(__NDK_MINOR__) && \ + ((__NDK_MAJOR__ < 12) || ((__NDK_MAJOR__ == 12) && (__NDK_MINOR__ < 1))) +#undef EIGEN_THREAD_LOCAL +#endif +#endif // defined(__ANDROID__) && defined(__clang__) + +#endif // EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H |