Index: swarm_client/swarming.py
===================================================================
--- swarm_client/swarming.py (revision 235167)
+++ swarm_client/swarming.py (working copy)
@@ -1,722 +0,0 @@
-#!/usr/bin/env python
-# Copyright 2013 The Chromium Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Client tool to trigger tasks or retrieve results from a Swarming server."""
-
-__version__ = '0.1'
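-
-# Typical invocations (server URLs and task values are placeholders):
-#   swarming.py trigger -S https://swarming.example.com \
-#       -I https://isolate.example.com \
-#       --task <sha1|.isolated> <test_name> <shards> <test_filter>
-#   swarming.py collect -S https://swarming.example.com <test_name>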
-
-import hashlib
-import json
-import logging
-import os
-import shutil
-import subprocess
-import sys
-import time
-import urllib
-
-from third_party import colorama
-from third_party.depot_tools import fix_encoding
-from third_party.depot_tools import subcommand
-
-from utils import net
-from utils import threading_utils
-from utils import tools
-from utils import zip_package
-
-import isolateserver
-import run_isolated
-
-
-ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
-TOOLS_PATH = os.path.join(ROOT_DIR, 'tools')
-
-
-# The default time, in seconds, to wait for a shard to finish running.
-DEFAULT_SHARD_WAIT_TIME = 80 * 60.
-
-
-NO_OUTPUT_FOUND = (
-    'No output produced by the test, it may have failed to run.\n'
-    '\n')
-
-
-# TODO(maruel): cygwin != Windows. If a swarm_bot is running in cygwin, it's
-# different from running in native python.
-PLATFORM_MAPPING_SWARMING = {
-    'cygwin': 'Windows',
-    'darwin': 'Mac',
-    'linux2': 'Linux',
-    'win32': 'Windows',
-}
-
-PLATFORM_MAPPING_ISOLATE = {
-    'linux2': 'linux',
-    'darwin': 'mac',
-    'win32': 'win',
-}
-
-
-class Failure(Exception):
-  """Generic failure."""
-  pass
-
-
-class Manifest(object):
-  """Represents a Swarming task manifest.
-
-  Also includes code to zip code and upload itself.
-  """
-  def __init__(
-      self, isolated_hash, test_name, shards, test_filter, slave_os,
-      working_dir, isolate_server, verbose, profile, priority, algo):
-    """Populates a manifest object.
-    Args:
-      isolated_hash - The manifest's sha-1 that the slave is going to fetch.
-      test_name - The name to give the test request.
-      shards - The number of swarm shards to request.
-      test_filter - The gtest filter to apply when running the test.
-      slave_os - OS to run on.
-      working_dir - Relative working directory to start the script.
-      isolate_server - isolate server url.
-      verbose - if True, have the slave print more details.
-      profile - if True, have the slave print more timing data.
-      priority - int between 0 and 1000; lower values mean higher priority.
-      algo - hashing algorithm used.
-    """
-    self.isolated_hash = isolated_hash
-    self.bundle = zip_package.ZipPackage(ROOT_DIR)
-
-    self._test_name = test_name
-    self._shards = shards
-    self._test_filter = test_filter
-    self._target_platform = slave_os
-    self._working_dir = working_dir
-
-    self.isolate_server = isolate_server
-    self.storage = isolateserver.get_storage(isolate_server, 'default')
-    self.verbose = bool(verbose)
-    self.profile = bool(profile)
-    self.priority = priority
-    self._algo = algo
-
-    self._isolate_item = None
-    self._tasks = []
-
-  def add_task(self, task_name, actions, time_out=600):
-    """Appends a new task to the swarm manifest file."""
-    # See swarming/src/common/test_request_message.py TestObject constructor
-    # for the valid flags.
-    self._tasks.append(
-        {
-          'action': actions,
-          'decorate_output': self.verbose,
-          'test_name': task_name,
-          'time_out': time_out,
-        })
-  def zip_and_upload(self):
-    """Zips up all the files necessary to run a shard and uploads to Swarming
-    master.
-    """
-    assert not self._isolate_item
-
-    start_time = time.time()
-    self._isolate_item = isolateserver.BufferItem(
-        self.bundle.zip_into_buffer(), self._algo, is_isolated=True)
-    print 'Zipping completed, time elapsed: %f' % (time.time() - start_time)
-
-    try:
-      start_time = time.time()
-      uploaded = self.storage.upload_items([self._isolate_item])
-      elapsed = time.time() - start_time
-    except (IOError, OSError) as exc:
-      tools.report_error('Failed to upload the zip file: %s' % exc)
-      return False
-
-    if self._isolate_item in uploaded:
-      print 'Upload complete, time elapsed: %f' % elapsed
-    else:
-      print 'Zip file already on server, time elapsed: %f' % elapsed
-
-    return True
-
-  def to_json(self):
-    """Exports the current configuration into a swarm-readable manifest file.
-
-    This function doesn't mutate the object.
-    """
-    test_case = {
-      'test_case_name': self._test_name,
-      'data': [],
-      'tests': self._tasks,
-      # TODO: Let the encoding get set from the command line.
-      'encoding': 'UTF-8',
-      'env_vars': {},
-      'configurations': [
-        {
-          'min_instances': self._shards,
-          'config_name': self._target_platform,
-          'priority': self.priority,
-          'dimensions': {
-            'os': self._target_platform,
-          },
-        },
-      ],
-      'working_dir': self._working_dir,
-      'restart_on_failure': True,
-      'cleanup': 'root',
-    }
-    if self._isolate_item:
-      test_case['data'].append(
-          [
-            self.storage.get_fetch_url(self._isolate_item.digest),
-            'swarm_data.zip',
-          ])
-    # These flags are googletest specific.
-    if self._test_filter and self._test_filter != '*':
-      test_case['env_vars']['GTEST_FILTER'] = self._test_filter
-    if self._shards > 1:
-      test_case['env_vars']['GTEST_SHARD_INDEX'] = '%(instance_index)s'
-      test_case['env_vars']['GTEST_TOTAL_SHARDS'] = '%(num_instances)s'
-
-    return json.dumps(test_case, separators=(',', ':'))
-
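-# For reference, Manifest.to_json() produces a request of this shape (the
-# field values here are illustrative, not real):
-#
-# {
-#   "test_case_name": "base_unittests",
-#   "data": [["<fetch url for the uploaded zip>", "swarm_data.zip"]],
-#   "tests": [{"action": ["python", "run_isolated.zip", ...],
-#              "decorate_output": false, "test_name": "Run Test",
-#              "time_out": 600}],
-#   "encoding": "UTF-8",
-#   "env_vars": {"GTEST_FILTER": "Foo.*"},
-#   "configurations": [{"min_instances": 1, "config_name": "Linux",
-#                       "priority": 100, "dimensions": {"os": "Linux"}}],
-#   "working_dir": "swarm_tests",
-#   "restart_on_failure": true,
-#   "cleanup": "root"
-# }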
-
-def now():
-  """Exists so it can be mocked easily."""
-  return time.time()
-
-
-def get_test_keys(swarm_base_url, test_name):
- """Returns the Swarm test key for each shards of test_name.""" |
-  key_data = urllib.urlencode([('name', test_name)])
-  url = '%s/get_matching_test_cases?%s' % (swarm_base_url, key_data)
-
-  for _ in net.retry_loop(max_attempts=net.URL_OPEN_MAX_ATTEMPTS):
-    result = net.url_read(url, retry_404=True)
-    if result is None:
-      raise Failure(
-          'Error: Unable to retrieve test keys for %s from the swarm server'
-          % test_name)
-
-    # TODO(maruel): Compare against the exact string.
-    if 'No matching' in result:
-      logging.warning('Unable to find any tests with the name %s on the '
-                      'swarm server', test_name)
-      continue
-    return json.loads(result)
-
-  raise Failure(
-      'Error: Unable to find any tests with the name %s on the swarm server'
-      % test_name)
-
-
-def retrieve_results(base_url, test_key, timeout, should_stop):
-  """Retrieves results for a single test_key."""
-  assert isinstance(timeout, float), timeout
-  params = [('r', test_key)]
-  result_url = '%s/get_result?%s' % (base_url, urllib.urlencode(params))
-  start = now()
-  while True:
-    if timeout and (now() - start) >= timeout:
-      logging.error('retrieve_results(%s) timed out', base_url)
-      return {}
-    # Do retries ourselves.
-    response = net.url_read(result_url, retry_404=False, retry_50x=False)
-    if response is None:
-      # Aggressively poll for results. Do not use retry_404 so
-      # should_stop is polled more often.
-      remaining = min(5, timeout - (now() - start)) if timeout else 5
-      if remaining > 0:
-        if should_stop.get():
-          return {}
-        net.sleep_before_retry(1, remaining)
-    else:
-      try:
-        data = json.loads(response) or {}
-      except (ValueError, TypeError):
-        logging.warning(
-            'Received corrupted data for test_key %s. Retrying.', test_key)
-      else:
-        if data['output']:
-          return data
-      if should_stop.get():
-        return {}
-
-
-def yield_results(swarm_base_url, test_keys, timeout, max_threads):
-  """Yields swarm test results from the swarm server as (index, result).
-
-  Duplicate shards are ignored; the first one to complete is returned.
-
-  max_threads is optional and is used to limit the number of parallel fetches
-  done. Since the number of test_keys is generally 10 or fewer, limiting the
-  number of threads is normally not worthwhile; it is mostly used for testing.
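-
-  Example (illustrative):
-    for index, result in yield_results(url, test_keys, 3600., None):
-      print result['output']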
- """ |
- shards_remaining = range(len(test_keys)) |
- number_threads = ( |
- min(max_threads, len(test_keys)) if max_threads else len(test_keys)) |
- should_stop = threading_utils.Bit() |
- results_remaining = len(test_keys) |
- with threading_utils.ThreadPool(number_threads, number_threads, 0) as pool: |
- try: |
- for test_key in test_keys: |
- pool.add_task( |
- 0, retrieve_results, swarm_base_url, test_key, timeout, should_stop) |
- while shards_remaining and results_remaining: |
- result = pool.get_one_result() |
- results_remaining -= 1 |
- if not result: |
- # Failed to retrieve one key. |
- logging.error('Failed to retrieve the results for a swarm key') |
- continue |
- shard_index = result['config_instance_index'] |
- if shard_index in shards_remaining: |
- shards_remaining.remove(shard_index) |
- yield shard_index, result |
- else: |
- logging.warning('Ignoring duplicate shard index %d', shard_index) |
- # Pop the last entry, there's no such shard. |
- shards_remaining.pop() |
- finally: |
- # Done, kill the remaining threads. |
- should_stop.set() |
- |
-
-def chromium_setup(manifest):
-  """Sets up the commands to run.
-
-  Highly chromium specific.
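-
-  Adds two tasks to the manifest: 'Run Test', which runs run_isolated.zip
-  against the isolated hash, and 'Clean Up', which runs swarm_cleanup.py.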
- """ |
- # Add uncompressed zip here. It'll be compressed as part of the package sent |
- # to Swarming server. |
- run_test_name = 'run_isolated.zip' |
- manifest.bundle.add_buffer(run_test_name, |
- run_isolated.get_as_zip_package().zip_into_buffer(compress=False)) |
- |
- cleanup_script_name = 'swarm_cleanup.py' |
- manifest.bundle.add_file(os.path.join(TOOLS_PATH, cleanup_script_name), |
- cleanup_script_name) |
- |
- run_cmd = [ |
- 'python', run_test_name, |
- '--hash', manifest.isolated_hash, |
- '--isolate-server', manifest.isolate_server, |
- ] |
- if manifest.verbose or manifest.profile: |
- # Have it print the profiling section. |
- run_cmd.append('--verbose') |
- manifest.add_task('Run Test', run_cmd) |
- |
- # Clean up |
- manifest.add_task('Clean Up', ['python', cleanup_script_name]) |
- |
- |
-def archive(isolated, isolate_server, os_slave, algo, verbose):
-  """Archives a .isolated and all its dependencies on the isolate server."""
-  tempdir = None
-  try:
-    logging.info('archive(%s, %s)', isolated, isolate_server)
-    cmd = [
-      sys.executable,
-      os.path.join(ROOT_DIR, 'isolate.py'),
-      'archive',
-      '--outdir', isolate_server,
-      '--isolated', isolated,
-      '-V', 'OS', PLATFORM_MAPPING_ISOLATE[os_slave],
-    ]
-    cmd.extend(['--verbose'] * verbose)
-    logging.info(' '.join(cmd))
-    if subprocess.call(cmd):
-      return
-    return isolateserver.hash_file(isolated, algo)
-  finally:
-    if tempdir:
-      shutil.rmtree(tempdir)
-
-
-def process_manifest(
-    file_hash_or_isolated, test_name, shards, test_filter, slave_os,
-    working_dir, isolate_server, swarming, verbose, profile, priority, algo):
-  """Processes the manifest file and sends off the swarm test request.
-
-  Optionally archives an .isolated file.
-  """
-  if file_hash_or_isolated.endswith('.isolated'):
-    file_hash = archive(
-        file_hash_or_isolated, isolate_server, slave_os, algo, verbose)
-    if not file_hash:
-      tools.report_error('Archival failure %s' % file_hash_or_isolated)
-      return 1
-  elif isolateserver.is_valid_hash(file_hash_or_isolated, algo):
-    file_hash = file_hash_or_isolated
-  else:
-    tools.report_error('Invalid hash %s' % file_hash_or_isolated)
-    return 1
-
-  try:
-    manifest = Manifest(
-        file_hash,
-        test_name,
-        shards,
-        test_filter,
-        PLATFORM_MAPPING_SWARMING[slave_os],
-        working_dir,
-        isolate_server,
-        verbose,
-        profile,
-        priority,
-        algo)
-  except ValueError as e:
-    tools.report_error('Unable to process %s: %s' % (test_name, e))
-    return 1
-
-  chromium_setup(manifest)
-
-  # Zip up relevant files.
-  print('Zipping up files...')
-  if not manifest.zip_and_upload():
-    return 1
-
-  # Send test requests off to swarm.
-  print('Sending test requests to swarm.')
-  print('Server: %s' % swarming)
-  print('Job name: %s' % test_name)
-  test_url = swarming + '/test'
-  manifest_text = manifest.to_json()
-  result = net.url_read(test_url, data={'request': manifest_text})
-  if not result:
-    tools.report_error(
-        'Failed to send test for %s\n%s' % (test_name, test_url))
-    return 1
-  try:
-    json.loads(result)
-  except (ValueError, TypeError) as e:
-    msg = '\n'.join((
-        'Failed to send test for %s' % test_name,
-        'Manifest: %s' % manifest_text,
-        'Bad response: %s' % result,
-        str(e)))
-    tools.report_error(msg)
-    return 1
-  return 0
-
-
-def trigger(
-    slave_os,
-    tasks,
-    task_prefix,
-    working_dir,
-    isolate_server,
-    swarming,
-    verbose,
-    profile,
-    priority):
- """Sends off the hash swarming test requests.""" |
-  highest_exit_code = 0
-  for (file_hash, test_name, shards, testfilter) in tasks:
-    # TODO(maruel): It should first create a request manifest object, then pass
-    # it to a function to zip, archive and trigger.
-    exit_code = process_manifest(
-        file_hash,
-        task_prefix + test_name,
-        int(shards),
-        testfilter,
-        slave_os,
-        working_dir,
-        isolate_server,
-        swarming,
-        verbose,
-        profile,
-        priority,
-        hashlib.sha1)
-    highest_exit_code = max(highest_exit_code, exit_code)
-  return highest_exit_code
-
-
-def decorate_shard_output(result, shard_exit_code):
-  """Returns wrapped output for a swarming task shard."""
-  tag = 'index %s (machine tag: %s, id: %s)' % (
-      result['config_instance_index'],
-      result.get('machine_tag', 'unknown'),
-      result['machine_id'])
-  return (
-      '\n'
-      '================================================================\n'
-      'Begin output from shard %s\n'
-      '================================================================\n'
-      '\n'
-      '%s'
-      '================================================================\n'
-      'End output from shard %s. Return %d\n'
-      '================================================================\n'
-      ) % (tag, result['output'] or NO_OUTPUT_FOUND, tag, shard_exit_code)
-
-
-def collect(url, test_name, timeout, decorate):
-  """Retrieves results of a Swarming job."""
-  test_keys = get_test_keys(url, test_name)
-  if not test_keys:
-    raise Failure('No test keys to get results with.')
-
-  exit_code = None
-  for _index, output in yield_results(url, test_keys, timeout, None):
-    shard_exit_codes = (output['exit_codes'] or '1').split(',')
-    shard_exit_code = max(int(i) for i in shard_exit_codes)
-    if decorate:
-      print decorate_shard_output(output, shard_exit_code)
-    else:
-      print(
-          '%s/%s: %s' % (
-              output['machine_id'],
-              output['machine_tag'],
-              output['exit_codes']))
-      print(''.join('  %s\n' % l for l in output['output'].splitlines()))
-    exit_code = exit_code or shard_exit_code
-  return exit_code if exit_code is not None else 1
-
-
-def add_trigger_options(parser):
-  """Adds all options to trigger a task on Swarming."""
-  parser.add_option(
-      '-I', '--isolate-server',
-      metavar='URL', default='',
-      help='Isolate server to use')
-  parser.add_option(
-      '-w', '--working_dir', default='swarm_tests',
-      help='Working directory on the swarm slave side. default: %default.')
-  parser.add_option(
-      '-o', '--os', default=sys.platform,
-      help='Swarm OS image to request. Should be one of the valid sys.platform '
-           'values like darwin, linux2 or win32. default: %default.')
-  parser.add_option(
-      '-T', '--task-prefix', default='',
-      help='Prefix to give the swarm test request. default: %default')
-  parser.add_option(
-      '--profile', action='store_true',
-      default=bool(os.environ.get('ISOLATE_DEBUG')),
-      help='Have run_isolated.py print profiling info')
-  parser.add_option(
-      '--priority', type='int', default=100,
-      help='The lower the value, the more important the task is')
-
-
-def process_trigger_options(parser, options):
-  options.isolate_server = options.isolate_server.rstrip('/')
-  if not options.isolate_server:
-    parser.error('--isolate-server is required.')
-  if options.os in ('', 'None'):
-    # Use the current OS.
-    options.os = sys.platform
-  if options.os not in PLATFORM_MAPPING_SWARMING:
-    parser.error('Invalid --os option.')
-
-
-def add_collect_options(parser):
-  parser.add_option(
-      '-t', '--timeout',
-      type='float',
-      default=DEFAULT_SHARD_WAIT_TIME,
-      help='Timeout to wait for result, set to 0 for no timeout; default: '
-           '%default s')
-  parser.add_option('--decorate', action='store_true', help='Decorate output')
-
-
-@subcommand.usage('test_name')
-def CMDcollect(parser, args):
-  """Retrieves results of a Swarming job.
-
-  The result can be in multiple parts if the execution was sharded. It can
-  potentially have retries.
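-
-  Example (values are placeholders):
-    swarming.py collect -S https://swarming.example.com base_unittests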
- """ |
- add_collect_options(parser) |
- (options, args) = parser.parse_args(args) |
- if not args: |
- parser.error('Must specify one test name.') |
- elif len(args) > 1: |
- parser.error('Must specify only one test name.') |
- |
- try: |
- return collect(options.swarming, args[0], options.timeout, options.decorate) |
- except Failure as e: |
- tools.report_error(e) |
- return 1 |
- |
-
-@subcommand.usage('[hash|isolated ...]')
-def CMDrun(parser, args):
- """Triggers a job and wait for the results. |
- |
- Basically, does everything to run command(s) remotely. |
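-
-  Example (server URLs are placeholders):
-    swarming.py run -S https://swarming.example.com \
-        -I https://isolate.example.com tests.isolated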
- """ |
- add_trigger_options(parser) |
- add_collect_options(parser) |
- options, args = parser.parse_args(args) |
- |
- if not args: |
- parser.error('Must pass at least one .isolated file or its hash (sha1).') |
- process_trigger_options(parser, options) |
- |
- success = [] |
- for arg in args: |
- logging.info('Triggering %s', arg) |
- try: |
- result = trigger( |
- options.os, |
- [(arg, os.path.basename(arg), '1', '')], |
- options.task_prefix, |
- options.working_dir, |
- options.isolate_server, |
- options.swarming, |
- options.verbose, |
- options.profile, |
- options.priority) |
- except Failure as e: |
- result = e.args[0] |
- if result: |
- tools.report_error('Failed to trigger %s: %s' % (arg, result)) |
- else: |
- success.append(os.path.basename(arg)) |
- |
- if not success: |
- tools.report_error('Failed to trigger any job.') |
- return result |
- |
- code = 0 |
- for arg in success: |
- logging.info('Collecting %s', arg) |
- try: |
- new_code = collect( |
- options.swarming, |
- options.task_prefix + arg, |
- options.timeout, |
- options.decorate) |
- code = max(code, new_code) |
- except Failure as e: |
- code = max(code, 1) |
- tools.report_error(e) |
- return code |
- |
-
-def CMDtrigger(parser, args):
-  """Triggers Swarm request(s).
-
-  Accepts one or more --task requests, each with either the hash (sha1) of a
-  .isolated file already uploaded or the path to an .isolated file to archive.
-  Packages the file if needed and sends a Swarm manifest to the Swarm server.
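-
-  Example (values are placeholders):
-    swarming.py trigger -S https://swarming.example.com \
-        -I https://isolate.example.com \
-        --task deadbeef base_unittests 2 '*'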
- """ |
- add_trigger_options(parser) |
-  parser.add_option(
-      '--task', nargs=4, action='append', default=[], dest='tasks',
-      help='Task to trigger. The format is '
-           '(hash|isolated, test_name, shards, test_filter). This may be '
-           'used multiple times to trigger multiple jobs. If an .isolated '
-           'file is specified instead of a hash, it is archived first.')
-  (options, args) = parser.parse_args(args)
-
-  if args:
-    parser.error('Unknown args: %s' % args)
-  process_trigger_options(parser, options)
-  if not options.tasks:
-    parser.error('At least one --task is required.')
-
-  try:
-    return trigger(
-        options.os,
-        options.tasks,
-        options.task_prefix,
-        options.working_dir,
-        options.isolate_server,
-        options.swarming,
-        options.verbose,
-        options.profile,
-        options.priority)
-  except Failure as e:
-    tools.report_error(e)
-    return 1
-
-
-class OptionParserSwarming(tools.OptionParserWithLogging):
-  def __init__(self, **kwargs):
-    tools.OptionParserWithLogging.__init__(
-        self, prog='swarming.py', **kwargs)
-    self.add_option(
-        '-S', '--swarming',
-        metavar='URL', default='',
-        help='Swarming server to use')
-
-  def parse_args(self, *args, **kwargs):
-    options, args = tools.OptionParserWithLogging.parse_args(
-        self, *args, **kwargs)
-    options.swarming = options.swarming.rstrip('/')
-    if not options.swarming:
-      self.error('--swarming is required.')
-    return options, args
-
-
-def main(args):
-  dispatcher = subcommand.CommandDispatcher(__name__)
-  try:
-    return dispatcher.execute(OptionParserSwarming(version=__version__), args)
-  except Exception as e:
-    tools.report_error(e)
-    return 1
-
-
-if __name__ == '__main__':
-  fix_encoding.fix_encoding()
-  tools.disable_buffering()
-  colorama.init()
-  sys.exit(main(sys.argv[1:]))