Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(487)

Unified Diff: swarming.py

Issue 22980008: Merge all swarm_*.py scripts into swarming.py. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/swarm_client
Patch Set: Created 7 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: swarming.py
diff --git a/swarming.py b/swarming.py
new file mode 100755
index 0000000000000000000000000000000000000000..83aa12e6e81d8b6930648f64edcce7863d87229b
--- /dev/null
+++ b/swarming.py
@@ -0,0 +1,635 @@
+#!/usr/bin/env python
+# Copyright (c) 2012 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import StringIO
+import datetime
+import getpass
+import hashlib
+import json
+import logging
+import optparse
+import os
+import shutil
+import subprocess
+import sys
+import tempfile
+import threading
+import time
+import urllib
+import zipfile
+
+import trace_inputs
+from third_party.depot_tools import fix_encoding
Vadim Sh. 2013/08/16 18:58:58 nit: I'd rearrange this imports into two groups:
M-A Ruel 2013/08/18 00:18:18 Done.
+from third_party.depot_tools import subcommand
+import run_isolated
+
+
+ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
+TOOLS_PATH = os.path.join(ROOT_DIR, 'tools')
+
+
+# Default servers.
+# TODO(maruel): Chromium-specific.
+ISOLATE_SERVER = 'https://isolateserver-dev.appspot.com/'
Vadim Sh. 2013/08/16 18:58:58 At some point we need to get rid of this defaults
M-A Ruel 2013/08/18 00:18:18 Yes. But it's out of scope for this CL.
+SWARM_SERVER = 'https://chromium-swarm-dev.appspot.com'
+
+
+# The default time to wait for a shard to finish running.
+DEFAULT_SHARD_WAIT_TIME = 40 * 60.
+
+
+PLATFORM_MAPPING = {
+ 'cygwin': 'Windows',
+ 'darwin': 'Mac',
+ 'linux2': 'Linux',
+ 'win32': 'Windows',
+}
+
+
+class Failure(Exception):
+ """Generic failure."""
+ pass
+
+
+class Manifest(object):
+ """Represents a Swarming task manifest.
+
+ Also includes code to zip code and upload itself.
+ """
+ def __init__(
+ self, manifest_hash, test_name, shards, test_filter, os_image,
+ working_dir, data_server, verbose, profile, priority):
+ """Populates a manifest object.
+ Args:
+ manifest_hash - The manifest's sha-1 that the slave is going to fetch.
+ test_name - The name to give the test request.
+ shards - The number of swarm shards to request.
+ test_filter - The gtest filter to apply when running the test.
+ os_image - OS to run on.
+ working_dir - Relative working directory to start the script.
+ data_server - isolate server url.
+ verbose - if True, have the slave print more details.
+ profile - if True, have the slave print more timing data.
+ priority - int between 0 and 1000, lower the higher priority
+ """
+ self.manifest_hash = manifest_hash
+ self._test_name = test_name
+ self._shards = shards
+ self._test_filter = test_filter
+ self._target_platform = PLATFORM_MAPPING[os_image]
+ self._working_dir = working_dir
+
+ base_url = data_server.rstrip('/')
+ self.data_server_retrieval = base_url + '/content/retrieve/default/'
+ self._data_server_storage = base_url + '/content/store/default/'
+ self._data_server_has = base_url + '/content/contains/default'
+ self._data_server_get_token = base_url + '/content/get_token'
+
+ self.verbose = bool(verbose)
+ self.profile = bool(profile)
+ self.priority = priority
+
+ self._zip_file_hash = ''
+ self._tasks = []
+ self._files = {}
+ self._token_cache = None
+
+ def _token(self):
+ if not self._token_cache:
+ result = run_isolated.url_open(self._data_server_get_token)
+ if not result:
+ # TODO(maruel): Implement authentication.
+ raise Failure('Failed to get token, need authentication')
+ # Quote it right away, so creating the urls is simpler.
+ self._token_cache = urllib.quote(result.read())
+ return self._token_cache
+
+ def add_task(self, task_name, actions, time_out=600):
+ """Appends a new task to the swarm manifest file."""
+ # See swarming/src/common/test_request_message.py TestObject constructor for
+ # the valid flags.
+ self._tasks.append(
+ {
+ 'action': actions,
+ 'decorate_output': self.verbose,
+ 'test_name': task_name,
+ 'time_out': time_out,
+ })
+
+ def add_file(self, source_path, rel_path):
+ self._files[source_path] = rel_path
+
+ def zip_and_upload(self):
+ """Zips up all the files necessary to run a shard and uploads to Swarming
+ master.
+ """
+ assert not self._zip_file_hash
+ start_time = time.time()
+
+ zip_memory_file = StringIO.StringIO()
+ zip_file = zipfile.ZipFile(zip_memory_file, 'w')
+
+ for source, relpath in self._files.iteritems():
+ zip_file.write(source, relpath)
+
+ zip_file.close()
+ print 'Zipping completed, time elapsed: %f' % (time.time() - start_time)
+
+ zip_memory_file.flush()
+ zip_contents = zip_memory_file.getvalue()
+ zip_memory_file.close()
+
+ self._zip_file_hash = hashlib.sha1(zip_contents).hexdigest()
+
+ response = run_isolated.url_open(
+ self._data_server_has + '?token=%s' % self._token(),
+ data=self._zip_file_hash,
+ content_type='application/octet-stream')
+ if response is None:
+ print >> sys.stderr, (
+ 'Unable to query server for zip file presence, aborting.')
+ return False
+
+ if response.read(1) == chr(1):
+ print 'Zip file already on server, no need to reupload.'
+ return True
+
+ print 'Zip file not on server, starting uploading.'
+
+ url = '%s%s?priority=0&token=%s' % (
+ self._data_server_storage, self._zip_file_hash, self._token())
+ response = run_isolated.url_open(
+ url, data=zip_contents, content_type='application/octet-stream')
+ if response is None:
+ print >> sys.stderr, 'Failed to upload the zip file: %s' % url
+ return False
+
+ return True
+
+ def to_json(self):
+ """Exports the current configuration into a swarm-readable manifest file.
+
+ This function doesn't mutate the object.
+ """
+ test_case = {
+ 'test_case_name': self._test_name,
+ 'data': [
+ [self.data_server_retrieval + urllib.quote(self._zip_file_hash),
+ 'swarm_data.zip'],
+ ],
+ 'tests': self._tasks,
+ 'env_vars': {},
+ 'configurations': [
+ {
+ 'min_instances': self._shards,
+ 'config_name': self._target_platform,
+ 'dimensions': {
+ 'os': self._target_platform,
+ },
+ },
+ ],
+ 'working_dir': self._working_dir,
+ 'restart_on_failure': True,
+ 'cleanup': 'root',
+ 'priority': self.priority,
+ }
+
+ # These flags are googletest specific.
+ if self._test_filter and self._test_filter != '*':
Vadim Sh. 2013/08/16 18:58:58 Actually that part looks more chromium specific (o
M-A Ruel 2013/08/18 00:18:18 That's what the comment one line above at line 199
+ test_case['env_vars']['GTEST_FILTER'] = self._test_filter
+ if self._shards > 1:
+ test_case['env_vars']['GTEST_SHARD_INDEX'] = '%(instance_index)s'
+ test_case['env_vars']['GTEST_TOTAL_SHARDS'] = '%(num_instances)s'
+
+ return json.dumps(test_case, separators=(',',':'))
+
+
+class Bit(object):
+ """Thread safe setable bit."""
+ _lock = threading.Lock()
+ _value = False
+
+ def get(self):
+ with self._lock:
+ return self._value
+
+ def set(self):
+ with self._lock:
+ self._value = True
+
+
+def now():
+ """Exists so it can be mocked easily."""
+ return time.time()
+
+
+def get_test_keys(swarm_base_url, test_name, _=None):
+ """Returns the Swarm test key for each shards of test_name."""
+ # TODO(maruel): Remove the parameter '_' once the
+ # build/scripts/slave/get_swarm_results.py stops passing it.
Vadim Sh. 2013/08/16 18:58:58 Is it still passing it? :)
M-A Ruel 2013/08/18 00:18:18 Done.
+ key_data = urllib.urlencode([('name', test_name)])
+ url = '%s/get_matching_test_cases?%s' % (swarm_base_url, key_data)
+
+ for i in range(run_isolated.URL_OPEN_MAX_ATTEMPTS):
+ response = run_isolated.url_open(url, retry_404=True)
+ if response is None:
+ raise Failure(
+ 'Error: Unable to find any tests with the name, %s, on swarm server'
+ % test_name)
+
+ result = response.read()
+ # TODO(maruel): Compare exact string.
+ if 'No matching' in result:
+ logging.warning('Unable to find any tests with the name, %s, on swarm '
+ 'server' % test_name)
+ if i != run_isolated.URL_OPEN_MAX_ATTEMPTS:
+ run_isolated.HttpService.sleep_before_retry(i, None)
+ continue
+ return json.loads(result)
+
+ raise Failure(
+ 'Error: Unable to find any tests with the name, %s, on swarm server'
+ % test_name)
+
+
+def retrieve_results(base_url, test_key, timeout, should_stop):
+ """Retrieves results for a single test_key."""
+ assert isinstance(timeout, float)
+ params = [('r', test_key)]
+ result_url = '%s/get_result?%s' % (base_url, urllib.urlencode(params))
+ start = now()
+ while True:
+ if timeout and (now() - start) >= timeout:
+ logging.error('retrieve_results(%s) timed out', base_url)
+ return {}
+ # Do retries ourselves.
+ response = run_isolated.url_open(
+ result_url, retry_404=False, retry_50x=False)
+ if response is None:
+ # Aggressively poll for results. Do not use retry_404 so
+ # should_stop is polled more often.
+ remaining = min(5, timeout - (now() - start)) if timeout else 5
+ if remaining > 0:
+ run_isolated.HttpService.sleep_before_retry(1, remaining)
+ else:
+ try:
+ data = json.load(response) or {}
+ except (ValueError, TypeError):
+ logging.warning(
+ 'Received corrupted data for test_key %s. Retrying.', test_key)
+ else:
+ if data['output']:
+ return data
+ if should_stop.get():
+ return {}
+
+
+def yield_results(swarm_base_url, test_keys, timeout, max_threads):
+ """Yields swarm test results from the swarm server as (index, result).
+
+ Duplicate shards are ignored, the first one to complete is returned.
+
+ max_threads is optional and is used to limit the number of parallel fetches
+ done. Since in general the number of test_keys is in the range <=10, it's not
+ worth normally to limit the number threads. Mostly used for testing purposes.
+ """
+ shards_remaining = range(len(test_keys))
+ number_threads = (
+ min(max_threads, len(test_keys)) if max_threads else len(test_keys))
+ should_stop = Bit()
+ results_remaining = len(test_keys)
+ with run_isolated.ThreadPool(number_threads, number_threads, 0) as pool:
+ try:
+ for test_key in test_keys:
+ pool.add_task(
+ 0, retrieve_results, swarm_base_url, test_key, timeout, should_stop)
+ while shards_remaining and results_remaining:
+ result = pool.get_one_result()
+ results_remaining -= 1
+ if not result:
+ # Failed to retrieve one key.
+ logging.error('Failed to retrieve the results for a swarm key')
+ continue
+ shard_index = result['config_instance_index']
+ if shard_index in shards_remaining:
+ shards_remaining.remove(shard_index)
+ yield shard_index, result
+ else:
+ logging.warning('Ignoring duplicate shard index %d', shard_index)
+ # Pop the last entry, there's no such shard.
+ shards_remaining.pop()
+ finally:
+ # Done, kill the remaining threads.
+ should_stop.set()
+
+
+def chromium_setup(manifest):
+ """Sets up the commands to run.
+
+ Highly chromium specific.
Vadim Sh. 2013/08/16 18:58:58 Actually 'run_isolated.py' is not that chromium sp
M-A Ruel 2013/08/18 00:18:18 But still. We'll have to make this less hardcoded.
+ """
+ cleanup_script_name = 'swarm_cleanup.py'
+ cleanup_script_path = os.path.join(TOOLS_PATH, cleanup_script_name)
+ run_test_name = 'run_isolated.py'
+ run_test_path = os.path.join(ROOT_DIR, run_test_name)
+
+ manifest.add_file(run_test_path, run_test_name)
+ manifest.add_file(cleanup_script_path, cleanup_script_name)
+ run_cmd = [
+ 'python', run_test_name,
+ '--hash', manifest.manifest_hash,
+ '--remote', manifest.data_server_retrieval.rstrip('/') + '-gzip/',
+ ]
+ if manifest.verbose or manifest.profile:
+ # Have it print the profiling section.
+ run_cmd.append('--verbose')
+ manifest.add_task('Run Test', run_cmd)
+
+ # Clean up
+ manifest.add_task('Clean Up', ['python', cleanup_script_name])
+
+
+def process_manifest(
+ file_sha1, test_name, shards, test_filter, os_image, working_dir,
+ data_server, swarm_url, verbose, profile, priority):
+ """Process the manifest file and send off the swarm test request."""
+ try:
+ manifest = Manifest(
+ file_sha1, test_name, shards, test_filter, os_image, working_dir,
+ data_server, verbose, profile, priority)
+ except ValueError as e:
+ print >> sys.stderr, 'Unable to process %s: %s' % (test_name, e)
+ return 1
+
+ chromium_setup(manifest)
+
+ # Zip up relevent files
+ print "Zipping up files..."
+ if not manifest.zip_and_upload():
+ return 1
+
+ # Send test requests off to swarm.
+ print('Sending test requests to swarm.')
+ print('Server: %s' % swarm_url)
+ print('Job name: %s' % test_name)
+ test_url = swarm_url.rstrip('/') + '/test'
+ manifest_text = manifest.to_json()
+ result = run_isolated.url_open(test_url, data={'request': manifest_text})
+ if not result:
+ print >> sys.stderr, 'Failed to send test for %s\n%s' % (
+ test_name, test_url)
+ return 1
+ try:
+ json.load(result)
+ except (ValueError, TypeError) as e:
+ print >> sys.stderr, 'Failed to send test for %s' % test_name
+ print >> sys.stderr, 'Manifest: %s' % manifest_text
+ print >> sys.stderr, str(e)
+ return 1
+ return 0
+
+
+def run(cmd, verbose):
+ if verbose:
+ print('Running: %s' % ' '.join(cmd))
+ cmd = [sys.executable, os.path.join(ROOT_DIR, cmd[0])] + cmd[1:]
+ if verbose and sys.platform != 'win32':
+ cmd = ['time', '-p'] + cmd
+ subprocess.check_call(cmd)
+
+
+def trigger_and_return(
Vadim Sh. 2013/08/16 18:58:58 Is this function covered by any test?
M-A Ruel 2013/08/18 00:18:18 Nope. That's why we have a canary master.
+ isolate, isolated, swarm_server, cad_server, slave_os, verbose):
+ """Does the archive, trigger and get results dance."""
+ prefix = getpass.getuser() + '-' + datetime.datetime.now().isoformat() + '-'
+ shards = 1
+
+ if verbose:
+ os.environ.setdefault('ISOLATE_DEBUG', '2')
+
+ tempdir = None
+ try:
+ if not isolated:
+ # A directory is used because isolated + '.state' will also be created.
+ tempdir = tempfile.mkdtemp(prefix='swarm_trigger_and_get_results')
+ isolated = os.path.join(tempdir, 'swarm_trigger.isolated')
+ step_name = os.path.basename(isolate).split('.', 1)[0]
+ else:
+ step_name = os.path.basename(isolated).split('.', 1)[0]
+
+ print('Archiving')
+ cmd = [
+ 'isolate.py',
+ 'hashtable',
+ '--outdir', cad_server,
+ '--isolated', isolated,
+ ]
+ if isolate:
+ cmd.extend(('--isolate', isolate))
+ if slave_os:
+ cmd.extend(('-V', 'OS', run_isolated.FLAVOR_MAPPING[slave_os]))
+ run(cmd, verbose)
+
+ print('\nRunning')
+ hashval = hashlib.sha1(open(isolated, 'rb').read()).hexdigest()
+ cmd = [
+ 'swarm_trigger_step.py',
Vadim Sh. 2013/08/16 18:58:58 'swarm_trigger_step.py' is no more.
M-A Ruel 2013/08/18 00:18:18 This code was not completed yet, as I hadn't sent
+ '--swarm-url', swarm_server,
+ '--test-name-prefix', prefix,
+ '--data-server', cad_server,
+ '--run_from_hash',
+ hashval,
+ step_name,
+ str(shards),
+ '',
+ ]
+ if slave_os:
+ cmd.extend(('--os_image', slave_os))
+ run(cmd, verbose)
+
+ print('\nGetting results')
+ run(
+ [
+ 'swarm_get_results.py',
Vadim Sh. 2013/08/16 18:58:58 Same here. Actually, maybe do it all in the same
+ '--url', swarm_server,
+ prefix + step_name,
+ ],
+ verbose)
+ return 0
+ finally:
+ if tempdir:
+ shutil.rmtree(tempdir)
+
+
+@subcommand.usage('test_name')
+def CMDresults(parser, args):
+ """Retrieves results of a Swarming job.
+
+ The result can be in multiple part if the execution was sharded. It can
+ potentially have retries.
+ """
+ parser.add_option(
+ '-u', '--url', default=SWARM_SERVER,
Vadim Sh. 2013/08/16 18:58:58 --url here, --swarm-url below :(
M-A Ruel 2013/08/18 00:18:18 Yep, being addressed.
+ help='Specify the url of the Swarm server, defaults: %default')
+ parser.add_option(
+ '-t', '--timeout',
+ type='float',
+ default=DEFAULT_SHARD_WAIT_TIME,
+ help='Timeout to wait for result, set to 0 for no timeout; default: '
+ '%default s')
+ # TODO(maruel): Remove once the masters have been updated.
+ parser.add_option(
+ '-s', '--shards',
+ help=optparse.SUPPRESS_HELP)
+
+ (options, args) = parser.parse_args(args)
+ if not args:
+ parser.error('Must specify one test name.')
+ elif len(args) > 1:
+ parser.error('Must specify only one test name.')
+
+ options.url = options.url.rstrip('/')
+ test_name = args[0]
+
+ try:
+ test_keys = get_test_keys(options.url, test_name)
+ except Failure as e:
+ parser.error(e.args[0])
+ if not test_keys:
+ parser.error('No test keys to get results with.')
+
+ options.shards = len(test_keys) if options.shards == -1 else options.shards
+ exit_code = None
+ for _index, output in yield_results(
+ options.url, test_keys, options.timeout, None):
+ print(
+ '%s/%s: %s' % (
+ output['machine_id'], output['machine_tag'], output['exit_codes']))
+ print(''.join(' %s\n' % l for l in output['output'].splitlines()))
+ exit_code = max(exit_code, max(map(int, output['exit_codes'].split(','))))
+
+ return exit_code
+
+
+def CMDrun(parser, args):
+ """Archives a .isolated file, triggers it on Swarm and get the results.
+
+ Basically, everything to run a command remotely.
+ """
+ parser.add_option('-i', '--isolate', help='.isolate file to use')
+ parser.add_option(
+ '-s', '--isolated',
+ help='.isolated file to use. One of -i or -s must be used.')
+ parser.add_option(
+ '-o', '--os_image',
+ metavar='OS',
+ help='Swarm OS image to request. Should be one of the valid sys.platform '
+ 'values like darwin, linux2 or win32.')
+ parser.add_option(
+ '-u', '--swarm-url',
+ metavar='URL',
+ default=SWARM_SERVER,
+ help='Specify the url of the Swarm server. Defaults to %default')
+ parser.add_option(
+ '-d', '--data-server',
+ default=ISOLATE_SERVER,
+ metavar='URL',
+ help='The server where all the test data is stored. Defaults to %default')
+ options, args = parser.parse_args(args)
+
+ if args:
+ parser.error('Use one of -i or -s but no unsupported arguments: %s' % args)
+ if not options.isolate and not options.isolated:
+ parser.error('Use one of -i or -s')
+
+ return trigger_and_return(
+ options.isolate,
+ options.isolated,
+ options.swarm_url,
+ options.data_server,
+ options.os_image,
+ options.verbose)
+
+
+def CMDtrigger(parser, args):
+ """Triggers a Swarm request based off of a .isolated file.
+
+ This script takes a .isolated file, packages it, and sends a Swarm manifest
+ file to the Swarm server. This is expected to be called as a build step with
+ the cwd as the parent of the src/ directory.
+ """
+ parser.add_option('-w', '--working_dir', default='swarm_tests',
+ help='Desired working direction on the swarm slave side. '
Vadim Sh. 2013/08/16 18:58:58 'direction'? Also why do we need this argument?
M-A Ruel 2013/08/18 00:18:18 Copy-pasted code from the original script.
csharp 2013/08/19 13:46:04 This lets swarm know what directory to download, u
+ 'Defaults to %default.')
+ parser.add_option('-o', '--os_image',
+ help='Swarm OS image to request.')
+ parser.add_option('-u', '--swarm-url', default=SWARM_SERVER,
+ help='Specify the url of the Swarm server. '
+ 'Defaults to %default')
+ parser.add_option('-d', '--data-server',
Vadim Sh. 2013/08/16 18:58:58 At some point we need to converge on single term f
M-A Ruel 2013/08/18 00:18:18 Doing now.
+ help='The server where all the test data is stored.')
+ parser.add_option('-t', '--test-name-prefix', default='',
+ help='Specify the prefix to give the swarm test request. '
+ 'Defaults to %default')
+ parser.add_option('--run_from_hash', nargs=4, action='append', default=[],
+ help='Specify a hash to run on swarm. The format is '
+ '(hash, hash_test_name, shards, test_filter). This may be '
+ 'used multiple times to send multiple hashes.')
+ parser.add_option('--profile', action='store_true',
+ default=bool(os.environ.get('ISOLATE_DEBUG')),
+ help='Have run_isolated.py print profiling info')
+ parser.add_option('--priority', type='int', default=100,
+ help='The lower value, the more important the task is')
+ (options, args) = parser.parse_args(args)
+
+ if args:
+ parser.error('Unknown args: %s' % args)
+ if not options.data_server:
+ parser.error('Must specify the data directory')
+ if not options.run_from_hash:
+ parser.error('At least one --run_from_hash is required.')
+
+ if not options.os_image or options.os_image == 'None':
+ # This means the Try Server/user wants to use the current OS.
+ options.os_image = sys.platform
+
+ highest_exit_code = 0
+ # Send off the hash swarm test requests.
+ for (file_sha1, test_name, shards, testfilter) in options.run_from_hash:
+ exit_code = process_manifest(
+ file_sha1,
+ options.test_name_prefix + test_name,
+ int(shards),
+ testfilter,
+ options.os_image,
+ options.working_dir,
+ options.data_server,
+ options.swarm_url,
+ options.verbose,
+ options.profile,
+ options.priority)
+ highest_exit_code = max(highest_exit_code, exit_code)
+ return highest_exit_code
+
+
+def main(args):
+ dispatcher = subcommand.CommandDispatcher(__name__)
+ try:
+ return dispatcher.execute(
+ trace_inputs.OptionParserWithLogging(prog='swarming.py'), args)
Vadim Sh. 2013/08/16 18:58:58 I'm really looking forward to a time when we'll ha
M-A Ruel 2013/08/18 00:18:18 Yes but that won't happen until run_isolated.py is
+ except (
+ Failure,
+ run_isolated.MappingError,
+ run_isolated.ConfigError) as e:
+ sys.stderr.write('\nError: ')
+ sys.stderr.write(str(e))
+ sys.stderr.write('\n')
+ return 1
+
+
+if __name__ == '__main__':
+ fix_encoding.fix_encoding()
+ run_isolated.disable_buffering()
+ sys.exit(main(sys.argv[1:]))

Powered by Google App Engine
This is Rietveld 408576698