diff options
Diffstat (limited to 'tools/run_tests/python_utils/jobset.py')
-rwxr-xr-x | tools/run_tests/python_utils/jobset.py | 57 |
1 files changed, 30 insertions, 27 deletions
diff --git a/tools/run_tests/python_utils/jobset.py b/tools/run_tests/python_utils/jobset.py index 08d652ae3f..85eef444ef 100755 --- a/tools/run_tests/python_utils/jobset.py +++ b/tools/run_tests/python_utils/jobset.py @@ -71,10 +71,8 @@ def platform_string(): if platform_string() == 'windows': pass else: - have_alarm = False def alarm_handler(unused_signum, unused_frame): - global have_alarm - have_alarm = False + pass signal.signal(signal.SIGCHLD, lambda unused_signum, unused_frame: None) signal.signal(signal.SIGALRM, alarm_handler) @@ -224,7 +222,7 @@ class JobResult(object): self.retries = 0 self.message = '' self.cpu_estimated = 1 - self.cpu_measured = 0 + self.cpu_measured = 1 def read_from_start(f): @@ -304,12 +302,13 @@ class Job(object): self._retries += 1 self.result.num_failures += 1 self.result.retries = self._timeout_retries + self._retries + # NOTE: job is restarted regardless of jobset's max_time setting self.start() else: self._state = _FAILURE if not self._suppress_failure_message: - message('FAILED', '%s [ret=%d, pid=%d]' % ( - self._spec.shortname, self._process.returncode, self._process.pid), + message('FAILED', '%s [ret=%d, pid=%d, time=%.1fsec]' % ( + self._spec.shortname, self._process.returncode, self._process.pid, elapsed), stdout(), do_newline=True) self.result.state = 'FAILED' self.result.num_failures += 1 @@ -328,7 +327,7 @@ class Job(object): self.result.cpu_estimated = float('%.01f' % self._spec.cpu_cost) measurement = '; cpu_cost=%.01f; estimated=%.01f' % (self.result.cpu_measured, self.result.cpu_estimated) if not self._quiet_success: - message('PASSED', '%s [time=%.1fsec; retries=%d:%d%s]' % ( + message('PASSED', '%s [time=%.1fsec, retries=%d:%d%s]' % ( self._spec.shortname, elapsed, self._retries, self._timeout_retries, measurement), stdout() if self._spec.verbose_success else None, do_newline=self._newline_on_success or self._travis) @@ -336,6 +335,8 @@ class Job(object): elif (self._state == _RUNNING and self._spec.timeout_seconds is not None and time.time() - self._start > self._spec.timeout_seconds): + elapsed = time.time() - self._start + self.result.elapsed_time = elapsed if self._timeout_retries < self._spec.timeout_retries: message('TIMEOUT_FLAKE', '%s [pid=%d]' % (self._spec.shortname, self._process.pid), stdout(), do_newline=True) self._timeout_retries += 1 @@ -344,9 +345,10 @@ class Job(object): if self._spec.kill_handler: self._spec.kill_handler(self) self._process.terminate() + # NOTE: job is restarted regardless of jobset's max_time setting self.start() else: - message('TIMEOUT', '%s [pid=%d]' % (self._spec.shortname, self._process.pid), stdout(), do_newline=True) + message('TIMEOUT', '%s [pid=%d, time=%.1fsec]' % (self._spec.shortname, self._process.pid, elapsed), stdout(), do_newline=True) self.kill() self.result.state = 'TIMEOUT' self.result.num_failures += 1 @@ -366,15 +368,15 @@ class Job(object): class Jobset(object): """Manages one run of jobs.""" - def __init__(self, check_cancelled, maxjobs, newline_on_success, travis, - stop_on_failure, add_env, quiet_success, max_time, clear_alarms): + def __init__(self, check_cancelled, maxjobs, maxjobs_cpu_agnostic, newline_on_success, travis, + stop_on_failure, add_env, quiet_success, max_time): self._running = set() self._check_cancelled = check_cancelled - self._clear_alarms = clear_alarms self._cancelled = False self._failures = 0 self._completed = 0 self._maxjobs = maxjobs + self._maxjobs_cpu_agnostic = maxjobs_cpu_agnostic self._newline_on_success = newline_on_success self._travis = travis self._stop_on_failure = stop_on_failure @@ -409,8 +411,10 @@ class Jobset(object): if self.cancelled(): return False current_cpu_cost = self.cpu_cost() if current_cpu_cost == 0: break - if current_cpu_cost + spec.cpu_cost <= self._maxjobs: break - self.reap() + if current_cpu_cost + spec.cpu_cost <= self._maxjobs: + if len(self._running) < self._maxjobs_cpu_agnostic: + break + self.reap(spec.shortname, spec.cpu_cost) if self.cancelled(): return False job = Job(spec, self._newline_on_success, @@ -422,7 +426,7 @@ class Jobset(object): self.resultset[job.GetSpec().shortname] = [] return True - def reap(self): + def reap(self, waiting_for=None, waiting_for_cost=None): """Collect the dead jobs.""" while self._running: dead = set() @@ -450,15 +454,16 @@ class Jobset(object): sofar = now - self._start_time remaining = sofar / self._completed * (self._remaining + len(self._running)) rstr = 'ETA %.1f sec; %s' % (remaining, rstr) - message('WAITING', '%s%d jobs running, %d complete, %d failed' % ( - rstr, len(self._running), self._completed, self._failures)) + if waiting_for is not None: + wstr = ' next: %s @ %.2f cpu' % (waiting_for, waiting_for_cost) + else: + wstr = '' + message('WAITING', '%s%d jobs running, %d complete, %d failed (load %.2f)%s' % ( + rstr, len(self._running), self._completed, self._failures, self.cpu_cost(), wstr)) if platform_string() == 'windows': time.sleep(0.1) else: - global have_alarm - if not have_alarm: - have_alarm = True - signal.alarm(10) + signal.alarm(10) signal.pause() def cancelled(self): @@ -474,10 +479,7 @@ class Jobset(object): while self._running: if self.cancelled(): pass # poll cancellation self.reap() - # Clear the alarms when finished to avoid a race condition causing job - # failures. Don't do this when running multi-VM tests because clearing - # the alarms causes the test to stall - if platform_string() != 'windows' and self._clear_alarms: + if platform_string() != 'windows': signal.alarm(0) return not self.cancelled() and self._failures == 0 @@ -500,6 +502,7 @@ def tag_remaining(xs): def run(cmdlines, check_cancelled=_never_cancelled, maxjobs=None, + maxjobs_cpu_agnostic=None, newline_on_success=False, travis=False, infinite_runs=False, @@ -507,8 +510,7 @@ def run(cmdlines, add_env={}, skip_jobs=False, quiet_success=False, - max_time=-1, - clear_alarms=True): + max_time=-1): if skip_jobs: resultset = {} skipped_job_result = JobResult() @@ -519,8 +521,9 @@ def run(cmdlines, return 0, resultset js = Jobset(check_cancelled, maxjobs if maxjobs is not None else _DEFAULT_MAX_JOBS, + maxjobs_cpu_agnostic if maxjobs_cpu_agnostic is not None else _DEFAULT_MAX_JOBS, newline_on_success, travis, stop_on_failure, add_env, - quiet_success, max_time, clear_alarms) + quiet_success, max_time) for cmdline, remaining in tag_remaining(cmdlines): if not js.start(cmdline): break |