aboutsummaryrefslogtreecommitdiffhomepage
path: root/tools/run_tests/jobset.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/run_tests/jobset.py')
-rwxr-xr-xtools/run_tests/jobset.py42
1 files changed, 36 insertions, 6 deletions
diff --git a/tools/run_tests/jobset.py b/tools/run_tests/jobset.py
index e33433daf2..adf178bb3c 100755
--- a/tools/run_tests/jobset.py
+++ b/tools/run_tests/jobset.py
@@ -33,6 +33,7 @@ import hashlib
import multiprocessing
import os
import platform
+import re
import signal
import subprocess
import sys
@@ -40,6 +41,10 @@ import tempfile
import time
+# cpu cost measurement
+measure_cpu_costs = False
+
+
_DEFAULT_MAX_JOBS = 16 * multiprocessing.cpu_count()
_MAX_RESULT_SIZE = 8192
@@ -146,7 +151,7 @@ class JobSpec(object):
def __init__(self, cmdline, shortname=None, environ=None, hash_targets=None,
cwd=None, shell=False, timeout_seconds=5*60, flake_retries=0,
- timeout_retries=0, kill_handler=None):
+ timeout_retries=0, kill_handler=None, cpu_cost=1.0):
"""
Arguments:
cmdline: a list of arguments to pass as the command line
@@ -154,6 +159,7 @@ class JobSpec(object):
hash_targets: which files to include in the hash representing the jobs version
(or empty, indicating the job should not be hashed)
kill_handler: a handler that will be called whenever job.kill() is invoked
+ cpu_cost: number of cores per second this job needs
"""
if environ is None:
environ = {}
@@ -169,6 +175,7 @@ class JobSpec(object):
self.flake_retries = flake_retries
self.timeout_retries = timeout_retries
self.kill_handler = kill_handler
+ self.cpu_cost = cpu_cost
def identity(self):
return '%r %r %r' % (self.cmdline, self.environ, self.hash_targets)
@@ -218,7 +225,10 @@ class Job(object):
env.update(self._spec.environ)
env.update(self._add_env)
self._start = time.time()
- try_start = lambda: subprocess.Popen(args=self._spec.cmdline,
+ cmdline = self._spec.cmdline
+ if measure_cpu_costs:
+ cmdline = ['time', '--portability'] + cmdline
+ try_start = lambda: subprocess.Popen(args=cmdline,
stderr=subprocess.STDOUT,
stdout=self._tempfile,
cwd=self._spec.cwd,
@@ -267,13 +277,24 @@ class Job(object):
self.result.returncode = self._process.returncode
else:
self._state = _SUCCESS
- message('PASSED', '%s [time=%.1fsec; retries=%d;%d]' % (
- self._spec.shortname, elapsed, self._retries, self._timeout_retries),
+ measurement = ''
+ if measure_cpu_costs:
+ m = re.search(r'real ([0-9.]+)\nuser ([0-9.]+)\nsys ([0-9.]+)', stdout())
+ real = float(m.group(1))
+ user = float(m.group(2))
+ sys = float(m.group(3))
+ if real > 0.5:
+ cores = (user + sys) / real
+ measurement = '; cpu_cost=%.01f; estimated=%.01f' % (cores, self._spec.cpu_cost)
+ message('PASSED', '%s [time=%.1fsec; retries=%d:%d%s]' % (
+ self._spec.shortname, elapsed, self._retries, self._timeout_retries, measurement),
do_newline=self._newline_on_success or self._travis)
self.result.state = 'PASSED'
if self._bin_hash:
update_cache.finished(self._spec.identity(), self._bin_hash)
- elif self._state == _RUNNING and time.time() - self._start > self._spec.timeout_seconds:
+ elif (self._state == _RUNNING and
+ self._spec.timeout_seconds is not None and
+ time.time() - self._start > self._spec.timeout_seconds):
if self._timeout_retries < self._spec.timeout_retries:
message('TIMEOUT_FLAKE', '%s [pid=%d]' % (self._spec.shortname, self._process.pid), stdout(), do_newline=True)
self._timeout_retries += 1
@@ -327,10 +348,19 @@ class Jobset(object):
def get_num_failures(self):
return self._failures
+ def cpu_cost(self):
+ c = 0
+ for job in self._running:
+ c += job._spec.cpu_cost
+ return c
+
def start(self, spec):
"""Start a job. Return True on success, False on failure."""
- while len(self._running) >= self._maxjobs:
+ while True:
if self.cancelled(): return False
+ current_cpu_cost = self.cpu_cost()
+ if current_cpu_cost == 0: break
+ if current_cpu_cost + spec.cpu_cost <= self._maxjobs: break
self.reap()
if self.cancelled(): return False
if spec.hash_targets: