aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar mtklein <mtklein@chromium.org>2015-06-17 10:50:25 -0700
committerGravatar Commit bot <commit-bot@chromium.org>2015-06-17 10:50:25 -0700
commit61fa22be104a3e98998fa1b24cd47b886df5721c (patch)
tree09bf90b56014bb4f2753efcb3b25e220b6a0f313 /src
parentab80e35fbddd5b534b486cf9d331b5da00e5aa4f (diff)
Add and use SkSemaphore
This allows a faster implementation of our SkTaskGroup thread pool. It also means we don't need SkCondVar (which, remember, isn't supported on XP.) Doing some testing with SampleApp, this really cuts down on the overhead from SkTaskGroup, e.g. 30% to 10%. BUG=skia: Review URL: https://codereview.chromium.org/1192573003
Diffstat (limited to 'src')
-rw-r--r--src/core/SkSemaphore.cpp86
-rw-r--r--src/core/SkSemaphore.h55
-rw-r--r--src/core/SkTaskGroup.cpp75
-rw-r--r--src/utils/SkCondVar.cpp103
-rw-r--r--src/utils/SkCondVar.h74
5 files changed, 188 insertions, 205 deletions
diff --git a/src/core/SkSemaphore.cpp b/src/core/SkSemaphore.cpp
new file mode 100644
index 0000000000..3f19d4fa12
--- /dev/null
+++ b/src/core/SkSemaphore.cpp
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2015 Google Inc.
+ *
+ * Use of this source code is governed by a BSD-style license that can be
+ * found in the LICENSE file.
+ */
+
+#include "SkSemaphore.h"
+
+#if defined(SK_BUILD_FOR_MAC) || defined(SK_BUILD_FOR_IOS)
+ #include <mach/mach.h>
+ struct SkSemaphore::OSSemaphore {
+ semaphore_t fSemaphore;
+
+ OSSemaphore() {
+ semaphore_create(mach_task_self(), &fSemaphore, SYNC_POLICY_FIFO, 0/*initial count*/);
+ }
+ ~OSSemaphore() { semaphore_destroy(mach_task_self(), fSemaphore); }
+
+ void signal(int n) { while (n --> 0) { semaphore_signal(fSemaphore); } }
+ void wait() { semaphore_wait(fSemaphore); }
+ };
+#elif defined(SK_BUILD_FOR_WIN32)
+ struct SkSemaphore::OSSemaphore {
+ HANDLE fSemaphore;
+
+ OSSemaphore() {
+ fSemaphore = CreateSemaphore(NULL /*security attributes, optional*/,
+ 0 /*initial count*/,
+ MAXLONG /*max count*/,
+ NULL /*name, optional*/);
+ }
+ ~OSSemaphore() { CloseHandle(fSemaphore); }
+
+ void signal(int n) {
+ ReleaseSemaphore(fSemaphore, n, NULL/*returns previous count, optional*/);
+ }
+ void wait() { WaitForSingleObject(fSemaphore, INFINITE/*timeout in ms*/); }
+ };
+#else
+ // It's important we test for Mach before this. This code will compile but not work there.
+ #include <errno.h>
+ #include <semaphore.h>
+ struct SkSemaphore::OSSemaphore {
+ sem_t fSemaphore;
+
+ OSSemaphore() { sem_init(&fSemaphore, 0/*cross process?*/, 0/*initial count*/); }
+ ~OSSemaphore() { sem_destroy(&fSemaphore); }
+
+ void signal(int n) { while (n --> 0) { sem_post(&fSemaphore); } }
+ void wait() {
+ // Try until we're not interrupted.
+ while(sem_wait(&fSemaphore) == -1 && errno == EINTR);
+ }
+ };
+#endif
+
+SkSemaphore::SkSemaphore() : fCount(0), fOSSemaphore(SkNEW(OSSemaphore)) {}
+SkSemaphore::~SkSemaphore() { SkDELETE(fOSSemaphore); }
+
+void SkSemaphore::signal(int n) {
+ SkASSERT(n >= 0);
+
+ // We only want to call the OS semaphore when our logical count crosses
+ // from <= 0 to >0 (when we need to wake sleeping threads).
+ //
+ // This is easiest to think about with specific examples of prev and n.
+ // If n == 5 and prev == -3, there are 3 threads sleeping and we signal
+ // SkTMin(-(-3), 5) == 3 times on the OS semaphore, leaving the count at 2.
+ //
+ // If prev >= 0, no threads are waiting, SkTMin(-prev, n) is always <= 0,
+ // so we don't call the OS semaphore, leaving the count at (prev + n).
+ int prev = fCount.fetch_add(n, sk_memory_order_release);
+ int toSignal = SkTMin(-prev, n);
+ if (toSignal > 0) {
+ fOSSemaphore->signal(toSignal);
+ }
+}
+
+void SkSemaphore::wait() {
+ // We only wait() on the OS semaphore if the count drops <= 0,
+ // i.e. when we need to make this thread sleep to wait for it to go back up.
+ if (fCount.fetch_add(-1, sk_memory_order_acquire) <= 0) {
+ fOSSemaphore->wait();
+ }
+}
diff --git a/src/core/SkSemaphore.h b/src/core/SkSemaphore.h
new file mode 100644
index 0000000000..5f4c2968a7
--- /dev/null
+++ b/src/core/SkSemaphore.h
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2015 Google Inc.
+ *
+ * Use of this source code is governed by a BSD-style license that can be
+ * found in the LICENSE file.
+ */
+
+#ifndef SkSemaphore_DEFINED
+#define SkSemaphore_DEFINED
+
+#include "SkTypes.h"
+#include "SkAtomics.h"
+
+/**
+ * SkSemaphore is a fast mostly-user-space semaphore.
+ *
+ * A semaphore is logically an atomic integer with a few special properties:
+ * - The integer always starts at 0.
+ * - You can only increment or decrement it, never read or write it.
+ * - Increment is spelled 'signal()'; decrement is spelled 'wait()'.
+ * - If a call to wait() decrements the counter to <= 0,
+ * the calling thread sleeps until another thread signal()s it back above 0.
+ */
+class SkSemaphore : SkNoncopyable {
+public:
+ // Initializes the counter to 0.
+ // (Though all current implementations could start from an arbitrary value.)
+ SkSemaphore();
+ ~SkSemaphore();
+
+ // Increment the counter N times.
+ // Generally it's better to call signal(N) instead of signal() N times.
+ void signal(int N = 1);
+
+ // Decrement the counter by 1,
+ // then if the counter is <= 0, sleep this thread until the counter is > 0.
+ void wait();
+
+private:
+ // This implementation follows the general strategy of
+ // 'A Lightweight Semaphore with Partial Spinning'
+ // found here
+ // http://preshing.com/20150316/semaphores-are-surprisingly-versatile/
+ // That article (and entire blog) are very much worth reading.
+ //
+ // We wrap an OS-provided semaphore with a user-space atomic counter that
+ // lets us avoid interacting with the OS semaphore unless strictly required:
+ // moving the count from >0 to <=0 or vice-versa, i.e. sleeping or waking threads.
+ struct OSSemaphore;
+
+ SkAtomic<int> fCount;
+ OSSemaphore* fOSSemaphore;
+};
+
+#endif//SkSemaphore_DEFINED
diff --git a/src/core/SkTaskGroup.cpp b/src/core/SkTaskGroup.cpp
index 17c61af960..59319c148d 100644
--- a/src/core/SkTaskGroup.cpp
+++ b/src/core/SkTaskGroup.cpp
@@ -5,12 +5,11 @@
* found in the LICENSE file.
*/
-#include "SkTaskGroup.h"
-
-#include "SkCondVar.h"
#include "SkRunnable.h"
+#include "SkSemaphore.h"
+#include "SkSpinlock.h"
#include "SkTDArray.h"
-#include "SkThread.h"
+#include "SkTaskGroup.h"
#include "SkThreadUtils.h"
#if defined(SK_BUILD_FOR_WIN32)
@@ -63,7 +62,10 @@ public:
// Lend a hand until our SkTaskGroup of interest is done.
Work work;
{
- AutoLock lock(&gGlobal->fReady);
+ // We're stealing work opportunistically,
+ // so we never call fWorkAvailable.wait(), which could sleep us if there's no work.
+ // This means fWorkAvailable is only an upper bound on fWork.count().
+ AutoLock lock(&gGlobal->fWorkLock);
if (gGlobal->fWork.isEmpty()) {
// Someone has picked up all the work (including ours). How nice of them!
// (They may still be working on it, so we can't assert *pending == 0 here.)
@@ -80,10 +82,10 @@ public:
private:
struct AutoLock {
- AutoLock(SkCondVar* c) : fC(c) { fC->lock(); }
- ~AutoLock() { fC->unlock(); }
+ AutoLock(SkSpinlock* lock) : fLock(lock) { fLock->acquire(); }
+ ~AutoLock() { fLock->release(); }
private:
- SkCondVar* fC;
+ SkSpinlock* fLock;
};
static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); }
@@ -94,7 +96,7 @@ private:
SkAtomic<int32_t>* pending; // then decrement pending afterwards.
};
- explicit ThreadPool(int threads) : fDraining(false) {
+ explicit ThreadPool(int threads) {
if (threads == -1) {
threads = num_cores();
}
@@ -106,11 +108,13 @@ private:
~ThreadPool() {
SkASSERT(fWork.isEmpty()); // All SkTaskGroups should be destroyed by now.
- {
- AutoLock lock(&fReady);
- fDraining = true;
- fReady.broadcast();
+
+ // Send a poison pill to each thread.
+ SkAtomic<int> dummy(0);
+ for (int i = 0; i < fThreads.count(); i++) {
+ this->add(NULL, NULL, &dummy);
}
+ // Wait for them all to swallow the pill and die.
for (int i = 0; i < fThreads.count(); i++) {
fThreads[i]->join();
}
@@ -122,50 +126,65 @@ private:
Work work = { fn, arg, pending };
pending->fetch_add(+1, sk_memory_order_relaxed); // No barrier needed.
{
- AutoLock lock(&fReady);
+ AutoLock lock(&fWorkLock);
fWork.push(work);
- fReady.signal();
}
+ fWorkAvailable.signal(1);
}
void batch(void (*fn)(void*), void* arg, int N, size_t stride, SkAtomic<int32_t>* pending) {
pending->fetch_add(+N, sk_memory_order_relaxed); // No barrier needed.
{
- AutoLock lock(&fReady);
+ AutoLock lock(&fWorkLock);
Work* batch = fWork.append(N);
for (int i = 0; i < N; i++) {
Work work = { fn, (char*)arg + i*stride, pending };
batch[i] = work;
}
- fReady.broadcast();
}
+ fWorkAvailable.signal(N);
}
static void Loop(void* arg) {
ThreadPool* pool = (ThreadPool*)arg;
Work work;
while (true) {
+ // Sleep until there's work available, and claim one unit of Work as we wake.
+ pool->fWorkAvailable.wait();
{
- AutoLock lock(&pool->fReady);
- while (pool->fWork.isEmpty()) {
- if (pool->fDraining) {
- return;
- }
- pool->fReady.wait();
+ AutoLock lock(&pool->fWorkLock);
+ if (pool->fWork.isEmpty()) {
+ // Someone in Wait() stole our work (fWorkAvailable is an upper bound).
+ // Well, that's fine, back to sleep for us.
+ continue;
}
pool->fWork.pop(&work);
}
+ if (!work.fn) {
+ return; // Poison pill. Time... to die.
+ }
work.fn(work.arg);
work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load in Wait().
}
}
- SkTDArray<Work> fWork;
- SkTDArray<SkThread*> fThreads;
- SkCondVar fReady;
- bool fDraining;
+ // fWorkLock must be held when reading or modifying fWork.
+ SkSpinlock fWorkLock;
+ SkTDArray<Work> fWork;
+ // A thread-safe upper bound for fWork.count().
+ //
+ // We'd have it be an exact count but for the loop in Wait():
+ // we never want that to block, so it can't call fWorkAvailable.wait(),
+ // and that's the only way to decrement fWorkAvailable.
+ // So fWorkAvailable may overcount actual the work available.
+ // We make do, but this means some worker threads may wake spuriously.
+ SkSemaphore fWorkAvailable;
+
+ // These are only changed in a single-threaded context.
+ SkTDArray<SkThread*> fThreads;
static ThreadPool* gGlobal;
+
friend struct SkTaskGroup::Enabler;
};
ThreadPool* ThreadPool::gGlobal = NULL;
@@ -174,7 +193,7 @@ ThreadPool* ThreadPool::gGlobal = NULL;
SkTaskGroup::Enabler::Enabler(int threads) {
SkASSERT(ThreadPool::gGlobal == NULL);
- if (threads != 0 && SkCondVar::Supported()) {
+ if (threads != 0) {
ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads));
}
}
diff --git a/src/utils/SkCondVar.cpp b/src/utils/SkCondVar.cpp
deleted file mode 100644
index 12aca7613e..0000000000
--- a/src/utils/SkCondVar.cpp
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Copyright 2012 Google Inc.
- *
- * Use of this source code is governed by a BSD-style license that can be
- * found in the LICENSE file.
- */
-
-#include "SkCondVar.h"
-
-#if defined(SK_BUILD_FOR_WIN32)
- static void (WINAPI *initialize_condition_variable)(PCONDITION_VARIABLE);
- static BOOL (WINAPI *sleep_condition_variable)(PCONDITION_VARIABLE, PCRITICAL_SECTION, DWORD);
- static void (WINAPI *wake_condition_variable)(PCONDITION_VARIABLE);
- static void (WINAPI *wake_all_condition_variable)(PCONDITION_VARIABLE);
-
- template <typename T>
- static void set_fn_ptr(T* ptr, FARPROC fn) { *ptr = reinterpret_cast<T>(fn); }
-#endif
-
-bool SkCondVar::Supported() {
-#ifdef SK_BUILD_FOR_WIN32
- // If we're >= Vista we'll find these functions. Otherwise (XP) SkCondVar is not supported.
- HMODULE kernel32 = GetModuleHandleA("kernel32.dll");
- set_fn_ptr(&initialize_condition_variable,
- GetProcAddress(kernel32, "InitializeConditionVariable"));
- set_fn_ptr(&sleep_condition_variable,
- GetProcAddress(kernel32, "SleepConditionVariableCS"));
- set_fn_ptr(&wake_condition_variable,
- GetProcAddress(kernel32, "WakeConditionVariable"));
- set_fn_ptr(&wake_all_condition_variable,
- GetProcAddress(kernel32, "WakeAllConditionVariable"));
- return initialize_condition_variable
- && sleep_condition_variable
- && wake_condition_variable
- && wake_all_condition_variable;
-#else
- return true;
-#endif
-}
-
-SkCondVar::SkCondVar() {
-#ifdef SK_BUILD_FOR_WIN32
- InitializeCriticalSection(&fCriticalSection);
- SkASSERT(initialize_condition_variable);
- initialize_condition_variable(&fCondition);
-#else
- pthread_mutex_init(&fMutex, NULL /* default mutex attr */);
- pthread_cond_init(&fCond, NULL /* default cond attr */);
-#endif
-}
-
-SkCondVar::~SkCondVar() {
-#ifdef SK_BUILD_FOR_WIN32
- DeleteCriticalSection(&fCriticalSection);
- // No need to clean up fCondition.
-#else
- pthread_mutex_destroy(&fMutex);
- pthread_cond_destroy(&fCond);
-#endif
-}
-
-void SkCondVar::lock() {
-#ifdef SK_BUILD_FOR_WIN32
- EnterCriticalSection(&fCriticalSection);
-#else
- pthread_mutex_lock(&fMutex);
-#endif
-}
-
-void SkCondVar::unlock() {
-#ifdef SK_BUILD_FOR_WIN32
- LeaveCriticalSection(&fCriticalSection);
-#else
- pthread_mutex_unlock(&fMutex);
-#endif
-}
-
-void SkCondVar::wait() {
-#ifdef SK_BUILD_FOR_WIN32
- SkASSERT(sleep_condition_variable);
- sleep_condition_variable(&fCondition, &fCriticalSection, INFINITE);
-#else
- pthread_cond_wait(&fCond, &fMutex);
-#endif
-}
-
-void SkCondVar::signal() {
-#ifdef SK_BUILD_FOR_WIN32
- SkASSERT(wake_condition_variable);
- wake_condition_variable(&fCondition);
-#else
- pthread_cond_signal(&fCond);
-#endif
-}
-
-void SkCondVar::broadcast() {
-#ifdef SK_BUILD_FOR_WIN32
- SkASSERT(wake_all_condition_variable);
- wake_all_condition_variable(&fCondition);
-#else
- pthread_cond_broadcast(&fCond);
-#endif
-}
diff --git a/src/utils/SkCondVar.h b/src/utils/SkCondVar.h
deleted file mode 100644
index 0a9f94f0ab..0000000000
--- a/src/utils/SkCondVar.h
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Copyright 2012 Google Inc.
- *
- * Use of this source code is governed by a BSD-style license that can be
- * found in the LICENSE file.
- */
-
-#ifndef SkCondVar_DEFINED
-#define SkCondVar_DEFINED
-
-#include "SkTypes.h"
-
-#ifdef SK_BUILD_FOR_WIN32
- #include <windows.h>
-#else
- #include <pthread.h>
-#endif
-
-/**
- * Condition variable for blocking access to shared data from other threads and
- * controlling which threads are awake.
- *
- * Currently only supported on platforms with posix threads and Windows Vista and above.
- */
-class SkCondVar {
-public:
- /** Returns true if it makes sense to create and use SkCondVars.
- * You _MUST_ call this method and it must return true before creating any SkCondVars.
- */
- static bool Supported();
-
- SkCondVar();
- ~SkCondVar();
-
- /**
- * Lock a mutex. Must be done before calling the other functions on this object.
- */
- void lock();
-
- /**
- * Unlock the mutex.
- */
- void unlock();
-
- /**
- * Pause the calling thread. Will be awoken when signal() or broadcast() is called.
- * Must be called while lock() is held (but gives it up while waiting). Once awoken,
- * the calling thread will hold the lock once again.
- */
- void wait();
-
- /**
- * Wake one thread waiting on this condition. Must be called while lock()
- * is held.
- */
- void signal();
-
- /**
- * Wake all threads waiting on this condition. Must be called while lock()
- * is held.
- */
- void broadcast();
-
-private:
-#ifdef SK_BUILD_FOR_WIN32
- CRITICAL_SECTION fCriticalSection;
- CONDITION_VARIABLE fCondition;
-#else
- pthread_mutex_t fMutex;
- pthread_cond_t fCond;
-#endif
-};
-
-#endif