diff options
-rw-r--r-- | absl/synchronization/mutex.cc | 51 | ||||
-rw-r--r-- | absl/synchronization/mutex_test.cc | 30 |
2 files changed, 59 insertions, 22 deletions
diff --git a/absl/synchronization/mutex.cc b/absl/synchronization/mutex.cc index 0742680c..e2ee411f 100644 --- a/absl/synchronization/mutex.cc +++ b/absl/synchronization/mutex.cc @@ -648,11 +648,16 @@ static const intptr_t kMuReader = 0x0001L; // a reader holds the lock // set when a thread is unblocked(INV1a), and threads that were // unblocked reset the bit when they either acquire or re-block (INV1b). static const intptr_t kMuDesig = 0x0002L; -static const intptr_t kMuWait = 0x0004L; // threads are waiting -static const intptr_t kMuWriter = 0x0008L; // a writer holds the lock -static const intptr_t kMuEvent = 0x0010L; // record this mutex's events -static const intptr_t kMuWrWait = 0x0020L; // runnable writer is waiting - // for a reader +static const intptr_t kMuWait = 0x0004L; // threads are waiting +static const intptr_t kMuWriter = 0x0008L; // a writer holds the lock +static const intptr_t kMuEvent = 0x0010L; // record this mutex's events +// Runnable writer is waiting for a reader. +// If set, new readers will not lock the mutex to avoid writer starvation. +// Note: if a reader has higher priority than the writer, it will still lock +// the mutex ahead of the waiting writer, but in a very inefficient manner: +// the reader will first queue itself and block, but then the last unlocking +// reader will wake it. +static const intptr_t kMuWrWait = 0x0020L; static const intptr_t kMuSpin = 0x0040L; // spinlock protects wait list static const intptr_t kMuLow = 0x00ffL; // mask all mutex bits static const intptr_t kMuHigh = ~kMuLow; // mask pointer/reader count @@ -919,6 +924,25 @@ static PerThreadSynch *Enqueue(PerThreadSynch *head, s->may_skip = true; // always true on entering queue s->wake = false; // not being woken s->cond_waiter = ((flags & kMuIsCond) != 0); +#ifdef ABSL_HAVE_PTHREAD_GETSCHEDPARAM + int64_t now_cycles = base_internal::CycleClock::Now(); + if (s->next_priority_read_cycles < now_cycles) { + // Every so often, update our idea of the thread's priority. + // pthread_getschedparam() is 5% of the block/wakeup time; + // base_internal::CycleClock::Now() is 0.5%. + int policy; + struct sched_param param; + const int err = pthread_getschedparam(pthread_self(), &policy, ¶m); + if (err != 0) { + ABSL_RAW_LOG(ERROR, "pthread_getschedparam failed: %d", err); + } else { + s->priority = param.sched_priority; + s->next_priority_read_cycles = + now_cycles + + static_cast<int64_t>(base_internal::CycleClock::Frequency()); + } + } +#endif if (head == nullptr) { // s is the only waiter s->next = s; // it's the only entry in the cycle s->readers = mu; // reader count is from mu word @@ -927,23 +951,6 @@ static PerThreadSynch *Enqueue(PerThreadSynch *head, } else { PerThreadSynch *enqueue_after = nullptr; // we'll put s after this element #ifdef ABSL_HAVE_PTHREAD_GETSCHEDPARAM - int64_t now_cycles = base_internal::CycleClock::Now(); - if (s->next_priority_read_cycles < now_cycles) { - // Every so often, update our idea of the thread's priority. - // pthread_getschedparam() is 5% of the block/wakeup time; - // base_internal::CycleClock::Now() is 0.5%. - int policy; - struct sched_param param; - const int err = pthread_getschedparam(pthread_self(), &policy, ¶m); - if (err != 0) { - ABSL_RAW_LOG(ERROR, "pthread_getschedparam failed: %d", err); - } else { - s->priority = param.sched_priority; - s->next_priority_read_cycles = - now_cycles + - static_cast<int64_t>(base_internal::CycleClock::Frequency()); - } - } if (s->priority > head->priority) { // s's priority is above head's // try to put s in priority-fifo order, or failing that at the front. if (!head->maybe_unlocking) { diff --git a/absl/synchronization/mutex_test.cc b/absl/synchronization/mutex_test.cc index 4ae4d7e7..35802b2e 100644 --- a/absl/synchronization/mutex_test.cc +++ b/absl/synchronization/mutex_test.cc @@ -1838,4 +1838,34 @@ TEST(Mutex, SignalExitedThread) { for (auto &th : top) th.join(); } +TEST(Mutex, WriterPriority) { + absl::Mutex mu; + bool wrote = false; + std::atomic<bool> saw_wrote{false}; + auto readfunc = [&]() { + for (size_t i = 0; i < 10; ++i) { + absl::ReaderMutexLock lock(&mu); + if (wrote) { + saw_wrote = true; + break; + } + absl::SleepFor(absl::Seconds(1)); + } + }; + std::thread t1(readfunc); + absl::SleepFor(absl::Milliseconds(500)); + std::thread t2(readfunc); + // Note: this test guards against a bug that was related to an uninit + // PerThreadSynch::priority, so the writer intentionally runs on a new thread. + std::thread t3([&]() { + // The writer should be able squeeze between the two alternating readers. + absl::MutexLock lock(&mu); + wrote = true; + }); + t1.join(); + t2.join(); + t3.join(); + EXPECT_TRUE(saw_wrote.load()); +} + } // namespace |