diff options
Diffstat (limited to 'absl/synchronization/internal')
24 files changed, 2149 insertions, 710 deletions
diff --git a/absl/synchronization/internal/create_thread_identity.cc b/absl/synchronization/internal/create_thread_identity.cc index 44e6129b..eacaa28d 100644 --- a/absl/synchronization/internal/create_thread_identity.cc +++ b/absl/synchronization/internal/create_thread_identity.cc @@ -13,10 +13,12 @@ // limitations under the License. #include <stdint.h> + #include <new> // This file is a no-op if the required LowLevelAlloc support is missing. #include "absl/base/internal/low_level_alloc.h" +#include "absl/synchronization/internal/waiter.h" #ifndef ABSL_LOW_LEVEL_ALLOC_MISSING #include <string.h> @@ -71,6 +73,9 @@ static intptr_t RoundUp(intptr_t addr, intptr_t align) { void OneTimeInitThreadIdentity(base_internal::ThreadIdentity* identity) { PerThreadSem::Init(identity); + identity->ticker.store(0, std::memory_order_relaxed); + identity->wait_start.store(0, std::memory_order_relaxed); + identity->is_idle.store(false, std::memory_order_relaxed); } static void ResetThreadIdentityBetweenReuse( diff --git a/absl/synchronization/internal/futex.h b/absl/synchronization/internal/futex.h index cb97da09..573c01b7 100644 --- a/absl/synchronization/internal/futex.h +++ b/absl/synchronization/internal/futex.h @@ -16,9 +16,7 @@ #include "absl/base/config.h" -#ifdef _WIN32 -#include <windows.h> -#else +#ifndef _WIN32 #include <sys/time.h> #include <unistd.h> #endif @@ -34,6 +32,7 @@ #include <atomic> #include <cstdint> +#include <limits> #include "absl/base/optimization.h" #include "absl/synchronization/internal/kernel_timeout.h" @@ -81,51 +80,64 @@ namespace synchronization_internal { #if defined(SYS_futex_time64) && !defined(SYS_futex) #define SYS_futex SYS_futex_time64 +using FutexTimespec = struct timespec; +#else +// Some libc implementations have switched to an unconditional 64-bit `time_t` +// definition. This means that `struct timespec` may not match the layout +// expected by the kernel ABI on 32-bit platforms. So we define the +// FutexTimespec that matches the kernel timespec definition. It should be safe +// to use this struct for 64-bit userspace builds too, since it will use another +// SYS_futex kernel call with 64-bit tv_sec inside timespec. +struct FutexTimespec { + long tv_sec; // NOLINT + long tv_nsec; // NOLINT +}; #endif class FutexImpl { public: - static int WaitUntil(std::atomic<int32_t> *v, int32_t val, - KernelTimeout t) { - long err = 0; // NOLINT(runtime/int) - if (t.has_timeout()) { - // https://locklessinc.com/articles/futex_cheat_sheet/ - // Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET uses absolute time. - struct timespec abs_timeout = t.MakeAbsTimespec(); - // Atomically check that the futex value is still 0, and if it - // is, sleep until abs_timeout or until woken by FUTEX_WAKE. - err = syscall( - SYS_futex, reinterpret_cast<int32_t *>(v), - FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME, val, - &abs_timeout, nullptr, FUTEX_BITSET_MATCH_ANY); - } else { - // Atomically check that the futex value is still 0, and if it - // is, sleep until woken by FUTEX_WAKE. - err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v), - FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, nullptr); - } - if (ABSL_PREDICT_FALSE(err != 0)) { + // Atomically check that `*v == val`, and if it is, then sleep until the until + // woken by `Wake()`. + static int Wait(std::atomic<int32_t>* v, int32_t val) { + return WaitAbsoluteTimeout(v, val, nullptr); + } + + // Atomically check that `*v == val`, and if it is, then sleep until + // CLOCK_REALTIME reaches `*abs_timeout`, or until woken by `Wake()`. + static int WaitAbsoluteTimeout(std::atomic<int32_t>* v, int32_t val, + const struct timespec* abs_timeout) { + FutexTimespec ts; + // https://locklessinc.com/articles/futex_cheat_sheet/ + // Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET uses absolute time. + auto err = syscall( + SYS_futex, reinterpret_cast<int32_t*>(v), + FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME, val, + ToFutexTimespec(abs_timeout, &ts), nullptr, FUTEX_BITSET_MATCH_ANY); + if (err != 0) { return -errno; } return 0; } - static int WaitBitsetAbsoluteTimeout(std::atomic<int32_t> *v, int32_t val, - int32_t bits, - const struct timespec *abstime) { - // NOLINTNEXTLINE(runtime/int) - long err = syscall(SYS_futex, reinterpret_cast<int32_t*>(v), - FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG, val, abstime, - nullptr, bits); - if (ABSL_PREDICT_FALSE(err != 0)) { + // Atomically check that `*v == val`, and if it is, then sleep until + // `*rel_timeout` has elapsed, or until woken by `Wake()`. + static int WaitRelativeTimeout(std::atomic<int32_t>* v, int32_t val, + const struct timespec* rel_timeout) { + FutexTimespec ts; + // Atomically check that the futex value is still 0, and if it + // is, sleep until abs_timeout or until woken by FUTEX_WAKE. + auto err = + syscall(SYS_futex, reinterpret_cast<int32_t*>(v), FUTEX_PRIVATE_FLAG, + val, ToFutexTimespec(rel_timeout, &ts)); + if (err != 0) { return -errno; } return 0; } - static int Wake(std::atomic<int32_t> *v, int32_t count) { - // NOLINTNEXTLINE(runtime/int) - long err = syscall(SYS_futex, reinterpret_cast<int32_t*>(v), + // Wakes at most `count` waiters that have entered the sleep state on `v`. + static int Wake(std::atomic<int32_t>* v, int32_t count) { + auto err = syscall(SYS_futex, reinterpret_cast<int32_t*>(v), FUTEX_WAKE | FUTEX_PRIVATE_FLAG, count); if (ABSL_PREDICT_FALSE(err < 0)) { return -errno; @@ -133,16 +145,24 @@ class FutexImpl { return 0; } - // FUTEX_WAKE_BITSET - static int WakeBitset(std::atomic<int32_t> *v, int32_t count, int32_t bits) { - // NOLINTNEXTLINE(runtime/int) - long err = syscall(SYS_futex, reinterpret_cast<int32_t*>(v), - FUTEX_WAKE_BITSET | FUTEX_PRIVATE_FLAG, count, nullptr, - nullptr, bits); - if (ABSL_PREDICT_FALSE(err < 0)) { - return -errno; + private: + static FutexTimespec* ToFutexTimespec(const struct timespec* userspace_ts, + FutexTimespec* futex_ts) { + if (userspace_ts == nullptr) { + return nullptr; } - return 0; + + using FutexSeconds = decltype(futex_ts->tv_sec); + using FutexNanoseconds = decltype(futex_ts->tv_nsec); + + constexpr auto kMaxSeconds{(std::numeric_limits<FutexSeconds>::max)()}; + if (userspace_ts->tv_sec > kMaxSeconds) { + futex_ts->tv_sec = kMaxSeconds; + } else { + futex_ts->tv_sec = static_cast<FutexSeconds>(userspace_ts->tv_sec); + } + futex_ts->tv_nsec = static_cast<FutexNanoseconds>(userspace_ts->tv_nsec); + return futex_ts; } }; diff --git a/absl/synchronization/internal/futex_waiter.cc b/absl/synchronization/internal/futex_waiter.cc new file mode 100644 index 00000000..87eb3b23 --- /dev/null +++ b/absl/synchronization/internal/futex_waiter.cc @@ -0,0 +1,111 @@ +// Copyright 2023 The Abseil Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "absl/synchronization/internal/futex_waiter.h" + +#ifdef ABSL_INTERNAL_HAVE_FUTEX_WAITER + +#include <atomic> +#include <cstdint> +#include <cerrno> + +#include "absl/base/config.h" +#include "absl/base/internal/raw_logging.h" +#include "absl/base/internal/thread_identity.h" +#include "absl/base/optimization.h" +#include "absl/synchronization/internal/kernel_timeout.h" +#include "absl/synchronization/internal/futex.h" + +namespace absl { +ABSL_NAMESPACE_BEGIN +namespace synchronization_internal { + +#ifdef ABSL_INTERNAL_NEED_REDUNDANT_CONSTEXPR_DECL +constexpr char FutexWaiter::kName[]; +#endif + +int FutexWaiter::WaitUntil(std::atomic<int32_t>* v, int32_t val, + KernelTimeout t) { +#ifdef CLOCK_MONOTONIC + constexpr bool kHasClockMonotonic = true; +#else + constexpr bool kHasClockMonotonic = false; +#endif + + // We can't call Futex::WaitUntil() here because the prodkernel implementation + // does not know about KernelTimeout::SupportsSteadyClock(). + if (!t.has_timeout()) { + return Futex::Wait(v, val); + } else if (kHasClockMonotonic && KernelTimeout::SupportsSteadyClock() && + t.is_relative_timeout()) { + auto rel_timespec = t.MakeRelativeTimespec(); + return Futex::WaitRelativeTimeout(v, val, &rel_timespec); + } else { + auto abs_timespec = t.MakeAbsTimespec(); + return Futex::WaitAbsoluteTimeout(v, val, &abs_timespec); + } +} + +bool FutexWaiter::Wait(KernelTimeout t) { + // Loop until we can atomically decrement futex from a positive + // value, waiting on a futex while we believe it is zero. + // Note that, since the thread ticker is just reset, we don't need to check + // whether the thread is idle on the very first pass of the loop. + bool first_pass = true; + while (true) { + int32_t x = futex_.load(std::memory_order_relaxed); + while (x != 0) { + if (!futex_.compare_exchange_weak(x, x - 1, + std::memory_order_acquire, + std::memory_order_relaxed)) { + continue; // Raced with someone, retry. + } + return true; // Consumed a wakeup, we are done. + } + + if (!first_pass) MaybeBecomeIdle(); + const int err = WaitUntil(&futex_, 0, t); + if (err != 0) { + if (err == -EINTR || err == -EWOULDBLOCK) { + // Do nothing, the loop will retry. + } else if (err == -ETIMEDOUT) { + return false; + } else { + ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err); + } + } + first_pass = false; + } +} + +void FutexWaiter::Post() { + if (futex_.fetch_add(1, std::memory_order_release) == 0) { + // We incremented from 0, need to wake a potential waiter. + Poke(); + } +} + +void FutexWaiter::Poke() { + // Wake one thread waiting on the futex. + const int err = Futex::Wake(&futex_, 1); + if (ABSL_PREDICT_FALSE(err < 0)) { + ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err); + } +} + +} // namespace synchronization_internal +ABSL_NAMESPACE_END +} // namespace absl + +#endif // ABSL_INTERNAL_HAVE_FUTEX_WAITER diff --git a/absl/synchronization/internal/futex_waiter.h b/absl/synchronization/internal/futex_waiter.h new file mode 100644 index 00000000..11dfa93b --- /dev/null +++ b/absl/synchronization/internal/futex_waiter.h @@ -0,0 +1,63 @@ +// Copyright 2023 The Abseil Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#ifndef ABSL_SYNCHRONIZATION_INTERNAL_FUTEX_WAITER_H_ +#define ABSL_SYNCHRONIZATION_INTERNAL_FUTEX_WAITER_H_ + +#include <atomic> +#include <cstdint> + +#include "absl/base/config.h" +#include "absl/synchronization/internal/kernel_timeout.h" +#include "absl/synchronization/internal/futex.h" +#include "absl/synchronization/internal/waiter_base.h" + +#ifdef ABSL_INTERNAL_HAVE_FUTEX + +namespace absl { +ABSL_NAMESPACE_BEGIN +namespace synchronization_internal { + +#define ABSL_INTERNAL_HAVE_FUTEX_WAITER 1 + +class FutexWaiter : public WaiterCrtp<FutexWaiter> { + public: + FutexWaiter() : futex_(0) {} + + bool Wait(KernelTimeout t); + void Post(); + void Poke(); + + static constexpr char kName[] = "FutexWaiter"; + + private: + // Atomically check that `*v == val`, and if it is, then sleep until the + // timeout `t` has been reached, or until woken by `Wake()`. + static int WaitUntil(std::atomic<int32_t>* v, int32_t val, + KernelTimeout t); + + // Futexes are defined by specification to be 32-bits. + // Thus std::atomic<int32_t> must be just an int32_t with lockfree methods. + std::atomic<int32_t> futex_; + static_assert(sizeof(int32_t) == sizeof(futex_), "Wrong size for futex"); +}; + +} // namespace synchronization_internal +ABSL_NAMESPACE_END +} // namespace absl + +#endif // ABSL_INTERNAL_HAVE_FUTEX + +#endif // ABSL_SYNCHRONIZATION_INTERNAL_FUTEX_WAITER_H_ diff --git a/absl/synchronization/internal/graphcycles.cc b/absl/synchronization/internal/graphcycles.cc index feec4581..39b18482 100644 --- a/absl/synchronization/internal/graphcycles.cc +++ b/absl/synchronization/internal/graphcycles.cc @@ -37,6 +37,7 @@ #include <algorithm> #include <array> +#include <cinttypes> #include <limits> #include "absl/base/internal/hide_ptr.h" #include "absl/base/internal/raw_logging.h" @@ -114,7 +115,7 @@ class Vec { if (src->ptr_ == src->space_) { // Need to actually copy resize(src->size_); - std::copy(src->ptr_, src->ptr_ + src->size_, ptr_); + std::copy_n(src->ptr_, src->size_, ptr_); src->size_ = 0; } else { Discard(); @@ -148,7 +149,7 @@ class Vec { size_t request = static_cast<size_t>(capacity_) * sizeof(T); T* copy = static_cast<T*>( base_internal::LowLevelAlloc::AllocWithArena(request, arena)); - std::copy(ptr_, ptr_ + size_, copy); + std::copy_n(ptr_, size_, copy); Discard(); ptr_ = copy; } @@ -386,19 +387,22 @@ bool GraphCycles::CheckInvariants() const { Node* nx = r->nodes_[x]; void* ptr = base_internal::UnhidePtr<void>(nx->masked_ptr); if (ptr != nullptr && static_cast<uint32_t>(r->ptrmap_.Find(ptr)) != x) { - ABSL_RAW_LOG(FATAL, "Did not find live node in hash table %u %p", x, ptr); + ABSL_RAW_LOG(FATAL, "Did not find live node in hash table %" PRIu32 " %p", + x, ptr); } if (nx->visited) { - ABSL_RAW_LOG(FATAL, "Did not clear visited marker on node %u", x); + ABSL_RAW_LOG(FATAL, "Did not clear visited marker on node %" PRIu32, x); } if (!ranks.insert(nx->rank)) { - ABSL_RAW_LOG(FATAL, "Duplicate occurrence of rank %d", nx->rank); + ABSL_RAW_LOG(FATAL, "Duplicate occurrence of rank %" PRId32, nx->rank); } HASH_FOR_EACH(y, nx->out) { Node* ny = r->nodes_[static_cast<uint32_t>(y)]; if (nx->rank >= ny->rank) { - ABSL_RAW_LOG(FATAL, "Edge %u->%d has bad rank assignment %d->%d", x, y, - nx->rank, ny->rank); + ABSL_RAW_LOG(FATAL, + "Edge %" PRIu32 " ->%" PRId32 + " has bad rank assignment %" PRId32 "->%" PRId32, + x, y, nx->rank, ny->rank); } } } diff --git a/absl/synchronization/internal/graphcycles_test.cc b/absl/synchronization/internal/graphcycles_test.cc index 74eaffe7..3c6ef798 100644 --- a/absl/synchronization/internal/graphcycles_test.cc +++ b/absl/synchronization/internal/graphcycles_test.cc @@ -21,8 +21,9 @@ #include <vector> #include "gtest/gtest.h" -#include "absl/base/internal/raw_logging.h" #include "absl/base/macros.h" +#include "absl/log/check.h" +#include "absl/log/log.h" namespace absl { ABSL_NAMESPACE_BEGIN @@ -65,51 +66,51 @@ static bool IsReachable(Edges *edges, int from, int to, } static void PrintEdges(Edges *edges) { - ABSL_RAW_LOG(INFO, "EDGES (%zu)", edges->size()); + LOG(INFO) << "EDGES (" << edges->size() << ")"; for (const auto &edge : *edges) { int a = edge.from; int b = edge.to; - ABSL_RAW_LOG(INFO, "%d %d", a, b); + LOG(INFO) << a << " " << b; } - ABSL_RAW_LOG(INFO, "---"); + LOG(INFO) << "---"; } static void PrintGCEdges(Nodes *nodes, const IdMap &id, GraphCycles *gc) { - ABSL_RAW_LOG(INFO, "GC EDGES"); + LOG(INFO) << "GC EDGES"; for (int a : *nodes) { for (int b : *nodes) { if (gc->HasEdge(Get(id, a), Get(id, b))) { - ABSL_RAW_LOG(INFO, "%d %d", a, b); + LOG(INFO) << a << " " << b; } } } - ABSL_RAW_LOG(INFO, "---"); + LOG(INFO) << "---"; } static void PrintTransitiveClosure(Nodes *nodes, Edges *edges) { - ABSL_RAW_LOG(INFO, "Transitive closure"); + LOG(INFO) << "Transitive closure"; for (int a : *nodes) { for (int b : *nodes) { std::unordered_set<int> seen; if (IsReachable(edges, a, b, &seen)) { - ABSL_RAW_LOG(INFO, "%d %d", a, b); + LOG(INFO) << a << " " << b; } } } - ABSL_RAW_LOG(INFO, "---"); + LOG(INFO) << "---"; } static void PrintGCTransitiveClosure(Nodes *nodes, const IdMap &id, GraphCycles *gc) { - ABSL_RAW_LOG(INFO, "GC Transitive closure"); + LOG(INFO) << "GC Transitive closure"; for (int a : *nodes) { for (int b : *nodes) { if (gc->IsReachable(Get(id, a), Get(id, b))) { - ABSL_RAW_LOG(INFO, "%d %d", a, b); + LOG(INFO) << a << " " << b; } } } - ABSL_RAW_LOG(INFO, "---"); + LOG(INFO) << "---"; } static void CheckTransitiveClosure(Nodes *nodes, Edges *edges, const IdMap &id, @@ -125,9 +126,8 @@ static void CheckTransitiveClosure(Nodes *nodes, Edges *edges, const IdMap &id, PrintGCEdges(nodes, id, gc); PrintTransitiveClosure(nodes, edges); PrintGCTransitiveClosure(nodes, id, gc); - ABSL_RAW_LOG(FATAL, "gc_reachable %s reachable %s a %d b %d", - gc_reachable ? "true" : "false", - reachable ? "true" : "false", a, b); + LOG(FATAL) << "gc_reachable " << gc_reachable << " reachable " + << reachable << " a " << a << " b " << b; } } } @@ -142,7 +142,7 @@ static void CheckEdges(Nodes *nodes, Edges *edges, const IdMap &id, if (!gc->HasEdge(Get(id, a), Get(id, b))) { PrintEdges(edges); PrintGCEdges(nodes, id, gc); - ABSL_RAW_LOG(FATAL, "!gc->HasEdge(%d, %d)", a, b); + LOG(FATAL) << "!gc->HasEdge(" << a << ", " << b << ")"; } } for (const auto &a : *nodes) { @@ -155,13 +155,12 @@ static void CheckEdges(Nodes *nodes, Edges *edges, const IdMap &id, if (count != edges->size()) { PrintEdges(edges); PrintGCEdges(nodes, id, gc); - ABSL_RAW_LOG(FATAL, "edges->size() %zu count %d", edges->size(), count); + LOG(FATAL) << "edges->size() " << edges->size() << " count " << count; } } static void CheckInvariants(const GraphCycles &gc) { - if (ABSL_PREDICT_FALSE(!gc.CheckInvariants())) - ABSL_RAW_LOG(FATAL, "CheckInvariants"); + CHECK(gc.CheckInvariants()) << "CheckInvariants"; } // Returns the index of a randomly chosen node in *nodes. @@ -309,7 +308,7 @@ TEST(GraphCycles, RandomizedTest) { break; default: - ABSL_RAW_LOG(FATAL, "op %d", op); + LOG(FATAL) << "op " << op; } // Very rarely, test graph expansion by adding then removing many nodes. diff --git a/absl/synchronization/internal/kernel_timeout.cc b/absl/synchronization/internal/kernel_timeout.cc new file mode 100644 index 00000000..48ea6287 --- /dev/null +++ b/absl/synchronization/internal/kernel_timeout.cc @@ -0,0 +1,225 @@ +// Copyright 2023 The Abseil Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "absl/synchronization/internal/kernel_timeout.h" + +#ifndef _WIN32 +#include <sys/types.h> +#endif + +#include <algorithm> +#include <chrono> // NOLINT(build/c++11) +#include <cstdint> +#include <cstdlib> +#include <cstring> +#include <ctime> +#include <limits> + +#include "absl/base/attributes.h" +#include "absl/base/call_once.h" +#include "absl/base/config.h" +#include "absl/time/time.h" + +namespace absl { +ABSL_NAMESPACE_BEGIN +namespace synchronization_internal { + +#ifdef ABSL_INTERNAL_NEED_REDUNDANT_CONSTEXPR_DECL +constexpr uint64_t KernelTimeout::kNoTimeout; +constexpr int64_t KernelTimeout::kMaxNanos; +#endif + +int64_t KernelTimeout::SteadyClockNow() { + if (!SupportsSteadyClock()) { + return absl::GetCurrentTimeNanos(); + } + return std::chrono::duration_cast<std::chrono::nanoseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); +} + +KernelTimeout::KernelTimeout(absl::Time t) { + // `absl::InfiniteFuture()` is a common "no timeout" value and cheaper to + // compare than convert. + if (t == absl::InfiniteFuture()) { + rep_ = kNoTimeout; + return; + } + + int64_t unix_nanos = absl::ToUnixNanos(t); + + // A timeout that lands before the unix epoch is converted to 0. + // In theory implementations should expire these timeouts immediately. + if (unix_nanos < 0) { + unix_nanos = 0; + } + + // Values greater than or equal to kMaxNanos are converted to infinite. + if (unix_nanos >= kMaxNanos) { + rep_ = kNoTimeout; + return; + } + + rep_ = static_cast<uint64_t>(unix_nanos) << 1; +} + +KernelTimeout::KernelTimeout(absl::Duration d) { + // `absl::InfiniteDuration()` is a common "no timeout" value and cheaper to + // compare than convert. + if (d == absl::InfiniteDuration()) { + rep_ = kNoTimeout; + return; + } + + int64_t nanos = absl::ToInt64Nanoseconds(d); + + // Negative durations are normalized to 0. + // In theory implementations should expire these timeouts immediately. + if (nanos < 0) { + nanos = 0; + } + + int64_t now = SteadyClockNow(); + if (nanos > kMaxNanos - now) { + // Durations that would be greater than kMaxNanos are converted to infinite. + rep_ = kNoTimeout; + return; + } + + nanos += now; + rep_ = (static_cast<uint64_t>(nanos) << 1) | uint64_t{1}; +} + +int64_t KernelTimeout::MakeAbsNanos() const { + if (!has_timeout()) { + return kMaxNanos; + } + + int64_t nanos = RawAbsNanos(); + + if (is_relative_timeout()) { + // We need to change epochs, because the relative timeout might be + // represented by an absolute timestamp from another clock. + nanos = std::max<int64_t>(nanos - SteadyClockNow(), 0); + int64_t now = absl::GetCurrentTimeNanos(); + if (nanos > kMaxNanos - now) { + // Overflow. + nanos = kMaxNanos; + } else { + nanos += now; + } + } else if (nanos == 0) { + // Some callers have assumed that 0 means no timeout, so instead we return a + // time of 1 nanosecond after the epoch. + nanos = 1; + } + + return nanos; +} + +int64_t KernelTimeout::InNanosecondsFromNow() const { + if (!has_timeout()) { + return kMaxNanos; + } + + int64_t nanos = RawAbsNanos(); + if (is_absolute_timeout()) { + return std::max<int64_t>(nanos - absl::GetCurrentTimeNanos(), 0); + } + return std::max<int64_t>(nanos - SteadyClockNow(), 0); +} + +struct timespec KernelTimeout::MakeAbsTimespec() const { + return absl::ToTimespec(absl::Nanoseconds(MakeAbsNanos())); +} + +struct timespec KernelTimeout::MakeRelativeTimespec() const { + return absl::ToTimespec(absl::Nanoseconds(InNanosecondsFromNow())); +} + +#ifndef _WIN32 +struct timespec KernelTimeout::MakeClockAbsoluteTimespec(clockid_t c) const { + if (!has_timeout()) { + return absl::ToTimespec(absl::Nanoseconds(kMaxNanos)); + } + + int64_t nanos = RawAbsNanos(); + if (is_absolute_timeout()) { + nanos -= absl::GetCurrentTimeNanos(); + } else { + nanos -= SteadyClockNow(); + } + + struct timespec now; + ABSL_RAW_CHECK(clock_gettime(c, &now) == 0, "clock_gettime() failed"); + absl::Duration from_clock_epoch = + absl::DurationFromTimespec(now) + absl::Nanoseconds(nanos); + if (from_clock_epoch <= absl::ZeroDuration()) { + // Some callers have assumed that 0 means no timeout, so instead we return a + // time of 1 nanosecond after the epoch. For safety we also do not return + // negative values. + return absl::ToTimespec(absl::Nanoseconds(1)); + } + return absl::ToTimespec(from_clock_epoch); +} +#endif + +KernelTimeout::DWord KernelTimeout::InMillisecondsFromNow() const { + constexpr DWord kInfinite = std::numeric_limits<DWord>::max(); + + if (!has_timeout()) { + return kInfinite; + } + + constexpr uint64_t kNanosInMillis = uint64_t{1'000'000}; + constexpr uint64_t kMaxValueNanos = + std::numeric_limits<int64_t>::max() - kNanosInMillis + 1; + + uint64_t ns_from_now = static_cast<uint64_t>(InNanosecondsFromNow()); + if (ns_from_now >= kMaxValueNanos) { + // Rounding up would overflow. + return kInfinite; + } + // Convert to milliseconds, always rounding up. + uint64_t ms_from_now = (ns_from_now + kNanosInMillis - 1) / kNanosInMillis; + if (ms_from_now > kInfinite) { + return kInfinite; + } + return static_cast<DWord>(ms_from_now); +} + +std::chrono::time_point<std::chrono::system_clock> +KernelTimeout::ToChronoTimePoint() const { + if (!has_timeout()) { + return std::chrono::time_point<std::chrono::system_clock>::max(); + } + + // The cast to std::microseconds is because (on some platforms) the + // std::ratio used by std::chrono::steady_clock doesn't convert to + // std::nanoseconds, so it doesn't compile. + auto micros = std::chrono::duration_cast<std::chrono::microseconds>( + std::chrono::nanoseconds(MakeAbsNanos())); + return std::chrono::system_clock::from_time_t(0) + micros; +} + +std::chrono::nanoseconds KernelTimeout::ToChronoDuration() const { + if (!has_timeout()) { + return std::chrono::nanoseconds::max(); + } + return std::chrono::nanoseconds(InNanosecondsFromNow()); +} + +} // namespace synchronization_internal +ABSL_NAMESPACE_END +} // namespace absl diff --git a/absl/synchronization/internal/kernel_timeout.h b/absl/synchronization/internal/kernel_timeout.h index f5c2c0ef..06404a75 100644 --- a/absl/synchronization/internal/kernel_timeout.h +++ b/absl/synchronization/internal/kernel_timeout.h @@ -11,26 +11,21 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -// - -// An optional absolute timeout, with nanosecond granularity, -// compatible with absl::Time. Suitable for in-register -// parameter-passing (e.g. syscalls.) -// Constructible from a absl::Time (for a timeout to be respected) or {} -// (for "no timeout".) -// This is a private low-level API for use by a handful of low-level -// components. Higher-level components should build APIs based on -// absl::Time and absl::Duration. #ifndef ABSL_SYNCHRONIZATION_INTERNAL_KERNEL_TIMEOUT_H_ #define ABSL_SYNCHRONIZATION_INTERNAL_KERNEL_TIMEOUT_H_ -#include <time.h> +#ifndef _WIN32 +#include <sys/types.h> +#endif #include <algorithm> +#include <chrono> // NOLINT(build/c++11) #include <cstdint> +#include <ctime> #include <limits> +#include "absl/base/config.h" #include "absl/base/internal/raw_logging.h" #include "absl/time/clock.h" #include "absl/time/time.h" @@ -39,58 +34,73 @@ namespace absl { ABSL_NAMESPACE_BEGIN namespace synchronization_internal { -class Waiter; - +// An optional timeout, with nanosecond granularity. +// +// This is a private low-level API for use by a handful of low-level +// components. Higher-level components should build APIs based on +// absl::Time and absl::Duration. class KernelTimeout { public: - // A timeout that should expire at <t>. Any value, in the full - // InfinitePast() to InfiniteFuture() range, is valid here and will be - // respected. - explicit KernelTimeout(absl::Time t) : ns_(MakeNs(t)) {} - // No timeout. - KernelTimeout() : ns_(0) {} + // Construct an absolute timeout that should expire at `t`. + explicit KernelTimeout(absl::Time t); - // A more explicit factory for those who prefer it. Equivalent to {}. - static KernelTimeout Never() { return {}; } + // Construct a relative timeout that should expire after `d`. + explicit KernelTimeout(absl::Duration d); - // We explicitly do not support other custom formats: timespec, int64_t nanos. - // Unify on this and absl::Time, please. + // Infinite timeout. + constexpr KernelTimeout() : rep_(kNoTimeout) {} - bool has_timeout() const { return ns_ != 0; } + // A more explicit factory for those who prefer it. + // Equivalent to `KernelTimeout()`. + static constexpr KernelTimeout Never() { return KernelTimeout(); } - // Convert to parameter for sem_timedwait/futex/similar. Only for approved - // users. Do not call if !has_timeout. + // Returns true if there is a timeout that will eventually expire. + // Returns false if the timeout is infinite. + bool has_timeout() const { return rep_ != kNoTimeout; } + + // If `has_timeout()` is true, returns true if the timeout was provided as an + // `absl::Time`. The return value is undefined if `has_timeout()` is false + // because all indefinite timeouts are equivalent. + bool is_absolute_timeout() const { return (rep_ & 1) == 0; } + + // If `has_timeout()` is true, returns true if the timeout was provided as an + // `absl::Duration`. The return value is undefined if `has_timeout()` is false + // because all indefinite timeouts are equivalent. + bool is_relative_timeout() const { return (rep_ & 1) == 1; } + + // Convert to `struct timespec` for interfaces that expect an absolute + // timeout. If !has_timeout() or is_relative_timeout(), attempts to convert to + // a reasonable absolute timeout, but callers should to test has_timeout() and + // is_relative_timeout() and prefer to use a more appropriate interface. struct timespec MakeAbsTimespec() const; - // Convert to unix epoch nanos. Do not call if !has_timeout. + // Convert to `struct timespec` for interfaces that expect a relative + // timeout. If !has_timeout() or is_absolute_timeout(), attempts to convert to + // a reasonable relative timeout, but callers should to test has_timeout() and + // is_absolute_timeout() and prefer to use a more appropriate interface. Since + // the return value is a relative duration, it should be recomputed by calling + // this method in the case of a spurious wakeup. + struct timespec MakeRelativeTimespec() const; + +#ifndef _WIN32 + // Convert to `struct timespec` for interfaces that expect an absolute timeout + // on a specific clock `c`. This is similar to `MakeAbsTimespec()`, but + // callers usually want to use this method with `CLOCK_MONOTONIC` when + // relative timeouts are requested, and when the appropriate interface expects + // an absolute timeout relative to a specific clock (for example, + // pthread_cond_clockwait() or sem_clockwait()). If !has_timeout(), attempts + // to convert to a reasonable absolute timeout, but callers should to test + // has_timeout() prefer to use a more appropriate interface. + struct timespec MakeClockAbsoluteTimespec(clockid_t c) const; +#endif + + // Convert to unix epoch nanos for interfaces that expect an absolute timeout + // in nanoseconds. If !has_timeout() or is_relative_timeout(), attempts to + // convert to a reasonable absolute timeout, but callers should to test + // has_timeout() and is_relative_timeout() and prefer to use a more + // appropriate interface. int64_t MakeAbsNanos() const; - private: - // internal rep, not user visible: ns after unix epoch. - // zero = no timeout. - // Negative we treat as an unlikely (and certainly expired!) but valid - // timeout. - int64_t ns_; - - static int64_t MakeNs(absl::Time t) { - // optimization--InfiniteFuture is common "no timeout" value - // and cheaper to compare than convert. - if (t == absl::InfiniteFuture()) return 0; - int64_t x = ToUnixNanos(t); - - // A timeout that lands exactly on the epoch (x=0) needs to be respected, - // so we alter it unnoticably to 1. Negative timeouts are in - // theory supported, but handled poorly by the kernel (long - // delays) so push them forward too; since all such times have - // already passed, it's indistinguishable. - if (x <= 0) x = 1; - // A time larger than what can be represented to the kernel is treated - // as no timeout. - if (x == (std::numeric_limits<int64_t>::max)()) x = 0; - return x; - } - -#ifdef _WIN32 // Converts to milliseconds from now, or INFINITE when // !has_timeout(). For use by SleepConditionVariableSRW on // Windows. Callers should recognize that the return value is a @@ -100,68 +110,66 @@ class KernelTimeout { // so we define our own DWORD and INFINITE instead of getting them from // <intsafe.h> and <WinBase.h>. typedef unsigned long DWord; // NOLINT - DWord InMillisecondsFromNow() const { - constexpr DWord kInfinite = (std::numeric_limits<DWord>::max)(); - if (!has_timeout()) { - return kInfinite; - } - // The use of absl::Now() to convert from absolute time to - // relative time means that absl::Now() cannot use anything that - // depends on KernelTimeout (for example, Mutex) on Windows. - int64_t now = ToUnixNanos(absl::Now()); - if (ns_ >= now) { - // Round up so that Now() + ms_from_now >= ns_. - constexpr uint64_t max_nanos = - (std::numeric_limits<int64_t>::max)() - 999999u; - uint64_t ms_from_now = - ((std::min)(max_nanos, static_cast<uint64_t>(ns_ - now)) + 999999u) / - 1000000u; - if (ms_from_now > kInfinite) { - return kInfinite; - } - return static_cast<DWord>(ms_from_now); - } - return 0; - } - - friend class Waiter; -#endif -}; + DWord InMillisecondsFromNow() const; + + // Convert to std::chrono::time_point for interfaces that expect an absolute + // timeout, like std::condition_variable::wait_until(). If !has_timeout() or + // is_relative_timeout(), attempts to convert to a reasonable absolute + // timeout, but callers should test has_timeout() and is_relative_timeout() + // and prefer to use a more appropriate interface. + std::chrono::time_point<std::chrono::system_clock> ToChronoTimePoint() const; + + // Convert to std::chrono::time_point for interfaces that expect a relative + // timeout, like std::condition_variable::wait_for(). If !has_timeout() or + // is_absolute_timeout(), attempts to convert to a reasonable relative + // timeout, but callers should test has_timeout() and is_absolute_timeout() + // and prefer to use a more appropriate interface. Since the return value is a + // relative duration, it should be recomputed by calling this method in the + // case of a spurious wakeup. + std::chrono::nanoseconds ToChronoDuration() const; + + // Returns true if steady (aka monotonic) clocks are supported by the system. + // This method exists because go/btm requires synchronized clocks, and + // thus requires we use the system (aka walltime) clock. + static constexpr bool SupportsSteadyClock() { return true; } -inline struct timespec KernelTimeout::MakeAbsTimespec() const { - int64_t n = ns_; - static const int64_t kNanosPerSecond = 1000 * 1000 * 1000; - if (n == 0) { - ABSL_RAW_LOG( - ERROR, "Tried to create a timespec from a non-timeout; never do this."); - // But we'll try to continue sanely. no-timeout ~= saturated timeout. - n = (std::numeric_limits<int64_t>::max)(); - } - - // Kernel APIs validate timespecs as being at or after the epoch, - // despite the kernel time type being signed. However, no one can - // tell the difference between a timeout at or before the epoch (since - // all such timeouts have expired!) - if (n < 0) n = 0; - - struct timespec abstime; - int64_t seconds = (std::min)(n / kNanosPerSecond, - int64_t{(std::numeric_limits<time_t>::max)()}); - abstime.tv_sec = static_cast<time_t>(seconds); - abstime.tv_nsec = static_cast<decltype(abstime.tv_nsec)>(n % kNanosPerSecond); - return abstime; -} - -inline int64_t KernelTimeout::MakeAbsNanos() const { - if (ns_ == 0) { - ABSL_RAW_LOG( - ERROR, "Tried to create a timeout from a non-timeout; never do this."); - // But we'll try to continue sanely. no-timeout ~= saturated timeout. - return (std::numeric_limits<int64_t>::max)(); - } - - return ns_; -} + private: + // Returns the current time, expressed as a count of nanoseconds since the + // epoch used by an arbitrary clock. The implementation tries to use a steady + // (monotonic) clock if one is available. + static int64_t SteadyClockNow(); + + // Internal representation. + // - If the value is kNoTimeout, then the timeout is infinite, and + // has_timeout() will return true. + // - If the low bit is 0, then the high 63 bits is the number of nanoseconds + // after the unix epoch. + // - If the low bit is 1, then the high 63 bits is the number of nanoseconds + // after the epoch used by SteadyClockNow(). + // + // In all cases the time is stored as an absolute time, the only difference is + // the clock epoch. The use of absolute times is important since in the case + // of a relative timeout with a spurious wakeup, the program would have to + // restart the wait, and thus needs a way of recomputing the remaining time. + uint64_t rep_; + + // Returns the number of nanoseconds stored in the internal representation. + // When combined with the clock epoch indicated by the low bit (which is + // accessed through is_absolute_timeout() and is_relative_timeout()), the + // return value is used to compute when the timeout should occur. + int64_t RawAbsNanos() const { return static_cast<int64_t>(rep_ >> 1); } + + // Converts to nanoseconds from now. Since the return value is a relative + // duration, it should be recomputed by calling this method in the case of a + // spurious wakeup. + int64_t InNanosecondsFromNow() const; + + // A value that represents no timeout (or an infinite timeout). + static constexpr uint64_t kNoTimeout = (std::numeric_limits<uint64_t>::max)(); + + // The maximum value that can be stored in the high 63 bits. + static constexpr int64_t kMaxNanos = (std::numeric_limits<int64_t>::max)(); +}; } // namespace synchronization_internal ABSL_NAMESPACE_END diff --git a/absl/synchronization/internal/kernel_timeout_test.cc b/absl/synchronization/internal/kernel_timeout_test.cc new file mode 100644 index 00000000..92ed2691 --- /dev/null +++ b/absl/synchronization/internal/kernel_timeout_test.cc @@ -0,0 +1,394 @@ +// Copyright 2023 The Abseil Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "absl/synchronization/internal/kernel_timeout.h" + +#include <ctime> +#include <chrono> // NOLINT(build/c++11) +#include <limits> + +#include "absl/base/config.h" +#include "absl/random/random.h" +#include "absl/time/clock.h" +#include "absl/time/time.h" +#include "gtest/gtest.h" + +// Test go/btm support by randomizing the value of clock_gettime() for +// CLOCK_MONOTONIC. This works by overriding a weak symbol in glibc. +// We should be resistant to this randomization when !SupportsSteadyClock(). +#if defined(__GOOGLE_GRTE_VERSION__) && \ + !defined(ABSL_HAVE_ADDRESS_SANITIZER) && \ + !defined(ABSL_HAVE_MEMORY_SANITIZER) && \ + !defined(ABSL_HAVE_THREAD_SANITIZER) +extern "C" int __clock_gettime(clockid_t c, struct timespec* ts); + +extern "C" int clock_gettime(clockid_t c, struct timespec* ts) { + if (c == CLOCK_MONOTONIC && + !absl::synchronization_internal::KernelTimeout::SupportsSteadyClock()) { + absl::SharedBitGen gen; + ts->tv_sec = absl::Uniform(gen, 0, 1'000'000'000); + ts->tv_nsec = absl::Uniform(gen, 0, 1'000'000'000); + return 0; + } + return __clock_gettime(c, ts); +} +#endif + +namespace { + +#if defined(ABSL_HAVE_ADDRESS_SANITIZER) || \ + defined(ABSL_HAVE_MEMORY_SANITIZER) || \ + defined(ABSL_HAVE_THREAD_SANITIZER) || \ + defined(__ANDROID__) || \ + defined(_WIN32) || defined(_WIN64) +constexpr absl::Duration kTimingBound = absl::Milliseconds(5); +#else +constexpr absl::Duration kTimingBound = absl::Microseconds(250); +#endif + +using absl::synchronization_internal::KernelTimeout; + +TEST(KernelTimeout, FiniteTimes) { + constexpr absl::Duration kDurationsToTest[] = { + absl::ZeroDuration(), + absl::Nanoseconds(1), + absl::Microseconds(1), + absl::Milliseconds(1), + absl::Seconds(1), + absl::Minutes(1), + absl::Hours(1), + absl::Hours(1000), + -absl::Nanoseconds(1), + -absl::Microseconds(1), + -absl::Milliseconds(1), + -absl::Seconds(1), + -absl::Minutes(1), + -absl::Hours(1), + -absl::Hours(1000), + }; + + for (auto duration : kDurationsToTest) { + const absl::Time now = absl::Now(); + const absl::Time when = now + duration; + SCOPED_TRACE(duration); + KernelTimeout t(when); + EXPECT_TRUE(t.has_timeout()); + EXPECT_TRUE(t.is_absolute_timeout()); + EXPECT_FALSE(t.is_relative_timeout()); + EXPECT_EQ(absl::TimeFromTimespec(t.MakeAbsTimespec()), when); +#ifndef _WIN32 + EXPECT_LE( + absl::AbsDuration(absl::Now() + duration - + absl::TimeFromTimespec( + t.MakeClockAbsoluteTimespec(CLOCK_REALTIME))), + absl::Milliseconds(10)); +#endif + EXPECT_LE( + absl::AbsDuration(absl::DurationFromTimespec(t.MakeRelativeTimespec()) - + std::max(duration, absl::ZeroDuration())), + kTimingBound); + EXPECT_EQ(absl::FromUnixNanos(t.MakeAbsNanos()), when); + EXPECT_LE(absl::AbsDuration(absl::Milliseconds(t.InMillisecondsFromNow()) - + std::max(duration, absl::ZeroDuration())), + absl::Milliseconds(5)); + EXPECT_LE(absl::AbsDuration(absl::FromChrono(t.ToChronoTimePoint()) - when), + absl::Microseconds(1)); + EXPECT_LE(absl::AbsDuration(absl::FromChrono(t.ToChronoDuration()) - + std::max(duration, absl::ZeroDuration())), + kTimingBound); + } +} + +TEST(KernelTimeout, InfiniteFuture) { + KernelTimeout t(absl::InfiniteFuture()); + EXPECT_FALSE(t.has_timeout()); + // Callers are expected to check has_timeout() instead of using the methods + // below, but we do try to do something reasonable if they don't. We may not + // be able to round-trip back to absl::InfiniteDuration() or + // absl::InfiniteFuture(), but we should return a very large value. + EXPECT_GT(absl::TimeFromTimespec(t.MakeAbsTimespec()), + absl::Now() + absl::Hours(100000)); +#ifndef _WIN32 + EXPECT_GT(absl::TimeFromTimespec(t.MakeClockAbsoluteTimespec(CLOCK_REALTIME)), + absl::Now() + absl::Hours(100000)); +#endif + EXPECT_GT(absl::DurationFromTimespec(t.MakeRelativeTimespec()), + absl::Hours(100000)); + EXPECT_GT(absl::FromUnixNanos(t.MakeAbsNanos()), + absl::Now() + absl::Hours(100000)); + EXPECT_EQ(t.InMillisecondsFromNow(), + std::numeric_limits<KernelTimeout::DWord>::max()); + EXPECT_EQ(t.ToChronoTimePoint(), + std::chrono::time_point<std::chrono::system_clock>::max()); + EXPECT_GE(t.ToChronoDuration(), std::chrono::nanoseconds::max()); +} + +TEST(KernelTimeout, DefaultConstructor) { + // The default constructor is equivalent to absl::InfiniteFuture(). + KernelTimeout t; + EXPECT_FALSE(t.has_timeout()); + // Callers are expected to check has_timeout() instead of using the methods + // below, but we do try to do something reasonable if they don't. We may not + // be able to round-trip back to absl::InfiniteDuration() or + // absl::InfiniteFuture(), but we should return a very large value. + EXPECT_GT(absl::TimeFromTimespec(t.MakeAbsTimespec()), + absl::Now() + absl::Hours(100000)); +#ifndef _WIN32 + EXPECT_GT(absl::TimeFromTimespec(t.MakeClockAbsoluteTimespec(CLOCK_REALTIME)), + absl::Now() + absl::Hours(100000)); +#endif + EXPECT_GT(absl::DurationFromTimespec(t.MakeRelativeTimespec()), + absl::Hours(100000)); + EXPECT_GT(absl::FromUnixNanos(t.MakeAbsNanos()), + absl::Now() + absl::Hours(100000)); + EXPECT_EQ(t.InMillisecondsFromNow(), + std::numeric_limits<KernelTimeout::DWord>::max()); + EXPECT_EQ(t.ToChronoTimePoint(), + std::chrono::time_point<std::chrono::system_clock>::max()); + EXPECT_GE(t.ToChronoDuration(), std::chrono::nanoseconds::max()); +} + +TEST(KernelTimeout, TimeMaxNanos) { + // Time >= kMaxNanos should behave as no timeout. + KernelTimeout t(absl::FromUnixNanos(std::numeric_limits<int64_t>::max())); + EXPECT_FALSE(t.has_timeout()); + // Callers are expected to check has_timeout() instead of using the methods + // below, but we do try to do something reasonable if they don't. We may not + // be able to round-trip back to absl::InfiniteDuration() or + // absl::InfiniteFuture(), but we should return a very large value. + EXPECT_GT(absl::TimeFromTimespec(t.MakeAbsTimespec()), + absl::Now() + absl::Hours(100000)); +#ifndef _WIN32 + EXPECT_GT(absl::TimeFromTimespec(t.MakeClockAbsoluteTimespec(CLOCK_REALTIME)), + absl::Now() + absl::Hours(100000)); +#endif + EXPECT_GT(absl::DurationFromTimespec(t.MakeRelativeTimespec()), + absl::Hours(100000)); + EXPECT_GT(absl::FromUnixNanos(t.MakeAbsNanos()), + absl::Now() + absl::Hours(100000)); + EXPECT_EQ(t.InMillisecondsFromNow(), + std::numeric_limits<KernelTimeout::DWord>::max()); + EXPECT_EQ(t.ToChronoTimePoint(), + std::chrono::time_point<std::chrono::system_clock>::max()); + EXPECT_GE(t.ToChronoDuration(), std::chrono::nanoseconds::max()); +} + +TEST(KernelTimeout, Never) { + // KernelTimeout::Never() is equivalent to absl::InfiniteFuture(). + KernelTimeout t = KernelTimeout::Never(); + EXPECT_FALSE(t.has_timeout()); + // Callers are expected to check has_timeout() instead of using the methods + // below, but we do try to do something reasonable if they don't. We may not + // be able to round-trip back to absl::InfiniteDuration() or + // absl::InfiniteFuture(), but we should return a very large value. + EXPECT_GT(absl::TimeFromTimespec(t.MakeAbsTimespec()), + absl::Now() + absl::Hours(100000)); +#ifndef _WIN32 + EXPECT_GT(absl::TimeFromTimespec(t.MakeClockAbsoluteTimespec(CLOCK_REALTIME)), + absl::Now() + absl::Hours(100000)); +#endif + EXPECT_GT(absl::DurationFromTimespec(t.MakeRelativeTimespec()), + absl::Hours(100000)); + EXPECT_GT(absl::FromUnixNanos(t.MakeAbsNanos()), + absl::Now() + absl::Hours(100000)); + EXPECT_EQ(t.InMillisecondsFromNow(), + std::numeric_limits<KernelTimeout::DWord>::max()); + EXPECT_EQ(t.ToChronoTimePoint(), + std::chrono::time_point<std::chrono::system_clock>::max()); + EXPECT_GE(t.ToChronoDuration(), std::chrono::nanoseconds::max()); +} + +TEST(KernelTimeout, InfinitePast) { + KernelTimeout t(absl::InfinitePast()); + EXPECT_TRUE(t.has_timeout()); + EXPECT_TRUE(t.is_absolute_timeout()); + EXPECT_FALSE(t.is_relative_timeout()); + EXPECT_LE(absl::TimeFromTimespec(t.MakeAbsTimespec()), + absl::FromUnixNanos(1)); +#ifndef _WIN32 + EXPECT_LE(absl::TimeFromTimespec(t.MakeClockAbsoluteTimespec(CLOCK_REALTIME)), + absl::FromUnixSeconds(1)); +#endif + EXPECT_EQ(absl::DurationFromTimespec(t.MakeRelativeTimespec()), + absl::ZeroDuration()); + EXPECT_LE(absl::FromUnixNanos(t.MakeAbsNanos()), absl::FromUnixNanos(1)); + EXPECT_EQ(t.InMillisecondsFromNow(), KernelTimeout::DWord{0}); + EXPECT_LT(t.ToChronoTimePoint(), std::chrono::system_clock::from_time_t(0) + + std::chrono::seconds(1)); + EXPECT_EQ(t.ToChronoDuration(), std::chrono::nanoseconds(0)); +} + +TEST(KernelTimeout, FiniteDurations) { + constexpr absl::Duration kDurationsToTest[] = { + absl::ZeroDuration(), + absl::Nanoseconds(1), + absl::Microseconds(1), + absl::Milliseconds(1), + absl::Seconds(1), + absl::Minutes(1), + absl::Hours(1), + absl::Hours(1000), + }; + + for (auto duration : kDurationsToTest) { + SCOPED_TRACE(duration); + KernelTimeout t(duration); + EXPECT_TRUE(t.has_timeout()); + EXPECT_FALSE(t.is_absolute_timeout()); + EXPECT_TRUE(t.is_relative_timeout()); + EXPECT_LE(absl::AbsDuration(absl::Now() + duration - + absl::TimeFromTimespec(t.MakeAbsTimespec())), + absl::Milliseconds(5)); +#ifndef _WIN32 + EXPECT_LE( + absl::AbsDuration(absl::Now() + duration - + absl::TimeFromTimespec( + t.MakeClockAbsoluteTimespec(CLOCK_REALTIME))), + absl::Milliseconds(5)); +#endif + EXPECT_LE( + absl::AbsDuration(absl::DurationFromTimespec(t.MakeRelativeTimespec()) - + duration), + kTimingBound); + EXPECT_LE(absl::AbsDuration(absl::Now() + duration - + absl::FromUnixNanos(t.MakeAbsNanos())), + absl::Milliseconds(5)); + EXPECT_LE(absl::Milliseconds(t.InMillisecondsFromNow()) - duration, + absl::Milliseconds(5)); + EXPECT_LE(absl::AbsDuration(absl::Now() + duration - + absl::FromChrono(t.ToChronoTimePoint())), + kTimingBound); + EXPECT_LE( + absl::AbsDuration(absl::FromChrono(t.ToChronoDuration()) - duration), + kTimingBound); + } +} + +TEST(KernelTimeout, NegativeDurations) { + constexpr absl::Duration kDurationsToTest[] = { + -absl::ZeroDuration(), + -absl::Nanoseconds(1), + -absl::Microseconds(1), + -absl::Milliseconds(1), + -absl::Seconds(1), + -absl::Minutes(1), + -absl::Hours(1), + -absl::Hours(1000), + -absl::InfiniteDuration(), + }; + + for (auto duration : kDurationsToTest) { + // Negative durations should all be converted to zero durations or "now". + SCOPED_TRACE(duration); + KernelTimeout t(duration); + EXPECT_TRUE(t.has_timeout()); + EXPECT_FALSE(t.is_absolute_timeout()); + EXPECT_TRUE(t.is_relative_timeout()); + EXPECT_LE(absl::AbsDuration(absl::Now() - + absl::TimeFromTimespec(t.MakeAbsTimespec())), + absl::Milliseconds(5)); +#ifndef _WIN32 + EXPECT_LE(absl::AbsDuration(absl::Now() - absl::TimeFromTimespec( + t.MakeClockAbsoluteTimespec( + CLOCK_REALTIME))), + absl::Milliseconds(5)); +#endif + EXPECT_EQ(absl::DurationFromTimespec(t.MakeRelativeTimespec()), + absl::ZeroDuration()); + EXPECT_LE( + absl::AbsDuration(absl::Now() - absl::FromUnixNanos(t.MakeAbsNanos())), + absl::Milliseconds(5)); + EXPECT_EQ(t.InMillisecondsFromNow(), KernelTimeout::DWord{0}); + EXPECT_LE(absl::AbsDuration(absl::Now() - + absl::FromChrono(t.ToChronoTimePoint())), + absl::Milliseconds(5)); + EXPECT_EQ(t.ToChronoDuration(), std::chrono::nanoseconds(0)); + } +} + +TEST(KernelTimeout, InfiniteDuration) { + KernelTimeout t(absl::InfiniteDuration()); + EXPECT_FALSE(t.has_timeout()); + // Callers are expected to check has_timeout() instead of using the methods + // below, but we do try to do something reasonable if they don't. We may not + // be able to round-trip back to absl::InfiniteDuration() or + // absl::InfiniteFuture(), but we should return a very large value. + EXPECT_GT(absl::TimeFromTimespec(t.MakeAbsTimespec()), + absl::Now() + absl::Hours(100000)); +#ifndef _WIN32 + EXPECT_GT(absl::TimeFromTimespec(t.MakeClockAbsoluteTimespec(CLOCK_REALTIME)), + absl::Now() + absl::Hours(100000)); +#endif + EXPECT_GT(absl::DurationFromTimespec(t.MakeRelativeTimespec()), + absl::Hours(100000)); + EXPECT_GT(absl::FromUnixNanos(t.MakeAbsNanos()), + absl::Now() + absl::Hours(100000)); + EXPECT_EQ(t.InMillisecondsFromNow(), + std::numeric_limits<KernelTimeout::DWord>::max()); + EXPECT_EQ(t.ToChronoTimePoint(), + std::chrono::time_point<std::chrono::system_clock>::max()); + EXPECT_GE(t.ToChronoDuration(), std::chrono::nanoseconds::max()); +} + +TEST(KernelTimeout, DurationMaxNanos) { + // Duration >= kMaxNanos should behave as no timeout. + KernelTimeout t(absl::Nanoseconds(std::numeric_limits<int64_t>::max())); + EXPECT_FALSE(t.has_timeout()); + // Callers are expected to check has_timeout() instead of using the methods + // below, but we do try to do something reasonable if they don't. We may not + // be able to round-trip back to absl::InfiniteDuration() or + // absl::InfiniteFuture(), but we should return a very large value. + EXPECT_GT(absl::TimeFromTimespec(t.MakeAbsTimespec()), + absl::Now() + absl::Hours(100000)); +#ifndef _WIN32 + EXPECT_GT(absl::TimeFromTimespec(t.MakeClockAbsoluteTimespec(CLOCK_REALTIME)), + absl::Now() + absl::Hours(100000)); +#endif + EXPECT_GT(absl::DurationFromTimespec(t.MakeRelativeTimespec()), + absl::Hours(100000)); + EXPECT_GT(absl::FromUnixNanos(t.MakeAbsNanos()), + absl::Now() + absl::Hours(100000)); + EXPECT_EQ(t.InMillisecondsFromNow(), + std::numeric_limits<KernelTimeout::DWord>::max()); + EXPECT_EQ(t.ToChronoTimePoint(), + std::chrono::time_point<std::chrono::system_clock>::max()); + EXPECT_GE(t.ToChronoDuration(), std::chrono::nanoseconds::max()); +} + +TEST(KernelTimeout, OverflowNanos) { + // Test what happens when KernelTimeout is constructed with an absl::Duration + // that would overflow now_nanos + duration. + int64_t now_nanos = absl::ToUnixNanos(absl::Now()); + int64_t limit = std::numeric_limits<int64_t>::max() - now_nanos; + absl::Duration duration = absl::Nanoseconds(limit) + absl::Seconds(1); + KernelTimeout t(duration); + // Timeouts should still be far in the future. + EXPECT_GT(absl::TimeFromTimespec(t.MakeAbsTimespec()), + absl::Now() + absl::Hours(100000)); +#ifndef _WIN32 + EXPECT_GT(absl::TimeFromTimespec(t.MakeClockAbsoluteTimespec(CLOCK_REALTIME)), + absl::Now() + absl::Hours(100000)); +#endif + EXPECT_GT(absl::DurationFromTimespec(t.MakeRelativeTimespec()), + absl::Hours(100000)); + EXPECT_GT(absl::FromUnixNanos(t.MakeAbsNanos()), + absl::Now() + absl::Hours(100000)); + EXPECT_LE(absl::Milliseconds(t.InMillisecondsFromNow()) - duration, + absl::Milliseconds(5)); + EXPECT_GT(t.ToChronoTimePoint(), + std::chrono::system_clock::now() + std::chrono::hours(100000)); + EXPECT_GT(t.ToChronoDuration(), std::chrono::hours(100000)); +} + +} // namespace diff --git a/absl/synchronization/internal/per_thread_sem.cc b/absl/synchronization/internal/per_thread_sem.cc index 469e8f32..c9b8dc1e 100644 --- a/absl/synchronization/internal/per_thread_sem.cc +++ b/absl/synchronization/internal/per_thread_sem.cc @@ -40,13 +40,6 @@ std::atomic<int> *PerThreadSem::GetThreadBlockedCounter() { return identity->blocked_count_ptr; } -void PerThreadSem::Init(base_internal::ThreadIdentity *identity) { - new (Waiter::GetWaiter(identity)) Waiter(); - identity->ticker.store(0, std::memory_order_relaxed); - identity->wait_start.store(0, std::memory_order_relaxed); - identity->is_idle.store(false, std::memory_order_relaxed); -} - void PerThreadSem::Tick(base_internal::ThreadIdentity *identity) { const int ticker = identity->ticker.fetch_add(1, std::memory_order_relaxed) + 1; @@ -54,7 +47,7 @@ void PerThreadSem::Tick(base_internal::ThreadIdentity *identity) { const bool is_idle = identity->is_idle.load(std::memory_order_relaxed); if (wait_start && (ticker - wait_start > Waiter::kIdlePeriods) && !is_idle) { // Wakeup the waiting thread since it is time for it to become idle. - Waiter::GetWaiter(identity)->Poke(); + ABSL_INTERNAL_C_SYMBOL(AbslInternalPerThreadSemPoke)(identity); } } @@ -64,11 +57,22 @@ ABSL_NAMESPACE_END extern "C" { +ABSL_ATTRIBUTE_WEAK void ABSL_INTERNAL_C_SYMBOL(AbslInternalPerThreadSemInit)( + absl::base_internal::ThreadIdentity *identity) { + new (absl::synchronization_internal::Waiter::GetWaiter(identity)) + absl::synchronization_internal::Waiter(); +} + ABSL_ATTRIBUTE_WEAK void ABSL_INTERNAL_C_SYMBOL(AbslInternalPerThreadSemPost)( absl::base_internal::ThreadIdentity *identity) { absl::synchronization_internal::Waiter::GetWaiter(identity)->Post(); } +ABSL_ATTRIBUTE_WEAK void ABSL_INTERNAL_C_SYMBOL(AbslInternalPerThreadSemPoke)( + absl::base_internal::ThreadIdentity *identity) { + absl::synchronization_internal::Waiter::GetWaiter(identity)->Poke(); +} + ABSL_ATTRIBUTE_WEAK bool ABSL_INTERNAL_C_SYMBOL(AbslInternalPerThreadSemWait)( absl::synchronization_internal::KernelTimeout t) { bool timeout = false; diff --git a/absl/synchronization/internal/per_thread_sem.h b/absl/synchronization/internal/per_thread_sem.h index 90a88809..144ab3cd 100644 --- a/absl/synchronization/internal/per_thread_sem.h +++ b/absl/synchronization/internal/per_thread_sem.h @@ -64,7 +64,7 @@ class PerThreadSem { private: // Create the PerThreadSem associated with "identity". Initializes count=0. // REQUIRES: May only be called by ThreadIdentity. - static void Init(base_internal::ThreadIdentity* identity); + static inline void Init(base_internal::ThreadIdentity* identity); // Increments "identity"'s count. static inline void Post(base_internal::ThreadIdentity* identity); @@ -91,12 +91,21 @@ ABSL_NAMESPACE_END // By changing our extension points to be extern "C", we dodge this // check. extern "C" { +void ABSL_INTERNAL_C_SYMBOL(AbslInternalPerThreadSemInit)( + absl::base_internal::ThreadIdentity* identity); void ABSL_INTERNAL_C_SYMBOL(AbslInternalPerThreadSemPost)( absl::base_internal::ThreadIdentity* identity); bool ABSL_INTERNAL_C_SYMBOL(AbslInternalPerThreadSemWait)( absl::synchronization_internal::KernelTimeout t); +void ABSL_INTERNAL_C_SYMBOL(AbslInternalPerThreadSemPoke)( + absl::base_internal::ThreadIdentity* identity); } // extern "C" +void absl::synchronization_internal::PerThreadSem::Init( + absl::base_internal::ThreadIdentity* identity) { + ABSL_INTERNAL_C_SYMBOL(AbslInternalPerThreadSemInit)(identity); +} + void absl::synchronization_internal::PerThreadSem::Post( absl::base_internal::ThreadIdentity* identity) { ABSL_INTERNAL_C_SYMBOL(AbslInternalPerThreadSemPost)(identity); diff --git a/absl/synchronization/internal/pthread_waiter.cc b/absl/synchronization/internal/pthread_waiter.cc new file mode 100644 index 00000000..bf700e95 --- /dev/null +++ b/absl/synchronization/internal/pthread_waiter.cc @@ -0,0 +1,167 @@ +// Copyright 2023 The Abseil Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "absl/synchronization/internal/pthread_waiter.h" + +#ifdef ABSL_INTERNAL_HAVE_PTHREAD_WAITER + +#include <pthread.h> +#include <sys/time.h> +#include <unistd.h> + +#include <cassert> +#include <cerrno> + +#include "absl/base/config.h" +#include "absl/base/internal/raw_logging.h" +#include "absl/base/internal/thread_identity.h" +#include "absl/base/optimization.h" +#include "absl/synchronization/internal/kernel_timeout.h" + +namespace absl { +ABSL_NAMESPACE_BEGIN +namespace synchronization_internal { + +namespace { +class PthreadMutexHolder { + public: + explicit PthreadMutexHolder(pthread_mutex_t *mu) : mu_(mu) { + const int err = pthread_mutex_lock(mu_); + if (err != 0) { + ABSL_RAW_LOG(FATAL, "pthread_mutex_lock failed: %d", err); + } + } + + PthreadMutexHolder(const PthreadMutexHolder &rhs) = delete; + PthreadMutexHolder &operator=(const PthreadMutexHolder &rhs) = delete; + + ~PthreadMutexHolder() { + const int err = pthread_mutex_unlock(mu_); + if (err != 0) { + ABSL_RAW_LOG(FATAL, "pthread_mutex_unlock failed: %d", err); + } + } + + private: + pthread_mutex_t *mu_; +}; +} // namespace + +#ifdef ABSL_INTERNAL_NEED_REDUNDANT_CONSTEXPR_DECL +constexpr char PthreadWaiter::kName[]; +#endif + +PthreadWaiter::PthreadWaiter() : waiter_count_(0), wakeup_count_(0) { + const int err = pthread_mutex_init(&mu_, 0); + if (err != 0) { + ABSL_RAW_LOG(FATAL, "pthread_mutex_init failed: %d", err); + } + + const int err2 = pthread_cond_init(&cv_, 0); + if (err2 != 0) { + ABSL_RAW_LOG(FATAL, "pthread_cond_init failed: %d", err2); + } +} + +#ifdef __APPLE__ +#define ABSL_INTERNAL_HAS_PTHREAD_COND_TIMEDWAIT_RELATIVE_NP 1 +#endif + +#if defined(__GLIBC__) && \ + (__GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 30)) +#define ABSL_INTERNAL_HAVE_PTHREAD_COND_CLOCKWAIT 1 +#elif defined(__ANDROID_API__) && __ANDROID_API__ >= 30 +#define ABSL_INTERNAL_HAVE_PTHREAD_COND_CLOCKWAIT 1 +#endif + +// Calls pthread_cond_timedwait() or possibly something else like +// pthread_cond_timedwait_relative_np() depending on the platform and +// KernelTimeout requested. The return value is the same as the return +// value of pthread_cond_timedwait(). +int PthreadWaiter::TimedWait(KernelTimeout t) { + assert(t.has_timeout()); + if (KernelTimeout::SupportsSteadyClock() && t.is_relative_timeout()) { +#ifdef ABSL_INTERNAL_HAS_PTHREAD_COND_TIMEDWAIT_RELATIVE_NP + const auto rel_timeout = t.MakeRelativeTimespec(); + return pthread_cond_timedwait_relative_np(&cv_, &mu_, &rel_timeout); +#elif defined(ABSL_INTERNAL_HAVE_PTHREAD_COND_CLOCKWAIT) && \ + defined(CLOCK_MONOTONIC) + const auto abs_clock_timeout = t.MakeClockAbsoluteTimespec(CLOCK_MONOTONIC); + return pthread_cond_clockwait(&cv_, &mu_, CLOCK_MONOTONIC, + &abs_clock_timeout); +#endif + } + + const auto abs_timeout = t.MakeAbsTimespec(); + return pthread_cond_timedwait(&cv_, &mu_, &abs_timeout); +} + +bool PthreadWaiter::Wait(KernelTimeout t) { + PthreadMutexHolder h(&mu_); + ++waiter_count_; + // Loop until we find a wakeup to consume or timeout. + // Note that, since the thread ticker is just reset, we don't need to check + // whether the thread is idle on the very first pass of the loop. + bool first_pass = true; + while (wakeup_count_ == 0) { + if (!first_pass) MaybeBecomeIdle(); + // No wakeups available, time to wait. + if (!t.has_timeout()) { + const int err = pthread_cond_wait(&cv_, &mu_); + if (err != 0) { + ABSL_RAW_LOG(FATAL, "pthread_cond_wait failed: %d", err); + } + } else { + const int err = TimedWait(t); + if (err == ETIMEDOUT) { + --waiter_count_; + return false; + } + if (err != 0) { + ABSL_RAW_LOG(FATAL, "PthreadWaiter::TimedWait() failed: %d", err); + } + } + first_pass = false; + } + // Consume a wakeup and we're done. + --wakeup_count_; + --waiter_count_; + return true; +} + +void PthreadWaiter::Post() { + PthreadMutexHolder h(&mu_); + ++wakeup_count_; + InternalCondVarPoke(); +} + +void PthreadWaiter::Poke() { + PthreadMutexHolder h(&mu_); + InternalCondVarPoke(); +} + +void PthreadWaiter::InternalCondVarPoke() { + if (waiter_count_ != 0) { + const int err = pthread_cond_signal(&cv_); + if (ABSL_PREDICT_FALSE(err != 0)) { + ABSL_RAW_LOG(FATAL, "pthread_cond_signal failed: %d", err); + } + } +} + +} // namespace synchronization_internal +ABSL_NAMESPACE_END +} // namespace absl + +#endif // ABSL_INTERNAL_HAVE_PTHREAD_WAITER diff --git a/absl/synchronization/internal/pthread_waiter.h b/absl/synchronization/internal/pthread_waiter.h new file mode 100644 index 00000000..206aefa4 --- /dev/null +++ b/absl/synchronization/internal/pthread_waiter.h @@ -0,0 +1,60 @@ +// Copyright 2023 The Abseil Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#ifndef ABSL_SYNCHRONIZATION_INTERNAL_PTHREAD_WAITER_H_ +#define ABSL_SYNCHRONIZATION_INTERNAL_PTHREAD_WAITER_H_ + +#ifndef _WIN32 +#include <pthread.h> + +#include "absl/base/config.h" +#include "absl/synchronization/internal/kernel_timeout.h" +#include "absl/synchronization/internal/waiter_base.h" + +namespace absl { +ABSL_NAMESPACE_BEGIN +namespace synchronization_internal { + +#define ABSL_INTERNAL_HAVE_PTHREAD_WAITER 1 + +class PthreadWaiter : public WaiterCrtp<PthreadWaiter> { + public: + PthreadWaiter(); + + bool Wait(KernelTimeout t); + void Post(); + void Poke(); + + static constexpr char kName[] = "PthreadWaiter"; + + private: + int TimedWait(KernelTimeout t); + + // REQUIRES: mu_ must be held. + void InternalCondVarPoke(); + + pthread_mutex_t mu_; + pthread_cond_t cv_; + int waiter_count_; + int wakeup_count_; // Unclaimed wakeups. +}; + +} // namespace synchronization_internal +ABSL_NAMESPACE_END +} // namespace absl + +#endif // ndef _WIN32 + +#endif // ABSL_SYNCHRONIZATION_INTERNAL_PTHREAD_WAITER_H_ diff --git a/absl/synchronization/internal/sem_waiter.cc b/absl/synchronization/internal/sem_waiter.cc new file mode 100644 index 00000000..d62dbdc7 --- /dev/null +++ b/absl/synchronization/internal/sem_waiter.cc @@ -0,0 +1,122 @@ +// Copyright 2023 The Abseil Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "absl/synchronization/internal/sem_waiter.h" + +#ifdef ABSL_INTERNAL_HAVE_SEM_WAITER + +#include <semaphore.h> + +#include <atomic> +#include <cassert> +#include <cstdint> +#include <cerrno> + +#include "absl/base/config.h" +#include "absl/base/internal/raw_logging.h" +#include "absl/base/internal/thread_identity.h" +#include "absl/base/optimization.h" +#include "absl/synchronization/internal/kernel_timeout.h" + +namespace absl { +ABSL_NAMESPACE_BEGIN +namespace synchronization_internal { + +#ifdef ABSL_INTERNAL_NEED_REDUNDANT_CONSTEXPR_DECL +constexpr char SemWaiter::kName[]; +#endif + +SemWaiter::SemWaiter() : wakeups_(0) { + if (sem_init(&sem_, 0, 0) != 0) { + ABSL_RAW_LOG(FATAL, "sem_init failed with errno %d\n", errno); + } +} + +#if defined(__GLIBC__) && \ + (__GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 30)) +#define ABSL_INTERNAL_HAVE_SEM_CLOCKWAIT 1 +#elif defined(__ANDROID_API__) && __ANDROID_API__ >= 30 +#define ABSL_INTERNAL_HAVE_SEM_CLOCKWAIT 1 +#endif + +// Calls sem_timedwait() or possibly something else like +// sem_clockwait() depending on the platform and +// KernelTimeout requested. The return value is the same as a call to the return +// value to a call to sem_timedwait(). +int SemWaiter::TimedWait(KernelTimeout t) { + if (KernelTimeout::SupportsSteadyClock() && t.is_relative_timeout()) { +#if defined(ABSL_INTERNAL_HAVE_SEM_CLOCKWAIT) && defined(CLOCK_MONOTONIC) + const auto abs_clock_timeout = t.MakeClockAbsoluteTimespec(CLOCK_MONOTONIC); + return sem_clockwait(&sem_, CLOCK_MONOTONIC, &abs_clock_timeout); +#endif + } + + const auto abs_timeout = t.MakeAbsTimespec(); + return sem_timedwait(&sem_, &abs_timeout); +} + +bool SemWaiter::Wait(KernelTimeout t) { + // Loop until we timeout or consume a wakeup. + // Note that, since the thread ticker is just reset, we don't need to check + // whether the thread is idle on the very first pass of the loop. + bool first_pass = true; + while (true) { + int x = wakeups_.load(std::memory_order_relaxed); + while (x != 0) { + if (!wakeups_.compare_exchange_weak(x, x - 1, + std::memory_order_acquire, + std::memory_order_relaxed)) { + continue; // Raced with someone, retry. + } + // Successfully consumed a wakeup, we're done. + return true; + } + + if (!first_pass) MaybeBecomeIdle(); + // Nothing to consume, wait (looping on EINTR). + while (true) { + if (!t.has_timeout()) { + if (sem_wait(&sem_) == 0) break; + if (errno == EINTR) continue; + ABSL_RAW_LOG(FATAL, "sem_wait failed: %d", errno); + } else { + if (TimedWait(t) == 0) break; + if (errno == EINTR) continue; + if (errno == ETIMEDOUT) return false; + ABSL_RAW_LOG(FATAL, "SemWaiter::TimedWait() failed: %d", errno); + } + } + first_pass = false; + } +} + +void SemWaiter::Post() { + // Post a wakeup. + if (wakeups_.fetch_add(1, std::memory_order_release) == 0) { + // We incremented from 0, need to wake a potential waiter. + Poke(); + } +} + +void SemWaiter::Poke() { + if (sem_post(&sem_) != 0) { // Wake any semaphore waiter. + ABSL_RAW_LOG(FATAL, "sem_post failed with errno %d\n", errno); + } +} + +} // namespace synchronization_internal +ABSL_NAMESPACE_END +} // namespace absl + +#endif // ABSL_INTERNAL_HAVE_SEM_WAITER diff --git a/absl/synchronization/internal/sem_waiter.h b/absl/synchronization/internal/sem_waiter.h new file mode 100644 index 00000000..c22746f9 --- /dev/null +++ b/absl/synchronization/internal/sem_waiter.h @@ -0,0 +1,65 @@ +// Copyright 2023 The Abseil Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#ifndef ABSL_SYNCHRONIZATION_INTERNAL_SEM_WAITER_H_ +#define ABSL_SYNCHRONIZATION_INTERNAL_SEM_WAITER_H_ + +#include "absl/base/config.h" + +#ifdef ABSL_HAVE_SEMAPHORE_H +#include <semaphore.h> + +#include <atomic> +#include <cstdint> + +#include "absl/base/internal/thread_identity.h" +#include "absl/synchronization/internal/futex.h" +#include "absl/synchronization/internal/kernel_timeout.h" +#include "absl/synchronization/internal/waiter_base.h" + +namespace absl { +ABSL_NAMESPACE_BEGIN +namespace synchronization_internal { + +#define ABSL_INTERNAL_HAVE_SEM_WAITER 1 + +class SemWaiter : public WaiterCrtp<SemWaiter> { + public: + SemWaiter(); + + bool Wait(KernelTimeout t); + void Post(); + void Poke(); + + static constexpr char kName[] = "SemWaiter"; + + private: + int TimedWait(KernelTimeout t); + + sem_t sem_; + + // This seems superfluous, but for Poke() we need to cause spurious + // wakeups on the semaphore. Hence we can't actually use the + // semaphore's count. + std::atomic<int> wakeups_; +}; + +} // namespace synchronization_internal +ABSL_NAMESPACE_END +} // namespace absl + +#endif // ABSL_HAVE_SEMAPHORE_H + +#endif // ABSL_SYNCHRONIZATION_INTERNAL_SEM_WAITER_H_ diff --git a/absl/synchronization/internal/stdcpp_waiter.cc b/absl/synchronization/internal/stdcpp_waiter.cc new file mode 100644 index 00000000..355718a7 --- /dev/null +++ b/absl/synchronization/internal/stdcpp_waiter.cc @@ -0,0 +1,91 @@ +// Copyright 2023 The Abseil Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "absl/synchronization/internal/stdcpp_waiter.h" + +#ifdef ABSL_INTERNAL_HAVE_STDCPP_WAITER + +#include <chrono> // NOLINT(build/c++11) +#include <condition_variable> // NOLINT(build/c++11) +#include <mutex> // NOLINT(build/c++11) + +#include "absl/base/config.h" +#include "absl/base/internal/raw_logging.h" +#include "absl/base/internal/thread_identity.h" +#include "absl/base/optimization.h" +#include "absl/synchronization/internal/kernel_timeout.h" + +namespace absl { +ABSL_NAMESPACE_BEGIN +namespace synchronization_internal { + +#ifdef ABSL_INTERNAL_NEED_REDUNDANT_CONSTEXPR_DECL +constexpr char StdcppWaiter::kName[]; +#endif + +StdcppWaiter::StdcppWaiter() : waiter_count_(0), wakeup_count_(0) {} + +bool StdcppWaiter::Wait(KernelTimeout t) { + std::unique_lock<std::mutex> lock(mu_); + ++waiter_count_; + + // Loop until we find a wakeup to consume or timeout. + // Note that, since the thread ticker is just reset, we don't need to check + // whether the thread is idle on the very first pass of the loop. + bool first_pass = true; + while (wakeup_count_ == 0) { + if (!first_pass) MaybeBecomeIdle(); + // No wakeups available, time to wait. + if (!t.has_timeout()) { + cv_.wait(lock); + } else { + auto wait_result = t.SupportsSteadyClock() && t.is_relative_timeout() + ? cv_.wait_for(lock, t.ToChronoDuration()) + : cv_.wait_until(lock, t.ToChronoTimePoint()); + if (wait_result == std::cv_status::timeout) { + --waiter_count_; + return false; + } + } + first_pass = false; + } + + // Consume a wakeup and we're done. + --wakeup_count_; + --waiter_count_; + return true; +} + +void StdcppWaiter::Post() { + std::lock_guard<std::mutex> lock(mu_); + ++wakeup_count_; + InternalCondVarPoke(); +} + +void StdcppWaiter::Poke() { + std::lock_guard<std::mutex> lock(mu_); + InternalCondVarPoke(); +} + +void StdcppWaiter::InternalCondVarPoke() { + if (waiter_count_ != 0) { + cv_.notify_one(); + } +} + +} // namespace synchronization_internal +ABSL_NAMESPACE_END +} // namespace absl + +#endif // ABSL_INTERNAL_HAVE_STDCPP_WAITER diff --git a/absl/synchronization/internal/stdcpp_waiter.h b/absl/synchronization/internal/stdcpp_waiter.h new file mode 100644 index 00000000..e592a27b --- /dev/null +++ b/absl/synchronization/internal/stdcpp_waiter.h @@ -0,0 +1,56 @@ +// Copyright 2023 The Abseil Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#ifndef ABSL_SYNCHRONIZATION_INTERNAL_STDCPP_WAITER_H_ +#define ABSL_SYNCHRONIZATION_INTERNAL_STDCPP_WAITER_H_ + +#include <condition_variable> // NOLINT(build/c++11) +#include <mutex> // NOLINT(build/c++11) + +#include "absl/base/config.h" +#include "absl/synchronization/internal/kernel_timeout.h" +#include "absl/synchronization/internal/waiter_base.h" + +namespace absl { +ABSL_NAMESPACE_BEGIN +namespace synchronization_internal { + +#define ABSL_INTERNAL_HAVE_STDCPP_WAITER 1 + +class StdcppWaiter : public WaiterCrtp<StdcppWaiter> { + public: + StdcppWaiter(); + + bool Wait(KernelTimeout t); + void Post(); + void Poke(); + + static constexpr char kName[] = "StdcppWaiter"; + + private: + // REQUIRES: mu_ must be held. + void InternalCondVarPoke(); + + std::mutex mu_; + std::condition_variable cv_; + int waiter_count_; + int wakeup_count_; // Unclaimed wakeups. +}; + +} // namespace synchronization_internal +ABSL_NAMESPACE_END +} // namespace absl + +#endif // ABSL_SYNCHRONIZATION_INTERNAL_STDCPP_WAITER_H_ diff --git a/absl/synchronization/internal/waiter.cc b/absl/synchronization/internal/waiter.cc deleted file mode 100644 index f2051d67..00000000 --- a/absl/synchronization/internal/waiter.cc +++ /dev/null @@ -1,403 +0,0 @@ -// Copyright 2017 The Abseil Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "absl/synchronization/internal/waiter.h" - -#include "absl/base/config.h" - -#ifdef _WIN32 -#include <windows.h> -#else -#include <pthread.h> -#include <sys/time.h> -#include <unistd.h> -#endif - -#ifdef __linux__ -#include <linux/futex.h> -#include <sys/syscall.h> -#endif - -#ifdef ABSL_HAVE_SEMAPHORE_H -#include <semaphore.h> -#endif - -#include <errno.h> -#include <stdio.h> -#include <time.h> - -#include <atomic> -#include <cassert> -#include <cstdint> -#include <new> -#include <type_traits> - -#include "absl/base/internal/raw_logging.h" -#include "absl/base/internal/thread_identity.h" -#include "absl/base/optimization.h" -#include "absl/synchronization/internal/kernel_timeout.h" - - -namespace absl { -ABSL_NAMESPACE_BEGIN -namespace synchronization_internal { - -static void MaybeBecomeIdle() { - base_internal::ThreadIdentity *identity = - base_internal::CurrentThreadIdentityIfPresent(); - assert(identity != nullptr); - const bool is_idle = identity->is_idle.load(std::memory_order_relaxed); - const int ticker = identity->ticker.load(std::memory_order_relaxed); - const int wait_start = identity->wait_start.load(std::memory_order_relaxed); - if (!is_idle && ticker - wait_start > Waiter::kIdlePeriods) { - identity->is_idle.store(true, std::memory_order_relaxed); - } -} - -#if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX - -Waiter::Waiter() { - futex_.store(0, std::memory_order_relaxed); -} - -bool Waiter::Wait(KernelTimeout t) { - // Loop until we can atomically decrement futex from a positive - // value, waiting on a futex while we believe it is zero. - // Note that, since the thread ticker is just reset, we don't need to check - // whether the thread is idle on the very first pass of the loop. - bool first_pass = true; - - while (true) { - int32_t x = futex_.load(std::memory_order_relaxed); - while (x != 0) { - if (!futex_.compare_exchange_weak(x, x - 1, - std::memory_order_acquire, - std::memory_order_relaxed)) { - continue; // Raced with someone, retry. - } - return true; // Consumed a wakeup, we are done. - } - - if (!first_pass) MaybeBecomeIdle(); - const int err = Futex::WaitUntil(&futex_, 0, t); - if (err != 0) { - if (err == -EINTR || err == -EWOULDBLOCK) { - // Do nothing, the loop will retry. - } else if (err == -ETIMEDOUT) { - return false; - } else { - ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err); - } - } - first_pass = false; - } -} - -void Waiter::Post() { - if (futex_.fetch_add(1, std::memory_order_release) == 0) { - // We incremented from 0, need to wake a potential waiter. - Poke(); - } -} - -void Waiter::Poke() { - // Wake one thread waiting on the futex. - const int err = Futex::Wake(&futex_, 1); - if (ABSL_PREDICT_FALSE(err < 0)) { - ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err); - } -} - -#elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_CONDVAR - -class PthreadMutexHolder { - public: - explicit PthreadMutexHolder(pthread_mutex_t *mu) : mu_(mu) { - const int err = pthread_mutex_lock(mu_); - if (err != 0) { - ABSL_RAW_LOG(FATAL, "pthread_mutex_lock failed: %d", err); - } - } - - PthreadMutexHolder(const PthreadMutexHolder &rhs) = delete; - PthreadMutexHolder &operator=(const PthreadMutexHolder &rhs) = delete; - - ~PthreadMutexHolder() { - const int err = pthread_mutex_unlock(mu_); - if (err != 0) { - ABSL_RAW_LOG(FATAL, "pthread_mutex_unlock failed: %d", err); - } - } - - private: - pthread_mutex_t *mu_; -}; - -Waiter::Waiter() { - const int err = pthread_mutex_init(&mu_, 0); - if (err != 0) { - ABSL_RAW_LOG(FATAL, "pthread_mutex_init failed: %d", err); - } - - const int err2 = pthread_cond_init(&cv_, 0); - if (err2 != 0) { - ABSL_RAW_LOG(FATAL, "pthread_cond_init failed: %d", err2); - } - - waiter_count_ = 0; - wakeup_count_ = 0; -} - -bool Waiter::Wait(KernelTimeout t) { - struct timespec abs_timeout; - if (t.has_timeout()) { - abs_timeout = t.MakeAbsTimespec(); - } - - PthreadMutexHolder h(&mu_); - ++waiter_count_; - // Loop until we find a wakeup to consume or timeout. - // Note that, since the thread ticker is just reset, we don't need to check - // whether the thread is idle on the very first pass of the loop. - bool first_pass = true; - while (wakeup_count_ == 0) { - if (!first_pass) MaybeBecomeIdle(); - // No wakeups available, time to wait. - if (!t.has_timeout()) { - const int err = pthread_cond_wait(&cv_, &mu_); - if (err != 0) { - ABSL_RAW_LOG(FATAL, "pthread_cond_wait failed: %d", err); - } - } else { - const int err = pthread_cond_timedwait(&cv_, &mu_, &abs_timeout); - if (err == ETIMEDOUT) { - --waiter_count_; - return false; - } - if (err != 0) { - ABSL_RAW_LOG(FATAL, "pthread_cond_timedwait failed: %d", err); - } - } - first_pass = false; - } - // Consume a wakeup and we're done. - --wakeup_count_; - --waiter_count_; - return true; -} - -void Waiter::Post() { - PthreadMutexHolder h(&mu_); - ++wakeup_count_; - InternalCondVarPoke(); -} - -void Waiter::Poke() { - PthreadMutexHolder h(&mu_); - InternalCondVarPoke(); -} - -void Waiter::InternalCondVarPoke() { - if (waiter_count_ != 0) { - const int err = pthread_cond_signal(&cv_); - if (ABSL_PREDICT_FALSE(err != 0)) { - ABSL_RAW_LOG(FATAL, "pthread_cond_signal failed: %d", err); - } - } -} - -#elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_SEM - -Waiter::Waiter() { - if (sem_init(&sem_, 0, 0) != 0) { - ABSL_RAW_LOG(FATAL, "sem_init failed with errno %d\n", errno); - } - wakeups_.store(0, std::memory_order_relaxed); -} - -bool Waiter::Wait(KernelTimeout t) { - struct timespec abs_timeout; - if (t.has_timeout()) { - abs_timeout = t.MakeAbsTimespec(); - } - - // Loop until we timeout or consume a wakeup. - // Note that, since the thread ticker is just reset, we don't need to check - // whether the thread is idle on the very first pass of the loop. - bool first_pass = true; - while (true) { - int x = wakeups_.load(std::memory_order_relaxed); - while (x != 0) { - if (!wakeups_.compare_exchange_weak(x, x - 1, - std::memory_order_acquire, - std::memory_order_relaxed)) { - continue; // Raced with someone, retry. - } - // Successfully consumed a wakeup, we're done. - return true; - } - - if (!first_pass) MaybeBecomeIdle(); - // Nothing to consume, wait (looping on EINTR). - while (true) { - if (!t.has_timeout()) { - if (sem_wait(&sem_) == 0) break; - if (errno == EINTR) continue; - ABSL_RAW_LOG(FATAL, "sem_wait failed: %d", errno); - } else { - if (sem_timedwait(&sem_, &abs_timeout) == 0) break; - if (errno == EINTR) continue; - if (errno == ETIMEDOUT) return false; - ABSL_RAW_LOG(FATAL, "sem_timedwait failed: %d", errno); - } - } - first_pass = false; - } -} - -void Waiter::Post() { - // Post a wakeup. - if (wakeups_.fetch_add(1, std::memory_order_release) == 0) { - // We incremented from 0, need to wake a potential waiter. - Poke(); - } -} - -void Waiter::Poke() { - if (sem_post(&sem_) != 0) { // Wake any semaphore waiter. - ABSL_RAW_LOG(FATAL, "sem_post failed with errno %d\n", errno); - } -} - -#elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_WIN32 - -class Waiter::WinHelper { - public: - static SRWLOCK *GetLock(Waiter *w) { - return reinterpret_cast<SRWLOCK *>(&w->mu_storage_); - } - - static CONDITION_VARIABLE *GetCond(Waiter *w) { - return reinterpret_cast<CONDITION_VARIABLE *>(&w->cv_storage_); - } - - static_assert(sizeof(SRWLOCK) == sizeof(void *), - "`mu_storage_` does not have the same size as SRWLOCK"); - static_assert(alignof(SRWLOCK) == alignof(void *), - "`mu_storage_` does not have the same alignment as SRWLOCK"); - - static_assert(sizeof(CONDITION_VARIABLE) == sizeof(void *), - "`ABSL_CONDITION_VARIABLE_STORAGE` does not have the same size " - "as `CONDITION_VARIABLE`"); - static_assert( - alignof(CONDITION_VARIABLE) == alignof(void *), - "`cv_storage_` does not have the same alignment as `CONDITION_VARIABLE`"); - - // The SRWLOCK and CONDITION_VARIABLE types must be trivially constructible - // and destructible because we never call their constructors or destructors. - static_assert(std::is_trivially_constructible<SRWLOCK>::value, - "The `SRWLOCK` type must be trivially constructible"); - static_assert( - std::is_trivially_constructible<CONDITION_VARIABLE>::value, - "The `CONDITION_VARIABLE` type must be trivially constructible"); - static_assert(std::is_trivially_destructible<SRWLOCK>::value, - "The `SRWLOCK` type must be trivially destructible"); - static_assert(std::is_trivially_destructible<CONDITION_VARIABLE>::value, - "The `CONDITION_VARIABLE` type must be trivially destructible"); -}; - -class LockHolder { - public: - explicit LockHolder(SRWLOCK* mu) : mu_(mu) { - AcquireSRWLockExclusive(mu_); - } - - LockHolder(const LockHolder&) = delete; - LockHolder& operator=(const LockHolder&) = delete; - - ~LockHolder() { - ReleaseSRWLockExclusive(mu_); - } - - private: - SRWLOCK* mu_; -}; - -Waiter::Waiter() { - auto *mu = ::new (static_cast<void *>(&mu_storage_)) SRWLOCK; - auto *cv = ::new (static_cast<void *>(&cv_storage_)) CONDITION_VARIABLE; - InitializeSRWLock(mu); - InitializeConditionVariable(cv); - waiter_count_ = 0; - wakeup_count_ = 0; -} - -bool Waiter::Wait(KernelTimeout t) { - SRWLOCK *mu = WinHelper::GetLock(this); - CONDITION_VARIABLE *cv = WinHelper::GetCond(this); - - LockHolder h(mu); - ++waiter_count_; - - // Loop until we find a wakeup to consume or timeout. - // Note that, since the thread ticker is just reset, we don't need to check - // whether the thread is idle on the very first pass of the loop. - bool first_pass = true; - while (wakeup_count_ == 0) { - if (!first_pass) MaybeBecomeIdle(); - // No wakeups available, time to wait. - if (!SleepConditionVariableSRW(cv, mu, t.InMillisecondsFromNow(), 0)) { - // GetLastError() returns a Win32 DWORD, but we assign to - // unsigned long to simplify the ABSL_RAW_LOG case below. The uniform - // initialization guarantees this is not a narrowing conversion. - const unsigned long err{GetLastError()}; // NOLINT(runtime/int) - if (err == ERROR_TIMEOUT) { - --waiter_count_; - return false; - } else { - ABSL_RAW_LOG(FATAL, "SleepConditionVariableSRW failed: %lu", err); - } - } - first_pass = false; - } - // Consume a wakeup and we're done. - --wakeup_count_; - --waiter_count_; - return true; -} - -void Waiter::Post() { - LockHolder h(WinHelper::GetLock(this)); - ++wakeup_count_; - InternalCondVarPoke(); -} - -void Waiter::Poke() { - LockHolder h(WinHelper::GetLock(this)); - InternalCondVarPoke(); -} - -void Waiter::InternalCondVarPoke() { - if (waiter_count_ != 0) { - WakeConditionVariable(WinHelper::GetCond(this)); - } -} - -#else -#error Unknown ABSL_WAITER_MODE -#endif - -} // namespace synchronization_internal -ABSL_NAMESPACE_END -} // namespace absl diff --git a/absl/synchronization/internal/waiter.h b/absl/synchronization/internal/waiter.h index b8adfeb5..1a8b0b83 100644 --- a/absl/synchronization/internal/waiter.h +++ b/absl/synchronization/internal/waiter.h @@ -17,142 +17,48 @@ #define ABSL_SYNCHRONIZATION_INTERNAL_WAITER_H_ #include "absl/base/config.h" - -#ifdef _WIN32 -#include <sdkddkver.h> -#else -#include <pthread.h> -#endif - -#ifdef __linux__ -#include <linux/futex.h> -#endif - -#ifdef ABSL_HAVE_SEMAPHORE_H -#include <semaphore.h> -#endif - -#include <atomic> -#include <cstdint> - -#include "absl/base/internal/thread_identity.h" -#include "absl/synchronization/internal/futex.h" -#include "absl/synchronization/internal/kernel_timeout.h" +#include "absl/synchronization/internal/futex_waiter.h" +#include "absl/synchronization/internal/pthread_waiter.h" +#include "absl/synchronization/internal/sem_waiter.h" +#include "absl/synchronization/internal/stdcpp_waiter.h" +#include "absl/synchronization/internal/win32_waiter.h" // May be chosen at compile time via -DABSL_FORCE_WAITER_MODE=<index> #define ABSL_WAITER_MODE_FUTEX 0 #define ABSL_WAITER_MODE_SEM 1 #define ABSL_WAITER_MODE_CONDVAR 2 #define ABSL_WAITER_MODE_WIN32 3 +#define ABSL_WAITER_MODE_STDCPP 4 #if defined(ABSL_FORCE_WAITER_MODE) #define ABSL_WAITER_MODE ABSL_FORCE_WAITER_MODE -#elif defined(_WIN32) && _WIN32_WINNT >= _WIN32_WINNT_VISTA +#elif defined(ABSL_INTERNAL_HAVE_WIN32_WAITER) #define ABSL_WAITER_MODE ABSL_WAITER_MODE_WIN32 -#elif defined(ABSL_INTERNAL_HAVE_FUTEX) +#elif defined(ABSL_INTERNAL_HAVE_FUTEX_WAITER) #define ABSL_WAITER_MODE ABSL_WAITER_MODE_FUTEX -#elif defined(ABSL_HAVE_SEMAPHORE_H) +#elif defined(ABSL_INTERNAL_HAVE_SEM_WAITER) #define ABSL_WAITER_MODE ABSL_WAITER_MODE_SEM -#else +#elif defined(ABSL_INTERNAL_HAVE_PTHREAD_WAITER) #define ABSL_WAITER_MODE ABSL_WAITER_MODE_CONDVAR +#else +#error ABSL_WAITER_MODE is undefined #endif namespace absl { ABSL_NAMESPACE_BEGIN namespace synchronization_internal { -// Waiter is an OS-specific semaphore. -class Waiter { - public: - // Prepare any data to track waits. - Waiter(); - - // Not copyable or movable - Waiter(const Waiter&) = delete; - Waiter& operator=(const Waiter&) = delete; - - // Blocks the calling thread until a matching call to `Post()` or - // `t` has passed. Returns `true` if woken (`Post()` called), - // `false` on timeout. - bool Wait(KernelTimeout t); - - // Restart the caller of `Wait()` as with a normal semaphore. - void Post(); - - // If anyone is waiting, wake them up temporarily and cause them to - // call `MaybeBecomeIdle()`. They will then return to waiting for a - // `Post()` or timeout. - void Poke(); - - // Returns the Waiter associated with the identity. - static Waiter* GetWaiter(base_internal::ThreadIdentity* identity) { - static_assert( - sizeof(Waiter) <= sizeof(base_internal::ThreadIdentity::WaiterState), - "Insufficient space for Waiter"); - return reinterpret_cast<Waiter*>(identity->waiter_state.data); - } - - // How many periods to remain idle before releasing resources -#ifndef ABSL_HAVE_THREAD_SANITIZER - static constexpr int kIdlePeriods = 60; -#else - // Memory consumption under ThreadSanitizer is a serious concern, - // so we release resources sooner. The value of 1 leads to 1 to 2 second - // delay before marking a thread as idle. - static const int kIdlePeriods = 1; -#endif - - private: - // The destructor must not be called since Mutex/CondVar - // can use PerThreadSem/Waiter after the thread exits. - // Waiter objects are embedded in ThreadIdentity objects, - // which are reused via a freelist and are never destroyed. - ~Waiter() = delete; - #if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX - // Futexes are defined by specification to be 32-bits. - // Thus std::atomic<int32_t> must be just an int32_t with lockfree methods. - std::atomic<int32_t> futex_; - static_assert(sizeof(int32_t) == sizeof(futex_), "Wrong size for futex"); - -#elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_CONDVAR - // REQUIRES: mu_ must be held. - void InternalCondVarPoke(); - - pthread_mutex_t mu_; - pthread_cond_t cv_; - int waiter_count_; - int wakeup_count_; // Unclaimed wakeups. - +using Waiter = FutexWaiter; #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_SEM - sem_t sem_; - // This seems superfluous, but for Poke() we need to cause spurious - // wakeups on the semaphore. Hence we can't actually use the - // semaphore's count. - std::atomic<int> wakeups_; - +using Waiter = SemWaiter; +#elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_CONDVAR +using Waiter = PthreadWaiter; #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_WIN32 - // WinHelper - Used to define utilities for accessing the lock and - // condition variable storage once the types are complete. - class WinHelper; - - // REQUIRES: WinHelper::GetLock(this) must be held. - void InternalCondVarPoke(); - - // We can't include Windows.h in our headers, so we use aligned character - // buffers to define the storage of SRWLOCK and CONDITION_VARIABLE. - // SRW locks and condition variables do not need to be explicitly destroyed. - // https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-initializesrwlock - // https://stackoverflow.com/questions/28975958/why-does-windows-have-no-deleteconditionvariable-function-to-go-together-with - alignas(void*) unsigned char mu_storage_[sizeof(void*)]; - alignas(void*) unsigned char cv_storage_[sizeof(void*)]; - int waiter_count_; - int wakeup_count_; - -#else - #error Unknown ABSL_WAITER_MODE +using Waiter = Win32Waiter; +#elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_STDCPP +using Waiter = StdcppWaiter; #endif -}; } // namespace synchronization_internal ABSL_NAMESPACE_END diff --git a/absl/synchronization/internal/waiter_base.cc b/absl/synchronization/internal/waiter_base.cc new file mode 100644 index 00000000..46928b40 --- /dev/null +++ b/absl/synchronization/internal/waiter_base.cc @@ -0,0 +1,42 @@ +// Copyright 2023 The Abseil Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "absl/synchronization/internal/waiter_base.h" + +#include "absl/base/config.h" +#include "absl/base/internal/thread_identity.h" + +namespace absl { +ABSL_NAMESPACE_BEGIN +namespace synchronization_internal { + +#ifdef ABSL_INTERNAL_NEED_REDUNDANT_CONSTEXPR_DECL +constexpr int WaiterBase::kIdlePeriods; +#endif + +void WaiterBase::MaybeBecomeIdle() { + base_internal::ThreadIdentity *identity = + base_internal::CurrentThreadIdentityIfPresent(); + assert(identity != nullptr); + const bool is_idle = identity->is_idle.load(std::memory_order_relaxed); + const int ticker = identity->ticker.load(std::memory_order_relaxed); + const int wait_start = identity->wait_start.load(std::memory_order_relaxed); + if (!is_idle && ticker - wait_start > kIdlePeriods) { + identity->is_idle.store(true, std::memory_order_relaxed); + } +} + +} // namespace synchronization_internal +ABSL_NAMESPACE_END +} // namespace absl diff --git a/absl/synchronization/internal/waiter_base.h b/absl/synchronization/internal/waiter_base.h new file mode 100644 index 00000000..cf175481 --- /dev/null +++ b/absl/synchronization/internal/waiter_base.h @@ -0,0 +1,90 @@ +// Copyright 2023 The Abseil Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#ifndef ABSL_SYNCHRONIZATION_INTERNAL_WAITER_BASE_H_ +#define ABSL_SYNCHRONIZATION_INTERNAL_WAITER_BASE_H_ + +#include "absl/base/config.h" +#include "absl/base/internal/thread_identity.h" +#include "absl/synchronization/internal/kernel_timeout.h" + +namespace absl { +ABSL_NAMESPACE_BEGIN +namespace synchronization_internal { + +// `Waiter` is a platform specific semaphore implementation that `PerThreadSem` +// waits on to implement blocking in `absl::Mutex`. Implementations should +// inherit from `WaiterCrtp` and must implement `Wait()`, `Post()`, and `Poke()` +// as described in `WaiterBase`. `waiter.h` selects the implementation and uses +// static-dispatch for performance. +class WaiterBase { + public: + WaiterBase() = default; + + // Not copyable or movable + WaiterBase(const WaiterBase&) = delete; + WaiterBase& operator=(const WaiterBase&) = delete; + + // Blocks the calling thread until a matching call to `Post()` or + // `t` has passed. Returns `true` if woken (`Post()` called), + // `false` on timeout. + // + // bool Wait(KernelTimeout t); + + // Restart the caller of `Wait()` as with a normal semaphore. + // + // void Post(); + + // If anyone is waiting, wake them up temporarily and cause them to + // call `MaybeBecomeIdle()`. They will then return to waiting for a + // `Post()` or timeout. + // + // void Poke(); + + // Returns the name of this implementation. Used only for debugging. + // + // static constexpr char kName[]; + + // How many periods to remain idle before releasing resources +#ifndef ABSL_HAVE_THREAD_SANITIZER + static constexpr int kIdlePeriods = 60; +#else + // Memory consumption under ThreadSanitizer is a serious concern, + // so we release resources sooner. The value of 1 leads to 1 to 2 second + // delay before marking a thread as idle. + static constexpr int kIdlePeriods = 1; +#endif + + protected: + static void MaybeBecomeIdle(); +}; + +template <typename T> +class WaiterCrtp : public WaiterBase { + public: + // Returns the Waiter associated with the identity. + static T* GetWaiter(base_internal::ThreadIdentity* identity) { + static_assert( + sizeof(T) <= sizeof(base_internal::ThreadIdentity::WaiterState), + "Insufficient space for Waiter"); + return reinterpret_cast<T*>(identity->waiter_state.data); + } +}; + +} // namespace synchronization_internal +ABSL_NAMESPACE_END +} // namespace absl + +#endif // ABSL_SYNCHRONIZATION_INTERNAL_WAITER_BASE_H_ diff --git a/absl/synchronization/internal/waiter_test.cc b/absl/synchronization/internal/waiter_test.cc new file mode 100644 index 00000000..992db29b --- /dev/null +++ b/absl/synchronization/internal/waiter_test.cc @@ -0,0 +1,180 @@ +// Copyright 2023 The Abseil Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "absl/synchronization/internal/waiter.h" + +#include <ctime> +#include <iostream> +#include <ostream> + +#include "absl/base/config.h" +#include "absl/random/random.h" +#include "absl/synchronization/internal/create_thread_identity.h" +#include "absl/synchronization/internal/futex_waiter.h" +#include "absl/synchronization/internal/kernel_timeout.h" +#include "absl/synchronization/internal/pthread_waiter.h" +#include "absl/synchronization/internal/sem_waiter.h" +#include "absl/synchronization/internal/stdcpp_waiter.h" +#include "absl/synchronization/internal/thread_pool.h" +#include "absl/synchronization/internal/win32_waiter.h" +#include "absl/time/clock.h" +#include "absl/time/time.h" +#include "gtest/gtest.h" + +// Test go/btm support by randomizing the value of clock_gettime() for +// CLOCK_MONOTONIC. This works by overriding a weak symbol in glibc. +// We should be resistant to this randomization when !SupportsSteadyClock(). +#if defined(__GOOGLE_GRTE_VERSION__) && \ + !defined(ABSL_HAVE_ADDRESS_SANITIZER) && \ + !defined(ABSL_HAVE_MEMORY_SANITIZER) && \ + !defined(ABSL_HAVE_THREAD_SANITIZER) +extern "C" int __clock_gettime(clockid_t c, struct timespec* ts); + +extern "C" int clock_gettime(clockid_t c, struct timespec* ts) { + if (c == CLOCK_MONOTONIC && + !absl::synchronization_internal::KernelTimeout::SupportsSteadyClock()) { + absl::SharedBitGen gen; + ts->tv_sec = absl::Uniform(gen, 0, 1'000'000'000); + ts->tv_nsec = absl::Uniform(gen, 0, 1'000'000'000); + return 0; + } + return __clock_gettime(c, ts); +} +#endif + +namespace { + +TEST(Waiter, PrintPlatformImplementation) { + // Allows us to verify that the platform is using the expected implementation. + std::cout << absl::synchronization_internal::Waiter::kName << std::endl; +} + +template <typename T> +class WaiterTest : public ::testing::Test { + public: + // Waiter implementations assume that a ThreadIdentity has already been + // created. + WaiterTest() { + absl::synchronization_internal::GetOrCreateCurrentThreadIdentity(); + } +}; + +TYPED_TEST_SUITE_P(WaiterTest); + +absl::Duration WithTolerance(absl::Duration d) { return d * 0.95; } + +TYPED_TEST_P(WaiterTest, WaitNoTimeout) { + absl::synchronization_internal::ThreadPool tp(1); + TypeParam waiter; + tp.Schedule([&]() { + // Include some `Poke()` calls to ensure they don't cause `waiter` to return + // from `Wait()`. + waiter.Poke(); + absl::SleepFor(absl::Seconds(1)); + waiter.Poke(); + absl::SleepFor(absl::Seconds(1)); + waiter.Post(); + }); + absl::Time start = absl::Now(); + EXPECT_TRUE( + waiter.Wait(absl::synchronization_internal::KernelTimeout::Never())); + absl::Duration waited = absl::Now() - start; + EXPECT_GE(waited, WithTolerance(absl::Seconds(2))); +} + +TYPED_TEST_P(WaiterTest, WaitDurationWoken) { + absl::synchronization_internal::ThreadPool tp(1); + TypeParam waiter; + tp.Schedule([&]() { + // Include some `Poke()` calls to ensure they don't cause `waiter` to return + // from `Wait()`. + waiter.Poke(); + absl::SleepFor(absl::Milliseconds(500)); + waiter.Post(); + }); + absl::Time start = absl::Now(); + EXPECT_TRUE(waiter.Wait( + absl::synchronization_internal::KernelTimeout(absl::Seconds(10)))); + absl::Duration waited = absl::Now() - start; + EXPECT_GE(waited, WithTolerance(absl::Milliseconds(500))); + EXPECT_LT(waited, absl::Seconds(2)); +} + +TYPED_TEST_P(WaiterTest, WaitTimeWoken) { + absl::synchronization_internal::ThreadPool tp(1); + TypeParam waiter; + tp.Schedule([&]() { + // Include some `Poke()` calls to ensure they don't cause `waiter` to return + // from `Wait()`. + waiter.Poke(); + absl::SleepFor(absl::Milliseconds(500)); + waiter.Post(); + }); + absl::Time start = absl::Now(); + EXPECT_TRUE(waiter.Wait(absl::synchronization_internal::KernelTimeout( + start + absl::Seconds(10)))); + absl::Duration waited = absl::Now() - start; + EXPECT_GE(waited, WithTolerance(absl::Milliseconds(500))); + EXPECT_LT(waited, absl::Seconds(2)); +} + +TYPED_TEST_P(WaiterTest, WaitDurationReached) { + TypeParam waiter; + absl::Time start = absl::Now(); + EXPECT_FALSE(waiter.Wait( + absl::synchronization_internal::KernelTimeout(absl::Milliseconds(500)))); + absl::Duration waited = absl::Now() - start; + EXPECT_GE(waited, WithTolerance(absl::Milliseconds(500))); + EXPECT_LT(waited, absl::Seconds(1)); +} + +TYPED_TEST_P(WaiterTest, WaitTimeReached) { + TypeParam waiter; + absl::Time start = absl::Now(); + EXPECT_FALSE(waiter.Wait(absl::synchronization_internal::KernelTimeout( + start + absl::Milliseconds(500)))); + absl::Duration waited = absl::Now() - start; + EXPECT_GE(waited, WithTolerance(absl::Milliseconds(500))); + EXPECT_LT(waited, absl::Seconds(1)); +} + +REGISTER_TYPED_TEST_SUITE_P(WaiterTest, + WaitNoTimeout, + WaitDurationWoken, + WaitTimeWoken, + WaitDurationReached, + WaitTimeReached); + +#ifdef ABSL_INTERNAL_HAVE_FUTEX_WAITER +INSTANTIATE_TYPED_TEST_SUITE_P(Futex, WaiterTest, + absl::synchronization_internal::FutexWaiter); +#endif +#ifdef ABSL_INTERNAL_HAVE_PTHREAD_WAITER +INSTANTIATE_TYPED_TEST_SUITE_P(Pthread, WaiterTest, + absl::synchronization_internal::PthreadWaiter); +#endif +#ifdef ABSL_INTERNAL_HAVE_SEM_WAITER +INSTANTIATE_TYPED_TEST_SUITE_P(Sem, WaiterTest, + absl::synchronization_internal::SemWaiter); +#endif +#ifdef ABSL_INTERNAL_HAVE_WIN32_WAITER +INSTANTIATE_TYPED_TEST_SUITE_P(Win32, WaiterTest, + absl::synchronization_internal::Win32Waiter); +#endif +#ifdef ABSL_INTERNAL_HAVE_STDCPP_WAITER +INSTANTIATE_TYPED_TEST_SUITE_P(Stdcpp, WaiterTest, + absl::synchronization_internal::StdcppWaiter); +#endif + +} // namespace diff --git a/absl/synchronization/internal/win32_waiter.cc b/absl/synchronization/internal/win32_waiter.cc new file mode 100644 index 00000000..bd95ff08 --- /dev/null +++ b/absl/synchronization/internal/win32_waiter.cc @@ -0,0 +1,151 @@ +// Copyright 2023 The Abseil Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "absl/synchronization/internal/win32_waiter.h" + +#ifdef ABSL_INTERNAL_HAVE_WIN32_WAITER + +#include <windows.h> + +#include "absl/base/config.h" +#include "absl/base/internal/raw_logging.h" +#include "absl/base/internal/thread_identity.h" +#include "absl/base/optimization.h" +#include "absl/synchronization/internal/kernel_timeout.h" + +namespace absl { +ABSL_NAMESPACE_BEGIN +namespace synchronization_internal { + +#ifdef ABSL_INTERNAL_NEED_REDUNDANT_CONSTEXPR_DECL +constexpr char Win32Waiter::kName[]; +#endif + +class Win32Waiter::WinHelper { + public: + static SRWLOCK *GetLock(Win32Waiter *w) { + return reinterpret_cast<SRWLOCK *>(&w->mu_storage_); + } + + static CONDITION_VARIABLE *GetCond(Win32Waiter *w) { + return reinterpret_cast<CONDITION_VARIABLE *>(&w->cv_storage_); + } + + static_assert(sizeof(SRWLOCK) == sizeof(void *), + "`mu_storage_` does not have the same size as SRWLOCK"); + static_assert(alignof(SRWLOCK) == alignof(void *), + "`mu_storage_` does not have the same alignment as SRWLOCK"); + + static_assert(sizeof(CONDITION_VARIABLE) == sizeof(void *), + "`ABSL_CONDITION_VARIABLE_STORAGE` does not have the same size " + "as `CONDITION_VARIABLE`"); + static_assert( + alignof(CONDITION_VARIABLE) == alignof(void *), + "`cv_storage_` does not have the same alignment as `CONDITION_VARIABLE`"); + + // The SRWLOCK and CONDITION_VARIABLE types must be trivially constructible + // and destructible because we never call their constructors or destructors. + static_assert(std::is_trivially_constructible<SRWLOCK>::value, + "The `SRWLOCK` type must be trivially constructible"); + static_assert( + std::is_trivially_constructible<CONDITION_VARIABLE>::value, + "The `CONDITION_VARIABLE` type must be trivially constructible"); + static_assert(std::is_trivially_destructible<SRWLOCK>::value, + "The `SRWLOCK` type must be trivially destructible"); + static_assert(std::is_trivially_destructible<CONDITION_VARIABLE>::value, + "The `CONDITION_VARIABLE` type must be trivially destructible"); +}; + +class LockHolder { + public: + explicit LockHolder(SRWLOCK* mu) : mu_(mu) { + AcquireSRWLockExclusive(mu_); + } + + LockHolder(const LockHolder&) = delete; + LockHolder& operator=(const LockHolder&) = delete; + + ~LockHolder() { + ReleaseSRWLockExclusive(mu_); + } + + private: + SRWLOCK* mu_; +}; + +Win32Waiter::Win32Waiter() { + auto *mu = ::new (static_cast<void *>(&mu_storage_)) SRWLOCK; + auto *cv = ::new (static_cast<void *>(&cv_storage_)) CONDITION_VARIABLE; + InitializeSRWLock(mu); + InitializeConditionVariable(cv); + waiter_count_ = 0; + wakeup_count_ = 0; +} + +bool Win32Waiter::Wait(KernelTimeout t) { + SRWLOCK *mu = WinHelper::GetLock(this); + CONDITION_VARIABLE *cv = WinHelper::GetCond(this); + + LockHolder h(mu); + ++waiter_count_; + + // Loop until we find a wakeup to consume or timeout. + // Note that, since the thread ticker is just reset, we don't need to check + // whether the thread is idle on the very first pass of the loop. + bool first_pass = true; + while (wakeup_count_ == 0) { + if (!first_pass) MaybeBecomeIdle(); + // No wakeups available, time to wait. + if (!SleepConditionVariableSRW(cv, mu, t.InMillisecondsFromNow(), 0)) { + // GetLastError() returns a Win32 DWORD, but we assign to + // unsigned long to simplify the ABSL_RAW_LOG case below. The uniform + // initialization guarantees this is not a narrowing conversion. + const unsigned long err{GetLastError()}; // NOLINT(runtime/int) + if (err == ERROR_TIMEOUT) { + --waiter_count_; + return false; + } else { + ABSL_RAW_LOG(FATAL, "SleepConditionVariableSRW failed: %lu", err); + } + } + first_pass = false; + } + // Consume a wakeup and we're done. + --wakeup_count_; + --waiter_count_; + return true; +} + +void Win32Waiter::Post() { + LockHolder h(WinHelper::GetLock(this)); + ++wakeup_count_; + InternalCondVarPoke(); +} + +void Win32Waiter::Poke() { + LockHolder h(WinHelper::GetLock(this)); + InternalCondVarPoke(); +} + +void Win32Waiter::InternalCondVarPoke() { + if (waiter_count_ != 0) { + WakeConditionVariable(WinHelper::GetCond(this)); + } +} + +} // namespace synchronization_internal +ABSL_NAMESPACE_END +} // namespace absl + +#endif // ABSL_INTERNAL_HAVE_WIN32_WAITER diff --git a/absl/synchronization/internal/win32_waiter.h b/absl/synchronization/internal/win32_waiter.h new file mode 100644 index 00000000..87eb617c --- /dev/null +++ b/absl/synchronization/internal/win32_waiter.h @@ -0,0 +1,70 @@ +// Copyright 2023 The Abseil Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#ifndef ABSL_SYNCHRONIZATION_INTERNAL_WIN32_WAITER_H_ +#define ABSL_SYNCHRONIZATION_INTERNAL_WIN32_WAITER_H_ + +#ifdef _WIN32 +#include <sdkddkver.h> +#endif + +#if defined(_WIN32) && _WIN32_WINNT >= _WIN32_WINNT_VISTA + +#include "absl/base/config.h" +#include "absl/synchronization/internal/kernel_timeout.h" +#include "absl/synchronization/internal/waiter_base.h" + +namespace absl { +ABSL_NAMESPACE_BEGIN +namespace synchronization_internal { + +#define ABSL_INTERNAL_HAVE_WIN32_WAITER 1 + +class Win32Waiter : public WaiterCrtp<Win32Waiter> { + public: + Win32Waiter(); + + bool Wait(KernelTimeout t); + void Post(); + void Poke(); + + static constexpr char kName[] = "Win32Waiter"; + + private: + // WinHelper - Used to define utilities for accessing the lock and + // condition variable storage once the types are complete. + class WinHelper; + + // REQUIRES: WinHelper::GetLock(this) must be held. + void InternalCondVarPoke(); + + // We can't include Windows.h in our headers, so we use aligned character + // buffers to define the storage of SRWLOCK and CONDITION_VARIABLE. + // SRW locks and condition variables do not need to be explicitly destroyed. + // https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-initializesrwlock + // https://stackoverflow.com/questions/28975958/why-does-windows-have-no-deleteconditionvariable-function-to-go-together-with + alignas(void*) unsigned char mu_storage_[sizeof(void*)]; + alignas(void*) unsigned char cv_storage_[sizeof(void*)]; + int waiter_count_; + int wakeup_count_; +}; + +} // namespace synchronization_internal +ABSL_NAMESPACE_END +} // namespace absl + +#endif // defined(_WIN32) && _WIN32_WINNT >= _WIN32_WINNT_VISTA + +#endif // ABSL_SYNCHRONIZATION_INTERNAL_WIN32_WAITER_H_ |