diff options
author | Abseil Team <absl-team@google.com> | 2020-12-30 14:30:17 -0800 |
---|---|---|
committer | Derek Mauro <dmauro@google.com> | 2020-12-31 03:16:21 +0000 |
commit | 384af0e9141283172e2bff3210dae79fb7130d9c (patch) | |
tree | ac7fe48ee33eae8f9f79653c7488f892ad233ec8 /absl/flags | |
parent | e7ca23acac146b10edc4f752edd0bd28b0f14ea3 (diff) |
Export of internal Abseil changes
--
465461299a9814aca325fee599cefbfe462f12fe by Abseil Team <absl-team@google.com>:
Optimize trivially copyable flags with a sequence lock
PiperOrigin-RevId: 349602779
--
73f39f959e21121684a51887243abad0814a335e by Abseil Team <absl-team@google.com>:
Internal change
PiperOrigin-RevId: 349590869
--
6b3106fa66b8f075a39a1a8f3265ae132b7e2c84 by Abseil Team <absl-team@google.com>:
Remove ABSL_DLL from `log_prefix_hook` and `abort_hook`.
PiperOrigin-RevId: 349560499
--
bb0d295e699a509f3284145e025d00036b70dbb2 by Abseil Team <absl-team@google.com>:
Tiny docstring fix
A small edit to make "use of this is useful" a little less redundant. :)
PiperOrigin-RevId: 349445689
GitOrigin-RevId: 465461299a9814aca325fee599cefbfe462f12fe
Change-Id: I08cc4091b8b95b68188cb9168ac622dacc5fa688
Diffstat (limited to 'absl/flags')
-rw-r--r-- | absl/flags/BUILD.bazel | 20 | ||||
-rw-r--r-- | absl/flags/CMakeLists.txt | 15 | ||||
-rw-r--r-- | absl/flags/config.h | 11 | ||||
-rw-r--r-- | absl/flags/flag_test.cc | 55 | ||||
-rw-r--r-- | absl/flags/internal/flag.cc | 91 | ||||
-rw-r--r-- | absl/flags/internal/flag.h | 109 | ||||
-rw-r--r-- | absl/flags/internal/sequence_lock.h | 187 | ||||
-rw-r--r-- | absl/flags/internal/sequence_lock_test.cc | 146 |
8 files changed, 507 insertions, 127 deletions
diff --git a/absl/flags/BUILD.bazel b/absl/flags/BUILD.bazel index 6e5c4e87..1937609a 100644 --- a/absl/flags/BUILD.bazel +++ b/absl/flags/BUILD.bazel @@ -191,6 +191,7 @@ cc_library( ], hdrs = [ "internal/flag.h", + "internal/sequence_lock.h", ], copts = ABSL_DEFAULT_COPTS, linkopts = ABSL_DEFAULT_LINKOPTS, @@ -479,6 +480,25 @@ cc_test( ) cc_test( + name = "sequence_lock_test", + size = "small", + timeout = "moderate", + srcs = [ + "internal/sequence_lock_test.cc", + ], + copts = ABSL_TEST_COPTS, + linkopts = ABSL_DEFAULT_LINKOPTS, + shard_count = 32, + deps = [ + ":flag_internal", + "//absl/base", + "//absl/container:fixed_array", + "//absl/time", + "@com_google_googletest//:gtest_main", + ], +) + +cc_test( name = "usage_config_test", size = "small", srcs = [ diff --git a/absl/flags/CMakeLists.txt b/absl/flags/CMakeLists.txt index e5083d7b..caac69cf 100644 --- a/absl/flags/CMakeLists.txt +++ b/absl/flags/CMakeLists.txt @@ -176,6 +176,7 @@ absl_cc_library( "internal/flag.cc" HDRS "internal/flag.h" + "internal/sequence_lock.h" COPTS ${ABSL_DEFAULT_COPTS} LINKOPTS @@ -418,6 +419,20 @@ absl_cc_test( absl_cc_test( NAME + flags_sequence_lock_test + SRCS + "internal/sequence_lock_test.cc" + COPTS + ${ABSL_TEST_COPTS} + DEPS + absl::base + absl::flags_internal + absl::time + gmock_main +) + +absl_cc_test( + NAME flags_usage_config_test SRCS "usage_config_test.cc" diff --git a/absl/flags/config.h b/absl/flags/config.h index 813a9257..5ab1f311 100644 --- a/absl/flags/config.h +++ b/absl/flags/config.h @@ -45,17 +45,6 @@ #define ABSL_FLAGS_STRIP_HELP ABSL_FLAGS_STRIP_NAMES #endif -// ABSL_FLAGS_INTERNAL_ATOMIC_DOUBLE_WORD macro is used for using atomics with -// double words, e.g. absl::Duration. -// For reasons in bug https://gcc.gnu.org/bugzilla/show_bug.cgi?id=80878, modern -// versions of GCC do not support cmpxchg16b instruction in standard atomics. -#ifdef ABSL_FLAGS_INTERNAL_ATOMIC_DOUBLE_WORD -#error "ABSL_FLAGS_INTERNAL_ATOMIC_DOUBLE_WORD should not be defined." -#elif defined(__clang__) && defined(__x86_64__) && \ - defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_16) -#define ABSL_FLAGS_INTERNAL_ATOMIC_DOUBLE_WORD 1 -#endif - // ABSL_FLAGS_INTERNAL_HAS_RTTI macro is used for selecting if we can use RTTI // for flag type identification. #ifdef ABSL_FLAGS_INTERNAL_HAS_RTTI diff --git a/absl/flags/flag_test.cc b/absl/flags/flag_test.cc index 654c8122..72507b99 100644 --- a/absl/flags/flag_test.cc +++ b/absl/flags/flag_test.cc @@ -18,6 +18,7 @@ #include <stddef.h> #include <stdint.h> +#include <atomic> #include <cmath> #include <new> #include <string> @@ -26,6 +27,7 @@ #include "gtest/gtest.h" #include "absl/base/attributes.h" +#include "absl/base/macros.h" #include "absl/flags/config.h" #include "absl/flags/declare.h" #include "absl/flags/internal/flag.h" @@ -108,16 +110,16 @@ TEST_F(FlagTest, Traits) { EXPECT_EQ(flags::StorageKind<int64_t>(), flags::FlagValueStorageKind::kOneWordAtomic); -#if defined(ABSL_FLAGS_INTERNAL_ATOMIC_DOUBLE_WORD) EXPECT_EQ(flags::StorageKind<S1>(), - flags::FlagValueStorageKind::kTwoWordsAtomic); + flags::FlagValueStorageKind::kSequenceLocked); EXPECT_EQ(flags::StorageKind<S2>(), - flags::FlagValueStorageKind::kTwoWordsAtomic); -#else - EXPECT_EQ(flags::StorageKind<S1>(), - flags::FlagValueStorageKind::kAlignedBuffer); - EXPECT_EQ(flags::StorageKind<S2>(), - flags::FlagValueStorageKind::kAlignedBuffer); + flags::FlagValueStorageKind::kSequenceLocked); +// Make sure absl::Duration uses the sequence-locked code path. MSVC 2015 +// doesn't consider absl::Duration to be trivially-copyable so we just +// restrict this to clang as it seems to be a well-behaved compiler. +#ifdef __clang__ + EXPECT_EQ(flags::StorageKind<absl::Duration>(), + flags::FlagValueStorageKind::kSequenceLocked); #endif EXPECT_EQ(flags::StorageKind<std::string>(), @@ -583,6 +585,43 @@ TEST_F(FlagTest, TestGetViaReflection) { // -------------------------------------------------------------------- +TEST_F(FlagTest, ConcurrentSetAndGet) { + static constexpr int kNumThreads = 8; + // Two arbitrary durations. One thread will concurrently flip the flag + // between these two values, while the other threads read it and verify + // that no other value is seen. + static const absl::Duration kValidDurations[] = { + absl::Seconds(int64_t{0x6cebf47a9b68c802}) + absl::Nanoseconds(229702057), + absl::Seconds(int64_t{0x23fec0307e4e9d3}) + absl::Nanoseconds(44555374)}; + absl::SetFlag(&FLAGS_test_flag_12, kValidDurations[0]); + + std::atomic<bool> stop{false}; + std::vector<std::thread> threads; + auto* handle = absl::FindCommandLineFlag("test_flag_12"); + for (int i = 0; i < kNumThreads; i++) { + threads.emplace_back([&]() { + while (!stop.load(std::memory_order_relaxed)) { + // Try loading the flag both directly and via a reflection + // handle. + absl::Duration v = absl::GetFlag(FLAGS_test_flag_12); + EXPECT_TRUE(v == kValidDurations[0] || v == kValidDurations[1]); + v = *handle->TryGet<absl::Duration>(); + EXPECT_TRUE(v == kValidDurations[0] || v == kValidDurations[1]); + } + }); + } + absl::Time end_time = absl::Now() + absl::Seconds(1); + int i = 0; + while (absl::Now() < end_time) { + absl::SetFlag(&FLAGS_test_flag_12, + kValidDurations[i++ % ABSL_ARRAYSIZE(kValidDurations)]); + } + stop.store(true, std::memory_order_relaxed); + for (auto& t : threads) t.join(); +} + +// -------------------------------------------------------------------- + int GetDflt1() { return 1; } } // namespace diff --git a/absl/flags/internal/flag.cc b/absl/flags/internal/flag.cc index 1502e7f1..f83c1fe7 100644 --- a/absl/flags/internal/flag.cc +++ b/absl/flags/internal/flag.cc @@ -96,7 +96,8 @@ class FlagState : public flags_internal::FlagStateInterface { counter_(counter) {} ~FlagState() override { - if (flag_impl_.ValueStorageKind() != FlagValueStorageKind::kAlignedBuffer) + if (flag_impl_.ValueStorageKind() != FlagValueStorageKind::kAlignedBuffer && + flag_impl_.ValueStorageKind() != FlagValueStorageKind::kSequenceLocked) return; flags_internal::Delete(flag_impl_.op_, value_.heap_allocated); } @@ -118,11 +119,9 @@ class FlagState : public flags_internal::FlagStateInterface { union SavedValue { explicit SavedValue(void* v) : heap_allocated(v) {} explicit SavedValue(int64_t v) : one_word(v) {} - explicit SavedValue(flags_internal::AlignedTwoWords v) : two_words(v) {} void* heap_allocated; int64_t one_word; - flags_internal::AlignedTwoWords two_words; } value_; bool modified_; bool on_command_line_; @@ -164,17 +163,15 @@ void FlagImpl::Init() { std::memory_order_release); break; } - case FlagValueStorageKind::kTwoWordsAtomic: { + case FlagValueStorageKind::kSequenceLocked: { // For this storage kind the default_value_ always points to gen_func // during initialization. assert(def_kind == FlagDefaultKind::kGenFunc); - alignas(AlignedTwoWords) std::array<char, sizeof(AlignedTwoWords)> buf{}; - (*default_value_.gen_func)(buf.data()); - auto atomic_value = absl::bit_cast<AlignedTwoWords>(buf); - TwoWordsValue().store(atomic_value, std::memory_order_release); + (*default_value_.gen_func)(AtomicBufferValue()); break; } } + seq_lock_.MarkInitialized(); } absl::Mutex* FlagImpl::DataGuard() const { @@ -231,23 +228,21 @@ void FlagImpl::StoreValue(const void* src) { switch (ValueStorageKind()) { case FlagValueStorageKind::kAlignedBuffer: Copy(op_, src, AlignedBufferValue()); + seq_lock_.IncrementModificationCount(); break; case FlagValueStorageKind::kOneWordAtomic: { int64_t one_word_val = 0; std::memcpy(&one_word_val, src, Sizeof(op_)); OneWordValue().store(one_word_val, std::memory_order_release); + seq_lock_.IncrementModificationCount(); break; } - case FlagValueStorageKind::kTwoWordsAtomic: { - AlignedTwoWords two_words_val{0, 0}; - std::memcpy(&two_words_val, src, Sizeof(op_)); - TwoWordsValue().store(two_words_val, std::memory_order_release); + case FlagValueStorageKind::kSequenceLocked: { + seq_lock_.Write(AtomicBufferValue(), src, Sizeof(op_)); break; } } - modified_ = true; - ++counter_; InvokeCallback(); } @@ -266,6 +261,10 @@ FlagFastTypeId FlagImpl::TypeId() const { return flags_internal::FastTypeId(op_); } +int64_t FlagImpl::ModificationCount() const { + return seq_lock_.ModificationCount(); +} + bool FlagImpl::IsSpecifiedOnCommandLine() const { absl::MutexLock l(DataGuard()); return on_command_line_; @@ -291,11 +290,11 @@ std::string FlagImpl::CurrentValue() const { OneWordValue().load(std::memory_order_acquire)); return flags_internal::Unparse(op_, one_word_val.data()); } - case FlagValueStorageKind::kTwoWordsAtomic: { - const auto two_words_val = - absl::bit_cast<std::array<char, sizeof(AlignedTwoWords)>>( - TwoWordsValue().load(std::memory_order_acquire)); - return flags_internal::Unparse(op_, two_words_val.data()); + case FlagValueStorageKind::kSequenceLocked: { + std::unique_ptr<void, DynValueDeleter> cloned(flags_internal::Alloc(op_), + DynValueDeleter{op_}); + ReadSequenceLockedData(cloned.get()); + return flags_internal::Unparse(op_, cloned.get()); } } @@ -345,17 +344,22 @@ std::unique_ptr<FlagStateInterface> FlagImpl::SaveState() { case FlagValueStorageKind::kAlignedBuffer: { return absl::make_unique<FlagState>( *this, flags_internal::Clone(op_, AlignedBufferValue()), modified, - on_command_line, counter_); + on_command_line, ModificationCount()); } case FlagValueStorageKind::kOneWordAtomic: { return absl::make_unique<FlagState>( *this, OneWordValue().load(std::memory_order_acquire), modified, - on_command_line, counter_); + on_command_line, ModificationCount()); } - case FlagValueStorageKind::kTwoWordsAtomic: { - return absl::make_unique<FlagState>( - *this, TwoWordsValue().load(std::memory_order_acquire), modified, - on_command_line, counter_); + case FlagValueStorageKind::kSequenceLocked: { + void* cloned = flags_internal::Alloc(op_); + // Read is guaranteed to be successful because we hold the lock. + bool success = + seq_lock_.TryRead(cloned, AtomicBufferValue(), Sizeof(op_)); + assert(success); + static_cast<void>(success); + return absl::make_unique<FlagState>(*this, cloned, modified, + on_command_line, ModificationCount()); } } return nullptr; @@ -363,21 +367,18 @@ std::unique_ptr<FlagStateInterface> FlagImpl::SaveState() { bool FlagImpl::RestoreState(const FlagState& flag_state) { absl::MutexLock l(DataGuard()); - - if (flag_state.counter_ == counter_) { + if (flag_state.counter_ == ModificationCount()) { return false; } switch (ValueStorageKind()) { case FlagValueStorageKind::kAlignedBuffer: + case FlagValueStorageKind::kSequenceLocked: StoreValue(flag_state.value_.heap_allocated); break; case FlagValueStorageKind::kOneWordAtomic: StoreValue(&flag_state.value_.one_word); break; - case FlagValueStorageKind::kTwoWordsAtomic: - StoreValue(&flag_state.value_.two_words); - break; } modified_ = flag_state.modified_; @@ -400,16 +401,16 @@ void* FlagImpl::AlignedBufferValue() const { return OffsetValue<void>(); } +std::atomic<uint64_t>* FlagImpl::AtomicBufferValue() const { + assert(ValueStorageKind() == FlagValueStorageKind::kSequenceLocked); + return OffsetValue<std::atomic<uint64_t>>(); +} + std::atomic<int64_t>& FlagImpl::OneWordValue() const { assert(ValueStorageKind() == FlagValueStorageKind::kOneWordAtomic); return OffsetValue<FlagOneWordValue>()->value; } -std::atomic<AlignedTwoWords>& FlagImpl::TwoWordsValue() const { - assert(ValueStorageKind() == FlagValueStorageKind::kTwoWordsAtomic); - return OffsetValue<FlagTwoWordsValue>()->value; -} - // Attempts to parse supplied `value` string using parsing routine in the `flag` // argument. If parsing successful, this function replaces the dst with newly // parsed value. In case if any error is encountered in either step, the error @@ -443,15 +444,27 @@ void FlagImpl::Read(void* dst) const { std::memcpy(dst, &one_word_val, Sizeof(op_)); break; } - case FlagValueStorageKind::kTwoWordsAtomic: { - const AlignedTwoWords two_words_val = - TwoWordsValue().load(std::memory_order_acquire); - std::memcpy(dst, &two_words_val, Sizeof(op_)); + case FlagValueStorageKind::kSequenceLocked: { + ReadSequenceLockedData(dst); break; } } } +void FlagImpl::ReadSequenceLockedData(void* dst) const { + int size = Sizeof(op_); + // Attempt to read using the sequence lock. + if (ABSL_PREDICT_TRUE(seq_lock_.TryRead(dst, AtomicBufferValue(), size))) { + return; + } + // We failed due to contention. Acquire the lock to prevent contention + // and try again. + absl::ReaderMutexLock l(DataGuard()); + bool success = seq_lock_.TryRead(dst, AtomicBufferValue(), size); + assert(success); + static_cast<void>(success); +} + void FlagImpl::Write(const void* src) { absl::MutexLock l(DataGuard()); diff --git a/absl/flags/internal/flag.h b/absl/flags/internal/flag.h index 370d8a02..83548143 100644 --- a/absl/flags/internal/flag.h +++ b/absl/flags/internal/flag.h @@ -36,6 +36,7 @@ #include "absl/flags/config.h" #include "absl/flags/internal/commandlineflag.h" #include "absl/flags/internal/registry.h" +#include "absl/flags/internal/sequence_lock.h" #include "absl/flags/marshalling.h" #include "absl/meta/type_traits.h" #include "absl/strings/string_view.h" @@ -308,59 +309,23 @@ using FlagUseOneWordStorage = std::integral_constant< bool, absl::type_traits_internal::is_trivially_copyable<T>::value && (sizeof(T) <= 8)>; -#if defined(ABSL_FLAGS_INTERNAL_ATOMIC_DOUBLE_WORD) -// Clang does not always produce cmpxchg16b instruction when alignment of a 16 -// bytes type is not 16. -struct alignas(16) AlignedTwoWords { - int64_t first; - int64_t second; - - bool IsInitialized() const { - return first != flags_internal::UninitializedFlagValue(); - } -}; - -template <typename T> -using FlagUseTwoWordsStorage = std::integral_constant< +template <class T> +using FlagShouldUseSequenceLock = std::integral_constant< bool, absl::type_traits_internal::is_trivially_copyable<T>::value && - (sizeof(T) > 8) && (sizeof(T) <= 16)>; -#else -// This is actually unused and only here to avoid ifdefs in other palces. -struct AlignedTwoWords { - constexpr AlignedTwoWords() noexcept : dummy() {} - constexpr AlignedTwoWords(int64_t, int64_t) noexcept : dummy() {} - char dummy; - - bool IsInitialized() const { - std::abort(); - return true; - } -}; - -// This trait should be type dependent, otherwise SFINAE below will fail -template <typename T> -using FlagUseTwoWordsStorage = - std::integral_constant<bool, sizeof(T) != sizeof(T)>; -#endif - -template <typename T> -using FlagUseBufferStorage = - std::integral_constant<bool, !FlagUseOneWordStorage<T>::value && - !FlagUseTwoWordsStorage<T>::value>; + (sizeof(T) > 8)>; enum class FlagValueStorageKind : uint8_t { kAlignedBuffer = 0, kOneWordAtomic = 1, - kTwoWordsAtomic = 2 + kSequenceLocked = 2, }; template <typename T> static constexpr FlagValueStorageKind StorageKind() { - return FlagUseBufferStorage<T>::value - ? FlagValueStorageKind::kAlignedBuffer - : FlagUseOneWordStorage<T>::value - ? FlagValueStorageKind::kOneWordAtomic - : FlagValueStorageKind::kTwoWordsAtomic; + return FlagUseOneWordStorage<T>::value ? FlagValueStorageKind::kOneWordAtomic + : FlagShouldUseSequenceLock<T>::value + ? FlagValueStorageKind::kSequenceLocked + : FlagValueStorageKind::kAlignedBuffer; } struct FlagOneWordValue { @@ -369,27 +334,20 @@ struct FlagOneWordValue { std::atomic<int64_t> value; }; -struct FlagTwoWordsValue { - constexpr FlagTwoWordsValue() - : value(AlignedTwoWords{UninitializedFlagValue(), 0}) {} - - std::atomic<AlignedTwoWords> value; -}; - template <typename T, FlagValueStorageKind Kind = flags_internal::StorageKind<T>()> struct FlagValue; template <typename T> struct FlagValue<T, FlagValueStorageKind::kAlignedBuffer> { - bool Get(T&) const { return false; } + bool Get(const SequenceLock&, T&) const { return false; } alignas(T) char value[sizeof(T)]; }; template <typename T> struct FlagValue<T, FlagValueStorageKind::kOneWordAtomic> : FlagOneWordValue { - bool Get(T& dst) const { + bool Get(const SequenceLock&, T& dst) const { int64_t one_word_val = value.load(std::memory_order_acquire); if (ABSL_PREDICT_FALSE(one_word_val == UninitializedFlagValue())) { return false; @@ -400,15 +358,16 @@ struct FlagValue<T, FlagValueStorageKind::kOneWordAtomic> : FlagOneWordValue { }; template <typename T> -struct FlagValue<T, FlagValueStorageKind::kTwoWordsAtomic> : FlagTwoWordsValue { - bool Get(T& dst) const { - AlignedTwoWords two_words_val = value.load(std::memory_order_acquire); - if (ABSL_PREDICT_FALSE(!two_words_val.IsInitialized())) { - return false; - } - std::memcpy(&dst, static_cast<const void*>(&two_words_val), sizeof(T)); - return true; +struct FlagValue<T, FlagValueStorageKind::kSequenceLocked> { + bool Get(const SequenceLock& lock, T& dst) const { + return lock.TryRead(&dst, value_words, sizeof(T)); } + + static constexpr int kNumWords = + flags_internal::AlignUp(sizeof(T), sizeof(uint64_t)) / sizeof(uint64_t); + + alignas(T) alignas( + std::atomic<uint64_t>) std::atomic<uint64_t> value_words[kNumWords]; }; /////////////////////////////////////////////////////////////////////////////// @@ -451,7 +410,6 @@ class FlagImpl final : public CommandLineFlag { def_kind_(static_cast<uint8_t>(default_arg.kind)), modified_(false), on_command_line_(false), - counter_(0), callback_(nullptr), default_value_(default_arg.source), data_guard_{} {} @@ -498,15 +456,17 @@ class FlagImpl final : public CommandLineFlag { // flag.cc, we can define it in that file as well. template <typename StorageT> StorageT* OffsetValue() const; - // This is an accessor for a value stored in an aligned buffer storage. + // This is an accessor for a value stored in an aligned buffer storage + // used for non-trivially-copyable data types. // Returns a mutable pointer to the start of a buffer. void* AlignedBufferValue() const; + + // The same as above, but used for sequencelock-protected storage. + std::atomic<uint64_t>* AtomicBufferValue() const; + // This is an accessor for a value stored as one word atomic. Returns a // mutable reference to an atomic value. std::atomic<int64_t>& OneWordValue() const; - // This is an accessor for a value stored as two words atomic. Returns a - // mutable reference to an atomic value. - std::atomic<AlignedTwoWords>& TwoWordsValue() const; // Attempts to parse supplied `value` string. If parsing is successful, // returns new value. Otherwise returns nullptr. @@ -516,6 +476,12 @@ class FlagImpl final : public CommandLineFlag { // Stores the flag value based on the pointer to the source. void StoreValue(const void* src) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*DataGuard()); + // Copy the flag data, protected by `seq_lock_` into `dst`. + // + // REQUIRES: ValueStorageKind() == kSequenceLocked. + void ReadSequenceLockedData(void* dst) const + ABSL_LOCKS_EXCLUDED(*DataGuard()); + FlagHelpKind HelpSourceKind() const { return static_cast<FlagHelpKind>(help_source_kind_); } @@ -541,6 +507,8 @@ class FlagImpl final : public CommandLineFlag { void CheckDefaultValueParsingRoundtrip() const override ABSL_LOCKS_EXCLUDED(*DataGuard()); + int64_t ModificationCount() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(*DataGuard()); + // Interfaces to save and restore flags to/from persistent state. // Returns current flag state or nullptr if flag does not support // saving and restoring a state. @@ -587,8 +555,9 @@ class FlagImpl final : public CommandLineFlag { // Unique tag for absl::call_once call to initialize this flag. absl::once_flag init_control_; - // Mutation counter - int64_t counter_ ABSL_GUARDED_BY(*DataGuard()); + // Sequence lock / mutation counter. + flags_internal::SequenceLock seq_lock_; + // Optional flag's callback and absl::Mutex to guard the invocations. FlagCallback* callback_ ABSL_GUARDED_BY(*DataGuard()); // Either a pointer to the function generating the default value based on the @@ -649,7 +618,9 @@ class Flag { impl_.AssertValidType(base_internal::FastTypeId<T>(), &GenRuntimeTypeId<T>); #endif - if (!value_.Get(u.value)) impl_.Read(&u.value); + if (ABSL_PREDICT_FALSE(!value_.Get(impl_.seq_lock_, u.value))) { + impl_.Read(&u.value); + } return std::move(u.value); } void Set(const T& v) { diff --git a/absl/flags/internal/sequence_lock.h b/absl/flags/internal/sequence_lock.h new file mode 100644 index 00000000..807b2a73 --- /dev/null +++ b/absl/flags/internal/sequence_lock.h @@ -0,0 +1,187 @@ +// +// Copyright 2020 The Abseil Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ABSL_FLAGS_INTERNAL_SEQUENCE_LOCK_H_ +#define ABSL_FLAGS_INTERNAL_SEQUENCE_LOCK_H_ + +#include <stddef.h> +#include <stdint.h> + +#include <atomic> +#include <cassert> +#include <cstring> + +#include "absl/base/optimization.h" + +namespace absl { +ABSL_NAMESPACE_BEGIN +namespace flags_internal { + +// Align 'x' up to the nearest 'align' bytes. +inline constexpr size_t AlignUp(size_t x, size_t align) { + return align * ((x + align - 1) / align); +} + +// A SequenceLock implements lock-free reads. A sequence counter is incremented +// before and after each write, and readers access the counter before and after +// accessing the protected data. If the counter is verified to not change during +// the access, and the sequence counter value was even, then the reader knows +// that the read was race-free and valid. Otherwise, the reader must fall back +// to a Mutex-based code path. +// +// This particular SequenceLock starts in an "uninitialized" state in which +// TryRead() returns false. It must be enabled by calling MarkInitialized(). +// This serves as a marker that the associated flag value has not yet been +// initialized and a slow path needs to be taken. +// +// The memory reads and writes protected by this lock must use the provided +// `TryRead()` and `Write()` functions. These functions behave similarly to +// `memcpy()`, with one oddity: the protected data must be an array of +// `std::atomic<int64>`. This is to comply with the C++ standard, which +// considers data races on non-atomic objects to be undefined behavior. See "Can +// Seqlocks Get Along With Programming Language Memory Models?"[1] by Hans J. +// Boehm for more details. +// +// [1] https://www.hpl.hp.com/techreports/2012/HPL-2012-68.pdf +class SequenceLock { + public: + constexpr SequenceLock() : lock_(kUninitialized) {} + + // Mark that this lock is ready for use. + void MarkInitialized() { + assert(lock_.load(std::memory_order_relaxed) == kUninitialized); + lock_.store(0, std::memory_order_release); + } + + // Copy "size" bytes of data from "src" to "dst", protected as a read-side + // critical section of the sequence lock. + // + // Unlike traditional sequence lock implementations which loop until getting a + // clean read, this implementation returns false in the case of concurrent + // calls to `Write`. In such a case, the caller should fall back to a + // locking-based slow path. + // + // Returns false if the sequence lock was not yet marked as initialized. + // + // NOTE: If this returns false, "dst" may be overwritten with undefined + // (potentially uninitialized) data. + bool TryRead(void* dst, const std::atomic<uint64_t>* src, size_t size) const { + // Acquire barrier ensures that no loads done by f() are reordered + // above the first load of the sequence counter. + int64_t seq_before = lock_.load(std::memory_order_acquire); + if (ABSL_PREDICT_FALSE(seq_before & 1) == 1) return false; + RelaxedCopyFromAtomic(dst, src, size); + // Another acquire fence ensures that the load of 'lock_' below is + // strictly ordered after the RelaxedCopyToAtomic call above. + std::atomic_thread_fence(std::memory_order_acquire); + int64_t seq_after = lock_.load(std::memory_order_relaxed); + return ABSL_PREDICT_TRUE(seq_before == seq_after); + } + + // Copy "size" bytes from "src" to "dst" as a write-side critical section + // of the sequence lock. Any concurrent readers will be forced to retry + // until they get a read that does not conflict with this write. + // + // This call must be externally synchronized against other calls to Write, + // but may proceed concurrently with reads. + void Write(std::atomic<uint64_t>* dst, const void* src, size_t size) { + // We can use relaxed instructions to increment the counter since we + // are extenally synchronized. The std::atomic_thread_fence below + // ensures that the counter updates don't get interleaved with the + // copy to the data. + int64_t orig_seq = lock_.load(std::memory_order_relaxed); + assert((orig_seq & 1) == 0); // Must be initially unlocked. + lock_.store(orig_seq + 1, std::memory_order_relaxed); + + // We put a release fence between update to lock_ and writes to shared data. + // Thus all stores to shared data are effectively release operations and + // update to lock_ above cannot be re-ordered past any of them. Note that + // this barrier is not for the fetch_add above. A release barrier for the + // fetch_add would be before it, not after. + std::atomic_thread_fence(std::memory_order_release); + RelaxedCopyToAtomic(dst, src, size); + // "Release" semantics ensure that none of the writes done by + // RelaxedCopyToAtomic() can be reordered after the following modification. + lock_.store(orig_seq + 2, std::memory_order_release); + } + + // Return the number of times that Write() has been called. + // + // REQUIRES: This must be externally synchronized against concurrent calls to + // `Write()` or `IncrementModificationCount()`. + // REQUIRES: `MarkInitialized()` must have been previously called. + int64_t ModificationCount() const { + int64_t val = lock_.load(std::memory_order_relaxed); + assert(val != kUninitialized && (val & 1) == 0); + return val / 2; + } + + // REQUIRES: This must be externally synchronized against concurrent calls to + // `Write()` or `ModificationCount()`. + // REQUIRES: `MarkInitialized()` must have been previously called. + void IncrementModificationCount() { + int64_t val = lock_.load(std::memory_order_relaxed); + assert(val != kUninitialized); + lock_.store(val + 2, std::memory_order_relaxed); + } + + private: + // Perform the equivalent of "memcpy(dst, src, size)", but using relaxed + // atomics. + static void RelaxedCopyFromAtomic(void* dst, const std::atomic<uint64_t>* src, + size_t size) { + char* dst_byte = static_cast<char*>(dst); + while (size >= sizeof(uint64_t)) { + uint64_t word = src->load(std::memory_order_relaxed); + std::memcpy(dst_byte, &word, sizeof(word)); + dst_byte += sizeof(word); + src++; + size -= sizeof(word); + } + if (size > 0) { + uint64_t word = src->load(std::memory_order_relaxed); + std::memcpy(dst_byte, &word, size); + } + } + + // Perform the equivalent of "memcpy(dst, src, size)", but using relaxed + // atomics. + static void RelaxedCopyToAtomic(std::atomic<uint64_t>* dst, const void* src, + size_t size) { + const char* src_byte = static_cast<const char*>(src); + while (size >= sizeof(uint64_t)) { + uint64_t word; + std::memcpy(&word, src_byte, sizeof(word)); + dst->store(word, std::memory_order_relaxed); + src_byte += sizeof(word); + dst++; + size -= sizeof(word); + } + if (size > 0) { + uint64_t word = 0; + std::memcpy(&word, src_byte, size); + dst->store(word, std::memory_order_relaxed); + } + } + + static constexpr int64_t kUninitialized = -1; + std::atomic<int64_t> lock_; +}; + +} // namespace flags_internal +ABSL_NAMESPACE_END +} // namespace absl + +#endif // ABSL_FLAGS_INTERNAL_SEQUENCE_LOCK_H_ diff --git a/absl/flags/internal/sequence_lock_test.cc b/absl/flags/internal/sequence_lock_test.cc new file mode 100644 index 00000000..9aff1edc --- /dev/null +++ b/absl/flags/internal/sequence_lock_test.cc @@ -0,0 +1,146 @@ +// Copyright 2020 The Abseil Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include "absl/flags/internal/sequence_lock.h" + +#include <atomic> +#include <thread> // NOLINT(build/c++11) +#include <tuple> +#include <vector> + +#include "gtest/gtest.h" +#include "absl/base/internal/sysinfo.h" +#include "absl/container/fixed_array.h" +#include "absl/time/clock.h" + +namespace { + +namespace flags = absl::flags_internal; + +class ConcurrentSequenceLockTest + : public testing::TestWithParam<std::tuple<int, int>> { + public: + ConcurrentSequenceLockTest() + : buf_bytes_(std::get<0>(GetParam())), + num_threads_(std::get<1>(GetParam())) {} + + protected: + const int buf_bytes_; + const int num_threads_; +}; + +TEST_P(ConcurrentSequenceLockTest, ReadAndWrite) { + const int buf_words = + flags::AlignUp(buf_bytes_, sizeof(uint64_t)) / sizeof(uint64_t); + + // The buffer that will be protected by the SequenceLock. + absl::FixedArray<std::atomic<uint64_t>> protected_buf(buf_words); + for (auto& v : protected_buf) v = -1; + + flags::SequenceLock seq_lock; + std::atomic<bool> stop{false}; + std::atomic<int64_t> bad_reads{0}; + std::atomic<int64_t> good_reads{0}; + std::atomic<int64_t> unsuccessful_reads{0}; + + // Start a bunch of threads which read 'protected_buf' under the sequence + // lock. The main thread will concurrently update 'protected_buf'. The updates + // always consist of an array of identical integers. The reader ensures that + // any data it reads matches that pattern (i.e. the reads are not "torn"). + std::vector<std::thread> threads; + for (int i = 0; i < num_threads_; i++) { + threads.emplace_back([&]() { + absl::FixedArray<char> local_buf(buf_bytes_); + while (!stop.load(std::memory_order_relaxed)) { + if (seq_lock.TryRead(local_buf.data(), protected_buf.data(), + buf_bytes_)) { + bool good = true; + for (const auto& v : local_buf) { + if (v != local_buf[0]) good = false; + } + if (good) { + good_reads.fetch_add(1, std::memory_order_relaxed); + } else { + bad_reads.fetch_add(1, std::memory_order_relaxed); + } + } else { + unsuccessful_reads.fetch_add(1, std::memory_order_relaxed); + } + } + }); + } + while (unsuccessful_reads.load(std::memory_order_relaxed) < num_threads_) { + absl::SleepFor(absl::Milliseconds(1)); + } + seq_lock.MarkInitialized(); + + // Run a maximum of 5 seconds. On Windows, the scheduler behavior seems + // somewhat unfair and without an explicit timeout for this loop, the tests + // can run a long time. + absl::Time deadline = absl::Now() + absl::Seconds(5); + for (int i = 0; i < 100 && absl::Now() < deadline; i++) { + absl::FixedArray<char> writer_buf(buf_bytes_); + for (auto& v : writer_buf) v = i; + seq_lock.Write(protected_buf.data(), writer_buf.data(), buf_bytes_); + absl::SleepFor(absl::Microseconds(10)); + } + stop.store(true, std::memory_order_relaxed); + for (auto& t : threads) t.join(); + ASSERT_GE(good_reads, 0); + ASSERT_EQ(bad_reads, 0); +} + +// Simple helper for generating a range of thread counts. +// Generates [low, low*scale, low*scale^2, ...high) +// (even if high is between low*scale^k and low*scale^(k+1)). +std::vector<int> MultiplicativeRange(int low, int high, int scale) { + std::vector<int> result; + for (int current = low; current < high; current *= scale) { + result.push_back(current); + } + result.push_back(high); + return result; +} + +INSTANTIATE_TEST_SUITE_P(TestManyByteSizes, ConcurrentSequenceLockTest, + testing::Combine( + // Buffer size (bytes). + testing::Range(1, 128), + // Number of reader threads. + testing::ValuesIn(MultiplicativeRange( + 1, absl::base_internal::NumCPUs(), 2)))); + +// Simple single-threaded test, parameterized by the size of the buffer to be +// protected. +class SequenceLockTest : public testing::TestWithParam<int> {}; + +TEST_P(SequenceLockTest, SingleThreaded) { + const int size = GetParam(); + absl::FixedArray<std::atomic<uint64_t>> protected_buf( + flags::AlignUp(size, sizeof(uint64_t)) / sizeof(uint64_t)); + + flags::SequenceLock seq_lock; + seq_lock.MarkInitialized(); + + std::vector<char> src_buf(size, 'x'); + seq_lock.Write(protected_buf.data(), src_buf.data(), size); + + std::vector<char> dst_buf(size, '0'); + ASSERT_TRUE(seq_lock.TryRead(dst_buf.data(), protected_buf.data(), size)); + ASSERT_EQ(src_buf, dst_buf); +} +INSTANTIATE_TEST_SUITE_P(TestManyByteSizes, SequenceLockTest, + // Buffer size (bytes). + testing::Range(1, 128)); + +} // namespace |