From 8278ae63137fb2ce2cdf6fc8117df3080e5cb2fe Mon Sep 17 00:00:00 2001 From: Rasmus Munk Larsen Date: Mon, 13 Aug 2018 15:31:23 -0700 Subject: Add support for thread local support on platforms that do not support it through emulation using a hash map. --- test/main.h | 6 +- unsupported/Eigen/CXX11/ThreadPool | 10 +++- .../CXX11/src/Tensor/TensorDeviceThreadPool.h | 50 ----------------- .../CXX11/src/ThreadPool/NonBlockingThreadPool.h | 65 +++++++++++++++++----- .../Eigen/CXX11/src/ThreadPool/ThreadLocal.h | 48 +++++++++++++--- 5 files changed, 103 insertions(+), 76 deletions(-) diff --git a/test/main.h b/test/main.h index de8a4865f..36784b1f4 100644 --- a/test/main.h +++ b/test/main.h @@ -125,7 +125,7 @@ inline void on_temporary_creation(long int size) { if(nb_temporaries!=(N)) { std::cerr << "nb_temporaries == " << nb_temporaries << "\n"; }\ VERIFY( (#XPR) && nb_temporaries==(N) ); \ } - + #endif #include "split_test_helper.h" @@ -328,7 +328,7 @@ namespace Eigen #define VERIFY_RAISES_STATIC_ASSERT(a) \ std::cout << "Can't VERIFY_RAISES_STATIC_ASSERT( " #a " ) with exceptions disabled\n"; #endif - + #if !defined(__CUDACC__) && !defined(__HIPCC__) && !defined(__SYCL_DEVICE_ONLY__) #define EIGEN_USE_CUSTOM_ASSERT #endif @@ -845,4 +845,4 @@ int main(int argc, char *argv[]) #ifdef _MSC_VER // 4503 - decorated name length exceeded, name was truncated #pragma warning( disable : 4503) -#endif \ No newline at end of file +#endif diff --git a/unsupported/Eigen/CXX11/ThreadPool b/unsupported/Eigen/CXX11/ThreadPool index cbb3bbf2c..12aa07c7f 100644 --- a/unsupported/Eigen/CXX11/ThreadPool +++ b/unsupported/Eigen/CXX11/ThreadPool @@ -44,6 +44,14 @@ #include #include #include +#ifndef EIGEN_THREAD_LOCAL +// There are non-parenthesized calls to "max" in the header, +// which trigger a check in test/main.h causing compilation to fail. +// We work around the check here by removing the check for max in +// the case where we have to emulate thread_local. +#undef max +#include +#endif #include "src/util/CXX11Meta.h" #include "src/util/MaxSizeVector.h" @@ -55,6 +63,7 @@ #include "src/ThreadPool/RunQueue.h" #include "src/ThreadPool/ThreadPoolInterface.h" #include "src/ThreadPool/ThreadEnvironment.h" +#include "src/ThreadPool/Barrier.h" #include "src/ThreadPool/NonBlockingThreadPool.h" #endif @@ -62,4 +71,3 @@ #include #endif // EIGEN_CXX11_THREADPOOL_MODULE - diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h index 3e3665efb..6fc6688d3 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h @@ -12,56 +12,6 @@ namespace Eigen { -// Barrier is an object that allows one or more threads to wait until -// Notify has been called a specified number of times. -class Barrier { - public: - Barrier(unsigned int count) : state_(count << 1), notified_(false) { - eigen_assert(((count << 1) >> 1) == count); - } - ~Barrier() { - eigen_assert((state_>>1) == 0); - } - - void Notify() { - unsigned int v = state_.fetch_sub(2, std::memory_order_acq_rel) - 2; - if (v != 1) { - eigen_assert(((v + 2) & ~1) != 0); - return; // either count has not dropped to 0, or waiter is not waiting - } - std::unique_lock l(mu_); - eigen_assert(!notified_); - notified_ = true; - cv_.notify_all(); - } - - void Wait() { - unsigned int v = state_.fetch_or(1, std::memory_order_acq_rel); - if ((v >> 1) == 0) return; - std::unique_lock l(mu_); - while (!notified_) { - cv_.wait(l); - } - } - - private: - std::mutex mu_; - std::condition_variable cv_; - std::atomic state_; // low bit is waiter flag - bool notified_; -}; - - -// Notification is an object that allows a user to to wait for another -// thread to signal a notification that an event has occurred. -// -// Multiple threads can wait on the same Notification object, -// but only one caller must call Notify() on the object. -struct Notification : Barrier { - Notification() : Barrier(1) {}; -}; - - // Runs an arbitrary function and then calls Notify() on the passed in // Notification. template struct FunctionWrapperWithNotification diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h index ecd49f382..ede70da8d 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h @@ -10,7 +10,6 @@ #ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H #define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H - namespace Eigen { template @@ -23,7 +22,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { : ThreadPoolTempl(num_threads, true, env) {} ThreadPoolTempl(int num_threads, bool allow_spinning, - Environment env = Environment()) + Environment env = Environment()) : env_(env), num_threads_(num_threads), allow_spinning_(allow_spinning), @@ -61,9 +60,17 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { for (int i = 0; i < num_threads_; i++) { queues_.push_back(new Queue()); } +#ifndef EIGEN_THREAD_LOCAL + init_barrier_.reset(new Barrier(num_threads_)); +#endif for (int i = 0; i < num_threads_; i++) { threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); })); } +#ifndef EIGEN_THREAD_LOCAL + // Wait for workers to initialize per_thread_map_. Otherwise we might race + // with them in Schedule or CurrentThreadId. + init_barrier_->Wait(); +#endif } ~ThreadPoolTempl() { @@ -85,6 +92,9 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { // Join threads explicitly to avoid destruction order issues. for (size_t i = 0; i < num_threads_; i++) delete threads_[i]; for (size_t i = 0; i < num_threads_; i++) delete queues_[i]; +#ifndef EIGEN_THREAD_LOCAL + for (auto it : per_thread_map_) delete it.second; +#endif } void Schedule(std::function fn) { @@ -109,8 +119,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { // this is kept alive while any threads can potentially be in Schedule. if (!t.f) { ec_.Notify(false); - } - else { + } else { env_.ExecuteTask(t); // Push failed, execute directly. } } @@ -130,13 +139,10 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { ec_.Notify(true); } - int NumThreads() const final { - return num_threads_; - } + int NumThreads() const final { return num_threads_; } int CurrentThreadId() const final { - const PerThread* pt = - const_cast(this)->GetPerThread(); + const PerThread* pt = const_cast(this)->GetPerThread(); if (pt->pool == this) { return pt->thread_id; } else { @@ -148,10 +154,10 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { typedef typename Environment::EnvThread Thread; struct PerThread { - constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) { } + constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) {} ThreadPoolTempl* pool; // Parent pool, or null for normal threads. - uint64_t rand; // Random generator state. - int thread_id; // Worker thread index in pool. + uint64_t rand; // Random generator state. + int thread_id; // Worker thread index in pool. }; Environment env_; @@ -166,12 +172,26 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { std::atomic done_; std::atomic cancelled_; EventCount ec_; +#ifndef EIGEN_THREAD_LOCAL + std::unique_ptr init_barrier_; + std::mutex mu; // Protects per_thread_map_. + std::unordered_map per_thread_map_; +#endif // Main worker thread loop. void WorkerLoop(int thread_id) { +#ifndef EIGEN_THREAD_LOCAL + PerThread* pt = new PerThread(); + mu.lock(); + per_thread_map_[GlobalThreadIdHash()] = pt; + mu.unlock(); + init_barrier_->Notify(); + init_barrier_->Wait(); +#else PerThread* pt = GetPerThread(); +#endif pt->pool = this; - pt->rand = std::hash()(std::this_thread::get_id()); + pt->rand = GlobalThreadIdHash(); pt->thread_id = thread_id; Queue* q = queues_[thread_id]; EventCount::Waiter* waiter = &waiters_[thread_id]; @@ -322,10 +342,24 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { return -1; } - static EIGEN_STRONG_INLINE PerThread* GetPerThread() { + static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash() { + return std::hash()(std::this_thread::get_id()); + } + + EIGEN_STRONG_INLINE PerThread* GetPerThread() { +#ifndef EIGEN_THREAD_LOCAL + static PerThread dummy; + auto it = per_thread_map_.find(GlobalThreadIdHash()); + if (it == per_thread_map_.end()) { + return &dummy; + } else { + return it->second; + } +#else EIGEN_THREAD_LOCAL PerThread per_thread_; PerThread* pt = &per_thread_; return pt; +#endif } static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) { @@ -333,7 +367,8 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { // Update the internal state *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL; // Generate the random output (using the PCG-XSH-RS scheme) - return static_cast((current ^ (current >> 22)) >> (22 + (current >> 61))); + return static_cast((current ^ (current >> 22)) >> + (22 + (current >> 61))); } }; diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h index cfa221732..f33759ba9 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h @@ -10,13 +10,47 @@ #ifndef EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H #define EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H -// Try to come up with a portable implementation of thread local variables -#if EIGEN_COMP_GNUC && EIGEN_GNUC_AT_MOST(4, 7) -#define EIGEN_THREAD_LOCAL static __thread -#elif EIGEN_COMP_CLANG -#define EIGEN_THREAD_LOCAL static __thread -#else -#define EIGEN_THREAD_LOCAL static thread_local +#undef EIGEN_THREAD_LOCAL + +#if EIGEN_MAX_CPP_VER>=11 && (__has_feature(cxx_thread_local)) + #define EIGEN_THREAD_LOCAL static thread_local +#elif (EIGEN_COMP_GNUC && EIGEN_GNUC_AT_MOST(4, 7)) || EIGEN_COMP_CLANG + #define EIGEN_THREAD_LOCAL static __thread +#endif + +// Disable TLS for Apple and Android builds with older toolchains. +#if defined(__APPLE__) +// Included for TARGET_OS_IPHONE, __IPHONE_OS_VERSION_MIN_REQUIRED, +// __IPHONE_8_0. +#include +#include +#endif +// Checks whether C++11's `thread_local` storage duration specifier is +// supported. +#if defined(__apple_build_version__) && \ + ((__apple_build_version__ < 8000042) || \ + (TARGET_OS_IPHONE && __IPHONE_OS_VERSION_MIN_REQUIRED < __IPHONE_9_0)) +// Notes: Xcode's clang did not support `thread_local` until version +// 8, and even then not for all iOS < 9.0. +#undef EIGEN_THREAD_LOCAL + +#elif defined(__ANDROID__) && EIGEN_COMP_CLANG +// There are platforms for which TLS should not be used even though the compiler +// makes it seem like it's supported (Android NDK < r12b for example). +// This is primarily because of linker problems and toolchain misconfiguration: +// TLS isn't supported until NDK r12b per +// https://developer.android.com/ndk/downloads/revision_history.html +// Since NDK r16, `__NDK_MAJOR__` and `__NDK_MINOR__` are defined in +// . For NDK < r16, users should define these macros, +// e.g. `-D__NDK_MAJOR__=11 -D__NKD_MINOR__=0` for NDK r11. +#if __has_include() +#include +#endif // __has_include() +#if defined(__ANDROID__) && defined(__clang__) && defined(__NDK_MAJOR__) && \ + defined(__NDK_MINOR__) && \ + ((__NDK_MAJOR__ < 12) || ((__NDK_MAJOR__ == 12) && (__NDK_MINOR__ < 1))) +#undef EIGEN_THREAD_LOCAL #endif +#endif // defined(__ANDROID__) && defined(__clang__) #endif // EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H -- cgit v1.2.3