summaryrefslogtreecommitdiff
path: root/absl/synchronization
diff options
context:
space:
mode:
authorGravatar Abseil Team <absl-team@google.com>2021-01-08 09:10:22 -0800
committerGravatar Derek Mauro <dmauro@google.com>2021-01-08 12:50:29 -0500
commit62ce712ecc887f669610a93efe18abecf70b47a0 (patch)
tree21ef34fc27e09d5ff51baad57d27b9ec033f9388 /absl/synchronization
parent92ba53599931fcbe31e7970497cb9e60091434c1 (diff)
Export of internal Abseil changes
-- b927776da818c674a674e46a7bbbdd54170a0ad3 by Todd Lipcon <tlipcon@google.com>: Include priority in the calculation of mutex waiter equivalence This changes the behavior of the absl::Mutex wait list to take into account waiter priority when creating "skip chains". A skip chain on the wait list is a set of adjacent waiters that share some property and enable skipping during traversal. Prior to this CL, the skip chains were formed of waiters with the same wait type (e.g. exclusive vs read) and Condition. With this CL, the priority is also taken into account. This avoids O(n) behavior when enqueueing a waiter onto a wait list where the oldest waiter is at a lower priority than the waiter to be enqueued. With the prior notion of equivalence class, a skip chain could contain waiters of different priority, so we had to walk the linked list one-by-one until finding the appropriate insertion point. With the new equivalence class computation, we can skip past all of the equivalent waiters to find the right insertion point. This gives a substantial improvement to the enqueue performance in the case where there's already a waiter at lower priority. Note that even though this code path isn't a hot one, it's performed while holding the Mutex's spinlock, which prevents other threads from unlocking the Mutex, so minimizing the time under the critical section can have "knock-on" throughput benefits. Notable performance differences: name old cpu/op new cpu/op delta BM_MutexEnqueue/multiple_priorities:0/threads:4 8.60µs ± 7% 8.69µs ± 6% ~ (p=0.365 n=19+20) BM_MutexEnqueue/multiple_priorities:0/threads:64 8.47µs ± 5% 8.64µs ±10% ~ (p=0.569 n=19+20) BM_MutexEnqueue/multiple_priorities:0/threads:128 8.56µs ± 3% 8.55µs ± 6% ~ (p=0.563 n=17+17) BM_MutexEnqueue/multiple_priorities:0/threads:512 8.98µs ± 8% 8.86µs ± 4% ~ (p=0.232 n=19+17) BM_MutexEnqueue/multiple_priorities:1/threads:4 6.64µs ±10% 6.45µs ± 4% ~ (p=0.097 n=20+17) BM_MutexEnqueue/multiple_priorities:1/threads:64 15.2µs ± 8% 9.1µs ± 4% -39.93% (p=0.000 n=20+17) BM_MutexEnqueue/multiple_priorities:1/threads:128 22.3µs ± 6% 9.4µs ± 4% -57.82% (p=0.000 n=20+17) BM_MutexEnqueue/multiple_priorities:1/threads:512 61.5µs ± 3% 10.1µs ± 8% -83.53% (p=0.000 n=20+20) name old time/op new time/op delta BM_Mutex/real_time/threads:1 19.6ns ± 4% 19.8ns ±11% ~ (p=0.534 n=17+17) BM_Mutex/real_time/threads:112 120ns ±17% 122ns ±14% ~ (p=0.988 n=20+18) BM_MutexEnqueue/multiple_priorities:0/threads:4 5.18µs ± 6% 5.23µs ± 6% ~ (p=0.428 n=19+20) BM_MutexEnqueue/multiple_priorities:0/threads:64 5.06µs ± 5% 5.18µs ±10% ~ (p=0.235 n=19+20) BM_MutexEnqueue/multiple_priorities:0/threads:128 5.16µs ± 3% 5.14µs ± 6% ~ (p=0.474 n=17+17) BM_MutexEnqueue/multiple_priorities:0/threads:512 5.40µs ± 8% 5.32µs ± 5% ~ (p=0.196 n=20+18) BM_MutexEnqueue/multiple_priorities:1/threads:4 3.99µs ±10% 3.88µs ± 3% ~ (p=0.074 n=20+17) BM_MutexEnqueue/multiple_priorities:1/threads:64 8.48µs ± 9% 5.41µs ± 3% -36.20% (p=0.000 n=20+16) BM_MutexEnqueue/multiple_priorities:1/threads:128 12.2µs ± 6% 5.6µs ± 4% -54.43% (p=0.000 n=20+17) BM_MutexEnqueue/multiple_priorities:1/threads:512 32.1µs ± 3% 5.9µs ± 8% -81.45% (p=0.000 n=20+20) ... BM_Contended<absl::Mutex>/cs_ns:2000/num_prios:2/real_time/threads:32 1.69µs ± 4% 1.66µs ± 2% -1.91% (p=0.000 n=20+20) BM_Contended<absl::Mutex>/cs_ns:2000/num_prios:2/real_time/threads:48 1.90µs ± 2% 1.82µs ± 2% -4.09% (p=0.000 n=20+19) BM_Contended<absl::Mutex>/cs_ns:2000/num_prios:2/real_time/threads:64 2.19µs ± 2% 1.80µs ± 1% -17.89% (p=0.000 n=20+20) BM_Contended<absl::Mutex>/cs_ns:2000/num_prios:2/real_time/threads:96 2.18µs ± 5% 1.81µs ± 1% -16.94% (p=0.000 n=17+19) BM_Contended<absl::Mutex>/cs_ns:2000/num_prios:2/real_time/threads:128 2.18µs ± 1% 1.91µs ± 2% -12.33% (p=0.000 n=19+20) BM_Contended<absl::Mutex>/cs_ns:2000/num_prios:2/real_time/threads:192 2.27µs ± 2% 1.89µs ± 1% -16.79% (p=0.000 n=20+19) BM_Contended<absl::Mutex>/cs_ns:2000/num_prios:2/real_time/threads:256 2.36µs ± 2% 1.83µs ± 1% -22.25% (p=0.000 n=20+19) PiperOrigin-RevId: 350775432 -- e7812590e5dbd75d21e2e8762713bd04c0353ef6 by Todd Lipcon <tlipcon@google.com>: Fix test timeouts for sequence_lock_test on TSAN PiperOrigin-RevId: 350680903 -- 3090d8154d875f3eabce48876321ae8d6a197302 by Todd Lipcon <tlipcon@google.com>: Add benchmarks for Mutex performance with multiple priorities This adds a new benchmark to mutex_benchmark which forces threads to go through the slow "Enqueue" path. The benchmark runs with varying numbers of threads and with/without the presence of a lower-priority waiter. PiperOrigin-RevId: 350655403 GitOrigin-RevId: b927776da818c674a674e46a7bbbdd54170a0ad3 Change-Id: If739e5e205f0d3867661a52466b8f64e7e033b22
Diffstat (limited to 'absl/synchronization')
-rw-r--r--absl/synchronization/mutex.cc68
-rw-r--r--absl/synchronization/mutex_benchmark.cc224
2 files changed, 188 insertions, 104 deletions
diff --git a/absl/synchronization/mutex.cc b/absl/synchronization/mutex.cc
index 82b631af..d2468ce5 100644
--- a/absl/synchronization/mutex.cc
+++ b/absl/synchronization/mutex.cc
@@ -761,11 +761,13 @@ void SetMutexDeadlockDetectionMode(OnDeadlockCycle mode) {
synch_deadlock_detection.store(mode, std::memory_order_release);
}
-// Return true iff threads x and y are waiting on the same condition for the
-// same type of lock. Requires that x and y be waiting on the same Mutex
-// queue.
-static bool MuSameCondition(PerThreadSynch *x, PerThreadSynch *y) {
- return x->waitp->how == y->waitp->how &&
+// Return true iff threads x and y are part of the same equivalence
+// class of waiters. An equivalence class is defined as the set of
+// waiters with the same condition, type of lock, and thread priority.
+//
+// Requires that x and y be waiting on the same Mutex queue.
+static bool MuEquivalentWaiter(PerThreadSynch *x, PerThreadSynch *y) {
+ return x->waitp->how == y->waitp->how && x->priority == y->priority &&
Condition::GuaranteedEqual(x->waitp->cond, y->waitp->cond);
}
@@ -784,18 +786,19 @@ static inline PerThreadSynch *GetPerThreadSynch(intptr_t v) {
// - invalid (iff x is not in a Mutex wait queue),
// - null, or
// - a pointer to a distinct thread waiting later in the same Mutex queue
-// such that all threads in [x, x->skip] have the same condition and
-// lock type (MuSameCondition() is true for all pairs in [x, x->skip]).
+// such that all threads in [x, x->skip] have the same condition, priority
+// and lock type (MuEquivalentWaiter() is true for all pairs in [x,
+// x->skip]).
// In addition, if x->skip is valid, (x->may_skip || x->skip == null)
//
-// By the spec of MuSameCondition(), it is not necessary when removing the
+// By the spec of MuEquivalentWaiter(), it is not necessary when removing the
// first runnable thread y from the front a Mutex queue to adjust the skip
// field of another thread x because if x->skip==y, x->skip must (have) become
// invalid before y is removed. The function TryRemove can remove a specified
// thread from an arbitrary position in the queue whether runnable or not, so
// it fixes up skip fields that would otherwise be left dangling.
// The statement
-// if (x->may_skip && MuSameCondition(x, x->next)) { x->skip = x->next; }
+// if (x->may_skip && MuEquivalentWaiter(x, x->next)) { x->skip = x->next; }
// maintains the invariant provided x is not the last waiter in a Mutex queue
// The statement
// if (x->skip != null) { x->skip = x->skip->skip; }
@@ -929,24 +932,17 @@ static PerThreadSynch *Enqueue(PerThreadSynch *head,
if (s->priority > head->priority) { // s's priority is above head's
// try to put s in priority-fifo order, or failing that at the front.
if (!head->maybe_unlocking) {
- // No unlocker can be scanning the queue, so we can insert between
- // skip-chains, and within a skip-chain if it has the same condition as
- // s. We insert in priority-fifo order, examining the end of every
- // skip-chain, plus every element with the same condition as s.
+ // No unlocker can be scanning the queue, so we can insert into the
+ // middle of the queue.
+ //
+ // Within a skip chain, all waiters have the same priority, so we can
+ // skip forward through the chains until we find one with a lower
+ // priority than the waiter to be enqueued.
PerThreadSynch *advance_to = head; // next value of enqueue_after
- PerThreadSynch *cur; // successor of enqueue_after
do {
enqueue_after = advance_to;
- cur = enqueue_after->next; // this advance ensures progress
- advance_to = Skip(cur); // normally, advance to end of skip chain
- // (side-effect: optimizes skip chain)
- if (advance_to != cur && s->priority > advance_to->priority &&
- MuSameCondition(s, cur)) {
- // but this skip chain is not a singleton, s has higher priority
- // than its tail and has the same condition as the chain,
- // so we can insert within the skip-chain
- advance_to = cur; // advance by just one
- }
+ // (side-effect: optimizes skip chain)
+ advance_to = Skip(enqueue_after->next);
} while (s->priority <= advance_to->priority);
// termination guaranteed because s->priority > head->priority
// and head is the end of a skip chain
@@ -965,21 +961,21 @@ static PerThreadSynch *Enqueue(PerThreadSynch *head,
// enqueue_after can be: head, Skip(...), or cur.
// The first two imply enqueue_after->skip == nullptr, and
- // the last is used only if MuSameCondition(s, cur).
+ // the last is used only if MuEquivalentWaiter(s, cur).
// We require this because clearing enqueue_after->skip
// is impossible; enqueue_after's predecessors might also
// incorrectly skip over s if we were to allow other
// insertion points.
- ABSL_RAW_CHECK(
- enqueue_after->skip == nullptr || MuSameCondition(enqueue_after, s),
- "Mutex Enqueue failure");
+ ABSL_RAW_CHECK(enqueue_after->skip == nullptr ||
+ MuEquivalentWaiter(enqueue_after, s),
+ "Mutex Enqueue failure");
if (enqueue_after != head && enqueue_after->may_skip &&
- MuSameCondition(enqueue_after, enqueue_after->next)) {
+ MuEquivalentWaiter(enqueue_after, enqueue_after->next)) {
// enqueue_after can skip to its new successor, s
enqueue_after->skip = enqueue_after->next;
}
- if (MuSameCondition(s, s->next)) { // s->may_skip is known to be true
+ if (MuEquivalentWaiter(s, s->next)) { // s->may_skip is known to be true
s->skip = s->next; // s may skip to its successor
}
} else { // enqueue not done any other way, so
@@ -989,7 +985,7 @@ static PerThreadSynch *Enqueue(PerThreadSynch *head,
head->next = s;
s->readers = head->readers; // reader count is from previous head
s->maybe_unlocking = head->maybe_unlocking; // same for unlock hint
- if (head->may_skip && MuSameCondition(head, s)) {
+ if (head->may_skip && MuEquivalentWaiter(head, s)) {
// head now has successor; may skip
head->skip = s;
}
@@ -1009,7 +1005,7 @@ static PerThreadSynch *Dequeue(PerThreadSynch *head, PerThreadSynch *pw) {
pw->next = w->next; // snip w out of list
if (head == w) { // we removed the head
head = (pw == w) ? nullptr : pw; // either emptied list, or pw is new head
- } else if (pw != head && MuSameCondition(pw, pw->next)) {
+ } else if (pw != head && MuEquivalentWaiter(pw, pw->next)) {
// pw can skip to its new successor
if (pw->next->skip !=
nullptr) { // either skip to its successors skip target
@@ -1079,11 +1075,13 @@ void Mutex::TryRemove(PerThreadSynch *s) {
PerThreadSynch *w;
if ((w = pw->next) != s) { // search for thread,
do { // processing at least one element
- if (!MuSameCondition(s, w)) { // seeking different condition
+ // If the current element isn't equivalent to the waiter to be
+ // removed, we can skip the entire chain.
+ if (!MuEquivalentWaiter(s, w)) {
pw = Skip(w); // so skip all that won't match
// we don't have to worry about dangling skip fields
// in the threads we skipped; none can point to s
- // because their condition differs from s
+ // because they are in a different equivalence class.
} else { // seeking same condition
FixSkip(w, s); // fix up any skip pointer from w to s
pw = w;
@@ -2148,7 +2146,7 @@ ABSL_ATTRIBUTE_NOINLINE void Mutex::UnlockSlow(SynchWaitParams *waitp) {
!old_h->may_skip) { // we used old_h as a terminator
old_h->may_skip = true; // allow old_h to skip once more
ABSL_RAW_CHECK(old_h->skip == nullptr, "illegal skip from head");
- if (h != old_h && MuSameCondition(old_h, old_h->next)) {
+ if (h != old_h && MuEquivalentWaiter(old_h, old_h->next)) {
old_h->skip = old_h->next; // old_h not head & can skip to successor
}
}
diff --git a/absl/synchronization/mutex_benchmark.cc b/absl/synchronization/mutex_benchmark.cc
index 933ea14f..e35aed8b 100644
--- a/absl/synchronization/mutex_benchmark.cc
+++ b/absl/synchronization/mutex_benchmark.cc
@@ -61,8 +61,124 @@ class RaiiLocker<std::mutex> {
std::mutex* mu_;
};
+// RAII object to change the Mutex priority of the running thread.
+class ScopedThreadMutexPriority {
+ public:
+ explicit ScopedThreadMutexPriority(int priority) {
+ absl::base_internal::ThreadIdentity* identity =
+ absl::synchronization_internal::GetOrCreateCurrentThreadIdentity();
+ identity->per_thread_synch.priority = priority;
+ // Bump next_priority_read_cycles to the infinite future so that the
+ // implementation doesn't re-read the thread's actual scheduler priority
+ // and replace our temporary scoped priority.
+ identity->per_thread_synch.next_priority_read_cycles =
+ std::numeric_limits<int64_t>::max();
+ }
+ ~ScopedThreadMutexPriority() {
+ // Reset the "next priority read time" back to the infinite past so that
+ // the next time the Mutex implementation wants to know this thread's
+ // priority, it re-reads it from the OS instead of using our overridden
+ // priority.
+ absl::synchronization_internal::GetOrCreateCurrentThreadIdentity()
+ ->per_thread_synch.next_priority_read_cycles =
+ std::numeric_limits<int64_t>::min();
+ }
+};
+
+void BM_MutexEnqueue(benchmark::State& state) {
+ // In the "multiple priorities" variant of the benchmark, one of the
+ // threads runs with Mutex priority 0 while the rest run at elevated priority.
+ // This benchmarks the performance impact of the presence of a low priority
+ // waiter when a higher priority waiter adds itself of the queue
+ // (b/175224064).
+ //
+ // NOTE: The actual scheduler priority is not modified in this benchmark:
+ // all of the threads get CPU slices with the same priority. Only the
+ // Mutex queueing behavior is modified.
+ const bool multiple_priorities = state.range(0);
+ ScopedThreadMutexPriority priority_setter(
+ (multiple_priorities && state.thread_index != 0) ? 1 : 0);
+
+ struct Shared {
+ absl::Mutex mu;
+ std::atomic<int> looping_threads{0};
+ std::atomic<int> blocked_threads{0};
+ std::atomic<bool> thread_has_mutex{false};
+ };
+ static Shared* shared = new Shared;
+
+ // Set up 'blocked_threads' to count how many threads are currently blocked
+ // in Abseil synchronization code.
+ //
+ // NOTE: Blocking done within the Google Benchmark library itself (e.g.
+ // the barrier which synchronizes threads entering and exiting the benchmark
+ // loop) does _not_ get registered in this counter. This is because Google
+ // Benchmark uses its own synchronization primitives based on std::mutex, not
+ // Abseil synchronization primitives. If at some point the benchmark library
+ // merges into Abseil, this code may break.
+ absl::synchronization_internal::PerThreadSem::SetThreadBlockedCounter(
+ &shared->blocked_threads);
+
+ // The benchmark framework may run several iterations in the same process,
+ // reusing the same static-initialized 'shared' object. Given the semantics
+ // of the members, here, we expect everything to be reset to zero by the
+ // end of any iteration. Assert that's the case, just to be sure.
+ ABSL_RAW_CHECK(
+ shared->looping_threads.load(std::memory_order_relaxed) == 0 &&
+ shared->blocked_threads.load(std::memory_order_relaxed) == 0 &&
+ !shared->thread_has_mutex.load(std::memory_order_relaxed),
+ "Shared state isn't zeroed at start of benchmark iteration");
+
+ static constexpr int kBatchSize = 1000;
+ while (state.KeepRunningBatch(kBatchSize)) {
+ shared->looping_threads.fetch_add(1);
+ for (int i = 0; i < kBatchSize; i++) {
+ {
+ absl::MutexLock l(&shared->mu);
+ shared->thread_has_mutex.store(true, std::memory_order_relaxed);
+ // Spin until all other threads are either out of the benchmark loop
+ // or blocked on the mutex. This ensures that the mutex queue is kept
+ // at its maximal length to benchmark the performance of queueing on
+ // a highly contended mutex.
+ while (shared->looping_threads.load(std::memory_order_relaxed) -
+ shared->blocked_threads.load(std::memory_order_relaxed) !=
+ 1) {
+ }
+ shared->thread_has_mutex.store(false);
+ }
+ // Spin until some other thread has acquired the mutex before we block
+ // again. This ensures that we always go through the slow (queueing)
+ // acquisition path rather than reacquiring the mutex we just released.
+ while (!shared->thread_has_mutex.load(std::memory_order_relaxed) &&
+ shared->looping_threads.load(std::memory_order_relaxed) > 1) {
+ }
+ }
+ // The benchmark framework uses a barrier to ensure that all of the threads
+ // complete their benchmark loop together before any of the threads exit
+ // the loop. So, we need to remove ourselves from the "looping threads"
+ // counter here before potentially blocking on that barrier. Otherwise,
+ // another thread spinning above might wait forever for this thread to
+ // block on the mutex while we in fact are waiting to exit.
+ shared->looping_threads.fetch_add(-1);
+ }
+ absl::synchronization_internal::PerThreadSem::SetThreadBlockedCounter(
+ nullptr);
+}
+
+BENCHMARK(BM_MutexEnqueue)
+ ->Threads(4)
+ ->Threads(64)
+ ->Threads(128)
+ ->Threads(512)
+ ->ArgName("multiple_priorities")
+ ->Arg(false)
+ ->Arg(true);
+
template <typename MutexType>
void BM_Contended(benchmark::State& state) {
+ int priority = state.thread_index % state.range(1);
+ ScopedThreadMutexPriority priority_setter(priority);
+
struct Shared {
MutexType mu;
int data = 0;
@@ -85,81 +201,51 @@ void BM_Contended(benchmark::State& state) {
DelayNs(state.range(0), &shared->data);
}
}
+void SetupBenchmarkArgs(benchmark::internal::Benchmark* bm,
+ bool do_test_priorities) {
+ const int max_num_priorities = do_test_priorities ? 2 : 1;
+ bm->UseRealTime()
+ // ThreadPerCpu poorly handles non-power-of-two CPU counts.
+ ->Threads(1)
+ ->Threads(2)
+ ->Threads(4)
+ ->Threads(6)
+ ->Threads(8)
+ ->Threads(12)
+ ->Threads(16)
+ ->Threads(24)
+ ->Threads(32)
+ ->Threads(48)
+ ->Threads(64)
+ ->Threads(96)
+ ->Threads(128)
+ ->Threads(192)
+ ->Threads(256)
+ ->ArgNames({"cs_ns", "num_prios"});
+ // Some empirically chosen amounts of work in critical section.
+ // 1 is low contention, 2000 is high contention and few values in between.
+ for (int critical_section_ns : {1, 20, 50, 200, 2000}) {
+ for (int num_priorities = 1; num_priorities <= max_num_priorities;
+ num_priorities++) {
+ bm->ArgPair(critical_section_ns, num_priorities);
+ }
+ }
+}
BENCHMARK_TEMPLATE(BM_Contended, absl::Mutex)
- ->UseRealTime()
- // ThreadPerCpu poorly handles non-power-of-two CPU counts.
- ->Threads(1)
- ->Threads(2)
- ->Threads(4)
- ->Threads(6)
- ->Threads(8)
- ->Threads(12)
- ->Threads(16)
- ->Threads(24)
- ->Threads(32)
- ->Threads(48)
- ->Threads(64)
- ->Threads(96)
- ->Threads(128)
- ->Threads(192)
- ->Threads(256)
- // Some empirically chosen amounts of work in critical section.
- // 1 is low contention, 200 is high contention and few values in between.
- ->Arg(1)
- ->Arg(20)
- ->Arg(50)
- ->Arg(200);
+ ->Apply([](benchmark::internal::Benchmark* bm) {
+ SetupBenchmarkArgs(bm, /*do_test_priorities=*/true);
+ });
BENCHMARK_TEMPLATE(BM_Contended, absl::base_internal::SpinLock)
- ->UseRealTime()
- // ThreadPerCpu poorly handles non-power-of-two CPU counts.
- ->Threads(1)
- ->Threads(2)
- ->Threads(4)
- ->Threads(6)
- ->Threads(8)
- ->Threads(12)
- ->Threads(16)
- ->Threads(24)
- ->Threads(32)
- ->Threads(48)
- ->Threads(64)
- ->Threads(96)
- ->Threads(128)
- ->Threads(192)
- ->Threads(256)
- // Some empirically chosen amounts of work in critical section.
- // 1 is low contention, 200 is high contention and few values in between.
- ->Arg(1)
- ->Arg(20)
- ->Arg(50)
- ->Arg(200);
+ ->Apply([](benchmark::internal::Benchmark* bm) {
+ SetupBenchmarkArgs(bm, /*do_test_priorities=*/false);
+ });
BENCHMARK_TEMPLATE(BM_Contended, std::mutex)
- ->UseRealTime()
- // ThreadPerCpu poorly handles non-power-of-two CPU counts.
- ->Threads(1)
- ->Threads(2)
- ->Threads(4)
- ->Threads(6)
- ->Threads(8)
- ->Threads(12)
- ->Threads(16)
- ->Threads(24)
- ->Threads(32)
- ->Threads(48)
- ->Threads(64)
- ->Threads(96)
- ->Threads(128)
- ->Threads(192)
- ->Threads(256)
- // Some empirically chosen amounts of work in critical section.
- // 1 is low contention, 200 is high contention and few values in between.
- ->Arg(1)
- ->Arg(20)
- ->Arg(50)
- ->Arg(200);
+ ->Apply([](benchmark::internal::Benchmark* bm) {
+ SetupBenchmarkArgs(bm, /*do_test_priorities=*/false);
+ });
// Measure the overhead of conditions on mutex release (when they must be
// evaluated). Mutex has (some) support for equivalence classes allowing