aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Rasmus Larsen <rmlarsen@google.com>2019-05-13 20:04:35 +0000
committerGravatar Rasmus Larsen <rmlarsen@google.com>2019-05-13 20:04:35 +0000
commitc8d8d5c0fcfe31eb43005245e36627e104ad2e5f (patch)
tree770939bbd38bfbea7e767ba535c978cb968c2cf7
parent5f32b79edc47c5d010755889a091d0b3a39a0f14 (diff)
parente5ac8cbd7a12defb5e75bcaeaa460b90e9f0c5f1 (diff)
Merged in rmlarsen/eigen_threadpool (pull request PR-640)
Fix deadlocks in thread pool. Approved-by: Eugene Zhulenev <ezhulenev@google.com>
-rw-r--r--unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h24
-rw-r--r--unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h2
-rw-r--r--unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h9
-rw-r--r--unsupported/test/cxx11_eventcount.cpp6
4 files changed, 18 insertions, 23 deletions
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h b/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h
index 8b3b210b1..4549aa069 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h
@@ -20,8 +20,7 @@ namespace Eigen {
// if (predicate)
// return act();
// EventCount::Waiter& w = waiters[my_index];
-// if (!ec.Prewait(&w))
-// return act();
+// ec.Prewait(&w);
// if (predicate) {
// ec.CancelWait(&w);
// return act();
@@ -62,23 +61,17 @@ class EventCount {
}
// Prewait prepares for waiting.
- // If Prewait returns true, the thread must re-check the wait predicate
+ // After calling Prewait, the thread must re-check the wait predicate
// and then call either CancelWait or CommitWait.
- // Otherwise, the thread should assume the predicate may be true
- // and don't call CancelWait/CommitWait (there was a concurrent Notify call).
- bool Prewait() {
+ void Prewait() {
uint64_t state = state_.load(std::memory_order_relaxed);
for (;;) {
CheckState(state);
uint64_t newstate = state + kWaiterInc;
- if ((state & kSignalMask) != 0) {
- // Consume the signal and cancel waiting.
- newstate -= kSignalInc + kWaiterInc;
- }
CheckState(newstate);
if (state_.compare_exchange_weak(state, newstate,
std::memory_order_seq_cst))
- return (state & kSignalMask) == 0;
+ return;
}
}
@@ -118,8 +111,13 @@ class EventCount {
for (;;) {
CheckState(state, true);
uint64_t newstate = state - kWaiterInc;
- // Also take away a signal if any.
- if ((state & kSignalMask) != 0) newstate -= kSignalInc;
+ // We don't know if the thread was also notified or not,
+ // so we should not consume a signal unconditionaly.
+ // Only if number of waiters is equal to number of signals,
+ // we know that the thread was notified and we must take away the signal.
+ if (((state & kWaiterMask) >> kWaiterShift) ==
+ ((state & kSignalMask) >> kSignalShift))
+ newstate -= kSignalInc;
CheckState(newstate);
if (state_.compare_exchange_weak(state, newstate,
std::memory_order_acq_rel))
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
index 9e54254c1..9353f41e2 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
@@ -379,7 +379,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
eigen_plain_assert(!t->f);
// We already did best-effort emptiness check in Steal, so prepare for
// blocking.
- if (!ec_.Prewait()) return true;
+ ec_.Prewait();
// Now do a reliable emptiness check.
int victim = NonEmptyQueueIndex();
if (victim != -1) {
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
index a9ae05fc6..b572ebcdf 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
@@ -97,11 +97,9 @@ class RunQueue {
}
// PopBack removes and returns the last elements in the queue.
- // Can fail spuriously.
Work PopBack() {
if (Empty()) return Work();
- std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
- if (!lock) return Work();
+ std::unique_lock<std::mutex> lock(mutex_);
unsigned back = back_.load(std::memory_order_relaxed);
Elem* e = &array_[back & kMask];
uint8_t s = e->state.load(std::memory_order_relaxed);
@@ -115,11 +113,10 @@ class RunQueue {
}
// PopBackHalf removes and returns half last elements in the queue.
- // Returns number of elements removed. But can also fail spuriously.
+ // Returns number of elements removed.
unsigned PopBackHalf(std::vector<Work>* result) {
if (Empty()) return 0;
- std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
- if (!lock) return 0;
+ std::unique_lock<std::mutex> lock(mutex_);
unsigned back = back_.load(std::memory_order_relaxed);
unsigned size = Size();
unsigned mid = back;
diff --git a/unsupported/test/cxx11_eventcount.cpp b/unsupported/test/cxx11_eventcount.cpp
index 3ca8598c7..7bf4e965f 100644
--- a/unsupported/test/cxx11_eventcount.cpp
+++ b/unsupported/test/cxx11_eventcount.cpp
@@ -30,10 +30,10 @@ static void test_basic_eventcount()
EventCount ec(waiters);
EventCount::Waiter& w = waiters[0];
ec.Notify(false);
- VERIFY(ec.Prewait());
+ ec.Prewait();
ec.Notify(true);
ec.CommitWait(&w);
- VERIFY(ec.Prewait());
+ ec.Prewait();
ec.CancelWait();
}
@@ -112,7 +112,7 @@ static void test_stress_eventcount()
unsigned idx = rand_reentrant(&rnd) % kQueues;
if (queues[idx].Pop()) continue;
j--;
- if (!ec.Prewait()) continue;
+ ec.Prewait();
bool empty = true;
for (int q = 0; q < kQueues; q++) {
if (!queues[q].Empty()) {