diff options
Diffstat (limited to 'tools/run_tests/run_performance_tests.py')
-rwxr-xr-x | tools/run_tests/run_performance_tests.py | 142 |
1 files changed, 110 insertions, 32 deletions
diff --git a/tools/run_tests/run_performance_tests.py b/tools/run_tests/run_performance_tests.py index 77c0addb42..0ab3d264a5 100755 --- a/tools/run_tests/run_performance_tests.py +++ b/tools/run_tests/run_performance_tests.py @@ -31,6 +31,7 @@ """Run performance tests locally or remotely.""" import argparse +import itertools import jobset import multiprocessing import os @@ -53,6 +54,12 @@ class CXXLanguage: def __init__(self): self.safename = 'cxx' + def worker_cmdline(self): + return ['bins/opt/qps_worker'] + + def worker_port_offset(self): + return 0 + def scenarios(self): # TODO(jtattermusch): add more scenarios return { @@ -96,6 +103,32 @@ class CSharpLanguage: def __init__(self): self.safename = str(self) + def worker_cmdline(self): + return ['tools/run_tests/performance/run_worker_csharp.sh'] + + def worker_port_offset(self): + return 100 + + def scenarios(self): + # TODO(jtattermusch): add more scenarios + return { + # Scenario 1: generic async streaming ping-pong (contentionless latency) + 'csharp_async_generic_streaming_ping_pong': [ + '--rpc_type=STREAMING', + '--client_type=ASYNC_CLIENT', + '--server_type=ASYNC_GENERIC_SERVER', + '--outstanding_rpcs_per_channel=1', + '--client_channels=1', + '--bbuf_req_size=0', + '--bbuf_resp_size=0', + '--async_client_threads=1', + '--async_server_threads=1', + '--secure_test=true', + '--num_servers=1', + '--num_clients=1', + '--server_core_limit=0', + '--client_core_limit=0']} + def __str__(self): return 'csharp' @@ -106,6 +139,29 @@ class NodeLanguage: pass self.safename = str(self) + def worker_cmdline(self): + return ['tools/run_tests/performance/run_worker_node.sh'] + + def worker_port_offset(self): + return 200 + + def scenarios(self): + # TODO(jtattermusch): add more scenarios + return { + 'node_sync_unary_ping_pong_protobuf': [ + '--rpc_type=UNARY', + '--client_type=ASYNC_CLIENT', + '--server_type=ASYNC_SERVER', + '--outstanding_rpcs_per_channel=1', + '--client_channels=1', + '--simple_req_size=0', + '--simple_resp_size=0', + '--secure_test=false', + '--num_servers=1', + '--num_clients=1', + '--server_core_limit=0', + '--client_core_limit=0']} + def __str__(self): return 'node' @@ -120,8 +176,9 @@ _LANGUAGES = { class QpsWorkerJob: """Encapsulates a qps worker server job.""" - def __init__(self, spec, host_and_port): + def __init__(self, spec, language, host_and_port): self._spec = spec + self.language = language self.host_and_port = host_and_port self._job = jobset.Job(spec, bin_hash=None, newline_on_success=True, travis=True, add_env={}) @@ -133,22 +190,24 @@ class QpsWorkerJob: return self._job.kill() -def create_qpsworker_job(language, port=10000, remote_host=None): +def create_qpsworker_job(language, shortname=None, + port=10000, remote_host=None): # TODO: support more languages - cmd = 'bins/opt/qps_worker --driver_port=%s' % port + cmdline = language.worker_cmdline() + ['--driver_port=%s' % port] if remote_host: user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, remote_host) - cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && %s"' % (user_at_host, cmd) + cmdline = ['ssh', + str(user_at_host), + 'cd ~/performance_workspace/grpc/ && %s' % ' '.join(cmdline)] host_and_port='%s:%s' % (remote_host, port) else: host_and_port='localhost:%s' % port jobspec = jobset.JobSpec( - cmdline=[cmd], - shortname='qps_worker', - timeout_seconds=15*60, - shell=True) - return QpsWorkerJob(jobspec, host_and_port) + cmdline=cmdline, + shortname=shortname, + timeout_seconds=15*60) + return QpsWorkerJob(jobspec, language, host_and_port) def create_scenario_jobspec(scenario_name, driver_args, workers, remote_host=None): @@ -199,7 +258,7 @@ def prepare_remote_hosts(hosts): cmdline=['tools/run_tests/performance/remote_host_prepare.sh'], shortname='remote_host_prepare.%s' % host, environ = {'USER_AT_HOST': user_at_host}, - timeout_seconds=3*60)) + timeout_seconds=5*60)) jobset.message('START', 'Preparing remote hosts.', do_newline=True) num_failures, _ = jobset.run( prepare_jobs, newline_on_success=True, maxjobs=10) @@ -213,15 +272,15 @@ def prepare_remote_hosts(hosts): sys.exit(1) -def build_on_remote_hosts(hosts, build_local=False): - """Builds performance worker on remote hosts.""" +def build_on_remote_hosts(hosts, languages=_LANGUAGES.keys(), build_local=False): + """Builds performance worker on remote hosts (and maybe also locally).""" build_timeout = 15*60 build_jobs = [] for host in hosts: user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, host) build_jobs.append( jobset.JobSpec( - cmdline=['tools/run_tests/performance/remote_host_build.sh'], + cmdline=['tools/run_tests/performance/remote_host_build.sh'] + languages, shortname='remote_host_build.%s' % host, environ = {'USER_AT_HOST': user_at_host, 'CONFIG': 'opt'}, timeout_seconds=build_timeout)) @@ -229,56 +288,62 @@ def build_on_remote_hosts(hosts, build_local=False): # Build locally as well build_jobs.append( jobset.JobSpec( - cmdline=['tools/run_tests/performance/build_performance.sh'], + cmdline=['tools/run_tests/performance/build_performance.sh'] + languages, shortname='local_build', environ = {'CONFIG': 'opt'}, timeout_seconds=build_timeout)) - jobset.message('START', 'Building on remote hosts.', do_newline=True) + jobset.message('START', 'Building.', do_newline=True) num_failures, _ = jobset.run( build_jobs, newline_on_success=True, maxjobs=10) if num_failures == 0: jobset.message('SUCCESS', - 'Build on remote hosts was successful.', + 'Built successfully.', do_newline=True) else: - jobset.message('FAILED', 'Failed to build on remote hosts.', + jobset.message('FAILED', 'Build failed.', do_newline=True) sys.exit(1) -def start_qpsworkers(worker_hosts): +def start_qpsworkers(languages, worker_hosts): """Starts QPS workers as background jobs.""" if not worker_hosts: - # run two workers locally + # run two workers locally (for each language) workers=[(None, 10000), (None, 10010)] elif len(worker_hosts) == 1: - # run two workers on the remote host + # run two workers on the remote host (for each language) workers=[(worker_hosts[0], 10000), (worker_hosts[0], 10010)] else: - # run one worker per each remote host + # run one worker per each remote host (for each language) workers=[(worker_host, 10000) for worker_host in worker_hosts] - return [create_qpsworker_job(CXXLanguage(), - port=worker[1], + return [create_qpsworker_job(language, + shortname= 'qps_worker_%s_%s' % (language, + worker_idx), + port=worker[1] + language.worker_port_offset(), remote_host=worker[0]) - for worker in workers] + for language in languages + for worker_idx, worker in enumerate(workers)] -def create_scenarios(languages, workers, remote_host=None): +def create_scenarios(languages, workers_by_lang, remote_host=None): """Create jobspecs for scenarios to run.""" scenarios = [] for language in languages: for scenario_name, driver_args in language.scenarios().iteritems(): scenario = create_scenario_jobspec(scenario_name, driver_args, - workers, + workers_by_lang[str(language)], remote_host=remote_host) scenarios.append(scenario) # the very last scenario requests shutting down the workers. + all_workers = [worker + for workers in workers_by_lang.values() + for worker in workers] scenarios.append(create_scenario_jobspec('quit_workers', ['--quit=true'], - workers, + all_workers, remote_host=remote_host)) return scenarios @@ -300,6 +365,11 @@ def finish_qps_workers(jobs): argp = argparse.ArgumentParser(description='Run performance tests.') +argp.add_argument('-l', '--language', + choices=['all'] + sorted(_LANGUAGES.keys()), + nargs='+', + default=['all'], + help='Languages to benchmark.') argp.add_argument('--remote_driver_host', default=None, help='Run QPS driver on given host. By default, QPS driver is run locally.') @@ -310,6 +380,11 @@ argp.add_argument('--remote_worker_host', args = argp.parse_args() +languages = set(_LANGUAGES[l] + for l in itertools.chain.from_iterable( + _LANGUAGES.iterkeys() if x == 'all' else [x] + for x in args.language)) + # Put together set of remote hosts where to run and build remote_hosts = set() if args.remote_worker_host: @@ -325,15 +400,18 @@ if remote_hosts: build_local = False if not args.remote_driver_host: build_local = True -build_on_remote_hosts(remote_hosts, build_local=build_local) +build_on_remote_hosts(remote_hosts, languages=[str(l) for l in languages], build_local=build_local) -qpsworker_jobs = start_qpsworkers(args.remote_worker_host) +qpsworker_jobs = start_qpsworkers(languages, args.remote_worker_host) -worker_addresses = [job.host_and_port for job in qpsworker_jobs] +# get list of worker addresses for each language. +worker_addresses = dict([(str(language), []) for language in languages]) +for job in qpsworker_jobs: + worker_addresses[str(job.language)].append(job.host_and_port) try: - scenarios = create_scenarios(languages=[CXXLanguage()], - workers=worker_addresses, + scenarios = create_scenarios(languages, + workers_by_lang=worker_addresses, remote_host=args.remote_driver_host) if not scenarios: raise Exception('No scenarios to run') |