diff options
-rw-r--r-- | misc/dispatch.c | 135 |
1 files changed, 60 insertions, 75 deletions
diff --git a/misc/dispatch.c b/misc/dispatch.c index e625f196dc..940cbb21d7 100644 --- a/misc/dispatch.c +++ b/misc/dispatch.c @@ -32,27 +32,19 @@ struct mp_dispatch_queue { void *wakeup_ctx; // Make mp_dispatch_queue_process() exit if it's idle. bool interrupted; - // The target thread is blocked by mp_dispatch_queue_process(). Note that - // mp_dispatch_lock() can set this from true to false to keep the thread - // blocked (this stops if from processing other dispatch items, and from - // other threads to return from mp_dispatch_lock(), making it an exclusive - // lock). - bool idling; - // A mp_dispatch_lock() call is requesting an exclusive lock. - bool lock_request; - // Used to block out threads calling mp_dispatch_queue_process() while - // they're externall locked via mp_dispatch_lock(). - // We could use a simple counter (increment it instead of adding a frame, - // also increment it when locking), but with this we can perform some - // minimal debug checks. - struct lock_frame *frame; -}; - -struct lock_frame { - struct lock_frame *prev; - pthread_t thread; - pthread_t locked_thread; + // The target thread is in mp_dispatch_queue_process() (and either idling, + // locked, or running a dispatch callback). + bool in_process; + pthread_t in_process_thread; + // The target thread is in mp_dispatch_queue_process(), and currently + // something has exclusive access to it (e.g. running a dispatch callback, + // or a different thread got it with mp_dispatch_lock()). bool locked; + // A mp_dispatch_lock() call is requesting an exclusive lock. + size_t lock_requests; + // locked==true is due to a mp_dispatch_lock() call (for debugging). + bool locked_explicit; + pthread_t locked_explicit_thread; }; struct mp_dispatch_item { @@ -68,9 +60,9 @@ static void queue_dtor(void *p) { struct mp_dispatch_queue *queue = p; assert(!queue->head); - assert(!queue->idling); - assert(!queue->lock_request); - assert(!queue->frame); + assert(!queue->in_process); + assert(!queue->lock_requests); + assert(!queue->locked); pthread_cond_destroy(&queue->cond); pthread_mutex_destroy(&queue->lock); } @@ -252,35 +244,25 @@ void mp_dispatch_run(struct mp_dispatch_queue *queue, // - all queue items were processed, // - the possibly acquired lock has been released // It's possible to cancel the timeout by calling mp_dispatch_interrupt(). +// Reentrant calls are not allowed. There can be only 1 thread calling +// mp_dispatch_queue_process() at a time. In addition, mp_dispatch_lock() can +// not be called from a thread that is calling mp_dispatch_queue_process() (i.e. +// no enqueued callback can call the lock/unlock functions). void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout) { int64_t wait = timeout > 0 ? mp_add_timeout(mp_time_us(), timeout) : 0; - struct lock_frame frame = { - .thread = pthread_self(), - }; pthread_mutex_lock(&queue->lock); - frame.prev = queue->frame; - queue->frame = &frame; - // Logically, the queue is idling if the target thread is blocked in - // mp_dispatch_queue_process() doing nothing, so it's not possible to call - // it again. (Reentrant calls via callbacks temporarily reset the field.) - assert(!queue->idling); - queue->idling = true; + assert(!queue->in_process); // recursion not allowed + queue->in_process = true; + queue->in_process_thread = pthread_self(); // Wake up thread which called mp_dispatch_lock(). - if (queue->lock_request) + if (queue->lock_requests) pthread_cond_broadcast(&queue->cond); while (1) { - if (queue->lock_request || queue->frame != &frame || frame.locked) { - // Block due to something having called mp_dispatch_lock(). This - // is either a lock "acquire" (lock_request=true), or a lock in - // progress, with the possibility the thread which called - // mp_dispatch_lock() is now calling mp_dispatch_queue_process() - // (the latter means we must ignore any queue state changes, - // until it has been unlocked again). + if (queue->lock_requests) { + // Block due to something having called mp_dispatch_lock(). pthread_cond_wait(&queue->cond, &queue->lock); - if (queue->frame == &frame && !frame.locked) - assert(queue->idling); } else if (queue->head) { struct mp_dispatch_item *item = queue->head; queue->head = item->next; @@ -290,15 +272,15 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout) // Unlock, because we want to allow other threads to queue items // while the dispatch item is processed. // At the same time, we must prevent other threads from returning - // from mp_dispatch_lock(), which is done by idling=false. - queue->idling = false; + // from mp_dispatch_lock(), which is done by locked=true. + queue->locked = true; pthread_mutex_unlock(&queue->lock); item->fn(item->fn_data); pthread_mutex_lock(&queue->lock); - assert(!queue->idling); - queue->idling = true; + assert(queue->locked); + queue->locked = false; // Wakeup mp_dispatch_run(), also mp_dispatch_lock(). pthread_cond_broadcast(&queue->cond); if (item->asynchronous) { @@ -314,10 +296,8 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout) break; } } - queue->idling = false; - assert(!frame.locked); - assert(queue->frame == &frame); - queue->frame = frame.prev; + assert(!queue->locked); + queue->in_process = false; queue->interrupted = false; pthread_mutex_unlock(&queue->lock); } @@ -340,35 +320,41 @@ void mp_dispatch_interrupt(struct mp_dispatch_queue *queue) // no other thread can return from mp_dispatch_lock() (i.e. it behaves like // a pthread mutex), and no other thread can get dispatch items completed. // Other threads can still queue asynchronous dispatch items without waiting, -// and the mutex behavior applies to this function only. +// and the mutex behavior applies to this function and dispatch callbacks only. +// The lock is non-recursive, and dispatch callback functions can be thought of +// already holding the dispatch lock. void mp_dispatch_lock(struct mp_dispatch_queue *queue) { pthread_mutex_lock(&queue->lock); - // First grab the queue lock. Something else could be holding the lock. - while (queue->lock_request) - pthread_cond_wait(&queue->cond, &queue->lock); - queue->lock_request = true; + // Must not be called recursively from dispatched callbacks. + if (queue->in_process) + assert(!pthread_equal(queue->in_process_thread, pthread_self())); + // Must not be called recursively at all. + if (queue->locked_explicit) + assert(!pthread_equal(queue->locked_explicit_thread, pthread_self())); + queue->lock_requests += 1; // And now wait until the target thread gets "trapped" within the // mp_dispatch_queue_process() call, which will mean we get exclusive // access to the target's thread state. - while (!queue->idling) { + while (!queue->in_process) { pthread_mutex_unlock(&queue->lock); if (queue->wakeup_fn) queue->wakeup_fn(queue->wakeup_ctx); pthread_mutex_lock(&queue->lock); - if (queue->idling) + if (queue->in_process) break; pthread_cond_wait(&queue->cond, &queue->lock); } - assert(queue->lock_request); - assert(queue->frame); // must be set if idling - assert(!queue->frame->locked); // no recursive locking on the same level + // Wait until we can get the lock. + while (!queue->in_process || queue->locked) + pthread_cond_wait(&queue->cond, &queue->lock); // "Lock". - queue->frame->locked = true; - queue->frame->locked_thread = pthread_self(); - // Reset state for recursive mp_dispatch_queue_process() calls. - queue->lock_request = false; - queue->idling = false; + assert(queue->lock_requests); + assert(!queue->locked); + assert(!queue->locked_explicit); + queue->locked = true; + queue->locked_explicit = true; + queue->locked_explicit_thread = pthread_self(); pthread_mutex_unlock(&queue->lock); } @@ -376,17 +362,16 @@ void mp_dispatch_lock(struct mp_dispatch_queue *queue) void mp_dispatch_unlock(struct mp_dispatch_queue *queue) { pthread_mutex_lock(&queue->lock); - // Must be called atfer a mp_dispatch_lock(). - assert(queue->frame); - assert(queue->frame->locked); - assert(pthread_equal(queue->frame->locked_thread, pthread_self())); + assert(queue->locked); + // Must be called after a mp_dispatch_lock(), from the same thread. + assert(queue->locked_explicit); + assert(pthread_equal(queue->locked_explicit_thread, pthread_self())); // "Unlock". - queue->frame->locked = false; - // This must have been set to false during locking (except temporarily - // during recursive mp_dispatch_queue_process() calls). - assert(!queue->idling); - queue->idling = true; + queue->locked = false; + queue->locked_explicit = false; + queue->lock_requests -= 1; // Wakeup mp_dispatch_queue_process(), and maybe other mp_dispatch_lock()s. + // (Would be nice to wake up only 1 other locker if lock_requests>0.) pthread_cond_broadcast(&queue->cond); pthread_mutex_unlock(&queue->lock); } |