# Copyright 2017 The Bazel Authors. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os import re import unittest from src.test.py.bazel import test_base class ActionTempTest(test_base.TestBase): """Test that Bazel sets a TMP/TEMP/TMPDIR envvar for actions.""" _invalidations = 0 def testActionTemp(self): self._CreateWorkspace() strategies = self._SpawnStrategies() self.assertIn('standalone', strategies) if not test_base.TestBase.IsWindows(): self.assertIn('sandboxed', strategies) self.assertIn('processwrapper-sandbox', strategies) bazel_bin = self._BazelOutputDirectory('bazel-bin') bazel_genfiles = self._BazelOutputDirectory('bazel-genfiles') # Test without user-defined temp directory. # In the absence of TMP/TEMP/TMPDIR, the LocalEnvProvider implementations # set the fallback temp directory. if test_base.TestBase.IsWindows(): expected_tmpdir_regex = r'execroot\\.+\\local-spawn-runner.[0-9]+\\work$' else: expected_tmpdir_regex = '^/tmp$' self._AssertTempDir('standalone', expected_tmpdir_regex, bazel_bin, bazel_genfiles) if not test_base.TestBase.IsWindows(): self._AssertTempDir('sandboxed', expected_tmpdir_regex, bazel_bin, bazel_genfiles) self._AssertTempDir('processwrapper-sandbox', expected_tmpdir_regex, bazel_bin, bazel_genfiles) # Test with user-defined temp directory. self._AssertClientEnvTemp('standalone', bazel_bin, bazel_genfiles) if not test_base.TestBase.IsWindows(): self._AssertClientEnvTemp('sandboxed', bazel_bin, bazel_genfiles) self._AssertClientEnvTemp('processwrapper-sandbox', bazel_bin, bazel_genfiles) # Helper methods start here ------------------------------------------------- def _AssertClientEnvTemp(self, strategy, bazel_bin, bazel_genfiles): def _Impl(tmp_dir): self._AssertTempDir( strategy=strategy, expected_tmpdir_regex=os.path.basename(tmp_dir), bazel_bin=bazel_bin, bazel_genfiles=bazel_genfiles, env_add=dict((k, tmp_dir) for k in self._TempEnvvars())) _Impl(self.ScratchDir(strategy + '-temp-1')) # Assert that the actions pick up the current client environment. # Check this by invalidating the actions (update input.txt) and running # Bazel with a different environment. _Impl(self.ScratchDir(strategy + '-temp-2')) def _AssertTempDir(self, strategy, expected_tmpdir_regex, bazel_bin, bazel_genfiles, env_add=None): self._invalidations += 1 input_file_contents = str(self._invalidations) self._UpdateInputFile(input_file_contents) outputs = self._BuildRules( strategy, bazel_bin, bazel_genfiles, env_remove=self._TempEnvvars(), env_add=env_add) self.assertEqual(len(outputs), 2) self._AssertOutputFileContents(outputs['genrule'], input_file_contents, expected_tmpdir_regex) self._AssertOutputFileContents(outputs['skylark'], input_file_contents, expected_tmpdir_regex) def _UpdateInputFile(self, content): self.ScratchFile('foo/input.txt', [content]) def _TempEnvvars(self): if test_base.TestBase.IsWindows(): return ['TMP', 'TEMP'] else: return ['TMPDIR'] def _BazelOutputDirectory(self, info_key): exit_code, stdout, stderr = self.RunBazel(['info', info_key]) self.AssertExitCode(exit_code, 0, stderr) return stdout[0] def _InvalidateActions(self, content): self.ScratchFile('foo/input.txt', [content]) def _CreateWorkspace(self, build_flags=None): if test_base.TestBase.IsWindows(): toolname = 'foo.cmd' toolsrc = [ '@SETLOCAL ENABLEEXTENSIONS', '@echo ON', 'if [%TMP%] == [] exit /B 1', 'if [%TEMP%] == [] exit /B 1', 'if not exist "%2" exit /B 2', 'set input_file=%2', # TMP/TEMP may refer to directories that other processes are also # writing to, so let's not try to create any files there because we # cannot generate safe temp file names. Instead just check that the # directory exists. It'd be nice to check that the directory is # writable, but I (@laszlocsomor) don't know how to do that without # actually attempting to write to the directory. 'type "%input_file:/=\\%" > "%1"', 'if exist "%TMP%" (echo TMP:y >> "%1") else (echo TMP:n >> "%1")', 'if exist "%TEMP%" (echo TEMP:y >> "%1") else (echo TEMP:n >> "%1")', 'set TMP >> "%1"', 'set TEMP >> "%1"', 'exit /B 0', ] else: toolname = 'foo.sh' toolsrc = [ '#!/bin/bash', 'set -eu', 'if [ -n "${TMPDIR:-}" ]; then', ' sleep 1', ' cat "$2" > "$1"', # TMPDIR might be "/tmp" or other shared directory, so we need a # unique name for the temp file we want to create there. ' tmpfile="$(mktemp "$TMPDIR/tmp.XXXXXXXX")"', ' echo foo > "$tmpfile"', ' cat "$tmpfile" >> "$1"', ' rm "$tmpfile"', ' echo "TMPDIR=${TMPDIR}" >> "$1"', 'else', ' exit 1', 'fi', ] self.ScratchFile('WORKSPACE') self.ScratchFile('foo/' + toolname, toolsrc, executable=True) self.ScratchFile('foo/foo.bzl', [ 'def _impl(ctx):', ' ctx.actions.run(', ' executable=ctx.executable.tool,', ' arguments=[ctx.outputs.out.path, ctx.file.src.path],', ' inputs=[ctx.file.src],', ' outputs=[ctx.outputs.out])', ' return [DefaultInfo(files=depset([ctx.outputs.out]))]', '', 'foorule = rule(', ' implementation=_impl,', ' attrs={"tool": attr.label(executable=True, cfg="host",', ' allow_files=True, single_file=True),', ' "src": attr.label(allow_files=True, single_file=True)},', ' outputs={"out": "%{name}.txt"},', ')', ]) self.ScratchFile('foo/BUILD', [ 'load("//foo:foo.bzl", "foorule")', '', 'genrule(', ' name = "genrule",', ' tools = ["%s"],' % toolname, ' srcs = ["input.txt"],', ' outs = ["genrule.txt"],', ' cmd = "$(location %s) $@ $(location input.txt)",' % toolname, ')', '', 'foorule(', ' name = "skylark",', ' src = "input.txt",', ' tool = "%s",' % toolname, ')', ]) def _SpawnStrategies(self): """Returns the list of supported --spawn_strategy values.""" # TODO(b/37617303): make test UI-independent exit_code, _, stderr = self.RunBazel([ 'build', '--color=no', '--curses=no', '--spawn_strategy=foo', '--noexperimental_ui' ]) self.AssertExitCode(exit_code, 2, stderr) pattern = re.compile( r'^ERROR:.*is an invalid value for.*Valid values are: (.*)\.$') for line in stderr: m = pattern.match(line) if m: return set(e.strip() for e in m.groups()[0].split(',')) return [] def _BuildRules(self, strategy, bazel_bin, bazel_genfiles, env_remove=None, env_add=None): def _ReadFile(path): with open(path, 'rt') as f: return [l.strip() for l in f] # TODO(b/37617303): make test UI-independent exit_code, _, stderr = self.RunBazel([ 'build', '--verbose_failures', '--noexperimental_ui', '--spawn_strategy=%s' % strategy, '//foo:genrule', '//foo:skylark', ], env_remove, env_add) self.AssertExitCode(exit_code, 0, stderr) self.assertTrue( os.path.exists(os.path.join(bazel_genfiles, 'foo/genrule.txt'))) self.assertTrue(os.path.exists(os.path.join(bazel_bin, 'foo/skylark.txt'))) return { 'genrule': _ReadFile(os.path.join(bazel_genfiles, 'foo/genrule.txt')), 'skylark': _ReadFile(os.path.join(bazel_bin, 'foo/skylark.txt')) } def _AssertOutputFileContents(self, lines, input_file_line, expected_tmpdir_regex): if test_base.TestBase.IsWindows(): # 5 lines = input_file_line, TMP:y, TEMP:y, TMP=, TEMP= if len(lines) != 5: self.fail('lines=%s' % lines) self.assertEqual(lines[0:3], [input_file_line, 'TMP:y', 'TEMP:y']) tmp = lines[3].split('=', 1)[1] temp = lines[4].split('=', 1)[1] self.assertRegexpMatches(tmp, expected_tmpdir_regex) self.assertEqual(tmp, temp) else: # 3 lines = input_file_line, foo, TMPDIR if len(lines) != 3: self.fail('lines=%s' % lines) self.assertEqual(lines[0:2], [input_file_line, 'foo']) tmpdir = lines[2].split('=', 1)[1] self.assertRegexpMatches(tmpdir, expected_tmpdir_regex) if __name__ == '__main__': unittest.main()