From e3dec4dcc1854972113ba7862c801737d7955972 Mon Sep 17 00:00:00 2001 From: Eugene Zhulenev Date: Mon, 9 Sep 2019 15:18:14 -0700 Subject: ThreadLocal container that does not rely on thread local storage --- unsupported/Eigen/CXX11/ThreadPool | 10 +- .../Eigen/CXX11/src/ThreadPool/ThreadLocal.h | 222 ++++++++++++++++++++- unsupported/test/CMakeLists.txt | 1 + unsupported/test/cxx11_tensor_thread_local.cpp | 158 +++++++++++++++ 4 files changed, 385 insertions(+), 6 deletions(-) create mode 100644 unsupported/test/cxx11_tensor_thread_local.cpp (limited to 'unsupported') diff --git a/unsupported/Eigen/CXX11/ThreadPool b/unsupported/Eigen/CXX11/ThreadPool index 7a795da3d..71a6afe39 100644 --- a/unsupported/Eigen/CXX11/ThreadPool +++ b/unsupported/Eigen/CXX11/ThreadPool @@ -45,11 +45,7 @@ #include #include #include -#include "src/util/CXX11Meta.h" -#include "src/util/MaxSizeVector.h" -#include "src/ThreadPool/ThreadLocal.h" -#ifndef EIGEN_THREAD_LOCAL // There are non-parenthesized calls to "max" in the header, // which trigger a check in test/main.h causing compilation to fail. // We work around the check here by removing the check for max in @@ -58,7 +54,11 @@ #undef max #endif #include -#endif + +#include "src/util/CXX11Meta.h" +#include "src/util/MaxSizeVector.h" + +#include "src/ThreadPool/ThreadLocal.h" #include "src/ThreadPool/ThreadYield.h" #include "src/ThreadPool/ThreadCancel.h" #include "src/ThreadPool/EventCount.h" diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h index 696c2d03b..63a168372 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h @@ -60,6 +60,226 @@ #endif #endif // defined(__ANDROID__) && defined(__clang__) -#endif // EIGEN_AVOID_THREAD_LOCAL +#endif // EIGEN_AVOID_THREAD_LOCAL + +namespace Eigen { + +// Thread local container for elements of type Factory::T, that does not use +// thread local storage. It will lazily initialize elements for each thread that +// accesses this object. As long as the number of unique threads accessing this +// storage is smaller than `kAllocationMultiplier * num_threads`, it is +// lock-free and wait-free. Otherwise it will use a mutex for synchronization. +// +// Example: +// +// struct Counter { +// int value; +// } +// +// struct CounterFactory { +// using T = Counter; +// +// Counter Allocate() { return {0}; } +// void Release(Counter&) {} +// }; +// +// CounterFactory factory; +// Eigen::ThreadLocal counter(factory, 10); +// +// // Each thread will have access to it's own counter object. +// Counter& cnt = counter.local(); +// cnt++; +// +// WARNING: Eigen::ThreadLocal uses the OS-specific value returned by +// std::this_thread::get_id() to identify threads. This value is not guaranteed +// to be unique except for the life of the thread. A newly created thread may +// get an OS-specific ID equal to that of an already destroyed thread. +// +// Somewhat similar to TBB thread local storage, with similar restrictions: +// https://www.threadingbuildingblocks.org/docs/help/reference/thread_local_storage/enumerable_thread_specific_cls.html +// +template +class ThreadLocal { + // We allocate larger storage for thread local data, than the number of + // threads, because thread pool size might grow, or threads outside of a + // thread pool might steal the work. We still expect this number to be of the + // same order of magnitude as the original `num_threads`. 
+ static constexpr int kAllocationMultiplier = 4; + + using T = typename Factory::T; + + // We preallocate default constructed elements in MaxSizedVector. + static_assert(std::is_default_constructible::value, + "ThreadLocal data type must be default constructible"); + + public: + explicit ThreadLocal(Factory& factory, int num_threads) + : factory_(factory), + num_records_(kAllocationMultiplier * num_threads), + data_(num_records_), + ptr_(num_records_), + filled_records_(0) { + eigen_assert(num_threads >= 0); + data_.resize(num_records_); + for (int i = 0; i < num_records_; ++i) { + ptr_.emplace_back(nullptr); + } + } + + T& local() { + std::thread::id this_thread = std::this_thread::get_id(); + if (num_records_ == 0) return SpilledLocal(this_thread); + + std::size_t h = std::hash()(this_thread); + const int start_idx = h % num_records_; + + // NOTE: From the definition of `std::this_thread::get_id()` it is + // guaranteed that we never can have concurrent insertions with the same key + // to our hash-map like data structure. If we didn't find an element during + // the initial traversal, it's guaranteed that no one else could have + // inserted it while we are in this function. This allows to massively + // simplify out lock-free insert-only hash map. + + // Check if we already have an element for `this_thread`. + int idx = start_idx; + while (ptr_[idx].load() != nullptr) { + ThreadIdAndValue& record = *(ptr_[idx].load()); + if (record.thread_id == this_thread) return record.value; + + idx += 1; + if (idx >= num_records_) idx -= num_records_; + if (idx == start_idx) break; + } + + // If we are here, it means that we found an insertion point in lookup + // table at `idx`, or we did a full traversal and table is full. + + // If lock-free storage is full, fallback on mutex. + if (filled_records_.load() >= num_records_) + return SpilledLocal(this_thread); + + // We double check that we still have space to insert an element into a lock + // free storage. If old value in `filled_records_` is larger than the + // records capacity, it means that some other thread added an element while + // we were traversing lookup table. + int insertion_index = + filled_records_.fetch_add(1, std::memory_order_relaxed); + if (insertion_index >= num_records_) return SpilledLocal(this_thread); + + // At this point it's guaranteed that we can access to + // data_[insertion_index_] without a data race. + data_[insertion_index] = {this_thread, factory_.Allocate()}; + + // That's the pointer we'll put into the lookup table. + ThreadIdAndValue* inserted = &data_[insertion_index]; + + // We'll use nullptr pointer to ThreadIdAndValue in a compare-and-swap loop. + ThreadIdAndValue* empty = nullptr; + + // Now we have to find an insertion point into the lookup table. We start + // from the `idx` that was identified as an insertion point above, it's + // guaranteed that we will have an empty record somewhere in a lookup table + // (because we created a record in the `data_`). + const int insertion_idx = idx; + + do { + // Always start search from the original insertion candidate. + idx = insertion_idx; + while (ptr_[idx].load() != nullptr) { + idx += 1; + if (idx >= num_records_) idx -= num_records_; + // If we did a full loop, it means that we don't have any free entries + // in the lookup table, and this means that something is terribly wrong. 
+ eigen_assert(idx != insertion_idx); + } + // Atomic CAS of the pointer guarantees that any other thread, that will + // follow this pointer will see all the mutations in the `data_`. + } while (!ptr_[idx].compare_exchange_weak(empty, inserted)); + + return inserted->value; + } + + // WARN: It's not thread safe to call it concurrently with `local()`. + void ForEach(std::function f) { + // Reading directly from `data_` is unsafe, because only CAS to the + // record in `ptr_` makes all changes visible to other threads. + for (auto& ptr : ptr_) { + ThreadIdAndValue* record = ptr.load(); + if (record == nullptr) continue; + f(record->thread_id, record->value); + } + + // We did not spill into the map based storage. + if (filled_records_.load(std::memory_order_relaxed) < num_records_) return; + + // Adds a happens before edge from the last call to SpilledLocal(). + std::unique_lock lock(mu_); + for (auto& kv : per_thread_map_) { + f(kv.first, kv.second); + } + } + + // WARN: It's not thread safe to call it concurrently with `local()`. + ~ThreadLocal() { + // Reading directly from `data_` is unsafe, because only CAS to the record + // in `ptr_` makes all changes visible to other threads. + for (auto& ptr : ptr_) { + ThreadIdAndValue* record = ptr.load(); + if (record == nullptr) continue; + factory_.Release(record->value); + } + + // We did not spill into the map based storage. + if (filled_records_.load(std::memory_order_relaxed) < num_records_) return; + + // Adds a happens before edge from the last call to SpilledLocal(). + std::unique_lock lock(mu_); + for (auto& kv : per_thread_map_) { + factory_.Release(kv.second); + } + } + + private: + struct ThreadIdAndValue { + std::thread::id thread_id; + T value; + }; + + // Use unordered map guarded by a mutex when lock free storage is full. + T& SpilledLocal(std::thread::id this_thread) { + std::unique_lock lock(mu_); + + auto it = per_thread_map_.find(this_thread); + if (it == per_thread_map_.end()) { + auto result = per_thread_map_.emplace(this_thread, factory_.Allocate()); + eigen_assert(result.second); + return (*result.first).second; + } else { + return it->second; + } + } + + Factory& factory_; + const int num_records_; + + // Storage that backs lock-free lookup table `ptr_`. Records stored in this + // storage contiguously starting from index 0. + MaxSizeVector data_; + + // Atomic pointers to the data stored in `data_`. Used as a lookup table for + // linear probing hash map (https://en.wikipedia.org/wiki/Linear_probing). + MaxSizeVector> ptr_; + + // Number of records stored in the `data_`. + std::atomic filled_records_; + + // We fallback on per thread map if lock-free storage is full. In practice + // this should never happen, if `num_threads` is a reasonable estimate of the + // number of threads running in a system. + std::mutex mu_; // Protects per_thread_map_. 
+ std::unordered_map per_thread_map_; +}; + +} // namespace Eigen #endif // EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H diff --git a/unsupported/test/CMakeLists.txt b/unsupported/test/CMakeLists.txt index 42a450a85..f1f109ecb 100644 --- a/unsupported/test/CMakeLists.txt +++ b/unsupported/test/CMakeLists.txt @@ -201,6 +201,7 @@ if(EIGEN_TEST_CXX11) ei_add_test(cxx11_tensor_shuffling) ei_add_test(cxx11_tensor_striding) ei_add_test(cxx11_tensor_notification "-pthread" "${CMAKE_THREAD_LIBS_INIT}") + ei_add_test(cxx11_tensor_thread_local "-pthread" "${CMAKE_THREAD_LIBS_INIT}") ei_add_test(cxx11_tensor_thread_pool "-pthread" "${CMAKE_THREAD_LIBS_INIT}") ei_add_test(cxx11_tensor_executor "-pthread" "${CMAKE_THREAD_LIBS_INIT}") ei_add_test(cxx11_tensor_ref) diff --git a/unsupported/test/cxx11_tensor_thread_local.cpp b/unsupported/test/cxx11_tensor_thread_local.cpp new file mode 100644 index 000000000..dd43ab9d1 --- /dev/null +++ b/unsupported/test/cxx11_tensor_thread_local.cpp @@ -0,0 +1,158 @@ +// This file is part of Eigen, a lightweight C++ template library +// for linear algebra. +// +// This Source Code Form is subject to the terms of the Mozilla +// Public License v. 2.0. If a copy of the MPL was not distributed +// with this file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#define EIGEN_USE_THREADS + +#include +#include + +#include "main.h" +#include + +class Counter { + public: + Counter() : Counter(0) {} + explicit Counter(int value) + : created_by_(std::this_thread::get_id()), value_(value) {} + + void inc() { + // Check that mutation happens only in a thread that created this counter. + VERIFY_IS_EQUAL(std::this_thread::get_id(), created_by_); + value_++; + } + int value() { return value_; } + + private: + std::thread::id created_by_; + int value_; +}; + +struct CounterFactory { + using T = Counter; + + T Allocate() { return Counter(0); } + void Release(T&) {} +}; + +void test_simple_thread_local() { + CounterFactory factory; + int num_threads = internal::random(4, 32); + Eigen::ThreadPool thread_pool(num_threads); + Eigen::ThreadLocal counter(factory, num_threads); + + int num_tasks = 3 * num_threads; + Eigen::Barrier barrier(num_tasks); + + for (int i = 0; i < num_tasks; ++i) { + thread_pool.Schedule([&counter, &barrier]() { + Counter& local = counter.local(); + local.inc(); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + barrier.Notify(); + }); + } + + barrier.Wait(); + + counter.ForEach( + [](std::thread::id, Counter& cnt) { VERIFY_IS_EQUAL(cnt.value(), 3); }); +} + +void test_zero_sized_thread_local() { + CounterFactory factory; + Eigen::ThreadLocal counter(factory, 0); + + Counter& local = counter.local(); + local.inc(); + + int total = 0; + counter.ForEach([&total](std::thread::id, Counter& cnt) { + total += cnt.value(); + VERIFY_IS_EQUAL(cnt.value(), 1); + }); + + VERIFY_IS_EQUAL(total, 1); +} + +// All thread local values fits into the lock-free storage. 
+void test_large_number_of_tasks_no_spill() { + CounterFactory factory; + int num_threads = internal::random(4, 32); + Eigen::ThreadPool thread_pool(num_threads); + Eigen::ThreadLocal counter(factory, num_threads); + + int num_tasks = 10000; + Eigen::Barrier barrier(num_tasks); + + for (int i = 0; i < num_tasks; ++i) { + thread_pool.Schedule([&counter, &barrier]() { + Counter& local = counter.local(); + local.inc(); + barrier.Notify(); + }); + } + + barrier.Wait(); + + int total = 0; + std::unordered_set unique_threads; + + counter.ForEach([&](std::thread::id id, Counter& cnt) { + total += cnt.value(); + unique_threads.insert(id); + }); + + VERIFY_IS_EQUAL(total, num_tasks); + // Not all threads in a pool might be woken up to execute submitted tasks. + // Also thread_pool.Schedule() might use current thread if queue is full. + VERIFY_IS_EQUAL( + unique_threads.size() <= (static_cast(num_threads + 1)), true); +} + +// Lock free thread local storage is too small to fit all the unique threads, +// and it spills to a map guarded by a mutex. +void test_large_number_of_tasks_with_spill() { + CounterFactory factory; + int num_threads = internal::random(4, 32); + Eigen::ThreadPool thread_pool(num_threads); + Eigen::ThreadLocal counter(factory, 1); // This is too small + + int num_tasks = 10000; + Eigen::Barrier barrier(num_tasks); + + for (int i = 0; i < num_tasks; ++i) { + thread_pool.Schedule([&counter, &barrier]() { + Counter& local = counter.local(); + local.inc(); + barrier.Notify(); + }); + } + + barrier.Wait(); + + int total = 0; + std::unordered_set unique_threads; + + counter.ForEach([&](std::thread::id id, Counter& cnt) { + total += cnt.value(); + unique_threads.insert(id); + }); + + VERIFY_IS_EQUAL(total, num_tasks); + // Not all threads in a pool might be woken up to execute submitted tasks. + // Also thread_pool.Schedule() might use current thread if queue is full. + VERIFY_IS_EQUAL( + unique_threads.size() <= (static_cast(num_threads + 1)), true); +} + +EIGEN_DECLARE_TEST(cxx11_tensor_thread_local) { + CALL_SUBTEST(test_simple_thread_local()); + CALL_SUBTEST(test_zero_sized_thread_local()); + CALL_SUBTEST(test_large_number_of_tasks_no_spill()); + CALL_SUBTEST(test_large_number_of_tasks_with_spill()); +} -- cgit v1.2.3 From d918bd9a8b98f60a21b46e1d643843e9f34cf974 Mon Sep 17 00:00:00 2001 From: Eugene Zhulenev Date: Tue, 10 Sep 2019 16:13:32 -0700 Subject: Update ThreadLocal to use separate Initialize/Release callables --- .../Eigen/CXX11/src/ThreadPool/ThreadLocal.h | 114 ++++++++++++--------- unsupported/test/cxx11_tensor_thread_local.cpp | 39 +++---- 2 files changed, 80 insertions(+), 73 deletions(-) (limited to 'unsupported') diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h index 63a168372..4e6847404 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h @@ -64,27 +64,38 @@ namespace Eigen { -// Thread local container for elements of type Factory::T, that does not use -// thread local storage. It will lazily initialize elements for each thread that -// accesses this object. As long as the number of unique threads accessing this -// storage is smaller than `kAllocationMultiplier * num_threads`, it is -// lock-free and wait-free. Otherwise it will use a mutex for synchronization. 
+namespace internal { +template +struct ThreadLocalNoOpInitialize { + void operator()(T&) const {} +}; + +template +struct ThreadLocalNoOpRelease { + void operator()(T&) const {} +}; + +} // namespace internal + +// Thread local container for elements of type T, that does not use thread local +// storage. As long as the number of unique threads accessing this storage +// is smaller than `capacity_`, it is lock-free and wait-free. Otherwise it will +// use a mutex for synchronization. +// +// Type `T` has to be default constructible, and by default each thread will get +// a default constructed value. It is possible to specify custom `initialize` +// callable, that will be called lazily from each thread accessing this object, +// and will be passed a default initialized object of type `T`. Also it's +// possible to pass a custom `release` callable, that will be invoked before +// calling ~T(). // // Example: // // struct Counter { -// int value; +// int value = 0; // } // -// struct CounterFactory { -// using T = Counter; -// -// Counter Allocate() { return {0}; } -// void Release(Counter&) {} -// }; -// -// CounterFactory factory; -// Eigen::ThreadLocal counter(factory, 10); +// Eigen::ThreadLocal counter(10); // // // Each thread will have access to it's own counter object. // Counter& cnt = counter.local(); @@ -98,40 +109,43 @@ namespace Eigen { // Somewhat similar to TBB thread local storage, with similar restrictions: // https://www.threadingbuildingblocks.org/docs/help/reference/thread_local_storage/enumerable_thread_specific_cls.html // -template +template , + typename Release = internal::ThreadLocalNoOpRelease> class ThreadLocal { - // We allocate larger storage for thread local data, than the number of - // threads, because thread pool size might grow, or threads outside of a - // thread pool might steal the work. We still expect this number to be of the - // same order of magnitude as the original `num_threads`. - static constexpr int kAllocationMultiplier = 4; - - using T = typename Factory::T; - // We preallocate default constructed elements in MaxSizedVector. 
static_assert(std::is_default_constructible::value, "ThreadLocal data type must be default constructible"); public: - explicit ThreadLocal(Factory& factory, int num_threads) - : factory_(factory), - num_records_(kAllocationMultiplier * num_threads), - data_(num_records_), - ptr_(num_records_), + explicit ThreadLocal(int capacity) + : ThreadLocal(capacity, internal::ThreadLocalNoOpInitialize(), + internal::ThreadLocalNoOpRelease()) {} + + ThreadLocal(int capacity, Initialize initialize) + : ThreadLocal(capacity, std::move(initialize), + internal::ThreadLocalNoOpRelease()) {} + + ThreadLocal(int capacity, Initialize initialize, Release release) + : initialize_(std::move(initialize)), + release_(std::move(release)), + capacity_(capacity), + data_(capacity_), + ptr_(capacity_), filled_records_(0) { - eigen_assert(num_threads >= 0); - data_.resize(num_records_); - for (int i = 0; i < num_records_; ++i) { + eigen_assert(capacity_ >= 0); + data_.resize(capacity_); + for (int i = 0; i < capacity_; ++i) { ptr_.emplace_back(nullptr); } } T& local() { std::thread::id this_thread = std::this_thread::get_id(); - if (num_records_ == 0) return SpilledLocal(this_thread); + if (capacity_ == 0) return SpilledLocal(this_thread); std::size_t h = std::hash()(this_thread); - const int start_idx = h % num_records_; + const int start_idx = h % capacity_; // NOTE: From the definition of `std::this_thread::get_id()` it is // guaranteed that we never can have concurrent insertions with the same key @@ -147,7 +161,7 @@ class ThreadLocal { if (record.thread_id == this_thread) return record.value; idx += 1; - if (idx >= num_records_) idx -= num_records_; + if (idx >= capacity_) idx -= capacity_; if (idx == start_idx) break; } @@ -155,8 +169,7 @@ class ThreadLocal { // table at `idx`, or we did a full traversal and table is full. // If lock-free storage is full, fallback on mutex. - if (filled_records_.load() >= num_records_) - return SpilledLocal(this_thread); + if (filled_records_.load() >= capacity_) return SpilledLocal(this_thread); // We double check that we still have space to insert an element into a lock // free storage. If old value in `filled_records_` is larger than the @@ -164,11 +177,12 @@ class ThreadLocal { // we were traversing lookup table. int insertion_index = filled_records_.fetch_add(1, std::memory_order_relaxed); - if (insertion_index >= num_records_) return SpilledLocal(this_thread); + if (insertion_index >= capacity_) return SpilledLocal(this_thread); // At this point it's guaranteed that we can access to // data_[insertion_index_] without a data race. - data_[insertion_index] = {this_thread, factory_.Allocate()}; + data_[insertion_index].thread_id = this_thread; + initialize_(data_[insertion_index].value); // That's the pointer we'll put into the lookup table. ThreadIdAndValue* inserted = &data_[insertion_index]; @@ -187,7 +201,7 @@ class ThreadLocal { idx = insertion_idx; while (ptr_[idx].load() != nullptr) { idx += 1; - if (idx >= num_records_) idx -= num_records_; + if (idx >= capacity_) idx -= capacity_; // If we did a full loop, it means that we don't have any free entries // in the lookup table, and this means that something is terribly wrong. eigen_assert(idx != insertion_idx); @@ -200,7 +214,7 @@ class ThreadLocal { } // WARN: It's not thread safe to call it concurrently with `local()`. - void ForEach(std::function f) { + void ForEach(std::function f) { // Reading directly from `data_` is unsafe, because only CAS to the // record in `ptr_` makes all changes visible to other threads. 
for (auto& ptr : ptr_) { @@ -210,7 +224,7 @@ class ThreadLocal { } // We did not spill into the map based storage. - if (filled_records_.load(std::memory_order_relaxed) < num_records_) return; + if (filled_records_.load(std::memory_order_relaxed) < capacity_) return; // Adds a happens before edge from the last call to SpilledLocal(). std::unique_lock lock(mu_); @@ -226,16 +240,16 @@ class ThreadLocal { for (auto& ptr : ptr_) { ThreadIdAndValue* record = ptr.load(); if (record == nullptr) continue; - factory_.Release(record->value); + release_(record->value); } // We did not spill into the map based storage. - if (filled_records_.load(std::memory_order_relaxed) < num_records_) return; + if (filled_records_.load(std::memory_order_relaxed) < capacity_) return; // Adds a happens before edge from the last call to SpilledLocal(). std::unique_lock lock(mu_); for (auto& kv : per_thread_map_) { - factory_.Release(kv.second); + release_(kv.second); } } @@ -251,16 +265,18 @@ class ThreadLocal { auto it = per_thread_map_.find(this_thread); if (it == per_thread_map_.end()) { - auto result = per_thread_map_.emplace(this_thread, factory_.Allocate()); + auto result = per_thread_map_.emplace(this_thread, T()); eigen_assert(result.second); + initialize_((*result.first).second); return (*result.first).second; } else { return it->second; } } - Factory& factory_; - const int num_records_; + Initialize initialize_; + Release release_; + const int capacity_; // Storage that backs lock-free lookup table `ptr_`. Records stored in this // storage contiguously starting from index 0. @@ -274,7 +290,7 @@ class ThreadLocal { std::atomic filled_records_; // We fallback on per thread map if lock-free storage is full. In practice - // this should never happen, if `num_threads` is a reasonable estimate of the + // this should never happen, if `capacity_` is a reasonable estimate of the // number of threads running in a system. std::mutex mu_; // Protects per_thread_map_. std::unordered_map per_thread_map_; diff --git a/unsupported/test/cxx11_tensor_thread_local.cpp b/unsupported/test/cxx11_tensor_thread_local.cpp index dd43ab9d1..7e866f6d1 100644 --- a/unsupported/test/cxx11_tensor_thread_local.cpp +++ b/unsupported/test/cxx11_tensor_thread_local.cpp @@ -13,36 +13,30 @@ #include "main.h" #include -class Counter { - public: - Counter() : Counter(0) {} - explicit Counter(int value) - : created_by_(std::this_thread::get_id()), value_(value) {} +struct Counter { + Counter() = default; void inc() { // Check that mutation happens only in a thread that created this counter. 
- VERIFY_IS_EQUAL(std::this_thread::get_id(), created_by_); - value_++; + VERIFY_IS_EQUAL(std::this_thread::get_id(), created_by); + counter_value++; } - int value() { return value_; } + int value() { return counter_value; } - private: - std::thread::id created_by_; - int value_; + std::thread::id created_by; + int counter_value = 0; }; -struct CounterFactory { - using T = Counter; - - T Allocate() { return Counter(0); } - void Release(T&) {} +struct InitCounter { + void operator()(Counter& counter) { + counter.created_by = std::this_thread::get_id(); + } }; void test_simple_thread_local() { - CounterFactory factory; int num_threads = internal::random(4, 32); Eigen::ThreadPool thread_pool(num_threads); - Eigen::ThreadLocal counter(factory, num_threads); + Eigen::ThreadLocal counter(num_threads, InitCounter()); int num_tasks = 3 * num_threads; Eigen::Barrier barrier(num_tasks); @@ -64,8 +58,7 @@ void test_simple_thread_local() { } void test_zero_sized_thread_local() { - CounterFactory factory; - Eigen::ThreadLocal counter(factory, 0); + Eigen::ThreadLocal counter(0, InitCounter()); Counter& local = counter.local(); local.inc(); @@ -81,10 +74,9 @@ void test_zero_sized_thread_local() { // All thread local values fits into the lock-free storage. void test_large_number_of_tasks_no_spill() { - CounterFactory factory; int num_threads = internal::random(4, 32); Eigen::ThreadPool thread_pool(num_threads); - Eigen::ThreadLocal counter(factory, num_threads); + Eigen::ThreadLocal counter(num_threads, InitCounter()); int num_tasks = 10000; Eigen::Barrier barrier(num_tasks); @@ -117,10 +109,9 @@ void test_large_number_of_tasks_no_spill() { // Lock free thread local storage is too small to fit all the unique threads, // and it spills to a map guarded by a mutex. void test_large_number_of_tasks_with_spill() { - CounterFactory factory; int num_threads = internal::random(4, 32); Eigen::ThreadPool thread_pool(num_threads); - Eigen::ThreadLocal counter(factory, 1); // This is too small + Eigen::ThreadLocal counter(1, InitCounter()); int num_tasks = 10000; Eigen::Barrier barrier(num_tasks); -- cgit v1.2.3 From cdb377d0cba4889fc909d1bbdd430b988db0db97 Mon Sep 17 00:00:00 2001 From: Deven Desai Date: Fri, 6 Sep 2019 16:03:49 +0000 Subject: Fix for the HIP build+test errors introduced by the ndtri support. The fixes needed are * adding EIGEN_DEVICE_FUNC attribute to a couple of funcs (else HIPCC will error out when non-device funcs are called from global/device funcs) * switching to using :: instead std:: (only for HIPCC) in cases where the std:: is not recognized as a device func by HIPCC * removing an errant "j" from a testcase (don't know how that made it in to begin with!) 
--- Eigen/src/Core/GenericPacketMath.h | 4 ++-- Eigen/src/Core/arch/Default/GenericPacketMathFunctions.h | 4 ++-- unsupported/Eigen/src/SpecialFunctions/SpecialFunctionsImpl.h | 1 + unsupported/test/cxx11_tensor_gpu.cu | 2 +- 4 files changed, 6 insertions(+), 5 deletions(-) (limited to 'unsupported') diff --git a/Eigen/src/Core/GenericPacketMath.h b/Eigen/src/Core/GenericPacketMath.h index 5ce984caf..3118e4e5e 100644 --- a/Eigen/src/Core/GenericPacketMath.h +++ b/Eigen/src/Core/GenericPacketMath.h @@ -542,7 +542,7 @@ Packet pexpm1(const Packet& a) { return numext::expm1(a); } /** \internal \returns the log of \a a (coeff-wise) */ template EIGEN_DECLARE_FUNCTION_ALLOWING_MULTIPLE_DEFINITIONS -Packet plog(const Packet& a) { using std::log; return log(a); } +Packet plog(const Packet& a) { EIGEN_USING_STD(log); return log(a); } /** \internal \returns the log1p of \a a (coeff-wise) */ template EIGEN_DECLARE_FUNCTION_ALLOWING_MULTIPLE_DEFINITIONS @@ -554,7 +554,7 @@ Packet plog10(const Packet& a) { using std::log10; return log10(a); } /** \internal \returns the square-root of \a a (coeff-wise) */ template EIGEN_DECLARE_FUNCTION_ALLOWING_MULTIPLE_DEFINITIONS -Packet psqrt(const Packet& a) { using std::sqrt; return sqrt(a); } +Packet psqrt(const Packet& a) { EIGEN_USING_STD(sqrt); return sqrt(a); } /** \internal \returns the reciprocal square-root of \a a (coeff-wise) */ template EIGEN_DECLARE_FUNCTION_ALLOWING_MULTIPLE_DEFINITIONS diff --git a/Eigen/src/Core/arch/Default/GenericPacketMathFunctions.h b/Eigen/src/Core/arch/Default/GenericPacketMathFunctions.h index 367d14dad..518db2207 100644 --- a/Eigen/src/Core/arch/Default/GenericPacketMathFunctions.h +++ b/Eigen/src/Core/arch/Default/GenericPacketMathFunctions.h @@ -556,7 +556,7 @@ Packet pcos_float(const Packet& x) */ template struct ppolevl { - static EIGEN_STRONG_INLINE Packet run(const Packet& x, const typename unpacket_traits::type coeff[]) { + static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE Packet run(const Packet& x, const typename unpacket_traits::type coeff[]) { EIGEN_STATIC_ASSERT((N > 0), YOU_MADE_A_PROGRAMMING_MISTAKE); return pmadd(ppolevl::run(x, coeff), x, pset1(coeff[N])); } @@ -564,7 +564,7 @@ struct ppolevl { template struct ppolevl { - static EIGEN_STRONG_INLINE Packet run(const Packet& x, const typename unpacket_traits::type coeff[]) { + static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE Packet run(const Packet& x, const typename unpacket_traits::type coeff[]) { EIGEN_UNUSED_VARIABLE(x); return pset1(coeff[0]); } diff --git a/unsupported/Eigen/src/SpecialFunctions/SpecialFunctionsImpl.h b/unsupported/Eigen/src/SpecialFunctions/SpecialFunctionsImpl.h index 78050d0a1..a9bc205ab 100644 --- a/unsupported/Eigen/src/SpecialFunctions/SpecialFunctionsImpl.h +++ b/unsupported/Eigen/src/SpecialFunctions/SpecialFunctionsImpl.h @@ -624,6 +624,7 @@ EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE T generic_ndtri_lt_exp_neg_two( } template +EIGEN_DEVICE_FUNC EIGEN_ALWAYS_INLINE T generic_ndtri(const T& a) { const T maxnum = pset1(NumTraits::infinity()); const T neg_maxnum = pset1(-NumTraits::infinity()); diff --git a/unsupported/test/cxx11_tensor_gpu.cu b/unsupported/test/cxx11_tensor_gpu.cu index 0a2fa8e61..aa8470123 100644 --- a/unsupported/test/cxx11_tensor_gpu.cu +++ b/unsupported/test/cxx11_tensor_gpu.cu @@ -1091,7 +1091,7 @@ void test_gpu_ndtri() expected_out(1) = -std::numeric_limits::infinity(); expected_out(2) = Scalar(0.0); expected_out(3) = Scalar(-0.8416212335729142); - expected_out(4) = Scalar(0.8416212335729142);j + expected_out(4) = 
Scalar(0.8416212335729142); expected_out(5) = Scalar(1.2815515655446004); expected_out(6) = Scalar(-1.2815515655446004); expected_out(7) = Scalar(2.3263478740408408); -- cgit v1.2.3
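
A minimal standalone usage sketch of the container introduced by this series, written against the second patch's API (a capacity hint plus an Initialize callable). The WorkerStats/InitWorkerStats names, the main() driver, and the <unsupported/Eigen/CXX11/ThreadPool> include path are illustrative assumptions, not part of the patches; the ThreadPool, Barrier, ThreadLocal::local() and ThreadLocal::ForEach() calls mirror the ones exercised in cxx11_tensor_thread_local.cpp.

    // Sketch only: header path and type names below are assumptions.
    #define EIGEN_USE_THREADS  // defined as in the unit test above
    #include <thread>
    #include <unsupported/Eigen/CXX11/ThreadPool>

    struct WorkerStats {
      std::thread::id owner;    // filled in lazily by the initializer below
      int tasks_processed = 0;  // T must be default constructible
    };

    // Initialize callable: invoked once per accessing thread, from that thread.
    struct InitWorkerStats {
      void operator()(WorkerStats& s) const { s.owner = std::this_thread::get_id(); }
    };

    int main() {
      const int num_threads = 4;
      const int num_tasks = 100;
      Eigen::ThreadPool pool(num_threads);

      // Capacity is a sizing hint: up to `num_threads` unique threads stay on
      // the lock-free path; additional threads spill into the mutex-guarded map.
      Eigen::ThreadLocal<WorkerStats, InitWorkerStats> stats(num_threads,
                                                             InitWorkerStats());

      Eigen::Barrier barrier(num_tasks);
      for (int i = 0; i < num_tasks; ++i) {
        pool.Schedule([&stats, &barrier]() {
          stats.local().tasks_processed++;  // lazily initialized per thread
          barrier.Notify();
        });
      }
      barrier.Wait();

      // ForEach() must not run concurrently with local(); all tasks are done here.
      int total = 0;
      stats.ForEach([&total](std::thread::id, WorkerStats& s) {
        total += s.tasks_processed;
      });
      return total == num_tasks ? 0 : 1;
    }

As in the tests, the constructor argument is only an estimate of the number of accessing threads: extra threads are still handled correctly, they just take the slower mutex-guarded spill path instead of the lock-free linear-probing table.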