diff options
author | Benoit Steiner <benoit.steiner.goog@gmail.com> | 2016-04-14 15:23:49 -0700 |
---|---|---|
committer | Benoit Steiner <benoit.steiner.goog@gmail.com> | 2016-04-14 15:23:49 -0700 |
commit | a8e8837ba7f76c5cad3f2206cd32f90ee48eea96 (patch) | |
tree | 72a0539d8258f30818abefaef7ba7f04b7c776f2 /unsupported/test | |
parent | 78a51abc123dff49d6e1b1a6dd5b193e92ae0817 (diff) |
Added tests for the non blocking thread pool
Diffstat (limited to 'unsupported/test')
-rw-r--r-- | unsupported/test/cxx11_eventcount.cpp | 128 | ||||
-rw-r--r-- | unsupported/test/cxx11_runqueue.cpp | 214 |
2 files changed, 342 insertions, 0 deletions
diff --git a/unsupported/test/cxx11_eventcount.cpp b/unsupported/test/cxx11_eventcount.cpp new file mode 100644 index 000000000..a58c21a30 --- /dev/null +++ b/unsupported/test/cxx11_eventcount.cpp @@ -0,0 +1,128 @@ +// This file is part of Eigen, a lightweight C++ template library +// for linear algebra. +// +// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com> +// Copyright (C) 2016 Benoit Steiner <benoit.steiner.goog@gmail.com> +// +// This Source Code Form is subject to the terms of the Mozilla +// Public License v. 2.0. If a copy of the MPL was not distributed +// with this file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#include "main.h" +#include <Eigen/CXX11/ThreadPool> + +static void test_basic_eventcount() +{ + std::vector<EventCount::Waiter> waiters(1); + EventCount ec(waiters); + EventCount::Waiter& w = waiters[0]; + ec.Notify(false); + ec.Prewait(&w); + ec.Notify(true); + ec.CommitWait(&w); + ec.Prewait(&w); + ec.CancelWait(&w); +} + +// Fake bounded counter-based queue. +struct TestQueue { + std::atomic<int> val_; + static const int kQueueSize = 10; + + TestQueue() : val_() {} + + ~TestQueue() { VERIFY_IS_EQUAL(val_.load(), 0); } + + bool Push() { + int val = val_.load(std::memory_order_relaxed); + for (;;) { + VERIFY_GE(val, 0); + VERIFY_LE(val, kQueueSize); + if (val == kQueueSize) return false; + if (val_.compare_exchange_weak(val, val + 1, std::memory_order_relaxed)) + return true; + } + } + + bool Pop() { + int val = val_.load(std::memory_order_relaxed); + for (;;) { + VERIFY_GE(val, 0); + VERIFY_LE(val, kQueueSize); + if (val == 0) return false; + if (val_.compare_exchange_weak(val, val - 1, std::memory_order_relaxed)) + return true; + } + } + + bool Empty() { return val_.load(std::memory_order_relaxed) == 0; } +}; + +const int TestQueue::kQueueSize; + +// A number of producers send messages to a set of consumers using a set of +// fake queues. Ensure that it does not crash, consumers don't deadlock and +// number of blocked and unblocked threads match. +static void test_stress_eventcount() +{ + const int kThreads = std::thread::hardware_concurrency(); + const int kEvents = 1 << 16; + const int kQueues = 10; + + std::vector<EventCount::Waiter> waiters(kThreads); + EventCount ec(waiters); + TestQueue queues[kQueues]; + + std::vector<std::unique_ptr<std::thread>> producers; + for (int i = 0; i < kThreads; i++) { + producers.emplace_back(new std::thread([&ec, &queues]() { + unsigned rnd = std::hash<std::thread::id>()(std::this_thread::get_id()); + for (int i = 0; i < kEvents; i++) { + unsigned idx = rand_r(&rnd) % kQueues; + if (queues[idx].Push()) { + ec.Notify(false); + continue; + } + std::this_thread::yield(); + i--; + } + })); + } + + std::vector<std::unique_ptr<std::thread>> consumers; + for (int i = 0; i < kThreads; i++) { + consumers.emplace_back(new std::thread([&ec, &queues, &waiters, i]() { + EventCount::Waiter& w = waiters[i]; + unsigned rnd = std::hash<std::thread::id>()(std::this_thread::get_id()); + for (int i = 0; i < kEvents; i++) { + unsigned idx = rand_r(&rnd) % kQueues; + if (queues[idx].Pop()) continue; + i--; + ec.Prewait(&w); + bool empty = true; + for (int q = 0; q < kQueues; q++) { + if (!queues[q].Empty()) { + empty = false; + break; + } + } + if (!empty) { + ec.CancelWait(&w); + continue; + } + ec.CommitWait(&w); + } + })); + } + + for (int i = 0; i < kThreads; i++) { + producers[i]->join(); + consumers[i]->join(); + } +} + +void test_cxx11_eventcount() +{ + CALL_SUBTEST(test_basic_eventcount()); + CALL_SUBTEST(test_stress_eventcount()); +} diff --git a/unsupported/test/cxx11_runqueue.cpp b/unsupported/test/cxx11_runqueue.cpp new file mode 100644 index 000000000..5a0a13103 --- /dev/null +++ b/unsupported/test/cxx11_runqueue.cpp @@ -0,0 +1,214 @@ +// This file is part of Eigen, a lightweight C++ template library +// for linear algebra. +// +// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com> +// Copyright (C) 2016 Benoit Steiner <benoit.steiner.goog@gmail.com> +// +// This Source Code Form is subject to the terms of the Mozilla +// Public License v. 2.0. If a copy of the MPL was not distributed +// with this file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#include "main.h" + +#include <Eigen/CXX11/ThreadPool> + +static void test_basic_runqueue() +{ + RunQueue<int, 4> q; + // Check empty state. + VERIFY(q.Empty()); + VERIFY_IS_EQUAL(0, q.Size()); + VERIFY_IS_EQUAL(0, q.PopFront()); + std::vector<int> stolen; + VERIFY_IS_EQUAL(0, q.PopBackHalf(&stolen)); + VERIFY_IS_EQUAL(0, stolen.size()); + // Push one front, pop one front. + VERIFY_IS_EQUAL(0, q.PushFront(1)); + VERIFY_IS_EQUAL(1, q.Size()); + VERIFY_IS_EQUAL(1, q.PopFront()); + VERIFY_IS_EQUAL(0, q.Size()); + // Push front to overflow. + VERIFY_IS_EQUAL(0, q.PushFront(2)); + VERIFY_IS_EQUAL(1, q.Size()); + VERIFY_IS_EQUAL(0, q.PushFront(3)); + VERIFY_IS_EQUAL(2, q.Size()); + VERIFY_IS_EQUAL(0, q.PushFront(4)); + VERIFY_IS_EQUAL(3, q.Size()); + VERIFY_IS_EQUAL(0, q.PushFront(5)); + VERIFY_IS_EQUAL(4, q.Size()); + VERIFY_IS_EQUAL(6, q.PushFront(6)); + VERIFY_IS_EQUAL(4, q.Size()); + VERIFY_IS_EQUAL(5, q.PopFront()); + VERIFY_IS_EQUAL(3, q.Size()); + VERIFY_IS_EQUAL(4, q.PopFront()); + VERIFY_IS_EQUAL(2, q.Size()); + VERIFY_IS_EQUAL(3, q.PopFront()); + VERIFY_IS_EQUAL(1, q.Size()); + VERIFY_IS_EQUAL(2, q.PopFront()); + VERIFY_IS_EQUAL(0, q.Size()); + VERIFY_IS_EQUAL(0, q.PopFront()); + // Push one back, pop one back. + VERIFY_IS_EQUAL(0, q.PushBack(7)); + VERIFY_IS_EQUAL(1, q.Size()); + VERIFY_IS_EQUAL(1, q.PopBackHalf(&stolen)); + VERIFY_IS_EQUAL(1, stolen.size()); + VERIFY_IS_EQUAL(7, stolen[0]); + VERIFY_IS_EQUAL(0, q.Size()); + stolen.clear(); + // Push back to overflow. + VERIFY_IS_EQUAL(0, q.PushBack(8)); + VERIFY_IS_EQUAL(1, q.Size()); + VERIFY_IS_EQUAL(0, q.PushBack(9)); + VERIFY_IS_EQUAL(2, q.Size()); + VERIFY_IS_EQUAL(0, q.PushBack(10)); + VERIFY_IS_EQUAL(3, q.Size()); + VERIFY_IS_EQUAL(0, q.PushBack(11)); + VERIFY_IS_EQUAL(4, q.Size()); + VERIFY_IS_EQUAL(12, q.PushBack(12)); + VERIFY_IS_EQUAL(4, q.Size()); + // Pop back in halves. + VERIFY_IS_EQUAL(2, q.PopBackHalf(&stolen)); + VERIFY_IS_EQUAL(2, stolen.size()); + VERIFY_IS_EQUAL(10, stolen[0]); + VERIFY_IS_EQUAL(11, stolen[1]); + VERIFY_IS_EQUAL(2, q.Size()); + stolen.clear(); + VERIFY_IS_EQUAL(1, q.PopBackHalf(&stolen)); + VERIFY_IS_EQUAL(1, stolen.size()); + VERIFY_IS_EQUAL(9, stolen[0]); + VERIFY_IS_EQUAL(1, q.Size()); + stolen.clear(); + VERIFY_IS_EQUAL(1, q.PopBackHalf(&stolen)); + VERIFY_IS_EQUAL(1, stolen.size()); + VERIFY_IS_EQUAL(8, stolen[0]); + stolen.clear(); + VERIFY_IS_EQUAL(0, q.PopBackHalf(&stolen)); + VERIFY_IS_EQUAL(0, stolen.size()); + // Empty again. + VERIFY(q.Empty()); + VERIFY_IS_EQUAL(0, q.Size()); +} + +// Empty tests that the queue is not claimed to be empty when is is in fact not. +// Emptiness property is crucial part of thread pool blocking scheme, +// so we go to great effort to ensure this property. We create a queue with +// 1 element and then push 1 element (either front or back at random) and pop +// 1 element (either front or back at random). So queue always contains at least +// 1 element, but otherwise changes chaotically. Another thread constantly tests +// that the queue is not claimed to be empty. +static void test_empty_runqueue() +{ + RunQueue<int, 4> q; + q.PushFront(1); + std::atomic<bool> done(false); + std::thread mutator([&q, &done]() { + unsigned rnd = 0; + std::vector<int> stolen; + for (int i = 0; i < 1 << 18; i++) { + if (rand_r(&rnd) % 2) + VERIFY_IS_EQUAL(0, q.PushFront(1)); + else + VERIFY_IS_EQUAL(0, q.PushBack(1)); + if (rand_r(&rnd) % 2) + VERIFY_IS_EQUAL(1, q.PopFront()); + else { + for (;;) { + if (q.PopBackHalf(&stolen) == 1) { + stolen.clear(); + break; + } + VERIFY_IS_EQUAL(0, stolen.size()); + } + } + } + done = true; + }); + while (!done) { + VERIFY(!q.Empty()); + int size = q.Size(); + VERIFY_GE(size, 1); + VERIFY_LE(size, 2); + } + VERIFY_IS_EQUAL(1, q.PopFront()); + mutator.join(); +} + +// Stress is a chaotic random test. +// One thread (owner) calls PushFront/PopFront, other threads call PushBack/ +// PopBack. Ensure that we don't crash, deadlock, and all sanity checks pass. +static void test_stress_runqueue() +{ + const int kEvents = 1 << 18; + RunQueue<int, 8> q; + std::atomic<int> total(0); + std::vector<std::unique_ptr<std::thread>> threads; + threads.emplace_back(new std::thread([&q, &total]() { + int sum = 0; + int pushed = 1; + int popped = 1; + while (pushed < kEvents || popped < kEvents) { + if (pushed < kEvents) { + if (q.PushFront(pushed) == 0) { + sum += pushed; + pushed++; + } + } + if (popped < kEvents) { + int v = q.PopFront(); + if (v != 0) { + sum -= v; + popped++; + } + } + } + total += sum; + })); + for (int i = 0; i < 2; i++) { + threads.emplace_back(new std::thread([&q, &total]() { + int sum = 0; + for (int i = 1; i < kEvents; i++) { + if (q.PushBack(i) == 0) { + sum += i; + continue; + } + std::this_thread::yield(); + i--; + } + total += sum; + })); + threads.emplace_back(new std::thread([&q, &total]() { + int sum = 0; + std::vector<int> stolen; + for (int i = 1; i < kEvents;) { + if (q.PopBackHalf(&stolen) == 0) { + std::this_thread::yield(); + continue; + } + while (stolen.size() && i < kEvents) { + int v = stolen.back(); + stolen.pop_back(); + VERIFY_IS_NOT_EQUAL(v, 0); + sum += v; + i++; + } + } + while (stolen.size()) { + int v = stolen.back(); + stolen.pop_back(); + VERIFY_IS_NOT_EQUAL(v, 0); + while ((v = q.PushBack(v)) != 0) std::this_thread::yield(); + } + total -= sum; + })); + } + for (size_t i = 0; i < threads.size(); i++) threads[i]->join(); + VERIFY(q.Empty()); + VERIFY(total.load() == 0); +} + +void test_cxx11_runqueue() +{ + CALL_SUBTEST_1(test_basic_runqueue()); + CALL_SUBTEST_2(test_empty_runqueue()); + CALL_SUBTEST_3(test_stress_runqueue()); +} |