summaryrefslogtreecommitdiff
path: root/absl/synchronization/mutex.cc
diff options
context:
space:
mode:
Diffstat (limited to 'absl/synchronization/mutex.cc')
-rw-r--r--absl/synchronization/mutex.cc115
1 files changed, 75 insertions, 40 deletions
diff --git a/absl/synchronization/mutex.cc b/absl/synchronization/mutex.cc
index 76ad41fe..52e2455d 100644
--- a/absl/synchronization/mutex.cc
+++ b/absl/synchronization/mutex.cc
@@ -109,7 +109,7 @@ static inline bool EvalConditionAnnotated(const Condition *cond, Mutex *mu,
bool locking, bool trylock,
bool read_lock);
-void RegisterMutexProfiler(void (*fn)(int64_t wait_timestamp)) {
+void RegisterMutexProfiler(void (*fn)(int64_t wait_cycles)) {
submit_profile_data.Store(fn);
}
@@ -1744,23 +1744,33 @@ ABSL_XRAY_LOG_ARGS(1) void Mutex::ReaderUnlock() {
ABSL_TSAN_MUTEX_POST_UNLOCK(this, __tsan_mutex_read_lock);
}
-// The zap_desig_waker bitmask is used to clear the designated waker flag in
-// the mutex if this thread has blocked, and therefore may be the designated
-// waker.
-static const intptr_t zap_desig_waker[] = {
- ~static_cast<intptr_t>(0), // not blocked
- ~static_cast<intptr_t>(
- kMuDesig) // blocked; turn off the designated waker bit
-};
+// Clears the designated waker flag in the mutex if this thread has blocked, and
+// therefore may be the designated waker.
+static intptr_t ClearDesignatedWakerMask(int flag) {
+ assert(flag >= 0);
+ assert(flag <= 1);
+ switch (flag) {
+ case 0: // not blocked
+ return ~static_cast<intptr_t>(0);
+ case 1: // blocked; turn off the designated waker bit
+ return ~static_cast<intptr_t>(kMuDesig);
+ }
+ ABSL_INTERNAL_UNREACHABLE;
+}
-// The ignore_waiting_writers bitmask is used to ignore the existence
-// of waiting writers if a reader that has already blocked once
-// wakes up.
-static const intptr_t ignore_waiting_writers[] = {
- ~static_cast<intptr_t>(0), // not blocked
- ~static_cast<intptr_t>(
- kMuWrWait) // blocked; pretend there are no waiting writers
-};
+// Conditionally ignores the existence of waiting writers if a reader that has
+// already blocked once wakes up.
+static intptr_t IgnoreWaitingWritersMask(int flag) {
+ assert(flag >= 0);
+ assert(flag <= 1);
+ switch (flag) {
+ case 0: // not blocked
+ return ~static_cast<intptr_t>(0);
+ case 1: // blocked; pretend there are no waiting writers
+ return ~static_cast<intptr_t>(kMuWrWait);
+ }
+ ABSL_INTERNAL_UNREACHABLE;
+}
// Internal version of LockWhen(). See LockSlowWithDeadline()
ABSL_ATTRIBUTE_NOINLINE void Mutex::LockSlow(MuHow how, const Condition *cond,
@@ -1852,8 +1862,10 @@ bool Mutex::LockSlowWithDeadline(MuHow how, const Condition *cond,
bool unlock = false;
if ((v & how->fast_need_zero) == 0 && // try fast acquire
mu_.compare_exchange_strong(
- v, (how->fast_or | (v & zap_desig_waker[flags & kMuHasBlocked])) +
- how->fast_add,
+ v,
+ (how->fast_or |
+ (v & ClearDesignatedWakerMask(flags & kMuHasBlocked))) +
+ how->fast_add,
std::memory_order_acquire, std::memory_order_relaxed)) {
if (cond == nullptr ||
EvalConditionAnnotated(cond, this, true, false, how == kShared)) {
@@ -1927,9 +1939,10 @@ void Mutex::LockSlowLoop(SynchWaitParams *waitp, int flags) {
CheckForMutexCorruption(v, "Lock");
if ((v & waitp->how->slow_need_zero) == 0) {
if (mu_.compare_exchange_strong(
- v, (waitp->how->fast_or |
- (v & zap_desig_waker[flags & kMuHasBlocked])) +
- waitp->how->fast_add,
+ v,
+ (waitp->how->fast_or |
+ (v & ClearDesignatedWakerMask(flags & kMuHasBlocked))) +
+ waitp->how->fast_add,
std::memory_order_acquire, std::memory_order_relaxed)) {
if (waitp->cond == nullptr ||
EvalConditionAnnotated(waitp->cond, this, true, false,
@@ -1946,8 +1959,9 @@ void Mutex::LockSlowLoop(SynchWaitParams *waitp, int flags) {
if ((v & (kMuSpin|kMuWait)) == 0) { // no waiters
// This thread tries to become the one and only waiter.
PerThreadSynch *new_h = Enqueue(nullptr, waitp, v, flags);
- intptr_t nv = (v & zap_desig_waker[flags & kMuHasBlocked] & kMuLow) |
- kMuWait;
+ intptr_t nv =
+ (v & ClearDesignatedWakerMask(flags & kMuHasBlocked) & kMuLow) |
+ kMuWait;
ABSL_RAW_CHECK(new_h != nullptr, "Enqueue to empty list failed");
if (waitp->how == kExclusive && (v & kMuReader) != 0) {
nv |= kMuWrWait;
@@ -1961,12 +1975,13 @@ void Mutex::LockSlowLoop(SynchWaitParams *waitp, int flags) {
waitp->thread->waitp = nullptr;
}
} else if ((v & waitp->how->slow_inc_need_zero &
- ignore_waiting_writers[flags & kMuHasBlocked]) == 0) {
+ IgnoreWaitingWritersMask(flags & kMuHasBlocked)) == 0) {
// This is a reader that needs to increment the reader count,
// but the count is currently held in the last waiter.
if (mu_.compare_exchange_strong(
- v, (v & zap_desig_waker[flags & kMuHasBlocked]) | kMuSpin |
- kMuReader,
+ v,
+ (v & ClearDesignatedWakerMask(flags & kMuHasBlocked)) |
+ kMuSpin | kMuReader,
std::memory_order_acquire, std::memory_order_relaxed)) {
PerThreadSynch *h = GetPerThreadSynch(v);
h->readers += kMuOne; // inc reader count in waiter
@@ -1987,8 +2002,9 @@ void Mutex::LockSlowLoop(SynchWaitParams *waitp, int flags) {
}
} else if ((v & kMuSpin) == 0 && // attempt to queue ourselves
mu_.compare_exchange_strong(
- v, (v & zap_desig_waker[flags & kMuHasBlocked]) | kMuSpin |
- kMuWait,
+ v,
+ (v & ClearDesignatedWakerMask(flags & kMuHasBlocked)) |
+ kMuSpin | kMuWait,
std::memory_order_acquire, std::memory_order_relaxed)) {
PerThreadSynch *h = GetPerThreadSynch(v);
PerThreadSynch *new_h = Enqueue(h, waitp, v, flags);
@@ -2315,19 +2331,21 @@ ABSL_ATTRIBUTE_NOINLINE void Mutex::UnlockSlow(SynchWaitParams *waitp) {
} // end of for(;;)-loop
if (wake_list != kPerThreadSynchNull) {
- int64_t enqueue_timestamp = wake_list->waitp->contention_start_cycles;
- bool cond_waiter = wake_list->cond_waiter;
+ int64_t wait_cycles = 0;
+ int64_t now = base_internal::CycleClock::Now();
do {
+ // Sample lock contention events only if the waiter was trying to acquire
+ // the lock, not waiting on a condition variable or Condition.
+ if (!wake_list->cond_waiter) {
+ wait_cycles += (now - wake_list->waitp->contention_start_cycles);
+ wake_list->waitp->contention_start_cycles = now;
+ }
wake_list = Wakeup(wake_list); // wake waiters
} while (wake_list != kPerThreadSynchNull);
- if (!cond_waiter) {
- // Sample lock contention events only if the (first) waiter was trying to
- // acquire the lock, not waiting on a condition variable or Condition.
- int64_t wait_cycles =
- base_internal::CycleClock::Now() - enqueue_timestamp;
+ if (wait_cycles > 0) {
mutex_tracer("slow release", this, wait_cycles);
ABSL_TSAN_MUTEX_PRE_DIVERT(this, 0);
- submit_profile_data(enqueue_timestamp);
+ submit_profile_data(wait_cycles);
ABSL_TSAN_MUTEX_POST_DIVERT(this, 0);
}
}
@@ -2492,9 +2510,9 @@ void CondVar::Remove(PerThreadSynch *s) {
// before calling Mutex::UnlockSlow(), the Mutex code might be re-entered (via
// the logging code, or via a Condition function) and might potentially attempt
// to block this thread. That would be a problem if the thread were already on
-// a the condition variable waiter queue. Thus, we use the waitp->cv_word
-// to tell the unlock code to call CondVarEnqueue() to queue the thread on the
-// condition variable queue just before the mutex is to be unlocked, and (most
+// a condition variable waiter queue. Thus, we use the waitp->cv_word to tell
+// the unlock code to call CondVarEnqueue() to queue the thread on the condition
+// variable queue just before the mutex is to be unlocked, and (most
// importantly) after any call to an external routine that might re-enter the
// mutex code.
static void CondVarEnqueue(SynchWaitParams *waitp) {
@@ -2557,6 +2575,23 @@ bool CondVar::WaitCommon(Mutex *mutex, KernelTimeout t) {
while (waitp.thread->state.load(std::memory_order_acquire) ==
PerThreadSynch::kQueued) {
if (!Mutex::DecrementSynchSem(mutex, waitp.thread, t)) {
+ // DecrementSynchSem returned due to timeout.
+ // Now we will either (1) remove ourselves from the wait list in Remove
+ // below, in which case Remove will set thread.state = kAvailable and
+ // we will not call DecrementSynchSem again; or (2) Signal/SignalAll
+ // has removed us concurrently and is calling Wakeup, which will set
+ // thread.state = kAvailable and post to the semaphore.
+ // It's important to reset the timeout for the case (2) because otherwise
+ // we can live-lock in this loop since DecrementSynchSem will always
+ // return immediately due to timeout, but Signal/SignalAll is not
+ // necessary set thread.state = kAvailable yet (and is not scheduled
+ // due to thread priorities or other scheduler artifacts).
+ // Note this could also be resolved if Signal/SignalAll would set
+ // thread.state = kAvailable while holding the wait list spin lock.
+ // But this can't be easily done for SignalAll since it grabs the whole
+ // wait list with a single compare-exchange and does not really grab
+ // the spin lock.
+ t = KernelTimeout::Never();
this->Remove(waitp.thread);
rc = true;
}