aboutsummaryrefslogtreecommitdiffhomepage
path: root/misc/dispatch.c
diff options
context:
space:
mode:
Diffstat (limited to 'misc/dispatch.c')
-rw-r--r--misc/dispatch.c135
1 files changed, 60 insertions, 75 deletions
diff --git a/misc/dispatch.c b/misc/dispatch.c
index e625f196dc..940cbb21d7 100644
--- a/misc/dispatch.c
+++ b/misc/dispatch.c
@@ -32,27 +32,19 @@ struct mp_dispatch_queue {
void *wakeup_ctx;
// Make mp_dispatch_queue_process() exit if it's idle.
bool interrupted;
- // The target thread is blocked by mp_dispatch_queue_process(). Note that
- // mp_dispatch_lock() can set this from true to false to keep the thread
- // blocked (this stops if from processing other dispatch items, and from
- // other threads to return from mp_dispatch_lock(), making it an exclusive
- // lock).
- bool idling;
- // A mp_dispatch_lock() call is requesting an exclusive lock.
- bool lock_request;
- // Used to block out threads calling mp_dispatch_queue_process() while
- // they're externall locked via mp_dispatch_lock().
- // We could use a simple counter (increment it instead of adding a frame,
- // also increment it when locking), but with this we can perform some
- // minimal debug checks.
- struct lock_frame *frame;
-};
-
-struct lock_frame {
- struct lock_frame *prev;
- pthread_t thread;
- pthread_t locked_thread;
+ // The target thread is in mp_dispatch_queue_process() (and either idling,
+ // locked, or running a dispatch callback).
+ bool in_process;
+ pthread_t in_process_thread;
+ // The target thread is in mp_dispatch_queue_process(), and currently
+ // something has exclusive access to it (e.g. running a dispatch callback,
+ // or a different thread got it with mp_dispatch_lock()).
bool locked;
+ // A mp_dispatch_lock() call is requesting an exclusive lock.
+ size_t lock_requests;
+ // locked==true is due to a mp_dispatch_lock() call (for debugging).
+ bool locked_explicit;
+ pthread_t locked_explicit_thread;
};
struct mp_dispatch_item {
@@ -68,9 +60,9 @@ static void queue_dtor(void *p)
{
struct mp_dispatch_queue *queue = p;
assert(!queue->head);
- assert(!queue->idling);
- assert(!queue->lock_request);
- assert(!queue->frame);
+ assert(!queue->in_process);
+ assert(!queue->lock_requests);
+ assert(!queue->locked);
pthread_cond_destroy(&queue->cond);
pthread_mutex_destroy(&queue->lock);
}
@@ -252,35 +244,25 @@ void mp_dispatch_run(struct mp_dispatch_queue *queue,
// - all queue items were processed,
// - the possibly acquired lock has been released
// It's possible to cancel the timeout by calling mp_dispatch_interrupt().
+// Reentrant calls are not allowed. There can be only 1 thread calling
+// mp_dispatch_queue_process() at a time. In addition, mp_dispatch_lock() can
+// not be called from a thread that is calling mp_dispatch_queue_process() (i.e.
+// no enqueued callback can call the lock/unlock functions).
void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
{
int64_t wait = timeout > 0 ? mp_add_timeout(mp_time_us(), timeout) : 0;
- struct lock_frame frame = {
- .thread = pthread_self(),
- };
pthread_mutex_lock(&queue->lock);
- frame.prev = queue->frame;
- queue->frame = &frame;
- // Logically, the queue is idling if the target thread is blocked in
- // mp_dispatch_queue_process() doing nothing, so it's not possible to call
- // it again. (Reentrant calls via callbacks temporarily reset the field.)
- assert(!queue->idling);
- queue->idling = true;
+ assert(!queue->in_process); // recursion not allowed
+ queue->in_process = true;
+ queue->in_process_thread = pthread_self();
// Wake up thread which called mp_dispatch_lock().
- if (queue->lock_request)
+ if (queue->lock_requests)
pthread_cond_broadcast(&queue->cond);
while (1) {
- if (queue->lock_request || queue->frame != &frame || frame.locked) {
- // Block due to something having called mp_dispatch_lock(). This
- // is either a lock "acquire" (lock_request=true), or a lock in
- // progress, with the possibility the thread which called
- // mp_dispatch_lock() is now calling mp_dispatch_queue_process()
- // (the latter means we must ignore any queue state changes,
- // until it has been unlocked again).
+ if (queue->lock_requests) {
+ // Block due to something having called mp_dispatch_lock().
pthread_cond_wait(&queue->cond, &queue->lock);
- if (queue->frame == &frame && !frame.locked)
- assert(queue->idling);
} else if (queue->head) {
struct mp_dispatch_item *item = queue->head;
queue->head = item->next;
@@ -290,15 +272,15 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
// Unlock, because we want to allow other threads to queue items
// while the dispatch item is processed.
// At the same time, we must prevent other threads from returning
- // from mp_dispatch_lock(), which is done by idling=false.
- queue->idling = false;
+ // from mp_dispatch_lock(), which is done by locked=true.
+ queue->locked = true;
pthread_mutex_unlock(&queue->lock);
item->fn(item->fn_data);
pthread_mutex_lock(&queue->lock);
- assert(!queue->idling);
- queue->idling = true;
+ assert(queue->locked);
+ queue->locked = false;
// Wakeup mp_dispatch_run(), also mp_dispatch_lock().
pthread_cond_broadcast(&queue->cond);
if (item->asynchronous) {
@@ -314,10 +296,8 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
break;
}
}
- queue->idling = false;
- assert(!frame.locked);
- assert(queue->frame == &frame);
- queue->frame = frame.prev;
+ assert(!queue->locked);
+ queue->in_process = false;
queue->interrupted = false;
pthread_mutex_unlock(&queue->lock);
}
@@ -340,35 +320,41 @@ void mp_dispatch_interrupt(struct mp_dispatch_queue *queue)
// no other thread can return from mp_dispatch_lock() (i.e. it behaves like
// a pthread mutex), and no other thread can get dispatch items completed.
// Other threads can still queue asynchronous dispatch items without waiting,
-// and the mutex behavior applies to this function only.
+// and the mutex behavior applies to this function and dispatch callbacks only.
+// The lock is non-recursive, and dispatch callback functions can be thought of
+// already holding the dispatch lock.
void mp_dispatch_lock(struct mp_dispatch_queue *queue)
{
pthread_mutex_lock(&queue->lock);
- // First grab the queue lock. Something else could be holding the lock.
- while (queue->lock_request)
- pthread_cond_wait(&queue->cond, &queue->lock);
- queue->lock_request = true;
+ // Must not be called recursively from dispatched callbacks.
+ if (queue->in_process)
+ assert(!pthread_equal(queue->in_process_thread, pthread_self()));
+ // Must not be called recursively at all.
+ if (queue->locked_explicit)
+ assert(!pthread_equal(queue->locked_explicit_thread, pthread_self()));
+ queue->lock_requests += 1;
// And now wait until the target thread gets "trapped" within the
// mp_dispatch_queue_process() call, which will mean we get exclusive
// access to the target's thread state.
- while (!queue->idling) {
+ while (!queue->in_process) {
pthread_mutex_unlock(&queue->lock);
if (queue->wakeup_fn)
queue->wakeup_fn(queue->wakeup_ctx);
pthread_mutex_lock(&queue->lock);
- if (queue->idling)
+ if (queue->in_process)
break;
pthread_cond_wait(&queue->cond, &queue->lock);
}
- assert(queue->lock_request);
- assert(queue->frame); // must be set if idling
- assert(!queue->frame->locked); // no recursive locking on the same level
+ // Wait until we can get the lock.
+ while (!queue->in_process || queue->locked)
+ pthread_cond_wait(&queue->cond, &queue->lock);
// "Lock".
- queue->frame->locked = true;
- queue->frame->locked_thread = pthread_self();
- // Reset state for recursive mp_dispatch_queue_process() calls.
- queue->lock_request = false;
- queue->idling = false;
+ assert(queue->lock_requests);
+ assert(!queue->locked);
+ assert(!queue->locked_explicit);
+ queue->locked = true;
+ queue->locked_explicit = true;
+ queue->locked_explicit_thread = pthread_self();
pthread_mutex_unlock(&queue->lock);
}
@@ -376,17 +362,16 @@ void mp_dispatch_lock(struct mp_dispatch_queue *queue)
void mp_dispatch_unlock(struct mp_dispatch_queue *queue)
{
pthread_mutex_lock(&queue->lock);
- // Must be called atfer a mp_dispatch_lock().
- assert(queue->frame);
- assert(queue->frame->locked);
- assert(pthread_equal(queue->frame->locked_thread, pthread_self()));
+ assert(queue->locked);
+ // Must be called after a mp_dispatch_lock(), from the same thread.
+ assert(queue->locked_explicit);
+ assert(pthread_equal(queue->locked_explicit_thread, pthread_self()));
// "Unlock".
- queue->frame->locked = false;
- // This must have been set to false during locking (except temporarily
- // during recursive mp_dispatch_queue_process() calls).
- assert(!queue->idling);
- queue->idling = true;
+ queue->locked = false;
+ queue->locked_explicit = false;
+ queue->lock_requests -= 1;
// Wakeup mp_dispatch_queue_process(), and maybe other mp_dispatch_lock()s.
+ // (Would be nice to wake up only 1 other locker if lock_requests>0.)
pthread_cond_broadcast(&queue->cond);
pthread_mutex_unlock(&queue->lock);
}