From ec3dd7164ceea9ceda71e05e774501874fd20924 Mon Sep 17 00:00:00 2001 From: wm4 Date: Sat, 1 Apr 2017 20:32:01 +0200 Subject: misc: add a thread pool To be used by the following commits. --- misc/thread_pool.c | 125 +++++++++++++++++++++++++++++++++++++++++++++++++++++ misc/thread_pool.h | 10 +++++ 2 files changed, 135 insertions(+) create mode 100644 misc/thread_pool.c create mode 100644 misc/thread_pool.h (limited to 'misc') diff --git a/misc/thread_pool.c b/misc/thread_pool.c new file mode 100644 index 0000000000..dddfad6734 --- /dev/null +++ b/misc/thread_pool.c @@ -0,0 +1,125 @@ +/* + * This file is part of mpv. + * + * mpv is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * mpv is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with mpv. If not, see . + */ + +#include + +#include "common/common.h" + +#include "thread_pool.h" + +struct work { + void (*fn)(void *ctx); + void *fn_ctx; +}; + +struct mp_thread_pool { + pthread_t *threads; + int num_threads; + + pthread_mutex_t lock; + pthread_cond_t wakeup; + + // --- the following fields are protected by lock + bool terminate; + struct work *work; + int num_work; +}; + +static void *worker_thread(void *arg) +{ + struct mp_thread_pool *pool = arg; + + pthread_mutex_lock(&pool->lock); + while (1) { + while (!pool->num_work && !pool->terminate) + pthread_cond_wait(&pool->wakeup, &pool->lock); + + if (!pool->num_work && pool->terminate) + break; + + assert(pool->num_work > 0); + struct work work = pool->work[pool->num_work - 1]; + pool->num_work -= 1; + + pthread_mutex_unlock(&pool->lock); + work.fn(work.fn_ctx); + pthread_mutex_lock(&pool->lock); + } + assert(pool->num_work == 0); + pthread_mutex_unlock(&pool->lock); + + return NULL; +} + +static void thread_pool_dtor(void *ctx) +{ + struct mp_thread_pool *pool = ctx; + + pthread_mutex_lock(&pool->lock); + pool->terminate = true; + pthread_cond_broadcast(&pool->wakeup); + pthread_mutex_unlock(&pool->lock); + + for (int n = 0; n < pool->num_threads; n++) + pthread_join(pool->threads[n], NULL); + + assert(pool->num_work == 0); + pthread_cond_destroy(&pool->wakeup); + pthread_mutex_destroy(&pool->lock); +} + +// Create a thread pool with the given number of worker threads. This can return +// NULL if the worker threads could not be created. The thread pool can be +// destroyed with talloc_free(pool), or indirectly with talloc_free(ta_parent). +// If there are still work items on freeing, it will block until all work items +// are done, and the threads terminate. +struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int threads) +{ + assert(threads > 0); + + struct mp_thread_pool *pool = talloc_zero(ta_parent, struct mp_thread_pool); + talloc_set_destructor(pool, thread_pool_dtor); + + pthread_mutex_init(&pool->lock, NULL); + pthread_cond_init(&pool->wakeup, NULL); + + for (int n = 0; n < threads; n++) { + pthread_t thread; + if (pthread_create(&thread, NULL, worker_thread, pool)) { + talloc_free(pool); + return NULL; + } + MP_TARRAY_APPEND(pool, pool->threads, pool->num_threads, thread); + } + + return pool; +} + +// Queue a function to be run on a worker thread: fn(fn_ctx) +// If no worker thread is currently available, it's appended to a list in memory +// with unbounded size. This function always returns immediately. +// Concurrent queue calls are allowed, as long as it does not overlap with +// pool destruction. +void mp_thread_pool_queue(struct mp_thread_pool *pool, void (*fn)(void *ctx), + void *fn_ctx) +{ + pthread_mutex_lock(&pool->lock); + struct work work = {fn, fn_ctx}; + MP_TARRAY_INSERT_AT(pool, pool->work, pool->num_work, 0, work); + pthread_cond_signal(&pool->wakeup); + pthread_mutex_unlock(&pool->lock); +} diff --git a/misc/thread_pool.h b/misc/thread_pool.h new file mode 100644 index 0000000000..c7af7b2b57 --- /dev/null +++ b/misc/thread_pool.h @@ -0,0 +1,10 @@ +#ifndef MPV_MP_THREAD_POOL_H +#define MPV_MP_THREAD_POOL_H + +struct mp_thread_pool; + +struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int threads); +void mp_thread_pool_queue(struct mp_thread_pool *pool, void (*fn)(void *ctx), + void *fn_ctx); + +#endif -- cgit v1.2.3