aboutsummaryrefslogtreecommitdiff
path: root/test/util.py
blob: b9c1b0cb179120b252e54934bf569c9d5dd8660b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#!/usr/bin/env python3
import subprocess
import pytest
import os
import stat
import time
from os.path import join as pjoin
import sys
import re

# Parent of the directory containing this file; the helpers below build the
# paths of the 'example' and 'util' directories relative to it.
basename = pjoin(os.path.dirname(__file__), os.pardir)

def test_printcap():
    '''Run the ``printcap`` example and parse its output

    The program is started through `base_cmdline` (i.e. under valgrind if
    that was requested) and given 30 seconds to finish.

    Returns a tuple ``(proto, caps)``. *proto* is ``(major, minor)`` as
    integers, taken from the last ``Protocol version: X.Y`` line, or
    `None` if no such line was printed. *caps* is the set of all
    tab-indented output lines with surrounding whitespace removed.

    Raises `AssertionError` if the program exits with a non-zero status
    and `subprocess.TimeoutExpired` if it does not finish in time.
    '''

    cmdline = base_cmdline + [ pjoin(basename, 'example', 'printcap') ]
    proc = subprocess.Popen(cmdline, stdout=subprocess.PIPE,
                            universal_newlines=True)
    try:
        (stdout, _) = proc.communicate(timeout=30)
    except subprocess.TimeoutExpired:
        # communicate() leaves the child running when the timeout
        # expires. Kill and reap it so that neither the process nor its
        # stdout pipe is leaked, then let the caller see the timeout.
        proc.kill()
        proc.communicate()
        raise
    assert proc.returncode == 0

    proto = None
    caps = set()
    for line in stdout.split('\n'):
        # Capabilities are listed one per line, indented by a tab
        if line.startswith('\t'):
            caps.add(line.strip())
            continue

        hit = re.match(r'Protocol version: (\d+)\.(\d+)$', line)
        if hit:
            proto = (int(hit.group(1)), int(hit.group(2)))

    return (proto, caps)


def wait_for_mount(mount_process, mnt_dir,
                   test_fn=os.path.ismount):
    '''Wait up to 30 seconds for *mnt_dir* to become available

    *test_fn* is called with *mnt_dir* roughly every 0.1 seconds until it
    returns a true value, in which case this function returns `True`.

    *mount_process* must provide a ``poll()`` method that returns `None`
    while the process is still running. If the process terminates before
    *test_fn* succeeds, or if the time limit is exceeded, the current
    test is failed through `pytest.fail`.
    '''

    # Use a monotonic deadline instead of adding up the sleep intervals:
    # the latter ignores the time spent inside test_fn() and poll(), and
    # repeatedly adding 0.1 accumulates floating point error.
    deadline = time.monotonic() + 30
    while True:
        if test_fn(mnt_dir):
            return True
        if mount_process.poll() is not None:
            pytest.fail('file system process terminated prematurely')
        if time.monotonic() >= deadline:
            break
        time.sleep(0.1)
    pytest.fail("mountpoint failed to come up")

def cleanup(mnt_dir):
    '''Try to unmount *mnt_dir* after something went wrong

    This is best-effort: the exit status of the unmount command is
    ignored and all of its output is discarded.
    '''

    on_bsd = any(tag in sys.platform for tag in ('bsd', 'dragonfly'))
    if on_bsd:
        argv = ['umount', '-f', mnt_dir]
    else:
        # Don't bother trying Valgrind if things already went wrong
        fusermount = pjoin(basename, 'util', 'fusermount3')
        argv = [fusermount, '-z', '-u', mnt_dir]

    # stderr is merged into stdout, which in turn goes to /dev/null
    subprocess.call(argv, stderr=subprocess.STDOUT,
                    stdout=subprocess.DEVNULL)

def umount(mount_process, mnt_dir):
    '''Unmount *mnt_dir* and wait for *mount_process* to exit

    The unmount command must succeed (`subprocess.check_call` raises
    `subprocess.CalledProcessError` otherwise) and *mnt_dir* must no
    longer be a mountpoint afterwards.

    *mount_process* is then given 30 seconds to terminate. The current
    test is failed through `pytest.fail` if it exits with a non-zero
    code or does not terminate in time.
    '''

    if 'bsd' in sys.platform or 'dragonfly' in sys.platform:
        cmdline = [ 'umount', mnt_dir ]
    else:
        # fusermount3 will be setuid root, so we can only trace it with
        # valgrind if we're root
        if os.getuid() == 0:
            cmdline = base_cmdline
        else:
            cmdline = []
        cmdline = cmdline + [ pjoin(basename, 'util', 'fusermount3'),
                              '-z', '-u', mnt_dir ]

    subprocess.check_call(cmdline)
    assert not os.path.ismount(mnt_dir)

    # Give mount process a little while to terminate. The limit is a
    # monotonic deadline rather than a sum of the sleep intervals, so
    # that time spent in poll() counts and no floating point error
    # accumulates.
    deadline = time.monotonic() + 30
    while True:
        code = mount_process.poll()
        if code is not None:
            if code == 0:
                return
            pytest.fail('file system process terminated with code %s' % (code,))
        if time.monotonic() >= deadline:
            break
        time.sleep(0.1)
    pytest.fail('mount process did not terminate')


def safe_sleep(secs):
    '''Like time.sleep(), but sleep for at least *secs*

    `time.sleep` may sleep less than the given period if a signal is
    received (this was the case before Python 3.5, and still is if the
    signal handler raises an exception). This function ensures that we
    sleep for at least the desired time.

    The elapsed time is measured with `time.monotonic`, so adjustments
    of the system clock can neither cut the sleep short nor prolong it.
    Returns immediately if *secs* is zero or negative.
    '''

    now = time.monotonic()
    end = now + secs
    while now < end:
        # Sleep only for whatever is still missing
        time.sleep(end - now)
        now = time.monotonic()

def fuse_test_marker():
    '''Return a pytest marker that indicates FUSE availability

    If system/user/environment does not support FUSE, return
    a `pytest.mark.skip` object with more details. If FUSE is
    supported, return `pytest.mark.uses_fuse()`.

    On BSD and DragonFly no checks are made. Elsewhere, FUSE counts as
    available if ``fusermount3`` is found in ``PATH``, ``/dev/fuse``
    exists and either we are root, or ``fusermount3`` is setuid and
    ``/dev/fuse`` can be opened for reading and writing.
    '''

    # Imported here so that this function does not depend on the
    # module-level import list.
    import shutil

    def skip(reason):
        return pytest.mark.skip(reason=reason)

    if 'bsd' in sys.platform or 'dragonfly' in sys.platform:
        return pytest.mark.uses_fuse()

    # Search PATH ourselves instead of spawning the external `which`
    # program, which is not guaranteed to be installed.
    fusermount_path = shutil.which('fusermount3')
    if not fusermount_path:
        return skip("Can't find fusermount executable")

    if not os.path.exists('/dev/fuse'):
        return skip("FUSE kernel module does not seem to be loaded")

    # The remaining checks only concern unprivileged users
    if os.getuid() == 0:
        return pytest.mark.uses_fuse()

    mode = os.stat(fusermount_path).st_mode
    if mode & stat.S_ISUID == 0:
        return skip('fusermount executable not setuid, and we are not root.')

    try:
        fd = os.open('/dev/fuse', os.O_RDWR)
    except OSError as exc:
        return skip('Unable to open /dev/fuse: %s' % exc.strerror)
    else:
        os.close(fd)

    return pytest.mark.uses_fuse()

# Use valgrind if requested: any value of TEST_WITH_VALGRIND other than
# 'no', 'false' or '0' (ignoring case and surrounding whitespace) makes
# the commands built from base_cmdline run under valgrind.
if os.environ.get('TEST_WITH_VALGRIND', 'no').lower().strip() \
   not in ('no', 'false', '0'):
    base_cmdline = [ 'valgrind', '-q', '--' ]
else:
    base_cmdline = []

# Try to use local fusermount3 by putting the 'util' directory first in
# PATH. Fall back to the default search path if PATH is not set at all,
# rather than failing with KeyError on import.
os.environ['PATH'] = '%s:%s' % (pjoin(basename, 'util'),
                                os.environ.get('PATH', os.defpath))

try:
    (fuse_proto, fuse_caps) = test_printcap()
except Exception:
    # Rely on test to raise error. Only ordinary errors are swallowed, so
    # that KeyboardInterrupt and SystemExit still propagate.
    fuse_proto = (0,0)
    fuse_caps = set()