diff options
Diffstat (limited to 'absl/synchronization')
-rw-r--r-- | absl/synchronization/internal/futex.h | 70 | ||||
-rw-r--r-- | absl/synchronization/internal/kernel_timeout.cc | 41 | ||||
-rw-r--r-- | absl/synchronization/internal/kernel_timeout.h | 15 | ||||
-rw-r--r-- | absl/synchronization/internal/kernel_timeout_test.cc | 38 | ||||
-rw-r--r-- | absl/synchronization/internal/waiter.cc | 106 | ||||
-rw-r--r-- | absl/synchronization/internal/waiter.h | 3 | ||||
-rw-r--r-- | absl/synchronization/mutex.cc | 45 |
7 files changed, 271 insertions, 47 deletions
diff --git a/absl/synchronization/internal/futex.h b/absl/synchronization/internal/futex.h index 9cf9841d..62bb40f7 100644 --- a/absl/synchronization/internal/futex.h +++ b/absl/synchronization/internal/futex.h @@ -16,9 +16,7 @@ #include "absl/base/config.h" -#ifdef _WIN32 -#include <windows.h> -#else +#ifndef _WIN32 #include <sys/time.h> #include <unistd.h> #endif @@ -85,34 +83,60 @@ namespace synchronization_internal { class FutexImpl { public: - static int WaitUntil(std::atomic<int32_t> *v, int32_t val, + // Atomically check that `*v == val`, and if it is, then sleep until the + // timeout `t` has been reached, or until woken by `Wake()`. + static int WaitUntil(std::atomic<int32_t>* v, int32_t val, KernelTimeout t) { - long err = 0; // NOLINT(runtime/int) - if (t.has_timeout()) { - // https://locklessinc.com/articles/futex_cheat_sheet/ - // Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET uses absolute time. - struct timespec abs_timeout = t.MakeAbsTimespec(); - // Atomically check that the futex value is still 0, and if it - // is, sleep until abs_timeout or until woken by FUTEX_WAKE. - err = syscall( - SYS_futex, reinterpret_cast<int32_t *>(v), - FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME, val, - &abs_timeout, nullptr, FUTEX_BITSET_MATCH_ANY); + if (!t.has_timeout()) { + return Wait(v, val); + } else if (t.is_absolute_timeout()) { + auto abs_timespec = t.MakeAbsTimespec(); + return WaitAbsoluteTimeout(v, val, &abs_timespec); } else { - // Atomically check that the futex value is still 0, and if it - // is, sleep until woken by FUTEX_WAKE. - err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v), - FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, nullptr); + auto rel_timespec = t.MakeRelativeTimespec(); + return WaitRelativeTimeout(v, val, &rel_timespec); } - if (ABSL_PREDICT_FALSE(err != 0)) { + } + + // Atomically check that `*v == val`, and if it is, then sleep until the until + // woken by `Wake()`. + static int Wait(std::atomic<int32_t>* v, int32_t val) { + return WaitAbsoluteTimeout(v, val, nullptr); + } + + // Atomically check that `*v == val`, and if it is, then sleep until + // CLOCK_REALTIME reaches `*abs_timeout`, or until woken by `Wake()`. + static int WaitAbsoluteTimeout(std::atomic<int32_t>* v, int32_t val, + const struct timespec* abs_timeout) { + // https://locklessinc.com/articles/futex_cheat_sheet/ + // Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET uses absolute time. + auto err = + syscall(SYS_futex, reinterpret_cast<int32_t*>(v), + FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME, + val, abs_timeout, nullptr, FUTEX_BITSET_MATCH_ANY); + if (err != 0) { + return -errno; + } + return 0; + } + + // Atomically check that `*v == val`, and if it is, then sleep until + // `*rel_timeout` has elapsed, or until woken by `Wake()`. + static int WaitRelativeTimeout(std::atomic<int32_t>* v, int32_t val, + const struct timespec* rel_timeout) { + // Atomically check that the futex value is still 0, and if it + // is, sleep until abs_timeout or until woken by FUTEX_WAKE. + auto err = syscall(SYS_futex, reinterpret_cast<int32_t*>(v), + FUTEX_PRIVATE_FLAG, val, rel_timeout); + if (err != 0) { return -errno; } return 0; } - static int Wake(std::atomic<int32_t> *v, int32_t count) { - // NOLINTNEXTLINE(runtime/int) - long err = syscall(SYS_futex, reinterpret_cast<int32_t*>(v), + // Wakes at most `count` waiters that have entered the sleep state on `v`. + static int Wake(std::atomic<int32_t>* v, int32_t count) { + auto err = syscall(SYS_futex, reinterpret_cast<int32_t*>(v), FUTEX_WAKE | FUTEX_PRIVATE_FLAG, count); if (ABSL_PREDICT_FALSE(err < 0)) { return -errno; diff --git a/absl/synchronization/internal/kernel_timeout.cc b/absl/synchronization/internal/kernel_timeout.cc index 8d9e7d74..548a8fc6 100644 --- a/absl/synchronization/internal/kernel_timeout.cc +++ b/absl/synchronization/internal/kernel_timeout.cc @@ -15,6 +15,7 @@ #include "absl/synchronization/internal/kernel_timeout.h" #include <algorithm> +#include <chrono> // NOLINT(build/c++11) #include <cstdint> #include <ctime> #include <limits> @@ -163,6 +164,46 @@ KernelTimeout::DWord KernelTimeout::InMillisecondsFromNow() const { return DWord{0}; } +std::chrono::time_point<std::chrono::system_clock> +KernelTimeout::ToChronoTimePoint() const { + if (!has_timeout()) { + return std::chrono::time_point<std::chrono::system_clock>::max(); + } + + // The cast to std::microseconds is because (on some platforms) the + // std::ratio used by std::chrono::steady_clock doesn't convert to + // std::nanoseconds, so it doesn't compile. + auto micros = std::chrono::duration_cast<std::chrono::microseconds>( + std::chrono::nanoseconds(RawNanos())); + if (is_relative_timeout()) { + auto now = std::chrono::system_clock::now(); + if (micros > + std::chrono::time_point<std::chrono::system_clock>::max() - now) { + // Overflow. + return std::chrono::time_point<std::chrono::system_clock>::max(); + } + return now + micros; + } + return std::chrono::system_clock::from_time_t(0) + micros; +} + +std::chrono::nanoseconds KernelTimeout::ToChronoDuration() const { + if (!has_timeout()) { + return std::chrono::nanoseconds::max(); + } + if (is_absolute_timeout()) { + auto d = std::chrono::duration_cast<std::chrono::nanoseconds>( + std::chrono::nanoseconds(RawNanos()) - + (std::chrono::system_clock::now() - + std::chrono::system_clock::from_time_t(0))); + if (d < std::chrono::nanoseconds(0)) { + d = std::chrono::nanoseconds(0); + } + return d; + } + return std::chrono::nanoseconds(RawNanos()); +} + } // namespace synchronization_internal ABSL_NAMESPACE_END } // namespace absl diff --git a/absl/synchronization/internal/kernel_timeout.h b/absl/synchronization/internal/kernel_timeout.h index 1f4d82cd..f7c40337 100644 --- a/absl/synchronization/internal/kernel_timeout.h +++ b/absl/synchronization/internal/kernel_timeout.h @@ -16,6 +16,7 @@ #define ABSL_SYNCHRONIZATION_INTERNAL_KERNEL_TIMEOUT_H_ #include <algorithm> +#include <chrono> // NOLINT(build/c++11) #include <cstdint> #include <ctime> #include <limits> @@ -95,6 +96,20 @@ class KernelTimeout { typedef unsigned long DWord; // NOLINT DWord InMillisecondsFromNow() const; + // Convert to std::chrono::time_point for interfaces that expect an absolute + // timeout, like std::condition_variable::wait_until(). If !has_timeout() or + // is_relative_timeout(), attempts to convert to a reasonable absolute + // timeout, but callers should test has_timeout() and is_relative_timeout() + // and prefer to use a more appropriate interface. + std::chrono::time_point<std::chrono::system_clock> ToChronoTimePoint() const; + + // Convert to std::chrono::time_point for interfaces that expect a relative + // timeout, like std::condition_variable::wait_for(). If !has_timeout() or + // is_absolute_timeout(), attempts to convert to a reasonable relative + // timeout, but callers should test has_timeout() and is_absolute_timeout() + // and prefer to use a more appropriate interface. + std::chrono::nanoseconds ToChronoDuration() const; + private: // Internal representation. // - If the value is kNoTimeout, then the timeout is infinite, and diff --git a/absl/synchronization/internal/kernel_timeout_test.cc b/absl/synchronization/internal/kernel_timeout_test.cc index a89ae220..431ffcf4 100644 --- a/absl/synchronization/internal/kernel_timeout_test.cc +++ b/absl/synchronization/internal/kernel_timeout_test.cc @@ -14,6 +14,7 @@ #include "absl/synchronization/internal/kernel_timeout.h" +#include <chrono> // NOLINT(build/c++11) #include <limits> #include "gtest/gtest.h" @@ -72,6 +73,11 @@ TEST(KernelTimeout, FiniteTimes) { EXPECT_LE(absl::AbsDuration(absl::Milliseconds(t.InMillisecondsFromNow()) - std::max(duration, absl::ZeroDuration())), absl::Milliseconds(5)); + EXPECT_LE(absl::AbsDuration(absl::FromChrono(t.ToChronoTimePoint()) - when), + absl::Microseconds(1)); + EXPECT_LE(absl::AbsDuration(absl::FromChrono(t.ToChronoDuration()) - + std::max(duration, absl::ZeroDuration())), + kTimingBound); } } @@ -90,6 +96,9 @@ TEST(KernelTimeout, InfiniteFuture) { absl::Now() + absl::Hours(100000)); EXPECT_EQ(t.InMillisecondsFromNow(), std::numeric_limits<KernelTimeout::DWord>::max()); + EXPECT_EQ(t.ToChronoTimePoint(), + std::chrono::time_point<std::chrono::system_clock>::max()); + EXPECT_GE(t.ToChronoDuration(), std::chrono::nanoseconds::max()); } TEST(KernelTimeout, DefaultConstructor) { @@ -108,6 +117,9 @@ TEST(KernelTimeout, DefaultConstructor) { absl::Now() + absl::Hours(100000)); EXPECT_EQ(t.InMillisecondsFromNow(), std::numeric_limits<KernelTimeout::DWord>::max()); + EXPECT_EQ(t.ToChronoTimePoint(), + std::chrono::time_point<std::chrono::system_clock>::max()); + EXPECT_GE(t.ToChronoDuration(), std::chrono::nanoseconds::max()); } TEST(KernelTimeout, TimeMaxNanos) { @@ -126,6 +138,9 @@ TEST(KernelTimeout, TimeMaxNanos) { absl::Now() + absl::Hours(100000)); EXPECT_EQ(t.InMillisecondsFromNow(), std::numeric_limits<KernelTimeout::DWord>::max()); + EXPECT_EQ(t.ToChronoTimePoint(), + std::chrono::time_point<std::chrono::system_clock>::max()); + EXPECT_GE(t.ToChronoDuration(), std::chrono::nanoseconds::max()); } TEST(KernelTimeout, Never) { @@ -144,6 +159,9 @@ TEST(KernelTimeout, Never) { absl::Now() + absl::Hours(100000)); EXPECT_EQ(t.InMillisecondsFromNow(), std::numeric_limits<KernelTimeout::DWord>::max()); + EXPECT_EQ(t.ToChronoTimePoint(), + std::chrono::time_point<std::chrono::system_clock>::max()); + EXPECT_GE(t.ToChronoDuration(), std::chrono::nanoseconds::max()); } TEST(KernelTimeout, InfinitePast) { @@ -157,6 +175,9 @@ TEST(KernelTimeout, InfinitePast) { absl::ZeroDuration()); EXPECT_LE(absl::FromUnixNanos(t.MakeAbsNanos()), absl::FromUnixNanos(1)); EXPECT_EQ(t.InMillisecondsFromNow(), KernelTimeout::DWord{0}); + EXPECT_LT(t.ToChronoTimePoint(), std::chrono::system_clock::from_time_t(0) + + std::chrono::seconds(1)); + EXPECT_EQ(t.ToChronoDuration(), std::chrono::nanoseconds(0)); } TEST(KernelTimeout, FiniteDurations) { @@ -186,6 +207,10 @@ TEST(KernelTimeout, FiniteDurations) { absl::Milliseconds(5)); EXPECT_LE(absl::Milliseconds(t.InMillisecondsFromNow()) - duration, absl::Milliseconds(5)); + EXPECT_LE(absl::AbsDuration(absl::Now() + duration - + absl::FromChrono(t.ToChronoTimePoint())), + kTimingBound); + EXPECT_EQ(absl::FromChrono(t.ToChronoDuration()), duration); } } @@ -218,6 +243,10 @@ TEST(KernelTimeout, NegativeDurations) { absl::AbsDuration(absl::Now() - absl::FromUnixNanos(t.MakeAbsNanos())), absl::Milliseconds(5)); EXPECT_EQ(t.InMillisecondsFromNow(), KernelTimeout::DWord{0}); + EXPECT_LE(absl::AbsDuration(absl::Now() - + absl::FromChrono(t.ToChronoTimePoint())), + absl::Milliseconds(5)); + EXPECT_EQ(t.ToChronoDuration(), std::chrono::nanoseconds(0)); } } @@ -236,6 +265,9 @@ TEST(KernelTimeout, InfiniteDuration) { absl::Now() + absl::Hours(100000)); EXPECT_EQ(t.InMillisecondsFromNow(), std::numeric_limits<KernelTimeout::DWord>::max()); + EXPECT_EQ(t.ToChronoTimePoint(), + std::chrono::time_point<std::chrono::system_clock>::max()); + EXPECT_GE(t.ToChronoDuration(), std::chrono::nanoseconds::max()); } TEST(KernelTimeout, DurationMaxNanos) { @@ -254,6 +286,9 @@ TEST(KernelTimeout, DurationMaxNanos) { absl::Now() + absl::Hours(100000)); EXPECT_EQ(t.InMillisecondsFromNow(), std::numeric_limits<KernelTimeout::DWord>::max()); + EXPECT_EQ(t.ToChronoTimePoint(), + std::chrono::time_point<std::chrono::system_clock>::max()); + EXPECT_GE(t.ToChronoDuration(), std::chrono::nanoseconds::max()); } TEST(KernelTimeout, OverflowNanos) { @@ -273,6 +308,9 @@ TEST(KernelTimeout, OverflowNanos) { absl::Now() + absl::Hours(100000)); EXPECT_LE(absl::Milliseconds(t.InMillisecondsFromNow()) - duration, absl::Milliseconds(5)); + EXPECT_GT(t.ToChronoTimePoint(), + std::chrono::system_clock::now() + std::chrono::hours(100000)); + EXPECT_GT(t.ToChronoDuration(), std::chrono::hours(100000)); } } // namespace diff --git a/absl/synchronization/internal/waiter.cc b/absl/synchronization/internal/waiter.cc index f2051d67..4eee1298 100644 --- a/absl/synchronization/internal/waiter.cc +++ b/absl/synchronization/internal/waiter.cc @@ -67,11 +67,9 @@ static void MaybeBecomeIdle() { #if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX -Waiter::Waiter() { - futex_.store(0, std::memory_order_relaxed); -} +Waiter::Waiter() : futex_(0) {} -bool Waiter::Wait(KernelTimeout t) { +bool Waiter::WaitAbsoluteTimeout(KernelTimeout t) { // Loop until we can atomically decrement futex from a positive // value, waiting on a futex while we believe it is zero. // Note that, since the thread ticker is just reset, we don't need to check @@ -90,7 +88,88 @@ bool Waiter::Wait(KernelTimeout t) { } if (!first_pass) MaybeBecomeIdle(); - const int err = Futex::WaitUntil(&futex_, 0, t); + auto abs_timeout = t.MakeAbsTimespec(); + const int err = Futex::WaitAbsoluteTimeout(&futex_, 0, &abs_timeout); + if (err != 0) { + if (err == -EINTR || err == -EWOULDBLOCK) { + // Do nothing, the loop will retry. + } else if (err == -ETIMEDOUT) { + return false; + } else { + ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err); + } + } + first_pass = false; + } +} + +#ifdef CLOCK_MONOTONIC + +// Subtracts the timespec `sub` from `in` if the result would not be negative, +// and returns true. Returns false if the result would be negative, and leaves +// `in` unchanged. +static bool TimespecSubtract(struct timespec& in, const struct timespec& sub) { + if (in.tv_sec < sub.tv_sec) { + return false; + } + if (in.tv_nsec < sub.tv_nsec) { + if (in.tv_sec == sub.tv_sec) { + return false; + } + // Borrow from tv_sec. + in.tv_sec -= 1; + in.tv_nsec += 1'000'000'000; + } + in.tv_sec -= sub.tv_sec; + in.tv_nsec -= sub.tv_nsec; + return true; +} + +// On some platforms a background thread periodically calls `Poke()` to briefly +// wake waiter threads so that they may call `MaybeBecomeIdle()`. This means +// that `WaitRelativeTimeout()` differs slightly from `WaitAbsoluteTimeout()` +// because it must adjust the timeout by the amount of time that it has already +// slept. +bool Waiter::WaitRelativeTimeout(KernelTimeout t) { + struct timespec start; + ABSL_RAW_CHECK(clock_gettime(CLOCK_MONOTONIC, &start) == 0, + "clock_gettime() failed"); + + // Loop until we can atomically decrement futex from a positive + // value, waiting on a futex while we believe it is zero. + // Note that, since the thread ticker is just reset, we don't need to check + // whether the thread is idle on the very first pass of the loop. + bool first_pass = true; + + while (true) { + int32_t x = futex_.load(std::memory_order_relaxed); + while (x != 0) { + if (!futex_.compare_exchange_weak(x, x - 1, + std::memory_order_acquire, + std::memory_order_relaxed)) { + continue; // Raced with someone, retry. + } + return true; // Consumed a wakeup, we are done. + } + + auto relative_timeout = t.MakeRelativeTimespec(); + if (!first_pass) { + MaybeBecomeIdle(); + + // Adjust relative_timeout for `Poke()`s. + struct timespec now; + ABSL_RAW_CHECK(clock_gettime(CLOCK_MONOTONIC, &now) == 0, + "clock_gettime() failed"); + // If TimespecSubstract(now, start) returns false, then the clock isn't + // truly monotonic. + if (TimespecSubtract(now, start)) { + if (!TimespecSubtract(relative_timeout, now)) { + return false; // Timeout. + } + } + } + + const int err = Futex::WaitRelativeTimeout(&futex_, 0, &relative_timeout); if (err != 0) { if (err == -EINTR || err == -EWOULDBLOCK) { // Do nothing, the loop will retry. @@ -104,6 +183,23 @@ bool Waiter::Wait(KernelTimeout t) { } } +#else // CLOCK_MONOTONIC + +// No support for CLOCK_MONOTONIC. +// KernelTimeout will automatically convert to an absolute timeout. +bool Waiter::WaitRelativeTimeout(KernelTimeout t) { + return WaitAbsoluteTimeout(t); +} + +#endif // CLOCK_MONOTONIC + +bool Waiter::Wait(KernelTimeout t) { + if (t.is_absolute_timeout()) { + return WaitAbsoluteTimeout(t); + } + return WaitRelativeTimeout(t); +} + void Waiter::Post() { if (futex_.fetch_add(1, std::memory_order_release) == 0) { // We incremented from 0, need to wake a potential waiter. diff --git a/absl/synchronization/internal/waiter.h b/absl/synchronization/internal/waiter.h index b8adfeb5..c206cc3f 100644 --- a/absl/synchronization/internal/waiter.h +++ b/absl/synchronization/internal/waiter.h @@ -110,6 +110,9 @@ class Waiter { ~Waiter() = delete; #if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX + bool WaitAbsoluteTimeout(KernelTimeout t); + bool WaitRelativeTimeout(KernelTimeout t); + // Futexes are defined by specification to be 32-bits. // Thus std::atomic<int32_t> must be just an int32_t with lockfree methods. std::atomic<int32_t> futex_; diff --git a/absl/synchronization/mutex.cc b/absl/synchronization/mutex.cc index 064ccb74..f6a8506c 100644 --- a/absl/synchronization/mutex.cc +++ b/absl/synchronization/mutex.cc @@ -635,21 +635,6 @@ void Mutex::InternalAttemptToUseMutexInFatalSignalHandler() { std::memory_order_release); } -// --------------------------time support - -// Return the current time plus the timeout. Use the same clock as -// PerThreadSem::Wait() for consistency. Unfortunately, we don't have -// such a choice when a deadline is given directly. -static absl::Time DeadlineFromTimeout(absl::Duration timeout) { -#ifndef _WIN32 - struct timeval tv; - gettimeofday(&tv, nullptr); - return absl::TimeFromTimeval(tv) + timeout; -#else - return absl::Now() + timeout; -#endif -} - // --------------------------Mutexes // In the layout below, the msb of the bottom byte is currently unused. Also, @@ -1546,7 +1531,13 @@ void Mutex::LockWhen(const Condition &cond) { } bool Mutex::LockWhenWithTimeout(const Condition &cond, absl::Duration timeout) { - return LockWhenWithDeadline(cond, DeadlineFromTimeout(timeout)); + ABSL_TSAN_MUTEX_PRE_LOCK(this, 0); + GraphId id = DebugOnlyDeadlockCheck(this); + bool res = LockSlowWithDeadline(kExclusive, &cond, + KernelTimeout(timeout), 0); + DebugOnlyLockEnter(this, id); + ABSL_TSAN_MUTEX_POST_LOCK(this, 0, 0); + return res; } bool Mutex::LockWhenWithDeadline(const Condition &cond, absl::Time deadline) { @@ -1569,7 +1560,12 @@ void Mutex::ReaderLockWhen(const Condition &cond) { bool Mutex::ReaderLockWhenWithTimeout(const Condition &cond, absl::Duration timeout) { - return ReaderLockWhenWithDeadline(cond, DeadlineFromTimeout(timeout)); + ABSL_TSAN_MUTEX_PRE_LOCK(this, __tsan_mutex_read_lock); + GraphId id = DebugOnlyDeadlockCheck(this); + bool res = LockSlowWithDeadline(kShared, &cond, KernelTimeout(timeout), 0); + DebugOnlyLockEnter(this, id); + ABSL_TSAN_MUTEX_POST_LOCK(this, __tsan_mutex_read_lock, 0); + return res; } bool Mutex::ReaderLockWhenWithDeadline(const Condition &cond, @@ -1594,7 +1590,18 @@ void Mutex::Await(const Condition &cond) { } bool Mutex::AwaitWithTimeout(const Condition &cond, absl::Duration timeout) { - return AwaitWithDeadline(cond, DeadlineFromTimeout(timeout)); + if (cond.Eval()) { // condition already true; nothing to do + if (kDebugMode) { + this->AssertReaderHeld(); + } + return true; + } + + KernelTimeout t{timeout}; + bool res = this->AwaitCommon(cond, t); + ABSL_RAW_CHECK(res || t.has_timeout(), + "condition untrue on return from Await"); + return res; } bool Mutex::AwaitWithDeadline(const Condition &cond, absl::Time deadline) { @@ -2660,7 +2667,7 @@ bool CondVar::WaitCommon(Mutex *mutex, KernelTimeout t) { } bool CondVar::WaitWithTimeout(Mutex *mu, absl::Duration timeout) { - return WaitWithDeadline(mu, DeadlineFromTimeout(timeout)); + return WaitCommon(mu, KernelTimeout(timeout)); } bool CondVar::WaitWithDeadline(Mutex *mu, absl::Time deadline) { |