OLD | NEW |
| (Empty) |
1 # Copyright 2016 The Chromium Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 | |
6 from recipe_engine import recipe_api | |
7 import shlex | |
8 | |
9 | |
10 DEFAULT_TASK_EXPIRATION = 20*60*60 | |
11 DEFAULT_TASK_TIMEOUT = 4*60*60 | |
12 DEFAULT_IO_TIMEOUT = 40*60 | |
13 | |
14 MILO_LOG_LINK = 'https://luci-milo.appspot.com/swarming/task/%s' | |
15 | |
16 | |
17 class SkiaSwarmingApi(recipe_api.RecipeApi): | |
18 """Provides steps to run Skia tasks on swarming bots.""" | |
19 | |
20 @property | |
21 def swarming_temp_dir(self): | |
22 """Path where artifacts like isolate file and json output will be stored.""" | |
23 return self.m.path['slave_build'].join('swarming_temp_dir') | |
24 | |
25 @property | |
26 def tasks_output_dir(self): | |
27 """Directory where the outputs of the swarming tasks will be stored.""" | |
28 return self.swarming_temp_dir.join('outputs') | |
29 | |
30 def isolated_file_path(self, task_name): | |
31 """Get the path to the given task's .isolated file.""" | |
32 return self.swarming_temp_dir.join('skia-task-%s.isolated' % task_name) | |
33 | |
34 def setup(self, luci_go_dir, swarming_rev=None): | |
35 """Performs setup steps for swarming.""" | |
36 self.m.swarming_client.checkout(revision=swarming_rev) | |
37 self.m.swarming.check_client_version(step_test_data=(0, 8, 6)) | |
38 self.setup_go_isolate(luci_go_dir) | |
39 self.m.swarming.add_default_tag('allow_milo:1') | |
40 | |
41 # TODO(rmistry): Remove once the Go binaries are moved to recipes or buildbot. | |
42 def setup_go_isolate(self, luci_go_dir): | |
43 """Generates and puts in place the isolate Go binary.""" | |
44 depot_tools_path = self.m.depot_tools.package_repo_resource() | |
45 env = {'PATH': self.m.path.pathsep.join([ | |
46 str(depot_tools_path), '%(PATH)s'])} | |
47 self.m.step('download luci-go linux', | |
48 ['download_from_google_storage', '--no_resume', | |
49 '--platform=linux*', '--no_auth', '--bucket', 'chromium-luci', | |
50 '-d', luci_go_dir.join('linux64')], | |
51 env=env) | |
52 self.m.step('download luci-go mac', | |
53 ['download_from_google_storage', '--no_resume', | |
54 '--platform=darwin', '--no_auth', '--bucket', 'chromium-luci', | |
55 '-d', luci_go_dir.join('mac64')], | |
56 env=env) | |
57 self.m.step('download luci-go win', | |
58 ['download_from_google_storage', '--no_resume', | |
59 '--platform=win32', '--no_auth', '--bucket', 'chromium-luci', | |
60 '-d', luci_go_dir.join('win64')], | |
61 env=env) | |
62 # Copy binaries to the expected location. | |
63 dest = self.m.path['slave_build'].join('luci-go') | |
64 self.m.skia.rmtree(dest) | |
65 self.m.file.copytree('Copy Go binary', | |
66 source=luci_go_dir, | |
67 dest=dest) | |
68 | |
69 def isolate_and_trigger_task( | |
70 self, isolate_path, isolate_base_dir, task_name, isolate_vars, | |
71 swarm_dimensions, isolate_blacklist=None, extra_isolate_hashes=None, | |
72 idempotent=False, store_output=True, extra_args=None, expiration=None, | |
73 hard_timeout=None, io_timeout=None, cipd_packages=None): | |
74 """Isolate inputs and trigger the task to run.""" | |
75 os_type = swarm_dimensions.get('os', 'linux') | |
76 isolated_hash = self.isolate_task( | |
77 isolate_path, isolate_base_dir, os_type, task_name, isolate_vars, | |
78 blacklist=isolate_blacklist, extra_hashes=extra_isolate_hashes) | |
79 tasks = self.trigger_swarming_tasks([(task_name, isolated_hash)], | |
80 swarm_dimensions, | |
81 idempotent=idempotent, | |
82 store_output=store_output, | |
83 extra_args=extra_args, | |
84 expiration=expiration, | |
85 hard_timeout=hard_timeout, | |
86 io_timeout=io_timeout, | |
87 cipd_packages=cipd_packages) | |
88 assert len(tasks) == 1 | |
89 return tasks[0] | |
90 | |
91 def isolate_task(self, isolate_path, base_dir, os_type, task_name, | |
92 isolate_vars, blacklist=None, extra_hashes=None): | |
93 """Isolate inputs for the given task.""" | |
94 self.create_isolated_gen_json(isolate_path, base_dir, os_type, | |
95 task_name, isolate_vars, | |
96 blacklist=blacklist) | |
97 hashes = self.batcharchive([task_name]) | |
98 assert len(hashes) == 1 | |
99 isolated_hash = hashes[0][1] | |
100 if extra_hashes: | |
101 isolated_hash = self.add_isolated_includes(task_name, extra_hashes) | |
102 return isolated_hash | |
103 | |
104 def create_isolated_gen_json(self, isolate_path, base_dir, os_type, | |
105 task_name, extra_variables, blacklist=None): | |
106 """Creates an isolated.gen.json file (used by the isolate recipe module). | |
107 | |
108 Args: | |
109 isolate_path: path obj. Path to the isolate file. | |
110 base_dir: path obj. Dir that is the base of all paths in the isolate file. | |
111 os_type: str. The OS type to use when archiving the isolate file. | |
112 Eg: linux. | |
113 task_name: str. The isolated.gen.json file will be suffixed by this str. | |
114 extra_variables: dict of str to str. The extra vars to pass to isolate. | |
115 Eg: {'SLAVE_NUM': '1', 'MASTER': 'ChromiumPerfFYI'} | |
116 blacklist: list of regular expressions indicating which files/directories | |
117 not to archive. | |
118 """ | |
119 self.m.file.makedirs('swarming tmp dir', self.swarming_temp_dir) | |
120 isolated_path = self.isolated_file_path(task_name) | |
121 isolate_args = [ | |
122 '--isolate', isolate_path, | |
123 '--isolated', isolated_path, | |
124 '--config-variable', 'OS', os_type, | |
125 ] | |
126 if blacklist: | |
127 for b in blacklist: | |
128 isolate_args.extend(['--blacklist', b]) | |
129 for k, v in extra_variables.iteritems(): | |
130 isolate_args.extend(['--extra-variable', k, v]) | |
131 isolated_gen_dict = { | |
132 'version': 1, | |
133 'dir': base_dir, | |
134 'args': isolate_args, | |
135 } | |
136 isolated_gen_json = self.swarming_temp_dir.join( | |
137 '%s.isolated.gen.json' % task_name) | |
138 self.m.file.write( | |
139 'Write %s.isolated.gen.json' % task_name, | |
140 isolated_gen_json, | |
141 self.m.json.dumps(isolated_gen_dict, indent=4), | |
142 ) | |
143 | |
144 def batcharchive(self, targets): | |
145 """Calls batcharchive on the skia.isolated.gen.json file. | |
146 | |
147 Args: | |
148 targets: list of str. The suffixes of the isolated.gen.json files to | |
149 archive. | |
150 | |
151 Returns: | |
152 list of tuples containing (task_name, swarming_hash). | |
153 """ | |
154 return self.m.isolate.isolate_tests( | |
155 verbose=True, # To avoid no output timeouts. | |
156 build_dir=self.swarming_temp_dir, | |
157 targets=targets).presentation.properties['swarm_hashes'].items() | |
158 | |
159 def add_isolated_includes(self, task_name, include_hashes): | |
160 """Add the hashes to the task's .isolated file, return new .isolated hash. | |
161 | |
162 Args: | |
163 task: str. Name of the task to which to add the given hash. | |
164 include_hashes: list of str. Hashes of the new includes. | |
165 Returns: | |
166 Updated hash of the .isolated file. | |
167 """ | |
168 isolated_file = self.isolated_file_path(task_name) | |
169 self.m.python.inline('add_isolated_input', program=""" | |
170 import json | |
171 import sys | |
172 with open(sys.argv[1]) as f: | |
173 isolated = json.load(f) | |
174 if not isolated.get('includes'): | |
175 isolated['includes'] = [] | |
176 for h in sys.argv[2:]: | |
177 isolated['includes'].append(h) | |
178 with open(sys.argv[1], 'w') as f: | |
179 json.dump(isolated, f, sort_keys=True) | |
180 """, args=[isolated_file] + include_hashes) | |
181 isolateserver = self.m.swarming_client.path.join('isolateserver.py') | |
182 r = self.m.python('upload new .isolated file for %s' % task_name, | |
183 script=isolateserver, | |
184 args=['archive', '--isolate-server', | |
185 self.m.isolate.isolate_server, isolated_file], | |
186 stdout=self.m.raw_io.output()) | |
187 return shlex.split(r.stdout)[0] | |
188 | |
189 def trigger_swarming_tasks( | |
190 self, swarm_hashes, dimensions, idempotent=False, store_output=True, | |
191 extra_args=None, expiration=None, hard_timeout=None, io_timeout=None, | |
192 cipd_packages=None): | |
193 """Triggers swarming tasks using swarm hashes. | |
194 | |
195 Args: | |
196 swarm_hashes: list of str. List of swarm hashes from the isolate server. | |
197 dimensions: dict of str to str. The dimensions to run the task on. | |
198 Eg: {'os': 'Ubuntu', 'gpu': '10de', 'pool': 'Skia'} | |
199 idempotent: bool. Whether or not to de-duplicate tasks. | |
200 store_output: bool. Whether task output should be stored. | |
201 extra_args: list of str. Extra arguments to pass to the task. | |
202 expiration: int. Task will expire if not picked up within this time. | |
203 DEFAULT_TASK_EXPIRATION is used if this argument is None. | |
204 hard_timeout: int. Task will timeout if not completed within this time. | |
205 DEFAULT_TASK_TIMEOUT is used if this argument is None. | |
206 io_timeout: int. Task will timeout if there is no output within this time. | |
207 DEFAULT_IO_TIMEOUT is used if this argument is None. | |
208 cipd_packages: CIPD packages which these tasks depend on. | |
209 | |
210 Returns: | |
211 List of swarming.SwarmingTask instances. | |
212 """ | |
213 swarming_tasks = [] | |
214 for task_name, swarm_hash in swarm_hashes: | |
215 swarming_task = self.m.swarming.task( | |
216 title=task_name, | |
217 cipd_packages=cipd_packages, | |
218 isolated_hash=swarm_hash) | |
219 if store_output: | |
220 swarming_task.task_output_dir = self.tasks_output_dir.join(task_name) | |
221 swarming_task.dimensions = dimensions | |
222 swarming_task.idempotent = idempotent | |
223 swarming_task.priority = 90 | |
224 swarming_task.expiration = ( | |
225 expiration if expiration else DEFAULT_TASK_EXPIRATION) | |
226 swarming_task.hard_timeout = ( | |
227 hard_timeout if hard_timeout else DEFAULT_TASK_TIMEOUT) | |
228 swarming_task.io_timeout = ( | |
229 io_timeout if io_timeout else DEFAULT_IO_TIMEOUT) | |
230 if extra_args: | |
231 swarming_task.extra_args = extra_args | |
232 revision = self.m.properties.get('revision') | |
233 if revision: | |
234 swarming_task.tags.add('revision:%s' % revision) | |
235 swarming_tasks.append(swarming_task) | |
236 step_results = self.m.swarming.trigger(swarming_tasks) | |
237 for step_result in step_results: | |
238 self._add_log_links(step_result) | |
239 return swarming_tasks | |
240 | |
241 def collect_swarming_task(self, swarming_task): | |
242 """Collects the specified swarming task. | |
243 | |
244 Args: | |
245 swarming_task: An instance of swarming.SwarmingTask. | |
246 """ | |
247 try: | |
248 rv = self.m.swarming.collect_task(swarming_task) | |
249 except self.m.step.StepFailure as e: # pragma: no cover | |
250 step_result = self.m.step.active_result | |
251 # Change step result to Infra failure if the swarming task failed due to | |
252 # expiration, time outs, bot crashes or task cancelations. | |
253 # Infra failures have step.EXCEPTION. | |
254 states_infra_failure = ( | |
255 self.m.swarming.State.EXPIRED, self.m.swarming.State.TIMED_OUT, | |
256 self.m.swarming.State.BOT_DIED, self.m.swarming.State.CANCELED) | |
257 if step_result.json.output['shards'][0]['state'] in states_infra_failure: | |
258 step_result.presentation.status = self.m.step.EXCEPTION | |
259 raise self.m.step.InfraFailure(e.name, step_result) | |
260 raise | |
261 finally: | |
262 step_result = self.m.step.active_result | |
263 # Add log link. | |
264 self._add_log_links(step_result) | |
265 return rv | |
266 | |
267 def collect_swarming_task_isolate_hash(self, swarming_task): | |
268 """Wait for the given swarming task to finish and return its output hash. | |
269 | |
270 Args: | |
271 swarming_task: An instance of swarming.SwarmingTask. | |
272 Returns: | |
273 the hash of the isolate output of the task. | |
274 """ | |
275 res = self.collect_swarming_task(swarming_task) | |
276 return res.json.output['shards'][0]['isolated_out']['isolated'] | |
277 | |
278 def _add_log_links(self, step_result): | |
279 """Add Milo log links to all shards in the step.""" | |
280 ids = [] | |
281 shards = step_result.json.output.get('shards') | |
282 if shards: | |
283 for shard in shards: | |
284 ids.append(shard['id']) | |
285 else: | |
286 for _, task in step_result.json.output.get('tasks', {}).iteritems(): | |
287 ids.append(task['task_id']) | |
288 for idx, task_id in enumerate(ids): | |
289 link = MILO_LOG_LINK % task_id | |
290 k = 'view steps on Milo' | |
291 if len(ids) > 1: # pragma: nocover | |
292 k += ' (shard index %d, %d total)' % (idx, len(ids)) | |
293 step_result.presentation.links[k] = link | |
294 | |
OLD | NEW |