Index: client/swarming.py
diff --git a/client/swarming.py b/client/swarming.py
index b88931fde8e118b8d7116549e1f29d9bb0fbb65c..16adbe793069b5e44058c1eb38c7f2c77132cd2f 100755
--- a/client/swarming.py
+++ b/client/swarming.py
@@ -5,7 +5,7 @@
 """Client tool to trigger tasks or retrieve results from a Swarming server."""
-__version__ = '0.6.3'
+__version__ = '0.8.2'
 import collections
 import datetime
@@ -13,16 +13,11 @@ import json
 import logging
 import optparse
 import os
-import re
-import shutil
-import StringIO
 import subprocess
 import sys
 import threading
 import time
 import urllib
-import urlparse
-import zipfile
 from third_party import colorama
 from third_party.depot_tools import fix_encoding
@@ -35,12 +30,10 @@ from utils import net
 from utils import on_error
 from utils import threading_utils
 from utils import tools
-from utils import zip_package
 import auth
 import isolated_format
 import isolateserver
-import run_isolated
 ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
@@ -54,101 +47,14 @@ class Failure(Exception):
 ### Isolated file handling.
-def isolated_upload_zip_bundle(isolate_server, bundle):
-  """Uploads a zip package to Isolate Server and returns raw fetch URL.
-
-  Args:
-    isolate_server: URL of an Isolate Server.
-    bundle: instance of ZipPackage to upload.
-
-  Returns:
-    URL to get the file from.
-  """
-  # Swarming bot needs to be able to grab the file from the Isolate Server using
-  # a simple HTTPS GET. Use 'default' namespace so that the raw data returned to
-  # a bot is not zipped, since the swarming_bot doesn't understand compressed
-  # data. This namespace have nothing to do with |namespace| passed to
-  # run_isolated.py that is used to store files for isolated task.
-  logging.info('Zipping up and uploading files...')
-  start_time = time.time()
-  isolate_item = isolateserver.BufferItem(bundle.zip_into_buffer())
-  with isolateserver.get_storage(isolate_server, 'default') as storage:
-    uploaded = storage.upload_items([isolate_item])
-    bundle_url = storage.get_fetch_url(isolate_item)
-  elapsed = time.time() - start_time
-  if isolate_item in uploaded:
-    logging.info('Upload complete, time elapsed: %f', elapsed)
-  else:
-    logging.info('Zip file already on server, time elapsed: %f', elapsed)
-  return bundle_url
-
-
-def isolated_get_data(isolate_server):
-  """Returns the 'data' section with all files necessary to bootstrap a task
-  execution running an isolated task.
-
-  It's mainly zipping run_isolated.zip over and over again.
-  TODO(maruel): Get rid of this with.
-  https://code.google.com/p/swarming/issues/detail?id=173
-  """
-  bundle = zip_package.ZipPackage(ROOT_DIR)
-  bundle.add_buffer(
-      'run_isolated.zip',
-      run_isolated.get_as_zip_package().zip_into_buffer(compress=False))
-  bundle_url = isolated_upload_zip_bundle(isolate_server, bundle)
-  return [(bundle_url, 'swarm_data.zip')]
-
-
-def isolated_get_run_commands(
-    isolate_server, namespace, isolated_hash, extra_args, verbose):
-  """Returns the 'commands' to run an isolated task via run_isolated.zip.
-
-  Returns:
-    commands list to be added to the request.
-  """
-  run_cmd = [
-    'python', 'run_isolated.zip',
-    '--isolated', isolated_hash,
-    '--isolate-server', isolate_server,
-    '--namespace', namespace,
-  ]
-  if verbose:
-    run_cmd.append('--verbose')
-  # Pass all extra args for run_isolated.py, it will pass them to the command.
-  if extra_args:
-    run_cmd.append('--')
-    run_cmd.extend(extra_args)
-  return run_cmd
-
-
-def isolated_archive(isolate_server, namespace, isolated, algo, verbose):
-  """Archives a .isolated and all the dependencies on the Isolate Server."""
-  logging.info(
-      'isolated_archive(%s, %s, %s)', isolate_server, namespace, isolated)
-  print('Archiving: %s' % isolated)
-  cmd = [
-    sys.executable,
-    os.path.join(ROOT_DIR, 'isolate.py'),
-    'archive',
-    '--isolate-server', isolate_server,
-    '--namespace', namespace,
-    '--isolated', isolated,
-  ]
-  cmd.extend(['--verbose'] * verbose)
-  logging.info(' '.join(cmd))
-  if subprocess.call(cmd, verbose):
-    return None
-  return isolated_format.hash_file(isolated, algo)
-
-
-def isolated_to_hash(isolate_server, namespace, arg, algo, verbose):
+def isolated_to_hash(arg, algo):
   """Archives a .isolated file if needed.
   Returns the file hash to trigger and a bool specifying if it was a file (True)
   or a hash (False).
   """
   if arg.endswith('.isolated'):
-    file_hash = isolated_archive(isolate_server, namespace, arg, algo, verbose)
+    file_hash = isolated_format.hash_file(arg, algo)
     if not file_hash:
       on_error.report('Archival failure %s' % arg)
       return None, True
@@ -164,7 +70,7 @@ def isolated_handle_options(options, args):
   """Handles '--isolated <isolated>', '<isolated>' and '-- <args...>' arguments.
   Returns:
-    tuple(command, data).
+    tuple(command, inputs_ref).
   """
   isolated_cmd_args = []
   if not options.isolated:
@@ -182,8 +88,7 @@ def isolated_handle_options(options, args):
           'process.')
     # Old code. To be removed eventually.
     options.isolated, is_file = isolated_to_hash(
-        options.isolate_server, options.namespace, args[0],
-        isolated_format.get_hash_algo(options.namespace), options.verbose)
+        args[0], isolated_format.get_hash_algo(options.namespace))
     if not options.isolated:
       raise ValueError('Invalid argument %s' % args[0])
   elif args:
@@ -197,10 +102,6 @@ def isolated_handle_options(options, args):
     # optparse eats '--' sometimes.
     isolated_cmd_args = args
-  command = isolated_get_run_commands(
-      options.isolate_server, options.namespace, options.isolated,
-      isolated_cmd_args, options.verbose)
-
   # If a file name was passed, use its base name of the isolated hash.
   # Otherwise, use user name as an approximation of a task name.
   if not options.task_name:
@@ -215,76 +116,86 @@ def isolated_handle_options(options, args):
             for k, v in sorted(options.dimensions.iteritems())),
         options.isolated)
-  try:
-    data = isolated_get_data(options.isolate_server)
-  except (IOError, OSError):
-    on_error.report('Failed to upload the zip file')
-    raise ValueError('Failed to upload the zip file')
-
-  return command, data
+  inputs_ref = FilesRef(
+      isolated=options.isolated,
+      isolatedserver=options.isolate_server,
+      namespace=options.namespace)
+  return isolated_cmd_args, inputs_ref
 ### Triggering.
-TaskRequest = collections.namedtuple(
-    'TaskRequest',
+# See ../appengine/swarming/swarming_rpcs.py.
+FilesRef = collections.namedtuple(
+    'FilesRef',
+    [
+      'isolated',
+      'isolatedserver',
+      'namespace',
+    ])
+
+
+# See ../appengine/swarming/swarming_rpcs.py.
+TaskProperties = collections.namedtuple(
+    'TaskProperties',
     [
       'command',
-      'data',
       'dimensions',
       'env',
-      'expiration',
-      'hard_timeout',
+      'execution_timeout_secs',
+      'extra_args',
+      'grace_period_secs',
       'idempotent',
-      'io_timeout',
+      'inputs_ref',
+      'io_timeout_secs',
+    ])
+
+
+# See ../appengine/swarming/swarming_rpcs.py.
+NewTaskRequest = collections.namedtuple(
+    'NewTaskRequest',
+    [
+      'expiration_secs',
       'name',
+      'parent_task_id',
       'priority',
+      'properties',
      'tags',
       'user',
-      'verbose',
     ])
+def namedtuple_to_dict(value):
+  """Recursively converts a namedtuple to a dict."""
+  out = dict(value._asdict())
+  for k, v in out.iteritems():
+    if hasattr(v, '_asdict'):
+      out[k] = namedtuple_to_dict(v)
+  return out
+
+
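
# A minimal, self-contained sketch of how the recursion above handles nested
# namedtuples. FilesRef mirrors the definition above; Demo is a hypothetical
# wrapper used only for illustration.
import collections

FilesRef = collections.namedtuple(
    'FilesRef', ['isolated', 'isolatedserver', 'namespace'])
Demo = collections.namedtuple('Demo', ['name', 'inputs_ref'])

def namedtuple_to_dict(value):
  # Same logic as the helper added above.
  out = dict(value._asdict())
  for k, v in out.iteritems():
    if hasattr(v, '_asdict'):
      out[k] = namedtuple_to_dict(v)
  return out

ref = FilesRef(
    isolated='deadbeef', isolatedserver='https://isolate.example.com',
    namespace='default-gzip')
print namedtuple_to_dict(Demo(name='demo', inputs_ref=ref))
# The nested FilesRef comes back as a plain dict inside the outer dict.
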
 def task_request_to_raw_request(task_request):
   """Returns the json dict expected by the Swarming server for new request.
   This is for the v1 client Swarming API.
   """
-  return {
-    'name': task_request.name,
-    'parent_task_id': os.environ.get('SWARMING_TASK_ID', ''),
-    'priority': task_request.priority,
-    'properties': {
-      'commands': [task_request.command],
-      'data': task_request.data,
-      'dimensions': task_request.dimensions,
-      'env': task_request.env,
-      'execution_timeout_secs': task_request.hard_timeout,
-      'io_timeout_secs': task_request.io_timeout,
-      'idempotent': task_request.idempotent,
-    },
-    'scheduling_expiration_secs': task_request.expiration,
-    'tags': task_request.tags,
-    'user': task_request.user,
-  }
-
-
-def swarming_handshake(swarming):
-  """Initiates the connection to the Swarming server."""
-  headers = {'X-XSRF-Token-Request': '1'}
-  response = net.url_read_json(
-      swarming + '/swarming/api/v1/client/handshake',
-      headers=headers,
-      data={})
-  if not response:
-    logging.error('Failed to handshake with server')
-    return None
-  logging.info('Connected to server version: %s', response['server_version'])
-  return response['xsrf_token']
+  out = namedtuple_to_dict(task_request)
+  # Maps are not supported until protobuf v3.
+  out['properties']['dimensions'] = [
+    {'key': k, 'value': v}
+    for k, v in out['properties']['dimensions'].iteritems()
+  ]
+  out['properties']['dimensions'].sort(key=lambda x: x['key'])
+  out['properties']['env'] = [
+    {'key': k, 'value': v}
+    for k, v in out['properties']['env'].iteritems()
+  ]
+  out['properties']['env'].sort(key=lambda x: x['key'])
+  return out
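
# A standalone sketch of the map-to-sorted-pair-list conversion performed
# above; the dimension values are made up.
dimensions = {'pool': 'default', 'os': 'Linux'}
pairs = [{'key': k, 'value': v} for k, v in dimensions.iteritems()]
pairs.sort(key=lambda x: x['key'])
print pairs  # 'os' sorts before 'pool'
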
-def swarming_trigger(swarming, raw_request, xsrf_token):
+def swarming_trigger(swarming, raw_request):
   """Triggers a request on the Swarming server and returns the json data.
   It's the low-level function.
@@ -300,11 +211,8 @@ def swarming_trigger(swarming, raw_request, xsrf_token):
   """
   logging.info('Triggering: %s', raw_request['name'])
-  headers = {'X-XSRF-Token': xsrf_token}
   result = net.url_read_json(
-      swarming + '/swarming/api/v1/client/request',
-      data=raw_request,
-      headers=headers)
+      swarming + '/_ah/api/swarming/v1/tasks/new', data=raw_request)
   if not result:
     on_error.report('Failed to trigger task %s' % raw_request['name'])
     return None
@@ -314,9 +222,11 @@ def swarming_trigger(swarming, raw_request, xsrf_token):
 def setup_googletest(env, shards, index):
   """Sets googletest specific environment variables."""
   if shards > 1:
-    env = env.copy()
-    env['GTEST_SHARD_INDEX'] = str(index)
-    env['GTEST_TOTAL_SHARDS'] = str(shards)
+    assert not any(i['key'] == 'GTEST_SHARD_INDEX' for i in env), env
+    assert not any(i['key'] == 'GTEST_TOTAL_SHARDS' for i in env), env
+    env = env[:]
+    env.append({'key': 'GTEST_SHARD_INDEX', 'value': str(index)})
+    env.append({'key': 'GTEST_TOTAL_SHARDS', 'value': str(shards)})
   return env
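
# Quick sketch, assuming the setup_googletest definition above, of the
# key/value-pair env format it now consumes and returns:
env = [{'key': 'LANG', 'value': 'en_US.UTF-8'}]
sharded = setup_googletest(env, 3, 1)
print [i['key'] for i in sharded]
# ['LANG', 'GTEST_SHARD_INDEX', 'GTEST_TOTAL_SHARDS']
print env  # unchanged; the helper copies the list before appending
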
@@ -328,21 +238,18 @@ def trigger_task_shards(swarming, task_request, shards):
     None in case of failure.
   """
   def convert(index):
-    req = task_request
+    req = task_request_to_raw_request(task_request)
     if shards > 1:
-      req = req._replace(
-          env=setup_googletest(req.env, shards, index),
-          name='%s:%s:%s' % (req.name, index, shards))
-    return task_request_to_raw_request(req)
+      req['properties']['env'] = setup_googletest(
+          req['properties']['env'], shards, index)
+      req['name'] += ':%s:%s' % (index, shards)
+    return req
   requests = [convert(index) for index in xrange(shards)]
-  xsrf_token = swarming_handshake(swarming)
-  if not xsrf_token:
-    return None
   tasks = {}
   priority_warning = False
   for index, request in enumerate(requests):
-    task = swarming_trigger(swarming, request, xsrf_token)
+    task = swarming_trigger(swarming, request)
     if not task:
       break
     logging.info('Request result: %s', task)
@@ -392,11 +299,14 @@ class State(object):
   CANCELED = 0x60
   COMPLETED = 0x70
-  STATES = (RUNNING, PENDING, EXPIRED, TIMED_OUT, BOT_DIED, CANCELED, COMPLETED)
-  STATES_RUNNING = (RUNNING, PENDING)
-  STATES_NOT_RUNNING = (EXPIRED, TIMED_OUT, BOT_DIED, CANCELED, COMPLETED)
-  STATES_DONE = (TIMED_OUT, COMPLETED)
-  STATES_ABANDONED = (EXPIRED, BOT_DIED, CANCELED)
+  STATES = (
+      'RUNNING', 'PENDING', 'EXPIRED', 'TIMED_OUT', 'BOT_DIED', 'CANCELED',
+      'COMPLETED')
+  STATES_RUNNING = ('RUNNING', 'PENDING')
+  STATES_NOT_RUNNING = (
+      'EXPIRED', 'TIMED_OUT', 'BOT_DIED', 'CANCELED', 'COMPLETED')
+  STATES_DONE = ('TIMED_OUT', 'COMPLETED')
+  STATES_ABANDONED = ('EXPIRED', 'BOT_DIED', 'CANCELED')
   _NAMES = {
     RUNNING: 'Running',
@@ -408,6 +318,16 @@ class State(object):
     COMPLETED: 'Completed',
   }
+  _ENUMS = {
+    'RUNNING': RUNNING,
+    'PENDING': PENDING,
+    'EXPIRED': EXPIRED,
+    'TIMED_OUT': TIMED_OUT,
+    'BOT_DIED': BOT_DIED,
+    'CANCELED': CANCELED,
+    'COMPLETED': COMPLETED,
+  }
+
   @classmethod
   def to_string(cls, state):
     """Returns a user-readable string representing a State."""
@@ -415,6 +335,13 @@ class State(object):
       raise ValueError('Invalid state %s' % state)
     return cls._NAMES[state]
+  @classmethod
+  def from_enum(cls, state):
+    """Returns int value based on the string."""
+    if state not in cls._ENUMS:
+      raise ValueError('Invalid state %s' % state)
+    return cls._ENUMS[state]
+
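
# Assuming the State class above: the wire format is now the string form, and
# from_enum maps it back to the legacy integer constants.
print State.from_enum('COMPLETED') == State.COMPLETED  # True
print State.to_string(State.from_enum('COMPLETED'))    # 'Completed'
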
 class TaskOutputCollector(object):
   """Assembles task execution summary (for --task-summary-json output).
@@ -450,6 +377,8 @@ class TaskOutputCollector(object):
     Modifies |result| in place.
+    shard_index is 0-based.
+
     Called concurrently from multiple threads.
     """
     # Sanity check index is in expected range.
@@ -460,14 +389,12 @@ class TaskOutputCollector(object):
           shard_index, self.shard_count - 1)
       return
-    assert not 'isolated_out' in result
-    result['isolated_out'] = None
-    for output in result['outputs']:
-      isolated_files_location = extract_output_files_location(output)
-      if isolated_files_location:
-        if result['isolated_out']:
-          raise ValueError('Unexpected two task with output')
-        result['isolated_out'] = isolated_files_location
+    if result.get('outputs_ref'):
+      ref = result['outputs_ref']
+      result['outputs_ref']['view_url'] = '%s/browse?%s' % (
+          ref['isolatedserver'],
+          urllib.urlencode(
+            [('namespace', ref['namespace']), ('hash', ref['isolated'])]))
     # Store result dict of that shard, ignore results we've already seen.
     with self._lock:
@@ -477,16 +404,16 @@ class TaskOutputCollector(object):
       self._per_shard_results[shard_index] = result
     # Fetch output files if necessary.
-    if self.task_output_dir and result['isolated_out']:
+    if self.task_output_dir and result.get('outputs_ref'):
       storage = self._get_storage(
-          result['isolated_out']['server'],
-          result['isolated_out']['namespace'])
+          result['outputs_ref']['isolatedserver'],
+          result['outputs_ref']['namespace'])
       if storage:
         # Output files are supposed to be small and they are not reused across
         # tasks. So use MemoryCache for them instead of on-disk cache. Make
         # files writable, so that calling script can delete them.
         isolateserver.fetch_isolated(
-            result['isolated_out']['hash'],
+            result['outputs_ref']['isolated'],
             storage,
             isolateserver.MemoryCache(file_mode_mask=0700),
             os.path.join(self.task_output_dir, str(shard_index)),
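
# How the view_url built above comes out, as a standalone sketch with made-up
# values:
import urllib

ref = {
  'isolatedserver': 'https://isolate.example.com',
  'namespace': 'default-gzip',
  'isolated': 'deadbeef',
}
print '%s/browse?%s' % (
    ref['isolatedserver'],
    urllib.urlencode(
        [('namespace', ref['namespace']), ('hash', ref['isolated'])]))
# https://isolate.example.com/browse?namespace=default-gzip&hash=deadbeef
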
@@ -533,58 +460,23 @@ class TaskOutputCollector(object):
     return self._storage
-def extract_output_files_location(task_log):
-  """Task log -> location of task output files to fetch.
-
-  TODO(vadimsh,maruel): Use side-channel to get this information.
-  See 'run_tha_test' in run_isolated.py for where the data is generated.
-
-  Returns:
-    Tuple (isolate server URL, namespace, isolated hash) on success.
-    None if information is missing or can not be parsed.
-  """
-  if not task_log:
-    return None
-  match = re.search(
-      r'\[run_isolated_out_hack\](.*)\[/run_isolated_out_hack\]',
-      task_log,
-      re.DOTALL)
-  if not match:
-    return None
-
-  def to_ascii(val):
-    if not isinstance(val, basestring):
-      raise ValueError()
-    return val.encode('ascii')
-
-  try:
-    data = json.loads(match.group(1))
-    if not isinstance(data, dict):
-      raise ValueError()
-    isolated_hash = to_ascii(data['hash'])
-    namespace = to_ascii(data['namespace'])
-    isolate_server = to_ascii(data['storage'])
-    if not file_path.is_url(isolate_server):
-      raise ValueError()
-    data = {
-      'hash': isolated_hash,
-      'namespace': namespace,
-      'server': isolate_server,
-      'view_url': '%s/browse?%s' % (isolate_server, urllib.urlencode(
-          [('namespace', namespace), ('hash', isolated_hash)])),
-    }
-    return data
-  except (KeyError, ValueError):
-    logging.warning(
-        'Unexpected value of run_isolated_out_hack: %s', match.group(1))
-  return None
-
-
 def now():
   """Exists so it can be mocked easily."""
   return time.time()
+def parse_time(value):
+  """Converts serialized time from the API to datetime.datetime."""
+  # When microseconds are 0, the '.123456' suffix is elided. This means the
+  # serialized format is not consistent, which confuses the hell out of python.
+  for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
+    try:
+      return datetime.datetime.strptime(value, fmt)
+    except ValueError:
+      pass
+  raise ValueError('Failed to parse %s' % value)
+
+
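
# Both timestamp shapes mentioned in the comment above, fed through a local
# copy of the helper:
import datetime

def parse_time(value):
  # Mirrors the parse_time added above.
  for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
    try:
      return datetime.datetime.strptime(value, fmt)
    except ValueError:
      pass
  raise ValueError('Failed to parse %s' % value)

print parse_time('2015-06-01T12:34:56.789012')  # microseconds present
print parse_time('2015-06-01T12:34:56')         # microseconds elided
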
 def retrieve_results(
     base_url, shard_index, task_id, timeout, should_stop, output_collector):
   """Retrieves results for a single task ID.
@@ -594,9 +486,8 @@ def retrieve_results(
     None on failure.
   """
   assert isinstance(timeout, float), timeout
-  result_url = '%s/swarming/api/v1/client/task/%s' % (base_url, task_id)
-  output_url = '%s/swarming/api/v1/client/task/%s/output/all' % (
-      base_url, task_id)
+  result_url = '%s/_ah/api/swarming/v1/task/%s/result' % (base_url, task_id)
+  output_url = '%s/_ah/api/swarming/v1/task/%s/stdout' % (base_url, task_id)
   started = now()
   deadline = started + timeout if timeout else None
   attempt = 0
@@ -629,18 +520,69 @@ def retrieve_results(
     result = net.url_read_json(result_url, retry_50x=False)
     if not result:
       continue
+
     if result['state'] in State.STATES_NOT_RUNNING:
+      # TODO(maruel): Not always fetch stdout?
       out = net.url_read_json(output_url)
-      result['outputs'] = (out or {}).get('outputs', [])
-      if not result['outputs']:
+      result['output'] = out.get('output') if out else out
+      if not result['output']:
         logging.error('No output found for task %s', task_id)
       # Record the result, try to fetch attached output files (if any).
       if output_collector:
         # TODO(vadimsh): Respect |should_stop| and |deadline| when fetching.
         output_collector.process_shard_result(shard_index, result)
+      if result.get('internal_failure'):
+        logging.error('Internal error!')
+      elif result['state'] == 'BOT_DIED':
+        logging.error('Bot died!')
       return result
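
# The Endpoints URL shapes polled above, with a hypothetical server and task
# id:
base_url = 'https://chromium-swarm.appspot.com'
task_id = '4f8a1208d9f9ff10'
print '%s/_ah/api/swarming/v1/task/%s/result' % (base_url, task_id)
print '%s/_ah/api/swarming/v1/task/%s/stdout' % (base_url, task_id)
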
+def convert_to_old_format(result):
+  """Converts the task result data from Endpoints API format to old API format
+  for compatibility.
+
+  This goes into the file generated as --task-summary-json.
+  """
+  # Sets defaults.
+  result.setdefault('abandoned_ts', None)
+  result.setdefault('bot_id', None)
+  result.setdefault('bot_version', None)
+  result.setdefault('children_task_ids', [])
+  result.setdefault('completed_ts', None)
+  result.setdefault('cost_saved_usd', None)
+  result.setdefault('costs_usd', None)
+  result.setdefault('deduped_from', None)
+  result.setdefault('name', None)
+  result.setdefault('outputs_ref', None)
+  result.setdefault('properties_hash', None)
+  result.setdefault('server_versions', None)
+  result.setdefault('started_ts', None)
+  result.setdefault('tags', None)
+  result.setdefault('user', None)
+
+  # Conversion back to the old API.
+  duration = result.pop('duration', None)
+  result['durations'] = [duration] if duration else []
+  exit_code = result.pop('exit_code', None)
+  result['exit_codes'] = [int(exit_code)] if exit_code else []
+  result['id'] = result.pop('task_id')
+  result['isolated_out'] = result.get('outputs_ref', None)
+  output = result.pop('output', None)
+  result['outputs'] = [output] if output else []
+  # properties_hash
+  # server_version
+  # Endpoints returns 'state' as a string. For compatibility with old code,
+  # convert it to an int.
+  result['state'] = State.from_enum(result['state'])
+  # tags
+  result['try_number'] = (
+      int(result['try_number']) if result['try_number'] else None)
+  result['bot_dimensions'] = {
+    i['key']: i['value'] for i in result['bot_dimensions']
+  }
+
+
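
# Assuming convert_to_old_format and State as defined above, a sketch of the
# in-place conversion on a made-up Endpoints shard result:
shard = {
  'task_id': '4f8a1208d9f9ff10',
  'state': 'COMPLETED',
  'try_number': '1',
  'exit_code': '0',
  'duration': 12.3,
  'output': 'hi\n',
  'bot_dimensions': [{'key': 'os', 'value': 'Linux'}],
}
convert_to_old_format(shard)
print shard['id'], shard['state'], shard['exit_codes'], shard['durations']
# 4f8a1208d9f9ff10 112 [0] [12.3]
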
 def yield_results(
     swarm_base_url, task_ids, timeout, max_threads, print_status_updates,
     output_collector):
@@ -716,27 +658,27 @@ def yield_results(
 def decorate_shard_output(swarming, shard_index, metadata):
   """Returns wrapped output for swarming task shard."""
-  def t(d):
-    return datetime.datetime.strptime(d, '%Y-%m-%d %H:%M:%S')
-  if metadata.get('started_ts'):
+  if metadata.get('started_ts') and not metadata.get('deduped_from'):
     pending = '%.1fs' % (
-        t(metadata['started_ts']) - t(metadata['created_ts'])).total_seconds()
+        parse_time(metadata['started_ts']) - parse_time(metadata['created_ts'])
+        ).total_seconds()
   else:
     pending = 'N/A'
-  if metadata.get('durations'):
-    duration = '%.1fs' % metadata['durations'][0]
+  if metadata.get('duration') is not None:
+    duration = '%.1fs' % metadata['duration']
   else:
     duration = 'N/A'
-  if metadata.get('exit_codes'):
-    exit_code = '%d' % metadata['exit_codes'][0]
+  if metadata.get('exit_code') is not None:
+    # Integers are encoded as strings to not lose precision.
+    exit_code = '%s' % metadata['exit_code']
   else:
     exit_code = 'N/A'
   bot_id = metadata.get('bot_id') or 'N/A'
-  url = '%s/user/task/%s' % (swarming, metadata['id'])
+  url = '%s/user/task/%s' % (swarming, metadata['task_id'])
   tag_header = 'Shard %d %s' % (shard_index, url)
   tag_footer = (
       'End of shard %d Pending: %s Duration: %s Bot: %s Exit: %s' % (
@@ -749,7 +691,7 @@ def decorate_shard_output(swarming, shard_index, metadata):
   header = dash_pad + tag_header + dash_pad
   footer = dash_pad + tag_footer + dash_pad[:-1]
-  output = '\n'.join(o for o in metadata['outputs'] if o).rstrip() + '\n'
+  output = metadata.get('output', '').rstrip() + '\n'
   return header + output + footer
@@ -771,34 +713,33 @@ def collect(
       seen_shards.add(index)
       # Default to failure if there was no process that even started.
-      shard_exit_code = 1
-      if metadata.get('exit_codes'):
-        shard_exit_code = metadata['exit_codes'][0]
+      shard_exit_code = metadata.get('exit_code')
+      if shard_exit_code:
+        shard_exit_code = int(shard_exit_code)
      if shard_exit_code:
         exit_code = shard_exit_code
-      if metadata.get('durations'):
-        total_duration += metadata['durations'][0]
+      total_duration += metadata.get('duration', 0)
       if decorate:
         print(decorate_shard_output(swarming, index, metadata))
         if len(seen_shards) < len(task_ids):
           print('')
       else:
-        if metadata.get('exit_codes'):
-          exit_code = metadata['exit_codes'][0]
-        else:
-          exit_code = 'N/A'
-        print('%s: %s %s' %
-            (metadata.get('bot_id') or 'N/A', metadata['id'], exit_code))
-        for output in metadata['outputs']:
-          if not output:
-            continue
-          output = output.rstrip()
+        print('%s: %s %s' % (
+            metadata.get('bot_id', 'N/A'),
+            metadata['task_id'],
+            shard_exit_code))
+        if metadata['output']:
+          output = metadata['output'].rstrip()
           if output:
            print(''.join(' %s\n' % l for l in output.splitlines()))
   finally:
     summary = output_collector.finalize()
     if task_summary_json:
+      # TODO(maruel): Make this optional.
+      for i in summary['shards']:
+        if i:
+          convert_to_old_format(i)
       tools.write_json(task_summary_json, summary, False)
   if decorate and total_duration:
@@ -813,6 +754,36 @@ def collect(
   return exit_code
+### API management.
+
+
+class APIError(Exception):
+  pass
+
+
+def endpoints_api_discovery_apis(host):
+  """Uses Cloud Endpoints' API Discovery Service to return metadata about all
+  the APIs exposed by a host.
+
+  https://developers.google.com/discovery/v1/reference/apis/list
+  """
+  data = net.url_read_json(host + '/_ah/api/discovery/v1/apis')
+  if data is None:
+    raise APIError('Failed to discover APIs on %s' % host)
+  out = {}
+  for api in data['items']:
+    if api['id'] == 'discovery:v1':
+      continue
+    # URL is of the following form:
+    # url = host + (
+    #   '/_ah/api/discovery/v1/apis/%s/%s/rest' % (api['id'], api['version']))
+    api_data = net.url_read_json(api['discoveryRestUrl'])
+    if api_data is None:
+      raise APIError('Failed to discover %s on %s' % (api['id'], host))
+    out[api['id']] = api_data
+  return out
+
+
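
# The same discovery request endpoints_api_discovery_apis issues, sketched
# with only the stdlib (utils.net is internal to this repo); the host is an
# example value:
import json
import urllib2

host = 'https://chromium-swarm.appspot.com'
data = json.load(urllib2.urlopen(host + '/_ah/api/discovery/v1/apis'))
for api in data['items']:
  if api['id'] != 'discovery:v1':
    print api['id'], api['discoveryRestUrl']
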
 ### Commands.
@@ -896,7 +867,6 @@ def process_trigger_options(parser, options, args):
   options.dimensions = dict(options.dimensions)
   options.env = dict(options.env)
-  data = []
   if not options.dimensions:
     parser.error('Please at least specify one --dimension')
   if options.raw_cmd:
@@ -914,27 +884,34 @@ def process_trigger_options(parser, options, args):
           '_'.join(
             '%s=%s' % (k, v)
             for k, v in sorted(options.dimensions.iteritems())))
+    inputs_ref = None
   else:
     isolateserver.process_isolate_server_options(parser, options, False)
     try:
-      command, data = isolated_handle_options(options, args)
+      command, inputs_ref = isolated_handle_options(options, args)
     except ValueError as e:
       parser.error(str(e))
-  return TaskRequest(
-      command=command,
-      data=data,
+  # If inputs_ref is used, command is actually extra_args. Otherwise it's an
+  # actual command to run.
+  properties = TaskProperties(
+      command=None if inputs_ref else command,
       dimensions=options.dimensions,
       env=options.env,
-      expiration=options.expiration,
-      hard_timeout=options.hard_timeout,
+      execution_timeout_secs=options.hard_timeout,
+      extra_args=command if inputs_ref else None,
+      grace_period_secs=30,
       idempotent=options.idempotent,
-      io_timeout=options.io_timeout,
+      inputs_ref=inputs_ref,
+      io_timeout_secs=options.io_timeout)
+  return NewTaskRequest(
+      expiration_secs=options.expiration,
       name=options.task_name,
+      parent_task_id=os.environ.get('SWARMING_TASK_ID', ''),
      priority=options.priority,
+      properties=properties,
       tags=options.tags,
-      user=options.user,
-      verbose=options.verbose)
+      user=options.user)
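
# End-to-end sketch, assuming the namedtuples and task_request_to_raw_request
# defined earlier in this patch; every field value here is made up:
props = TaskProperties(
    command=['python', '-c', 'print 42'],
    dimensions={'os': 'Linux', 'pool': 'default'},
    env={},
    execution_timeout_secs=3600,
    extra_args=None,
    grace_period_secs=30,
    idempotent=False,
    inputs_ref=None,
    io_timeout_secs=1200)
request = NewTaskRequest(
    expiration_secs=21600,
    name='demo-task',
    parent_task_id='',
    priority=100,
    properties=props,
    tags=['purpose:example'],
    user='user@example.com')
raw = task_request_to_raw_request(request)
print raw['properties']['dimensions']
# [{'key': 'os', 'value': 'Linux'}, {'key': 'pool', 'value': 'default'}]
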
 def add_collect_options(parser):
@@ -986,7 +963,7 @@ def CMDbot_delete(parser, args):
   result = 0
   for bot in bots:
-    url = '%s/swarming/api/v1/client/bot/%s' % (options.swarming, bot)
+    url = '%s/_ah/api/swarming/v1/bot/%s' % (options.swarming, bot)
     if net.url_read_json(url, method='DELETE') is None:
       print('Deleting %s failed' % bot)
       result = 1
@@ -1014,7 +991,8 @@ def CMDbots(parser, args):
   cursor = None
   limit = 250
   # Iterate via cursors.
-  base_url = options.swarming + '/swarming/api/v1/client/bots?limit=%d' % limit
+  base_url = (
+      options.swarming + '/_ah/api/swarming/v1/bots/list?limit=%d' % limit)
   while True:
     url = base_url
     if cursor:
@@ -1024,20 +1002,20 @@ def CMDbots(parser, args):
       print >> sys.stderr, 'Failed to access %s' % options.swarming
       return 1
     bots.extend(data['items'])
-    cursor = data['cursor']
+    cursor = data.get('cursor')
     if not cursor:
       break
-  for bot in natsort.natsorted(bots, key=lambda x: x['id']):
+  for bot in natsort.natsorted(bots, key=lambda x: x['bot_id']):
     if options.dead_only:
-      if not bot['is_dead']:
+      if not bot.get('is_dead'):
         continue
-    elif not options.keep_dead and bot['is_dead']:
+    elif not options.keep_dead and bot.get('is_dead'):
       continue
     # If the user requested to filter on dimensions, ensure the bot has all the
     # dimensions requested.
-    dimensions = bot['dimensions']
+    dimensions = {i['key']: i['value'] for i in bot['dimensions']}
     for key, value in options.dimensions:
       if key not in dimensions:
         break
@@ -1051,7 +1029,7 @@ def CMDbots(parser, args):
       if value != dimensions[key]:
         break
     else:
-      print bot['id']
+      print bot['bot_id']
       if not options.bare:
         print ' %s' % json.dumps(dimensions, sort_keys=True)
       if bot.get('task_id'):
@@ -1070,7 +1048,7 @@ def CMDcollect(parser, args):
   parser.add_option(
       '-j', '--json',
       help='Load the task ids from .json as saved by trigger --dump-json')
-  (options, args) = parser.parse_args(args)
+  options, args = parser.parse_args(args)
   if not args and not options.json:
     parser.error('Must specify at least one task id or --json.')
   if args and options.json:
@@ -1104,17 +1082,17 @@ def CMDcollect(parser, args):
     return 1
-@subcommand.usage('[resource name]')
+@subcommand.usage('[method name]')
 def CMDquery(parser, args):
-  """Returns raw JSON information via an URL endpoint. Use 'list' to gather the
-  list of valid values from the server.
+  """Returns raw JSON information via an URL endpoint. Use 'query-list' to
+  gather the list of API methods from the server.
   Examples:
-    Printing the list of known URLs:
-      swarming.py query -S https://server-url list
+    Listing all bots:
+      swarming.py query -S https://server-url bots/list
-    Listing last 50 tasks on a specific bot named 'swarm1'
-      swarming.py query -S https://server-url --limit 50 bot/swarm1/tasks
+    Listing last 10 tasks on a specific bot named 'swarm1':
+      swarming.py query -S https://server-url --limit 10 bot/swarm1/tasks
   """
   CHUNK_SIZE = 250
@@ -1124,11 +1102,15 @@ def CMDquery(parser, args):
            'default=%default')
   parser.add_option(
       '--json', help='Path to JSON output file (otherwise prints to stdout)')
-  (options, args) = parser.parse_args(args)
+  parser.add_option(
+      '--progress', action='store_true',
+      help='Prints a dot at each request to show progress')
+  options, args = parser.parse_args(args)
   if len(args) != 1:
-    parser.error('Must specify only one resource name.')
-
-  base_url = options.swarming + '/swarming/api/v1/client/' + args[0]
+    parser.error(
+        'Must specify only method name and optionally query args properly '
+        'escaped.')
+  base_url = options.swarming + '/_ah/api/swarming/v1/' + args[0]
   url = base_url
   if options.limit:
     # Check check, change if not working out.
     url += '%slimit=%d' % (merge_char, min(CHUNK_SIZE, options.limit))
   data = net.url_read_json(url)
   if data is None:
-    print >> sys.stderr, 'Failed to access %s' % options.swarming
+    # TODO(maruel): Do basic diagnostic.
+    print >> sys.stderr, 'Failed to access %s' % url
     return 1
   # Some items support cursors. Try to get automatically if cursors are needed
@@ -1148,29 +1131,72 @@ def CMDquery(parser, args):
     url = base_url + '%scursor=%s' % (merge_char, urllib.quote(data['cursor']))
     if options.limit:
       url += '&limit=%d' % min(CHUNK_SIZE, options.limit - len(data['items']))
+    if options.progress:
+      sys.stdout.write('.')
+      sys.stdout.flush()
     new = net.url_read_json(url)
     if new is None:
+      if options.progress:
+        print('')
       print >> sys.stderr, 'Failed to access %s' % options.swarming
       return 1
     data['items'].extend(new['items'])
-    data['cursor'] = new['cursor']
+    data['cursor'] = new.get('cursor')
+  if options.progress:
+    print('')
   if options.limit and len(data.get('items', [])) > options.limit:
     data['items'] = data['items'][:options.limit]
     data.pop('cursor', None)
   if options.json:
-    with open(options.json, 'w') as f:
-      json.dump(data, f)
+    tools.write_json(options.json, data, True)
   else:
     try:
-      json.dump(data, sys.stdout, indent=2, sort_keys=True)
+      tools.write_json(sys.stdout, data, False)
       sys.stdout.write('\n')
     except IOError:
       pass
   return 0
+def CMDquery_list(parser, args):
+  """Returns list of all the Swarming APIs that can be used with command
+  'query'.
+  """
+  parser.add_option(
+      '--json', help='Path to JSON output file (otherwise prints to stdout)')
+  options, args = parser.parse_args(args)
+  if args:
+    parser.error('No argument allowed.')
+
+  try:
+    apis = endpoints_api_discovery_apis(options.swarming)
+  except APIError as e:
+    parser.error(str(e))
+  if options.json:
+    with open(options.json, 'wb') as f:
+      json.dump(apis, f)
+  else:
+    help_url = (
+      'https://apis-explorer.appspot.com/apis-explorer/?base=%s/_ah/api#p/' %
+      options.swarming)
+    for api_id, api in sorted(apis.iteritems()):
+      print api_id
+      print ' ' + api['description']
+      for resource_name, resource in sorted(api['resources'].iteritems()):
+        print ''
+        for method_name, method in sorted(resource['methods'].iteritems()):
+          # Only list the GET ones.
+          if method['httpMethod'] != 'GET':
+            continue
+          print '- %s.%s: %s' % (
+              resource_name, method_name, method['path'])
+          print ' ' + method['description']
+          print ' %s%s%s' % (help_url, api['servicePath'], method['id'])
+  return 0
+
+
 @subcommand.usage('(hash|isolated) [-- extra_args]')
 def CMDrun(parser, args):
   """Triggers a task and wait for the results.
@@ -1225,7 +1251,7 @@ def CMDreproduce(parser, args):
   if len(args) != 1:
     parser.error('Must specify exactly one task id.')
-  url = options.swarming + '/swarming/api/v1/client/task/%s/request' % args[0]
+  url = options.swarming + '/_ah/api/swarming/v1/task/%s/request' % args[0]
   request = net.url_read_json(url)
   if not request:
     print >> sys.stderr, 'Failed to retrieve request data for the task'
@@ -1234,40 +1260,21 @@ def CMDreproduce(parser, args):
   if not os.path.isdir('work'):
     os.mkdir('work')
-  swarming_host = urlparse.urlparse(options.swarming).netloc
   properties = request['properties']
-  for data_url, _ in properties['data']:
-    assert data_url.startswith('https://'), data_url
-    data_host = urlparse.urlparse(data_url).netloc
-    if data_host != swarming_host:
-      auth.ensure_logged_in('https://' + data_host)
-
-    content = net.url_read(data_url)
-    if content is None:
-      print >> sys.stderr, 'Failed to download %s' % data_url
-      return 1
-    with zipfile.ZipFile(StringIO.StringIO(content)) as zip_file:
-      zip_file.extractall('work')
-
   env = None
   if properties['env']:
     env = os.environ.copy()
     logging.info('env: %r', properties['env'])
     env.update(
-        (k.encode('utf-8'), v.encode('utf-8'))
-        for k, v in properties['env'].iteritems())
+        (i['key'].encode('utf-8'), i['value'].encode('utf-8'))
+        for i in properties['env'])
-  exit_code = 0
-  for cmd in properties['commands']:
-    try:
-      c = subprocess.call(cmd, env=env, cwd='work')
-    except OSError as e:
-      print >> sys.stderr, 'Failed to run: %s' % ' '.join(cmd)
-      print >> sys.stderr, str(e)
-      c = 1
-    if not exit_code:
-      exit_code = c
-  return exit_code
+  try:
+    return subprocess.call(properties['command'], env=env, cwd='work')
+  except OSError as e:
+    print >> sys.stderr, 'Failed to run: %s' % ' '.join(properties['command'])
+    print >> sys.stderr, str(e)
+    return 1
 @subcommand.usage("(hash|isolated) [-- extra_args|raw command]")