diff options
author    Rasmus Larsen <rmlarsen@google.com>  2019-05-13 20:04:35 +0000
committer Rasmus Larsen <rmlarsen@google.com>  2019-05-13 20:04:35 +0000
commit    c8d8d5c0fcfe31eb43005245e36627e104ad2e5f (patch)
tree      770939bbd38bfbea7e767ba535c978cb968c2cf7
parent    5f32b79edc47c5d010755889a091d0b3a39a0f14 (diff)
parent    e5ac8cbd7a12defb5e75bcaeaa460b90e9f0c5f1 (diff)
Merged in rmlarsen/eigen_threadpool (pull request PR-640)
Fix deadlocks in thread pool.
Approved-by: Eugene Zhulenev <ezhulenev@google.com>
4 files changed, 18 insertions, 23 deletions
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h b/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h index 8b3b210b1..4549aa069 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h @@ -20,8 +20,7 @@ namespace Eigen { // if (predicate) // return act(); // EventCount::Waiter& w = waiters[my_index]; -// if (!ec.Prewait(&w)) -// return act(); +// ec.Prewait(&w); // if (predicate) { // ec.CancelWait(&w); // return act(); @@ -62,23 +61,17 @@ class EventCount { } // Prewait prepares for waiting. - // If Prewait returns true, the thread must re-check the wait predicate + // After calling Prewait, the thread must re-check the wait predicate // and then call either CancelWait or CommitWait. - // Otherwise, the thread should assume the predicate may be true - // and don't call CancelWait/CommitWait (there was a concurrent Notify call). - bool Prewait() { + void Prewait() { uint64_t state = state_.load(std::memory_order_relaxed); for (;;) { CheckState(state); uint64_t newstate = state + kWaiterInc; - if ((state & kSignalMask) != 0) { - // Consume the signal and cancel waiting. - newstate -= kSignalInc + kWaiterInc; - } CheckState(newstate); if (state_.compare_exchange_weak(state, newstate, std::memory_order_seq_cst)) - return (state & kSignalMask) == 0; + return; } } @@ -118,8 +111,13 @@ class EventCount { for (;;) { CheckState(state, true); uint64_t newstate = state - kWaiterInc; - // Also take away a signal if any. - if ((state & kSignalMask) != 0) newstate -= kSignalInc; + // We don't know if the thread was also notified or not, + // so we should not consume a signal unconditionally. + // Only if number of waiters is equal to number of signals, + // we know that the thread was notified and we must take away the signal. 
+ if (((state & kWaiterMask) >> kWaiterShift) == + ((state & kSignalMask) >> kSignalShift)) + newstate -= kSignalInc; CheckState(newstate); if (state_.compare_exchange_weak(state, newstate, std::memory_order_acq_rel)) diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h index 9e54254c1..9353f41e2 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h @@ -379,7 +379,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { eigen_plain_assert(!t->f); // We already did best-effort emptiness check in Steal, so prepare for // blocking. - if (!ec_.Prewait()) return true; + ec_.Prewait(); // Now do a reliable emptiness check. int victim = NonEmptyQueueIndex(); if (victim != -1) { diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h index a9ae05fc6..b572ebcdf 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h @@ -97,11 +97,9 @@ class RunQueue { } // PopBack removes and returns the last elements in the queue. - // Can fail spuriously. Work PopBack() { if (Empty()) return Work(); - std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock); - if (!lock) return Work(); + std::unique_lock<std::mutex> lock(mutex_); unsigned back = back_.load(std::memory_order_relaxed); Elem* e = &array_[back & kMask]; uint8_t s = e->state.load(std::memory_order_relaxed); @@ -115,11 +113,10 @@ class RunQueue { } // PopBackHalf removes and returns half last elements in the queue. - // Returns number of elements removed. But can also fail spuriously. + // Returns number of elements removed. 
unsigned PopBackHalf(std::vector<Work>* result) { if (Empty()) return 0; - std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock); - if (!lock) return 0; + std::unique_lock<std::mutex> lock(mutex_); unsigned back = back_.load(std::memory_order_relaxed); unsigned size = Size(); unsigned mid = back; diff --git a/unsupported/test/cxx11_eventcount.cpp b/unsupported/test/cxx11_eventcount.cpp index 3ca8598c7..7bf4e965f 100644 --- a/unsupported/test/cxx11_eventcount.cpp +++ b/unsupported/test/cxx11_eventcount.cpp @@ -30,10 +30,10 @@ static void test_basic_eventcount() EventCount ec(waiters); EventCount::Waiter& w = waiters[0]; ec.Notify(false); - VERIFY(ec.Prewait()); + ec.Prewait(); ec.Notify(true); ec.CommitWait(&w); - VERIFY(ec.Prewait()); + ec.Prewait(); ec.CancelWait(); } @@ -112,7 +112,7 @@ static void test_stress_eventcount() unsigned idx = rand_reentrant(&rnd) % kQueues; if (queues[idx].Pop()) continue; j--; - if (!ec.Prewait()) continue; + ec.Prewait(); bool empty = true; for (int q = 0; q < kQueues; q++) { if (!queues[q].Empty()) { |