aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--test/main.h6
-rw-r--r--unsupported/Eigen/CXX11/ThreadPool10
-rw-r--r--unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h50
-rw-r--r--unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h65
-rw-r--r--unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h48
5 files changed, 103 insertions, 76 deletions
diff --git a/test/main.h b/test/main.h
index de8a4865f..36784b1f4 100644
--- a/test/main.h
+++ b/test/main.h
@@ -125,7 +125,7 @@ inline void on_temporary_creation(long int size) {
if(nb_temporaries!=(N)) { std::cerr << "nb_temporaries == " << nb_temporaries << "\n"; }\
VERIFY( (#XPR) && nb_temporaries==(N) ); \
}
-
+
#endif
#include "split_test_helper.h"
@@ -328,7 +328,7 @@ namespace Eigen
#define VERIFY_RAISES_STATIC_ASSERT(a) \
std::cout << "Can't VERIFY_RAISES_STATIC_ASSERT( " #a " ) with exceptions disabled\n";
#endif
-
+
#if !defined(__CUDACC__) && !defined(__HIPCC__) && !defined(__SYCL_DEVICE_ONLY__)
#define EIGEN_USE_CUSTOM_ASSERT
#endif
@@ -845,4 +845,4 @@ int main(int argc, char *argv[])
#ifdef _MSC_VER
// 4503 - decorated name length exceeded, name was truncated
#pragma warning( disable : 4503)
-#endif \ No newline at end of file
+#endif
diff --git a/unsupported/Eigen/CXX11/ThreadPool b/unsupported/Eigen/CXX11/ThreadPool
index cbb3bbf2c..12aa07c7f 100644
--- a/unsupported/Eigen/CXX11/ThreadPool
+++ b/unsupported/Eigen/CXX11/ThreadPool
@@ -44,6 +44,14 @@
#include <thread>
#include <functional>
#include <memory>
+#ifndef EIGEN_THREAD_LOCAL
+// There are non-parenthesized calls to "max" in the <unordered_map> header,
+// which trigger a check in test/main.h and cause compilation to fail.
+// We work around this by undefining the "max" macro before including
+// <unordered_map>, which is only needed when we have to emulate thread_local.
+#undef max
+#include <unordered_map>
+#endif
#include "src/util/CXX11Meta.h"
#include "src/util/MaxSizeVector.h"
@@ -55,6 +63,7 @@
#include "src/ThreadPool/RunQueue.h"
#include "src/ThreadPool/ThreadPoolInterface.h"
#include "src/ThreadPool/ThreadEnvironment.h"
+#include "src/ThreadPool/Barrier.h"
#include "src/ThreadPool/NonBlockingThreadPool.h"
#endif
@@ -62,4 +71,3 @@
#include <Eigen/src/Core/util/ReenableStupidWarnings.h>
#endif // EIGEN_CXX11_THREADPOOL_MODULE
-
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
index 3e3665efb..6fc6688d3 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
@@ -12,56 +12,6 @@
namespace Eigen {
-// Barrier is an object that allows one or more threads to wait until
-// Notify has been called a specified number of times.
-class Barrier {
- public:
- Barrier(unsigned int count) : state_(count << 1), notified_(false) {
- eigen_assert(((count << 1) >> 1) == count);
- }
- ~Barrier() {
- eigen_assert((state_>>1) == 0);
- }
-
- void Notify() {
- unsigned int v = state_.fetch_sub(2, std::memory_order_acq_rel) - 2;
- if (v != 1) {
- eigen_assert(((v + 2) & ~1) != 0);
- return; // either count has not dropped to 0, or waiter is not waiting
- }
- std::unique_lock<std::mutex> l(mu_);
- eigen_assert(!notified_);
- notified_ = true;
- cv_.notify_all();
- }
-
- void Wait() {
- unsigned int v = state_.fetch_or(1, std::memory_order_acq_rel);
- if ((v >> 1) == 0) return;
- std::unique_lock<std::mutex> l(mu_);
- while (!notified_) {
- cv_.wait(l);
- }
- }
-
- private:
- std::mutex mu_;
- std::condition_variable cv_;
- std::atomic<unsigned int> state_; // low bit is waiter flag
- bool notified_;
-};
-
-
-// Notification is an object that allows a user to to wait for another
-// thread to signal a notification that an event has occurred.
-//
-// Multiple threads can wait on the same Notification object,
-// but only one caller must call Notify() on the object.
-struct Notification : Barrier {
- Notification() : Barrier(1) {};
-};
-
-
// Runs an arbitrary function and then calls Notify() on the passed in
// Notification.
template <typename Function, typename... Args> struct FunctionWrapperWithNotification
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
index ecd49f382..ede70da8d 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
@@ -10,7 +10,6 @@
#ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
#define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
-
namespace Eigen {
template <typename Environment>
@@ -23,7 +22,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
: ThreadPoolTempl(num_threads, true, env) {}
ThreadPoolTempl(int num_threads, bool allow_spinning,
- Environment env = Environment())
+ Environment env = Environment())
: env_(env),
num_threads_(num_threads),
allow_spinning_(allow_spinning),
@@ -61,9 +60,17 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
for (int i = 0; i < num_threads_; i++) {
queues_.push_back(new Queue());
}
+#ifndef EIGEN_THREAD_LOCAL
+ init_barrier_.reset(new Barrier(num_threads_));
+#endif
for (int i = 0; i < num_threads_; i++) {
threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); }));
}
+#ifndef EIGEN_THREAD_LOCAL
+ // Wait for workers to initialize per_thread_map_. Otherwise we might race
+ // with them in Schedule or CurrentThreadId.
+ init_barrier_->Wait();
+#endif
}
~ThreadPoolTempl() {
@@ -85,6 +92,9 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
// Join threads explicitly to avoid destruction order issues.
for (size_t i = 0; i < num_threads_; i++) delete threads_[i];
for (size_t i = 0; i < num_threads_; i++) delete queues_[i];
+#ifndef EIGEN_THREAD_LOCAL
+ for (auto it : per_thread_map_) delete it.second;
+#endif
}
void Schedule(std::function<void()> fn) {
@@ -109,8 +119,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
// this is kept alive while any threads can potentially be in Schedule.
if (!t.f) {
ec_.Notify(false);
- }
- else {
+ } else {
env_.ExecuteTask(t); // Push failed, execute directly.
}
}
@@ -130,13 +139,10 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
ec_.Notify(true);
}
- int NumThreads() const final {
- return num_threads_;
- }
+ int NumThreads() const final { return num_threads_; }
int CurrentThreadId() const final {
- const PerThread* pt =
- const_cast<ThreadPoolTempl*>(this)->GetPerThread();
+ const PerThread* pt = const_cast<ThreadPoolTempl*>(this)->GetPerThread();
if (pt->pool == this) {
return pt->thread_id;
} else {
@@ -148,10 +154,10 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
typedef typename Environment::EnvThread Thread;
struct PerThread {
- constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) { }
+ constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) {}
ThreadPoolTempl* pool; // Parent pool, or null for normal threads.
- uint64_t rand; // Random generator state.
- int thread_id; // Worker thread index in pool.
+ uint64_t rand; // Random generator state.
+ int thread_id; // Worker thread index in pool.
};
Environment env_;
@@ -166,12 +172,26 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
std::atomic<bool> done_;
std::atomic<bool> cancelled_;
EventCount ec_;
+#ifndef EIGEN_THREAD_LOCAL
+ std::unique_ptr<Barrier> init_barrier_;
+ std::mutex mu; // Protects per_thread_map_.
+ std::unordered_map<uint64_t, PerThread*> per_thread_map_;
+#endif
// Main worker thread loop.
void WorkerLoop(int thread_id) {
+#ifndef EIGEN_THREAD_LOCAL
+ PerThread* pt = new PerThread();
+ mu.lock();
+ per_thread_map_[GlobalThreadIdHash()] = pt;
+ mu.unlock();
+ init_barrier_->Notify();
+ init_barrier_->Wait();
+#else
PerThread* pt = GetPerThread();
+#endif
pt->pool = this;
- pt->rand = std::hash<std::thread::id>()(std::this_thread::get_id());
+ pt->rand = GlobalThreadIdHash();
pt->thread_id = thread_id;
Queue* q = queues_[thread_id];
EventCount::Waiter* waiter = &waiters_[thread_id];
@@ -322,10 +342,24 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
return -1;
}
- static EIGEN_STRONG_INLINE PerThread* GetPerThread() {
+ static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash() {
+ return std::hash<std::thread::id>()(std::this_thread::get_id());
+ }
+
+ EIGEN_STRONG_INLINE PerThread* GetPerThread() {
+#ifndef EIGEN_THREAD_LOCAL
+ static PerThread dummy;
+ auto it = per_thread_map_.find(GlobalThreadIdHash());
+ if (it == per_thread_map_.end()) {
+ return &dummy;
+ } else {
+ return it->second;
+ }
+#else
EIGEN_THREAD_LOCAL PerThread per_thread_;
PerThread* pt = &per_thread_;
return pt;
+#endif
}
static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) {
@@ -333,7 +367,8 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
// Update the internal state
*state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
// Generate the random output (using the PCG-XSH-RS scheme)
- return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61)));
+ return static_cast<unsigned>((current ^ (current >> 22)) >>
+ (22 + (current >> 61)));
}
};
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h
index cfa221732..f33759ba9 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h
@@ -10,13 +10,47 @@
#ifndef EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H
#define EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H
-// Try to come up with a portable implementation of thread local variables
-#if EIGEN_COMP_GNUC && EIGEN_GNUC_AT_MOST(4, 7)
-#define EIGEN_THREAD_LOCAL static __thread
-#elif EIGEN_COMP_CLANG
-#define EIGEN_THREAD_LOCAL static __thread
-#else
-#define EIGEN_THREAD_LOCAL static thread_local
+#undef EIGEN_THREAD_LOCAL
+
+#if EIGEN_MAX_CPP_VER>=11 && (__has_feature(cxx_thread_local))
+ #define EIGEN_THREAD_LOCAL static thread_local
+#elif (EIGEN_COMP_GNUC && EIGEN_GNUC_AT_MOST(4, 7)) || EIGEN_COMP_CLANG
+ #define EIGEN_THREAD_LOCAL static __thread
+#endif
+
+// Disable TLS for Apple and Android builds with older toolchains.
+#if defined(__APPLE__)
+// Included for TARGET_OS_IPHONE, __IPHONE_OS_VERSION_MIN_REQUIRED,
+// __IPHONE_8_0.
+#include <Availability.h>
+#include <TargetConditionals.h>
+#endif
+// Checks whether C++11's `thread_local` storage duration specifier is
+// supported.
+#if defined(__apple_build_version__) && \
+ ((__apple_build_version__ < 8000042) || \
+ (TARGET_OS_IPHONE && __IPHONE_OS_VERSION_MIN_REQUIRED < __IPHONE_9_0))
+// Notes: Xcode's clang did not support `thread_local` until version
+// 8, and even then not for all iOS < 9.0.
+#undef EIGEN_THREAD_LOCAL
+
+#elif defined(__ANDROID__) && EIGEN_COMP_CLANG
+// There are platforms for which TLS should not be used even though the compiler
+// makes it seem like it's supported (Android NDK < r12b for example).
+// This is primarily because of linker problems and toolchain misconfiguration:
+// TLS isn't supported until NDK r12b per
+// https://developer.android.com/ndk/downloads/revision_history.html
+// Since NDK r16, `__NDK_MAJOR__` and `__NDK_MINOR__` are defined in
+// <android/ndk-version.h>. For NDK < r16, users should define these macros,
+// e.g. `-D__NDK_MAJOR__=11 -D__NDK_MINOR__=0` for NDK r11.
+#if __has_include(<android/ndk-version.h>)
+#include <android/ndk-version.h>
+#endif // __has_include(<android/ndk-version.h>)
+#if defined(__ANDROID__) && defined(__clang__) && defined(__NDK_MAJOR__) && \
+ defined(__NDK_MINOR__) && \
+ ((__NDK_MAJOR__ < 12) || ((__NDK_MAJOR__ == 12) && (__NDK_MINOR__ < 1)))
+#undef EIGEN_THREAD_LOCAL
#endif
+#endif // defined(__ANDROID__) && EIGEN_COMP_CLANG
#endif // EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H