Diffstat (limited to 'infra/bots/recipe_modules/swarming/api.py')
-rw-r--r-- | infra/bots/recipe_modules/swarming/api.py | 1063
1 file changed, 874 insertions, 189 deletions
diff --git a/infra/bots/recipe_modules/swarming/api.py b/infra/bots/recipe_modules/swarming/api.py index 35dddcf507..47e2c84158 100644 --- a/infra/bots/recipe_modules/swarming/api.py +++ b/infra/bots/recipe_modules/swarming/api.py @@ -1,220 +1,905 @@ -# Copyright 2016 The Chromium Authors. All rights reserved. +# Copyright 2014 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. +import datetime +import functools +import hashlib +import logging +import os.path +from recipe_engine import config_types from recipe_engine import recipe_api -import shlex +from recipe_engine import util as recipe_util +import state -DEFAULT_TASK_EXPIRATION = 20*60*60 -DEFAULT_TASK_TIMEOUT = 4*60*60 -DEFAULT_IO_TIMEOUT = 40*60 -MILO_LOG_LINK = 'https://luci-milo.appspot.com/swarming/task/%s' +# TODO(borenet): This module was copied from build.git and heavily modified to +# remove dependencies on other modules in build.git. It belongs in a different +# repo. Remove this once it has been moved. -class SkiaSwarmingApi(recipe_api.RecipeApi): - """Provides steps to run Skia tasks on swarming bots.""" +# Minimally supported version of swarming.py script (reported by --version). +MINIMAL_SWARMING_VERSION = (0, 8, 6) + + +def text_for_task(task): + lines = [] + + if task.dimensions.get('id'): # pragma: no cover + lines.append('Bot id: %r' % task.dimensions['id']) + if task.dimensions.get('os'): + lines.append('Run on OS: %r' % task.dimensions['os']) + + return '<br/>'.join(lines) + + +def parse_time(value): + """Converts serialized time from the API to datetime.datetime.""" + # When microseconds are 0, the '.123456' suffix is elided. This means the + # serialized format is not consistent, which confuses the hell out of python. + # TODO(maruel): Remove third format once we enforce version >=0.8.2. + for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d %H:%M:%S'): + try: + return datetime.datetime.strptime(value, fmt) + except ValueError: # pragma: no cover + pass + raise ValueError('Failed to parse %s' % value) # pragma: no cover + + +class ReadOnlyDict(dict): + def __setitem__(self, key, value): + raise TypeError('ReadOnlyDict is immutable') + + +class SwarmingApi(recipe_api.RecipeApi): + """Recipe module to use swarming.py tool to run tasks on Swarming. + + General usage: + 1. Tweak default task parameters applied to all swarming tasks (such as + default_dimensions and default_priority). + 2. Isolate some test using 'isolate' recipe module. Get isolated hash as + a result of that process. + 3. Create a task configuration using 'task(...)' method, providing + isolated hash obtained previously. + 4. Tweak the task parameters. This step is optional. + 5. Launch the task on swarming by calling 'trigger_task(...)'. + 6. Continue doing useful work locally while the task is running concurrently + on swarming. + 7. Wait for task to finish and collect its result (exit code, logs) + by calling 'collect_task(...)'. + + See also example.py for concrete code. + """ + + State = state.State + + ############################################################################# + # The below are helper functions to help transition between the old and new # + # swarming result formats. TODO(martiniss): remove these # + ############################################################################# + + def _is_expired(self, shard): + # FIXME: We really should only have one format for enums. 
We want to move to + # strings, currently have numbers. + return ( + shard.get('state') == self.State.EXPIRED or + shard.get('state') == 'EXPIRED') + + def _is_timed_out(self, shard): + # FIXME: We really should only have one format for enums. We want to move to + # strings, currently have numbers. + return ( + shard.get('state') == self.State.TIMED_OUT or + shard.get('state') == 'TIMED_OUT') + + def _get_exit_code(self, shard): + if shard.get('exit_code'): + return shard.get('exit_code') # pragma: no cover + lst = shard.get('exit_codes', []) + return str(lst[0]) if lst else None + + def __init__(self, **kwargs): + super(SwarmingApi, self).__init__(**kwargs) + # All tests default to an x86-64 bot running with no GPU. This simplifies + # management so that new tests are not executed on exotic bots by accident + # even if misconfigured. + self._default_dimensions = { + 'cpu': 'x86-64', + 'gpu': 'none', + } + # Expirations are set to mildly good values and will be tightened soon. + self._default_expiration = 60*60 + self._default_env = {} + self._default_hard_timeout = 60*60 + self._default_idempotent = False + self._default_io_timeout = 20*60 + # The default priority is extremely low and should be increased depending + # on the type of task. + self._default_priority = 200 + self._default_tags = set() + self._default_user = None + self._pending_tasks = set() + self._show_isolated_out_in_collect_step = True + self._show_shards_in_collect_step = False + self._swarming_server = 'https://chromium-swarm.appspot.com' + self._verbose = False + + @recipe_util.returns_placeholder + def summary(self): + return self.m.json.output() + + @property + def swarming_server(self): + """URL of Swarming server to use, default is a production one.""" + return self._swarming_server + + @swarming_server.setter + def swarming_server(self, value): + """Changes URL of Swarming server to use.""" + self._swarming_server = value @property - def swarming_temp_dir(self): - """Path where artifacts like isolate file and json output will be stored.""" - return self.m.path['start_dir'].join('swarming_temp_dir') + def verbose(self): + """True to run swarming scripts with verbose output.""" + return self._verbose + + @verbose.setter + def verbose(self, value): + """Enables or disables verbose output in swarming scripts.""" + assert isinstance(value, bool), value + self._verbose = value @property - def tasks_output_dir(self): - """Directory where the outputs of the swarming tasks will be stored.""" - return self.swarming_temp_dir.join('outputs') - - def isolated_file_path(self, task_name): - """Get the path to the given task's .isolated file.""" - return self.swarming_temp_dir.join('skia-task-%s.isolated' % task_name) - - def setup(self, luci_go_dir, swarming_rev=None): - """Performs setup steps for swarming.""" - self.m.swarming_client.checkout(revision=swarming_rev) - self.m.swarming.check_client_version(step_test_data=(0, 8, 6)) - self.setup_go_isolate(luci_go_dir) - self.m.swarming.add_default_tag('allow_milo:1') - - # TODO(rmistry): Remove once the Go binaries are moved to recipes or buildbot.
- def setup_go_isolate(self, luci_go_dir): - """Generates and puts in place the isolate Go binary.""" - depot_tools_path = self.m.depot_tools.package_repo_resource() - env = {'PATH': self.m.path.pathsep.join([ - str(depot_tools_path), '%(PATH)s'])} - with self.m.context(env=env): - self.m.step('download luci-go linux', - ['download_from_google_storage', '--no_resume', - '--platform=linux*', '--no_auth', - '--bucket', 'chromium-luci', - '-d', luci_go_dir.join('linux64')]) - self.m.step('download luci-go mac', - ['download_from_google_storage', '--no_resume', - '--platform=darwin', '--no_auth', - '--bucket', 'chromium-luci', - '-d', luci_go_dir.join('mac64')]) - self.m.step('download luci-go win', - ['download_from_google_storage', '--no_resume', - '--platform=win32', '--no_auth', '--bucket', - 'chromium-luci', - '-d', luci_go_dir.join('win64')]) - # Copy binaries to the expected location. - dest = self.m.path['start_dir'].join('luci-go') - self.m.run.rmtree(dest) - self.m.file.copytree('Copy Go binary', - source=luci_go_dir, - dest=dest) - - def create_isolated_gen_json(self, isolate_path, base_dir, os_type, - task_name, extra_variables, blacklist=None): - """Creates an isolated.gen.json file (used by the isolate recipe module). + def default_expiration(self): + """Number of seconds that the server will wait to find a bot able to run the + task. - Args: - isolate_path: path obj. Path to the isolate file. - base_dir: path obj. Dir that is the base of all paths in the isolate file. - os_type: str. The OS type to use when archiving the isolate file. - Eg: linux. - task_name: str. The isolated.gen.json file will be suffixed by this str. - extra_variables: dict of str to str. The extra vars to pass to isolate. - Eg: {'SLAVE_NUM': '1', 'MASTER': 'ChromiumPerfFYI'} - blacklist: list of regular expressions indicating which files/directories - not to archive. - """ - self.m.file.makedirs('swarming tmp dir', self.swarming_temp_dir) - isolated_path = self.isolated_file_path(task_name) - isolate_args = [ - '--isolate', isolate_path, - '--isolated', isolated_path, - '--config-variable', 'OS', os_type, - ] - if blacklist: - for b in blacklist: - isolate_args.extend(['--blacklist', b]) - for k, v in extra_variables.iteritems(): - isolate_args.extend(['--extra-variable', k, v]) - isolated_gen_dict = { - 'version': 1, - 'dir': base_dir, - 'args': isolate_args, - } - isolated_gen_json = self.swarming_temp_dir.join( - '%s.isolated.gen.json' % task_name) - self.m.file.write( - 'Write %s.isolated.gen.json' % task_name, - isolated_gen_json, - self.m.json.dumps(isolated_gen_dict, indent=4), - ) + If no bot runs the task within this number of seconds, the task is canceled + as EXPIRED. - def batcharchive(self, targets): - """Calls batcharchive on the skia.isolated.gen.json file. + This value can be changed per individual task. + """ + return self._default_expiration - Args: - targets: list of str. The suffixes of the isolated.gen.json files to - archive. + @default_expiration.setter + def default_expiration(self, value): + assert 30 <= value <= 24*60*60, value + self._default_expiration = value - Returns: - list of tuples containing (task_name, swarming_hash). + @property + def default_hard_timeout(self): + """Number of seconds in which the task must complete. + + If the task takes more than this amount of time, the process is assumed to + be hung. It is forcibly killed via SIGTERM, then SIGKILL after a grace period + (default: 30s). Then the task is marked as TIMED_OUT. + + This value can be changed per individual task.
+ """ + return self._default_hard_timeout + + @default_hard_timeout.setter + def default_hard_timeout(self, value): + assert 30 <= value <= 6*60*60, value + self._default_hard_timeout = value + + @property + def default_io_timeout(self): + """Number of seconds at which interval the task must write to stdout or + stderr. + + If the task takes more than this amount of time between writes to stdout or + stderr, the process is assumed to be hung. It forcibly killed via SIGTERM + then SIGKILL after a grace period (default: 30s). Then the task is marked as + TIMED_OUT. + + This value can be changed per individual task. + """ + return self._default_io_timeout + + @default_io_timeout.setter + def default_io_timeout(self, value): + assert 30 <= value <= 6*60*60, value + self._default_io_timeout = value + + @property + def default_idempotent(self): + """Bool to specify if task deduplication can be done. + + When set, the server will search for another task that ran in the last days + that had the exact same properties. If it finds one, the task will not be + run at all, the previous results will be returned as-is. + + For more infos, see: + https://github.com/luci/luci-py/blob/master/appengine/swarming/doc/User-Guide.md#task-idempotency + + This value can be changed per individual task. + """ + return self._default_idempotent + + @default_idempotent.setter + def default_idempotent(self, value): + assert isinstance(value, bool), value + self._default_idempotent = value + + @property + def default_user(self): + """String to represent who triggered the task. + + The user should be an email address when someone requested testing via + pre-commit or manual testing. + + This value can be changed per individual task. + """ + return self._default_user + + @default_user.setter + def default_user(self, value): + assert value is None or isinstance(value, basestring), value + self._default_user = value + + @property + def default_dimensions(self): + """Returns a copy of the default Swarming dimensions to run task on. + + The dimensions are what is used to filter which bots are able to run the + task successfully. This is particularly useful to discern between OS + versions, type of CPU, GPU card or VM, or preallocated pool. + + Example: + {'cpu': 'x86-64', 'os': 'Windows-XP-SP3'} + + This value can be changed per individual task. + """ + return ReadOnlyDict(self._default_dimensions) + + def set_default_dimension(self, key, value): + assert isinstance(key, basestring), key + assert isinstance(value, basestring) or value is None, value + if value is None: + self._default_dimensions.pop(key, None) + else: + self._default_dimensions[key] = value # pragma: no cover + + @property + def default_env(self): + """Returns a copy of the default environment variable to run tasks with. + + By default the environment variable is not modified. Additional environment + variables can be specified for each task. + + This value can be changed per individual task. + """ + return ReadOnlyDict(self._default_env) + + def set_default_env(self, key, value): + assert isinstance(key, basestring), key + assert isinstance(value, basestring), value + self._default_env[key] = value + + @property + def default_priority(self): + """Swarming task priority for tasks triggered from the recipe. + + Priority ranges from 1 to 255. The lower the value, the most important the + task is and will preempty any task with a lower priority. + + This value can be changed per individual task. 
+ """ + return self._default_priority + + @default_priority.setter + def default_priority(self, value): + assert 1 <= value <= 255 + self._default_priority = value + + def add_default_tag(self, tag): + """Adds a tag to the Swarming tasks triggered. + + Tags are used for maintenance, they can be used to calculate the number of + tasks run for a day to calculate the cost of a type of type (CQ, ASAN, etc). + + Tags can be added per individual task. + """ + assert ':' in tag, tag + self._default_tags.add(tag) + + @property + def show_isolated_out_in_collect_step(self): + """Show the shard's isolated out link in each collect step.""" + return self._show_isolated_out_in_collect_step + + @show_isolated_out_in_collect_step.setter + def show_isolated_out_in_collect_step(self, value): + self._show_isolated_out_in_collect_step = value + + @property + def show_shards_in_collect_step(self): + """Show the shard link in each collect step.""" + return self._show_shards_in_collect_step + + @show_shards_in_collect_step.setter + def show_shards_in_collect_step(self, value): + self._show_shards_in_collect_step = value + + @staticmethod + def prefered_os_dimension(platform): + """Given a platform name returns the prefered Swarming OS dimension. + + Platform name is usually provided by 'platform' recipe module, it's one + of 'win', 'linux', 'mac'. This function returns more concrete Swarming OS + dimension that represent this platform on Swarming by default. + + Recipes are free to use other OS dimension if there's a need for it. For + example WinXP try bot recipe may explicitly specify 'Windows-XP-SP3' + dimension. """ - return self.m.isolate.isolate_tests( - verbose=True, # To avoid no output timeouts. - build_dir=self.swarming_temp_dir, - targets=targets).presentation.properties['swarm_hashes'].items() + return { + 'linux': 'Ubuntu-14.04', + 'mac': 'Mac-10.9', + 'win': 'Windows-7-SP1', + }[platform] + + def task(self, title, isolated_hash, ignore_task_failure=False, shards=1, + task_output_dir=None, extra_args=None, idempotent=None, + cipd_packages=None, build_properties=None, merge=None): + """Returns a new SwarmingTask instance to run an isolated executable on + Swarming. + + For google test executables, use gtest_task() instead. - def trigger_swarming_tasks( - self, swarm_hashes, dimensions, idempotent=False, store_output=True, - extra_args=None, expiration=None, hard_timeout=None, io_timeout=None, - cipd_packages=None): - """Triggers swarming tasks using swarm hashes. + At the time of this writting, this code is used by V8, Skia and iOS. + + The return value can be customized if necessary (see SwarmingTask class + below). Pass it to 'trigger_task' to launch it on swarming. Later pass the + same instance to 'collect_task' to wait for the task to finish and fetch its + results. Args: - swarm_hashes: list of str. List of swarm hashes from the isolate server. - dimensions: dict of str to str. The dimensions to run the task on. - Eg: {'os': 'Ubuntu', 'gpu': '10de', 'pool': 'Skia'} - idempotent: bool. Whether or not to de-duplicate tasks. - store_output: bool. Whether task output should be stored. - extra_args: list of str. Extra arguments to pass to the task. - expiration: int. Task will expire if not picked up within this time. - DEFAULT_TASK_EXPIRATION is used if this argument is None. - hard_timeout: int. Task will timeout if not completed within this time. - DEFAULT_TASK_TIMEOUT is used if this argument is None. - io_timeout: int. Task will timeout if there is no output within this time. 
- DEFAULT_IO_TIMEOUT is used if this argument is None. - cipd_packages: CIPD packages which these tasks depend on. + title: name of the test, used as part of a task ID. + isolated_hash: hash of isolated test on isolate server, the test should + be already isolated there, see 'isolate' recipe module. + ignore_task_failure: whether to ignore the test failure of swarming + tasks. By default, this is set to False. + shards: if defined, the number of shards to use for the task. By default + this value is either 1 or based on the title. + task_output_dir: if defined, the directory where task results are placed. + The caller is responsible for removing this folder when finished. + extra_args: list of command line arguments to pass to isolated tasks. + idempotent: whether this task is considered idempotent. Defaults + to self.default_idempotent if not specified. + cipd_packages: list of 3-tuples corresponding to CIPD packages needed for + the task: ('path', 'package_name', 'version'), defined as follows: + path: Path relative to the Swarming root dir in which to install + the package. + package_name: Name of the package to install, + eg. "infra/tools/authutil/${platform}" + version: Version of the package, either a package instance ID, + ref, or tag key/value pair. + build_properties: An optional dict containing various build properties. + These are typically but not necessarily the properties emitted by + bot_update. + merge: An optional dict containing: + "script": path to a script to call to post process and merge the + collected outputs from the tasks. The script should take one + named (but required) parameter, '-o' (for output), that represents + the path that the merged results should be written to, and accept + N additional paths to result files to merge. The merged results + should be in the JSON Results File Format + (https://www.chromium.org/developers/the-json-test-results-format) + and may optionally contain a top level "links" field that + may contain a dict mapping link text to URLs, for a set of + links that will be included in the buildbot output. + "args": an optional list of additional arguments to pass to the + above script. + """ + if idempotent is None: + idempotent = self.default_idempotent + return SwarmingTask( + title=title, + isolated_hash=isolated_hash, + dimensions=self._default_dimensions, + env=self._default_env, + priority=self.default_priority, + shards=shards, + buildername=self.m.properties.get('buildername'), + buildnumber=self.m.properties.get('buildnumber'), + user=self.default_user, + expiration=self.default_expiration, + io_timeout=self.default_io_timeout, + hard_timeout=self.default_hard_timeout, + idempotent=idempotent, + ignore_task_failure=ignore_task_failure, + extra_args=extra_args, + collect_step=self._default_collect_step, + task_output_dir=task_output_dir, + cipd_packages=cipd_packages, + build_properties=build_properties, + merge=merge) - Returns: - List of swarming.SwarmingTask instances. 
- """ - swarming_tasks = [] - for task_name, swarm_hash in swarm_hashes: - swarming_task = self.m.swarming.task( - title=task_name, - cipd_packages=cipd_packages, - isolated_hash=swarm_hash) - if store_output: - swarming_task.task_output_dir = self.tasks_output_dir.join(task_name) - swarming_task.dimensions = dimensions - swarming_task.idempotent = idempotent - swarming_task.priority = 90 - swarming_task.expiration = ( - expiration if expiration else DEFAULT_TASK_EXPIRATION) - swarming_task.hard_timeout = ( - hard_timeout if hard_timeout else DEFAULT_TASK_TIMEOUT) - swarming_task.io_timeout = ( - io_timeout if io_timeout else DEFAULT_IO_TIMEOUT) - if extra_args: - swarming_task.extra_args = extra_args - revision = self.m.properties.get('revision') - if revision: - swarming_task.tags.add('revision:%s' % revision) - swarming_tasks.append(swarming_task) - step_results = self.m.swarming.trigger(swarming_tasks) - for step_result in step_results: - self._add_log_links(step_result, step_result.json.output) - return swarming_tasks - - def collect_swarming_task(self, swarming_task): - """Collects the specified swarming task. + def check_client_version(self, step_test_data=None): + """Yields steps to verify compatibility with swarming_client version.""" + return self.m.swarming_client.ensure_script_version( + 'swarming.py', MINIMAL_SWARMING_VERSION, step_test_data) + + def trigger_task(self, task, **kwargs): + """Triggers one task. + + It the task is sharded, will trigger all shards. This steps justs posts + the task and immediately returns. Use 'collect_task' to wait for a task to + finish and grab its result. + + Behaves as a regular recipe step: returns StepData with step results + on success or raises StepFailure if step fails. Args: - swarming_task: An instance of swarming.SwarmingTask. + task: SwarmingTask instance. + kwargs: passed to recipe step constructor as-is. """ + assert isinstance(task, SwarmingTask) + assert task.task_name not in self._pending_tasks, ( + 'Triggered same task twice: %s' % task.task_name) + assert 'os' in task.dimensions, task.dimensions + self._pending_tasks.add(task.task_name) + + # Trigger parameters. + args = [ + 'trigger', + '--swarming', self.swarming_server, + '--isolate-server', self.m.isolate.isolate_server, + '--priority', str(task.priority), + '--shards', str(task.shards), + '--task-name', task.task_name, + '--dump-json', self.m.json.output(), + '--expiration', str(task.expiration), + '--io-timeout', str(task.io_timeout), + '--hard-timeout', str(task.hard_timeout), + ] + for name, value in sorted(task.dimensions.iteritems()): + assert isinstance(value, basestring), value + args.extend(['--dimension', name, value]) + for name, value in sorted(task.env.iteritems()): + assert isinstance(value, basestring), value + args.extend(['--env', name, value]) + + # Default tags. 
+ tags = set(task.tags) + tags.update(self._default_tags) + tags.add('data:' + task.isolated_hash) + tags.add('name:' + task.title.split(' ')[0]) + mastername = self.m.properties.get('mastername') + if mastername: # pragma: no cover + tags.add('master:' + mastername) + if task.buildername: # pragma: no cover + tags.add('buildername:' + task.buildername) + if task.buildnumber: # pragma: no cover + tags.add('buildnumber:%s' % task.buildnumber) + if task.dimensions.get('os'): + tags.add('os:' + task.dimensions['os']) + if self.m.properties.get('bot_id'): # pragma: no cover + tags.add('slavename:%s' % self.m.properties['bot_id']) + tags.add('stepname:%s' % self.get_step_name('', task)) + rietveld = self.m.properties.get('rietveld') + issue = self.m.properties.get('issue') + patchset = self.m.properties.get('patchset') + if rietveld and issue and patchset: + # The expected format is tied strictly to the usage of buildbot properties + # on the Chromium Try Server. Fix if necessary. + tags.add('rietveld:%s/%s/#ps%s' % (rietveld, issue, patchset)) + for tag in sorted(tags): + assert ':' in tag, tag + args.extend(['--tag', tag]) + + if self.verbose: + args.append('--verbose') + if task.idempotent: + args.append('--idempotent') + if task.user: + args.extend(['--user', task.user]) + + if task.cipd_packages: + for path, pkg, version in task.cipd_packages: + args.extend(['--cipd-package', '%s:%s:%s' % (path, pkg, version)]) + + # What isolated command to trigger. + args.extend(('--isolated', task.isolated_hash)) + + # Additional command line args for isolated command. + if task.extra_args: # pragma: no cover + args.append('--') + args.extend(task.extra_args) + + # The step can fail only on infra failures, so mark it as 'infra_step'. try: - rv = self.m.swarming.collect_task(swarming_task) - except self.m.step.StepFailure as e: # pragma: no cover - step_result = self.m.step.active_result - # Change step result to Infra failure if the swarming task failed due to - # expiration, time outs, bot crashes or task cancelations. - # Infra failures have step.EXCEPTION. - states_infra_failure = ( - self.m.swarming.State.EXPIRED, self.m.swarming.State.TIMED_OUT, - self.m.swarming.State.BOT_DIED, self.m.swarming.State.CANCELED) - summary = step_result.swarming.summary - if summary['shards'][0]['state'] in states_infra_failure: - step_result.presentation.status = self.m.step.EXCEPTION - raise self.m.step.InfraFailure(e.name, step_result) - raise + return self.m.python( + name=self.get_step_name('trigger', task), + script=self.m.swarming_client.path.join('swarming.py'), + args=args, + step_test_data=functools.partial( + self._gen_trigger_step_test_data, task), + infra_step=True, + **kwargs) finally: + # Store trigger output with the |task|, print links to triggered shards. step_result = self.m.step.active_result - # Add log link.
- self._add_log_links(step_result, step_result.swarming.summary) - return rv - - def _add_log_links(self, step_result, summary): - """Add Milo log links to all shards in the step.""" - ids = [] - shards = summary.get('shards') - if shards: - for shard in shards: - ids.append(shard['id']) + step_result.presentation.step_text += text_for_task(task) + + if step_result.presentation != self.m.step.FAILURE: + task._trigger_output = step_result.json.output + links = step_result.presentation.links + for index in xrange(task.shards): + url = task.get_shard_view_url(index) + if url: + links['shard #%d' % index] = url + assert not hasattr(step_result, 'swarming_task') + step_result.swarming_task = task + + def collect_task(self, task, **kwargs): + """Waits for a single triggered task to finish. + + If the task is sharded, will wait for all shards to finish. Behaves as + a regular recipe step: returns StepData with step results on success or + raises StepFailure if task fails. + + Args: + task: SwarmingTask instance, previously triggered with 'trigger' method. + kwargs: passed to recipe step constructor as-is. + """ + # TODO(vadimsh): Raise InfraFailure on Swarming failures. + assert isinstance(task, SwarmingTask) + assert task.task_name in self._pending_tasks, ( + 'Trying to collect a task that was not triggered: %s' % + task.task_name) + self._pending_tasks.remove(task.task_name) + + try: + return task.collect_step(task, **kwargs) + finally: + try: + self.m.step.active_result.swarming_task = task + except Exception: # pragma: no cover + # If we don't have an active_result, something failed very early, + # so we eat this exception and let that one propagate. + pass + + def trigger(self, tasks, **kwargs): # pragma: no cover + """Batch version of 'trigger_task'. + + Deprecated, to be removed soon. Use 'trigger_task' in a loop instead, + properly handling exceptions. This method doesn't handle trigger failures + well (it aborts on a first failure). + """ + return [self.trigger_task(t, **kwargs) for t in tasks] + + def collect(self, tasks, **kwargs): # pragma: no cover + """Batch version of 'collect_task'. + + Deprecated, to be removed soon. Use 'collect_task' in a loop instead, + properly handling exceptions. This method doesn't handle collect failures + well (it aborts on a first failure). + """ + return [self.collect_task(t, **kwargs) for t in tasks] + + # To keep compatibility with some build_internal code. To be removed as well. + collect_each = collect + + @staticmethod + def _display_pending(summary_json, step_presentation): + """Shows max pending time in seconds across all shards if it exceeds 10s.""" + pending_times = [ + (parse_time(shard['started_ts']) - + parse_time(shard['created_ts'])).total_seconds() + for shard in summary_json.get('shards', []) if shard.get('started_ts') + ] + max_pending = max(pending_times) if pending_times else 0 + + # Only display annotation when pending more than 10 seconds to reduce noise. + if max_pending > 10: + step_presentation.step_text += '<br>swarming pending %ds' % max_pending + + def _default_collect_step( + self, task, merged_test_output=None, + step_test_data=None, + **kwargs): + """Produces a step that collects a result of an arbitrary task.""" + task_output_dir = task.task_output_dir or self.m.raw_io.output_dir() + + # If we don't already have a Placeholder, wrap the task_output_dir in one + # so we can read out of it later w/ step_result.raw_io.output_dir. 
+ if not isinstance(task_output_dir, recipe_util.Placeholder): + task_output_dir = self.m.raw_io.output_dir(leak_to=task_output_dir) + + task_args = [ + '-o', merged_test_output or self.m.json.output(), + '--task-output-dir', task_output_dir, + ] + + merge_script = (task.merge.get('script') + or self.resource('noop_merge.py')) + merge_args = (task.merge.get('args') or []) + + task_args.extend([ + '--merge-script', merge_script, + '--merge-additional-args', self.m.json.dumps(merge_args), + ]) + + if task.build_properties: # pragma: no cover + properties = dict(task.build_properties) + properties.update(self.m.properties) + task_args.extend([ + '--build-properties', self.m.json.dumps(properties), + ]) + + task_args.append('--') + # Arguments for the actual 'collect' command. + collect_cmd = [ + 'python', + '-u', + self.m.swarming_client.path.join('swarming.py'), + ] + collect_cmd.extend(self.get_collect_cmd_args(task)) + collect_cmd.extend([ + '--task-summary-json', self.summary(), + ]) + + task_args.extend(collect_cmd) + + allowed_return_codes = {0} + if task.ignore_task_failure: # pragma: no cover + allowed_return_codes = 'any' + + # The call to collect_task emits two JSON files: + # 1) a task summary JSON emitted by swarming + # 2) a gtest results JSON emitted by the task + # This builds an instance of StepTestData that covers both. + step_test_data = step_test_data or ( + self.test_api.canned_summary_output(task.shards) + + self.m.json.test_api.output({})) + + try: + with self.m.context(cwd=self.m.path['start_dir']): + return self.m.python( + name=self.get_step_name('', task), + script=self.resource('collect_task.py'), + args=task_args, + ok_ret=allowed_return_codes, + step_test_data=lambda: step_test_data, + **kwargs) + finally: + step_result = None + try: + step_result = self.m.step.active_result + step_result.presentation.step_text = text_for_task(task) + summary_json = step_result.swarming.summary + self._handle_summary_json(task, summary_json, step_result) + + links = {} + if hasattr(step_result, 'json') and hasattr(step_result.json, 'output'): + links = step_result.json.output.get('links', {}) + for k, v in links.iteritems(): # pragma: no cover + step_result.presentation.links[k] = v + except Exception as e: + if step_result: + step_result.presentation.logs['no_results_exc'] = [str(e)] + + def get_step_name(self, prefix, task): + """SwarmingTask -> name of a step of a waterfall. + + Will take a task title (+ step name prefix) and append OS dimension to it. + + Args: + prefix: prefix to append to task title, like 'trigger'. + task: SwarmingTask instance. + + Returns: + '[<prefix>] <task title> on <OS>' + """ + prefix = '[%s] ' % prefix if prefix else '' + task_os = task.dimensions['os'] + + bot_os = self.prefered_os_dimension(self.m.platform.name) + suffix = ('' if ( + task_os == bot_os or task_os.lower() == self.m.platform.name.lower()) + else ' on %s' % task_os) + # Note: properly detecting dimensions of the bot the recipe is running + # on is somewhat non-trivial. It is not safe to assume it uses default + # or preferred dimensions for its OS. For example, the version of the OS + # can differ. + return ''.join((prefix, task.title, suffix)) + + def _handle_summary_json(self, task, summary, step_result): + # We store this now, and add links to all shards first, before failing the + # build. 
Format is a tuple of (shard index, error message). + infra_failures = [] + links = step_result.presentation.links + for index, shard in enumerate(summary['shards']): + url = task.get_shard_view_url(index) + display_text = 'shard #%d' % index + + if not shard or shard.get('internal_failure'): # pragma: no cover + display_text = ( + 'shard #%d had an internal swarming failure' % index) + infra_failures.append((index, 'Internal swarming failure')) + elif self._is_expired(shard): + display_text = ( + 'shard #%d expired, not enough capacity' % index) + infra_failures.append(( + index, 'There isn\'t enough capacity to run your test')) + elif self._is_timed_out(shard): + display_text = ( + 'shard #%d timed out, took too much time to complete' % index) + elif self._get_exit_code(shard) != '0': # pragma: no cover + display_text = 'shard #%d (failed)' % index + + if self.show_isolated_out_in_collect_step: + isolated_out = shard.get('isolated_out') + if isolated_out: + link_name = 'shard #%d isolated out' % index + links[link_name] = isolated_out['view_url'] + + if url and self.show_shards_in_collect_step: + links[display_text] = url + + self._display_pending(summary, step_result.presentation) + + if infra_failures: + template = 'Shard #%s failed: %s' + + # Done so that raising an InfraFailure doesn't cause an error. + # TODO(martiniss): Remove this hack. Requires recipe engine change + step_result._retcode = 2 + step_result.presentation.status = self.m.step.EXCEPTION + raise recipe_api.InfraFailure( + '\n'.join(template % f for f in infra_failures), result=step_result) + + def get_collect_cmd_args(self, task): + """SwarmingTask -> argument list for 'swarming.py' command.""" + args = [ + 'collect', + '--swarming', self.swarming_server, + '--decorate', + '--print-status-updates', + ] + if self.verbose: + args.append('--verbose') + args.extend(('--json', self.m.json.input(task.trigger_output))) + return args + + def _gen_trigger_step_test_data(self, task): + """Generates an expected value of --dump-json in 'trigger' step. + + Used when running recipes to generate test expectations. + """ + # Suffixes of shard subtask names. + subtasks = [] + if task.shards == 1: + subtasks = [''] else: - for _, task in summary.get('tasks', {}).iteritems(): - ids.append(task['task_id']) - for idx, task_id in enumerate(ids): - link = MILO_LOG_LINK % task_id - k = 'view steps on Milo' - if len(ids) > 1: # pragma: nocover - k += ' (shard index %d, %d total)' % (idx, len(ids)) - step_result.presentation.links[k] = link + subtasks = [':%d:%d' % (task.shards, i) for i in range(task.shards)] + return self.m.json.test_api.output({ + 'base_task_name': task.task_name, + 'tasks': { + '%s%s' % (task.task_name, suffix): { + 'task_id': '1%02d00' % i, + 'shard_index': i, + 'view_url': '%s/user/task/1%02d00' % (self.swarming_server, i), + } for i, suffix in enumerate(subtasks) + }, + }) + + +class SwarmingTask(object): + """Definition of a task to run on swarming.""" + def __init__(self, title, isolated_hash, ignore_task_failure, dimensions, + env, priority, shards, buildername, buildnumber, expiration, + user, io_timeout, hard_timeout, idempotent, extra_args, + collect_step, task_output_dir, cipd_packages=None, + build_properties=None, merge=None): + """Configuration of a swarming task. + + Args: + title: display name of the task, hints at what the task is doing. Usually + corresponds to a name of a test executable. Doesn't have to be unique.
+ isolated_hash: hash of isolated file that describes all files needed to + run the task as well as the command line to launch. See 'isolate' recipe + module. + ignore_task_failure: whether to ignore the test failure of swarming + tasks. + cipd_packages: list of 3-tuples corresponding to CIPD packages needed for + the task: ('path', 'package_name', 'version'), defined as follows: + path: Path relative to the Swarming root dir in which to install + the package. + package_name: Name of the package to install, + eg. "infra/tools/authutil/${platform}" + version: Version of the package, either a package instance ID, + ref, or tag key/value pair. + collect_step: callback that will be called to collect and process + results of task execution; the signature is collect_step(task, **kwargs). + dimensions: key-value mapping with swarming dimensions that specify + on what Swarming slaves the task can run. One important dimension is 'os', + which defines the platform flavor to run the task on. See Swarming doc. + env: key-value mapping with additional environment variables to add to + the environment before launching the task executable. + priority: integer [1, 255] that defines how urgent the task is. + Lower value corresponds to higher priority. The Swarming service executes + tasks with higher priority first. + shards: how many concurrent shards to run; makes sense only for + isolated tests based on gtest. Swarming uses the GTEST_SHARD_INDEX + and GTEST_TOTAL_SHARDS environment variables to tell the executable + what shard to run. + buildername: buildbot builder this task was triggered from. + buildnumber: build number of the build this task was triggered from. + expiration: number of seconds to wait for a bot to pick up the task; if the + task hasn't started by then, it is marked as EXPIRED and never run. + user: user that requested this task, if applicable. + io_timeout: number of seconds that the task is allowed to not emit any + stdout bytes, after which it is forcibly killed. + hard_timeout: number of seconds for which the task is allowed to run, + after which it is forcibly killed. + idempotent: True if the results from a previous task can be reused, e.g. + this task has no side-effects. + extra_args: list of command line arguments to pass to isolated tasks. + task_output_dir: if defined, the directory where task results are placed + during the collect step. + build_properties: An optional dict containing various build properties. + These are typically but not necessarily the properties emitted by + bot_update. + merge: An optional dict containing: + "script": path to a script to call to post process and merge the + collected outputs from the tasks. + "args": an optional list of additional arguments to pass to the + above script. + """ + self._trigger_output = None + self.build_properties = build_properties + self.buildername = buildername + self.buildnumber = buildnumber + self.cipd_packages = cipd_packages + self.collect_step = collect_step + self.dimensions = dimensions.copy() + self.env = env.copy() + self.expiration = expiration + self.extra_args = tuple(extra_args or []) + self.hard_timeout = hard_timeout + self.idempotent = idempotent + self.ignore_task_failure = ignore_task_failure + self.io_timeout = io_timeout + self.isolated_hash = isolated_hash + self.merge = merge or {} + self.priority = priority + self.shards = shards + self.tags = set() + self.task_output_dir = task_output_dir + self.title = title + self.user = user + + @property + def task_name(self): + """Name of this task, derived from its other properties.
+ + The task name exists purely to make sense of the task and is not used in any + other way. + """ + out = '%s/%s/%s' % ( + self.title, self.dimensions['os'], self.isolated_hash[:10]) + if self.buildername: # pragma: no cover + out += '/%s/%s' % (self.buildername, self.buildnumber or -1) + return out + + @property + def trigger_output(self): + """JSON results of 'trigger' step or None if not triggered.""" + return self._trigger_output + + def get_shard_view_url(self, index): + """Returns URL of HTML page with shard details or None if not available. + + Works only after the task has been successfully triggered. + """ + if self._trigger_output and self._trigger_output.get('tasks'): + for shard_dict in self._trigger_output['tasks'].itervalues(): + if shard_dict['shard_index'] == index: + return shard_dict['view_url']
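The SwarmingApi docstring above sketches the intended workflow in seven steps. The following is a minimal illustration of that flow, not taken from the diff itself: it assumes a recipe whose DEPS include this module as 'swarming' together with the 'isolate' and 'platform' modules, and the test title 'unit_tests' and the isolated hash value are hypothetical placeholders.

def RunSteps(api):
  # Steps 1-2: tweak module-wide defaults, then obtain an isolated hash
  # (normally produced by the 'isolate' recipe module; hardcoded here).
  api.swarming.default_priority = 30
  api.swarming.set_default_dimension('pool', 'Chrome')
  isolated_hash = 'hash-from-isolate-step'  # placeholder

  # Steps 3-4: create a task configuration and tweak it. The 'os' dimension
  # must be set explicitly, since trigger_task() asserts its presence.
  task = api.swarming.task('unit_tests', isolated_hash, shards=2)
  task.dimensions['os'] = api.swarming.prefered_os_dimension(
      api.platform.name)

  # Step 5: post the task to the Swarming server; this returns immediately.
  api.swarming.trigger_task(task)

  # Step 6: do other useful work here while the shards run remotely.

  # Step 7: block until all shards finish, then fetch and process results.
  api.swarming.collect_task(task)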
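The 'merge' argument accepted by task() defines a small command-line contract: the merge script is invoked with '-o <output path>' plus one path per collected shard result, and should write merged results in the JSON Results File Format. Below is a sketch of a script honoring that contract; it is a stand-in written from the docstring, not the bundled noop_merge.py resource, and a real merger would combine all shards rather than copying the first.

import argparse
import json
import sys

def main():
  parser = argparse.ArgumentParser()
  parser.add_argument('-o', '--output', required=True,
                      help='Path to write the merged JSON results to.')
  parser.add_argument('shard_results', nargs='*',
                      help='Paths to per-shard JSON result files.')
  args = parser.parse_args()

  # Trivial "merge": keep the first shard's results verbatim.
  merged = {}
  if args.shard_results:
    with open(args.shard_results[0]) as f:
      merged = json.load(f)

  with open(args.output, 'w') as f:
    json.dump(merged, f)
  return 0

if __name__ == '__main__':
  sys.exit(main())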