diff options
Diffstat (limited to 'absl/synchronization')
-rw-r--r-- | absl/synchronization/BUILD.bazel | 40 | ||||
-rw-r--r-- | absl/synchronization/CMakeLists.txt | 18 | ||||
-rw-r--r-- | absl/synchronization/blocking_counter.cc | 40 | ||||
-rw-r--r-- | absl/synchronization/blocking_counter.h | 8 | ||||
-rw-r--r-- | absl/synchronization/blocking_counter_benchmark.cc | 83 | ||||
-rw-r--r-- | absl/synchronization/blocking_counter_test.cc | 12 | ||||
-rw-r--r-- | absl/synchronization/internal/create_thread_identity.cc | 15 | ||||
-rw-r--r-- | absl/synchronization/internal/create_thread_identity.h | 4 | ||||
-rw-r--r-- | absl/synchronization/internal/per_thread_sem.cc | 4 | ||||
-rw-r--r-- | absl/synchronization/internal/per_thread_sem.h | 7 | ||||
-rw-r--r-- | absl/synchronization/internal/per_thread_sem_test.cc | 11 | ||||
-rw-r--r-- | absl/synchronization/internal/waiter.cc | 27 | ||||
-rw-r--r-- | absl/synchronization/internal/waiter.h | 14 | ||||
-rw-r--r-- | absl/synchronization/mutex.cc | 115 | ||||
-rw-r--r-- | absl/synchronization/mutex.h | 32 | ||||
-rw-r--r-- | absl/synchronization/mutex_benchmark.cc | 6 | ||||
-rw-r--r-- | absl/synchronization/mutex_test.cc | 81 | ||||
-rw-r--r-- | absl/synchronization/notification.h | 5 |
18 files changed, 357 insertions, 165 deletions
diff --git a/absl/synchronization/BUILD.bazel b/absl/synchronization/BUILD.bazel index 5ce16958..64d3b929 100644 --- a/absl/synchronization/BUILD.bazel +++ b/absl/synchronization/BUILD.bazel @@ -14,7 +14,6 @@ # limitations under the License. # -load("@rules_cc//cc:defs.bzl", "cc_binary", "cc_library", "cc_test") load( "//absl:copts/configure_copts.bzl", "ABSL_DEFAULT_COPTS", @@ -35,7 +34,9 @@ cc_library( hdrs = [ "internal/graphcycles.h", ], - copts = ABSL_DEFAULT_COPTS, + copts = ABSL_DEFAULT_COPTS + select({ + "//conditions:default": [], + }), linkopts = ABSL_DEFAULT_LINKOPTS, visibility = [ "//absl:__subpackages__", @@ -96,8 +97,8 @@ cc_library( deps = [ ":graphcycles_internal", ":kernel_timeout_internal", - "//absl/base", "//absl/base:atomic_hook", + "//absl/base", "//absl/base:base_internal", "//absl/base:config", "//absl/base:core_headers", @@ -107,7 +108,9 @@ cc_library( "//absl/debugging:stacktrace", "//absl/debugging:symbolize", "//absl/time", - ], + ] + select({ + "//conditions:default": [], + }), ) cc_test( @@ -116,6 +119,9 @@ cc_test( srcs = ["barrier_test.cc"], copts = ABSL_TEST_COPTS, linkopts = ABSL_DEFAULT_LINKOPTS, + tags = [ + "no_test_wasm", + ], deps = [ ":synchronization", "//absl/time", @@ -129,6 +135,9 @@ cc_test( srcs = ["blocking_counter_test.cc"], copts = ABSL_TEST_COPTS, linkopts = ABSL_DEFAULT_LINKOPTS, + tags = [ + "no_test_wasm", + ], deps = [ ":synchronization", "//absl/time", @@ -136,6 +145,21 @@ cc_test( ], ) +cc_binary( + name = "blocking_counter_benchmark", + testonly = 1, + srcs = ["blocking_counter_benchmark.cc"], + copts = ABSL_TEST_COPTS, + linkopts = ABSL_DEFAULT_LINKOPTS, + tags = ["benchmark"], + visibility = ["//visibility:private"], + deps = [ + ":synchronization", + ":thread_pool", + "@com_github_google_benchmark//:benchmark_main", + ], +) + cc_test( name = "graphcycles_test", size = "medium", @@ -264,6 +288,9 @@ cc_test( size = "medium", copts = ABSL_TEST_COPTS, linkopts = ABSL_DEFAULT_LINKOPTS, + tags = [ + "no_test_wasm", + ], deps = [ ":per_thread_sem_test_common", ":synchronization", @@ -280,7 +307,10 @@ cc_test( ], copts = ABSL_TEST_COPTS, linkopts = ABSL_DEFAULT_LINKOPTS, - tags = ["no_test_ios_x86_64"], + tags = [ + "no_test_ios_x86_64", + "no_test_wasm", + ], deps = [ ":synchronization", "//absl/base:core_headers", diff --git a/absl/synchronization/CMakeLists.txt b/absl/synchronization/CMakeLists.txt index e633d0bf..9335c264 100644 --- a/absl/synchronization/CMakeLists.txt +++ b/absl/synchronization/CMakeLists.txt @@ -14,6 +14,7 @@ # limitations under the License. # +# Internal-only target, do not depend on directly. absl_cc_library( NAME graphcycles_internal @@ -32,6 +33,7 @@ absl_cc_library( absl::raw_logging_internal ) +# Internal-only target, do not depend on directly. absl_cc_library( NAME kernel_timeout_internal @@ -95,7 +97,7 @@ absl_cc_test( DEPS absl::synchronization absl::time - gmock_main + GTest::gmock_main ) absl_cc_test( @@ -108,7 +110,7 @@ absl_cc_test( DEPS absl::synchronization absl::time - gmock_main + GTest::gmock_main ) absl_cc_test( @@ -122,9 +124,10 @@ absl_cc_test( absl::graphcycles_internal absl::core_headers absl::raw_logging_internal - gmock_main + GTest::gmock_main ) +# Internal-only target, do not depend on directly. absl_cc_library( NAME thread_pool @@ -154,7 +157,7 @@ absl_cc_test( absl::memory absl::raw_logging_internal absl::time - gmock_main + GTest::gmock_main ) absl_cc_test( @@ -167,9 +170,10 @@ absl_cc_test( DEPS absl::synchronization absl::time - gmock_main + GTest::gmock_main ) +# Internal-only target, do not depend on directly. absl_cc_library( NAME per_thread_sem_test_common @@ -183,7 +187,7 @@ absl_cc_library( absl::config absl::strings absl::time - gmock + GTest::gmock TESTONLY ) @@ -199,7 +203,7 @@ absl_cc_test( absl::synchronization absl::strings absl::time - gmock_main + GTest::gmock_main ) absl_cc_test( diff --git a/absl/synchronization/blocking_counter.cc b/absl/synchronization/blocking_counter.cc index 3cea7aed..d2f82da3 100644 --- a/absl/synchronization/blocking_counter.cc +++ b/absl/synchronization/blocking_counter.cc @@ -14,41 +14,51 @@ #include "absl/synchronization/blocking_counter.h" +#include <atomic> + #include "absl/base/internal/raw_logging.h" namespace absl { ABSL_NAMESPACE_BEGIN -// Return whether int *arg is zero. -static bool IsZero(void *arg) { - return 0 == *reinterpret_cast<int *>(arg); +namespace { + +// Return whether int *arg is true. +bool IsDone(void *arg) { return *reinterpret_cast<bool *>(arg); } + +} // namespace + +BlockingCounter::BlockingCounter(int initial_count) + : count_(initial_count), + num_waiting_(0), + done_{initial_count == 0 ? true : false} { + ABSL_RAW_CHECK(initial_count >= 0, "BlockingCounter initial_count negative"); } bool BlockingCounter::DecrementCount() { - MutexLock l(&lock_); - count_--; - if (count_ < 0) { - ABSL_RAW_LOG( - FATAL, - "BlockingCounter::DecrementCount() called too many times. count=%d", - count_); + int count = count_.fetch_sub(1, std::memory_order_acq_rel) - 1; + ABSL_RAW_CHECK(count >= 0, + "BlockingCounter::DecrementCount() called too many times"); + if (count == 0) { + MutexLock l(&lock_); + done_ = true; + return true; } - return count_ == 0; + return false; } void BlockingCounter::Wait() { MutexLock l(&this->lock_); - ABSL_RAW_CHECK(count_ >= 0, "BlockingCounter underflow"); // only one thread may call Wait(). To support more than one thread, // implement a counter num_to_exit, like in the Barrier class. ABSL_RAW_CHECK(num_waiting_ == 0, "multiple threads called Wait()"); num_waiting_++; - this->lock_.Await(Condition(IsZero, &this->count_)); + this->lock_.Await(Condition(IsDone, &this->done_)); - // At this point, We know that all threads executing DecrementCount have - // released the lock, and so will not touch this object again. + // At this point, we know that all threads executing DecrementCount + // will not touch this object again. // Therefore, the thread calling this method is free to delete the object // after we return from this method. } diff --git a/absl/synchronization/blocking_counter.h b/absl/synchronization/blocking_counter.h index 1f53f9f2..1908fdb1 100644 --- a/absl/synchronization/blocking_counter.h +++ b/absl/synchronization/blocking_counter.h @@ -20,6 +20,8 @@ #ifndef ABSL_SYNCHRONIZATION_BLOCKING_COUNTER_H_ #define ABSL_SYNCHRONIZATION_BLOCKING_COUNTER_H_ +#include <atomic> + #include "absl/base/thread_annotations.h" #include "absl/synchronization/mutex.h" @@ -60,8 +62,7 @@ ABSL_NAMESPACE_BEGIN // class BlockingCounter { public: - explicit BlockingCounter(int initial_count) - : count_(initial_count), num_waiting_(0) {} + explicit BlockingCounter(int initial_count); BlockingCounter(const BlockingCounter&) = delete; BlockingCounter& operator=(const BlockingCounter&) = delete; @@ -89,8 +90,9 @@ class BlockingCounter { private: Mutex lock_; - int count_ ABSL_GUARDED_BY(lock_); + std::atomic<int> count_; int num_waiting_ ABSL_GUARDED_BY(lock_); + bool done_ ABSL_GUARDED_BY(lock_); }; ABSL_NAMESPACE_END diff --git a/absl/synchronization/blocking_counter_benchmark.cc b/absl/synchronization/blocking_counter_benchmark.cc new file mode 100644 index 00000000..b504d1a5 --- /dev/null +++ b/absl/synchronization/blocking_counter_benchmark.cc @@ -0,0 +1,83 @@ +// Copyright 2021 The Abseil Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include <limits> + +#include "absl/synchronization/blocking_counter.h" +#include "absl/synchronization/internal/thread_pool.h" +#include "benchmark/benchmark.h" + +namespace { + +void BM_BlockingCounter_SingleThread(benchmark::State& state) { + for (auto _ : state) { + int iterations = state.range(0); + absl::BlockingCounter counter{iterations}; + for (int i = 0; i < iterations; ++i) { + counter.DecrementCount(); + } + counter.Wait(); + } +} +BENCHMARK(BM_BlockingCounter_SingleThread) + ->ArgName("iterations") + ->Arg(2) + ->Arg(4) + ->Arg(16) + ->Arg(64) + ->Arg(256); + +void BM_BlockingCounter_DecrementCount(benchmark::State& state) { + static absl::BlockingCounter* counter = + new absl::BlockingCounter{std::numeric_limits<int>::max()}; + for (auto _ : state) { + counter->DecrementCount(); + } +} +BENCHMARK(BM_BlockingCounter_DecrementCount) + ->Threads(2) + ->Threads(4) + ->Threads(6) + ->Threads(8) + ->Threads(10) + ->Threads(12) + ->Threads(16) + ->Threads(32) + ->Threads(64) + ->Threads(128); + +void BM_BlockingCounter_Wait(benchmark::State& state) { + int num_threads = state.range(0); + absl::synchronization_internal::ThreadPool pool(num_threads); + for (auto _ : state) { + absl::BlockingCounter counter{num_threads}; + pool.Schedule([num_threads, &counter, &pool]() { + for (int i = 0; i < num_threads; ++i) { + pool.Schedule([&counter]() { counter.DecrementCount(); }); + } + }); + counter.Wait(); + } +} +BENCHMARK(BM_BlockingCounter_Wait) + ->ArgName("threads") + ->Arg(2) + ->Arg(4) + ->Arg(8) + ->Arg(16) + ->Arg(32) + ->Arg(64) + ->Arg(128); + +} // namespace diff --git a/absl/synchronization/blocking_counter_test.cc b/absl/synchronization/blocking_counter_test.cc index 2926224a..06885f57 100644 --- a/absl/synchronization/blocking_counter_test.cc +++ b/absl/synchronization/blocking_counter_test.cc @@ -63,6 +63,18 @@ TEST(BlockingCounterTest, BasicFunctionality) { } } +TEST(BlockingCounterTest, WaitZeroInitialCount) { + BlockingCounter counter(0); + counter.Wait(); +} + +#if GTEST_HAS_DEATH_TEST +TEST(BlockingCounterTest, WaitNegativeInitialCount) { + EXPECT_DEATH(BlockingCounter counter(-1), + "BlockingCounter initial_count negative"); +} +#endif + } // namespace ABSL_NAMESPACE_END } // namespace absl diff --git a/absl/synchronization/internal/create_thread_identity.cc b/absl/synchronization/internal/create_thread_identity.cc index 53a71b34..44e6129b 100644 --- a/absl/synchronization/internal/create_thread_identity.cc +++ b/absl/synchronization/internal/create_thread_identity.cc @@ -38,7 +38,7 @@ ABSL_CONST_INIT static base_internal::ThreadIdentity* thread_identity_freelist; // A per-thread destructor for reclaiming associated ThreadIdentity objects. // Since we must preserve their storage we cache them for re-use. -void ReclaimThreadIdentity(void* v) { +static void ReclaimThreadIdentity(void* v) { base_internal::ThreadIdentity* identity = static_cast<base_internal::ThreadIdentity*>(v); @@ -48,8 +48,6 @@ void ReclaimThreadIdentity(void* v) { base_internal::LowLevelAlloc::Free(identity->per_thread_synch.all_locks); } - PerThreadSem::Destroy(identity); - // We must explicitly clear the current thread's identity: // (a) Subsequent (unrelated) per-thread destructors may require an identity. // We must guarantee a new identity is used in this case (this instructor @@ -71,7 +69,12 @@ static intptr_t RoundUp(intptr_t addr, intptr_t align) { return (addr + align - 1) & ~(align - 1); } -static void ResetThreadIdentity(base_internal::ThreadIdentity* identity) { +void OneTimeInitThreadIdentity(base_internal::ThreadIdentity* identity) { + PerThreadSem::Init(identity); +} + +static void ResetThreadIdentityBetweenReuse( + base_internal::ThreadIdentity* identity) { base_internal::PerThreadSynch* pts = &identity->per_thread_synch; pts->next = nullptr; pts->skip = nullptr; @@ -116,8 +119,9 @@ static base_internal::ThreadIdentity* NewThreadIdentity() { identity = reinterpret_cast<base_internal::ThreadIdentity*>( RoundUp(reinterpret_cast<intptr_t>(allocation), base_internal::PerThreadSynch::kAlignment)); + OneTimeInitThreadIdentity(identity); } - ResetThreadIdentity(identity); + ResetThreadIdentityBetweenReuse(identity); return identity; } @@ -127,7 +131,6 @@ static base_internal::ThreadIdentity* NewThreadIdentity() { // REQUIRES: CurrentThreadIdentity(false) == nullptr base_internal::ThreadIdentity* CreateThreadIdentity() { base_internal::ThreadIdentity* identity = NewThreadIdentity(); - PerThreadSem::Init(identity); // Associate the value with the current thread, and attach our destructor. base_internal::SetCurrentThreadIdentity(identity, ReclaimThreadIdentity); return identity; diff --git a/absl/synchronization/internal/create_thread_identity.h b/absl/synchronization/internal/create_thread_identity.h index e121f683..4cfde091 100644 --- a/absl/synchronization/internal/create_thread_identity.h +++ b/absl/synchronization/internal/create_thread_identity.h @@ -36,10 +36,6 @@ namespace synchronization_internal { // For private use only. base_internal::ThreadIdentity* CreateThreadIdentity(); -// A per-thread destructor for reclaiming associated ThreadIdentity objects. -// For private use only. -void ReclaimThreadIdentity(void* v); - // Returns the ThreadIdentity object representing the calling thread; guaranteed // to be unique for its lifetime. The returned object will remain valid for the // program's lifetime; although it may be re-assigned to a subsequent thread. diff --git a/absl/synchronization/internal/per_thread_sem.cc b/absl/synchronization/internal/per_thread_sem.cc index a6031787..469e8f32 100644 --- a/absl/synchronization/internal/per_thread_sem.cc +++ b/absl/synchronization/internal/per_thread_sem.cc @@ -47,10 +47,6 @@ void PerThreadSem::Init(base_internal::ThreadIdentity *identity) { identity->is_idle.store(false, std::memory_order_relaxed); } -void PerThreadSem::Destroy(base_internal::ThreadIdentity *identity) { - Waiter::GetWaiter(identity)->~Waiter(); -} - void PerThreadSem::Tick(base_internal::ThreadIdentity *identity) { const int ticker = identity->ticker.fetch_add(1, std::memory_order_relaxed) + 1; diff --git a/absl/synchronization/internal/per_thread_sem.h b/absl/synchronization/internal/per_thread_sem.h index 7beae8ef..90a88809 100644 --- a/absl/synchronization/internal/per_thread_sem.h +++ b/absl/synchronization/internal/per_thread_sem.h @@ -66,10 +66,6 @@ class PerThreadSem { // REQUIRES: May only be called by ThreadIdentity. static void Init(base_internal::ThreadIdentity* identity); - // Destroy the PerThreadSem associated with "identity". - // REQUIRES: May only be called by ThreadIdentity. - static void Destroy(base_internal::ThreadIdentity* identity); - // Increments "identity"'s count. static inline void Post(base_internal::ThreadIdentity* identity); @@ -81,8 +77,7 @@ class PerThreadSem { // Permitted callers. friend class PerThreadSemTest; friend class absl::Mutex; - friend absl::base_internal::ThreadIdentity* CreateThreadIdentity(); - friend void ReclaimThreadIdentity(void* v); + friend void OneTimeInitThreadIdentity(absl::base_internal::ThreadIdentity*); }; } // namespace synchronization_internal diff --git a/absl/synchronization/internal/per_thread_sem_test.cc b/absl/synchronization/internal/per_thread_sem_test.cc index 8cf59e64..24a6b548 100644 --- a/absl/synchronization/internal/per_thread_sem_test.cc +++ b/absl/synchronization/internal/per_thread_sem_test.cc @@ -159,7 +159,7 @@ TEST_F(PerThreadSemTest, Timeouts) { const absl::Duration elapsed = absl::Now() - start; // Allow for a slight early return, to account for quality of implementation // issues on various platforms. - const absl::Duration slop = absl::Microseconds(200); + const absl::Duration slop = absl::Milliseconds(1); EXPECT_LE(delay - slop, elapsed) << "Wait returned " << delay - elapsed << " early (with " << slop << " slop), start time was " << start; @@ -174,6 +174,15 @@ TEST_F(PerThreadSemTest, Timeouts) { EXPECT_TRUE(Wait(negative_timeout)); } +TEST_F(PerThreadSemTest, ThreadIdentityReuse) { + // Create a base_internal::ThreadIdentity object and keep reusing it. There + // should be no memory or resource leaks. + for (int i = 0; i < 10000; i++) { + std::thread t([]() { GetOrCreateCurrentThreadIdentity(); }); + t.join(); + } +} + } // namespace } // namespace synchronization_internal diff --git a/absl/synchronization/internal/waiter.cc b/absl/synchronization/internal/waiter.cc index 2123be60..f2051d67 100644 --- a/absl/synchronization/internal/waiter.cc +++ b/absl/synchronization/internal/waiter.cc @@ -71,14 +71,13 @@ Waiter::Waiter() { futex_.store(0, std::memory_order_relaxed); } -Waiter::~Waiter() = default; - bool Waiter::Wait(KernelTimeout t) { // Loop until we can atomically decrement futex from a positive // value, waiting on a futex while we believe it is zero. // Note that, since the thread ticker is just reset, we don't need to check // whether the thread is idle on the very first pass of the loop. bool first_pass = true; + while (true) { int32_t x = futex_.load(std::memory_order_relaxed); while (x != 0) { @@ -90,7 +89,6 @@ bool Waiter::Wait(KernelTimeout t) { return true; // Consumed a wakeup, we are done. } - if (!first_pass) MaybeBecomeIdle(); const int err = Futex::WaitUntil(&futex_, 0, t); if (err != 0) { @@ -161,18 +159,6 @@ Waiter::Waiter() { wakeup_count_ = 0; } -Waiter::~Waiter() { - const int err = pthread_mutex_destroy(&mu_); - if (err != 0) { - ABSL_RAW_LOG(FATAL, "pthread_mutex_destroy failed: %d", err); - } - - const int err2 = pthread_cond_destroy(&cv_); - if (err2 != 0) { - ABSL_RAW_LOG(FATAL, "pthread_cond_destroy failed: %d", err2); - } -} - bool Waiter::Wait(KernelTimeout t) { struct timespec abs_timeout; if (t.has_timeout()) { @@ -240,12 +226,6 @@ Waiter::Waiter() { wakeups_.store(0, std::memory_order_relaxed); } -Waiter::~Waiter() { - if (sem_destroy(&sem_) != 0) { - ABSL_RAW_LOG(FATAL, "sem_destroy failed with errno %d\n", errno); - } -} - bool Waiter::Wait(KernelTimeout t) { struct timespec abs_timeout; if (t.has_timeout()) { @@ -363,11 +343,6 @@ Waiter::Waiter() { wakeup_count_ = 0; } -// SRW locks and condition variables do not need to be explicitly destroyed. -// https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-initializesrwlock -// https://stackoverflow.com/questions/28975958/why-does-windows-have-no-deleteconditionvariable-function-to-go-together-with -Waiter::~Waiter() = default; - bool Waiter::Wait(KernelTimeout t) { SRWLOCK *mu = WinHelper::GetLock(this); CONDITION_VARIABLE *cv = WinHelper::GetCond(this); diff --git a/absl/synchronization/internal/waiter.h b/absl/synchronization/internal/waiter.h index be3df180..b8adfeb5 100644 --- a/absl/synchronization/internal/waiter.h +++ b/absl/synchronization/internal/waiter.h @@ -71,9 +71,6 @@ class Waiter { Waiter(const Waiter&) = delete; Waiter& operator=(const Waiter&) = delete; - // Destroy any data to track waits. - ~Waiter(); - // Blocks the calling thread until a matching call to `Post()` or // `t` has passed. Returns `true` if woken (`Post()` called), // `false` on timeout. @@ -106,6 +103,12 @@ class Waiter { #endif private: + // The destructor must not be called since Mutex/CondVar + // can use PerThreadSem/Waiter after the thread exits. + // Waiter objects are embedded in ThreadIdentity objects, + // which are reused via a freelist and are never destroyed. + ~Waiter() = delete; + #if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX // Futexes are defined by specification to be 32-bits. // Thus std::atomic<int32_t> must be just an int32_t with lockfree methods. @@ -136,8 +139,11 @@ class Waiter { // REQUIRES: WinHelper::GetLock(this) must be held. void InternalCondVarPoke(); - // We can't include Windows.h in our headers, so we use aligned charachter + // We can't include Windows.h in our headers, so we use aligned character // buffers to define the storage of SRWLOCK and CONDITION_VARIABLE. + // SRW locks and condition variables do not need to be explicitly destroyed. + // https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-initializesrwlock + // https://stackoverflow.com/questions/28975958/why-does-windows-have-no-deleteconditionvariable-function-to-go-together-with alignas(void*) unsigned char mu_storage_[sizeof(void*)]; alignas(void*) unsigned char cv_storage_[sizeof(void*)]; int waiter_count_; diff --git a/absl/synchronization/mutex.cc b/absl/synchronization/mutex.cc index 76ad41fe..52e2455d 100644 --- a/absl/synchronization/mutex.cc +++ b/absl/synchronization/mutex.cc @@ -109,7 +109,7 @@ static inline bool EvalConditionAnnotated(const Condition *cond, Mutex *mu, bool locking, bool trylock, bool read_lock); -void RegisterMutexProfiler(void (*fn)(int64_t wait_timestamp)) { +void RegisterMutexProfiler(void (*fn)(int64_t wait_cycles)) { submit_profile_data.Store(fn); } @@ -1744,23 +1744,33 @@ ABSL_XRAY_LOG_ARGS(1) void Mutex::ReaderUnlock() { ABSL_TSAN_MUTEX_POST_UNLOCK(this, __tsan_mutex_read_lock); } -// The zap_desig_waker bitmask is used to clear the designated waker flag in -// the mutex if this thread has blocked, and therefore may be the designated -// waker. -static const intptr_t zap_desig_waker[] = { - ~static_cast<intptr_t>(0), // not blocked - ~static_cast<intptr_t>( - kMuDesig) // blocked; turn off the designated waker bit -}; +// Clears the designated waker flag in the mutex if this thread has blocked, and +// therefore may be the designated waker. +static intptr_t ClearDesignatedWakerMask(int flag) { + assert(flag >= 0); + assert(flag <= 1); + switch (flag) { + case 0: // not blocked + return ~static_cast<intptr_t>(0); + case 1: // blocked; turn off the designated waker bit + return ~static_cast<intptr_t>(kMuDesig); + } + ABSL_INTERNAL_UNREACHABLE; +} -// The ignore_waiting_writers bitmask is used to ignore the existence -// of waiting writers if a reader that has already blocked once -// wakes up. -static const intptr_t ignore_waiting_writers[] = { - ~static_cast<intptr_t>(0), // not blocked - ~static_cast<intptr_t>( - kMuWrWait) // blocked; pretend there are no waiting writers -}; +// Conditionally ignores the existence of waiting writers if a reader that has +// already blocked once wakes up. +static intptr_t IgnoreWaitingWritersMask(int flag) { + assert(flag >= 0); + assert(flag <= 1); + switch (flag) { + case 0: // not blocked + return ~static_cast<intptr_t>(0); + case 1: // blocked; pretend there are no waiting writers + return ~static_cast<intptr_t>(kMuWrWait); + } + ABSL_INTERNAL_UNREACHABLE; +} // Internal version of LockWhen(). See LockSlowWithDeadline() ABSL_ATTRIBUTE_NOINLINE void Mutex::LockSlow(MuHow how, const Condition *cond, @@ -1852,8 +1862,10 @@ bool Mutex::LockSlowWithDeadline(MuHow how, const Condition *cond, bool unlock = false; if ((v & how->fast_need_zero) == 0 && // try fast acquire mu_.compare_exchange_strong( - v, (how->fast_or | (v & zap_desig_waker[flags & kMuHasBlocked])) + - how->fast_add, + v, + (how->fast_or | + (v & ClearDesignatedWakerMask(flags & kMuHasBlocked))) + + how->fast_add, std::memory_order_acquire, std::memory_order_relaxed)) { if (cond == nullptr || EvalConditionAnnotated(cond, this, true, false, how == kShared)) { @@ -1927,9 +1939,10 @@ void Mutex::LockSlowLoop(SynchWaitParams *waitp, int flags) { CheckForMutexCorruption(v, "Lock"); if ((v & waitp->how->slow_need_zero) == 0) { if (mu_.compare_exchange_strong( - v, (waitp->how->fast_or | - (v & zap_desig_waker[flags & kMuHasBlocked])) + - waitp->how->fast_add, + v, + (waitp->how->fast_or | + (v & ClearDesignatedWakerMask(flags & kMuHasBlocked))) + + waitp->how->fast_add, std::memory_order_acquire, std::memory_order_relaxed)) { if (waitp->cond == nullptr || EvalConditionAnnotated(waitp->cond, this, true, false, @@ -1946,8 +1959,9 @@ void Mutex::LockSlowLoop(SynchWaitParams *waitp, int flags) { if ((v & (kMuSpin|kMuWait)) == 0) { // no waiters // This thread tries to become the one and only waiter. PerThreadSynch *new_h = Enqueue(nullptr, waitp, v, flags); - intptr_t nv = (v & zap_desig_waker[flags & kMuHasBlocked] & kMuLow) | - kMuWait; + intptr_t nv = + (v & ClearDesignatedWakerMask(flags & kMuHasBlocked) & kMuLow) | + kMuWait; ABSL_RAW_CHECK(new_h != nullptr, "Enqueue to empty list failed"); if (waitp->how == kExclusive && (v & kMuReader) != 0) { nv |= kMuWrWait; @@ -1961,12 +1975,13 @@ void Mutex::LockSlowLoop(SynchWaitParams *waitp, int flags) { waitp->thread->waitp = nullptr; } } else if ((v & waitp->how->slow_inc_need_zero & - ignore_waiting_writers[flags & kMuHasBlocked]) == 0) { + IgnoreWaitingWritersMask(flags & kMuHasBlocked)) == 0) { // This is a reader that needs to increment the reader count, // but the count is currently held in the last waiter. if (mu_.compare_exchange_strong( - v, (v & zap_desig_waker[flags & kMuHasBlocked]) | kMuSpin | - kMuReader, + v, + (v & ClearDesignatedWakerMask(flags & kMuHasBlocked)) | + kMuSpin | kMuReader, std::memory_order_acquire, std::memory_order_relaxed)) { PerThreadSynch *h = GetPerThreadSynch(v); h->readers += kMuOne; // inc reader count in waiter @@ -1987,8 +2002,9 @@ void Mutex::LockSlowLoop(SynchWaitParams *waitp, int flags) { } } else if ((v & kMuSpin) == 0 && // attempt to queue ourselves mu_.compare_exchange_strong( - v, (v & zap_desig_waker[flags & kMuHasBlocked]) | kMuSpin | - kMuWait, + v, + (v & ClearDesignatedWakerMask(flags & kMuHasBlocked)) | + kMuSpin | kMuWait, std::memory_order_acquire, std::memory_order_relaxed)) { PerThreadSynch *h = GetPerThreadSynch(v); PerThreadSynch *new_h = Enqueue(h, waitp, v, flags); @@ -2315,19 +2331,21 @@ ABSL_ATTRIBUTE_NOINLINE void Mutex::UnlockSlow(SynchWaitParams *waitp) { } // end of for(;;)-loop if (wake_list != kPerThreadSynchNull) { - int64_t enqueue_timestamp = wake_list->waitp->contention_start_cycles; - bool cond_waiter = wake_list->cond_waiter; + int64_t wait_cycles = 0; + int64_t now = base_internal::CycleClock::Now(); do { + // Sample lock contention events only if the waiter was trying to acquire + // the lock, not waiting on a condition variable or Condition. + if (!wake_list->cond_waiter) { + wait_cycles += (now - wake_list->waitp->contention_start_cycles); + wake_list->waitp->contention_start_cycles = now; + } wake_list = Wakeup(wake_list); // wake waiters } while (wake_list != kPerThreadSynchNull); - if (!cond_waiter) { - // Sample lock contention events only if the (first) waiter was trying to - // acquire the lock, not waiting on a condition variable or Condition. - int64_t wait_cycles = - base_internal::CycleClock::Now() - enqueue_timestamp; + if (wait_cycles > 0) { mutex_tracer("slow release", this, wait_cycles); ABSL_TSAN_MUTEX_PRE_DIVERT(this, 0); - submit_profile_data(enqueue_timestamp); + submit_profile_data(wait_cycles); ABSL_TSAN_MUTEX_POST_DIVERT(this, 0); } } @@ -2492,9 +2510,9 @@ void CondVar::Remove(PerThreadSynch *s) { // before calling Mutex::UnlockSlow(), the Mutex code might be re-entered (via // the logging code, or via a Condition function) and might potentially attempt // to block this thread. That would be a problem if the thread were already on -// a the condition variable waiter queue. Thus, we use the waitp->cv_word -// to tell the unlock code to call CondVarEnqueue() to queue the thread on the -// condition variable queue just before the mutex is to be unlocked, and (most +// a condition variable waiter queue. Thus, we use the waitp->cv_word to tell +// the unlock code to call CondVarEnqueue() to queue the thread on the condition +// variable queue just before the mutex is to be unlocked, and (most // importantly) after any call to an external routine that might re-enter the // mutex code. static void CondVarEnqueue(SynchWaitParams *waitp) { @@ -2557,6 +2575,23 @@ bool CondVar::WaitCommon(Mutex *mutex, KernelTimeout t) { while (waitp.thread->state.load(std::memory_order_acquire) == PerThreadSynch::kQueued) { if (!Mutex::DecrementSynchSem(mutex, waitp.thread, t)) { + // DecrementSynchSem returned due to timeout. + // Now we will either (1) remove ourselves from the wait list in Remove + // below, in which case Remove will set thread.state = kAvailable and + // we will not call DecrementSynchSem again; or (2) Signal/SignalAll + // has removed us concurrently and is calling Wakeup, which will set + // thread.state = kAvailable and post to the semaphore. + // It's important to reset the timeout for the case (2) because otherwise + // we can live-lock in this loop since DecrementSynchSem will always + // return immediately due to timeout, but Signal/SignalAll is not + // necessary set thread.state = kAvailable yet (and is not scheduled + // due to thread priorities or other scheduler artifacts). + // Note this could also be resolved if Signal/SignalAll would set + // thread.state = kAvailable while holding the wait list spin lock. + // But this can't be easily done for SignalAll since it grabs the whole + // wait list with a single compare-exchange and does not really grab + // the spin lock. + t = KernelTimeout::Never(); this->Remove(waitp.thread); rc = true; } diff --git a/absl/synchronization/mutex.h b/absl/synchronization/mutex.h index f49e0c83..b69b7089 100644 --- a/absl/synchronization/mutex.h +++ b/absl/synchronization/mutex.h @@ -174,9 +174,12 @@ class ABSL_LOCKABLE Mutex { // Mutex::AssertHeld() // - // Return immediately if this thread holds the `Mutex` exclusively (in write - // mode). Otherwise, may report an error (typically by crashing with a - // diagnostic), or may return immediately. + // Require that the mutex be held exclusively (write mode) by this thread. + // + // If the mutex is not currently held by this thread, this function may report + // an error (typically by crashing with a diagnostic) or it may do nothing. + // This function is intended only as a tool to assist debugging; it doesn't + // guarantee correctness. void AssertHeld() const ABSL_ASSERT_EXCLUSIVE_LOCK(); // --------------------------------------------------------------------------- @@ -236,9 +239,13 @@ class ABSL_LOCKABLE Mutex { // Mutex::AssertReaderHeld() // - // Returns immediately if this thread holds the `Mutex` in at least shared - // mode (read mode). Otherwise, may report an error (typically by - // crashing with a diagnostic), or may return immediately. + // Require that the mutex be held at least in shared mode (read mode) by this + // thread. + // + // If the mutex is not currently held by this thread, this function may report + // an error (typically by crashing with a diagnostic) or it may do nothing. + // This function is intended only as a tool to assist debugging; it doesn't + // guarantee correctness. void AssertReaderHeld() const ABSL_ASSERT_SHARED_LOCK(); // Mutex::WriterLock() @@ -778,9 +785,9 @@ class Condition { // // Usage to wake T is: // mu.Lock(); -// // process data, possibly establishing C -// if (C) { cv->Signal(); } -// mu.Unlock(); +// // process data, possibly establishing C +// if (C) { cv->Signal(); } +// mu.Unlock(); // // If C may be useful to more than one waiter, use `SignalAll()` instead of // `Signal()`. @@ -984,14 +991,15 @@ inline Condition::Condition(const T *object, // Register a hook for profiling support. // // The function pointer registered here will be called whenever a mutex is -// contended. The callback is given the absl/base/cycleclock.h timestamp when -// waiting began. +// contended. The callback is given the cycles for which waiting happened (as +// measured by //absl/base/internal/cycleclock.h, and which may not +// be real "cycle" counts.) // // Calls to this function do not race or block, but there is no ordering // guaranteed between calls to this function and call to the provided hook. // In particular, the previously registered hook may still be called for some // time after this function returns. -void RegisterMutexProfiler(void (*fn)(int64_t wait_timestamp)); +void RegisterMutexProfiler(void (*fn)(int64_t wait_cycles)); // Register a hook for Mutex tracing. // diff --git a/absl/synchronization/mutex_benchmark.cc b/absl/synchronization/mutex_benchmark.cc index e35aed8b..b5d2fbc4 100644 --- a/absl/synchronization/mutex_benchmark.cc +++ b/absl/synchronization/mutex_benchmark.cc @@ -97,7 +97,7 @@ void BM_MutexEnqueue(benchmark::State& state) { // Mutex queueing behavior is modified. const bool multiple_priorities = state.range(0); ScopedThreadMutexPriority priority_setter( - (multiple_priorities && state.thread_index != 0) ? 1 : 0); + (multiple_priorities && state.thread_index() != 0) ? 1 : 0); struct Shared { absl::Mutex mu; @@ -176,7 +176,7 @@ BENCHMARK(BM_MutexEnqueue) template <typename MutexType> void BM_Contended(benchmark::State& state) { - int priority = state.thread_index % state.range(1); + int priority = state.thread_index() % state.range(1); ScopedThreadMutexPriority priority_setter(priority); struct Shared { @@ -196,7 +196,7 @@ void BM_Contended(benchmark::State& state) { // To achieve this amount of local work is multiplied by number of threads // to keep ratio between local work and critical section approximately // equal regardless of number of threads. - DelayNs(100 * state.threads, &local); + DelayNs(100 * state.threads(), &local); RaiiLocker<MutexType> locker(&shared->mu); DelayNs(state.range(0), &shared->data); } diff --git a/absl/synchronization/mutex_test.cc b/absl/synchronization/mutex_test.cc index f8fbf948..99bb0175 100644 --- a/absl/synchronization/mutex_test.cc +++ b/absl/synchronization/mutex_test.cc @@ -26,6 +26,7 @@ #include <random> #include <string> #include <thread> // NOLINT(build/c++11) +#include <type_traits> #include <vector> #include "gtest/gtest.h" @@ -870,33 +871,6 @@ TEST(Mutex, LockedMutexDestructionBug) ABSL_NO_THREAD_SAFETY_ANALYSIS { } } -// -------------------------------------------------------- -// Test for bug with pattern of readers using a condvar. The bug was that if a -// reader went to sleep on a condition variable while one or more other readers -// held the lock, but there were no waiters, the reader count (held in the -// mutex word) would be lost. (This is because Enqueue() had at one time -// always placed the thread on the Mutex queue. Later (CL 4075610), to -// tolerate re-entry into Mutex from a Condition predicate, Enqueue() was -// changed so that it could also place a thread on a condition-variable. This -// introduced the case where Enqueue() returned with an empty queue, and this -// case was handled incorrectly in one place.) - -static void ReaderForReaderOnCondVar(absl::Mutex *mu, absl::CondVar *cv, - int *running) { - std::random_device dev; - std::mt19937 gen(dev()); - std::uniform_int_distribution<int> random_millis(0, 15); - mu->ReaderLock(); - while (*running == 3) { - absl::SleepFor(absl::Milliseconds(random_millis(gen))); - cv->WaitWithTimeout(mu, absl::Milliseconds(random_millis(gen))); - } - mu->ReaderUnlock(); - mu->Lock(); - (*running)--; - mu->Unlock(); -} - struct True { template <class... Args> bool operator()(Args...) const { @@ -945,6 +919,33 @@ TEST(Mutex, FunctorCondition) { } } +// -------------------------------------------------------- +// Test for bug with pattern of readers using a condvar. The bug was that if a +// reader went to sleep on a condition variable while one or more other readers +// held the lock, but there were no waiters, the reader count (held in the +// mutex word) would be lost. (This is because Enqueue() had at one time +// always placed the thread on the Mutex queue. Later (CL 4075610), to +// tolerate re-entry into Mutex from a Condition predicate, Enqueue() was +// changed so that it could also place a thread on a condition-variable. This +// introduced the case where Enqueue() returned with an empty queue, and this +// case was handled incorrectly in one place.) + +static void ReaderForReaderOnCondVar(absl::Mutex *mu, absl::CondVar *cv, + int *running) { + std::random_device dev; + std::mt19937 gen(dev()); + std::uniform_int_distribution<int> random_millis(0, 15); + mu->ReaderLock(); + while (*running == 3) { + absl::SleepFor(absl::Milliseconds(random_millis(gen))); + cv->WaitWithTimeout(mu, absl::Milliseconds(random_millis(gen))); + } + mu->ReaderUnlock(); + mu->Lock(); + (*running)--; + mu->Unlock(); +} + static bool IntIsZero(int *x) { return *x == 0; } // Test for reader waiting condition variable when there are other readers @@ -1703,4 +1704,30 @@ TEST(Mutex, MuTime) { EXPECT_EQ(RunTest(&TestMuTime, threads, iterations, 1), threads * iterations); } +TEST(Mutex, SignalExitedThread) { + // The test may expose a race when Mutex::Unlock signals a thread + // that has already exited. +#if defined(__wasm__) || defined(__asmjs__) + constexpr int kThreads = 1; // OOMs under WASM +#else + constexpr int kThreads = 100; +#endif + std::vector<std::thread> top; + for (unsigned i = 0; i < 2 * std::thread::hardware_concurrency(); i++) { + top.emplace_back([&]() { + for (int i = 0; i < kThreads; i++) { + absl::Mutex mu; + std::thread t([&]() { + mu.Lock(); + mu.Unlock(); + }); + mu.Lock(); + mu.Unlock(); + t.join(); + } + }); + } + for (auto &th : top) th.join(); +} + } // namespace diff --git a/absl/synchronization/notification.h b/absl/synchronization/notification.h index 9a354ca2..4bec2689 100644 --- a/absl/synchronization/notification.h +++ b/absl/synchronization/notification.h @@ -22,7 +22,7 @@ // The `Notification` object maintains a private boolean "notified" state that // transitions to `true` at most once. The `Notification` class provides the // following primary member functions: -// * `HasBeenNotified() `to query its state +// * `HasBeenNotified()` to query its state // * `WaitForNotification*()` to have threads wait until the "notified" state // is `true`. // * `Notify()` to set the notification's "notified" state to `true` and @@ -52,6 +52,7 @@ #include <atomic> +#include "absl/base/attributes.h" #include "absl/base/macros.h" #include "absl/synchronization/mutex.h" #include "absl/time/time.h" @@ -74,7 +75,7 @@ class Notification { // Notification::HasBeenNotified() // // Returns the value of the notification's internal "notified" state. - bool HasBeenNotified() const { + ABSL_MUST_USE_RESULT bool HasBeenNotified() const { return HasBeenNotifiedInternal(&this->notified_yet_); } |