From a709c8efb4927ebac338cb93865e8d0bdfcac85d Mon Sep 17 00:00:00 2001
From: Christoph Hertzberg
Date: Thu, 23 Aug 2018 19:41:59 +0200
Subject: Replace pointers by values or unique_ptr for better leak-safety

---
 .../CXX11/src/ThreadPool/NonBlockingThreadPool.h   | 40 ++++++++++------------
 1 file changed, 19 insertions(+), 21 deletions(-)

diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
index ecd49f382..a93e22a76 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
@@ -58,11 +58,9 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
         coprimes_.push_back(i);
       }
     }
+    queues_.resize(num_threads_);
     for (int i = 0; i < num_threads_; i++) {
-      queues_.push_back(new Queue());
-    }
-    for (int i = 0; i < num_threads_; i++) {
-      threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); }));
+      threads_.emplace_back(env_.CreateThread([this, i]() { WorkerLoop(i); }));
     }
   }
 
@@ -78,13 +76,12 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
       // Since we were cancelled, there might be entries in the queues.
       // Empty them to prevent their destructor from asserting.
       for (size_t i = 0; i < queues_.size(); i++) {
-        queues_[i]->Flush();
+        queues_[i].Flush();
       }
     }
 
     // Join threads explicitly to avoid destruction order issues.
-    for (size_t i = 0; i < num_threads_; i++) delete threads_[i];
-    for (size_t i = 0; i < num_threads_; i++) delete queues_[i];
+    threads_.resize(0);
   }
 
   void Schedule(std::function<void()> fn) {
@@ -92,13 +89,13 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
     PerThread* pt = GetPerThread();
     if (pt->pool == this) {
       // Worker thread of this pool, push onto the thread's queue.
-      Queue* q = queues_[pt->thread_id];
-      t = q->PushFront(std::move(t));
+      Queue& q = queues_[pt->thread_id];
+      t = q.PushFront(std::move(t));
     } else {
       // A free-standing thread (or worker of another pool), push onto a random
       // queue.
-      Queue* q = queues_[Rand(&pt->rand) % queues_.size()];
-      t = q->PushBack(std::move(t));
+      Queue& q = queues_[Rand(&pt->rand) % queues_.size()];
+      t = q.PushBack(std::move(t));
     }
     // Note: below we touch this after making w available to worker threads.
     // Strictly speaking, this can lead to a racy-use-after-free. Consider that
@@ -157,8 +154,8 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
   Environment env_;
   const int num_threads_;
   const bool allow_spinning_;
-  MaxSizeVector<Thread*> threads_;
-  MaxSizeVector<Queue*> queues_;
+  MaxSizeVector<std::unique_ptr<Thread> > threads_;
+  MaxSizeVector<Queue> queues_;
   MaxSizeVector<unsigned> coprimes_;
   MaxSizeVector<EventCount::Waiter> waiters_;
   std::atomic<unsigned> blocked_;
@@ -173,7 +170,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
     pt->pool = this;
     pt->rand = std::hash<std::thread::id>()(std::this_thread::get_id());
     pt->thread_id = thread_id;
-    Queue* q = queues_[thread_id];
+    Queue& q = queues_[thread_id];
     EventCount::Waiter* waiter = &waiters_[thread_id];
     // TODO(dvyukov,rmlarsen): The time spent in Steal() is proportional
     // to num_threads_ and we assume that new work is scheduled at a
@@ -189,10 +186,10 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
       // counter-productive for the types of I/O workloads the single thread
      // pools tend to be used for.
       while (!cancelled_) {
-        Task t = q->PopFront();
+        Task t = q.PopFront();
         for (int i = 0; i < spin_count && !t.f; i++) {
           if (!cancelled_.load(std::memory_order_relaxed)) {
-            t = q->PopFront();
+            t = q.PopFront();
           }
         }
         if (!t.f) {
@@ -206,7 +203,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
       }
     } else {
       while (!cancelled_) {
-        Task t = q->PopFront();
+        Task t = q.PopFront();
         if (!t.f) {
           t = Steal();
           if (!t.f) {
@@ -243,7 +240,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
     unsigned inc = coprimes_[r % coprimes_.size()];
     unsigned victim = r % size;
     for (unsigned i = 0; i < size; i++) {
-      Task t = queues_[victim]->PopBack();
+      Task t = queues_[victim].PopBack();
       if (t.f) {
         return t;
       }
@@ -270,7 +267,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
      if (cancelled_) {
        return false;
      } else {
-        *t = queues_[victim]->PopBack();
+        *t = queues_[victim].PopBack();
        return true;
      }
    }
@@ -278,7 +275,8 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
     // If we are shutting down and all worker threads blocked without work,
     // that's we are done.
     blocked_++;
-    if (done_ && blocked_ == num_threads_) {
+    // TODO is blocked_ required to be unsigned?
+    if (done_ && blocked_ == static_cast<unsigned>(num_threads_)) {
       ec_.CancelWait(waiter);
       // Almost done, but need to re-check queues.
       // Consider that all queues are empty and all worker threads are preempted
@@ -311,7 +309,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
     unsigned inc = coprimes_[r % coprimes_.size()];
     unsigned victim = r % size;
     for (unsigned i = 0; i < size; i++) {
-      if (!queues_[victim]->Empty()) {
+      if (!queues_[victim].Empty()) {
        return victim;
      }
      victim += inc;
-- 
cgit v1.2.3
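
Editor's note on the pattern (not part of the patch): the leak-safety gain comes from moving ownership into the containers. Raw new'd Thread*/Queue* members must be deleted by hand on every exit path and leak if construction fails partway through, whereas std::unique_ptr elements and plain values are released automatically by the container's destructor. Below is a minimal standalone sketch of the same idea, assuming nothing from Eigen: std::vector stands in for MaxSizeVector, and Worker/WorkQueue are hypothetical stand-ins for Thread and Queue.

    #include <memory>
    #include <vector>

    // Hypothetical stand-ins for Eigen's Thread and Queue types;
    // the names are illustrative and do not appear in the patch.
    struct Worker {
      ~Worker() {}  // a real worker would join its thread here
    };
    struct WorkQueue {
      ~WorkQueue() {}  // a real queue would assert emptiness here
    };

    class Pool {
     public:
      explicit Pool(int n) {
        queues_.resize(n);  // queues held by value, as in the patch
        for (int i = 0; i < n; i++) {
          // If this loop is interrupted by an exception, the unique_ptrs
          // already stored are destroyed during unwinding -- no leak.
          // (The temporary unique_ptr also owns the Worker if push_back
          // itself throws, which a raw-pointer container cannot guarantee.)
          workers_.push_back(std::unique_ptr<Worker>(new Worker()));
        }
      }
      ~Pool() {
        // Equivalent of the patch's threads_.resize(0): cleanup happens in
        // the element destructors, with no manual delete loops to forget.
        workers_.clear();
      }

     private:
      std::vector<std::unique_ptr<Worker> > workers_;
      std::vector<WorkQueue> queues_;
    };

    int main() { Pool p(4); }

The same reasoning motivates holding the queues by value: destruction can no longer be skipped by an early return or exception, and the two hand-written delete loops in the destructor disappear.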