diff options
author | Mike Klein <mtklein@chromium.org> | 2017-02-21 22:53:16 -0500 |
---|---|---|
committer | Skia Commit-Bot <skia-commit-bot@chromium.org> | 2017-02-22 16:17:39 +0000 |
commit | 384b90af5ccdbb071f32e20b382f41351b2a0b69 (patch) | |
tree | 572dfe34b3cc547b8f12a379f0a5aa134095d7d3 /src/core/SkTaskGroup.cpp | |
parent | d44dd4c35660863db8baeedd90fd401ed921db8a (diff) |
SkExecutor
Refactoring to refamiliarize myself with SkTaskGroup and SkThreadPool.
This adds an SkExecutor interface to describe how we use SkThreadPool,
with a global setter and getter for a default instance. Then I rewrote
SkTaskGroup to work with any executor, the global default by default.
I also think I've made the SkTaskGroup::wait() borrow logic clearer
with the addition of SkSemaphore::try_wait(). This lets me keep the
semaphore count and actual count of work in perfect sync.
Change-Id: I6bbdfaeb0e2c3a43daff6192d34bc4a3f7210178
Reviewed-on: https://skia-review.googlesource.com/8836
Reviewed-by: Mike Reed <reed@google.com>
Reviewed-by: Herb Derby <herb@google.com>
Commit-Queue: Mike Klein <mtklein@chromium.org>
Diffstat (limited to 'src/core/SkTaskGroup.cpp')
-rw-r--r-- | src/core/SkTaskGroup.cpp | 219 |
1 files changed, 28 insertions, 191 deletions
diff --git a/src/core/SkTaskGroup.cpp b/src/core/SkTaskGroup.cpp
index d151510cfa..78ab71c79d 100644
--- a/src/core/SkTaskGroup.cpp
+++ b/src/core/SkTaskGroup.cpp
@@ -5,206 +5,43 @@
  * found in the LICENSE file.
  */
 
-#include "SkLeanWindows.h"
-#include "SkOnce.h"
-#include "SkSemaphore.h"
-#include "SkSpinlock.h"
-#include "SkTArray.h"
-#include "SkTDArray.h"
+#include "SkExecutor.h"
 #include "SkTaskGroup.h"
-#include "SkThreadUtils.h"
 
-#if defined(SK_BUILD_FOR_WIN32)
-    static void query_num_cores(int* cores) {
-        SYSTEM_INFO sysinfo;
-        GetNativeSystemInfo(&sysinfo);
-        *cores = sysinfo.dwNumberOfProcessors;
-    }
-#else
-    #include <unistd.h>
-    static void query_num_cores(int* cores) {
-        *cores = (int)sysconf(_SC_NPROCESSORS_ONLN);
-    }
-#endif
+SkTaskGroup::SkTaskGroup(SkExecutor& executor) : fPending(0), fExecutor(executor) {}
 
-static int num_cores() {
-    // We cache num_cores() so we only query the OS once.
-    static int cores = 0;
-    static SkOnce once;
-    once(query_num_cores, &cores);
-    SkASSERT(cores > 0);
-    return cores;
+void SkTaskGroup::add(std::function<void(void)> fn) {
+    fPending.fetch_add(+1, sk_memory_order_relaxed);
+    fExecutor.add([=] {
+        fn();
+        fPending.fetch_add(-1, sk_memory_order_release);
+    });
 }
 
-namespace {
-
-class ThreadPool : SkNoncopyable {
-public:
-    static void Add(std::function<void(void)> fn, SkAtomic<int32_t>* pending) {
-        if (!gGlobal) {
-            return fn();
-        }
-        gGlobal->add(fn, pending);
-    }
-
-    static void Batch(int N, std::function<void(int)> fn, SkAtomic<int32_t>* pending) {
-        if (!gGlobal) {
-            for (int i = 0; i < N; i++) { fn(i); }
-            return;
-        }
-        gGlobal->batch(N, fn, pending);
-    }
-
-    static void Wait(SkAtomic<int32_t>* pending) {
-        if (!gGlobal) { // If we have no threads, the work must already be done.
-            SkASSERT(pending->load(sk_memory_order_relaxed) == 0);
-            return;
-        }
-        // Acquire pairs with decrement release here or in Loop.
-        while (pending->load(sk_memory_order_acquire) > 0) {
-            // Lend a hand until our SkTaskGroup of interest is done.
-            Work work;
-            {
-                // We're stealing work opportunistically,
-                // so we never call fWorkAvailable.wait(), which could sleep us if there's no work.
-                // This means fWorkAvailable is only an upper bound on fWork.count().
-                AutoLock lock(&gGlobal->fWorkLock);
-                if (gGlobal->fWork.empty()) {
-                    // Someone has picked up all the work (including ours). How nice of them!
-                    // (They may still be working on it, so we can't assert *pending == 0 here.)
-                    continue;
-                }
-                work = gGlobal->fWork.back();
-                gGlobal->fWork.pop_back();
-            }
-            // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine.
-            // We threads gotta stick together. We're always making forward progress.
-            work.fn();
-            work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load above.
-        }
-    }
-
-private:
-    struct AutoLock {
-        AutoLock(SkSpinlock* lock) : fLock(lock) { fLock->acquire(); }
-        ~AutoLock() { fLock->release(); }
-    private:
-        SkSpinlock* fLock;
-    };
-
-    struct Work {
-        std::function<void(void)> fn; // A function to call
-        SkAtomic<int32_t>* pending; // then decrement pending afterwards.
-    };
-
-    explicit ThreadPool(int threads) {
-        if (threads == -1) {
-            threads = num_cores();
-        }
-        for (int i = 0; i < threads; i++) {
-            fThreads.push(new SkThread(&ThreadPool::Loop, this));
-            fThreads.top()->start();
-        }
-    }
-
-    ~ThreadPool() {
-        SkASSERT(fWork.empty()); // All SkTaskGroups should be destroyed by now.
-
-        // Send a poison pill to each thread.
-        SkAtomic<int> dummy(0);
-        for (int i = 0; i < fThreads.count(); i++) {
-            this->add(nullptr, &dummy);
-        }
-        // Wait for them all to swallow the pill and die.
-        for (int i = 0; i < fThreads.count(); i++) {
-            fThreads[i]->join();
-        }
-        SkASSERT(fWork.empty()); // Can't hurt to double check.
-        fThreads.deleteAll();
-    }
-
-    void add(std::function<void(void)> fn, SkAtomic<int32_t>* pending) {
-        Work work = { fn, pending };
-        pending->fetch_add(+1, sk_memory_order_relaxed); // No barrier needed.
-        {
-            AutoLock lock(&fWorkLock);
-            fWork.push_back(work);
-        }
-        fWorkAvailable.signal(1);
-    }
-
-    void batch(int N, std::function<void(int)> fn, SkAtomic<int32_t>* pending) {
-        pending->fetch_add(+N, sk_memory_order_relaxed); // No barrier needed.
-        {
-            AutoLock lock(&fWorkLock);
-            for (int i = 0; i < N; i++) {
-                Work work = { [i, fn]() { fn(i); }, pending };
-                fWork.push_back(work);
-            }
-        }
-        fWorkAvailable.signal(N);
+void SkTaskGroup::batch(int N, std::function<void(int)> fn) {
+    // TODO: I really thought we had some sort of more clever chunking logic.
+    fPending.fetch_add(+N, sk_memory_order_relaxed);
+    for (int i = 0; i < N; i++) {
+        fExecutor.add([=] {
+            fn(i);
+            fPending.fetch_add(-1, sk_memory_order_release);
+        });
     }
+}
 
-    static void Loop(void* arg) {
-        ThreadPool* pool = (ThreadPool*)arg;
-        Work work;
-        while (true) {
-            // Sleep until there's work available, and claim one unit of Work as we wake.
-            pool->fWorkAvailable.wait();
-            {
-                AutoLock lock(&pool->fWorkLock);
-                if (pool->fWork.empty()) {
-                    // Someone in Wait() stole our work (fWorkAvailable is an upper bound).
-                    // Well, that's fine, back to sleep for us.
-                    continue;
-                }
-                work = pool->fWork.back();
-                pool->fWork.pop_back();
-            }
-            if (!work.fn) {
-                return; // Poison pill. Time... to die.
-            }
-            work.fn();
-            work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load in Wait().
-        }
+void SkTaskGroup::wait() {
+    // Actively help the executor do work until our task group is done.
+    // This lets SkTaskGroups nest arbitrarily deep on a single SkExecutor:
+    // no thread ever blocks waiting for others to do its work.
+    // (We may end up doing work that's not part of our task group. That's fine.)
+    while (fPending.load(sk_memory_order_acquire) > 0) {
+        fExecutor.borrow();
     }
-
-    // fWorkLock must be held when reading or modifying fWork.
-    SkSpinlock fWorkLock;
-    SkTArray<Work> fWork;
-
-    // A thread-safe upper bound for fWork.count().
-    //
-    // We'd have it be an exact count but for the loop in Wait():
-    // we never want that to block, so it can't call fWorkAvailable.wait(),
-    // and that's the only way to decrement fWorkAvailable.
-    // So fWorkAvailable may overcount actual the work available.
-    // We make do, but this means some worker threads may wake spuriously.
-    SkSemaphore fWorkAvailable;
-
-    // These are only changed in a single-threaded context.
-    SkTDArray<SkThread*> fThreads;
-    static ThreadPool* gGlobal;
-
-    friend struct SkTaskGroup::Enabler;
-};
-ThreadPool* ThreadPool::gGlobal = nullptr;
-
-} // namespace
+}
 
 SkTaskGroup::Enabler::Enabler(int threads) {
-    SkASSERT(ThreadPool::gGlobal == nullptr);
-    if (threads != 0) {
-        ThreadPool::gGlobal = new ThreadPool(threads);
+    if (threads) {
+        fThreadPool = SkExecutor::MakeThreadPool(threads);
+        SkExecutor::SetDefault(fThreadPool.get());
     }
 }
-
-SkTaskGroup::Enabler::~Enabler() { delete ThreadPool::gGlobal; }
-
-SkTaskGroup::SkTaskGroup() : fPending(0) {}
-
-void SkTaskGroup::wait() { ThreadPool::Wait(&fPending); }
-void SkTaskGroup::add(std::function<void(void)> fn) { ThreadPool::Add(fn, &fPending); }
-void SkTaskGroup::batch(int N, std::function<void(int)> fn) {
-    ThreadPool::Batch(N, fn, &fPending);
-}