Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(329)

Side by Side Diff: appengine/swarming/swarming_bot/bot_code/task_runner.py

Issue 2024313003: Send authorization headers when calling Swarming backend. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-py@master
Patch Set: rebase Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2013 The LUCI Authors. All rights reserved. 1 # Copyright 2013 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """Runs a Swarming task. 5 """Runs a Swarming task.
6 6
7 Downloads all the necessary files to run the task, executes the command and 7 Downloads all the necessary files to run the task, executes the command and
8 streams results back to the Swarming server. 8 streams results back to the Swarming server.
9 9
10 The process exit code is 0 when the task was executed, even if the task itself 10 The process exit code is 0 when the task was executed, even if the task itself
(...skipping 10 matching lines...) Expand all
21 import os 21 import os
22 import signal 22 import signal
23 import sys 23 import sys
24 import time 24 import time
25 25
26 from utils import net 26 from utils import net
27 from utils import on_error 27 from utils import on_error
28 from utils import subprocess42 28 from utils import subprocess42
29 from utils import zip_package 29 from utils import zip_package
30 30
31 import bot_auth
32 import file_reader
33
31 34
32 # Path to this file or the zip containing this file. 35 # Path to this file or the zip containing this file.
33 THIS_FILE = os.path.abspath(zip_package.get_main_script_path()) 36 THIS_FILE = os.path.abspath(zip_package.get_main_script_path())
34 37
35 38
36 # Sends a maximum of 100kb of stdout per task_update packet. 39 # Sends a maximum of 100kb of stdout per task_update packet.
37 MAX_CHUNK_SIZE = 102400 40 MAX_CHUNK_SIZE = 102400
38 41
39 42
40 # Maximum wait between task_update packet when there's no output. 43 # Maximum wait between task_update packet when there's no output.
(...skipping 134 matching lines...) Expand 10 before | Expand all | Expand 10 after
175 work_dir = os.path.dirname(out_file) 178 work_dir = os.path.dirname(out_file)
176 try: 179 try:
177 with subprocess42.set_signal_handler([SIG_BREAK_OR_TERM], handler): 180 with subprocess42.set_signal_handler([SIG_BREAK_OR_TERM], handler):
178 if not os.path.isdir(work_dir): 181 if not os.path.isdir(work_dir):
179 raise ValueError('%s expected to exist' % work_dir) 182 raise ValueError('%s expected to exist' % work_dir)
180 183
181 with open(in_file, 'rb') as f: 184 with open(in_file, 'rb') as f:
182 task_details = TaskDetails(json.load(f)) 185 task_details = TaskDetails(json.load(f))
183 186
184 task_result = run_command( 187 task_result = run_command(
185 swarming_server, task_details, work_dir, cost_usd_hour, start, 188 swarming_server, task_details, work_dir,
186 min_free_space) 189 cost_usd_hour, start, min_free_space)
187 except MustExit as e: 190 except MustExit as e:
188 # This normally means run_command() didn't get the chance to run, as it 191 # This normally means run_command() didn't get the chance to run, as it
189 # itself traps MustExit and will report accordingly. In this case, we want 192 # itself traps MustExit and will report accordingly. In this case, we want
190 # the parent process to send the message instead. 193 # the parent process to send the message instead.
191 if not task_result: 194 if not task_result:
192 task_result = { 195 task_result = {
193 u'exit_code': None, 196 u'exit_code': None,
194 u'hard_timeout': False, 197 u'hard_timeout': False,
195 u'io_timeout': False, 198 u'io_timeout': False,
196 u'must_signal_internal_failure': 199 u'must_signal_internal_failure':
197 u'task_runner received signal %s' % e.signal, 200 u'task_runner received signal %s' % e.signal,
198 u'version': OUT_VERSION, 201 u'version': OUT_VERSION,
199 } 202 }
200 finally: 203 finally:
201 # We've found tests to delete 'work' when quitting, causing an exception 204 # We've found tests to delete 'work' when quitting, causing an exception
202 # here. Try to recreate the directory if necessary. 205 # here. Try to recreate the directory if necessary.
203 if not os.path.isdir(work_dir): 206 if not os.path.isdir(work_dir):
204 os.mkdir(work_dir) 207 os.mkdir(work_dir)
205 with open(out_file, 'wb') as f: 208 with open(out_file, 'wb') as f:
206 json.dump(task_result, f) 209 json.dump(task_result, f)
207 210
208 211
209 def post_update(swarming_server, params, exit_code, stdout, output_chunk_start): 212 def post_update(
213 swarming_server, auth_headers, params, exit_code,
214 stdout, output_chunk_start):
210 """Posts task update to task_update. 215 """Posts task update to task_update.
211 216
212 Arguments: 217 Arguments:
213 swarming_server: Base URL to Swarming server. 218 swarming_server: Base URL to Swarming server.
219 auth_headers: dict with HTTP authentication headers.
214 params: Default JSON parameters for the POST. 220 params: Default JSON parameters for the POST.
215 exit_code: Process exit code, only when a command completed. 221 exit_code: Process exit code, only when a command completed.
216 stdout: Incremental output since last call, if any. 222 stdout: Incremental output since last call, if any.
217 output_chunk_start: Total number of stdout previously sent, for coherency 223 output_chunk_start: Total number of stdout previously sent, for coherency
218 with the server. 224 with the server.
219 """ 225 """
220 params = params.copy() 226 params = params.copy()
221 if exit_code is not None: 227 if exit_code is not None:
222 params['exit_code'] = exit_code 228 params['exit_code'] = exit_code
223 if stdout: 229 if stdout:
224 # The output_chunk_start is used by the server to make sure that the stdout 230 # The output_chunk_start is used by the server to make sure that the stdout
225 # chunks are processed and saved in the DB in order. 231 # chunks are processed and saved in the DB in order.
226 params['output'] = base64.b64encode(stdout) 232 params['output'] = base64.b64encode(stdout)
227 params['output_chunk_start'] = output_chunk_start 233 params['output_chunk_start'] = output_chunk_start
228 # TODO(maruel): Support early cancellation. 234 # TODO(maruel): Support early cancellation.
229 # https://code.google.com/p/swarming/issues/detail?id=62 235 # https://code.google.com/p/swarming/issues/detail?id=62
230 resp = net.url_read_json( 236 resp = net.url_read_json(
231 swarming_server+'/swarming/api/v1/bot/task_update/%s' % params['task_id'], 237 swarming_server+'/swarming/api/v1/bot/task_update/%s' % params['task_id'],
232 data=params) 238 data=params,
239 headers=auth_headers,
240 follow_redirects=False)
233 logging.debug('post_update() = %s', resp) 241 logging.debug('post_update() = %s', resp)
234 if not resp or resp.get('error'): 242 if not resp or resp.get('error'):
235 # Abandon it. This will force a process exit. 243 # Abandon it. This will force a process exit.
236 raise ValueError(resp.get('error') if resp else 'Failed to contact server') 244 raise ValueError(resp.get('error') if resp else 'Failed to contact server')
237 245
238 246
239 def should_post_update(stdout, now, last_packet): 247 def should_post_update(stdout, now, last_packet):
240 """Returns True if it's time to send a task_update packet via post_update(). 248 """Returns True if it's time to send a task_update packet via post_update().
241 249
242 Sends a packet when one of these conditions is met: 250 Sends a packet when one of these conditions is met:
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
274 try: 282 try:
275 proc.wait(grace_period) 283 proc.wait(grace_period)
276 except subprocess42.TimeoutError: 284 except subprocess42.TimeoutError:
277 logging.warning('SIGKILL finally due to %s', reason) 285 logging.warning('SIGKILL finally due to %s', reason)
278 proc.kill() 286 proc.kill()
279 exit_code = proc.wait() 287 exit_code = proc.wait()
280 logging.info('Waiting for proces exit in finally - done') 288 logging.info('Waiting for proces exit in finally - done')
281 return exit_code 289 return exit_code
282 290
283 291
292 def start_reading_headers(auth_params_file):
293 """Spawns a thread that rereads headers from SWARMING_AUTH_PARAMS file.
294
295 Returns:
296 Tuple (callback that returns the last known headers, stop callback).
297
298 Raises:
299 file_reader.FatalReadError if headers file can't be read.
300 ValueError if it can be read, but its body is invalid.
301 """
302 # Read headers more often than bot_main writes them, to reduce the maximum
303 # possible latency between when headers are updated and when they are read.
304 reader = file_reader.FileReaderThread(auth_params_file, interval_sec=30)
305
306 def read_and_validate_headers():
307 val = bot_auth.process_auth_params_json(reader.last_value or {})
308 return val.swarming_http_headers
309
310 reader.start()
311 read_and_validate_headers() # initial validation, may raise ValueError
312 return read_and_validate_headers, reader.stop
313
314
284 def run_command( 315 def run_command(
285 swarming_server, task_details, work_dir, cost_usd_hour, task_start, 316 swarming_server, task_details, work_dir, cost_usd_hour,
286 min_free_space): 317 task_start, min_free_space):
287 """Runs a command and sends packets to the server to stream results back. 318 """Runs a command and sends packets to the server to stream results back.
288 319
289 Implements both I/O and hard timeouts. Sends the packets numbered, so the 320 Implements both I/O and hard timeouts. Sends the packets numbered, so the
290 server can ensure they are processed in order. 321 server can ensure they are processed in order.
291 322
292 Returns: 323 Returns:
293 Metadata about the command. 324 Metadata about the command.
294 """ 325 """
295 # TODO(maruel): This function is incomprehensible, split and refactor. 326 # TODO(maruel): This function is incomprehensible, split and refactor.
327
328 # Grab initial auth headers and start rereading them in a parallel thread. They
329 # MUST be there already. It's a fatal internal error if they are not.
330 headers_cb = lambda: {}
331 stop_headers_reader = lambda: None
332 auth_params_file = os.environ.get('SWARMING_AUTH_PARAMS')
333 if auth_params_file:
334 try:
335 headers_cb, stop_headers_reader = start_reading_headers(auth_params_file)
336 except (ValueError, file_reader.FatalReadError) as e:
337 return {
338 u'exit_code': 1,
339 u'hard_timeout': False,
340 u'io_timeout': False,
341 u'must_signal_internal_failure': str(e),
342 u'version': OUT_VERSION,
343 }
344
296 # Signal the command is about to be started. 345 # Signal the command is about to be started.
297 last_packet = start = now = monotonic_time() 346 last_packet = start = now = monotonic_time()
298 params = { 347 params = {
299 'cost_usd': cost_usd_hour * (now - task_start) / 60. / 60., 348 'cost_usd': cost_usd_hour * (now - task_start) / 60. / 60.,
300 'id': task_details.bot_id, 349 'id': task_details.bot_id,
301 'task_id': task_details.task_id, 350 'task_id': task_details.task_id,
302 } 351 }
303 post_update(swarming_server, params, None, '', 0) 352 post_update(swarming_server, headers_cb(), params, None, '', 0)
304 353
305 isolated_result = os.path.join(work_dir, 'isolated_result.json') 354 isolated_result = os.path.join(work_dir, 'isolated_result.json')
306 cmd = get_isolated_cmd( 355 cmd = get_isolated_cmd(
307 work_dir, task_details, isolated_result, min_free_space) 356 work_dir, task_details, isolated_result, min_free_space)
308 # Hard timeout enforcement is deferred to run_isolated. Grace is doubled to 357 # Hard timeout enforcement is deferred to run_isolated. Grace is doubled to
309 # give one 'grace_period' slot to the child process and one slot to upload 358 # give one 'grace_period' slot to the child process and one slot to upload
310 # the results back. 359 # the results back.
311 task_details.hard_timeout = 0 360 task_details.hard_timeout = 0
312 if task_details.grace_period: 361 if task_details.grace_period:
313 task_details.grace_period *= 2 362 task_details.grace_period *= 2
(...skipping 21 matching lines...) Expand all
335 stdout=subprocess42.PIPE, 384 stdout=subprocess42.PIPE,
336 stderr=subprocess42.STDOUT, 385 stderr=subprocess42.STDOUT,
337 stdin=subprocess42.PIPE) 386 stdin=subprocess42.PIPE)
338 except OSError as e: 387 except OSError as e:
339 stdout = 'Command "%s" failed to start.\nError: %s' % (' '.join(cmd), e) 388 stdout = 'Command "%s" failed to start.\nError: %s' % (' '.join(cmd), e)
340 now = monotonic_time() 389 now = monotonic_time()
341 params['cost_usd'] = cost_usd_hour * (now - task_start) / 60. / 60. 390 params['cost_usd'] = cost_usd_hour * (now - task_start) / 60. / 60.
342 params['duration'] = now - start 391 params['duration'] = now - start
343 params['io_timeout'] = False 392 params['io_timeout'] = False
344 params['hard_timeout'] = False 393 params['hard_timeout'] = False
345 post_update(swarming_server, params, 1, stdout, 0) 394 post_update(swarming_server, headers_cb(), params, 1, stdout, 0)
346 return { 395 return {
347 u'exit_code': 1, 396 u'exit_code': 1,
348 u'hard_timeout': False, 397 u'hard_timeout': False,
349 u'io_timeout': False, 398 u'io_timeout': False,
350 u'must_signal_internal_failure': None, 399 u'must_signal_internal_failure': None,
351 u'version': OUT_VERSION, 400 u'version': OUT_VERSION,
352 } 401 }
353 402
354 output_chunk_start = 0 403 output_chunk_start = 0
355 stdout = '' 404 stdout = ''
(...skipping 11 matching lines...) Expand all
367 now = monotonic_time() 416 now = monotonic_time()
368 if new_data: 417 if new_data:
369 stdout += new_data 418 stdout += new_data
370 last_io = now 419 last_io = now
371 420
372 # Post update if necessary. 421 # Post update if necessary.
373 if should_post_update(stdout, now, last_packet): 422 if should_post_update(stdout, now, last_packet):
374 last_packet = monotonic_time() 423 last_packet = monotonic_time()
375 params['cost_usd'] = ( 424 params['cost_usd'] = (
376 cost_usd_hour * (last_packet - task_start) / 60. / 60.) 425 cost_usd_hour * (last_packet - task_start) / 60. / 60.)
377 post_update(swarming_server, params, None, stdout, output_chunk_start) 426 post_update(
427 swarming_server, headers_cb(), params, None,
428 stdout, output_chunk_start)
378 output_chunk_start += len(stdout) 429 output_chunk_start += len(stdout)
379 stdout = '' 430 stdout = ''
380 431
381 # Send signal on timeout if necessary. Both are failures, not 432 # Send signal on timeout if necessary. Both are failures, not
382 # internal_failures. 433 # internal_failures.
383 # Eventually kill but return 0 so bot_main.py doesn't cancel the task. 434 # Eventually kill but return 0 so bot_main.py doesn't cancel the task.
384 if not timed_out: 435 if not timed_out:
385 if (task_details.io_timeout and 436 if (task_details.io_timeout and
386 now - last_io > task_details.io_timeout): 437 now - last_io > task_details.io_timeout):
387 had_io_timeout = True 438 had_io_timeout = True
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after
473 params['isolated_stats'] = isolated_stats 524 params['isolated_stats'] = isolated_stats
474 except (IOError, OSError, ValueError) as e: 525 except (IOError, OSError, ValueError) as e:
475 logging.error('Swallowing error: %s', e) 526 logging.error('Swallowing error: %s', e)
476 if not must_signal_internal_failure: 527 if not must_signal_internal_failure:
477 must_signal_internal_failure = str(e) 528 must_signal_internal_failure = str(e)
478 # TODO(maruel): Send the internal failure here instead of sending it through 529 # TODO(maruel): Send the internal failure here instead of sending it through
479 # bot_main, this causes a race condition. 530 # bot_main, this causes a race condition.
480 if exit_code is None: 531 if exit_code is None:
481 exit_code = -1 532 exit_code = -1
482 params['hard_timeout'] = had_hard_timeout 533 params['hard_timeout'] = had_hard_timeout
483 post_update(swarming_server, params, exit_code, stdout, output_chunk_start) 534 post_update(
535 swarming_server, headers_cb(), params, exit_code,
536 stdout, output_chunk_start)
484 return { 537 return {
485 u'exit_code': exit_code, 538 u'exit_code': exit_code,
486 u'hard_timeout': had_hard_timeout, 539 u'hard_timeout': had_hard_timeout,
487 u'io_timeout': had_io_timeout, 540 u'io_timeout': had_io_timeout,
488 u'must_signal_internal_failure': must_signal_internal_failure, 541 u'must_signal_internal_failure': must_signal_internal_failure,
489 u'version': OUT_VERSION, 542 u'version': OUT_VERSION,
490 } 543 }
491 finally: 544 finally:
492 try: 545 try:
493 os.remove(isolated_result) 546 os.remove(isolated_result)
494 except OSError: 547 except OSError:
495 pass 548 pass
549 stop_headers_reader()
496 550
497 551
498 def main(args): 552 def main(args):
499 subprocess42.inhibit_os_error_reporting() 553 subprocess42.inhibit_os_error_reporting()
500 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__) 554 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__)
501 parser.add_option('--in-file', help='Name of the request file') 555 parser.add_option('--in-file', help='Name of the request file')
502 parser.add_option( 556 parser.add_option(
503 '--out-file', help='Name of the JSON file to write a task summary to') 557 '--out-file', help='Name of the JSON file to write a task summary to')
504 parser.add_option( 558 parser.add_option(
505 '--swarming-server', help='Swarming server to send data back') 559 '--swarming-server', help='Swarming server to send data back')
(...skipping 15 matching lines...) Expand all
521 if options.start > now: 575 if options.start > now:
522 options.start = now 576 options.start = now
523 577
524 try: 578 try:
525 load_and_run( 579 load_and_run(
526 options.in_file, options.swarming_server, options.cost_usd_hour, 580 options.in_file, options.swarming_server, options.cost_usd_hour,
527 options.start, options.out_file, options.min_free_space) 581 options.start, options.out_file, options.min_free_space)
528 return 0 582 return 0
529 finally: 583 finally:
530 logging.info('quitting') 584 logging.info('quitting')
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698