diff options
author | Miklos Szeredi <miklos@szeredi.hu> | 2006-09-03 18:28:52 +0000 |
---|---|---|
committer | Miklos Szeredi <miklos@szeredi.hu> | 2006-09-03 18:28:52 +0000 |
commit | 38f152c72decfc8a995c8a9fa7f932f348d0e1e2 (patch) | |
tree | 79a26409e8599e29341c0ec68662a754717efaad /lib/fuse_loop_mt.c | |
parent | 16dbf945f09eb6c84c51b59061dd62c76d3ececd (diff) |
fix
Diffstat (limited to 'lib/fuse_loop_mt.c')
-rw-r--r-- | lib/fuse_loop_mt.c | 232 |
1 files changed, 116 insertions, 116 deletions
diff --git a/lib/fuse_loop_mt.c b/lib/fuse_loop_mt.c index 8327f12..47fca46 100644 --- a/lib/fuse_loop_mt.c +++ b/lib/fuse_loop_mt.c @@ -6,145 +6,131 @@ See the file COPYING.LIB. */ -#include "config.h" #include "fuse_lowlevel.h" +#include "fuse_misc.h" #include <stdio.h> #include <stdlib.h> #include <string.h> -#include <pthread.h> #include <unistd.h> #include <signal.h> #include <errno.h> #include <sys/time.h> -#define FUSE_MAX_WORKERS 10 - struct fuse_worker { + struct fuse_worker *prev; + struct fuse_worker *next; + pthread_t thread_id; + size_t bufsize; + char *buf; + struct fuse_mt *mt; +}; + +struct fuse_mt { pthread_mutex_t lock; int numworker; int numavail; struct fuse_session *se; struct fuse_chan *prevch; - pthread_t threads[FUSE_MAX_WORKERS]; - pthread_t main_thread; + struct fuse_worker main; int exit; int error; }; -struct fuse_wchan { - struct fuse_worker *w; - struct fuse_chan *prevch; -}; - -#ifndef USE_UCLIBC -#define mutex_init(mut) pthread_mutex_init(mut, NULL) -#else -static void mutex_init(pthread_mutex_t *mut) +static void list_add_worker(struct fuse_worker *w, struct fuse_worker *next) { - pthread_mutexattr_t attr; - pthread_mutexattr_init(&attr); - pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ADAPTIVE_NP); - pthread_mutex_init(mut, &attr); - pthread_mutexattr_destroy(&attr); + struct fuse_worker *prev = next->prev; + w->next = next; + w->prev = prev; + prev->next = w; + next->prev = w; } -#endif -static int fuse_loop_mt_send(struct fuse_chan *ch, const struct iovec iov[], - size_t count) +static void list_del_worker(struct fuse_worker *w) { - int res; - struct fuse_wchan *wchan_data = (struct fuse_wchan *) fuse_chan_data(ch); - pthread_mutex_lock(&wchan_data->w->lock); - wchan_data->w->numavail ++; - pthread_mutex_unlock(&wchan_data->w->lock); - res = fuse_chan_send(wchan_data->prevch, iov, count); - fuse_chan_destroy(ch); - free(wchan_data); - return res; + struct fuse_worker *prev = w->prev; + struct fuse_worker *next = w->next; + prev->next = next; + next->prev = prev; } -static int start_thread(struct fuse_worker *w, pthread_t *thread_id); +static int fuse_start_thread(struct fuse_mt *mt); -static void *do_work(void *data) +static void *fuse_do_work(void *data) { struct fuse_worker *w = (struct fuse_worker *) data; - size_t bufsize = fuse_chan_bufsize(w->prevch); - char *buf = (char *) malloc(bufsize); - if (!buf) { - fprintf(stderr, "fuse: failed to allocate read buffer\n"); - fuse_session_exit(w->se); - w->error = -1; - return NULL; - } + struct fuse_mt *mt = w->mt; - pthread_cleanup_push(free, buf); - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); - pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); - - while (!fuse_session_exited(w->se)) { - struct fuse_chan *ch = w->prevch; - struct fuse_chan *wchan; - struct fuse_wchan *wchan_data; - struct fuse_chan_ops cop = { .send = fuse_loop_mt_send }; - int res = fuse_chan_recv(&ch, buf, bufsize); + while (!fuse_session_exited(mt->se)) { + struct fuse_chan *ch = mt->prevch; + int res = fuse_chan_recv(&ch, w->buf, w->bufsize); if (res == -EINTR) continue; if (res <= 0) { if (res < 0) { - fuse_session_exit(w->se); - w->error = -1; + fuse_session_exit(mt->se); + mt->error = -1; } break; } - pthread_mutex_lock(&w->lock); - if (w->exit) { - pthread_mutex_unlock(&w->lock); - break; + pthread_mutex_lock(&mt->lock); + if (mt->exit) { + pthread_mutex_unlock(&mt->lock); + return NULL; } - w->numavail--; - if (w->numavail == 0 && w->numworker < FUSE_MAX_WORKERS) { - if (w->numworker < FUSE_MAX_WORKERS) { - /* FIXME: threads should be stored in a list instead - of an array */ - int start_res; - pthread_t *thread_id = &w->threads[w->numworker]; - w->numavail ++; - w->numworker ++; - start_res = start_thread(w, thread_id); - if (start_res == -1) - w->numavail --; + mt->numavail--; + if (mt->numavail == 0) + fuse_start_thread(mt); + pthread_mutex_unlock(&mt->lock); + + fuse_session_process(mt->se, w->buf, res, ch); + + pthread_mutex_lock(&mt->lock); + mt->numavail ++; + if (mt->numavail > 10) { + if (mt->exit) { + pthread_mutex_unlock(&mt->lock); + return NULL; } + list_del_worker(w); + mt->numavail--; + mt->numworker--; + pthread_mutex_unlock(&mt->lock); + + pthread_detach(w->thread_id); + free(w->buf); + free(w); + return NULL; } - pthread_mutex_unlock(&w->lock); - wchan_data = malloc(sizeof(struct fuse_wchan)); - wchan = fuse_chan_new(&cop, -1, fuse_chan_bufsize(ch), wchan_data); - if (!wchan_data || !wchan) { - free(wchan_data); - fuse_session_exit(w->se); - w->error = -1; - break; - } - wchan_data->w = w; - wchan_data->prevch = ch; - fuse_session_process(w->se, buf, res, wchan); + pthread_mutex_unlock(&mt->lock); } - pthread_cleanup_pop(1); - if (pthread_self() != w->main_thread) { - pthread_kill(w->main_thread, SIGTERM); - pause(); - } + pthread_kill(mt->main.thread_id, SIGHUP); + pause(); return NULL; } -static int start_thread(struct fuse_worker *w, pthread_t *thread_id) +static int fuse_start_thread(struct fuse_mt *mt) { sigset_t oldset; sigset_t newset; int res; + struct fuse_worker *w = malloc(sizeof(struct fuse_worker)); + if (!w) { + fprintf(stderr, "fuse: failed to allocate worker structure\n"); + return -1; + } + memset(w, 0, sizeof(struct fuse_worker)); + w->bufsize = fuse_chan_bufsize(mt->prevch); + w->buf = malloc(w->bufsize); + w->mt = mt; + if (!w->buf) { + fprintf(stderr, "fuse: failed to allocate read buffer\n"); + free(w); + return -1; + } /* Disallow signal reception in worker threads */ sigemptyset(&newset); @@ -153,47 +139,61 @@ static int start_thread(struct fuse_worker *w, pthread_t *thread_id) sigaddset(&newset, SIGHUP); sigaddset(&newset, SIGQUIT); pthread_sigmask(SIG_BLOCK, &newset, &oldset); - res = pthread_create(thread_id, NULL, do_work, w); + res = pthread_create(&w->thread_id, NULL, fuse_do_work, w); pthread_sigmask(SIG_SETMASK, &oldset, NULL); if (res != 0) { fprintf(stderr, "fuse: error creating thread: %s\n", strerror(res)); return -1; } + list_add_worker(w, &mt->main); + mt->numavail ++; + mt->numworker ++; return 0; } +static void fuse_join_worker(struct fuse_mt *mt, struct fuse_worker *w) +{ + pthread_join(w->thread_id, NULL); + pthread_mutex_lock(&mt->lock); + list_del_worker(w); + pthread_mutex_unlock(&mt->lock); + free(w->buf); + free(w); +} + int fuse_session_loop_mt(struct fuse_session *se) { - int i; int err; + struct fuse_mt mt; struct fuse_worker *w; - w = (struct fuse_worker *) malloc(sizeof(struct fuse_worker)); - if (w == NULL) { - fprintf(stderr, "fuse: failed to allocate worker structure\n"); - return -1; - } - memset(w, 0, sizeof(struct fuse_worker)); - w->se = se; - w->prevch = fuse_session_next_chan(se, NULL); - w->error = 0; - w->numworker = 1; - w->numavail = 1; - w->main_thread = pthread_self(); - mutex_init(&w->lock); - - do_work(w); - - pthread_mutex_lock(&w->lock); - for (i = 1; i < w->numworker; i++) - pthread_cancel(w->threads[i]); - w->exit = 1; - pthread_mutex_unlock(&w->lock); - for (i = 1; i < w->numworker; i++) - pthread_join(w->threads[i], NULL); - pthread_mutex_destroy(&w->lock); - err = w->error; - free(w); + + memset(&mt, 0, sizeof(struct fuse_mt)); + mt.se = se; + mt.prevch = fuse_session_next_chan(se, NULL); + mt.error = 0; + mt.numworker = 0; + mt.numavail = 0; + mt.main.thread_id = pthread_self(); + mt.main.prev = mt.main.next = &mt.main; + fuse_mutex_init(&mt.lock); + + pthread_mutex_lock(&mt.lock); + fuse_start_thread(&mt); + pthread_mutex_unlock(&mt.lock); + while (!fuse_session_exited(se)) + pause(); + + for (w = mt.main.next; w != &mt.main; w = w->next) + pthread_cancel(w->thread_id); + mt.exit = 1; + pthread_mutex_unlock(&mt.lock); + + while (mt.main.next != &mt.main) + fuse_join_worker(&mt, mt.main.next); + + pthread_mutex_destroy(&mt.lock); + err = mt.error; fuse_session_reset(se); return err; } |