From e3dec4dcc1854972113ba7862c801737d7955972 Mon Sep 17 00:00:00 2001
From: Eugene Zhulenev
Date: Mon, 9 Sep 2019 15:18:14 -0700
Subject: ThreadLocal container that does not rely on thread local storage

---
 unsupported/test/cxx11_tensor_thread_local.cpp | 158 +++++++++++++++++++++++++
 1 file changed, 158 insertions(+)
 create mode 100644 unsupported/test/cxx11_tensor_thread_local.cpp

diff --git a/unsupported/test/cxx11_tensor_thread_local.cpp b/unsupported/test/cxx11_tensor_thread_local.cpp
new file mode 100644
index 000000000..dd43ab9d1
--- /dev/null
+++ b/unsupported/test/cxx11_tensor_thread_local.cpp
@@ -0,0 +1,158 @@
+// This file is part of Eigen, a lightweight C++ template library
+// for linear algebra.
+//
+// This Source Code Form is subject to the terms of the Mozilla
+// Public License v. 2.0. If a copy of the MPL was not distributed
+// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#define EIGEN_USE_THREADS
+
+#include <iostream>
+#include <unordered_set>
+
+#include "main.h"
+#include <Eigen/CXX11/ThreadPool>
+
+class Counter {
+ public:
+  Counter() : Counter(0) {}
+  explicit Counter(int value)
+      : created_by_(std::this_thread::get_id()), value_(value) {}
+
+  void inc() {
+    // Check that mutation happens only in the thread that created this counter.
+    VERIFY_IS_EQUAL(std::this_thread::get_id(), created_by_);
+    value_++;
+  }
+  int value() { return value_; }
+
+ private:
+  std::thread::id created_by_;
+  int value_;
+};
+
+struct CounterFactory {
+  using T = Counter;
+
+  T Allocate() { return Counter(0); }
+  void Release(T&) {}
+};
+
+void test_simple_thread_local() {
+  CounterFactory factory;
+  int num_threads = internal::random<int>(4, 32);
+  Eigen::ThreadPool thread_pool(num_threads);
+  Eigen::ThreadLocal<CounterFactory> counter(factory, num_threads);
+
+  int num_tasks = 3 * num_threads;
+  Eigen::Barrier barrier(num_tasks);
+
+  for (int i = 0; i < num_tasks; ++i) {
+    thread_pool.Schedule([&counter, &barrier]() {
+      Counter& local = counter.local();
+      local.inc();
+
+      std::this_thread::sleep_for(std::chrono::milliseconds(100));
+      barrier.Notify();
+    });
+  }
+
+  barrier.Wait();
+
+  counter.ForEach(
+      [](std::thread::id, Counter& cnt) { VERIFY_IS_EQUAL(cnt.value(), 3); });
+}
+
+void test_zero_sized_thread_local() {
+  CounterFactory factory;
+  Eigen::ThreadLocal<CounterFactory> counter(factory, 0);
+
+  Counter& local = counter.local();
+  local.inc();
+
+  int total = 0;
+  counter.ForEach([&total](std::thread::id, Counter& cnt) {
+    total += cnt.value();
+    VERIFY_IS_EQUAL(cnt.value(), 1);
+  });
+
+  VERIFY_IS_EQUAL(total, 1);
+}
+
+// All thread local values fit into the lock-free storage.
+void test_large_number_of_tasks_no_spill() {
+  CounterFactory factory;
+  int num_threads = internal::random<int>(4, 32);
+  Eigen::ThreadPool thread_pool(num_threads);
+  Eigen::ThreadLocal<CounterFactory> counter(factory, num_threads);
+
+  int num_tasks = 10000;
+  Eigen::Barrier barrier(num_tasks);
+
+  for (int i = 0; i < num_tasks; ++i) {
+    thread_pool.Schedule([&counter, &barrier]() {
+      Counter& local = counter.local();
+      local.inc();
+      barrier.Notify();
+    });
+  }
+
+  barrier.Wait();
+
+  int total = 0;
+  std::unordered_set<std::thread::id> unique_threads;
+
+  counter.ForEach([&](std::thread::id id, Counter& cnt) {
+    total += cnt.value();
+    unique_threads.insert(id);
+  });
+
+  VERIFY_IS_EQUAL(total, num_tasks);
+  // Not all threads in the pool might be woken up to execute submitted tasks.
+  // Also thread_pool.Schedule() might use the current thread if the queue is full.
+  VERIFY_IS_EQUAL(
+      unique_threads.size() <= (static_cast<size_t>(num_threads + 1)), true);
+}
+
+// Lock-free thread local storage is too small to fit all the unique threads,
+// and it spills to a map guarded by a mutex.
+void test_large_number_of_tasks_with_spill() {
+  CounterFactory factory;
+  int num_threads = internal::random<int>(4, 32);
+  Eigen::ThreadPool thread_pool(num_threads);
+  Eigen::ThreadLocal<CounterFactory> counter(factory, 1);  // This is too small.
+
+  int num_tasks = 10000;
+  Eigen::Barrier barrier(num_tasks);
+
+  for (int i = 0; i < num_tasks; ++i) {
+    thread_pool.Schedule([&counter, &barrier]() {
+      Counter& local = counter.local();
+      local.inc();
+      barrier.Notify();
+    });
+  }
+
+  barrier.Wait();
+
+  int total = 0;
+  std::unordered_set<std::thread::id> unique_threads;
+
+  counter.ForEach([&](std::thread::id id, Counter& cnt) {
+    total += cnt.value();
+    unique_threads.insert(id);
+  });
+
+  VERIFY_IS_EQUAL(total, num_tasks);
+  // Not all threads in the pool might be woken up to execute submitted tasks.
+  // Also thread_pool.Schedule() might use the current thread if the queue is full.
+  VERIFY_IS_EQUAL(
+      unique_threads.size() <= (static_cast<size_t>(num_threads + 1)), true);
+}
+
+EIGEN_DECLARE_TEST(cxx11_tensor_thread_local) {
+  CALL_SUBTEST(test_simple_thread_local());
+  CALL_SUBTEST(test_zero_sized_thread_local());
+  CALL_SUBTEST(test_large_number_of_tasks_no_spill());
+  CALL_SUBTEST(test_large_number_of_tasks_with_spill());
+}
--
cgit v1.2.3