OLD | NEW |
(Empty) | |
| 1 # Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. |
| 4 |
| 5 |
| 6 from recipe_engine import recipe_api |
| 7 import shlex |
| 8 |
| 9 |
# Default scheduling/timeout values passed to triggered swarming tasks
# (see trigger_swarming_tasks).  Values are presumably in seconds, given
# the hour/minute arithmetic: 20 hours, 4 hours, 40 minutes -- TODO confirm
# against the swarming module's expected units.
DEFAULT_TASK_EXPIRATION = 20*60*60
DEFAULT_TASK_TIMEOUT = 4*60*60
DEFAULT_IO_TIMEOUT = 40*60

# Template for a task's log page on Milo; filled with a swarming task id
# in _add_log_links.
MILO_LOG_LINK = 'https://luci-milo.appspot.com/swarming/task/%s'
| 15 |
| 16 |
class SkiaSwarmingApi(recipe_api.RecipeApi):
  """Provides steps to run Skia tasks on swarming bots."""

  @property
  def swarming_temp_dir(self):
    """Scratch dir holding isolate files and json output for swarming."""
    build_dir = self.m.path['slave_build']
    return build_dir.join('swarming_temp_dir')

  @property
  def tasks_output_dir(self):
    """Subdirectory of swarming_temp_dir where task outputs are stored."""
    return self.swarming_temp_dir.join('outputs')

  def isolated_file_path(self, task_name):
    """Return the path to the named task's .isolated file."""
    file_name = 'skia-task-%s.isolated' % task_name
    return self.swarming_temp_dir.join(file_name)
| 33 |
  def setup(self, luci_go_dir, swarming_rev=None):
    """Performs setup steps for swarming.

    Checks out the swarming client, verifies the client version, puts the
    Go isolate binaries in place, and adds a default tag to all
    subsequently-triggered tasks.

    Args:
      luci_go_dir: path obj. Dir the luci-go binaries are downloaded into;
        see setup_go_isolate.
      swarming_rev: str or None. Revision of the swarming client to check
        out; None uses the swarming_client module's default.
    """
    self.m.swarming_client.checkout(revision=swarming_rev)
    # step_test_data supplies the fake client version used in simulation.
    self.m.swarming.check_client_version(step_test_data=(0, 8, 6))
    self.setup_go_isolate(luci_go_dir)
    # Tag tasks so their logs are viewable on Milo (see MILO_LOG_LINK).
    self.m.swarming.add_default_tag('allow_milo:1')
| 40 |
| 41 # TODO(rmistry): Remove once the Go binaries are moved to recipes or buildbot. |
| 42 def setup_go_isolate(self, luci_go_dir): |
| 43 """Generates and puts in place the isolate Go binary.""" |
| 44 self.m.step('download luci-go linux', |
| 45 ['download_from_google_storage', '--no_resume', |
| 46 '--platform=linux*', '--no_auth', '--bucket', 'chromium-luci', |
| 47 '-d', luci_go_dir.join('linux64')]) |
| 48 self.m.step('download luci-go mac', |
| 49 ['download_from_google_storage', '--no_resume', |
| 50 '--platform=darwin', '--no_auth', '--bucket', 'chromium-luci', |
| 51 '-d', luci_go_dir.join('mac64')]) |
| 52 self.m.step('download luci-go win', |
| 53 ['download_from_google_storage', '--no_resume', |
| 54 '--platform=win32', '--no_auth', '--bucket', 'chromium-luci', |
| 55 '-d', luci_go_dir.join('win64')]) |
| 56 # Copy binaries to the expected location. |
| 57 dest = self.m.path['slave_build'].join('luci-go') |
| 58 self.m.skia.rmtree(dest) |
| 59 self.m.file.copytree('Copy Go binary', |
| 60 source=luci_go_dir, |
| 61 dest=dest) |
| 62 |
| 63 def isolate_and_trigger_task( |
| 64 self, isolate_path, isolate_base_dir, task_name, isolate_vars, |
| 65 swarm_dimensions, isolate_blacklist=None, extra_isolate_hashes=None, |
| 66 idempotent=False, store_output=True, extra_args=None, expiration=None, |
| 67 hard_timeout=None, io_timeout=None, cipd_packages=None): |
| 68 """Isolate inputs and trigger the task to run.""" |
| 69 os_type = swarm_dimensions.get('os', 'linux') |
| 70 isolated_hash = self.isolate_task( |
| 71 isolate_path, isolate_base_dir, os_type, task_name, isolate_vars, |
| 72 blacklist=isolate_blacklist, extra_hashes=extra_isolate_hashes) |
| 73 tasks = self.trigger_swarming_tasks([(task_name, isolated_hash)], |
| 74 swarm_dimensions, |
| 75 idempotent=idempotent, |
| 76 store_output=store_output, |
| 77 extra_args=extra_args, |
| 78 expiration=expiration, |
| 79 hard_timeout=hard_timeout, |
| 80 io_timeout=io_timeout, |
| 81 cipd_packages=cipd_packages) |
| 82 assert len(tasks) == 1 |
| 83 return tasks[0] |
| 84 |
| 85 def isolate_task(self, isolate_path, base_dir, os_type, task_name, |
| 86 isolate_vars, blacklist=None, extra_hashes=None): |
| 87 """Isolate inputs for the given task.""" |
| 88 self.create_isolated_gen_json(isolate_path, base_dir, os_type, |
| 89 task_name, isolate_vars, |
| 90 blacklist=blacklist) |
| 91 hashes = self.batcharchive([task_name]) |
| 92 assert len(hashes) == 1 |
| 93 isolated_hash = hashes[0][1] |
| 94 if extra_hashes: |
| 95 isolated_hash = self.add_isolated_includes(task_name, extra_hashes) |
| 96 return isolated_hash |
| 97 |
| 98 def create_isolated_gen_json(self, isolate_path, base_dir, os_type, |
| 99 task_name, extra_variables, blacklist=None): |
| 100 """Creates an isolated.gen.json file (used by the isolate recipe module). |
| 101 |
| 102 Args: |
| 103 isolate_path: path obj. Path to the isolate file. |
| 104 base_dir: path obj. Dir that is the base of all paths in the isolate file. |
| 105 os_type: str. The OS type to use when archiving the isolate file. |
| 106 Eg: linux. |
| 107 task_name: str. The isolated.gen.json file will be suffixed by this str. |
| 108 extra_variables: dict of str to str. The extra vars to pass to isolate. |
| 109 Eg: {'SLAVE_NUM': '1', 'MASTER': 'ChromiumPerfFYI'} |
| 110 blacklist: list of regular expressions indicating which files/directories |
| 111 not to archive. |
| 112 """ |
| 113 self.m.file.makedirs('swarming tmp dir', self.swarming_temp_dir) |
| 114 isolated_path = self.isolated_file_path(task_name) |
| 115 isolate_args = [ |
| 116 '--isolate', isolate_path, |
| 117 '--isolated', isolated_path, |
| 118 '--config-variable', 'OS', os_type, |
| 119 ] |
| 120 if blacklist: |
| 121 for b in blacklist: |
| 122 isolate_args.extend(['--blacklist', b]) |
| 123 for k, v in extra_variables.iteritems(): |
| 124 isolate_args.extend(['--extra-variable', k, v]) |
| 125 isolated_gen_dict = { |
| 126 'version': 1, |
| 127 'dir': base_dir, |
| 128 'args': isolate_args, |
| 129 } |
| 130 isolated_gen_json = self.swarming_temp_dir.join( |
| 131 '%s.isolated.gen.json' % task_name) |
| 132 self.m.file.write( |
| 133 'Write %s.isolated.gen.json' % task_name, |
| 134 isolated_gen_json, |
| 135 self.m.json.dumps(isolated_gen_dict, indent=4), |
| 136 ) |
| 137 |
| 138 def batcharchive(self, targets): |
| 139 """Calls batcharchive on the skia.isolated.gen.json file. |
| 140 |
| 141 Args: |
| 142 targets: list of str. The suffixes of the isolated.gen.json files to |
| 143 archive. |
| 144 |
| 145 Returns: |
| 146 list of tuples containing (task_name, swarming_hash). |
| 147 """ |
| 148 return self.m.isolate.isolate_tests( |
| 149 verbose=True, # To avoid no output timeouts. |
| 150 build_dir=self.swarming_temp_dir, |
| 151 targets=targets).presentation.properties['swarm_hashes'].items() |
| 152 |
  def add_isolated_includes(self, task_name, include_hashes):
    """Add the hashes to the task's .isolated file, return new .isolated hash.

    Args:
      task_name: str. Name of the task whose .isolated file is amended.
      include_hashes: list of str. Hashes of the new includes.
    Returns:
      Updated hash of the .isolated file.
    """
    isolated_file = self.isolated_file_path(task_name)
    # Append the given hashes to the 'includes' list of the on-disk
    # .isolated file (creating the list if absent).
    self.m.python.inline('add_isolated_input', program="""
      import json
      import sys
      with open(sys.argv[1]) as f:
        isolated = json.load(f)
      if not isolated.get('includes'):
        isolated['includes'] = []
      for h in sys.argv[2:]:
        isolated['includes'].append(h)
      with open(sys.argv[1], 'w') as f:
        json.dump(isolated, f, sort_keys=True)
    """, args=[isolated_file] + include_hashes)
    # Re-upload the modified file; the first whitespace-delimited token of
    # the archive step's stdout is taken as the new hash (presumably
    # isolateserver.py prints '<hash> <path>' -- TODO confirm).
    isolateserver = self.m.swarming_client.path.join('isolateserver.py')
    r = self.m.python('upload new .isolated file for %s' % task_name,
                      script=isolateserver,
                      args=['archive', '--isolate-server',
                            self.m.isolate.isolate_server, isolated_file],
                      stdout=self.m.raw_io.output())
    return shlex.split(r.stdout)[0]
| 182 |
| 183 def trigger_swarming_tasks( |
| 184 self, swarm_hashes, dimensions, idempotent=False, store_output=True, |
| 185 extra_args=None, expiration=None, hard_timeout=None, io_timeout=None, |
| 186 cipd_packages=None): |
| 187 """Triggers swarming tasks using swarm hashes. |
| 188 |
| 189 Args: |
| 190 swarm_hashes: list of str. List of swarm hashes from the isolate server. |
| 191 dimensions: dict of str to str. The dimensions to run the task on. |
| 192 Eg: {'os': 'Ubuntu', 'gpu': '10de', 'pool': 'Skia'} |
| 193 idempotent: bool. Whether or not to de-duplicate tasks. |
| 194 store_output: bool. Whether task output should be stored. |
| 195 extra_args: list of str. Extra arguments to pass to the task. |
| 196 expiration: int. Task will expire if not picked up within this time. |
| 197 DEFAULT_TASK_EXPIRATION is used if this argument is None. |
| 198 hard_timeout: int. Task will timeout if not completed within this time. |
| 199 DEFAULT_TASK_TIMEOUT is used if this argument is None. |
| 200 io_timeout: int. Task will timeout if there is no output within this time. |
| 201 DEFAULT_IO_TIMEOUT is used if this argument is None. |
| 202 cipd_packages: CIPD packages which these tasks depend on. |
| 203 |
| 204 Returns: |
| 205 List of swarming.SwarmingTask instances. |
| 206 """ |
| 207 swarming_tasks = [] |
| 208 for task_name, swarm_hash in swarm_hashes: |
| 209 swarming_task = self.m.swarming.task( |
| 210 title=task_name, |
| 211 cipd_packages=cipd_packages, |
| 212 isolated_hash=swarm_hash) |
| 213 if store_output: |
| 214 swarming_task.task_output_dir = self.tasks_output_dir.join(task_name) |
| 215 swarming_task.dimensions = dimensions |
| 216 swarming_task.idempotent = idempotent |
| 217 swarming_task.priority = 90 |
| 218 swarming_task.expiration = ( |
| 219 expiration if expiration else DEFAULT_TASK_EXPIRATION) |
| 220 swarming_task.hard_timeout = ( |
| 221 hard_timeout if hard_timeout else DEFAULT_TASK_TIMEOUT) |
| 222 swarming_task.io_timeout = ( |
| 223 io_timeout if io_timeout else DEFAULT_IO_TIMEOUT) |
| 224 if extra_args: |
| 225 swarming_task.extra_args = extra_args |
| 226 swarming_tasks.append(swarming_task) |
| 227 step_results = self.m.swarming.trigger(swarming_tasks) |
| 228 for step_result in step_results: |
| 229 self._add_log_links(step_result) |
| 230 return swarming_tasks |
| 231 |
  def collect_swarming_task(self, swarming_task):
    """Collects the specified swarming task.

    Waits for the task to finish, reclassifying failures caused by swarming
    infrastructure (expiration, timeout, bot death, cancelation) as infra
    failures, and always attaching a Milo log link to the collect step.

    Args:
      swarming_task: An instance of swarming.SwarmingTask.
    Returns:
      The result of the collect step.
    Raises:
      InfraFailure: if the task ended in one of the infra-failure states.
      StepFailure: re-raised unchanged for any other collect failure.
    """
    try:
      rv = self.m.swarming.collect_task(swarming_task)
    except self.m.step.StepFailure as e:  # pragma: no cover
      step_result = self.m.step.active_result
      # Change step result to Infra failure if the swarming task failed due to
      # expiration, time outs, bot crashes or task cancelations.
      # Infra failures have step.EXCEPTION.
      states_infra_failure = (
          self.m.swarming.State.EXPIRED, self.m.swarming.State.TIMED_OUT,
          self.m.swarming.State.BOT_DIED, self.m.swarming.State.CANCELED)
      # Only the first shard's state is inspected; assumes a single-shard
      # task -- TODO confirm behavior for multi-shard tasks.
      if step_result.json.output['shards'][0]['state'] in states_infra_failure:
        step_result.presentation.status = self.m.step.EXCEPTION
        raise self.m.step.InfraFailure(e.name, step_result)
      # Not an infra-failure state: propagate the original StepFailure.
      raise
    finally:
      # Add the Milo log link whether the collect succeeded or failed.
      step_result = self.m.step.active_result
      self._add_log_links(step_result)
    return rv
| 257 |
| 258 def collect_swarming_task_isolate_hash(self, swarming_task): |
| 259 """Wait for the given swarming task to finish and return its output hash. |
| 260 |
| 261 Args: |
| 262 swarming_task: An instance of swarming.SwarmingTask. |
| 263 Returns: |
| 264 the hash of the isolate output of the task. |
| 265 """ |
| 266 res = self.collect_swarming_task(swarming_task) |
| 267 return res.json.output['shards'][0]['isolated_out']['isolated'] |
| 268 |
| 269 def _add_log_links(self, step_result): |
| 270 """Add Milo log links to all shards in the step.""" |
| 271 ids = [] |
| 272 shards = step_result.json.output.get('shards') |
| 273 if shards: |
| 274 for shard in shards: |
| 275 ids.append(shard['id']) |
| 276 else: |
| 277 for _, task in step_result.json.output.get('tasks', {}).iteritems(): |
| 278 ids.append(task['task_id']) |
| 279 for idx, task_id in enumerate(ids): |
| 280 link = MILO_LOG_LINK % task_id |
| 281 k = 'view steps on Milo' |
| 282 if len(ids) > 1: # pragma: nocover |
| 283 k += ' (shard index %d, %d total)' % (idx, len(ids)) |
| 284 step_result.presentation.links[k] = link |
| 285 |
OLD | NEW |