Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(780)

Side by Side Diff: swarm_client/swarming.py

Issue 69143004: Delete swarm_client. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/
Patch Set: Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « swarm_client/run_isolated.py ('k') | swarm_client/tests/auto_stub.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 #!/usr/bin/env python
2 # Copyright 2013 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
5
6 """Client tool to trigger tasks or retrieve results from a Swarming server."""
7
8 __version__ = '0.1'
9
10 import hashlib
11 import json
12 import logging
13 import os
14 import shutil
15 import subprocess
16 import sys
17 import time
18 import urllib
19
20 from third_party import colorama
21 from third_party.depot_tools import fix_encoding
22 from third_party.depot_tools import subcommand
23
24 from utils import net
25 from utils import threading_utils
26 from utils import tools
27 from utils import zip_package
28
29 import isolateserver
30 import run_isolated
31
32
# Directory containing this script; all bundled helper files are resolved
# relative to it.
ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
# Location of helper scripts shipped to the swarm slaves (swarm_cleanup.py).
TOOLS_PATH = os.path.join(ROOT_DIR, 'tools')


# The default time to wait for a shard to finish running.
DEFAULT_SHARD_WAIT_TIME = 80 * 60.


# Placeholder text substituted when a shard reported no output at all.
NO_OUTPUT_FOUND = (
    'No output produced by the test, it may have failed to run.\n'
    '\n')


# TODO(maruel): cygwin != Windows. If a swarm_bot is running in cygwin, it's
# different from running in native python.
# Maps sys.platform values to the OS config names used by the Swarming server.
PLATFORM_MAPPING_SWARMING = {
    'cygwin': 'Windows',
    'darwin': 'Mac',
    'linux2': 'Linux',
    'win32': 'Windows',
}

# Maps sys.platform values to the 'OS' variable values passed to isolate.py.
PLATFORM_MAPPING_ISOLATE = {
    'linux2': 'linux',
    'darwin': 'mac',
    'win32': 'win',
}
60
61
class Failure(Exception):
  """Generic failure raised by swarming client operations."""
65
66
class Manifest(object):
  """Represents a Swarming task manifest.

  Also includes code to zip code and upload itself.
  """
  def __init__(
      self, isolated_hash, test_name, shards, test_filter, slave_os,
      working_dir, isolate_server, verbose, profile, priority, algo):
    """Populates a manifest object.
    Args:
      isolated_hash - The manifest's sha-1 that the slave is going to fetch.
      test_name - The name to give the test request.
      shards - The number of swarm shards to request.
      test_filter - The gtest filter to apply when running the test.
      slave_os - OS to run on.
      working_dir - Relative working directory to start the script.
      isolate_server - isolate server url.
      verbose - if True, have the slave print more details.
      profile - if True, have the slave print more timing data.
      priority - int between 0 and 1000, lower the higher priority.
      algo - hashing algorithm used.
    """
    self.isolated_hash = isolated_hash
    # Zip package of support files shipped alongside the task (filled in by
    # callers such as chromium_setup(), then zipped by zip_and_upload()).
    self.bundle = zip_package.ZipPackage(ROOT_DIR)

    self._test_name = test_name
    self._shards = shards
    self._test_filter = test_filter
    self._target_platform = slave_os
    self._working_dir = working_dir

    self.isolate_server = isolate_server
    self.storage = isolateserver.get_storage(isolate_server, 'default')
    self.verbose = bool(verbose)
    self.profile = bool(profile)
    self.priority = priority
    self._algo = algo

    # Set by zip_and_upload() once the bundle has been zipped; to_json() only
    # emits a 'data' entry when this is present.
    self._isolate_item = None
    # Tasks appended via add_task(), serialized by to_json().
    self._tasks = []

  def add_task(self, task_name, actions, time_out=600):
    """Appends a new task to the swarm manifest file."""
    # See swarming/src/common/test_request_message.py TestObject constructor for
    # the valid flags.
    self._tasks.append(
        {
          'action': actions,
          'decorate_output': self.verbose,
          'test_name': task_name,
          'time_out': time_out,
        })

  def zip_and_upload(self):
    """Zips up all the files necessary to run a shard and uploads to Swarming
    master.

    Returns True on success, False if the upload failed. Must be called at
    most once (asserts that no item was uploaded yet).
    """
    assert not self._isolate_item

    start_time = time.time()
    self._isolate_item = isolateserver.BufferItem(
        self.bundle.zip_into_buffer(), self._algo, is_isolated=True)
    print 'Zipping completed, time elapsed: %f' % (time.time() - start_time)

    try:
      start_time = time.time()
      uploaded = self.storage.upload_items([self._isolate_item])
      elapsed = time.time() - start_time
    except (IOError, OSError) as exc:
      tools.report_error('Failed to upload the zip file: %s' % exc)
      return False

    # upload_items() returns the subset of items actually uploaded; absence
    # means the server already had this exact zip.
    if self._isolate_item in uploaded:
      print 'Upload complete, time elapsed: %f' % elapsed
    else:
      print 'Zip file already on server, time elapsed: %f' % elapsed

    return True

  def to_json(self):
    """Exports the current configuration into a swarm-readable manifest file.

    This function doesn't mutate the object.
    """
    test_case = {
      'test_case_name': self._test_name,
      'data': [],
      'tests': self._tasks,
      # TODO: Let the encoding get set from the command line.
      'encoding': 'UTF-8',
      'env_vars': {},
      'configurations': [
        {
          'min_instances': self._shards,
          'config_name': self._target_platform,
          'priority': self.priority,
          'dimensions': {
            'os': self._target_platform,
          },
        },
      ],
      'working_dir': self._working_dir,
      'restart_on_failure': True,
      'cleanup': 'root',
    }
    if self._isolate_item:
      # Instructs the slave to download the zipped bundle as swarm_data.zip.
      test_case['data'].append(
          [
            self.storage.get_fetch_url(self._isolate_item.digest),
            'swarm_data.zip',
          ])
    # These flags are googletest specific.
    if self._test_filter and self._test_filter != '*':
      test_case['env_vars']['GTEST_FILTER'] = self._test_filter
    if self._shards > 1:
      test_case['env_vars']['GTEST_SHARD_INDEX'] = '%(instance_index)s'
      test_case['env_vars']['GTEST_TOTAL_SHARDS'] = '%(num_instances)s'

    return json.dumps(test_case, separators=(',',':'))
186
187
def now():
  """Returns the current time in seconds since the epoch.

  Kept as a standalone wrapper around time.time() so tests can mock it easily.
  """
  return time.time()
191
192
def get_test_keys(swarm_base_url, test_name):
  """Returns the Swarm test key for each shards of test_name."""
  query = urllib.urlencode([('name', test_name)])
  url = '%s/get_matching_test_cases?%s' % (swarm_base_url, query)

  # Same message whether the server denies knowledge of the test or the
  # retries are exhausted.
  missing_msg = (
      'Error: Unable to find any tests with the name, %s, on swarm server'
      % test_name)

  for _ in net.retry_loop(max_attempts=net.URL_OPEN_MAX_ATTEMPTS):
    result = net.url_read(url, retry_404=True)
    if result is None:
      raise Failure(missing_msg)

    # TODO(maruel): Compare exact string.
    if 'No matching' in result:
      logging.warning('Unable to find any tests with the name, %s, on swarm '
                      'server' % test_name)
      continue
    return json.loads(result)

  raise Failure(missing_msg)
215
216
def retrieve_results(base_url, test_key, timeout, should_stop):
  """Retrieves results for a single test_key.

  Polls base_url/get_result until the shard produced output, the timeout
  expires, or should_stop is set. A timeout of 0 (falsy) means poll forever.
  Returns the decoded result dict, or {} on timeout/stop.
  """
  assert isinstance(timeout, float), timeout
  params = [('r', test_key)]
  result_url = '%s/get_result?%s' % (base_url, urllib.urlencode(params))
  start = now()
  while True:
    if timeout and (now() - start) >= timeout:
      logging.error('retrieve_results(%s) timed out', base_url)
      return {}
    # Do retries ourselves.
    response = net.url_read(result_url, retry_404=False, retry_50x=False)
    if response is None:
      # Aggressively poll for results. Do not use retry_404 so
      # should_stop is polled more often.
      remaining = min(5, timeout - (now() - start)) if timeout else 5
      if remaining > 0:
        if should_stop.get():
          return {}
        net.sleep_before_retry(1, remaining)
    else:
      try:
        data = json.loads(response) or {}
      except (ValueError, TypeError):
        logging.warning(
            'Received corrupted data for test_key %s. Retrying.', test_key)
      else:
        # Only return once the shard actually produced output; an empty
        # 'output' means the task has not completed yet, so keep polling.
        if data['output']:
          return data
      if should_stop.get():
        return {}
248
249
def yield_results(swarm_base_url, test_keys, timeout, max_threads):
  """Yields swarm test results from the swarm server as (index, result).

  Duplicate shards are ignored, the first one to complete is returned.

  max_threads is optional and is used to limit the number of parallel fetches
  done. Since in general the number of test_keys is in the range <=10, it's not
  worth normally to limit the number threads. Mostly used for testing purposes.
  """
  # List of shard indices whose result has not been yielded yet.
  shards_remaining = range(len(test_keys))
  number_threads = (
      min(max_threads, len(test_keys)) if max_threads else len(test_keys))
  # Shared flag polled by retrieve_results() workers so they can exit early.
  should_stop = threading_utils.Bit()
  results_remaining = len(test_keys)
  with threading_utils.ThreadPool(number_threads, number_threads, 0) as pool:
    try:
      # One fetch task per test key; results come back in completion order.
      for test_key in test_keys:
        pool.add_task(
            0, retrieve_results, swarm_base_url, test_key, timeout, should_stop)
      while shards_remaining and results_remaining:
        result = pool.get_one_result()
        results_remaining -= 1
        if not result:
          # Failed to retrieve one key.
          logging.error('Failed to retrieve the results for a swarm key')
          continue
        shard_index = result['config_instance_index']
        if shard_index in shards_remaining:
          shards_remaining.remove(shard_index)
          yield shard_index, result
        else:
          logging.warning('Ignoring duplicate shard index %d', shard_index)
          # Pop the last entry, there's no such shard.
          shards_remaining.pop()
    finally:
      # Done, kill the remaining threads.
      should_stop.set()
287
288
def chromium_setup(manifest):
  """Sets up the commands to run.

  Highly chromium specific.
  """
  # Ship an uncompressed run_isolated.zip inside the bundle; the bundle as a
  # whole is compressed when sent to the Swarming server.
  runner_name = 'run_isolated.zip'
  runner_blob = run_isolated.get_as_zip_package().zip_into_buffer(
      compress=False)
  manifest.bundle.add_buffer(runner_name, runner_blob)

  cleanup_name = 'swarm_cleanup.py'
  manifest.bundle.add_file(
      os.path.join(TOOLS_PATH, cleanup_name), cleanup_name)

  run_cmd = [
      'python', runner_name,
      '--hash', manifest.isolated_hash,
      '--isolate-server', manifest.isolate_server,
  ]
  if manifest.verbose or manifest.profile:
    # Have it print the profiling section.
    run_cmd.append('--verbose')
  manifest.add_task('Run Test', run_cmd)

  # Clean up
  manifest.add_task('Clean Up', ['python', cleanup_name])
316
317
def archive(isolated, isolate_server, os_slave, algo, verbose):
  """Archives a .isolated and all the dependencies on the CAC.

  Arguments:
    isolated: path to the .isolated file to archive.
    isolate_server: isolate server URL to upload to.
    os_slave: sys.platform-style name of the target OS.
    algo: hashing algorithm (e.g. hashlib.sha1) used to hash the file.
    verbose: verbosity count; each level appends one --verbose flag.

  Returns:
    Hash of the .isolated file on success, None if isolate.py failed.
  """
  tempdir = None
  try:
    logging.info('archive(%s, %s)', isolated, isolate_server)
    cmd = [
        sys.executable,
        os.path.join(ROOT_DIR, 'isolate.py'),
        'archive',
        '--outdir', isolate_server,
        '--isolated', isolated,
        '-V', 'OS', PLATFORM_MAPPING_ISOLATE[os_slave],
    ]
    cmd.extend(['--verbose'] * verbose)
    logging.info(' '.join(cmd))
    # BUG FIX: the original passed |verbose| as subprocess.call()'s second
    # positional argument, i.e. Popen's bufsize — almost certainly unintended
    # since verbosity is already forwarded via the --verbose flags above.
    if subprocess.call(cmd):
      return
    return isolateserver.hash_file(isolated, algo)
  finally:
    # tempdir is never assigned in the current code path; kept for safety in
    # case a temporary directory is reintroduced.
    if tempdir:
      shutil.rmtree(tempdir)
339
340
def process_manifest(
    file_hash_or_isolated, test_name, shards, test_filter, slave_os,
    working_dir, isolate_server, swarming, verbose, profile, priority, algo):
  """Process the manifest file and send off the swarm test request.

  Optionally archives an .isolated file.

  Returns 0 on success, 1 on any failure (after reporting the error).
  """
  # Accept either the path to a .isolated file (archived first) or the hash of
  # one that was already uploaded.
  if file_hash_or_isolated.endswith('.isolated'):
    file_hash = archive(
        file_hash_or_isolated, isolate_server, slave_os, algo, verbose)
    if not file_hash:
      tools.report_error('Archival failure %s' % file_hash_or_isolated)
      return 1
  elif isolateserver.is_valid_hash(file_hash_or_isolated, algo):
    file_hash = file_hash_or_isolated
  else:
    tools.report_error('Invalid hash %s' % file_hash_or_isolated)
    return 1

  try:
    manifest = Manifest(
        file_hash,
        test_name,
        shards,
        test_filter,
        PLATFORM_MAPPING_SWARMING[slave_os],
        working_dir,
        isolate_server,
        verbose,
        profile,
        priority,
        algo)
  except ValueError as e:
    tools.report_error('Unable to process %s: %s' % (test_name, e))
    return 1

  # Populate the tasks (run + cleanup) and the support file bundle.
  chromium_setup(manifest)

  # Zip up relevant files.
  print('Zipping up files...')
  if not manifest.zip_and_upload():
    return 1

  # Send test requests off to swarm.
  print('Sending test requests to swarm.')
  print('Server: %s' % swarming)
  print('Job name: %s' % test_name)
  test_url = swarming + '/test'
  manifest_text = manifest.to_json()
  result = net.url_read(test_url, data={'request': manifest_text})
  if not result:
    tools.report_error(
        'Failed to send test for %s\n%s' % (test_name, test_url))
    return 1
  # The server must reply with valid JSON, otherwise the request is considered
  # failed.
  try:
    json.loads(result)
  except (ValueError, TypeError) as e:
    msg = '\n'.join((
        'Failed to send test for %s' % test_name,
        'Manifest: %s' % manifest_text,
        'Bad response: %s' % result,
        str(e)))
    tools.report_error(msg)
    return 1
  return 0
406
407
def trigger(
    slave_os,
    tasks,
    task_prefix,
    working_dir,
    isolate_server,
    swarming,
    verbose,
    profile,
    priority):
  """Sends off the hash swarming test requests.

  Returns the highest exit code returned by process_manifest() across all
  tasks, 0 when every task triggered successfully.
  """
  worst_code = 0
  for file_hash, test_name, shards, testfilter in tasks:
    # TODO(maruel): It should first create a request manifest object, then pass
    # it to a function to zip, archive and trigger.
    worst_code = max(worst_code, process_manifest(
        file_hash,
        task_prefix + test_name,
        int(shards),
        testfilter,
        slave_os,
        working_dir,
        isolate_server,
        swarming,
        verbose,
        profile,
        priority,
        hashlib.sha1))
  return worst_code
438
439
def decorate_shard_output(result, shard_exit_code):
  """Returns wrapped output for swarming task shard."""
  header = 'index %s (machine tag: %s, id: %s)' % (
      result['config_instance_index'],
      result['machine_id'],
      result.get('machine_tag', 'unknown'))
  divider = '================================================================\n'
  body = result['output'] or NO_OUTPUT_FOUND
  return (
      '\n'
      + divider
      + ('Begin output from shard %s\n' % header)
      + divider
      + '\n'
      + body
      + divider
      + ('End output from shard %s. Return %d\n' % (header, shard_exit_code))
      + divider)
457
458
def collect(url, test_name, timeout, decorate):
  """Retrieves results of a Swarming job.

  Returns the first non-zero shard exit code seen, or 1 when no shard
  returned a result at all.
  """
  test_keys = get_test_keys(url, test_name)
  if not test_keys:
    raise Failure('No test keys to get results with.')

  exit_code = None
  for _index, output in yield_results(url, test_keys, timeout, None):
    # An empty exit_codes string is treated as a failure (exit code 1).
    shard_exit_code = max(
        int(code) for code in (output['exit_codes'] or '1').split(','))
    if decorate:
      print(decorate_shard_output(output, shard_exit_code))
    else:
      print(
          '%s/%s: %s' % (
              output['machine_id'],
              output['machine_tag'],
              output['exit_codes']))
      print(''.join(' %s\n' % l for l in output['output'].splitlines()))
    exit_code = exit_code or shard_exit_code
  return exit_code if exit_code is not None else 1
480
481
def add_trigger_options(parser):
  """Adds all options to trigger a task on Swarming."""
  add = parser.add_option
  add(
      '-I', '--isolate-server', metavar='URL', default='',
      help='Isolate server to use')
  add(
      '-w', '--working_dir', default='swarm_tests',
      help='Working directory on the swarm slave side. default: %default.')
  add(
      '-o', '--os', default=sys.platform,
      help='Swarm OS image to request. Should be one of the valid sys.platform '
           'values like darwin, linux2 or win32 default: %default.')
  add(
      '-T', '--task-prefix', default='',
      help='Prefix to give the swarm test request. default: %default')
  add(
      '--profile', action='store_true',
      default=bool(os.environ.get('ISOLATE_DEBUG')),
      help='Have run_isolated.py print profiling info')
  add(
      '--priority', type='int', default=100,
      help='The lower value, the more important the task is')
505
506
def process_trigger_options(parser, options):
  """Validates and normalizes trigger-related options in place."""
  options.isolate_server = options.isolate_server.rstrip('/')
  if not options.isolate_server:
    parser.error('--isolate-server is required.')
  # An empty or 'None' value means "target the current platform".
  if options.os in ('', 'None'):
    # Use the current OS.
    options.os = sys.platform
  if options.os not in PLATFORM_MAPPING_SWARMING:
    parser.error('Invalid --os option.')
516
517
def add_collect_options(parser):
  """Adds the options used to collect results of a Swarming job."""
  parser.add_option(
      '-t', '--timeout', type='float', default=DEFAULT_SHARD_WAIT_TIME,
      help='Timeout to wait for result, set to 0 for no timeout; default: '
           '%default s')
  parser.add_option('--decorate', action='store_true', help='Decorate output')
526
527
@subcommand.usage('test_name')
def CMDcollect(parser, args):
  """Retrieves results of a Swarming job.

  The result can be in multiple part if the execution was sharded. It can
  potentially have retries.
  """
  add_collect_options(parser)
  options, args = parser.parse_args(args)
  if not args:
    parser.error('Must specify one test name.')
  if len(args) > 1:
    parser.error('Must specify only one test name.')

  try:
    return collect(options.swarming, args[0], options.timeout, options.decorate)
  except Failure as e:
    tools.report_error(e)
    return 1
547
548
@subcommand.usage('[hash|isolated ...]')
def CMDrun(parser, args):
  """Triggers a job and wait for the results.

  Basically, does everything to run command(s) remotely.
  """
  add_trigger_options(parser)
  add_collect_options(parser)
  options, args = parser.parse_args(args)

  if not args:
    parser.error('Must pass at least one .isolated file or its hash (sha1).')
  process_trigger_options(parser, options)

  # First phase: trigger every requested job, remembering which succeeded.
  triggered = []
  for arg in args:
    logging.info('Triggering %s', arg)
    try:
      result = trigger(
          options.os,
          [(arg, os.path.basename(arg), '1', '')],
          options.task_prefix,
          options.working_dir,
          options.isolate_server,
          options.swarming,
          options.verbose,
          options.profile,
          options.priority)
    except Failure as e:
      result = e.args[0]
    if result:
      tools.report_error('Failed to trigger %s: %s' % (arg, result))
    else:
      triggered.append(os.path.basename(arg))

  if not triggered:
    tools.report_error('Failed to trigger any job.')
    # Propagate the last trigger failure code.
    return result

  # Second phase: collect results of every triggered job, keeping the worst
  # exit code.
  code = 0
  for name in triggered:
    logging.info('Collecting %s', name)
    try:
      code = max(code, collect(
          options.swarming,
          options.task_prefix + name,
          options.timeout,
          options.decorate))
    except Failure as e:
      code = max(code, 1)
      tools.report_error(e)
  return code
602
603
def CMDtrigger(parser, args):
  """Triggers Swarm request(s).

  Accepts one or multiple --task requests, with either the hash (sha1) of a
  .isolated file already uploaded or the path to an .isolated file to archive,
  packages it if needed and sends a Swarm manifest file to the Swarm server.
  """
  add_trigger_options(parser)
  parser.add_option(
      '--task', nargs=4, action='append', default=[], dest='tasks',
      help='Task to trigger. The format is '
           '(hash|isolated, test_name, shards, test_filter). This may be '
           'used multiple times to send multiple hashes jobs. If an isolated '
           'file is specified instead of an hash, it is first archived.')
  options, args = parser.parse_args(args)

  # All inputs come via --task; positional arguments are a usage error.
  if args:
    parser.error('Unknown args: %s' % args)
  process_trigger_options(parser, options)
  if not options.tasks:
    parser.error('At least one --task is required.')

  try:
    return trigger(
        options.os,
        options.tasks,
        options.task_prefix,
        options.working_dir,
        options.isolate_server,
        options.swarming,
        options.verbose,
        options.profile,
        options.priority)
  except Failure as e:
    tools.report_error(e)
    return 1
640
641
class OptionParserSwarming(tools.OptionParserWithLogging):
  """Option parser that always exposes and requires a --swarming URL."""

  def __init__(self, **kwargs):
    tools.OptionParserWithLogging.__init__(
        self, prog='swarming.py', **kwargs)
    self.add_option(
        '-S', '--swarming',
        metavar='URL', default='',
        help='Swarming server to use')

  def parse_args(self, *args, **kwargs):
    """Parses arguments and enforces that --swarming was supplied."""
    options, leftover = tools.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    # Normalize the URL so later string concatenation stays predictable.
    options.swarming = options.swarming.rstrip('/')
    if not options.swarming:
      self.error('--swarming is required.')
    return options, leftover
658
659
def main(args):
  """Dispatches to the requested subcommand, reporting any uncaught error."""
  dispatcher = subcommand.CommandDispatcher(__name__)
  try:
    parser = OptionParserSwarming(version=__version__)
    return dispatcher.execute(parser, args)
  except Exception as e:
    # Last-resort boundary: report and convert to exit code 1.
    tools.report_error(e)
    return 1
667
668
if __name__ == '__main__':
  # NOTE(review): presumably normalizes stdout/stderr encoding — confirm in
  # third_party.depot_tools.fix_encoding.
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  # Enables ANSI escape code handling (colors) on Windows consoles.
  colorama.init()
  sys.exit(main(sys.argv[1:]))
OLDNEW
« no previous file with comments | « swarm_client/run_isolated.py ('k') | swarm_client/tests/auto_stub.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698