From 0db5bc69ea511eb1009f68775209ae8b58d6c553 Mon Sep 17 00:00:00 2001 From: Nikolaus Rath Date: Tue, 29 Mar 2016 16:07:29 -0700 Subject: Added unit tests for fusexmp and hello --- test/test_examples.py | 306 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 306 insertions(+) create mode 100755 test/test_examples.py (limited to 'test') diff --git a/test/test_examples.py b/test/test_examples.py new file mode 100755 index 0000000..9da7bb1 --- /dev/null +++ b/test/test_examples.py @@ -0,0 +1,306 @@ +#!/usr/bin/env python3 + +if __name__ == '__main__': + import pytest + import sys + sys.exit(pytest.main([__file__] + sys.argv[1:])) + +import subprocess +import os +import sys +import pytest +import stat +import shutil +import filecmp +import errno +from tempfile import NamedTemporaryFile +from util import wait_for_mount, umount, cleanup + +basename = os.path.join(os.path.dirname(__file__), '..') +TEST_FILE = __file__ + +with open(TEST_FILE, 'rb') as fh: + TEST_DATA = fh.read() + +def name_generator(__ctr=[0]): + __ctr[0] += 1 + return 'testfile_%d' % __ctr[0] + +@pytest.mark.parametrize("name", ('hello', 'hello_ll')) +def test_hello(tmpdir, name): + mnt_dir = str(tmpdir) + cmdline = [os.path.join(basename, 'example', name), + '-f', mnt_dir ] + mount_process = subprocess.Popen(cmdline) + try: + wait_for_mount(mount_process, mnt_dir) + assert os.listdir(mnt_dir) == [ 'hello' ] + filename = os.path.join(mnt_dir, 'hello') + with open(filename, 'r') as fh: + assert fh.read() == 'Hello World!\n' + with pytest.raises(IOError) as exc_info: + open(filename, 'r+') + assert exc_info.value.errno == errno.EACCES + with pytest.raises(IOError) as exc_info: + open(filename + 'does-not-exist', 'r+') + assert exc_info.value.errno == errno.ENOENT + except: + cleanup(mnt_dir) + raise + else: + umount(mount_process, mnt_dir) + +@pytest.mark.parametrize("name", ('fusexmp', 'fusexmp_fh')) +def test_fusexmp_fh(tmpdir, name): + mnt_dir = str(tmpdir.mkdir('mnt')) + src_dir = str(tmpdir.mkdir('src')) + + cmdline = [os.path.join(basename, 'example', name), + '-f', '-o' , 'use_ino,readdir_ino,kernel_cache', + mnt_dir ] + mount_process = subprocess.Popen(cmdline) + try: + wait_for_mount(mount_process, mnt_dir) + work_dir = os.path.join(mnt_dir, src_dir) + tst_write(work_dir) + tst_mkdir(work_dir) + tst_symlink(work_dir) + tst_mknod(work_dir) + if os.getuid() == 0: + tst_chown(work_dir) + # Underlying fs may not have full nanosecond resolution + tst_utimens(work_dir, ns_tol=1000) + tst_link(work_dir) + tst_readdir(work_dir) + tst_statvfs(work_dir) + tst_truncate_path(work_dir) + tst_truncate_fd(work_dir) + tst_unlink(work_dir) + tst_passthrough(src_dir, work_dir) + except: + cleanup(mnt_dir) + raise + else: + umount(mount_process, mnt_dir) + +def checked_unlink(filename, path, isdir=False): + fullname = os.path.join(path, filename) + if isdir: + os.rmdir(fullname) + else: + os.unlink(fullname) + with pytest.raises(OSError) as exc_info: + os.stat(fullname) + assert exc_info.value.errno == errno.ENOENT + assert filename not in os.listdir(path) + +def tst_mkdir(mnt_dir): + dirname = name_generator() + fullname = mnt_dir + "/" + dirname + os.mkdir(fullname) + fstat = os.stat(fullname) + assert stat.S_ISDIR(fstat.st_mode) + assert os.listdir(fullname) == [] + assert fstat.st_nlink in (1,2) + assert dirname in os.listdir(mnt_dir) + checked_unlink(dirname, mnt_dir, isdir=True) + +def tst_symlink(mnt_dir): + linkname = name_generator() + fullname = mnt_dir + "/" + linkname + os.symlink("/imaginary/dest", fullname) + fstat = os.lstat(fullname) + assert stat.S_ISLNK(fstat.st_mode) + assert os.readlink(fullname) == "/imaginary/dest" + assert fstat.st_nlink == 1 + assert linkname in os.listdir(mnt_dir) + checked_unlink(linkname, mnt_dir) + +def tst_mknod(mnt_dir): + filename = os.path.join(mnt_dir, name_generator()) + shutil.copyfile(TEST_FILE, filename) + fstat = os.lstat(filename) + assert stat.S_ISREG(fstat.st_mode) + assert fstat.st_nlink == 1 + assert os.path.basename(filename) in os.listdir(mnt_dir) + assert filecmp.cmp(TEST_FILE, filename, False) + checked_unlink(filename, mnt_dir) + +def tst_chown(mnt_dir): + filename = os.path.join(mnt_dir, name_generator()) + os.mkdir(filename) + fstat = os.lstat(filename) + uid = fstat.st_uid + gid = fstat.st_gid + + uid_new = uid + 1 + os.chown(filename, uid_new, -1) + fstat = os.lstat(filename) + assert fstat.st_uid == uid_new + assert fstat.st_gid == gid + + gid_new = gid + 1 + os.chown(filename, -1, gid_new) + fstat = os.lstat(filename) + assert fstat.st_uid == uid_new + assert fstat.st_gid == gid_new + + checked_unlink(filename, mnt_dir, isdir=True) + +def tst_write(mnt_dir): + name = os.path.join(mnt_dir, name_generator()) + shutil.copyfile(TEST_FILE, name) + assert filecmp.cmp(name, TEST_FILE, False) + checked_unlink(name, mnt_dir) + +def tst_unlink(mnt_dir): + name = os.path.join(mnt_dir, name_generator()) + data1 = b'foo' + data2 = b'bar' + + with open(os.path.join(mnt_dir, name), 'wb+', buffering=0) as fh: + fh.write(data1) + checked_unlink(name, mnt_dir) + fh.write(data2) + fh.seek(0) + assert fh.read() == data1+data2 + +def tst_statvfs(mnt_dir): + os.statvfs(mnt_dir) + +def tst_link(mnt_dir): + name1 = os.path.join(mnt_dir, name_generator()) + name2 = os.path.join(mnt_dir, name_generator()) + shutil.copyfile(TEST_FILE, name1) + assert filecmp.cmp(name1, TEST_FILE, False) + os.link(name1, name2) + + fstat1 = os.lstat(name1) + fstat2 = os.lstat(name2) + + assert fstat1 == fstat2 + assert fstat1.st_nlink == 2 + + assert os.path.basename(name2) in os.listdir(mnt_dir) + assert filecmp.cmp(name1, name2, False) + os.unlink(name2) + fstat1 = os.lstat(name1) + assert fstat1.st_nlink == 1 + os.unlink(name1) + +def tst_readdir(mnt_dir): + dir_ = os.path.join(mnt_dir, name_generator()) + file_ = dir_ + "/" + name_generator() + subdir = dir_ + "/" + name_generator() + subfile = subdir + "/" + name_generator() + + os.mkdir(dir_) + shutil.copyfile(TEST_FILE, file_) + os.mkdir(subdir) + shutil.copyfile(TEST_FILE, subfile) + + listdir_is = os.listdir(dir_) + listdir_is.sort() + listdir_should = [ os.path.basename(file_), os.path.basename(subdir) ] + listdir_should.sort() + assert listdir_is == listdir_should + + os.unlink(file_) + os.unlink(subfile) + os.rmdir(subdir) + os.rmdir(dir_) + +def tst_truncate_path(mnt_dir): + assert len(TEST_DATA) > 1024 + + filename = os.path.join(mnt_dir, name_generator()) + with open(filename, 'wb') as fh: + fh.write(TEST_DATA) + + fstat = os.stat(filename) + size = fstat.st_size + assert size == len(TEST_DATA) + + # Add zeros at the end + os.truncate(filename, size + 1024) + assert os.stat(filename).st_size == size + 1024 + with open(filename, 'rb') as fh: + assert fh.read(size) == TEST_DATA + assert fh.read(1025) == b'\0' * 1024 + + # Truncate data + os.truncate(filename, size - 1024) + assert os.stat(filename).st_size == size - 1024 + with open(filename, 'rb') as fh: + assert fh.read(size) == TEST_DATA[:size-1024] + + os.unlink(filename) + +def tst_truncate_fd(mnt_dir): + assert len(TEST_DATA) > 1024 + with NamedTemporaryFile('w+b', 0, dir=mnt_dir) as fh: + fd = fh.fileno() + fh.write(TEST_DATA) + fstat = os.fstat(fd) + size = fstat.st_size + assert size == len(TEST_DATA) + + # Add zeros at the end + os.ftruncate(fd, size + 1024) + assert os.fstat(fd).st_size == size + 1024 + fh.seek(0) + assert fh.read(size) == TEST_DATA + assert fh.read(1025) == b'\0' * 1024 + + # Truncate data + os.ftruncate(fd, size - 1024) + assert os.fstat(fd).st_size == size - 1024 + fh.seek(0) + assert fh.read(size) == TEST_DATA[:size-1024] + +def tst_utimens(mnt_dir, ns_tol=0): + filename = os.path.join(mnt_dir, name_generator()) + os.mkdir(filename) + fstat = os.lstat(filename) + + atime = fstat.st_atime + 42.28 + mtime = fstat.st_mtime - 42.23 + if sys.version_info < (3,3): + os.utime(filename, (atime, mtime)) + else: + atime_ns = fstat.st_atime_ns + int(42.28*1e9) + mtime_ns = fstat.st_mtime_ns - int(42.23*1e9) + os.utime(filename, None, ns=(atime_ns, mtime_ns)) + + fstat = os.lstat(filename) + + assert abs(fstat.st_atime - atime) < 1e-3 + assert abs(fstat.st_mtime - mtime) < 1e-3 + if sys.version_info >= (3,3): + assert abs(fstat.st_atime_ns - atime_ns) <= ns_tol + assert abs(fstat.st_mtime_ns - mtime_ns) <= ns_tol + + checked_unlink(filename, mnt_dir, isdir=True) + +def tst_passthrough(src_dir, mnt_dir): + name = name_generator() + src_name = os.path.join(src_dir, name) + mnt_name = os.path.join(src_dir, name) + assert name not in os.listdir(src_dir) + assert name not in os.listdir(mnt_dir) + with open(src_name, 'w') as fh: + fh.write('Hello, world') + assert name in os.listdir(src_dir) + assert name in os.listdir(mnt_dir) + assert os.stat(src_name) == os.stat(mnt_name) + + name = name_generator() + src_name = os.path.join(src_dir, name) + mnt_name = os.path.join(src_dir, name) + assert name not in os.listdir(src_dir) + assert name not in os.listdir(mnt_dir) + with open(mnt_name, 'w') as fh: + fh.write('Hello, world') + assert name in os.listdir(src_dir) + assert name in os.listdir(mnt_dir) + assert os.stat(src_name) == os.stat(mnt_name) -- cgit v1.2.3