// Copyright 2017 The Abseil Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include "absl/synchronization/internal/per_thread_sem.h" #include #include // NOLINT(build/c++11) #include #include #include // NOLINT(build/c++11) #include #include // NOLINT(build/c++11) #include "gtest/gtest.h" #include "absl/base/internal/cycleclock.h" #include "absl/base/internal/malloc_extension.h" #include "absl/base/internal/thread_identity.h" #include "absl/strings/str_cat.h" #include "absl/time/clock.h" #include "absl/time/time.h" // In this test we explicitly avoid the use of synchronization // primitives which might use PerThreadSem, most notably absl::Mutex. namespace absl { namespace synchronization_internal { class SimpleSemaphore { public: SimpleSemaphore() : count_(0) {} // Decrements (locks) the semaphore. If the semaphore's value is // greater than zero, then the decrement proceeds, and the function // returns, immediately. If the semaphore currently has the value // zero, then the call blocks until it becomes possible to perform // the decrement. void Wait() { std::unique_lock lock(mu_); cv_.wait(lock, [this]() { return count_ > 0; }); --count_; cv_.notify_one(); } // Increments (unlocks) the semaphore. If the semaphore's value // consequently becomes greater than zero, then another thread // blocked Wait() call will be woken up and proceed to lock the // semaphore. void Post() { std::lock_guard lock(mu_); ++count_; cv_.notify_one(); } private: std::mutex mu_; std::condition_variable cv_; int count_; }; struct ThreadData { int num_iterations; // Number of replies to send. SimpleSemaphore identity2_written; // Posted by thread writing identity2. base_internal::ThreadIdentity *identity1; // First Post()-er. base_internal::ThreadIdentity *identity2; // First Wait()-er. KernelTimeout timeout; }; // Need friendship with PerThreadSem. class PerThreadSemTest : public testing::Test { public: static void TimingThread(ThreadData* t) { t->identity2 = GetOrCreateCurrentThreadIdentity(); t->identity2_written.Post(); while (t->num_iterations--) { Wait(t->timeout); Post(t->identity1); } } void TestTiming(const char *msg, bool timeout) { static const int kNumIterations = 100; ThreadData t; t.num_iterations = kNumIterations; t.timeout = timeout ? KernelTimeout(absl::Now() + absl::Seconds(10000)) // far in the future : KernelTimeout::Never(); t.identity1 = GetOrCreateCurrentThreadIdentity(); // We can't use the Thread class here because it uses the Mutex // class which will invoke PerThreadSem, so we use std::thread instead. std::thread partner_thread(std::bind(TimingThread, &t)); // Wait for our partner thread to register their identity. t.identity2_written.Wait(); int64_t min_cycles = std::numeric_limits::max(); int64_t total_cycles = 0; for (int i = 0; i < kNumIterations; ++i) { absl::SleepFor(absl::Milliseconds(20)); int64_t cycles = base_internal::CycleClock::Now(); Post(t.identity2); Wait(t.timeout); cycles = base_internal::CycleClock::Now() - cycles; min_cycles = std::min(min_cycles, cycles); total_cycles += cycles; } std::string out = StrCat(msg, "min cycle count=", min_cycles, " avg cycle count=", absl::SixDigits(static_cast(total_cycles) / kNumIterations)); printf("%s\n", out.c_str()); partner_thread.join(); } protected: static void Post(base_internal::ThreadIdentity *id) { PerThreadSem::Post(id); } static bool Wait(KernelTimeout t) { return PerThreadSem::Wait(t); } // convenience overload static bool Wait(absl::Time t) { return Wait(KernelTimeout(t)); } static void Tick(base_internal::ThreadIdentity *identity) { PerThreadSem::Tick(identity); } }; namespace { TEST_F(PerThreadSemTest, WithoutTimeout) { PerThreadSemTest::TestTiming("Without timeout: ", false); } TEST_F(PerThreadSemTest, WithTimeout) { PerThreadSemTest::TestTiming("With timeout: ", true); } TEST_F(PerThreadSemTest, Timeouts) { absl::Time timeout = absl::Now() + absl::Milliseconds(50); EXPECT_FALSE(Wait(timeout)); EXPECT_LE(timeout, absl::Now()); absl::Time negative_timeout = absl::UnixEpoch() - absl::Milliseconds(100); EXPECT_FALSE(Wait(negative_timeout)); EXPECT_LE(negative_timeout, absl::Now()); // trivially true :) Post(GetOrCreateCurrentThreadIdentity()); // The wait here has an expired timeout, but we have a wake to consume, // so this should succeed EXPECT_TRUE(Wait(negative_timeout)); } // Test that idle threads properly register themselves as such with malloc. TEST_F(PerThreadSemTest, Idle) { // We can't use gmock because it might use synch calls. So we do it // by hand, messily. I don't bother hitting every one of the // MallocExtension calls because most of them won't get made // anyway--if they do we can add them. class MockMallocExtension : public base_internal::MallocExtension { public: MockMallocExtension(base_internal::MallocExtension *real, base_internal::ThreadIdentity *id, std::atomic *idles, std::atomic *busies) : real_(real), id_(id), idles_(idles), busies_(busies) {} void MarkThreadIdle() override { if (base_internal::CurrentThreadIdentityIfPresent() != id_) { return; } idles_->fetch_add(1, std::memory_order_relaxed); } void MarkThreadBusy() override { if (base_internal::CurrentThreadIdentityIfPresent() != id_) { return; } busies_->fetch_add(1, std::memory_order_relaxed); } size_t GetAllocatedSize(const void* p) override { return real_->GetAllocatedSize(p); } private: MallocExtension *real_; base_internal::ThreadIdentity *id_; std::atomic* idles_; std::atomic* busies_; }; base_internal::ThreadIdentity *id = GetOrCreateCurrentThreadIdentity(); std::atomic idles(0); std::atomic busies(0); base_internal::MallocExtension *old = base_internal::MallocExtension::instance(); MockMallocExtension mock(old, id, &idles, &busies); base_internal::MallocExtension::Register(&mock); std::atomic sync(0); std::thread t([id, &idles, &sync]() { // Wait for the main thread to begin the wait process while (0 == sync.load(std::memory_order_relaxed)) { SleepFor(absl::Milliseconds(1)); } // Wait for main thread to become idle, then wake it // pretend time is passing--enough of these should cause an idling. for (int i = 0; i < 100; ++i) { Tick(id); } while (0 == idles.load(std::memory_order_relaxed)) { // Keep ticking, just in case. Tick(id); SleepFor(absl::Milliseconds(1)); } Post(id); }); idles.store(0, std::memory_order_relaxed); // In case we slept earlier. sync.store(1, std::memory_order_relaxed); Wait(KernelTimeout::Never()); // t will wake us once we become idle. EXPECT_LT(0, busies.load(std::memory_order_relaxed)); t.join(); base_internal::MallocExtension::Register(old); } } // namespace } // namespace synchronization_internal } // namespace absl