From 616d87444313db865c60fbeee36ebe5250ef301e Mon Sep 17 00:00:00 2001 From: Yuri Kunde Schlesner Date: Tue, 28 Oct 2014 05:36:00 -0200 Subject: New logging system --- src/common/concurrent_ring_buffer.h | 164 ++++++++++++++++++++++++++++++++++++ 1 file changed, 164 insertions(+) create mode 100644 src/common/concurrent_ring_buffer.h (limited to 'src/common/concurrent_ring_buffer.h') diff --git a/src/common/concurrent_ring_buffer.h b/src/common/concurrent_ring_buffer.h new file mode 100644 index 00000000..2951d93d --- /dev/null +++ b/src/common/concurrent_ring_buffer.h @@ -0,0 +1,164 @@ +// Copyright 2014 Citra Emulator Project +// Licensed under GPLv2+ +// Refer to the license.txt file included. + +#pragma once + +#include +#include +#include +#include +#include + +#include "common/common.h" // for NonCopyable +#include "common/log.h" // for _dbg_assert_ + +namespace Common { + +/** + * A MPMC (Multiple-Producer Multiple-Consumer) concurrent ring buffer. This data structure permits + * multiple threads to push and pop from a queue of bounded size. + */ +template +class ConcurrentRingBuffer : private NonCopyable { +public: + /// Value returned by the popping functions when the queue has been closed. + static const size_t QUEUE_CLOSED = -1; + + ConcurrentRingBuffer() {} + + ~ConcurrentRingBuffer() { + // If for whatever reason the queue wasn't completely drained, destroy the left over items. + for (size_t i = reader_index, end = writer_index; i != end; i = (i + 1) % ArraySize) { + Data()[i].~T(); + } + } + + /** + * Pushes a value to the queue. If the queue is full, this method will block. Does nothing if + * the queue is closed. + */ + void Push(T val) { + std::unique_lock lock(mutex); + if (closed) { + return; + } + + // If the buffer is full, wait + writer.wait(lock, [&]{ + return (writer_index + 1) % ArraySize != reader_index; + }); + + T* item = &Data()[writer_index]; + new (item) T(std::move(val)); + + writer_index = (writer_index + 1) % ArraySize; + + // Wake up waiting readers + lock.unlock(); + reader.notify_one(); + } + + /** + * Pops up to `dest_len` items from the queue, storing them in `dest`. This function will not + * block, and might return 0 values if there are no elements in the queue when it is called. + * + * @return The number of elements stored in `dest`. If the queue has been closed, returns + * `QUEUE_CLOSED`. + */ + size_t Pop(T* dest, size_t dest_len) { + std::unique_lock lock(mutex); + if (closed && !CanRead()) { + return QUEUE_CLOSED; + } + return PopInternal(dest, dest_len); + } + + /** + * Pops up to `dest_len` items from the queue, storing them in `dest`. This function will block + * if there are no elements in the queue when it is called. + * + * @return The number of elements stored in `dest`. If the queue has been closed, returns + * `QUEUE_CLOSED`. + */ + size_t BlockingPop(T* dest, size_t dest_len) { + std::unique_lock lock(mutex); + if (closed && !CanRead()) { + return QUEUE_CLOSED; + } + + while (!CanRead()) { + reader.wait(lock); + if (closed && !CanRead()) { + return QUEUE_CLOSED; + } + } + _dbg_assert_(Common, CanRead()); + return PopInternal(dest, dest_len); + } + + /** + * Closes the queue. After calling this method, `Push` operations won't have any effect, and + * `PopMany` and `PopManyBlock` will start returning `QUEUE_CLOSED`. This is intended to allow + * a graceful shutdown of all consumers. + */ + void Close() { + std::unique_lock lock(mutex); + closed = true; + // We need to wake up any reader that are waiting for an item that will never come. + lock.unlock(); + reader.notify_all(); + } + + /// Returns true if `Close()` has been called. + bool IsClosed() const { + return closed; + } + +private: + size_t PopInternal(T* dest, size_t dest_len) { + size_t output_count = 0; + while (output_count < dest_len && CanRead()) { + _dbg_assert_(Common, CanRead()); + + T* item = &Data()[reader_index]; + T out_val = std::move(*item); + item->~T(); + + size_t prev_index = (reader_index + ArraySize - 1) % ArraySize; + reader_index = (reader_index + 1) % ArraySize; + if (writer_index == prev_index) { + writer.notify_one(); + } + dest[output_count++] = std::move(out_val); + } + return output_count; + } + + bool CanRead() const { + return reader_index != writer_index; + } + + T* Data() { + return static_cast(static_cast(&storage)); + } + + /// Storage for entries + typename std::aligned_storage::value>::type storage; + + /// Data is valid in the half-open interval [reader, writer). If they are `QUEUE_CLOSED` then the + /// queue has been closed. + size_t writer_index = 0, reader_index = 0; + // True if the queue has been closed. + bool closed = false; + + /// Mutex that protects the entire data structure. + std::mutex mutex; + /// Signaling wakes up reader which is waiting for storage to be non-empty. + std::condition_variable reader; + /// Signaling wakes up writer which is waiting for storage to be non-full. + std::condition_variable writer; +}; + +} // namespace -- cgit v1.2.3