aboutsummaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorGravatar Nikolaus Rath <Nikolaus@rath.org>2016-03-29 16:07:29 -0700
committerGravatar Nikolaus Rath <Nikolaus@rath.org>2016-03-29 16:07:29 -0700
commit0db5bc69ea511eb1009f68775209ae8b58d6c553 (patch)
tree5ad0d545c01ce3d2166839e7b9f934e90f2bf644 /test
parent59e58de40c50ac05e24f5dc2d3267890974cde04 (diff)
Added unit tests for fusexmp and hello
Diffstat (limited to 'test')
-rwxr-xr-xtest/test_examples.py306
1 files changed, 306 insertions, 0 deletions
diff --git a/test/test_examples.py b/test/test_examples.py
new file mode 100755
index 0000000..9da7bb1
--- /dev/null
+++ b/test/test_examples.py
@@ -0,0 +1,306 @@
+#!/usr/bin/env python3
+
+if __name__ == '__main__':
+ import pytest
+ import sys
+ sys.exit(pytest.main([__file__] + sys.argv[1:]))
+
+import subprocess
+import os
+import sys
+import pytest
+import stat
+import shutil
+import filecmp
+import errno
+from tempfile import NamedTemporaryFile
+from util import wait_for_mount, umount, cleanup
+
+basename = os.path.join(os.path.dirname(__file__), '..')
+TEST_FILE = __file__
+
+with open(TEST_FILE, 'rb') as fh:
+ TEST_DATA = fh.read()
+
+def name_generator(__ctr=[0]):
+ __ctr[0] += 1
+ return 'testfile_%d' % __ctr[0]
+
+@pytest.mark.parametrize("name", ('hello', 'hello_ll'))
+def test_hello(tmpdir, name):
+ mnt_dir = str(tmpdir)
+ cmdline = [os.path.join(basename, 'example', name),
+ '-f', mnt_dir ]
+ mount_process = subprocess.Popen(cmdline)
+ try:
+ wait_for_mount(mount_process, mnt_dir)
+ assert os.listdir(mnt_dir) == [ 'hello' ]
+ filename = os.path.join(mnt_dir, 'hello')
+ with open(filename, 'r') as fh:
+ assert fh.read() == 'Hello World!\n'
+ with pytest.raises(IOError) as exc_info:
+ open(filename, 'r+')
+ assert exc_info.value.errno == errno.EACCES
+ with pytest.raises(IOError) as exc_info:
+ open(filename + 'does-not-exist', 'r+')
+ assert exc_info.value.errno == errno.ENOENT
+ except:
+ cleanup(mnt_dir)
+ raise
+ else:
+ umount(mount_process, mnt_dir)
+
+@pytest.mark.parametrize("name", ('fusexmp', 'fusexmp_fh'))
+def test_fusexmp_fh(tmpdir, name):
+ mnt_dir = str(tmpdir.mkdir('mnt'))
+ src_dir = str(tmpdir.mkdir('src'))
+
+ cmdline = [os.path.join(basename, 'example', name),
+ '-f', '-o' , 'use_ino,readdir_ino,kernel_cache',
+ mnt_dir ]
+ mount_process = subprocess.Popen(cmdline)
+ try:
+ wait_for_mount(mount_process, mnt_dir)
+ work_dir = os.path.join(mnt_dir, src_dir)
+ tst_write(work_dir)
+ tst_mkdir(work_dir)
+ tst_symlink(work_dir)
+ tst_mknod(work_dir)
+ if os.getuid() == 0:
+ tst_chown(work_dir)
+ # Underlying fs may not have full nanosecond resolution
+ tst_utimens(work_dir, ns_tol=1000)
+ tst_link(work_dir)
+ tst_readdir(work_dir)
+ tst_statvfs(work_dir)
+ tst_truncate_path(work_dir)
+ tst_truncate_fd(work_dir)
+ tst_unlink(work_dir)
+ tst_passthrough(src_dir, work_dir)
+ except:
+ cleanup(mnt_dir)
+ raise
+ else:
+ umount(mount_process, mnt_dir)
+
+def checked_unlink(filename, path, isdir=False):
+ fullname = os.path.join(path, filename)
+ if isdir:
+ os.rmdir(fullname)
+ else:
+ os.unlink(fullname)
+ with pytest.raises(OSError) as exc_info:
+ os.stat(fullname)
+ assert exc_info.value.errno == errno.ENOENT
+ assert filename not in os.listdir(path)
+
+def tst_mkdir(mnt_dir):
+ dirname = name_generator()
+ fullname = mnt_dir + "/" + dirname
+ os.mkdir(fullname)
+ fstat = os.stat(fullname)
+ assert stat.S_ISDIR(fstat.st_mode)
+ assert os.listdir(fullname) == []
+ assert fstat.st_nlink in (1,2)
+ assert dirname in os.listdir(mnt_dir)
+ checked_unlink(dirname, mnt_dir, isdir=True)
+
+def tst_symlink(mnt_dir):
+ linkname = name_generator()
+ fullname = mnt_dir + "/" + linkname
+ os.symlink("/imaginary/dest", fullname)
+ fstat = os.lstat(fullname)
+ assert stat.S_ISLNK(fstat.st_mode)
+ assert os.readlink(fullname) == "/imaginary/dest"
+ assert fstat.st_nlink == 1
+ assert linkname in os.listdir(mnt_dir)
+ checked_unlink(linkname, mnt_dir)
+
+def tst_mknod(mnt_dir):
+ filename = os.path.join(mnt_dir, name_generator())
+ shutil.copyfile(TEST_FILE, filename)
+ fstat = os.lstat(filename)
+ assert stat.S_ISREG(fstat.st_mode)
+ assert fstat.st_nlink == 1
+ assert os.path.basename(filename) in os.listdir(mnt_dir)
+ assert filecmp.cmp(TEST_FILE, filename, False)
+ checked_unlink(filename, mnt_dir)
+
+def tst_chown(mnt_dir):
+ filename = os.path.join(mnt_dir, name_generator())
+ os.mkdir(filename)
+ fstat = os.lstat(filename)
+ uid = fstat.st_uid
+ gid = fstat.st_gid
+
+ uid_new = uid + 1
+ os.chown(filename, uid_new, -1)
+ fstat = os.lstat(filename)
+ assert fstat.st_uid == uid_new
+ assert fstat.st_gid == gid
+
+ gid_new = gid + 1
+ os.chown(filename, -1, gid_new)
+ fstat = os.lstat(filename)
+ assert fstat.st_uid == uid_new
+ assert fstat.st_gid == gid_new
+
+ checked_unlink(filename, mnt_dir, isdir=True)
+
+def tst_write(mnt_dir):
+ name = os.path.join(mnt_dir, name_generator())
+ shutil.copyfile(TEST_FILE, name)
+ assert filecmp.cmp(name, TEST_FILE, False)
+ checked_unlink(name, mnt_dir)
+
+def tst_unlink(mnt_dir):
+ name = os.path.join(mnt_dir, name_generator())
+ data1 = b'foo'
+ data2 = b'bar'
+
+ with open(os.path.join(mnt_dir, name), 'wb+', buffering=0) as fh:
+ fh.write(data1)
+ checked_unlink(name, mnt_dir)
+ fh.write(data2)
+ fh.seek(0)
+ assert fh.read() == data1+data2
+
+def tst_statvfs(mnt_dir):
+ os.statvfs(mnt_dir)
+
+def tst_link(mnt_dir):
+ name1 = os.path.join(mnt_dir, name_generator())
+ name2 = os.path.join(mnt_dir, name_generator())
+ shutil.copyfile(TEST_FILE, name1)
+ assert filecmp.cmp(name1, TEST_FILE, False)
+ os.link(name1, name2)
+
+ fstat1 = os.lstat(name1)
+ fstat2 = os.lstat(name2)
+
+ assert fstat1 == fstat2
+ assert fstat1.st_nlink == 2
+
+ assert os.path.basename(name2) in os.listdir(mnt_dir)
+ assert filecmp.cmp(name1, name2, False)
+ os.unlink(name2)
+ fstat1 = os.lstat(name1)
+ assert fstat1.st_nlink == 1
+ os.unlink(name1)
+
+def tst_readdir(mnt_dir):
+ dir_ = os.path.join(mnt_dir, name_generator())
+ file_ = dir_ + "/" + name_generator()
+ subdir = dir_ + "/" + name_generator()
+ subfile = subdir + "/" + name_generator()
+
+ os.mkdir(dir_)
+ shutil.copyfile(TEST_FILE, file_)
+ os.mkdir(subdir)
+ shutil.copyfile(TEST_FILE, subfile)
+
+ listdir_is = os.listdir(dir_)
+ listdir_is.sort()
+ listdir_should = [ os.path.basename(file_), os.path.basename(subdir) ]
+ listdir_should.sort()
+ assert listdir_is == listdir_should
+
+ os.unlink(file_)
+ os.unlink(subfile)
+ os.rmdir(subdir)
+ os.rmdir(dir_)
+
+def tst_truncate_path(mnt_dir):
+ assert len(TEST_DATA) > 1024
+
+ filename = os.path.join(mnt_dir, name_generator())
+ with open(filename, 'wb') as fh:
+ fh.write(TEST_DATA)
+
+ fstat = os.stat(filename)
+ size = fstat.st_size
+ assert size == len(TEST_DATA)
+
+ # Add zeros at the end
+ os.truncate(filename, size + 1024)
+ assert os.stat(filename).st_size == size + 1024
+ with open(filename, 'rb') as fh:
+ assert fh.read(size) == TEST_DATA
+ assert fh.read(1025) == b'\0' * 1024
+
+ # Truncate data
+ os.truncate(filename, size - 1024)
+ assert os.stat(filename).st_size == size - 1024
+ with open(filename, 'rb') as fh:
+ assert fh.read(size) == TEST_DATA[:size-1024]
+
+ os.unlink(filename)
+
+def tst_truncate_fd(mnt_dir):
+ assert len(TEST_DATA) > 1024
+ with NamedTemporaryFile('w+b', 0, dir=mnt_dir) as fh:
+ fd = fh.fileno()
+ fh.write(TEST_DATA)
+ fstat = os.fstat(fd)
+ size = fstat.st_size
+ assert size == len(TEST_DATA)
+
+ # Add zeros at the end
+ os.ftruncate(fd, size + 1024)
+ assert os.fstat(fd).st_size == size + 1024
+ fh.seek(0)
+ assert fh.read(size) == TEST_DATA
+ assert fh.read(1025) == b'\0' * 1024
+
+ # Truncate data
+ os.ftruncate(fd, size - 1024)
+ assert os.fstat(fd).st_size == size - 1024
+ fh.seek(0)
+ assert fh.read(size) == TEST_DATA[:size-1024]
+
+def tst_utimens(mnt_dir, ns_tol=0):
+ filename = os.path.join(mnt_dir, name_generator())
+ os.mkdir(filename)
+ fstat = os.lstat(filename)
+
+ atime = fstat.st_atime + 42.28
+ mtime = fstat.st_mtime - 42.23
+ if sys.version_info < (3,3):
+ os.utime(filename, (atime, mtime))
+ else:
+ atime_ns = fstat.st_atime_ns + int(42.28*1e9)
+ mtime_ns = fstat.st_mtime_ns - int(42.23*1e9)
+ os.utime(filename, None, ns=(atime_ns, mtime_ns))
+
+ fstat = os.lstat(filename)
+
+ assert abs(fstat.st_atime - atime) < 1e-3
+ assert abs(fstat.st_mtime - mtime) < 1e-3
+ if sys.version_info >= (3,3):
+ assert abs(fstat.st_atime_ns - atime_ns) <= ns_tol
+ assert abs(fstat.st_mtime_ns - mtime_ns) <= ns_tol
+
+ checked_unlink(filename, mnt_dir, isdir=True)
+
+def tst_passthrough(src_dir, mnt_dir):
+ name = name_generator()
+ src_name = os.path.join(src_dir, name)
+ mnt_name = os.path.join(src_dir, name)
+ assert name not in os.listdir(src_dir)
+ assert name not in os.listdir(mnt_dir)
+ with open(src_name, 'w') as fh:
+ fh.write('Hello, world')
+ assert name in os.listdir(src_dir)
+ assert name in os.listdir(mnt_dir)
+ assert os.stat(src_name) == os.stat(mnt_name)
+
+ name = name_generator()
+ src_name = os.path.join(src_dir, name)
+ mnt_name = os.path.join(src_dir, name)
+ assert name not in os.listdir(src_dir)
+ assert name not in os.listdir(mnt_dir)
+ with open(mnt_name, 'w') as fh:
+ fh.write('Hello, world')
+ assert name in os.listdir(src_dir)
+ assert name in os.listdir(mnt_dir)
+ assert os.stat(src_name) == os.stat(mnt_name)