aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Miklos Szeredi <miklos@szeredi.hu>2001-11-16 10:12:59 +0000
committerGravatar Miklos Szeredi <miklos@szeredi.hu>2001-11-16 10:12:59 +0000
commitfff56ab1242e3ad7cddf15e7e981da55d06c4da5 (patch)
treebaca68469ea73b679ac2e74aba52e7312e0ea7a8
parent39f28679ed1c313bbeea85d370d95f62551bb21b (diff)
better thread management
-rw-r--r--example/fusexmp.c12
-rw-r--r--include/fuse.h28
-rw-r--r--kernel/dev.c4
-rw-r--r--lib/Makefile.am1
-rw-r--r--lib/fuse.c89
-rw-r--r--lib/fuse_i.h6
-rw-r--r--lib/fuse_mt.c111
-rw-r--r--python/_fusemodule.c61
8 files changed, 229 insertions, 83 deletions
diff --git a/example/fusexmp.c b/example/fusexmp.c
index 2ed35dc..aeb4f88 100644
--- a/example/fusexmp.c
+++ b/example/fusexmp.c
@@ -290,6 +290,7 @@ int main(int argc, char *argv[])
{
int argctr;
int flags;
+ int multithreaded;
struct fuse *fuse;
if(argc < 2) {
@@ -308,7 +309,8 @@ int main(int argc, char *argv[])
set_signal_handlers();
atexit(cleanup);
- flags = FUSE_MULTITHREAD;
+ flags = 0;
+ multithreaded = 1;
for(; argctr < argc && argv[argctr][0] == '-'; argctr ++) {
switch(argv[argctr][1]) {
case 'd':
@@ -316,7 +318,7 @@ int main(int argc, char *argv[])
break;
case 's':
- flags &= ~FUSE_MULTITHREAD;
+ multithreaded = 0;
break;
default:
@@ -331,7 +333,11 @@ int main(int argc, char *argv[])
fuse = fuse_new(0, flags);
fuse_set_operations(fuse, &xmp_oper);
- fuse_loop(fuse);
+
+ if(multithreaded)
+ fuse_loop_mt(fuse);
+ else
+ fuse_loop(fuse);
return 0;
}
diff --git a/include/fuse.h b/include/fuse.h
index 60cd378..e284469 100644
--- a/include/fuse.h
+++ b/include/fuse.h
@@ -78,9 +78,6 @@ struct fuse_operations {
/* FUSE flags: */
-/** Process requests in multiple threads */
-#define FUSE_MULTITHREAD (1 << 0)
-
/** Enable debuging output */
#define FUSE_DEBUG (1 << 1)
@@ -115,6 +112,20 @@ void fuse_set_operations(struct fuse *f, const struct fuse_operations *op);
void fuse_loop(struct fuse *f);
/**
+ * FUSE event loop with multiple threads
+ *
+ * Requests from the kernel are processed, and the apropriate
+ * operations are called. Request are processed in parallel by
+ * distributing them between multiple threads.
+ *
+ * Calling this function requires the pthreads library to be linked to
+ * the application.
+ *
+ * @param f the FUSE handle
+ */
+void fuse_loop_mt(struct fuse *f);
+
+/**
* Destroy the FUSE handle.
*
* The filesystem is not unmounted.
@@ -122,3 +133,14 @@ void fuse_loop(struct fuse *f);
* @param f the FUSE handle
*/
void fuse_destroy(struct fuse *f);
+
+
+/* --------------------------------------------------- *
+ * Advanced API, usually you need not bother with this *
+ * --------------------------------------------------- */
+
+struct fuse_cmd;
+
+struct fuse_cmd *__fuse_read_cmd(struct fuse *f);
+
+void __fuse_process_cmd(struct fuse *f, struct fuse_cmd *cmd);
diff --git a/kernel/dev.c b/kernel/dev.c
index d183616..1cc12a2 100644
--- a/kernel/dev.c
+++ b/kernel/dev.c
@@ -163,8 +163,8 @@ static int request_wait(struct fuse_conn *fc)
{
int ret = 0;
DECLARE_WAITQUEUE(wait, current);
-
- add_wait_queue(&fc->waitq, &wait);
+
+ add_wait_queue_exclusive(&fc->waitq, &wait);
while(list_empty(&fc->pending)) {
set_current_state(TASK_INTERRUPTIBLE);
if(signal_pending(current)) {
diff --git a/lib/Makefile.am b/lib/Makefile.am
index f970b11..7f289f7 100644
--- a/lib/Makefile.am
+++ b/lib/Makefile.am
@@ -4,4 +4,5 @@ lib_LIBRARIES = libfuse.a
libfuse_a_SOURCES = \
fuse.c \
+ fuse_mt.c \
fuse_i.h
diff --git a/lib/fuse.c b/lib/fuse.c
index ed352c6..6bd32ae 100644
--- a/lib/fuse.c
+++ b/lib/fuse.c
@@ -301,6 +301,7 @@ static void convert_stat(struct stat *stbuf, struct fuse_attr *attr)
attr->atime = stbuf->st_atime;
attr->mtime = stbuf->st_mtime;
attr->ctime = stbuf->st_ctime;
+ attr->_dummy = 4096;
}
static int fill_dir(struct fuse_dirhandle *dh, char *name, int type)
@@ -745,19 +746,11 @@ static void do_write(struct fuse *f, struct fuse_in_header *in,
send_reply(f, in, res, NULL, 0);
}
-struct cmd {
- struct fuse *f;
- char *buf;
- size_t buflen;
-};
-
-static void *do_command(void *data)
+void __fuse_process_cmd(struct fuse *f, struct fuse_cmd *cmd)
{
- struct cmd *cmd = (struct cmd *) data;
struct fuse_in_header *in = (struct fuse_in_header *) cmd->buf;
void *inarg = cmd->buf + sizeof(struct fuse_in_header);
size_t argsize;
- struct fuse *f = cmd->f;
if((f->flags & FUSE_DEBUG)) {
printf("unique: %i, opcode: %i, ino: %li, insize: %i\n", in->unique,
@@ -836,66 +829,46 @@ static void *do_command(void *data)
if(in->unique != 0)
send_reply(f, in, -ENOSYS, NULL, 0);
}
-
+
free(cmd->buf);
free(cmd);
-
- return NULL;
}
-/* This hack makes it possible to link FUSE with or without the
- pthread library */
-__attribute__((weak))
-int pthread_create(pthread_t *thrid __attribute__((unused)),
- const pthread_attr_t *attr __attribute__((unused)),
- void *(*func)(void *) __attribute__((unused)),
- void *arg __attribute__((unused)))
+struct fuse_cmd *__fuse_read_cmd(struct fuse *f)
{
- return ENOSYS;
+ ssize_t res;
+ char inbuf[FUSE_MAX_IN];
+ struct fuse_cmd *cmd;
+
+ res = read(f->fd, inbuf, sizeof(inbuf));
+ if(res == -1) {
+ perror("reading fuse device");
+ /* BAD... This will happen again */
+ return NULL;
+ }
+ if((size_t) res < sizeof(struct fuse_in_header)) {
+ fprintf(stderr, "short read on fuse device\n");
+ /* Cannot happen */
+ return NULL;
+ }
+
+ cmd = (struct fuse_cmd *) malloc(sizeof(*cmd));
+ cmd->buflen = res;
+ cmd->buf = (char *) malloc(cmd->buflen);
+ memcpy(cmd->buf, inbuf, cmd->buflen);
+
+ return cmd;
}
+
void fuse_loop(struct fuse *f)
{
- int res;
- char inbuf[FUSE_MAX_IN];
- pthread_attr_t attr;
- pthread_t thrid;
-
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-
while(1) {
- struct cmd *cmd;
-
- res = read(f->fd, inbuf, sizeof(inbuf));
- if(res == -1) {
- perror("reading fuse device");
- /* BAD... This will happen again */
- exit(1);
- }
- if((size_t) res < sizeof(struct fuse_in_header)) {
- fprintf(stderr, "short read on fuse device\n");
- /* Cannot happen */
+ struct fuse_cmd *cmd = __fuse_read_cmd(f);
+ if(cmd == NULL)
exit(1);
- }
-
- cmd = (struct cmd *) malloc(sizeof(struct cmd));
- cmd->f = f;
- cmd->buflen = res;
- cmd->buf = (char *) malloc(cmd->buflen);
- memcpy(cmd->buf, inbuf, cmd->buflen);
-
- if(f->flags & FUSE_MULTITHREAD) {
- res = pthread_create(&thrid, &attr, do_command, cmd);
- if(res == 0)
- continue;
-
- fprintf(stderr, "Error creating thread: %s\n", strerror(res));
- fprintf(stderr, "Will run in single thread mode\n");
- f->flags &= ~FUSE_MULTITHREAD;
- }
- do_command(cmd);
+ __fuse_process_cmd(f, cmd);
}
}
@@ -909,6 +882,7 @@ struct fuse *fuse_new(int fd, int flags)
f->flags = flags;
f->fd = fd;
f->ctr = 0;
+ /* FIXME: Dynamic hash table */
f->name_table_size = 14057;
f->name_table = (struct node **)
calloc(1, sizeof(struct node *) * f->name_table_size);
@@ -934,6 +908,7 @@ void fuse_set_operations(struct fuse *f, const struct fuse_operations *op)
void fuse_destroy(struct fuse *f)
{
+ /* FIXME: Kill all threads... */
size_t i;
for(i = 0; i < f->ino_table_size; i++) {
struct node *node;
diff --git a/lib/fuse_i.h b/lib/fuse_i.h
index 4d3e042..6740608 100644
--- a/lib/fuse_i.h
+++ b/lib/fuse_i.h
@@ -40,3 +40,9 @@ struct fuse_dirhandle {
fino_t dir;
FILE *fp;
};
+
+struct fuse_cmd {
+ struct fuse *f;
+ char *buf;
+ size_t buflen;
+};
diff --git a/lib/fuse_mt.c b/lib/fuse_mt.c
new file mode 100644
index 0000000..ac616fe
--- /dev/null
+++ b/lib/fuse_mt.c
@@ -0,0 +1,111 @@
+/*
+ FUSE: Filesystem in Userspace
+ Copyright (C) 2001 Miklos Szeredi (mszeredi@inf.bme.hu)
+
+ This program can be distributed under the terms of the GNU GPL.
+ See the file COPYING.
+*/
+
+#include "fuse.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <pthread.h>
+#include <signal.h>
+#include <sys/time.h>
+
+
+struct thread_common {
+ struct fuse *f;
+ struct fuse_cmd *cmd;
+ pthread_mutex_t lock;
+ pthread_cond_t cond;
+ int avail;
+};
+
+/* Called with c->lock held */
+static void *do_work(void *data)
+{
+ struct thread_common *c = (struct thread_common *) data;
+ struct fuse *f = c->f;
+
+ c->avail ++;
+ while(1) {
+ int res;
+ struct timespec timeout;
+ struct timeval now;
+ struct fuse_cmd *cmd;
+
+ gettimeofday(&now, NULL);
+ timeout.tv_sec = now.tv_sec + 1;
+ timeout.tv_nsec = now.tv_usec * 1000;
+
+ res = 0;
+ while(c->cmd == NULL && res != ETIMEDOUT)
+ res = pthread_cond_timedwait(&c->cond, &c->lock, &timeout);
+ if(res == ETIMEDOUT)
+ break;
+
+ cmd = c->cmd;
+ c->cmd = NULL;
+ c->avail --;
+ pthread_mutex_unlock(&c->lock);
+ __fuse_process_cmd(f, cmd);
+ pthread_mutex_lock(&c->lock);
+ c->avail ++;
+ }
+
+ c->avail --;
+ pthread_mutex_unlock(&c->lock);
+ return NULL;
+}
+
+static void start_thread(struct thread_common *c)
+{
+ pthread_attr_t attr;
+ pthread_t thrid;
+ sigset_t oldset;
+ sigset_t newset;
+ int res;
+
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+
+ /* Disallow signal reception in worker threads */
+ sigfillset(&newset);
+ sigprocmask(SIG_SETMASK, &newset, &oldset);
+ res = pthread_create(&thrid, &attr, do_work, c);
+ sigprocmask(SIG_SETMASK, &oldset, NULL);
+ pthread_mutex_lock(&c->lock);
+ if(res != 0) {
+ fprintf(stderr, "Error creating thread: %s\n", strerror(res));
+ exit(1);
+ }
+}
+
+void fuse_loop_mt(struct fuse *f)
+{
+ struct thread_common *c;
+
+ c = (struct thread_common *) malloc(sizeof(struct thread_common));
+ c->f = f;
+ c->cmd = NULL;
+ pthread_cond_init(&c->cond, NULL);
+ pthread_mutex_init(&c->lock, NULL);
+ c->avail = 0;
+
+ while(1) {
+ struct fuse_cmd *cmd = __fuse_read_cmd(f);
+ if(cmd == NULL)
+ exit(1);
+
+ pthread_mutex_lock(&c->lock);
+ c->cmd = cmd;
+ while(c->avail == 0)
+ start_thread(c);
+ pthread_cond_signal(&c->cond);
+ pthread_mutex_unlock(&c->lock);
+ }
+}
diff --git a/python/_fusemodule.c b/python/_fusemodule.c
index eee4fda..2f5e79c 100644
--- a/python/_fusemodule.c
+++ b/python/_fusemodule.c
@@ -68,29 +68,58 @@ static int readlink_func(const char *path, char *link, size_t size) {
EPILOGUE
}
+static int getdir_add_entry(PyObject *w, fuse_dirh_t dh, fuse_dirfil_t df)
+{
+ PyObject *o0;
+ PyObject *o1;
+ int ret = -EINVAL;
+
+ if(!PySequence_Check(w)) {
+ printf("getdir item not sequence\n");
+ goto out;
+ }
+ if(PySequence_Length(w) != 2) {
+ printf("getdir item not len 2\n");
+ goto out;
+ }
+ o0 = PySequence_GetItem(w, 0);
+ o1 = PySequence_GetItem(w, 1);
+
+ if(!PyString_Check(o0)) {
+ printf("getdir item[0] not string\n");
+ goto out_decref;
+ }
+ if(!PyInt_Check(o1)) {
+ printf("getdir item[1] not int\n");
+ goto out_decref;
+ }
+
+ ret = df(dh, PyString_AsString(o0), PyInt_AsLong(o1));
+
+out_decref:
+ Py_DECREF(o0);
+ Py_DECREF(o1);
+
+out:
+ return ret;
+}
+
static int getdir_func(const char *path, fuse_dirh_t dh, fuse_dirfil_t df) {
PyObject *v = PyObject_CallFunction(getdir_cb, "s", path);
int i;
PROLOGUE
- if(!PySequence_Check(v)) { printf("getdir_func not sequence\n");goto OUT_DECREF; }
- for(i=0; i < PySequence_Length(v); i++) {
- PyObject *w = PySequence_GetItem(v, i);
- printf("getdir_func validate %d\n", i);
- if(!PySequence_Check(w)) { printf("getdir item not sequence\n"); goto OUT_DECREF; }
- if(PySequence_Length(w) != 2) { printf("getdir item not len 2\n"); goto OUT_DECREF; }
- if(!PyString_Check(PySequence_GetItem(w,0))){ printf("getdir item[0] not string"); goto OUT_DECREF; }
- if(!PyInt_Check(PySequence_GetItem(w, 1))) { printf("getdir item[1] not int"); goto OUT_DECREF; }
+ if(!PySequence_Check(v)) {
+ printf("getdir_func not sequence\n");
+ goto OUT_DECREF;
}
-
for(i=0; i < PySequence_Length(v); i++) {
PyObject *w = PySequence_GetItem(v, i);
- printf("getdir_func %d\n", i);
- ret = df(dh, PyString_AsString(PySequence_GetItem(w, 0)),
- PyInt_AsLong(PySequence_GetItem(w, 1)));
- if(ret) goto OUT_DECREF;
+ ret = getdir_add_entry(w, dh, df);
+ Py_DECREF(w);
+ if(ret != 0)
+ goto OUT_DECREF;
}
-
ret = 0;
EPILOGUE
@@ -191,8 +220,6 @@ int open_func(const char *path, int mode) {
static PyObject *
Fuse_main(PyObject *self, PyObject *args, PyObject *kw)
{
- PyObject *list, *item;
-
int flags=0;
struct fuse_operations op;
@@ -260,7 +287,5 @@ init_fuse(void)
d = PyModule_GetDict(m);
ErrorObject = PyErr_NewException("fuse.error", NULL, NULL);
PyDict_SetItemString(d, "error", ErrorObject);
- PyDict_SetItemString(d, "MULTITHREAD", PyInt_FromLong(FUSE_MULTITHREAD));
PyDict_SetItemString(d, "DEBUG", PyInt_FromLong(FUSE_DEBUG));
-
}