diff options
author | Eugene Zhulenev <ezhulenev@google.com> | 2018-07-16 15:06:57 -0700 |
---|---|---|
committer | Eugene Zhulenev <ezhulenev@google.com> | 2018-07-16 15:06:57 -0700 |
commit | e204ecdaafa6c5642a4286a1ffb19e9964e32201 (patch) | |
tree | c0152261d685f2e64d11a27395934763f400edf6 /unsupported/Eigen | |
parent | b324ed55d969b28ff84343b0840137a6b56300f1 (diff) |
Remove SimpleThreadPool and always use {NonBlocking}ThreadPool
Diffstat (limited to 'unsupported/Eigen')
5 files changed, 8 insertions, 515 deletions
diff --git a/unsupported/Eigen/CXX11/ThreadPool b/unsupported/Eigen/CXX11/ThreadPool index c34614194..cbb3bbf2c 100644 --- a/unsupported/Eigen/CXX11/ThreadPool +++ b/unsupported/Eigen/CXX11/ThreadPool @@ -55,21 +55,8 @@ #include "src/ThreadPool/RunQueue.h" #include "src/ThreadPool/ThreadPoolInterface.h" #include "src/ThreadPool/ThreadEnvironment.h" -#include "src/ThreadPool/SimpleThreadPool.h" #include "src/ThreadPool/NonBlockingThreadPool.h" - -// Use the more efficient NonBlockingThreadPool by default. -namespace Eigen { -#ifndef EIGEN_USE_SIMPLE_THREAD_POOL -template <typename Env> using ThreadPoolTempl = NonBlockingThreadPoolTempl<Env>; -typedef NonBlockingThreadPool ThreadPool; -#else -template <typename Env> using ThreadPoolTempl = SimpleThreadPoolTempl<Env>; -typedef SimpleThreadPool ThreadPool; -#endif -} // namespace Eigen - #endif #include <Eigen/src/Core/util/ReenableStupidWarnings.h> diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h index d7536bd6a..c774d15e2 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h @@ -15,47 +15,6 @@ namespace Eigen { -#ifdef EIGEN_USE_SIMPLE_THREAD_POOL -namespace internal { - -template<typename LhsScalar, typename LhsMapper, typename Index> -struct packLhsArg { - LhsScalar* blockA; - const LhsMapper& lhs; - const Index m_start; - const Index k_start; - const Index mc; - const Index kc; -}; - -template<typename LhsScalar, typename RhsScalar, typename RhsMapper, typename OutputMapper, typename Index> -struct packRhsAndKernelArg { - const MaxSizeVector<LhsScalar*>* blockAs; - RhsScalar* blockB; - const RhsMapper& rhs; - OutputMapper& output; - const Index m; - const Index k; - const Index n; - const Index mc; - const Index kc; - const Index nc; - const Index num_threads; - const Index num_blockAs; - const Index max_m; - const Index k_block_idx; - const Index m_block_idx; - const Index n_block_idx; - const Index m_blocks; - const Index n_blocks; - MaxSizeVector<Notification*>* kernel_notifications; - const MaxSizeVector<Notification*>* lhs_notifications; - const bool need_to_pack; -}; - -} // end namespace internal -#endif // EIGEN_USE_SIMPLE_THREAD_POOL - template<typename Indices, typename LeftArgType, typename RightArgType, typename OutputKernelType> struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgType, OutputKernelType>, ThreadPoolDevice> : public TensorContractionEvaluatorBase<TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgType, OutputKernelType>, ThreadPoolDevice> > { @@ -112,7 +71,6 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT TensorEvaluator(const XprType& op, const Device& device) : Base(op, device) {} -#ifndef EIGEN_USE_SIMPLE_THREAD_POOL template <bool lhs_inner_dim_contiguous, bool rhs_inner_dim_contiguous, bool rhs_inner_dim_reordered, int Alignment> void evalProduct(Scalar* buffer) const { @@ -763,288 +721,6 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT return 0; } -#else // EIGEN_USE_SIMPLE_THREAD_POOL - // TODO(ezhulenev): SimpleThreadPool will be removed in the future, and seems - // like it's not worth adding output kernel support here. - static_assert(std::is_same<OutputKernelType, const NoOpOutputKernel>::value, - "SimpleThreadPool does not support contraction output kernels."); - - template <bool lhs_inner_dim_contiguous, bool rhs_inner_dim_contiguous, bool rhs_inner_dim_reordered, int Alignment> - void evalProduct(Scalar* buffer) const { - if (this->m_j_size == 1) { - this->template evalGemv<lhs_inner_dim_contiguous, rhs_inner_dim_contiguous, rhs_inner_dim_reordered, Alignment>(buffer); - return; - } - - evalGemm<lhs_inner_dim_contiguous, rhs_inner_dim_contiguous, rhs_inner_dim_reordered, Alignment>(buffer); - } - - template <bool lhs_inner_dim_contiguous, bool rhs_inner_dim_contiguous, bool rhs_inner_dim_reordered, int Alignment> - void evalGemm(Scalar* buffer) const { - // columns in left side, rows in right side - const Index k = this->m_k_size; - - // rows in left side - const Index m = this->m_i_size; - - // columns in right side - const Index n = this->m_j_size; - - // zero out the result buffer (which must be of size at least m * n * sizeof(Scalar) - this->m_device.memset(buffer, 0, m * n * sizeof(Scalar)); - - - const int lhs_packet_size = internal::unpacket_traits<typename LeftEvaluator::PacketReturnType>::size; - const int rhs_packet_size = internal::unpacket_traits<typename RightEvaluator::PacketReturnType>::size; - - typedef internal::TensorContractionInputMapper<LhsScalar, Index, internal::Lhs, - LeftEvaluator, left_nocontract_t, - contract_t, lhs_packet_size, - lhs_inner_dim_contiguous, - false, Unaligned> LhsMapper; - - typedef internal::TensorContractionInputMapper<RhsScalar, Index, internal::Rhs, - RightEvaluator, right_nocontract_t, - contract_t, rhs_packet_size, - rhs_inner_dim_contiguous, - rhs_inner_dim_reordered, Unaligned> RhsMapper; - - typedef internal::blas_data_mapper<Scalar, Index, ColMajor> OutputMapper; - - // TODO: packing could be faster sometimes if we supported row major tensor mappers - typedef internal::gemm_pack_lhs<LhsScalar, Index, typename LhsMapper::SubMapper, Traits::mr, - Traits::LhsProgress, ColMajor> LhsPacker; - typedef internal::gemm_pack_rhs<RhsScalar, Index, typename RhsMapper::SubMapper, Traits::nr, ColMajor> RhsPacker; - - // TODO: replace false, false with conjugate values? - typedef internal::gebp_kernel<LhsScalar, RhsScalar, Index, OutputMapper, - Traits::mr, Traits::nr, false, false> GebpKernel; - - typedef internal::packLhsArg<LhsScalar, LhsMapper, Index> packLArg; - typedef internal::packRhsAndKernelArg<LhsScalar, RhsScalar, RhsMapper, OutputMapper, Index> packRKArg; - - // initialize data mappers - LhsMapper lhs(this->m_leftImpl, this->m_left_nocontract_strides, this->m_i_strides, - this->m_left_contracting_strides, this->m_k_strides); - - RhsMapper rhs(this->m_rightImpl, this->m_right_nocontract_strides, this->m_j_strides, - this->m_right_contracting_strides, this->m_k_strides); - - OutputMapper output(buffer, m); - - // compute block sizes (which depend on number of threads) - const Index num_threads = this->m_device.numThreads(); - internal::TensorContractionBlocking<LhsMapper, RhsMapper, Index, internal::ShardByCol> blocking(k, m, n, num_threads); - Index mc = blocking.mc(); - Index nc = blocking.nc(); - Index kc = blocking.kc(); - eigen_assert(mc <= m); - eigen_assert(nc <= n); - eigen_assert(kc <= k); - -#define CEIL_DIV(a, b) (((a) + (b) - 1) / (b)) - const Index k_blocks = CEIL_DIV(k, kc); - const Index n_blocks = CEIL_DIV(n, nc); - const Index m_blocks = CEIL_DIV(m, mc); - const Index sizeA = mc * kc; - const Index sizeB = kc * nc; - - /* cout << "m: " << m << " n: " << n << " k: " << k << endl; - cout << "mc: " << mc << " nc: " << nc << " kc: " << kc << endl; - cout << "m_blocks: " << m_blocks << " n_blocks: " << n_blocks << " k_blocks: " << k_blocks << endl; - cout << "num threads: " << num_threads << endl; - */ - - // note: m_device.allocate should return 16 byte aligned pointers, but if blockA and blockB - // aren't 16 byte aligned segfaults will happen due to SIMD instructions - // note: You can get away with allocating just a single blockA and offsets and meet the - // the alignment requirements with the assumption that - // (Traits::mr * sizeof(ResScalar)) % 16 == 0 - const Index numBlockAs = numext::mini(num_threads, m_blocks); - MaxSizeVector<LhsScalar *> blockAs(num_threads); - for (int i = 0; i < num_threads; i++) { - blockAs.push_back(static_cast<LhsScalar *>(this->m_device.allocate(sizeA * sizeof(LhsScalar)))); - } - - // To circumvent alignment issues, I'm just going to separately allocate the memory for each thread - // TODO: is this too much memory to allocate? This simplifies coding a lot, but is wasteful. - // Other options: (1) reuse memory when a thread finishes. con: tricky - // (2) allocate block B memory in each thread. con: overhead - MaxSizeVector<RhsScalar *> blockBs(n_blocks); - for (int i = 0; i < n_blocks; i++) { - blockBs.push_back(static_cast<RhsScalar *>(this->m_device.allocate(sizeB * sizeof(RhsScalar)))); - } - - // lhs_notifications starts with all null Notifications - MaxSizeVector<Notification*> lhs_notifications(num_threads, nullptr); - - // this should really be numBlockAs * n_blocks; - const Index num_kernel_notifications = num_threads * n_blocks; - MaxSizeVector<Notification*> kernel_notifications(num_kernel_notifications, - nullptr); - - for (Index k_block_idx = 0; k_block_idx < k_blocks; k_block_idx++) { - const Index k_start = k_block_idx * kc; - // make sure we don't overshoot right edge of left matrix - const Index actual_kc = numext::mini(k_start + kc, k) - k_start; - - for (Index m_block_idx = 0; m_block_idx < m_blocks; m_block_idx += numBlockAs) { - const Index num_blocks = numext::mini(m_blocks-m_block_idx, numBlockAs); - - for (Index mt_block_idx = m_block_idx; mt_block_idx < m_block_idx+num_blocks; mt_block_idx++) { - const Index m_start = mt_block_idx * mc; - const Index actual_mc = numext::mini(m_start + mc, m) - m_start; - eigen_assert(actual_mc > 0); - - Index blockAId = (k_block_idx * m_blocks + mt_block_idx) % num_threads; - - for (int i = 0; i < n_blocks; ++i) { - Index notification_id = (blockAId * n_blocks + i); - // Wait for any current kernels using this slot to complete - // before using it. - if (kernel_notifications[notification_id]) { - wait_until_ready(kernel_notifications[notification_id]); - delete kernel_notifications[notification_id]; - } - kernel_notifications[notification_id] = new Notification(); - } - const packLArg arg = { - blockAs[blockAId], // blockA - lhs, // lhs - m_start, // m - k_start, // k - actual_mc, // mc - actual_kc, // kc - }; - - // Delete any existing notification since we may be - // replacing it. The algorithm should ensure that there are - // no existing waiters on this notification. - delete lhs_notifications[blockAId]; - lhs_notifications[blockAId] = - this->m_device.enqueue(&Self::packLhs<packLArg, LhsPacker>, arg); - } - - // now start kernels. - const Index m_base_start = m_block_idx * mc; - const bool need_to_pack = m_block_idx == 0; - - for (Index n_block_idx = 0; n_block_idx < n_blocks; n_block_idx++) { - const Index n_start = n_block_idx * nc; - const Index actual_nc = numext::mini(n_start + nc, n) - n_start; - - // first make sure the previous kernels are all done before overwriting rhs. Also wait if - // we're going to start new k. In both cases need_to_pack is true. - if (need_to_pack) { - for (Index i = num_blocks; i < num_threads; ++i) { - Index blockAId = (k_block_idx * m_blocks + i + m_block_idx) % num_threads; - Index future_id = (blockAId * n_blocks + n_block_idx); - wait_until_ready(kernel_notifications[future_id]); - } - } - - packRKArg arg = { - &blockAs, // blockA - blockBs[n_block_idx], // blockB - rhs, // rhs - output, // output - m_base_start, // m - k_start, // k - n_start, // n - mc, // mc - actual_kc, // kc - actual_nc, // nc - num_threads, - numBlockAs, - m, - k_block_idx, - m_block_idx, - n_block_idx, // n_block_idx - m_blocks, // m_blocks - n_blocks, // n_blocks - &kernel_notifications, // kernel notifications - &lhs_notifications, // lhs notifications - need_to_pack, // need_to_pack - }; - - // We asynchronously kick off this function, which ends up - // notifying the appropriate kernel_notifications objects, - // which this thread waits on before exiting. - this->m_device.enqueueNoNotification(&Self::packRhsAndKernel<packRKArg, RhsPacker, GebpKernel>, arg); - } - } - } - - // Make sure all the kernels are done. - for (size_t i = 0; i < kernel_notifications.size(); ++i) { - wait_until_ready(kernel_notifications[i]); - delete kernel_notifications[i]; - } - - // No need to wait for lhs notifications since they should have - // already been waited on. Just clean them up. - for (size_t i = 0; i < lhs_notifications.size(); ++i) { - delete lhs_notifications[i]; - } - - // deallocate all of the memory for both A and B's - for (size_t i = 0; i < blockAs.size(); i++) { - this->m_device.deallocate(blockAs[i]); - } - for (size_t i = 0; i < blockBs.size(); i++) { - this->m_device.deallocate(blockBs[i]); - } - -#undef CEIL_DIV - } - - /* - * Packs a LHS block of size (mt, kc) starting at lhs(m, k). Before packing - * the LHS block, check that all of the kernels that worked on the same - * mt_block_idx in the previous m_block are done. - */ - template <typename packLArg, typename LhsPacker> - static void packLhs(const packLArg arg) { - // perform actual packing - LhsPacker pack_lhs; - pack_lhs(arg.blockA, arg.lhs.getSubMapper(arg.m_start, arg.k_start), arg.kc, arg.mc); - } - - /* - * Packs a RHS block of size (kc, nc) starting at (k, n) after checking that - * all kernels in the previous block are done. - * Then for each LHS future, we wait on the future and then call GEBP - * on the area packed by the future (which starts at - * blockA + future_idx * mt * kc) on the LHS and with the full packed - * RHS block. - * The output of this GEBP is written to output(m + i * mt, n). - */ - template <typename packRKArg, typename RhsPacker, typename GebpKernel> - static void packRhsAndKernel(packRKArg arg) { - if (arg.need_to_pack) { - RhsPacker pack_rhs; - pack_rhs(arg.blockB, arg.rhs.getSubMapper(arg.k, arg.n), arg.kc, arg.nc); - } - - GebpKernel gebp; - for (Index mt_block_idx = 0; mt_block_idx < arg.num_blockAs; mt_block_idx++) { - const Index m_base_start = arg.m + arg.mc*mt_block_idx; - if (m_base_start < arg.max_m) { - Index blockAId = (arg.k_block_idx * arg.m_blocks + mt_block_idx + arg.m_block_idx) % arg.num_threads; - wait_until_ready((*arg.lhs_notifications)[blockAId]); - const Index actual_mc = numext::mini(m_base_start + arg.mc, arg.max_m) - m_base_start; - gebp(arg.output.getSubMapper(m_base_start, arg.n), - (*arg.blockAs)[blockAId], arg.blockB, - actual_mc, arg.kc, arg.nc, Scalar(1), -1, -1, 0, 0); - - // Notify that the kernel is done. - const Index set_idx = blockAId * arg.n_blocks + arg.n_block_idx; - (*arg.kernel_notifications)[set_idx]->Notify(); - } - } - } -#endif // EIGEN_USE_SIMPLE_THREAD_POOL - TensorOpCost contractionCost(Index m, Index n, Index bm, Index bn, Index bk, bool shard_by_col, bool prepacked) const { const int packed_size = std::min<int>(PacketType<LhsScalar, Device>::size, diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h index 1181c2753..53640c6aa 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h @@ -150,13 +150,6 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable> { if (needs_assign) { const Index size = array_prod(evaluator.dimensions()); -#if !defined(EIGEN_USE_SIMPLE_THREAD_POOL) - device.parallelFor(size, evaluator.costPerCoeff(Vectorizable), - EvalRange<Evaluator, Index, Vectorizable>::alignBlockSize, - [&evaluator](Index first, Index last) { - EvalRange<Evaluator, Index, Vectorizable>::run(&evaluator, first, last); - }); -#else size_t num_threads = device.numThreads(); if (num_threads > 1) { num_threads = TensorCostModel<ThreadPoolDevice>::numThreads( @@ -182,7 +175,6 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable> { } barrier.Wait(); } -#endif // defined(!EIGEN_USE_SIMPLE_THREAD_POOL) } evaluator.cleanup(); } diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h index 1264a0270..ecd49f382 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h @@ -14,15 +14,15 @@ namespace Eigen { template <typename Environment> -class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { +class ThreadPoolTempl : public Eigen::ThreadPoolInterface { public: typedef typename Environment::Task Task; typedef RunQueue<Task, 1024> Queue; - NonBlockingThreadPoolTempl(int num_threads, Environment env = Environment()) - : NonBlockingThreadPoolTempl(num_threads, true, env) {} + ThreadPoolTempl(int num_threads, Environment env = Environment()) + : ThreadPoolTempl(num_threads, true, env) {} - NonBlockingThreadPoolTempl(int num_threads, bool allow_spinning, + ThreadPoolTempl(int num_threads, bool allow_spinning, Environment env = Environment()) : env_(env), num_threads_(num_threads), @@ -66,7 +66,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { } } - ~NonBlockingThreadPoolTempl() { + ~ThreadPoolTempl() { done_ = true; // Now if all threads block without work, they will start exiting. @@ -136,7 +136,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { int CurrentThreadId() const final { const PerThread* pt = - const_cast<NonBlockingThreadPoolTempl*>(this)->GetPerThread(); + const_cast<ThreadPoolTempl*>(this)->GetPerThread(); if (pt->pool == this) { return pt->thread_id; } else { @@ -149,7 +149,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { struct PerThread { constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) { } - NonBlockingThreadPoolTempl* pool; // Parent pool, or null for normal threads. + ThreadPoolTempl* pool; // Parent pool, or null for normal threads. uint64_t rand; // Random generator state. int thread_id; // Worker thread index in pool. }; @@ -337,7 +337,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { } }; -typedef NonBlockingThreadPoolTempl<StlThreadEnvironment> NonBlockingThreadPool; +typedef ThreadPoolTempl<StlThreadEnvironment> ThreadPool; } // namespace Eigen diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h deleted file mode 100644 index 335728665..000000000 --- a/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h +++ /dev/null @@ -1,162 +0,0 @@ -// This file is part of Eigen, a lightweight C++ template library -// for linear algebra. -// -// Copyright (C) 2014 Benoit Steiner <benoit.steiner.goog@gmail.com> -// -// This Source Code Form is subject to the terms of the Mozilla -// Public License v. 2.0. If a copy of the MPL was not distributed -// with this file, You can obtain one at http://mozilla.org/MPL/2.0/. - -#ifndef EIGEN_CXX11_THREADPOOL_SIMPLE_THREAD_POOL_H -#define EIGEN_CXX11_THREADPOOL_SIMPLE_THREAD_POOL_H - -namespace Eigen { - -// The implementation of the ThreadPool type ensures that the Schedule method -// runs the functions it is provided in FIFO order when the scheduling is done -// by a single thread. -// Environment provides a way to create threads and also allows to intercept -// task submission and execution. -template <typename Environment> -class SimpleThreadPoolTempl : public ThreadPoolInterface { - public: - // Construct a pool that contains "num_threads" threads. - explicit SimpleThreadPoolTempl(int num_threads, Environment env = Environment()) - : env_(env), threads_(num_threads), waiters_(num_threads) { - for (int i = 0; i < num_threads; i++) { - threads_.push_back(env.CreateThread([this, i]() { WorkerLoop(i); })); - } - } - - // Wait until all scheduled work has finished and then destroy the - // set of threads. - ~SimpleThreadPoolTempl() { - { - // Wait for all work to get done. - std::unique_lock<std::mutex> l(mu_); - while (!pending_.empty()) { - empty_.wait(l); - } - exiting_ = true; - - // Wakeup all waiters. - for (auto w : waiters_) { - w->ready = true; - w->task.f = nullptr; - w->cv.notify_one(); - } - } - - // Wait for threads to finish. - for (auto t : threads_) { - delete t; - } - } - - // Schedule fn() for execution in the pool of threads. The functions are - // executed in the order in which they are scheduled. - void Schedule(std::function<void()> fn) final { - Task t = env_.CreateTask(std::move(fn)); - std::unique_lock<std::mutex> l(mu_); - if (waiters_.empty()) { - pending_.push_back(std::move(t)); - } else { - Waiter* w = waiters_.back(); - waiters_.pop_back(); - w->ready = true; - w->task = std::move(t); - w->cv.notify_one(); - } - } - - void Cancel() { -#ifdef EIGEN_THREAD_ENV_SUPPORTS_CANCELLATION - for (size_t i = 0; i < threads_.size(); i++) { - threads_[i]->OnCancel(); - } -#endif - } - - int NumThreads() const final { - return static_cast<int>(threads_.size()); - } - - int CurrentThreadId() const final { - const PerThread* pt = this->GetPerThread(); - if (pt->pool == this) { - return pt->thread_id; - } else { - return -1; - } - } - - protected: - void WorkerLoop(int thread_id) { - std::unique_lock<std::mutex> l(mu_); - PerThread* pt = GetPerThread(); - pt->pool = this; - pt->thread_id = thread_id; - Waiter w; - Task t; - while (!exiting_) { - if (pending_.empty()) { - // Wait for work to be assigned to me - w.ready = false; - waiters_.push_back(&w); - while (!w.ready) { - w.cv.wait(l); - } - t = w.task; - w.task.f = nullptr; - } else { - // Pick up pending work - t = std::move(pending_.front()); - pending_.pop_front(); - if (pending_.empty()) { - empty_.notify_all(); - } - } - if (t.f) { - mu_.unlock(); - env_.ExecuteTask(t); - t.f = nullptr; - mu_.lock(); - } - } - } - - private: - typedef typename Environment::Task Task; - typedef typename Environment::EnvThread Thread; - - struct Waiter { - std::condition_variable cv; - Task task; - bool ready; - }; - - struct PerThread { - constexpr PerThread() : pool(NULL), thread_id(-1) { } - SimpleThreadPoolTempl* pool; // Parent pool, or null for normal threads. - int thread_id; // Worker thread index in pool. - }; - - Environment env_; - std::mutex mu_; - MaxSizeVector<Thread*> threads_; // All threads - MaxSizeVector<Waiter*> waiters_; // Stack of waiting threads. - std::deque<Task> pending_; // Queue of pending work - std::condition_variable empty_; // Signaled on pending_.empty() - bool exiting_ = false; - - PerThread* GetPerThread() const { - EIGEN_THREAD_LOCAL PerThread per_thread; - return &per_thread; - } -}; - -typedef SimpleThreadPoolTempl<StlThreadEnvironment> SimpleThreadPool; - -} // namespace Eigen - -#endif // EIGEN_CXX11_THREADPOOL_SIMPLE_THREAD_POOL_H |