diff options
Diffstat (limited to 'absl/synchronization/internal/waiter.cc')
-rw-r--r-- | absl/synchronization/internal/waiter.cc | 211 |
1 files changed, 118 insertions, 93 deletions
diff --git a/absl/synchronization/internal/waiter.cc b/absl/synchronization/internal/waiter.cc index 17c6a506..2949f5a8 100644 --- a/absl/synchronization/internal/waiter.cc +++ b/absl/synchronization/internal/waiter.cc @@ -49,7 +49,7 @@ #include "absl/synchronization/internal/kernel_timeout.h" namespace absl { -inline namespace lts_2019_08_08 { +ABSL_NAMESPACE_BEGIN namespace synchronization_internal { static void MaybeBecomeIdle() { @@ -123,16 +123,21 @@ class Futex { } }; -void Waiter::Init() { +Waiter::Waiter() { futex_.store(0, std::memory_order_relaxed); } +Waiter::~Waiter() = default; + bool Waiter::Wait(KernelTimeout t) { // Loop until we can atomically decrement futex from a positive // value, waiting on a futex while we believe it is zero. + // Note that, since the thread ticker is just reset, we don't need to check + // whether the thread is idle on the very first pass of the loop. + bool first_pass = true; while (true) { int32_t x = futex_.load(std::memory_order_relaxed); - if (x != 0) { + while (x != 0) { if (!futex_.compare_exchange_weak(x, x - 1, std::memory_order_acquire, std::memory_order_relaxed)) { @@ -141,6 +146,8 @@ bool Waiter::Wait(KernelTimeout t) { return true; // Consumed a wakeup, we are done. } + + if (!first_pass) MaybeBecomeIdle(); const int err = Futex::WaitUntil(&futex_, 0, t); if (err != 0) { if (err == -EINTR || err == -EWOULDBLOCK) { @@ -151,14 +158,13 @@ bool Waiter::Wait(KernelTimeout t) { ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err); } } - - MaybeBecomeIdle(); + first_pass = false; } } void Waiter::Post() { if (futex_.fetch_add(1, std::memory_order_release) == 0) { - // We incremented from 0, need to wake a potential waker. + // We incremented from 0, need to wake a potential waiter. Poke(); } } @@ -196,7 +202,7 @@ class PthreadMutexHolder { pthread_mutex_t *mu_; }; -void Waiter::Init() { +Waiter::Waiter() { const int err = pthread_mutex_init(&mu_, 0); if (err != 0) { ABSL_RAW_LOG(FATAL, "pthread_mutex_init failed: %d", err); @@ -207,8 +213,20 @@ void Waiter::Init() { ABSL_RAW_LOG(FATAL, "pthread_cond_init failed: %d", err2); } - waiter_count_.store(0, std::memory_order_relaxed); - wakeup_count_.store(0, std::memory_order_relaxed); + waiter_count_ = 0; + wakeup_count_ = 0; +} + +Waiter::~Waiter() { + const int err = pthread_mutex_destroy(&mu_); + if (err != 0) { + ABSL_RAW_LOG(FATAL, "pthread_mutex_destroy failed: %d", err); + } + + const int err2 = pthread_cond_destroy(&cv_); + if (err2 != 0) { + ABSL_RAW_LOG(FATAL, "pthread_cond_destroy failed: %d", err2); + } } bool Waiter::Wait(KernelTimeout t) { @@ -218,21 +236,13 @@ bool Waiter::Wait(KernelTimeout t) { } PthreadMutexHolder h(&mu_); - waiter_count_.fetch_add(1, std::memory_order_relaxed); + ++waiter_count_; // Loop until we find a wakeup to consume or timeout. - while (true) { - int x = wakeup_count_.load(std::memory_order_relaxed); - if (x != 0) { - if (!wakeup_count_.compare_exchange_weak(x, x - 1, - std::memory_order_acquire, - std::memory_order_relaxed)) { - continue; // Raced with someone, retry. - } - // Successfully consumed a wakeup, we're done. - waiter_count_.fetch_sub(1, std::memory_order_relaxed); - return true; - } - + // Note that, since the thread ticker is just reset, we don't need to check + // whether the thread is idle on the very first pass of the loop. + bool first_pass = true; + while (wakeup_count_ == 0) { + if (!first_pass) MaybeBecomeIdle(); // No wakeups available, time to wait. if (!t.has_timeout()) { const int err = pthread_cond_wait(&cv_, &mu_); @@ -242,46 +252,56 @@ bool Waiter::Wait(KernelTimeout t) { } else { const int err = pthread_cond_timedwait(&cv_, &mu_, &abs_timeout); if (err == ETIMEDOUT) { - waiter_count_.fetch_sub(1, std::memory_order_relaxed); + --waiter_count_; return false; } if (err != 0) { - ABSL_RAW_LOG(FATAL, "pthread_cond_wait failed: %d", err); + ABSL_RAW_LOG(FATAL, "pthread_cond_timedwait failed: %d", err); } } - MaybeBecomeIdle(); + first_pass = false; } + // Consume a wakeup and we're done. + --wakeup_count_; + --waiter_count_; + return true; } void Waiter::Post() { - wakeup_count_.fetch_add(1, std::memory_order_release); - Poke(); + PthreadMutexHolder h(&mu_); + ++wakeup_count_; + InternalCondVarPoke(); } void Waiter::Poke() { - if (waiter_count_.load(std::memory_order_relaxed) == 0) { - return; - } - // Potentially a waker. Take the lock and check again. PthreadMutexHolder h(&mu_); - if (waiter_count_.load(std::memory_order_relaxed) == 0) { - return; - } - const int err = pthread_cond_signal(&cv_); - if (err != 0) { - ABSL_RAW_LOG(FATAL, "pthread_cond_signal failed: %d", err); + InternalCondVarPoke(); +} + +void Waiter::InternalCondVarPoke() { + if (waiter_count_ != 0) { + const int err = pthread_cond_signal(&cv_); + if (ABSL_PREDICT_FALSE(err != 0)) { + ABSL_RAW_LOG(FATAL, "pthread_cond_signal failed: %d", err); + } } } #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_SEM -void Waiter::Init() { +Waiter::Waiter() { if (sem_init(&sem_, 0, 0) != 0) { ABSL_RAW_LOG(FATAL, "sem_init failed with errno %d\n", errno); } wakeups_.store(0, std::memory_order_relaxed); } +Waiter::~Waiter() { + if (sem_destroy(&sem_) != 0) { + ABSL_RAW_LOG(FATAL, "sem_destroy failed with errno %d\n", errno); + } +} + bool Waiter::Wait(KernelTimeout t) { struct timespec abs_timeout; if (t.has_timeout()) { @@ -289,9 +309,12 @@ bool Waiter::Wait(KernelTimeout t) { } // Loop until we timeout or consume a wakeup. + // Note that, since the thread ticker is just reset, we don't need to check + // whether the thread is idle on the very first pass of the loop. + bool first_pass = true; while (true) { int x = wakeups_.load(std::memory_order_relaxed); - if (x != 0) { + while (x != 0) { if (!wakeups_.compare_exchange_weak(x, x - 1, std::memory_order_acquire, std::memory_order_relaxed)) { @@ -301,6 +324,7 @@ bool Waiter::Wait(KernelTimeout t) { return true; } + if (!first_pass) MaybeBecomeIdle(); // Nothing to consume, wait (looping on EINTR). while (true) { if (!t.has_timeout()) { @@ -314,13 +338,16 @@ bool Waiter::Wait(KernelTimeout t) { ABSL_RAW_LOG(FATAL, "sem_timedwait failed: %d", errno); } } - MaybeBecomeIdle(); + first_pass = false; } } void Waiter::Post() { - wakeups_.fetch_add(1, std::memory_order_release); // Post a wakeup. - Poke(); + // Post a wakeup. + if (wakeups_.fetch_add(1, std::memory_order_release) == 0) { + // We incremented from 0, need to wake a potential waiter. + Poke(); + } } void Waiter::Poke() { @@ -341,31 +368,29 @@ class Waiter::WinHelper { return reinterpret_cast<CONDITION_VARIABLE *>(&w->cv_storage_); } - static_assert(sizeof(SRWLOCK) == sizeof(Waiter::SRWLockStorage), - "SRWLockStorage does not have the same size as SRWLOCK"); + static_assert(sizeof(SRWLOCK) == sizeof(void *), + "`mu_storage_` does not have the same size as SRWLOCK"); + static_assert(alignof(SRWLOCK) == alignof(void *), + "`mu_storage_` does not have the same alignment as SRWLOCK"); + + static_assert(sizeof(CONDITION_VARIABLE) == sizeof(void *), + "`ABSL_CONDITION_VARIABLE_STORAGE` does not have the same size " + "as `CONDITION_VARIABLE`"); static_assert( - alignof(SRWLOCK) == alignof(Waiter::SRWLockStorage), - "SRWLockStorage does not have the same alignment as SRWLOCK"); - - static_assert(sizeof(CONDITION_VARIABLE) == - sizeof(Waiter::ConditionVariableStorage), - "ABSL_CONDITION_VARIABLE_STORAGE does not have the same size " - "as CONDITION_VARIABLE"); - static_assert(alignof(CONDITION_VARIABLE) == - alignof(Waiter::ConditionVariableStorage), - "ConditionVariableStorage does not have the same " - "alignment as CONDITION_VARIABLE"); - - // The SRWLOCK and CONDITION_VARIABLE types must be trivially constuctible + alignof(CONDITION_VARIABLE) == alignof(void *), + "`cv_storage_` does not have the same alignment as `CONDITION_VARIABLE`"); + + // The SRWLOCK and CONDITION_VARIABLE types must be trivially constructible // and destructible because we never call their constructors or destructors. static_assert(std::is_trivially_constructible<SRWLOCK>::value, - "The SRWLOCK type must be trivially constructible"); - static_assert(std::is_trivially_constructible<CONDITION_VARIABLE>::value, - "The CONDITION_VARIABLE type must be trivially constructible"); + "The `SRWLOCK` type must be trivially constructible"); + static_assert( + std::is_trivially_constructible<CONDITION_VARIABLE>::value, + "The `CONDITION_VARIABLE` type must be trivially constructible"); static_assert(std::is_trivially_destructible<SRWLOCK>::value, - "The SRWLOCK type must be trivially destructible"); + "The `SRWLOCK` type must be trivially destructible"); static_assert(std::is_trivially_destructible<CONDITION_VARIABLE>::value, - "The CONDITION_VARIABLE type must be trivially destructible"); + "The `CONDITION_VARIABLE` type must be trivially destructible"); }; class LockHolder { @@ -385,36 +410,33 @@ class LockHolder { SRWLOCK* mu_; }; -void Waiter::Init() { +Waiter::Waiter() { auto *mu = ::new (static_cast<void *>(&mu_storage_)) SRWLOCK; auto *cv = ::new (static_cast<void *>(&cv_storage_)) CONDITION_VARIABLE; InitializeSRWLock(mu); InitializeConditionVariable(cv); - waiter_count_.store(0, std::memory_order_relaxed); - wakeup_count_.store(0, std::memory_order_relaxed); + waiter_count_ = 0; + wakeup_count_ = 0; } +// SRW locks and condition variables do not need to be explicitly destroyed. +// https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-initializesrwlock +// https://stackoverflow.com/questions/28975958/why-does-windows-have-no-deleteconditionvariable-function-to-go-together-with +Waiter::~Waiter() = default; + bool Waiter::Wait(KernelTimeout t) { SRWLOCK *mu = WinHelper::GetLock(this); CONDITION_VARIABLE *cv = WinHelper::GetCond(this); LockHolder h(mu); - waiter_count_.fetch_add(1, std::memory_order_relaxed); + ++waiter_count_; // Loop until we find a wakeup to consume or timeout. - while (true) { - int x = wakeup_count_.load(std::memory_order_relaxed); - if (x != 0) { - if (!wakeup_count_.compare_exchange_weak(x, x - 1, - std::memory_order_acquire, - std::memory_order_relaxed)) { - continue; // Raced with someone, retry. - } - // Successfully consumed a wakeup, we're done. - waiter_count_.fetch_sub(1, std::memory_order_relaxed); - return true; - } - + // Note that, since the thread ticker is just reset, we don't need to check + // whether the thread is idle on the very first pass of the loop. + bool first_pass = true; + while (wakeup_count_ == 0) { + if (!first_pass) MaybeBecomeIdle(); // No wakeups available, time to wait. if (!SleepConditionVariableSRW(cv, mu, t.InMillisecondsFromNow(), 0)) { // GetLastError() returns a Win32 DWORD, but we assign to @@ -422,32 +444,35 @@ bool Waiter::Wait(KernelTimeout t) { // initialization guarantees this is not a narrowing conversion. const unsigned long err{GetLastError()}; // NOLINT(runtime/int) if (err == ERROR_TIMEOUT) { - waiter_count_.fetch_sub(1, std::memory_order_relaxed); + --waiter_count_; return false; } else { ABSL_RAW_LOG(FATAL, "SleepConditionVariableSRW failed: %lu", err); } } - - MaybeBecomeIdle(); + first_pass = false; } + // Consume a wakeup and we're done. + --wakeup_count_; + --waiter_count_; + return true; } void Waiter::Post() { - wakeup_count_.fetch_add(1, std::memory_order_release); - Poke(); + LockHolder h(WinHelper::GetLock(this)); + ++wakeup_count_; + InternalCondVarPoke(); } void Waiter::Poke() { - if (waiter_count_.load(std::memory_order_relaxed) == 0) { - return; - } - // Potentially a waker. Take the lock and check again. LockHolder h(WinHelper::GetLock(this)); - if (waiter_count_.load(std::memory_order_relaxed) == 0) { - return; + InternalCondVarPoke(); +} + +void Waiter::InternalCondVarPoke() { + if (waiter_count_ != 0) { + WakeConditionVariable(WinHelper::GetCond(this)); } - WakeConditionVariable(WinHelper::GetCond(this)); } #else @@ -455,5 +480,5 @@ void Waiter::Poke() { #endif } // namespace synchronization_internal -} // inline namespace lts_2019_08_08 +ABSL_NAMESPACE_END } // namespace absl |