Chromium Code Reviews

Unified Diff: client/swarming.py

Issue 1337633002: Reapply "Isolated task support in Endpoints API: client side (3/3)" and fixes" (Closed) Base URL: git@github.com:luci/luci-py.git@master
Patch Set: Final fixes Created 5 years, 3 months ago
Index: client/swarming.py
diff --git a/client/swarming.py b/client/swarming.py
index b88931fde8e118b8d7116549e1f29d9bb0fbb65c..16adbe793069b5e44058c1eb38c7f2c77132cd2f 100755
--- a/client/swarming.py
+++ b/client/swarming.py
@@ -5,7 +5,7 @@
"""Client tool to trigger tasks or retrieve results from a Swarming server."""
-__version__ = '0.6.3'
+__version__ = '0.8.2'
import collections
import datetime
@@ -13,16 +13,11 @@ import json
import logging
import optparse
import os
-import re
-import shutil
-import StringIO
import subprocess
import sys
import threading
import time
import urllib
-import urlparse
-import zipfile
from third_party import colorama
from third_party.depot_tools import fix_encoding
@@ -35,12 +30,10 @@ from utils import net
from utils import on_error
from utils import threading_utils
from utils import tools
-from utils import zip_package
import auth
import isolated_format
import isolateserver
-import run_isolated
ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
@@ -54,101 +47,14 @@ class Failure(Exception):
### Isolated file handling.
-def isolated_upload_zip_bundle(isolate_server, bundle):
- """Uploads a zip package to Isolate Server and returns raw fetch URL.
-
- Args:
- isolate_server: URL of an Isolate Server.
- bundle: instance of ZipPackage to upload.
-
- Returns:
- URL to get the file from.
- """
- # Swarming bot needs to be able to grab the file from the Isolate Server using
- # a simple HTTPS GET. Use 'default' namespace so that the raw data returned to
- # a bot is not zipped, since the swarming_bot doesn't understand compressed
- # data. This namespace have nothing to do with |namespace| passed to
- # run_isolated.py that is used to store files for isolated task.
- logging.info('Zipping up and uploading files...')
- start_time = time.time()
- isolate_item = isolateserver.BufferItem(bundle.zip_into_buffer())
- with isolateserver.get_storage(isolate_server, 'default') as storage:
- uploaded = storage.upload_items([isolate_item])
- bundle_url = storage.get_fetch_url(isolate_item)
- elapsed = time.time() - start_time
- if isolate_item in uploaded:
- logging.info('Upload complete, time elapsed: %f', elapsed)
- else:
- logging.info('Zip file already on server, time elapsed: %f', elapsed)
- return bundle_url
-
-
-def isolated_get_data(isolate_server):
- """Returns the 'data' section with all files necessary to bootstrap a task
- execution running an isolated task.
-
- It's mainly zipping run_isolated.zip over and over again.
- TODO(maruel): Get rid of this with.
- https://code.google.com/p/swarming/issues/detail?id=173
- """
- bundle = zip_package.ZipPackage(ROOT_DIR)
- bundle.add_buffer(
- 'run_isolated.zip',
- run_isolated.get_as_zip_package().zip_into_buffer(compress=False))
- bundle_url = isolated_upload_zip_bundle(isolate_server, bundle)
- return [(bundle_url, 'swarm_data.zip')]
-
-
-def isolated_get_run_commands(
- isolate_server, namespace, isolated_hash, extra_args, verbose):
- """Returns the 'commands' to run an isolated task via run_isolated.zip.
-
- Returns:
- commands list to be added to the request.
- """
- run_cmd = [
- 'python', 'run_isolated.zip',
- '--isolated', isolated_hash,
- '--isolate-server', isolate_server,
- '--namespace', namespace,
- ]
- if verbose:
- run_cmd.append('--verbose')
- # Pass all extra args for run_isolated.py, it will pass them to the command.
- if extra_args:
- run_cmd.append('--')
- run_cmd.extend(extra_args)
- return run_cmd
-
-
-def isolated_archive(isolate_server, namespace, isolated, algo, verbose):
- """Archives a .isolated and all the dependencies on the Isolate Server."""
- logging.info(
- 'isolated_archive(%s, %s, %s)', isolate_server, namespace, isolated)
- print('Archiving: %s' % isolated)
- cmd = [
- sys.executable,
- os.path.join(ROOT_DIR, 'isolate.py'),
- 'archive',
- '--isolate-server', isolate_server,
- '--namespace', namespace,
- '--isolated', isolated,
- ]
- cmd.extend(['--verbose'] * verbose)
- logging.info(' '.join(cmd))
- if subprocess.call(cmd, verbose):
- return None
- return isolated_format.hash_file(isolated, algo)
-
-
-def isolated_to_hash(isolate_server, namespace, arg, algo, verbose):
+def isolated_to_hash(arg, algo):
"""Archives a .isolated file if needed.
Returns the file hash to trigger and a bool specifying if it was a file (True)
or a hash (False).
"""
if arg.endswith('.isolated'):
- file_hash = isolated_archive(isolate_server, namespace, arg, algo, verbose)
+ file_hash = isolated_format.hash_file(arg, algo)
if not file_hash:
on_error.report('Archival failure %s' % arg)
return None, True
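For reference, isolated_format.hash_file() (not shown in this diff) simply digests the file with the namespace's hash algorithm. A minimal sketch of the equivalent, assuming the default SHA-1 namespace and a hypothetical helper name:

  import hashlib

  def hash_isolated_file(path, algo=hashlib.sha1):
    # Stream the file in chunks and return the hex digest, roughly what
    # isolated_format.hash_file() computes for the default namespace.
    digest = algo()
    with open(path, 'rb') as f:
      for chunk in iter(lambda: f.read(64 * 1024), b''):
        digest.update(chunk)
    return digest.hexdigest()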
@@ -164,7 +70,7 @@ def isolated_handle_options(options, args):
"""Handles '--isolated <isolated>', '<isolated>' and '-- <args...>' arguments.
Returns:
- tuple(command, data).
+ tuple(command, inputs_ref).
"""
isolated_cmd_args = []
if not options.isolated:
@@ -182,8 +88,7 @@ def isolated_handle_options(options, args):
'process.')
# Old code. To be removed eventually.
options.isolated, is_file = isolated_to_hash(
- options.isolate_server, options.namespace, args[0],
- isolated_format.get_hash_algo(options.namespace), options.verbose)
+ args[0], isolated_format.get_hash_algo(options.namespace))
if not options.isolated:
raise ValueError('Invalid argument %s' % args[0])
elif args:
@@ -197,10 +102,6 @@ def isolated_handle_options(options, args):
# optparse eats '--' sometimes.
isolated_cmd_args = args
- command = isolated_get_run_commands(
- options.isolate_server, options.namespace, options.isolated,
- isolated_cmd_args, options.verbose)
-
# If a file name was passed, use its base name instead of the isolated hash.
# Otherwise, use user name as an approximation of a task name.
if not options.task_name:
@@ -215,76 +116,86 @@ def isolated_handle_options(options, args):
for k, v in sorted(options.dimensions.iteritems())),
options.isolated)
- try:
- data = isolated_get_data(options.isolate_server)
- except (IOError, OSError):
- on_error.report('Failed to upload the zip file')
- raise ValueError('Failed to upload the zip file')
-
- return command, data
+ inputs_ref = FilesRef(
+ isolated=options.isolated,
+ isolatedserver=options.isolate_server,
+ namespace=options.namespace)
+ return isolated_cmd_args, inputs_ref
### Triggering.
-TaskRequest = collections.namedtuple(
- 'TaskRequest',
+# See ../appengine/swarming/swarming_rpcs.py.
+FilesRef = collections.namedtuple(
+ 'FilesRef',
+ [
+ 'isolated',
+ 'isolatedserver',
+ 'namespace',
+ ])
+
+
+# See ../appengine/swarming/swarming_rpcs.py.
+TaskProperties = collections.namedtuple(
+ 'TaskProperties',
[
'command',
- 'data',
'dimensions',
'env',
- 'expiration',
- 'hard_timeout',
+ 'execution_timeout_secs',
+ 'extra_args',
+ 'grace_period_secs',
'idempotent',
- 'io_timeout',
+ 'inputs_ref',
+ 'io_timeout_secs',
+ ])
+
+
+# See ../appengine/swarming/swarming_rpcs.py.
+NewTaskRequest = collections.namedtuple(
+ 'NewTaskRequest',
+ [
+ 'expiration_secs',
'name',
+ 'parent_task_id',
'priority',
+ 'properties',
'tags',
'user',
- 'verbose',
])
+def namedtuple_to_dict(value):
+ """Recursively converts a namedtuple to a dict."""
+ out = dict(value._asdict())
+ for k, v in out.iteritems():
+ if hasattr(v, '_asdict'):
+ out[k] = namedtuple_to_dict(v)
+ return out
+
+
def task_request_to_raw_request(task_request):
"""Returns the json dict expected by the Swarming server for new request.
This is for the v1 client Swarming API.
"""
- return {
- 'name': task_request.name,
- 'parent_task_id': os.environ.get('SWARMING_TASK_ID', ''),
- 'priority': task_request.priority,
- 'properties': {
- 'commands': [task_request.command],
- 'data': task_request.data,
- 'dimensions': task_request.dimensions,
- 'env': task_request.env,
- 'execution_timeout_secs': task_request.hard_timeout,
- 'io_timeout_secs': task_request.io_timeout,
- 'idempotent': task_request.idempotent,
- },
- 'scheduling_expiration_secs': task_request.expiration,
- 'tags': task_request.tags,
- 'user': task_request.user,
- }
-
-
-def swarming_handshake(swarming):
- """Initiates the connection to the Swarming server."""
- headers = {'X-XSRF-Token-Request': '1'}
- response = net.url_read_json(
- swarming + '/swarming/api/v1/client/handshake',
- headers=headers,
- data={})
- if not response:
- logging.error('Failed to handshake with server')
- return None
- logging.info('Connected to server version: %s', response['server_version'])
- return response['xsrf_token']
+ out = namedtuple_to_dict(task_request)
+ # Maps are not supported until protobuf v3.
+ out['properties']['dimensions'] = [
+ {'key': k, 'value': v}
+ for k, v in out['properties']['dimensions'].iteritems()
+ ]
+ out['properties']['dimensions'].sort(key=lambda x: x['key'])
+ out['properties']['env'] = [
+ {'key': k, 'value': v}
+ for k, v in out['properties']['env'].iteritems()
+ ]
+ out['properties']['env'].sort(key=lambda x: x['key'])
+ return out
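To make the serialization above concrete, here is a small self-contained sketch (hypothetical names, illustrative values only) of how a nested namedtuple becomes the sorted key/value-pair lists that the Endpoints API expects:

  import collections

  Props = collections.namedtuple('Props', ['dimensions', 'env'])
  Request = collections.namedtuple('Request', ['name', 'properties'])

  def to_dict(value):
    # Recursively converts namedtuples to plain dicts, like namedtuple_to_dict().
    out = dict(value._asdict())
    for k, v in out.items():
      if hasattr(v, '_asdict'):
        out[k] = to_dict(v)
    return out

  req = Request(
      name='hello',
      properties=Props(dimensions={'pool': 'default', 'os': 'Linux'}, env={}))
  raw = to_dict(req)
  # protobuf v2 has no map type, so dict fields become sorted lists of pairs.
  raw['properties']['dimensions'] = sorted(
      ({'key': k, 'value': v}
       for k, v in raw['properties']['dimensions'].items()),
      key=lambda x: x['key'])
  # raw['properties']['dimensions'] ==
  #   [{'key': 'os', 'value': 'Linux'}, {'key': 'pool', 'value': 'default'}]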
-def swarming_trigger(swarming, raw_request, xsrf_token):
+def swarming_trigger(swarming, raw_request):
"""Triggers a request on the Swarming server and returns the json data.
It's the low-level function.
@@ -300,11 +211,8 @@ def swarming_trigger(swarming, raw_request, xsrf_token):
"""
logging.info('Triggering: %s', raw_request['name'])
- headers = {'X-XSRF-Token': xsrf_token}
result = net.url_read_json(
- swarming + '/swarming/api/v1/client/request',
- data=raw_request,
- headers=headers)
+ swarming + '/_ah/api/swarming/v1/tasks/new', data=raw_request)
if not result:
on_error.report('Failed to trigger task %s' % raw_request['name'])
return None
@@ -314,9 +222,11 @@ def swarming_trigger(swarming, raw_request, xsrf_token):
def setup_googletest(env, shards, index):
"""Sets googletest specific environment variables."""
if shards > 1:
- env = env.copy()
- env['GTEST_SHARD_INDEX'] = str(index)
- env['GTEST_TOTAL_SHARDS'] = str(shards)
+ assert not any(i['key'] == 'GTEST_SHARD_INDEX' for i in env), env
+ assert not any(i['key'] == 'GTEST_TOTAL_SHARDS' for i in env), env
+ env = env[:]
+ env.append({'key': 'GTEST_SHARD_INDEX', 'value': str(index)})
+ env.append({'key': 'GTEST_TOTAL_SHARDS', 'value': str(shards)})
return env
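A quick usage sketch of the list-based env format (illustrative values, assuming swarming.py is importable):

  env = [{'key': 'LANG', 'value': 'en_US.UTF-8'}]
  sharded = setup_googletest(env, 2, 1)
  # The input list is left untouched; the returned copy gains the two keys:
  #   [{'key': 'LANG', 'value': 'en_US.UTF-8'},
  #    {'key': 'GTEST_SHARD_INDEX', 'value': '1'},
  #    {'key': 'GTEST_TOTAL_SHARDS', 'value': '2'}]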
@@ -328,21 +238,18 @@ def trigger_task_shards(swarming, task_request, shards):
None in case of failure.
"""
def convert(index):
- req = task_request
+ req = task_request_to_raw_request(task_request)
if shards > 1:
- req = req._replace(
- env=setup_googletest(req.env, shards, index),
- name='%s:%s:%s' % (req.name, index, shards))
- return task_request_to_raw_request(req)
+ req['properties']['env'] = setup_googletest(
+ req['properties']['env'], shards, index)
+ req['name'] += ':%s:%s' % (index, shards)
+ return req
requests = [convert(index) for index in xrange(shards)]
- xsrf_token = swarming_handshake(swarming)
- if not xsrf_token:
- return None
tasks = {}
priority_warning = False
for index, request in enumerate(requests):
- task = swarming_trigger(swarming, request, xsrf_token)
+ task = swarming_trigger(swarming, request)
if not task:
break
logging.info('Request result: %s', task)
@@ -392,11 +299,14 @@ class State(object):
CANCELED = 0x60
COMPLETED = 0x70
- STATES = (RUNNING, PENDING, EXPIRED, TIMED_OUT, BOT_DIED, CANCELED, COMPLETED)
- STATES_RUNNING = (RUNNING, PENDING)
- STATES_NOT_RUNNING = (EXPIRED, TIMED_OUT, BOT_DIED, CANCELED, COMPLETED)
- STATES_DONE = (TIMED_OUT, COMPLETED)
- STATES_ABANDONED = (EXPIRED, BOT_DIED, CANCELED)
+ STATES = (
+ 'RUNNING', 'PENDING', 'EXPIRED', 'TIMED_OUT', 'BOT_DIED', 'CANCELED',
+ 'COMPLETED')
+ STATES_RUNNING = ('RUNNING', 'PENDING')
+ STATES_NOT_RUNNING = (
+ 'EXPIRED', 'TIMED_OUT', 'BOT_DIED', 'CANCELED', 'COMPLETED')
+ STATES_DONE = ('TIMED_OUT', 'COMPLETED')
+ STATES_ABANDONED = ('EXPIRED', 'BOT_DIED', 'CANCELED')
_NAMES = {
RUNNING: 'Running',
@@ -408,6 +318,16 @@ class State(object):
COMPLETED: 'Completed',
}
+ _ENUMS = {
+ 'RUNNING': RUNNING,
+ 'PENDING': PENDING,
+ 'EXPIRED': EXPIRED,
+ 'TIMED_OUT': TIMED_OUT,
+ 'BOT_DIED': BOT_DIED,
+ 'CANCELED': CANCELED,
+ 'COMPLETED': COMPLETED,
+ }
+
@classmethod
def to_string(cls, state):
"""Returns a user-readable string representing a State."""
@@ -415,6 +335,13 @@ class State(object):
raise ValueError('Invalid state %s' % state)
return cls._NAMES[state]
+ @classmethod
+ def from_enum(cls, state):
+ """Returns int value based on the string."""
+ if state not in cls._ENUMS:
+ raise ValueError('Invalid state %s' % state)
+ return cls._ENUMS[state]
+
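The Endpoints API reports 'state' as one of the strings above; from_enum() maps it back to the legacy integer, for example:

  assert State.from_enum('COMPLETED') == State.COMPLETED == 0x70
  assert State.to_string(State.from_enum('COMPLETED')) == 'Completed'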
class TaskOutputCollector(object):
"""Assembles task execution summary (for --task-summary-json output).
@@ -450,6 +377,8 @@ class TaskOutputCollector(object):
Modifies |result| in place.
+ shard_index is 0-based.
+
Called concurrently from multiple threads.
"""
# Sanity check index is in expected range.
@@ -460,14 +389,12 @@ class TaskOutputCollector(object):
shard_index, self.shard_count - 1)
return
- assert not 'isolated_out' in result
- result['isolated_out'] = None
- for output in result['outputs']:
- isolated_files_location = extract_output_files_location(output)
- if isolated_files_location:
- if result['isolated_out']:
- raise ValueError('Unexpected two task with output')
- result['isolated_out'] = isolated_files_location
+ if result.get('outputs_ref'):
+ ref = result['outputs_ref']
+ result['outputs_ref']['view_url'] = '%s/browse?%s' % (
+ ref['isolatedserver'],
+ urllib.urlencode(
+ [('namespace', ref['namespace']), ('hash', ref['isolated'])]))
# Store result dict of that shard, ignore results we've already seen.
with self._lock:
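The view_url attached above is a plain browse link on the isolate server. With illustrative values (Python 2 urllib, as used elsewhere in this file):

  import urllib

  ref = {
      'isolatedserver': 'https://isolateserver.appspot.com',  # example value
      'namespace': 'default-gzip',                            # example value
      'isolated': 'deadbeef' * 5,                             # fake 40-char hash
  }
  view_url = '%s/browse?%s' % (
      ref['isolatedserver'],
      urllib.urlencode(
          [('namespace', ref['namespace']), ('hash', ref['isolated'])]))
  # view_url ends up as
  # https://isolateserver.appspot.com/browse?namespace=default-gzip&hash=deadbeef...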
@@ -477,16 +404,16 @@ class TaskOutputCollector(object):
self._per_shard_results[shard_index] = result
# Fetch output files if necessary.
- if self.task_output_dir and result['isolated_out']:
+ if self.task_output_dir and result.get('outputs_ref'):
storage = self._get_storage(
- result['isolated_out']['server'],
- result['isolated_out']['namespace'])
+ result['outputs_ref']['isolatedserver'],
+ result['outputs_ref']['namespace'])
if storage:
# Output files are supposed to be small and they are not reused across
# tasks. So use MemoryCache for them instead of on-disk cache. Make
# files writable, so that calling script can delete them.
isolateserver.fetch_isolated(
- result['isolated_out']['hash'],
+ result['outputs_ref']['isolated'],
storage,
isolateserver.MemoryCache(file_mode_mask=0700),
os.path.join(self.task_output_dir, str(shard_index)),
@@ -533,58 +460,23 @@ class TaskOutputCollector(object):
return self._storage
-def extract_output_files_location(task_log):
- """Task log -> location of task output files to fetch.
-
- TODO(vadimsh,maruel): Use side-channel to get this information.
- See 'run_tha_test' in run_isolated.py for where the data is generated.
-
- Returns:
- Tuple (isolate server URL, namespace, isolated hash) on success.
- None if information is missing or can not be parsed.
- """
- if not task_log:
- return None
- match = re.search(
- r'\[run_isolated_out_hack\](.*)\[/run_isolated_out_hack\]',
- task_log,
- re.DOTALL)
- if not match:
- return None
-
- def to_ascii(val):
- if not isinstance(val, basestring):
- raise ValueError()
- return val.encode('ascii')
-
- try:
- data = json.loads(match.group(1))
- if not isinstance(data, dict):
- raise ValueError()
- isolated_hash = to_ascii(data['hash'])
- namespace = to_ascii(data['namespace'])
- isolate_server = to_ascii(data['storage'])
- if not file_path.is_url(isolate_server):
- raise ValueError()
- data = {
- 'hash': isolated_hash,
- 'namespace': namespace,
- 'server': isolate_server,
- 'view_url': '%s/browse?%s' % (isolate_server, urllib.urlencode(
- [('namespace', namespace), ('hash', isolated_hash)])),
- }
- return data
- except (KeyError, ValueError):
- logging.warning(
- 'Unexpected value of run_isolated_out_hack: %s', match.group(1))
- return None
-
-
def now():
"""Exists so it can be mocked easily."""
return time.time()
+def parse_time(value):
+ """Converts serialized time from the API to datetime.datetime."""
+ # When microseconds are 0, the '.123456' suffix is elided. This means the
+ # serialized format is not consistent, which confuses the hell out of Python.
+ for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
+ try:
+ return datetime.datetime.strptime(value, fmt)
+ except ValueError:
+ pass
+ raise ValueError('Failed to parse %s' % value)
+
+
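Both serialized forms parse to the same datetime, for example:

  import datetime

  assert parse_time('2015-09-15T20:19:40.123456') == datetime.datetime(
      2015, 9, 15, 20, 19, 40, 123456)
  assert parse_time('2015-09-15T20:19:40') == datetime.datetime(
      2015, 9, 15, 20, 19, 40)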
def retrieve_results(
base_url, shard_index, task_id, timeout, should_stop, output_collector):
"""Retrieves results for a single task ID.
@@ -594,9 +486,8 @@ def retrieve_results(
None on failure.
"""
assert isinstance(timeout, float), timeout
- result_url = '%s/swarming/api/v1/client/task/%s' % (base_url, task_id)
- output_url = '%s/swarming/api/v1/client/task/%s/output/all' % (
- base_url, task_id)
+ result_url = '%s/_ah/api/swarming/v1/task/%s/result' % (base_url, task_id)
+ output_url = '%s/_ah/api/swarming/v1/task/%s/stdout' % (base_url, task_id)
started = now()
deadline = started + timeout if timeout else None
attempt = 0
@@ -629,18 +520,69 @@ def retrieve_results(
result = net.url_read_json(result_url, retry_50x=False)
if not result:
continue
+
if result['state'] in State.STATES_NOT_RUNNING:
+ # TODO(maruel): Not always fetch stdout?
out = net.url_read_json(output_url)
- result['outputs'] = (out or {}).get('outputs', [])
- if not result['outputs']:
+ result['output'] = out.get('output') if out else out
+ if not result['output']:
logging.error('No output found for task %s', task_id)
# Record the result, try to fetch attached output files (if any).
if output_collector:
# TODO(vadimsh): Respect |should_stop| and |deadline| when fetching.
output_collector.process_shard_result(shard_index, result)
+ if result.get('internal_failure'):
+ logging.error('Internal error!')
+ elif result['state'] == 'BOT_DIED':
+ logging.error('Bot died!')
return result
+def convert_to_old_format(result):
+ """Converts the task result data from Endpoints API format to old API format
+ for compatibility.
+
+ This goes into the file generated as --task-summary-json.
+ """
+ # Sets defaults.
+ result.setdefault('abandoned_ts', None)
+ result.setdefault('bot_id', None)
+ result.setdefault('bot_version', None)
+ result.setdefault('children_task_ids', [])
+ result.setdefault('completed_ts', None)
+ result.setdefault('cost_saved_usd', None)
+ result.setdefault('costs_usd', None)
+ result.setdefault('deduped_from', None)
+ result.setdefault('name', None)
+ result.setdefault('outputs_ref', None)
+ result.setdefault('properties_hash', None)
+ result.setdefault('server_versions', None)
+ result.setdefault('started_ts', None)
+ result.setdefault('tags', None)
+ result.setdefault('user', None)
+
+ # Conversion back to old API.
+ duration = result.pop('duration', None)
+ result['durations'] = [duration] if duration else []
+ exit_code = result.pop('exit_code', None)
+ result['exit_codes'] = [int(exit_code)] if exit_code else []
+ result['id'] = result.pop('task_id')
+ result['isolated_out'] = result.get('outputs_ref', None)
+ output = result.pop('output', None)
+ result['outputs'] = [output] if output else []
+ # properties_hash
+ # server_version
+ # Endpoints returns 'state' as a string; convert it back to an int for
+ # compatibility with old code.
+ result['state'] = State.from_enum(result['state'])
+ # tags
+ result['try_number'] = (
+ int(result['try_number']) if result['try_number'] else None)
+ result['bot_dimensions'] = {
+ i['key']: i['value'] for i in result['bot_dimensions']
+ }
+
+
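As an illustration with hypothetical values, a minimal Endpoints-style shard result maps back to the old fields like this:

  result = {
      'task_id': '148aa78d7aa00000',  # fake id
      'state': 'COMPLETED',
      'exit_code': '0',
      'duration': 12.3,
      'output': 'hi\n',
      'try_number': '1',
      'bot_dimensions': [{'key': 'os', 'value': 'Linux'}],
  }
  convert_to_old_format(result)
  # Afterwards: result['id'] == '148aa78d7aa00000', result['exit_codes'] == [0],
  # result['durations'] == [12.3], result['outputs'] == ['hi\n'],
  # result['state'] == State.COMPLETED and
  # result['bot_dimensions'] == {'os': 'Linux'}.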
def yield_results(
swarm_base_url, task_ids, timeout, max_threads, print_status_updates,
output_collector):
@@ -716,27 +658,27 @@ def yield_results(
def decorate_shard_output(swarming, shard_index, metadata):
"""Returns wrapped output for swarming task shard."""
- def t(d):
- return datetime.datetime.strptime(d, '%Y-%m-%d %H:%M:%S')
- if metadata.get('started_ts'):
+ if metadata.get('started_ts') and not metadata.get('deduped_from'):
pending = '%.1fs' % (
- t(metadata['started_ts']) - t(metadata['created_ts'])).total_seconds()
+ parse_time(metadata['started_ts']) - parse_time(metadata['created_ts'])
+ ).total_seconds()
else:
pending = 'N/A'
- if metadata.get('durations'):
- duration = '%.1fs' % metadata['durations'][0]
+ if metadata.get('duration') is not None:
+ duration = '%.1fs' % metadata['duration']
else:
duration = 'N/A'
- if metadata.get('exit_codes'):
- exit_code = '%d' % metadata['exit_codes'][0]
+ if metadata.get('exit_code') is not None:
+ # Integers are encoded as strings to not lose precision.
+ exit_code = '%s' % metadata['exit_code']
else:
exit_code = 'N/A'
bot_id = metadata.get('bot_id') or 'N/A'
- url = '%s/user/task/%s' % (swarming, metadata['id'])
+ url = '%s/user/task/%s' % (swarming, metadata['task_id'])
tag_header = 'Shard %d %s' % (shard_index, url)
tag_footer = (
'End of shard %d Pending: %s Duration: %s Bot: %s Exit: %s' % (
@@ -749,7 +691,7 @@ def decorate_shard_output(swarming, shard_index, metadata):
header = dash_pad + tag_header + dash_pad
footer = dash_pad + tag_footer + dash_pad[:-1]
- output = '\n'.join(o for o in metadata['outputs'] if o).rstrip() + '\n'
+ output = metadata.get('output', '').rstrip() + '\n'
return header + output + footer
@@ -771,34 +713,33 @@ def collect(
seen_shards.add(index)
# Default to failure if there was no process that even started.
- shard_exit_code = 1
- if metadata.get('exit_codes'):
- shard_exit_code = metadata['exit_codes'][0]
+ shard_exit_code = metadata.get('exit_code')
+ if shard_exit_code:
+ shard_exit_code = int(shard_exit_code)
if shard_exit_code:
exit_code = shard_exit_code
- if metadata.get('durations'):
- total_duration += metadata['durations'][0]
+ total_duration += metadata.get('duration', 0)
if decorate:
print(decorate_shard_output(swarming, index, metadata))
if len(seen_shards) < len(task_ids):
print('')
else:
- if metadata.get('exit_codes'):
- exit_code = metadata['exit_codes'][0]
- else:
- exit_code = 'N/A'
- print('%s: %s %s' %
- (metadata.get('bot_id') or 'N/A', metadata['id'], exit_code))
- for output in metadata['outputs']:
- if not output:
- continue
- output = output.rstrip()
+ print('%s: %s %s' % (
+ metadata.get('bot_id', 'N/A'),
+ metadata['task_id'],
+ shard_exit_code))
+ if metadata['output']:
+ output = metadata['output'].rstrip()
if output:
print(''.join(' %s\n' % l for l in output.splitlines()))
finally:
summary = output_collector.finalize()
if task_summary_json:
+ # TODO(maruel): Make this optional.
+ for i in summary['shards']:
+ if i:
+ convert_to_old_format(i)
tools.write_json(task_summary_json, summary, False)
if decorate and total_duration:
@@ -813,6 +754,36 @@ def collect(
return exit_code
+### API management.
+
+
+class APIError(Exception):
+ pass
+
+
+def endpoints_api_discovery_apis(host):
+ """Uses Cloud Endpoints' API Discovery Service to returns metadata about all
+ the APIs exposed by a host.
+
+ https://developers.google.com/discovery/v1/reference/apis/list
+ """
+ data = net.url_read_json(host + '/_ah/api/discovery/v1/apis')
+ if data is None:
+ raise APIError('Failed to discover APIs on %s' % host)
+ out = {}
+ for api in data['items']:
+ if api['id'] == 'discovery:v1':
+ continue
+ # URL is of the following form:
+ # url = host + (
+ # '/_ah/api/discovery/v1/apis/%s/%s/rest' % (api['id'], api['version']))
+ api_data = net.url_read_json(api['discoveryRestUrl'])
+ if api_data is None:
+ raise APIError('Failed to discover %s on %s' % (api['id'], host))
+ out[api['id']] = api_data
+ return out
+
+
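A hedged usage sketch of the discovery helper (the host below is only an example; network access to a live Swarming server is assumed):

  host = 'https://chromium-swarm.appspot.com'  # example host
  apis = endpoints_api_discovery_apis(host)
  # Each value is that API's full discovery document; CMDquery_list below walks
  # its 'resources' to print the GET methods.
  for api_id, api in sorted(apis.items()):
    print('%s: %s' % (api_id, api['description']))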
### Commands.
@@ -896,7 +867,6 @@ def process_trigger_options(parser, options, args):
options.dimensions = dict(options.dimensions)
options.env = dict(options.env)
- data = []
if not options.dimensions:
parser.error('Please at least specify one --dimension')
if options.raw_cmd:
@@ -914,27 +884,34 @@ def process_trigger_options(parser, options, args):
'_'.join(
'%s=%s' % (k, v)
for k, v in sorted(options.dimensions.iteritems())))
+ inputs_ref = None
else:
isolateserver.process_isolate_server_options(parser, options, False)
try:
- command, data = isolated_handle_options(options, args)
+ command, inputs_ref = isolated_handle_options(options, args)
except ValueError as e:
parser.error(str(e))
- return TaskRequest(
- command=command,
- data=data,
+ # If inputs_ref is used, command is actually extra_args. Otherwise it's an
+ # actual command to run.
+ properties = TaskProperties(
+ command=None if inputs_ref else command,
dimensions=options.dimensions,
env=options.env,
- expiration=options.expiration,
- hard_timeout=options.hard_timeout,
+ execution_timeout_secs=options.hard_timeout,
+ extra_args=command if inputs_ref else None,
+ grace_period_secs=30,
idempotent=options.idempotent,
- io_timeout=options.io_timeout,
+ inputs_ref=inputs_ref,
+ io_timeout_secs=options.io_timeout)
+ return NewTaskRequest(
+ expiration_secs=options.expiration,
name=options.task_name,
+ parent_task_id=os.environ.get('SWARMING_TASK_ID', ''),
priority=options.priority,
+ properties=properties,
tags=options.tags,
- user=options.user,
- verbose=options.verbose)
+ user=options.user)
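For the isolated case the resulting properties look roughly like this (all values below are illustrative, not taken from the patch): command stays None and the trailing arguments become extra_args, since the bot derives the actual command from inputs_ref.

  inputs_ref = FilesRef(
      isolated='deadbeef' * 5,  # fake 40-char hash
      isolatedserver='https://isolateserver.appspot.com',  # example server
      namespace='default-gzip')
  properties = TaskProperties(
      command=None,  # the command comes from the .isolated file
      dimensions={'os': 'Linux', 'pool': 'default'},
      env={},
      execution_timeout_secs=3600,
      extra_args=['--gtest_filter=Foo.*'],
      grace_period_secs=30,
      idempotent=False,
      inputs_ref=inputs_ref,
      io_timeout_secs=1200)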
def add_collect_options(parser):
@@ -986,7 +963,7 @@ def CMDbot_delete(parser, args):
result = 0
for bot in bots:
- url = '%s/swarming/api/v1/client/bot/%s' % (options.swarming, bot)
+ url = '%s/_ah/api/swarming/v1/bot/%s' % (options.swarming, bot)
if net.url_read_json(url, method='DELETE') is None:
print('Deleting %s failed' % bot)
result = 1
@@ -1014,7 +991,8 @@ def CMDbots(parser, args):
cursor = None
limit = 250
# Iterate via cursors.
- base_url = options.swarming + '/swarming/api/v1/client/bots?limit=%d' % limit
+ base_url = (
+ options.swarming + '/_ah/api/swarming/v1/bots/list?limit=%d' % limit)
while True:
url = base_url
if cursor:
@@ -1024,20 +1002,20 @@ def CMDbots(parser, args):
print >> sys.stderr, 'Failed to access %s' % options.swarming
return 1
bots.extend(data['items'])
- cursor = data['cursor']
+ cursor = data.get('cursor')
if not cursor:
break
- for bot in natsort.natsorted(bots, key=lambda x: x['id']):
+ for bot in natsort.natsorted(bots, key=lambda x: x['bot_id']):
if options.dead_only:
- if not bot['is_dead']:
+ if not bot.get('is_dead'):
continue
- elif not options.keep_dead and bot['is_dead']:
+ elif not options.keep_dead and bot.get('is_dead'):
continue
# If the user requested to filter on dimensions, ensure the bot has all the
# dimensions requested.
- dimensions = bot['dimensions']
+ dimensions = {i['key']: i['value'] for i in bot['dimensions']}
for key, value in options.dimensions:
if key not in dimensions:
break
@@ -1051,7 +1029,7 @@ def CMDbots(parser, args):
if value != dimensions[key]:
break
else:
- print bot['id']
+ print bot['bot_id']
if not options.bare:
print ' %s' % json.dumps(dimensions, sort_keys=True)
if bot.get('task_id'):
@@ -1070,7 +1048,7 @@ def CMDcollect(parser, args):
parser.add_option(
'-j', '--json',
help='Load the task ids from .json as saved by trigger --dump-json')
- (options, args) = parser.parse_args(args)
+ options, args = parser.parse_args(args)
if not args and not options.json:
parser.error('Must specify at least one task id or --json.')
if args and options.json:
@@ -1104,17 +1082,17 @@ def CMDcollect(parser, args):
return 1
-@subcommand.usage('[resource name]')
+@subcommand.usage('[method name]')
def CMDquery(parser, args):
- """Returns raw JSON information via an URL endpoint. Use 'list' to gather the
- list of valid values from the server.
+ """Returns raw JSON information via an URL endpoint. Use 'query-list' to
+ gather the list of API methods from the server.
Examples:
- Printing the list of known URLs:
- swarming.py query -S https://server-url list
+ Listing all bots:
+ swarming.py query -S https://server-url bots/list
- Listing last 50 tasks on a specific bot named 'swarm1'
- swarming.py query -S https://server-url --limit 50 bot/swarm1/tasks
+ Listing last 10 tasks on a specific bot named 'swarm1':
+ swarming.py query -S https://server-url --limit 10 bot/swarm1/tasks
"""
CHUNK_SIZE = 250
@@ -1124,11 +1102,15 @@ def CMDquery(parser, args):
'default=%default')
parser.add_option(
'--json', help='Path to JSON output file (otherwise prints to stdout)')
- (options, args) = parser.parse_args(args)
+ parser.add_option(
+ '--progress', action='store_true',
+ help='Prints a dot at each request to show progress')
+ options, args = parser.parse_args(args)
if len(args) != 1:
- parser.error('Must specify only one resource name.')
-
- base_url = options.swarming + '/swarming/api/v1/client/' + args[0]
+ parser.error(
+ 'Must specify only method name and optionally query args properly '
+ 'escaped.')
+ base_url = options.swarming + '/_ah/api/swarming/v1/' + args[0]
url = base_url
if options.limit:
# Check check, change if not working out.
@@ -1136,7 +1118,8 @@ def CMDquery(parser, args):
url += '%slimit=%d' % (merge_char, min(CHUNK_SIZE, options.limit))
data = net.url_read_json(url)
if data is None:
- print >> sys.stderr, 'Failed to access %s' % options.swarming
+ # TODO(maruel): Do basic diagnostic.
+ print >> sys.stderr, 'Failed to access %s' % url
return 1
# Some items support cursors. Try to get automatically if cursors are needed
@@ -1148,29 +1131,72 @@ def CMDquery(parser, args):
url = base_url + '%scursor=%s' % (merge_char, urllib.quote(data['cursor']))
if options.limit:
url += '&limit=%d' % min(CHUNK_SIZE, options.limit - len(data['items']))
+ if options.progress:
+ sys.stdout.write('.')
+ sys.stdout.flush()
new = net.url_read_json(url)
if new is None:
+ if options.progress:
+ print('')
print >> sys.stderr, 'Failed to access %s' % options.swarming
return 1
data['items'].extend(new['items'])
- data['cursor'] = new['cursor']
+ data['cursor'] = new.get('cursor')
+ if options.progress:
+ print('')
if options.limit and len(data.get('items', [])) > options.limit:
data['items'] = data['items'][:options.limit]
data.pop('cursor', None)
if options.json:
- with open(options.json, 'w') as f:
- json.dump(data, f)
+ tools.write_json(options.json, data, True)
else:
try:
- json.dump(data, sys.stdout, indent=2, sort_keys=True)
+ tools.write_json(sys.stdout, data, False)
sys.stdout.write('\n')
except IOError:
pass
return 0
+def CMDquery_list(parser, args):
+ """Returns list of all the Swarming APIs that can be used with command
+ 'query'.
+ """
+ parser.add_option(
+ '--json', help='Path to JSON output file (otherwise prints to stdout)')
+ options, args = parser.parse_args(args)
+ if args:
+ parser.error('No argument allowed.')
+
+ try:
+ apis = endpoints_api_discovery_apis(options.swarming)
+ except APIError as e:
+ parser.error(str(e))
+ if options.json:
+ with open(options.json, 'wb') as f:
+ json.dump(apis, f)
+ else:
+ help_url = (
+ 'https://apis-explorer.appspot.com/apis-explorer/?base=%s/_ah/api#p/' %
+ options.swarming)
+ for api_id, api in sorted(apis.iteritems()):
+ print api_id
+ print ' ' + api['description']
+ for resource_name, resource in sorted(api['resources'].iteritems()):
+ print ''
+ for method_name, method in sorted(resource['methods'].iteritems()):
+ # Only list the GET ones.
+ if method['httpMethod'] != 'GET':
+ continue
+ print '- %s.%s: %s' % (
+ resource_name, method_name, method['path'])
+ print ' ' + method['description']
+ print ' %s%s%s' % (help_url, api['servicePath'], method['id'])
+ return 0
+
+
@subcommand.usage('(hash|isolated) [-- extra_args]')
def CMDrun(parser, args):
"""Triggers a task and wait for the results.
@@ -1225,7 +1251,7 @@ def CMDreproduce(parser, args):
if len(args) != 1:
parser.error('Must specify exactly one task id.')
- url = options.swarming + '/swarming/api/v1/client/task/%s/request' % args[0]
+ url = options.swarming + '/_ah/api/swarming/v1/task/%s/request' % args[0]
request = net.url_read_json(url)
if not request:
print >> sys.stderr, 'Failed to retrieve request data for the task'
@@ -1234,40 +1260,21 @@ def CMDreproduce(parser, args):
if not os.path.isdir('work'):
os.mkdir('work')
- swarming_host = urlparse.urlparse(options.swarming).netloc
properties = request['properties']
- for data_url, _ in properties['data']:
- assert data_url.startswith('https://'), data_url
- data_host = urlparse.urlparse(data_url).netloc
- if data_host != swarming_host:
- auth.ensure_logged_in('https://' + data_host)
-
- content = net.url_read(data_url)
- if content is None:
- print >> sys.stderr, 'Failed to download %s' % data_url
- return 1
- with zipfile.ZipFile(StringIO.StringIO(content)) as zip_file:
- zip_file.extractall('work')
-
env = None
if properties['env']:
env = os.environ.copy()
logging.info('env: %r', properties['env'])
env.update(
- (k.encode('utf-8'), v.encode('utf-8'))
- for k, v in properties['env'].iteritems())
+ (i['key'].encode('utf-8'), i['value'].encode('utf-8'))
+ for i in properties['env'])
- exit_code = 0
- for cmd in properties['commands']:
- try:
- c = subprocess.call(cmd, env=env, cwd='work')
- except OSError as e:
- print >> sys.stderr, 'Failed to run: %s' % ' '.join(cmd)
- print >> sys.stderr, str(e)
- c = 1
- if not exit_code:
- exit_code = c
- return exit_code
+ try:
+ return subprocess.call(properties['command'], env=env, cwd='work')
+ except OSError as e:
+ print >> sys.stderr, 'Failed to run: %s' % ' '.join(properties['command'])
+ print >> sys.stderr, str(e)
+ return 1
@subcommand.usage("(hash|isolated) [-- extra_args|raw command]")
