aboutsummaryrefslogtreecommitdiffhomepage
path: root/tools/run_tests/python_utils
diff options
context:
space:
mode:
authorGravatar ncteisen <ncteisen@gmail.com>2017-12-11 16:54:47 -0800
committerGravatar ncteisen <ncteisen@gmail.com>2017-12-11 16:54:47 -0800
commit05687c3da9848ab79f08417793965de9bbbb52b0 (patch)
treed0081bba30e90f7ea576dded2604021a9c5baeb2 /tools/run_tests/python_utils
parent173c477bd077e9081505e44af17bc2d442ae11b0 (diff)
yapf tools/run_tests/python_utils
Diffstat (limited to 'tools/run_tests/python_utils')
-rwxr-xr-xtools/run_tests/python_utils/antagonist.py3
-rw-r--r--tools/run_tests/python_utils/comment_on_pr.py33
-rwxr-xr-xtools/run_tests/python_utils/dockerjob.py162
-rw-r--r--tools/run_tests/python_utils/filter_pull_request_tests.py200
-rwxr-xr-xtools/run_tests/python_utils/jobset.py826
-rwxr-xr-xtools/run_tests/python_utils/port_server.py253
-rw-r--r--tools/run_tests/python_utils/report_utils.py219
-rw-r--r--tools/run_tests/python_utils/start_port_server.py11
-rw-r--r--tools/run_tests/python_utils/upload_test_results.py231
-rwxr-xr-xtools/run_tests/python_utils/watch_dirs.py79
10 files changed, 1056 insertions, 961 deletions
diff --git a/tools/run_tests/python_utils/antagonist.py b/tools/run_tests/python_utils/antagonist.py
index 0d79ce0986..a928a4cb00 100755
--- a/tools/run_tests/python_utils/antagonist.py
+++ b/tools/run_tests/python_utils/antagonist.py
@@ -12,8 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
"""This is used by run_tests.py to create cpu load on a machine"""
while True:
- pass
+ pass
diff --git a/tools/run_tests/python_utils/comment_on_pr.py b/tools/run_tests/python_utils/comment_on_pr.py
index 21b9bb7085..399c996d4d 100644
--- a/tools/run_tests/python_utils/comment_on_pr.py
+++ b/tools/run_tests/python_utils/comment_on_pr.py
@@ -16,19 +16,22 @@ import os
import json
import urllib2
+
def comment_on_pr(text):
- if 'JENKINS_OAUTH_TOKEN' not in os.environ:
- print 'Missing JENKINS_OAUTH_TOKEN env var: not commenting'
- return
- if 'ghprbPullId' not in os.environ:
- print 'Missing ghprbPullId env var: not commenting'
- return
- req = urllib2.Request(
- url = 'https://api.github.com/repos/grpc/grpc/issues/%s/comments' %
- os.environ['ghprbPullId'],
- data = json.dumps({'body': text}),
- headers = {
- 'Authorization': 'token %s' % os.environ['JENKINS_OAUTH_TOKEN'],
- 'Content-Type': 'application/json',
- })
- print urllib2.urlopen(req).read()
+ if 'JENKINS_OAUTH_TOKEN' not in os.environ:
+ print 'Missing JENKINS_OAUTH_TOKEN env var: not commenting'
+ return
+ if 'ghprbPullId' not in os.environ:
+ print 'Missing ghprbPullId env var: not commenting'
+ return
+ req = urllib2.Request(
+ url='https://api.github.com/repos/grpc/grpc/issues/%s/comments' %
+ os.environ['ghprbPullId'],
+ data=json.dumps({
+ 'body': text
+ }),
+ headers={
+ 'Authorization': 'token %s' % os.environ['JENKINS_OAUTH_TOKEN'],
+ 'Content-Type': 'application/json',
+ })
+ print urllib2.urlopen(req).read()
diff --git a/tools/run_tests/python_utils/dockerjob.py b/tools/run_tests/python_utils/dockerjob.py
index 2f5285b26c..d2941c0811 100755
--- a/tools/run_tests/python_utils/dockerjob.py
+++ b/tools/run_tests/python_utils/dockerjob.py
@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
"""Helpers to run docker instances as jobs."""
from __future__ import print_function
@@ -28,102 +27,109 @@ _DEVNULL = open(os.devnull, 'w')
def random_name(base_name):
- """Randomizes given base name."""
- return '%s_%s' % (base_name, uuid.uuid4())
+ """Randomizes given base name."""
+ return '%s_%s' % (base_name, uuid.uuid4())
def docker_kill(cid):
- """Kills a docker container. Returns True if successful."""
- return subprocess.call(['docker','kill', str(cid)],
- stdin=subprocess.PIPE,
- stdout=_DEVNULL,
- stderr=subprocess.STDOUT) == 0
+ """Kills a docker container. Returns True if successful."""
+ return subprocess.call(
+ ['docker', 'kill', str(cid)],
+ stdin=subprocess.PIPE,
+ stdout=_DEVNULL,
+ stderr=subprocess.STDOUT) == 0
def docker_mapped_port(cid, port, timeout_seconds=15):
- """Get port mapped to internal given internal port for given container."""
- started = time.time()
- while time.time() - started < timeout_seconds:
- try:
- output = subprocess.check_output('docker port %s %s' % (cid, port),
- stderr=_DEVNULL,
- shell=True)
- return int(output.split(':', 2)[1])
- except subprocess.CalledProcessError as e:
- pass
- raise Exception('Failed to get exposed port %s for container %s.' %
- (port, cid))
+ """Get port mapped to internal given internal port for given container."""
+ started = time.time()
+ while time.time() - started < timeout_seconds:
+ try:
+ output = subprocess.check_output(
+ 'docker port %s %s' % (cid, port), stderr=_DEVNULL, shell=True)
+ return int(output.split(':', 2)[1])
+ except subprocess.CalledProcessError as e:
+ pass
+ raise Exception('Failed to get exposed port %s for container %s.' %
+ (port, cid))
def wait_for_healthy(cid, shortname, timeout_seconds):
- """Wait timeout_seconds for the container to become healthy"""
- started = time.time()
- while time.time() - started < timeout_seconds:
- try:
- output = subprocess.check_output(
- ['docker', 'inspect', '--format="{{.State.Health.Status}}"', cid],
- stderr=_DEVNULL)
- if output.strip('\n') == 'healthy':
- return
- except subprocess.CalledProcessError as e:
- pass
- time.sleep(1)
- raise Exception('Timed out waiting for %s (%s) to pass health check' %
- (shortname, cid))
+ """Wait timeout_seconds for the container to become healthy"""
+ started = time.time()
+ while time.time() - started < timeout_seconds:
+ try:
+ output = subprocess.check_output(
+ [
+ 'docker', 'inspect', '--format="{{.State.Health.Status}}"',
+ cid
+ ],
+ stderr=_DEVNULL)
+ if output.strip('\n') == 'healthy':
+ return
+ except subprocess.CalledProcessError as e:
+ pass
+ time.sleep(1)
+ raise Exception('Timed out waiting for %s (%s) to pass health check' %
+ (shortname, cid))
def finish_jobs(jobs):
- """Kills given docker containers and waits for corresponding jobs to finish"""
- for job in jobs:
- job.kill(suppress_failure=True)
+ """Kills given docker containers and waits for corresponding jobs to finish"""
+ for job in jobs:
+ job.kill(suppress_failure=True)
- while any(job.is_running() for job in jobs):
- time.sleep(1)
+ while any(job.is_running() for job in jobs):
+ time.sleep(1)
def image_exists(image):
- """Returns True if given docker image exists."""
- return subprocess.call(['docker','inspect', image],
- stdin=subprocess.PIPE,
- stdout=_DEVNULL,
- stderr=subprocess.STDOUT) == 0
+ """Returns True if given docker image exists."""
+ return subprocess.call(
+ ['docker', 'inspect', image],
+ stdin=subprocess.PIPE,
+ stdout=_DEVNULL,
+ stderr=subprocess.STDOUT) == 0
def remove_image(image, skip_nonexistent=False, max_retries=10):
- """Attempts to remove docker image with retries."""
- if skip_nonexistent and not image_exists(image):
- return True
- for attempt in range(0, max_retries):
- if subprocess.call(['docker','rmi', '-f', image],
- stdin=subprocess.PIPE,
- stdout=_DEVNULL,
- stderr=subprocess.STDOUT) == 0:
- return True
- time.sleep(2)
- print('Failed to remove docker image %s' % image)
- return False
+ """Attempts to remove docker image with retries."""
+ if skip_nonexistent and not image_exists(image):
+ return True
+ for attempt in range(0, max_retries):
+ if subprocess.call(
+ ['docker', 'rmi', '-f', image],
+ stdin=subprocess.PIPE,
+ stdout=_DEVNULL,
+ stderr=subprocess.STDOUT) == 0:
+ return True
+ time.sleep(2)
+ print('Failed to remove docker image %s' % image)
+ return False
class DockerJob:
- """Encapsulates a job"""
-
- def __init__(self, spec):
- self._spec = spec
- self._job = jobset.Job(spec, newline_on_success=True, travis=True, add_env={})
- self._container_name = spec.container_name
-
- def mapped_port(self, port):
- return docker_mapped_port(self._container_name, port)
-
- def wait_for_healthy(self, timeout_seconds):
- wait_for_healthy(self._container_name, self._spec.shortname, timeout_seconds)
-
- def kill(self, suppress_failure=False):
- """Sends kill signal to the container."""
- if suppress_failure:
- self._job.suppress_failure_message()
- return docker_kill(self._container_name)
-
- def is_running(self):
- """Polls a job and returns True if given job is still running."""
- return self._job.state() == jobset._RUNNING
+ """Encapsulates a job"""
+
+ def __init__(self, spec):
+ self._spec = spec
+ self._job = jobset.Job(
+ spec, newline_on_success=True, travis=True, add_env={})
+ self._container_name = spec.container_name
+
+ def mapped_port(self, port):
+ return docker_mapped_port(self._container_name, port)
+
+ def wait_for_healthy(self, timeout_seconds):
+ wait_for_healthy(self._container_name, self._spec.shortname,
+ timeout_seconds)
+
+ def kill(self, suppress_failure=False):
+ """Sends kill signal to the container."""
+ if suppress_failure:
+ self._job.suppress_failure_message()
+ return docker_kill(self._container_name)
+
+ def is_running(self):
+ """Polls a job and returns True if given job is still running."""
+ return self._job.state() == jobset._RUNNING
diff --git a/tools/run_tests/python_utils/filter_pull_request_tests.py b/tools/run_tests/python_utils/filter_pull_request_tests.py
index e880734651..8e0dc708dd 100644
--- a/tools/run_tests/python_utils/filter_pull_request_tests.py
+++ b/tools/run_tests/python_utils/filter_pull_request_tests.py
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
"""Filter out tests based on file differences compared to merge target branch"""
from __future__ import print_function
@@ -23,24 +22,25 @@ from subprocess import check_output
class TestSuite:
- """
+ """
Contains label to identify job as belonging to this test suite and
triggers to identify if changed files are relevant
"""
- def __init__(self, labels):
- """
+
+ def __init__(self, labels):
+ """
Build TestSuite to group tests based on labeling
:param label: strings that should match a jobs's platform, config, language, or test group
"""
- self.triggers = []
- self.labels = labels
+ self.triggers = []
+ self.labels = labels
- def add_trigger(self, trigger):
- """
+ def add_trigger(self, trigger):
+ """
Add a regex to list of triggers that determine if a changed file should run tests
:param trigger: regex matching file relevant to tests
"""
- self.triggers.append(trigger)
+ self.triggers.append(trigger)
# Create test suites
@@ -55,10 +55,11 @@ _RUBY_TEST_SUITE = TestSuite(['ruby'])
_LINUX_TEST_SUITE = TestSuite(['linux'])
_WINDOWS_TEST_SUITE = TestSuite(['windows'])
_MACOS_TEST_SUITE = TestSuite(['macos'])
-_ALL_TEST_SUITES = [_CORE_TEST_SUITE, _CPP_TEST_SUITE, _CSHARP_TEST_SUITE,
- _NODE_TEST_SUITE, _OBJC_TEST_SUITE, _PHP_TEST_SUITE,
- _PYTHON_TEST_SUITE, _RUBY_TEST_SUITE, _LINUX_TEST_SUITE,
- _WINDOWS_TEST_SUITE, _MACOS_TEST_SUITE]
+_ALL_TEST_SUITES = [
+ _CORE_TEST_SUITE, _CPP_TEST_SUITE, _CSHARP_TEST_SUITE, _NODE_TEST_SUITE,
+ _OBJC_TEST_SUITE, _PHP_TEST_SUITE, _PYTHON_TEST_SUITE, _RUBY_TEST_SUITE,
+ _LINUX_TEST_SUITE, _WINDOWS_TEST_SUITE, _MACOS_TEST_SUITE
+]
# Dictionary of whitelistable files where the key is a regex matching changed files
# and the value is a list of tests that should be run. An empty list means that
@@ -66,46 +67,46 @@ _ALL_TEST_SUITES = [_CORE_TEST_SUITE, _CPP_TEST_SUITE, _CSHARP_TEST_SUITE,
# match any of these regexes will trigger all tests
# DO NOT CHANGE THIS UNLESS YOU KNOW WHAT YOU ARE DOING (be careful even if you do)
_WHITELIST_DICT = {
- '^doc/': [],
- '^examples/': [],
- '^include/grpc\+\+/': [_CPP_TEST_SUITE],
- '^summerofcode/': [],
- '^src/cpp/': [_CPP_TEST_SUITE],
- '^src/csharp/': [_CSHARP_TEST_SUITE],
- '^src/objective\-c/': [_OBJC_TEST_SUITE],
- '^src/php/': [_PHP_TEST_SUITE],
- '^src/python/': [_PYTHON_TEST_SUITE],
- '^src/ruby/': [_RUBY_TEST_SUITE],
- '^templates/': [],
- '^test/core/': [_CORE_TEST_SUITE, _CPP_TEST_SUITE],
- '^test/cpp/': [_CPP_TEST_SUITE],
- '^test/distrib/cpp/': [_CPP_TEST_SUITE],
- '^test/distrib/csharp/': [_CSHARP_TEST_SUITE],
- '^test/distrib/php/': [_PHP_TEST_SUITE],
- '^test/distrib/python/': [_PYTHON_TEST_SUITE],
- '^test/distrib/ruby/': [_RUBY_TEST_SUITE],
- '^vsprojects/': [_WINDOWS_TEST_SUITE],
- 'composer\.json$': [_PHP_TEST_SUITE],
- 'config\.m4$': [_PHP_TEST_SUITE],
- 'CONTRIBUTING\.md$': [],
- 'Gemfile$': [_RUBY_TEST_SUITE],
- 'grpc\.def$': [_WINDOWS_TEST_SUITE],
- 'grpc\.gemspec$': [_RUBY_TEST_SUITE],
- 'gRPC\.podspec$': [_OBJC_TEST_SUITE],
- 'gRPC\-Core\.podspec$': [_OBJC_TEST_SUITE],
- 'gRPC\-ProtoRPC\.podspec$': [_OBJC_TEST_SUITE],
- 'gRPC\-RxLibrary\.podspec$': [_OBJC_TEST_SUITE],
- 'INSTALL\.md$': [],
- 'LICENSE$': [],
- 'MANIFEST\.md$': [],
- 'package\.json$': [_PHP_TEST_SUITE],
- 'package\.xml$': [_PHP_TEST_SUITE],
- 'PATENTS$': [],
- 'PYTHON\-MANIFEST\.in$': [_PYTHON_TEST_SUITE],
- 'README\.md$': [],
- 'requirements\.txt$': [_PYTHON_TEST_SUITE],
- 'setup\.cfg$': [_PYTHON_TEST_SUITE],
- 'setup\.py$': [_PYTHON_TEST_SUITE]
+ '^doc/': [],
+ '^examples/': [],
+ '^include/grpc\+\+/': [_CPP_TEST_SUITE],
+ '^summerofcode/': [],
+ '^src/cpp/': [_CPP_TEST_SUITE],
+ '^src/csharp/': [_CSHARP_TEST_SUITE],
+ '^src/objective\-c/': [_OBJC_TEST_SUITE],
+ '^src/php/': [_PHP_TEST_SUITE],
+ '^src/python/': [_PYTHON_TEST_SUITE],
+ '^src/ruby/': [_RUBY_TEST_SUITE],
+ '^templates/': [],
+ '^test/core/': [_CORE_TEST_SUITE, _CPP_TEST_SUITE],
+ '^test/cpp/': [_CPP_TEST_SUITE],
+ '^test/distrib/cpp/': [_CPP_TEST_SUITE],
+ '^test/distrib/csharp/': [_CSHARP_TEST_SUITE],
+ '^test/distrib/php/': [_PHP_TEST_SUITE],
+ '^test/distrib/python/': [_PYTHON_TEST_SUITE],
+ '^test/distrib/ruby/': [_RUBY_TEST_SUITE],
+ '^vsprojects/': [_WINDOWS_TEST_SUITE],
+ 'composer\.json$': [_PHP_TEST_SUITE],
+ 'config\.m4$': [_PHP_TEST_SUITE],
+ 'CONTRIBUTING\.md$': [],
+ 'Gemfile$': [_RUBY_TEST_SUITE],
+ 'grpc\.def$': [_WINDOWS_TEST_SUITE],
+ 'grpc\.gemspec$': [_RUBY_TEST_SUITE],
+ 'gRPC\.podspec$': [_OBJC_TEST_SUITE],
+ 'gRPC\-Core\.podspec$': [_OBJC_TEST_SUITE],
+ 'gRPC\-ProtoRPC\.podspec$': [_OBJC_TEST_SUITE],
+ 'gRPC\-RxLibrary\.podspec$': [_OBJC_TEST_SUITE],
+ 'INSTALL\.md$': [],
+ 'LICENSE$': [],
+ 'MANIFEST\.md$': [],
+ 'package\.json$': [_PHP_TEST_SUITE],
+ 'package\.xml$': [_PHP_TEST_SUITE],
+ 'PATENTS$': [],
+ 'PYTHON\-MANIFEST\.in$': [_PYTHON_TEST_SUITE],
+ 'README\.md$': [],
+ 'requirements\.txt$': [_PYTHON_TEST_SUITE],
+ 'setup\.cfg$': [_PYTHON_TEST_SUITE],
+ 'setup\.py$': [_PYTHON_TEST_SUITE]
}
# Regex that combines all keys in _WHITELIST_DICT
@@ -113,83 +114,88 @@ _ALL_TRIGGERS = "(" + ")|(".join(_WHITELIST_DICT.keys()) + ")"
# Add all triggers to their respective test suites
for trigger, test_suites in six.iteritems(_WHITELIST_DICT):
- for test_suite in test_suites:
- test_suite.add_trigger(trigger)
+ for test_suite in test_suites:
+ test_suite.add_trigger(trigger)
def _get_changed_files(base_branch):
- """
+ """
Get list of changed files between current branch and base of target merge branch
"""
- # Get file changes between branch and merge-base of specified branch
- # Not combined to be Windows friendly
- base_commit = check_output(["git", "merge-base", base_branch, "HEAD"]).rstrip()
- return check_output(["git", "diff", base_commit, "--name-only", "HEAD"]).splitlines()
+ # Get file changes between branch and merge-base of specified branch
+ # Not combined to be Windows friendly
+ base_commit = check_output(
+ ["git", "merge-base", base_branch, "HEAD"]).rstrip()
+ return check_output(
+ ["git", "diff", base_commit, "--name-only", "HEAD"]).splitlines()
def _can_skip_tests(file_names, triggers):
- """
+ """
Determines if tests are skippable based on if all files do not match list of regexes
:param file_names: list of changed files generated by _get_changed_files()
:param triggers: list of regexes matching file name that indicates tests should be run
:return: safe to skip tests
"""
- for file_name in file_names:
- if any(re.match(trigger, file_name) for trigger in triggers):
- return False
- return True
+ for file_name in file_names:
+ if any(re.match(trigger, file_name) for trigger in triggers):
+ return False
+ return True
def _remove_irrelevant_tests(tests, skippable_labels):
- """
+ """
Filters out tests by config or language - will not remove sanitizer tests
:param tests: list of all tests generated by run_tests_matrix.py
:param skippable_labels: list of languages and platforms with skippable tests
:return: list of relevant tests
"""
- # test.labels[0] is platform and test.labels[2] is language
- # We skip a test if both are considered safe to skip
- return [test for test in tests if test.labels[0] not in skippable_labels or \
- test.labels[2] not in skippable_labels]
+ # test.labels[0] is platform and test.labels[2] is language
+ # We skip a test if both are considered safe to skip
+ return [test for test in tests if test.labels[0] not in skippable_labels or \
+ test.labels[2] not in skippable_labels]
def affects_c_cpp(base_branch):
- """
+ """
Determines if a pull request's changes affect C/C++. This function exists because
there are pull request tests that only test C/C++ code
:param base_branch: branch that a pull request is requesting to merge into
:return: boolean indicating whether C/C++ changes are made in pull request
"""
- changed_files = _get_changed_files(base_branch)
- # Run all tests if any changed file is not in the whitelist dictionary
- for changed_file in changed_files:
- if not re.match(_ALL_TRIGGERS, changed_file):
- return True
- return not _can_skip_tests(changed_files, _CPP_TEST_SUITE.triggers + _CORE_TEST_SUITE.triggers)
+ changed_files = _get_changed_files(base_branch)
+ # Run all tests if any changed file is not in the whitelist dictionary
+ for changed_file in changed_files:
+ if not re.match(_ALL_TRIGGERS, changed_file):
+ return True
+ return not _can_skip_tests(
+ changed_files, _CPP_TEST_SUITE.triggers + _CORE_TEST_SUITE.triggers)
def filter_tests(tests, base_branch):
- """
+ """
Filters out tests that are safe to ignore
:param tests: list of all tests generated by run_tests_matrix.py
:return: list of relevant tests
"""
- print('Finding file differences between gRPC %s branch and pull request...\n' % base_branch)
- changed_files = _get_changed_files(base_branch)
- for changed_file in changed_files:
- print(' %s' % changed_file)
- print('')
-
- # Run all tests if any changed file is not in the whitelist dictionary
- for changed_file in changed_files:
- if not re.match(_ALL_TRIGGERS, changed_file):
- return(tests)
- # Figure out which language and platform tests to run
- skippable_labels = []
- for test_suite in _ALL_TEST_SUITES:
- if _can_skip_tests(changed_files, test_suite.triggers):
- for label in test_suite.labels:
- print(' %s tests safe to skip' % label)
- skippable_labels.append(label)
- tests = _remove_irrelevant_tests(tests, skippable_labels)
- return tests
+ print(
+ 'Finding file differences between gRPC %s branch and pull request...\n'
+ % base_branch)
+ changed_files = _get_changed_files(base_branch)
+ for changed_file in changed_files:
+ print(' %s' % changed_file)
+ print('')
+
+ # Run all tests if any changed file is not in the whitelist dictionary
+ for changed_file in changed_files:
+ if not re.match(_ALL_TRIGGERS, changed_file):
+ return (tests)
+ # Figure out which language and platform tests to run
+ skippable_labels = []
+ for test_suite in _ALL_TEST_SUITES:
+ if _can_skip_tests(changed_files, test_suite.triggers):
+ for label in test_suite.labels:
+ print(' %s tests safe to skip' % label)
+ skippable_labels.append(label)
+ tests = _remove_irrelevant_tests(tests, skippable_labels)
+ return tests
diff --git a/tools/run_tests/python_utils/jobset.py b/tools/run_tests/python_utils/jobset.py
index 85eef444ef..424b4404a8 100755
--- a/tools/run_tests/python_utils/jobset.py
+++ b/tools/run_tests/python_utils/jobset.py
@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
"""Run a group of subprocesses and then finish."""
from __future__ import print_function
@@ -28,11 +27,9 @@ import tempfile
import time
import errno
-
# cpu cost measurement
measure_cpu_costs = False
-
_DEFAULT_MAX_JOBS = 16 * multiprocessing.cpu_count()
_MAX_RESULT_SIZE = 8192
@@ -42,63 +39,60 @@ _MAX_RESULT_SIZE = 8192
# characters to the PR description, which leak into the environment here
# and cause failures.
def strip_non_ascii_chars(s):
- return ''.join(c for c in s if ord(c) < 128)
+ return ''.join(c for c in s if ord(c) < 128)
def sanitized_environment(env):
- sanitized = {}
- for key, value in env.items():
- sanitized[strip_non_ascii_chars(key)] = strip_non_ascii_chars(value)
- return sanitized
+ sanitized = {}
+ for key, value in env.items():
+ sanitized[strip_non_ascii_chars(key)] = strip_non_ascii_chars(value)
+ return sanitized
def platform_string():
- if platform.system() == 'Windows':
- return 'windows'
- elif platform.system()[:7] == 'MSYS_NT':
- return 'windows'
- elif platform.system() == 'Darwin':
- return 'mac'
- elif platform.system() == 'Linux':
- return 'linux'
- else:
- return 'posix'
+ if platform.system() == 'Windows':
+ return 'windows'
+ elif platform.system()[:7] == 'MSYS_NT':
+ return 'windows'
+ elif platform.system() == 'Darwin':
+ return 'mac'
+ elif platform.system() == 'Linux':
+ return 'linux'
+ else:
+ return 'posix'
# setup a signal handler so that signal.pause registers 'something'
# when a child finishes
# not using futures and threading to avoid a dependency on subprocess32
if platform_string() == 'windows':
- pass
-else:
- def alarm_handler(unused_signum, unused_frame):
pass
+else:
- signal.signal(signal.SIGCHLD, lambda unused_signum, unused_frame: None)
- signal.signal(signal.SIGALRM, alarm_handler)
+ def alarm_handler(unused_signum, unused_frame):
+ pass
+ signal.signal(signal.SIGCHLD, lambda unused_signum, unused_frame: None)
+ signal.signal(signal.SIGALRM, alarm_handler)
_SUCCESS = object()
_FAILURE = object()
_RUNNING = object()
_KILLED = object()
-
_COLORS = {
- 'red': [ 31, 0 ],
- 'green': [ 32, 0 ],
- 'yellow': [ 33, 0 ],
- 'lightgray': [ 37, 0],
- 'gray': [ 30, 1 ],
- 'purple': [ 35, 0 ],
- 'cyan': [ 36, 0 ]
- }
-
+ 'red': [31, 0],
+ 'green': [32, 0],
+ 'yellow': [33, 0],
+ 'lightgray': [37, 0],
+ 'gray': [30, 1],
+ 'purple': [35, 0],
+ 'cyan': [36, 0]
+}
_BEGINNING_OF_LINE = '\x1b[0G'
_CLEAR_LINE = '\x1b[2K'
-
_TAG_COLOR = {
'FAILED': 'red',
'FLAKE': 'purple',
@@ -111,392 +105,436 @@ _TAG_COLOR = {
'SUCCESS': 'green',
'IDLE': 'gray',
'SKIPPED': 'cyan'
- }
+}
_FORMAT = '%(asctime)-15s %(message)s'
logging.basicConfig(level=logging.INFO, format=_FORMAT)
def eintr_be_gone(fn):
- """Run fn until it doesn't stop because of EINTR"""
- while True:
- try:
- return fn()
- except IOError, e:
- if e.errno != errno.EINTR:
- raise
-
+ """Run fn until it doesn't stop because of EINTR"""
+ while True:
+ try:
+ return fn()
+ except IOError, e:
+ if e.errno != errno.EINTR:
+ raise
def message(tag, msg, explanatory_text=None, do_newline=False):
- if message.old_tag == tag and message.old_msg == msg and not explanatory_text:
- return
- message.old_tag = tag
- message.old_msg = msg
- while True:
- try:
- if platform_string() == 'windows' or not sys.stdout.isatty():
- if explanatory_text:
- logging.info(explanatory_text)
- logging.info('%s: %s', tag, msg)
- else:
- sys.stdout.write('%s%s%s\x1b[%d;%dm%s\x1b[0m: %s%s' % (
- _BEGINNING_OF_LINE,
- _CLEAR_LINE,
- '\n%s' % explanatory_text if explanatory_text is not None else '',
- _COLORS[_TAG_COLOR[tag]][1],
- _COLORS[_TAG_COLOR[tag]][0],
- tag,
- msg,
- '\n' if do_newline or explanatory_text is not None else ''))
- sys.stdout.flush()
- return
- except IOError, e:
- if e.errno != errno.EINTR:
- raise
+ if message.old_tag == tag and message.old_msg == msg and not explanatory_text:
+ return
+ message.old_tag = tag
+ message.old_msg = msg
+ while True:
+ try:
+ if platform_string() == 'windows' or not sys.stdout.isatty():
+ if explanatory_text:
+ logging.info(explanatory_text)
+ logging.info('%s: %s', tag, msg)
+ else:
+ sys.stdout.write('%s%s%s\x1b[%d;%dm%s\x1b[0m: %s%s' % (
+ _BEGINNING_OF_LINE, _CLEAR_LINE, '\n%s' % explanatory_text
+ if explanatory_text is not None else '',
+ _COLORS[_TAG_COLOR[tag]][1], _COLORS[_TAG_COLOR[tag]][0],
+ tag, msg, '\n'
+ if do_newline or explanatory_text is not None else ''))
+ sys.stdout.flush()
+ return
+ except IOError, e:
+ if e.errno != errno.EINTR:
+ raise
+
message.old_tag = ''
message.old_msg = ''
+
def which(filename):
- if '/' in filename:
- return filename
- for path in os.environ['PATH'].split(os.pathsep):
- if os.path.exists(os.path.join(path, filename)):
- return os.path.join(path, filename)
- raise Exception('%s not found' % filename)
+ if '/' in filename:
+ return filename
+ for path in os.environ['PATH'].split(os.pathsep):
+ if os.path.exists(os.path.join(path, filename)):
+ return os.path.join(path, filename)
+ raise Exception('%s not found' % filename)
class JobSpec(object):
- """Specifies what to run for a job."""
-
- def __init__(self, cmdline, shortname=None, environ=None,
- cwd=None, shell=False, timeout_seconds=5*60, flake_retries=0,
- timeout_retries=0, kill_handler=None, cpu_cost=1.0,
- verbose_success=False):
- """
+ """Specifies what to run for a job."""
+
+ def __init__(self,
+ cmdline,
+ shortname=None,
+ environ=None,
+ cwd=None,
+ shell=False,
+ timeout_seconds=5 * 60,
+ flake_retries=0,
+ timeout_retries=0,
+ kill_handler=None,
+ cpu_cost=1.0,
+ verbose_success=False):
+ """
Arguments:
cmdline: a list of arguments to pass as the command line
environ: a dictionary of environment variables to set in the child process
kill_handler: a handler that will be called whenever job.kill() is invoked
cpu_cost: number of cores per second this job needs
"""
- if environ is None:
- environ = {}
- self.cmdline = cmdline
- self.environ = environ
- self.shortname = cmdline[0] if shortname is None else shortname
- self.cwd = cwd
- self.shell = shell
- self.timeout_seconds = timeout_seconds
- self.flake_retries = flake_retries
- self.timeout_retries = timeout_retries
- self.kill_handler = kill_handler
- self.cpu_cost = cpu_cost
- self.verbose_success = verbose_success
-
- def identity(self):
- return '%r %r' % (self.cmdline, self.environ)
-
- def __hash__(self):
- return hash(self.identity())
-
- def __cmp__(self, other):
- return self.identity() == other.identity()
-
- def __repr__(self):
- return 'JobSpec(shortname=%s, cmdline=%s)' % (self.shortname, self.cmdline)
-
- def __str__(self):
- return '%s: %s %s' % (self.shortname,
- ' '.join('%s=%s' % kv for kv in self.environ.items()),
- ' '.join(self.cmdline))
+ if environ is None:
+ environ = {}
+ self.cmdline = cmdline
+ self.environ = environ
+ self.shortname = cmdline[0] if shortname is None else shortname
+ self.cwd = cwd
+ self.shell = shell
+ self.timeout_seconds = timeout_seconds
+ self.flake_retries = flake_retries
+ self.timeout_retries = timeout_retries
+ self.kill_handler = kill_handler
+ self.cpu_cost = cpu_cost
+ self.verbose_success = verbose_success
+
+ def identity(self):
+ return '%r %r' % (self.cmdline, self.environ)
+
+ def __hash__(self):
+ return hash(self.identity())
+
+ def __cmp__(self, other):
+ return self.identity() == other.identity()
+
+ def __repr__(self):
+ return 'JobSpec(shortname=%s, cmdline=%s)' % (self.shortname,
+ self.cmdline)
+
+ def __str__(self):
+ return '%s: %s %s' % (self.shortname,
+ ' '.join('%s=%s' % kv
+ for kv in self.environ.items()),
+ ' '.join(self.cmdline))
class JobResult(object):
- def __init__(self):
- self.state = 'UNKNOWN'
- self.returncode = -1
- self.elapsed_time = 0
- self.num_failures = 0
- self.retries = 0
- self.message = ''
- self.cpu_estimated = 1
- self.cpu_measured = 1
+
+ def __init__(self):
+ self.state = 'UNKNOWN'
+ self.returncode = -1
+ self.elapsed_time = 0
+ self.num_failures = 0
+ self.retries = 0
+ self.message = ''
+ self.cpu_estimated = 1
+ self.cpu_measured = 1
def read_from_start(f):
- f.seek(0)
- return f.read()
+ f.seek(0)
+ return f.read()
class Job(object):
- """Manages one job."""
-
- def __init__(self, spec, newline_on_success, travis, add_env,
- quiet_success=False):
- self._spec = spec
- self._newline_on_success = newline_on_success
- self._travis = travis
- self._add_env = add_env.copy()
- self._retries = 0
- self._timeout_retries = 0
- self._suppress_failure_message = False
- self._quiet_success = quiet_success
- if not self._quiet_success:
- message('START', spec.shortname, do_newline=self._travis)
- self.result = JobResult()
- self.start()
-
- def GetSpec(self):
- return self._spec
-
- def start(self):
- self._tempfile = tempfile.TemporaryFile()
- env = dict(os.environ)
- env.update(self._spec.environ)
- env.update(self._add_env)
- env = sanitized_environment(env)
- self._start = time.time()
- cmdline = self._spec.cmdline
- # The Unix time command is finicky when used with MSBuild, so we don't use it
- # with jobs that run MSBuild.
- global measure_cpu_costs
- if measure_cpu_costs and not 'vsprojects\\build' in cmdline[0]:
- cmdline = ['time', '-p'] + cmdline
- else:
- measure_cpu_costs = False
- try_start = lambda: subprocess.Popen(args=cmdline,
- stderr=subprocess.STDOUT,
- stdout=self._tempfile,
- cwd=self._spec.cwd,
- shell=self._spec.shell,
- env=env)
- delay = 0.3
- for i in range(0, 4):
- try:
- self._process = try_start()
- break
- except OSError:
- message('WARNING', 'Failed to start %s, retrying in %f seconds' % (self._spec.shortname, delay))
- time.sleep(delay)
- delay *= 2
- else:
- self._process = try_start()
- self._state = _RUNNING
-
- def state(self):
- """Poll current state of the job. Prints messages at completion."""
- def stdout(self=self):
- stdout = read_from_start(self._tempfile)
- self.result.message = stdout[-_MAX_RESULT_SIZE:]
- return stdout
- if self._state == _RUNNING and self._process.poll() is not None:
- elapsed = time.time() - self._start
- self.result.elapsed_time = elapsed
- if self._process.returncode != 0:
- if self._retries < self._spec.flake_retries:
- message('FLAKE', '%s [ret=%d, pid=%d]' % (
- self._spec.shortname, self._process.returncode, self._process.pid),
- stdout(), do_newline=True)
- self._retries += 1
- self.result.num_failures += 1
- self.result.retries = self._timeout_retries + self._retries
- # NOTE: job is restarted regardless of jobset's max_time setting
- self.start()
- else:
- self._state = _FAILURE
- if not self._suppress_failure_message:
- message('FAILED', '%s [ret=%d, pid=%d, time=%.1fsec]' % (
- self._spec.shortname, self._process.returncode, self._process.pid, elapsed),
- stdout(), do_newline=True)
- self.result.state = 'FAILED'
- self.result.num_failures += 1
- self.result.returncode = self._process.returncode
- else:
- self._state = _SUCCESS
- measurement = ''
- if measure_cpu_costs:
- m = re.search(r'real\s+([0-9.]+)\nuser\s+([0-9.]+)\nsys\s+([0-9.]+)', stdout())
- real = float(m.group(1))
- user = float(m.group(2))
- sys = float(m.group(3))
- if real > 0.5:
- cores = (user + sys) / real
- self.result.cpu_measured = float('%.01f' % cores)
- self.result.cpu_estimated = float('%.01f' % self._spec.cpu_cost)
- measurement = '; cpu_cost=%.01f; estimated=%.01f' % (self.result.cpu_measured, self.result.cpu_estimated)
+ """Manages one job."""
+
+ def __init__(self,
+ spec,
+ newline_on_success,
+ travis,
+ add_env,
+ quiet_success=False):
+ self._spec = spec
+ self._newline_on_success = newline_on_success
+ self._travis = travis
+ self._add_env = add_env.copy()
+ self._retries = 0
+ self._timeout_retries = 0
+ self._suppress_failure_message = False
+ self._quiet_success = quiet_success
if not self._quiet_success:
- message('PASSED', '%s [time=%.1fsec, retries=%d:%d%s]' % (
- self._spec.shortname, elapsed, self._retries, self._timeout_retries, measurement),
- stdout() if self._spec.verbose_success else None,
- do_newline=self._newline_on_success or self._travis)
- self.result.state = 'PASSED'
- elif (self._state == _RUNNING and
- self._spec.timeout_seconds is not None and
- time.time() - self._start > self._spec.timeout_seconds):
- elapsed = time.time() - self._start
- self.result.elapsed_time = elapsed
- if self._timeout_retries < self._spec.timeout_retries:
- message('TIMEOUT_FLAKE', '%s [pid=%d]' % (self._spec.shortname, self._process.pid), stdout(), do_newline=True)
- self._timeout_retries += 1
- self.result.num_failures += 1
- self.result.retries = self._timeout_retries + self._retries
- if self._spec.kill_handler:
- self._spec.kill_handler(self)
- self._process.terminate()
- # NOTE: job is restarted regardless of jobset's max_time setting
+ message('START', spec.shortname, do_newline=self._travis)
+ self.result = JobResult()
self.start()
- else:
- message('TIMEOUT', '%s [pid=%d, time=%.1fsec]' % (self._spec.shortname, self._process.pid, elapsed), stdout(), do_newline=True)
- self.kill()
- self.result.state = 'TIMEOUT'
- self.result.num_failures += 1
- return self._state
- def kill(self):
- if self._state == _RUNNING:
- self._state = _KILLED
- if self._spec.kill_handler:
- self._spec.kill_handler(self)
- self._process.terminate()
-
- def suppress_failure_message(self):
- self._suppress_failure_message = True
+ def GetSpec(self):
+ return self._spec
+
+ def start(self):
+ self._tempfile = tempfile.TemporaryFile()
+ env = dict(os.environ)
+ env.update(self._spec.environ)
+ env.update(self._add_env)
+ env = sanitized_environment(env)
+ self._start = time.time()
+ cmdline = self._spec.cmdline
+ # The Unix time command is finicky when used with MSBuild, so we don't use it
+ # with jobs that run MSBuild.
+ global measure_cpu_costs
+ if measure_cpu_costs and not 'vsprojects\\build' in cmdline[0]:
+ cmdline = ['time', '-p'] + cmdline
+ else:
+ measure_cpu_costs = False
+ try_start = lambda: subprocess.Popen(args=cmdline,
+ stderr=subprocess.STDOUT,
+ stdout=self._tempfile,
+ cwd=self._spec.cwd,
+ shell=self._spec.shell,
+ env=env)
+ delay = 0.3
+ for i in range(0, 4):
+ try:
+ self._process = try_start()
+ break
+ except OSError:
+ message('WARNING', 'Failed to start %s, retrying in %f seconds'
+ % (self._spec.shortname, delay))
+ time.sleep(delay)
+ delay *= 2
+ else:
+ self._process = try_start()
+ self._state = _RUNNING
+
+ def state(self):
+ """Poll current state of the job. Prints messages at completion."""
+
+ def stdout(self=self):
+ stdout = read_from_start(self._tempfile)
+ self.result.message = stdout[-_MAX_RESULT_SIZE:]
+ return stdout
+
+ if self._state == _RUNNING and self._process.poll() is not None:
+ elapsed = time.time() - self._start
+ self.result.elapsed_time = elapsed
+ if self._process.returncode != 0:
+ if self._retries < self._spec.flake_retries:
+ message(
+ 'FLAKE',
+ '%s [ret=%d, pid=%d]' %
+ (self._spec.shortname, self._process.returncode,
+ self._process.pid),
+ stdout(),
+ do_newline=True)
+ self._retries += 1
+ self.result.num_failures += 1
+ self.result.retries = self._timeout_retries + self._retries
+ # NOTE: job is restarted regardless of jobset's max_time setting
+ self.start()
+ else:
+ self._state = _FAILURE
+ if not self._suppress_failure_message:
+ message(
+ 'FAILED',
+ '%s [ret=%d, pid=%d, time=%.1fsec]' %
+ (self._spec.shortname, self._process.returncode,
+ self._process.pid, elapsed),
+ stdout(),
+ do_newline=True)
+ self.result.state = 'FAILED'
+ self.result.num_failures += 1
+ self.result.returncode = self._process.returncode
+ else:
+ self._state = _SUCCESS
+ measurement = ''
+ if measure_cpu_costs:
+ m = re.search(
+ r'real\s+([0-9.]+)\nuser\s+([0-9.]+)\nsys\s+([0-9.]+)',
+ stdout())
+ real = float(m.group(1))
+ user = float(m.group(2))
+ sys = float(m.group(3))
+ if real > 0.5:
+ cores = (user + sys) / real
+ self.result.cpu_measured = float('%.01f' % cores)
+ self.result.cpu_estimated = float('%.01f' %
+ self._spec.cpu_cost)
+ measurement = '; cpu_cost=%.01f; estimated=%.01f' % (
+ self.result.cpu_measured, self.result.cpu_estimated)
+ if not self._quiet_success:
+ message(
+ 'PASSED',
+ '%s [time=%.1fsec, retries=%d:%d%s]' %
+ (self._spec.shortname, elapsed, self._retries,
+ self._timeout_retries, measurement),
+ stdout() if self._spec.verbose_success else None,
+ do_newline=self._newline_on_success or self._travis)
+ self.result.state = 'PASSED'
+ elif (self._state == _RUNNING and
+ self._spec.timeout_seconds is not None and
+ time.time() - self._start > self._spec.timeout_seconds):
+ elapsed = time.time() - self._start
+ self.result.elapsed_time = elapsed
+ if self._timeout_retries < self._spec.timeout_retries:
+ message(
+ 'TIMEOUT_FLAKE',
+ '%s [pid=%d]' % (self._spec.shortname, self._process.pid),
+ stdout(),
+ do_newline=True)
+ self._timeout_retries += 1
+ self.result.num_failures += 1
+ self.result.retries = self._timeout_retries + self._retries
+ if self._spec.kill_handler:
+ self._spec.kill_handler(self)
+ self._process.terminate()
+ # NOTE: job is restarted regardless of jobset's max_time setting
+ self.start()
+ else:
+ message(
+ 'TIMEOUT',
+ '%s [pid=%d, time=%.1fsec]' %
+ (self._spec.shortname, self._process.pid, elapsed),
+ stdout(),
+ do_newline=True)
+ self.kill()
+ self.result.state = 'TIMEOUT'
+ self.result.num_failures += 1
+ return self._state
+
+ def kill(self):
+ if self._state == _RUNNING:
+ self._state = _KILLED
+ if self._spec.kill_handler:
+ self._spec.kill_handler(self)
+ self._process.terminate()
+
+ def suppress_failure_message(self):
+ self._suppress_failure_message = True
class Jobset(object):
- """Manages one run of jobs."""
-
- def __init__(self, check_cancelled, maxjobs, maxjobs_cpu_agnostic, newline_on_success, travis,
- stop_on_failure, add_env, quiet_success, max_time):
- self._running = set()
- self._check_cancelled = check_cancelled
- self._cancelled = False
- self._failures = 0
- self._completed = 0
- self._maxjobs = maxjobs
- self._maxjobs_cpu_agnostic = maxjobs_cpu_agnostic
- self._newline_on_success = newline_on_success
- self._travis = travis
- self._stop_on_failure = stop_on_failure
- self._add_env = add_env
- self._quiet_success = quiet_success
- self._max_time = max_time
- self.resultset = {}
- self._remaining = None
- self._start_time = time.time()
-
- def set_remaining(self, remaining):
- self._remaining = remaining
-
- def get_num_failures(self):
- return self._failures
-
- def cpu_cost(self):
- c = 0
- for job in self._running:
- c += job._spec.cpu_cost
- return c
-
- def start(self, spec):
- """Start a job. Return True on success, False on failure."""
- while True:
- if self._max_time > 0 and time.time() - self._start_time > self._max_time:
- skipped_job_result = JobResult()
- skipped_job_result.state = 'SKIPPED'
- message('SKIPPED', spec.shortname, do_newline=True)
- self.resultset[spec.shortname] = [skipped_job_result]
+ """Manages one run of jobs."""
+
+ def __init__(self, check_cancelled, maxjobs, maxjobs_cpu_agnostic,
+ newline_on_success, travis, stop_on_failure, add_env,
+ quiet_success, max_time):
+ self._running = set()
+ self._check_cancelled = check_cancelled
+ self._cancelled = False
+ self._failures = 0
+ self._completed = 0
+ self._maxjobs = maxjobs
+ self._maxjobs_cpu_agnostic = maxjobs_cpu_agnostic
+ self._newline_on_success = newline_on_success
+ self._travis = travis
+ self._stop_on_failure = stop_on_failure
+ self._add_env = add_env
+ self._quiet_success = quiet_success
+ self._max_time = max_time
+ self.resultset = {}
+ self._remaining = None
+ self._start_time = time.time()
+
+ def set_remaining(self, remaining):
+ self._remaining = remaining
+
+ def get_num_failures(self):
+ return self._failures
+
+ def cpu_cost(self):
+ c = 0
+ for job in self._running:
+ c += job._spec.cpu_cost
+ return c
+
+ def start(self, spec):
+ """Start a job. Return True on success, False on failure."""
+ while True:
+ if self._max_time > 0 and time.time(
+ ) - self._start_time > self._max_time:
+ skipped_job_result = JobResult()
+ skipped_job_result.state = 'SKIPPED'
+ message('SKIPPED', spec.shortname, do_newline=True)
+ self.resultset[spec.shortname] = [skipped_job_result]
+ return True
+ if self.cancelled(): return False
+ current_cpu_cost = self.cpu_cost()
+ if current_cpu_cost == 0: break
+ if current_cpu_cost + spec.cpu_cost <= self._maxjobs:
+ if len(self._running) < self._maxjobs_cpu_agnostic:
+ break
+ self.reap(spec.shortname, spec.cpu_cost)
+ if self.cancelled(): return False
+ job = Job(spec, self._newline_on_success, self._travis, self._add_env,
+ self._quiet_success)
+ self._running.add(job)
+ if job.GetSpec().shortname not in self.resultset:
+ self.resultset[job.GetSpec().shortname] = []
return True
- if self.cancelled(): return False
- current_cpu_cost = self.cpu_cost()
- if current_cpu_cost == 0: break
- if current_cpu_cost + spec.cpu_cost <= self._maxjobs:
- if len(self._running) < self._maxjobs_cpu_agnostic:
- break
- self.reap(spec.shortname, spec.cpu_cost)
- if self.cancelled(): return False
- job = Job(spec,
- self._newline_on_success,
- self._travis,
- self._add_env,
- self._quiet_success)
- self._running.add(job)
- if job.GetSpec().shortname not in self.resultset:
- self.resultset[job.GetSpec().shortname] = []
- return True
-
- def reap(self, waiting_for=None, waiting_for_cost=None):
- """Collect the dead jobs."""
- while self._running:
- dead = set()
- for job in self._running:
- st = eintr_be_gone(lambda: job.state())
- if st == _RUNNING: continue
- if st == _FAILURE or st == _KILLED:
- self._failures += 1
- if self._stop_on_failure:
- self._cancelled = True
+
+ def reap(self, waiting_for=None, waiting_for_cost=None):
+ """Collect the dead jobs."""
+ while self._running:
+ dead = set()
for job in self._running:
- job.kill()
- dead.add(job)
- break
- for job in dead:
- self._completed += 1
- if not self._quiet_success or job.result.state != 'PASSED':
- self.resultset[job.GetSpec().shortname].append(job.result)
- self._running.remove(job)
- if dead: return
- if not self._travis and platform_string() != 'windows':
- rstr = '' if self._remaining is None else '%d queued, ' % self._remaining
- if self._remaining is not None and self._completed > 0:
- now = time.time()
- sofar = now - self._start_time
- remaining = sofar / self._completed * (self._remaining + len(self._running))
- rstr = 'ETA %.1f sec; %s' % (remaining, rstr)
- if waiting_for is not None:
- wstr = ' next: %s @ %.2f cpu' % (waiting_for, waiting_for_cost)
- else:
- wstr = ''
- message('WAITING', '%s%d jobs running, %d complete, %d failed (load %.2f)%s' % (
- rstr, len(self._running), self._completed, self._failures, self.cpu_cost(), wstr))
- if platform_string() == 'windows':
- time.sleep(0.1)
- else:
- signal.alarm(10)
- signal.pause()
-
- def cancelled(self):
- """Poll for cancellation."""
- if self._cancelled: return True
- if not self._check_cancelled(): return False
- for job in self._running:
- job.kill()
- self._cancelled = True
- return True
-
- def finish(self):
- while self._running:
- if self.cancelled(): pass # poll cancellation
- self.reap()
- if platform_string() != 'windows':
- signal.alarm(0)
- return not self.cancelled() and self._failures == 0
+ st = eintr_be_gone(lambda: job.state())
+ if st == _RUNNING: continue
+ if st == _FAILURE or st == _KILLED:
+ self._failures += 1
+ if self._stop_on_failure:
+ self._cancelled = True
+ for job in self._running:
+ job.kill()
+ dead.add(job)
+ break
+ for job in dead:
+ self._completed += 1
+ if not self._quiet_success or job.result.state != 'PASSED':
+ self.resultset[job.GetSpec().shortname].append(job.result)
+ self._running.remove(job)
+ if dead: return
+ if not self._travis and platform_string() != 'windows':
+ rstr = '' if self._remaining is None else '%d queued, ' % self._remaining
+ if self._remaining is not None and self._completed > 0:
+ now = time.time()
+ sofar = now - self._start_time
+ remaining = sofar / self._completed * (
+ self._remaining + len(self._running))
+ rstr = 'ETA %.1f sec; %s' % (remaining, rstr)
+ if waiting_for is not None:
+ wstr = ' next: %s @ %.2f cpu' % (waiting_for,
+ waiting_for_cost)
+ else:
+ wstr = ''
+ message(
+ 'WAITING',
+ '%s%d jobs running, %d complete, %d failed (load %.2f)%s' %
+ (rstr, len(self._running), self._completed, self._failures,
+ self.cpu_cost(), wstr))
+ if platform_string() == 'windows':
+ time.sleep(0.1)
+ else:
+ signal.alarm(10)
+ signal.pause()
+
+ def cancelled(self):
+ """Poll for cancellation."""
+ if self._cancelled: return True
+ if not self._check_cancelled(): return False
+ for job in self._running:
+ job.kill()
+ self._cancelled = True
+ return True
+
+ def finish(self):
+ while self._running:
+ if self.cancelled(): pass # poll cancellation
+ self.reap()
+ if platform_string() != 'windows':
+ signal.alarm(0)
+ return not self.cancelled() and self._failures == 0
def _never_cancelled():
- return False
+ return False
def tag_remaining(xs):
- staging = []
- for x in xs:
- staging.append(x)
- if len(staging) > 5000:
- yield (staging.pop(0), None)
- n = len(staging)
- for i, x in enumerate(staging):
- yield (x, n - i - 1)
+ staging = []
+ for x in xs:
+ staging.append(x)
+ if len(staging) > 5000:
+ yield (staging.pop(0), None)
+ n = len(staging)
+ for i, x in enumerate(staging):
+ yield (x, n - i - 1)
def run(cmdlines,
@@ -511,23 +549,23 @@ def run(cmdlines,
skip_jobs=False,
quiet_success=False,
max_time=-1):
- if skip_jobs:
- resultset = {}
- skipped_job_result = JobResult()
- skipped_job_result.state = 'SKIPPED'
- for job in cmdlines:
- message('SKIPPED', job.shortname, do_newline=True)
- resultset[job.shortname] = [skipped_job_result]
- return 0, resultset
- js = Jobset(check_cancelled,
- maxjobs if maxjobs is not None else _DEFAULT_MAX_JOBS,
- maxjobs_cpu_agnostic if maxjobs_cpu_agnostic is not None else _DEFAULT_MAX_JOBS,
- newline_on_success, travis, stop_on_failure, add_env,
- quiet_success, max_time)
- for cmdline, remaining in tag_remaining(cmdlines):
- if not js.start(cmdline):
- break
- if remaining is not None:
- js.set_remaining(remaining)
- js.finish()
- return js.get_num_failures(), js.resultset
+ if skip_jobs:
+ resultset = {}
+ skipped_job_result = JobResult()
+ skipped_job_result.state = 'SKIPPED'
+ for job in cmdlines:
+ message('SKIPPED', job.shortname, do_newline=True)
+ resultset[job.shortname] = [skipped_job_result]
+ return 0, resultset
+ js = Jobset(check_cancelled, maxjobs if maxjobs is not None else
+ _DEFAULT_MAX_JOBS, maxjobs_cpu_agnostic
+ if maxjobs_cpu_agnostic is not None else _DEFAULT_MAX_JOBS,
+ newline_on_success, travis, stop_on_failure, add_env,
+ quiet_success, max_time)
+ for cmdline, remaining in tag_remaining(cmdlines):
+ if not js.start(cmdline):
+ break
+ if remaining is not None:
+ js.set_remaining(remaining)
+ js.finish()
+ return js.get_num_failures(), js.resultset
diff --git a/tools/run_tests/python_utils/port_server.py b/tools/run_tests/python_utils/port_server.py
index e8ac71af8d..15f55f46dd 100755
--- a/tools/run_tests/python_utils/port_server.py
+++ b/tools/run_tests/python_utils/port_server.py
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
"""Manage TCP ports for unit tests; started by run_tests.py"""
import argparse
@@ -27,17 +26,14 @@ from SocketServer import ThreadingMixIn
import threading
import platform
-
# increment this number whenever making a change to ensure that
# the changes are picked up by running CI servers
# note that all changes must be backwards compatible
_MY_VERSION = 20
-
if len(sys.argv) == 2 and sys.argv[1] == 'dump_version':
- print _MY_VERSION
- sys.exit(0)
-
+ print _MY_VERSION
+ sys.exit(0)
argp = argparse.ArgumentParser(description='Server for httpcli_test')
argp.add_argument('-p', '--port', default=12345, type=int)
@@ -45,11 +41,11 @@ argp.add_argument('-l', '--logfile', default=None, type=str)
args = argp.parse_args()
if args.logfile is not None:
- sys.stdin.close()
- sys.stderr.close()
- sys.stdout.close()
- sys.stderr = open(args.logfile, 'w')
- sys.stdout = sys.stderr
+ sys.stdin.close()
+ sys.stderr.close()
+ sys.stdout.close()
+ sys.stderr = open(args.logfile, 'w')
+ sys.stdout = sys.stderr
print 'port server running on port %d' % args.port
@@ -61,74 +57,81 @@ mu = threading.Lock()
# https://cs.chromium.org/chromium/src/net/base/port_util.cc). When one of these
# ports is used in a Cronet test, the test would fail (see issue #12149). These
# ports must be excluded from pool.
-cronet_restricted_ports = [1, 7, 9, 11, 13, 15, 17, 19, 20, 21, 22, 23, 25, 37,
- 42, 43, 53, 77, 79, 87, 95, 101, 102, 103, 104, 109,
- 110, 111, 113, 115, 117, 119, 123, 135, 139, 143,
- 179, 389, 465, 512, 513, 514, 515, 526, 530, 531,
- 532, 540, 556, 563, 587, 601, 636, 993, 995, 2049,
- 3659, 4045, 6000, 6665, 6666, 6667, 6668, 6669, 6697]
+cronet_restricted_ports = [
+ 1, 7, 9, 11, 13, 15, 17, 19, 20, 21, 22, 23, 25, 37, 42, 43, 53, 77, 79, 87,
+ 95, 101, 102, 103, 104, 109, 110, 111, 113, 115, 117, 119, 123, 135, 139,
+ 143, 179, 389, 465, 512, 513, 514, 515, 526, 530, 531, 532, 540, 556, 563,
+ 587, 601, 636, 993, 995, 2049, 3659, 4045, 6000, 6665, 6666, 6667, 6668,
+ 6669, 6697
+]
+
def can_connect(port):
- # this test is only really useful on unices where SO_REUSE_PORT is available
- # so on Windows, where this test is expensive, skip it
- if platform.system() == 'Windows': return False
- s = socket.socket()
- try:
- s.connect(('localhost', port))
- return True
- except socket.error, e:
- return False
- finally:
- s.close()
+ # this test is only really useful on unices where SO_REUSE_PORT is available
+ # so on Windows, where this test is expensive, skip it
+ if platform.system() == 'Windows': return False
+ s = socket.socket()
+ try:
+ s.connect(('localhost', port))
+ return True
+ except socket.error, e:
+ return False
+ finally:
+ s.close()
+
def can_bind(port, proto):
- s = socket.socket(proto, socket.SOCK_STREAM)
- s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- try:
- s.bind(('localhost', port))
- return True
- except socket.error, e:
- return False
- finally:
- s.close()
+ s = socket.socket(proto, socket.SOCK_STREAM)
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ try:
+ s.bind(('localhost', port))
+ return True
+ except socket.error, e:
+ return False
+ finally:
+ s.close()
def refill_pool(max_timeout, req):
- """Scan for ports not marked for being in use"""
- chk = [port for port in list(range(1025, 32766)) if port not in cronet_restricted_ports]
- random.shuffle(chk)
- for i in chk:
- if len(pool) > 100: break
- if i in in_use:
- age = time.time() - in_use[i]
- if age < max_timeout:
- continue
- req.log_message("kill old request %d" % i)
- del in_use[i]
- if can_bind(i, socket.AF_INET) and can_bind(i, socket.AF_INET6) and not can_connect(i):
- req.log_message("found available port %d" % i)
- pool.append(i)
+ """Scan for ports not marked for being in use"""
+ chk = [
+ port for port in list(range(1025, 32766))
+ if port not in cronet_restricted_ports
+ ]
+ random.shuffle(chk)
+ for i in chk:
+ if len(pool) > 100: break
+ if i in in_use:
+ age = time.time() - in_use[i]
+ if age < max_timeout:
+ continue
+ req.log_message("kill old request %d" % i)
+ del in_use[i]
+ if can_bind(i, socket.AF_INET) and can_bind(
+ i, socket.AF_INET6) and not can_connect(i):
+ req.log_message("found available port %d" % i)
+ pool.append(i)
def allocate_port(req):
- global pool
- global in_use
- global mu
- mu.acquire()
- max_timeout = 600
- while not pool:
- refill_pool(max_timeout, req)
- if not pool:
- req.log_message("failed to find ports: retrying soon")
- mu.release()
- time.sleep(1)
- mu.acquire()
- max_timeout /= 2
- port = pool[0]
- pool = pool[1:]
- in_use[port] = time.time()
- mu.release()
- return port
+ global pool
+ global in_use
+ global mu
+ mu.acquire()
+ max_timeout = 600
+ while not pool:
+ refill_pool(max_timeout, req)
+ if not pool:
+ req.log_message("failed to find ports: retrying soon")
+ mu.release()
+ time.sleep(1)
+ mu.acquire()
+ max_timeout /= 2
+ port = pool[0]
+ pool = pool[1:]
+ in_use[port] = time.time()
+ mu.release()
+ return port
keep_running = True
@@ -136,61 +139,67 @@ keep_running = True
class Handler(BaseHTTPRequestHandler):
- def setup(self):
- # If the client is unreachable for 5 seconds, close the connection
- self.timeout = 5
- BaseHTTPRequestHandler.setup(self)
+ def setup(self):
+ # If the client is unreachable for 5 seconds, close the connection
+ self.timeout = 5
+ BaseHTTPRequestHandler.setup(self)
+
+ def do_GET(self):
+ global keep_running
+ global mu
+ if self.path == '/get':
+ # allocate a new port, it will stay bound for ten minutes and until
+ # it's unused
+ self.send_response(200)
+ self.send_header('Content-Type', 'text/plain')
+ self.end_headers()
+ p = allocate_port(self)
+ self.log_message('allocated port %d' % p)
+ self.wfile.write('%d' % p)
+ elif self.path[0:6] == '/drop/':
+ self.send_response(200)
+ self.send_header('Content-Type', 'text/plain')
+ self.end_headers()
+ p = int(self.path[6:])
+ mu.acquire()
+ if p in in_use:
+ del in_use[p]
+ pool.append(p)
+ k = 'known'
+ else:
+ k = 'unknown'
+ mu.release()
+ self.log_message('drop %s port %d' % (k, p))
+ elif self.path == '/version_number':
+ # fetch a version string and the current process pid
+ self.send_response(200)
+ self.send_header('Content-Type', 'text/plain')
+ self.end_headers()
+ self.wfile.write(_MY_VERSION)
+ elif self.path == '/dump':
+ # yaml module is not installed on Macs and Windows machines by default
+ # so we import it lazily (/dump action is only used for debugging)
+ import yaml
+ self.send_response(200)
+ self.send_header('Content-Type', 'text/plain')
+ self.end_headers()
+ mu.acquire()
+ now = time.time()
+ out = yaml.dump(
+ {
+ 'pool': pool,
+ 'in_use': dict((k, now - v) for k, v in in_use.items())
+ })
+ mu.release()
+ self.wfile.write(out)
+ elif self.path == '/quitquitquit':
+ self.send_response(200)
+ self.end_headers()
+ self.server.shutdown()
- def do_GET(self):
- global keep_running
- global mu
- if self.path == '/get':
- # allocate a new port, it will stay bound for ten minutes and until
- # it's unused
- self.send_response(200)
- self.send_header('Content-Type', 'text/plain')
- self.end_headers()
- p = allocate_port(self)
- self.log_message('allocated port %d' % p)
- self.wfile.write('%d' % p)
- elif self.path[0:6] == '/drop/':
- self.send_response(200)
- self.send_header('Content-Type', 'text/plain')
- self.end_headers()
- p = int(self.path[6:])
- mu.acquire()
- if p in in_use:
- del in_use[p]
- pool.append(p)
- k = 'known'
- else:
- k = 'unknown'
- mu.release()
- self.log_message('drop %s port %d' % (k, p))
- elif self.path == '/version_number':
- # fetch a version string and the current process pid
- self.send_response(200)
- self.send_header('Content-Type', 'text/plain')
- self.end_headers()
- self.wfile.write(_MY_VERSION)
- elif self.path == '/dump':
- # yaml module is not installed on Macs and Windows machines by default
- # so we import it lazily (/dump action is only used for debugging)
- import yaml
- self.send_response(200)
- self.send_header('Content-Type', 'text/plain')
- self.end_headers()
- mu.acquire()
- now = time.time()
- out = yaml.dump({'pool': pool, 'in_use': dict((k, now - v) for k, v in in_use.items())})
- mu.release()
- self.wfile.write(out)
- elif self.path == '/quitquitquit':
- self.send_response(200)
- self.end_headers()
- self.server.shutdown()
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
- """Handle requests in a separate thread"""
+ """Handle requests in a separate thread"""
+
ThreadedHTTPServer(('', args.port), Handler).serve_forever()
diff --git a/tools/run_tests/python_utils/report_utils.py b/tools/run_tests/python_utils/report_utils.py
index a3867808b5..e4fddb8a7d 100644
--- a/tools/run_tests/python_utils/report_utils.py
+++ b/tools/run_tests/python_utils/report_utils.py
@@ -11,17 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
"""Generate XML and HTML test reports."""
from __future__ import print_function
try:
- from mako.runtime import Context
- from mako.template import Template
- from mako import exceptions
+ from mako.runtime import Context
+ from mako.template import Template
+ from mako import exceptions
except (ImportError):
- pass # Mako not installed but it is ok.
+ pass # Mako not installed but it is ok.
import datetime
import os
import string
@@ -30,111 +29,127 @@ import six
def _filter_msg(msg, output_format):
- """Filters out nonprintable and illegal characters from the message."""
- if output_format in ['XML', 'HTML']:
- # keep whitespaces but remove formfeed and vertical tab characters
- # that make XML report unparseable.
- filtered_msg = filter(
- lambda x: x in string.printable and x != '\f' and x != '\v',
- msg.decode('UTF-8', 'ignore'))
- if output_format == 'HTML':
- filtered_msg = filtered_msg.replace('"', '&quot;')
- return filtered_msg
- else:
- return msg
+ """Filters out nonprintable and illegal characters from the message."""
+ if output_format in ['XML', 'HTML']:
+ # keep whitespaces but remove formfeed and vertical tab characters
+ # that make XML report unparseable.
+ filtered_msg = filter(
+ lambda x: x in string.printable and x != '\f' and x != '\v',
+ msg.decode('UTF-8', 'ignore'))
+ if output_format == 'HTML':
+ filtered_msg = filtered_msg.replace('"', '&quot;')
+ return filtered_msg
+ else:
+ return msg
def new_junit_xml_tree():
- return ET.ElementTree(ET.Element('testsuites'))
+ return ET.ElementTree(ET.Element('testsuites'))
+
-def render_junit_xml_report(resultset, report_file, suite_package='grpc',
+def render_junit_xml_report(resultset,
+ report_file,
+ suite_package='grpc',
suite_name='tests'):
- """Generate JUnit-like XML report."""
- tree = new_junit_xml_tree()
- append_junit_xml_results(tree, resultset, suite_package, suite_name, '1')
- create_xml_report_file(tree, report_file)
+ """Generate JUnit-like XML report."""
+ tree = new_junit_xml_tree()
+ append_junit_xml_results(tree, resultset, suite_package, suite_name, '1')
+ create_xml_report_file(tree, report_file)
+
def create_xml_report_file(tree, report_file):
- """Generate JUnit-like report file from xml tree ."""
- # ensure the report directory exists
- report_dir = os.path.dirname(os.path.abspath(report_file))
- if not os.path.exists(report_dir):
- os.makedirs(report_dir)
- tree.write(report_file, encoding='UTF-8')
+ """Generate JUnit-like report file from xml tree ."""
+ # ensure the report directory exists
+ report_dir = os.path.dirname(os.path.abspath(report_file))
+ if not os.path.exists(report_dir):
+ os.makedirs(report_dir)
+ tree.write(report_file, encoding='UTF-8')
+
def append_junit_xml_results(tree, resultset, suite_package, suite_name, id):
- """Append a JUnit-like XML report tree with test results as a new suite."""
- testsuite = ET.SubElement(tree.getroot(), 'testsuite',
- id=id, package=suite_package, name=suite_name,
- timestamp=datetime.datetime.now().isoformat())
- failure_count = 0
- error_count = 0
- for shortname, results in six.iteritems(resultset):
- for result in results:
- xml_test = ET.SubElement(testsuite, 'testcase', name=shortname)
- if result.elapsed_time:
- xml_test.set('time', str(result.elapsed_time))
- filtered_msg = _filter_msg(result.message, 'XML')
- if result.state == 'FAILED':
- ET.SubElement(xml_test, 'failure', message='Failure').text = filtered_msg
- failure_count += 1
- elif result.state == 'TIMEOUT':
- ET.SubElement(xml_test, 'error', message='Timeout').text = filtered_msg
- error_count += 1
- elif result.state == 'SKIPPED':
- ET.SubElement(xml_test, 'skipped', message='Skipped')
- testsuite.set('failures', str(failure_count))
- testsuite.set('errors', str(error_count))
-
-def render_interop_html_report(
- client_langs, server_langs, test_cases, auth_test_cases, http2_cases,
- http2_server_cases, resultset,
- num_failures, cloud_to_prod, prod_servers, http2_interop):
- """Generate HTML report for interop tests."""
- template_file = 'tools/run_tests/interop/interop_html_report.template'
- try:
- mytemplate = Template(filename=template_file, format_exceptions=True)
- except NameError:
- print('Mako template is not installed. Skipping HTML report generation.')
- return
- except IOError as e:
- print('Failed to find the template %s: %s' % (template_file, e))
- return
-
- sorted_test_cases = sorted(test_cases)
- sorted_auth_test_cases = sorted(auth_test_cases)
- sorted_http2_cases = sorted(http2_cases)
- sorted_http2_server_cases = sorted(http2_server_cases)
- sorted_client_langs = sorted(client_langs)
- sorted_server_langs = sorted(server_langs)
- sorted_prod_servers = sorted(prod_servers)
-
- args = {'client_langs': sorted_client_langs,
- 'server_langs': sorted_server_langs,
- 'test_cases': sorted_test_cases,
- 'auth_test_cases': sorted_auth_test_cases,
- 'http2_cases': sorted_http2_cases,
- 'http2_server_cases': sorted_http2_server_cases,
- 'resultset': resultset,
- 'num_failures': num_failures,
- 'cloud_to_prod': cloud_to_prod,
- 'prod_servers': sorted_prod_servers,
- 'http2_interop': http2_interop}
-
- html_report_out_dir = 'reports'
- if not os.path.exists(html_report_out_dir):
- os.mkdir(html_report_out_dir)
- html_file_path = os.path.join(html_report_out_dir, 'index.html')
- try:
- with open(html_file_path, 'w') as output_file:
- mytemplate.render_context(Context(output_file, **args))
- except:
- print(exceptions.text_error_template().render())
- raise
+ """Append a JUnit-like XML report tree with test results as a new suite."""
+ testsuite = ET.SubElement(
+ tree.getroot(),
+ 'testsuite',
+ id=id,
+ package=suite_package,
+ name=suite_name,
+ timestamp=datetime.datetime.now().isoformat())
+ failure_count = 0
+ error_count = 0
+ for shortname, results in six.iteritems(resultset):
+ for result in results:
+ xml_test = ET.SubElement(testsuite, 'testcase', name=shortname)
+ if result.elapsed_time:
+ xml_test.set('time', str(result.elapsed_time))
+ filtered_msg = _filter_msg(result.message, 'XML')
+ if result.state == 'FAILED':
+ ET.SubElement(
+ xml_test, 'failure', message='Failure').text = filtered_msg
+ failure_count += 1
+ elif result.state == 'TIMEOUT':
+ ET.SubElement(
+ xml_test, 'error', message='Timeout').text = filtered_msg
+ error_count += 1
+ elif result.state == 'SKIPPED':
+ ET.SubElement(xml_test, 'skipped', message='Skipped')
+ testsuite.set('failures', str(failure_count))
+ testsuite.set('errors', str(error_count))
+
+
+def render_interop_html_report(client_langs, server_langs, test_cases,
+ auth_test_cases, http2_cases, http2_server_cases,
+ resultset, num_failures, cloud_to_prod,
+ prod_servers, http2_interop):
+ """Generate HTML report for interop tests."""
+ template_file = 'tools/run_tests/interop/interop_html_report.template'
+ try:
+ mytemplate = Template(filename=template_file, format_exceptions=True)
+ except NameError:
+ print(
+ 'Mako template is not installed. Skipping HTML report generation.')
+ return
+ except IOError as e:
+ print('Failed to find the template %s: %s' % (template_file, e))
+ return
+
+ sorted_test_cases = sorted(test_cases)
+ sorted_auth_test_cases = sorted(auth_test_cases)
+ sorted_http2_cases = sorted(http2_cases)
+ sorted_http2_server_cases = sorted(http2_server_cases)
+ sorted_client_langs = sorted(client_langs)
+ sorted_server_langs = sorted(server_langs)
+ sorted_prod_servers = sorted(prod_servers)
+
+ args = {
+ 'client_langs': sorted_client_langs,
+ 'server_langs': sorted_server_langs,
+ 'test_cases': sorted_test_cases,
+ 'auth_test_cases': sorted_auth_test_cases,
+ 'http2_cases': sorted_http2_cases,
+ 'http2_server_cases': sorted_http2_server_cases,
+ 'resultset': resultset,
+ 'num_failures': num_failures,
+ 'cloud_to_prod': cloud_to_prod,
+ 'prod_servers': sorted_prod_servers,
+ 'http2_interop': http2_interop
+ }
+
+ html_report_out_dir = 'reports'
+ if not os.path.exists(html_report_out_dir):
+ os.mkdir(html_report_out_dir)
+ html_file_path = os.path.join(html_report_out_dir, 'index.html')
+ try:
+ with open(html_file_path, 'w') as output_file:
+ mytemplate.render_context(Context(output_file, **args))
+ except:
+ print(exceptions.text_error_template().render())
+ raise
+
def render_perf_profiling_results(output_filepath, profile_names):
- with open(output_filepath, 'w') as output_file:
- output_file.write('<ul>\n')
- for name in profile_names:
- output_file.write('<li><a href=%s>%s</a></li>\n' % (name, name))
- output_file.write('</ul>\n')
+ with open(output_filepath, 'w') as output_file:
+ output_file.write('<ul>\n')
+ for name in profile_names:
+ output_file.write('<li><a href=%s>%s</a></li>\n' % (name, name))
+ output_file.write('</ul>\n')
diff --git a/tools/run_tests/python_utils/start_port_server.py b/tools/run_tests/python_utils/start_port_server.py
index 786103ccdf..5572cdcfe7 100644
--- a/tools/run_tests/python_utils/start_port_server.py
+++ b/tools/run_tests/python_utils/start_port_server.py
@@ -22,10 +22,10 @@ import sys
import tempfile
import time
-
# must be synchronized with test/core/utils/port_server_client.h
_PORT_SERVER_PORT = 32766
+
def start_port_server():
# check if a compatible port server is running
# if incompatible (version mismatch) ==> start a new one
@@ -33,9 +33,8 @@ def start_port_server():
# otherwise, leave it up
try:
version = int(
- urllib.urlopen(
- 'http://localhost:%d/version_number' %
- _PORT_SERVER_PORT).read())
+ urllib.urlopen('http://localhost:%d/version_number' %
+ _PORT_SERVER_PORT).read())
logging.info('detected port server running version %d', version)
running = True
except Exception as e:
@@ -92,8 +91,8 @@ def start_port_server():
# try one final time: maybe another build managed to start one
time.sleep(1)
try:
- urllib.urlopen(
- 'http://localhost:%d/get' % _PORT_SERVER_PORT).read()
+ urllib.urlopen('http://localhost:%d/get' %
+ _PORT_SERVER_PORT).read()
logging.info(
'last ditch attempt to contact port server succeeded')
break
diff --git a/tools/run_tests/python_utils/upload_test_results.py b/tools/run_tests/python_utils/upload_test_results.py
index ea97bc0aec..9eb8e2a862 100644
--- a/tools/run_tests/python_utils/upload_test_results.py
+++ b/tools/run_tests/python_utils/upload_test_results.py
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
"""Helper to upload Jenkins test results to BQ"""
from __future__ import print_function
@@ -23,8 +22,8 @@ import sys
import time
import uuid
-gcp_utils_dir = os.path.abspath(os.path.join(
- os.path.dirname(__file__), '../../gcp/utils'))
+gcp_utils_dir = os.path.abspath(
+ os.path.join(os.path.dirname(__file__), '../../gcp/utils'))
sys.path.append(gcp_utils_dir)
import big_query_utils
@@ -35,55 +34,57 @@ _EXPIRATION_MS = 90 * 24 * 60 * 60 * 1000
_PARTITION_TYPE = 'DAY'
_PROJECT_ID = 'grpc-testing'
_RESULTS_SCHEMA = [
- ('job_name', 'STRING', 'Name of Jenkins job'),
- ('build_id', 'INTEGER', 'Build ID of Jenkins job'),
- ('build_url', 'STRING', 'URL of Jenkins job'),
- ('test_name', 'STRING', 'Individual test name'),
- ('language', 'STRING', 'Language of test'),
- ('platform', 'STRING', 'Platform used for test'),
- ('config', 'STRING', 'Config used for test'),
- ('compiler', 'STRING', 'Compiler used for test'),
- ('iomgr_platform', 'STRING', 'Iomgr used for test'),
- ('result', 'STRING', 'Test result: PASSED, TIMEOUT, FAILED, or SKIPPED'),
- ('timestamp', 'TIMESTAMP', 'Timestamp of test run'),
- ('elapsed_time', 'FLOAT', 'How long test took to run'),
- ('cpu_estimated', 'FLOAT', 'Estimated CPU usage of test'),
- ('cpu_measured', 'FLOAT', 'Actual CPU usage of test'),
- ('return_code', 'INTEGER', 'Exit code of test'),
+ ('job_name', 'STRING', 'Name of Jenkins job'),
+ ('build_id', 'INTEGER', 'Build ID of Jenkins job'),
+ ('build_url', 'STRING', 'URL of Jenkins job'),
+ ('test_name', 'STRING', 'Individual test name'),
+ ('language', 'STRING', 'Language of test'),
+ ('platform', 'STRING', 'Platform used for test'),
+ ('config', 'STRING', 'Config used for test'),
+ ('compiler', 'STRING', 'Compiler used for test'),
+ ('iomgr_platform', 'STRING', 'Iomgr used for test'),
+ ('result', 'STRING', 'Test result: PASSED, TIMEOUT, FAILED, or SKIPPED'),
+ ('timestamp', 'TIMESTAMP', 'Timestamp of test run'),
+ ('elapsed_time', 'FLOAT', 'How long test took to run'),
+ ('cpu_estimated', 'FLOAT', 'Estimated CPU usage of test'),
+ ('cpu_measured', 'FLOAT', 'Actual CPU usage of test'),
+ ('return_code', 'INTEGER', 'Exit code of test'),
]
_INTEROP_RESULTS_SCHEMA = [
- ('job_name', 'STRING', 'Name of Jenkins/Kokoro job'),
- ('build_id', 'INTEGER', 'Build ID of Jenkins/Kokoro job'),
- ('build_url', 'STRING', 'URL of Jenkins/Kokoro job'),
- ('test_name', 'STRING', 'Unique test name combining client, server, and test_name'),
- ('suite', 'STRING', 'Test suite: cloud_to_cloud, cloud_to_prod, or cloud_to_prod_auth'),
- ('client', 'STRING', 'Client language'),
- ('server', 'STRING', 'Server host name'),
- ('test_case', 'STRING', 'Name of test case'),
- ('result', 'STRING', 'Test result: PASSED, TIMEOUT, FAILED, or SKIPPED'),
- ('timestamp', 'TIMESTAMP', 'Timestamp of test run'),
- ('elapsed_time', 'FLOAT', 'How long test took to run'),
+ ('job_name', 'STRING', 'Name of Jenkins/Kokoro job'),
+ ('build_id', 'INTEGER', 'Build ID of Jenkins/Kokoro job'),
+ ('build_url', 'STRING', 'URL of Jenkins/Kokoro job'),
+ ('test_name', 'STRING',
+ 'Unique test name combining client, server, and test_name'),
+ ('suite', 'STRING',
+ 'Test suite: cloud_to_cloud, cloud_to_prod, or cloud_to_prod_auth'),
+ ('client', 'STRING', 'Client language'),
+ ('server', 'STRING', 'Server host name'),
+ ('test_case', 'STRING', 'Name of test case'),
+ ('result', 'STRING', 'Test result: PASSED, TIMEOUT, FAILED, or SKIPPED'),
+ ('timestamp', 'TIMESTAMP', 'Timestamp of test run'),
+ ('elapsed_time', 'FLOAT', 'How long test took to run'),
]
def _get_build_metadata(test_results):
- """Add Jenkins/Kokoro build metadata to test_results based on environment
+ """Add Jenkins/Kokoro build metadata to test_results based on environment
variables set by Jenkins/Kokoro.
"""
- build_id = os.getenv('BUILD_ID') or os.getenv('KOKORO_BUILD_NUMBER')
- build_url = os.getenv('BUILD_URL') or os.getenv('KOKORO_BUILD_URL')
- job_name = os.getenv('JOB_BASE_NAME') or os.getenv('KOKORO_JOB_NAME')
+ build_id = os.getenv('BUILD_ID') or os.getenv('KOKORO_BUILD_NUMBER')
+ build_url = os.getenv('BUILD_URL') or os.getenv('KOKORO_BUILD_URL')
+ job_name = os.getenv('JOB_BASE_NAME') or os.getenv('KOKORO_JOB_NAME')
- if build_id:
- test_results['build_id'] = build_id
- if build_url:
- test_results['build_url'] = build_url
- if job_name:
- test_results['job_name'] = job_name
+ if build_id:
+ test_results['build_id'] = build_id
+ if build_url:
+ test_results['build_url'] = build_url
+ if job_name:
+ test_results['job_name'] = job_name
def upload_results_to_bq(resultset, bq_table, args, platform):
- """Upload test results to a BQ table.
+ """Upload test results to a BQ table.
Args:
resultset: dictionary generated by jobset.run
@@ -91,77 +92,97 @@ def upload_results_to_bq(resultset, bq_table, args, platform):
args: args in run_tests.py, generated by argparse
platform: string name of platform tests were run on
"""
- bq = big_query_utils.create_big_query()
- big_query_utils.create_partitioned_table(bq, _PROJECT_ID, _DATASET_ID, bq_table, _RESULTS_SCHEMA, _DESCRIPTION,
- partition_type=_PARTITION_TYPE, expiration_ms= _EXPIRATION_MS)
-
- for shortname, results in six.iteritems(resultset):
- for result in results:
- test_results = {}
- _get_build_metadata(test_results)
- test_results['compiler'] = args.compiler
- test_results['config'] = args.config
- test_results['cpu_estimated'] = result.cpu_estimated
- test_results['cpu_measured'] = result.cpu_measured
- test_results['elapsed_time'] = '%.2f' % result.elapsed_time
- test_results['iomgr_platform'] = args.iomgr_platform
- # args.language is a list, but will always have one element in the contexts
- # this function is used.
- test_results['language'] = args.language[0]
- test_results['platform'] = platform
- test_results['result'] = result.state
- test_results['return_code'] = result.returncode
- test_results['test_name'] = shortname
- test_results['timestamp'] = time.strftime('%Y-%m-%d %H:%M:%S')
-
- row = big_query_utils.make_row(str(uuid.uuid4()), test_results)
-
- # TODO(jtattermusch): rows are inserted one by one, very inefficient
- max_retries = 3
- for attempt in range(max_retries):
- if big_query_utils.insert_rows(bq, _PROJECT_ID, _DATASET_ID, bq_table, [row]):
- break
- else:
- if attempt < max_retries - 1:
- print('Error uploading result to bigquery, will retry.')
- else:
- print('Error uploading result to bigquery, all attempts failed.')
- sys.exit(1)
+ bq = big_query_utils.create_big_query()
+ big_query_utils.create_partitioned_table(
+ bq,
+ _PROJECT_ID,
+ _DATASET_ID,
+ bq_table,
+ _RESULTS_SCHEMA,
+ _DESCRIPTION,
+ partition_type=_PARTITION_TYPE,
+ expiration_ms=_EXPIRATION_MS)
+
+ for shortname, results in six.iteritems(resultset):
+ for result in results:
+ test_results = {}
+ _get_build_metadata(test_results)
+ test_results['compiler'] = args.compiler
+ test_results['config'] = args.config
+ test_results['cpu_estimated'] = result.cpu_estimated
+ test_results['cpu_measured'] = result.cpu_measured
+ test_results['elapsed_time'] = '%.2f' % result.elapsed_time
+ test_results['iomgr_platform'] = args.iomgr_platform
+ # args.language is a list, but will always have one element in the contexts
+ # this function is used.
+ test_results['language'] = args.language[0]
+ test_results['platform'] = platform
+ test_results['result'] = result.state
+ test_results['return_code'] = result.returncode
+ test_results['test_name'] = shortname
+ test_results['timestamp'] = time.strftime('%Y-%m-%d %H:%M:%S')
+
+ row = big_query_utils.make_row(str(uuid.uuid4()), test_results)
+
+ # TODO(jtattermusch): rows are inserted one by one, very inefficient
+ max_retries = 3
+ for attempt in range(max_retries):
+ if big_query_utils.insert_rows(bq, _PROJECT_ID, _DATASET_ID,
+ bq_table, [row]):
+ break
+ else:
+ if attempt < max_retries - 1:
+ print('Error uploading result to bigquery, will retry.')
+ else:
+ print(
+ 'Error uploading result to bigquery, all attempts failed.'
+ )
+ sys.exit(1)
def upload_interop_results_to_bq(resultset, bq_table, args):
- """Upload interop test results to a BQ table.
+ """Upload interop test results to a BQ table.
Args:
resultset: dictionary generated by jobset.run
bq_table: string name of table to create/upload results to in BQ
args: args in run_interop_tests.py, generated by argparse
"""
- bq = big_query_utils.create_big_query()
- big_query_utils.create_partitioned_table(bq, _PROJECT_ID, _DATASET_ID, bq_table, _INTEROP_RESULTS_SCHEMA, _DESCRIPTION,
- partition_type=_PARTITION_TYPE, expiration_ms= _EXPIRATION_MS)
-
- for shortname, results in six.iteritems(resultset):
- for result in results:
- test_results = {}
- _get_build_metadata(test_results)
- test_results['elapsed_time'] = '%.2f' % result.elapsed_time
- test_results['result'] = result.state
- test_results['test_name'] = shortname
- test_results['suite'] = shortname.split(':')[0]
- test_results['client'] = shortname.split(':')[1]
- test_results['server'] = shortname.split(':')[2]
- test_results['test_case'] = shortname.split(':')[3]
- test_results['timestamp'] = time.strftime('%Y-%m-%d %H:%M:%S')
- row = big_query_utils.make_row(str(uuid.uuid4()), test_results)
- # TODO(jtattermusch): rows are inserted one by one, very inefficient
- max_retries = 3
- for attempt in range(max_retries):
- if big_query_utils.insert_rows(bq, _PROJECT_ID, _DATASET_ID, bq_table, [row]):
- break
- else:
- if attempt < max_retries - 1:
- print('Error uploading result to bigquery, will retry.')
- else:
- print('Error uploading result to bigquery, all attempts failed.')
- sys.exit(1)
+ bq = big_query_utils.create_big_query()
+ big_query_utils.create_partitioned_table(
+ bq,
+ _PROJECT_ID,
+ _DATASET_ID,
+ bq_table,
+ _INTEROP_RESULTS_SCHEMA,
+ _DESCRIPTION,
+ partition_type=_PARTITION_TYPE,
+ expiration_ms=_EXPIRATION_MS)
+
+ for shortname, results in six.iteritems(resultset):
+ for result in results:
+ test_results = {}
+ _get_build_metadata(test_results)
+ test_results['elapsed_time'] = '%.2f' % result.elapsed_time
+ test_results['result'] = result.state
+ test_results['test_name'] = shortname
+ test_results['suite'] = shortname.split(':')[0]
+ test_results['client'] = shortname.split(':')[1]
+ test_results['server'] = shortname.split(':')[2]
+ test_results['test_case'] = shortname.split(':')[3]
+ test_results['timestamp'] = time.strftime('%Y-%m-%d %H:%M:%S')
+ row = big_query_utils.make_row(str(uuid.uuid4()), test_results)
+ # TODO(jtattermusch): rows are inserted one by one, very inefficient
+ max_retries = 3
+ for attempt in range(max_retries):
+ if big_query_utils.insert_rows(bq, _PROJECT_ID, _DATASET_ID,
+ bq_table, [row]):
+ break
+ else:
+ if attempt < max_retries - 1:
+ print('Error uploading result to bigquery, will retry.')
+ else:
+ print(
+ 'Error uploading result to bigquery, all attempts failed.'
+ )
+ sys.exit(1)
diff --git a/tools/run_tests/python_utils/watch_dirs.py b/tools/run_tests/python_utils/watch_dirs.py
index 7bd085efaf..d2ad303a07 100755
--- a/tools/run_tests/python_utils/watch_dirs.py
+++ b/tools/run_tests/python_utils/watch_dirs.py
@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
"""Helper to watch a (set) of directories for modifications."""
import os
@@ -19,42 +18,42 @@ import time
class DirWatcher(object):
- """Helper to watch a (set) of directories for modifications."""
-
- def __init__(self, paths):
- if isinstance(paths, basestring):
- paths = [paths]
- self._done = False
- self.paths = list(paths)
- self.lastrun = time.time()
- self._cache = self._calculate()
-
- def _calculate(self):
- """Walk over all subscribed paths, check most recent mtime."""
- most_recent_change = None
- for path in self.paths:
- if not os.path.exists(path):
- continue
- if not os.path.isdir(path):
- continue
- for root, _, files in os.walk(path):
- for f in files:
- if f and f[0] == '.': continue
- try:
- st = os.stat(os.path.join(root, f))
- except OSError as e:
- if e.errno == os.errno.ENOENT:
- continue
- raise
- if most_recent_change is None:
- most_recent_change = st.st_mtime
- else:
- most_recent_change = max(most_recent_change, st.st_mtime)
- return most_recent_change
-
- def most_recent_change(self):
- if time.time() - self.lastrun > 1:
- self._cache = self._calculate()
- self.lastrun = time.time()
- return self._cache
-
+ """Helper to watch a (set) of directories for modifications."""
+
+ def __init__(self, paths):
+ if isinstance(paths, basestring):
+ paths = [paths]
+ self._done = False
+ self.paths = list(paths)
+ self.lastrun = time.time()
+ self._cache = self._calculate()
+
+ def _calculate(self):
+ """Walk over all subscribed paths, check most recent mtime."""
+ most_recent_change = None
+ for path in self.paths:
+ if not os.path.exists(path):
+ continue
+ if not os.path.isdir(path):
+ continue
+ for root, _, files in os.walk(path):
+ for f in files:
+ if f and f[0] == '.': continue
+ try:
+ st = os.stat(os.path.join(root, f))
+ except OSError as e:
+ if e.errno == os.errno.ENOENT:
+ continue
+ raise
+ if most_recent_change is None:
+ most_recent_change = st.st_mtime
+ else:
+ most_recent_change = max(most_recent_change,
+ st.st_mtime)
+ return most_recent_change
+
+ def most_recent_change(self):
+ if time.time() - self.lastrun > 1:
+ self._cache = self._calculate()
+ self.lastrun = time.time()
+ return self._cache