aboutsummaryrefslogtreecommitdiffhomepage
path: root/unsupported/Eigen/CXX11/src/ThreadPool
diff options
context:
space:
mode:
authorGravatar Christoph Hertzberg <chtz@informatik.uni-bremen.de>2018-08-23 19:41:59 +0200
committerGravatar Christoph Hertzberg <chtz@informatik.uni-bremen.de>2018-08-23 19:41:59 +0200
commita709c8efb4927ebac338cb93865e8d0bdfcac85d (patch)
tree9e7c019c83cfb6c1c6fb1aa961aeea28643a57dc /unsupported/Eigen/CXX11/src/ThreadPool
parent39335cf51e7ea5edfe9113cb91034625a039ccbf (diff)
Replace pointers by values or unique_ptr for better leak-safety
Diffstat (limited to 'unsupported/Eigen/CXX11/src/ThreadPool')
-rw-r--r--unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h40
1 files changed, 19 insertions, 21 deletions
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
index ecd49f382..a93e22a76 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
@@ -58,11 +58,9 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
coprimes_.push_back(i);
}
}
+ queues_.resize(num_threads_);
for (int i = 0; i < num_threads_; i++) {
- queues_.push_back(new Queue());
- }
- for (int i = 0; i < num_threads_; i++) {
- threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); }));
+ threads_.emplace_back(env_.CreateThread([this, i]() { WorkerLoop(i); }));
}
}
@@ -78,13 +76,12 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
// Since we were cancelled, there might be entries in the queues.
// Empty them to prevent their destructor from asserting.
for (size_t i = 0; i < queues_.size(); i++) {
- queues_[i]->Flush();
+ queues_[i].Flush();
}
}
// Join threads explicitly to avoid destruction order issues.
- for (size_t i = 0; i < num_threads_; i++) delete threads_[i];
- for (size_t i = 0; i < num_threads_; i++) delete queues_[i];
+ threads_.resize(0);
}
void Schedule(std::function<void()> fn) {
@@ -92,13 +89,13 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
PerThread* pt = GetPerThread();
if (pt->pool == this) {
// Worker thread of this pool, push onto the thread's queue.
- Queue* q = queues_[pt->thread_id];
- t = q->PushFront(std::move(t));
+ Queue& q = queues_[pt->thread_id];
+ t = q.PushFront(std::move(t));
} else {
// A free-standing thread (or worker of another pool), push onto a random
// queue.
- Queue* q = queues_[Rand(&pt->rand) % queues_.size()];
- t = q->PushBack(std::move(t));
+ Queue& q = queues_[Rand(&pt->rand) % queues_.size()];
+ t = q.PushBack(std::move(t));
}
// Note: below we touch this after making w available to worker threads.
// Strictly speaking, this can lead to a racy-use-after-free. Consider that
@@ -157,8 +154,8 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
Environment env_;
const int num_threads_;
const bool allow_spinning_;
- MaxSizeVector<Thread*> threads_;
- MaxSizeVector<Queue*> queues_;
+ MaxSizeVector<std::unique_ptr<Thread> > threads_;
+ MaxSizeVector<Queue> queues_;
MaxSizeVector<unsigned> coprimes_;
MaxSizeVector<EventCount::Waiter> waiters_;
std::atomic<unsigned> blocked_;
@@ -173,7 +170,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
pt->pool = this;
pt->rand = std::hash<std::thread::id>()(std::this_thread::get_id());
pt->thread_id = thread_id;
- Queue* q = queues_[thread_id];
+ Queue& q = queues_[thread_id];
EventCount::Waiter* waiter = &waiters_[thread_id];
// TODO(dvyukov,rmlarsen): The time spent in Steal() is proportional
// to num_threads_ and we assume that new work is scheduled at a
@@ -189,10 +186,10 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
// counter-productive for the types of I/O workloads the single thread
// pools tend to be used for.
while (!cancelled_) {
- Task t = q->PopFront();
+ Task t = q.PopFront();
for (int i = 0; i < spin_count && !t.f; i++) {
if (!cancelled_.load(std::memory_order_relaxed)) {
- t = q->PopFront();
+ t = q.PopFront();
}
}
if (!t.f) {
@@ -206,7 +203,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
}
} else {
while (!cancelled_) {
- Task t = q->PopFront();
+ Task t = q.PopFront();
if (!t.f) {
t = Steal();
if (!t.f) {
@@ -243,7 +240,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
unsigned inc = coprimes_[r % coprimes_.size()];
unsigned victim = r % size;
for (unsigned i = 0; i < size; i++) {
- Task t = queues_[victim]->PopBack();
+ Task t = queues_[victim].PopBack();
if (t.f) {
return t;
}
@@ -270,7 +267,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
if (cancelled_) {
return false;
} else {
- *t = queues_[victim]->PopBack();
+ *t = queues_[victim].PopBack();
return true;
}
}
@@ -278,7 +275,8 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
// If we are shutting down and all worker threads blocked without work,
// that's we are done.
blocked_++;
- if (done_ && blocked_ == num_threads_) {
+ // TODO is blocked_ required to be unsigned?
+ if (done_ && blocked_ == static_cast<unsigned>(num_threads_)) {
ec_.CancelWait(waiter);
// Almost done, but need to re-check queues.
// Consider that all queues are empty and all worker threads are preempted
@@ -311,7 +309,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
unsigned inc = coprimes_[r % coprimes_.size()];
unsigned victim = r % size;
for (unsigned i = 0; i < size; i++) {
- if (!queues_[victim]->Empty()) {
+ if (!queues_[victim].Empty()) {
return victim;
}
victim += inc;