/* * Copyright 2012 Google Inc. * * Use of this source code is governed by a BSD-style license that can be * found in the LICENSE file. */ #ifndef SkThreadPool_DEFINED #define SkThreadPool_DEFINED #include "SkCondVar.h" #include "SkRunnable.h" #include "SkTDArray.h" #include "SkTInternalLList.h" #include "SkThreadUtils.h" #include "SkTypes.h" #if defined(SK_BUILD_FOR_UNIX) || defined(SK_BUILD_FOR_MAC) || defined(SK_BUILD_FOR_ANDROID) # include #endif // Returns the number of cores on this machine. static inline int num_cores() { #if defined(SK_BUILD_FOR_WIN32) SYSTEM_INFO sysinfo; GetSystemInfo(&sysinfo); return sysinfo.dwNumberOfProcessors; #elif defined(SK_BUILD_FOR_UNIX) || defined(SK_BUILD_FOR_MAC) || defined(SK_BUILD_FOR_ANDROID) return (int) sysconf(_SC_NPROCESSORS_ONLN); #else return 1; #endif } template class SkTThreadPool { public: /** * Create a threadpool with count threads, or one thread per core if kThreadPerCore. */ static const int kThreadPerCore = -1; explicit SkTThreadPool(int count); ~SkTThreadPool(); /** * Queues up an SkRunnable to run when a thread is available, or synchronously if count is 0. * Does not take ownership. NULL is a safe no-op. If T is not void, the runnable will be passed * a reference to a T on the thread's local stack. */ void add(SkTRunnable*); /** * Same as add, but adds the runnable as the very next to run rather than enqueueing it. */ void addNext(SkTRunnable*); /** * Block until all added SkRunnables have completed. Once called, calling add() is undefined. */ void wait(); private: struct LinkedRunnable { SkTRunnable* fRunnable; // Unowned. SK_DECLARE_INTERNAL_LLIST_INTERFACE(LinkedRunnable); }; enum State { kRunning_State, // Normal case. We've been constructed and no one has called wait(). kWaiting_State, // wait has been called, but there still might be work to do or being done. kHalting_State, // There's no work to do and no thread is busy. All threads can shut down. }; void addSomewhere(SkTRunnable* r, void (SkTInternalLList::*)(LinkedRunnable*)); SkTInternalLList fQueue; SkCondVar fReady; SkTDArray fThreads; State fState; int fBusyThreads; static void Loop(void*); // Static because we pass in this. }; template SkTThreadPool::SkTThreadPool(int count) : fState(kRunning_State), fBusyThreads(0) { if (count < 0) { count = num_cores(); } // Create count threads, all running SkTThreadPool::Loop. for (int i = 0; i < count; i++) { SkThread* thread = SkNEW_ARGS(SkThread, (&SkTThreadPool::Loop, this)); *fThreads.append() = thread; thread->start(); } } template SkTThreadPool::~SkTThreadPool() { if (kRunning_State == fState) { this->wait(); } } namespace SkThreadPoolPrivate { template struct ThreadLocal { void run(SkTRunnable* r) { r->run(data); } T data; }; template <> struct ThreadLocal { void run(SkTRunnable* r) { r->run(); } }; } // namespace SkThreadPoolPrivate template void SkTThreadPool::addSomewhere(SkTRunnable* r, void (SkTInternalLList::* f)(LinkedRunnable*)) { if (r == NULL) { return; } if (fThreads.isEmpty()) { SkThreadPoolPrivate::ThreadLocal threadLocal; threadLocal.run(r); return; } LinkedRunnable* linkedRunnable = SkNEW(LinkedRunnable); linkedRunnable->fRunnable = r; fReady.lock(); SkASSERT(fState != kHalting_State); // Shouldn't be able to add work when we're halting. (fQueue.*f)(linkedRunnable); fReady.signal(); fReady.unlock(); } template void SkTThreadPool::add(SkTRunnable* r) { this->addSomewhere(r, &SkTInternalLList::addToTail); } template void SkTThreadPool::addNext(SkTRunnable* r) { this->addSomewhere(r, &SkTInternalLList::addToHead); } template void SkTThreadPool::wait() { fReady.lock(); fState = kWaiting_State; fReady.broadcast(); fReady.unlock(); // Wait for all threads to stop. for (int i = 0; i < fThreads.count(); i++) { fThreads[i]->join(); SkDELETE(fThreads[i]); } SkASSERT(fQueue.isEmpty()); } template /*static*/ void SkTThreadPool::Loop(void* arg) { // The SkTThreadPool passes itself as arg to each thread as they're created. SkTThreadPool* pool = static_cast*>(arg); SkThreadPoolPrivate::ThreadLocal threadLocal; while (true) { // We have to be holding the lock to read the queue and to call wait. pool->fReady.lock(); while(pool->fQueue.isEmpty()) { // Does the client want to stop and are all the threads ready to stop? // If so, we move into the halting state, and whack all the threads so they notice. if (kWaiting_State == pool->fState && pool->fBusyThreads == 0) { pool->fState = kHalting_State; pool->fReady.broadcast(); } // Any time we find ourselves in the halting state, it's quitting time. if (kHalting_State == pool->fState) { pool->fReady.unlock(); return; } // wait yields the lock while waiting, but will have it again when awoken. pool->fReady.wait(); } // We've got the lock back here, no matter if we ran wait or not. // The queue is not empty, so we have something to run. Claim it. LinkedRunnable* r = pool->fQueue.head(); pool->fQueue.remove(r); // Having claimed our SkRunnable, we now give up the lock while we run it. // Otherwise, we'd only ever do work on one thread at a time, which rather // defeats the point of this code. pool->fBusyThreads++; pool->fReady.unlock(); // OK, now really do the work. threadLocal.run(r->fRunnable); SkDELETE(r); // Let everyone know we're not busy. pool->fReady.lock(); pool->fBusyThreads--; pool->fReady.unlock(); } SkASSERT(false); // Unreachable. The only exit happens when pool->fState is kHalting_State. } typedef SkTThreadPool SkThreadPool; #endif