Chromium Code Reviews
| Index: client/swarming.py |
| diff --git a/client/swarming.py b/client/swarming.py |
| index b88931fde8e118b8d7116549e1f29d9bb0fbb65c..1df3509294b4e4edb8beb53406af3aa75aedb993 100755 |
| --- a/client/swarming.py |
| +++ b/client/swarming.py |
| @@ -5,7 +5,7 @@ |
| """Client tool to trigger tasks or retrieve results from a Swarming server.""" |
| -__version__ = '0.6.3' |
| +__version__ = '0.8.2' |
| import collections |
| import datetime |
| @@ -13,16 +13,11 @@ import json |
| import logging |
| import optparse |
| import os |
| -import re |
| -import shutil |
| -import StringIO |
| import subprocess |
| import sys |
| import threading |
| import time |
| import urllib |
| -import urlparse |
| -import zipfile |
| from third_party import colorama |
| from third_party.depot_tools import fix_encoding |
| @@ -35,12 +30,10 @@ from utils import net |
| from utils import on_error |
| from utils import threading_utils |
| from utils import tools |
| -from utils import zip_package |
| import auth |
| import isolated_format |
| import isolateserver |
| -import run_isolated |
| ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) |
| @@ -54,101 +47,14 @@ class Failure(Exception): |
| ### Isolated file handling. |
| -def isolated_upload_zip_bundle(isolate_server, bundle): |
| - """Uploads a zip package to Isolate Server and returns raw fetch URL. |
| - |
| - Args: |
| - isolate_server: URL of an Isolate Server. |
| - bundle: instance of ZipPackage to upload. |
| - |
| - Returns: |
| - URL to get the file from. |
| - """ |
| - # Swarming bot needs to be able to grab the file from the Isolate Server using |
| - # a simple HTTPS GET. Use 'default' namespace so that the raw data returned to |
| - # a bot is not zipped, since the swarming_bot doesn't understand compressed |
| - # data. This namespace have nothing to do with |namespace| passed to |
| - # run_isolated.py that is used to store files for isolated task. |
| - logging.info('Zipping up and uploading files...') |
| - start_time = time.time() |
| - isolate_item = isolateserver.BufferItem(bundle.zip_into_buffer()) |
| - with isolateserver.get_storage(isolate_server, 'default') as storage: |
| - uploaded = storage.upload_items([isolate_item]) |
| - bundle_url = storage.get_fetch_url(isolate_item) |
| - elapsed = time.time() - start_time |
| - if isolate_item in uploaded: |
| - logging.info('Upload complete, time elapsed: %f', elapsed) |
| - else: |
| - logging.info('Zip file already on server, time elapsed: %f', elapsed) |
| - return bundle_url |
| - |
| - |
| -def isolated_get_data(isolate_server): |
| - """Returns the 'data' section with all files necessary to bootstrap a task |
| - execution running an isolated task. |
| - |
| - It's mainly zipping run_isolated.zip over and over again. |
| - TODO(maruel): Get rid of this with. |
| - https://code.google.com/p/swarming/issues/detail?id=173 |
| - """ |
| - bundle = zip_package.ZipPackage(ROOT_DIR) |
| - bundle.add_buffer( |
| - 'run_isolated.zip', |
| - run_isolated.get_as_zip_package().zip_into_buffer(compress=False)) |
| - bundle_url = isolated_upload_zip_bundle(isolate_server, bundle) |
| - return [(bundle_url, 'swarm_data.zip')] |
| - |
| - |
| -def isolated_get_run_commands( |
| - isolate_server, namespace, isolated_hash, extra_args, verbose): |
| - """Returns the 'commands' to run an isolated task via run_isolated.zip. |
| - |
| - Returns: |
| - commands list to be added to the request. |
| - """ |
| - run_cmd = [ |
| - 'python', 'run_isolated.zip', |
| - '--isolated', isolated_hash, |
| - '--isolate-server', isolate_server, |
| - '--namespace', namespace, |
| - ] |
| - if verbose: |
| - run_cmd.append('--verbose') |
| - # Pass all extra args for run_isolated.py, it will pass them to the command. |
| - if extra_args: |
| - run_cmd.append('--') |
| - run_cmd.extend(extra_args) |
| - return run_cmd |
| - |
| - |
| -def isolated_archive(isolate_server, namespace, isolated, algo, verbose): |
| - """Archives a .isolated and all the dependencies on the Isolate Server.""" |
| - logging.info( |
| - 'isolated_archive(%s, %s, %s)', isolate_server, namespace, isolated) |
| - print('Archiving: %s' % isolated) |
| - cmd = [ |
| - sys.executable, |
| - os.path.join(ROOT_DIR, 'isolate.py'), |
| - 'archive', |
| - '--isolate-server', isolate_server, |
| - '--namespace', namespace, |
| - '--isolated', isolated, |
| - ] |
| - cmd.extend(['--verbose'] * verbose) |
| - logging.info(' '.join(cmd)) |
| - if subprocess.call(cmd, verbose): |
| - return None |
| - return isolated_format.hash_file(isolated, algo) |
| - |
| - |
| -def isolated_to_hash(isolate_server, namespace, arg, algo, verbose): |
| +def isolated_to_hash(arg, algo): |
| """Archives a .isolated file if needed. |
| Returns the file hash to trigger and a bool specifying if it was a file (True) |
| or a hash (False). |
| """ |
| if arg.endswith('.isolated'): |
| - file_hash = isolated_archive(isolate_server, namespace, arg, algo, verbose) |
| + file_hash = isolated_format.hash_file(arg, algo) |
| if not file_hash: |
| on_error.report('Archival failure %s' % arg) |
| return None, True |
| @@ -164,7 +70,7 @@ def isolated_handle_options(options, args): |
| """Handles '--isolated <isolated>', '<isolated>' and '-- <args...>' arguments. |
| Returns: |
| - tuple(command, data). |
| + tuple(command, inputs_ref). |
| """ |
| isolated_cmd_args = [] |
| if not options.isolated: |
| @@ -182,8 +88,7 @@ def isolated_handle_options(options, args): |
| 'process.') |
| # Old code. To be removed eventually. |
| options.isolated, is_file = isolated_to_hash( |
| - options.isolate_server, options.namespace, args[0], |
| - isolated_format.get_hash_algo(options.namespace), options.verbose) |
| + args[0], isolated_format.get_hash_algo(options.namespace)) |
| if not options.isolated: |
| raise ValueError('Invalid argument %s' % args[0]) |
| elif args: |
| @@ -197,10 +102,6 @@ def isolated_handle_options(options, args): |
| # optparse eats '--' sometimes. |
| isolated_cmd_args = args |
| - command = isolated_get_run_commands( |
| - options.isolate_server, options.namespace, options.isolated, |
| - isolated_cmd_args, options.verbose) |
| - |
| # If a file name was passed, use its base name instead of the isolated hash. |
| # Otherwise, use user name as an approximation of a task name. |
| if not options.task_name: |
| @@ -215,76 +116,86 @@ def isolated_handle_options(options, args): |
| for k, v in sorted(options.dimensions.iteritems())), |
| options.isolated) |
| - try: |
| - data = isolated_get_data(options.isolate_server) |
| - except (IOError, OSError): |
| - on_error.report('Failed to upload the zip file') |
| - raise ValueError('Failed to upload the zip file') |
| - |
| - return command, data |
| + inputs_ref = FilesRef( |
| + isolated=options.isolated, |
| + isolatedserver=options.isolate_server, |
| + namespace=options.namespace) |
| + return isolated_cmd_args, inputs_ref |
| ### Triggering. |
| -TaskRequest = collections.namedtuple( |
| - 'TaskRequest', |
| +# See ../appengine/swarming/swarming_rpcs.py. |
| +FilesRef = collections.namedtuple( |
| + 'FilesRef', |
| + [ |
| + 'isolated', |
| + 'isolatedserver', |
| + 'namespace', |
| + ]) |
| + |
| + |
| +# See ../appengine/swarming/swarming_rpcs.py. |
| +TaskProperties = collections.namedtuple( |
| + 'TaskProperties', |
| [ |
| 'command', |
| - 'data', |
| 'dimensions', |
| 'env', |
| - 'expiration', |
| - 'hard_timeout', |
| + 'execution_timeout_secs', |
| + 'extra_args', |
| + 'grace_period_secs', |
| 'idempotent', |
| - 'io_timeout', |
| + 'inputs_ref', |
| + 'io_timeout_secs', |
| + ]) |
| + |
| + |
| +# See ../appengine/swarming/swarming_rpcs.py. |
| +NewTaskRequest = collections.namedtuple( |
| + 'NewTaskRequest', |
| + [ |
| + 'expiration_secs', |
| 'name', |
| + 'parent_task_id', |
| 'priority', |
| + 'properties', |
| 'tags', |
| 'user', |
| - 'verbose', |
| ]) |
| +def namedtuple_to_dict(value): |
| + """Recursively converts a namedtuple to a dict.""" |
| + out = dict(value._asdict()) |
| + for k, v in out.iteritems(): |
| + if hasattr(v, '_asdict'): |
| + out[k] = namedtuple_to_dict(v) |
| + return out |
| + |
| + |
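
For illustration, the recursion above turns nested namedtuples into nested dicts. A minimal sketch (Inner and Outer are hypothetical tuples, not part of this file; assumes namedtuple_to_dict is in scope):

    import collections

    Inner = collections.namedtuple('Inner', ['a'])
    Outer = collections.namedtuple('Outer', ['inner', 'b'])

    print namedtuple_to_dict(Outer(inner=Inner(a=1), b=2))
    # -> {'b': 2, 'inner': {'a': 1}} (key order may vary)
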
| def task_request_to_raw_request(task_request): |
| """Returns the json dict expected by the Swarming server for new request. |
| This is for the v1 client Swarming API. |
| """ |
| - return { |
| - 'name': task_request.name, |
| - 'parent_task_id': os.environ.get('SWARMING_TASK_ID', ''), |
| - 'priority': task_request.priority, |
| - 'properties': { |
| - 'commands': [task_request.command], |
| - 'data': task_request.data, |
| - 'dimensions': task_request.dimensions, |
| - 'env': task_request.env, |
| - 'execution_timeout_secs': task_request.hard_timeout, |
| - 'io_timeout_secs': task_request.io_timeout, |
| - 'idempotent': task_request.idempotent, |
| - }, |
| - 'scheduling_expiration_secs': task_request.expiration, |
| - 'tags': task_request.tags, |
| - 'user': task_request.user, |
| - } |
| - |
| - |
| -def swarming_handshake(swarming): |
| - """Initiates the connection to the Swarming server.""" |
| - headers = {'X-XSRF-Token-Request': '1'} |
| - response = net.url_read_json( |
| - swarming + '/swarming/api/v1/client/handshake', |
| - headers=headers, |
| - data={}) |
| - if not response: |
| - logging.error('Failed to handshake with server') |
| - return None |
| - logging.info('Connected to server version: %s', response['server_version']) |
| - return response['xsrf_token'] |
| + out = namedtuple_to_dict(task_request) |
| + # Maps are not supported until protobuf v3. |
| + out['properties']['dimensions'] = [ |
| + {'key': k, 'value': v} |
| + for k, v in out['properties']['dimensions'].iteritems() |
| + ] |
| + out['properties']['dimensions'].sort(key=lambda x: x['key']) |
| + out['properties']['env'] = [ |
| + {'key': k, 'value': v} |
| + for k, v in out['properties']['env'].iteritems() |
| + ] |
| + out['properties']['env'].sort(key=lambda x: x['key']) |
| + return out |
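
As a sketch of the wire format this produces (all field values below are illustrative, not prescriptive), a minimal NewTaskRequest serializes like this:

    req = NewTaskRequest(
        expiration_secs=21600,
        name='hello-task',
        parent_task_id='',
        priority=100,
        properties=TaskProperties(
            command=['python', '-c', 'print 0'],
            dimensions={'os': 'Linux', 'pool': 'default'},
            env={'FOO': '1'},
            execution_timeout_secs=3600,
            extra_args=None,
            grace_period_secs=30,
            idempotent=False,
            inputs_ref=None,
            io_timeout_secs=1200),
        tags=[],
        user='joe')
    raw = task_request_to_raw_request(req)
    # The dimensions and env maps became sorted key/value lists:
    assert raw['properties']['dimensions'] == [
        {'key': 'os', 'value': 'Linux'},
        {'key': 'pool', 'value': 'default'},
    ]
    assert raw['properties']['env'] == [{'key': 'FOO', 'value': '1'}]
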
| -def swarming_trigger(swarming, raw_request, xsrf_token): |
| +def swarming_trigger(swarming, raw_request): |
| """Triggers a request on the Swarming server and returns the json data. |
| It's the low-level function. |
| @@ -300,11 +211,8 @@ def swarming_trigger(swarming, raw_request, xsrf_token): |
| """ |
| logging.info('Triggering: %s', raw_request['name']) |
| - headers = {'X-XSRF-Token': xsrf_token} |
| result = net.url_read_json( |
| - swarming + '/swarming/api/v1/client/request', |
| - data=raw_request, |
| - headers=headers) |
| + swarming + '/_ah/api/swarming/v1/tasks/new', data=raw_request) |
| if not result: |
| on_error.report('Failed to trigger task %s' % raw_request['name']) |
| return None |
| @@ -314,9 +222,9 @@ def swarming_trigger(swarming, raw_request, xsrf_token): |
| def setup_googletest(env, shards, index): |
| """Sets googletest specific environment variables.""" |
| if shards > 1: |
| - env = env.copy() |
| - env['GTEST_SHARD_INDEX'] = str(index) |
| - env['GTEST_TOTAL_SHARDS'] = str(shards) |
| + env = env[:] |
| + env.append({'key': 'GTEST_SHARD_INDEX', 'value': str(index)}) |
Vadim Sh. (2015/09/14 17:46:52): nit: override existing GTEST_SHARD_INDEX and GTEST…
M-A Ruel (2015/09/15 14:13:54): Added asserts that they are not present instead. I…
| + env.append({'key': 'GTEST_TOTAL_SHARDS', 'value': str(shards)}) |
| return env |
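
Since env is now a list of {'key': ..., 'value': ...} dicts rather than a plain dict, sharding appends to a copy. A quick sketch (the LANG entry is just an illustrative pre-existing variable):

    env = [{'key': 'LANG', 'value': 'en_US.UTF-8'}]
    print setup_googletest(env, 3, 1)
    # [{'key': 'LANG', 'value': 'en_US.UTF-8'},
    #  {'key': 'GTEST_SHARD_INDEX', 'value': '1'},
    #  {'key': 'GTEST_TOTAL_SHARDS', 'value': '3'}]
    # env itself is left untouched: env[:] copies the list before appending.
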
| @@ -328,21 +236,18 @@ def trigger_task_shards(swarming, task_request, shards): |
| None in case of failure. |
| """ |
| def convert(index): |
| - req = task_request |
| + req = task_request_to_raw_request(task_request) |
| if shards > 1: |
| - req = req._replace( |
| - env=setup_googletest(req.env, shards, index), |
| - name='%s:%s:%s' % (req.name, index, shards)) |
| - return task_request_to_raw_request(req) |
| + req['properties']['env'] = setup_googletest( |
| + req['properties']['env'], shards, index) |
| + req['name'] += ':%s:%s' % (index, shards) |
| + return req |
| requests = [convert(index) for index in xrange(shards)] |
| - xsrf_token = swarming_handshake(swarming) |
| - if not xsrf_token: |
| - return None |
| tasks = {} |
| priority_warning = False |
| for index, request in enumerate(requests): |
| - task = swarming_trigger(swarming, request, xsrf_token) |
| + task = swarming_trigger(swarming, request) |
| if not task: |
| break |
| logging.info('Request result: %s', task) |
| @@ -392,11 +297,14 @@ class State(object): |
| CANCELED = 0x60 |
| COMPLETED = 0x70 |
| - STATES = (RUNNING, PENDING, EXPIRED, TIMED_OUT, BOT_DIED, CANCELED, COMPLETED) |
| - STATES_RUNNING = (RUNNING, PENDING) |
| - STATES_NOT_RUNNING = (EXPIRED, TIMED_OUT, BOT_DIED, CANCELED, COMPLETED) |
| - STATES_DONE = (TIMED_OUT, COMPLETED) |
| - STATES_ABANDONED = (EXPIRED, BOT_DIED, CANCELED) |
| + STATES = ( |
| + 'RUNNING', 'PENDING', 'EXPIRED', 'TIMED_OUT', 'BOT_DIED', 'CANCELED', |
| + 'COMPLETED') |
| + STATES_RUNNING = ('RUNNING', 'PENDING') |
| + STATES_NOT_RUNNING = ( |
| + 'EXPIRED', 'TIMED_OUT', 'BOT_DIED', 'CANCELED', 'COMPLETED') |
| + STATES_DONE = ('TIMED_OUT', 'COMPLETED') |
| + STATES_ABANDONED = ('EXPIRED', 'BOT_DIED', 'CANCELED') |
| _NAMES = { |
| RUNNING: 'Running', |
| @@ -408,6 +316,16 @@ class State(object): |
| COMPLETED: 'Completed', |
| } |
| + _ENUMS = { |
| + 'RUNNING': RUNNING, |
| + 'PENDING': PENDING, |
| + 'EXPIRED': EXPIRED, |
| + 'TIMED_OUT': TIMED_OUT, |
| + 'BOT_DIED': BOT_DIED, |
| + 'CANCELED': CANCELED, |
| + 'COMPLETED': COMPLETED, |
| + } |
| + |
| @classmethod |
| def to_string(cls, state): |
| """Returns a user-readable string representing a State.""" |
| @@ -415,6 +333,13 @@ class State(object): |
| raise ValueError('Invalid state %s' % state) |
| return cls._NAMES[state] |
| + @classmethod |
| + def from_enum(cls, state): |
| + """Returns int value based on the string.""" |
| + if state not in cls._ENUMS: |
| + raise ValueError('Invalid state %s' % state) |
| + return cls._ENUMS[state] |
| + |
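
For example, using the COMPLETED value defined above:

    assert State.from_enum('COMPLETED') == State.COMPLETED  # 0x70
    try:
      State.from_enum('SLEEPING')  # not a valid state name
    except ValueError:
      pass
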
| class TaskOutputCollector(object): |
| """Assembles task execution summary (for --task-summary-json output). |
| @@ -450,6 +375,8 @@ class TaskOutputCollector(object): |
| Modifies |result| in place. |
| + shard_index is 0-based. |
| + |
| Called concurrently from multiple threads. |
| """ |
| # Sanity check index is in expected range. |
| @@ -460,14 +387,12 @@ class TaskOutputCollector(object): |
| shard_index, self.shard_count - 1) |
| return |
| - assert not 'isolated_out' in result |
| - result['isolated_out'] = None |
| - for output in result['outputs']: |
| - isolated_files_location = extract_output_files_location(output) |
| - if isolated_files_location: |
| - if result['isolated_out']: |
| - raise ValueError('Unexpected two task with output') |
| - result['isolated_out'] = isolated_files_location |
| + if result.get('outputs_ref'): |
| + ref = result['outputs_ref'] |
| + result['outputs_ref']['view_url'] = '%s/browse?%s' % ( |
| + ref['isolatedserver'], |
| + urllib.urlencode( |
| + [('namespace', ref['namespace']), ('hash', ref['isolated'])])) |
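
The synthesized view_url is a plain browse link; a sketch with placeholder values:

    import urllib
    ref = {'isolatedserver': 'https://isolateserver.appspot.com',
           'namespace': 'default-gzip',
           'isolated': '0123456789abcdef'}  # placeholder hash
    print '%s/browse?%s' % (
        ref['isolatedserver'],
        urllib.urlencode(
            [('namespace', ref['namespace']), ('hash', ref['isolated'])]))
    # https://isolateserver.appspot.com/browse?namespace=default-gzip&hash=0123456789abcdef
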
| # Store result dict of that shard, ignore results we've already seen. |
| with self._lock: |
| @@ -477,16 +402,16 @@ class TaskOutputCollector(object): |
| self._per_shard_results[shard_index] = result |
| # Fetch output files if necessary. |
| - if self.task_output_dir and result['isolated_out']: |
| + if self.task_output_dir and result.get('outputs_ref'): |
| storage = self._get_storage( |
| - result['isolated_out']['server'], |
| - result['isolated_out']['namespace']) |
| + result['outputs_ref']['isolatedserver'], |
| + result['outputs_ref']['namespace']) |
| if storage: |
| # Output files are supposed to be small and they are not reused across |
| # tasks. So use MemoryCache for them instead of on-disk cache. Make |
| # files writable, so that calling script can delete them. |
| isolateserver.fetch_isolated( |
| - result['isolated_out']['hash'], |
| + result['outputs_ref']['isolated'], |
| storage, |
| isolateserver.MemoryCache(file_mode_mask=0700), |
| os.path.join(self.task_output_dir, str(shard_index)), |
| @@ -533,58 +458,23 @@ class TaskOutputCollector(object): |
| return self._storage |
| -def extract_output_files_location(task_log): |
| - """Task log -> location of task output files to fetch. |
| - |
| - TODO(vadimsh,maruel): Use side-channel to get this information. |
| - See 'run_tha_test' in run_isolated.py for where the data is generated. |
| - |
| - Returns: |
| - Tuple (isolate server URL, namespace, isolated hash) on success. |
| - None if information is missing or can not be parsed. |
| - """ |
| - if not task_log: |
| - return None |
| - match = re.search( |
| - r'\[run_isolated_out_hack\](.*)\[/run_isolated_out_hack\]', |
| - task_log, |
| - re.DOTALL) |
| - if not match: |
| - return None |
| - |
| - def to_ascii(val): |
| - if not isinstance(val, basestring): |
| - raise ValueError() |
| - return val.encode('ascii') |
| - |
| - try: |
| - data = json.loads(match.group(1)) |
| - if not isinstance(data, dict): |
| - raise ValueError() |
| - isolated_hash = to_ascii(data['hash']) |
| - namespace = to_ascii(data['namespace']) |
| - isolate_server = to_ascii(data['storage']) |
| - if not file_path.is_url(isolate_server): |
| - raise ValueError() |
| - data = { |
| - 'hash': isolated_hash, |
| - 'namespace': namespace, |
| - 'server': isolate_server, |
| - 'view_url': '%s/browse?%s' % (isolate_server, urllib.urlencode( |
| - [('namespace', namespace), ('hash', isolated_hash)])), |
| - } |
| - return data |
| - except (KeyError, ValueError): |
| - logging.warning( |
| - 'Unexpected value of run_isolated_out_hack: %s', match.group(1)) |
| - return None |
| - |
| - |
| def now(): |
| """Exists so it can be mocked easily.""" |
| return time.time() |
| +def parse_time(value): |
| + """Converts serialized time from the API to datetime.datetime.""" |
| + # When microseconds are 0, the '.123456' suffix is elided. This means the |
| + # serialized format is not consistent, which confuses the hell out of python. |
| + for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'): |
| + try: |
| + return datetime.datetime.strptime(value, fmt) |
| + except ValueError: |
| + pass |
| + raise ValueError('Failed to parse %s' % value) |
| + |
| + |
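
Both serialized variants parse (a quick check; datetime is already imported at the top of the file):

    assert parse_time('2015-09-15T14:13:54.123456') == datetime.datetime(
        2015, 9, 15, 14, 13, 54, 123456)
    assert parse_time('2015-09-15T14:13:54') == datetime.datetime(
        2015, 9, 15, 14, 13, 54)
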
| def retrieve_results( |
| base_url, shard_index, task_id, timeout, should_stop, output_collector): |
| """Retrieves results for a single task ID. |
| @@ -594,9 +484,8 @@ def retrieve_results( |
| None on failure. |
| """ |
| assert isinstance(timeout, float), timeout |
| - result_url = '%s/swarming/api/v1/client/task/%s' % (base_url, task_id) |
| - output_url = '%s/swarming/api/v1/client/task/%s/output/all' % ( |
| - base_url, task_id) |
| + result_url = '%s/_ah/api/swarming/v1/task/%s/result' % (base_url, task_id) |
| + output_url = '%s/_ah/api/swarming/v1/task/%s/stdout' % (base_url, task_id) |
| started = now() |
| deadline = started + timeout if timeout else None |
| attempt = 0 |
| @@ -629,10 +518,12 @@ def retrieve_results( |
| result = net.url_read_json(result_url, retry_50x=False) |
| if not result: |
| continue |
| + |
| if result['state'] in State.STATES_NOT_RUNNING: |
| + # TODO(maruel): Not always fetch stdout? |
| out = net.url_read_json(output_url) |
| - result['outputs'] = (out or {}).get('outputs', []) |
| - if not result['outputs']: |
| + result['output'] = out['output'] if out else out |
| + if not result['output']: |
| logging.error('No output found for task %s', task_id) |
| # Record the result, try to fetch attached output files (if any). |
| if output_collector: |
| @@ -641,6 +532,51 @@ def retrieve_results( |
| return result |
| +def convert_to_old_format(result): |
| + """Converts the task result data from Endpoints API format to old API format |
| + for compatibility. |
| + |
| + This goes into the file generated as --task-summary-json. |
| + """ |
| + # Sets defaults. |
| + result.setdefault('abandoned_ts', None) |
| + result.setdefault('bot_id', None) |
| + result.setdefault('bot_version', None) |
| + result.setdefault('children_task_ids', []) |
| + result.setdefault('completed_ts', None) |
| + result.setdefault('cost_saved_usd', None) |
| + result.setdefault('costs_usd', None) |
| + result.setdefault('deduped_from', None) |
| + result.setdefault('name', None) |
| + result.setdefault('outputs_ref', None) |
| + result.setdefault('properties_hash', None) |
| + result.setdefault('server_versions', None) |
| + result.setdefault('started_ts', None) |
| + result.setdefault('tags', None) |
| + result.setdefault('user', None) |
| + |
| + # Conversion back to the old API. |
| + duration = result.pop('duration', None) |
| + result['durations'] = [duration] if duration else [] |
| + exit_code = result.pop('exit_code', None) |
| + result['exit_codes'] = [int(exit_code)] if exit_code else [] |
| + result['id'] = result.pop('task_id') |
| + result['isolated_out'] = result.get('outputs_ref', None) |
| + output = result.pop('output', None) |
| + result['outputs'] = [output] if output else [] |
| + # properties_hash |
| + # server_version |
| + # Endpoints returns 'state' as a string. For compatibility with old code, |
| + # convert it to an int. |
| + result['state'] = State.from_enum(result['state']) |
| + # tags |
| + result['try_number'] = ( |
| + int(result['try_number']) if result['try_number'] else None) |
| + result['bot_dimensions'] = { |
| + i['key']: i['value'] for i in result['bot_dimensions'] |
| + } |
| + |
| + |
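
A sketch of the conversion on a minimal, hypothetical Endpoints result; task_id, state, try_number and bot_dimensions are the keys the function reads unconditionally:

    result = {
        'task_id': '148abc0',  # illustrative id
        'state': 'COMPLETED',
        'exit_code': '0',
        'duration': 12.5,
        'output': 'hi\n',
        'try_number': '1',
        'bot_dimensions': [{'key': 'os', 'value': 'Linux'}],
    }
    convert_to_old_format(result)
    assert result['id'] == '148abc0'
    assert result['state'] == State.COMPLETED  # string -> 0x70
    assert result['exit_codes'] == [0] and result['durations'] == [12.5]
    assert result['bot_dimensions'] == {'os': 'Linux'}
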
| def yield_results( |
| swarm_base_url, task_ids, timeout, max_threads, print_status_updates, |
| output_collector): |
| @@ -716,27 +652,27 @@ def yield_results( |
| def decorate_shard_output(swarming, shard_index, metadata): |
| """Returns wrapped output for swarming task shard.""" |
| - def t(d): |
| - return datetime.datetime.strptime(d, '%Y-%m-%d %H:%M:%S') |
| - if metadata.get('started_ts'): |
| + if metadata.get('started_ts') and not metadata.get('deduped_from'): |
| pending = '%.1fs' % ( |
| - t(metadata['started_ts']) - t(metadata['created_ts'])).total_seconds() |
| + parse_time(metadata['started_ts']) - parse_time(metadata['created_ts']) |
| + ).total_seconds() |
| else: |
| pending = 'N/A' |
| - if metadata.get('durations'): |
| - duration = '%.1fs' % metadata['durations'][0] |
| + if metadata.get('duration') is not None: |
| + duration = '%.1fs' % metadata['duration'] |
| else: |
| duration = 'N/A' |
| - if metadata.get('exit_codes'): |
| - exit_code = '%d' % metadata['exit_codes'][0] |
| + if metadata.get('exit_code') is not None: |
| + # Integers are encoded as strings to not lose precision. |
| + exit_code = '%s' % metadata['exit_code'] |
| else: |
| exit_code = 'N/A' |
| bot_id = metadata.get('bot_id') or 'N/A' |
| - url = '%s/user/task/%s' % (swarming, metadata['id']) |
| + url = '%s/user/task/%s' % (swarming, metadata['task_id']) |
| tag_header = 'Shard %d %s' % (shard_index, url) |
| tag_footer = ( |
| 'End of shard %d Pending: %s Duration: %s Bot: %s Exit: %s' % ( |
| @@ -749,7 +685,7 @@ def decorate_shard_output(swarming, shard_index, metadata): |
| header = dash_pad + tag_header + dash_pad |
| footer = dash_pad + tag_footer + dash_pad[:-1] |
| - output = '\n'.join(o for o in metadata['outputs'] if o).rstrip() + '\n' |
| + output = metadata['output'].rstrip() + '\n' |
| return header + output + footer |
| @@ -771,34 +707,33 @@ def collect( |
| seen_shards.add(index) |
| # Default to failure if there was no process that even started. |
| - shard_exit_code = 1 |
| - if metadata.get('exit_codes'): |
| - shard_exit_code = metadata['exit_codes'][0] |
| + shard_exit_code = metadata.get('exit_code') |
| + if shard_exit_code: |
| + shard_exit_code = int(shard_exit_code) |
| if shard_exit_code: |
| exit_code = shard_exit_code |
| - if metadata.get('durations'): |
| - total_duration += metadata['durations'][0] |
| + total_duration += metadata.get('duration', 0) |
| if decorate: |
| print(decorate_shard_output(swarming, index, metadata)) |
| if len(seen_shards) < len(task_ids): |
| print('') |
| else: |
| - if metadata.get('exit_codes'): |
| - exit_code = metadata['exit_codes'][0] |
| - else: |
| - exit_code = 'N/A' |
| - print('%s: %s %s' % |
| - (metadata.get('bot_id') or 'N/A', metadata['id'], exit_code)) |
| - for output in metadata['outputs']: |
| - if not output: |
| - continue |
| - output = output.rstrip() |
| + print('%s: %s %s' % ( |
| + metadata.get('bot_id', 'N/A'), |
| + metadata['task_id'], |
| + shard_exit_code)) |
| + if metadata['output']: |
| + output = metadata['output'].rstrip() |
| if output: |
| print(''.join(' %s\n' % l for l in output.splitlines())) |
| finally: |
| summary = output_collector.finalize() |
| if task_summary_json: |
| + # TODO(maruel): Make this optional. |
| + for i in summary['shards']: |
| + if i: |
| + convert_to_old_format(i) |
| tools.write_json(task_summary_json, summary, False) |
| if decorate and total_duration: |
| @@ -813,6 +748,36 @@ def collect( |
| return exit_code |
| +### API management. |
| + |
| + |
| +class APIError(Exception): |
| + pass |
| + |
| + |
| +def endpoints_api_discovery_apis(host): |
| + """Uses Cloud Endpoints' API Discovery Service to returns metadata about all |
| + the APIs exposed by a host. |
| + |
| + https://developers.google.com/discovery/v1/reference/apis/list |
| + """ |
| + data = net.url_read_json(host + '/_ah/api/discovery/v1/apis') |
| + if data is None: |
| + raise APIError('Failed to discover APIs on %s' % host) |
| + out = {} |
| + for api in data['items']: |
| + if api['id'] == 'discovery:v1': |
| + continue |
| + # URL is of the following form: |
| + # url = host + ( |
| + # '/_ah/api/discovery/v1/apis/%s/%s/rest' % (api['id'], api['version'])) |
| + api_data = net.url_read_json(api['discoveryRestUrl']) |
| + if api_data is None: |
| + raise APIError('Failed to discover %s on %s' % (api['id'], host)) |
| + out[api['id']] = api_data |
| + return out |
| + |
| + |
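
Usage sketch (the host below is hypothetical):

    apis = endpoints_api_discovery_apis('https://example-swarm.appspot.com')
    for api_id, api in sorted(apis.iteritems()):
      print '%s: %s' % (api_id, api['description'])
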
| ### Commands. |
| @@ -896,7 +861,6 @@ def process_trigger_options(parser, options, args): |
| options.dimensions = dict(options.dimensions) |
| options.env = dict(options.env) |
| - data = [] |
| if not options.dimensions: |
| parser.error('Please at least specify one --dimension') |
| if options.raw_cmd: |
| @@ -914,27 +878,34 @@ def process_trigger_options(parser, options, args): |
| '_'.join( |
| '%s=%s' % (k, v) |
| for k, v in sorted(options.dimensions.iteritems()))) |
| + inputs_ref = None |
| else: |
| isolateserver.process_isolate_server_options(parser, options, False) |
| try: |
| - command, data = isolated_handle_options(options, args) |
| + command, inputs_ref = isolated_handle_options(options, args) |
| except ValueError as e: |
| parser.error(str(e)) |
| - return TaskRequest( |
| - command=command, |
| - data=data, |
| + # If inputs_ref is used, command is actually extra_args. Otherwise it's an |
| + # actual command to run. |
| + properties = TaskProperties( |
| + command=None if inputs_ref else command, |
| dimensions=options.dimensions, |
| env=options.env, |
| - expiration=options.expiration, |
| - hard_timeout=options.hard_timeout, |
| + execution_timeout_secs=options.hard_timeout, |
| + extra_args=command if inputs_ref else None, |
| + grace_period_secs=30, |
| idempotent=options.idempotent, |
| - io_timeout=options.io_timeout, |
| + inputs_ref=inputs_ref, |
| + io_timeout_secs=options.io_timeout) |
| + return NewTaskRequest( |
| + expiration_secs=options.expiration, |
| name=options.task_name, |
| + parent_task_id=os.environ.get('SWARMING_TASK_ID', ''), |
| priority=options.priority, |
| + properties=properties, |
| tags=options.tags, |
| - user=options.user, |
| - verbose=options.verbose) |
| + user=options.user) |
| def add_collect_options(parser): |
| @@ -986,7 +957,7 @@ def CMDbot_delete(parser, args): |
| result = 0 |
| for bot in bots: |
| - url = '%s/swarming/api/v1/client/bot/%s' % (options.swarming, bot) |
| + url = '%s/_ah/api/swarming/v1/bot/%s' % (options.swarming, bot) |
| if net.url_read_json(url, method='DELETE') is None: |
| print('Deleting %s failed' % bot) |
| result = 1 |
| @@ -1014,7 +985,8 @@ def CMDbots(parser, args): |
| cursor = None |
| limit = 250 |
| # Iterate via cursors. |
| - base_url = options.swarming + '/swarming/api/v1/client/bots?limit=%d' % limit |
| + base_url = ( |
| + options.swarming + '/_ah/api/swarming/v1/bots/list?limit=%d' % limit) |
| while True: |
| url = base_url |
| if cursor: |
| @@ -1024,20 +996,20 @@ def CMDbots(parser, args): |
| print >> sys.stderr, 'Failed to access %s' % options.swarming |
| return 1 |
| bots.extend(data['items']) |
| - cursor = data['cursor'] |
| + cursor = data.get('cursor') |
| if not cursor: |
| break |
| - for bot in natsort.natsorted(bots, key=lambda x: x['id']): |
| + for bot in natsort.natsorted(bots, key=lambda x: x['bot_id']): |
| if options.dead_only: |
| - if not bot['is_dead']: |
| + if not bot.get('is_dead'): |
| continue |
| - elif not options.keep_dead and bot['is_dead']: |
| + elif not options.keep_dead and bot.get('is_dead'): |
| continue |
| # If the user requested to filter on dimensions, ensure the bot has all the |
| # dimensions requested. |
| - dimensions = bot['dimensions'] |
| + dimensions = {i['key']: i['value'] for i in bot['dimensions']} |
| for key, value in options.dimensions: |
| if key not in dimensions: |
| break |
| @@ -1051,7 +1023,7 @@ def CMDbots(parser, args): |
| if value != dimensions[key]: |
| break |
| else: |
| - print bot['id'] |
| + print bot['bot_id'] |
| if not options.bare: |
| print ' %s' % json.dumps(dimensions, sort_keys=True) |
| if bot.get('task_id'): |
| @@ -1070,7 +1042,7 @@ def CMDcollect(parser, args): |
| parser.add_option( |
| '-j', '--json', |
| help='Load the task ids from .json as saved by trigger --dump-json') |
| - (options, args) = parser.parse_args(args) |
| + options, args = parser.parse_args(args) |
| if not args and not options.json: |
| parser.error('Must specify at least one task id or --json.') |
| if args and options.json: |
| @@ -1104,17 +1076,17 @@ def CMDcollect(parser, args): |
| return 1 |
| -@subcommand.usage('[resource name]') |
| +@subcommand.usage('[method name]') |
| def CMDquery(parser, args): |
| - """Returns raw JSON information via an URL endpoint. Use 'list' to gather the |
| - list of valid values from the server. |
| + """Returns raw JSON information via an URL endpoint. Use 'query-list' to |
| + gather the list of API methods from the server. |
| Examples: |
| - Printing the list of known URLs: |
| - swarming.py query -S https://server-url list |
| + Listing all bots: |
| + swarming.py query -S https://server-url bots/list |
| - Listing last 50 tasks on a specific bot named 'swarm1' |
| - swarming.py query -S https://server-url --limit 50 bot/swarm1/tasks |
| + Listing last 10 tasks on a specific bot named 'swarm1': |
| + swarming.py query -S https://server-url --limit 10 bot/swarm1/tasks |
| """ |
| CHUNK_SIZE = 250 |
| @@ -1124,11 +1096,15 @@ def CMDquery(parser, args): |
| 'default=%default') |
| parser.add_option( |
| '--json', help='Path to JSON output file (otherwise prints to stdout)') |
| - (options, args) = parser.parse_args(args) |
| + parser.add_option( |
| + '--progress', action='store_true', |
| + help='Prints a dot at each request to show progress') |
| + options, args = parser.parse_args(args) |
| if len(args) != 1: |
| - parser.error('Must specify only one resource name.') |
| - |
| - base_url = options.swarming + '/swarming/api/v1/client/' + args[0] |
| + parser.error( |
| + 'Must specify only method name and optionally query args properly ' |
| + 'escaped.') |
| + base_url = options.swarming + '/_ah/api/swarming/v1/' + args[0] |
| url = base_url |
| if options.limit: |
| # Check check, change if not working out. |
| @@ -1136,7 +1112,8 @@ def CMDquery(parser, args): |
| url += '%slimit=%d' % (merge_char, min(CHUNK_SIZE, options.limit)) |
| data = net.url_read_json(url) |
| if data is None: |
| - print >> sys.stderr, 'Failed to access %s' % options.swarming |
| + # TODO(maruel): Do basic diagnostic. |
| + print >> sys.stderr, 'Failed to access %s' % url |
| return 1 |
| # Some items support cursors. Try to get automatically if cursors are needed |
| @@ -1148,29 +1125,72 @@ def CMDquery(parser, args): |
| url = base_url + '%scursor=%s' % (merge_char, urllib.quote(data['cursor'])) |
| if options.limit: |
| url += '&limit=%d' % min(CHUNK_SIZE, options.limit - len(data['items'])) |
| + if options.progress: |
| + sys.stdout.write('.') |
| + sys.stdout.flush() |
| new = net.url_read_json(url) |
| if new is None: |
| + if options.progress: |
| + print('') |
| print >> sys.stderr, 'Failed to access %s' % options.swarming |
| return 1 |
| data['items'].extend(new['items']) |
| - data['cursor'] = new['cursor'] |
| + data['cursor'] = new.get('cursor') |
| + if options.progress: |
| + print('') |
| if options.limit and len(data.get('items', [])) > options.limit: |
| data['items'] = data['items'][:options.limit] |
| data.pop('cursor', None) |
| if options.json: |
| - with open(options.json, 'w') as f: |
| - json.dump(data, f) |
| + tools.write_json(options.json, data, True) |
| else: |
| try: |
| - json.dump(data, sys.stdout, indent=2, sort_keys=True) |
| + tools.write_json(sys.stdout, data, False) |
| sys.stdout.write('\n') |
| except IOError: |
| pass |
| return 0 |
| +def CMDquery_list(parser, args): |
| + """Returns list of all the Swarming APIs that can be used with command |
| + 'query'. |
| + """ |
| + parser.add_option( |
| + '--json', help='Path to JSON output file (otherwise prints to stdout)') |
| + options, args = parser.parse_args(args) |
| + if args: |
| + parser.error('No argument allowed.') |
| + |
| + try: |
| + apis = endpoints_api_discovery_apis(options.swarming) |
| + except APIError as e: |
| + parser.error(str(e)) |
| + if options.json: |
| + with open(options.json, 'wb') as f: |
| + json.dump(apis, f) |
| + else: |
| + help_url = ( |
| + 'https://apis-explorer.appspot.com/apis-explorer/?base=%s/_ah/api#p/' % |
| + options.swarming) |
| + for api_id, api in sorted(apis.iteritems()): |
| + print api_id |
| + print ' ' + api['description'] |
| + for resource_name, resource in sorted(api['resources'].iteritems()): |
| + print '' |
| + for method_name, method in sorted(resource['methods'].iteritems()): |
| + # Only list the GET ones. |
| + if method['httpMethod'] != 'GET': |
| + continue |
| + print '- %s.%s: %s' % ( |
| + resource_name, method_name, method['path']) |
| + print ' ' + method['description'] |
| + print ' %s%s%s' % (help_url, api['servicePath'], method['id']) |
| + return 0 |
| + |
| + |
| @subcommand.usage('(hash|isolated) [-- extra_args]') |
| def CMDrun(parser, args): |
| """Triggers a task and wait for the results. |
| @@ -1225,7 +1245,7 @@ def CMDreproduce(parser, args): |
| if len(args) != 1: |
| parser.error('Must specify exactly one task id.') |
| - url = options.swarming + '/swarming/api/v1/client/task/%s/request' % args[0] |
| + url = options.swarming + '/_ah/api/swarming/v1/task/%s/request' % args[0] |
| request = net.url_read_json(url) |
| if not request: |
| print >> sys.stderr, 'Failed to retrieve request data for the task' |
| @@ -1234,40 +1254,21 @@ def CMDreproduce(parser, args): |
| if not os.path.isdir('work'): |
| os.mkdir('work') |
| - swarming_host = urlparse.urlparse(options.swarming).netloc |
| properties = request['properties'] |
| - for data_url, _ in properties['data']: |
| - assert data_url.startswith('https://'), data_url |
| - data_host = urlparse.urlparse(data_url).netloc |
| - if data_host != swarming_host: |
| - auth.ensure_logged_in('https://' + data_host) |
| - |
| - content = net.url_read(data_url) |
| - if content is None: |
| - print >> sys.stderr, 'Failed to download %s' % data_url |
| - return 1 |
| - with zipfile.ZipFile(StringIO.StringIO(content)) as zip_file: |
| - zip_file.extractall('work') |
| - |
| env = None |
| if properties['env']: |
| env = os.environ.copy() |
| logging.info('env: %r', properties['env']) |
| env.update( |
| - (k.encode('utf-8'), v.encode('utf-8')) |
| - for k, v in properties['env'].iteritems()) |
| + (i['key'].encode('utf-8'), i['value'].encode('utf-8')) |
| + for i in properties['env']) |
| - exit_code = 0 |
| - for cmd in properties['commands']: |
| - try: |
| - c = subprocess.call(cmd, env=env, cwd='work') |
| - except OSError as e: |
| - print >> sys.stderr, 'Failed to run: %s' % ' '.join(cmd) |
| - print >> sys.stderr, str(e) |
| - c = 1 |
| - if not exit_code: |
| - exit_code = c |
| - return exit_code |
| + try: |
| + return subprocess.call(properties['command'], env=env, cwd='work') |
| + except OSError as e: |
| + print >> sys.stderr, 'Failed to run: %s' % ' '.join(properties['command']) |
| + print >> sys.stderr, str(e) |
| + return 1 |
| @subcommand.usage("(hash|isolated) [-- extra_args|raw command]") |