summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--absl/synchronization/internal/futex.h70
-rw-r--r--absl/synchronization/internal/waiter.cc106
-rw-r--r--absl/synchronization/internal/waiter.h3
-rw-r--r--absl/synchronization/mutex.cc45
4 files changed, 177 insertions, 47 deletions
diff --git a/absl/synchronization/internal/futex.h b/absl/synchronization/internal/futex.h
index 9cf9841d..62bb40f7 100644
--- a/absl/synchronization/internal/futex.h
+++ b/absl/synchronization/internal/futex.h
@@ -16,9 +16,7 @@
#include "absl/base/config.h"
-#ifdef _WIN32
-#include <windows.h>
-#else
+#ifndef _WIN32
#include <sys/time.h>
#include <unistd.h>
#endif
@@ -85,34 +83,60 @@ namespace synchronization_internal {
class FutexImpl {
public:
- static int WaitUntil(std::atomic<int32_t> *v, int32_t val,
+ // Atomically check that `*v == val`, and if it is, then sleep until the
+ // timeout `t` has been reached, or until woken by `Wake()`.
+ static int WaitUntil(std::atomic<int32_t>* v, int32_t val,
KernelTimeout t) {
- long err = 0; // NOLINT(runtime/int)
- if (t.has_timeout()) {
- // https://locklessinc.com/articles/futex_cheat_sheet/
- // Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET uses absolute time.
- struct timespec abs_timeout = t.MakeAbsTimespec();
- // Atomically check that the futex value is still 0, and if it
- // is, sleep until abs_timeout or until woken by FUTEX_WAKE.
- err = syscall(
- SYS_futex, reinterpret_cast<int32_t *>(v),
- FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME, val,
- &abs_timeout, nullptr, FUTEX_BITSET_MATCH_ANY);
+ if (!t.has_timeout()) {
+ return Wait(v, val);
+ } else if (t.is_absolute_timeout()) {
+ auto abs_timespec = t.MakeAbsTimespec();
+ return WaitAbsoluteTimeout(v, val, &abs_timespec);
} else {
- // Atomically check that the futex value is still 0, and if it
- // is, sleep until woken by FUTEX_WAKE.
- err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v),
- FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, nullptr);
+ auto rel_timespec = t.MakeRelativeTimespec();
+ return WaitRelativeTimeout(v, val, &rel_timespec);
}
- if (ABSL_PREDICT_FALSE(err != 0)) {
+ }
+
+ // Atomically check that `*v == val`, and if it is, then sleep until the until
+ // woken by `Wake()`.
+ static int Wait(std::atomic<int32_t>* v, int32_t val) {
+ return WaitAbsoluteTimeout(v, val, nullptr);
+ }
+
+ // Atomically check that `*v == val`, and if it is, then sleep until
+ // CLOCK_REALTIME reaches `*abs_timeout`, or until woken by `Wake()`.
+ static int WaitAbsoluteTimeout(std::atomic<int32_t>* v, int32_t val,
+ const struct timespec* abs_timeout) {
+ // https://locklessinc.com/articles/futex_cheat_sheet/
+ // Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET uses absolute time.
+ auto err =
+ syscall(SYS_futex, reinterpret_cast<int32_t*>(v),
+ FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME,
+ val, abs_timeout, nullptr, FUTEX_BITSET_MATCH_ANY);
+ if (err != 0) {
+ return -errno;
+ }
+ return 0;
+ }
+
+ // Atomically check that `*v == val`, and if it is, then sleep until
+ // `*rel_timeout` has elapsed, or until woken by `Wake()`.
+ static int WaitRelativeTimeout(std::atomic<int32_t>* v, int32_t val,
+ const struct timespec* rel_timeout) {
+ // Atomically check that the futex value is still 0, and if it
+ // is, sleep until abs_timeout or until woken by FUTEX_WAKE.
+ auto err = syscall(SYS_futex, reinterpret_cast<int32_t*>(v),
+ FUTEX_PRIVATE_FLAG, val, rel_timeout);
+ if (err != 0) {
return -errno;
}
return 0;
}
- static int Wake(std::atomic<int32_t> *v, int32_t count) {
- // NOLINTNEXTLINE(runtime/int)
- long err = syscall(SYS_futex, reinterpret_cast<int32_t*>(v),
+ // Wakes at most `count` waiters that have entered the sleep state on `v`.
+ static int Wake(std::atomic<int32_t>* v, int32_t count) {
+ auto err = syscall(SYS_futex, reinterpret_cast<int32_t*>(v),
FUTEX_WAKE | FUTEX_PRIVATE_FLAG, count);
if (ABSL_PREDICT_FALSE(err < 0)) {
return -errno;
diff --git a/absl/synchronization/internal/waiter.cc b/absl/synchronization/internal/waiter.cc
index f2051d67..4eee1298 100644
--- a/absl/synchronization/internal/waiter.cc
+++ b/absl/synchronization/internal/waiter.cc
@@ -67,11 +67,9 @@ static void MaybeBecomeIdle() {
#if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX
-Waiter::Waiter() {
- futex_.store(0, std::memory_order_relaxed);
-}
+Waiter::Waiter() : futex_(0) {}
-bool Waiter::Wait(KernelTimeout t) {
+bool Waiter::WaitAbsoluteTimeout(KernelTimeout t) {
// Loop until we can atomically decrement futex from a positive
// value, waiting on a futex while we believe it is zero.
// Note that, since the thread ticker is just reset, we don't need to check
@@ -90,7 +88,88 @@ bool Waiter::Wait(KernelTimeout t) {
}
if (!first_pass) MaybeBecomeIdle();
- const int err = Futex::WaitUntil(&futex_, 0, t);
+ auto abs_timeout = t.MakeAbsTimespec();
+ const int err = Futex::WaitAbsoluteTimeout(&futex_, 0, &abs_timeout);
+ if (err != 0) {
+ if (err == -EINTR || err == -EWOULDBLOCK) {
+ // Do nothing, the loop will retry.
+ } else if (err == -ETIMEDOUT) {
+ return false;
+ } else {
+ ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
+ }
+ }
+ first_pass = false;
+ }
+}
+
+#ifdef CLOCK_MONOTONIC
+
+// Subtracts the timespec `sub` from `in` if the result would not be negative,
+// and returns true. Returns false if the result would be negative, and leaves
+// `in` unchanged.
+static bool TimespecSubtract(struct timespec& in, const struct timespec& sub) {
+ if (in.tv_sec < sub.tv_sec) {
+ return false;
+ }
+ if (in.tv_nsec < sub.tv_nsec) {
+ if (in.tv_sec == sub.tv_sec) {
+ return false;
+ }
+ // Borrow from tv_sec.
+ in.tv_sec -= 1;
+ in.tv_nsec += 1'000'000'000;
+ }
+ in.tv_sec -= sub.tv_sec;
+ in.tv_nsec -= sub.tv_nsec;
+ return true;
+}
+
+// On some platforms a background thread periodically calls `Poke()` to briefly
+// wake waiter threads so that they may call `MaybeBecomeIdle()`. This means
+// that `WaitRelativeTimeout()` differs slightly from `WaitAbsoluteTimeout()`
+// because it must adjust the timeout by the amount of time that it has already
+// slept.
+bool Waiter::WaitRelativeTimeout(KernelTimeout t) {
+ struct timespec start;
+ ABSL_RAW_CHECK(clock_gettime(CLOCK_MONOTONIC, &start) == 0,
+ "clock_gettime() failed");
+
+ // Loop until we can atomically decrement futex from a positive
+ // value, waiting on a futex while we believe it is zero.
+ // Note that, since the thread ticker is just reset, we don't need to check
+ // whether the thread is idle on the very first pass of the loop.
+ bool first_pass = true;
+
+ while (true) {
+ int32_t x = futex_.load(std::memory_order_relaxed);
+ while (x != 0) {
+ if (!futex_.compare_exchange_weak(x, x - 1,
+ std::memory_order_acquire,
+ std::memory_order_relaxed)) {
+ continue; // Raced with someone, retry.
+ }
+ return true; // Consumed a wakeup, we are done.
+ }
+
+ auto relative_timeout = t.MakeRelativeTimespec();
+ if (!first_pass) {
+ MaybeBecomeIdle();
+
+ // Adjust relative_timeout for `Poke()`s.
+ struct timespec now;
+ ABSL_RAW_CHECK(clock_gettime(CLOCK_MONOTONIC, &now) == 0,
+ "clock_gettime() failed");
+ // If TimespecSubstract(now, start) returns false, then the clock isn't
+ // truly monotonic.
+ if (TimespecSubtract(now, start)) {
+ if (!TimespecSubtract(relative_timeout, now)) {
+ return false; // Timeout.
+ }
+ }
+ }
+
+ const int err = Futex::WaitRelativeTimeout(&futex_, 0, &relative_timeout);
if (err != 0) {
if (err == -EINTR || err == -EWOULDBLOCK) {
// Do nothing, the loop will retry.
@@ -104,6 +183,23 @@ bool Waiter::Wait(KernelTimeout t) {
}
}
+#else // CLOCK_MONOTONIC
+
+// No support for CLOCK_MONOTONIC.
+// KernelTimeout will automatically convert to an absolute timeout.
+bool Waiter::WaitRelativeTimeout(KernelTimeout t) {
+ return WaitAbsoluteTimeout(t);
+}
+
+#endif // CLOCK_MONOTONIC
+
+bool Waiter::Wait(KernelTimeout t) {
+ if (t.is_absolute_timeout()) {
+ return WaitAbsoluteTimeout(t);
+ }
+ return WaitRelativeTimeout(t);
+}
+
void Waiter::Post() {
if (futex_.fetch_add(1, std::memory_order_release) == 0) {
// We incremented from 0, need to wake a potential waiter.
diff --git a/absl/synchronization/internal/waiter.h b/absl/synchronization/internal/waiter.h
index b8adfeb5..c206cc3f 100644
--- a/absl/synchronization/internal/waiter.h
+++ b/absl/synchronization/internal/waiter.h
@@ -110,6 +110,9 @@ class Waiter {
~Waiter() = delete;
#if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX
+ bool WaitAbsoluteTimeout(KernelTimeout t);
+ bool WaitRelativeTimeout(KernelTimeout t);
+
// Futexes are defined by specification to be 32-bits.
// Thus std::atomic<int32_t> must be just an int32_t with lockfree methods.
std::atomic<int32_t> futex_;
diff --git a/absl/synchronization/mutex.cc b/absl/synchronization/mutex.cc
index 064ccb74..f6a8506c 100644
--- a/absl/synchronization/mutex.cc
+++ b/absl/synchronization/mutex.cc
@@ -635,21 +635,6 @@ void Mutex::InternalAttemptToUseMutexInFatalSignalHandler() {
std::memory_order_release);
}
-// --------------------------time support
-
-// Return the current time plus the timeout. Use the same clock as
-// PerThreadSem::Wait() for consistency. Unfortunately, we don't have
-// such a choice when a deadline is given directly.
-static absl::Time DeadlineFromTimeout(absl::Duration timeout) {
-#ifndef _WIN32
- struct timeval tv;
- gettimeofday(&tv, nullptr);
- return absl::TimeFromTimeval(tv) + timeout;
-#else
- return absl::Now() + timeout;
-#endif
-}
-
// --------------------------Mutexes
// In the layout below, the msb of the bottom byte is currently unused. Also,
@@ -1546,7 +1531,13 @@ void Mutex::LockWhen(const Condition &cond) {
}
bool Mutex::LockWhenWithTimeout(const Condition &cond, absl::Duration timeout) {
- return LockWhenWithDeadline(cond, DeadlineFromTimeout(timeout));
+ ABSL_TSAN_MUTEX_PRE_LOCK(this, 0);
+ GraphId id = DebugOnlyDeadlockCheck(this);
+ bool res = LockSlowWithDeadline(kExclusive, &cond,
+ KernelTimeout(timeout), 0);
+ DebugOnlyLockEnter(this, id);
+ ABSL_TSAN_MUTEX_POST_LOCK(this, 0, 0);
+ return res;
}
bool Mutex::LockWhenWithDeadline(const Condition &cond, absl::Time deadline) {
@@ -1569,7 +1560,12 @@ void Mutex::ReaderLockWhen(const Condition &cond) {
bool Mutex::ReaderLockWhenWithTimeout(const Condition &cond,
absl::Duration timeout) {
- return ReaderLockWhenWithDeadline(cond, DeadlineFromTimeout(timeout));
+ ABSL_TSAN_MUTEX_PRE_LOCK(this, __tsan_mutex_read_lock);
+ GraphId id = DebugOnlyDeadlockCheck(this);
+ bool res = LockSlowWithDeadline(kShared, &cond, KernelTimeout(timeout), 0);
+ DebugOnlyLockEnter(this, id);
+ ABSL_TSAN_MUTEX_POST_LOCK(this, __tsan_mutex_read_lock, 0);
+ return res;
}
bool Mutex::ReaderLockWhenWithDeadline(const Condition &cond,
@@ -1594,7 +1590,18 @@ void Mutex::Await(const Condition &cond) {
}
bool Mutex::AwaitWithTimeout(const Condition &cond, absl::Duration timeout) {
- return AwaitWithDeadline(cond, DeadlineFromTimeout(timeout));
+ if (cond.Eval()) { // condition already true; nothing to do
+ if (kDebugMode) {
+ this->AssertReaderHeld();
+ }
+ return true;
+ }
+
+ KernelTimeout t{timeout};
+ bool res = this->AwaitCommon(cond, t);
+ ABSL_RAW_CHECK(res || t.has_timeout(),
+ "condition untrue on return from Await");
+ return res;
}
bool Mutex::AwaitWithDeadline(const Condition &cond, absl::Time deadline) {
@@ -2660,7 +2667,7 @@ bool CondVar::WaitCommon(Mutex *mutex, KernelTimeout t) {
}
bool CondVar::WaitWithTimeout(Mutex *mu, absl::Duration timeout) {
- return WaitWithDeadline(mu, DeadlineFromTimeout(timeout));
+ return WaitCommon(mu, KernelTimeout(timeout));
}
bool CondVar::WaitWithDeadline(Mutex *mu, absl::Time deadline) {