summaryrefslogtreecommitdiff
path: root/absl/synchronization/internal/waiter.cc
diff options
context:
space:
mode:
Diffstat (limited to 'absl/synchronization/internal/waiter.cc')
-rw-r--r--absl/synchronization/internal/waiter.cc211
1 files changed, 118 insertions, 93 deletions
diff --git a/absl/synchronization/internal/waiter.cc b/absl/synchronization/internal/waiter.cc
index 17c6a506..2949f5a8 100644
--- a/absl/synchronization/internal/waiter.cc
+++ b/absl/synchronization/internal/waiter.cc
@@ -49,7 +49,7 @@
#include "absl/synchronization/internal/kernel_timeout.h"
namespace absl {
-inline namespace lts_2019_08_08 {
+ABSL_NAMESPACE_BEGIN
namespace synchronization_internal {
static void MaybeBecomeIdle() {
@@ -123,16 +123,21 @@ class Futex {
}
};
-void Waiter::Init() {
+Waiter::Waiter() {
futex_.store(0, std::memory_order_relaxed);
}
+Waiter::~Waiter() = default;
+
bool Waiter::Wait(KernelTimeout t) {
// Loop until we can atomically decrement futex from a positive
// value, waiting on a futex while we believe it is zero.
+ // Note that, since the thread ticker is just reset, we don't need to check
+ // whether the thread is idle on the very first pass of the loop.
+ bool first_pass = true;
while (true) {
int32_t x = futex_.load(std::memory_order_relaxed);
- if (x != 0) {
+ while (x != 0) {
if (!futex_.compare_exchange_weak(x, x - 1,
std::memory_order_acquire,
std::memory_order_relaxed)) {
@@ -141,6 +146,8 @@ bool Waiter::Wait(KernelTimeout t) {
return true; // Consumed a wakeup, we are done.
}
+
+ if (!first_pass) MaybeBecomeIdle();
const int err = Futex::WaitUntil(&futex_, 0, t);
if (err != 0) {
if (err == -EINTR || err == -EWOULDBLOCK) {
@@ -151,14 +158,13 @@ bool Waiter::Wait(KernelTimeout t) {
ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
}
}
-
- MaybeBecomeIdle();
+ first_pass = false;
}
}
void Waiter::Post() {
if (futex_.fetch_add(1, std::memory_order_release) == 0) {
- // We incremented from 0, need to wake a potential waker.
+ // We incremented from 0, need to wake a potential waiter.
Poke();
}
}
@@ -196,7 +202,7 @@ class PthreadMutexHolder {
pthread_mutex_t *mu_;
};
-void Waiter::Init() {
+Waiter::Waiter() {
const int err = pthread_mutex_init(&mu_, 0);
if (err != 0) {
ABSL_RAW_LOG(FATAL, "pthread_mutex_init failed: %d", err);
@@ -207,8 +213,20 @@ void Waiter::Init() {
ABSL_RAW_LOG(FATAL, "pthread_cond_init failed: %d", err2);
}
- waiter_count_.store(0, std::memory_order_relaxed);
- wakeup_count_.store(0, std::memory_order_relaxed);
+ waiter_count_ = 0;
+ wakeup_count_ = 0;
+}
+
+Waiter::~Waiter() {
+ const int err = pthread_mutex_destroy(&mu_);
+ if (err != 0) {
+ ABSL_RAW_LOG(FATAL, "pthread_mutex_destroy failed: %d", err);
+ }
+
+ const int err2 = pthread_cond_destroy(&cv_);
+ if (err2 != 0) {
+ ABSL_RAW_LOG(FATAL, "pthread_cond_destroy failed: %d", err2);
+ }
}
bool Waiter::Wait(KernelTimeout t) {
@@ -218,21 +236,13 @@ bool Waiter::Wait(KernelTimeout t) {
}
PthreadMutexHolder h(&mu_);
- waiter_count_.fetch_add(1, std::memory_order_relaxed);
+ ++waiter_count_;
// Loop until we find a wakeup to consume or timeout.
- while (true) {
- int x = wakeup_count_.load(std::memory_order_relaxed);
- if (x != 0) {
- if (!wakeup_count_.compare_exchange_weak(x, x - 1,
- std::memory_order_acquire,
- std::memory_order_relaxed)) {
- continue; // Raced with someone, retry.
- }
- // Successfully consumed a wakeup, we're done.
- waiter_count_.fetch_sub(1, std::memory_order_relaxed);
- return true;
- }
-
+ // Note that, since the thread ticker is just reset, we don't need to check
+ // whether the thread is idle on the very first pass of the loop.
+ bool first_pass = true;
+ while (wakeup_count_ == 0) {
+ if (!first_pass) MaybeBecomeIdle();
// No wakeups available, time to wait.
if (!t.has_timeout()) {
const int err = pthread_cond_wait(&cv_, &mu_);
@@ -242,46 +252,56 @@ bool Waiter::Wait(KernelTimeout t) {
} else {
const int err = pthread_cond_timedwait(&cv_, &mu_, &abs_timeout);
if (err == ETIMEDOUT) {
- waiter_count_.fetch_sub(1, std::memory_order_relaxed);
+ --waiter_count_;
return false;
}
if (err != 0) {
- ABSL_RAW_LOG(FATAL, "pthread_cond_wait failed: %d", err);
+ ABSL_RAW_LOG(FATAL, "pthread_cond_timedwait failed: %d", err);
}
}
- MaybeBecomeIdle();
+ first_pass = false;
}
+ // Consume a wakeup and we're done.
+ --wakeup_count_;
+ --waiter_count_;
+ return true;
}
void Waiter::Post() {
- wakeup_count_.fetch_add(1, std::memory_order_release);
- Poke();
+ PthreadMutexHolder h(&mu_);
+ ++wakeup_count_;
+ InternalCondVarPoke();
}
void Waiter::Poke() {
- if (waiter_count_.load(std::memory_order_relaxed) == 0) {
- return;
- }
- // Potentially a waker. Take the lock and check again.
PthreadMutexHolder h(&mu_);
- if (waiter_count_.load(std::memory_order_relaxed) == 0) {
- return;
- }
- const int err = pthread_cond_signal(&cv_);
- if (err != 0) {
- ABSL_RAW_LOG(FATAL, "pthread_cond_signal failed: %d", err);
+ InternalCondVarPoke();
+}
+
+void Waiter::InternalCondVarPoke() {
+ if (waiter_count_ != 0) {
+ const int err = pthread_cond_signal(&cv_);
+ if (ABSL_PREDICT_FALSE(err != 0)) {
+ ABSL_RAW_LOG(FATAL, "pthread_cond_signal failed: %d", err);
+ }
}
}
#elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_SEM
-void Waiter::Init() {
+Waiter::Waiter() {
if (sem_init(&sem_, 0, 0) != 0) {
ABSL_RAW_LOG(FATAL, "sem_init failed with errno %d\n", errno);
}
wakeups_.store(0, std::memory_order_relaxed);
}
+Waiter::~Waiter() {
+ if (sem_destroy(&sem_) != 0) {
+ ABSL_RAW_LOG(FATAL, "sem_destroy failed with errno %d\n", errno);
+ }
+}
+
bool Waiter::Wait(KernelTimeout t) {
struct timespec abs_timeout;
if (t.has_timeout()) {
@@ -289,9 +309,12 @@ bool Waiter::Wait(KernelTimeout t) {
}
// Loop until we timeout or consume a wakeup.
+ // Note that, since the thread ticker is just reset, we don't need to check
+ // whether the thread is idle on the very first pass of the loop.
+ bool first_pass = true;
while (true) {
int x = wakeups_.load(std::memory_order_relaxed);
- if (x != 0) {
+ while (x != 0) {
if (!wakeups_.compare_exchange_weak(x, x - 1,
std::memory_order_acquire,
std::memory_order_relaxed)) {
@@ -301,6 +324,7 @@ bool Waiter::Wait(KernelTimeout t) {
return true;
}
+ if (!first_pass) MaybeBecomeIdle();
// Nothing to consume, wait (looping on EINTR).
while (true) {
if (!t.has_timeout()) {
@@ -314,13 +338,16 @@ bool Waiter::Wait(KernelTimeout t) {
ABSL_RAW_LOG(FATAL, "sem_timedwait failed: %d", errno);
}
}
- MaybeBecomeIdle();
+ first_pass = false;
}
}
void Waiter::Post() {
- wakeups_.fetch_add(1, std::memory_order_release); // Post a wakeup.
- Poke();
+ // Post a wakeup.
+ if (wakeups_.fetch_add(1, std::memory_order_release) == 0) {
+ // We incremented from 0, need to wake a potential waiter.
+ Poke();
+ }
}
void Waiter::Poke() {
@@ -341,31 +368,29 @@ class Waiter::WinHelper {
return reinterpret_cast<CONDITION_VARIABLE *>(&w->cv_storage_);
}
- static_assert(sizeof(SRWLOCK) == sizeof(Waiter::SRWLockStorage),
- "SRWLockStorage does not have the same size as SRWLOCK");
+ static_assert(sizeof(SRWLOCK) == sizeof(void *),
+ "`mu_storage_` does not have the same size as SRWLOCK");
+ static_assert(alignof(SRWLOCK) == alignof(void *),
+ "`mu_storage_` does not have the same alignment as SRWLOCK");
+
+ static_assert(sizeof(CONDITION_VARIABLE) == sizeof(void *),
+ "`ABSL_CONDITION_VARIABLE_STORAGE` does not have the same size "
+ "as `CONDITION_VARIABLE`");
static_assert(
- alignof(SRWLOCK) == alignof(Waiter::SRWLockStorage),
- "SRWLockStorage does not have the same alignment as SRWLOCK");
-
- static_assert(sizeof(CONDITION_VARIABLE) ==
- sizeof(Waiter::ConditionVariableStorage),
- "ABSL_CONDITION_VARIABLE_STORAGE does not have the same size "
- "as CONDITION_VARIABLE");
- static_assert(alignof(CONDITION_VARIABLE) ==
- alignof(Waiter::ConditionVariableStorage),
- "ConditionVariableStorage does not have the same "
- "alignment as CONDITION_VARIABLE");
-
- // The SRWLOCK and CONDITION_VARIABLE types must be trivially constuctible
+ alignof(CONDITION_VARIABLE) == alignof(void *),
+ "`cv_storage_` does not have the same alignment as `CONDITION_VARIABLE`");
+
+ // The SRWLOCK and CONDITION_VARIABLE types must be trivially constructible
// and destructible because we never call their constructors or destructors.
static_assert(std::is_trivially_constructible<SRWLOCK>::value,
- "The SRWLOCK type must be trivially constructible");
- static_assert(std::is_trivially_constructible<CONDITION_VARIABLE>::value,
- "The CONDITION_VARIABLE type must be trivially constructible");
+ "The `SRWLOCK` type must be trivially constructible");
+ static_assert(
+ std::is_trivially_constructible<CONDITION_VARIABLE>::value,
+ "The `CONDITION_VARIABLE` type must be trivially constructible");
static_assert(std::is_trivially_destructible<SRWLOCK>::value,
- "The SRWLOCK type must be trivially destructible");
+ "The `SRWLOCK` type must be trivially destructible");
static_assert(std::is_trivially_destructible<CONDITION_VARIABLE>::value,
- "The CONDITION_VARIABLE type must be trivially destructible");
+ "The `CONDITION_VARIABLE` type must be trivially destructible");
};
class LockHolder {
@@ -385,36 +410,33 @@ class LockHolder {
SRWLOCK* mu_;
};
-void Waiter::Init() {
+Waiter::Waiter() {
auto *mu = ::new (static_cast<void *>(&mu_storage_)) SRWLOCK;
auto *cv = ::new (static_cast<void *>(&cv_storage_)) CONDITION_VARIABLE;
InitializeSRWLock(mu);
InitializeConditionVariable(cv);
- waiter_count_.store(0, std::memory_order_relaxed);
- wakeup_count_.store(0, std::memory_order_relaxed);
+ waiter_count_ = 0;
+ wakeup_count_ = 0;
}
+// SRW locks and condition variables do not need to be explicitly destroyed.
+// https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-initializesrwlock
+// https://stackoverflow.com/questions/28975958/why-does-windows-have-no-deleteconditionvariable-function-to-go-together-with
+Waiter::~Waiter() = default;
+
bool Waiter::Wait(KernelTimeout t) {
SRWLOCK *mu = WinHelper::GetLock(this);
CONDITION_VARIABLE *cv = WinHelper::GetCond(this);
LockHolder h(mu);
- waiter_count_.fetch_add(1, std::memory_order_relaxed);
+ ++waiter_count_;
// Loop until we find a wakeup to consume or timeout.
- while (true) {
- int x = wakeup_count_.load(std::memory_order_relaxed);
- if (x != 0) {
- if (!wakeup_count_.compare_exchange_weak(x, x - 1,
- std::memory_order_acquire,
- std::memory_order_relaxed)) {
- continue; // Raced with someone, retry.
- }
- // Successfully consumed a wakeup, we're done.
- waiter_count_.fetch_sub(1, std::memory_order_relaxed);
- return true;
- }
-
+ // Note that, since the thread ticker is just reset, we don't need to check
+ // whether the thread is idle on the very first pass of the loop.
+ bool first_pass = true;
+ while (wakeup_count_ == 0) {
+ if (!first_pass) MaybeBecomeIdle();
// No wakeups available, time to wait.
if (!SleepConditionVariableSRW(cv, mu, t.InMillisecondsFromNow(), 0)) {
// GetLastError() returns a Win32 DWORD, but we assign to
@@ -422,32 +444,35 @@ bool Waiter::Wait(KernelTimeout t) {
// initialization guarantees this is not a narrowing conversion.
const unsigned long err{GetLastError()}; // NOLINT(runtime/int)
if (err == ERROR_TIMEOUT) {
- waiter_count_.fetch_sub(1, std::memory_order_relaxed);
+ --waiter_count_;
return false;
} else {
ABSL_RAW_LOG(FATAL, "SleepConditionVariableSRW failed: %lu", err);
}
}
-
- MaybeBecomeIdle();
+ first_pass = false;
}
+ // Consume a wakeup and we're done.
+ --wakeup_count_;
+ --waiter_count_;
+ return true;
}
void Waiter::Post() {
- wakeup_count_.fetch_add(1, std::memory_order_release);
- Poke();
+ LockHolder h(WinHelper::GetLock(this));
+ ++wakeup_count_;
+ InternalCondVarPoke();
}
void Waiter::Poke() {
- if (waiter_count_.load(std::memory_order_relaxed) == 0) {
- return;
- }
- // Potentially a waker. Take the lock and check again.
LockHolder h(WinHelper::GetLock(this));
- if (waiter_count_.load(std::memory_order_relaxed) == 0) {
- return;
+ InternalCondVarPoke();
+}
+
+void Waiter::InternalCondVarPoke() {
+ if (waiter_count_ != 0) {
+ WakeConditionVariable(WinHelper::GetCond(this));
}
- WakeConditionVariable(WinHelper::GetCond(this));
}
#else
@@ -455,5 +480,5 @@ void Waiter::Poke() {
#endif
} // namespace synchronization_internal
-} // inline namespace lts_2019_08_08
+ABSL_NAMESPACE_END
} // namespace absl