diff options
Diffstat (limited to 'unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h')
-rw-r--r-- | unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h | 234 |
1 files changed, 234 insertions, 0 deletions
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h b/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h new file mode 100644 index 000000000..6dd64f185 --- /dev/null +++ b/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h @@ -0,0 +1,234 @@ +// This file is part of Eigen, a lightweight C++ template library +// for linear algebra. +// +// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com> +// +// This Source Code Form is subject to the terms of the Mozilla +// Public License v. 2.0. If a copy of the MPL was not distributed +// with this file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#ifndef EIGEN_CXX11_THREADPOOL_EVENTCOUNT_H_ +#define EIGEN_CXX11_THREADPOOL_EVENTCOUNT_H_ + +namespace Eigen { + +// EventCount allows to wait for arbitrary predicates in non-blocking +// algorithms. Think of condition variable, but wait predicate does not need to +// be protected by a mutex. Usage: +// Waiting thread does: +// +// if (predicate) +// return act(); +// EventCount::Waiter& w = waiters[my_index]; +// ec.Prewait(&w); +// if (predicate) { +// ec.CancelWait(&w); +// return act(); +// } +// ec.CommitWait(&w); +// +// Notifying thread does: +// +// predicate = true; +// ec.Notify(true); +// +// Notify is cheap if there are no waiting threads. Prewait/CommitWait are not +// cheap, but they are executed only if the preceeding predicate check has +// failed. +// +// Algorihtm outline: +// There are two main variables: predicate (managed by user) and state_. +// Operation closely resembles Dekker mutual algorithm: +// https://en.wikipedia.org/wiki/Dekker%27s_algorithm +// Waiting thread sets state_ then checks predicate, Notifying thread sets +// predicate then checks state_. Due to seq_cst fences in between these +// operations it is guaranteed than either waiter will see predicate change +// and won't block, or notifying thread will see state_ change and will unblock +// the waiter, or both. But it can't happen that both threads don't see each +// other changes, which would lead to deadlock. +class EventCount { + public: + class Waiter; + + EventCount(std::vector<Waiter>& waiters) : waiters_(waiters) { + eigen_assert(waiters.size() < (1 << kWaiterBits) - 1); + // Initialize epoch to something close to overflow to test overflow. + state_ = kStackMask | (kEpochMask - kEpochInc * waiters.size() * 2); + } + + ~EventCount() { + // Ensure there are no waiters. + eigen_assert((state_.load() & (kStackMask | kWaiterMask)) == kStackMask); + } + + // Prewait prepares for waiting. + // After calling this function the thread must re-check the wait predicate + // and call either CancelWait or CommitWait passing the same Waiter object. + void Prewait(Waiter* w) { + w->epoch = state_.fetch_add(kWaiterInc, std::memory_order_relaxed); + std::atomic_thread_fence(std::memory_order_seq_cst); + } + + // CommitWait commits waiting. + void CommitWait(Waiter* w) { + w->state = Waiter::kNotSignaled; + // Modification epoch of this waiter. + uint64_t epoch = + (w->epoch & kEpochMask) + + (((w->epoch & kWaiterMask) >> kWaiterShift) << kEpochShift); + uint64_t state = state_.load(std::memory_order_seq_cst); + for (;;) { + if (int64_t((state & kEpochMask) - epoch) < 0) { + // The preceeding waiter has not decided on its fate. Wait until it + // calls either CancelWait or CommitWait, or is notified. + EIGEN_THREAD_YIELD(); + state = state_.load(std::memory_order_seq_cst); + continue; + } + // We've already been notified. + if (int64_t((state & kEpochMask) - epoch) > 0) return; + // Remove this thread from prewait counter and add it to the waiter list. + eigen_assert((state & kWaiterMask) != 0); + uint64_t newstate = state - kWaiterInc + kEpochInc; + newstate = (newstate & ~kStackMask) | (w - &waiters_[0]); + if ((state & kStackMask) == kStackMask) + w->next.store(nullptr, std::memory_order_relaxed); + else + w->next.store(&waiters_[state & kStackMask], std::memory_order_relaxed); + if (state_.compare_exchange_weak(state, newstate, + std::memory_order_release)) + break; + } + Park(w); + } + + // CancelWait cancels effects of the previous Prewait call. + void CancelWait(Waiter* w) { + uint64_t epoch = + (w->epoch & kEpochMask) + + (((w->epoch & kWaiterMask) >> kWaiterShift) << kEpochShift); + uint64_t state = state_.load(std::memory_order_relaxed); + for (;;) { + if (int64_t((state & kEpochMask) - epoch) < 0) { + // The preceeding waiter has not decided on its fate. Wait until it + // calls either CancelWait or CommitWait, or is notified. + EIGEN_THREAD_YIELD(); + state = state_.load(std::memory_order_relaxed); + continue; + } + // We've already been notified. + if (int64_t((state & kEpochMask) - epoch) > 0) return; + // Remove this thread from prewait counter. + eigen_assert((state & kWaiterMask) != 0); + if (state_.compare_exchange_weak(state, state - kWaiterInc + kEpochInc, + std::memory_order_relaxed)) + return; + } + } + + // Notify wakes one or all waiting threads. + // Must be called after changing the associated wait predicate. + void Notify(bool all) { + std::atomic_thread_fence(std::memory_order_seq_cst); + uint64_t state = state_.load(std::memory_order_acquire); + for (;;) { + // Easy case: no waiters. + if ((state & kStackMask) == kStackMask && (state & kWaiterMask) == 0) + return; + uint64_t waiters = (state & kWaiterMask) >> kWaiterShift; + uint64_t newstate; + if (all) { + // Reset prewait counter and empty wait list. + newstate = (state & kEpochMask) + (kEpochInc * waiters) + kStackMask; + } else if (waiters) { + // There is a thread in pre-wait state, unblock it. + newstate = state + kEpochInc - kWaiterInc; + } else { + // Pop a waiter from list and unpark it. + Waiter* w = &waiters_[state & kStackMask]; + Waiter* wnext = w->next.load(std::memory_order_relaxed); + uint64_t next = kStackMask; + if (wnext != nullptr) next = wnext - &waiters_[0]; + // Note: we don't add kEpochInc here. ABA problem on the lock-free stack + // can't happen because a waiter is re-pushed onto the stack only after + // it was in the pre-wait state which inevitably leads to epoch + // increment. + newstate = (state & kEpochMask) + next; + } + if (state_.compare_exchange_weak(state, newstate, + std::memory_order_acquire)) { + if (!all && waiters) return; // unblocked pre-wait thread + if ((state & kStackMask) == kStackMask) return; + Waiter* w = &waiters_[state & kStackMask]; + if (!all) w->next.store(nullptr, std::memory_order_relaxed); + Unpark(w); + return; + } + } + } + + class Waiter { + friend class EventCount; + std::atomic<Waiter*> next; + std::mutex mu; + std::condition_variable cv; + uint64_t epoch; + unsigned state; + enum { + kNotSignaled, + kWaiting, + kSignaled, + }; + // Prevent false sharing with other Waiter objects in the same vector. + char pad_[128]; + }; + + private: + // State_ layout: + // - low kStackBits is a stack of waiters committed wait. + // - next kWaiterBits is count of waiters in prewait state. + // - next kEpochBits is modification counter. + static const uint64_t kStackBits = 16; + static const uint64_t kStackMask = (1ull << kStackBits) - 1; + static const uint64_t kWaiterBits = 16; + static const uint64_t kWaiterShift = 16; + static const uint64_t kWaiterMask = ((1ull << kWaiterBits) - 1) + << kWaiterShift; + static const uint64_t kWaiterInc = 1ull << kWaiterBits; + static const uint64_t kEpochBits = 32; + static const uint64_t kEpochShift = 32; + static const uint64_t kEpochMask = ((1ull << kEpochBits) - 1) << kEpochShift; + static const uint64_t kEpochInc = 1ull << kEpochShift; + std::atomic<uint64_t> state_; + std::vector<Waiter>& waiters_; + + void Park(Waiter* w) { + std::unique_lock<std::mutex> lock(w->mu); + while (w->state != Waiter::kSignaled) { + w->state = Waiter::kWaiting; + w->cv.wait(lock); + } + } + + void Unpark(Waiter* waiters) { + Waiter* next = nullptr; + for (Waiter* w = waiters; w; w = next) { + next = w->next.load(std::memory_order_relaxed); + unsigned state; + { + std::unique_lock<std::mutex> lock(w->mu); + state = w->state; + w->state = Waiter::kSignaled; + } + // Avoid notifying if it wasn't waiting. + if (state == Waiter::kWaiting) w->cv.notify_one(); + } + } + + EventCount(const EventCount&) = delete; + void operator=(const EventCount&) = delete; +}; + +} // namespace Eigen + +#endif // EIGEN_CXX11_THREADPOOL_EVENTCOUNT_H_ |