aboutsummaryrefslogtreecommitdiffhomepage
path: root/misc/dispatch.c
blob: e625f196dc30d71958b62042a93640a0ea4a4364 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
/*
 * This file is part of mpv.
 *
 * mpv is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * mpv is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with mpv.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <stdbool.h>
#include <assert.h>

#include "common/common.h"
#include "osdep/threads.h"
#include "osdep/timer.h"

#include "dispatch.h"

// A dispatch queue: a singly linked list of pending dispatch items, plus the
// state needed to implement mp_dispatch_lock() on top of it.
struct mp_dispatch_queue {
    // Pending items. New items are appended at tail, and
    // mp_dispatch_queue_process() takes them off at head.
    struct mp_dispatch_item *head, *tail;
    // lock protects the queue state; cond is broadcast whenever that state
    // changes in a way a waiting thread might care about.
    pthread_mutex_t lock;
    pthread_cond_t cond;
    // Optional callback (and its argument) used to wake up the target thread,
    // see mp_dispatch_set_wakeup_fn(). NULL if not set.
    void (*wakeup_fn)(void *wakeup_ctx);
    void *wakeup_ctx;
    // Make mp_dispatch_queue_process() exit if it's idle.
    bool interrupted;
    // The target thread is blocked by mp_dispatch_queue_process(). Note that
    // mp_dispatch_lock() can set this from true to false to keep the thread
    // blocked (this stops it from processing other dispatch items, and stops
    // other threads from returning from mp_dispatch_lock(), making it an
    // exclusive lock).
    bool idling;
    // A mp_dispatch_lock() call is requesting an exclusive lock.
    bool lock_request;
    // Used to block out threads calling mp_dispatch_queue_process() while
    // they're externally locked via mp_dispatch_lock().
    // We could use a simple counter (increment it instead of adding a frame,
    // also increment it when locking), but with this we can perform some
    // minimal debug checks.
    struct lock_frame *frame;
};

// One entry per active mp_dispatch_queue_process() invocation. The entries
// live on the stack of the invoking thread and are chained through prev, with
// mp_dispatch_queue.frame pointing at the most recent one.
struct lock_frame {
    // The frame that was current when this invocation started (or NULL).
    struct lock_frame *prev;
    // Thread which called mp_dispatch_queue_process() for this frame.
    pthread_t thread;
    // Thread which locked this frame via mp_dispatch_lock(). Only meaningful
    // while locked is true; checked by mp_dispatch_unlock().
    pthread_t locked_thread;
    // Set by mp_dispatch_lock(), cleared by mp_dispatch_unlock().
    bool locked;
};

// A single queued callback.
struct mp_dispatch_item {
    // The callback and the argument it is called with.
    mp_dispatch_fn fn;
    void *fn_data;
    // If true, the item is a talloc allocation which
    // mp_dispatch_queue_process() frees after running the callback. If false,
    // the item belongs to a mp_dispatch_run() caller, and is only marked as
    // completed.
    bool asynchronous;
    // If true, mp_dispatch_append() drops a new item if another mergeable
    // item with the same fn and fn_data is already queued.
    bool mergeable;
    // Set after the callback of a non-asynchronous item has returned;
    // mp_dispatch_run() waits for this.
    bool completed;
    // Next item in the queue, or NULL.
    struct mp_dispatch_item *next;
};

// talloc destructor of a dispatch queue. The queue has to be completely
// quiescent at this point: no queued items, nobody inside
// mp_dispatch_queue_process(), and no lock held or requested.
static void queue_dtor(void *p)
{
    struct mp_dispatch_queue *q = p;

    assert(!q->head);
    assert(!q->idling);
    assert(!q->lock_request);
    assert(!q->frame);

    pthread_cond_destroy(&q->cond);
    pthread_mutex_destroy(&q->lock);
}

// Create a dispatch queue as talloc child of ta_parent.
// A dispatch queue lets other threads run callbacks in a target thread, where
// the target thread is whichever thread calls mp_dispatch_queue_process().
// Destroy the queue with talloc_free(). The queue must be empty when that
// happens. The easiest way to guarantee this is to terminate all potential
// senders, then use mp_dispatch_run() with a function that e.g. makes the
// target thread exit, then pthread_join() the target thread, and only then
// destroy the queue. Alternatively, terminate all potential senders, call
// mp_dispatch_queue_process() once more, and then destroy the queue.
struct mp_dispatch_queue *mp_dispatch_create(void *ta_parent)
{
    struct mp_dispatch_queue *q = talloc_ptrtype(ta_parent, q);

    // Start from an all-zero state: empty list, no flags, no frames.
    *q = (struct mp_dispatch_queue){0};
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->cond, NULL);
    talloc_set_destructor(q, queue_dtor);

    return q;
}

// Install a custom function which is called to make sure the target thread
// wakes up. This is meant for target threads that block on non-pthread
// primitives, e.g. select(): there, wakeup_fn could write a byte into a
// "wakeup" pipe to unblock the select(). The dispatch queue calls
// wakeup_fn(wakeup_ctx) when new dispatch items were added, and the target
// thread is then expected to enter mp_dispatch_queue_process() as soon as
// possible. Note that wakeup_fn is called without any lock held, so you might
// have to do synchronization yourself.
void mp_dispatch_set_wakeup_fn(struct mp_dispatch_queue *queue,
                               void (*wakeup_fn)(void *wakeup_ctx),
                               void *wakeup_ctx)
{
    queue->wakeup_ctx = wakeup_ctx;
    queue->wakeup_fn = wakeup_fn;
}

// Add an item to the end of the queue and wake up the target thread.
// A mergeable item is dropped (and freed) instead if an equal mergeable item,
// i.e. one with the same fn and fn_data, is already waiting in the queue.
static void mp_dispatch_append(struct mp_dispatch_queue *queue,
                               struct mp_dispatch_item *item)
{
    pthread_mutex_lock(&queue->lock);

    if (item->mergeable) {
        struct mp_dispatch_item *it = queue->head;
        while (it) {
            bool duplicate = it->mergeable && it->fn == item->fn &&
                             it->fn_data == item->fn_data;
            if (duplicate) {
                talloc_free(item);
                pthread_mutex_unlock(&queue->lock);
                return;
            }
            it = it->next;
        }
    }

    // Link the item in at the end of the list.
    if (!queue->tail) {
        queue->head = item;
    } else {
        queue->tail->next = item;
    }
    queue->tail = item;

    // This is a broadcast rather than a signal, because threads other than
    // the target thread can be waiting on the same condition.
    pthread_cond_broadcast(&queue->cond);
    // Without a wakeup callback, the only way to get the item processed soon
    // is interrupting a waiting mp_dispatch_queue_process().
    if (!queue->wakeup_fn)
        queue->interrupted = true;

    pthread_mutex_unlock(&queue->lock);

    // The wakeup callback is deliberately called without the lock held.
    if (queue->wakeup_fn)
        queue->wakeup_fn(queue->wakeup_ctx);
}

// Queue fn(fn_data) for asynchronous execution on the target thread. The
// callback is run once the target thread enters mp_dispatch_queue_process();
// mp_dispatch_enqueue() itself will usually return long before that.
// Signaling completion of the callback is up to the user. So is the lifetime
// of fn_data: it must stay valid until the callback has been run, and the
// user is responsible for freeing it afterwards.
void mp_dispatch_enqueue(struct mp_dispatch_queue *queue,
                         mp_dispatch_fn fn, void *fn_data)
{
    struct mp_dispatch_item *entry = talloc_ptrtype(NULL, entry);

    *entry = (struct mp_dispatch_item){0};
    entry->fn = fn;
    entry->fn_data = fn_data;
    entry->asynchronous = true;

    mp_dispatch_append(queue, entry);
}

// Same as mp_dispatch_enqueue(), except that fn_data is freed by the queue
// code once the fn callback has been run. This works by making fn_data a
// talloc child of the queue item, which is freed after the callback returns.
// (The callback could trivially free fn_data itself, but this way the same
// callback implementation can serve synchronous and asynchronous requests.)
void mp_dispatch_enqueue_autofree(struct mp_dispatch_queue *queue,
                                  mp_dispatch_fn fn, void *fn_data)
{
    struct mp_dispatch_item *entry = talloc_ptrtype(NULL, entry);

    *entry = (struct mp_dispatch_item){0};
    entry->fn = fn;
    // Hand ownership of fn_data over to the item.
    entry->fn_data = talloc_steal(entry, fn_data);
    entry->asynchronous = true;

    mp_dispatch_append(queue, entry);
}

// Like mp_dispatch_enqueue(), but the item is marked as mergeable: if a
// mergeable item with the same fn and fn_data is already waiting in the
// queue, the new request is dropped instead of being queued a second time
// (see mp_dispatch_append()).
void mp_dispatch_enqueue_notify(struct mp_dispatch_queue *queue,
                                mp_dispatch_fn fn, void *fn_data)
{
    struct mp_dispatch_item *entry = talloc_ptrtype(NULL, entry);

    *entry = (struct mp_dispatch_item){0};
    entry->fn = fn;
    entry->fn_data = fn_data;
    entry->mergeable = true;
    entry->asynchronous = true;

    mp_dispatch_append(queue, entry);
}

// Remove all already queued items which match fn and fn_data. Only items
// enqueued with the following functions can be canceled:
//  - mp_dispatch_enqueue()
//  - mp_dispatch_enqueue_notify()
// Items queued by mp_dispatch_run() are never removed, even if fn and fn_data
// match: they are owned by the thread blocked in mp_dispatch_run(), and are
// not talloc allocations.
// Items which were enqueued, and which are currently executing, can not be
// canceled anymore. This function is mostly for being called from the same
// context as mp_dispatch_queue_process(), where the "currently executing" case
// can be excluded.
void mp_dispatch_cancel_fn(struct mp_dispatch_queue *queue,
                           mp_dispatch_fn fn, void *fn_data)
{
    pthread_mutex_lock(&queue->lock);
    struct mp_dispatch_item **pcur = &queue->head;
    // The tail is recomputed while walking the list: it ends up as the last
    // item that was kept, or NULL if nothing is left.
    queue->tail = NULL;
    while (*pcur) {
        struct mp_dispatch_item *cur = *pcur;
        // Synchronous items must be left alone. Freeing one would pass a
        // non-talloc pointer to talloc_free(), and unlinking it would leave
        // its mp_dispatch_run() caller waiting forever.
        if (cur->asynchronous && cur->fn == fn && cur->fn_data == fn_data) {
            *pcur = cur->next;
            talloc_free(cur);
        } else {
            queue->tail = cur;
            pcur = &cur->next;
        }
    }
    pthread_mutex_unlock(&queue->lock);
}

// Run fn(fn_data) on the target thread synchronously: the callback is
// enqueued, and this function blocks until the target thread has finished
// running it.
// This is redundant to calling the function inside mp_dispatch_[un]lock(),
// but can be helpful with code that relies on TLS (such as OpenGL).
void mp_dispatch_run(struct mp_dispatch_queue *queue,
                     mp_dispatch_fn fn, void *fn_data)
{
    // The item lives on this thread's stack. Since asynchronous stays false,
    // the target thread marks it as completed instead of freeing it.
    struct mp_dispatch_item sync_item = {0};
    sync_item.fn = fn;
    sync_item.fn_data = fn_data;

    mp_dispatch_append(queue, &sync_item);

    pthread_mutex_lock(&queue->lock);
    for (;;) {
        if (sync_item.completed)
            break;
        pthread_cond_wait(&queue->cond, &queue->lock);
    }
    pthread_mutex_unlock(&queue->lock);
}

// Process any outstanding dispatch items in the queue. This also handles
// suspending or locking this thread from another thread via
// mp_dispatch_lock().
// The timeout specifies how long to wait for new items while the queue is
// idle. It is turned into an absolute deadline with mp_add_timeout(); a
// timeout <= 0 means not waiting for new items at all. The actual time spent
// in this function can be much higher if the locking functions are used, or
// if executing the dispatch items takes time. On the other hand, this function
// can return much earlier than the timeout if the wait is interrupted (see
// mp_dispatch_interrupt(); appending an item to a queue without wakeup_fn
// sets the interrupted flag too).
// Note that this will strictly return only after:
//      - timeout has passed (or the wait was interrupted),
//      - all queue items were processed,
//      - the possibly acquired lock has been released
// It's possible to cancel the timeout by calling mp_dispatch_interrupt().
void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
{
    // Absolute deadline of the idle wait; 0 means "don't wait".
    int64_t wait = timeout > 0 ? mp_add_timeout(mp_time_us(), timeout) : 0;
    // Record of this invocation. It is linked into queue->frame for the
    // duration of the call; nested invocations form a stack via frame.prev.
    struct lock_frame frame = {
        .thread = pthread_self(),
    };

    pthread_mutex_lock(&queue->lock);
    frame.prev = queue->frame;
    queue->frame = &frame;
    // Logically, the queue is idling if the target thread is blocked in
    // mp_dispatch_queue_process() doing nothing, so it's not possible to call
    // it again. (Reentrant calls via callbacks temporarily reset the field.)
    assert(!queue->idling);
    queue->idling = true;
    // Wake up thread which called mp_dispatch_lock().
    if (queue->lock_request)
        pthread_cond_broadcast(&queue->cond);
    while (1) {
        if (queue->lock_request || queue->frame != &frame || frame.locked) {
            // Block due to something having called mp_dispatch_lock(). This
            // is either a lock "acquire" (lock_request=true), or a lock in
            // progress, with the possibility the thread which called
            // mp_dispatch_lock() is now calling mp_dispatch_queue_process()
            // (the latter means we must ignore any queue state changes,
            // until it has been unlocked again).
            pthread_cond_wait(&queue->cond, &queue->lock);
            // Once this frame is the topmost one again and not locked, the
            // queue must be back in the idling state.
            if (queue->frame == &frame && !frame.locked)
                assert(queue->idling);
        } else if (queue->head) {
            // Take the first item off the queue.
            struct mp_dispatch_item *item = queue->head;
            queue->head = item->next;
            if (!queue->head)
                queue->tail = NULL;
            item->next = NULL;
            // Unlock, because we want to allow other threads to queue items
            // while the dispatch item is processed.
            // At the same time, we must prevent other threads from returning
            // from mp_dispatch_lock(), which is done by idling=false.
            queue->idling = false;
            pthread_mutex_unlock(&queue->lock);

            item->fn(item->fn_data);

            pthread_mutex_lock(&queue->lock);
            assert(!queue->idling);
            queue->idling = true;
            // Wakeup mp_dispatch_run(), also mp_dispatch_lock().
            // (The waiters can't observe anything before the mutex is
            // released, so broadcasting before setting completed is fine.)
            pthread_cond_broadcast(&queue->cond);
            if (item->asynchronous) {
                // Heap item, owned by the queue. This also frees any talloc
                // children of the item.
                talloc_free(item);
            } else {
                // Item owned by a mp_dispatch_run() caller, which is waiting
                // for this flag.
                item->completed = true;
            }
        } else if (wait > 0 && !queue->interrupted) {
            // Nothing to do: sleep until the deadline or until woken up.
            // Any non-zero return value (timeout or error) ends the waiting,
            // so that the next iteration leaves the loop.
            struct timespec ts = mp_time_us_to_timespec(wait);
            if (pthread_cond_timedwait(&queue->cond, &queue->lock, &ts))
                wait = 0;
        } else {
            break;
        }
    }
    queue->idling = false;
    assert(!frame.locked);
    assert(queue->frame == &frame);
    // Unlink this invocation's frame again.
    queue->frame = frame.prev;
    // The interrupt request (if any) is consumed by this invocation.
    queue->interrupted = false;
    pthread_mutex_unlock(&queue->lock);
}

// If the target thread is inside of mp_dispatch_queue_process(), make that
// call return as soon as all work items have been run, instead of waiting
// for the timeout. This does not make it return early if it's blocked by a
// mp_dispatch_lock().
// If mp_dispatch_queue_process() is called in a reentrant way (including the
// case where another thread calls mp_dispatch_lock() and then
// mp_dispatch_queue_process()), this affects only the "topmost" invocation.
void mp_dispatch_interrupt(struct mp_dispatch_queue *queue)
{
    pthread_mutex_t *mtx = &queue->lock;

    pthread_mutex_lock(mtx);
    // Set the flag and wake up everyone waiting on the condition, so that a
    // waiting mp_dispatch_queue_process() gets to see it.
    queue->interrupted = true;
    pthread_cond_broadcast(&queue->cond);
    pthread_mutex_unlock(mtx);
}

// Grant exclusive access to the target thread's state. While this is active,
// no other thread can return from mp_dispatch_lock() (i.e. it behaves like
// a pthread mutex), and no other thread can get dispatch items completed.
// Other threads can still queue asynchronous dispatch items without waiting,
// and the mutex behavior applies to this function only.
// Must be paired with mp_dispatch_unlock(), called from the same thread.
void mp_dispatch_lock(struct mp_dispatch_queue *queue)
{
    pthread_mutex_lock(&queue->lock);
    // First become the (only) thread requesting the lock. Another
    // mp_dispatch_lock() call could be in the middle of acquiring it.
    while (queue->lock_request)
        pthread_cond_wait(&queue->cond, &queue->lock);
    queue->lock_request = true;
    // And now wait until the target thread gets "trapped" within the
    // mp_dispatch_queue_process() call, which will mean we get exclusive
    // access to the target's thread state.
    while (!queue->idling) {
        // The wakeup callback is called without the lock held.
        pthread_mutex_unlock(&queue->lock);
        if (queue->wakeup_fn)
            queue->wakeup_fn(queue->wakeup_ctx);
        pthread_mutex_lock(&queue->lock);
        // The state could have changed while the lock was released, so check
        // again before going to sleep.
        if (queue->idling)
            break;
        pthread_cond_wait(&queue->cond, &queue->lock);
    }
    assert(queue->lock_request);
    assert(queue->frame); // must be set if idling
    assert(!queue->frame->locked); // no recursive locking on the same level
    // "Lock".
    queue->frame->locked = true;
    // Remembered so that mp_dispatch_unlock() can check the calling thread.
    queue->frame->locked_thread = pthread_self();
    // Reset state for recursive mp_dispatch_queue_process() calls.
    queue->lock_request = false;
    queue->idling = false;
    pthread_mutex_unlock(&queue->lock);
}

// Undo mp_dispatch_lock().
void mp_dispatch_unlock(struct mp_dispatch_queue *queue)
{
    pthread_mutex_lock(&queue->lock);
    // Must be called atfer a mp_dispatch_lock().
    assert(queue->frame);
    assert(queue->frame->locked);
    assert(pthread_equal(queue->frame->locked_thread, pthread_self()));
    // "Unlock".
    queue->frame->locked = false;
    // This must have been set to false during locking (except temporarily
    // during recursive mp_dispatch_queue_process() calls).
    assert(!queue->idling);
    queue->idling = true;
    // Wakeup mp_dispatch_queue_process(), and maybe other mp_dispatch_lock()s.
    pthread_cond_broadcast(&queue->cond);
    pthread_mutex_unlock(&queue->lock);
}