diff options
Diffstat (limited to 'tools/run_tests')
-rwxr-xr-x | tools/run_tests/jobset.py | 88 | ||||
-rwxr-xr-x | tools/run_tests/run_tests.py | 49 |
2 files changed, 137 insertions, 0 deletions
diff --git a/tools/run_tests/jobset.py b/tools/run_tests/jobset.py new file mode 100755 index 0000000000..0890cc5d99 --- /dev/null +++ b/tools/run_tests/jobset.py @@ -0,0 +1,88 @@ +"""Run a group of subprocesses and then finish.""" + +import multiprocessing +import random +import subprocess +import sys +import threading + +# multiplicative factor to over subscribe CPU cores +# (many tests sleep for a long time) +_OVERSUBSCRIBE = 32 +_active_jobs = threading.Semaphore( + multiprocessing.cpu_count() * _OVERSUBSCRIBE) +_output_lock = threading.Lock() + + +def shuffle_iteratable(it): + """Return an iterable that randomly walks it""" + # take a random sampling from the passed in iterable + # we take an element with probablity 1/p and rapidly increase + # p as we take elements - this gives us a somewhat random set of values before + # we've seen all the values, but starts producing values without having to + # compute ALL of them at once, allowing tests to start a little earlier + nextit = [] + p = 1 + for val in it: + if random.randint(0, p) == 0: + p *= 2 + yield val + else: + nextit.append(val) + # after taking a random sampling, we shuffle the rest of the elements and + # yield them + random.shuffle(nextit) + for val in nextit: + yield val + + +class Jobset(object): + """Manages one run of jobs.""" + + def __init__(self, cmdlines): + self._cmdlines = shuffle_iteratable(cmdlines) + self._failures = 0 + + def _run_thread(self, cmdline): + try: + # start the process + p = subprocess.Popen(args=cmdline, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE) + stdout, _ = p.communicate() + # log output (under a lock) + _output_lock.acquire() + try: + if p.returncode != 0: + sys.stdout.write('\x1b[0G\x1b[2K\x1b[31mFAILED\x1b[0m: %s' + ' [ret=%d]\n' + '%s\n' % ( + ' '.join(cmdline), p.returncode, + stdout)) + self._failures += 1 + else: + sys.stdout.write('\x1b[0G\x1b[2K\x1b[32mPASSED\x1b[0m: %s' % + ' '.join(cmdline)) + sys.stdout.flush() + finally: + _output_lock.release() + finally: + _active_jobs.release() + + def run(self): + threads = [] + for cmdline in self._cmdlines: + # cap number of active jobs - release in _run_thread + _active_jobs.acquire() + t = threading.Thread(target=self._run_thread, + args=[cmdline]) + t.start() + threads.append(t) + for thread in threads: + thread.join() + return self._failures == 0 + + +def run(cmdlines): + return Jobset(cmdlines).run() + diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py new file mode 100755 index 0000000000..ee61f33484 --- /dev/null +++ b/tools/run_tests/run_tests.py @@ -0,0 +1,49 @@ +#!/usr/bin/python +"""Run tests in parallel.""" + +import argparse +import glob +import itertools +import multiprocessing +import sys + +import jobset + +# flags required for make for each configuration +_CONFIGS = ['dbg', 'opt', 'tsan', 'msan', 'asan'] + +# parse command line +argp = argparse.ArgumentParser(description='Run grpc tests.') +argp.add_argument('-c', '--config', + choices=['all'] + _CONFIGS, + nargs='+', + default=['all']) +argp.add_argument('-t', '--test-filter', nargs='*', default=['*']) +argp.add_argument('-n', '--runs_per_test', default=1, type=int) +args = argp.parse_args() + +# grab config +configs = [cfg + for cfg in itertools.chain.from_iterable( + _CONFIGS if x == 'all' else [x] + for x in args.config)] +filters = args.test_filter +runs_per_test = args.runs_per_test + +# build latest, sharing cpu between the various makes +if not jobset.run( + ['make', + '-j', '%d' % max(multiprocessing.cpu_count() / len(configs), 1), + 'buildtests_c', + 'CONFIG=%s' % cfg] + for cfg in configs): + sys.exit(1) + +# run all the tests +jobset.run([x] + for x in itertools.chain.from_iterable( + itertools.chain.from_iterable(itertools.repeat( + glob.glob('bins/%s/%s_test' % (config, filt)), + runs_per_test)) + for config in configs + for filt in filters)) |