/*
    FUSE: Filesystem in Userspace
    Copyright (C) 2001-2006 Miklos Szeredi

    This program can be distributed under the terms of the GNU LGPL.
    See the file COPYING.LIB.
*/

#include "fuse_lowlevel.h"

/* NOTE(review): the eight system header names below were reconstructed
   from the identifiers this file uses; confirm against the upstream
   file before merging. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <errno.h>
#include <sys/time.h>
#include <pthread.h>

/* Upper bound on the number of threads (main thread included) that
   may be reading requests at the same time. */
#define FUSE_MAX_WORKERS 10

/*
 * Shared state of one multithreaded session loop.
 *
 * numworker, numavail and exit are only accessed with 'lock' held.
 */
struct fuse_worker {
    pthread_mutex_t lock;
    /* Number of slots of threads[] in use.  Slot 0 stands for the
       main thread and is never written, cancelled or joined. */
    int numworker;
    /* Decremented when a request has been received, incremented again
       when a reply is sent; a new worker is spawned when it hits 0. */
    int numavail;
    struct fuse_session *se;
    /* Wrapper channel handed to fuse_session_process(); its send
       operation is fuse_loop_mt_send(). */
    struct fuse_chan *ch;
    /* Channel of the session; requests are received from it and
       replies are forwarded to it. */
    struct fuse_chan *prevch;
    pthread_t threads[FUSE_MAX_WORKERS];
    /* Thread that called fuse_session_loop_mt(). */
    pthread_t main_thread;
    /* Set to 1 by the main thread once teardown has started. */
    int exit;
    /* 0, or -1 once any thread hit a fatal error; this is the return
       value of fuse_session_loop_mt(). */
    int error;
};

/* Initialise a mutex; on uClibc an adaptive mutex type is requested. */
#ifndef USE_UCLIBC
#define mutex_init(mut) pthread_mutex_init(mut, NULL)
#else
static void mutex_init(pthread_mutex_t *mut)
{
    pthread_mutexattr_t attr;

    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ADAPTIVE_NP);
    pthread_mutex_init(mut, &attr);
    pthread_mutexattr_destroy(&attr);
}
#endif

/*
 * Send operation of the wrapper channel.
 *
 * Counts the sending worker as available again and then forwards the
 * reply unchanged to the underlying channel.  Returns whatever
 * fuse_chan_send() returns.
 */
static int fuse_loop_mt_send(struct fuse_chan *ch, const struct iovec iov[],
                             size_t count)
{
    struct fuse_worker *w = (struct fuse_worker *) fuse_chan_data(ch);

    pthread_mutex_lock(&w->lock);
    w->numavail++;
    pthread_mutex_unlock(&w->lock);

    return fuse_chan_send(w->prevch, iov, count);
}

static int start_thread(struct fuse_worker *w, pthread_t *thread_id);

/*
 * Body of every worker, including the main thread.
 *
 * Receives requests from the underlying channel and processes them
 * until the session has exited, the channel reports end of input or an
 * error, or teardown has begun.  When the last available worker picks
 * up a request, one more worker thread is started (up to
 * FUSE_MAX_WORKERS).
 *
 * 'data' is the shared struct fuse_worker.  Always returns NULL;
 * failures are reported through w->error and fuse_session_exit().
 */
static void *do_work(void *data)
{
    struct fuse_worker *w = data;
    size_t bufsize = fuse_chan_bufsize(w->prevch);
    char *buf = malloc(bufsize);

    if (buf == NULL) {
        fprintf(stderr, "fuse: failed to allocate read buffer\n");
        fuse_session_exit(w->se);
        w->error = -1;
        return NULL;
    }

    /* Make sure the buffer is released even if this thread is cancelled
       by fuse_session_loop_mt(). */
    pthread_cleanup_push(free, buf);

    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);

    while (!fuse_session_exited(w->se)) {
        int res = fuse_chan_recv(w->prevch, buf, bufsize);

        if (res == -EINTR)
            continue;
        if (res <= 0) {
            /* 0 ends the loop quietly; a negative value is fatal */
            if (res < 0) {
                fuse_session_exit(w->se);
                w->error = -1;
            }
            break;
        }

        pthread_mutex_lock(&w->lock);
        if (w->exit) {
            /* Teardown in progress: drop the request */
            pthread_mutex_unlock(&w->lock);
            break;
        }
        w->numavail--;
        if (w->numavail == 0 && w->numworker < FUSE_MAX_WORKERS) {
            /* FIXME: threads should be stored in a list instead of an
               array */
            pthread_t *thread_id = &w->threads[w->numworker];

            /* Account for the new worker before it starts running */
            w->numavail++;
            w->numworker++;
            if (start_thread(w, thread_id) == -1) {
                /* Undo both counters.  Leaving numworker incremented
                   would make teardown cancel and join a thread ID
                   that was never set. */
                w->numavail--;
                w->numworker--;
            }
        }
        pthread_mutex_unlock(&w->lock);

        fuse_session_process(w->se, buf, res, w->ch);
    }

    pthread_cleanup_pop(1);

    /* pthread_t is opaque, so it must be compared with pthread_equal() */
    if (!pthread_equal(pthread_self(), w->main_thread)) {
        /* Signal the main thread, then wait in pause() until this
           thread is cancelled during teardown.
           NOTE(review): presumably SIGTERM interrupts the main
           thread's receive so that it notices the session exit;
           the handler is not visible in this file. */
        pthread_kill(w->main_thread, SIGTERM);
        pause();
    }

    return NULL;
}

/*
 * Create one worker thread running do_work().
 *
 * SIGTERM, SIGINT, SIGHUP and SIGQUIT are blocked while the thread is
 * created, so the new thread inherits a mask with them blocked; the
 * caller's own mask is restored afterwards.
 *
 * Returns 0 on success, -1 if the thread could not be created (in
 * which case *thread_id must not be used).
 */
static int start_thread(struct fuse_worker *w, pthread_t *thread_id)
{
    sigset_t oldset;
    sigset_t newset;
    int res;

    /* Disallow signal reception in worker threads */
    sigemptyset(&newset);
    sigaddset(&newset, SIGTERM);
    sigaddset(&newset, SIGINT);
    sigaddset(&newset, SIGHUP);
    sigaddset(&newset, SIGQUIT);
    pthread_sigmask(SIG_BLOCK, &newset, &oldset);
    res = pthread_create(thread_id, NULL, do_work, w);
    pthread_sigmask(SIG_SETMASK, &oldset, NULL);
    if (res != 0) {
        fprintf(stderr, "fuse: error creating thread: %s\n", strerror(res));
        return -1;
    }

    return 0;
}

/*
 * Run the session loop with multiple threads.
 *
 * The calling thread acts as the first worker; further workers are
 * started on demand by do_work().  When the calling thread's loop
 * ends, all other workers are cancelled and joined, the wrapper
 * channel is destroyed and the session is reset.
 *
 * Returns 0 on success, -1 if setup failed or any worker reported an
 * error.
 */
int fuse_session_loop_mt(struct fuse_session *se)
{
    int i;
    int err;
    struct fuse_worker *w;
    struct fuse_chan_ops cop = {
        .send = fuse_loop_mt_send,
    };

    /* calloc zeroes every field, including exit, error and threads[] */
    w = calloc(1, sizeof *w);
    if (w == NULL) {
        fprintf(stderr, "fuse: failed to allocate worker structure\n");
        return -1;
    }

    w->se = se;
    w->prevch = fuse_session_next_chan(se, NULL);
    w->ch = fuse_chan_new(&cop, -1, fuse_chan_bufsize(w->prevch), w);
    if (w->ch == NULL) {
        free(w);
        return -1;
    }

    /* The calling thread is worker 0 and starts out available */
    w->numworker = 1;
    w->numavail = 1;
    w->main_thread = pthread_self();
    mutex_init(&w->lock);

    do_work(w);

    /* Cancel under the lock so that no new worker can be registered
       in between, and flag the teardown for workers that are about to
       process a request. */
    pthread_mutex_lock(&w->lock);
    for (i = 1; i < w->numworker; i++)
        pthread_cancel(w->threads[i]);
    w->exit = 1;
    pthread_mutex_unlock(&w->lock);

    for (i = 1; i < w->numworker; i++)
        pthread_join(w->threads[i], NULL);

    pthread_mutex_destroy(&w->lock);
    err = w->error;
    fuse_chan_destroy(w->ch);
    free(w);
    fuse_session_reset(se);

    return err;
}