Chromium Code Reviews

Side by Side Diff: client/swarming.py

Issue 1337633002: Reapply "Isolated task support in Endpoints API: client side (3/3)" and fixes (Closed) Base URL: git@github.com:luci/luci-py.git@master
Patch Set: We did it, again | Created 5 years, 3 months ago
OLD | NEW
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 # Copyright 2013 The Swarming Authors. All rights reserved. 2 # Copyright 2013 The Swarming Authors. All rights reserved.
3 # Use of this source code is governed under the Apache License, Version 2.0 that 3 # Use of this source code is governed under the Apache License, Version 2.0 that
4 # can be found in the LICENSE file. 4 # can be found in the LICENSE file.
5 5
6 """Client tool to trigger tasks or retrieve results from a Swarming server.""" 6 """Client tool to trigger tasks or retrieve results from a Swarming server."""
7 7
8 __version__ = '0.6.3' 8 __version__ = '0.8.2'
9 9
10 import collections 10 import collections
11 import datetime 11 import datetime
12 import json 12 import json
13 import logging 13 import logging
14 import optparse 14 import optparse
15 import os 15 import os
16 import re
17 import shutil
18 import StringIO
19 import subprocess 16 import subprocess
20 import sys 17 import sys
21 import threading 18 import threading
22 import time 19 import time
23 import urllib 20 import urllib
24 import urlparse
25 import zipfile
26 21
27 from third_party import colorama 22 from third_party import colorama
28 from third_party.depot_tools import fix_encoding 23 from third_party.depot_tools import fix_encoding
29 from third_party.depot_tools import subcommand 24 from third_party.depot_tools import subcommand
30 25
31 from utils import file_path 26 from utils import file_path
32 from utils import logging_utils 27 from utils import logging_utils
33 from third_party.chromium import natsort 28 from third_party.chromium import natsort
34 from utils import net 29 from utils import net
35 from utils import on_error 30 from utils import on_error
36 from utils import threading_utils 31 from utils import threading_utils
37 from utils import tools 32 from utils import tools
38 from utils import zip_package
39 33
40 import auth 34 import auth
41 import isolated_format 35 import isolated_format
42 import isolateserver 36 import isolateserver
43 import run_isolated
44 37
45 38
46 ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) 39 ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
47 40
48 41
49 class Failure(Exception): 42 class Failure(Exception):
50 """Generic failure.""" 43 """Generic failure."""
51 pass 44 pass
52 45
53 46
54 ### Isolated file handling. 47 ### Isolated file handling.
55 48
56 49
57 def isolated_upload_zip_bundle(isolate_server, bundle): 50 def isolated_to_hash(arg, algo):
58 """Uploads a zip package to Isolate Server and returns raw fetch URL.
59
60 Args:
61 isolate_server: URL of an Isolate Server.
62 bundle: instance of ZipPackage to upload.
63
64 Returns:
65 URL to get the file from.
66 """
67 # Swarming bot needs to be able to grab the file from the Isolate Server using
68 # a simple HTTPS GET. Use 'default' namespace so that the raw data returned to
69 # a bot is not zipped, since the swarming_bot doesn't understand compressed
70 # data. This namespace has nothing to do with |namespace| passed to
71 # run_isolated.py that is used to store files for the isolated task.
72 logging.info('Zipping up and uploading files...')
73 start_time = time.time()
74 isolate_item = isolateserver.BufferItem(bundle.zip_into_buffer())
75 with isolateserver.get_storage(isolate_server, 'default') as storage:
76 uploaded = storage.upload_items([isolate_item])
77 bundle_url = storage.get_fetch_url(isolate_item)
78 elapsed = time.time() - start_time
79 if isolate_item in uploaded:
80 logging.info('Upload complete, time elapsed: %f', elapsed)
81 else:
82 logging.info('Zip file already on server, time elapsed: %f', elapsed)
83 return bundle_url
84
85
86 def isolated_get_data(isolate_server):
87 """Returns the 'data' section with all files necessary to bootstrap a task
88 execution running an isolated task.
89
90 It's mainly zipping run_isolated.zip over and over again.
91 TODO(maruel): Get rid of this with:
92 https://code.google.com/p/swarming/issues/detail?id=173
93 """
94 bundle = zip_package.ZipPackage(ROOT_DIR)
95 bundle.add_buffer(
96 'run_isolated.zip',
97 run_isolated.get_as_zip_package().zip_into_buffer(compress=False))
98 bundle_url = isolated_upload_zip_bundle(isolate_server, bundle)
99 return [(bundle_url, 'swarm_data.zip')]
100
101
102 def isolated_get_run_commands(
103 isolate_server, namespace, isolated_hash, extra_args, verbose):
104 """Returns the 'commands' to run an isolated task via run_isolated.zip.
105
106 Returns:
107 commands list to be added to the request.
108 """
109 run_cmd = [
110 'python', 'run_isolated.zip',
111 '--isolated', isolated_hash,
112 '--isolate-server', isolate_server,
113 '--namespace', namespace,
114 ]
115 if verbose:
116 run_cmd.append('--verbose')
117 # Pass all extra args for run_isolated.py; it will pass them to the command.
118 if extra_args:
119 run_cmd.append('--')
120 run_cmd.extend(extra_args)
121 return run_cmd
122
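For illustration, a minimal sketch of the bootstrap command the old code path builds (server URL and hash are hypothetical):

run_cmd = isolated_get_run_commands(
    'https://isolate.example.com', 'default-gzip', 'deadbeef', ['--foo'], True)
# -> ['python', 'run_isolated.zip',
#     '--isolated', 'deadbeef',
#     '--isolate-server', 'https://isolate.example.com',
#     '--namespace', 'default-gzip',
#     '--verbose',
#     '--', '--foo']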
123
124 def isolated_archive(isolate_server, namespace, isolated, algo, verbose):
125 """Archives a .isolated and all the dependencies on the Isolate Server."""
126 logging.info(
127 'isolated_archive(%s, %s, %s)', isolate_server, namespace, isolated)
128 print('Archiving: %s' % isolated)
129 cmd = [
130 sys.executable,
131 os.path.join(ROOT_DIR, 'isolate.py'),
132 'archive',
133 '--isolate-server', isolate_server,
134 '--namespace', namespace,
135 '--isolated', isolated,
136 ]
137 cmd.extend(['--verbose'] * verbose)
138 logging.info(' '.join(cmd))
139 if subprocess.call(cmd, verbose):
140 return None
141 return isolated_format.hash_file(isolated, algo)
142
143
144 def isolated_to_hash(isolate_server, namespace, arg, algo, verbose):
145 """Archives a .isolated file if needed. 51 """Archives a .isolated file if needed.
146 52
147 Returns the file hash to trigger and a bool specifying if it was a file (True) 53 Returns the file hash to trigger and a bool specifying if it was a file (True)
148 or a hash (False). 54 or a hash (False).
149 """ 55 """
150 if arg.endswith('.isolated'): 56 if arg.endswith('.isolated'):
151 file_hash = isolated_archive(isolate_server, namespace, arg, algo, verbose) 57 file_hash = isolated_format.hash_file(arg, algo)
152 if not file_hash: 58 if not file_hash:
153 on_error.report('Archival failure %s' % arg) 59 on_error.report('Archival failure %s' % arg)
154 return None, True 60 return None, True
155 return file_hash, True 61 return file_hash, True
156 elif isolated_format.is_valid_hash(arg, algo): 62 elif isolated_format.is_valid_hash(arg, algo):
157 return arg, False 63 return arg, False
158 else: 64 else:
159 on_error.report('Invalid hash %s' % arg) 65 on_error.report('Invalid hash %s' % arg)
160 return None, False 66 return None, False
161 67
162 68
163 def isolated_handle_options(options, args): 69 def isolated_handle_options(options, args):
164 """Handles '--isolated <isolated>', '<isolated>' and '-- <args...>' arguments. 70 """Handles '--isolated <isolated>', '<isolated>' and '-- <args...>' arguments.
165 71
166 Returns: 72 Returns:
167 tuple(command, data). 73 tuple(command, inputs_ref).
168 """ 74 """
169 isolated_cmd_args = [] 75 isolated_cmd_args = []
170 if not options.isolated: 76 if not options.isolated:
171 if '--' in args: 77 if '--' in args:
172 index = args.index('--') 78 index = args.index('--')
173 isolated_cmd_args = args[index+1:] 79 isolated_cmd_args = args[index+1:]
174 args = args[:index] 80 args = args[:index]
175 else: 81 else:
176 # optparse eats '--' sometimes. 82 # optparse eats '--' sometimes.
177 isolated_cmd_args = args[1:] 83 isolated_cmd_args = args[1:]
178 args = args[:1] 84 args = args[:1]
179 if len(args) != 1: 85 if len(args) != 1:
180 raise ValueError( 86 raise ValueError(
181 'Use --isolated, --raw-cmd or \'--\' to pass arguments to the called ' 87 'Use --isolated, --raw-cmd or \'--\' to pass arguments to the called '
182 'process.') 88 'process.')
183 # Old code. To be removed eventually. 89 # Old code. To be removed eventually.
184 options.isolated, is_file = isolated_to_hash( 90 options.isolated, is_file = isolated_to_hash(
185 options.isolate_server, options.namespace, args[0], 91 args[0], isolated_format.get_hash_algo(options.namespace))
186 isolated_format.get_hash_algo(options.namespace), options.verbose)
187 if not options.isolated: 92 if not options.isolated:
188 raise ValueError('Invalid argument %s' % args[0]) 93 raise ValueError('Invalid argument %s' % args[0])
189 elif args: 94 elif args:
190 is_file = False 95 is_file = False
191 if '--' in args: 96 if '--' in args:
192 index = args.index('--') 97 index = args.index('--')
193 isolated_cmd_args = args[index+1:] 98 isolated_cmd_args = args[index+1:]
194 if index != 0: 99 if index != 0:
195 raise ValueError('Unexpected arguments.') 100 raise ValueError('Unexpected arguments.')
196 else: 101 else:
197 # optparse eats '--' sometimes. 102 # optparse eats '--' sometimes.
198 isolated_cmd_args = args 103 isolated_cmd_args = args
199 104
200 command = isolated_get_run_commands(
201 options.isolate_server, options.namespace, options.isolated,
202 isolated_cmd_args, options.verbose)
203
204 # If a file name was passed, use its base name instead of the isolated hash. 105 # If a file name was passed, use its base name instead of the isolated hash.
205 # Otherwise, use user name as an approximation of a task name. 106 # Otherwise, use user name as an approximation of a task name.
206 if not options.task_name: 107 if not options.task_name:
207 if is_file: 108 if is_file:
208 key = os.path.splitext(os.path.basename(args[0]))[0] 109 key = os.path.splitext(os.path.basename(args[0]))[0]
209 else: 110 else:
210 key = options.user 111 key = options.user
211 options.task_name = u'%s/%s/%s' % ( 112 options.task_name = u'%s/%s/%s' % (
212 key, 113 key,
213 '_'.join( 114 '_'.join(
214 '%s=%s' % (k, v) 115 '%s=%s' % (k, v)
215 for k, v in sorted(options.dimensions.iteritems())), 116 for k, v in sorted(options.dimensions.iteritems())),
216 options.isolated) 117 options.isolated)
217 118
218 try: 119 inputs_ref = FilesRef(
219 data = isolated_get_data(options.isolate_server) 120 isolated=options.isolated,
220 except (IOError, OSError): 121 isolatedserver=options.isolate_server,
221 on_error.report('Failed to upload the zip file') 122 namespace=options.namespace)
222 raise ValueError('Failed to upload the zip file') 123 return isolated_cmd_args, inputs_ref
223
224 return command, data
225 124
226 125
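For illustration, a minimal sketch of the new-column return value, assuming options.isolated is unset and args is ['<40-char hash>', '--', '--foo'] (values hypothetical):

# isolated_cmd_args becomes ['--foo'], and the validated hash is wrapped in a
# FilesRef pointing at the isolate server instead of a run_isolated.zip command:
#   (['--foo'],
#    FilesRef(isolated='<40-char hash>',
#             isolatedserver=options.isolate_server,
#             namespace=options.namespace))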
227 ### Triggering. 126 ### Triggering.
228 127
229 128
230 TaskRequest = collections.namedtuple( 129 # See ../appengine/swarming/swarming_rpcs.py.
231 'TaskRequest', 130 FilesRef = collections.namedtuple(
131 'FilesRef',
132 [
133 'isolated',
134 'isolatedserver',
135 'namespace',
136 ])
137
138
139 # See ../appengine/swarming/swarming_rpcs.py.
140 TaskProperties = collections.namedtuple(
141 'TaskProperties',
232 [ 142 [
233 'command', 143 'command',
234 'data',
235 'dimensions', 144 'dimensions',
236 'env', 145 'env',
237 'expiration', 146 'execution_timeout_secs',
238 'hard_timeout', 147 'extra_args',
148 'grace_period_secs',
239 'idempotent', 149 'idempotent',
240 'io_timeout', 150 'inputs_ref',
151 'io_timeout_secs',
152 ])
153
154
155 # See ../appengine/swarming/swarming_rpcs.py.
156 NewTaskRequest = collections.namedtuple(
157 'NewTaskRequest',
158 [
159 'expiration_secs',
241 'name', 160 'name',
161 'parent_task_id',
242 'priority', 162 'priority',
163 'properties',
243 'tags', 164 'tags',
244 'user', 165 'user',
245 'verbose',
246 ]) 166 ])
247 167
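For reference, a minimal sketch of how these tuples nest once filled in (all values hypothetical):

properties = TaskProperties(
    command=['python', 'hello_world.py'],
    dimensions={'os': 'Linux'},
    env={},
    execution_timeout_secs=3600,
    extra_args=None,
    grace_period_secs=30,
    idempotent=False,
    inputs_ref=None,
    io_timeout_secs=1200)
request = NewTaskRequest(
    expiration_secs=21600,
    name='hello/os=Linux',
    parent_task_id='',
    priority=100,
    properties=properties,
    tags=['purpose:example'],
    user='joe')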
248 168
169 def namedtuple_to_dict(value):
170 """Recursively converts a namedtuple to a dict."""
171 out = dict(value._asdict())
172 for k, v in out.iteritems():
173 if hasattr(v, '_asdict'):
174 out[k] = namedtuple_to_dict(v)
175 return out
176
177
249 def task_request_to_raw_request(task_request): 178 def task_request_to_raw_request(task_request):
250 """Returns the json dict expected by the Swarming server for new request. 179 """Returns the json dict expected by the Swarming server for new request.
251 180
252 This is for the v1 client Swarming API. 181 This is for the v1 client Swarming API.
253 """ 182 """
254 return { 183 out = namedtuple_to_dict(task_request)
255 'name': task_request.name, 184 # Maps are not supported until protobuf v3.
256 'parent_task_id': os.environ.get('SWARMING_TASK_ID', ''), 185 out['properties']['dimensions'] = [
257 'priority': task_request.priority, 186 {'key': k, 'value': v}
258 'properties': { 187 for k, v in out['properties']['dimensions'].iteritems()
259 'commands': [task_request.command], 188 ]
260 'data': task_request.data, 189 out['properties']['dimensions'].sort(key=lambda x: x['key'])
261 'dimensions': task_request.dimensions, 190 out['properties']['env'] = [
262 'env': task_request.env, 191 {'key': k, 'value': v}
263 'execution_timeout_secs': task_request.hard_timeout, 192 for k, v in out['properties']['env'].iteritems()
264 'io_timeout_secs': task_request.io_timeout, 193 ]
265 'idempotent': task_request.idempotent, 194 out['properties']['env'].sort(key=lambda x: x['key'])
266 }, 195 return out
267 'scheduling_expiration_secs': task_request.expiration,
268 'tags': task_request.tags,
269 'user': task_request.user,
270 }
271 196
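With a request like the sketch above, the dimensions and env maps come out as sorted lists of key/value pairs, which is what the pre-protobuf-v3 API expects:

raw = task_request_to_raw_request(request)
# raw['properties']['dimensions'] == [{'key': 'os', 'value': 'Linux'}]
# raw['properties']['env'] == []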
272 197
273 def swarming_handshake(swarming): 198 def swarming_trigger(swarming, raw_request):
274 """Initiates the connection to the Swarming server."""
275 headers = {'X-XSRF-Token-Request': '1'}
276 response = net.url_read_json(
277 swarming + '/swarming/api/v1/client/handshake',
278 headers=headers,
279 data={})
280 if not response:
281 logging.error('Failed to handshake with server')
282 return None
283 logging.info('Connected to server version: %s', response['server_version'])
284 return response['xsrf_token']
285
286
287 def swarming_trigger(swarming, raw_request, xsrf_token):
288 """Triggers a request on the Swarming server and returns the json data. 199 """Triggers a request on the Swarming server and returns the json data.
289 200
290 It's the low-level function. 201 It's the low-level function.
291 202
292 Returns: 203 Returns:
293 { 204 {
294 'request': { 205 'request': {
295 'created_ts': u'2010-01-02 03:04:05', 206 'created_ts': u'2010-01-02 03:04:05',
296 'name': .. 207 'name': ..
297 }, 208 },
298 'task_id': '12300', 209 'task_id': '12300',
299 } 210 }
300 """ 211 """
301 logging.info('Triggering: %s', raw_request['name']) 212 logging.info('Triggering: %s', raw_request['name'])
302 213
303 headers = {'X-XSRF-Token': xsrf_token}
304 result = net.url_read_json( 214 result = net.url_read_json(
305 swarming + '/swarming/api/v1/client/request', 215 swarming + '/_ah/api/swarming/v1/tasks/new', data=raw_request)
306 data=raw_request,
307 headers=headers)
308 if not result: 216 if not result:
309 on_error.report('Failed to trigger task %s' % raw_request['name']) 217 on_error.report('Failed to trigger task %s' % raw_request['name'])
310 return None 218 return None
311 return result 219 return result
312 220
313 221
314 def setup_googletest(env, shards, index): 222 def setup_googletest(env, shards, index):
315 """Sets googletest specific environment variables.""" 223 """Sets googletest specific environment variables."""
316 if shards > 1: 224 if shards > 1:
317 env = env.copy() 225 env = env[:]
318 env['GTEST_SHARD_INDEX'] = str(index) 226 env.append({'key': 'GTEST_SHARD_INDEX', 'value': str(index)})
Vadim Sh. 2015/09/14 17:46:52 nit: override existing GTEST_SHARD_INDEX and GTEST
M-A Ruel 2015/09/15 14:13:54 Added asserts that they are not present instead. I
319 env['GTEST_TOTAL_SHARDS'] = str(shards) 227 env.append({'key': 'GTEST_TOTAL_SHARDS', 'value': str(shards)})
320 return env 228 return env
321 229
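With the new list-of-pairs env encoding, a quick sketch of the effect for shard 1 of 3:

env = setup_googletest([], 3, 1)
# -> [{'key': 'GTEST_SHARD_INDEX', 'value': '1'},
#     {'key': 'GTEST_TOTAL_SHARDS', 'value': '3'}]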
322 230
323 def trigger_task_shards(swarming, task_request, shards): 231 def trigger_task_shards(swarming, task_request, shards):
324 """Triggers one or many subtasks of a sharded task. 232 """Triggers one or many subtasks of a sharded task.
325 233
326 Returns: 234 Returns:
327 Dict with task details, returned to caller as part of --dump-json output. 235 Dict with task details, returned to caller as part of --dump-json output.
328 None in case of failure. 236 None in case of failure.
329 """ 237 """
330 def convert(index): 238 def convert(index):
331 req = task_request 239 req = task_request_to_raw_request(task_request)
332 if shards > 1: 240 if shards > 1:
333 req = req._replace( 241 req['properties']['env'] = setup_googletest(
334 env=setup_googletest(req.env, shards, index), 242 req['properties']['env'], shards, index)
335 name='%s:%s:%s' % (req.name, index, shards)) 243 req['name'] += ':%s:%s' % (index, shards)
336 return task_request_to_raw_request(req) 244 return req
337 245
338 requests = [convert(index) for index in xrange(shards)] 246 requests = [convert(index) for index in xrange(shards)]
339 xsrf_token = swarming_handshake(swarming)
340 if not xsrf_token:
341 return None
342 tasks = {} 247 tasks = {}
343 priority_warning = False 248 priority_warning = False
344 for index, request in enumerate(requests): 249 for index, request in enumerate(requests):
345 task = swarming_trigger(swarming, request, xsrf_token) 250 task = swarming_trigger(swarming, request)
346 if not task: 251 if not task:
347 break 252 break
348 logging.info('Request result: %s', task) 253 logging.info('Request result: %s', task)
349 if (not priority_warning and 254 if (not priority_warning and
350 task['request']['priority'] != task_request.priority): 255 task['request']['priority'] != task_request.priority):
351 priority_warning = True 256 priority_warning = True
352 print >> sys.stderr, ( 257 print >> sys.stderr, (
353 'Priority was reset to %s' % task['request']['priority']) 258 'Priority was reset to %s' % task['request']['priority'])
354 tasks[request['name']] = { 259 tasks[request['name']] = {
355 'shard_index': index, 260 'shard_index': index,
(...skipping 29 matching lines...)
385 It's in fact an enum. Values should be in decreasing order of importance. 290 It's in fact an enum. Values should be in decreasing order of importance.
386 """ 291 """
387 RUNNING = 0x10 292 RUNNING = 0x10
388 PENDING = 0x20 293 PENDING = 0x20
389 EXPIRED = 0x30 294 EXPIRED = 0x30
390 TIMED_OUT = 0x40 295 TIMED_OUT = 0x40
391 BOT_DIED = 0x50 296 BOT_DIED = 0x50
392 CANCELED = 0x60 297 CANCELED = 0x60
393 COMPLETED = 0x70 298 COMPLETED = 0x70
394 299
395 STATES = (RUNNING, PENDING, EXPIRED, TIMED_OUT, BOT_DIED, CANCELED, COMPLETED) 300 STATES = (
396 STATES_RUNNING = (RUNNING, PENDING) 301 'RUNNING', 'PENDING', 'EXPIRED', 'TIMED_OUT', 'BOT_DIED', 'CANCELED',
397 STATES_NOT_RUNNING = (EXPIRED, TIMED_OUT, BOT_DIED, CANCELED, COMPLETED) 302 'COMPLETED')
398 STATES_DONE = (TIMED_OUT, COMPLETED) 303 STATES_RUNNING = ('RUNNING', 'PENDING')
399 STATES_ABANDONED = (EXPIRED, BOT_DIED, CANCELED) 304 STATES_NOT_RUNNING = (
305 'EXPIRED', 'TIMED_OUT', 'BOT_DIED', 'CANCELED', 'COMPLETED')
306 STATES_DONE = ('TIMED_OUT', 'COMPLETED')
307 STATES_ABANDONED = ('EXPIRED', 'BOT_DIED', 'CANCELED')
400 308
401 _NAMES = { 309 _NAMES = {
402 RUNNING: 'Running', 310 RUNNING: 'Running',
403 PENDING: 'Pending', 311 PENDING: 'Pending',
404 EXPIRED: 'Expired', 312 EXPIRED: 'Expired',
405 TIMED_OUT: 'Execution timed out', 313 TIMED_OUT: 'Execution timed out',
406 BOT_DIED: 'Bot died', 314 BOT_DIED: 'Bot died',
407 CANCELED: 'User canceled', 315 CANCELED: 'User canceled',
408 COMPLETED: 'Completed', 316 COMPLETED: 'Completed',
409 } 317 }
410 318
319 _ENUMS = {
320 'RUNNING': RUNNING,
321 'PENDING': PENDING,
322 'EXPIRED': EXPIRED,
323 'TIMED_OUT': TIMED_OUT,
324 'BOT_DIED': BOT_DIED,
325 'CANCELED': CANCELED,
326 'COMPLETED': COMPLETED,
327 }
328
411 @classmethod 329 @classmethod
412 def to_string(cls, state): 330 def to_string(cls, state):
413 """Returns a user-readable string representing a State.""" 331 """Returns a user-readable string representing a State."""
414 if state not in cls._NAMES: 332 if state not in cls._NAMES:
415 raise ValueError('Invalid state %s' % state) 333 raise ValueError('Invalid state %s' % state)
416 return cls._NAMES[state] 334 return cls._NAMES[state]
417 335
336 @classmethod
337 def from_enum(cls, state):
338 """Returns int value based on the string."""
339 if state not in cls._ENUMS:
340 raise ValueError('Invalid state %s' % state)
341 return cls._ENUMS[state]
342
418 343
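A quick sketch of the string/int mapping both ways:

assert State.from_enum('BOT_DIED') == State.BOT_DIED  # 0x50
assert State.to_string(State.from_enum('TIMED_OUT')) == 'Execution timed out'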
419 class TaskOutputCollector(object): 344 class TaskOutputCollector(object):
420 """Assembles task execution summary (for --task-summary-json output). 345 """Assembles task execution summary (for --task-summary-json output).
421 346
422 Optionally fetches task outputs from isolate server to local disk (used when 347 Optionally fetches task outputs from isolate server to local disk (used when
423 --task-output-dir is passed). 348 --task-output-dir is passed).
424 349
425 This object is shared among multiple threads running 'retrieve_results' 350 This object is shared among multiple threads running 'retrieve_results'
426 function, in particular they call 'process_shard_result' method in parallel. 351 function, in particular they call 'process_shard_result' method in parallel.
427 """ 352 """
(...skipping 15 matching lines...)
443 self._storage = None 368 self._storage = None
444 369
445 if self.task_output_dir and not os.path.isdir(self.task_output_dir): 370 if self.task_output_dir and not os.path.isdir(self.task_output_dir):
446 os.makedirs(self.task_output_dir) 371 os.makedirs(self.task_output_dir)
447 372
448 def process_shard_result(self, shard_index, result): 373 def process_shard_result(self, shard_index, result):
449 """Stores results of a single task shard, fetches output files if necessary. 374 """Stores results of a single task shard, fetches output files if necessary.
450 375
451 Modifies |result| in place. 376 Modifies |result| in place.
452 377
378 shard_index is 0-based.
379
453 Called concurrently from multiple threads. 380 Called concurrently from multiple threads.
454 """ 381 """
455 # Sanity check index is in expected range. 382 # Sanity check index is in expected range.
456 assert isinstance(shard_index, int) 383 assert isinstance(shard_index, int)
457 if shard_index < 0 or shard_index >= self.shard_count: 384 if shard_index < 0 or shard_index >= self.shard_count:
458 logging.warning( 385 logging.warning(
459 'Shard index %d is outside of expected range: [0; %d]', 386 'Shard index %d is outside of expected range: [0; %d]',
460 shard_index, self.shard_count - 1) 387 shard_index, self.shard_count - 1)
461 return 388 return
462 389
463 assert not 'isolated_out' in result 390 if result.get('outputs_ref'):
464 result['isolated_out'] = None 391 ref = result['outputs_ref']
465 for output in result['outputs']: 392 result['outputs_ref']['view_url'] = '%s/browse?%s' % (
466 isolated_files_location = extract_output_files_location(output) 393 ref['isolatedserver'],
467 if isolated_files_location: 394 urllib.urlencode(
468 if result['isolated_out']: 395 [('namespace', ref['namespace']), ('hash', ref['isolated'])]))
469 raise ValueError('Unexpected two task with output')
470 result['isolated_out'] = isolated_files_location
471 396
472 # Store result dict of that shard, ignore results we've already seen. 397 # Store result dict of that shard, ignore results we've already seen.
473 with self._lock: 398 with self._lock:
474 if shard_index in self._per_shard_results: 399 if shard_index in self._per_shard_results:
475 logging.warning('Ignoring duplicate shard index %d', shard_index) 400 logging.warning('Ignoring duplicate shard index %d', shard_index)
476 return 401 return
477 self._per_shard_results[shard_index] = result 402 self._per_shard_results[shard_index] = result
478 403
479 # Fetch output files if necessary. 404 # Fetch output files if necessary.
480 if self.task_output_dir and result['isolated_out']: 405 if self.task_output_dir and result.get('outputs_ref'):
481 storage = self._get_storage( 406 storage = self._get_storage(
482 result['isolated_out']['server'], 407 result['outputs_ref']['isolatedserver'],
483 result['isolated_out']['namespace']) 408 result['outputs_ref']['namespace'])
484 if storage: 409 if storage:
485 # Output files are supposed to be small and they are not reused across 410 # Output files are supposed to be small and they are not reused across
486 # tasks. So use MemoryCache for them instead of on-disk cache. Make 411 # tasks. So use MemoryCache for them instead of on-disk cache. Make
487 # files writable, so that calling script can delete them. 412 # files writable, so that calling script can delete them.
488 isolateserver.fetch_isolated( 413 isolateserver.fetch_isolated(
489 result['isolated_out']['hash'], 414 result['outputs_ref']['isolated'],
490 storage, 415 storage,
491 isolateserver.MemoryCache(file_mode_mask=0700), 416 isolateserver.MemoryCache(file_mode_mask=0700),
492 os.path.join(self.task_output_dir, str(shard_index)), 417 os.path.join(self.task_output_dir, str(shard_index)),
493 False) 418 False)
494 419
495 def finalize(self): 420 def finalize(self):
496 """Assembles and returns task summary JSON, shutdowns underlying Storage.""" 421 """Assembles and returns task summary JSON, shutdowns underlying Storage."""
497 with self._lock: 422 with self._lock:
498 # Write an array of shard results with None for missing shards. 423 # Write an array of shard results with None for missing shards.
499 summary = { 424 summary = {
(...skipping 26 matching lines...)
526 self._storage.location, isolate_server) 451 self._storage.location, isolate_server)
527 return None 452 return None
528 if self._storage.namespace != namespace: 453 if self._storage.namespace != namespace:
529 logging.error( 454 logging.error(
530 'Task shards are using multiple namespaces: %s and %s', 455 'Task shards are using multiple namespaces: %s and %s',
531 self._storage.namespace, namespace) 456 self._storage.namespace, namespace)
532 return None 457 return None
533 return self._storage 458 return self._storage
534 459
535 460
536 def extract_output_files_location(task_log):
537 """Task log -> location of task output files to fetch.
538
539 TODO(vadimsh,maruel): Use side-channel to get this information.
540 See 'run_tha_test' in run_isolated.py for where the data is generated.
541
542 Returns:
543 Tuple (isolate server URL, namespace, isolated hash) on success.
544 None if information is missing or can not be parsed.
545 """
546 if not task_log:
547 return None
548 match = re.search(
549 r'\[run_isolated_out_hack\](.*)\[/run_isolated_out_hack\]',
550 task_log,
551 re.DOTALL)
552 if not match:
553 return None
554
555 def to_ascii(val):
556 if not isinstance(val, basestring):
557 raise ValueError()
558 return val.encode('ascii')
559
560 try:
561 data = json.loads(match.group(1))
562 if not isinstance(data, dict):
563 raise ValueError()
564 isolated_hash = to_ascii(data['hash'])
565 namespace = to_ascii(data['namespace'])
566 isolate_server = to_ascii(data['storage'])
567 if not file_path.is_url(isolate_server):
568 raise ValueError()
569 data = {
570 'hash': isolated_hash,
571 'namespace': namespace,
572 'server': isolate_server,
573 'view_url': '%s/browse?%s' % (isolate_server, urllib.urlencode(
574 [('namespace', namespace), ('hash', isolated_hash)])),
575 }
576 return data
577 except (KeyError, ValueError):
578 logging.warning(
579 'Unexpected value of run_isolated_out_hack: %s', match.group(1))
580 return None
581
582
583 def now(): 461 def now():
584 """Exists so it can be mocked easily.""" 462 """Exists so it can be mocked easily."""
585 return time.time() 463 return time.time()
586 464
587 465
466 def parse_time(value):
467 """Converts serialized time from the API to datetime.datetime."""
468 # When microseconds are 0, the '.123456' suffix is elided. This means the
469 # serialized format is not consistent, which confuses the hell out of python.
470 for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
471 try:
472 return datetime.datetime.strptime(value, fmt)
473 except ValueError:
474 pass
475 raise ValueError('Failed to parse %s' % value)
476
477
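Both serialized forms parse to the same type, for example:

parse_time('2010-01-02T03:04:05.123456')  # microseconds present
parse_time('2010-01-02T03:04:05')         # microseconds elided by the server
# both return datetime.datetime(2010, 1, 2, 3, 4, 5, ...)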
588 def retrieve_results( 478 def retrieve_results(
589 base_url, shard_index, task_id, timeout, should_stop, output_collector): 479 base_url, shard_index, task_id, timeout, should_stop, output_collector):
590 """Retrieves results for a single task ID. 480 """Retrieves results for a single task ID.
591 481
592 Returns: 482 Returns:
593 <result dict> on success. 483 <result dict> on success.
594 None on failure. 484 None on failure.
595 """ 485 """
596 assert isinstance(timeout, float), timeout 486 assert isinstance(timeout, float), timeout
597 result_url = '%s/swarming/api/v1/client/task/%s' % (base_url, task_id) 487 result_url = '%s/_ah/api/swarming/v1/task/%s/result' % (base_url, task_id)
598 output_url = '%s/swarming/api/v1/client/task/%s/output/all' % ( 488 output_url = '%s/_ah/api/swarming/v1/task/%s/stdout' % (base_url, task_id)
599 base_url, task_id)
600 started = now() 489 started = now()
601 deadline = started + timeout if timeout else None 490 deadline = started + timeout if timeout else None
602 attempt = 0 491 attempt = 0
603 492
604 while not should_stop.is_set(): 493 while not should_stop.is_set():
605 attempt += 1 494 attempt += 1
606 495
607 # Waiting for too long -> give up. 496 # Waiting for too long -> give up.
608 current_time = now() 497 current_time = now()
609 if deadline and current_time >= deadline: 498 if deadline and current_time >= deadline:
(...skipping 12 matching lines...) Expand all
622 should_stop.wait(delay) 511 should_stop.wait(delay)
623 if should_stop.is_set(): 512 if should_stop.is_set():
624 return None 513 return None
625 514
626 # Disable internal retries in net.url_read_json, since we are doing retries 515 # Disable internal retries in net.url_read_json, since we are doing retries
627 # ourselves. 516 # ourselves.
628 # TODO(maruel): We'd need to know if it's a 404 and not retry at all. 517 # TODO(maruel): We'd need to know if it's a 404 and not retry at all.
629 result = net.url_read_json(result_url, retry_50x=False) 518 result = net.url_read_json(result_url, retry_50x=False)
630 if not result: 519 if not result:
631 continue 520 continue
521
632 if result['state'] in State.STATES_NOT_RUNNING: 522 if result['state'] in State.STATES_NOT_RUNNING:
523 # TODO(maruel): Not always fetch stdout?
633 out = net.url_read_json(output_url) 524 out = net.url_read_json(output_url)
634 result['outputs'] = (out or {}).get('outputs', []) 525 result['output'] = out['output'] if out else out
635 if not result['outputs']: 526 if not result['output']:
636 logging.error('No output found for task %s', task_id) 527 logging.error('No output found for task %s', task_id)
637 # Record the result, try to fetch attached output files (if any). 528 # Record the result, try to fetch attached output files (if any).
638 if output_collector: 529 if output_collector:
639 # TODO(vadimsh): Respect |should_stop| and |deadline| when fetching. 530 # TODO(vadimsh): Respect |should_stop| and |deadline| when fetching.
640 output_collector.process_shard_result(shard_index, result) 531 output_collector.process_shard_result(shard_index, result)
641 return result 532 return result
642 533
643 534
535 def convert_to_old_format(result):
536 """Converts the task result data from Endpoints API format to old API format
537 for compatibility.
538
539 This goes into the file generated as --task-summary-json.
540 """
541 # Sets defaults.
542 result.setdefault('abandoned_ts', None)
543 result.setdefault('bot_id', None)
544 result.setdefault('bot_version', None)
545 result.setdefault('children_task_ids', [])
546 result.setdefault('completed_ts', None)
547 result.setdefault('cost_saved_usd', None)
548 result.setdefault('costs_usd', None)
549 result.setdefault('deduped_from', None)
550 result.setdefault('name', None)
551 result.setdefault('outputs_ref', None)
552 result.setdefault('properties_hash', None)
553 result.setdefault('server_versions', None)
554 result.setdefault('started_ts', None)
555 result.setdefault('tags', None)
556 result.setdefault('user', None)
557
558 # Conversion back to old API.
559 duration = result.pop('duration', None)
560 result['durations'] = [duration] if duration else []
561 exit_code = result.pop('exit_code', None)
562 result['exit_codes'] = [int(exit_code)] if exit_code else []
563 result['id'] = result.pop('task_id')
564 result['isolated_out'] = result.get('outputs_ref', None)
565 output = result.pop('output', None)
566 result['outputs'] = [output] if output else []
567 # properties_hash
568 # server_version
569 # Endpoints returns 'state' as a string. For compatibility with old code, convert
570 # to int.
571 result['state'] = State.from_enum(result['state'])
572 # tags
573 result['try_number'] = (
574 int(result['try_number']) if result['try_number'] else None)
575 result['bot_dimensions'] = {
576 i['key']: i['value'] for i in result['bot_dimensions']
577 }
578
579
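A sketch of the reshaping on a minimal Endpoints result (values hypothetical; note the function mutates its argument in place):

result = {
    'task_id': '12300', 'state': 'COMPLETED', 'exit_code': '0',
    'duration': 1.5, 'output': 'hi', 'try_number': '1',
    'bot_dimensions': [{'key': 'os', 'value': 'Linux'}],
}
convert_to_old_format(result)
# result now has 'id': '12300', 'state': 0x70 (COMPLETED), 'exit_codes': [0],
# 'durations': [1.5], 'outputs': ['hi'], 'bot_dimensions': {'os': 'Linux'},
# plus None/[] defaults for fields the old API always carried.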
644 def yield_results( 580 def yield_results(
645 swarm_base_url, task_ids, timeout, max_threads, print_status_updates, 581 swarm_base_url, task_ids, timeout, max_threads, print_status_updates,
646 output_collector): 582 output_collector):
647 """Yields swarming task results from the swarming server as (index, result). 583 """Yields swarming task results from the swarming server as (index, result).
648 584
649 Duplicate shards are ignored. Shards are yielded in order of completion. 585 Duplicate shards are ignored. Shards are yielded in order of completion.
650 Timed out shards are NOT yielded at all. Caller can compare number of yielded 586 Timed out shards are NOT yielded at all. Caller can compare number of yielded
651 shards with len(task_ids) to verify all shards completed. 587 shards with len(task_ids) to verify all shards completed.
652 588
653 max_threads is optional and is used to limit the number of parallel fetches 589 max_threads is optional and is used to limit the number of parallel fetches
(...skipping 55 matching lines...)
709 shards_remaining.remove(shard_index) 645 shards_remaining.remove(shard_index)
710 yield shard_index, result 646 yield shard_index, result
711 647
712 finally: 648 finally:
713 # Done or aborted with Ctrl+C, kill the remaining threads. 649 # Done or aborted with Ctrl+C, kill the remaining threads.
714 should_stop.set() 650 should_stop.set()
715 651
716 652
717 def decorate_shard_output(swarming, shard_index, metadata): 653 def decorate_shard_output(swarming, shard_index, metadata):
718 """Returns wrapped output for swarming task shard.""" 654 """Returns wrapped output for swarming task shard."""
719 def t(d): 655 if metadata.get('started_ts') and not metadata.get('deduped_from'):
720 return datetime.datetime.strptime(d, '%Y-%m-%d %H:%M:%S')
721 if metadata.get('started_ts'):
722 pending = '%.1fs' % ( 656 pending = '%.1fs' % (
723 t(metadata['started_ts']) - t(metadata['created_ts'])).total_seconds() 657 parse_time(metadata['started_ts']) - parse_time(metadata['created_ts'])
658 ).total_seconds()
724 else: 659 else:
725 pending = 'N/A' 660 pending = 'N/A'
726 661
727 if metadata.get('durations'): 662 if metadata.get('duration') is not None:
728 duration = '%.1fs' % metadata['durations'][0] 663 duration = '%.1fs' % metadata['duration']
729 else: 664 else:
730 duration = 'N/A' 665 duration = 'N/A'
731 666
732 if metadata.get('exit_codes'): 667 if metadata.get('exit_code') is not None:
733 exit_code = '%d' % metadata['exit_codes'][0] 668 # Integers are encoded as strings to not lose precision.
669 exit_code = '%s' % metadata['exit_code']
734 else: 670 else:
735 exit_code = 'N/A' 671 exit_code = 'N/A'
736 672
737 bot_id = metadata.get('bot_id') or 'N/A' 673 bot_id = metadata.get('bot_id') or 'N/A'
738 674
739 url = '%s/user/task/%s' % (swarming, metadata['id']) 675 url = '%s/user/task/%s' % (swarming, metadata['task_id'])
740 tag_header = 'Shard %d %s' % (shard_index, url) 676 tag_header = 'Shard %d %s' % (shard_index, url)
741 tag_footer = ( 677 tag_footer = (
742 'End of shard %d Pending: %s Duration: %s Bot: %s Exit: %s' % ( 678 'End of shard %d Pending: %s Duration: %s Bot: %s Exit: %s' % (
743 shard_index, pending, duration, bot_id, exit_code)) 679 shard_index, pending, duration, bot_id, exit_code))
744 680
745 tag_len = max(len(tag_header), len(tag_footer)) 681 tag_len = max(len(tag_header), len(tag_footer))
746 dash_pad = '+-%s-+\n' % ('-' * tag_len) 682 dash_pad = '+-%s-+\n' % ('-' * tag_len)
747 tag_header = '| %s |\n' % tag_header.ljust(tag_len) 683 tag_header = '| %s |\n' % tag_header.ljust(tag_len)
748 tag_footer = '| %s |\n' % tag_footer.ljust(tag_len) 684 tag_footer = '| %s |\n' % tag_footer.ljust(tag_len)
749 685
750 header = dash_pad + tag_header + dash_pad 686 header = dash_pad + tag_header + dash_pad
751 footer = dash_pad + tag_footer + dash_pad[:-1] 687 footer = dash_pad + tag_footer + dash_pad[:-1]
752 output = '\n'.join(o for o in metadata['outputs'] if o).rstrip() + '\n' 688 output = metadata['output'].rstrip() + '\n'
753 return header + output + footer 689 return header + output + footer
754 690
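The decorated shard output then looks roughly like this (hypothetical task; box widths approximate):

+-----------------------------------------------------------------+
| Shard 0 https://server-url/user/task/12300                      |
+-----------------------------------------------------------------+
hi
+-----------------------------------------------------------------+
| End of shard 0 Pending: 2.0s Duration: 1.5s Bot: swarm1 Exit: 0 |
+-----------------------------------------------------------------+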
755 691
756 def collect( 692 def collect(
757 swarming, task_name, task_ids, timeout, decorate, print_status_updates, 693 swarming, task_name, task_ids, timeout, decorate, print_status_updates,
758 task_summary_json, task_output_dir): 694 task_summary_json, task_output_dir):
759 """Retrieves results of a Swarming task.""" 695 """Retrieves results of a Swarming task."""
760 # Collect summary JSON and output files (if task_output_dir is not None). 696 # Collect summary JSON and output files (if task_output_dir is not None).
761 output_collector = TaskOutputCollector( 697 output_collector = TaskOutputCollector(
762 task_output_dir, task_name, len(task_ids)) 698 task_output_dir, task_name, len(task_ids))
763 699
764 seen_shards = set() 700 seen_shards = set()
765 exit_code = 0 701 exit_code = 0
766 total_duration = 0 702 total_duration = 0
767 try: 703 try:
768 for index, metadata in yield_results( 704 for index, metadata in yield_results(
769 swarming, task_ids, timeout, None, print_status_updates, 705 swarming, task_ids, timeout, None, print_status_updates,
770 output_collector): 706 output_collector):
771 seen_shards.add(index) 707 seen_shards.add(index)
772 708
773 # Default to failure if there was no process that even started. 709 # Default to failure if there was no process that even started.
774 shard_exit_code = 1 710 shard_exit_code = metadata.get('exit_code')
775 if metadata.get('exit_codes'): 711 if shard_exit_code:
776 shard_exit_code = metadata['exit_codes'][0] 712 shard_exit_code = int(shard_exit_code)
777 if shard_exit_code: 713 if shard_exit_code:
778 exit_code = shard_exit_code 714 exit_code = shard_exit_code
779 if metadata.get('durations'): 715 total_duration += metadata.get('duration', 0)
780 total_duration += metadata['durations'][0]
781 716
782 if decorate: 717 if decorate:
783 print(decorate_shard_output(swarming, index, metadata)) 718 print(decorate_shard_output(swarming, index, metadata))
784 if len(seen_shards) < len(task_ids): 719 if len(seen_shards) < len(task_ids):
785 print('') 720 print('')
786 else: 721 else:
787 if metadata.get('exit_codes'): 722 print('%s: %s %s' % (
788 exit_code = metadata['exit_codes'][0] 723 metadata.get('bot_id', 'N/A'),
789 else: 724 metadata['task_id'],
790 exit_code = 'N/A' 725 shard_exit_code))
791 print('%s: %s %s' % 726 if metadata['output']:
792 (metadata.get('bot_id') or 'N/A', metadata['id'], exit_code)) 727 output = metadata['output'].rstrip()
793 for output in metadata['outputs']:
794 if not output:
795 continue
796 output = output.rstrip()
797 if output: 728 if output:
798 print(''.join(' %s\n' % l for l in output.splitlines())) 729 print(''.join(' %s\n' % l for l in output.splitlines()))
799 finally: 730 finally:
800 summary = output_collector.finalize() 731 summary = output_collector.finalize()
801 if task_summary_json: 732 if task_summary_json:
733 # TODO(maruel): Make this optional.
734 for i in summary['shards']:
735 if i:
736 convert_to_old_format(i)
802 tools.write_json(task_summary_json, summary, False) 737 tools.write_json(task_summary_json, summary, False)
803 738
804 if decorate and total_duration: 739 if decorate and total_duration:
805 print('Total duration: %.1fs' % total_duration) 740 print('Total duration: %.1fs' % total_duration)
806 741
807 if len(seen_shards) != len(task_ids): 742 if len(seen_shards) != len(task_ids):
808 missing_shards = [x for x in range(len(task_ids)) if x not in seen_shards] 743 missing_shards = [x for x in range(len(task_ids)) if x not in seen_shards]
809 print >> sys.stderr, ('Results from some shards are missing: %s' % 744 print >> sys.stderr, ('Results from some shards are missing: %s' %
810 ', '.join(map(str, missing_shards))) 745 ', '.join(map(str, missing_shards)))
811 return 1 746 return 1
812 747
813 return exit_code 748 return exit_code
814 749
815 750
751 ### API management.
752
753
754 class APIError(Exception):
755 pass
756
757
758 def endpoints_api_discovery_apis(host):
759 """Uses Cloud Endpoints' API Discovery Service to returns metadata about all
760 the APIs exposed by a host.
761
762 https://developers.google.com/discovery/v1/reference/apis/list
763 """
764 data = net.url_read_json(host + '/_ah/api/discovery/v1/apis')
765 if data is None:
766 raise APIError('Failed to discover APIs on %s' % host)
767 out = {}
768 for api in data['items']:
769 if api['id'] == 'discovery:v1':
770 continue
771 # URL is of the following form:
772 # url = host + (
773 # '/_ah/api/discovery/v1/apis/%s/%s/rest' % (api['id'], api['version'])
774 api_data = net.url_read_json(api['discoveryRestUrl'])
775 if api_data is None:
776 raise APIError('Failed to discover %s on %s' % (api['id'], host))
777 out[api['id']] = api_data
778 return out
779
780
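A sketch of using the discovery helper (host hypothetical; API ids are of the form 'name:version'):

apis = endpoints_api_discovery_apis('https://server-url')
# e.g. apis['swarming:v1'] is the full discovery document for the Swarming
# API; its 'resources' section lists the methods that CMDquery can hit.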
816 ### Commands. 781 ### Commands.
817 782
818 783
819 def abort_task(_swarming, _manifest): 784 def abort_task(_swarming, _manifest):
820 """Given a task manifest that was triggered, aborts its execution.""" 785 """Given a task manifest that was triggered, aborts its execution."""
821 # TODO(vadimsh): Not supported by the server yet. 786 # TODO(vadimsh): Not supported by the server yet.
822 787
823 788
824 def add_filter_options(parser): 789 def add_filter_options(parser):
825 parser.filter_group = optparse.OptionGroup(parser, 'Filtering slaves') 790 parser.filter_group = optparse.OptionGroup(parser, 'Filtering slaves')
(...skipping 63 matching lines...)
889 'In this case, no .isolated file is expected.') 854 'In this case, no .isolated file is expected.')
890 parser.add_option_group(parser.task_group) 855 parser.add_option_group(parser.task_group)
891 856
892 857
893 def process_trigger_options(parser, options, args): 858 def process_trigger_options(parser, options, args):
894 """Processes trigger options and uploads files to isolate server if necessary. 859 """Processes trigger options and uploads files to isolate server if necessary.
895 """ 860 """
896 options.dimensions = dict(options.dimensions) 861 options.dimensions = dict(options.dimensions)
897 options.env = dict(options.env) 862 options.env = dict(options.env)
898 863
899 data = []
900 if not options.dimensions: 864 if not options.dimensions:
901 parser.error('Please at least specify one --dimension') 865 parser.error('Please at least specify one --dimension')
902 if options.raw_cmd: 866 if options.raw_cmd:
903 if not args: 867 if not args:
904 parser.error( 868 parser.error(
905 'Arguments with --raw-cmd should be passed after -- as command ' 869 'Arguments with --raw-cmd should be passed after -- as command '
906 'delimiter.') 870 'delimiter.')
907 if options.isolate_server: 871 if options.isolate_server:
908 parser.error('Can\'t use both --raw-cmd and --isolate-server.') 872 parser.error('Can\'t use both --raw-cmd and --isolate-server.')
909 873
910 command = args 874 command = args
911 if not options.task_name: 875 if not options.task_name:
912 options.task_name = u'%s/%s' % ( 876 options.task_name = u'%s/%s' % (
913 options.user, 877 options.user,
914 '_'.join( 878 '_'.join(
915 '%s=%s' % (k, v) 879 '%s=%s' % (k, v)
916 for k, v in sorted(options.dimensions.iteritems()))) 880 for k, v in sorted(options.dimensions.iteritems())))
881 inputs_ref = None
917 else: 882 else:
918 isolateserver.process_isolate_server_options(parser, options, False) 883 isolateserver.process_isolate_server_options(parser, options, False)
919 try: 884 try:
920 command, data = isolated_handle_options(options, args) 885 command, inputs_ref = isolated_handle_options(options, args)
921 except ValueError as e: 886 except ValueError as e:
922 parser.error(str(e)) 887 parser.error(str(e))
923 888
924 return TaskRequest( 889 # If inputs_ref is used, command is actually extra_args. Otherwise it's an
925 command=command, 890 # actual command to run.
926 data=data, 891 properties = TaskProperties(
892 command=None if inputs_ref else command,
927 dimensions=options.dimensions, 893 dimensions=options.dimensions,
928 env=options.env, 894 env=options.env,
929 expiration=options.expiration, 895 execution_timeout_secs=options.hard_timeout,
930 hard_timeout=options.hard_timeout, 896 extra_args=command if inputs_ref else None,
897 grace_period_secs=30,
931 idempotent=options.idempotent, 898 idempotent=options.idempotent,
932 io_timeout=options.io_timeout, 899 inputs_ref=inputs_ref,
900 io_timeout_secs=options.io_timeout)
901 return NewTaskRequest(
902 expiration_secs=options.expiration,
933 name=options.task_name, 903 name=options.task_name,
904 parent_task_id=os.environ.get('SWARMING_TASK_ID', ''),
934 priority=options.priority, 905 priority=options.priority,
906 properties=properties,
935 tags=options.tags, 907 tags=options.tags,
936 user=options.user, 908 user=options.user)
937 verbose=options.verbose)
938 909
939 910
940 def add_collect_options(parser): 911 def add_collect_options(parser):
941 parser.server_group.add_option( 912 parser.server_group.add_option(
942 '-t', '--timeout', 913 '-t', '--timeout',
943 type='float', 914 type='float',
944 default=80*60., 915 default=80*60.,
945 help='Timeout to wait for result, set to 0 for no timeout; default: ' 916 help='Timeout to wait for result, set to 0 for no timeout; default: '
946 '%default s') 917 '%default s')
947 parser.group_logging.add_option( 918 parser.group_logging.add_option(
(...skipping 31 matching lines...)
979 if not options.force: 950 if not options.force:
980 print('Delete the following bots?') 951 print('Delete the following bots?')
981 for bot in bots: 952 for bot in bots:
982 print(' %s' % bot) 953 print(' %s' % bot)
983 if raw_input('Continue? [y/N] ') not in ('y', 'Y'): 954 if raw_input('Continue? [y/N] ') not in ('y', 'Y'):
984 print('Goodbye.') 955 print('Goodbye.')
985 return 1 956 return 1
986 957
987 result = 0 958 result = 0
988 for bot in bots: 959 for bot in bots:
989 url = '%s/swarming/api/v1/client/bot/%s' % (options.swarming, bot) 960 url = '%s/_ah/api/swarming/v1/bot/%s' % (options.swarming, bot)
990 if net.url_read_json(url, method='DELETE') is None: 961 if net.url_read_json(url, method='DELETE') is None:
991 print('Deleting %s failed' % bot) 962 print('Deleting %s failed' % bot)
992 result = 1 963 result = 1
993 return result 964 return result
994 965
995 966
996 def CMDbots(parser, args): 967 def CMDbots(parser, args):
997 """Returns information about the bots connected to the Swarming server.""" 968 """Returns information about the bots connected to the Swarming server."""
998 add_filter_options(parser) 969 add_filter_options(parser)
999 parser.filter_group.add_option( 970 parser.filter_group.add_option(
1000 '--dead-only', action='store_true', 971 '--dead-only', action='store_true',
1001 help='Only print dead bots, useful to reap them and reimage broken bots') 972 help='Only print dead bots, useful to reap them and reimage broken bots')
1002 parser.filter_group.add_option( 973 parser.filter_group.add_option(
1003 '-k', '--keep-dead', action='store_true', 974 '-k', '--keep-dead', action='store_true',
1004 help='Do not filter out dead bots') 975 help='Do not filter out dead bots')
1005 parser.filter_group.add_option( 976 parser.filter_group.add_option(
1006 '-b', '--bare', action='store_true', 977 '-b', '--bare', action='store_true',
1007 help='Do not print out dimensions') 978 help='Do not print out dimensions')
1008 options, args = parser.parse_args(args) 979 options, args = parser.parse_args(args)
1009 980
1010 if options.keep_dead and options.dead_only: 981 if options.keep_dead and options.dead_only:
1011 parser.error('Use only one of --keep-dead and --dead-only') 982 parser.error('Use only one of --keep-dead and --dead-only')
1012 983
1013 bots = [] 984 bots = []
1014 cursor = None 985 cursor = None
1015 limit = 250 986 limit = 250
1016 # Iterate via cursors. 987 # Iterate via cursors.
1017 base_url = options.swarming + '/swarming/api/v1/client/bots?limit=%d' % limit 988 base_url = (
989 options.swarming + '/_ah/api/swarming/v1/bots/list?limit=%d' % limit)
1018 while True: 990 while True:
1019 url = base_url 991 url = base_url
1020 if cursor: 992 if cursor:
1021 url += '&cursor=%s' % urllib.quote(cursor) 993 url += '&cursor=%s' % urllib.quote(cursor)
1022 data = net.url_read_json(url) 994 data = net.url_read_json(url)
1023 if data is None: 995 if data is None:
1024 print >> sys.stderr, 'Failed to access %s' % options.swarming 996 print >> sys.stderr, 'Failed to access %s' % options.swarming
1025 return 1 997 return 1
1026 bots.extend(data['items']) 998 bots.extend(data['items'])
1027 cursor = data['cursor'] 999 cursor = data.get('cursor')
1028 if not cursor: 1000 if not cursor:
1029 break 1001 break
1030 1002
1031 for bot in natsort.natsorted(bots, key=lambda x: x['id']): 1003 for bot in natsort.natsorted(bots, key=lambda x: x['bot_id']):
1032 if options.dead_only: 1004 if options.dead_only:
1033 if not bot['is_dead']: 1005 if not bot.get('is_dead'):
1034 continue 1006 continue
1035 elif not options.keep_dead and bot['is_dead']: 1007 elif not options.keep_dead and bot.get('is_dead'):
1036 continue 1008 continue
1037 1009
1038 # If the user requested to filter on dimensions, ensure the bot has all the 1010 # If the user requested to filter on dimensions, ensure the bot has all the
1039 # dimensions requested. 1011 # dimensions requested.
1040 dimensions = bot['dimensions'] 1012 dimensions = {i['key']: i['value'] for i in bot['dimensions']}
1041 for key, value in options.dimensions: 1013 for key, value in options.dimensions:
1042 if key not in dimensions: 1014 if key not in dimensions:
1043 break 1015 break
1044 # A bot can have multiple values for a key, for example, 1016 # A bot can have multiple values for a key, for example,
1045 # {'os': ['Windows', 'Windows-6.1']}, so that --dimension os=Windows will 1017 # {'os': ['Windows', 'Windows-6.1']}, so that --dimension os=Windows will
1046 # be accepted. 1018 # be accepted.
1047 if isinstance(dimensions[key], list): 1019 if isinstance(dimensions[key], list):
1048 if value not in dimensions[key]: 1020 if value not in dimensions[key]:
1049 break 1021 break
1050 else: 1022 else:
1051 if value != dimensions[key]: 1023 if value != dimensions[key]:
1052 break 1024 break
1053 else: 1025 else:
1054 print bot['id'] 1026 print bot['bot_id']
1055 if not options.bare: 1027 if not options.bare:
1056 print ' %s' % json.dumps(dimensions, sort_keys=True) 1028 print ' %s' % json.dumps(dimensions, sort_keys=True)
1057 if bot.get('task_id'): 1029 if bot.get('task_id'):
1058 print ' task: %s' % bot['task_id'] 1030 print ' task: %s' % bot['task_id']
1059 return 0 1031 return 0
1060 1032
1061 1033
1062 @subcommand.usage('--json file | task_id...') 1034 @subcommand.usage('--json file | task_id...')
1063 def CMDcollect(parser, args): 1035 def CMDcollect(parser, args):
1064 """Retrieves results of one or multiple Swarming task by its ID. 1036 """Retrieves results of one or multiple Swarming task by its ID.
1065 1037
1066 The result can be in multiple parts if the execution was sharded. It can 1038 The result can be in multiple parts if the execution was sharded. It can
1067 potentially have retries. 1039 potentially have retries.
1068 """ 1040 """
1069 add_collect_options(parser) 1041 add_collect_options(parser)
1070 parser.add_option( 1042 parser.add_option(
1071 '-j', '--json', 1043 '-j', '--json',
1072 help='Load the task ids from .json as saved by trigger --dump-json') 1044 help='Load the task ids from .json as saved by trigger --dump-json')
1073 (options, args) = parser.parse_args(args) 1045 options, args = parser.parse_args(args)
1074 if not args and not options.json: 1046 if not args and not options.json:
1075 parser.error('Must specify at least one task id or --json.') 1047 parser.error('Must specify at least one task id or --json.')
1076 if args and options.json: 1048 if args and options.json:
1077 parser.error('Only use one of task id or --json.') 1049 parser.error('Only use one of task id or --json.')
1078 1050
1079 if options.json: 1051 if options.json:
1080 try: 1052 try:
1081 with open(options.json) as f: 1053 with open(options.json) as f:
1082 tasks = sorted( 1054 tasks = sorted(
1083 json.load(f)['tasks'].itervalues(), key=lambda x: x['shard_index']) 1055 json.load(f)['tasks'].itervalues(), key=lambda x: x['shard_index'])
(...skipping 13 matching lines...)
1097 options.timeout, 1069 options.timeout,
1098 options.decorate, 1070 options.decorate,
1099 options.print_status_updates, 1071 options.print_status_updates,
1100 options.task_summary_json, 1072 options.task_summary_json,
1101 options.task_output_dir) 1073 options.task_output_dir)
1102 except Failure: 1074 except Failure:
1103 on_error.report(None) 1075 on_error.report(None)
1104 return 1 1076 return 1
1105 1077
1106 1078
1107 @subcommand.usage('[resource name]') 1079 @subcommand.usage('[method name]')
1108 def CMDquery(parser, args): 1080 def CMDquery(parser, args):
1109 """Returns raw JSON information via an URL endpoint. Use 'list' to gather the 1081 """Returns raw JSON information via an URL endpoint. Use 'query-list' to
1110 list of valid values from the server. 1082 gather the list of API methods from the server.
1111 1083
1112 Examples: 1084 Examples:
1113 Printing the list of known URLs: 1085 Listing all bots:
1114 swarming.py query -S https://server-url list 1086 swarming.py query -S https://server-url bots/list
1115 1087
1116 Listing last 50 tasks on a specific bot named 'swarm1' 1088 Listing last 10 tasks on a specific bot named 'swarm1':
1117 swarming.py query -S https://server-url --limit 50 bot/swarm1/tasks 1089 swarming.py query -S https://server-url --limit 10 bot/swarm1/tasks
1118 """ 1090 """
1119 CHUNK_SIZE = 250 1091 CHUNK_SIZE = 250
1120 1092
1121 parser.add_option( 1093 parser.add_option(
1122 '-L', '--limit', type='int', default=200, 1094 '-L', '--limit', type='int', default=200,
1123 help='Limit to enforce on limitless items (like number of tasks); ' 1095 help='Limit to enforce on limitless items (like number of tasks); '
1124 'default=%default') 1096 'default=%default')
1125 parser.add_option( 1097 parser.add_option(
1126 '--json', help='Path to JSON output file (otherwise prints to stdout)') 1098 '--json', help='Path to JSON output file (otherwise prints to stdout)')
1127 (options, args) = parser.parse_args(args) 1099 parser.add_option(
1100 '--progress', action='store_true',
1101 help='Prints a dot at each request to show progress')
1102 options, args = parser.parse_args(args)
1128 if len(args) != 1: 1103 if len(args) != 1:
1129 parser.error('Must specify only one resource name.') 1104 parser.error(
1130 1105 'Must specify only method name and optionally query args properly '
1131 base_url = options.swarming + '/swarming/api/v1/client/' + args[0] 1106 'escaped.')
1107 base_url = options.swarming + '/_ah/api/swarming/v1/' + args[0]
1132 url = base_url 1108 url = base_url
1133 if options.limit: 1109 if options.limit:
1134 # Double check; change if not working out. 1110 # Double check; change if not working out.
1135 merge_char = '&' if '?' in url else '?' 1111 merge_char = '&' if '?' in url else '?'
1136 url += '%slimit=%d' % (merge_char, min(CHUNK_SIZE, options.limit)) 1112 url += '%slimit=%d' % (merge_char, min(CHUNK_SIZE, options.limit))
1137 data = net.url_read_json(url) 1113 data = net.url_read_json(url)
1138 if data is None: 1114 if data is None:
1139 print >> sys.stderr, 'Failed to access %s' % options.swarming 1115 # TODO(maruel): Do basic diagnostic.
1116 print >> sys.stderr, 'Failed to access %s' % url
1140 return 1 1117 return 1
1141 1118
1142 # Some endpoints support cursors. Fetch pages automatically when the 1119 # Some endpoints support cursors. Fetch pages automatically when the
1143 # response contains a 'cursor' item. 1120 # response contains a 'cursor' item.
1144 while ( 1121 while (
1145 data.get('cursor') and 1122 data.get('cursor') and
1146 (not options.limit or len(data['items']) < options.limit)): 1123 (not options.limit or len(data['items']) < options.limit)):
1147 merge_char = '&' if '?' in base_url else '?' 1124 merge_char = '&' if '?' in base_url else '?'
1148 url = base_url + '%scursor=%s' % (merge_char, urllib.quote(data['cursor'])) 1125 url = base_url + '%scursor=%s' % (merge_char, urllib.quote(data['cursor']))
1149 if options.limit: 1126 if options.limit:
1150 url += '&limit=%d' % min(CHUNK_SIZE, options.limit - len(data['items'])) 1127 url += '&limit=%d' % min(CHUNK_SIZE, options.limit - len(data['items']))
1128 if options.progress:
1129 sys.stdout.write('.')
1130 sys.stdout.flush()
1151 new = net.url_read_json(url) 1131 new = net.url_read_json(url)
1152 if new is None: 1132 if new is None:
1133 if options.progress:
1134 print('')
1153 print >> sys.stderr, 'Failed to access %s' % options.swarming 1135 print >> sys.stderr, 'Failed to access %s' % url
1154 return 1 1136 return 1
1155 data['items'].extend(new['items']) 1137 data['items'].extend(new['items'])
1156 data['cursor'] = new['cursor'] 1138 data['cursor'] = new.get('cursor')
1157 1139
1140 if options.progress:
1141 print('')
1158 if options.limit and len(data.get('items', [])) > options.limit: 1142 if options.limit and len(data.get('items', [])) > options.limit:
1159 data['items'] = data['items'][:options.limit] 1143 data['items'] = data['items'][:options.limit]
1160 data.pop('cursor', None) 1144 data.pop('cursor', None)
1161 1145
1162 if options.json: 1146 if options.json:
1163 with open(options.json, 'w') as f: 1147 tools.write_json(options.json, data, True)
1164 json.dump(data, f)
1165 else: 1148 else:
1166 try: 1149 try:
1167 json.dump(data, sys.stdout, indent=2, sort_keys=True) 1150 tools.write_json(sys.stdout, data, False)
1168 sys.stdout.write('\n') 1151 sys.stdout.write('\n')
1169 except IOError: 1152 except IOError:
1170 pass 1153 pass
1171 return 0 1154 return 0
1172 1155
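For reference, a hedged standalone sketch of the cursor-following pattern CMDquery implements above, using only the standard library instead of utils.net. It assumes the endpoint returns a JSON object with an 'items' list and an optional 'cursor' string, and that limit is a positive integer.

import json
import urllib
import urllib2

def query_all(base_url, limit, chunk_size=250):
  # First request, capped at chunk_size items per page.
  merge_char = '&' if '?' in base_url else '?'
  data = json.load(urllib2.urlopen(
      base_url + '%slimit=%d' % (merge_char, min(chunk_size, limit))))
  # Keep requesting pages while the server hands back a cursor and the
  # limit has not been reached yet.
  while data.get('cursor') and len(data['items']) < limit:
    url = base_url + '%scursor=%s&limit=%d' % (
        merge_char, urllib.quote(data['cursor']),
        min(chunk_size, limit - len(data['items'])))
    page = json.load(urllib2.urlopen(url))
    data['items'].extend(page['items'])
    data['cursor'] = page.get('cursor')
  data['items'] = data['items'][:limit]
  data.pop('cursor', None)
  return data

Computing merge_char from base_url once is safe here because base_url never changes inside the loop, which matches the behavior of the code under review.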
1173 1156
1157 def CMDquery_list(parser, args):
1158 """Returns list of all the Swarming APIs that can be used with command
1159 'query'.
1160 """
1161 parser.add_option(
1162 '--json', help='Path to JSON output file (otherwise prints to stdout)')
1163 options, args = parser.parse_args(args)
1164 if args:
1165 parser.error('No argument allowed.')
1166
1167 try:
1168 apis = endpoints_api_discovery_apis(options.swarming)
1169 except APIError as e:
1170 parser.error(str(e))
1171 if options.json:
1172 with open(options.json, 'wb') as f:
1173 json.dump(apis, f)
1174 else:
1175 help_url = (
1176 'https://apis-explorer.appspot.com/apis-explorer/?base=%s/_ah/api#p/' %
1177 options.swarming)
1178 for api_id, api in sorted(apis.iteritems()):
1179 print api_id
1180 print ' ' + api['description']
1181 for resource_name, resource in sorted(api['resources'].iteritems()):
1182 print ''
1183 for method_name, method in sorted(resource['methods'].iteritems()):
1184 # Only list the GET ones.
1185 if method['httpMethod'] != 'GET':
1186 continue
1187 print '- %s.%s: %s' % (
1188 resource_name, method_name, method['path'])
1189 print ' ' + method['description']
1190 print ' %s%s%s' % (help_url, api['servicePath'], method['id'])
1191 return 0
1192
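endpoints_api_discovery_apis() is defined outside this hunk. A plausible sketch, assuming it walks the standard Cloud Endpoints discovery directory at /_ah/api/discovery/v1/apis and keys results by '<name>:<version>' (the key format is a guess; the consuming code above only iterates the dict):

import json
import urllib2

def discover_apis(host):
  # Fetch the discovery directory, then the full REST description of
  # each API it lists; 'discoveryRestUrl' is a standard directory field.
  directory = json.load(
      urllib2.urlopen(host + '/_ah/api/discovery/v1/apis'))
  apis = {}
  for item in directory.get('items', []):
    doc = json.load(urllib2.urlopen(item['discoveryRestUrl']))
    apis['%s:%s' % (item['name'], item['version'])] = doc
  return apis

The fields read by CMDquery_list above (description, resources, servicePath, method id) are all part of the discovery REST document format.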
1193
1174 @subcommand.usage('(hash|isolated) [-- extra_args]') 1194 @subcommand.usage('(hash|isolated) [-- extra_args]')
1175 def CMDrun(parser, args): 1195 def CMDrun(parser, args):
1176 """Triggers a task and wait for the results. 1196 """Triggers a task and wait for the results.
1177 1197
1178 Basically, does everything to run a command remotely. 1198 Basically, does everything to run a command remotely.
1179 """ 1199 """
1180 add_trigger_options(parser) 1200 add_trigger_options(parser)
1181 add_collect_options(parser) 1201 add_collect_options(parser)
1182 add_sharding_options(parser) 1202 add_sharding_options(parser)
1183 options, args = parser.parse_args(args) 1203 options, args = parser.parse_args(args)
(...skipping 34 matching lines...)
1218 """Runs a task locally that was triggered on the server. 1238 """Runs a task locally that was triggered on the server.
1219 1239
1220 This runs locally the same commands that were run on the bot. The data 1240 This runs locally the same commands that were run on the bot. The data
1221 downloaded will be in a subdirectory named 'work' of the current working 1241 downloaded will be in a subdirectory named 'work' of the current working
1222 directory. 1242 directory.
1223 """ 1243 """
1224 options, args = parser.parse_args(args) 1244 options, args = parser.parse_args(args)
1225 if len(args) != 1: 1245 if len(args) != 1:
1226 parser.error('Must specify exactly one task id.') 1246 parser.error('Must specify exactly one task id.')
1227 1247
1228 url = options.swarming + '/swarming/api/v1/client/task/%s/request' % args[0] 1248 url = options.swarming + '/_ah/api/swarming/v1/task/%s/request' % args[0]
1229 request = net.url_read_json(url) 1249 request = net.url_read_json(url)
1230 if not request: 1250 if not request:
1231 print >> sys.stderr, 'Failed to retrieve request data for the task' 1251 print >> sys.stderr, 'Failed to retrieve request data for the task'
1232 return 1 1252 return 1
1233 1253
1234 if not os.path.isdir('work'): 1254 if not os.path.isdir('work'):
1235 os.mkdir('work') 1255 os.mkdir('work')
1236 1256
1237 swarming_host = urlparse.urlparse(options.swarming).netloc
1238 properties = request['properties'] 1257 properties = request['properties']
1239 for data_url, _ in properties['data']:
1240 assert data_url.startswith('https://'), data_url
1241 data_host = urlparse.urlparse(data_url).netloc
1242 if data_host != swarming_host:
1243 auth.ensure_logged_in('https://' + data_host)
1244
1245 content = net.url_read(data_url)
1246 if content is None:
1247 print >> sys.stderr, 'Failed to download %s' % data_url
1248 return 1
1249 with zipfile.ZipFile(StringIO.StringIO(content)) as zip_file:
1250 zip_file.extractall('work')
1251
1252 env = None 1258 env = None
1253 if properties['env']: 1259 if properties['env']:
1254 env = os.environ.copy() 1260 env = os.environ.copy()
1255 logging.info('env: %r', properties['env']) 1261 logging.info('env: %r', properties['env'])
1256 env.update( 1262 env.update(
1257 (k.encode('utf-8'), v.encode('utf-8')) 1263 (i['key'].encode('utf-8'), i['value'].encode('utf-8'))
1258 for k, v in properties['env'].iteritems()) 1264 for i in properties['env'])
1259 1265
1260 exit_code = 0 1266 try:
1261 for cmd in properties['commands']: 1267 return subprocess.call(properties['command'], env=env, cwd='work')
1262 try: 1268 except OSError as e:
1263 c = subprocess.call(cmd, env=env, cwd='work') 1269 print >> sys.stderr, 'Failed to run: %s' % ' '.join(properties['command'])
1264 except OSError as e: 1270 print >> sys.stderr, str(e)
1265 print >> sys.stderr, 'Failed to run: %s' % ' '.join(cmd) 1271 return 1
1266 print >> sys.stderr, str(e)
1267 c = 1
1268 if not exit_code:
1269 exit_code = c
1270 return exit_code
1271 1272
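To make the schema change handled above concrete: the old API returned 'env' as a dict, while the Endpoints API returns a list of key/value pairs. A minimal sketch of folding the new shape into the environment, with made-up values:

import os

old_env = {'FOO': 'bar'}                    # old API shape (left column)
new_env = [{'key': 'FOO', 'value': 'bar'}]  # new Endpoints shape (right column)

env = os.environ.copy()
env.update(
    (i['key'].encode('utf-8'), i['value'].encode('utf-8'))
    for i in new_env)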
1272 1273
1273 @subcommand.usage("(hash|isolated) [-- extra_args|raw command]") 1274 @subcommand.usage("(hash|isolated) [-- extra_args|raw command]")
1274 def CMDtrigger(parser, args): 1275 def CMDtrigger(parser, args):
1275 """Triggers a Swarming task. 1276 """Triggers a Swarming task.
1276 1277
1277 Accepts either the hash (sha1) of a .isolated file already uploaded or the 1278 Accepts either the hash (sha1) of a .isolated file already uploaded or the
1278 path to a .isolated file to archive. 1279 path to a .isolated file to archive.
1279 1280
1280 If a .isolated file is specified instead of a hash, it is first archived. 1281 If a .isolated file is specified instead of a hash, it is first archived.
(...skipping 81 matching lines...)
1362 def main(args): 1363 def main(args):
1363 dispatcher = subcommand.CommandDispatcher(__name__) 1364 dispatcher = subcommand.CommandDispatcher(__name__)
1364 return dispatcher.execute(OptionParserSwarming(version=__version__), args) 1365 return dispatcher.execute(OptionParserSwarming(version=__version__), args)
1365 1366
1366 1367
1367 if __name__ == '__main__': 1368 if __name__ == '__main__':
1368 fix_encoding.fix_encoding() 1369 fix_encoding.fix_encoding()
1369 tools.disable_buffering() 1370 tools.disable_buffering()
1370 colorama.init() 1371 colorama.init()
1371 sys.exit(main(sys.argv[1:])) 1372 sys.exit(main(sys.argv[1:]))