diff options
author | Rasmus Munk Larsen <rmlarsen@google.com> | 2018-08-23 11:36:49 -0700 |
---|---|---|
committer | Rasmus Munk Larsen <rmlarsen@google.com> | 2018-08-23 11:36:49 -0700 |
commit | d35880ed919d68e08279d78ac61a21054c1df480 (patch) | |
tree | 249a51a4e4f3bcb1b039ab01c8ec7b925daafba0 /unsupported/Eigen/CXX11/src/ThreadPool | |
parent | fa0bcbf23016007877f55fec99244d1ffa9bffca (diff) | |
parent | a709c8efb4927ebac338cb93865e8d0bdfcac85d (diff) |
merge
Diffstat (limited to 'unsupported/Eigen/CXX11/src/ThreadPool')
-rw-r--r-- | unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h | 36 |
1 files changed, 19 insertions, 17 deletions
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h index 354995be8..a800e827f 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h @@ -57,6 +57,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { coprimes_.push_back(i); } } + queues_.resize(num_threads_); for (int i = 0; i < num_threads_; i++) { queues_.push_back(new Queue()); } @@ -64,7 +65,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { init_barrier_.reset(new Barrier(num_threads_)); #endif for (int i = 0; i < num_threads_; i++) { - threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); })); + threads_.emplace_back(env_.CreateThread([this, i]() { WorkerLoop(i); })); } #ifndef EIGEN_THREAD_LOCAL // Wait for workers to initialize per_thread_map_. Otherwise we might race @@ -85,13 +86,13 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { // Since we were cancelled, there might be entries in the queues. // Empty them to prevent their destructor from asserting. for (size_t i = 0; i < queues_.size(); i++) { - queues_[i]->Flush(); + queues_[i].Flush(); } } // Join threads explicitly to avoid destruction order issues. - for (int i = 0; i < num_threads_; i++) delete threads_[i]; - for (int i = 0; i < num_threads_; i++) delete queues_[i]; + threads_.resize(0); + queues_.resize(0); #ifndef EIGEN_THREAD_LOCAL for (auto it : per_thread_map_) delete it.second; #endif @@ -102,13 +103,13 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { PerThread* pt = GetPerThread(); if (pt->pool == this) { // Worker thread of this pool, push onto the thread's queue. - Queue* q = queues_[pt->thread_id]; - t = q->PushFront(std::move(t)); + Queue& q = queues_[pt->thread_id]; + t = q.PushFront(std::move(t)); } else { // A free-standing thread (or worker of another pool), push onto a random // queue. - Queue* q = queues_[Rand(&pt->rand) % queues_.size()]; - t = q->PushBack(std::move(t)); + Queue& q = queues_[Rand(&pt->rand) % queues_.size()]; + t = q.PushBack(std::move(t)); } // Note: below we touch this after making w available to worker threads. // Strictly speaking, this can lead to a racy-use-after-free. Consider that @@ -163,8 +164,8 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { Environment env_; const int num_threads_; const bool allow_spinning_; - MaxSizeVector<Thread*> threads_; - MaxSizeVector<Queue*> queues_; + MaxSizeVector<std::unique_ptr<Thread> > threads_; + MaxSizeVector<Queue> queues_; MaxSizeVector<unsigned> coprimes_; MaxSizeVector<EventCount::Waiter> waiters_; std::atomic<unsigned> blocked_; @@ -193,7 +194,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { pt->pool = this; pt->rand = GlobalThreadIdHash(); pt->thread_id = thread_id; - Queue* q = queues_[thread_id]; + Queue& q = queues_[thread_id]; EventCount::Waiter* waiter = &waiters_[thread_id]; // TODO(dvyukov,rmlarsen): The time spent in Steal() is proportional // to num_threads_ and we assume that new work is scheduled at a @@ -209,10 +210,10 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { // counter-productive for the types of I/O workloads the single thread // pools tend to be used for. while (!cancelled_) { - Task t = q->PopFront(); + Task t = q.PopFront(); for (int i = 0; i < spin_count && !t.f; i++) { if (!cancelled_.load(std::memory_order_relaxed)) { - t = q->PopFront(); + t = q.PopFront(); } } if (!t.f) { @@ -226,7 +227,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { } } else { while (!cancelled_) { - Task t = q->PopFront(); + Task t = q.PopFront(); if (!t.f) { t = Steal(); if (!t.f) { @@ -263,7 +264,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { unsigned inc = coprimes_[r % coprimes_.size()]; unsigned victim = r % size; for (unsigned i = 0; i < size; i++) { - Task t = queues_[victim]->PopBack(); + Task t = queues_[victim].PopBack(); if (t.f) { return t; } @@ -290,7 +291,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { if (cancelled_) { return false; } else { - *t = queues_[victim]->PopBack(); + *t = queues_[victim].PopBack(); return true; } } @@ -298,6 +299,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { // If we are shutting down and all worker threads blocked without work, // that's we are done. blocked_++; + // TODO is blocked_ required to be unsigned? if (done_ && blocked_ == static_cast<unsigned>(num_threads_)) { ec_.CancelWait(waiter); // Almost done, but need to re-check queues. @@ -331,7 +333,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { unsigned inc = coprimes_[r % coprimes_.size()]; unsigned victim = r % size; for (unsigned i = 0; i < size; i++) { - if (!queues_[victim]->Empty()) { + if (!queues_[victim].Empty()) { return victim; } victim += inc; |