| Index: client/swarming.py
|
| diff --git a/client/swarming.py b/client/swarming.py
|
| index b88931fde8e118b8d7116549e1f29d9bb0fbb65c..16adbe793069b5e44058c1eb38c7f2c77132cd2f 100755
|
| --- a/client/swarming.py
|
| +++ b/client/swarming.py
|
| @@ -5,7 +5,7 @@
|
|
|
| """Client tool to trigger tasks or retrieve results from a Swarming server."""
|
|
|
| -__version__ = '0.6.3'
|
| +__version__ = '0.8.2'
|
|
|
| import collections
|
| import datetime
|
| @@ -13,16 +13,11 @@ import json
|
| import logging
|
| import optparse
|
| import os
|
| -import re
|
| -import shutil
|
| -import StringIO
|
| import subprocess
|
| import sys
|
| import threading
|
| import time
|
| import urllib
|
| -import urlparse
|
| -import zipfile
|
|
|
| from third_party import colorama
|
| from third_party.depot_tools import fix_encoding
|
| @@ -35,12 +30,10 @@ from utils import net
|
| from utils import on_error
|
| from utils import threading_utils
|
| from utils import tools
|
| -from utils import zip_package
|
|
|
| import auth
|
| import isolated_format
|
| import isolateserver
|
| -import run_isolated
|
|
|
|
|
| ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
|
| @@ -54,101 +47,14 @@ class Failure(Exception):
|
| ### Isolated file handling.
|
|
|
|
|
| -def isolated_upload_zip_bundle(isolate_server, bundle):
|
| - """Uploads a zip package to Isolate Server and returns raw fetch URL.
|
| -
|
| - Args:
|
| - isolate_server: URL of an Isolate Server.
|
| - bundle: instance of ZipPackage to upload.
|
| -
|
| - Returns:
|
| - URL to get the file from.
|
| - """
|
| - # Swarming bot needs to be able to grab the file from the Isolate Server using
|
| - # a simple HTTPS GET. Use 'default' namespace so that the raw data returned to
|
| - # a bot is not zipped, since the swarming_bot doesn't understand compressed
|
| - # data. This namespace have nothing to do with |namespace| passed to
|
| - # run_isolated.py that is used to store files for isolated task.
|
| - logging.info('Zipping up and uploading files...')
|
| - start_time = time.time()
|
| - isolate_item = isolateserver.BufferItem(bundle.zip_into_buffer())
|
| - with isolateserver.get_storage(isolate_server, 'default') as storage:
|
| - uploaded = storage.upload_items([isolate_item])
|
| - bundle_url = storage.get_fetch_url(isolate_item)
|
| - elapsed = time.time() - start_time
|
| - if isolate_item in uploaded:
|
| - logging.info('Upload complete, time elapsed: %f', elapsed)
|
| - else:
|
| - logging.info('Zip file already on server, time elapsed: %f', elapsed)
|
| - return bundle_url
|
| -
|
| -
|
| -def isolated_get_data(isolate_server):
|
| - """Returns the 'data' section with all files necessary to bootstrap a task
|
| - execution running an isolated task.
|
| -
|
| - It's mainly zipping run_isolated.zip over and over again.
|
| - TODO(maruel): Get rid of this with.
|
| - https://code.google.com/p/swarming/issues/detail?id=173
|
| - """
|
| - bundle = zip_package.ZipPackage(ROOT_DIR)
|
| - bundle.add_buffer(
|
| - 'run_isolated.zip',
|
| - run_isolated.get_as_zip_package().zip_into_buffer(compress=False))
|
| - bundle_url = isolated_upload_zip_bundle(isolate_server, bundle)
|
| - return [(bundle_url, 'swarm_data.zip')]
|
| -
|
| -
|
| -def isolated_get_run_commands(
|
| - isolate_server, namespace, isolated_hash, extra_args, verbose):
|
| - """Returns the 'commands' to run an isolated task via run_isolated.zip.
|
| -
|
| - Returns:
|
| - commands list to be added to the request.
|
| - """
|
| - run_cmd = [
|
| - 'python', 'run_isolated.zip',
|
| - '--isolated', isolated_hash,
|
| - '--isolate-server', isolate_server,
|
| - '--namespace', namespace,
|
| - ]
|
| - if verbose:
|
| - run_cmd.append('--verbose')
|
| - # Pass all extra args for run_isolated.py, it will pass them to the command.
|
| - if extra_args:
|
| - run_cmd.append('--')
|
| - run_cmd.extend(extra_args)
|
| - return run_cmd
|
| -
|
| -
|
| -def isolated_archive(isolate_server, namespace, isolated, algo, verbose):
|
| - """Archives a .isolated and all the dependencies on the Isolate Server."""
|
| - logging.info(
|
| - 'isolated_archive(%s, %s, %s)', isolate_server, namespace, isolated)
|
| - print('Archiving: %s' % isolated)
|
| - cmd = [
|
| - sys.executable,
|
| - os.path.join(ROOT_DIR, 'isolate.py'),
|
| - 'archive',
|
| - '--isolate-server', isolate_server,
|
| - '--namespace', namespace,
|
| - '--isolated', isolated,
|
| - ]
|
| - cmd.extend(['--verbose'] * verbose)
|
| - logging.info(' '.join(cmd))
|
| - if subprocess.call(cmd, verbose):
|
| - return None
|
| - return isolated_format.hash_file(isolated, algo)
|
| -
|
| -
|
| -def isolated_to_hash(isolate_server, namespace, arg, algo, verbose):
|
| +def isolated_to_hash(arg, algo):
|
| """Archives a .isolated file if needed.
|
|
|
| Returns the file hash to trigger and a bool specifying if it was a file (True)
|
| or a hash (False).
|
| """
|
| if arg.endswith('.isolated'):
|
| - file_hash = isolated_archive(isolate_server, namespace, arg, algo, verbose)
|
| + file_hash = isolated_format.hash_file(arg, algo)
|
| if not file_hash:
|
| on_error.report('Archival failure %s' % arg)
|
| return None, True
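With the zip-bundle plumbing gone, isolated_to_hash no longer shells out to isolate.py; it only hashes the .isolated file with the namespace's algorithm. A minimal sketch of that hashing step, assuming the namespace maps to SHA-1 (hash_isolated_sketch is an illustrative helper, not part of the patch):

```python
import hashlib

def hash_isolated_sketch(path, algo=hashlib.sha1):
  # Stream in chunks so large .isolated files stay out of memory;
  # isolated_format.hash_file presumably does the equivalent.
  digest = algo()
  with open(path, 'rb') as f:
    for chunk in iter(lambda: f.read(64 * 1024), ''):
      digest.update(chunk)
  return digest.hexdigest()
```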
|
| @@ -164,7 +70,7 @@ def isolated_handle_options(options, args):
|
| """Handles '--isolated <isolated>', '<isolated>' and '-- <args...>' arguments.
|
|
|
| Returns:
|
| - tuple(command, data).
|
| + tuple(command, inputs_ref).
|
| """
|
| isolated_cmd_args = []
|
| if not options.isolated:
|
| @@ -182,8 +88,7 @@ def isolated_handle_options(options, args):
|
| 'process.')
|
| # Old code. To be removed eventually.
|
| options.isolated, is_file = isolated_to_hash(
|
| - options.isolate_server, options.namespace, args[0],
|
| - isolated_format.get_hash_algo(options.namespace), options.verbose)
|
| + args[0], isolated_format.get_hash_algo(options.namespace))
|
| if not options.isolated:
|
| raise ValueError('Invalid argument %s' % args[0])
|
| elif args:
|
| @@ -197,10 +102,6 @@ def isolated_handle_options(options, args):
|
| # optparse eats '--' sometimes.
|
| isolated_cmd_args = args
|
|
|
| - command = isolated_get_run_commands(
|
| - options.isolate_server, options.namespace, options.isolated,
|
| - isolated_cmd_args, options.verbose)
|
| -
|
| # If a file name was passed, use its base name of the isolated hash.
|
| # Otherwise, use user name as an approximation of a task name.
|
| if not options.task_name:
|
| @@ -215,76 +116,86 @@ def isolated_handle_options(options, args):
|
| for k, v in sorted(options.dimensions.iteritems())),
|
| options.isolated)
|
|
|
| - try:
|
| - data = isolated_get_data(options.isolate_server)
|
| - except (IOError, OSError):
|
| - on_error.report('Failed to upload the zip file')
|
| - raise ValueError('Failed to upload the zip file')
|
| -
|
| - return command, data
|
| + inputs_ref = FilesRef(
|
| + isolated=options.isolated,
|
| + isolatedserver=options.isolate_server,
|
| + namespace=options.namespace)
|
| + return isolated_cmd_args, inputs_ref
|
|
|
|
|
| ### Triggering.
|
|
|
|
|
| -TaskRequest = collections.namedtuple(
|
| - 'TaskRequest',
|
| +# See ../appengine/swarming/swarming_rpcs.py.
|
| +FilesRef = collections.namedtuple(
|
| + 'FilesRef',
|
| + [
|
| + 'isolated',
|
| + 'isolatedserver',
|
| + 'namespace',
|
| + ])
|
| +
|
| +
|
| +# See ../appengine/swarming/swarming_rpcs.py.
|
| +TaskProperties = collections.namedtuple(
|
| + 'TaskProperties',
|
| [
|
| 'command',
|
| - 'data',
|
| 'dimensions',
|
| 'env',
|
| - 'expiration',
|
| - 'hard_timeout',
|
| + 'execution_timeout_secs',
|
| + 'extra_args',
|
| + 'grace_period_secs',
|
| 'idempotent',
|
| - 'io_timeout',
|
| + 'inputs_ref',
|
| + 'io_timeout_secs',
|
| + ])
|
| +
|
| +
|
| +# See ../appengine/swarming/swarming_rpcs.py.
|
| +NewTaskRequest = collections.namedtuple(
|
| + 'NewTaskRequest',
|
| + [
|
| + 'expiration_secs',
|
| 'name',
|
| + 'parent_task_id',
|
| 'priority',
|
| + 'properties',
|
| 'tags',
|
| 'user',
|
| - 'verbose',
|
| ])
|
|
|
|
|
| +def namedtuple_to_dict(value):
|
| + """Recursively converts a namedtuple to a dict."""
|
| + out = dict(value._asdict())
|
| + for k, v in out.iteritems():
|
| + if hasattr(v, '_asdict'):
|
| + out[k] = namedtuple_to_dict(v)
|
| + return out
|
| +
|
| +
|
| def task_request_to_raw_request(task_request):
|
| """Returns the json dict expected by the Swarming server for new request.
|
|
|
| This is for the v1 client Swarming API.
|
| """
|
| - return {
|
| - 'name': task_request.name,
|
| - 'parent_task_id': os.environ.get('SWARMING_TASK_ID', ''),
|
| - 'priority': task_request.priority,
|
| - 'properties': {
|
| - 'commands': [task_request.command],
|
| - 'data': task_request.data,
|
| - 'dimensions': task_request.dimensions,
|
| - 'env': task_request.env,
|
| - 'execution_timeout_secs': task_request.hard_timeout,
|
| - 'io_timeout_secs': task_request.io_timeout,
|
| - 'idempotent': task_request.idempotent,
|
| - },
|
| - 'scheduling_expiration_secs': task_request.expiration,
|
| - 'tags': task_request.tags,
|
| - 'user': task_request.user,
|
| - }
|
| -
|
| -
|
| -def swarming_handshake(swarming):
|
| - """Initiates the connection to the Swarming server."""
|
| - headers = {'X-XSRF-Token-Request': '1'}
|
| - response = net.url_read_json(
|
| - swarming + '/swarming/api/v1/client/handshake',
|
| - headers=headers,
|
| - data={})
|
| - if not response:
|
| - logging.error('Failed to handshake with server')
|
| - return None
|
| - logging.info('Connected to server version: %s', response['server_version'])
|
| - return response['xsrf_token']
|
| + out = namedtuple_to_dict(task_request)
|
| + # Maps are not supported until protobuf v3.
|
| + out['properties']['dimensions'] = [
|
| + {'key': k, 'value': v}
|
| + for k, v in out['properties']['dimensions'].iteritems()
|
| + ]
|
| + out['properties']['dimensions'].sort(key=lambda x: x['key'])
|
| + out['properties']['env'] = [
|
| + {'key': k, 'value': v}
|
| + for k, v in out['properties']['env'].iteritems()
|
| + ]
|
| + out['properties']['env'].sort(key=lambda x: x['key'])
|
| + return out
|
|
|
|
|
| -def swarming_trigger(swarming, raw_request, xsrf_token):
|
| +def swarming_trigger(swarming, raw_request):
|
| """Triggers a request on the Swarming server and returns the json data.
|
|
|
| It's the low-level function.
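The hunk above swaps the hand-rolled request dict for namedtuples mirroring swarming_rpcs.py plus a generic serializer. A sketch of the round trip with invented values, showing how namedtuple_to_dict recurses into TaskProperties and how the dict-typed dimensions/env come out as sorted key/value pair lists:

```python
properties = TaskProperties(
    command=['python', 'hello_world.py'],
    dimensions={'os': 'Linux', 'pool': 'default'},
    env={'LANG': 'en_US.UTF-8'},
    execution_timeout_secs=3600,
    extra_args=None,
    grace_period_secs=30,
    idempotent=False,
    inputs_ref=None,
    io_timeout_secs=1200)
request = NewTaskRequest(
    expiration_secs=21600,
    name='hello_world',
    parent_task_id='',
    priority=100,
    properties=properties,
    tags=[],
    user='joe')
raw = task_request_to_raw_request(request)
# raw['properties']['dimensions'] == [
#     {'key': 'os', 'value': 'Linux'}, {'key': 'pool', 'value': 'default'}]
# raw['properties']['env'] == [{'key': 'LANG', 'value': 'en_US.UTF-8'}]
```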
|
| @@ -300,11 +211,8 @@ def swarming_trigger(swarming, raw_request, xsrf_token):
|
| """
|
| logging.info('Triggering: %s', raw_request['name'])
|
|
|
| - headers = {'X-XSRF-Token': xsrf_token}
|
| result = net.url_read_json(
|
| - swarming + '/swarming/api/v1/client/request',
|
| - data=raw_request,
|
| - headers=headers)
|
| + swarming + '/_ah/api/swarming/v1/tasks/new', data=raw_request)
|
| if not result:
|
| on_error.report('Failed to trigger task %s' % raw_request['name'])
|
| return None
|
| @@ -314,9 +222,11 @@ def swarming_trigger(swarming, raw_request, xsrf_token):
|
| def setup_googletest(env, shards, index):
|
| """Sets googletest specific environment variables."""
|
| if shards > 1:
|
| - env = env.copy()
|
| - env['GTEST_SHARD_INDEX'] = str(index)
|
| - env['GTEST_TOTAL_SHARDS'] = str(shards)
|
| + assert not any(i['key'] == 'GTEST_SHARD_INDEX' for i in env), env
|
| + assert not any(i['key'] == 'GTEST_TOTAL_SHARDS' for i in env), env
|
| + env = env[:]
|
| + env.append({'key': 'GTEST_SHARD_INDEX', 'value': str(index)})
|
| + env.append({'key': 'GTEST_TOTAL_SHARDS', 'value': str(shards)})
|
| return env
|
|
|
|
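Since env is now a list of key/value pairs rather than a dict, setup_googletest appends entries instead of assigning keys, and asserts the GTEST variables are not already present. A quick illustration (values invented):

```python
env = [{'key': 'LANG', 'value': 'en_US.UTF-8'}]
sharded = setup_googletest(env, 3, 1)
# sharded == [
#     {'key': 'LANG', 'value': 'en_US.UTF-8'},
#     {'key': 'GTEST_SHARD_INDEX', 'value': '1'},
#     {'key': 'GTEST_TOTAL_SHARDS', 'value': '3'}]
# env itself is unchanged: the env[:] slice copies the list first.
```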
|
| @@ -328,21 +238,18 @@ def trigger_task_shards(swarming, task_request, shards):
|
| None in case of failure.
|
| """
|
| def convert(index):
|
| - req = task_request
|
| + req = task_request_to_raw_request(task_request)
|
| if shards > 1:
|
| - req = req._replace(
|
| - env=setup_googletest(req.env, shards, index),
|
| - name='%s:%s:%s' % (req.name, index, shards))
|
| - return task_request_to_raw_request(req)
|
| + req['properties']['env'] = setup_googletest(
|
| + req['properties']['env'], shards, index)
|
| + req['name'] += ':%s:%s' % (index, shards)
|
| + return req
|
|
|
| requests = [convert(index) for index in xrange(shards)]
|
| - xsrf_token = swarming_handshake(swarming)
|
| - if not xsrf_token:
|
| - return None
|
| tasks = {}
|
| priority_warning = False
|
| for index, request in enumerate(requests):
|
| - task = swarming_trigger(swarming, request, xsrf_token)
|
| + task = swarming_trigger(swarming, request)
|
| if not task:
|
| break
|
| logging.info('Request result: %s', task)
|
| @@ -392,11 +299,14 @@ class State(object):
|
| CANCELED = 0x60
|
| COMPLETED = 0x70
|
|
|
| - STATES = (RUNNING, PENDING, EXPIRED, TIMED_OUT, BOT_DIED, CANCELED, COMPLETED)
|
| - STATES_RUNNING = (RUNNING, PENDING)
|
| - STATES_NOT_RUNNING = (EXPIRED, TIMED_OUT, BOT_DIED, CANCELED, COMPLETED)
|
| - STATES_DONE = (TIMED_OUT, COMPLETED)
|
| - STATES_ABANDONED = (EXPIRED, BOT_DIED, CANCELED)
|
| + STATES = (
|
| + 'RUNNING', 'PENDING', 'EXPIRED', 'TIMED_OUT', 'BOT_DIED', 'CANCELED',
|
| + 'COMPLETED')
|
| + STATES_RUNNING = ('RUNNING', 'PENDING')
|
| + STATES_NOT_RUNNING = (
|
| + 'EXPIRED', 'TIMED_OUT', 'BOT_DIED', 'CANCELED', 'COMPLETED')
|
| + STATES_DONE = ('TIMED_OUT', 'COMPLETED')
|
| + STATES_ABANDONED = ('EXPIRED', 'BOT_DIED', 'CANCELED')
|
|
|
| _NAMES = {
|
| RUNNING: 'Running',
|
| @@ -408,6 +318,16 @@ class State(object):
|
| COMPLETED: 'Completed',
|
| }
|
|
|
| + _ENUMS = {
|
| + 'RUNNING': RUNNING,
|
| + 'PENDING': PENDING,
|
| + 'EXPIRED': EXPIRED,
|
| + 'TIMED_OUT': TIMED_OUT,
|
| + 'BOT_DIED': BOT_DIED,
|
| + 'CANCELED': CANCELED,
|
| + 'COMPLETED': COMPLETED,
|
| + }
|
| +
|
| @classmethod
|
| def to_string(cls, state):
|
| """Returns a user-readable string representing a State."""
|
| @@ -415,6 +335,13 @@ class State(object):
|
| raise ValueError('Invalid state %s' % state)
|
| return cls._NAMES[state]
|
|
|
| + @classmethod
|
| + def from_enum(cls, state):
|
| + """Returns int value based on the string."""
|
| + if state not in cls._ENUMS:
|
| + raise ValueError('Invalid state %s' % state)
|
| + return cls._ENUMS[state]
|
| +
|
|
|
| class TaskOutputCollector(object):
|
| """Assembles task execution summary (for --task-summary-json output).
|
| @@ -450,6 +377,8 @@ class TaskOutputCollector(object):
|
|
|
| Modifies |result| in place.
|
|
|
| + shard_index is 0-based.
|
| +
|
| Called concurrently from multiple threads.
|
| """
|
| # Sanity check index is in expected range.
|
| @@ -460,14 +389,12 @@ class TaskOutputCollector(object):
|
| shard_index, self.shard_count - 1)
|
| return
|
|
|
| - assert not 'isolated_out' in result
|
| - result['isolated_out'] = None
|
| - for output in result['outputs']:
|
| - isolated_files_location = extract_output_files_location(output)
|
| - if isolated_files_location:
|
| - if result['isolated_out']:
|
| - raise ValueError('Unexpected two task with output')
|
| - result['isolated_out'] = isolated_files_location
|
| + if result.get('outputs_ref'):
|
| + ref = result['outputs_ref']
|
| + result['outputs_ref']['view_url'] = '%s/browse?%s' % (
|
| + ref['isolatedserver'],
|
| + urllib.urlencode(
|
| + [('namespace', ref['namespace']), ('hash', ref['isolated'])]))
|
|
|
| # Store result dict of that shard, ignore results we've already seen.
|
| with self._lock:
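Rather than scraping the run_isolated_out_hack marker out of stdout, process_shard_result now derives the browse URL directly from the outputs_ref structure the server returns. The URL construction in isolation (server and hash invented):

```python
import urllib

ref = {
    'isolatedserver': 'https://isolateserver.appspot.com',
    'namespace': 'default-gzip',
    'isolated': 'f0' * 20,  # illustrative 40-char digest
}
view_url = '%s/browse?%s' % (
    ref['isolatedserver'],
    urllib.urlencode(
        [('namespace', ref['namespace']), ('hash', ref['isolated'])]))
# https://isolateserver.appspot.com/browse?namespace=default-gzip&hash=f0f0...
```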
|
| @@ -477,16 +404,16 @@ class TaskOutputCollector(object):
|
| self._per_shard_results[shard_index] = result
|
|
|
| # Fetch output files if necessary.
|
| - if self.task_output_dir and result['isolated_out']:
|
| + if self.task_output_dir and result.get('outputs_ref'):
|
| storage = self._get_storage(
|
| - result['isolated_out']['server'],
|
| - result['isolated_out']['namespace'])
|
| + result['outputs_ref']['isolatedserver'],
|
| + result['outputs_ref']['namespace'])
|
| if storage:
|
| # Output files are supposed to be small and they are not reused across
|
| # tasks. So use MemoryCache for them instead of on-disk cache. Make
|
| # files writable, so that calling script can delete them.
|
| isolateserver.fetch_isolated(
|
| - result['isolated_out']['hash'],
|
| + result['outputs_ref']['isolated'],
|
| storage,
|
| isolateserver.MemoryCache(file_mode_mask=0700),
|
| os.path.join(self.task_output_dir, str(shard_index)),
|
| @@ -533,58 +460,23 @@ class TaskOutputCollector(object):
|
| return self._storage
|
|
|
|
|
| -def extract_output_files_location(task_log):
|
| - """Task log -> location of task output files to fetch.
|
| -
|
| - TODO(vadimsh,maruel): Use side-channel to get this information.
|
| - See 'run_tha_test' in run_isolated.py for where the data is generated.
|
| -
|
| - Returns:
|
| - Tuple (isolate server URL, namespace, isolated hash) on success.
|
| - None if information is missing or can not be parsed.
|
| - """
|
| - if not task_log:
|
| - return None
|
| - match = re.search(
|
| - r'\[run_isolated_out_hack\](.*)\[/run_isolated_out_hack\]',
|
| - task_log,
|
| - re.DOTALL)
|
| - if not match:
|
| - return None
|
| -
|
| - def to_ascii(val):
|
| - if not isinstance(val, basestring):
|
| - raise ValueError()
|
| - return val.encode('ascii')
|
| -
|
| - try:
|
| - data = json.loads(match.group(1))
|
| - if not isinstance(data, dict):
|
| - raise ValueError()
|
| - isolated_hash = to_ascii(data['hash'])
|
| - namespace = to_ascii(data['namespace'])
|
| - isolate_server = to_ascii(data['storage'])
|
| - if not file_path.is_url(isolate_server):
|
| - raise ValueError()
|
| - data = {
|
| - 'hash': isolated_hash,
|
| - 'namespace': namespace,
|
| - 'server': isolate_server,
|
| - 'view_url': '%s/browse?%s' % (isolate_server, urllib.urlencode(
|
| - [('namespace', namespace), ('hash', isolated_hash)])),
|
| - }
|
| - return data
|
| - except (KeyError, ValueError):
|
| - logging.warning(
|
| - 'Unexpected value of run_isolated_out_hack: %s', match.group(1))
|
| - return None
|
| -
|
| -
|
| def now():
|
| """Exists so it can be mocked easily."""
|
| return time.time()
|
|
|
|
|
| +def parse_time(value):
|
| + """Converts serialized time from the API to datetime.datetime."""
|
| + # When microseconds are 0, the '.123456' suffix is elided. This means the
|
| + # serialized format is not consistent, which confuses the hell out of python.
|
| + for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
|
| + try:
|
| + return datetime.datetime.strptime(value, fmt)
|
| + except ValueError:
|
| + pass
|
| + raise ValueError('Failed to parse %s' % value)
|
| +
|
| +
|
| def retrieve_results(
|
| base_url, shard_index, task_id, timeout, should_stop, output_collector):
|
| """Retrieves results for a single task ID.
|
| @@ -594,9 +486,8 @@ def retrieve_results(
|
| None on failure.
|
| """
|
| assert isinstance(timeout, float), timeout
|
| - result_url = '%s/swarming/api/v1/client/task/%s' % (base_url, task_id)
|
| - output_url = '%s/swarming/api/v1/client/task/%s/output/all' % (
|
| - base_url, task_id)
|
| + result_url = '%s/_ah/api/swarming/v1/task/%s/result' % (base_url, task_id)
|
| + output_url = '%s/_ah/api/swarming/v1/task/%s/stdout' % (base_url, task_id)
|
| started = now()
|
| deadline = started + timeout if timeout else None
|
| attempt = 0
|
| @@ -629,18 +520,69 @@ def retrieve_results(
|
| result = net.url_read_json(result_url, retry_50x=False)
|
| if not result:
|
| continue
|
| +
|
| if result['state'] in State.STATES_NOT_RUNNING:
|
| + # TODO(maruel): Not always fetch stdout?
|
| out = net.url_read_json(output_url)
|
| - result['outputs'] = (out or {}).get('outputs', [])
|
| - if not result['outputs']:
|
| + result['output'] = out.get('output') if out else out
|
| + if not result['output']:
|
| logging.error('No output found for task %s', task_id)
|
| # Record the result, try to fetch attached output files (if any).
|
| if output_collector:
|
| # TODO(vadimsh): Respect |should_stop| and |deadline| when fetching.
|
| output_collector.process_shard_result(shard_index, result)
|
| + if result.get('internal_failure'):
|
| + logging.error('Internal error!')
|
| + elif result['state'] == 'BOT_DIED':
|
| + logging.error('Bot died!')
|
| return result
|
|
|
|
|
| +def convert_to_old_format(result):
|
| + """Converts the task result data from Endpoints API format to old API format
|
| + for compatibility.
|
| +
|
| + This goes into the file generated as --task-summary-json.
|
| + """
|
| + # Sets default.
|
| + result.setdefault('abandoned_ts', None)
|
| + result.setdefault('bot_id', None)
|
| + result.setdefault('bot_version', None)
|
| + result.setdefault('children_task_ids', [])
|
| + result.setdefault('completed_ts', None)
|
| + result.setdefault('cost_saved_usd', None)
|
| + result.setdefault('costs_usd', None)
|
| + result.setdefault('deduped_from', None)
|
| + result.setdefault('name', None)
|
| + result.setdefault('outputs_ref', None)
|
| + result.setdefault('properties_hash', None)
|
| + result.setdefault('server_versions', None)
|
| + result.setdefault('started_ts', None)
|
| + result.setdefault('tags', None)
|
| + result.setdefault('user', None)
|
| +
|
| +  # Conversion back to the old API.
|
| + duration = result.pop('duration', None)
|
| + result['durations'] = [duration] if duration else []
|
| + exit_code = result.pop('exit_code', None)
|
| + result['exit_codes'] = [int(exit_code)] if exit_code else []
|
| + result['id'] = result.pop('task_id')
|
| + result['isolated_out'] = result.get('outputs_ref', None)
|
| + output = result.pop('output', None)
|
| + result['outputs'] = [output] if output else []
|
| + # properties_hash
|
| + # server_version
|
| +  # Endpoints returns 'state' as a string. For compatibility with old code,
|
| +  # convert it to an int.
|
| + result['state'] = State.from_enum(result['state'])
|
| + # tags
|
| + result['try_number'] = (
|
| + int(result['try_number']) if result['try_number'] else None)
|
| + result['bot_dimensions'] = {
|
| + i['key']: i['value'] for i in result['bot_dimensions']
|
| + }
|
| +
|
| +
|
| def yield_results(
|
| swarm_base_url, task_ids, timeout, max_threads, print_status_updates,
|
| output_collector):
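convert_to_old_format mutates the Endpoints result in place so consumers of --task-summary-json keep working. A sketch on a minimal payload (all values invented):

```python
result = {
    'task_id': '148aa78d7aa00000',
    'state': 'COMPLETED',
    'exit_code': '0',
    'duration': 12.3,
    'output': 'hi\n',
    'try_number': '1',
    'bot_dimensions': [{'key': 'os', 'value': 'Linux'}],
}
convert_to_old_format(result)
# result['id'] == '148aa78d7aa00000'   ('task_id' renamed back)
# result['exit_codes'] == [0]          (singular string parsed to a list of int)
# result['durations'] == [12.3]
# result['state'] == State.COMPLETED   (string mapped back to 0x70)
# result['bot_dimensions'] == {'os': 'Linux'}
```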
|
| @@ -716,27 +658,27 @@ def yield_results(
|
|
|
| def decorate_shard_output(swarming, shard_index, metadata):
|
| """Returns wrapped output for swarming task shard."""
|
| - def t(d):
|
| - return datetime.datetime.strptime(d, '%Y-%m-%d %H:%M:%S')
|
| - if metadata.get('started_ts'):
|
| + if metadata.get('started_ts') and not metadata.get('deduped_from'):
|
| pending = '%.1fs' % (
|
| - t(metadata['started_ts']) - t(metadata['created_ts'])).total_seconds()
|
| + parse_time(metadata['started_ts']) - parse_time(metadata['created_ts'])
|
| + ).total_seconds()
|
| else:
|
| pending = 'N/A'
|
|
|
| - if metadata.get('durations'):
|
| - duration = '%.1fs' % metadata['durations'][0]
|
| + if metadata.get('duration') is not None:
|
| + duration = '%.1fs' % metadata['duration']
|
| else:
|
| duration = 'N/A'
|
|
|
| - if metadata.get('exit_codes'):
|
| - exit_code = '%d' % metadata['exit_codes'][0]
|
| + if metadata.get('exit_code') is not None:
|
| +    # Integers are encoded as strings to not lose precision.
|
| + exit_code = '%s' % metadata['exit_code']
|
| else:
|
| exit_code = 'N/A'
|
|
|
| bot_id = metadata.get('bot_id') or 'N/A'
|
|
|
| - url = '%s/user/task/%s' % (swarming, metadata['id'])
|
| + url = '%s/user/task/%s' % (swarming, metadata['task_id'])
|
| tag_header = 'Shard %d %s' % (shard_index, url)
|
| tag_footer = (
|
| 'End of shard %d Pending: %s Duration: %s Bot: %s Exit: %s' % (
|
| @@ -749,7 +691,7 @@ def decorate_shard_output(swarming, shard_index, metadata):
|
|
|
| header = dash_pad + tag_header + dash_pad
|
| footer = dash_pad + tag_footer + dash_pad[:-1]
|
| - output = '\n'.join(o for o in metadata['outputs'] if o).rstrip() + '\n'
|
| + output = metadata.get('output', '').rstrip() + '\n'
|
| return header + output + footer
|
|
|
|
|
| @@ -771,34 +713,33 @@ def collect(
|
| seen_shards.add(index)
|
|
|
| # Default to failure if there was no process that even started.
|
| - shard_exit_code = 1
|
| - if metadata.get('exit_codes'):
|
| - shard_exit_code = metadata['exit_codes'][0]
|
| + shard_exit_code = metadata.get('exit_code')
|
| + if shard_exit_code:
|
| + shard_exit_code = int(shard_exit_code)
|
| if shard_exit_code:
|
| exit_code = shard_exit_code
|
| - if metadata.get('durations'):
|
| - total_duration += metadata['durations'][0]
|
| + total_duration += metadata.get('duration', 0)
|
|
|
| if decorate:
|
| print(decorate_shard_output(swarming, index, metadata))
|
| if len(seen_shards) < len(task_ids):
|
| print('')
|
| else:
|
| - if metadata.get('exit_codes'):
|
| - exit_code = metadata['exit_codes'][0]
|
| - else:
|
| - exit_code = 'N/A'
|
| - print('%s: %s %s' %
|
| - (metadata.get('bot_id') or 'N/A', metadata['id'], exit_code))
|
| - for output in metadata['outputs']:
|
| - if not output:
|
| - continue
|
| - output = output.rstrip()
|
| + print('%s: %s %s' % (
|
| + metadata.get('bot_id', 'N/A'),
|
| + metadata['task_id'],
|
| + shard_exit_code))
|
| + if metadata['output']:
|
| + output = metadata['output'].rstrip()
|
| if output:
|
| print(''.join(' %s\n' % l for l in output.splitlines()))
|
| finally:
|
| summary = output_collector.finalize()
|
| if task_summary_json:
|
| + # TODO(maruel): Make this optional.
|
| + for i in summary['shards']:
|
| + if i:
|
| + convert_to_old_format(i)
|
| tools.write_json(task_summary_json, summary, False)
|
|
|
| if decorate and total_duration:
|
| @@ -813,6 +754,36 @@ def collect(
|
| return exit_code
|
|
|
|
|
| +### API management.
|
| +
|
| +
|
| +class APIError(Exception):
|
| + pass
|
| +
|
| +
|
| +def endpoints_api_discovery_apis(host):
|
| + """Uses Cloud Endpoints' API Discovery Service to returns metadata about all
|
| + the APIs exposed by a host.
|
| +
|
| + https://developers.google.com/discovery/v1/reference/apis/list
|
| + """
|
| + data = net.url_read_json(host + '/_ah/api/discovery/v1/apis')
|
| + if data is None:
|
| + raise APIError('Failed to discover APIs on %s' % host)
|
| + out = {}
|
| + for api in data['items']:
|
| + if api['id'] == 'discovery:v1':
|
| + continue
|
| + # URL is of the following form:
|
| + # url = host + (
|
| +  #   '/_ah/api/discovery/v1/apis/%s/%s/rest' % (api['id'], api['version']))
|
| + api_data = net.url_read_json(api['discoveryRestUrl'])
|
| + if api_data is None:
|
| + raise APIError('Failed to discover %s on %s' % (api['id'], host))
|
| + out[api['id']] = api_data
|
| + return out
|
| +
|
| +
|
| ### Commands.
|
|
|
|
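endpoints_api_discovery_apis talks to the standard Google API Discovery Service, so the listing can be reproduced with nothing but the standard library. A hedged sketch, assuming the host serves Cloud Endpoints under /_ah/api and needs no authentication (which net.url_read_json would otherwise handle):

```python
import json
import urllib2

def list_apis_sketch(host):
  # Fetch the API directory, then each API's full discovery document.
  directory = json.load(urllib2.urlopen(host + '/_ah/api/discovery/v1/apis'))
  out = {}
  for api in directory['items']:
    if api['id'] == 'discovery:v1':
      continue
    out[api['id']] = json.load(urllib2.urlopen(api['discoveryRestUrl']))
  return out
```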
|
| @@ -896,7 +867,6 @@ def process_trigger_options(parser, options, args):
|
| options.dimensions = dict(options.dimensions)
|
| options.env = dict(options.env)
|
|
|
| - data = []
|
| if not options.dimensions:
|
| parser.error('Please at least specify one --dimension')
|
| if options.raw_cmd:
|
| @@ -914,27 +884,34 @@ def process_trigger_options(parser, options, args):
|
| '_'.join(
|
| '%s=%s' % (k, v)
|
| for k, v in sorted(options.dimensions.iteritems())))
|
| + inputs_ref = None
|
| else:
|
| isolateserver.process_isolate_server_options(parser, options, False)
|
| try:
|
| - command, data = isolated_handle_options(options, args)
|
| + command, inputs_ref = isolated_handle_options(options, args)
|
| except ValueError as e:
|
| parser.error(str(e))
|
|
|
| - return TaskRequest(
|
| - command=command,
|
| - data=data,
|
| + # If inputs_ref is used, command is actually extra_args. Otherwise it's an
|
| + # actual command to run.
|
| + properties = TaskProperties(
|
| + command=None if inputs_ref else command,
|
| dimensions=options.dimensions,
|
| env=options.env,
|
| - expiration=options.expiration,
|
| - hard_timeout=options.hard_timeout,
|
| + execution_timeout_secs=options.hard_timeout,
|
| + extra_args=command if inputs_ref else None,
|
| + grace_period_secs=30,
|
| idempotent=options.idempotent,
|
| - io_timeout=options.io_timeout,
|
| + inputs_ref=inputs_ref,
|
| + io_timeout_secs=options.io_timeout)
|
| + return NewTaskRequest(
|
| + expiration_secs=options.expiration,
|
| name=options.task_name,
|
| + parent_task_id=os.environ.get('SWARMING_TASK_ID', ''),
|
| priority=options.priority,
|
| + properties=properties,
|
| tags=options.tags,
|
| - user=options.user,
|
| - verbose=options.verbose)
|
| + user=options.user)
|
|
|
|
|
| def add_collect_options(parser):
|
| @@ -986,7 +963,7 @@ def CMDbot_delete(parser, args):
|
|
|
| result = 0
|
| for bot in bots:
|
| - url = '%s/swarming/api/v1/client/bot/%s' % (options.swarming, bot)
|
| + url = '%s/_ah/api/swarming/v1/bot/%s' % (options.swarming, bot)
|
| if net.url_read_json(url, method='DELETE') is None:
|
| print('Deleting %s failed' % bot)
|
| result = 1
|
| @@ -1014,7 +991,8 @@ def CMDbots(parser, args):
|
| cursor = None
|
| limit = 250
|
| # Iterate via cursors.
|
| - base_url = options.swarming + '/swarming/api/v1/client/bots?limit=%d' % limit
|
| + base_url = (
|
| + options.swarming + '/_ah/api/swarming/v1/bots/list?limit=%d' % limit)
|
| while True:
|
| url = base_url
|
| if cursor:
|
| @@ -1024,20 +1002,20 @@ def CMDbots(parser, args):
|
| print >> sys.stderr, 'Failed to access %s' % options.swarming
|
| return 1
|
| bots.extend(data['items'])
|
| - cursor = data['cursor']
|
| + cursor = data.get('cursor')
|
| if not cursor:
|
| break
|
|
|
| - for bot in natsort.natsorted(bots, key=lambda x: x['id']):
|
| + for bot in natsort.natsorted(bots, key=lambda x: x['bot_id']):
|
| if options.dead_only:
|
| - if not bot['is_dead']:
|
| + if not bot.get('is_dead'):
|
| continue
|
| - elif not options.keep_dead and bot['is_dead']:
|
| + elif not options.keep_dead and bot.get('is_dead'):
|
| continue
|
|
|
| # If the user requested to filter on dimensions, ensure the bot has all the
|
| # dimensions requested.
|
| - dimensions = bot['dimensions']
|
| + dimensions = {i['key']: i['value'] for i in bot['dimensions']}
|
| for key, value in options.dimensions:
|
| if key not in dimensions:
|
| break
|
| @@ -1051,7 +1029,7 @@ def CMDbots(parser, args):
|
| if value != dimensions[key]:
|
| break
|
| else:
|
| - print bot['id']
|
| + print bot['bot_id']
|
| if not options.bare:
|
| print ' %s' % json.dumps(dimensions, sort_keys=True)
|
| if bot.get('task_id'):
|
| @@ -1070,7 +1048,7 @@ def CMDcollect(parser, args):
|
| parser.add_option(
|
| '-j', '--json',
|
| help='Load the task ids from .json as saved by trigger --dump-json')
|
| - (options, args) = parser.parse_args(args)
|
| + options, args = parser.parse_args(args)
|
| if not args and not options.json:
|
| parser.error('Must specify at least one task id or --json.')
|
| if args and options.json:
|
| @@ -1104,17 +1082,17 @@ def CMDcollect(parser, args):
|
| return 1
|
|
|
|
|
| -@subcommand.usage('[resource name]')
|
| +@subcommand.usage('[method name]')
|
| def CMDquery(parser, args):
|
| - """Returns raw JSON information via an URL endpoint. Use 'list' to gather the
|
| - list of valid values from the server.
|
| + """Returns raw JSON information via an URL endpoint. Use 'query-list' to
|
| + gather the list of API methods from the server.
|
|
|
| Examples:
|
| - Printing the list of known URLs:
|
| - swarming.py query -S https://server-url list
|
| + Listing all bots:
|
| + swarming.py query -S https://server-url bots/list
|
|
|
| - Listing last 50 tasks on a specific bot named 'swarm1'
|
| - swarming.py query -S https://server-url --limit 50 bot/swarm1/tasks
|
| + Listing last 10 tasks on a specific bot named 'swarm1':
|
| + swarming.py query -S https://server-url --limit 10 bot/swarm1/tasks
|
| """
|
| CHUNK_SIZE = 250
|
|
|
| @@ -1124,11 +1102,15 @@ def CMDquery(parser, args):
|
| 'default=%default')
|
| parser.add_option(
|
| '--json', help='Path to JSON output file (otherwise prints to stdout)')
|
| - (options, args) = parser.parse_args(args)
|
| + parser.add_option(
|
| + '--progress', action='store_true',
|
| + help='Prints a dot at each request to show progress')
|
| + options, args = parser.parse_args(args)
|
| if len(args) != 1:
|
| - parser.error('Must specify only one resource name.')
|
| -
|
| - base_url = options.swarming + '/swarming/api/v1/client/' + args[0]
|
| +    parser.error(
|
| +        'Must specify only a method name, optionally with properly escaped '
|
| +        'query args.')
|
| + base_url = options.swarming + '/_ah/api/swarming/v1/' + args[0]
|
| url = base_url
|
| if options.limit:
|
| # Check check, change if not working out.
|
| @@ -1136,7 +1118,8 @@ def CMDquery(parser, args):
|
| url += '%slimit=%d' % (merge_char, min(CHUNK_SIZE, options.limit))
|
| data = net.url_read_json(url)
|
| if data is None:
|
| - print >> sys.stderr, 'Failed to access %s' % options.swarming
|
| +    # TODO(maruel): Do basic diagnostics.
|
| + print >> sys.stderr, 'Failed to access %s' % url
|
| return 1
|
|
|
| # Some items support cursors. Try to get automatically if cursors are needed
|
| @@ -1148,29 +1131,72 @@ def CMDquery(parser, args):
|
| url = base_url + '%scursor=%s' % (merge_char, urllib.quote(data['cursor']))
|
| if options.limit:
|
| url += '&limit=%d' % min(CHUNK_SIZE, options.limit - len(data['items']))
|
| + if options.progress:
|
| + sys.stdout.write('.')
|
| + sys.stdout.flush()
|
| new = net.url_read_json(url)
|
| if new is None:
|
| + if options.progress:
|
| + print('')
|
| print >> sys.stderr, 'Failed to access %s' % options.swarming
|
| return 1
|
| data['items'].extend(new['items'])
|
| - data['cursor'] = new['cursor']
|
| + data['cursor'] = new.get('cursor')
|
|
|
| + if options.progress:
|
| + print('')
|
| if options.limit and len(data.get('items', [])) > options.limit:
|
| data['items'] = data['items'][:options.limit]
|
| data.pop('cursor', None)
|
|
|
| if options.json:
|
| - with open(options.json, 'w') as f:
|
| - json.dump(data, f)
|
| + tools.write_json(options.json, data, True)
|
| else:
|
| try:
|
| - json.dump(data, sys.stdout, indent=2, sort_keys=True)
|
| + tools.write_json(sys.stdout, data, False)
|
| sys.stdout.write('\n')
|
| except IOError:
|
| pass
|
| return 0
|
|
|
|
|
| +def CMDquery_list(parser, args):
|
| + """Returns list of all the Swarming APIs that can be used with command
|
| + 'query'.
|
| + """
|
| + parser.add_option(
|
| + '--json', help='Path to JSON output file (otherwise prints to stdout)')
|
| + options, args = parser.parse_args(args)
|
| + if args:
|
| + parser.error('No argument allowed.')
|
| +
|
| + try:
|
| + apis = endpoints_api_discovery_apis(options.swarming)
|
| + except APIError as e:
|
| + parser.error(str(e))
|
| + if options.json:
|
| + with open(options.json, 'wb') as f:
|
| + json.dump(apis, f)
|
| + else:
|
| + help_url = (
|
| + 'https://apis-explorer.appspot.com/apis-explorer/?base=%s/_ah/api#p/' %
|
| + options.swarming)
|
| + for api_id, api in sorted(apis.iteritems()):
|
| + print api_id
|
| + print ' ' + api['description']
|
| + for resource_name, resource in sorted(api['resources'].iteritems()):
|
| + print ''
|
| + for method_name, method in sorted(resource['methods'].iteritems()):
|
| + # Only list the GET ones.
|
| + if method['httpMethod'] != 'GET':
|
| + continue
|
| + print '- %s.%s: %s' % (
|
| + resource_name, method_name, method['path'])
|
| + print ' ' + method['description']
|
| + print ' %s%s%s' % (help_url, api['servicePath'], method['id'])
|
| + return 0
|
| +
|
| +
|
| @subcommand.usage('(hash|isolated) [-- extra_args]')
|
| def CMDrun(parser, args):
|
| """Triggers a task and wait for the results.
|
| @@ -1225,7 +1251,7 @@ def CMDreproduce(parser, args):
|
| if len(args) != 1:
|
| parser.error('Must specify exactly one task id.')
|
|
|
| - url = options.swarming + '/swarming/api/v1/client/task/%s/request' % args[0]
|
| + url = options.swarming + '/_ah/api/swarming/v1/task/%s/request' % args[0]
|
| request = net.url_read_json(url)
|
| if not request:
|
| print >> sys.stderr, 'Failed to retrieve request data for the task'
|
| @@ -1234,40 +1260,21 @@ def CMDreproduce(parser, args):
|
| if not os.path.isdir('work'):
|
| os.mkdir('work')
|
|
|
| - swarming_host = urlparse.urlparse(options.swarming).netloc
|
| properties = request['properties']
|
| - for data_url, _ in properties['data']:
|
| - assert data_url.startswith('https://'), data_url
|
| - data_host = urlparse.urlparse(data_url).netloc
|
| - if data_host != swarming_host:
|
| - auth.ensure_logged_in('https://' + data_host)
|
| -
|
| - content = net.url_read(data_url)
|
| - if content is None:
|
| - print >> sys.stderr, 'Failed to download %s' % data_url
|
| - return 1
|
| - with zipfile.ZipFile(StringIO.StringIO(content)) as zip_file:
|
| - zip_file.extractall('work')
|
| -
|
| env = None
|
| if properties['env']:
|
| env = os.environ.copy()
|
| logging.info('env: %r', properties['env'])
|
| env.update(
|
| - (k.encode('utf-8'), v.encode('utf-8'))
|
| - for k, v in properties['env'].iteritems())
|
| + (i['key'].encode('utf-8'), i['value'].encode('utf-8'))
|
| + for i in properties['env'])
|
|
|
| - exit_code = 0
|
| - for cmd in properties['commands']:
|
| - try:
|
| - c = subprocess.call(cmd, env=env, cwd='work')
|
| - except OSError as e:
|
| - print >> sys.stderr, 'Failed to run: %s' % ' '.join(cmd)
|
| - print >> sys.stderr, str(e)
|
| - c = 1
|
| - if not exit_code:
|
| - exit_code = c
|
| - return exit_code
|
| + try:
|
| + return subprocess.call(properties['command'], env=env, cwd='work')
|
| + except OSError as e:
|
| + print >> sys.stderr, 'Failed to run: %s' % ' '.join(properties['command'])
|
| + print >> sys.stderr, str(e)
|
| + return 1
|
|
|
|
|
| @subcommand.usage("(hash|isolated) [-- extra_args|raw command]")
|
|
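CMDreproduce now consumes the Endpoints request format directly: properties['env'] arrives as a key/value pair list and properties['command'] as a single argv list, so the old zip-download loop is gone. The env merge in isolation (pairs invented):

```python
import os

properties = {
    'command': ['python', 'hello_world.py'],
    'env': [{'key': 'LANG', 'value': 'en_US.UTF-8'}],
}
env = os.environ.copy()
env.update(
    (i['key'].encode('utf-8'), i['value'].encode('utf-8'))
    for i in properties['env'])
# subprocess.call(properties['command'], env=env, cwd='work')
```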
|