Diffstat (limited to 'unsupported/Eigen/CXX11/src/ThreadPool')
-rw-r--r--  unsupported/Eigen/CXX11/src/ThreadPool/Barrier.h               |  64
-rw-r--r--  unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h            |   5
-rw-r--r--  unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h | 106
-rw-r--r--  unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h              |   8
-rw-r--r--  unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h           |  44
5 files changed, 178 insertions(+), 49 deletions(-)
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/Barrier.h b/unsupported/Eigen/CXX11/src/ThreadPool/Barrier.h
new file mode 100644
index 000000000..ef5e9ff18
--- /dev/null
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/Barrier.h
@@ -0,0 +1,64 @@
+// This file is part of Eigen, a lightweight C++ template library
+// for linear algebra.
+//
+// Copyright (C) 2018 Rasmus Munk Larsen <rmlarsen@google.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla
+// Public License v. 2.0. If a copy of the MPL was not distributed
+// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+// Barrier is an object that allows one or more threads to wait until
+// Notify has been called a specified number of times.
+
+#ifndef EIGEN_CXX11_THREADPOOL_BARRIER_H
+#define EIGEN_CXX11_THREADPOOL_BARRIER_H
+
+namespace Eigen {
+
+class Barrier {
+ public:
+ Barrier(unsigned int count) : state_(count << 1), notified_(false) {
+ eigen_assert(((count << 1) >> 1) == count);
+ }
+ ~Barrier() { eigen_plain_assert((state_ >> 1) == 0); }
+
+ void Notify() {
+ unsigned int v = state_.fetch_sub(2, std::memory_order_acq_rel) - 2;
+ if (v != 1) {
+ eigen_assert(((v + 2) & ~1) != 0);
+ return; // either count has not dropped to 0, or waiter is not waiting
+ }
+ std::unique_lock<std::mutex> l(mu_);
+ eigen_assert(!notified_);
+ notified_ = true;
+ cv_.notify_all();
+ }
+
+ void Wait() {
+ unsigned int v = state_.fetch_or(1, std::memory_order_acq_rel);
+ if ((v >> 1) == 0) return;
+ std::unique_lock<std::mutex> l(mu_);
+ while (!notified_) {
+ cv_.wait(l);
+ }
+ }
+
+ private:
+ std::mutex mu_;
+ std::condition_variable cv_;
+ std::atomic<unsigned int> state_; // low bit is waiter flag
+ bool notified_;
+};
+
+// Notification is an object that allows a user to wait for another
+// thread to signal a notification that an event has occurred.
+//
+// Multiple threads can wait on the same Notification object,
+// but Notify() must be called only once.
+struct Notification : Barrier {
+ Notification() : Barrier(1) {}
+};
+
+} // namespace Eigen
+
+#endif // EIGEN_CXX11_THREADPOOL_BARRIER_H
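A minimal usage sketch of the new Barrier: fan out a known number of tasks and block until each has called Notify() once. This is illustrative code, not part of the patch; it assumes the ThreadPool module header pulls in Barrier.h after this change.

#include <thread>
#include <vector>
#include <unsupported/Eigen/CXX11/ThreadPool>

int main() {
  const unsigned kNumTasks = 4;
  Eigen::Barrier barrier(kNumTasks);
  std::vector<std::thread> workers;
  for (unsigned i = 0; i < kNumTasks; ++i) {
    workers.emplace_back([&barrier] {
      // ... task body ...
      barrier.Notify();  // Each task notifies exactly once.
    });
  }
  barrier.Wait();  // Returns once Notify() has run kNumTasks times.
  for (std::thread& w : workers) w.join();
  return 0;
}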
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h b/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h
index 0a7181102..22c952ae1 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h
@@ -58,7 +58,7 @@ class EventCount {
~EventCount() {
// Ensure there are no waiters.
- eigen_assert((state_.load() & (kStackMask | kWaiterMask)) == kStackMask);
+ eigen_plain_assert((state_.load() & (kStackMask | kWaiterMask)) == kStackMask);
}
// Prewait prepares for waiting.
@@ -169,7 +169,8 @@ class EventCount {
class Waiter {
friend class EventCount;
- // Align to 128 byte boundary to prevent false sharing with other Waiter objects in the same vector.
+ // Align to 128 byte boundary to prevent false sharing with other Waiter
+ // objects in the same vector.
EIGEN_ALIGN_TO_BOUNDARY(128) std::atomic<Waiter*> next;
std::mutex mu;
std::condition_variable cv;
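For context on the 128-byte alignment above: when atomics used by different threads share a cache line, each write by one thread invalidates the other's cached copy (false sharing). A hedged sketch of the same fix in portable C++11 terms; the struct here is illustrative, not Eigen code.

#include <atomic>

struct PaddedCounter {
  // Aligning the member pads the struct to a 128-byte multiple, matching
  // the boundary used for EventCount::Waiter.
  alignas(128) std::atomic<unsigned> value{0};
};

// counters[0] and counters[1] can now never share a cache line.
PaddedCounter counters[2];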
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
index ecd49f382..60a0c9fb6 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
@@ -10,7 +10,6 @@
#ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
#define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
-
namespace Eigen {
template <typename Environment>
@@ -23,7 +22,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
: ThreadPoolTempl(num_threads, true, env) {}
ThreadPoolTempl(int num_threads, bool allow_spinning,
- Environment env = Environment())
+ Environment env = Environment())
: env_(env),
num_threads_(num_threads),
allow_spinning_(allow_spinning),
@@ -58,12 +57,18 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
coprimes_.push_back(i);
}
}
+ queues_.resize(num_threads_);
+#ifndef EIGEN_THREAD_LOCAL
+ init_barrier_.reset(new Barrier(num_threads_));
+#endif
for (int i = 0; i < num_threads_; i++) {
- queues_.push_back(new Queue());
- }
- for (int i = 0; i < num_threads_; i++) {
- threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); }));
+ threads_.emplace_back(env_.CreateThread([this, i]() { WorkerLoop(i); }));
}
+#ifndef EIGEN_THREAD_LOCAL
+ // Wait for workers to initialize per_thread_map_. Otherwise we might race
+ // with them in Schedule or CurrentThreadId.
+ init_barrier_->Wait();
+#endif
}
~ThreadPoolTempl() {
@@ -78,13 +83,13 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
// Since we were cancelled, there might be entries in the queues.
// Empty them to prevent their destructor from asserting.
for (size_t i = 0; i < queues_.size(); i++) {
- queues_[i]->Flush();
+ queues_[i].Flush();
}
}
// Join threads explicitly to avoid destruction order issues.
- for (size_t i = 0; i < num_threads_; i++) delete threads_[i];
- for (size_t i = 0; i < num_threads_; i++) delete queues_[i];
+ threads_.resize(0);
+ queues_.resize(0);
}
void Schedule(std::function<void()> fn) {
@@ -92,13 +97,13 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
PerThread* pt = GetPerThread();
if (pt->pool == this) {
// Worker thread of this pool, push onto the thread's queue.
- Queue* q = queues_[pt->thread_id];
- t = q->PushFront(std::move(t));
+ Queue& q = queues_[pt->thread_id];
+ t = q.PushFront(std::move(t));
} else {
// A free-standing thread (or worker of another pool), push onto a random
// queue.
- Queue* q = queues_[Rand(&pt->rand) % queues_.size()];
- t = q->PushBack(std::move(t));
+ Queue& q = queues_[Rand(&pt->rand) % queues_.size()];
+ t = q.PushBack(std::move(t));
}
// Note: below we touch this after making w available to worker threads.
// Strictly speaking, this can lead to a racy-use-after-free. Consider that
@@ -109,8 +114,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
// this is kept alive while any threads can potentially be in Schedule.
if (!t.f) {
ec_.Notify(false);
- }
- else {
+ } else {
env_.ExecuteTask(t); // Push failed, execute directly.
}
}
@@ -130,13 +134,10 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
ec_.Notify(true);
}
- int NumThreads() const final {
- return num_threads_;
- }
+ int NumThreads() const final { return num_threads_; }
int CurrentThreadId() const final {
- const PerThread* pt =
- const_cast<ThreadPoolTempl*>(this)->GetPerThread();
+ const PerThread* pt = const_cast<ThreadPoolTempl*>(this)->GetPerThread();
if (pt->pool == this) {
return pt->thread_id;
} else {
@@ -148,17 +149,21 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
typedef typename Environment::EnvThread Thread;
struct PerThread {
- constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) { }
+ constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) {}
ThreadPoolTempl* pool; // Parent pool, or null for normal threads.
- uint64_t rand; // Random generator state.
- int thread_id; // Worker thread index in pool.
+ uint64_t rand; // Random generator state.
+ int thread_id; // Worker thread index in pool.
+#ifndef EIGEN_THREAD_LOCAL
+ // Prevent false sharing.
+ char pad_[128];
+#endif
};
Environment env_;
const int num_threads_;
const bool allow_spinning_;
- MaxSizeVector<Thread*> threads_;
- MaxSizeVector<Queue*> queues_;
+ MaxSizeVector<std::unique_ptr<Thread> > threads_;
+ MaxSizeVector<Queue> queues_;
MaxSizeVector<unsigned> coprimes_;
MaxSizeVector<EventCount::Waiter> waiters_;
std::atomic<unsigned> blocked_;
@@ -166,14 +171,27 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
std::atomic<bool> done_;
std::atomic<bool> cancelled_;
EventCount ec_;
+#ifndef EIGEN_THREAD_LOCAL
+ std::unique_ptr<Barrier> init_barrier_;
+ std::mutex per_thread_map_mutex_; // Protects per_thread_map_.
+ std::unordered_map<uint64_t, std::unique_ptr<PerThread>> per_thread_map_;
+#endif
// Main worker thread loop.
void WorkerLoop(int thread_id) {
+#ifndef EIGEN_THREAD_LOCAL
+ std::unique_ptr<PerThread> new_pt(new PerThread());
+ per_thread_map_mutex_.lock();
+ // Do the emplace outside the assert: eigen_assert compiles away in
+ // release builds, which would silently drop the insertion.
+ bool insert_ok = per_thread_map_.emplace(GlobalThreadIdHash(), std::move(new_pt)).second;
+ eigen_plain_assert(insert_ok);
+ EIGEN_UNUSED_VARIABLE(insert_ok);
+ per_thread_map_mutex_.unlock();
+ init_barrier_->Notify();
+ init_barrier_->Wait();
+#endif
PerThread* pt = GetPerThread();
pt->pool = this;
- pt->rand = std::hash<std::thread::id>()(std::this_thread::get_id());
+ pt->rand = GlobalThreadIdHash();
pt->thread_id = thread_id;
- Queue* q = queues_[thread_id];
+ Queue& q = queues_[thread_id];
EventCount::Waiter* waiter = &waiters_[thread_id];
// TODO(dvyukov,rmlarsen): The time spent in Steal() is proportional
// to num_threads_ and we assume that new work is scheduled at a
@@ -189,10 +207,10 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
// counter-productive for the types of I/O workloads the single thread
// pools tend to be used for.
while (!cancelled_) {
- Task t = q->PopFront();
+ Task t = q.PopFront();
for (int i = 0; i < spin_count && !t.f; i++) {
if (!cancelled_.load(std::memory_order_relaxed)) {
- t = q->PopFront();
+ t = q.PopFront();
}
}
if (!t.f) {
@@ -206,7 +224,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
}
} else {
while (!cancelled_) {
- Task t = q->PopFront();
+ Task t = q.PopFront();
if (!t.f) {
t = Steal();
if (!t.f) {
@@ -243,7 +261,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
unsigned inc = coprimes_[r % coprimes_.size()];
unsigned victim = r % size;
for (unsigned i = 0; i < size; i++) {
- Task t = queues_[victim]->PopBack();
+ Task t = queues_[victim].PopBack();
if (t.f) {
return t;
}
@@ -270,7 +288,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
if (cancelled_) {
return false;
} else {
- *t = queues_[victim]->PopBack();
+ *t = queues_[victim].PopBack();
return true;
}
}
@@ -278,7 +296,8 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
// If we are shutting down and all worker threads blocked without work,
// then we are done.
blocked_++;
- if (done_ && blocked_ == num_threads_) {
+ // TODO is blocked_ required to be unsigned?
+ if (done_ && blocked_ == static_cast<unsigned>(num_threads_)) {
ec_.CancelWait(waiter);
// Almost done, but need to re-check queues.
// Consider that all queues are empty and all worker threads are preempted
@@ -311,7 +330,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
unsigned inc = coprimes_[r % coprimes_.size()];
unsigned victim = r % size;
for (unsigned i = 0; i < size; i++) {
- if (!queues_[victim]->Empty()) {
+ if (!queues_[victim].Empty()) {
return victim;
}
victim += inc;
@@ -322,10 +341,24 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
return -1;
}
- static EIGEN_STRONG_INLINE PerThread* GetPerThread() {
+ static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash() {
+ return std::hash<std::thread::id>()(std::this_thread::get_id());
+ }
+
+ EIGEN_STRONG_INLINE PerThread* GetPerThread() {
+#ifndef EIGEN_THREAD_LOCAL
+ static PerThread dummy;
+ auto it = per_thread_map_.find(GlobalThreadIdHash());
+ if (it == per_thread_map_.end()) {
+ return &dummy;
+ } else {
+ return it->second.get();
+ }
+#else
EIGEN_THREAD_LOCAL PerThread per_thread_;
PerThread* pt = &per_thread_;
return pt;
+#endif
}
static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) {
@@ -333,7 +366,8 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
// Update the internal state
*state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
// Generate the random output (using the PCG-XSH-RS scheme)
- return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61)));
+ return static_cast<unsigned>((current ^ (current >> 22)) >>
+ (22 + (current >> 61)));
}
};
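Putting the pool changes together, a hedged usage sketch via the Eigen::ThreadPool alias (assuming the StlThreadEnvironment-backed alias the module header exports; the task body and counts are illustrative). Note that Schedule() never joins, so completion must be tracked explicitly, e.g. with the Barrier added above.

#include <atomic>
#include <unsupported/Eigen/CXX11/ThreadPool>

int main() {
  Eigen::ThreadPool pool(/*num_threads=*/4);
  const int kNumTasks = 100;
  std::atomic<int> sum(0);
  Eigen::Barrier done(kNumTasks);
  for (int i = 0; i < kNumTasks; ++i) {
    pool.Schedule([i, &sum, &done] {
      sum.fetch_add(i, std::memory_order_relaxed);
      done.Notify();
    });
  }
  done.Wait();  // Block until every scheduled task has run.
  return sum.load() == kNumTasks * (kNumTasks - 1) / 2 ? 0 : 1;
}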
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
index cb3690a2e..05c739aa1 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
@@ -10,7 +10,6 @@
#ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
#define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
-
namespace Eigen {
// RunQueue is a fixed-size, partially non-blocking deque of Work items.
@@ -47,7 +46,7 @@ class RunQueue {
array_[i].state.store(kEmpty, std::memory_order_relaxed);
}
- ~RunQueue() { eigen_assert(Size() == 0); }
+ ~RunQueue() { eigen_plain_assert(Size() == 0); }
// PushFront inserts w at the beginning of the queue.
// If queue is full returns w, otherwise returns default-constructed Work.
@@ -131,9 +130,8 @@ class RunQueue {
Elem* e = &array_[mid & kMask];
uint8_t s = e->state.load(std::memory_order_relaxed);
if (n == 0) {
- if (s != kReady ||
- !e->state.compare_exchange_strong(s, kBusy,
- std::memory_order_acquire))
+ if (s != kReady || !e->state.compare_exchange_strong(
+ s, kBusy, std::memory_order_acquire))
continue;
start = mid;
} else {
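The owner/thief calling convention that the compare-exchange above protects, shown as a hedged single-threaded sketch (template arguments follow RunQueue's declaration; the int payload is illustrative):

#include <unsupported/Eigen/CXX11/ThreadPool>

int main() {
  // The owning worker uses the front; stealing threads use the back.
  Eigen::RunQueue<int, 64> q;  // Capacity must be a power of two.
  q.PushFront(1);              // Returns a default Work on success.
  q.PushFront(2);
  int front = q.PopFront();    // 2: the front behaves as a LIFO for the owner.
  int back = q.PopBack();      // 1: thieves take the oldest item.
  return (front == 2 && back == 1) ? 0 : 1;
}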
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h
index cfa221732..a41731c34 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h
@@ -10,13 +10,45 @@
#ifndef EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H
#define EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H
-// Try to come up with a portable implementation of thread local variables
-#if EIGEN_COMP_GNUC && EIGEN_GNUC_AT_MOST(4, 7)
-#define EIGEN_THREAD_LOCAL static __thread
-#elif EIGEN_COMP_CLANG
-#define EIGEN_THREAD_LOCAL static __thread
-#else
+// Clang's __has_feature is not defined by other compilers, so provide a
+// fallback to keep the check below valid everywhere.
+#ifndef __has_feature
+#define __has_feature(x) 0
+#endif
+
+#if EIGEN_MAX_CPP_VER >= 11 && \
+ ((EIGEN_COMP_GNUC && EIGEN_GNUC_AT_LEAST(4, 8)) || \
+ __has_feature(cxx_thread_local))
#define EIGEN_THREAD_LOCAL static thread_local
#endif
+// Disable TLS for Apple and Android builds with older toolchains.
+#if defined(__APPLE__)
+// Included for TARGET_OS_IPHONE, __IPHONE_OS_VERSION_MIN_REQUIRED,
+// __IPHONE_8_0.
+#include <Availability.h>
+#include <TargetConditionals.h>
+#endif
+// Checks whether C++11's `thread_local` storage duration specifier is
+// supported.
+#if defined(__apple_build_version__) && \
+ ((__apple_build_version__ < 8000042) || \
+ (TARGET_OS_IPHONE && __IPHONE_OS_VERSION_MIN_REQUIRED < __IPHONE_9_0))
+// Notes: Xcode's clang did not support `thread_local` until version
+// 8, and even then not for all iOS < 9.0.
+#undef EIGEN_THREAD_LOCAL
+
+#elif defined(__ANDROID__) && EIGEN_COMP_CLANG
+// There are platforms for which TLS should not be used even though the compiler
+// makes it seem like it's supported (Android NDK < r12b for example).
+// This is primarily because of linker problems and toolchain misconfiguration:
+// TLS isn't supported until NDK r12b per
+// https://developer.android.com/ndk/downloads/revision_history.html
+// Since NDK r16, `__NDK_MAJOR__` and `__NDK_MINOR__` are defined in
+// <android/ndk-version.h>. For NDK < r16, users should define these macros,
+// e.g. `-D__NDK_MAJOR__=11 -D__NDK_MINOR__=0` for NDK r11.
+#if __has_include(<android/ndk-version.h>)
+#include <android/ndk-version.h>
+#endif // __has_include(<android/ndk-version.h>)
+#if defined(__ANDROID__) && defined(__clang__) && defined(__NDK_MAJOR__) && \
+ defined(__NDK_MINOR__) && \
+ ((__NDK_MAJOR__ < 12) || ((__NDK_MAJOR__ == 12) && (__NDK_MINOR__ < 1)))
+#undef EIGEN_THREAD_LOCAL
+#endif
+#endif // defined(__ANDROID__) && defined(__clang__)
+
#endif // EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H
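Downstream code is expected to branch on whether EIGEN_THREAD_LOCAL survived the checks above. A hedged sketch of that pattern; the counter is illustrative, and the fallback mirrors the mutex-protected map keyed by thread id that NonBlockingThreadPool.h now uses for its PerThread records.

#include <mutex>
#include <thread>
#include <unordered_map>
#include <unsupported/Eigen/CXX11/ThreadPool>

int CallsOnThisThread() {
#ifdef EIGEN_THREAD_LOCAL
  // Fast path: EIGEN_THREAD_LOCAL expands to `static thread_local`.
  EIGEN_THREAD_LOCAL int count = 0;
  return ++count;
#else
  // Fallback: emulate thread-local storage with a locked map.
  static std::mutex mu;
  static std::unordered_map<std::thread::id, int> counts;
  std::lock_guard<std::mutex> lock(mu);
  return ++counts[std::this_thread::get_id()];
#endif
}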