aboutsummaryrefslogtreecommitdiffhomepage
path: root/infra/bots/recipe_modules/skia_swarming/api.py
diff options
context:
space:
mode:
Diffstat (limited to 'infra/bots/recipe_modules/skia_swarming/api.py')
-rw-r--r--infra/bots/recipe_modules/skia_swarming/api.py285
1 files changed, 285 insertions, 0 deletions
diff --git a/infra/bots/recipe_modules/skia_swarming/api.py b/infra/bots/recipe_modules/skia_swarming/api.py
new file mode 100644
index 0000000000..e5f05b1491
--- /dev/null
+++ b/infra/bots/recipe_modules/skia_swarming/api.py
@@ -0,0 +1,285 @@
+# Copyright 2016 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+
+from recipe_engine import recipe_api
+import shlex
+
+
+DEFAULT_TASK_EXPIRATION = 20*60*60
+DEFAULT_TASK_TIMEOUT = 4*60*60
+DEFAULT_IO_TIMEOUT = 40*60
+
+MILO_LOG_LINK = 'https://luci-milo.appspot.com/swarming/task/%s'
+
+
+class SkiaSwarmingApi(recipe_api.RecipeApi):
+ """Provides steps to run Skia tasks on swarming bots."""
+
+ @property
+ def swarming_temp_dir(self):
+ """Path where artifacts like isolate file and json output will be stored."""
+ return self.m.path['slave_build'].join('swarming_temp_dir')
+
+ @property
+ def tasks_output_dir(self):
+ """Directory where the outputs of the swarming tasks will be stored."""
+ return self.swarming_temp_dir.join('outputs')
+
+ def isolated_file_path(self, task_name):
+ """Get the path to the given task's .isolated file."""
+ return self.swarming_temp_dir.join('skia-task-%s.isolated' % task_name)
+
+ def setup(self, luci_go_dir, swarming_rev=None):
+ """Performs setup steps for swarming."""
+ self.m.swarming_client.checkout(revision=swarming_rev)
+ self.m.swarming.check_client_version(step_test_data=(0, 8, 6))
+ self.setup_go_isolate(luci_go_dir)
+ self.m.swarming.add_default_tag('allow_milo:1')
+
+ # TODO(rmistry): Remove once the Go binaries are moved to recipes or buildbot.
+ def setup_go_isolate(self, luci_go_dir):
+ """Generates and puts in place the isolate Go binary."""
+ self.m.step('download luci-go linux',
+ ['download_from_google_storage', '--no_resume',
+ '--platform=linux*', '--no_auth', '--bucket', 'chromium-luci',
+ '-d', luci_go_dir.join('linux64')])
+ self.m.step('download luci-go mac',
+ ['download_from_google_storage', '--no_resume',
+ '--platform=darwin', '--no_auth', '--bucket', 'chromium-luci',
+ '-d', luci_go_dir.join('mac64')])
+ self.m.step('download luci-go win',
+ ['download_from_google_storage', '--no_resume',
+ '--platform=win32', '--no_auth', '--bucket', 'chromium-luci',
+ '-d', luci_go_dir.join('win64')])
+ # Copy binaries to the expected location.
+ dest = self.m.path['slave_build'].join('luci-go')
+ self.m.skia.rmtree(dest)
+ self.m.file.copytree('Copy Go binary',
+ source=luci_go_dir,
+ dest=dest)
+
+ def isolate_and_trigger_task(
+ self, isolate_path, isolate_base_dir, task_name, isolate_vars,
+ swarm_dimensions, isolate_blacklist=None, extra_isolate_hashes=None,
+ idempotent=False, store_output=True, extra_args=None, expiration=None,
+ hard_timeout=None, io_timeout=None, cipd_packages=None):
+ """Isolate inputs and trigger the task to run."""
+ os_type = swarm_dimensions.get('os', 'linux')
+ isolated_hash = self.isolate_task(
+ isolate_path, isolate_base_dir, os_type, task_name, isolate_vars,
+ blacklist=isolate_blacklist, extra_hashes=extra_isolate_hashes)
+ tasks = self.trigger_swarming_tasks([(task_name, isolated_hash)],
+ swarm_dimensions,
+ idempotent=idempotent,
+ store_output=store_output,
+ extra_args=extra_args,
+ expiration=expiration,
+ hard_timeout=hard_timeout,
+ io_timeout=io_timeout,
+ cipd_packages=cipd_packages)
+ assert len(tasks) == 1
+ return tasks[0]
+
+ def isolate_task(self, isolate_path, base_dir, os_type, task_name,
+ isolate_vars, blacklist=None, extra_hashes=None):
+ """Isolate inputs for the given task."""
+ self.create_isolated_gen_json(isolate_path, base_dir, os_type,
+ task_name, isolate_vars,
+ blacklist=blacklist)
+ hashes = self.batcharchive([task_name])
+ assert len(hashes) == 1
+ isolated_hash = hashes[0][1]
+ if extra_hashes:
+ isolated_hash = self.add_isolated_includes(task_name, extra_hashes)
+ return isolated_hash
+
+ def create_isolated_gen_json(self, isolate_path, base_dir, os_type,
+ task_name, extra_variables, blacklist=None):
+ """Creates an isolated.gen.json file (used by the isolate recipe module).
+
+ Args:
+ isolate_path: path obj. Path to the isolate file.
+ base_dir: path obj. Dir that is the base of all paths in the isolate file.
+ os_type: str. The OS type to use when archiving the isolate file.
+ Eg: linux.
+ task_name: str. The isolated.gen.json file will be suffixed by this str.
+ extra_variables: dict of str to str. The extra vars to pass to isolate.
+ Eg: {'SLAVE_NUM': '1', 'MASTER': 'ChromiumPerfFYI'}
+ blacklist: list of regular expressions indicating which files/directories
+ not to archive.
+ """
+ self.m.file.makedirs('swarming tmp dir', self.swarming_temp_dir)
+ isolated_path = self.isolated_file_path(task_name)
+ isolate_args = [
+ '--isolate', isolate_path,
+ '--isolated', isolated_path,
+ '--config-variable', 'OS', os_type,
+ ]
+ if blacklist:
+ for b in blacklist:
+ isolate_args.extend(['--blacklist', b])
+ for k, v in extra_variables.iteritems():
+ isolate_args.extend(['--extra-variable', k, v])
+ isolated_gen_dict = {
+ 'version': 1,
+ 'dir': base_dir,
+ 'args': isolate_args,
+ }
+ isolated_gen_json = self.swarming_temp_dir.join(
+ '%s.isolated.gen.json' % task_name)
+ self.m.file.write(
+ 'Write %s.isolated.gen.json' % task_name,
+ isolated_gen_json,
+ self.m.json.dumps(isolated_gen_dict, indent=4),
+ )
+
+ def batcharchive(self, targets):
+ """Calls batcharchive on the skia.isolated.gen.json file.
+
+ Args:
+ targets: list of str. The suffixes of the isolated.gen.json files to
+ archive.
+
+ Returns:
+ list of tuples containing (task_name, swarming_hash).
+ """
+ return self.m.isolate.isolate_tests(
+ verbose=True, # To avoid no output timeouts.
+ build_dir=self.swarming_temp_dir,
+ targets=targets).presentation.properties['swarm_hashes'].items()
+
+ def add_isolated_includes(self, task_name, include_hashes):
+ """Add the hashes to the task's .isolated file, return new .isolated hash.
+
+ Args:
+ task: str. Name of the task to which to add the given hash.
+ include_hashes: list of str. Hashes of the new includes.
+ Returns:
+ Updated hash of the .isolated file.
+ """
+ isolated_file = self.isolated_file_path(task_name)
+ self.m.python.inline('add_isolated_input', program="""
+ import json
+ import sys
+ with open(sys.argv[1]) as f:
+ isolated = json.load(f)
+ if not isolated.get('includes'):
+ isolated['includes'] = []
+ for h in sys.argv[2:]:
+ isolated['includes'].append(h)
+ with open(sys.argv[1], 'w') as f:
+ json.dump(isolated, f, sort_keys=True)
+ """, args=[isolated_file] + include_hashes)
+ isolateserver = self.m.swarming_client.path.join('isolateserver.py')
+ r = self.m.python('upload new .isolated file for %s' % task_name,
+ script=isolateserver,
+ args=['archive', '--isolate-server',
+ self.m.isolate.isolate_server, isolated_file],
+ stdout=self.m.raw_io.output())
+ return shlex.split(r.stdout)[0]
+
+ def trigger_swarming_tasks(
+ self, swarm_hashes, dimensions, idempotent=False, store_output=True,
+ extra_args=None, expiration=None, hard_timeout=None, io_timeout=None,
+ cipd_packages=None):
+ """Triggers swarming tasks using swarm hashes.
+
+ Args:
+ swarm_hashes: list of str. List of swarm hashes from the isolate server.
+ dimensions: dict of str to str. The dimensions to run the task on.
+ Eg: {'os': 'Ubuntu', 'gpu': '10de', 'pool': 'Skia'}
+ idempotent: bool. Whether or not to de-duplicate tasks.
+ store_output: bool. Whether task output should be stored.
+ extra_args: list of str. Extra arguments to pass to the task.
+ expiration: int. Task will expire if not picked up within this time.
+ DEFAULT_TASK_EXPIRATION is used if this argument is None.
+ hard_timeout: int. Task will timeout if not completed within this time.
+ DEFAULT_TASK_TIMEOUT is used if this argument is None.
+ io_timeout: int. Task will timeout if there is no output within this time.
+ DEFAULT_IO_TIMEOUT is used if this argument is None.
+ cipd_packages: CIPD packages which these tasks depend on.
+
+ Returns:
+ List of swarming.SwarmingTask instances.
+ """
+ swarming_tasks = []
+ for task_name, swarm_hash in swarm_hashes:
+ swarming_task = self.m.swarming.task(
+ title=task_name,
+ cipd_packages=cipd_packages,
+ isolated_hash=swarm_hash)
+ if store_output:
+ swarming_task.task_output_dir = self.tasks_output_dir.join(task_name)
+ swarming_task.dimensions = dimensions
+ swarming_task.idempotent = idempotent
+ swarming_task.priority = 90
+ swarming_task.expiration = (
+ expiration if expiration else DEFAULT_TASK_EXPIRATION)
+ swarming_task.hard_timeout = (
+ hard_timeout if hard_timeout else DEFAULT_TASK_TIMEOUT)
+ swarming_task.io_timeout = (
+ io_timeout if io_timeout else DEFAULT_IO_TIMEOUT)
+ if extra_args:
+ swarming_task.extra_args = extra_args
+ swarming_tasks.append(swarming_task)
+ step_results = self.m.swarming.trigger(swarming_tasks)
+ for step_result in step_results:
+ self._add_log_links(step_result)
+ return swarming_tasks
+
+ def collect_swarming_task(self, swarming_task):
+ """Collects the specified swarming task.
+
+ Args:
+ swarming_task: An instance of swarming.SwarmingTask.
+ """
+ try:
+ rv = self.m.swarming.collect_task(swarming_task)
+ except self.m.step.StepFailure as e: # pragma: no cover
+ step_result = self.m.step.active_result
+ # Change step result to Infra failure if the swarming task failed due to
+ # expiration, time outs, bot crashes or task cancelations.
+ # Infra failures have step.EXCEPTION.
+ states_infra_failure = (
+ self.m.swarming.State.EXPIRED, self.m.swarming.State.TIMED_OUT,
+ self.m.swarming.State.BOT_DIED, self.m.swarming.State.CANCELED)
+ if step_result.json.output['shards'][0]['state'] in states_infra_failure:
+ step_result.presentation.status = self.m.step.EXCEPTION
+ raise self.m.step.InfraFailure(e.name, step_result)
+ raise
+ finally:
+ step_result = self.m.step.active_result
+ # Add log link.
+ self._add_log_links(step_result)
+ return rv
+
+ def collect_swarming_task_isolate_hash(self, swarming_task):
+ """Wait for the given swarming task to finish and return its output hash.
+
+ Args:
+ swarming_task: An instance of swarming.SwarmingTask.
+ Returns:
+ the hash of the isolate output of the task.
+ """
+ res = self.collect_swarming_task(swarming_task)
+ return res.json.output['shards'][0]['isolated_out']['isolated']
+
+ def _add_log_links(self, step_result):
+ """Add Milo log links to all shards in the step."""
+ ids = []
+ shards = step_result.json.output.get('shards')
+ if shards:
+ for shard in shards:
+ ids.append(shard['id'])
+ else:
+ for _, task in step_result.json.output.get('tasks', {}).iteritems():
+ ids.append(task['task_id'])
+ for idx, task_id in enumerate(ids):
+ link = MILO_LOG_LINK % task_id
+ k = 'view steps on Milo'
+ if len(ids) > 1: # pragma: nocover
+ k += ' (shard index %d, %d total)' % (idx, len(ids))
+ step_result.presentation.links[k] = link
+