Diffstat (limited to 'tools/run_tests/jobset.py')
-rwxr-xr-x | tools/run_tests/jobset.py | 42
1 file changed, 36 insertions, 6 deletions
diff --git a/tools/run_tests/jobset.py b/tools/run_tests/jobset.py
index e33433daf2..adf178bb3c 100755
--- a/tools/run_tests/jobset.py
+++ b/tools/run_tests/jobset.py
@@ -33,6 +33,7 @@ import hashlib
 import multiprocessing
 import os
 import platform
+import re
 import signal
 import subprocess
 import sys
@@ -40,6 +41,10 @@ import tempfile
 import time
 
+# cpu cost measurement
+measure_cpu_costs = False
+
+
 _DEFAULT_MAX_JOBS = 16 * multiprocessing.cpu_count()
 _MAX_RESULT_SIZE = 8192
@@ -146,7 +151,7 @@ class JobSpec(object):
   def __init__(self, cmdline, shortname=None, environ=None, hash_targets=None,
                cwd=None, shell=False, timeout_seconds=5*60, flake_retries=0,
-               timeout_retries=0, kill_handler=None):
+               timeout_retries=0, kill_handler=None, cpu_cost=1.0):
     """
     Arguments:
       cmdline: a list of arguments to pass as the command line
@@ -154,6 +159,7 @@ class JobSpec(object):
       hash_targets: which files to include in the hash representing the jobs version
                     (or empty, indicating the job should not be hashed)
       kill_handler: a handler that will be called whenever job.kill() is invoked
+      cpu_cost: number of cores per second this job needs
     """
     if environ is None:
       environ = {}
@@ -169,6 +175,7 @@ class JobSpec(object):
     self.flake_retries = flake_retries
     self.timeout_retries = timeout_retries
     self.kill_handler = kill_handler
+    self.cpu_cost = cpu_cost
 
   def identity(self):
     return '%r %r %r' % (self.cmdline, self.environ, self.hash_targets)
@@ -218,7 +225,10 @@ class Job(object):
     env.update(self._spec.environ)
     env.update(self._add_env)
     self._start = time.time()
-    try_start = lambda: subprocess.Popen(args=self._spec.cmdline,
+    cmdline = self._spec.cmdline
+    if measure_cpu_costs:
+      cmdline = ['time', '--portability'] + cmdline
+    try_start = lambda: subprocess.Popen(args=cmdline,
                                          stderr=subprocess.STDOUT,
                                          stdout=self._tempfile,
                                          cwd=self._spec.cwd,
@@ -267,13 +277,24 @@ class Job(object):
           self.result.returncode = self._process.returncode
       else:
         self._state = _SUCCESS
-        message('PASSED', '%s [time=%.1fsec; retries=%d;%d]' % (
-            self._spec.shortname, elapsed, self._retries, self._timeout_retries),
+        measurement = ''
+        if measure_cpu_costs:
+          m = re.search(r'real ([0-9.]+)\nuser ([0-9.]+)\nsys ([0-9.]+)', stdout())
+          real = float(m.group(1))
+          user = float(m.group(2))
+          sys = float(m.group(3))
+          if real > 0.5:
+            cores = (user + sys) / real
+            measurement = '; cpu_cost=%.01f; estimated=%.01f' % (cores, self._spec.cpu_cost)
+        message('PASSED', '%s [time=%.1fsec; retries=%d:%d%s]' % (
+            self._spec.shortname, elapsed, self._retries, self._timeout_retries, measurement),
             do_newline=self._newline_on_success or self._travis)
         self.result.state = 'PASSED'
         if self._bin_hash:
           update_cache.finished(self._spec.identity(), self._bin_hash)
-    elif self._state == _RUNNING and time.time() - self._start > self._spec.timeout_seconds:
+    elif (self._state == _RUNNING and
+          self._spec.timeout_seconds is not None and
+          time.time() - self._start > self._spec.timeout_seconds):
       if self._timeout_retries < self._spec.timeout_retries:
         message('TIMEOUT_FLAKE', '%s [pid=%d]' % (self._spec.shortname, self._process.pid), stdout(), do_newline=True)
         self._timeout_retries += 1
@@ -327,10 +348,19 @@ class Jobset(object):
   def get_num_failures(self):
     return self._failures
 
+  def cpu_cost(self):
+    c = 0
+    for job in self._running:
+      c += job._spec.cpu_cost
+    return c
+
   def start(self, spec):
     """Start a job. Return True on success, False on failure."""
-    while len(self._running) >= self._maxjobs:
+    while True:
       if self.cancelled(): return False
+      current_cpu_cost = self.cpu_cost()
+      if current_cpu_cost == 0: break
+      if current_cpu_cost + spec.cpu_cost <= self._maxjobs: break
       self.reap()
     if self.cancelled(): return False
     if spec.hash_targets:
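
Not part of the patch: the sketch below illustrates the measurement path the change adds. When measure_cpu_costs is enabled, jobset.py prefixes each job's command with `time --portability`, merges stderr into the captured output, and computes an observed core count as (user + sys) / real, skipping runs shorter than 0.5 seconds. The helper name measure_cores and the demo command are hypothetical, and GNU time is assumed to be on PATH.

import re
import subprocess

# Hypothetical standalone helper (not in jobset.py): run one command under
# GNU `time --portability` and estimate how many cores it used, using the
# same regex and guard as the patch.
def measure_cores(cmdline):
    proc = subprocess.run(['time', '--portability'] + cmdline,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT)  # timing lines end up in the captured output,
                                                     # like the Job tempfile in the patch
    out = proc.stdout.decode()
    m = re.search(r'real ([0-9.]+)\nuser ([0-9.]+)\nsys ([0-9.]+)', out)
    if m is None:
        return None  # no timing block found in the output
    real, user, sys_time = (float(m.group(i)) for i in (1, 2, 3))
    if real <= 0.5:
        return None  # too short for a stable estimate, mirroring the patch's real > 0.5 guard
    return (user + sys_time) / real  # observed cores = CPU time / wall-clock time

if __name__ == '__main__':
    # ~1s of wall time with almost no CPU time should report roughly 0.0 cores
    print(measure_cores(['sleep', '1']))

The other half of the patch changes how Jobset.start admits work: instead of counting running jobs against _maxjobs, it sums the cpu_cost of the jobs currently running and starts a new job once that total plus the new job's cpu_cost fits the budget; a job is always admitted when nothing is running, so a single job costlier than the budget cannot stall the queue.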