From 33232032423dcc06716537204f1995afa5a73940 Mon Sep 17 00:00:00 2001 From: Miklos Szeredi Date: Mon, 19 Nov 2001 17:55:51 +0000 Subject: multithreading improvements --- BUGS | 3 ++ kernel/Makefile.am | 9 +++++- kernel/dev.c | 18 ++++++------ lib/fuse.c | 12 +++++++- lib/fuse_i.h | 2 ++ lib/fuse_mt.c | 25 ++++++++++------ python/Makefile | 7 +---- python/fuse.py | 72 ---------------------------------------------- python/xmp.py | 83 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 9 files changed, 134 insertions(+), 97 deletions(-) create mode 100755 python/xmp.py diff --git a/BUGS b/BUGS index df5f428..cd56648 100644 --- a/BUGS +++ b/BUGS @@ -2,3 +2,6 @@ - When a non-directory is mounted the root inode is not filled in, only at the first getattr + +- I want really low priority for my cached pages. Can they start out + 'old' so they will be thrown out on the first oportunity? diff --git a/kernel/Makefile.am b/kernel/Makefile.am index 2ea0e3b..033c8cb 100644 --- a/kernel/Makefile.am +++ b/kernel/Makefile.am @@ -25,7 +25,6 @@ uninstall-local: clean-local: rm -f *.o *.s - .c.o: $(CC) $(CFLAGS) $(CPPFAGS) -c $< @@ -33,3 +32,11 @@ fuse_objs = dev.o dir.o file.o inode.o util.o fuse.o: $(fuse_objs) ld -r -o fuse.o $(fuse_objs) + +fuse_headers = fuse_i.h ../include/linux/fuse.h + +dev.o: $(fuse_headers) +dir.o: $(fuse_headers) +file.o: $(fuse_headers) +inode.o: $(fuse_headers) +util.o: $(fuse_headers) diff --git a/kernel/dev.c b/kernel/dev.c index 4395736..1bff1eb 100644 --- a/kernel/dev.c +++ b/kernel/dev.c @@ -246,15 +246,17 @@ static ssize_t fuse_dev_read(struct file *file, char *buf, size_t nbytes, ret = copy_in_args(req->in, buf, nbytes); spin_lock(&fuse_lock); - if(req->issync || ret < 0) { - if(ret < 0) - list_add_tail(&req->list, &fc->pending); + if(req->issync) { + if(ret < 0) { + req->out->h.error = -EPROTO; + req->finished = 1; + } else { list_add_tail(&req->list, &fc->processing); req->sent = 1; } req->locked = 0; - if(req->interrupted) + if(ret < 0 || req->interrupted) wake_up(&req->waitq); req = NULL; @@ -395,16 +397,15 @@ static ssize_t fuse_dev_write(struct file *file, const char *buf, spin_lock(&fuse_lock); if(err) - list_add_tail(&fc->processing, &req->list); + req->out->h.error = -EPROTO; else { /* fget() needs to be done in this context */ if(req->in->h.opcode == FUSE_GETDIR && !oh.error) process_getdir(req); - req->finished = 1; } + req->finished = 1; req->locked = 0; - if(!err || req->interrupted) - wake_up(&req->waitq); + wake_up(&req->waitq); spin_unlock(&fuse_lock); if(!err) @@ -473,6 +474,7 @@ static void end_requests(struct fuse_conn *fc, struct list_head *head) list_del_init(&req->list); if(req->issync) { req->out->h.error = -ECONNABORTED; + req->finished = 1; wake_up(&req->waitq); } else diff --git a/lib/fuse.c b/lib/fuse.c index 2ed5169..b1b0749 100644 --- a/lib/fuse.c +++ b/lib/fuse.c @@ -319,7 +319,11 @@ static void send_reply_raw(struct fuse *f, char *outbuf, size_t outsize) out->error, strerror(-out->error), outsize); fflush(stdout); } - + + pthread_mutex_lock(&f->lock); + f->numavail ++; + pthread_mutex_unlock(&f->lock); + res = write(f->fd, outbuf, outsize); if(res == -1) { /* ENOENT means the operation was interrupted */ @@ -762,6 +766,10 @@ void __fuse_process_cmd(struct fuse *f, struct fuse_cmd *cmd) void *inarg = cmd->buf + sizeof(struct fuse_in_header); size_t argsize; + pthread_mutex_lock(&f->lock); + f->numavail --; + pthread_mutex_unlock(&f->lock); + if((f->flags & FUSE_DEBUG)) { printf("unique: %i, opcode: %i, ino: %li, insize: %i\n", in->unique, in->opcode, in->ino, cmd->buflen); @@ -904,6 +912,8 @@ struct fuse *fuse_new(int fd, int flags) f->ino_table = (struct node **) calloc(1, sizeof(struct node *) * f->ino_table_size); pthread_mutex_init(&f->lock, NULL); + f->numworker = 0; + f->numavail = 0; root = (struct node *) calloc(1, sizeof(struct node)); root->mode = 0; diff --git a/lib/fuse_i.h b/lib/fuse_i.h index 6740608..6e1453e 100644 --- a/lib/fuse_i.h +++ b/lib/fuse_i.h @@ -33,6 +33,8 @@ struct fuse { size_t ino_table_size; fino_t ctr; pthread_mutex_t lock; + int numworker; + int numavail; }; struct fuse_dirhandle { diff --git a/lib/fuse_mt.c b/lib/fuse_mt.c index 1dc4dcd..d33682c 100644 --- a/lib/fuse_mt.c +++ b/lib/fuse_mt.c @@ -6,7 +6,7 @@ See the file COPYING. */ -#include "fuse.h" +#include "fuse_i.h" #include #include @@ -16,7 +16,7 @@ #include #include -#define FUSE_NUM_WORKERS 5 +#define FUSE_MAX_WORKERS 10 struct fuse_worker { struct fuse *f; @@ -24,17 +24,27 @@ struct fuse_worker { fuse_processor_t proc; }; +static void start_thread(struct fuse_worker *w); + static void *do_work(void *data) { struct fuse_worker *w = (struct fuse_worker *) data; - + struct fuse *f = w->f; + while(1) { struct fuse_cmd *cmd = __fuse_read_cmd(w->f); if(cmd == NULL) exit(1); - w->proc(w->f, cmd, w->data); + if(f->numavail == 0 && f->numworker < FUSE_MAX_WORKERS) { + pthread_mutex_lock(&f->lock); + f->numavail ++; + f->numworker ++; + pthread_mutex_unlock(&f->lock); + start_thread(w); + } + w->proc(w->f, cmd, w->data); } return NULL; @@ -46,7 +56,7 @@ static void start_thread(struct fuse_worker *w) sigset_t oldset; sigset_t newset; int res; - + /* Disallow signal reception in worker threads */ sigfillset(&newset); pthread_sigmask(SIG_SETMASK, &newset, &oldset); @@ -62,16 +72,13 @@ static void start_thread(struct fuse_worker *w) void __fuse_loop_mt(struct fuse *f, fuse_processor_t proc, void *data) { struct fuse_worker *w; - int i; w = malloc(sizeof(struct fuse_worker)); w->f = f; w->data = data; w->proc = proc; - for(i = 1; i < FUSE_NUM_WORKERS; i++) - start_thread(w); - + f->numworker = 1; do_work(w); } diff --git a/python/Makefile b/python/Makefile index dfaf492..8b7ea80 100644 --- a/python/Makefile +++ b/python/Makefile @@ -1,10 +1,5 @@ _fusemodule.so: _fusemodule.c gcc -g3 -I/usr/include/python1.5 _fusemodule.c -Wl,-shared -o _fusemodule.so -Wimplicit -lfuse && python -c 'import _fuse' -demo: _fusemodule.so - -sudo umount tmp - fusermount tmp ./fuse.py - - clean: - rm _fusemodule.so *.pyc *.pyo + rm -f _fusemodule.so *.pyc *.pyo diff --git a/python/fuse.py b/python/fuse.py index 74b2380..ec1e633 100644 --- a/python/fuse.py +++ b/python/fuse.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # # Copyright (C) 2001 Jeff Epler # @@ -8,7 +7,6 @@ from _fuse import main, DEBUG import os -from stat import * from errno import * class ErrnoWrapper: @@ -38,73 +36,3 @@ class Fuse: d[a] = ErrnoWrapper(getattr(self, a)) apply(main, (), d) -class Xmp(Fuse): - flags = 1 - - def getattr(self, path): - return os.lstat(path) - - def readlink(self, path): - return os.readlink(path) - - def getdir(self, path): - return map(lambda x: (x,0), os.listdir(path)) - - def unlink(self, path): - return os.unlink(path) - - def rmdir(self, path): - return os.rmdir(path) - - def symlink(self, path, path1): - return os.symlink(path, path1) - - def rename(self, path, path1): - return os.rename(path, path1) - - def link(self, path, path1): - return os.link(path, path1) - - def chmod(self, path, mode): - return os.chmod(path, mode) - - def chown(self, path, user, group): - return os.chown(path, user, group) - - def truncate(self, path, size): - f = open(path, "w+") - return f.truncate(size) - - def mknod(self, path, mode, dev): - """ Python has no os.mknod, so we can only do some things """ - if S_ISREG(mode): - open(path, "w") - else: - return -EINVAL - - def mkdir(self, path, mode): - return os.mkdir(path, mode) - - def utime(self, path, times): - return os.utime(path, times) - - def open(self, path, flags): - os.close(os.open(path, flags)) - return 0 - - def read(self, path, len, offset): - f = open(path, "r") - f.seek(offset) - return f.read(len) - - def write(self, path, buf, off): - f = open(path, "r+") - f.seek(off) - f.write(buf) - return len(buf) - -if __name__ == '__main__': - server = Xmp() - server.flags = 0 - server.multithreaded = 1; - server.main() diff --git a/python/xmp.py b/python/xmp.py new file mode 100755 index 0000000..271e269 --- /dev/null +++ b/python/xmp.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python +# +# Copyright (C) 2001 Jeff Epler +# +# This program can be distributed under the terms of the GNU GPL. +# See the file COPYING. +# + +from fuse import Fuse +import os +from errno import * +from stat import * + +class Xmp(Fuse): + flags = 1 + + def getattr(self, path): + return os.lstat(path) + + def readlink(self, path): + return os.readlink(path) + + def getdir(self, path): + return map(lambda x: (x,0), os.listdir(path)) + + def unlink(self, path): + return os.unlink(path) + + def rmdir(self, path): + return os.rmdir(path) + + def symlink(self, path, path1): + return os.symlink(path, path1) + + def rename(self, path, path1): + return os.rename(path, path1) + + def link(self, path, path1): + return os.link(path, path1) + + def chmod(self, path, mode): + return os.chmod(path, mode) + + def chown(self, path, user, group): + return os.chown(path, user, group) + + def truncate(self, path, size): + f = open(path, "w+") + return f.truncate(size) + + def mknod(self, path, mode, dev): + """ Python has no os.mknod, so we can only do some things """ + if S_ISREG(mode): + open(path, "w") + else: + return -EINVAL + + def mkdir(self, path, mode): + return os.mkdir(path, mode) + + def utime(self, path, times): + return os.utime(path, times) + + def open(self, path, flags): + os.close(os.open(path, flags)) + return 0 + + def read(self, path, len, offset): + f = open(path, "r") + f.seek(offset) + return f.read(len) + + def write(self, path, buf, off): + f = open(path, "r+") + f.seek(off) + f.write(buf) + return len(buf) + +if __name__ == '__main__': + server = Xmp() + server.flags = 0 + server.multithreaded = 1; + server.main() -- cgit v1.2.3