aboutsummaryrefslogtreecommitdiffhomepage
path: root/misc/dispatch.c
blob: 6391caa063fe3e7e78024c2b1ba33e4333d0d33c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
/*
 * This file is part of mpv.
 *
 * mpv is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * mpv is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with mpv.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <stdbool.h>
#include <assert.h>

#include "common/common.h"
#include "osdep/threads.h"
#include "osdep/timer.h"

#include "dispatch.h"

// State of one dispatch queue, shared between the target thread (which calls
// mp_dispatch_queue_process()) and any number of other threads.
struct mp_dispatch_queue {
    // Singly linked list of pending items: new items are linked in at tail,
    // the target thread removes them from head. Both are NULL when empty.
    struct mp_dispatch_item *head, *tail;
    // Protects head, tail, suspend_requested, suspended, and the completed
    // flag of queued items.
    pthread_mutex_t lock;
    // Broadcast (with lock held) when an item is appended, when a synchronous
    // item has completed, when the target thread enters
    // mp_dispatch_queue_process(), and when the last suspend request is
    // dropped. All waiters share this one condition variable.
    pthread_cond_t cond;
    // Number of mp_dispatch_suspend() calls not yet undone by
    // mp_dispatch_resume(). While >0, mp_dispatch_queue_process() does not
    // return.
    int suspend_requested;
    // True while the target thread is inside mp_dispatch_queue_process().
    bool suspended;
    // Optional callback (may be NULL) invoked without any lock held to make
    // the target thread enter mp_dispatch_queue_process().
    void (*wakeup_fn)(void *wakeup_ctx);
    void *wakeup_ctx;
    // This lock grants access to the target thread's state during suspend mode.
    // During suspend mode, the target thread is blocked in the function
    // mp_dispatch_queue_process(), however this function may be processing
    // dispatch queue items. This lock serializes the dispatch queue processing
    // and external mp_dispatch_lock() calls.
    // Invariant: can be held only while suspended==true, and suspend_requested
    // must be >0 (unless mp_dispatch_queue_process() locks it). In particular,
    // suspend mode must not be left while the lock is held.
    pthread_mutex_t exclusive_lock;
};

// One queued callback invocation.
struct mp_dispatch_item {
    // Callback and its argument; the target thread calls fn(fn_data).
    mp_dispatch_fn fn;
    void *fn_data;
    // true: the item was heap-allocated by mp_dispatch_enqueue[_autofree]()
    //       and is freed by mp_dispatch_queue_process() after fn has run.
    // false: the item belongs to a mp_dispatch_run() caller (it lives on that
    //        caller's stack) and completion is reported through 'completed'.
    bool asynchronous;
    // Set (under the queue lock) once fn has returned; only used for
    // synchronous items.
    bool completed;
    // Next pending item, or NULL if this is the last one.
    struct mp_dispatch_item *next;
};

// Destructor registered on the queue by mp_dispatch_create(). Verifies that
// the queue is idle and tears down its pthread primitives.
static void queue_dtor(void *p)
{
    struct mp_dispatch_queue *q = p;

    // Freeing a queue with pending items, or one that is still in (or asked
    // to enter) suspend mode, is a caller bug.
    assert(!q->head);
    assert(!q->suspend_requested);
    assert(!q->suspended);

    pthread_cond_destroy(&q->cond);
    pthread_mutex_destroy(&q->lock);
    pthread_mutex_destroy(&q->exclusive_lock);
}

// Create a new, empty dispatch queue.
// A dispatch queue lets other threads run callbacks in a target thread. The
// target thread is the thread which created the queue and which calls
// mp_dispatch_queue_process().
// ta_parent is passed to talloc as the allocation context. Free the queue
// with talloc_free(); it must be empty (and not suspended) at that point.
struct mp_dispatch_queue *mp_dispatch_create(void *ta_parent)
{
    struct mp_dispatch_queue *q = talloc_ptrtype(ta_parent, q);

    // Start from an all-zero state: no items, no suspend requests, no wakeup
    // callback.
    *q = (struct mp_dispatch_queue){0};
    talloc_set_destructor(q, queue_dtor);

    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->cond, NULL);
    pthread_mutex_init(&q->exclusive_lock, NULL);

    return q;
}

// Set a custom function that should be called to guarantee that the target
// thread wakes up. This is intended for use with code that needs to block
// on non-pthread primitives, such as e.g. select(). In the case of select(),
// the wakeup_fn could for example write a byte into a "wakeup" pipe in order
// to unblock the select(). The wakeup_fn is called from the dispatch queue
// when there are new dispatch items or when a thread requests suspend mode,
// and the target thread should then enter mp_dispatch_queue_process() as soon
// as possible. Note that wakeup_fn is called under no lock, so you might have
// to do synchronization yourself.
// wakeup_fn may be NULL, in which case no callback is invoked; wakeup_ctx is
// passed to wakeup_fn unchanged.
// NOTE(review): the two fields are written here without taking queue->lock,
// and are read elsewhere in this file without it as well - presumably this is
// meant to be called before other threads start using the queue; confirm.
void mp_dispatch_set_wakeup_fn(struct mp_dispatch_queue *queue,
                               void (*wakeup_fn)(void *wakeup_ctx),
                               void *wakeup_ctx)
{
    queue->wakeup_fn = wakeup_fn;
    queue->wakeup_ctx = wakeup_ctx;
}

// Link item in at the end of the queue's pending list and notify the target
// thread. The item's next pointer is not touched here, so it must already be
// NULL (the callers in this file zero-initialize their items).
static void mp_dispatch_append(struct mp_dispatch_queue *queue,
                               struct mp_dispatch_item *item)
{
    pthread_mutex_lock(&queue->lock);

    // Select the pointer that has to refer to the new item: the next field
    // of the current tail, or the list head if nothing is queued yet.
    struct mp_dispatch_item **link =
        queue->tail ? &queue->tail->next : &queue->head;
    *link = item;
    queue->tail = item;

    // The condition variable is shared by several kinds of waiters (the
    // target thread as well as threads blocked in other functions of this
    // file), so a broadcast is needed rather than a single signal.
    pthread_cond_broadcast(&queue->cond);
    pthread_mutex_unlock(&queue->lock);

    // Invoked without the lock held. Covers the case where the target thread
    // is blocked on something other than the condition variable.
    if (queue->wakeup_fn) {
        queue->wakeup_fn(queue->wakeup_ctx);
    }
}

// Queue fn(fn_data) for asynchronous execution on the target thread. The
// target thread runs the callback as soon as it enters
// mp_dispatch_queue_process(); this function usually returns long before that.
// Nothing reports completion back to the caller - signaling it is up to the
// user. The user must also make sure fn_data stays valid until the callback
// has run, and that it is released afterwards.
void mp_dispatch_enqueue(struct mp_dispatch_queue *queue,
                         mp_dispatch_fn fn, void *fn_data)
{
    struct mp_dispatch_item *entry = talloc_ptrtype(NULL, entry);

    *entry = (struct mp_dispatch_item){0};
    entry->fn = fn;
    entry->fn_data = fn_data;
    // Marks the item as owned by the queue: mp_dispatch_queue_process()
    // frees it after the callback returned.
    entry->asynchronous = true;

    mp_dispatch_append(queue, entry);
}

// Same as mp_dispatch_enqueue(), except that the queue takes ownership of
// fn_data (which must be a talloc allocation) and releases it with the item
// once fn has been run. The callback could trivially free its argument
// itself, but handing this to the queue makes it easier to share one callback
// implementation between synchronous and asynchronous requests.
void mp_dispatch_enqueue_autofree(struct mp_dispatch_queue *queue,
                                  mp_dispatch_fn fn, void *fn_data)
{
    struct mp_dispatch_item *entry = talloc_ptrtype(NULL, entry);

    *entry = (struct mp_dispatch_item){0};
    entry->fn = fn;
    // Reparent fn_data under the item, so that freeing the item in
    // mp_dispatch_queue_process() releases fn_data along with it.
    entry->fn_data = talloc_steal(entry, fn_data);
    entry->asynchronous = true;

    mp_dispatch_append(queue, entry);
}

// Run fn(fn_data) on the target thread and block until it has finished.
// This is redundant to calling the function inside mp_dispatch_[un]lock(),
// but can be helpful with code that relies on TLS (such as OpenGL), because
// the callback really executes on the target thread.
void mp_dispatch_run(struct mp_dispatch_queue *queue,
                     mp_dispatch_fn fn, void *fn_data)
{
    // The request lives on this stack frame. That is safe because this
    // function does not return before the target thread has flagged the
    // request as completed. 'asynchronous' stays false, so the queue does
    // not try to free it.
    struct mp_dispatch_item request = {0};
    request.fn = fn;
    request.fn_data = fn_data;
    mp_dispatch_append(queue, &request);

    pthread_mutex_lock(&queue->lock);
    // The condition variable is shared with other events, so re-check the
    // flag after every wakeup.
    while (!request.completed) {
        pthread_cond_wait(&queue->cond, &queue->lock);
    }
    pthread_mutex_unlock(&queue->lock);
}

// Process any outstanding dispatch items in the queue. This also handles
// suspending or locking the target thread. To be called by the target thread.
// timeout: how long to wait for new items if there is nothing to do on entry.
//          Values <= 0 disable waiting. (Presumably in seconds, as expected
//          by mp_add_timeout() - confirm against osdep/timer.h.)
// The timeout is neither a strict lower nor a strict upper bound for the time
// spent in this function: it can take much longer if the suspending/locking
// functions are used, or if executing the dispatch items takes time. On the
// other hand, this function can return much earlier than the timeout due to
// sporadic wakeups, because it waits on the condition at most once with the
// timeout applied.
// It is also guaranteed that if at least one queue item was processed, the
// function will return as soon as possible, ignoring the timeout. This
// simplifies users, such as re-checking conditions before waiting. (It will
// still process the remaining queue items, and wait for unsuspend.)
void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
{
    // Non-zero only if a timeout was requested; passed to
    // mpthread_cond_timedwait() below (presumably as absolute deadline).
    int64_t wait = timeout > 0 ? mp_add_timeout(mp_time_us(), timeout) : 0;
    pthread_mutex_lock(&queue->lock);
    // From here until the end of the function the target thread counts as
    // suspended, even while it is running item callbacks.
    queue->suspended = true;
    // Wake up thread which called mp_dispatch_suspend().
    pthread_cond_broadcast(&queue->cond);
    // Keep going while there is work, while any thread wants us suspended, or
    // (first iteration only) while the timeout has not been consumed yet.
    while (queue->head || queue->suspend_requested || wait > 0) {
        if (queue->head) {
            // Unlink the first pending item.
            struct mp_dispatch_item *item = queue->head;
            queue->head = item->next;
            if (!queue->head)
                queue->tail = NULL;
            item->next = NULL;
            // Unlock, because we want to allow other threads to queue items
            // while the dispatch item is processed.
            // At the same time, exclusive_lock must be held to protect the
            // thread's user state.
            pthread_mutex_unlock(&queue->lock);
            pthread_mutex_lock(&queue->exclusive_lock);
            item->fn(item->fn_data);
            pthread_mutex_unlock(&queue->exclusive_lock);
            pthread_mutex_lock(&queue->lock);
            if (item->asynchronous) {
                // Queue-owned item (and, for the autofree variant, fn_data
                // attached to it).
                talloc_free(item);
            } else {
                // Item owned by a thread blocked in mp_dispatch_run(). It
                // must not be accessed after this point: once that thread
                // sees the flag, it returns and the item is gone.
                item->completed = true;
                // Wakeup mp_dispatch_run()
                pthread_cond_broadcast(&queue->cond);
            }
        } else {
            // Nothing queued: sleep until something is broadcast on cond, or
            // (if a timeout is still pending) until the timed wait returns.
            if (wait > 0) {
                mpthread_cond_timedwait(&queue->cond, &queue->lock, wait);
            } else {
                pthread_cond_wait(&queue->cond, &queue->lock);
            }
        }
        // The timeout applies to the first iteration only: after one item or
        // one wait, the loop continues solely for pending items or suspend
        // requests.
        wait = 0;
    }
    queue->suspended = false;
    pthread_mutex_unlock(&queue->lock);
}

// Put the target thread into suspend mode and wait until it got there. In
// suspend mode, the target thread stays inside mp_dispatch_queue_process():
// it processes outstanding dispatch items and then waits for new ones instead
// of returning.
// Several threads can request suspend mode at the same time; each call must
// be undone with mp_dispatch_resume(). Suspend mode is not a synchronization
// mechanism - it merely keeps the target thread from leaving
// mp_dispatch_queue_process(), even if it's done. Use mp_dispatch_lock() for
// exclusive access.
void mp_dispatch_suspend(struct mp_dispatch_queue *queue)
{
    pthread_mutex_lock(&queue->lock);
    queue->suspend_requested += 1;

    while (!queue->suspended) {
        // The target thread may be blocked outside of the dispatch queue, so
        // poke it through the user callback. The callback must not be called
        // with the lock held.
        pthread_mutex_unlock(&queue->lock);
        if (queue->wakeup_fn) {
            queue->wakeup_fn(queue->wakeup_ctx);
        }
        pthread_mutex_lock(&queue->lock);

        // The target thread could have entered mp_dispatch_queue_process()
        // while the lock was released; only sleep if it has not.
        if (!queue->suspended) {
            pthread_cond_wait(&queue->cond, &queue->lock);
        }
    }

    pthread_mutex_unlock(&queue->lock);
}

// Undo one mp_dispatch_suspend() call. Once every suspend request has been
// dropped, the target thread is woken up so that it can leave
// mp_dispatch_queue_process().
void mp_dispatch_resume(struct mp_dispatch_queue *queue)
{
    pthread_mutex_lock(&queue->lock);

    // Calling this without a matching mp_dispatch_suspend() is a bug.
    assert(queue->suspended);
    assert(queue->suspend_requested > 0);

    queue->suspend_requested -= 1;
    if (!queue->suspend_requested) {
        // Last request gone: let the target thread re-check its loop
        // condition.
        pthread_cond_broadcast(&queue->cond);
    }

    pthread_mutex_unlock(&queue->lock);
}

// Grant exclusive access to the target thread's state. While this is active,
// no other thread can return from mp_dispatch_lock() (i.e. it behaves like
// a pthread mutex), and no other thread can get dispatch items completed.
// Other threads can still queue asynchronous dispatch items without waiting,
// and the mutex behavior applies to this function only.
// Must be paired with mp_dispatch_unlock().
void mp_dispatch_lock(struct mp_dispatch_queue *queue)
{
    // First make sure the target thread is inside
    // mp_dispatch_queue_process() and stays there.
    mp_dispatch_suspend(queue);
    // Then take the lock which mp_dispatch_queue_process() also holds around
    // each item callback; this waits for a callback that is currently running
    // and keeps further ones from starting.
    pthread_mutex_lock(&queue->exclusive_lock);
}

// Undo mp_dispatch_lock().
void mp_dispatch_unlock(struct mp_dispatch_queue *queue)
{
    // Release in reverse order of acquisition: exclusive_lock must be dropped
    // before the suspend request, because suspend mode must not be left while
    // exclusive_lock is held.
    pthread_mutex_unlock(&queue->exclusive_lock);
    mp_dispatch_resume(queue);
}