path: root/src/core/SkTaskGroup.cpp
author    Mike Klein <mtklein@chromium.org>    2017-02-21 22:53:16 -0500
committer Skia Commit-Bot <skia-commit-bot@chromium.org>    2017-02-22 16:17:39 +0000
commit 384b90af5ccdbb071f32e20b382f41351b2a0b69 (patch)
tree   572dfe34b3cc547b8f12a379f0a5aa134095d7d3 /src/core/SkTaskGroup.cpp
parent d44dd4c35660863db8baeedd90fd401ed921db8a (diff)
SkExecutor
Refactoring to refamiliarize myself with SkTaskGroup and SkThreadPool.

This adds an SkExecutor interface to describe how we use SkThreadPool, with a global setter and getter for a default instance. Then I rewrote SkTaskGroup to work with any executor, the global default by default.

I also think I've made the SkTaskGroup::wait() borrow logic clearer with the addition of SkSemaphore::try_wait(). This lets me keep the semaphore count and the actual count of work in perfect sync.

Change-Id: I6bbdfaeb0e2c3a43daff6192d34bc4a3f7210178
Reviewed-on: https://skia-review.googlesource.com/8836
Reviewed-by: Mike Reed <reed@google.com>
Reviewed-by: Herb Derby <herb@google.com>
Commit-Queue: Mike Klein <mtklein@chromium.org>
Diffstat (limited to 'src/core/SkTaskGroup.cpp')
-rw-r--r--  src/core/SkTaskGroup.cpp  219
1 file changed, 28 insertions(+), 191 deletions(-)
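
For context, here is a minimal usage sketch of the refactored API. It assumes, as the commit message says, that SkTaskGroup's default constructor falls back to the global default executor, and it infers from the diff below that SkExecutor::MakeThreadPool returns an owning smart pointer (fThreadPool.get() is passed to SetDefault). The thread count and task bodies are purely illustrative.

    // Usage sketch only; not part of this change.
    #include "SkExecutor.h"
    #include "SkTaskGroup.h"

    void example() {
        // Build a thread pool and install it as the process-wide default executor.
        auto pool = SkExecutor::MakeThreadPool(4);
        SkExecutor::SetDefault(pool.get());

        SkTaskGroup group;                            // assumed to pick up the global default
        group.add([] { /* one-off task */ });
        group.batch(8, [](int i) { /* task i of 8 */ });
        group.wait();                                 // helps run queued work until the group is done
    }
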
diff --git a/src/core/SkTaskGroup.cpp b/src/core/SkTaskGroup.cpp
index d151510cfa..78ab71c79d 100644
--- a/src/core/SkTaskGroup.cpp
+++ b/src/core/SkTaskGroup.cpp
@@ -5,206 +5,43 @@
* found in the LICENSE file.
*/
-#include "SkLeanWindows.h"
-#include "SkOnce.h"
-#include "SkSemaphore.h"
-#include "SkSpinlock.h"
-#include "SkTArray.h"
-#include "SkTDArray.h"
+#include "SkExecutor.h"
#include "SkTaskGroup.h"
-#include "SkThreadUtils.h"
-#if defined(SK_BUILD_FOR_WIN32)
- static void query_num_cores(int* cores) {
- SYSTEM_INFO sysinfo;
- GetNativeSystemInfo(&sysinfo);
- *cores = sysinfo.dwNumberOfProcessors;
- }
-#else
- #include <unistd.h>
- static void query_num_cores(int* cores) {
- *cores = (int)sysconf(_SC_NPROCESSORS_ONLN);
- }
-#endif
+SkTaskGroup::SkTaskGroup(SkExecutor& executor) : fPending(0), fExecutor(executor) {}
-static int num_cores() {
- // We cache num_cores() so we only query the OS once.
- static int cores = 0;
- static SkOnce once;
- once(query_num_cores, &cores);
- SkASSERT(cores > 0);
- return cores;
+void SkTaskGroup::add(std::function<void(void)> fn) {
+ fPending.fetch_add(+1, sk_memory_order_relaxed);
+ fExecutor.add([=] {
+ fn();
+ fPending.fetch_add(-1, sk_memory_order_release);
+ });
}
-namespace {
-
-class ThreadPool : SkNoncopyable {
-public:
- static void Add(std::function<void(void)> fn, SkAtomic<int32_t>* pending) {
- if (!gGlobal) {
- return fn();
- }
- gGlobal->add(fn, pending);
- }
-
- static void Batch(int N, std::function<void(int)> fn, SkAtomic<int32_t>* pending) {
- if (!gGlobal) {
- for (int i = 0; i < N; i++) { fn(i); }
- return;
- }
- gGlobal->batch(N, fn, pending);
- }
-
- static void Wait(SkAtomic<int32_t>* pending) {
- if (!gGlobal) { // If we have no threads, the work must already be done.
- SkASSERT(pending->load(sk_memory_order_relaxed) == 0);
- return;
- }
- // Acquire pairs with decrement release here or in Loop.
- while (pending->load(sk_memory_order_acquire) > 0) {
- // Lend a hand until our SkTaskGroup of interest is done.
- Work work;
- {
- // We're stealing work opportunistically,
- // so we never call fWorkAvailable.wait(), which could sleep us if there's no work.
- // This means fWorkAvailable is only an upper bound on fWork.count().
- AutoLock lock(&gGlobal->fWorkLock);
- if (gGlobal->fWork.empty()) {
- // Someone has picked up all the work (including ours). How nice of them!
- // (They may still be working on it, so we can't assert *pending == 0 here.)
- continue;
- }
- work = gGlobal->fWork.back();
- gGlobal->fWork.pop_back();
- }
- // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine.
- // We threads gotta stick together. We're always making forward progress.
- work.fn();
- work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load above.
- }
- }
-
-private:
- struct AutoLock {
- AutoLock(SkSpinlock* lock) : fLock(lock) { fLock->acquire(); }
- ~AutoLock() { fLock->release(); }
- private:
- SkSpinlock* fLock;
- };
-
- struct Work {
- std::function<void(void)> fn; // A function to call
- SkAtomic<int32_t>* pending; // then decrement pending afterwards.
- };
-
- explicit ThreadPool(int threads) {
- if (threads == -1) {
- threads = num_cores();
- }
- for (int i = 0; i < threads; i++) {
- fThreads.push(new SkThread(&ThreadPool::Loop, this));
- fThreads.top()->start();
- }
- }
-
- ~ThreadPool() {
- SkASSERT(fWork.empty()); // All SkTaskGroups should be destroyed by now.
-
- // Send a poison pill to each thread.
- SkAtomic<int> dummy(0);
- for (int i = 0; i < fThreads.count(); i++) {
- this->add(nullptr, &dummy);
- }
- // Wait for them all to swallow the pill and die.
- for (int i = 0; i < fThreads.count(); i++) {
- fThreads[i]->join();
- }
- SkASSERT(fWork.empty()); // Can't hurt to double check.
- fThreads.deleteAll();
- }
-
- void add(std::function<void(void)> fn, SkAtomic<int32_t>* pending) {
- Work work = { fn, pending };
- pending->fetch_add(+1, sk_memory_order_relaxed); // No barrier needed.
- {
- AutoLock lock(&fWorkLock);
- fWork.push_back(work);
- }
- fWorkAvailable.signal(1);
- }
-
- void batch(int N, std::function<void(int)> fn, SkAtomic<int32_t>* pending) {
- pending->fetch_add(+N, sk_memory_order_relaxed); // No barrier needed.
- {
- AutoLock lock(&fWorkLock);
- for (int i = 0; i < N; i++) {
- Work work = { [i, fn]() { fn(i); }, pending };
- fWork.push_back(work);
- }
- }
- fWorkAvailable.signal(N);
+void SkTaskGroup::batch(int N, std::function<void(int)> fn) {
+ // TODO: I really thought we had some sort of more clever chunking logic.
+ fPending.fetch_add(+N, sk_memory_order_relaxed);
+ for (int i = 0; i < N; i++) {
+ fExecutor.add([=] {
+ fn(i);
+ fPending.fetch_add(-1, sk_memory_order_release);
+ });
}
+}
- static void Loop(void* arg) {
- ThreadPool* pool = (ThreadPool*)arg;
- Work work;
- while (true) {
- // Sleep until there's work available, and claim one unit of Work as we wake.
- pool->fWorkAvailable.wait();
- {
- AutoLock lock(&pool->fWorkLock);
- if (pool->fWork.empty()) {
- // Someone in Wait() stole our work (fWorkAvailable is an upper bound).
- // Well, that's fine, back to sleep for us.
- continue;
- }
- work = pool->fWork.back();
- pool->fWork.pop_back();
- }
- if (!work.fn) {
- return; // Poison pill. Time... to die.
- }
- work.fn();
- work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load in Wait().
- }
+void SkTaskGroup::wait() {
+ // Actively help the executor do work until our task group is done.
+ // This lets SkTaskGroups nest arbitrarily deep on a single SkExecutor:
+ // no thread ever blocks waiting for others to do its work.
+ // (We may end up doing work that's not part of our task group. That's fine.)
+ while (fPending.load(sk_memory_order_acquire) > 0) {
+ fExecutor.borrow();
}
-
- // fWorkLock must be held when reading or modifying fWork.
- SkSpinlock fWorkLock;
- SkTArray<Work> fWork;
-
- // A thread-safe upper bound for fWork.count().
- //
- // We'd have it be an exact count but for the loop in Wait():
- // we never want that to block, so it can't call fWorkAvailable.wait(),
- // and that's the only way to decrement fWorkAvailable.
- // So fWorkAvailable may overcount the work actually available.
- // We make do, but this means some worker threads may wake spuriously.
- SkSemaphore fWorkAvailable;
-
- // These are only changed in a single-threaded context.
- SkTDArray<SkThread*> fThreads;
- static ThreadPool* gGlobal;
-
- friend struct SkTaskGroup::Enabler;
-};
-ThreadPool* ThreadPool::gGlobal = nullptr;
-
-} // namespace
+}
SkTaskGroup::Enabler::Enabler(int threads) {
- SkASSERT(ThreadPool::gGlobal == nullptr);
- if (threads != 0) {
- ThreadPool::gGlobal = new ThreadPool(threads);
+ if (threads) {
+ fThreadPool = SkExecutor::MakeThreadPool(threads);
+ SkExecutor::SetDefault(fThreadPool.get());
}
}
-
-SkTaskGroup::Enabler::~Enabler() { delete ThreadPool::gGlobal; }
-
-SkTaskGroup::SkTaskGroup() : fPending(0) {}
-
-void SkTaskGroup::wait() { ThreadPool::Wait(&fPending); }
-void SkTaskGroup::add(std::function<void(void)> fn) { ThreadPool::Add(fn, &fPending); }
-void SkTaskGroup::batch(int N, std::function<void(int)> fn) {
- ThreadPool::Batch(N, fn, &fPending);
-}
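
The new SkTaskGroup::wait() above relies on SkExecutor::borrow(), whose implementation is not part of this file. As a rough sketch of the idea the commit message describes (SkSemaphore::try_wait() keeping the semaphore count and the queued-work count in sync), a borrowing executor might look roughly like the following. Everything here beyond the add()/borrow() entry points called in the diff and SkSemaphore's signal()/try_wait() is an assumption, and the worker-thread side is omitted entirely.

    // Illustrative sketch only -- not the code added by this commit.
    #include "SkExecutor.h"
    #include "SkSemaphore.h"
    #include <functional>
    #include <mutex>
    #include <vector>

    class SketchExecutor : public SkExecutor {
    public:
        void add(std::function<void(void)> fn) override {
            {
                std::lock_guard<std::mutex> lock(fMutex);
                fWork.push_back(std::move(fn));
            }
            fWorkAvailable.signal(1);   // exactly one semaphore count per queued task
        }

        void borrow() override {
            // Non-blocking: only claim work if the semaphore says some is queued.
            // That is what lets SkTaskGroup::wait() help out instead of sleeping.
            if (fWorkAvailable.try_wait()) {
                std::function<void(void)> fn;
                {
                    std::lock_guard<std::mutex> lock(fMutex);
                    fn = std::move(fWork.back());
                    fWork.pop_back();
                }
                fn();
            }
        }

    private:
        std::mutex                              fMutex;
        std::vector<std::function<void(void)>>  fWork;
        SkSemaphore                             fWorkAvailable;
    };
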