Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(715)

Side by Side Diff: swarming.py

Issue 22980008: Merge all swarm_*.py scripts into swarming.py. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/swarm_client
Patch Set: Rebase against r219402 Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « swarm_trigger_step.py ('k') | tests/swarm_get_results_smoke_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 #!/usr/bin/env python
2 # Copyright 2013 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
5
6 """Client tool to trigger tasks or retrieve results from a Swarming server."""
7
8 __version__ = '0.1'
9
10 import hashlib
11 import json
12 import logging
13 import os
14 import re
15 import shutil
16 import StringIO
17 import subprocess
18 import sys
19 import time
20 import urllib
21 import zipfile
22
23 from third_party import colorama
24 from third_party.depot_tools import fix_encoding
25 from third_party.depot_tools import subcommand
26 from utils import tools
27 from utils import threading_utils
28
29 import run_isolated
30
31
32 ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
33 TOOLS_PATH = os.path.join(ROOT_DIR, 'tools')
34
35
36 # Default servers.
37 # TODO(maruel): Chromium-specific.
38 ISOLATE_SERVER = 'https://isolateserver-dev.appspot.com/'
39 SWARM_SERVER = 'https://chromium-swarm-dev.appspot.com'
40
41
42 # The default time to wait for a shard to finish running.
43 DEFAULT_SHARD_WAIT_TIME = 40 * 60.
44
45
46 NO_OUTPUT_FOUND = (
47 'No output produced by the test, it may have failed to run.\n'
48 '\n')
49
50
51 PLATFORM_MAPPING = {
52 'cygwin': 'Windows',
53 'darwin': 'Mac',
54 'linux2': 'Linux',
55 'win32': 'Windows',
56 }
57
58
59 class Failure(Exception):
60 """Generic failure."""
61 pass
62
63
64 class Manifest(object):
65 """Represents a Swarming task manifest.
66
67 Also includes code to zip code and upload itself.
68 """
69 def __init__(
70 self, manifest_hash, test_name, shards, test_filter, slave_os,
71 working_dir, isolate_server, verbose, profile, priority):
72 """Populates a manifest object.
73 Args:
74 manifest_hash - The manifest's sha-1 that the slave is going to fetch.
75 test_name - The name to give the test request.
76 shards - The number of swarm shards to request.
77 test_filter - The gtest filter to apply when running the test.
78 slave_os - OS to run on.
79 working_dir - Relative working directory to start the script.
80 isolate_server - isolate server url.
81 verbose - if True, have the slave print more details.
82 profile - if True, have the slave print more timing data.
83 priority - int between 0 and 1000, lower the higher priority
84 """
85 self.manifest_hash = manifest_hash
86 self._test_name = test_name
87 self._shards = shards
88 self._test_filter = test_filter
89 self._target_platform = PLATFORM_MAPPING[slave_os]
90 self._working_dir = working_dir
91
92 self.data_server_retrieval = isolate_server + '/content/retrieve/default/'
93 self._data_server_storage = isolate_server + '/content/store/default/'
94 self._data_server_has = isolate_server + '/content/contains/default'
95 self._data_server_get_token = isolate_server + '/content/get_token'
96
97 self.verbose = bool(verbose)
98 self.profile = bool(profile)
99 self.priority = priority
100
101 self._zip_file_hash = ''
102 self._tasks = []
103 self._files = {}
104 self._token_cache = None
105
106 def _token(self):
107 if not self._token_cache:
108 result = run_isolated.url_open(self._data_server_get_token)
109 if not result:
110 # TODO(maruel): Implement authentication.
111 raise Failure('Failed to get token, need authentication')
112 # Quote it right away, so creating the urls is simpler.
113 self._token_cache = urllib.quote(result.read())
114 return self._token_cache
115
116 def add_task(self, task_name, actions, time_out=600):
117 """Appends a new task to the swarm manifest file."""
118 # See swarming/src/common/test_request_message.py TestObject constructor for
119 # the valid flags.
120 self._tasks.append(
121 {
122 'action': actions,
123 'decorate_output': self.verbose,
124 'test_name': task_name,
125 'time_out': time_out,
126 })
127
128 def add_file(self, source_path, rel_path):
129 self._files[source_path] = rel_path
130
131 def zip_and_upload(self):
132 """Zips up all the files necessary to run a shard and uploads to Swarming
133 master.
134 """
135 assert not self._zip_file_hash
136 start_time = time.time()
137
138 zip_memory_file = StringIO.StringIO()
139 zip_file = zipfile.ZipFile(zip_memory_file, 'w')
140
141 for source, relpath in self._files.iteritems():
142 zip_file.write(source, relpath)
143
144 zip_file.close()
145 print 'Zipping completed, time elapsed: %f' % (time.time() - start_time)
146
147 zip_memory_file.flush()
148 zip_contents = zip_memory_file.getvalue()
149 zip_memory_file.close()
150
151 self._zip_file_hash = hashlib.sha1(zip_contents).hexdigest()
152
153 response = run_isolated.url_open(
154 self._data_server_has + '?token=%s' % self._token(),
155 data=self._zip_file_hash,
156 content_type='application/octet-stream')
157 if response is None:
158 print >> sys.stderr, (
159 'Unable to query server for zip file presence, aborting.')
160 return False
161
162 if response.read(1) == chr(1):
163 print 'Zip file already on server, no need to reupload.'
164 return True
165
166 print 'Zip file not on server, starting uploading.'
167
168 url = '%s%s?priority=0&token=%s' % (
169 self._data_server_storage, self._zip_file_hash, self._token())
170 response = run_isolated.url_open(
171 url, data=zip_contents, content_type='application/octet-stream')
172 if response is None:
173 print >> sys.stderr, 'Failed to upload the zip file: %s' % url
174 return False
175
176 return True
177
178 def to_json(self):
179 """Exports the current configuration into a swarm-readable manifest file.
180
181 This function doesn't mutate the object.
182 """
183 test_case = {
184 'test_case_name': self._test_name,
185 'data': [
186 [self.data_server_retrieval + urllib.quote(self._zip_file_hash),
187 'swarm_data.zip'],
188 ],
189 'tests': self._tasks,
190 'env_vars': {},
191 'configurations': [
192 {
193 'min_instances': self._shards,
194 'config_name': self._target_platform,
195 'dimensions': {
196 'os': self._target_platform,
197 },
198 },
199 ],
200 'working_dir': self._working_dir,
201 'restart_on_failure': True,
202 'cleanup': 'root',
203 'priority': self.priority,
204 }
205
206 # These flags are googletest specific.
207 if self._test_filter and self._test_filter != '*':
208 test_case['env_vars']['GTEST_FILTER'] = self._test_filter
209 if self._shards > 1:
210 test_case['env_vars']['GTEST_SHARD_INDEX'] = '%(instance_index)s'
211 test_case['env_vars']['GTEST_TOTAL_SHARDS'] = '%(num_instances)s'
212
213 return json.dumps(test_case, separators=(',',':'))
214
215
216 def now():
217 """Exists so it can be mocked easily."""
218 return time.time()
219
220
221 def get_test_keys(swarm_base_url, test_name):
222 """Returns the Swarm test key for each shards of test_name."""
223 key_data = urllib.urlencode([('name', test_name)])
224 url = '%s/get_matching_test_cases?%s' % (swarm_base_url, key_data)
225
226 for i in range(run_isolated.URL_OPEN_MAX_ATTEMPTS):
227 response = run_isolated.url_open(url, retry_404=True)
228 if response is None:
229 raise Failure(
230 'Error: Unable to find any tests with the name, %s, on swarm server'
231 % test_name)
232
233 result = response.read()
234 # TODO(maruel): Compare exact string.
235 if 'No matching' in result:
236 logging.warning('Unable to find any tests with the name, %s, on swarm '
237 'server' % test_name)
238 if i != run_isolated.URL_OPEN_MAX_ATTEMPTS:
239 run_isolated.HttpService.sleep_before_retry(i, None)
240 continue
241 return json.loads(result)
242
243 raise Failure(
244 'Error: Unable to find any tests with the name, %s, on swarm server'
245 % test_name)
246
247
248 def retrieve_results(base_url, test_key, timeout, should_stop):
249 """Retrieves results for a single test_key."""
250 assert isinstance(timeout, float)
251 params = [('r', test_key)]
252 result_url = '%s/get_result?%s' % (base_url, urllib.urlencode(params))
253 start = now()
254 while True:
255 if timeout and (now() - start) >= timeout:
256 logging.error('retrieve_results(%s) timed out', base_url)
257 return {}
258 # Do retries ourselves.
259 response = run_isolated.url_open(
260 result_url, retry_404=False, retry_50x=False)
261 if response is None:
262 # Aggressively poll for results. Do not use retry_404 so
263 # should_stop is polled more often.
264 remaining = min(5, timeout - (now() - start)) if timeout else 5
265 if remaining > 0:
266 run_isolated.HttpService.sleep_before_retry(1, remaining)
267 else:
268 try:
269 data = json.load(response) or {}
270 except (ValueError, TypeError):
271 logging.warning(
272 'Received corrupted data for test_key %s. Retrying.', test_key)
273 else:
274 if data['output']:
275 return data
276 if should_stop.get():
277 return {}
278
279
280 def yield_results(swarm_base_url, test_keys, timeout, max_threads):
281 """Yields swarm test results from the swarm server as (index, result).
282
283 Duplicate shards are ignored, the first one to complete is returned.
284
285 max_threads is optional and is used to limit the number of parallel fetches
286 done. Since in general the number of test_keys is in the range <=10, it's not
287 worth normally to limit the number threads. Mostly used for testing purposes.
288 """
289 shards_remaining = range(len(test_keys))
290 number_threads = (
291 min(max_threads, len(test_keys)) if max_threads else len(test_keys))
292 should_stop = threading_utils.Bit()
293 results_remaining = len(test_keys)
294 with threading_utils.ThreadPool(number_threads, number_threads, 0) as pool:
295 try:
296 for test_key in test_keys:
297 pool.add_task(
298 0, retrieve_results, swarm_base_url, test_key, timeout, should_stop)
299 while shards_remaining and results_remaining:
300 result = pool.get_one_result()
301 results_remaining -= 1
302 if not result:
303 # Failed to retrieve one key.
304 logging.error('Failed to retrieve the results for a swarm key')
305 continue
306 shard_index = result['config_instance_index']
307 if shard_index in shards_remaining:
308 shards_remaining.remove(shard_index)
309 yield shard_index, result
310 else:
311 logging.warning('Ignoring duplicate shard index %d', shard_index)
312 # Pop the last entry, there's no such shard.
313 shards_remaining.pop()
314 finally:
315 # Done, kill the remaining threads.
316 should_stop.set()
317
318
319 def chromium_setup(manifest):
320 """Sets up the commands to run.
321
322 Highly chromium specific.
323 """
324 cleanup_script_name = 'swarm_cleanup.py'
325 cleanup_script_path = os.path.join(TOOLS_PATH, cleanup_script_name)
326 run_test_name = 'run_isolated.py'
327 run_test_path = os.path.join(ROOT_DIR, run_test_name)
328
329 manifest.add_file(run_test_path, run_test_name)
330 manifest.add_file(cleanup_script_path, cleanup_script_name)
331 run_cmd = [
332 'python', run_test_name,
333 '--hash', manifest.manifest_hash,
334 '--remote', manifest.data_server_retrieval.rstrip('/') + '-gzip/',
335 ]
336 if manifest.verbose or manifest.profile:
337 # Have it print the profiling section.
338 run_cmd.append('--verbose')
339 manifest.add_task('Run Test', run_cmd)
340
341 # Clean up
342 manifest.add_task('Clean Up', ['python', cleanup_script_name])
343
344
345 def archive(isolated, isolate_server, verbose):
346 """Archives a .isolated and all the dependencies on the CAC."""
347 tempdir = None
348 try:
349 logging.info('Archiving')
350 cmd = [
351 sys.executable,
352 os.path.join(ROOT_DIR, 'isolate.py'),
353 'hashtable',
354 '--outdir', isolate_server,
355 '--isolated', isolated,
356 ]
357 if verbose:
358 cmd.append('--verbose')
359 logging.info(' '.join(cmd))
360 if subprocess.call(cmd, verbose):
361 return
362 return hashlib.sha1(open(isolated, 'rb').read()).hexdigest()
363 finally:
364 if tempdir:
365 shutil.rmtree(tempdir)
366
367
368 def process_manifest(
369 file_sha1_or_isolated, test_name, shards, test_filter, slave_os,
370 working_dir, isolate_server, swarming, verbose, profile, priority):
371 """Process the manifest file and send off the swarm test request.
372
373 Optionally archives an .isolated file.
374 """
375 if file_sha1_or_isolated.endswith('.isolated'):
376 file_sha1 = archive(file_sha1_or_isolated, isolate_server, verbose)
377 if not file_sha1:
378 print >> sys.stderr, 'Archival failure %s' % file_sha1_or_isolated
379 return 1
380 elif re.match(r'^[a-f0-9]{40}$', file_sha1_or_isolated):
381 file_sha1 = file_sha1_or_isolated
382 else:
383 print >> sys.stderr, 'Invalid hash %s' % file_sha1_or_isolated
384 return 1
385
386 try:
387 manifest = Manifest(
388 file_sha1, test_name, shards, test_filter, slave_os,
389 working_dir, isolate_server, verbose, profile, priority)
390 except ValueError as e:
391 print >> sys.stderr, 'Unable to process %s: %s' % (test_name, e)
392 return 1
393
394 chromium_setup(manifest)
395
396 # Zip up relevant files.
397 print('Zipping up files...')
398 if not manifest.zip_and_upload():
399 return 1
400
401 # Send test requests off to swarm.
402 print('Sending test requests to swarm.')
403 print('Server: %s' % swarming)
404 print('Job name: %s' % test_name)
405 test_url = swarming + '/test'
406 manifest_text = manifest.to_json()
407 result = run_isolated.url_open(test_url, data={'request': manifest_text})
408 if not result:
409 print >> sys.stderr, 'Failed to send test for %s\n%s' % (
410 test_name, test_url)
411 return 1
412 try:
413 json.load(result)
414 except (ValueError, TypeError) as e:
415 print >> sys.stderr, 'Failed to send test for %s' % test_name
416 print >> sys.stderr, 'Manifest: %s' % manifest_text
417 print >> sys.stderr, str(e)
418 return 1
419 return 0
420
421
422 def trigger(
423 slave_os,
424 tasks,
425 task_prefix,
426 working_dir,
427 isolate_server,
428 swarming,
429 verbose,
430 profile,
431 priority):
432 """Sends off the hash swarming test requests."""
433 highest_exit_code = 0
434 for (file_sha1, test_name, shards, testfilter) in tasks:
435 # TODO(maruel): It should first create a request manifest object, then pass
436 # it to a function to zip, archive and trigger.
437 exit_code = process_manifest(
438 file_sha1,
439 task_prefix + test_name,
440 int(shards),
441 testfilter,
442 slave_os,
443 working_dir,
444 isolate_server,
445 swarming,
446 verbose,
447 profile,
448 priority)
449 highest_exit_code = max(highest_exit_code, exit_code)
450 return highest_exit_code
451
452
453 def decorate_shard_output(result, shard_exit_code):
454 """Returns wrapped output for swarming task shard."""
455 tag = 'index %s (machine tag: %s, id: %s)' % (
456 result['config_instance_index'],
457 result['machine_id'],
458 result.get('machine_tag', 'unknown'))
459 return (
460 '\n'
461 '================================================================\n'
462 'Begin output from shard %s\n'
463 '================================================================\n'
464 '\n'
465 '%s'
466 '================================================================\n'
467 'End output from shard %s. Return %d\n'
468 '================================================================\n'
469 ) % (tag, result['output'] or NO_OUTPUT_FOUND, tag, shard_exit_code)
470
471
472 def collect(url, test_name, timeout, decorate):
473 """Retrieves results of a Swarming job."""
474 test_keys = get_test_keys(url, test_name)
475 if not test_keys:
476 raise Failure('No test keys to get results with.')
477
478 exit_code = 0
479 for _index, output in yield_results(url, test_keys, timeout, None):
480 shard_exit_codes = (output['exit_codes'] or '1').split(',')
481 shard_exit_code = max(int(i) for i in shard_exit_codes)
482 if decorate:
483 print decorate_shard_output(output, shard_exit_code)
484 else:
485 print(
486 '%s/%s: %s' % (
487 output['machine_id'],
488 output['machine_tag'],
489 output['exit_codes']))
490 print(''.join(' %s\n' % l for l in output['output'].splitlines()))
491 return exit_code
492
493
494 def add_trigger_options(parser):
495 """Adds all options to trigger a task on Swarming."""
496 parser.add_option(
497 '-I', '--isolate-server',
498 default=ISOLATE_SERVER,
499 metavar='URL',
500 help='Isolate server where data is stored. default: %default')
501 parser.add_option(
502 '-w', '--working_dir', default='swarm_tests',
503 help='Working directory on the swarm slave side. default: %default.')
504 parser.add_option(
505 '-o', '--os', default=sys.platform,
506 help='Swarm OS image to request. Should be one of the valid sys.platform '
507 'values like darwin, linux2 or win32 default: %default.')
508 parser.add_option(
509 '-T', '--task-prefix', default='',
510 help='Prefix to give the swarm test request. default: %default')
511 parser.add_option(
512 '--profile', action='store_true',
513 default=bool(os.environ.get('ISOLATE_DEBUG')),
514 help='Have run_isolated.py print profiling info')
515 parser.add_option(
516 '--priority', type='int', default=100,
517 help='The lower value, the more important the task is')
518
519
520 def process_trigger_options(parser, options):
521 options.isolate_server = options.isolate_server.rstrip('/')
522 if not options.isolate_server:
523 parser.error('--isolate-server is required.')
524 if options.os in ('', 'None'):
525 # Use the current OS.
526 options.os = sys.platform
527 if not options.os in PLATFORM_MAPPING:
528 parser.error('Invalid --os option.')
529
530
531 def add_collect_options(parser):
532 parser.add_option(
533 '-t', '--timeout',
534 type='float',
535 default=DEFAULT_SHARD_WAIT_TIME,
536 help='Timeout to wait for result, set to 0 for no timeout; default: '
537 '%default s')
538 parser.add_option('--decorate', action='store_true', help='Decorate output')
539
540
541 @subcommand.usage('test_name')
542 def CMDcollect(parser, args):
543 """Retrieves results of a Swarming job.
544
545 The result can be in multiple part if the execution was sharded. It can
546 potentially have retries.
547 """
548 add_collect_options(parser)
549 (options, args) = parser.parse_args(args)
550 if not args:
551 parser.error('Must specify one test name.')
552 elif len(args) > 1:
553 parser.error('Must specify only one test name.')
554
555 try:
556 return collect(options.swarming, args[0], options.timeout, options.decorate)
557 except Failure as e:
558 parser.error(e.args[0])
559
560
561 @subcommand.usage('[sha1|isolated ...]')
562 def CMDrun(parser, args):
563 """Triggers a job and wait for the results.
564
565 Basically, does everything to run command(s) remotely.
566 """
567 add_trigger_options(parser)
568 add_collect_options(parser)
569 options, args = parser.parse_args(args)
570
571 if not args:
572 parser.error('Must pass at least one .isolated file or its sha1.')
573 process_trigger_options(parser, options)
574
575 success = []
576 for arg in args:
577 logging.info('Triggering %s', arg)
578 try:
579 result = trigger(
580 options.os,
581 [(arg, os.path.basename(arg), '1', '')],
582 options.task_prefix,
583 options.working_dir,
584 options.isolate_server,
585 options.swarming,
586 options.verbose,
587 options.profile,
588 options.priority)
589 except Failure as e:
590 result = e.args[0]
591 if result:
592 print >> sys.stderr, 'Failed to trigger %s: %s' % (arg, result)
593 else:
594 success.append(os.path.basename(arg))
595
596 if not success:
597 print >> sys.stderr, 'Failed to trigger any job.'
598 return result
599
600 code = 0
601 for arg in success:
602 logging.info('Collecting %s', arg)
603 try:
604 new_code = collect(
605 options.swarming,
606 options.task_prefix + arg,
607 options.timeout,
608 options.decorate)
609 code = max(code, new_code)
610 except Failure as e:
611 code = max(code, 1)
612 print >> sys.stderr, e.args[0]
613 return code
614
615
616 def CMDtrigger(parser, args):
617 """Triggers Swarm request(s).
618
619 Accepts one or multiple --task requests, with either the sha1 of a .isolated
620 file already uploaded or the path to an .isolated file to archive, packages it
621 if needed and sends a Swarm manifest file to the Swarm server.
622 """
623 add_trigger_options(parser)
624 parser.add_option(
625 '--task', nargs=4, action='append', default=[], dest='tasks',
626 help='Task to trigger. The format is '
627 '(hash|isolated, test_name, shards, test_filter). This may be '
628 'used multiple times to send multiple hashes jobs. If an isolated '
629 'file is specified instead of an hash, it is first archived.')
630 (options, args) = parser.parse_args(args)
631
632 if args:
633 parser.error('Unknown args: %s' % args)
634 process_trigger_options(parser, options)
635 if not options.tasks:
636 parser.error('At least one --task is required.')
637
638 try:
639 return trigger(
640 options.os,
641 options.tasks,
642 options.task_prefix,
643 options.working_dir,
644 options.isolate_server,
645 options.swarming,
646 options.verbose,
647 options.profile,
648 options.priority)
649 except Failure as e:
650 parser.error(e.args[0])
651
652
653 class OptionParserSwarming(tools.OptionParserWithLogging):
654 def __init__(self, **kwargs):
655 tools.OptionParserWithLogging.__init__(
656 self, prog='swarming.py', **kwargs)
657 self.add_option(
658 '-S', '--swarming', default=SWARM_SERVER,
659 help='Specify the url of the Swarming server, default: %default')
660
661 def parse_args(self, *args, **kwargs):
662 options, args = tools.OptionParserWithLogging.parse_args(
663 self, *args, **kwargs)
664 options.swarming = options.swarming.rstrip('/')
665 if not options.swarming:
666 self.error('--swarming is required.')
667 return options, args
668
669
670 def main(args):
671 dispatcher = subcommand.CommandDispatcher(__name__)
672 try:
673 return dispatcher.execute(OptionParserSwarming(version=__version__), args)
674 except (
675 Failure,
676 run_isolated.MappingError,
677 run_isolated.ConfigError) as e:
678 sys.stderr.write('\nError: ')
679 sys.stderr.write(str(e))
680 sys.stderr.write('\n')
681 return 1
682
683
684 if __name__ == '__main__':
685 fix_encoding.fix_encoding()
686 tools.disable_buffering()
687 colorama.init()
688 sys.exit(main(sys.argv[1:]))
OLDNEW
« no previous file with comments | « swarm_trigger_step.py ('k') | tests/swarm_get_results_smoke_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698