aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar wm4 <wm4@nowhere>2018-04-15 11:09:53 +0200
committerGravatar Jan Ekström <jeebjp@gmail.com>2018-04-18 01:17:41 +0300
commit435bc003c0af7c51aa994f26ac0e80ad3e310b75 (patch)
tree5539148a05eb210ee2d5f2ba62a71a8dd7a9f2d8
parent10584159df2c27998b2178ec32e707f2e167ed12 (diff)
dispatch: simplify, disallow recursive invocation
Recursive invocation was needed up until the previous commit. Drop this feature, and simplify the code. It's more logical, and easier to detect miuses of the API. This partially reverts commit 3878a59e. The original reason for it was removed.
-rw-r--r--misc/dispatch.c135
1 files changed, 60 insertions, 75 deletions
diff --git a/misc/dispatch.c b/misc/dispatch.c
index e625f196dc..940cbb21d7 100644
--- a/misc/dispatch.c
+++ b/misc/dispatch.c
@@ -32,27 +32,19 @@ struct mp_dispatch_queue {
void *wakeup_ctx;
// Make mp_dispatch_queue_process() exit if it's idle.
bool interrupted;
- // The target thread is blocked by mp_dispatch_queue_process(). Note that
- // mp_dispatch_lock() can set this from true to false to keep the thread
- // blocked (this stops if from processing other dispatch items, and from
- // other threads to return from mp_dispatch_lock(), making it an exclusive
- // lock).
- bool idling;
- // A mp_dispatch_lock() call is requesting an exclusive lock.
- bool lock_request;
- // Used to block out threads calling mp_dispatch_queue_process() while
- // they're externall locked via mp_dispatch_lock().
- // We could use a simple counter (increment it instead of adding a frame,
- // also increment it when locking), but with this we can perform some
- // minimal debug checks.
- struct lock_frame *frame;
-};
-
-struct lock_frame {
- struct lock_frame *prev;
- pthread_t thread;
- pthread_t locked_thread;
+ // The target thread is in mp_dispatch_queue_process() (and either idling,
+ // locked, or running a dispatch callback).
+ bool in_process;
+ pthread_t in_process_thread;
+ // The target thread is in mp_dispatch_queue_process(), and currently
+ // something has exclusive access to it (e.g. running a dispatch callback,
+ // or a different thread got it with mp_dispatch_lock()).
bool locked;
+ // A mp_dispatch_lock() call is requesting an exclusive lock.
+ size_t lock_requests;
+ // locked==true is due to a mp_dispatch_lock() call (for debugging).
+ bool locked_explicit;
+ pthread_t locked_explicit_thread;
};
struct mp_dispatch_item {
@@ -68,9 +60,9 @@ static void queue_dtor(void *p)
{
struct mp_dispatch_queue *queue = p;
assert(!queue->head);
- assert(!queue->idling);
- assert(!queue->lock_request);
- assert(!queue->frame);
+ assert(!queue->in_process);
+ assert(!queue->lock_requests);
+ assert(!queue->locked);
pthread_cond_destroy(&queue->cond);
pthread_mutex_destroy(&queue->lock);
}
@@ -252,35 +244,25 @@ void mp_dispatch_run(struct mp_dispatch_queue *queue,
// - all queue items were processed,
// - the possibly acquired lock has been released
// It's possible to cancel the timeout by calling mp_dispatch_interrupt().
+// Reentrant calls are not allowed. There can be only 1 thread calling
+// mp_dispatch_queue_process() at a time. In addition, mp_dispatch_lock() can
+// not be called from a thread that is calling mp_dispatch_queue_process() (i.e.
+// no enqueued callback can call the lock/unlock functions).
void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
{
int64_t wait = timeout > 0 ? mp_add_timeout(mp_time_us(), timeout) : 0;
- struct lock_frame frame = {
- .thread = pthread_self(),
- };
pthread_mutex_lock(&queue->lock);
- frame.prev = queue->frame;
- queue->frame = &frame;
- // Logically, the queue is idling if the target thread is blocked in
- // mp_dispatch_queue_process() doing nothing, so it's not possible to call
- // it again. (Reentrant calls via callbacks temporarily reset the field.)
- assert(!queue->idling);
- queue->idling = true;
+ assert(!queue->in_process); // recursion not allowed
+ queue->in_process = true;
+ queue->in_process_thread = pthread_self();
// Wake up thread which called mp_dispatch_lock().
- if (queue->lock_request)
+ if (queue->lock_requests)
pthread_cond_broadcast(&queue->cond);
while (1) {
- if (queue->lock_request || queue->frame != &frame || frame.locked) {
- // Block due to something having called mp_dispatch_lock(). This
- // is either a lock "acquire" (lock_request=true), or a lock in
- // progress, with the possibility the thread which called
- // mp_dispatch_lock() is now calling mp_dispatch_queue_process()
- // (the latter means we must ignore any queue state changes,
- // until it has been unlocked again).
+ if (queue->lock_requests) {
+ // Block due to something having called mp_dispatch_lock().
pthread_cond_wait(&queue->cond, &queue->lock);
- if (queue->frame == &frame && !frame.locked)
- assert(queue->idling);
} else if (queue->head) {
struct mp_dispatch_item *item = queue->head;
queue->head = item->next;
@@ -290,15 +272,15 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
// Unlock, because we want to allow other threads to queue items
// while the dispatch item is processed.
// At the same time, we must prevent other threads from returning
- // from mp_dispatch_lock(), which is done by idling=false.
- queue->idling = false;
+ // from mp_dispatch_lock(), which is done by locked=true.
+ queue->locked = true;
pthread_mutex_unlock(&queue->lock);
item->fn(item->fn_data);
pthread_mutex_lock(&queue->lock);
- assert(!queue->idling);
- queue->idling = true;
+ assert(queue->locked);
+ queue->locked = false;
// Wakeup mp_dispatch_run(), also mp_dispatch_lock().
pthread_cond_broadcast(&queue->cond);
if (item->asynchronous) {
@@ -314,10 +296,8 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
break;
}
}
- queue->idling = false;
- assert(!frame.locked);
- assert(queue->frame == &frame);
- queue->frame = frame.prev;
+ assert(!queue->locked);
+ queue->in_process = false;
queue->interrupted = false;
pthread_mutex_unlock(&queue->lock);
}
@@ -340,35 +320,41 @@ void mp_dispatch_interrupt(struct mp_dispatch_queue *queue)
// no other thread can return from mp_dispatch_lock() (i.e. it behaves like
// a pthread mutex), and no other thread can get dispatch items completed.
// Other threads can still queue asynchronous dispatch items without waiting,
-// and the mutex behavior applies to this function only.
+// and the mutex behavior applies to this function and dispatch callbacks only.
+// The lock is non-recursive, and dispatch callback functions can be thought of
+// already holding the dispatch lock.
void mp_dispatch_lock(struct mp_dispatch_queue *queue)
{
pthread_mutex_lock(&queue->lock);
- // First grab the queue lock. Something else could be holding the lock.
- while (queue->lock_request)
- pthread_cond_wait(&queue->cond, &queue->lock);
- queue->lock_request = true;
+ // Must not be called recursively from dispatched callbacks.
+ if (queue->in_process)
+ assert(!pthread_equal(queue->in_process_thread, pthread_self()));
+ // Must not be called recursively at all.
+ if (queue->locked_explicit)
+ assert(!pthread_equal(queue->locked_explicit_thread, pthread_self()));
+ queue->lock_requests += 1;
// And now wait until the target thread gets "trapped" within the
// mp_dispatch_queue_process() call, which will mean we get exclusive
// access to the target's thread state.
- while (!queue->idling) {
+ while (!queue->in_process) {
pthread_mutex_unlock(&queue->lock);
if (queue->wakeup_fn)
queue->wakeup_fn(queue->wakeup_ctx);
pthread_mutex_lock(&queue->lock);
- if (queue->idling)
+ if (queue->in_process)
break;
pthread_cond_wait(&queue->cond, &queue->lock);
}
- assert(queue->lock_request);
- assert(queue->frame); // must be set if idling
- assert(!queue->frame->locked); // no recursive locking on the same level
+ // Wait until we can get the lock.
+ while (!queue->in_process || queue->locked)
+ pthread_cond_wait(&queue->cond, &queue->lock);
// "Lock".
- queue->frame->locked = true;
- queue->frame->locked_thread = pthread_self();
- // Reset state for recursive mp_dispatch_queue_process() calls.
- queue->lock_request = false;
- queue->idling = false;
+ assert(queue->lock_requests);
+ assert(!queue->locked);
+ assert(!queue->locked_explicit);
+ queue->locked = true;
+ queue->locked_explicit = true;
+ queue->locked_explicit_thread = pthread_self();
pthread_mutex_unlock(&queue->lock);
}
@@ -376,17 +362,16 @@ void mp_dispatch_lock(struct mp_dispatch_queue *queue)
void mp_dispatch_unlock(struct mp_dispatch_queue *queue)
{
pthread_mutex_lock(&queue->lock);
- // Must be called atfer a mp_dispatch_lock().
- assert(queue->frame);
- assert(queue->frame->locked);
- assert(pthread_equal(queue->frame->locked_thread, pthread_self()));
+ assert(queue->locked);
+ // Must be called after a mp_dispatch_lock(), from the same thread.
+ assert(queue->locked_explicit);
+ assert(pthread_equal(queue->locked_explicit_thread, pthread_self()));
// "Unlock".
- queue->frame->locked = false;
- // This must have been set to false during locking (except temporarily
- // during recursive mp_dispatch_queue_process() calls).
- assert(!queue->idling);
- queue->idling = true;
+ queue->locked = false;
+ queue->locked_explicit = false;
+ queue->lock_requests -= 1;
// Wakeup mp_dispatch_queue_process(), and maybe other mp_dispatch_lock()s.
+ // (Would be nice to wake up only 1 other locker if lock_requests>0.)
pthread_cond_broadcast(&queue->cond);
pthread_mutex_unlock(&queue->lock);
}