diff options
Diffstat (limited to 'absl/synchronization')
23 files changed, 959 insertions, 1080 deletions
diff --git a/absl/synchronization/BUILD.bazel b/absl/synchronization/BUILD.bazel index 3f876b9f..64d3b929 100644 --- a/absl/synchronization/BUILD.bazel +++ b/absl/synchronization/BUILD.bazel @@ -14,7 +14,6 @@ # limitations under the License. # -load("@rules_cc//cc:defs.bzl", "cc_binary", "cc_library", "cc_test") load( "//absl:copts/configure_copts.bzl", "ABSL_DEFAULT_COPTS", @@ -24,7 +23,7 @@ load( package(default_visibility = ["//visibility:public"]) -licenses(["notice"]) # Apache 2.0 +licenses(["notice"]) # Internal data structure for efficiently detecting mutex dependency cycles cc_library( @@ -35,7 +34,9 @@ cc_library( hdrs = [ "internal/graphcycles.h", ], - copts = ABSL_DEFAULT_COPTS, + copts = ABSL_DEFAULT_COPTS + select({ + "//conditions:default": [], + }), linkopts = ABSL_DEFAULT_LINKOPTS, visibility = [ "//absl:__subpackages__", @@ -73,15 +74,14 @@ cc_library( "internal/create_thread_identity.cc", "internal/per_thread_sem.cc", "internal/waiter.cc", + "mutex.cc", "notification.cc", - ] + select({ - "//conditions:default": ["mutex.cc"], - }), + ], hdrs = [ "barrier.h", "blocking_counter.h", "internal/create_thread_identity.h", - "internal/mutex_nonprod.inc", + "internal/futex.h", "internal/per_thread_sem.h", "internal/waiter.h", "mutex.h", @@ -89,14 +89,16 @@ cc_library( ], copts = ABSL_DEFAULT_COPTS, linkopts = select({ - "//absl:windows": [], + "//absl:msvc_compiler": [], + "//absl:clang-cl_compiler": [], + "//absl:wasm": [], "//conditions:default": ["-pthread"], }) + ABSL_DEFAULT_LINKOPTS, deps = [ ":graphcycles_internal", ":kernel_timeout_internal", - "//absl/base", "//absl/base:atomic_hook", + "//absl/base", "//absl/base:base_internal", "//absl/base:config", "//absl/base:core_headers", @@ -106,7 +108,9 @@ cc_library( "//absl/debugging:stacktrace", "//absl/debugging:symbolize", "//absl/time", - ], + ] + select({ + "//conditions:default": [], + }), ) cc_test( @@ -115,6 +119,9 @@ cc_test( srcs = ["barrier_test.cc"], copts = ABSL_TEST_COPTS, linkopts = ABSL_DEFAULT_LINKOPTS, + tags = [ + "no_test_wasm", + ], deps = [ ":synchronization", "//absl/time", @@ -128,6 +135,9 @@ cc_test( srcs = ["blocking_counter_test.cc"], copts = ABSL_TEST_COPTS, linkopts = ABSL_DEFAULT_LINKOPTS, + tags = [ + "no_test_wasm", + ], deps = [ ":synchronization", "//absl/time", @@ -135,6 +145,21 @@ cc_test( ], ) +cc_binary( + name = "blocking_counter_benchmark", + testonly = 1, + srcs = ["blocking_counter_benchmark.cc"], + copts = ABSL_TEST_COPTS, + linkopts = ABSL_DEFAULT_LINKOPTS, + tags = ["benchmark"], + visibility = ["//visibility:private"], + deps = [ + ":synchronization", + ":thread_pool", + "@com_github_google_benchmark//:benchmark_main", + ], +) + cc_test( name = "graphcycles_test", size = "medium", @@ -189,6 +214,7 @@ cc_test( ":synchronization", ":thread_pool", "//absl/base", + "//absl/base:config", "//absl/base:core_headers", "//absl/base:raw_logging_internal", "//absl/memory", @@ -210,6 +236,7 @@ cc_library( ":synchronization", ":thread_pool", "//absl/base", + "//absl/base:config", "@com_github_google_benchmark//:benchmark_main", ], alwayslink = 1, @@ -248,6 +275,7 @@ cc_library( deps = [ ":synchronization", "//absl/base", + "//absl/base:config", "//absl/strings", "//absl/time", "@com_google_googletest//:gtest", @@ -260,6 +288,9 @@ cc_test( size = "medium", copts = ABSL_TEST_COPTS, linkopts = ABSL_DEFAULT_LINKOPTS, + tags = [ + "no_test_wasm", + ], deps = [ ":per_thread_sem_test_common", ":synchronization", @@ -276,7 +307,10 @@ cc_test( ], copts = ABSL_TEST_COPTS, linkopts = ABSL_DEFAULT_LINKOPTS, - tags = ["no_test_ios_x86_64"], + tags = [ + "no_test_ios_x86_64", + "no_test_wasm", + ], deps = [ ":synchronization", "//absl/base:core_headers", diff --git a/absl/synchronization/CMakeLists.txt b/absl/synchronization/CMakeLists.txt index dfe5d05d..9335c264 100644 --- a/absl/synchronization/CMakeLists.txt +++ b/absl/synchronization/CMakeLists.txt @@ -14,6 +14,7 @@ # limitations under the License. # +# Internal-only target, do not depend on directly. absl_cc_library( NAME graphcycles_internal @@ -32,6 +33,7 @@ absl_cc_library( absl::raw_logging_internal ) +# Internal-only target, do not depend on directly. absl_cc_library( NAME kernel_timeout_internal @@ -52,7 +54,7 @@ absl_cc_library( "barrier.h" "blocking_counter.h" "internal/create_thread_identity.h" - "internal/mutex_nonprod.inc" + "internal/futex.h" "internal/per_thread_sem.h" "internal/waiter.h" "mutex.h" @@ -95,7 +97,7 @@ absl_cc_test( DEPS absl::synchronization absl::time - gmock_main + GTest::gmock_main ) absl_cc_test( @@ -108,7 +110,7 @@ absl_cc_test( DEPS absl::synchronization absl::time - gmock_main + GTest::gmock_main ) absl_cc_test( @@ -122,9 +124,10 @@ absl_cc_test( absl::graphcycles_internal absl::core_headers absl::raw_logging_internal - gmock_main + GTest::gmock_main ) +# Internal-only target, do not depend on directly. absl_cc_library( NAME thread_pool @@ -149,11 +152,12 @@ absl_cc_test( absl::synchronization absl::thread_pool absl::base + absl::config absl::core_headers absl::memory absl::raw_logging_internal absl::time - gmock_main + GTest::gmock_main ) absl_cc_test( @@ -166,9 +170,10 @@ absl_cc_test( DEPS absl::synchronization absl::time - gmock_main + GTest::gmock_main ) +# Internal-only target, do not depend on directly. absl_cc_library( NAME per_thread_sem_test_common @@ -179,9 +184,10 @@ absl_cc_library( DEPS absl::synchronization absl::base + absl::config absl::strings absl::time - gmock + GTest::gmock TESTONLY ) @@ -197,7 +203,7 @@ absl_cc_test( absl::synchronization absl::strings absl::time - gmock_main + GTest::gmock_main ) absl_cc_test( diff --git a/absl/synchronization/blocking_counter.cc b/absl/synchronization/blocking_counter.cc index 3cea7aed..d2f82da3 100644 --- a/absl/synchronization/blocking_counter.cc +++ b/absl/synchronization/blocking_counter.cc @@ -14,41 +14,51 @@ #include "absl/synchronization/blocking_counter.h" +#include <atomic> + #include "absl/base/internal/raw_logging.h" namespace absl { ABSL_NAMESPACE_BEGIN -// Return whether int *arg is zero. -static bool IsZero(void *arg) { - return 0 == *reinterpret_cast<int *>(arg); +namespace { + +// Return whether int *arg is true. +bool IsDone(void *arg) { return *reinterpret_cast<bool *>(arg); } + +} // namespace + +BlockingCounter::BlockingCounter(int initial_count) + : count_(initial_count), + num_waiting_(0), + done_{initial_count == 0 ? true : false} { + ABSL_RAW_CHECK(initial_count >= 0, "BlockingCounter initial_count negative"); } bool BlockingCounter::DecrementCount() { - MutexLock l(&lock_); - count_--; - if (count_ < 0) { - ABSL_RAW_LOG( - FATAL, - "BlockingCounter::DecrementCount() called too many times. count=%d", - count_); + int count = count_.fetch_sub(1, std::memory_order_acq_rel) - 1; + ABSL_RAW_CHECK(count >= 0, + "BlockingCounter::DecrementCount() called too many times"); + if (count == 0) { + MutexLock l(&lock_); + done_ = true; + return true; } - return count_ == 0; + return false; } void BlockingCounter::Wait() { MutexLock l(&this->lock_); - ABSL_RAW_CHECK(count_ >= 0, "BlockingCounter underflow"); // only one thread may call Wait(). To support more than one thread, // implement a counter num_to_exit, like in the Barrier class. ABSL_RAW_CHECK(num_waiting_ == 0, "multiple threads called Wait()"); num_waiting_++; - this->lock_.Await(Condition(IsZero, &this->count_)); + this->lock_.Await(Condition(IsDone, &this->done_)); - // At this point, We know that all threads executing DecrementCount have - // released the lock, and so will not touch this object again. + // At this point, we know that all threads executing DecrementCount + // will not touch this object again. // Therefore, the thread calling this method is free to delete the object // after we return from this method. } diff --git a/absl/synchronization/blocking_counter.h b/absl/synchronization/blocking_counter.h index 1f53f9f2..1908fdb1 100644 --- a/absl/synchronization/blocking_counter.h +++ b/absl/synchronization/blocking_counter.h @@ -20,6 +20,8 @@ #ifndef ABSL_SYNCHRONIZATION_BLOCKING_COUNTER_H_ #define ABSL_SYNCHRONIZATION_BLOCKING_COUNTER_H_ +#include <atomic> + #include "absl/base/thread_annotations.h" #include "absl/synchronization/mutex.h" @@ -60,8 +62,7 @@ ABSL_NAMESPACE_BEGIN // class BlockingCounter { public: - explicit BlockingCounter(int initial_count) - : count_(initial_count), num_waiting_(0) {} + explicit BlockingCounter(int initial_count); BlockingCounter(const BlockingCounter&) = delete; BlockingCounter& operator=(const BlockingCounter&) = delete; @@ -89,8 +90,9 @@ class BlockingCounter { private: Mutex lock_; - int count_ ABSL_GUARDED_BY(lock_); + std::atomic<int> count_; int num_waiting_ ABSL_GUARDED_BY(lock_); + bool done_ ABSL_GUARDED_BY(lock_); }; ABSL_NAMESPACE_END diff --git a/absl/synchronization/blocking_counter_benchmark.cc b/absl/synchronization/blocking_counter_benchmark.cc new file mode 100644 index 00000000..b504d1a5 --- /dev/null +++ b/absl/synchronization/blocking_counter_benchmark.cc @@ -0,0 +1,83 @@ +// Copyright 2021 The Abseil Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include <limits> + +#include "absl/synchronization/blocking_counter.h" +#include "absl/synchronization/internal/thread_pool.h" +#include "benchmark/benchmark.h" + +namespace { + +void BM_BlockingCounter_SingleThread(benchmark::State& state) { + for (auto _ : state) { + int iterations = state.range(0); + absl::BlockingCounter counter{iterations}; + for (int i = 0; i < iterations; ++i) { + counter.DecrementCount(); + } + counter.Wait(); + } +} +BENCHMARK(BM_BlockingCounter_SingleThread) + ->ArgName("iterations") + ->Arg(2) + ->Arg(4) + ->Arg(16) + ->Arg(64) + ->Arg(256); + +void BM_BlockingCounter_DecrementCount(benchmark::State& state) { + static absl::BlockingCounter* counter = + new absl::BlockingCounter{std::numeric_limits<int>::max()}; + for (auto _ : state) { + counter->DecrementCount(); + } +} +BENCHMARK(BM_BlockingCounter_DecrementCount) + ->Threads(2) + ->Threads(4) + ->Threads(6) + ->Threads(8) + ->Threads(10) + ->Threads(12) + ->Threads(16) + ->Threads(32) + ->Threads(64) + ->Threads(128); + +void BM_BlockingCounter_Wait(benchmark::State& state) { + int num_threads = state.range(0); + absl::synchronization_internal::ThreadPool pool(num_threads); + for (auto _ : state) { + absl::BlockingCounter counter{num_threads}; + pool.Schedule([num_threads, &counter, &pool]() { + for (int i = 0; i < num_threads; ++i) { + pool.Schedule([&counter]() { counter.DecrementCount(); }); + } + }); + counter.Wait(); + } +} +BENCHMARK(BM_BlockingCounter_Wait) + ->ArgName("threads") + ->Arg(2) + ->Arg(4) + ->Arg(8) + ->Arg(16) + ->Arg(32) + ->Arg(64) + ->Arg(128); + +} // namespace diff --git a/absl/synchronization/blocking_counter_test.cc b/absl/synchronization/blocking_counter_test.cc index 2926224a..06885f57 100644 --- a/absl/synchronization/blocking_counter_test.cc +++ b/absl/synchronization/blocking_counter_test.cc @@ -63,6 +63,18 @@ TEST(BlockingCounterTest, BasicFunctionality) { } } +TEST(BlockingCounterTest, WaitZeroInitialCount) { + BlockingCounter counter(0); + counter.Wait(); +} + +#if GTEST_HAS_DEATH_TEST +TEST(BlockingCounterTest, WaitNegativeInitialCount) { + EXPECT_DEATH(BlockingCounter counter(-1), + "BlockingCounter initial_count negative"); +} +#endif + } // namespace ABSL_NAMESPACE_END } // namespace absl diff --git a/absl/synchronization/internal/create_thread_identity.cc b/absl/synchronization/internal/create_thread_identity.cc index 53a71b34..44e6129b 100644 --- a/absl/synchronization/internal/create_thread_identity.cc +++ b/absl/synchronization/internal/create_thread_identity.cc @@ -38,7 +38,7 @@ ABSL_CONST_INIT static base_internal::ThreadIdentity* thread_identity_freelist; // A per-thread destructor for reclaiming associated ThreadIdentity objects. // Since we must preserve their storage we cache them for re-use. -void ReclaimThreadIdentity(void* v) { +static void ReclaimThreadIdentity(void* v) { base_internal::ThreadIdentity* identity = static_cast<base_internal::ThreadIdentity*>(v); @@ -48,8 +48,6 @@ void ReclaimThreadIdentity(void* v) { base_internal::LowLevelAlloc::Free(identity->per_thread_synch.all_locks); } - PerThreadSem::Destroy(identity); - // We must explicitly clear the current thread's identity: // (a) Subsequent (unrelated) per-thread destructors may require an identity. // We must guarantee a new identity is used in this case (this instructor @@ -71,7 +69,12 @@ static intptr_t RoundUp(intptr_t addr, intptr_t align) { return (addr + align - 1) & ~(align - 1); } -static void ResetThreadIdentity(base_internal::ThreadIdentity* identity) { +void OneTimeInitThreadIdentity(base_internal::ThreadIdentity* identity) { + PerThreadSem::Init(identity); +} + +static void ResetThreadIdentityBetweenReuse( + base_internal::ThreadIdentity* identity) { base_internal::PerThreadSynch* pts = &identity->per_thread_synch; pts->next = nullptr; pts->skip = nullptr; @@ -116,8 +119,9 @@ static base_internal::ThreadIdentity* NewThreadIdentity() { identity = reinterpret_cast<base_internal::ThreadIdentity*>( RoundUp(reinterpret_cast<intptr_t>(allocation), base_internal::PerThreadSynch::kAlignment)); + OneTimeInitThreadIdentity(identity); } - ResetThreadIdentity(identity); + ResetThreadIdentityBetweenReuse(identity); return identity; } @@ -127,7 +131,6 @@ static base_internal::ThreadIdentity* NewThreadIdentity() { // REQUIRES: CurrentThreadIdentity(false) == nullptr base_internal::ThreadIdentity* CreateThreadIdentity() { base_internal::ThreadIdentity* identity = NewThreadIdentity(); - PerThreadSem::Init(identity); // Associate the value with the current thread, and attach our destructor. base_internal::SetCurrentThreadIdentity(identity, ReclaimThreadIdentity); return identity; diff --git a/absl/synchronization/internal/create_thread_identity.h b/absl/synchronization/internal/create_thread_identity.h index e121f683..4cfde091 100644 --- a/absl/synchronization/internal/create_thread_identity.h +++ b/absl/synchronization/internal/create_thread_identity.h @@ -36,10 +36,6 @@ namespace synchronization_internal { // For private use only. base_internal::ThreadIdentity* CreateThreadIdentity(); -// A per-thread destructor for reclaiming associated ThreadIdentity objects. -// For private use only. -void ReclaimThreadIdentity(void* v); - // Returns the ThreadIdentity object representing the calling thread; guaranteed // to be unique for its lifetime. The returned object will remain valid for the // program's lifetime; although it may be re-assigned to a subsequent thread. diff --git a/absl/synchronization/internal/futex.h b/absl/synchronization/internal/futex.h new file mode 100644 index 00000000..06fbd6d0 --- /dev/null +++ b/absl/synchronization/internal/futex.h @@ -0,0 +1,154 @@ +// Copyright 2020 The Abseil Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef ABSL_SYNCHRONIZATION_INTERNAL_FUTEX_H_ +#define ABSL_SYNCHRONIZATION_INTERNAL_FUTEX_H_ + +#include "absl/base/config.h" + +#ifdef _WIN32 +#include <windows.h> +#else +#include <sys/time.h> +#include <unistd.h> +#endif + +#ifdef __linux__ +#include <linux/futex.h> +#include <sys/syscall.h> +#endif + +#include <errno.h> +#include <stdio.h> +#include <time.h> + +#include <atomic> +#include <cstdint> + +#include "absl/base/optimization.h" +#include "absl/synchronization/internal/kernel_timeout.h" + +#ifdef ABSL_INTERNAL_HAVE_FUTEX +#error ABSL_INTERNAL_HAVE_FUTEX may not be set on the command line +#elif defined(__BIONIC__) +// Bionic supports all the futex operations we need even when some of the futex +// definitions are missing. +#define ABSL_INTERNAL_HAVE_FUTEX +#elif defined(__linux__) && defined(FUTEX_CLOCK_REALTIME) +// FUTEX_CLOCK_REALTIME requires Linux >= 2.6.28. +#define ABSL_INTERNAL_HAVE_FUTEX +#endif + +#ifdef ABSL_INTERNAL_HAVE_FUTEX + +namespace absl { +ABSL_NAMESPACE_BEGIN +namespace synchronization_internal { + +// Some Android headers are missing these definitions even though they +// support these futex operations. +#ifdef __BIONIC__ +#ifndef SYS_futex +#define SYS_futex __NR_futex +#endif +#ifndef FUTEX_WAIT_BITSET +#define FUTEX_WAIT_BITSET 9 +#endif +#ifndef FUTEX_PRIVATE_FLAG +#define FUTEX_PRIVATE_FLAG 128 +#endif +#ifndef FUTEX_CLOCK_REALTIME +#define FUTEX_CLOCK_REALTIME 256 +#endif +#ifndef FUTEX_BITSET_MATCH_ANY +#define FUTEX_BITSET_MATCH_ANY 0xFFFFFFFF +#endif +#endif + +#if defined(__NR_futex_time64) && !defined(SYS_futex_time64) +#define SYS_futex_time64 __NR_futex_time64 +#endif + +#if defined(SYS_futex_time64) && !defined(SYS_futex) +#define SYS_futex SYS_futex_time64 +#endif + +class FutexImpl { + public: + static int WaitUntil(std::atomic<int32_t> *v, int32_t val, + KernelTimeout t) { + int err = 0; + if (t.has_timeout()) { + // https://locklessinc.com/articles/futex_cheat_sheet/ + // Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET uses absolute time. + struct timespec abs_timeout = t.MakeAbsTimespec(); + // Atomically check that the futex value is still 0, and if it + // is, sleep until abs_timeout or until woken by FUTEX_WAKE. + err = syscall( + SYS_futex, reinterpret_cast<int32_t *>(v), + FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME, val, + &abs_timeout, nullptr, FUTEX_BITSET_MATCH_ANY); + } else { + // Atomically check that the futex value is still 0, and if it + // is, sleep until woken by FUTEX_WAKE. + err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v), + FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, nullptr); + } + if (ABSL_PREDICT_FALSE(err != 0)) { + err = -errno; + } + return err; + } + + static int WaitBitsetAbsoluteTimeout(std::atomic<int32_t> *v, int32_t val, + int32_t bits, + const struct timespec *abstime) { + int err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v), + FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG, val, abstime, + nullptr, bits); + if (ABSL_PREDICT_FALSE(err != 0)) { + err = -errno; + } + return err; + } + + static int Wake(std::atomic<int32_t> *v, int32_t count) { + int err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v), + FUTEX_WAKE | FUTEX_PRIVATE_FLAG, count); + if (ABSL_PREDICT_FALSE(err < 0)) { + err = -errno; + } + return err; + } + + // FUTEX_WAKE_BITSET + static int WakeBitset(std::atomic<int32_t> *v, int32_t count, int32_t bits) { + int err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v), + FUTEX_WAKE_BITSET | FUTEX_PRIVATE_FLAG, count, nullptr, + nullptr, bits); + if (ABSL_PREDICT_FALSE(err < 0)) { + err = -errno; + } + return err; + } +}; + +class Futex : public FutexImpl {}; + +} // namespace synchronization_internal +ABSL_NAMESPACE_END +} // namespace absl + +#endif // ABSL_INTERNAL_HAVE_FUTEX + +#endif // ABSL_SYNCHRONIZATION_INTERNAL_FUTEX_H_ diff --git a/absl/synchronization/internal/graphcycles.cc b/absl/synchronization/internal/graphcycles.cc index 19f9aab5..27fec216 100644 --- a/absl/synchronization/internal/graphcycles.cc +++ b/absl/synchronization/internal/graphcycles.cc @@ -37,6 +37,7 @@ #include <algorithm> #include <array> +#include <limits> #include "absl/base/internal/hide_ptr.h" #include "absl/base/internal/raw_logging.h" #include "absl/base/internal/spinlock.h" diff --git a/absl/synchronization/internal/kernel_timeout.h b/absl/synchronization/internal/kernel_timeout.h index d6ac5db0..bbd4d2d7 100644 --- a/absl/synchronization/internal/kernel_timeout.h +++ b/absl/synchronization/internal/kernel_timeout.h @@ -26,6 +26,7 @@ #define ABSL_SYNCHRONIZATION_INTERNAL_KERNEL_TIMEOUT_H_ #include <time.h> + #include <algorithm> #include <limits> @@ -57,6 +58,10 @@ class KernelTimeout { bool has_timeout() const { return ns_ != 0; } + // Convert to parameter for sem_timedwait/futex/similar. Only for approved + // users. Do not call if !has_timeout. + struct timespec MakeAbsTimespec(); + private: // internal rep, not user visible: ns after unix epoch. // zero = no timeout. @@ -82,34 +87,6 @@ class KernelTimeout { return x; } - // Convert to parameter for sem_timedwait/futex/similar. Only for approved - // users. Do not call if !has_timeout. - struct timespec MakeAbsTimespec() { - int64_t n = ns_; - static const int64_t kNanosPerSecond = 1000 * 1000 * 1000; - if (n == 0) { - ABSL_RAW_LOG( - ERROR, - "Tried to create a timespec from a non-timeout; never do this."); - // But we'll try to continue sanely. no-timeout ~= saturated timeout. - n = (std::numeric_limits<int64_t>::max)(); - } - - // Kernel APIs validate timespecs as being at or after the epoch, - // despite the kernel time type being signed. However, no one can - // tell the difference between a timeout at or before the epoch (since - // all such timeouts have expired!) - if (n < 0) n = 0; - - struct timespec abstime; - int64_t seconds = (std::min)(n / kNanosPerSecond, - int64_t{(std::numeric_limits<time_t>::max)()}); - abstime.tv_sec = static_cast<time_t>(seconds); - abstime.tv_nsec = - static_cast<decltype(abstime.tv_nsec)>(n % kNanosPerSecond); - return abstime; - } - #ifdef _WIN32 // Converts to milliseconds from now, or INFINITE when // !has_timeout(). For use by SleepConditionVariableSRW on @@ -148,6 +125,30 @@ class KernelTimeout { friend class Waiter; }; +inline struct timespec KernelTimeout::MakeAbsTimespec() { + int64_t n = ns_; + static const int64_t kNanosPerSecond = 1000 * 1000 * 1000; + if (n == 0) { + ABSL_RAW_LOG( + ERROR, "Tried to create a timespec from a non-timeout; never do this."); + // But we'll try to continue sanely. no-timeout ~= saturated timeout. + n = (std::numeric_limits<int64_t>::max)(); + } + + // Kernel APIs validate timespecs as being at or after the epoch, + // despite the kernel time type being signed. However, no one can + // tell the difference between a timeout at or before the epoch (since + // all such timeouts have expired!) + if (n < 0) n = 0; + + struct timespec abstime; + int64_t seconds = (std::min)(n / kNanosPerSecond, + int64_t{(std::numeric_limits<time_t>::max)()}); + abstime.tv_sec = static_cast<time_t>(seconds); + abstime.tv_nsec = static_cast<decltype(abstime.tv_nsec)>(n % kNanosPerSecond); + return abstime; +} + } // namespace synchronization_internal ABSL_NAMESPACE_END } // namespace absl diff --git a/absl/synchronization/internal/mutex_nonprod.cc b/absl/synchronization/internal/mutex_nonprod.cc deleted file mode 100644 index 1732f836..00000000 --- a/absl/synchronization/internal/mutex_nonprod.cc +++ /dev/null @@ -1,324 +0,0 @@ -// Copyright 2017 The Abseil Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Implementation of a small subset of Mutex and CondVar functionality -// for platforms where the production implementation hasn't been fully -// ported yet. - -#include "absl/synchronization/mutex.h" - -#if defined(_WIN32) -#include <chrono> // NOLINT(build/c++11) -#else -#include <sys/time.h> -#include <time.h> -#endif - -#include <algorithm> - -#include "absl/base/internal/raw_logging.h" -#include "absl/time/time.h" - -namespace absl { -ABSL_NAMESPACE_BEGIN - -void SetMutexDeadlockDetectionMode(OnDeadlockCycle) {} -void EnableMutexInvariantDebugging(bool) {} - -namespace synchronization_internal { - -namespace { - -// Return the current time plus the timeout. -absl::Time DeadlineFromTimeout(absl::Duration timeout) { - return absl::Now() + timeout; -} - -// Limit the deadline to a positive, 32-bit time_t value to accommodate -// implementation restrictions. This also deals with InfinitePast and -// InfiniteFuture. -absl::Time LimitedDeadline(absl::Time deadline) { - deadline = std::max(absl::FromTimeT(0), deadline); - deadline = std::min(deadline, absl::FromTimeT(0x7fffffff)); - return deadline; -} - -} // namespace - -#if defined(_WIN32) - -MutexImpl::MutexImpl() {} - -MutexImpl::~MutexImpl() { - if (locked_) { - std_mutex_.unlock(); - } -} - -void MutexImpl::Lock() { - std_mutex_.lock(); - locked_ = true; -} - -bool MutexImpl::TryLock() { - bool locked = std_mutex_.try_lock(); - if (locked) locked_ = true; - return locked; -} - -void MutexImpl::Unlock() { - locked_ = false; - released_.SignalAll(); - std_mutex_.unlock(); -} - -CondVarImpl::CondVarImpl() {} - -CondVarImpl::~CondVarImpl() {} - -void CondVarImpl::Signal() { std_cv_.notify_one(); } - -void CondVarImpl::SignalAll() { std_cv_.notify_all(); } - -void CondVarImpl::Wait(MutexImpl* mu) { - mu->released_.SignalAll(); - std_cv_.wait(mu->std_mutex_); -} - -bool CondVarImpl::WaitWithDeadline(MutexImpl* mu, absl::Time deadline) { - mu->released_.SignalAll(); - time_t when = ToTimeT(deadline); - int64_t nanos = ToInt64Nanoseconds(deadline - absl::FromTimeT(when)); - std::chrono::system_clock::time_point deadline_tp = - std::chrono::system_clock::from_time_t(when) + - std::chrono::duration_cast<std::chrono::system_clock::duration>( - std::chrono::nanoseconds(nanos)); - auto deadline_since_epoch = - std::chrono::duration_cast<std::chrono::duration<double>>( - deadline_tp - std::chrono::system_clock::from_time_t(0)); - return std_cv_.wait_until(mu->std_mutex_, deadline_tp) == - std::cv_status::timeout; -} - -#else // ! _WIN32 - -MutexImpl::MutexImpl() { - ABSL_RAW_CHECK(pthread_mutex_init(&pthread_mutex_, nullptr) == 0, - "pthread error"); -} - -MutexImpl::~MutexImpl() { - if (locked_) { - ABSL_RAW_CHECK(pthread_mutex_unlock(&pthread_mutex_) == 0, "pthread error"); - } - ABSL_RAW_CHECK(pthread_mutex_destroy(&pthread_mutex_) == 0, "pthread error"); -} - -void MutexImpl::Lock() { - ABSL_RAW_CHECK(pthread_mutex_lock(&pthread_mutex_) == 0, "pthread error"); - locked_ = true; -} - -bool MutexImpl::TryLock() { - bool locked = (0 == pthread_mutex_trylock(&pthread_mutex_)); - if (locked) locked_ = true; - return locked; -} - -void MutexImpl::Unlock() { - locked_ = false; - released_.SignalAll(); - ABSL_RAW_CHECK(pthread_mutex_unlock(&pthread_mutex_) == 0, "pthread error"); -} - -CondVarImpl::CondVarImpl() { - ABSL_RAW_CHECK(pthread_cond_init(&pthread_cv_, nullptr) == 0, - "pthread error"); -} - -CondVarImpl::~CondVarImpl() { - ABSL_RAW_CHECK(pthread_cond_destroy(&pthread_cv_) == 0, "pthread error"); -} - -void CondVarImpl::Signal() { - ABSL_RAW_CHECK(pthread_cond_signal(&pthread_cv_) == 0, "pthread error"); -} - -void CondVarImpl::SignalAll() { - ABSL_RAW_CHECK(pthread_cond_broadcast(&pthread_cv_) == 0, "pthread error"); -} - -void CondVarImpl::Wait(MutexImpl* mu) { - mu->released_.SignalAll(); - ABSL_RAW_CHECK(pthread_cond_wait(&pthread_cv_, &mu->pthread_mutex_) == 0, - "pthread error"); -} - -bool CondVarImpl::WaitWithDeadline(MutexImpl* mu, absl::Time deadline) { - mu->released_.SignalAll(); - struct timespec ts = ToTimespec(deadline); - int rc = pthread_cond_timedwait(&pthread_cv_, &mu->pthread_mutex_, &ts); - if (rc == ETIMEDOUT) return true; - ABSL_RAW_CHECK(rc == 0, "pthread error"); - return false; -} - -#endif // ! _WIN32 - -void MutexImpl::Await(const Condition& cond) { - if (cond.Eval()) return; - released_.SignalAll(); - do { - released_.Wait(this); - } while (!cond.Eval()); -} - -bool MutexImpl::AwaitWithDeadline(const Condition& cond, absl::Time deadline) { - if (cond.Eval()) return true; - released_.SignalAll(); - while (true) { - if (released_.WaitWithDeadline(this, deadline)) return false; - if (cond.Eval()) return true; - } -} - -} // namespace synchronization_internal - -Mutex::Mutex() {} - -Mutex::~Mutex() {} - -void Mutex::Lock() { impl()->Lock(); } - -void Mutex::Unlock() { impl()->Unlock(); } - -bool Mutex::TryLock() { return impl()->TryLock(); } - -void Mutex::ReaderLock() { Lock(); } - -void Mutex::ReaderUnlock() { Unlock(); } - -void Mutex::Await(const Condition& cond) { impl()->Await(cond); } - -void Mutex::LockWhen(const Condition& cond) { - Lock(); - Await(cond); -} - -bool Mutex::AwaitWithDeadline(const Condition& cond, absl::Time deadline) { - return impl()->AwaitWithDeadline( - cond, synchronization_internal::LimitedDeadline(deadline)); -} - -bool Mutex::AwaitWithTimeout(const Condition& cond, absl::Duration timeout) { - return AwaitWithDeadline( - cond, synchronization_internal::DeadlineFromTimeout(timeout)); -} - -bool Mutex::LockWhenWithDeadline(const Condition& cond, absl::Time deadline) { - Lock(); - return AwaitWithDeadline(cond, deadline); -} - -bool Mutex::LockWhenWithTimeout(const Condition& cond, absl::Duration timeout) { - return LockWhenWithDeadline( - cond, synchronization_internal::DeadlineFromTimeout(timeout)); -} - -void Mutex::ReaderLockWhen(const Condition& cond) { - ReaderLock(); - Await(cond); -} - -bool Mutex::ReaderLockWhenWithTimeout(const Condition& cond, - absl::Duration timeout) { - return LockWhenWithTimeout(cond, timeout); -} -bool Mutex::ReaderLockWhenWithDeadline(const Condition& cond, - absl::Time deadline) { - return LockWhenWithDeadline(cond, deadline); -} - -void Mutex::EnableDebugLog(const char*) {} -void Mutex::EnableInvariantDebugging(void (*)(void*), void*) {} -void Mutex::ForgetDeadlockInfo() {} -void Mutex::AssertHeld() const {} -void Mutex::AssertReaderHeld() const {} -void Mutex::AssertNotHeld() const {} - -CondVar::CondVar() {} - -CondVar::~CondVar() {} - -void CondVar::Signal() { impl()->Signal(); } - -void CondVar::SignalAll() { impl()->SignalAll(); } - -void CondVar::Wait(Mutex* mu) { return impl()->Wait(mu->impl()); } - -bool CondVar::WaitWithDeadline(Mutex* mu, absl::Time deadline) { - return impl()->WaitWithDeadline( - mu->impl(), synchronization_internal::LimitedDeadline(deadline)); -} - -bool CondVar::WaitWithTimeout(Mutex* mu, absl::Duration timeout) { - return WaitWithDeadline(mu, absl::Now() + timeout); -} - -void CondVar::EnableDebugLog(const char*) {} - -#ifdef THREAD_SANITIZER -extern "C" void __tsan_read1(void *addr); -#else -#define __tsan_read1(addr) // do nothing if TSan not enabled -#endif - -// A function that just returns its argument, dereferenced -static bool Dereference(void *arg) { - // ThreadSanitizer does not instrument this file for memory accesses. - // This function dereferences a user variable that can participate - // in a data race, so we need to manually tell TSan about this memory access. - __tsan_read1(arg); - return *(static_cast<bool *>(arg)); -} - -Condition::Condition() {} // null constructor, used for kTrue only -const Condition Condition::kTrue; - -Condition::Condition(bool (*func)(void *), void *arg) - : eval_(&CallVoidPtrFunction), - function_(func), - method_(nullptr), - arg_(arg) {} - -bool Condition::CallVoidPtrFunction(const Condition *c) { - return (*c->function_)(c->arg_); -} - -Condition::Condition(const bool *cond) - : eval_(CallVoidPtrFunction), - function_(Dereference), - method_(nullptr), - // const_cast is safe since Dereference does not modify arg - arg_(const_cast<bool *>(cond)) {} - -bool Condition::Eval() const { - // eval_ == null for kTrue - return (this->eval_ == nullptr) || (*this->eval_)(this); -} - -void RegisterSymbolizer(bool (*)(const void*, char*, int)) {} - -ABSL_NAMESPACE_END -} // namespace absl diff --git a/absl/synchronization/internal/mutex_nonprod.inc b/absl/synchronization/internal/mutex_nonprod.inc deleted file mode 100644 index d83bc8a9..00000000 --- a/absl/synchronization/internal/mutex_nonprod.inc +++ /dev/null @@ -1,249 +0,0 @@ -// Do not include. This is an implementation detail of base/mutex.h. -// -// Declares three classes: -// -// base::internal::MutexImpl - implementation helper for Mutex -// base::internal::CondVarImpl - implementation helper for CondVar -// base::internal::SynchronizationStorage<T> - implementation helper for -// Mutex, CondVar - -#include <type_traits> - -#if defined(_WIN32) -#include <condition_variable> -#include <mutex> -#else -#include <pthread.h> -#endif - -#include "absl/base/call_once.h" -#include "absl/time/time.h" - -// Declare that Mutex::ReaderLock is actually Lock(). Intended primarily -// for tests, and even then as a last resort. -#ifdef ABSL_MUTEX_READER_LOCK_IS_EXCLUSIVE -#error ABSL_MUTEX_READER_LOCK_IS_EXCLUSIVE cannot be directly set -#else -#define ABSL_MUTEX_READER_LOCK_IS_EXCLUSIVE 1 -#endif - -// Declare that Mutex::EnableInvariantDebugging is not implemented. -// Intended primarily for tests, and even then as a last resort. -#ifdef ABSL_MUTEX_ENABLE_INVARIANT_DEBUGGING_NOT_IMPLEMENTED -#error ABSL_MUTEX_ENABLE_INVARIANT_DEBUGGING_NOT_IMPLEMENTED cannot be directly set -#else -#define ABSL_MUTEX_ENABLE_INVARIANT_DEBUGGING_NOT_IMPLEMENTED 1 -#endif - -namespace absl { -ABSL_NAMESPACE_BEGIN -class Condition; - -namespace synchronization_internal { - -class MutexImpl; - -// Do not use this implementation detail of CondVar. Provides most of the -// implementation, but should not be placed directly in static storage -// because it will not linker initialize properly. See -// SynchronizationStorage<T> below for what we mean by linker -// initialization. -class CondVarImpl { - public: - CondVarImpl(); - CondVarImpl(const CondVarImpl&) = delete; - CondVarImpl& operator=(const CondVarImpl&) = delete; - ~CondVarImpl(); - - void Signal(); - void SignalAll(); - void Wait(MutexImpl* mutex); - bool WaitWithDeadline(MutexImpl* mutex, absl::Time deadline); - - private: -#if defined(_WIN32) - std::condition_variable_any std_cv_; -#else - pthread_cond_t pthread_cv_; -#endif -}; - -// Do not use this implementation detail of Mutex. Provides most of the -// implementation, but should not be placed directly in static storage -// because it will not linker initialize properly. See -// SynchronizationStorage<T> below for what we mean by linker -// initialization. -class MutexImpl { - public: - MutexImpl(); - MutexImpl(const MutexImpl&) = delete; - MutexImpl& operator=(const MutexImpl&) = delete; - ~MutexImpl(); - - void Lock(); - bool TryLock(); - void Unlock(); - void Await(const Condition& cond); - bool AwaitWithDeadline(const Condition& cond, absl::Time deadline); - - private: - friend class CondVarImpl; - -#if defined(_WIN32) - std::mutex std_mutex_; -#else - pthread_mutex_t pthread_mutex_; -#endif - - // True if the underlying mutex is locked. If the destructor is entered - // while locked_, the underlying mutex is unlocked. Mutex supports - // destruction while locked, but the same is undefined behavior for both - // pthread_mutex_t and std::mutex. - bool locked_ = false; - - // Signaled before releasing the lock, in support of Await. - CondVarImpl released_; -}; - -// Do not use this implementation detail of CondVar and Mutex. A storage -// space for T that supports a LinkerInitialized constructor. T must -// have a default constructor, which is called by the first call to -// get(). T's destructor is never called if the LinkerInitialized -// constructor is called. -// -// Objects constructed with the default constructor are constructed and -// destructed like any other object, and should never be allocated in -// static storage. -// -// Objects constructed with the LinkerInitialized constructor should -// always be in static storage. For such objects, calls to get() are always -// valid, except from signal handlers. -// -// Note that this implementation relies on undefined language behavior that -// are known to hold for the set of supported compilers. An analysis -// follows. -// -// From the C++11 standard: -// -// [basic.life] says an object has non-trivial initialization if it is of -// class type and it is initialized by a constructor other than a trivial -// default constructor. (the LinkerInitialized constructor is -// non-trivial) -// -// [basic.life] says the lifetime of an object with a non-trivial -// constructor begins when the call to the constructor is complete. -// -// [basic.life] says the lifetime of an object with non-trivial destructor -// ends when the call to the destructor begins. -// -// [basic.life] p5 specifies undefined behavior when accessing non-static -// members of an instance outside its -// lifetime. (SynchronizationStorage::get() access non-static members) -// -// So, LinkerInitialized object of SynchronizationStorage uses a -// non-trivial constructor, which is called at some point during dynamic -// initialization, and is therefore subject to order of dynamic -// initialization bugs, where get() is called before the object's -// constructor is, resulting in undefined behavior. -// -// Similarly, a LinkerInitialized SynchronizationStorage object has a -// non-trivial destructor, and so its lifetime ends at some point during -// destruction of objects with static storage duration [basic.start.term] -// p4. There is a window where other exit code could call get() after this -// occurs, resulting in undefined behavior. -// -// Combined, these statements imply that LinkerInitialized instances -// of SynchronizationStorage<T> rely on undefined behavior. -// -// However, in practice, the implementation works on all supported -// compilers. Specifically, we rely on: -// -// a) zero-initialization being sufficient to initialize -// LinkerInitialized instances for the purposes of calling -// get(), regardless of when the constructor is called. This is -// because the is_dynamic_ boolean is correctly zero-initialized to -// false. -// -// b) the LinkerInitialized constructor is a NOP, and immaterial to -// even to concurrent calls to get(). -// -// c) the destructor being a NOP for LinkerInitialized objects -// (guaranteed by a check for !is_dynamic_), and so any concurrent and -// subsequent calls to get() functioning as if the destructor were not -// called, by virtue of the instances' storage remaining valid after the -// destructor runs. -// -// d) That a-c apply transitively when SynchronizationStorage<T> is the -// only member of a class allocated in static storage. -// -// Nothing in the language standard guarantees that a-d hold. In practice, -// these hold in all supported compilers. -// -// Future direction: -// -// Ideally, we would simply use std::mutex or a similar class, which when -// allocated statically would support use immediately after static -// initialization up until static storage is reclaimed (i.e. the properties -// we require of all "linker initialized" instances). -// -// Regarding construction in static storage, std::mutex is required to -// provide a constexpr default constructor [thread.mutex.class], which -// ensures the instance's lifetime begins with static initialization -// [basic.start.init], and so is immune to any problems caused by the order -// of dynamic initialization. However, as of this writing Microsoft's -// Visual Studio does not provide a constexpr constructor for std::mutex. -// See -// https://blogs.msdn.microsoft.com/vcblog/2015/06/02/constexpr-complete-for-vs-2015-rtm-c11-compiler-c17-stl/ -// -// Regarding destruction of instances in static storage, [basic.life] does -// say an object ends when storage in which the occupies is released, in -// the case of non-trivial destructor. However, std::mutex is not specified -// to have a trivial destructor. -// -// So, we would need a class with a constexpr default constructor and a -// trivial destructor. Today, we can achieve neither desired property using -// std::mutex directly. -template <typename T> -class SynchronizationStorage { - public: - // Instances allocated on the heap or on the stack should use the default - // constructor. - SynchronizationStorage() - : destruct_(true), once_() {} - - constexpr explicit SynchronizationStorage(absl::ConstInitType) - : destruct_(false), once_(), space_{{0}} {} - - SynchronizationStorage(SynchronizationStorage&) = delete; - SynchronizationStorage& operator=(SynchronizationStorage&) = delete; - - ~SynchronizationStorage() { - if (destruct_) { - get()->~T(); - } - } - - // Retrieve the object in storage. This is fast and thread safe, but does - // incur the cost of absl::call_once(). - T* get() { - absl::call_once(once_, SynchronizationStorage::Construct, this); - return reinterpret_cast<T*>(&space_); - } - - private: - static void Construct(SynchronizationStorage<T>* self) { - new (&self->space_) T(); - } - - // When true, T's destructor is run when this is destructed. - const bool destruct_; - - absl::once_flag once_; - - // An aligned space for the T. - alignas(T) unsigned char space_[sizeof(T)]; -}; - -} // namespace synchronization_internal -ABSL_NAMESPACE_END -} // namespace absl diff --git a/absl/synchronization/internal/per_thread_sem.cc b/absl/synchronization/internal/per_thread_sem.cc index 821ca9b4..469e8f32 100644 --- a/absl/synchronization/internal/per_thread_sem.cc +++ b/absl/synchronization/internal/per_thread_sem.cc @@ -47,10 +47,6 @@ void PerThreadSem::Init(base_internal::ThreadIdentity *identity) { identity->is_idle.store(false, std::memory_order_relaxed); } -void PerThreadSem::Destroy(base_internal::ThreadIdentity *identity) { - Waiter::GetWaiter(identity)->~Waiter(); -} - void PerThreadSem::Tick(base_internal::ThreadIdentity *identity) { const int ticker = identity->ticker.fetch_add(1, std::memory_order_relaxed) + 1; @@ -68,12 +64,12 @@ ABSL_NAMESPACE_END extern "C" { -ABSL_ATTRIBUTE_WEAK void AbslInternalPerThreadSemPost( +ABSL_ATTRIBUTE_WEAK void ABSL_INTERNAL_C_SYMBOL(AbslInternalPerThreadSemPost)( absl::base_internal::ThreadIdentity *identity) { absl::synchronization_internal::Waiter::GetWaiter(identity)->Post(); } -ABSL_ATTRIBUTE_WEAK bool AbslInternalPerThreadSemWait( +ABSL_ATTRIBUTE_WEAK bool ABSL_INTERNAL_C_SYMBOL(AbslInternalPerThreadSemWait)( absl::synchronization_internal::KernelTimeout t) { bool timeout = false; absl::base_internal::ThreadIdentity *identity; diff --git a/absl/synchronization/internal/per_thread_sem.h b/absl/synchronization/internal/per_thread_sem.h index 8ab43915..90a88809 100644 --- a/absl/synchronization/internal/per_thread_sem.h +++ b/absl/synchronization/internal/per_thread_sem.h @@ -66,10 +66,6 @@ class PerThreadSem { // REQUIRES: May only be called by ThreadIdentity. static void Init(base_internal::ThreadIdentity* identity); - // Destroy the PerThreadSem associated with "identity". - // REQUIRES: May only be called by ThreadIdentity. - static void Destroy(base_internal::ThreadIdentity* identity); - // Increments "identity"'s count. static inline void Post(base_internal::ThreadIdentity* identity); @@ -78,11 +74,10 @@ class PerThreadSem { // !t.has_timeout() => Wait(t) will return true. static inline bool Wait(KernelTimeout t); - // White-listed callers. + // Permitted callers. friend class PerThreadSemTest; friend class absl::Mutex; - friend absl::base_internal::ThreadIdentity* CreateThreadIdentity(); - friend void ReclaimThreadIdentity(void* v); + friend void OneTimeInitThreadIdentity(absl::base_internal::ThreadIdentity*); }; } // namespace synchronization_internal @@ -96,20 +91,20 @@ ABSL_NAMESPACE_END // By changing our extension points to be extern "C", we dodge this // check. extern "C" { -void AbslInternalPerThreadSemPost( +void ABSL_INTERNAL_C_SYMBOL(AbslInternalPerThreadSemPost)( absl::base_internal::ThreadIdentity* identity); -bool AbslInternalPerThreadSemWait( +bool ABSL_INTERNAL_C_SYMBOL(AbslInternalPerThreadSemWait)( absl::synchronization_internal::KernelTimeout t); } // extern "C" void absl::synchronization_internal::PerThreadSem::Post( absl::base_internal::ThreadIdentity* identity) { - AbslInternalPerThreadSemPost(identity); + ABSL_INTERNAL_C_SYMBOL(AbslInternalPerThreadSemPost)(identity); } bool absl::synchronization_internal::PerThreadSem::Wait( absl::synchronization_internal::KernelTimeout t) { - return AbslInternalPerThreadSemWait(t); + return ABSL_INTERNAL_C_SYMBOL(AbslInternalPerThreadSemWait)(t); } #endif // ABSL_SYNCHRONIZATION_INTERNAL_PER_THREAD_SEM_H_ diff --git a/absl/synchronization/internal/per_thread_sem_test.cc b/absl/synchronization/internal/per_thread_sem_test.cc index b5a2f6d4..24a6b548 100644 --- a/absl/synchronization/internal/per_thread_sem_test.cc +++ b/absl/synchronization/internal/per_thread_sem_test.cc @@ -23,6 +23,7 @@ #include <thread> // NOLINT(build/c++11) #include "gtest/gtest.h" +#include "absl/base/config.h" #include "absl/base/internal/cycleclock.h" #include "absl/base/internal/thread_identity.h" #include "absl/strings/str_cat.h" @@ -158,7 +159,7 @@ TEST_F(PerThreadSemTest, Timeouts) { const absl::Duration elapsed = absl::Now() - start; // Allow for a slight early return, to account for quality of implementation // issues on various platforms. - const absl::Duration slop = absl::Microseconds(200); + const absl::Duration slop = absl::Milliseconds(1); EXPECT_LE(delay - slop, elapsed) << "Wait returned " << delay - elapsed << " early (with " << slop << " slop), start time was " << start; @@ -173,6 +174,15 @@ TEST_F(PerThreadSemTest, Timeouts) { EXPECT_TRUE(Wait(negative_timeout)); } +TEST_F(PerThreadSemTest, ThreadIdentityReuse) { + // Create a base_internal::ThreadIdentity object and keep reusing it. There + // should be no memory or resource leaks. + for (int i = 0; i < 10000; i++) { + std::thread t([]() { GetOrCreateCurrentThreadIdentity(); }); + t.join(); + } +} + } // namespace } // namespace synchronization_internal diff --git a/absl/synchronization/internal/waiter.cc b/absl/synchronization/internal/waiter.cc index b6150b9b..f2051d67 100644 --- a/absl/synchronization/internal/waiter.cc +++ b/absl/synchronization/internal/waiter.cc @@ -48,6 +48,7 @@ #include "absl/base/optimization.h" #include "absl/synchronization/internal/kernel_timeout.h" + namespace absl { ABSL_NAMESPACE_BEGIN namespace synchronization_internal { @@ -66,83 +67,17 @@ static void MaybeBecomeIdle() { #if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX -// Some Android headers are missing these definitions even though they -// support these futex operations. -#ifdef __BIONIC__ -#ifndef SYS_futex -#define SYS_futex __NR_futex -#endif -#ifndef FUTEX_WAIT_BITSET -#define FUTEX_WAIT_BITSET 9 -#endif -#ifndef FUTEX_PRIVATE_FLAG -#define FUTEX_PRIVATE_FLAG 128 -#endif -#ifndef FUTEX_CLOCK_REALTIME -#define FUTEX_CLOCK_REALTIME 256 -#endif -#ifndef FUTEX_BITSET_MATCH_ANY -#define FUTEX_BITSET_MATCH_ANY 0xFFFFFFFF -#endif -#endif - -#if defined(__NR_futex_time64) && !defined(SYS_futex_time64) -#define SYS_futex_time64 __NR_futex_time64 -#endif - -#if defined(SYS_futex_time64) && !defined(SYS_futex) -#define SYS_futex SYS_futex_time64 -#endif - -class Futex { - public: - static int WaitUntil(std::atomic<int32_t> *v, int32_t val, - KernelTimeout t) { - int err = 0; - if (t.has_timeout()) { - // https://locklessinc.com/articles/futex_cheat_sheet/ - // Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET uses absolute time. - struct timespec abs_timeout = t.MakeAbsTimespec(); - // Atomically check that the futex value is still 0, and if it - // is, sleep until abs_timeout or until woken by FUTEX_WAKE. - err = syscall( - SYS_futex, reinterpret_cast<int32_t *>(v), - FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME, val, - &abs_timeout, nullptr, FUTEX_BITSET_MATCH_ANY); - } else { - // Atomically check that the futex value is still 0, and if it - // is, sleep until woken by FUTEX_WAKE. - err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v), - FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, nullptr); - } - if (err != 0) { - err = -errno; - } - return err; - } - - static int Wake(std::atomic<int32_t> *v, int32_t count) { - int err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v), - FUTEX_WAKE | FUTEX_PRIVATE_FLAG, count); - if (ABSL_PREDICT_FALSE(err < 0)) { - err = -errno; - } - return err; - } -}; - Waiter::Waiter() { futex_.store(0, std::memory_order_relaxed); } -Waiter::~Waiter() = default; - bool Waiter::Wait(KernelTimeout t) { // Loop until we can atomically decrement futex from a positive // value, waiting on a futex while we believe it is zero. // Note that, since the thread ticker is just reset, we don't need to check // whether the thread is idle on the very first pass of the loop. bool first_pass = true; + while (true) { int32_t x = futex_.load(std::memory_order_relaxed); while (x != 0) { @@ -154,7 +89,6 @@ bool Waiter::Wait(KernelTimeout t) { return true; // Consumed a wakeup, we are done. } - if (!first_pass) MaybeBecomeIdle(); const int err = Futex::WaitUntil(&futex_, 0, t); if (err != 0) { @@ -225,18 +159,6 @@ Waiter::Waiter() { wakeup_count_ = 0; } -Waiter::~Waiter() { - const int err = pthread_mutex_destroy(&mu_); - if (err != 0) { - ABSL_RAW_LOG(FATAL, "pthread_mutex_destroy failed: %d", err); - } - - const int err2 = pthread_cond_destroy(&cv_); - if (err2 != 0) { - ABSL_RAW_LOG(FATAL, "pthread_cond_destroy failed: %d", err2); - } -} - bool Waiter::Wait(KernelTimeout t) { struct timespec abs_timeout; if (t.has_timeout()) { @@ -304,12 +226,6 @@ Waiter::Waiter() { wakeups_.store(0, std::memory_order_relaxed); } -Waiter::~Waiter() { - if (sem_destroy(&sem_) != 0) { - ABSL_RAW_LOG(FATAL, "sem_destroy failed with errno %d\n", errno); - } -} - bool Waiter::Wait(KernelTimeout t) { struct timespec abs_timeout; if (t.has_timeout()) { @@ -427,11 +343,6 @@ Waiter::Waiter() { wakeup_count_ = 0; } -// SRW locks and condition variables do not need to be explicitly destroyed. -// https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-initializesrwlock -// https://stackoverflow.com/questions/28975958/why-does-windows-have-no-deleteconditionvariable-function-to-go-together-with -Waiter::~Waiter() = default; - bool Waiter::Wait(KernelTimeout t) { SRWLOCK *mu = WinHelper::GetLock(this); CONDITION_VARIABLE *cv = WinHelper::GetCond(this); diff --git a/absl/synchronization/internal/waiter.h b/absl/synchronization/internal/waiter.h index ae83907b..b8adfeb5 100644 --- a/absl/synchronization/internal/waiter.h +++ b/absl/synchronization/internal/waiter.h @@ -36,6 +36,7 @@ #include <cstdint> #include "absl/base/internal/thread_identity.h" +#include "absl/synchronization/internal/futex.h" #include "absl/synchronization/internal/kernel_timeout.h" // May be chosen at compile time via -DABSL_FORCE_WAITER_MODE=<index> @@ -48,12 +49,7 @@ #define ABSL_WAITER_MODE ABSL_FORCE_WAITER_MODE #elif defined(_WIN32) && _WIN32_WINNT >= _WIN32_WINNT_VISTA #define ABSL_WAITER_MODE ABSL_WAITER_MODE_WIN32 -#elif defined(__BIONIC__) -// Bionic supports all the futex operations we need even when some of the futex -// definitions are missing. -#define ABSL_WAITER_MODE ABSL_WAITER_MODE_FUTEX -#elif defined(__linux__) && defined(FUTEX_CLOCK_REALTIME) -// FUTEX_CLOCK_REALTIME requires Linux >= 2.6.28. +#elif defined(ABSL_INTERNAL_HAVE_FUTEX) #define ABSL_WAITER_MODE ABSL_WAITER_MODE_FUTEX #elif defined(ABSL_HAVE_SEMAPHORE_H) #define ABSL_WAITER_MODE ABSL_WAITER_MODE_SEM @@ -75,9 +71,6 @@ class Waiter { Waiter(const Waiter&) = delete; Waiter& operator=(const Waiter&) = delete; - // Destroy any data to track waits. - ~Waiter(); - // Blocks the calling thread until a matching call to `Post()` or // `t` has passed. Returns `true` if woken (`Post()` called), // `false` on timeout. @@ -100,7 +93,7 @@ class Waiter { } // How many periods to remain idle before releasing resources -#ifndef THREAD_SANITIZER +#ifndef ABSL_HAVE_THREAD_SANITIZER static constexpr int kIdlePeriods = 60; #else // Memory consumption under ThreadSanitizer is a serious concern, @@ -110,6 +103,12 @@ class Waiter { #endif private: + // The destructor must not be called since Mutex/CondVar + // can use PerThreadSem/Waiter after the thread exits. + // Waiter objects are embedded in ThreadIdentity objects, + // which are reused via a freelist and are never destroyed. + ~Waiter() = delete; + #if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX // Futexes are defined by specification to be 32-bits. // Thus std::atomic<int32_t> must be just an int32_t with lockfree methods. @@ -140,8 +139,11 @@ class Waiter { // REQUIRES: WinHelper::GetLock(this) must be held. void InternalCondVarPoke(); - // We can't include Windows.h in our headers, so we use aligned charachter + // We can't include Windows.h in our headers, so we use aligned character // buffers to define the storage of SRWLOCK and CONDITION_VARIABLE. + // SRW locks and condition variables do not need to be explicitly destroyed. + // https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-initializesrwlock + // https://stackoverflow.com/questions/28975958/why-does-windows-have-no-deleteconditionvariable-function-to-go-together-with alignas(void*) unsigned char mu_storage_[sizeof(void*)]; alignas(void*) unsigned char cv_storage_[sizeof(void*)]; int waiter_count_; diff --git a/absl/synchronization/mutex.cc b/absl/synchronization/mutex.cc index 1f8a696e..52e2455d 100644 --- a/absl/synchronization/mutex.cc +++ b/absl/synchronization/mutex.cc @@ -39,6 +39,7 @@ #include <thread> // NOLINT(build/c++11) #include "absl/base/attributes.h" +#include "absl/base/call_once.h" #include "absl/base/config.h" #include "absl/base/dynamic_annotations.h" #include "absl/base/internal/atomic_hook.h" @@ -49,6 +50,7 @@ #include "absl/base/internal/spinlock.h" #include "absl/base/internal/sysinfo.h" #include "absl/base/internal/thread_identity.h" +#include "absl/base/internal/tsan_mutex_interface.h" #include "absl/base/port.h" #include "absl/debugging/stacktrace.h" #include "absl/debugging/symbolize.h" @@ -58,6 +60,7 @@ using absl::base_internal::CurrentThreadIdentityIfPresent; using absl::base_internal::PerThreadSynch; +using absl::base_internal::SchedulingGuard; using absl::base_internal::ThreadIdentity; using absl::synchronization_internal::GetOrCreateCurrentThreadIdentity; using absl::synchronization_internal::GraphCycles; @@ -67,7 +70,9 @@ using absl::synchronization_internal::KernelTimeout; using absl::synchronization_internal::PerThreadSem; extern "C" { -ABSL_ATTRIBUTE_WEAK void AbslInternalMutexYield() { std::this_thread::yield(); } +ABSL_ATTRIBUTE_WEAK void ABSL_INTERNAL_C_SYMBOL(AbslInternalMutexYield)() { + std::this_thread::yield(); +} } // extern "C" namespace absl { @@ -75,7 +80,7 @@ ABSL_NAMESPACE_BEGIN namespace { -#if defined(THREAD_SANITIZER) +#if defined(ABSL_HAVE_THREAD_SANITIZER) constexpr OnDeadlockCycle kDeadlockDetectionDefault = OnDeadlockCycle::kIgnore; #else constexpr OnDeadlockCycle kDeadlockDetectionDefault = OnDeadlockCycle::kAbort; @@ -85,31 +90,9 @@ ABSL_CONST_INIT std::atomic<OnDeadlockCycle> synch_deadlock_detection( kDeadlockDetectionDefault); ABSL_CONST_INIT std::atomic<bool> synch_check_invariants(false); -// ------------------------------------------ spinlock support - -// Make sure read-only globals used in the Mutex code are contained on the -// same cacheline and cacheline aligned to eliminate any false sharing with -// other globals from this and other modules. -static struct MutexGlobals { - MutexGlobals() { - // Find machine-specific data needed for Delay() and - // TryAcquireWithSpinning(). This runs in the global constructor - // sequence, and before that zeros are safe values. - num_cpus = absl::base_internal::NumCPUs(); - spinloop_iterations = num_cpus > 1 ? 1500 : 0; - } - int num_cpus; - int spinloop_iterations; - // Pad this struct to a full cacheline to prevent false sharing. - char padding[ABSL_CACHELINE_SIZE - 2 * sizeof(int)]; -} ABSL_CACHELINE_ALIGNED mutex_globals; -static_assert( - sizeof(MutexGlobals) == ABSL_CACHELINE_SIZE, - "MutexGlobals must occupy an entire cacheline to prevent false sharing"); - ABSL_INTERNAL_ATOMIC_HOOK_ATTRIBUTES - absl::base_internal::AtomicHook<void (*)(int64_t wait_cycles)> - submit_profile_data; +absl::base_internal::AtomicHook<void (*)(int64_t wait_cycles)> + submit_profile_data; ABSL_INTERNAL_ATOMIC_HOOK_ATTRIBUTES absl::base_internal::AtomicHook<void (*)( const char *msg, const void *obj, int64_t wait_cycles)> mutex_tracer; @@ -126,7 +109,7 @@ static inline bool EvalConditionAnnotated(const Condition *cond, Mutex *mu, bool locking, bool trylock, bool read_lock); -void RegisterMutexProfiler(void (*fn)(int64_t wait_timestamp)) { +void RegisterMutexProfiler(void (*fn)(int64_t wait_cycles)) { submit_profile_data.Store(fn); } @@ -143,33 +126,64 @@ void RegisterSymbolizer(bool (*fn)(const void *pc, char *out, int out_size)) { symbolizer.Store(fn); } -// spinlock delay on iteration c. Returns new c. namespace { - enum DelayMode { AGGRESSIVE, GENTLE }; +// Represents the strategy for spin and yield. +// See the comment in GetMutexGlobals() for more information. +enum DelayMode { AGGRESSIVE, GENTLE }; + +struct ABSL_CACHELINE_ALIGNED MutexGlobals { + absl::once_flag once; + int spinloop_iterations = 0; + int32_t mutex_sleep_limit[2] = {}; }; -static int Delay(int32_t c, DelayMode mode) { - // If this a uniprocessor, only yield/sleep. Otherwise, if the mode is - // aggressive then spin many times before yielding. If the mode is - // gentle then spin only a few times before yielding. Aggressive spinning is - // used to ensure that an Unlock() call, which must get the spin lock for - // any thread to make progress gets it without undue delay. - int32_t limit = (mutex_globals.num_cpus > 1) ? - ((mode == AGGRESSIVE) ? 5000 : 250) : 0; + +const MutexGlobals &GetMutexGlobals() { + ABSL_CONST_INIT static MutexGlobals data; + absl::base_internal::LowLevelCallOnce(&data.once, [&]() { + const int num_cpus = absl::base_internal::NumCPUs(); + data.spinloop_iterations = num_cpus > 1 ? 1500 : 0; + // If this a uniprocessor, only yield/sleep. Otherwise, if the mode is + // aggressive then spin many times before yielding. If the mode is + // gentle then spin only a few times before yielding. Aggressive spinning + // is used to ensure that an Unlock() call, which must get the spin lock + // for any thread to make progress gets it without undue delay. + if (num_cpus > 1) { + data.mutex_sleep_limit[AGGRESSIVE] = 5000; + data.mutex_sleep_limit[GENTLE] = 250; + } else { + data.mutex_sleep_limit[AGGRESSIVE] = 0; + data.mutex_sleep_limit[GENTLE] = 0; + } + }); + return data; +} +} // namespace + +namespace synchronization_internal { +// Returns the Mutex delay on iteration `c` depending on the given `mode`. +// The returned value should be used as `c` for the next call to `MutexDelay`. +int MutexDelay(int32_t c, int mode) { + const int32_t limit = GetMutexGlobals().mutex_sleep_limit[mode]; if (c < limit) { - c++; // spin + // Spin. + c++; } else { + SchedulingGuard::ScopedEnable enable_rescheduling; ABSL_TSAN_MUTEX_PRE_DIVERT(nullptr, 0); - if (c == limit) { // yield once - AbslInternalMutexYield(); + if (c == limit) { + // Yield once. + ABSL_INTERNAL_C_SYMBOL(AbslInternalMutexYield)(); c++; - } else { // then wait + } else { + // Then wait. absl::SleepFor(absl::Microseconds(10)); c = 0; } ABSL_TSAN_MUTEX_POST_DIVERT(nullptr, 0); } - return (c); + return c; } +} // namespace synchronization_internal // --------------------------Generic atomic ops // Ensure that "(*pv & bits) == bits" by doing an atomic update of "*pv" to @@ -489,7 +503,7 @@ struct SynchWaitParams { std::atomic<intptr_t> *cv_word; int64_t contention_start_cycles; // Time (in cycles) when this thread started - // to contend for the mutex. + // to contend for the mutex. }; struct SynchLocksHeld { @@ -545,7 +559,7 @@ static SynchLocksHeld *Synch_GetAllLocks() { } // Post on "w"'s associated PerThreadSem. -inline void Mutex::IncrementSynchSem(Mutex *mu, PerThreadSynch *w) { +void Mutex::IncrementSynchSem(Mutex *mu, PerThreadSynch *w) { if (mu) { ABSL_TSAN_MUTEX_PRE_DIVERT(mu, 0); } @@ -703,7 +717,7 @@ static constexpr bool kDebugMode = false; static constexpr bool kDebugMode = true; #endif -#ifdef THREAD_SANITIZER +#ifdef ABSL_INTERNAL_HAVE_TSAN_INTERFACE static unsigned TsanFlags(Mutex::MuHow how) { return how == kShared ? __tsan_mutex_read_lock : 0; } @@ -749,11 +763,13 @@ void SetMutexDeadlockDetectionMode(OnDeadlockCycle mode) { synch_deadlock_detection.store(mode, std::memory_order_release); } -// Return true iff threads x and y are waiting on the same condition for the -// same type of lock. Requires that x and y be waiting on the same Mutex -// queue. -static bool MuSameCondition(PerThreadSynch *x, PerThreadSynch *y) { - return x->waitp->how == y->waitp->how && +// Return true iff threads x and y are part of the same equivalence +// class of waiters. An equivalence class is defined as the set of +// waiters with the same condition, type of lock, and thread priority. +// +// Requires that x and y be waiting on the same Mutex queue. +static bool MuEquivalentWaiter(PerThreadSynch *x, PerThreadSynch *y) { + return x->waitp->how == y->waitp->how && x->priority == y->priority && Condition::GuaranteedEqual(x->waitp->cond, y->waitp->cond); } @@ -772,18 +788,19 @@ static inline PerThreadSynch *GetPerThreadSynch(intptr_t v) { // - invalid (iff x is not in a Mutex wait queue), // - null, or // - a pointer to a distinct thread waiting later in the same Mutex queue -// such that all threads in [x, x->skip] have the same condition and -// lock type (MuSameCondition() is true for all pairs in [x, x->skip]). +// such that all threads in [x, x->skip] have the same condition, priority +// and lock type (MuEquivalentWaiter() is true for all pairs in [x, +// x->skip]). // In addition, if x->skip is valid, (x->may_skip || x->skip == null) // -// By the spec of MuSameCondition(), it is not necessary when removing the +// By the spec of MuEquivalentWaiter(), it is not necessary when removing the // first runnable thread y from the front a Mutex queue to adjust the skip // field of another thread x because if x->skip==y, x->skip must (have) become // invalid before y is removed. The function TryRemove can remove a specified // thread from an arbitrary position in the queue whether runnable or not, so // it fixes up skip fields that would otherwise be left dangling. // The statement -// if (x->may_skip && MuSameCondition(x, x->next)) { x->skip = x->next; } +// if (x->may_skip && MuEquivalentWaiter(x, x->next)) { x->skip = x->next; } // maintains the invariant provided x is not the last waiter in a Mutex queue // The statement // if (x->skip != null) { x->skip = x->skip->skip; } @@ -917,24 +934,17 @@ static PerThreadSynch *Enqueue(PerThreadSynch *head, if (s->priority > head->priority) { // s's priority is above head's // try to put s in priority-fifo order, or failing that at the front. if (!head->maybe_unlocking) { - // No unlocker can be scanning the queue, so we can insert between - // skip-chains, and within a skip-chain if it has the same condition as - // s. We insert in priority-fifo order, examining the end of every - // skip-chain, plus every element with the same condition as s. + // No unlocker can be scanning the queue, so we can insert into the + // middle of the queue. + // + // Within a skip chain, all waiters have the same priority, so we can + // skip forward through the chains until we find one with a lower + // priority than the waiter to be enqueued. PerThreadSynch *advance_to = head; // next value of enqueue_after - PerThreadSynch *cur; // successor of enqueue_after do { enqueue_after = advance_to; - cur = enqueue_after->next; // this advance ensures progress - advance_to = Skip(cur); // normally, advance to end of skip chain - // (side-effect: optimizes skip chain) - if (advance_to != cur && s->priority > advance_to->priority && - MuSameCondition(s, cur)) { - // but this skip chain is not a singleton, s has higher priority - // than its tail and has the same condition as the chain, - // so we can insert within the skip-chain - advance_to = cur; // advance by just one - } + // (side-effect: optimizes skip chain) + advance_to = Skip(enqueue_after->next); } while (s->priority <= advance_to->priority); // termination guaranteed because s->priority > head->priority // and head is the end of a skip chain @@ -953,21 +963,21 @@ static PerThreadSynch *Enqueue(PerThreadSynch *head, // enqueue_after can be: head, Skip(...), or cur. // The first two imply enqueue_after->skip == nullptr, and - // the last is used only if MuSameCondition(s, cur). + // the last is used only if MuEquivalentWaiter(s, cur). // We require this because clearing enqueue_after->skip // is impossible; enqueue_after's predecessors might also // incorrectly skip over s if we were to allow other // insertion points. - ABSL_RAW_CHECK( - enqueue_after->skip == nullptr || MuSameCondition(enqueue_after, s), - "Mutex Enqueue failure"); + ABSL_RAW_CHECK(enqueue_after->skip == nullptr || + MuEquivalentWaiter(enqueue_after, s), + "Mutex Enqueue failure"); if (enqueue_after != head && enqueue_after->may_skip && - MuSameCondition(enqueue_after, enqueue_after->next)) { + MuEquivalentWaiter(enqueue_after, enqueue_after->next)) { // enqueue_after can skip to its new successor, s enqueue_after->skip = enqueue_after->next; } - if (MuSameCondition(s, s->next)) { // s->may_skip is known to be true + if (MuEquivalentWaiter(s, s->next)) { // s->may_skip is known to be true s->skip = s->next; // s may skip to its successor } } else { // enqueue not done any other way, so @@ -977,7 +987,7 @@ static PerThreadSynch *Enqueue(PerThreadSynch *head, head->next = s; s->readers = head->readers; // reader count is from previous head s->maybe_unlocking = head->maybe_unlocking; // same for unlock hint - if (head->may_skip && MuSameCondition(head, s)) { + if (head->may_skip && MuEquivalentWaiter(head, s)) { // head now has successor; may skip head->skip = s; } @@ -997,7 +1007,7 @@ static PerThreadSynch *Dequeue(PerThreadSynch *head, PerThreadSynch *pw) { pw->next = w->next; // snip w out of list if (head == w) { // we removed the head head = (pw == w) ? nullptr : pw; // either emptied list, or pw is new head - } else if (pw != head && MuSameCondition(pw, pw->next)) { + } else if (pw != head && MuEquivalentWaiter(pw, pw->next)) { // pw can skip to its new successor if (pw->next->skip != nullptr) { // either skip to its successors skip target @@ -1054,6 +1064,7 @@ static PerThreadSynch *DequeueAllWakeable(PerThreadSynch *head, // Try to remove thread s from the list of waiters on this mutex. // Does nothing if s is not on the waiter list. void Mutex::TryRemove(PerThreadSynch *s) { + SchedulingGuard::ScopedDisable disable_rescheduling; intptr_t v = mu_.load(std::memory_order_relaxed); // acquire spinlock & lock if ((v & (kMuWait | kMuSpin | kMuWriter | kMuReader)) == kMuWait && @@ -1066,11 +1077,13 @@ void Mutex::TryRemove(PerThreadSynch *s) { PerThreadSynch *w; if ((w = pw->next) != s) { // search for thread, do { // processing at least one element - if (!MuSameCondition(s, w)) { // seeking different condition + // If the current element isn't equivalent to the waiter to be + // removed, we can skip the entire chain. + if (!MuEquivalentWaiter(s, w)) { pw = Skip(w); // so skip all that won't match // we don't have to worry about dangling skip fields // in the threads we skipped; none can point to s - // because their condition differs from s + // because they are in a different equivalence class. } else { // seeking same condition FixSkip(w, s); // fix up any skip pointer from w to s pw = w; @@ -1118,7 +1131,7 @@ ABSL_XRAY_LOG_ARGS(1) void Mutex::Block(PerThreadSynch *s) { this->TryRemove(s); int c = 0; while (s->next != nullptr) { - c = Delay(c, GENTLE); + c = synchronization_internal::MutexDelay(c, GENTLE); this->TryRemove(s); } if (kDebugMode) { @@ -1361,7 +1374,9 @@ static GraphId DeadlockCheck(Mutex *mu) { len += static_cast<int>(strlen(&b->buf[len])); } } - ABSL_RAW_LOG(ERROR, "Acquiring %p Mutexes held: %s", + ABSL_RAW_LOG(ERROR, + "Acquiring absl::Mutex %p while holding %s; a cycle in the " + "historical lock ordering graph has been observed", static_cast<void *>(mu), b->buf); ABSL_RAW_LOG(ERROR, "Cycle: "); int path_len = deadlock_graph->FindPath( @@ -1437,7 +1452,7 @@ void Mutex::AssertNotHeld() const { // Attempt to acquire *mu, and return whether successful. The implementation // may spin for a short while if the lock cannot be acquired immediately. static bool TryAcquireWithSpinning(std::atomic<intptr_t>* mu) { - int c = mutex_globals.spinloop_iterations; + int c = GetMutexGlobals().spinloop_iterations; do { // do/while somewhat faster on AMD intptr_t v = mu->load(std::memory_order_relaxed); if ((v & (kMuReader|kMuEvent)) != 0) { @@ -1729,23 +1744,33 @@ ABSL_XRAY_LOG_ARGS(1) void Mutex::ReaderUnlock() { ABSL_TSAN_MUTEX_POST_UNLOCK(this, __tsan_mutex_read_lock); } -// The zap_desig_waker bitmask is used to clear the designated waker flag in -// the mutex if this thread has blocked, and therefore may be the designated -// waker. -static const intptr_t zap_desig_waker[] = { - ~static_cast<intptr_t>(0), // not blocked - ~static_cast<intptr_t>( - kMuDesig) // blocked; turn off the designated waker bit -}; +// Clears the designated waker flag in the mutex if this thread has blocked, and +// therefore may be the designated waker. +static intptr_t ClearDesignatedWakerMask(int flag) { + assert(flag >= 0); + assert(flag <= 1); + switch (flag) { + case 0: // not blocked + return ~static_cast<intptr_t>(0); + case 1: // blocked; turn off the designated waker bit + return ~static_cast<intptr_t>(kMuDesig); + } + ABSL_INTERNAL_UNREACHABLE; +} -// The ignore_waiting_writers bitmask is used to ignore the existence -// of waiting writers if a reader that has already blocked once -// wakes up. -static const intptr_t ignore_waiting_writers[] = { - ~static_cast<intptr_t>(0), // not blocked - ~static_cast<intptr_t>( - kMuWrWait) // blocked; pretend there are no waiting writers -}; +// Conditionally ignores the existence of waiting writers if a reader that has +// already blocked once wakes up. +static intptr_t IgnoreWaitingWritersMask(int flag) { + assert(flag >= 0); + assert(flag <= 1); + switch (flag) { + case 0: // not blocked + return ~static_cast<intptr_t>(0); + case 1: // blocked; pretend there are no waiting writers + return ~static_cast<intptr_t>(kMuWrWait); + } + ABSL_INTERNAL_UNREACHABLE; +} // Internal version of LockWhen(). See LockSlowWithDeadline() ABSL_ATTRIBUTE_NOINLINE void Mutex::LockSlow(MuHow how, const Condition *cond, @@ -1764,7 +1789,7 @@ static inline bool EvalConditionAnnotated(const Condition *cond, Mutex *mu, // All memory accesses are ignored inside of mutex operations + for unlock // operation tsan considers that we've already released the mutex. bool res = false; -#ifdef THREAD_SANITIZER +#ifdef ABSL_INTERNAL_HAVE_TSAN_INTERFACE const int flags = read_lock ? __tsan_mutex_read_lock : 0; const int tryflags = flags | (trylock ? __tsan_mutex_try_lock : 0); #endif @@ -1814,9 +1839,9 @@ static inline bool EvalConditionIgnored(Mutex *mu, const Condition *cond) { // So we "divert" (which un-ignores both memory accesses and synchronization) // and then separately turn on ignores of memory accesses. ABSL_TSAN_MUTEX_PRE_DIVERT(mu, 0); - ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN(); + ABSL_ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN(); bool res = cond->Eval(); - ANNOTATE_IGNORE_READS_AND_WRITES_END(); + ABSL_ANNOTATE_IGNORE_READS_AND_WRITES_END(); ABSL_TSAN_MUTEX_POST_DIVERT(mu, 0); static_cast<void>(mu); // Prevent unused param warning in non-TSAN builds. return res; @@ -1837,8 +1862,10 @@ bool Mutex::LockSlowWithDeadline(MuHow how, const Condition *cond, bool unlock = false; if ((v & how->fast_need_zero) == 0 && // try fast acquire mu_.compare_exchange_strong( - v, (how->fast_or | (v & zap_desig_waker[flags & kMuHasBlocked])) + - how->fast_add, + v, + (how->fast_or | + (v & ClearDesignatedWakerMask(flags & kMuHasBlocked))) + + how->fast_add, std::memory_order_acquire, std::memory_order_relaxed)) { if (cond == nullptr || EvalConditionAnnotated(cond, this, true, false, how == kShared)) { @@ -1897,6 +1924,7 @@ static void CheckForMutexCorruption(intptr_t v, const char* label) { } void Mutex::LockSlowLoop(SynchWaitParams *waitp, int flags) { + SchedulingGuard::ScopedDisable disable_rescheduling; int c = 0; intptr_t v = mu_.load(std::memory_order_relaxed); if ((v & kMuEvent) != 0) { @@ -1911,9 +1939,10 @@ void Mutex::LockSlowLoop(SynchWaitParams *waitp, int flags) { CheckForMutexCorruption(v, "Lock"); if ((v & waitp->how->slow_need_zero) == 0) { if (mu_.compare_exchange_strong( - v, (waitp->how->fast_or | - (v & zap_desig_waker[flags & kMuHasBlocked])) + - waitp->how->fast_add, + v, + (waitp->how->fast_or | + (v & ClearDesignatedWakerMask(flags & kMuHasBlocked))) + + waitp->how->fast_add, std::memory_order_acquire, std::memory_order_relaxed)) { if (waitp->cond == nullptr || EvalConditionAnnotated(waitp->cond, this, true, false, @@ -1930,8 +1959,9 @@ void Mutex::LockSlowLoop(SynchWaitParams *waitp, int flags) { if ((v & (kMuSpin|kMuWait)) == 0) { // no waiters // This thread tries to become the one and only waiter. PerThreadSynch *new_h = Enqueue(nullptr, waitp, v, flags); - intptr_t nv = (v & zap_desig_waker[flags & kMuHasBlocked] & kMuLow) | - kMuWait; + intptr_t nv = + (v & ClearDesignatedWakerMask(flags & kMuHasBlocked) & kMuLow) | + kMuWait; ABSL_RAW_CHECK(new_h != nullptr, "Enqueue to empty list failed"); if (waitp->how == kExclusive && (v & kMuReader) != 0) { nv |= kMuWrWait; @@ -1945,12 +1975,13 @@ void Mutex::LockSlowLoop(SynchWaitParams *waitp, int flags) { waitp->thread->waitp = nullptr; } } else if ((v & waitp->how->slow_inc_need_zero & - ignore_waiting_writers[flags & kMuHasBlocked]) == 0) { + IgnoreWaitingWritersMask(flags & kMuHasBlocked)) == 0) { // This is a reader that needs to increment the reader count, // but the count is currently held in the last waiter. if (mu_.compare_exchange_strong( - v, (v & zap_desig_waker[flags & kMuHasBlocked]) | kMuSpin | - kMuReader, + v, + (v & ClearDesignatedWakerMask(flags & kMuHasBlocked)) | + kMuSpin | kMuReader, std::memory_order_acquire, std::memory_order_relaxed)) { PerThreadSynch *h = GetPerThreadSynch(v); h->readers += kMuOne; // inc reader count in waiter @@ -1971,8 +2002,9 @@ void Mutex::LockSlowLoop(SynchWaitParams *waitp, int flags) { } } else if ((v & kMuSpin) == 0 && // attempt to queue ourselves mu_.compare_exchange_strong( - v, (v & zap_desig_waker[flags & kMuHasBlocked]) | kMuSpin | - kMuWait, + v, + (v & ClearDesignatedWakerMask(flags & kMuHasBlocked)) | + kMuSpin | kMuWait, std::memory_order_acquire, std::memory_order_relaxed)) { PerThreadSynch *h = GetPerThreadSynch(v); PerThreadSynch *new_h = Enqueue(h, waitp, v, flags); @@ -1998,7 +2030,8 @@ void Mutex::LockSlowLoop(SynchWaitParams *waitp, int flags) { ABSL_RAW_CHECK( waitp->thread->waitp == nullptr || waitp->thread->suppress_fatal_errors, "detected illegal recursion into Mutex code"); - c = Delay(c, GENTLE); // delay, then try again + // delay, then try again + c = synchronization_internal::MutexDelay(c, GENTLE); } ABSL_RAW_CHECK( waitp->thread->waitp == nullptr || waitp->thread->suppress_fatal_errors, @@ -2016,6 +2049,7 @@ void Mutex::LockSlowLoop(SynchWaitParams *waitp, int flags) { // or it is in the process of blocking on a condition variable; it must requeue // itself on the mutex/condvar to wait for its condition to become true. ABSL_ATTRIBUTE_NOINLINE void Mutex::UnlockSlow(SynchWaitParams *waitp) { + SchedulingGuard::ScopedDisable disable_rescheduling; intptr_t v = mu_.load(std::memory_order_relaxed); this->AssertReaderHeld(); CheckForMutexCorruption(v, "Unlock"); @@ -2132,7 +2166,7 @@ ABSL_ATTRIBUTE_NOINLINE void Mutex::UnlockSlow(SynchWaitParams *waitp) { !old_h->may_skip) { // we used old_h as a terminator old_h->may_skip = true; // allow old_h to skip once more ABSL_RAW_CHECK(old_h->skip == nullptr, "illegal skip from head"); - if (h != old_h && MuSameCondition(old_h, old_h->next)) { + if (h != old_h && MuEquivalentWaiter(old_h, old_h->next)) { old_h->skip = old_h->next; // old_h not head & can skip to successor } } @@ -2292,22 +2326,26 @@ ABSL_ATTRIBUTE_NOINLINE void Mutex::UnlockSlow(SynchWaitParams *waitp) { mu_.store(nv, std::memory_order_release); break; // out of for(;;)-loop } - c = Delay(c, AGGRESSIVE); // aggressive here; no one can proceed till we do + // aggressive here; no one can proceed till we do + c = synchronization_internal::MutexDelay(c, AGGRESSIVE); } // end of for(;;)-loop if (wake_list != kPerThreadSynchNull) { - int64_t enqueue_timestamp = wake_list->waitp->contention_start_cycles; - bool cond_waiter = wake_list->cond_waiter; + int64_t wait_cycles = 0; + int64_t now = base_internal::CycleClock::Now(); do { + // Sample lock contention events only if the waiter was trying to acquire + // the lock, not waiting on a condition variable or Condition. + if (!wake_list->cond_waiter) { + wait_cycles += (now - wake_list->waitp->contention_start_cycles); + wake_list->waitp->contention_start_cycles = now; + } wake_list = Wakeup(wake_list); // wake waiters } while (wake_list != kPerThreadSynchNull); - if (!cond_waiter) { - // Sample lock contention events only if the (first) waiter was trying to - // acquire the lock, not waiting on a condition variable or Condition. - int64_t wait_cycles = base_internal::CycleClock::Now() - enqueue_timestamp; + if (wait_cycles > 0) { mutex_tracer("slow release", this, wait_cycles); ABSL_TSAN_MUTEX_PRE_DIVERT(this, 0); - submit_profile_data(enqueue_timestamp); + submit_profile_data(wait_cycles); ABSL_TSAN_MUTEX_POST_DIVERT(this, 0); } } @@ -2331,6 +2369,7 @@ void Mutex::Trans(MuHow how) { // It will later acquire the mutex with high probability. Otherwise, we // enqueue thread w on this mutex. void Mutex::Fer(PerThreadSynch *w) { + SchedulingGuard::ScopedDisable disable_rescheduling; int c = 0; ABSL_RAW_CHECK(w->waitp->cond == nullptr, "Mutex::Fer while waiting on Condition"); @@ -2380,7 +2419,7 @@ void Mutex::Fer(PerThreadSynch *w) { return; } } - c = Delay(c, GENTLE); + c = synchronization_internal::MutexDelay(c, GENTLE); } } @@ -2429,6 +2468,7 @@ CondVar::~CondVar() { // Remove thread s from the list of waiters on this condition variable. void CondVar::Remove(PerThreadSynch *s) { + SchedulingGuard::ScopedDisable disable_rescheduling; intptr_t v; int c = 0; for (v = cv_.load(std::memory_order_relaxed);; @@ -2457,7 +2497,8 @@ void CondVar::Remove(PerThreadSynch *s) { std::memory_order_release); return; } else { - c = Delay(c, GENTLE); // try again after a delay + // try again after a delay + c = synchronization_internal::MutexDelay(c, GENTLE); } } } @@ -2469,9 +2510,9 @@ void CondVar::Remove(PerThreadSynch *s) { // before calling Mutex::UnlockSlow(), the Mutex code might be re-entered (via // the logging code, or via a Condition function) and might potentially attempt // to block this thread. That would be a problem if the thread were already on -// a the condition variable waiter queue. Thus, we use the waitp->cv_word -// to tell the unlock code to call CondVarEnqueue() to queue the thread on the -// condition variable queue just before the mutex is to be unlocked, and (most +// a condition variable waiter queue. Thus, we use the waitp->cv_word to tell +// the unlock code to call CondVarEnqueue() to queue the thread on the condition +// variable queue just before the mutex is to be unlocked, and (most // importantly) after any call to an external routine that might re-enter the // mutex code. static void CondVarEnqueue(SynchWaitParams *waitp) { @@ -2490,7 +2531,7 @@ static void CondVarEnqueue(SynchWaitParams *waitp) { !cv_word->compare_exchange_weak(v, v | kCvSpin, std::memory_order_acquire, std::memory_order_relaxed)) { - c = Delay(c, GENTLE); + c = synchronization_internal::MutexDelay(c, GENTLE); v = cv_word->load(std::memory_order_relaxed); } ABSL_RAW_CHECK(waitp->thread->waitp == nullptr, "waiting when shouldn't be"); @@ -2534,6 +2575,23 @@ bool CondVar::WaitCommon(Mutex *mutex, KernelTimeout t) { while (waitp.thread->state.load(std::memory_order_acquire) == PerThreadSynch::kQueued) { if (!Mutex::DecrementSynchSem(mutex, waitp.thread, t)) { + // DecrementSynchSem returned due to timeout. + // Now we will either (1) remove ourselves from the wait list in Remove + // below, in which case Remove will set thread.state = kAvailable and + // we will not call DecrementSynchSem again; or (2) Signal/SignalAll + // has removed us concurrently and is calling Wakeup, which will set + // thread.state = kAvailable and post to the semaphore. + // It's important to reset the timeout for the case (2) because otherwise + // we can live-lock in this loop since DecrementSynchSem will always + // return immediately due to timeout, but Signal/SignalAll is not + // necessary set thread.state = kAvailable yet (and is not scheduled + // due to thread priorities or other scheduler artifacts). + // Note this could also be resolved if Signal/SignalAll would set + // thread.state = kAvailable while holding the wait list spin lock. + // But this can't be easily done for SignalAll since it grabs the whole + // wait list with a single compare-exchange and does not really grab + // the spin lock. + t = KernelTimeout::Never(); this->Remove(waitp.thread); rc = true; } @@ -2589,6 +2647,7 @@ void CondVar::Wakeup(PerThreadSynch *w) { } void CondVar::Signal() { + SchedulingGuard::ScopedDisable disable_rescheduling; ABSL_TSAN_MUTEX_PRE_SIGNAL(nullptr, 0); intptr_t v; int c = 0; @@ -2621,7 +2680,7 @@ void CondVar::Signal() { ABSL_TSAN_MUTEX_POST_SIGNAL(nullptr, 0); return; } else { - c = Delay(c, GENTLE); + c = synchronization_internal::MutexDelay(c, GENTLE); } } ABSL_TSAN_MUTEX_POST_SIGNAL(nullptr, 0); @@ -2658,7 +2717,8 @@ void CondVar::SignalAll () { ABSL_TSAN_MUTEX_POST_SIGNAL(nullptr, 0); return; } else { - c = Delay(c, GENTLE); // try again after a delay + // try again after a delay + c = synchronization_internal::MutexDelay(c, GENTLE); } } ABSL_TSAN_MUTEX_POST_SIGNAL(nullptr, 0); @@ -2671,7 +2731,7 @@ void ReleasableMutexLock::Release() { this->mu_ = nullptr; } -#ifdef THREAD_SANITIZER +#ifdef ABSL_HAVE_THREAD_SANITIZER extern "C" void __tsan_read1(void *addr); #else #define __tsan_read1(addr) // do nothing if TSan not enabled diff --git a/absl/synchronization/mutex.h b/absl/synchronization/mutex.h index 876698ca..b69b7089 100644 --- a/absl/synchronization/mutex.h +++ b/absl/synchronization/mutex.h @@ -31,22 +31,23 @@ // // MutexLock - An RAII wrapper to acquire and release a `Mutex` for exclusive/ // write access within the current scope. +// // ReaderMutexLock // - An RAII wrapper to acquire and release a `Mutex` for shared/read // access within the current scope. // // WriterMutexLock -// - Alias for `MutexLock` above, designed for use in distinguishing -// reader and writer locks within code. +// - Effectively an alias for `MutexLock` above, designed for use in +// distinguishing reader and writer locks within code. // // In addition to simple mutex locks, this file also defines ways to perform // locking under certain conditions. // -// Condition - (Preferred) Used to wait for a particular predicate that -// depends on state protected by the `Mutex` to become true. -// CondVar - A lower-level variant of `Condition` that relies on -// application code to explicitly signal the `CondVar` when -// a condition has been met. +// Condition - (Preferred) Used to wait for a particular predicate that +// depends on state protected by the `Mutex` to become true. +// CondVar - A lower-level variant of `Condition` that relies on +// application code to explicitly signal the `CondVar` when +// a condition has been met. // // See below for more information on using `Condition` or `CondVar`. // @@ -72,15 +73,6 @@ #include "absl/synchronization/internal/per_thread_sem.h" #include "absl/time/time.h" -// Decide if we should use the non-production implementation because -// the production implementation hasn't been fully ported yet. -#ifdef ABSL_INTERNAL_USE_NONPROD_MUTEX -#error ABSL_INTERNAL_USE_NONPROD_MUTEX cannot be directly set -#elif defined(ABSL_LOW_LEVEL_ALLOC_MISSING) -#define ABSL_INTERNAL_USE_NONPROD_MUTEX 1 -#include "absl/synchronization/internal/mutex_nonprod.inc" -#endif - namespace absl { ABSL_NAMESPACE_BEGIN @@ -155,7 +147,7 @@ class ABSL_LOCKABLE Mutex { // // Example usage: // namespace foo { - // ABSL_CONST_INIT Mutex mu(absl::kConstInit); + // ABSL_CONST_INIT absl::Mutex mu(absl::kConstInit); // } explicit constexpr Mutex(absl::ConstInitType); @@ -170,7 +162,7 @@ class ABSL_LOCKABLE Mutex { // Mutex::Unlock() // // Releases this `Mutex` and returns it from the exclusive/write state to the - // free state. Caller must hold the `Mutex` exclusively. + // free state. Calling thread must hold the `Mutex` exclusively. void Unlock() ABSL_UNLOCK_FUNCTION(); // Mutex::TryLock() @@ -182,9 +174,12 @@ class ABSL_LOCKABLE Mutex { // Mutex::AssertHeld() // - // Return immediately if this thread holds the `Mutex` exclusively (in write - // mode). Otherwise, may report an error (typically by crashing with a - // diagnostic), or may return immediately. + // Require that the mutex be held exclusively (write mode) by this thread. + // + // If the mutex is not currently held by this thread, this function may report + // an error (typically by crashing with a diagnostic) or it may do nothing. + // This function is intended only as a tool to assist debugging; it doesn't + // guarantee correctness. void AssertHeld() const ABSL_ASSERT_EXCLUSIVE_LOCK(); // --------------------------------------------------------------------------- @@ -244,9 +239,13 @@ class ABSL_LOCKABLE Mutex { // Mutex::AssertReaderHeld() // - // Returns immediately if this thread holds the `Mutex` in at least shared - // mode (read mode). Otherwise, may report an error (typically by - // crashing with a diagnostic), or may return immediately. + // Require that the mutex be held at least in shared mode (read mode) by this + // thread. + // + // If the mutex is not currently held by this thread, this function may report + // an error (typically by crashing with a diagnostic) or it may do nothing. + // This function is intended only as a tool to assist debugging; it doesn't + // guarantee correctness. void AssertReaderHeld() const ABSL_ASSERT_SHARED_LOCK(); // Mutex::WriterLock() @@ -461,24 +460,13 @@ class ABSL_LOCKABLE Mutex { static void InternalAttemptToUseMutexInFatalSignalHandler(); private: -#ifdef ABSL_INTERNAL_USE_NONPROD_MUTEX - friend class CondVar; - - synchronization_internal::MutexImpl *impl() { return impl_.get(); } - - synchronization_internal::SynchronizationStorage< - synchronization_internal::MutexImpl> - impl_; -#else std::atomic<intptr_t> mu_; // The Mutex state. // Post()/Wait() versus associated PerThreadSem; in class for required // friendship with PerThreadSem. - static inline void IncrementSynchSem(Mutex *mu, - base_internal::PerThreadSynch *w); - static inline bool DecrementSynchSem( - Mutex *mu, base_internal::PerThreadSynch *w, - synchronization_internal::KernelTimeout t); + static void IncrementSynchSem(Mutex *mu, base_internal::PerThreadSynch *w); + static bool DecrementSynchSem(Mutex *mu, base_internal::PerThreadSynch *w, + synchronization_internal::KernelTimeout t); // slow path acquire void LockSlowLoop(SynchWaitParams *waitp, int flags); @@ -504,7 +492,6 @@ class ABSL_LOCKABLE Mutex { void Trans(MuHow how); // used for CondVar->Mutex transfer void Fer( base_internal::PerThreadSynch *w); // used for CondVar->Mutex transfer -#endif // Catch the error of writing Mutex when intending MutexLock. Mutex(const volatile Mutex * /*ignored*/) {} // NOLINT(runtime/explicit) @@ -525,22 +512,36 @@ class ABSL_LOCKABLE Mutex { // Example: // // Class Foo { -// +// public: // Foo::Bar* Baz() { -// MutexLock l(&lock_); +// MutexLock lock(&mu_); // ... // return bar; // } // // private: -// Mutex lock_; +// Mutex mu_; // }; class ABSL_SCOPED_LOCKABLE MutexLock { public: + // Constructors + + // Calls `mu->Lock()` and returns when that call returns. That is, `*mu` is + // guaranteed to be locked when this object is constructed. Requires that + // `mu` be dereferenceable. explicit MutexLock(Mutex *mu) ABSL_EXCLUSIVE_LOCK_FUNCTION(mu) : mu_(mu) { this->mu_->Lock(); } + // Like above, but calls `mu->LockWhen(cond)` instead. That is, in addition to + // the above, the condition given by `cond` is also guaranteed to hold when + // this object is constructed. + explicit MutexLock(Mutex *mu, const Condition &cond) + ABSL_EXCLUSIVE_LOCK_FUNCTION(mu) + : mu_(mu) { + this->mu_->LockWhen(cond); + } + MutexLock(const MutexLock &) = delete; // NOLINT(runtime/mutex) MutexLock(MutexLock&&) = delete; // NOLINT(runtime/mutex) MutexLock& operator=(const MutexLock&) = delete; @@ -562,6 +563,12 @@ class ABSL_SCOPED_LOCKABLE ReaderMutexLock { mu->ReaderLock(); } + explicit ReaderMutexLock(Mutex *mu, const Condition &cond) + ABSL_SHARED_LOCK_FUNCTION(mu) + : mu_(mu) { + mu->ReaderLockWhen(cond); + } + ReaderMutexLock(const ReaderMutexLock&) = delete; ReaderMutexLock(ReaderMutexLock&&) = delete; ReaderMutexLock& operator=(const ReaderMutexLock&) = delete; @@ -584,6 +591,12 @@ class ABSL_SCOPED_LOCKABLE WriterMutexLock { mu->WriterLock(); } + explicit WriterMutexLock(Mutex *mu, const Condition &cond) + ABSL_EXCLUSIVE_LOCK_FUNCTION(mu) + : mu_(mu) { + mu->WriterLockWhen(cond); + } + WriterMutexLock(const WriterMutexLock&) = delete; WriterMutexLock(WriterMutexLock&&) = delete; WriterMutexLock& operator=(const WriterMutexLock&) = delete; @@ -622,16 +635,26 @@ class ABSL_SCOPED_LOCKABLE WriterMutexLock { // `noexcept`; until then this requirement cannot be enforced in the // type system.) // -// Note: to use a `Condition`, you need only construct it and pass it within the -// appropriate `Mutex' member function, such as `Mutex::Await()`. +// Note: to use a `Condition`, you need only construct it and pass it to a +// suitable `Mutex' member function, such as `Mutex::Await()`, or to the +// constructor of one of the scope guard classes. // -// Example: +// Example using LockWhen/Unlock: // // // assume count_ is not internal reference count // int count_ ABSL_GUARDED_BY(mu_); +// Condition count_is_zero(+[](int *count) { return *count == 0; }, &count_); // -// mu_.LockWhen(Condition(+[](int* count) { return *count == 0; }, -// &count_)); +// mu_.LockWhen(count_is_zero); +// // ... +// mu_.Unlock(); +// +// Example using a scope guard: +// +// { +// MutexLock lock(&mu_, count_is_zero); +// // ... +// } // // When multiple threads are waiting on exactly the same condition, make sure // that they are constructed with the same parameters (same pointer to function @@ -685,6 +708,11 @@ class Condition { // return processed_ >= current; // }; // mu_.Await(Condition(&reached)); + // + // NOTE: never use "mu_.AssertHeld()" instead of "mu_.AssertReaderHeld()" in + // the lambda as it may be called when the mutex is being unlocked from a + // scope holding only a reader lock, which will make the assertion not + // fulfilled and crash the binary. // See class comment for performance advice. In particular, if there // might be more than one waiter for the same condition, make sure @@ -757,9 +785,9 @@ class Condition { // // Usage to wake T is: // mu.Lock(); -// // process data, possibly establishing C -// if (C) { cv->Signal(); } -// mu.Unlock(); +// // process data, possibly establishing C +// if (C) { cv->Signal(); } +// mu.Unlock(); // // If C may be useful to more than one waiter, use `SignalAll()` instead of // `Signal()`. @@ -833,17 +861,10 @@ class CondVar { void EnableDebugLog(const char *name); private: -#ifdef ABSL_INTERNAL_USE_NONPROD_MUTEX - synchronization_internal::CondVarImpl *impl() { return impl_.get(); } - synchronization_internal::SynchronizationStorage< - synchronization_internal::CondVarImpl> - impl_; -#else bool WaitCommon(Mutex *mutex, synchronization_internal::KernelTimeout t); void Remove(base_internal::PerThreadSynch *s); void Wakeup(base_internal::PerThreadSynch *w); std::atomic<intptr_t> cv_; // Condition variable state. -#endif CondVar(const CondVar&) = delete; CondVar& operator=(const CondVar&) = delete; }; @@ -865,6 +886,15 @@ class ABSL_SCOPED_LOCKABLE MutexLockMaybe { this->mu_->Lock(); } } + + explicit MutexLockMaybe(Mutex *mu, const Condition &cond) + ABSL_EXCLUSIVE_LOCK_FUNCTION(mu) + : mu_(mu) { + if (this->mu_ != nullptr) { + this->mu_->LockWhen(cond); + } + } + ~MutexLockMaybe() ABSL_UNLOCK_FUNCTION() { if (this->mu_ != nullptr) { this->mu_->Unlock(); } } @@ -887,6 +917,13 @@ class ABSL_SCOPED_LOCKABLE ReleasableMutexLock { : mu_(mu) { this->mu_->Lock(); } + + explicit ReleasableMutexLock(Mutex *mu, const Condition &cond) + ABSL_EXCLUSIVE_LOCK_FUNCTION(mu) + : mu_(mu) { + this->mu_->LockWhen(cond); + } + ~ReleasableMutexLock() ABSL_UNLOCK_FUNCTION() { if (this->mu_ != nullptr) { this->mu_->Unlock(); } } @@ -901,12 +938,6 @@ class ABSL_SCOPED_LOCKABLE ReleasableMutexLock { ReleasableMutexLock& operator=(ReleasableMutexLock&&) = delete; }; -#ifdef ABSL_INTERNAL_USE_NONPROD_MUTEX - -inline constexpr Mutex::Mutex(absl::ConstInitType) : impl_(absl::kConstInit) {} - -#else - inline Mutex::Mutex() : mu_(0) { ABSL_TSAN_MUTEX_CREATE(this, __tsan_mutex_not_static); } @@ -915,8 +946,6 @@ inline constexpr Mutex::Mutex(absl::ConstInitType) : mu_(0) {} inline CondVar::CondVar() : cv_(0) {} -#endif // ABSL_INTERNAL_USE_NONPROD_MUTEX - // static template <typename T> bool Condition::CastAndCallMethod(const Condition *c) { @@ -962,14 +991,15 @@ inline Condition::Condition(const T *object, // Register a hook for profiling support. // // The function pointer registered here will be called whenever a mutex is -// contended. The callback is given the absl/base/cycleclock.h timestamp when -// waiting began. +// contended. The callback is given the cycles for which waiting happened (as +// measured by //absl/base/internal/cycleclock.h, and which may not +// be real "cycle" counts.) // // Calls to this function do not race or block, but there is no ordering // guaranteed between calls to this function and call to the provided hook. // In particular, the previously registered hook may still be called for some // time after this function returns. -void RegisterMutexProfiler(void (*fn)(int64_t wait_timestamp)); +void RegisterMutexProfiler(void (*fn)(int64_t wait_cycles)); // Register a hook for Mutex tracing. // @@ -983,7 +1013,7 @@ void RegisterMutexProfiler(void (*fn)(int64_t wait_timestamp)); // // This has the same memory ordering concerns as RegisterMutexProfiler() above. void RegisterMutexTracer(void (*fn)(const char *msg, const void *obj, - int64_t wait_cycles)); + int64_t wait_cycles)); // TODO(gfalcon): Combine RegisterMutexProfiler() and RegisterMutexTracer() // into a single interface, since they are only ever called in pairs. @@ -1054,7 +1084,7 @@ ABSL_NAMESPACE_END // By changing our extension points to be extern "C", we dodge this // check. extern "C" { -void AbslInternalMutexYield(); +void ABSL_INTERNAL_C_SYMBOL(AbslInternalMutexYield)(); } // extern "C" #endif // ABSL_SYNCHRONIZATION_MUTEX_H_ diff --git a/absl/synchronization/mutex_benchmark.cc b/absl/synchronization/mutex_benchmark.cc index ab188001..b5d2fbc4 100644 --- a/absl/synchronization/mutex_benchmark.cc +++ b/absl/synchronization/mutex_benchmark.cc @@ -16,6 +16,7 @@ #include <mutex> // NOLINT(build/c++11) #include <vector> +#include "absl/base/config.h" #include "absl/base/internal/cycleclock.h" #include "absl/base/internal/spinlock.h" #include "absl/synchronization/blocking_counter.h" @@ -60,8 +61,124 @@ class RaiiLocker<std::mutex> { std::mutex* mu_; }; +// RAII object to change the Mutex priority of the running thread. +class ScopedThreadMutexPriority { + public: + explicit ScopedThreadMutexPriority(int priority) { + absl::base_internal::ThreadIdentity* identity = + absl::synchronization_internal::GetOrCreateCurrentThreadIdentity(); + identity->per_thread_synch.priority = priority; + // Bump next_priority_read_cycles to the infinite future so that the + // implementation doesn't re-read the thread's actual scheduler priority + // and replace our temporary scoped priority. + identity->per_thread_synch.next_priority_read_cycles = + std::numeric_limits<int64_t>::max(); + } + ~ScopedThreadMutexPriority() { + // Reset the "next priority read time" back to the infinite past so that + // the next time the Mutex implementation wants to know this thread's + // priority, it re-reads it from the OS instead of using our overridden + // priority. + absl::synchronization_internal::GetOrCreateCurrentThreadIdentity() + ->per_thread_synch.next_priority_read_cycles = + std::numeric_limits<int64_t>::min(); + } +}; + +void BM_MutexEnqueue(benchmark::State& state) { + // In the "multiple priorities" variant of the benchmark, one of the + // threads runs with Mutex priority 0 while the rest run at elevated priority. + // This benchmarks the performance impact of the presence of a low priority + // waiter when a higher priority waiter adds itself of the queue + // (b/175224064). + // + // NOTE: The actual scheduler priority is not modified in this benchmark: + // all of the threads get CPU slices with the same priority. Only the + // Mutex queueing behavior is modified. + const bool multiple_priorities = state.range(0); + ScopedThreadMutexPriority priority_setter( + (multiple_priorities && state.thread_index() != 0) ? 1 : 0); + + struct Shared { + absl::Mutex mu; + std::atomic<int> looping_threads{0}; + std::atomic<int> blocked_threads{0}; + std::atomic<bool> thread_has_mutex{false}; + }; + static Shared* shared = new Shared; + + // Set up 'blocked_threads' to count how many threads are currently blocked + // in Abseil synchronization code. + // + // NOTE: Blocking done within the Google Benchmark library itself (e.g. + // the barrier which synchronizes threads entering and exiting the benchmark + // loop) does _not_ get registered in this counter. This is because Google + // Benchmark uses its own synchronization primitives based on std::mutex, not + // Abseil synchronization primitives. If at some point the benchmark library + // merges into Abseil, this code may break. + absl::synchronization_internal::PerThreadSem::SetThreadBlockedCounter( + &shared->blocked_threads); + + // The benchmark framework may run several iterations in the same process, + // reusing the same static-initialized 'shared' object. Given the semantics + // of the members, here, we expect everything to be reset to zero by the + // end of any iteration. Assert that's the case, just to be sure. + ABSL_RAW_CHECK( + shared->looping_threads.load(std::memory_order_relaxed) == 0 && + shared->blocked_threads.load(std::memory_order_relaxed) == 0 && + !shared->thread_has_mutex.load(std::memory_order_relaxed), + "Shared state isn't zeroed at start of benchmark iteration"); + + static constexpr int kBatchSize = 1000; + while (state.KeepRunningBatch(kBatchSize)) { + shared->looping_threads.fetch_add(1); + for (int i = 0; i < kBatchSize; i++) { + { + absl::MutexLock l(&shared->mu); + shared->thread_has_mutex.store(true, std::memory_order_relaxed); + // Spin until all other threads are either out of the benchmark loop + // or blocked on the mutex. This ensures that the mutex queue is kept + // at its maximal length to benchmark the performance of queueing on + // a highly contended mutex. + while (shared->looping_threads.load(std::memory_order_relaxed) - + shared->blocked_threads.load(std::memory_order_relaxed) != + 1) { + } + shared->thread_has_mutex.store(false); + } + // Spin until some other thread has acquired the mutex before we block + // again. This ensures that we always go through the slow (queueing) + // acquisition path rather than reacquiring the mutex we just released. + while (!shared->thread_has_mutex.load(std::memory_order_relaxed) && + shared->looping_threads.load(std::memory_order_relaxed) > 1) { + } + } + // The benchmark framework uses a barrier to ensure that all of the threads + // complete their benchmark loop together before any of the threads exit + // the loop. So, we need to remove ourselves from the "looping threads" + // counter here before potentially blocking on that barrier. Otherwise, + // another thread spinning above might wait forever for this thread to + // block on the mutex while we in fact are waiting to exit. + shared->looping_threads.fetch_add(-1); + } + absl::synchronization_internal::PerThreadSem::SetThreadBlockedCounter( + nullptr); +} + +BENCHMARK(BM_MutexEnqueue) + ->Threads(4) + ->Threads(64) + ->Threads(128) + ->Threads(512) + ->ArgName("multiple_priorities") + ->Arg(false) + ->Arg(true); + template <typename MutexType> void BM_Contended(benchmark::State& state) { + int priority = state.thread_index() % state.range(1); + ScopedThreadMutexPriority priority_setter(priority); + struct Shared { MutexType mu; int data = 0; @@ -79,86 +196,56 @@ void BM_Contended(benchmark::State& state) { // To achieve this amount of local work is multiplied by number of threads // to keep ratio between local work and critical section approximately // equal regardless of number of threads. - DelayNs(100 * state.threads, &local); + DelayNs(100 * state.threads(), &local); RaiiLocker<MutexType> locker(&shared->mu); DelayNs(state.range(0), &shared->data); } } +void SetupBenchmarkArgs(benchmark::internal::Benchmark* bm, + bool do_test_priorities) { + const int max_num_priorities = do_test_priorities ? 2 : 1; + bm->UseRealTime() + // ThreadPerCpu poorly handles non-power-of-two CPU counts. + ->Threads(1) + ->Threads(2) + ->Threads(4) + ->Threads(6) + ->Threads(8) + ->Threads(12) + ->Threads(16) + ->Threads(24) + ->Threads(32) + ->Threads(48) + ->Threads(64) + ->Threads(96) + ->Threads(128) + ->Threads(192) + ->Threads(256) + ->ArgNames({"cs_ns", "num_prios"}); + // Some empirically chosen amounts of work in critical section. + // 1 is low contention, 2000 is high contention and few values in between. + for (int critical_section_ns : {1, 20, 50, 200, 2000}) { + for (int num_priorities = 1; num_priorities <= max_num_priorities; + num_priorities++) { + bm->ArgPair(critical_section_ns, num_priorities); + } + } +} BENCHMARK_TEMPLATE(BM_Contended, absl::Mutex) - ->UseRealTime() - // ThreadPerCpu poorly handles non-power-of-two CPU counts. - ->Threads(1) - ->Threads(2) - ->Threads(4) - ->Threads(6) - ->Threads(8) - ->Threads(12) - ->Threads(16) - ->Threads(24) - ->Threads(32) - ->Threads(48) - ->Threads(64) - ->Threads(96) - ->Threads(128) - ->Threads(192) - ->Threads(256) - // Some empirically chosen amounts of work in critical section. - // 1 is low contention, 200 is high contention and few values in between. - ->Arg(1) - ->Arg(20) - ->Arg(50) - ->Arg(200); + ->Apply([](benchmark::internal::Benchmark* bm) { + SetupBenchmarkArgs(bm, /*do_test_priorities=*/true); + }); BENCHMARK_TEMPLATE(BM_Contended, absl::base_internal::SpinLock) - ->UseRealTime() - // ThreadPerCpu poorly handles non-power-of-two CPU counts. - ->Threads(1) - ->Threads(2) - ->Threads(4) - ->Threads(6) - ->Threads(8) - ->Threads(12) - ->Threads(16) - ->Threads(24) - ->Threads(32) - ->Threads(48) - ->Threads(64) - ->Threads(96) - ->Threads(128) - ->Threads(192) - ->Threads(256) - // Some empirically chosen amounts of work in critical section. - // 1 is low contention, 200 is high contention and few values in between. - ->Arg(1) - ->Arg(20) - ->Arg(50) - ->Arg(200); + ->Apply([](benchmark::internal::Benchmark* bm) { + SetupBenchmarkArgs(bm, /*do_test_priorities=*/false); + }); BENCHMARK_TEMPLATE(BM_Contended, std::mutex) - ->UseRealTime() - // ThreadPerCpu poorly handles non-power-of-two CPU counts. - ->Threads(1) - ->Threads(2) - ->Threads(4) - ->Threads(6) - ->Threads(8) - ->Threads(12) - ->Threads(16) - ->Threads(24) - ->Threads(32) - ->Threads(48) - ->Threads(64) - ->Threads(96) - ->Threads(128) - ->Threads(192) - ->Threads(256) - // Some empirically chosen amounts of work in critical section. - // 1 is low contention, 200 is high contention and few values in between. - ->Arg(1) - ->Arg(20) - ->Arg(50) - ->Arg(200); + ->Apply([](benchmark::internal::Benchmark* bm) { + SetupBenchmarkArgs(bm, /*do_test_priorities=*/false); + }); // Measure the overhead of conditions on mutex release (when they must be // evaluated). Mutex has (some) support for equivalence classes allowing @@ -213,7 +300,7 @@ void BM_ConditionWaiters(benchmark::State& state) { } // Some configurations have higher thread limits than others. -#if defined(__linux__) && !defined(THREAD_SANITIZER) +#if defined(__linux__) && !defined(ABSL_HAVE_THREAD_SANITIZER) constexpr int kMaxConditionWaiters = 8192; #else constexpr int kMaxConditionWaiters = 1024; diff --git a/absl/synchronization/mutex_test.cc b/absl/synchronization/mutex_test.cc index afb363af..99bb0175 100644 --- a/absl/synchronization/mutex_test.cc +++ b/absl/synchronization/mutex_test.cc @@ -26,10 +26,12 @@ #include <random> #include <string> #include <thread> // NOLINT(build/c++11) +#include <type_traits> #include <vector> #include "gtest/gtest.h" #include "absl/base/attributes.h" +#include "absl/base/config.h" #include "absl/base/internal/raw_logging.h" #include "absl/base/internal/sysinfo.h" #include "absl/memory/memory.h" @@ -706,6 +708,40 @@ TEST(Mutex, LockWhen) { t.join(); } +TEST(Mutex, LockWhenGuard) { + absl::Mutex mu; + int n = 30; + bool done = false; + + // We don't inline the lambda because the conversion is ambiguous in MSVC. + bool (*cond_eq_10)(int *) = [](int *p) { return *p == 10; }; + bool (*cond_lt_10)(int *) = [](int *p) { return *p < 10; }; + + std::thread t1([&mu, &n, &done, cond_eq_10]() { + absl::ReaderMutexLock lock(&mu, absl::Condition(cond_eq_10, &n)); + done = true; + }); + + std::thread t2[10]; + for (std::thread &t : t2) { + t = std::thread([&mu, &n, cond_lt_10]() { + absl::WriterMutexLock lock(&mu, absl::Condition(cond_lt_10, &n)); + ++n; + }); + } + + { + absl::MutexLock lock(&mu); + n = 0; + } + + for (std::thread &t : t2) t.join(); + t1.join(); + + EXPECT_TRUE(done); + EXPECT_EQ(n, 10); +} + // -------------------------------------------------------- // The following test requires Mutex::ReaderLock to be a real shared // lock, which is not the case in all builds. @@ -815,9 +851,9 @@ TEST(Mutex, MutexReaderDecrementBug) ABSL_NO_THREAD_SAFETY_ANALYSIS { // Test that we correctly handle the situation when a lock is // held and then destroyed (w/o unlocking). -#ifdef THREAD_SANITIZER +#ifdef ABSL_HAVE_THREAD_SANITIZER // TSAN reports errors when locked Mutexes are destroyed. -TEST(Mutex, DISABLED_LockedMutexDestructionBug) NO_THREAD_SAFETY_ANALYSIS { +TEST(Mutex, DISABLED_LockedMutexDestructionBug) ABSL_NO_THREAD_SAFETY_ANALYSIS { #else TEST(Mutex, LockedMutexDestructionBug) ABSL_NO_THREAD_SAFETY_ANALYSIS { #endif @@ -835,33 +871,6 @@ TEST(Mutex, LockedMutexDestructionBug) ABSL_NO_THREAD_SAFETY_ANALYSIS { } } -// -------------------------------------------------------- -// Test for bug with pattern of readers using a condvar. The bug was that if a -// reader went to sleep on a condition variable while one or more other readers -// held the lock, but there were no waiters, the reader count (held in the -// mutex word) would be lost. (This is because Enqueue() had at one time -// always placed the thread on the Mutex queue. Later (CL 4075610), to -// tolerate re-entry into Mutex from a Condition predicate, Enqueue() was -// changed so that it could also place a thread on a condition-variable. This -// introduced the case where Enqueue() returned with an empty queue, and this -// case was handled incorrectly in one place.) - -static void ReaderForReaderOnCondVar(absl::Mutex *mu, absl::CondVar *cv, - int *running) { - std::random_device dev; - std::mt19937 gen(dev()); - std::uniform_int_distribution<int> random_millis(0, 15); - mu->ReaderLock(); - while (*running == 3) { - absl::SleepFor(absl::Milliseconds(random_millis(gen))); - cv->WaitWithTimeout(mu, absl::Milliseconds(random_millis(gen))); - } - mu->ReaderUnlock(); - mu->Lock(); - (*running)--; - mu->Unlock(); -} - struct True { template <class... Args> bool operator()(Args...) const { @@ -910,6 +919,33 @@ TEST(Mutex, FunctorCondition) { } } +// -------------------------------------------------------- +// Test for bug with pattern of readers using a condvar. The bug was that if a +// reader went to sleep on a condition variable while one or more other readers +// held the lock, but there were no waiters, the reader count (held in the +// mutex word) would be lost. (This is because Enqueue() had at one time +// always placed the thread on the Mutex queue. Later (CL 4075610), to +// tolerate re-entry into Mutex from a Condition predicate, Enqueue() was +// changed so that it could also place a thread on a condition-variable. This +// introduced the case where Enqueue() returned with an empty queue, and this +// case was handled incorrectly in one place.) + +static void ReaderForReaderOnCondVar(absl::Mutex *mu, absl::CondVar *cv, + int *running) { + std::random_device dev; + std::mt19937 gen(dev()); + std::uniform_int_distribution<int> random_millis(0, 15); + mu->ReaderLock(); + while (*running == 3) { + absl::SleepFor(absl::Milliseconds(random_millis(gen))); + cv->WaitWithTimeout(mu, absl::Milliseconds(random_millis(gen))); + } + mu->ReaderUnlock(); + mu->Lock(); + (*running)--; + mu->Unlock(); +} + static bool IntIsZero(int *x) { return *x == 0; } // Test for reader waiting condition variable when there are other readers @@ -1001,9 +1037,6 @@ TEST(Mutex, AcquireFromCondition) { x.mu0.Unlock(); } -// The deadlock detector is not part of non-prod builds, so do not test it. -#if !defined(ABSL_INTERNAL_USE_NONPROD_MUTEX) - TEST(Mutex, DeadlockDetector) { absl::SetMutexDeadlockDetectionMode(absl::OnDeadlockCycle::kAbort); @@ -1067,7 +1100,7 @@ class ScopedDisableBazelTestWarnings { const char ScopedDisableBazelTestWarnings::kVarName[] = "TEST_WARNINGS_OUTPUT_FILE"; -#ifdef THREAD_SANITIZER +#ifdef ABSL_HAVE_THREAD_SANITIZER // This test intentionally creates deadlocks to test the deadlock detector. TEST(Mutex, DISABLED_DeadlockDetectorBazelWarning) { #else @@ -1101,7 +1134,7 @@ TEST(Mutex, DeadlockDetectorBazelWarning) { // annotation-based static thread-safety analysis is not currently // predicate-aware and cannot tell if the two for-loops that acquire and // release the locks have the same predicates. -TEST(Mutex, DeadlockDetectorStessTest) ABSL_NO_THREAD_SAFETY_ANALYSIS { +TEST(Mutex, DeadlockDetectorStressTest) ABSL_NO_THREAD_SAFETY_ANALYSIS { // Stress test: Here we create a large number of locks and use all of them. // If a deadlock detector keeps a full graph of lock acquisition order, // it will likely be too slow for this test to pass. @@ -1119,9 +1152,9 @@ TEST(Mutex, DeadlockDetectorStessTest) ABSL_NO_THREAD_SAFETY_ANALYSIS { } } -#ifdef THREAD_SANITIZER +#ifdef ABSL_HAVE_THREAD_SANITIZER // TSAN reports errors when locked Mutexes are destroyed. -TEST(Mutex, DISABLED_DeadlockIdBug) NO_THREAD_SAFETY_ANALYSIS { +TEST(Mutex, DISABLED_DeadlockIdBug) ABSL_NO_THREAD_SAFETY_ANALYSIS { #else TEST(Mutex, DeadlockIdBug) ABSL_NO_THREAD_SAFETY_ANALYSIS { #endif @@ -1157,7 +1190,6 @@ TEST(Mutex, DeadlockIdBug) ABSL_NO_THREAD_SAFETY_ANALYSIS { c.Lock(); c.Unlock(); } -#endif // !defined(ABSL_INTERNAL_USE_NONPROD_MUTEX) // -------------------------------------------------------- // Test for timeouts/deadlines on condition waits that are specified using @@ -1672,4 +1704,30 @@ TEST(Mutex, MuTime) { EXPECT_EQ(RunTest(&TestMuTime, threads, iterations, 1), threads * iterations); } +TEST(Mutex, SignalExitedThread) { + // The test may expose a race when Mutex::Unlock signals a thread + // that has already exited. +#if defined(__wasm__) || defined(__asmjs__) + constexpr int kThreads = 1; // OOMs under WASM +#else + constexpr int kThreads = 100; +#endif + std::vector<std::thread> top; + for (unsigned i = 0; i < 2 * std::thread::hardware_concurrency(); i++) { + top.emplace_back([&]() { + for (int i = 0; i < kThreads; i++) { + absl::Mutex mu; + std::thread t([&]() { + mu.Lock(); + mu.Unlock(); + }); + mu.Lock(); + mu.Unlock(); + t.join(); + } + }); + } + for (auto &th : top) th.join(); +} + } // namespace diff --git a/absl/synchronization/notification.h b/absl/synchronization/notification.h index 9a354ca2..4bec2689 100644 --- a/absl/synchronization/notification.h +++ b/absl/synchronization/notification.h @@ -22,7 +22,7 @@ // The `Notification` object maintains a private boolean "notified" state that // transitions to `true` at most once. The `Notification` class provides the // following primary member functions: -// * `HasBeenNotified() `to query its state +// * `HasBeenNotified()` to query its state // * `WaitForNotification*()` to have threads wait until the "notified" state // is `true`. // * `Notify()` to set the notification's "notified" state to `true` and @@ -52,6 +52,7 @@ #include <atomic> +#include "absl/base/attributes.h" #include "absl/base/macros.h" #include "absl/synchronization/mutex.h" #include "absl/time/time.h" @@ -74,7 +75,7 @@ class Notification { // Notification::HasBeenNotified() // // Returns the value of the notification's internal "notified" state. - bool HasBeenNotified() const { + ABSL_MUST_USE_RESULT bool HasBeenNotified() const { return HasBeenNotifiedInternal(&this->notified_yet_); } |