Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(429)

Side by Side Diff: swarming.py

Issue 22980008: Merge all swarm_*.py scripts into swarming.py. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/swarm_client
Patch Set: Improvement, still not done Created 7 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 #!/usr/bin/env python
2 # Copyright (c) 2012 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
5
6 import hashlib
7 import json
8 import logging
9 import os
10 import re
11 import shutil
12 import StringIO
13 import subprocess
14 import sys
15 import threading
16 import time
17 import urllib
18 import zipfile
19
20 from third_party.depot_tools import fix_encoding
21 from third_party.depot_tools import subcommand
22
23 import run_isolated
24 import trace_inputs
25
26
27 ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
28 TOOLS_PATH = os.path.join(ROOT_DIR, 'tools')
29
30
31 # Default servers.
32 # TODO(maruel): Chromium-specific.
33 ISOLATE_SERVER = 'https://isolateserver-dev.appspot.com/'
34 SWARM_SERVER = 'https://chromium-swarm-dev.appspot.com'
35
36
37 # The default time to wait for a shard to finish running.
38 DEFAULT_SHARD_WAIT_TIME = 40 * 60.
39
40
41 PLATFORM_MAPPING = {
42 'cygwin': 'Windows',
43 'darwin': 'Mac',
44 'linux2': 'Linux',
45 'win32': 'Windows',
46 }
47
48
49 class Failure(Exception):
50 """Generic failure."""
51 pass
52
53
54 class Manifest(object):
55 """Represents a Swarming task manifest.
56
57 Also includes code to zip code and upload itself.
58 """
59 def __init__(
60 self, manifest_hash, test_name, shards, test_filter, os_image,
61 working_dir, cac, verbose, profile, priority):
62 """Populates a manifest object.
63 Args:
64 manifest_hash - The manifest's sha-1 that the slave is going to fetch.
65 test_name - The name to give the test request.
66 shards - The number of swarm shards to request.
67 test_filter - The gtest filter to apply when running the test.
68 os_image - OS to run on.
69 working_dir - Relative working directory to start the script.
70 cac - isolate server url.
71 verbose - if True, have the slave print more details.
72 profile - if True, have the slave print more timing data.
73 priority - int between 0 and 1000, lower the higher priority
74 """
75 self.manifest_hash = manifest_hash
76 self._test_name = test_name
77 self._shards = shards
78 self._test_filter = test_filter
79 self._target_platform = PLATFORM_MAPPING[os_image]
80 self._working_dir = working_dir
81
82 base_url = cac.rstrip('/')
83 self.data_server_retrieval = base_url + '/content/retrieve/default/'
84 self._data_server_storage = base_url + '/content/store/default/'
85 self._data_server_has = base_url + '/content/contains/default'
86 self._data_server_get_token = base_url + '/content/get_token'
87
88 self.verbose = bool(verbose)
89 self.profile = bool(profile)
90 self.priority = priority
91
92 self._zip_file_hash = ''
93 self._tasks = []
94 self._files = {}
95 self._token_cache = None
96
97 def _token(self):
98 if not self._token_cache:
99 result = run_isolated.url_open(self._data_server_get_token)
100 if not result:
101 # TODO(maruel): Implement authentication.
102 raise Failure('Failed to get token, need authentication')
103 # Quote it right away, so creating the urls is simpler.
104 self._token_cache = urllib.quote(result.read())
105 return self._token_cache
106
107 def add_task(self, task_name, actions, time_out=600):
108 """Appends a new task to the swarm manifest file."""
109 # See swarming/src/common/test_request_message.py TestObject constructor for
110 # the valid flags.
111 self._tasks.append(
112 {
113 'action': actions,
114 'decorate_output': self.verbose,
115 'test_name': task_name,
116 'time_out': time_out,
117 })
118
119 def add_file(self, source_path, rel_path):
120 self._files[source_path] = rel_path
121
122 def zip_and_upload(self):
123 """Zips up all the files necessary to run a shard and uploads to Swarming
124 master.
125 """
126 assert not self._zip_file_hash
127 start_time = time.time()
128
129 zip_memory_file = StringIO.StringIO()
130 zip_file = zipfile.ZipFile(zip_memory_file, 'w')
131
132 for source, relpath in self._files.iteritems():
133 zip_file.write(source, relpath)
134
135 zip_file.close()
136 print 'Zipping completed, time elapsed: %f' % (time.time() - start_time)
137
138 zip_memory_file.flush()
139 zip_contents = zip_memory_file.getvalue()
140 zip_memory_file.close()
141
142 self._zip_file_hash = hashlib.sha1(zip_contents).hexdigest()
143
144 response = run_isolated.url_open(
145 self._data_server_has + '?token=%s' % self._token(),
146 data=self._zip_file_hash,
147 content_type='application/octet-stream')
148 if response is None:
149 print >> sys.stderr, (
150 'Unable to query server for zip file presence, aborting.')
151 return False
152
153 if response.read(1) == chr(1):
154 print 'Zip file already on server, no need to reupload.'
155 return True
156
157 print 'Zip file not on server, starting uploading.'
158
159 url = '%s%s?priority=0&token=%s' % (
160 self._data_server_storage, self._zip_file_hash, self._token())
161 response = run_isolated.url_open(
162 url, data=zip_contents, content_type='application/octet-stream')
163 if response is None:
164 print >> sys.stderr, 'Failed to upload the zip file: %s' % url
165 return False
166
167 return True
168
169 def to_json(self):
170 """Exports the current configuration into a swarm-readable manifest file.
171
172 This function doesn't mutate the object.
173 """
174 test_case = {
175 'test_case_name': self._test_name,
176 'data': [
177 [self.data_server_retrieval + urllib.quote(self._zip_file_hash),
178 'swarm_data.zip'],
179 ],
180 'tests': self._tasks,
181 'env_vars': {},
182 'configurations': [
183 {
184 'min_instances': self._shards,
185 'config_name': self._target_platform,
186 'dimensions': {
187 'os': self._target_platform,
188 },
189 },
190 ],
191 'working_dir': self._working_dir,
192 'restart_on_failure': True,
193 'cleanup': 'root',
194 'priority': self.priority,
195 }
196
197 # These flags are googletest specific.
198 if self._test_filter and self._test_filter != '*':
199 test_case['env_vars']['GTEST_FILTER'] = self._test_filter
200 if self._shards > 1:
201 test_case['env_vars']['GTEST_SHARD_INDEX'] = '%(instance_index)s'
202 test_case['env_vars']['GTEST_TOTAL_SHARDS'] = '%(num_instances)s'
203
204 return json.dumps(test_case, separators=(',',':'))
205
206
207 class Bit(object):
208 """Thread safe setable bit."""
209 _lock = threading.Lock()
210 _value = False
211
212 def get(self):
213 with self._lock:
214 return self._value
215
216 def set(self):
217 with self._lock:
218 self._value = True
219
220
221 def now():
222 """Exists so it can be mocked easily."""
223 return time.time()
224
225
226 def get_test_keys(swarm_base_url, test_name):
227 """Returns the Swarm test key for each shards of test_name."""
228 key_data = urllib.urlencode([('name', test_name)])
229 url = '%s/get_matching_test_cases?%s' % (swarm_base_url, key_data)
230
231 for i in range(run_isolated.URL_OPEN_MAX_ATTEMPTS):
232 response = run_isolated.url_open(url, retry_404=True)
233 if response is None:
234 raise Failure(
235 'Error: Unable to find any tests with the name, %s, on swarm server'
236 % test_name)
237
238 result = response.read()
239 # TODO(maruel): Compare exact string.
240 if 'No matching' in result:
241 logging.warning('Unable to find any tests with the name, %s, on swarm '
242 'server' % test_name)
243 if i != run_isolated.URL_OPEN_MAX_ATTEMPTS:
244 run_isolated.HttpService.sleep_before_retry(i, None)
245 continue
246 return json.loads(result)
247
248 raise Failure(
249 'Error: Unable to find any tests with the name, %s, on swarm server'
250 % test_name)
251
252
253 def retrieve_results(base_url, test_key, timeout, should_stop):
254 """Retrieves results for a single test_key."""
255 assert isinstance(timeout, float)
256 params = [('r', test_key)]
257 result_url = '%s/get_result?%s' % (base_url, urllib.urlencode(params))
258 start = now()
259 while True:
260 if timeout and (now() - start) >= timeout:
261 logging.error('retrieve_results(%s) timed out', base_url)
262 return {}
263 # Do retries ourselves.
264 response = run_isolated.url_open(
265 result_url, retry_404=False, retry_50x=False)
266 if response is None:
267 # Aggressively poll for results. Do not use retry_404 so
268 # should_stop is polled more often.
269 remaining = min(5, timeout - (now() - start)) if timeout else 5
270 if remaining > 0:
271 run_isolated.HttpService.sleep_before_retry(1, remaining)
272 else:
273 try:
274 data = json.load(response) or {}
275 except (ValueError, TypeError):
276 logging.warning(
277 'Received corrupted data for test_key %s. Retrying.', test_key)
278 else:
279 if data['output']:
280 return data
281 if should_stop.get():
282 return {}
283
284
285 def yield_results(swarm_base_url, test_keys, timeout, max_threads):
286 """Yields swarm test results from the swarm server as (index, result).
287
288 Duplicate shards are ignored, the first one to complete is returned.
289
290 max_threads is optional and is used to limit the number of parallel fetches
291 done. Since in general the number of test_keys is in the range <=10, it's not
292 worth normally to limit the number threads. Mostly used for testing purposes.
293 """
294 shards_remaining = range(len(test_keys))
295 number_threads = (
296 min(max_threads, len(test_keys)) if max_threads else len(test_keys))
297 should_stop = Bit()
298 results_remaining = len(test_keys)
299 with run_isolated.ThreadPool(number_threads, number_threads, 0) as pool:
300 try:
301 for test_key in test_keys:
302 pool.add_task(
303 0, retrieve_results, swarm_base_url, test_key, timeout, should_stop)
304 while shards_remaining and results_remaining:
305 result = pool.get_one_result()
306 results_remaining -= 1
307 if not result:
308 # Failed to retrieve one key.
309 logging.error('Failed to retrieve the results for a swarm key')
310 continue
311 shard_index = result['config_instance_index']
312 if shard_index in shards_remaining:
313 shards_remaining.remove(shard_index)
314 yield shard_index, result
315 else:
316 logging.warning('Ignoring duplicate shard index %d', shard_index)
317 # Pop the last entry, there's no such shard.
318 shards_remaining.pop()
319 finally:
320 # Done, kill the remaining threads.
321 should_stop.set()
322
323
324 def chromium_setup(manifest):
325 """Sets up the commands to run.
326
327 Highly chromium specific.
328 """
329 cleanup_script_name = 'swarm_cleanup.py'
330 cleanup_script_path = os.path.join(TOOLS_PATH, cleanup_script_name)
331 run_test_name = 'run_isolated.py'
332 run_test_path = os.path.join(ROOT_DIR, run_test_name)
333
334 manifest.add_file(run_test_path, run_test_name)
335 manifest.add_file(cleanup_script_path, cleanup_script_name)
336 run_cmd = [
337 'python', run_test_name,
338 '--hash', manifest.manifest_hash,
339 '--remote', manifest.data_server_retrieval.rstrip('/') + '-gzip/',
340 ]
341 if manifest.verbose or manifest.profile:
342 # Have it print the profiling section.
343 run_cmd.append('--verbose')
344 manifest.add_task('Run Test', run_cmd)
345
346 # Clean up
347 manifest.add_task('Clean Up', ['python', cleanup_script_name])
348
349
350 def archive(isolated, cac, verbose):
351 """Archives a .isolated and all the dependencies on the CAC."""
352 tempdir = None
353 try:
354 logging.info('Archiving')
355 cmd = [
356 'isolate.py',
357 'hashtable',
358 '--outdir', cac,
359 '--isolated', isolated,
360 ]
361 if verbose:
362 cmd.append('--verbose')
363 if subprocess.call(cmd, verbose):
364 return
365 return hashlib.sha1(open(isolated, 'rb').read()).hexdigest()
366 finally:
367 if tempdir:
368 shutil.rmtree(tempdir)
369
370
371 def process_manifest(
372 file_sha1_or_isolated, test_name, shards, test_filter, os_image,
373 working_dir, cac, swarm_url, verbose, profile, priority):
374 """Process the manifest file and send off the swarm test request.
375
376 Optionally archives an .isolated file.
377 """
378 if file_sha1_or_isolated.endswith('.isolated'):
379 file_sha1 = archive(file_sha1_or_isolated, swarm_url, verbose)
380 if not file_sha1:
381 print >> sys.stderr, 'Archival failure %s' % file_sha1_or_isolated
382 return 1
383 elif re.match(r'^[a-f0-9]{40}$', file_sha1_or_isolated):
384 file_sha1 = file_sha1_or_isolated
385 else:
386 print >> sys.stderr, 'Invalid hash %s' % file_sha1_or_isolated
387 return 1
388
389 try:
390 manifest = Manifest(
391 file_sha1, test_name, shards, test_filter, os_image,
392 working_dir, cac, verbose, profile, priority)
393 except ValueError as e:
394 print >> sys.stderr, 'Unable to process %s: %s' % (test_name, e)
395 return 1
396
397 chromium_setup(manifest)
398
399 # Zip up relevant files.
400 print('Zipping up files...')
401 if not manifest.zip_and_upload():
402 return 1
403
404 # Send test requests off to swarm.
405 print('Sending test requests to swarm.')
406 print('Server: %s' % swarm_url)
407 print('Job name: %s' % test_name)
408 test_url = swarm_url.rstrip('/') + '/test'
409 manifest_text = manifest.to_json()
410 result = run_isolated.url_open(test_url, data={'request': manifest_text})
411 if not result:
412 print >> sys.stderr, 'Failed to send test for %s\n%s' % (
413 test_name, test_url)
414 return 1
415 try:
416 json.load(result)
417 except (ValueError, TypeError) as e:
418 print >> sys.stderr, 'Failed to send test for %s' % test_name
419 print >> sys.stderr, 'Manifest: %s' % manifest_text
420 print >> sys.stderr, str(e)
421 return 1
422 return 0
423
424
425 def trigger(
426 os_image,
Vadim Sh. 2013/08/19 18:55:44 That's one hell of an argument list. Maybe put so
M-A Ruel 2013/08/20 17:09:30 The Manifest object will have to be redesigned, bu
427 tasks,
428 test_name_prefix,
429 working_dir,
430 cac,
431 swarm_url,
432 verbose,
433 profile,
434 priority):
435 """Sends off the hash swarming test requests."""
436 if not os_image or os_image == 'None':
437 # Use the current OS.
438 os_image = sys.platform
439
440 highest_exit_code = 0
441 for (file_sha1, test_name, shards, testfilter) in tasks:
442 exit_code = process_manifest(
443 file_sha1,
444 test_name_prefix + test_name,
445 int(shards),
446 testfilter,
447 os_image,
448 working_dir,
449 cac,
450 swarm_url,
451 verbose,
452 profile,
453 priority)
454 highest_exit_code = max(highest_exit_code, exit_code)
455 return highest_exit_code
456
457
458 def collect(url, test_name, timeout):
459 """Retrieves results of a Swarming job."""
460 test_keys = get_test_keys(url, test_name)
461 if not test_keys:
462 raise Failure('No test keys to get results with.')
463
464 exit_code = 0
465 for _index, output in yield_results(url, test_keys, timeout, None):
466 print(
467 '%s/%s: %s' % (
468 output['machine_id'], output['machine_tag'], output['exit_codes']))
469 print(''.join(' %s\n' % l for l in output['output'].splitlines()))
470 exit_code = max(exit_code, max(map(int, output['exit_codes'].split(','))))
471 return exit_code
472
473
474 def add_trigger_options(parser):
475 """Adds all options to trigger a task on Swarming."""
476 parser.add_option(
477 '-c', '--cac',
M-A Ruel 2013/08/18 00:18:19 I don't like --data_server. --isolate_server (like
478 default=ISOLATE_SERVER,
479 metavar='URL',
480 help='Isolate server where data is stored. default: %default')
481 parser.add_option(
482 '-w', '--working_dir', default='swarm_tests',
483 help='Working directory on the swarm slave side. default: %default.')
484 parser.add_option(
485 '-o', '--os_image',
M-A Ruel 2013/08/18 00:18:19 --os makes sense, but --os_image is weird.
Vadim Sh. 2013/08/19 18:55:44 If it's 'one of valid sys.platform' maybe call it
M-A Ruel 2013/08/20 17:09:30 platform is more ambiguous than 'os'
486 help='Swarm OS image to request. Should be one of the valid sys.platform '
487 'values like darwin, linux2 or win32.')
488 parser.add_option(
489 '-T', '--test-name-prefix', default='',
Vadim Sh. 2013/08/19 18:55:44 I personally prefer dashes in argument names (like
M-A Ruel 2013/08/20 17:09:30 --task-prefix.
490 help='Prefix to give the swarm test request. default: %default')
491 parser.add_option(
492 '--profile', action='store_true',
493 default=bool(os.environ.get('ISOLATE_DEBUG')),
494 help='Have run_isolated.py print profiling info')
495 parser.add_option(
496 '--priority', type='int', default=100,
497 help='The lower value, the more important the task is')
498
499
500 def add_collect_options(parser):
501 parser.add_option(
502 '-t', '--timeout',
503 type='float',
504 default=DEFAULT_SHARD_WAIT_TIME,
505 help='Timeout to wait for result, set to 0 for no timeout; default: '
506 '%default s')
507
508
509 @subcommand.usage('test_name')
510 def CMDcollect(parser, args):
511 """Retrieves results of a Swarming job.
512
513 The result can be in multiple part if the execution was sharded. It can
514 potentially have retries.
515 """
516 add_collect_options(parser)
517 (options, args) = parser.parse_args(args)
518 if not args:
519 parser.error('Must specify one test name.')
520 elif len(args) > 1:
521 parser.error('Must specify only one test name.')
522
523 try:
524 return collect(options.server, args[0], options.timeout)
525 except Failure as e:
526 parser.error(e.args[0])
527
528
529 @subcommand.usage('[sha1|isolated ...]')
530 def CMDrun(parser, args):
531 """Trigger a job and wait for the results.
532
533 Basically, does everything to run a command remotely.
534 """
535 add_trigger_options(parser)
536 add_collect_options(parser)
537 options, args = parser.parse_args(args)
538
539 if not args:
540 parser.error('Must pass at least one .isolated file or its sha1.')
541
542 success = []
543 for arg in args:
544 try:
545 result = trigger(
546 options.os_image,
547 options.tasks,
548 options.test_name_prefix,
549 options.working_dir,
550 options.cac,
551 options.swarm_url,
552 options.verbose,
553 options.profile,
554 options.priority)
555 except Failure as e:
556 parser.error(e.args[0])
557 success.append(arg)
558
559 if not success:
560 return result
561
562 code = 0
563 for arg in success:
564 try:
565 code = min(code, collect(options.server, arg, options.timeout))
566 except Failure as e:
567 result = 1
568 print >> sys.stderr, e.args[0]
569 return code
570
571
572 def CMDtrigger(parser, args):
573 """Triggers Swarm request(s).
574
575 Accepts one or multiple --task requests, with either the sha1 of a .isolated
576 file already uploaded or the path to an .isolated file to archive, packages it
577 if needed and sends a Swarm manifest file to the Swarm server.
578 """
579 add_trigger_options(parser)
580 parser.add_option(
581 '--task', nargs=4, action='append', default=[], dest='tasks',
M-A Ruel 2013/08/18 00:18:19 I like --task contrary to --run_from_hash, especia
582 help='Task to trigger. The format is '
583 '(hash|isolated, test_name, shards, test_filter). This may be '
584 'used multiple times to send multiple hashes jobs. If an isolated '
585 'file is specified instead of an hash, it is first archived.')
586 (options, args) = parser.parse_args(args)
587
588 if args:
589 parser.error('Unknown args: %s' % args)
590 if not options.tasks:
591 parser.error('At least one --task is required.')
592 if not options.cac:
593 parser.error('Must specify the CAC server.')
594
595 try:
596 return trigger(
597 options.os_image,
598 options.tasks,
599 options.test_name_prefix,
600 options.working_dir,
601 options.cac,
602 options.server,
603 options.verbose,
604 options.profile,
605 options.priority)
606 except Failure as e:
607 parser.error(e.args[0])
608
609
610 class OptionParserSwarming(trace_inputs.OptionParserWithLogging):
611 def __init__(self, **kwargs):
612 trace_inputs.OptionParserWithLogging.__init__(
613 self, prog='swarming.py', **kwargs)
614 self.add_option(
615 '-s', '--server', default=SWARM_SERVER,
M-A Ruel 2013/08/18 00:18:19 --server is too generic. --swarm_url, for an unkno
616 help='Specify the url of the Swarming server, default: %default')
617
618 def parse_args(self, *args, **kwargs):
619 options, args = trace_inputs.OptionParserWithLogging.parse_args(
620 self, *args, **kwargs)
621 options.server = options.server.rstrip('/')
622 if not options.server:
623 self.error('--server is required')
624 return options, args
625
626
627 def main(args):
628 dispatcher = subcommand.CommandDispatcher(__name__)
629 try:
630 return dispatcher.execute(OptionParserSwarming(), args)
631 except (
632 Failure,
633 run_isolated.MappingError,
634 run_isolated.ConfigError) as e:
635 sys.stderr.write('\nError: ')
636 sys.stderr.write(str(e))
637 sys.stderr.write('\n')
638 return 1
639
640
641 if __name__ == '__main__':
642 fix_encoding.fix_encoding()
643 run_isolated.disable_buffering()
644 sys.exit(main(sys.argv[1:]))
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698