aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorGravatar Miklos Szeredi <miklos@szeredi.hu>2010-11-08 17:11:46 +0100
committerGravatar Miklos Szeredi <mszeredi@suse.cz>2010-11-08 17:11:46 +0100
commit4e0aea6a96146115e2fb3b8c4a4c75325ad894d7 (patch)
treeddccccd975def8b0881dbd4b219a931a03ffd227 /lib
parent7d878eb13a9b1e0e1a428c1ead2733b8453a3bb7 (diff)
libfuse: support zero copy writes in lowlevel interface
Add new ->write_buf() method to low level interface. This allows passing a generic buffer, either containing a memory buffer or a file descriptor. This allows implementing zero copy writes. Add fuse_session_receive_buf() and fuse_session_process_buf() which may be used in event loop implementations to replace fuse_chan_recv() and fuse_session_process() respectively.
Diffstat (limited to 'lib')
-rw-r--r--lib/fuse_i.h7
-rw-r--r--lib/fuse_loop.c11
-rw-r--r--lib/fuse_loop_mt.c11
-rw-r--r--lib/fuse_lowlevel.c292
-rw-r--r--lib/fuse_session.c28
-rw-r--r--lib/fuse_versionscript2
6 files changed, 326 insertions, 25 deletions
diff --git a/lib/fuse_i.h b/lib/fuse_i.h
index 0206336..6d10b1c 100644
--- a/lib/fuse_i.h
+++ b/lib/fuse_i.h
@@ -15,6 +15,12 @@ struct fuse_ll;
struct fuse_session {
struct fuse_session_ops op;
+ int (*receive_buf)(struct fuse_session *se, struct fuse_buf *buf,
+ struct fuse_chan **chp);
+
+ void (*process_buf)(void *data, const struct fuse_buf *buf,
+ struct fuse_chan *ch);
+
void *data;
volatile int exited;
@@ -51,6 +57,7 @@ struct fuse_ll {
int big_writes;
int no_splice_write;
int no_splice_move;
+ int no_splice_read;
struct fuse_lowlevel_ops op;
int got_init;
struct cuse_data *cuse_data;
diff --git a/lib/fuse_loop.c b/lib/fuse_loop.c
index 104c5d4..b7b4ca4 100644
--- a/lib/fuse_loop.c
+++ b/lib/fuse_loop.c
@@ -25,12 +25,19 @@ int fuse_session_loop(struct fuse_session *se)
while (!fuse_session_exited(se)) {
struct fuse_chan *tmpch = ch;
- res = fuse_chan_recv(&tmpch, buf, bufsize);
+ struct fuse_buf fbuf = {
+ .mem = buf,
+ .size = bufsize,
+ };
+
+ res = fuse_session_receive_buf(se, &fbuf, &tmpch);
+
if (res == -EINTR)
continue;
if (res <= 0)
break;
- fuse_session_process(se, buf, res, tmpch);
+
+ fuse_session_process_buf(se, &fbuf, tmpch);
}
free(buf);
diff --git a/lib/fuse_loop_mt.c b/lib/fuse_loop_mt.c
index 05935d5..a76713b 100644
--- a/lib/fuse_loop_mt.c
+++ b/lib/fuse_loop_mt.c
@@ -70,10 +70,14 @@ static void *fuse_do_work(void *data)
while (!fuse_session_exited(mt->se)) {
int isforget = 0;
struct fuse_chan *ch = mt->prevch;
+ struct fuse_buf fbuf = {
+ .mem = w->buf,
+ .size = w->bufsize,
+ };
int res;
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
- res = fuse_chan_recv(&ch, w->buf, w->bufsize);
+ res = fuse_session_receive_buf(mt->se, &fbuf, &ch);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
if (res == -EINTR)
continue;
@@ -95,7 +99,8 @@ static void *fuse_do_work(void *data)
* This disgusting hack is needed so that zillions of threads
* are not created on a burst of FORGET messages
*/
- if (((struct fuse_in_header *) w->buf)->opcode == FUSE_FORGET)
+ if (!(fbuf.flags & FUSE_BUF_IS_FD) &&
+ ((struct fuse_in_header *) fbuf.mem)->opcode == FUSE_FORGET)
isforget = 1;
if (!isforget)
@@ -104,7 +109,7 @@ static void *fuse_do_work(void *data)
fuse_start_thread(mt);
pthread_mutex_unlock(&mt->lock);
- fuse_session_process(mt->se, w->buf, res, ch);
+ fuse_session_process_buf(mt->se, &fbuf, ch);
pthread_mutex_lock(&mt->lock);
if (!isforget)
diff --git a/lib/fuse_lowlevel.c b/lib/fuse_lowlevel.c
index 50c4715..9d3fa98 100644
--- a/lib/fuse_lowlevel.c
+++ b/lib/fuse_lowlevel.c
@@ -22,6 +22,7 @@
#include <unistd.h>
#include <limits.h>
#include <errno.h>
+#include <assert.h>
#ifndef F_LINUX_SPECIFIC_BASE
#define F_LINUX_SPECIFIC_BASE 1024
@@ -40,6 +41,13 @@ struct fuse_pollhandle {
struct fuse_ll *f;
};
+static size_t pagesize;
+
+static __attribute__((constructor)) void fuse_ll_init_pagesize(void)
+{
+ pagesize = getpagesize();
+}
+
static void convert_stat(const struct stat *stbuf, struct fuse_attr *attr)
{
attr->ino = stbuf->st_ino;
@@ -425,7 +433,7 @@ static struct fuse_ll_pipe *fuse_ll_get_pipe(struct fuse_ll *f)
/*
*the default size is 16 pages on linux
*/
- llp->size = getpagesize() * 16;
+ llp->size = pagesize * 16;
llp->can_grow = 1;
pthread_setspecific(f->pipe_key, llp);
@@ -434,6 +442,15 @@ static struct fuse_ll_pipe *fuse_ll_get_pipe(struct fuse_ll *f)
return llp;
}
+static void fuse_ll_clear_pipe(struct fuse_ll *f)
+{
+ struct fuse_ll_pipe *llp = pthread_getspecific(f->pipe_key);
+ if (llp) {
+ pthread_setspecific(f->pipe_key, NULL);
+ fuse_ll_pipe_free(llp);
+ }
+}
+
static int send_reply_iov_buf(fuse_req_t req, const struct iovec *iov,
int count, const char *buf, size_t len)
{
@@ -491,10 +508,6 @@ static int fuse_reply_data_iov(fuse_req_t req, struct iovec *iov, int iov_count,
.count = 1,
};
- static size_t pagesize = 0;
- if (!pagesize)
- pagesize = getpagesize();
-
if (req->f->broken_splice_nonblock)
goto fallback;
@@ -677,8 +690,7 @@ static int fuse_reply_data_iov(fuse_req_t req, struct iovec *iov, int iov_count,
return 0;
clear_pipe:
- pthread_setspecific(req->f->pipe_key, NULL);
- fuse_ll_pipe_free(llp);
+ fuse_ll_clear_pipe(req->f);
return res;
fallback:
@@ -1106,6 +1118,50 @@ static void do_write(fuse_req_t req, fuse_ino_t nodeid, const void *inarg)
fuse_reply_err(req, ENOSYS);
}
+static void do_write_buf(fuse_req_t req, fuse_ino_t nodeid, const void *inarg,
+ const struct fuse_buf *ibuf)
+{
+ struct fuse_buf buf = *ibuf;
+ struct fuse_bufvec bufv = {
+ .buf = &buf,
+ .count = 1,
+ };
+ struct fuse_write_in *arg = (struct fuse_write_in *) inarg;
+ struct fuse_file_info fi;
+
+ memset(&fi, 0, sizeof(fi));
+ fi.fh = arg->fh;
+ fi.fh_old = fi.fh;
+ fi.writepage = arg->write_flags & 1;
+
+ if (req->f->conn.proto_minor < 9) {
+ buf.mem = ((char *) arg) + FUSE_COMPAT_WRITE_IN_SIZE;
+ buf.size -= sizeof(struct fuse_in_header) +
+ FUSE_COMPAT_WRITE_IN_SIZE;
+ assert(!(buf.flags & FUSE_BUF_IS_FD));
+ } else {
+ fi.lock_owner = arg->lock_owner;
+ fi.flags = arg->flags;
+ if (!(buf.flags & FUSE_BUF_IS_FD))
+ buf.mem = PARAM(arg);
+
+ buf.size -= sizeof(struct fuse_in_header) +
+ sizeof(struct fuse_write_in);
+ }
+ if (buf.size < arg->size) {
+ fprintf(stderr, "fuse: do_write_buf: buffer size too small\n");
+ fuse_reply_err(req, EIO);
+ return;
+ }
+ buf.size = arg->size;
+
+ req->f->op.write_buf(req, nodeid, &bufv, arg->offset, &fi);
+
+ /* Need to reset the pipe if ->write_buf() didn't consume all data */
+ if ((ibuf->flags & FUSE_BUF_IS_FD) && bufv.idx < bufv.count)
+ fuse_ll_clear_pipe(req->f);
+}
+
static void do_flush(fuse_req_t req, fuse_ino_t nodeid, const void *inarg)
{
struct fuse_flush_in *arg = (struct fuse_flush_in *) inarg;
@@ -1543,10 +1599,13 @@ static void do_init(fuse_req_t req, fuse_ino_t nodeid, const void *inarg)
if (req->f->conn.proto_minor >= 14) {
f->conn.capable |= FUSE_CAP_SPLICE_WRITE | FUSE_CAP_SPLICE_MOVE;
+ f->conn.capable |= FUSE_CAP_SPLICE_READ;
if (!f->no_splice_write)
f->conn.want |= FUSE_CAP_SPLICE_WRITE;
if (!f->no_splice_move)
f->conn.want |= FUSE_CAP_SPLICE_MOVE;
+ if (f->op.write_buf && !f->no_splice_read)
+ f->conn.want |= FUSE_CAP_SPLICE_READ;
}
if (f->atomic_o_trunc)
@@ -1816,26 +1875,63 @@ static const char *opname(enum fuse_opcode opcode)
return fuse_ll_ops[opcode].name;
}
-static void fuse_ll_process(void *data, const char *buf, size_t len,
- struct fuse_chan *ch)
+static int fuse_ll_copy_from_pipe(struct fuse_buf *dst,
+ struct fuse_bufvec *srcv)
+{
+ int res;
+ struct fuse_bufvec dstv = { .buf = dst, .count = 1 };
+
+ res = fuse_buf_copy(&dstv, srcv, 0);
+ if (res < 0) {
+ fprintf(stderr, "fuse: copy from pipe: %s\n", strerror(-res));
+ return res;
+ }
+ if (res < dst->size) {
+ fprintf(stderr, "fuse: copy from pipe: short read\n");
+ return -1;
+ }
+ return 0;
+}
+
+static void fuse_ll_process_buf(void *data, const struct fuse_buf *buf,
+ struct fuse_chan *ch)
{
struct fuse_ll *f = (struct fuse_ll *) data;
- struct fuse_in_header *in = (struct fuse_in_header *) buf;
- const void *inarg = buf + sizeof(struct fuse_in_header);
+ const size_t write_header_size = sizeof(struct fuse_in_header) +
+ sizeof(struct fuse_write_in);
+ struct fuse_bufvec bufv = { .buf = buf, .count = 1 };
+ struct fuse_buf tmpbuf = { .size = write_header_size };
+ struct fuse_in_header *in;
+ const void *inarg;
struct fuse_req *req;
+ void *mbuf = NULL;
int err;
+ int res;
- if (f->debug)
- fprintf(stderr,
- "unique: %llu, opcode: %s (%i), nodeid: %lu, insize: %zu\n",
- (unsigned long long) in->unique,
- opname((enum fuse_opcode) in->opcode), in->opcode,
- (unsigned long) in->nodeid, len);
+ if (buf->flags & FUSE_BUF_IS_FD) {
+ if (buf->size < tmpbuf.size)
+ tmpbuf.size = buf->size;
+
+ mbuf = malloc(tmpbuf.size);
+ if (mbuf == NULL) {
+ fprintf(stderr, "fuse: failed to allocate header\n");
+ goto clear_pipe;
+ }
+ tmpbuf.mem = mbuf;
+
+ res = fuse_ll_copy_from_pipe(&tmpbuf, &bufv);
+ if (res < 0)
+ goto clear_pipe;
+
+ in = mbuf;
+ } else {
+ in = buf->mem;
+ }
req = (struct fuse_req *) calloc(1, sizeof(struct fuse_req));
if (req == NULL) {
fprintf(stderr, "fuse: failed to allocate request\n");
- return;
+ goto clear_pipe;
}
req->f = f;
@@ -1848,6 +1944,14 @@ static void fuse_ll_process(void *data, const char *buf, size_t len,
list_init_req(req);
fuse_mutex_init(&req->lock);
+ if (f->debug)
+ fprintf(stderr,
+ "unique: %llu, opcode: %s (%i), nodeid: %lu, insize: %zu\n",
+ (unsigned long long) in->unique,
+ opname((enum fuse_opcode) in->opcode), in->opcode,
+ (unsigned long) in->nodeid, buf->size);
+
+
err = EIO;
if (!f->got_init) {
enum fuse_opcode expected;
@@ -1878,11 +1982,56 @@ static void fuse_ll_process(void *data, const char *buf, size_t len,
if (intr)
fuse_reply_err(intr, EAGAIN);
}
- fuse_ll_ops[in->opcode].func(req, in->nodeid, inarg);
+
+ if ((buf->flags & FUSE_BUF_IS_FD) && write_header_size < buf->size &&
+ (in->opcode != FUSE_WRITE || !f->op.write_buf)) {
+ void *newmbuf;
+
+ err = ENOMEM;
+ newmbuf = realloc(mbuf, buf->size);
+ if (newmbuf == NULL)
+ goto reply_err;
+ mbuf = newmbuf;
+
+ tmpbuf = (struct fuse_buf) {
+ .size = buf->size - write_header_size,
+ .mem = mbuf + write_header_size,
+ };
+ res = fuse_ll_copy_from_pipe(&tmpbuf, &bufv);
+ err = -res;
+ if (res < 0)
+ goto reply_err;
+
+ in = mbuf;
+ }
+
+ inarg = (void *) &in[1];
+ if (in->opcode == FUSE_WRITE && f->op.write_buf)
+ do_write_buf(req, in->nodeid, inarg, buf);
+ else
+ fuse_ll_ops[in->opcode].func(req, in->nodeid, inarg);
+
+out_free:
+ free(mbuf);
return;
- reply_err:
+reply_err:
fuse_reply_err(req, err);
+clear_pipe:
+ if (buf->flags & FUSE_BUF_IS_FD)
+ fuse_ll_clear_pipe(f);
+ goto out_free;
+}
+
+static void fuse_ll_process(void *data, const char *buf, size_t len,
+ struct fuse_chan *ch)
+{
+ struct fuse_buf fbuf = {
+ .mem = (void *) buf,
+ .size = len,
+ };
+
+ fuse_ll_process_buf(data, &fbuf, ch);
}
enum {
@@ -1906,6 +2055,7 @@ static struct fuse_opt fuse_ll_opts[] = {
{ "big_writes", offsetof(struct fuse_ll, big_writes), 1},
{ "no_splice_write", offsetof(struct fuse_ll, no_splice_write), 1},
{ "no_splice_move", offsetof(struct fuse_ll, no_splice_move), 1},
+ { "no_splice_read", offsetof(struct fuse_ll, no_splice_read), 1},
FUSE_OPT_KEY("max_read=", FUSE_OPT_KEY_DISCARD),
FUSE_OPT_KEY("-h", KEY_HELP),
FUSE_OPT_KEY("--help", KEY_HELP),
@@ -1934,6 +2084,7 @@ static void fuse_ll_help(void)
" -o no_remote_lock disable remote file locking\n"
" -o no_splice_write don't use splice to write to the fuse device\n"
" -o no_splice_move don't move data while splicing to the fuse device\n"
+" -o no_splice_read don't use splice to read from the fuse device\n"
);
}
@@ -1987,6 +2138,104 @@ static void fuse_ll_pipe_destructor(void *data)
fuse_ll_pipe_free(llp);
}
+static int fuse_ll_receive_buf(struct fuse_session *se, struct fuse_buf *buf,
+ struct fuse_chan **chp)
+{
+ struct fuse_chan *ch = *chp;
+ struct fuse_ll *f = fuse_session_data(se);
+ size_t bufsize = buf->size;
+ struct fuse_ll_pipe *llp;
+ struct fuse_buf tmpbuf;
+ int err;
+ int res;
+
+ if (f->conn.proto_minor < 14 || !(f->conn.want & FUSE_CAP_SPLICE_READ))
+ goto fallback;
+
+ llp = fuse_ll_get_pipe(f);
+ if (llp == NULL)
+ goto fallback;
+
+ if (llp->size < bufsize) {
+ if (llp->can_grow) {
+ res = fcntl(llp->pipe[0], F_SETPIPE_SZ, bufsize);
+ if (res == -1) {
+ llp->can_grow = 0;
+ goto fallback;
+ }
+ llp->size = res;
+ }
+ if (llp->size < bufsize)
+ goto fallback;
+ }
+
+ res = splice(fuse_chan_fd(ch), NULL, llp->pipe[1], NULL, bufsize, 0);
+ err = errno;
+
+ if (fuse_session_exited(se))
+ return 0;
+
+ if (res == -1) {
+ if (err == ENODEV) {
+ fuse_session_exit(se);
+ return 0;
+ }
+ if (err != EINTR && err != EAGAIN)
+ perror("fuse: splice from device");
+ return -err;
+ }
+
+ if (res < sizeof(struct fuse_in_header)) {
+ fprintf(stderr, "short splice from fuse device\n");
+ return -EIO;
+ }
+
+ tmpbuf = (struct fuse_buf) {
+ .size = res,
+ .flags = FUSE_BUF_IS_FD,
+ .fd = llp->pipe[0],
+ };
+
+ /*
+ * Don't bother with zero copy for small requests.
+ * fuse_loop_mt() needs to check for FORGET so this is more than
+ * just an optimization.
+ */
+ if (res < sizeof(struct fuse_in_header) +
+ sizeof(struct fuse_write_in) + pagesize) {
+ struct fuse_bufvec src = { .buf = &tmpbuf, .count = 1 };
+ struct fuse_bufvec dst = { .buf = buf, .count = 1 };
+
+ res = fuse_buf_copy(&dst, &src, 0);
+ if (res < 0) {
+ fprintf(stderr, "fuse: copy from pipe: %s\n",
+ strerror(-res));
+ fuse_ll_clear_pipe(f);
+ return res;
+ }
+ if (res < tmpbuf.size) {
+ fprintf(stderr, "fuse: copy from pipe: short read\n");
+ fuse_ll_clear_pipe(f);
+ return -EIO;
+ }
+ buf->size = tmpbuf.size;
+ return buf->size;
+ }
+
+ *buf = tmpbuf;
+
+ return res;
+
+fallback:
+ res = fuse_chan_recv(chp, buf->mem, bufsize);
+ if (res <= 0)
+ return res;
+
+ buf->size = res;
+
+ return res;
+}
+
/*
* always call fuse_lowlevel_new_common() internally, to work around a
* misfeature in the FreeBSD runtime linker, which links the old
@@ -2044,6 +2293,9 @@ struct fuse_session *fuse_lowlevel_new_common(struct fuse_args *args,
if (!se)
goto out_key_destroy;
+ se->receive_buf = fuse_ll_receive_buf;
+ se->process_buf = fuse_ll_process_buf;
+
return se;
out_key_destroy:
diff --git a/lib/fuse_session.c b/lib/fuse_session.c
index 3758627..c55f250 100644
--- a/lib/fuse_session.c
+++ b/lib/fuse_session.c
@@ -80,6 +80,34 @@ void fuse_session_process(struct fuse_session *se, const char *buf, size_t len,
se->op.process(se->data, buf, len, ch);
}
+void fuse_session_process_buf(struct fuse_session *se,
+ const struct fuse_buf *buf, struct fuse_chan *ch)
+{
+ if (se->process_buf) {
+ se->process_buf(se->data, buf, ch);
+ } else {
+ assert(!(buf->flags & FUSE_BUF_IS_FD));
+ fuse_session_process(se->data, buf->mem, buf->size, ch);
+ }
+}
+
+int fuse_session_receive_buf(struct fuse_session *se, struct fuse_buf *buf,
+ struct fuse_chan **chp)
+{
+ int res;
+
+ if (se->receive_buf) {
+ res = se->receive_buf(se, buf, chp);
+ } else {
+ res = fuse_chan_recv(chp, buf->mem, buf->size);
+ if (res > 0)
+ buf->size = res;
+ }
+
+ return res;
+}
+
+
void fuse_session_destroy(struct fuse_session *se)
{
if (se->op.destroy)
diff --git a/lib/fuse_versionscript b/lib/fuse_versionscript
index 860c403..2f531e1 100644
--- a/lib/fuse_versionscript
+++ b/lib/fuse_versionscript
@@ -185,6 +185,8 @@ FUSE_2.9 {
fuse_buf_copy;
fuse_buf_size;
fuse_reply_data;
+ fuse_session_process_buf;
+ fuse_session_receive_buf;
local:
*;