Chromium Code Reviews

Side by Side Diff: client/swarming.py

Issue 1337633002: Reapply "Isolated task support in Endpoints API: client side (3/3)" and fixes" (Closed) Base URL: git@github.com:luci/luci-py.git@master
Patch Set: Final fixes (created 5 years, 3 months ago)
OLD | NEW
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 # Copyright 2013 The Swarming Authors. All rights reserved. 2 # Copyright 2013 The Swarming Authors. All rights reserved.
3 # Use of this source code is governed under the Apache License, Version 2.0 that 3 # Use of this source code is governed under the Apache License, Version 2.0 that
4 # can be found in the LICENSE file. 4 # can be found in the LICENSE file.
5 5
6 """Client tool to trigger tasks or retrieve results from a Swarming server.""" 6 """Client tool to trigger tasks or retrieve results from a Swarming server."""
7 7
8 __version__ = '0.6.3' 8 __version__ = '0.8.2'
9 9
10 import collections 10 import collections
11 import datetime 11 import datetime
12 import json 12 import json
13 import logging 13 import logging
14 import optparse 14 import optparse
15 import os 15 import os
16 import re
17 import shutil
18 import StringIO
19 import subprocess 16 import subprocess
20 import sys 17 import sys
21 import threading 18 import threading
22 import time 19 import time
23 import urllib 20 import urllib
24 import urlparse
25 import zipfile
26 21
27 from third_party import colorama 22 from third_party import colorama
28 from third_party.depot_tools import fix_encoding 23 from third_party.depot_tools import fix_encoding
29 from third_party.depot_tools import subcommand 24 from third_party.depot_tools import subcommand
30 25
31 from utils import file_path 26 from utils import file_path
32 from utils import logging_utils 27 from utils import logging_utils
33 from third_party.chromium import natsort 28 from third_party.chromium import natsort
34 from utils import net 29 from utils import net
35 from utils import on_error 30 from utils import on_error
36 from utils import threading_utils 31 from utils import threading_utils
37 from utils import tools 32 from utils import tools
38 from utils import zip_package
39 33
40 import auth 34 import auth
41 import isolated_format 35 import isolated_format
42 import isolateserver 36 import isolateserver
43 import run_isolated
44 37
45 38
46 ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) 39 ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
47 40
48 41
49 class Failure(Exception): 42 class Failure(Exception):
50 """Generic failure.""" 43 """Generic failure."""
51 pass 44 pass
52 45
53 46
54 ### Isolated file handling. 47 ### Isolated file handling.
55 48
56 49
57 def isolated_upload_zip_bundle(isolate_server, bundle): 50 def isolated_to_hash(arg, algo):
58 """Uploads a zip package to Isolate Server and returns raw fetch URL.
59
60 Args:
61 isolate_server: URL of an Isolate Server.
62 bundle: instance of ZipPackage to upload.
63
64 Returns:
65 URL to get the file from.
66 """
67 # Swarming bot needs to be able to grab the file from the Isolate Server using
68 # a simple HTTPS GET. Use 'default' namespace so that the raw data returned to
69 # a bot is not zipped, since the swarming_bot doesn't understand compressed
70 # data. This namespace has nothing to do with |namespace| passed to
71 # run_isolated.py that is used to store files for isolated task.
72 logging.info('Zipping up and uploading files...')
73 start_time = time.time()
74 isolate_item = isolateserver.BufferItem(bundle.zip_into_buffer())
75 with isolateserver.get_storage(isolate_server, 'default') as storage:
76 uploaded = storage.upload_items([isolate_item])
77 bundle_url = storage.get_fetch_url(isolate_item)
78 elapsed = time.time() - start_time
79 if isolate_item in uploaded:
80 logging.info('Upload complete, time elapsed: %f', elapsed)
81 else:
82 logging.info('Zip file already on server, time elapsed: %f', elapsed)
83 return bundle_url
84
85
86 def isolated_get_data(isolate_server):
87 """Returns the 'data' section with all files necessary to bootstrap a task
88 execution running an isolated task.
89
90 It's mainly zipping run_isolated.zip over and over again.
91 TODO(maruel): Get rid of this with.
92 https://code.google.com/p/swarming/issues/detail?id=173
93 """
94 bundle = zip_package.ZipPackage(ROOT_DIR)
95 bundle.add_buffer(
96 'run_isolated.zip',
97 run_isolated.get_as_zip_package().zip_into_buffer(compress=False))
98 bundle_url = isolated_upload_zip_bundle(isolate_server, bundle)
99 return [(bundle_url, 'swarm_data.zip')]
100
101
102 def isolated_get_run_commands(
103 isolate_server, namespace, isolated_hash, extra_args, verbose):
104 """Returns the 'commands' to run an isolated task via run_isolated.zip.
105
106 Returns:
107 commands list to be added to the request.
108 """
109 run_cmd = [
110 'python', 'run_isolated.zip',
111 '--isolated', isolated_hash,
112 '--isolate-server', isolate_server,
113 '--namespace', namespace,
114 ]
115 if verbose:
116 run_cmd.append('--verbose')
117 # Pass all extra args for run_isolated.py, it will pass them to the command.
118 if extra_args:
119 run_cmd.append('--')
120 run_cmd.extend(extra_args)
121 return run_cmd
122
123
124 def isolated_archive(isolate_server, namespace, isolated, algo, verbose):
125 """Archives a .isolated and all the dependencies on the Isolate Server."""
126 logging.info(
127 'isolated_archive(%s, %s, %s)', isolate_server, namespace, isolated)
128 print('Archiving: %s' % isolated)
129 cmd = [
130 sys.executable,
131 os.path.join(ROOT_DIR, 'isolate.py'),
132 'archive',
133 '--isolate-server', isolate_server,
134 '--namespace', namespace,
135 '--isolated', isolated,
136 ]
137 cmd.extend(['--verbose'] * verbose)
138 logging.info(' '.join(cmd))
139 if subprocess.call(cmd, verbose):
140 return None
141 return isolated_format.hash_file(isolated, algo)
142
143
144 def isolated_to_hash(isolate_server, namespace, arg, algo, verbose):
145 """Archives a .isolated file if needed. 51 """Archives a .isolated file if needed.
146 52
147 Returns the file hash to trigger and a bool specifying if it was a file (True) 53 Returns the file hash to trigger and a bool specifying if it was a file (True)
148 or a hash (False). 54 or a hash (False).
149 """ 55 """
150 if arg.endswith('.isolated'): 56 if arg.endswith('.isolated'):
151 file_hash = isolated_archive(isolate_server, namespace, arg, algo, verbose) 57 file_hash = isolated_format.hash_file(arg, algo)
152 if not file_hash: 58 if not file_hash:
153 on_error.report('Archival failure %s' % arg) 59 on_error.report('Archival failure %s' % arg)
154 return None, True 60 return None, True
155 return file_hash, True 61 return file_hash, True
156 elif isolated_format.is_valid_hash(arg, algo): 62 elif isolated_format.is_valid_hash(arg, algo):
157 return arg, False 63 return arg, False
158 else: 64 else:
159 on_error.report('Invalid hash %s' % arg) 65 on_error.report('Invalid hash %s' % arg)
160 return None, False 66 return None, False
161 67
162 68
163 def isolated_handle_options(options, args): 69 def isolated_handle_options(options, args):
164 """Handles '--isolated <isolated>', '<isolated>' and '-- <args...>' arguments. 70 """Handles '--isolated <isolated>', '<isolated>' and '-- <args...>' arguments.
165 71
166 Returns: 72 Returns:
167 tuple(command, data). 73 tuple(command, inputs_ref).
168 """ 74 """
169 isolated_cmd_args = [] 75 isolated_cmd_args = []
170 if not options.isolated: 76 if not options.isolated:
171 if '--' in args: 77 if '--' in args:
172 index = args.index('--') 78 index = args.index('--')
173 isolated_cmd_args = args[index+1:] 79 isolated_cmd_args = args[index+1:]
174 args = args[:index] 80 args = args[:index]
175 else: 81 else:
176 # optparse eats '--' sometimes. 82 # optparse eats '--' sometimes.
177 isolated_cmd_args = args[1:] 83 isolated_cmd_args = args[1:]
178 args = args[:1] 84 args = args[:1]
179 if len(args) != 1: 85 if len(args) != 1:
180 raise ValueError( 86 raise ValueError(
181 'Use --isolated, --raw-cmd or \'--\' to pass arguments to the called ' 87 'Use --isolated, --raw-cmd or \'--\' to pass arguments to the called '
182 'process.') 88 'process.')
183 # Old code. To be removed eventually. 89 # Old code. To be removed eventually.
184 options.isolated, is_file = isolated_to_hash( 90 options.isolated, is_file = isolated_to_hash(
185 options.isolate_server, options.namespace, args[0], 91 args[0], isolated_format.get_hash_algo(options.namespace))
186 isolated_format.get_hash_algo(options.namespace), options.verbose)
187 if not options.isolated: 92 if not options.isolated:
188 raise ValueError('Invalid argument %s' % args[0]) 93 raise ValueError('Invalid argument %s' % args[0])
189 elif args: 94 elif args:
190 is_file = False 95 is_file = False
191 if '--' in args: 96 if '--' in args:
192 index = args.index('--') 97 index = args.index('--')
193 isolated_cmd_args = args[index+1:] 98 isolated_cmd_args = args[index+1:]
194 if index != 0: 99 if index != 0:
195 raise ValueError('Unexpected arguments.') 100 raise ValueError('Unexpected arguments.')
196 else: 101 else:
197 # optparse eats '--' sometimes. 102 # optparse eats '--' sometimes.
198 isolated_cmd_args = args 103 isolated_cmd_args = args
199 104
200 command = isolated_get_run_commands(
201 options.isolate_server, options.namespace, options.isolated,
202 isolated_cmd_args, options.verbose)
203
204 # If a file name was passed, use its base name instead of the isolated hash. 105 # If a file name was passed, use its base name instead of the isolated hash.
205 # Otherwise, use user name as an approximation of a task name. 106 # Otherwise, use user name as an approximation of a task name.
206 if not options.task_name: 107 if not options.task_name:
207 if is_file: 108 if is_file:
208 key = os.path.splitext(os.path.basename(args[0]))[0] 109 key = os.path.splitext(os.path.basename(args[0]))[0]
209 else: 110 else:
210 key = options.user 111 key = options.user
211 options.task_name = u'%s/%s/%s' % ( 112 options.task_name = u'%s/%s/%s' % (
212 key, 113 key,
213 '_'.join( 114 '_'.join(
214 '%s=%s' % (k, v) 115 '%s=%s' % (k, v)
215 for k, v in sorted(options.dimensions.iteritems())), 116 for k, v in sorted(options.dimensions.iteritems())),
216 options.isolated) 117 options.isolated)
217 118
218 try: 119 inputs_ref = FilesRef(
219 data = isolated_get_data(options.isolate_server) 120 isolated=options.isolated,
220 except (IOError, OSError): 121 isolatedserver=options.isolate_server,
221 on_error.report('Failed to upload the zip file') 122 namespace=options.namespace)
222 raise ValueError('Failed to upload the zip file') 123 return isolated_cmd_args, inputs_ref
223
224 return command, data
225 124
226 125
227 ### Triggering. 126 ### Triggering.
228 127
229 128
230 TaskRequest = collections.namedtuple( 129 # See ../appengine/swarming/swarming_rpcs.py.
231 'TaskRequest', 130 FilesRef = collections.namedtuple(
131 'FilesRef',
132 [
133 'isolated',
134 'isolatedserver',
135 'namespace',
136 ])
137
138
139 # See ../appengine/swarming/swarming_rpcs.py.
140 TaskProperties = collections.namedtuple(
141 'TaskProperties',
232 [ 142 [
233 'command', 143 'command',
234 'data',
235 'dimensions', 144 'dimensions',
236 'env', 145 'env',
237 'expiration', 146 'execution_timeout_secs',
238 'hard_timeout', 147 'extra_args',
148 'grace_period_secs',
239 'idempotent', 149 'idempotent',
240 'io_timeout', 150 'inputs_ref',
151 'io_timeout_secs',
152 ])
153
154
155 # See ../appengine/swarming/swarming_rpcs.py.
156 NewTaskRequest = collections.namedtuple(
157 'NewTaskRequest',
158 [
159 'expiration_secs',
241 'name', 160 'name',
161 'parent_task_id',
242 'priority', 162 'priority',
163 'properties',
243 'tags', 164 'tags',
244 'user', 165 'user',
245 'verbose',
246 ]) 166 ])
247 167
248 168
169 def namedtuple_to_dict(value):
170 """Recursively converts a namedtuple to a dict."""
171 out = dict(value._asdict())
172 for k, v in out.iteritems():
173 if hasattr(v, '_asdict'):
174 out[k] = namedtuple_to_dict(v)
175 return out
176
177
249 def task_request_to_raw_request(task_request): 178 def task_request_to_raw_request(task_request):
250 """Returns the json dict expected by the Swarming server for new request. 179 """Returns the json dict expected by the Swarming server for new request.
251 180
252 This is for the v1 client Swarming API. 181 This is for the v1 client Swarming API.
253 """ 182 """
254 return { 183 out = namedtuple_to_dict(task_request)
255 'name': task_request.name, 184 # Maps are not supported until protobuf v3.
256 'parent_task_id': os.environ.get('SWARMING_TASK_ID', ''), 185 out['properties']['dimensions'] = [
257 'priority': task_request.priority, 186 {'key': k, 'value': v}
258 'properties': { 187 for k, v in out['properties']['dimensions'].iteritems()
259 'commands': [task_request.command], 188 ]
260 'data': task_request.data, 189 out['properties']['dimensions'].sort(key=lambda x: x['key'])
261 'dimensions': task_request.dimensions, 190 out['properties']['env'] = [
262 'env': task_request.env, 191 {'key': k, 'value': v}
263 'execution_timeout_secs': task_request.hard_timeout, 192 for k, v in out['properties']['env'].iteritems()
264 'io_timeout_secs': task_request.io_timeout, 193 ]
265 'idempotent': task_request.idempotent, 194 out['properties']['env'].sort(key=lambda x: x['key'])
266 }, 195 return out
267 'scheduling_expiration_secs': task_request.expiration,
268 'tags': task_request.tags,
269 'user': task_request.user,
270 }
271 196
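For illustration, a minimal standalone sketch (with made-up dimension values) of the map-to-list conversion that task_request_to_raw_request performs, since the Endpoints request expects sorted lists of key/value pairs rather than dicts:

    # Hypothetical input; any dimensions dict is converted the same way.
    dimensions = {'pool': 'default', 'os': 'Linux'}
    pairs = [{'key': k, 'value': v} for k, v in dimensions.iteritems()]
    pairs.sort(key=lambda x: x['key'])
    # pairs == [{'key': 'os', 'value': 'Linux'},
    #           {'key': 'pool', 'value': 'default'}]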
272 197
273 def swarming_handshake(swarming): 198 def swarming_trigger(swarming, raw_request):
274 """Initiates the connection to the Swarming server."""
275 headers = {'X-XSRF-Token-Request': '1'}
276 response = net.url_read_json(
277 swarming + '/swarming/api/v1/client/handshake',
278 headers=headers,
279 data={})
280 if not response:
281 logging.error('Failed to handshake with server')
282 return None
283 logging.info('Connected to server version: %s', response['server_version'])
284 return response['xsrf_token']
285
286
287 def swarming_trigger(swarming, raw_request, xsrf_token):
288 """Triggers a request on the Swarming server and returns the json data. 199 """Triggers a request on the Swarming server and returns the json data.
289 200
290 It's the low-level function. 201 It's the low-level function.
291 202
292 Returns: 203 Returns:
293 { 204 {
294 'request': { 205 'request': {
295 'created_ts': u'2010-01-02 03:04:05', 206 'created_ts': u'2010-01-02 03:04:05',
296 'name': .. 207 'name': ..
297 }, 208 },
298 'task_id': '12300', 209 'task_id': '12300',
299 } 210 }
300 """ 211 """
301 logging.info('Triggering: %s', raw_request['name']) 212 logging.info('Triggering: %s', raw_request['name'])
302 213
303 headers = {'X-XSRF-Token': xsrf_token}
304 result = net.url_read_json( 214 result = net.url_read_json(
305 swarming + '/swarming/api/v1/client/request', 215 swarming + '/_ah/api/swarming/v1/tasks/new', data=raw_request)
306 data=raw_request,
307 headers=headers)
308 if not result: 216 if not result:
309 on_error.report('Failed to trigger task %s' % raw_request['name']) 217 on_error.report('Failed to trigger task %s' % raw_request['name'])
310 return None 218 return None
311 return result 219 return result
312 220
313 221
314 def setup_googletest(env, shards, index): 222 def setup_googletest(env, shards, index):
315 """Sets googletest specific environment variables.""" 223 """Sets googletest specific environment variables."""
316 if shards > 1: 224 if shards > 1:
317 env = env.copy() 225 assert not any(i['key'] == 'GTEST_SHARD_INDEX' for i in env), env
318 env['GTEST_SHARD_INDEX'] = str(index) 226 assert not any(i['key'] == 'GTEST_TOTAL_SHARDS' for i in env), env
319 env['GTEST_TOTAL_SHARDS'] = str(shards) 227 env = env[:]
228 env.append({'key': 'GTEST_SHARD_INDEX', 'value': str(index)})
229 env.append({'key': 'GTEST_TOTAL_SHARDS', 'value': str(shards)})
320 return env 230 return env
321 231
322 232
323 def trigger_task_shards(swarming, task_request, shards): 233 def trigger_task_shards(swarming, task_request, shards):
324 """Triggers one or many subtasks of a sharded task. 234 """Triggers one or many subtasks of a sharded task.
325 235
326 Returns: 236 Returns:
327 Dict with task details, returned to caller as part of --dump-json output. 237 Dict with task details, returned to caller as part of --dump-json output.
328 None in case of failure. 238 None in case of failure.
329 """ 239 """
330 def convert(index): 240 def convert(index):
331 req = task_request 241 req = task_request_to_raw_request(task_request)
332 if shards > 1: 242 if shards > 1:
333 req = req._replace( 243 req['properties']['env'] = setup_googletest(
334 env=setup_googletest(req.env, shards, index), 244 req['properties']['env'], shards, index)
335 name='%s:%s:%s' % (req.name, index, shards)) 245 req['name'] += ':%s:%s' % (index, shards)
336 return task_request_to_raw_request(req) 246 return req
337 247
338 requests = [convert(index) for index in xrange(shards)] 248 requests = [convert(index) for index in xrange(shards)]
339 xsrf_token = swarming_handshake(swarming)
340 if not xsrf_token:
341 return None
342 tasks = {} 249 tasks = {}
343 priority_warning = False 250 priority_warning = False
344 for index, request in enumerate(requests): 251 for index, request in enumerate(requests):
345 task = swarming_trigger(swarming, request, xsrf_token) 252 task = swarming_trigger(swarming, request)
346 if not task: 253 if not task:
347 break 254 break
348 logging.info('Request result: %s', task) 255 logging.info('Request result: %s', task)
349 if (not priority_warning and 256 if (not priority_warning and
350 task['request']['priority'] != task_request.priority): 257 task['request']['priority'] != task_request.priority):
351 priority_warning = True 258 priority_warning = True
352 print >> sys.stderr, ( 259 print >> sys.stderr, (
353 'Priority was reset to %s' % task['request']['priority']) 260 'Priority was reset to %s' % task['request']['priority'])
354 tasks[request['name']] = { 261 tasks[request['name']] = {
355 'shard_index': index, 262 'shard_index': index,
(...skipping 29 matching lines...)
385 It's in fact an enum. Values should be in decreasing order of importance. 292 It's in fact an enum. Values should be in decreasing order of importance.
386 """ 293 """
387 RUNNING = 0x10 294 RUNNING = 0x10
388 PENDING = 0x20 295 PENDING = 0x20
389 EXPIRED = 0x30 296 EXPIRED = 0x30
390 TIMED_OUT = 0x40 297 TIMED_OUT = 0x40
391 BOT_DIED = 0x50 298 BOT_DIED = 0x50
392 CANCELED = 0x60 299 CANCELED = 0x60
393 COMPLETED = 0x70 300 COMPLETED = 0x70
394 301
395 STATES = (RUNNING, PENDING, EXPIRED, TIMED_OUT, BOT_DIED, CANCELED, COMPLETED) 302 STATES = (
396 STATES_RUNNING = (RUNNING, PENDING) 303 'RUNNING', 'PENDING', 'EXPIRED', 'TIMED_OUT', 'BOT_DIED', 'CANCELED',
397 STATES_NOT_RUNNING = (EXPIRED, TIMED_OUT, BOT_DIED, CANCELED, COMPLETED) 304 'COMPLETED')
398 STATES_DONE = (TIMED_OUT, COMPLETED) 305 STATES_RUNNING = ('RUNNING', 'PENDING')
399 STATES_ABANDONED = (EXPIRED, BOT_DIED, CANCELED) 306 STATES_NOT_RUNNING = (
307 'EXPIRED', 'TIMED_OUT', 'BOT_DIED', 'CANCELED', 'COMPLETED')
308 STATES_DONE = ('TIMED_OUT', 'COMPLETED')
309 STATES_ABANDONED = ('EXPIRED', 'BOT_DIED', 'CANCELED')
400 310
401 _NAMES = { 311 _NAMES = {
402 RUNNING: 'Running', 312 RUNNING: 'Running',
403 PENDING: 'Pending', 313 PENDING: 'Pending',
404 EXPIRED: 'Expired', 314 EXPIRED: 'Expired',
405 TIMED_OUT: 'Execution timed out', 315 TIMED_OUT: 'Execution timed out',
406 BOT_DIED: 'Bot died', 316 BOT_DIED: 'Bot died',
407 CANCELED: 'User canceled', 317 CANCELED: 'User canceled',
408 COMPLETED: 'Completed', 318 COMPLETED: 'Completed',
409 } 319 }
410 320
321 _ENUMS = {
322 'RUNNING': RUNNING,
323 'PENDING': PENDING,
324 'EXPIRED': EXPIRED,
325 'TIMED_OUT': TIMED_OUT,
326 'BOT_DIED': BOT_DIED,
327 'CANCELED': CANCELED,
328 'COMPLETED': COMPLETED,
329 }
330
411 @classmethod 331 @classmethod
412 def to_string(cls, state): 332 def to_string(cls, state):
413 """Returns a user-readable string representing a State.""" 333 """Returns a user-readable string representing a State."""
414 if state not in cls._NAMES: 334 if state not in cls._NAMES:
415 raise ValueError('Invalid state %s' % state) 335 raise ValueError('Invalid state %s' % state)
416 return cls._NAMES[state] 336 return cls._NAMES[state]
417 337
338 @classmethod
339 def from_enum(cls, state):
340 """Returns int value based on the string."""
341 if state not in cls._ENUMS:
342 raise ValueError('Invalid state %s' % state)
343 return cls._ENUMS[state]
344
418 345
419 class TaskOutputCollector(object): 346 class TaskOutputCollector(object):
420 """Assembles task execution summary (for --task-summary-json output). 347 """Assembles task execution summary (for --task-summary-json output).
421 348
422 Optionally fetches task outputs from isolate server to local disk (used when 349 Optionally fetches task outputs from isolate server to local disk (used when
423 --task-output-dir is passed). 350 --task-output-dir is passed).
424 351
425 This object is shared among multiple threads running 'retrieve_results' 352 This object is shared among multiple threads running 'retrieve_results'
426 function, in particular they call 'process_shard_result' method in parallel. 353 function, in particular they call 'process_shard_result' method in parallel.
427 """ 354 """
(...skipping 15 matching lines...)
443 self._storage = None 370 self._storage = None
444 371
445 if self.task_output_dir and not os.path.isdir(self.task_output_dir): 372 if self.task_output_dir and not os.path.isdir(self.task_output_dir):
446 os.makedirs(self.task_output_dir) 373 os.makedirs(self.task_output_dir)
447 374
448 def process_shard_result(self, shard_index, result): 375 def process_shard_result(self, shard_index, result):
449 """Stores results of a single task shard, fetches output files if necessary. 376 """Stores results of a single task shard, fetches output files if necessary.
450 377
451 Modifies |result| in place. 378 Modifies |result| in place.
452 379
380 shard_index is 0-based.
381
453 Called concurrently from multiple threads. 382 Called concurrently from multiple threads.
454 """ 383 """
455 # Sanity check index is in expected range. 384 # Sanity check index is in expected range.
456 assert isinstance(shard_index, int) 385 assert isinstance(shard_index, int)
457 if shard_index < 0 or shard_index >= self.shard_count: 386 if shard_index < 0 or shard_index >= self.shard_count:
458 logging.warning( 387 logging.warning(
459 'Shard index %d is outside of expected range: [0; %d]', 388 'Shard index %d is outside of expected range: [0; %d]',
460 shard_index, self.shard_count - 1) 389 shard_index, self.shard_count - 1)
461 return 390 return
462 391
463 assert not 'isolated_out' in result 392 if result.get('outputs_ref'):
464 result['isolated_out'] = None 393 ref = result['outputs_ref']
465 for output in result['outputs']: 394 result['outputs_ref']['view_url'] = '%s/browse?%s' % (
466 isolated_files_location = extract_output_files_location(output) 395 ref['isolatedserver'],
467 if isolated_files_location: 396 urllib.urlencode(
468 if result['isolated_out']: 397 [('namespace', ref['namespace']), ('hash', ref['isolated'])]))
469 raise ValueError('Unexpected two task with output')
470 result['isolated_out'] = isolated_files_location
471 398
472 # Store result dict of that shard, ignore results we've already seen. 399 # Store result dict of that shard, ignore results we've already seen.
473 with self._lock: 400 with self._lock:
474 if shard_index in self._per_shard_results: 401 if shard_index in self._per_shard_results:
475 logging.warning('Ignoring duplicate shard index %d', shard_index) 402 logging.warning('Ignoring duplicate shard index %d', shard_index)
476 return 403 return
477 self._per_shard_results[shard_index] = result 404 self._per_shard_results[shard_index] = result
478 405
479 # Fetch output files if necessary. 406 # Fetch output files if necessary.
480 if self.task_output_dir and result['isolated_out']: 407 if self.task_output_dir and result.get('outputs_ref'):
481 storage = self._get_storage( 408 storage = self._get_storage(
482 result['isolated_out']['server'], 409 result['outputs_ref']['isolatedserver'],
483 result['isolated_out']['namespace']) 410 result['outputs_ref']['namespace'])
484 if storage: 411 if storage:
485 # Output files are supposed to be small and they are not reused across 412 # Output files are supposed to be small and they are not reused across
486 # tasks. So use MemoryCache for them instead of on-disk cache. Make 413 # tasks. So use MemoryCache for them instead of on-disk cache. Make
487 # files writable, so that calling script can delete them. 414 # files writable, so that calling script can delete them.
488 isolateserver.fetch_isolated( 415 isolateserver.fetch_isolated(
489 result['isolated_out']['hash'], 416 result['outputs_ref']['isolated'],
490 storage, 417 storage,
491 isolateserver.MemoryCache(file_mode_mask=0700), 418 isolateserver.MemoryCache(file_mode_mask=0700),
492 os.path.join(self.task_output_dir, str(shard_index)), 419 os.path.join(self.task_output_dir, str(shard_index)),
493 False) 420 False)
494 421
495 def finalize(self): 422 def finalize(self):
496 """Assembles and returns task summary JSON, shuts down underlying Storage.""" 423 """Assembles and returns task summary JSON, shuts down underlying Storage."""
497 with self._lock: 424 with self._lock:
498 # Write an array of shard results with None for missing shards. 425 # Write an array of shard results with None for missing shards.
499 summary = { 426 summary = {
(...skipping 26 matching lines...)
526 self._storage.location, isolate_server) 453 self._storage.location, isolate_server)
527 return None 454 return None
528 if self._storage.namespace != namespace: 455 if self._storage.namespace != namespace:
529 logging.error( 456 logging.error(
530 'Task shards are using multiple namespaces: %s and %s', 457 'Task shards are using multiple namespaces: %s and %s',
531 self._storage.namespace, namespace) 458 self._storage.namespace, namespace)
532 return None 459 return None
533 return self._storage 460 return self._storage
534 461
535 462
536 def extract_output_files_location(task_log):
537 """Task log -> location of task output files to fetch.
538
539 TODO(vadimsh,maruel): Use side-channel to get this information.
540 See 'run_tha_test' in run_isolated.py for where the data is generated.
541
542 Returns:
543 Tuple (isolate server URL, namespace, isolated hash) on success.
544 None if information is missing or can not be parsed.
545 """
546 if not task_log:
547 return None
548 match = re.search(
549 r'\[run_isolated_out_hack\](.*)\[/run_isolated_out_hack\]',
550 task_log,
551 re.DOTALL)
552 if not match:
553 return None
554
555 def to_ascii(val):
556 if not isinstance(val, basestring):
557 raise ValueError()
558 return val.encode('ascii')
559
560 try:
561 data = json.loads(match.group(1))
562 if not isinstance(data, dict):
563 raise ValueError()
564 isolated_hash = to_ascii(data['hash'])
565 namespace = to_ascii(data['namespace'])
566 isolate_server = to_ascii(data['storage'])
567 if not file_path.is_url(isolate_server):
568 raise ValueError()
569 data = {
570 'hash': isolated_hash,
571 'namespace': namespace,
572 'server': isolate_server,
573 'view_url': '%s/browse?%s' % (isolate_server, urllib.urlencode(
574 [('namespace', namespace), ('hash', isolated_hash)])),
575 }
576 return data
577 except (KeyError, ValueError):
578 logging.warning(
579 'Unexpected value of run_isolated_out_hack: %s', match.group(1))
580 return None
581
582
583 def now(): 463 def now():
584 """Exists so it can be mocked easily.""" 464 """Exists so it can be mocked easily."""
585 return time.time() 465 return time.time()
586 466
587 467
468 def parse_time(value):
469 """Converts serialized time from the API to datetime.datetime."""
470 # When microseconds are 0, the '.123456' suffix is elided. This means the
471 # serialized format is not consistent, which confuses the hell out of python.
472 for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
473 try:
474 return datetime.datetime.strptime(value, fmt)
475 except ValueError:
476 pass
477 raise ValueError('Failed to parse %s' % value)
478
479
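A quick usage sketch of parse_time above (the timestamps are invented); both serializations mentioned in the comment parse to a datetime:

    # Hypothetical values; the API elides the '.%f' suffix when microseconds are 0.
    parse_time('2010-01-02T03:04:05.123456')
    parse_time('2010-01-02T03:04:05')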
588 def retrieve_results( 480 def retrieve_results(
589 base_url, shard_index, task_id, timeout, should_stop, output_collector): 481 base_url, shard_index, task_id, timeout, should_stop, output_collector):
590 """Retrieves results for a single task ID. 482 """Retrieves results for a single task ID.
591 483
592 Returns: 484 Returns:
593 <result dict> on success. 485 <result dict> on success.
594 None on failure. 486 None on failure.
595 """ 487 """
596 assert isinstance(timeout, float), timeout 488 assert isinstance(timeout, float), timeout
597 result_url = '%s/swarming/api/v1/client/task/%s' % (base_url, task_id) 489 result_url = '%s/_ah/api/swarming/v1/task/%s/result' % (base_url, task_id)
598 output_url = '%s/swarming/api/v1/client/task/%s/output/all' % ( 490 output_url = '%s/_ah/api/swarming/v1/task/%s/stdout' % (base_url, task_id)
599 base_url, task_id)
600 started = now() 491 started = now()
601 deadline = started + timeout if timeout else None 492 deadline = started + timeout if timeout else None
602 attempt = 0 493 attempt = 0
603 494
604 while not should_stop.is_set(): 495 while not should_stop.is_set():
605 attempt += 1 496 attempt += 1
606 497
607 # Waiting for too long -> give up. 498 # Waiting for too long -> give up.
608 current_time = now() 499 current_time = now()
609 if deadline and current_time >= deadline: 500 if deadline and current_time >= deadline:
(...skipping 12 matching lines...)
622 should_stop.wait(delay) 513 should_stop.wait(delay)
623 if should_stop.is_set(): 514 if should_stop.is_set():
624 return None 515 return None
625 516
626 # Disable internal retries in net.url_read_json, since we are doing retries 517 # Disable internal retries in net.url_read_json, since we are doing retries
627 # ourselves. 518 # ourselves.
628 # TODO(maruel): We'd need to know if it's a 404 and not retry at all. 519 # TODO(maruel): We'd need to know if it's a 404 and not retry at all.
629 result = net.url_read_json(result_url, retry_50x=False) 520 result = net.url_read_json(result_url, retry_50x=False)
630 if not result: 521 if not result:
631 continue 522 continue
523
632 if result['state'] in State.STATES_NOT_RUNNING: 524 if result['state'] in State.STATES_NOT_RUNNING:
525 # TODO(maruel): Not always fetch stdout?
633 out = net.url_read_json(output_url) 526 out = net.url_read_json(output_url)
634 result['outputs'] = (out or {}).get('outputs', []) 527 result['output'] = out.get('output') if out else out
635 if not result['outputs']: 528 if not result['output']:
636 logging.error('No output found for task %s', task_id) 529 logging.error('No output found for task %s', task_id)
637 # Record the result, try to fetch attached output files (if any). 530 # Record the result, try to fetch attached output files (if any).
638 if output_collector: 531 if output_collector:
639 # TODO(vadimsh): Respect |should_stop| and |deadline| when fetching. 532 # TODO(vadimsh): Respect |should_stop| and |deadline| when fetching.
640 output_collector.process_shard_result(shard_index, result) 533 output_collector.process_shard_result(shard_index, result)
534 if result.get('internal_failure'):
535 logging.error('Internal error!')
536 elif result['state'] == 'BOT_DIED':
537 logging.error('Bot died!')
641 return result 538 return result
642 539
643 540
541 def convert_to_old_format(result):
542 """Converts the task result data from Endpoints API format to old API format
543 for compatibility.
544
545 This goes into the file generated as --task-summary-json.
546 """
547 # Sets default.
548 result.setdefault('abandoned_ts', None)
549 result.setdefault('bot_id', None)
550 result.setdefault('bot_version', None)
551 result.setdefault('children_task_ids', [])
552 result.setdefault('completed_ts', None)
553 result.setdefault('cost_saved_usd', None)
554 result.setdefault('costs_usd', None)
555 result.setdefault('deduped_from', None)
556 result.setdefault('name', None)
557 result.setdefault('outputs_ref', None)
558 result.setdefault('properties_hash', None)
559 result.setdefault('server_versions', None)
560 result.setdefault('started_ts', None)
561 result.setdefault('tags', None)
562 result.setdefault('user', None)
563
564 # Conversion back to old API.
565 duration = result.pop('duration', None)
566 result['durations'] = [duration] if duration else []
567 exit_code = result.pop('exit_code', None)
568 result['exit_codes'] = [int(exit_code)] if exit_code else []
569 result['id'] = result.pop('task_id')
570 result['isolated_out'] = result.get('outputs_ref', None)
571 output = result.pop('output', None)
572 result['outputs'] = [output] if output else []
573 # properties_hash
574 # server_version
575 # Endpoints returns 'state' as a string. For compatibility with old code,
576 # convert it to an int.
577 result['state'] = State.from_enum(result['state'])
578 # tags
579 result['try_number'] = (
580 int(result['try_number']) if result['try_number'] else None)
581 result['bot_dimensions'] = {
582 i['key']: i['value'] for i in result['bot_dimensions']
583 }
584
585
644 def yield_results( 586 def yield_results(
645 swarm_base_url, task_ids, timeout, max_threads, print_status_updates, 587 swarm_base_url, task_ids, timeout, max_threads, print_status_updates,
646 output_collector): 588 output_collector):
647 """Yields swarming task results from the swarming server as (index, result). 589 """Yields swarming task results from the swarming server as (index, result).
648 590
649 Duplicate shards are ignored. Shards are yielded in order of completion. 591 Duplicate shards are ignored. Shards are yielded in order of completion.
650 Timed out shards are NOT yielded at all. Caller can compare number of yielded 592 Timed out shards are NOT yielded at all. Caller can compare number of yielded
651 shards with len(task_keys) to verify all shards completed. 593 shards with len(task_keys) to verify all shards completed.
652 594
653 max_threads is optional and is used to limit the number of parallel fetches 595 max_threads is optional and is used to limit the number of parallel fetches
(...skipping 55 matching lines...)
709 shards_remaining.remove(shard_index) 651 shards_remaining.remove(shard_index)
710 yield shard_index, result 652 yield shard_index, result
711 653
712 finally: 654 finally:
713 # Done or aborted with Ctrl+C, kill the remaining threads. 655 # Done or aborted with Ctrl+C, kill the remaining threads.
714 should_stop.set() 656 should_stop.set()
715 657
716 658
717 def decorate_shard_output(swarming, shard_index, metadata): 659 def decorate_shard_output(swarming, shard_index, metadata):
718 """Returns wrapped output for swarming task shard.""" 660 """Returns wrapped output for swarming task shard."""
719 def t(d): 661 if metadata.get('started_ts') and not metadata.get('deduped_from'):
720 return datetime.datetime.strptime(d, '%Y-%m-%d %H:%M:%S')
721 if metadata.get('started_ts'):
722 pending = '%.1fs' % ( 662 pending = '%.1fs' % (
723 t(metadata['started_ts']) - t(metadata['created_ts'])).total_seconds() 663 parse_time(metadata['started_ts']) - parse_time(metadata['created_ts'])
664 ).total_seconds()
724 else: 665 else:
725 pending = 'N/A' 666 pending = 'N/A'
726 667
727 if metadata.get('durations'): 668 if metadata.get('duration') is not None:
728 duration = '%.1fs' % metadata['durations'][0] 669 duration = '%.1fs' % metadata['duration']
729 else: 670 else:
730 duration = 'N/A' 671 duration = 'N/A'
731 672
732 if metadata.get('exit_codes'): 673 if metadata.get('exit_code') is not None:
733 exit_code = '%d' % metadata['exit_codes'][0] 674 # Integers are encoded as strings to not lose precision.
675 exit_code = '%s' % metadata['exit_code']
734 else: 676 else:
735 exit_code = 'N/A' 677 exit_code = 'N/A'
736 678
737 bot_id = metadata.get('bot_id') or 'N/A' 679 bot_id = metadata.get('bot_id') or 'N/A'
738 680
739 url = '%s/user/task/%s' % (swarming, metadata['id']) 681 url = '%s/user/task/%s' % (swarming, metadata['task_id'])
740 tag_header = 'Shard %d %s' % (shard_index, url) 682 tag_header = 'Shard %d %s' % (shard_index, url)
741 tag_footer = ( 683 tag_footer = (
742 'End of shard %d Pending: %s Duration: %s Bot: %s Exit: %s' % ( 684 'End of shard %d Pending: %s Duration: %s Bot: %s Exit: %s' % (
743 shard_index, pending, duration, bot_id, exit_code)) 685 shard_index, pending, duration, bot_id, exit_code))
744 686
745 tag_len = max(len(tag_header), len(tag_footer)) 687 tag_len = max(len(tag_header), len(tag_footer))
746 dash_pad = '+-%s-+\n' % ('-' * tag_len) 688 dash_pad = '+-%s-+\n' % ('-' * tag_len)
747 tag_header = '| %s |\n' % tag_header.ljust(tag_len) 689 tag_header = '| %s |\n' % tag_header.ljust(tag_len)
748 tag_footer = '| %s |\n' % tag_footer.ljust(tag_len) 690 tag_footer = '| %s |\n' % tag_footer.ljust(tag_len)
749 691
750 header = dash_pad + tag_header + dash_pad 692 header = dash_pad + tag_header + dash_pad
751 footer = dash_pad + tag_footer + dash_pad[:-1] 693 footer = dash_pad + tag_footer + dash_pad[:-1]
752 output = '\n'.join(o for o in metadata['outputs'] if o).rstrip() + '\n' 694 output = metadata.get('output', '').rstrip() + '\n'
753 return header + output + footer 695 return header + output + footer
754 696
755 697
756 def collect( 698 def collect(
757 swarming, task_name, task_ids, timeout, decorate, print_status_updates, 699 swarming, task_name, task_ids, timeout, decorate, print_status_updates,
758 task_summary_json, task_output_dir): 700 task_summary_json, task_output_dir):
759 """Retrieves results of a Swarming task.""" 701 """Retrieves results of a Swarming task."""
760 # Collect summary JSON and output files (if task_output_dir is not None). 702 # Collect summary JSON and output files (if task_output_dir is not None).
761 output_collector = TaskOutputCollector( 703 output_collector = TaskOutputCollector(
762 task_output_dir, task_name, len(task_ids)) 704 task_output_dir, task_name, len(task_ids))
763 705
764 seen_shards = set() 706 seen_shards = set()
765 exit_code = 0 707 exit_code = 0
766 total_duration = 0 708 total_duration = 0
767 try: 709 try:
768 for index, metadata in yield_results( 710 for index, metadata in yield_results(
769 swarming, task_ids, timeout, None, print_status_updates, 711 swarming, task_ids, timeout, None, print_status_updates,
770 output_collector): 712 output_collector):
771 seen_shards.add(index) 713 seen_shards.add(index)
772 714
773 # Default to failure if there was no process that even started. 715 # Default to failure if there was no process that even started.
774 shard_exit_code = 1 716 shard_exit_code = metadata.get('exit_code')
775 if metadata.get('exit_codes'): 717 if shard_exit_code:
776 shard_exit_code = metadata['exit_codes'][0] 718 shard_exit_code = int(shard_exit_code)
777 if shard_exit_code: 719 if shard_exit_code:
778 exit_code = shard_exit_code 720 exit_code = shard_exit_code
779 if metadata.get('durations'): 721 total_duration += metadata.get('duration', 0)
780 total_duration += metadata['durations'][0]
781 722
782 if decorate: 723 if decorate:
783 print(decorate_shard_output(swarming, index, metadata)) 724 print(decorate_shard_output(swarming, index, metadata))
784 if len(seen_shards) < len(task_ids): 725 if len(seen_shards) < len(task_ids):
785 print('') 726 print('')
786 else: 727 else:
787 if metadata.get('exit_codes'): 728 print('%s: %s %s' % (
788 exit_code = metadata['exit_codes'][0] 729 metadata.get('bot_id', 'N/A'),
789 else: 730 metadata['task_id'],
790 exit_code = 'N/A' 731 shard_exit_code))
791 print('%s: %s %s' % 732 if metadata['output']:
792 (metadata.get('bot_id') or 'N/A', metadata['id'], exit_code)) 733 output = metadata['output'].rstrip()
793 for output in metadata['outputs']:
794 if not output:
795 continue
796 output = output.rstrip()
797 if output: 734 if output:
798 print(''.join(' %s\n' % l for l in output.splitlines())) 735 print(''.join(' %s\n' % l for l in output.splitlines()))
799 finally: 736 finally:
800 summary = output_collector.finalize() 737 summary = output_collector.finalize()
801 if task_summary_json: 738 if task_summary_json:
739 # TODO(maruel): Make this optional.
740 for i in summary['shards']:
741 if i:
742 convert_to_old_format(i)
802 tools.write_json(task_summary_json, summary, False) 743 tools.write_json(task_summary_json, summary, False)
803 744
804 if decorate and total_duration: 745 if decorate and total_duration:
805 print('Total duration: %.1fs' % total_duration) 746 print('Total duration: %.1fs' % total_duration)
806 747
807 if len(seen_shards) != len(task_ids): 748 if len(seen_shards) != len(task_ids):
808 missing_shards = [x for x in range(len(task_ids)) if x not in seen_shards] 749 missing_shards = [x for x in range(len(task_ids)) if x not in seen_shards]
809 print >> sys.stderr, ('Results from some shards are missing: %s' % 750 print >> sys.stderr, ('Results from some shards are missing: %s' %
810 ', '.join(map(str, missing_shards))) 751 ', '.join(map(str, missing_shards)))
811 return 1 752 return 1
812 753
813 return exit_code 754 return exit_code
814 755
815 756
757 ### API management.
758
759
760 class APIError(Exception):
761 pass
762
763
764 def endpoints_api_discovery_apis(host):
765 """Uses Cloud Endpoints' API Discovery Service to return metadata about all
766 the APIs exposed by a host.
767
768 https://developers.google.com/discovery/v1/reference/apis/list
769 """
770 data = net.url_read_json(host + '/_ah/api/discovery/v1/apis')
771 if data is None:
772 raise APIError('Failed to discover APIs on %s' % host)
773 out = {}
774 for api in data['items']:
775 if api['id'] == 'discovery:v1':
776 continue
777 # URL is of the following form:
778 # url = host + (
779 # '/_ah/api/discovery/v1/apis/%s/%s/rest' % (api['id'], api['version'])
780 api_data = net.url_read_json(api['discoveryRestUrl'])
781 if api_data is None:
782 raise APIError('Failed to discover %s on %s' % (api['id'], host))
783 out[api['id']] = api_data
784 return out
785
786
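A rough usage sketch of the discovery helper above (the server URL is a placeholder); this mirrors how CMDquery_list drives it further down:

    # Hypothetical host; the result is a dict keyed by API id (e.g. 'swarming:v1').
    apis = endpoints_api_discovery_apis('https://server-url')
    for api_id in sorted(apis):
      print api_id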
816 ### Commands. 787 ### Commands.
817 788
818 789
819 def abort_task(_swarming, _manifest): 790 def abort_task(_swarming, _manifest):
820 """Given a task manifest that was triggered, aborts its execution.""" 791 """Given a task manifest that was triggered, aborts its execution."""
821 # TODO(vadimsh): Not supported by the server yet. 792 # TODO(vadimsh): Not supported by the server yet.
822 793
823 794
824 def add_filter_options(parser): 795 def add_filter_options(parser):
825 parser.filter_group = optparse.OptionGroup(parser, 'Filtering slaves') 796 parser.filter_group = optparse.OptionGroup(parser, 'Filtering slaves')
(...skipping 63 matching lines...)
889 'In this case, no .isolated file is expected.') 860 'In this case, no .isolated file is expected.')
890 parser.add_option_group(parser.task_group) 861 parser.add_option_group(parser.task_group)
891 862
892 863
893 def process_trigger_options(parser, options, args): 864 def process_trigger_options(parser, options, args):
894 """Processes trigger options and uploads files to isolate server if necessary. 865 """Processes trigger options and uploads files to isolate server if necessary.
895 """ 866 """
896 options.dimensions = dict(options.dimensions) 867 options.dimensions = dict(options.dimensions)
897 options.env = dict(options.env) 868 options.env = dict(options.env)
898 869
899 data = []
900 if not options.dimensions: 870 if not options.dimensions:
901 parser.error('Please at least specify one --dimension') 871 parser.error('Please at least specify one --dimension')
902 if options.raw_cmd: 872 if options.raw_cmd:
903 if not args: 873 if not args:
904 parser.error( 874 parser.error(
905 'Arguments with --raw-cmd should be passed after -- as command ' 875 'Arguments with --raw-cmd should be passed after -- as command '
906 'delimiter.') 876 'delimiter.')
907 if options.isolate_server: 877 if options.isolate_server:
908 parser.error('Can\'t use both --raw-cmd and --isolate-server.') 878 parser.error('Can\'t use both --raw-cmd and --isolate-server.')
909 879
910 command = args 880 command = args
911 if not options.task_name: 881 if not options.task_name:
912 options.task_name = u'%s/%s' % ( 882 options.task_name = u'%s/%s' % (
913 options.user, 883 options.user,
914 '_'.join( 884 '_'.join(
915 '%s=%s' % (k, v) 885 '%s=%s' % (k, v)
916 for k, v in sorted(options.dimensions.iteritems()))) 886 for k, v in sorted(options.dimensions.iteritems())))
887 inputs_ref = None
917 else: 888 else:
918 isolateserver.process_isolate_server_options(parser, options, False) 889 isolateserver.process_isolate_server_options(parser, options, False)
919 try: 890 try:
920 command, data = isolated_handle_options(options, args) 891 command, inputs_ref = isolated_handle_options(options, args)
921 except ValueError as e: 892 except ValueError as e:
922 parser.error(str(e)) 893 parser.error(str(e))
923 894
924 return TaskRequest( 895 # If inputs_ref is used, command is actually extra_args. Otherwise it's an
925 command=command, 896 # actual command to run.
926 data=data, 897 properties = TaskProperties(
898 command=None if inputs_ref else command,
927 dimensions=options.dimensions, 899 dimensions=options.dimensions,
928 env=options.env, 900 env=options.env,
929 expiration=options.expiration, 901 execution_timeout_secs=options.hard_timeout,
930 hard_timeout=options.hard_timeout, 902 extra_args=command if inputs_ref else None,
903 grace_period_secs=30,
931 idempotent=options.idempotent, 904 idempotent=options.idempotent,
932 io_timeout=options.io_timeout, 905 inputs_ref=inputs_ref,
906 io_timeout_secs=options.io_timeout)
907 return NewTaskRequest(
908 expiration_secs=options.expiration,
933 name=options.task_name, 909 name=options.task_name,
910 parent_task_id=os.environ.get('SWARMING_TASK_ID', ''),
934 priority=options.priority, 911 priority=options.priority,
912 properties=properties,
935 tags=options.tags, 913 tags=options.tags,
936 user=options.user, 914 user=options.user)
937 verbose=options.verbose)
938 915
939 916
940 def add_collect_options(parser): 917 def add_collect_options(parser):
941 parser.server_group.add_option( 918 parser.server_group.add_option(
942 '-t', '--timeout', 919 '-t', '--timeout',
943 type='float', 920 type='float',
944 default=80*60., 921 default=80*60.,
945 help='Timeout to wait for result, set to 0 for no timeout; default: ' 922 help='Timeout to wait for result, set to 0 for no timeout; default: '
946 '%default s') 923 '%default s')
947 parser.group_logging.add_option( 924 parser.group_logging.add_option(
(...skipping 31 matching lines...)
979 if not options.force: 956 if not options.force:
980 print('Delete the following bots?') 957 print('Delete the following bots?')
981 for bot in bots: 958 for bot in bots:
982 print(' %s' % bot) 959 print(' %s' % bot)
983 if raw_input('Continue? [y/N] ') not in ('y', 'Y'): 960 if raw_input('Continue? [y/N] ') not in ('y', 'Y'):
984 print('Goodbye.') 961 print('Goodbye.')
985 return 1 962 return 1
986 963
987 result = 0 964 result = 0
988 for bot in bots: 965 for bot in bots:
989 url = '%s/swarming/api/v1/client/bot/%s' % (options.swarming, bot) 966 url = '%s/_ah/api/swarming/v1/bot/%s' % (options.swarming, bot)
990 if net.url_read_json(url, method='DELETE') is None: 967 if net.url_read_json(url, method='DELETE') is None:
991 print('Deleting %s failed' % bot) 968 print('Deleting %s failed' % bot)
992 result = 1 969 result = 1
993 return result 970 return result
994 971
995 972
996 def CMDbots(parser, args): 973 def CMDbots(parser, args):
997 """Returns information about the bots connected to the Swarming server.""" 974 """Returns information about the bots connected to the Swarming server."""
998 add_filter_options(parser) 975 add_filter_options(parser)
999 parser.filter_group.add_option( 976 parser.filter_group.add_option(
1000 '--dead-only', action='store_true', 977 '--dead-only', action='store_true',
1001 help='Only print dead bots, useful to reap them and reimage broken bots') 978 help='Only print dead bots, useful to reap them and reimage broken bots')
1002 parser.filter_group.add_option( 979 parser.filter_group.add_option(
1003 '-k', '--keep-dead', action='store_true', 980 '-k', '--keep-dead', action='store_true',
1004 help='Do not filter out dead bots') 981 help='Do not filter out dead bots')
1005 parser.filter_group.add_option( 982 parser.filter_group.add_option(
1006 '-b', '--bare', action='store_true', 983 '-b', '--bare', action='store_true',
1007 help='Do not print out dimensions') 984 help='Do not print out dimensions')
1008 options, args = parser.parse_args(args) 985 options, args = parser.parse_args(args)
1009 986
1010 if options.keep_dead and options.dead_only: 987 if options.keep_dead and options.dead_only:
1011 parser.error('Use only one of --keep-dead and --dead-only') 988 parser.error('Use only one of --keep-dead and --dead-only')
1012 989
1013 bots = [] 990 bots = []
1014 cursor = None 991 cursor = None
1015 limit = 250 992 limit = 250
1016 # Iterate via cursors. 993 # Iterate via cursors.
1017 base_url = options.swarming + '/swarming/api/v1/client/bots?limit=%d' % limit 994 base_url = (
995 options.swarming + '/_ah/api/swarming/v1/bots/list?limit=%d' % limit)
1018 while True: 996 while True:
1019 url = base_url 997 url = base_url
1020 if cursor: 998 if cursor:
1021 url += '&cursor=%s' % urllib.quote(cursor) 999 url += '&cursor=%s' % urllib.quote(cursor)
1022 data = net.url_read_json(url) 1000 data = net.url_read_json(url)
1023 if data is None: 1001 if data is None:
1024 print >> sys.stderr, 'Failed to access %s' % options.swarming 1002 print >> sys.stderr, 'Failed to access %s' % options.swarming
1025 return 1 1003 return 1
1026 bots.extend(data['items']) 1004 bots.extend(data['items'])
1027 cursor = data['cursor'] 1005 cursor = data.get('cursor')
1028 if not cursor: 1006 if not cursor:
1029 break 1007 break
1030 1008
1031 for bot in natsort.natsorted(bots, key=lambda x: x['id']): 1009 for bot in natsort.natsorted(bots, key=lambda x: x['bot_id']):
1032 if options.dead_only: 1010 if options.dead_only:
1033 if not bot['is_dead']: 1011 if not bot.get('is_dead'):
1034 continue 1012 continue
1035 elif not options.keep_dead and bot['is_dead']: 1013 elif not options.keep_dead and bot.get('is_dead'):
1036 continue 1014 continue
1037 1015
1038 # If the user requested to filter on dimensions, ensure the bot has all the 1016 # If the user requested to filter on dimensions, ensure the bot has all the
1039 # dimensions requested. 1017 # dimensions requested.
1040 dimensions = bot['dimensions'] 1018 dimensions = {i['key']: i['value'] for i in bot['dimensions']}
1041 for key, value in options.dimensions: 1019 for key, value in options.dimensions:
1042 if key not in dimensions: 1020 if key not in dimensions:
1043 break 1021 break
1044 # A bot can have multiple values for a key, for example, 1022 # A bot can have multiple values for a key, for example,
1045 # {'os': ['Windows', 'Windows-6.1']}, so that --dimension os=Windows will 1023 # {'os': ['Windows', 'Windows-6.1']}, so that --dimension os=Windows will
1046 # be accepted. 1024 # be accepted.
1047 if isinstance(dimensions[key], list): 1025 if isinstance(dimensions[key], list):
1048 if value not in dimensions[key]: 1026 if value not in dimensions[key]:
1049 break 1027 break
1050 else: 1028 else:
1051 if value != dimensions[key]: 1029 if value != dimensions[key]:
1052 break 1030 break
1053 else: 1031 else:
1054 print bot['id'] 1032 print bot['bot_id']
1055 if not options.bare: 1033 if not options.bare:
1056 print ' %s' % json.dumps(dimensions, sort_keys=True) 1034 print ' %s' % json.dumps(dimensions, sort_keys=True)
1057 if bot.get('task_id'): 1035 if bot.get('task_id'):
1058 print ' task: %s' % bot['task_id'] 1036 print ' task: %s' % bot['task_id']
1059 return 0 1037 return 0
1060 1038
1061 1039
1062 @subcommand.usage('--json file | task_id...') 1040 @subcommand.usage('--json file | task_id...')
1063 def CMDcollect(parser, args): 1041 def CMDcollect(parser, args):
1064 """Retrieves results of one or multiple Swarming task by its ID. 1042 """Retrieves results of one or multiple Swarming task by its ID.
1065 1043
1066 The result can be in multiple parts if the execution was sharded. It can 1044 The result can be in multiple parts if the execution was sharded. It can
1067 potentially have retries. 1045 potentially have retries.
1068 """ 1046 """
1069 add_collect_options(parser) 1047 add_collect_options(parser)
1070 parser.add_option( 1048 parser.add_option(
1071 '-j', '--json', 1049 '-j', '--json',
1072 help='Load the task ids from .json as saved by trigger --dump-json') 1050 help='Load the task ids from .json as saved by trigger --dump-json')
1073 (options, args) = parser.parse_args(args) 1051 options, args = parser.parse_args(args)
1074 if not args and not options.json: 1052 if not args and not options.json:
1075 parser.error('Must specify at least one task id or --json.') 1053 parser.error('Must specify at least one task id or --json.')
1076 if args and options.json: 1054 if args and options.json:
1077 parser.error('Only use one of task id or --json.') 1055 parser.error('Only use one of task id or --json.')
1078 1056
1079 if options.json: 1057 if options.json:
1080 try: 1058 try:
1081 with open(options.json) as f: 1059 with open(options.json) as f:
1082 tasks = sorted( 1060 tasks = sorted(
1083 json.load(f)['tasks'].itervalues(), key=lambda x: x['shard_index']) 1061 json.load(f)['tasks'].itervalues(), key=lambda x: x['shard_index'])
(...skipping 13 matching lines...)
1097 options.timeout, 1075 options.timeout,
1098 options.decorate, 1076 options.decorate,
1099 options.print_status_updates, 1077 options.print_status_updates,
1100 options.task_summary_json, 1078 options.task_summary_json,
1101 options.task_output_dir) 1079 options.task_output_dir)
1102 except Failure: 1080 except Failure:
1103 on_error.report(None) 1081 on_error.report(None)
1104 return 1 1082 return 1
1105 1083
1106 1084
1107 @subcommand.usage('[resource name]') 1085 @subcommand.usage('[method name]')
1108 def CMDquery(parser, args): 1086 def CMDquery(parser, args):
1109 """Returns raw JSON information via a URL endpoint. Use 'list' to gather the 1087 """Returns raw JSON information via a URL endpoint. Use 'query-list' to
1110 list of valid values from the server. 1088 gather the list of API methods from the server.
1111 1089
1112 Examples: 1090 Examples:
1113 Printing the list of known URLs: 1091 Listing all bots:
1114 swarming.py query -S https://server-url list 1092 swarming.py query -S https://server-url bots/list
1115 1093
1116 Listing last 50 tasks on a specific bot named 'swarm1' 1094 Listing last 10 tasks on a specific bot named 'swarm1':
1117 swarming.py query -S https://server-url --limit 50 bot/swarm1/tasks 1095 swarming.py query -S https://server-url --limit 10 bot/swarm1/tasks
1118 """ 1096 """
1119 CHUNK_SIZE = 250 1097 CHUNK_SIZE = 250
1120 1098
1121 parser.add_option( 1099 parser.add_option(
1122 '-L', '--limit', type='int', default=200, 1100 '-L', '--limit', type='int', default=200,
1123 help='Limit to enforce on limitless items (like number of tasks); ' 1101 help='Limit to enforce on limitless items (like number of tasks); '
1124 'default=%default') 1102 'default=%default')
1125 parser.add_option( 1103 parser.add_option(
1126 '--json', help='Path to JSON output file (otherwise prints to stdout)') 1104 '--json', help='Path to JSON output file (otherwise prints to stdout)')
1127 (options, args) = parser.parse_args(args) 1105 parser.add_option(
1106 '--progress', action='store_true',
1107 help='Prints a dot at each request to show progress')
1108 options, args = parser.parse_args(args)
1128 if len(args) != 1: 1109 if len(args) != 1:
1129 parser.error('Must specify only one resource name.') 1110 parser.error(
1130 1111 'Must specify only method name and optionally query args properly '
1131 base_url = options.swarming + '/swarming/api/v1/client/' + args[0] 1112 'escaped.')
1113 base_url = options.swarming + '/_ah/api/swarming/v1/' + args[0]
1132 url = base_url 1114 url = base_url
1133 if options.limit: 1115 if options.limit:
1134 # Double-check this; change it if it does not work out. 1116 # Double-check this; change it if it does not work out.
1135 merge_char = '&' if '?' in url else '?' 1117 merge_char = '&' if '?' in url else '?'
1136 url += '%slimit=%d' % (merge_char, min(CHUNK_SIZE, options.limit)) 1118 url += '%slimit=%d' % (merge_char, min(CHUNK_SIZE, options.limit))
1137 data = net.url_read_json(url) 1119 data = net.url_read_json(url)
1138 if data is None: 1120 if data is None:
1139 print >> sys.stderr, 'Failed to access %s' % options.swarming 1121 # TODO(maruel): Do basic diagnostic.
1122 print >> sys.stderr, 'Failed to access %s' % url
1140 return 1 1123 return 1
1141 1124
1142 # Some items support cursors. Try to get automatically if cursors are needed 1125 # Some items support cursors. Try to get automatically if cursors are needed
1143 # by looking at the 'cursor' items. 1126 # by looking at the 'cursor' items.
1144 while ( 1127 while (
1145 data.get('cursor') and 1128 data.get('cursor') and
1146 (not options.limit or len(data['items']) < options.limit)): 1129 (not options.limit or len(data['items']) < options.limit)):
1147 merge_char = '&' if '?' in base_url else '?' 1130 merge_char = '&' if '?' in base_url else '?'
1148 url = base_url + '%scursor=%s' % (merge_char, urllib.quote(data['cursor'])) 1131 url = base_url + '%scursor=%s' % (merge_char, urllib.quote(data['cursor']))
1149 if options.limit: 1132 if options.limit:
1150 url += '&limit=%d' % min(CHUNK_SIZE, options.limit - len(data['items'])) 1133 url += '&limit=%d' % min(CHUNK_SIZE, options.limit - len(data['items']))
1134 if options.progress:
1135 sys.stdout.write('.')
1136 sys.stdout.flush()
1151 new = net.url_read_json(url) 1137 new = net.url_read_json(url)
1152 if new is None: 1138 if new is None:
1139 if options.progress:
1140 print('')
1153 print >> sys.stderr, 'Failed to access %s' % options.swarming 1141 print >> sys.stderr, 'Failed to access %s' % options.swarming
1154 return 1 1142 return 1
1155 data['items'].extend(new['items']) 1143 data['items'].extend(new['items'])
1156 data['cursor'] = new['cursor'] 1144 data['cursor'] = new.get('cursor')
1157 1145
1146 if options.progress:
1147 print('')
1158 if options.limit and len(data.get('items', [])) > options.limit: 1148 if options.limit and len(data.get('items', [])) > options.limit:
1159 data['items'] = data['items'][:options.limit] 1149 data['items'] = data['items'][:options.limit]
1160 data.pop('cursor', None) 1150 data.pop('cursor', None)
1161 1151
1162 if options.json: 1152 if options.json:
1163 with open(options.json, 'w') as f: 1153 tools.write_json(options.json, data, True)
1164 json.dump(data, f)
1165 else: 1154 else:
1166 try: 1155 try:
1167 json.dump(data, sys.stdout, indent=2, sort_keys=True) 1156 tools.write_json(sys.stdout, data, False)
1168 sys.stdout.write('\n') 1157 sys.stdout.write('\n')
1169 except IOError: 1158 except IOError:
1170 pass 1159 pass
1171 return 0 1160 return 0
1172 1161
1173 1162
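Editor's note: the cursor loop in CMDquery above is a generic pagination pattern for the /_ah/api/swarming/v1 endpoints: request a first page capped at CHUNK_SIZE, then keep following the returned 'cursor' until it is empty or the requested limit is reached. A standalone sketch of the same pattern, using plain urllib2 instead of net.url_read_json (the helper names are illustrative, not part of swarming.py):

    import json
    import urllib
    import urllib2

    CHUNK_SIZE = 250  # Page size requested from the server per call.

    def read_json(url):
      # Fetch a URL and decode its JSON body; returns None on any error.
      try:
        return json.load(urllib2.urlopen(url))
      except (urllib2.URLError, ValueError):
        return None

    def fetch_items(base_url, limit):
      # First page, capped at CHUNK_SIZE just like CMDquery does.
      merge_char = '&' if '?' in base_url else '?'
      data = read_json(
          base_url + '%slimit=%d' % (merge_char, min(CHUNK_SIZE, limit)))
      if data is None:
        return None
      data.setdefault('items', [])
      # Follow the cursor until it runs out or enough items were gathered.
      while data.get('cursor') and len(data['items']) < limit:
        url = base_url + '%scursor=%s' % (
            merge_char, urllib.quote(data['cursor']))
        url += '&limit=%d' % min(CHUNK_SIZE, limit - len(data['items']))
        page = read_json(url)
        if page is None:
          return None
        data['items'].extend(page.get('items', []))
        data['cursor'] = page.get('cursor')
      return data['items'][:limit]

For example, fetch_items('https://server-url/_ah/api/swarming/v1/bot/swarm1/tasks', 10) mirrors the second docstring example.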
1163 def CMDquery_list(parser, args):
1164 """Returns list of all the Swarming APIs that can be used with command
1165 'query'.
1166 """
1167 parser.add_option(
1168 '--json', help='Path to JSON output file (otherwise prints to stdout)')
1169 options, args = parser.parse_args(args)
1170 if args:
1171 parser.error('No argument allowed.')
1172
1173 try:
1174 apis = endpoints_api_discovery_apis(options.swarming)
1175 except APIError as e:
1176 parser.error(str(e))
1177 if options.json:
1178 with open(options.json, 'wb') as f:
1179 json.dump(apis, f)
1180 else:
1181 help_url = (
1182 'https://apis-explorer.appspot.com/apis-explorer/?base=%s/_ah/api#p/' %
1183 options.swarming)
1184 for api_id, api in sorted(apis.iteritems()):
1185 print api_id
1186 print ' ' + api['description']
1187 for resource_name, resource in sorted(api['resources'].iteritems()):
1188 print ''
1189 for method_name, method in sorted(resource['methods'].iteritems()):
1190 # Only list the GET ones.
1191 if method['httpMethod'] != 'GET':
1192 continue
1193 print '- %s.%s: %s' % (
1194 resource_name, method_name, method['path'])
1195 print ' ' + method['description']
1196 print ' %s%s%s' % (help_url, api['servicePath'], method['id'])
1197 return 0
1198
1199
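Editor's note: CMDquery_list relies on a helper named endpoints_api_discovery_apis whose body is not shown in this hunk. A hedged sketch of what such a helper could look like, assuming the server exposes the standard Cloud Endpoints discovery directory under /_ah/api/discovery/v1 (the exact paths and the APIError handling are assumptions, not taken from this diff):

    import json
    import urllib2

    class APIError(Exception):
      pass

    def discover_apis(host):
      # Read the discovery directory listing every API the service exposes.
      try:
        directory = json.load(
            urllib2.urlopen(host + '/_ah/api/discovery/v1/apis'))
      except urllib2.URLError as e:
        raise APIError('Failed to reach %s: %s' % (host, e))
      apis = {}
      for entry in directory.get('items', []):
        # Each API then has a full REST description with its resources and
        # methods, which is what CMDquery_list walks and prints.
        rest_url = '%s/_ah/api/discovery/v1/apis/%s/%s/rest' % (
            host, entry['name'], entry['version'])
        apis[entry['id']] = json.load(urllib2.urlopen(rest_url))
      return apis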
1174 @subcommand.usage('(hash|isolated) [-- extra_args]') 1200 @subcommand.usage('(hash|isolated) [-- extra_args]')
1175 def CMDrun(parser, args): 1201 def CMDrun(parser, args):
1176 """Triggers a task and wait for the results. 1202 """Triggers a task and wait for the results.
1177 1203
1178 Basically, does everything to run a command remotely. 1204 Basically, does everything to run a command remotely.
1179 """ 1205 """
1180 add_trigger_options(parser) 1206 add_trigger_options(parser)
1181 add_collect_options(parser) 1207 add_collect_options(parser)
1182 add_sharding_options(parser) 1208 add_sharding_options(parser)
1183 options, args = parser.parse_args(args) 1209 options, args = parser.parse_args(args)
(...skipping 34 matching lines...)
1218 """Runs a task locally that was triggered on the server. 1244 """Runs a task locally that was triggered on the server.
1219 1245
1220   This runs locally the same commands that have been run on the bot. The data 1246   This runs locally the same commands that have been run on the bot. The data
1221 downloaded will be in a subdirectory named 'work' of the current working 1247 downloaded will be in a subdirectory named 'work' of the current working
1222 directory. 1248 directory.
1223 """ 1249 """
1224 options, args = parser.parse_args(args) 1250 options, args = parser.parse_args(args)
1225 if len(args) != 1: 1251 if len(args) != 1:
1226 parser.error('Must specify exactly one task id.') 1252 parser.error('Must specify exactly one task id.')
1227 1253
1228 url = options.swarming + '/swarming/api/v1/client/task/%s/request' % args[0] 1254 url = options.swarming + '/_ah/api/swarming/v1/task/%s/request' % args[0]
1229 request = net.url_read_json(url) 1255 request = net.url_read_json(url)
1230 if not request: 1256 if not request:
1231 print >> sys.stderr, 'Failed to retrieve request data for the task' 1257 print >> sys.stderr, 'Failed to retrieve request data for the task'
1232 return 1 1258 return 1
1233 1259
1234 if not os.path.isdir('work'): 1260 if not os.path.isdir('work'):
1235 os.mkdir('work') 1261 os.mkdir('work')
1236 1262
1237 swarming_host = urlparse.urlparse(options.swarming).netloc
1238 properties = request['properties'] 1263 properties = request['properties']
1239 for data_url, _ in properties['data']:
1240 assert data_url.startswith('https://'), data_url
1241 data_host = urlparse.urlparse(data_url).netloc
1242 if data_host != swarming_host:
1243 auth.ensure_logged_in('https://' + data_host)
1244
1245 content = net.url_read(data_url)
1246 if content is None:
1247 print >> sys.stderr, 'Failed to download %s' % data_url
1248 return 1
1249 with zipfile.ZipFile(StringIO.StringIO(content)) as zip_file:
1250 zip_file.extractall('work')
1251
1252 env = None 1264 env = None
1253 if properties['env']: 1265 if properties['env']:
1254 env = os.environ.copy() 1266 env = os.environ.copy()
1255 logging.info('env: %r', properties['env']) 1267 logging.info('env: %r', properties['env'])
1256 env.update( 1268 env.update(
1257 (k.encode('utf-8'), v.encode('utf-8')) 1269 (i['key'].encode('utf-8'), i['value'].encode('utf-8'))
1258 for k, v in properties['env'].iteritems()) 1270 for i in properties['env'])
1259 1271
1260 exit_code = 0 1272 try:
1261 for cmd in properties['commands']: 1273 return subprocess.call(properties['command'], env=env, cwd='work')
1262 try: 1274 except OSError as e:
1263 c = subprocess.call(cmd, env=env, cwd='work') 1275 print >> sys.stderr, 'Failed to run: %s' % ' '.join(properties['command'])
1264 except OSError as e: 1276 print >> sys.stderr, str(e)
1265 print >> sys.stderr, 'Failed to run: %s' % ' '.join(cmd) 1277 return 1
1266 print >> sys.stderr, str(e)
1267 c = 1
1268 if not exit_code:
1269 exit_code = c
1270 return exit_code
1271 1278
1272 1279
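Editor's note: the new CMDreproduce body reflects an API change visible in this hunk: the request's env is now a list of {'key': ..., 'value': ...} pairs rather than a mapping, and a single 'command' replaces the old 'commands' list. A small sketch of applying such a payload locally, with a hypothetical 'properties' value standing in for the server response:

    import os
    import subprocess

    # Hypothetical request properties, shaped like the new API payload.
    properties = {
        'command': ['python', '-c', 'print("hello from the task")'],
        'env': [{'key': 'EXAMPLE_VAR', 'value': '1'}],
    }

    # Mirror the work-directory setup done by CMDreproduce.
    if not os.path.isdir('work'):
      os.mkdir('work')

    env = None
    if properties['env']:
      env = os.environ.copy()
      # Each env entry is a key/value pair object, not a dict item.
      env.update(
          (i['key'].encode('utf-8'), i['value'].encode('utf-8'))
          for i in properties['env'])

    # Run the task command from the 'work' directory, as the bot would have.
    exit_code = subprocess.call(properties['command'], env=env, cwd='work')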
1273 @subcommand.usage("(hash|isolated) [-- extra_args|raw command]") 1280 @subcommand.usage("(hash|isolated) [-- extra_args|raw command]")
1274 def CMDtrigger(parser, args): 1281 def CMDtrigger(parser, args):
1275 """Triggers a Swarming task. 1282 """Triggers a Swarming task.
1276 1283
1277 Accepts either the hash (sha1) of a .isolated file already uploaded or the 1284 Accepts either the hash (sha1) of a .isolated file already uploaded or the
1278 path to an .isolated file to archive. 1285 path to an .isolated file to archive.
1279 1286
1280   If an .isolated file is specified instead of a hash, it is first archived. 1287   If an .isolated file is specified instead of a hash, it is first archived.
(...skipping 81 matching lines...)
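Editor's note: the CMDtrigger body is elided above; its docstring's hash-vs-path behaviour can still be illustrated with a small, purely hypothetical helper (the real archiving goes through isolateserver and is not reproduced here):

    import re

    def resolve_isolated(arg):
      # A 40-character hex string is treated as an already-uploaded isolated
      # hash (sha1); anything else is assumed to be a local .isolated file
      # that must be archived first.
      if re.match(r'^[0-9a-f]{40}$', arg):
        return arg
      return archive_isolated(arg)  # hypothetical upload step

    def archive_isolated(path):
      # Placeholder only: the real client uploads via isolateserver and
      # returns the resulting hash. Raising keeps the assumption explicit.
      raise NotImplementedError('upload %s to the isolate server' % path)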
1362 def main(args): 1369 def main(args):
1363 dispatcher = subcommand.CommandDispatcher(__name__) 1370 dispatcher = subcommand.CommandDispatcher(__name__)
1364 return dispatcher.execute(OptionParserSwarming(version=__version__), args) 1371 return dispatcher.execute(OptionParserSwarming(version=__version__), args)
1365 1372
1366 1373
1367 if __name__ == '__main__': 1374 if __name__ == '__main__':
1368 fix_encoding.fix_encoding() 1375 fix_encoding.fix_encoding()
1369 tools.disable_buffering() 1376 tools.disable_buffering()
1370 colorama.init() 1377 colorama.init()
1371 sys.exit(main(sys.argv[1:])) 1378 sys.exit(main(sys.argv[1:]))