aboutsummaryrefslogtreecommitdiffhomepage
path: root/tools/run_tests/python_utils/jobset.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/run_tests/python_utils/jobset.py')
-rwxr-xr-xtools/run_tests/python_utils/jobset.py57
1 files changed, 30 insertions, 27 deletions
diff --git a/tools/run_tests/python_utils/jobset.py b/tools/run_tests/python_utils/jobset.py
index 08d652ae3f..85eef444ef 100755
--- a/tools/run_tests/python_utils/jobset.py
+++ b/tools/run_tests/python_utils/jobset.py
@@ -71,10 +71,8 @@ def platform_string():
if platform_string() == 'windows':
pass
else:
- have_alarm = False
def alarm_handler(unused_signum, unused_frame):
- global have_alarm
- have_alarm = False
+ pass
signal.signal(signal.SIGCHLD, lambda unused_signum, unused_frame: None)
signal.signal(signal.SIGALRM, alarm_handler)
@@ -224,7 +222,7 @@ class JobResult(object):
self.retries = 0
self.message = ''
self.cpu_estimated = 1
- self.cpu_measured = 0
+ self.cpu_measured = 1
def read_from_start(f):
@@ -304,12 +302,13 @@ class Job(object):
self._retries += 1
self.result.num_failures += 1
self.result.retries = self._timeout_retries + self._retries
+ # NOTE: job is restarted regardless of jobset's max_time setting
self.start()
else:
self._state = _FAILURE
if not self._suppress_failure_message:
- message('FAILED', '%s [ret=%d, pid=%d]' % (
- self._spec.shortname, self._process.returncode, self._process.pid),
+ message('FAILED', '%s [ret=%d, pid=%d, time=%.1fsec]' % (
+ self._spec.shortname, self._process.returncode, self._process.pid, elapsed),
stdout(), do_newline=True)
self.result.state = 'FAILED'
self.result.num_failures += 1
@@ -328,7 +327,7 @@ class Job(object):
self.result.cpu_estimated = float('%.01f' % self._spec.cpu_cost)
measurement = '; cpu_cost=%.01f; estimated=%.01f' % (self.result.cpu_measured, self.result.cpu_estimated)
if not self._quiet_success:
- message('PASSED', '%s [time=%.1fsec; retries=%d:%d%s]' % (
+ message('PASSED', '%s [time=%.1fsec, retries=%d:%d%s]' % (
self._spec.shortname, elapsed, self._retries, self._timeout_retries, measurement),
stdout() if self._spec.verbose_success else None,
do_newline=self._newline_on_success or self._travis)
@@ -336,6 +335,8 @@ class Job(object):
elif (self._state == _RUNNING and
self._spec.timeout_seconds is not None and
time.time() - self._start > self._spec.timeout_seconds):
+ elapsed = time.time() - self._start
+ self.result.elapsed_time = elapsed
if self._timeout_retries < self._spec.timeout_retries:
message('TIMEOUT_FLAKE', '%s [pid=%d]' % (self._spec.shortname, self._process.pid), stdout(), do_newline=True)
self._timeout_retries += 1
@@ -344,9 +345,10 @@ class Job(object):
if self._spec.kill_handler:
self._spec.kill_handler(self)
self._process.terminate()
+ # NOTE: job is restarted regardless of jobset's max_time setting
self.start()
else:
- message('TIMEOUT', '%s [pid=%d]' % (self._spec.shortname, self._process.pid), stdout(), do_newline=True)
+ message('TIMEOUT', '%s [pid=%d, time=%.1fsec]' % (self._spec.shortname, self._process.pid, elapsed), stdout(), do_newline=True)
self.kill()
self.result.state = 'TIMEOUT'
self.result.num_failures += 1
@@ -366,15 +368,15 @@ class Job(object):
class Jobset(object):
"""Manages one run of jobs."""
- def __init__(self, check_cancelled, maxjobs, newline_on_success, travis,
- stop_on_failure, add_env, quiet_success, max_time, clear_alarms):
+ def __init__(self, check_cancelled, maxjobs, maxjobs_cpu_agnostic, newline_on_success, travis,
+ stop_on_failure, add_env, quiet_success, max_time):
self._running = set()
self._check_cancelled = check_cancelled
- self._clear_alarms = clear_alarms
self._cancelled = False
self._failures = 0
self._completed = 0
self._maxjobs = maxjobs
+ self._maxjobs_cpu_agnostic = maxjobs_cpu_agnostic
self._newline_on_success = newline_on_success
self._travis = travis
self._stop_on_failure = stop_on_failure
@@ -409,8 +411,10 @@ class Jobset(object):
if self.cancelled(): return False
current_cpu_cost = self.cpu_cost()
if current_cpu_cost == 0: break
- if current_cpu_cost + spec.cpu_cost <= self._maxjobs: break
- self.reap()
+ if current_cpu_cost + spec.cpu_cost <= self._maxjobs:
+ if len(self._running) < self._maxjobs_cpu_agnostic:
+ break
+ self.reap(spec.shortname, spec.cpu_cost)
if self.cancelled(): return False
job = Job(spec,
self._newline_on_success,
@@ -422,7 +426,7 @@ class Jobset(object):
self.resultset[job.GetSpec().shortname] = []
return True
- def reap(self):
+ def reap(self, waiting_for=None, waiting_for_cost=None):
"""Collect the dead jobs."""
while self._running:
dead = set()
@@ -450,15 +454,16 @@ class Jobset(object):
sofar = now - self._start_time
remaining = sofar / self._completed * (self._remaining + len(self._running))
rstr = 'ETA %.1f sec; %s' % (remaining, rstr)
- message('WAITING', '%s%d jobs running, %d complete, %d failed' % (
- rstr, len(self._running), self._completed, self._failures))
+ if waiting_for is not None:
+ wstr = ' next: %s @ %.2f cpu' % (waiting_for, waiting_for_cost)
+ else:
+ wstr = ''
+ message('WAITING', '%s%d jobs running, %d complete, %d failed (load %.2f)%s' % (
+ rstr, len(self._running), self._completed, self._failures, self.cpu_cost(), wstr))
if platform_string() == 'windows':
time.sleep(0.1)
else:
- global have_alarm
- if not have_alarm:
- have_alarm = True
- signal.alarm(10)
+ signal.alarm(10)
signal.pause()
def cancelled(self):
@@ -474,10 +479,7 @@ class Jobset(object):
while self._running:
if self.cancelled(): pass # poll cancellation
self.reap()
- # Clear the alarms when finished to avoid a race condition causing job
- # failures. Don't do this when running multi-VM tests because clearing
- # the alarms causes the test to stall
- if platform_string() != 'windows' and self._clear_alarms:
+ if platform_string() != 'windows':
signal.alarm(0)
return not self.cancelled() and self._failures == 0
@@ -500,6 +502,7 @@ def tag_remaining(xs):
def run(cmdlines,
check_cancelled=_never_cancelled,
maxjobs=None,
+ maxjobs_cpu_agnostic=None,
newline_on_success=False,
travis=False,
infinite_runs=False,
@@ -507,8 +510,7 @@ def run(cmdlines,
add_env={},
skip_jobs=False,
quiet_success=False,
- max_time=-1,
- clear_alarms=True):
+ max_time=-1):
if skip_jobs:
resultset = {}
skipped_job_result = JobResult()
@@ -519,8 +521,9 @@ def run(cmdlines,
return 0, resultset
js = Jobset(check_cancelled,
maxjobs if maxjobs is not None else _DEFAULT_MAX_JOBS,
+ maxjobs_cpu_agnostic if maxjobs_cpu_agnostic is not None else _DEFAULT_MAX_JOBS,
newline_on_success, travis, stop_on_failure, add_env,
- quiet_success, max_time, clear_alarms)
+ quiet_success, max_time)
for cmdline, remaining in tag_remaining(cmdlines):
if not js.start(cmdline):
break