aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/common/concurrent_ring_buffer.h
blob: c5889513a9b0512b16de15e950244e40466aff4c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
// Copyright 2014 Citra Emulator Project
// Licensed under GPLv2 or any later version
// Refer to the license.txt file included.

#pragma once

#include <array>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>

#include "common/common_types.h" // for NonCopyable

namespace Common {

/**
 * A MPMC (Multiple-Producer Multiple-Consumer) concurrent ring buffer. This data structure permits
 * multiple threads to push and pop from a queue of bounded size.
 */
template <typename T, size_t ArraySize>
class ConcurrentRingBuffer : private NonCopyable {
public:
    /// Value returned by the popping functions when the queue has been closed.
    static const size_t QUEUE_CLOSED = -1;

    ConcurrentRingBuffer() {}

    ~ConcurrentRingBuffer() {
        // If for whatever reason the queue wasn't completely drained, destroy the left over items.
        for (size_t i = reader_index, end = writer_index; i != end; i = (i + 1) % ArraySize) {
            Data()[i].~T();
        }
    }

    /**
     * Pushes a value to the queue. If the queue is full, this method will block. Does nothing if
     * the queue is closed.
     */
    void Push(T val) {
        std::unique_lock<std::mutex> lock(mutex);
        if (closed) {
            return;
        }

        // If the buffer is full, wait
        writer.wait(lock, [&]{
            return (writer_index + 1) % ArraySize != reader_index;
        });

        T* item = &Data()[writer_index];
        new (item) T(std::move(val));

        writer_index = (writer_index + 1) % ArraySize;

        // Wake up waiting readers
        lock.unlock();
        reader.notify_one();
    }

    /**
     * Pops up to `dest_len` items from the queue, storing them in `dest`. This function will not
     * block, and might return 0 values if there are no elements in the queue when it is called.
     *
     * @return The number of elements stored in `dest`. If the queue has been closed, returns
     *          `QUEUE_CLOSED`.
     */
    size_t Pop(T* dest, size_t dest_len) {
        std::unique_lock<std::mutex> lock(mutex);
        if (closed && !CanRead()) {
            return QUEUE_CLOSED;
        }
        return PopInternal(dest, dest_len);
    }

    /**
     * Pops up to `dest_len` items from the queue, storing them in `dest`. This function will block
     * if there are no elements in the queue when it is called.
     *
     * @return The number of elements stored in `dest`. If the queue has been closed, returns
     *         `QUEUE_CLOSED`.
     */
    size_t BlockingPop(T* dest, size_t dest_len) {
        std::unique_lock<std::mutex> lock(mutex);
        if (closed && !CanRead()) {
            return QUEUE_CLOSED;
        }

        while (!CanRead()) {
            reader.wait(lock);
            if (closed && !CanRead()) {
                return QUEUE_CLOSED;
            }
        }
        DEBUG_ASSERT(CanRead());
        return PopInternal(dest, dest_len);
    }

    /**
     * Closes the queue. After calling this method, `Push` operations won't have any effect, and
     * `PopMany` and `PopManyBlock` will start returning `QUEUE_CLOSED`. This is intended to allow
     * a graceful shutdown of all consumers.
     */
    void Close() {
        std::unique_lock<std::mutex> lock(mutex);
        closed = true;
        // We need to wake up any reader that are waiting for an item that will never come.
        lock.unlock();
        reader.notify_all();
    }

    /// Returns true if `Close()` has been called.
    bool IsClosed() const {
        return closed;
    }

private:
    size_t PopInternal(T* dest, size_t dest_len) {
        size_t output_count = 0;
        while (output_count < dest_len && CanRead()) {
            DEBUG_ASSERT(CanRead());

            T* item = &Data()[reader_index];
            T out_val = std::move(*item);
            item->~T();

            size_t prev_index = (reader_index + ArraySize - 1) % ArraySize;
            reader_index = (reader_index + 1) % ArraySize;
            if (writer_index == prev_index) {
                writer.notify_one();
            }
            dest[output_count++] = std::move(out_val);
        }
        return output_count;
    }

    bool CanRead() const {
        return reader_index != writer_index;
    }

    T* Data() {
        return static_cast<T*>(static_cast<void*>(&storage));
    }

    /// Storage for entries
    typename std::aligned_storage<ArraySize * sizeof(T),
                                  std::alignment_of<T>::value>::type storage;

    /// Data is valid in the half-open interval [reader, writer). If they are `QUEUE_CLOSED` then the
    /// queue has been closed.
    size_t writer_index = 0, reader_index = 0;
    // True if the queue has been closed.
    bool closed = false;

    /// Mutex that protects the entire data structure.
    std::mutex mutex;
    /// Signaling wakes up reader which is waiting for storage to be non-empty.
    std::condition_variable reader;
    /// Signaling wakes up writer which is waiting for storage to be non-full.
    std::condition_variable writer;
};

} // namespace