Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(111)

Side by Side Diff: appengine/swarming/swarming_bot/bot_code/task_runner.py

Issue 1949613002: task_runner: always use run_isolated (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-py@master
Patch Set: rebased Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2013 The LUCI Authors. All rights reserved. 1 # Copyright 2013 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """Runs a Swarming task. 5 """Runs a Swarming task.
6 6
7 Downloads all the necessary files to run the task, executes the command and 7 Downloads all the necessary files to run the task, executes the command and
8 streams results back to the Swarming server. 8 streams results back to the Swarming server.
9 9
10 The process exit code is 0 when the task was executed, even if the task itself 10 The process exit code is 0 when the task was executed, even if the task itself
(...skipping 96 matching lines...) Expand 10 before | Expand all | Expand 10 after
107 '--cache', os.path.join(bot_dir, 'cache'), 107 '--cache', os.path.join(bot_dir, 'cache'),
108 '--root-dir', os.path.join(work_dir, 'isolated'), 108 '--root-dir', os.path.join(work_dir, 'isolated'),
109 ]) 109 ])
110 if min_free_space: 110 if min_free_space:
111 cmd.extend(('--min-free-space', str(min_free_space))) 111 cmd.extend(('--min-free-space', str(min_free_space)))
112 112
113 if task_details.hard_timeout: 113 if task_details.hard_timeout:
114 cmd.extend(('--hard-timeout', str(task_details.hard_timeout))) 114 cmd.extend(('--hard-timeout', str(task_details.hard_timeout)))
115 if task_details.grace_period: 115 if task_details.grace_period:
116 cmd.extend(('--grace-period', str(task_details.grace_period))) 116 cmd.extend(('--grace-period', str(task_details.grace_period)))
117 if task_details.extra_args: 117
118 cmd.append('--') 118 cmd.append('--')
119 if task_details.command:
120 cmd.extend(task_details.command)
121 elif task_details.extra_args:
119 cmd.extend(task_details.extra_args) 122 cmd.extend(task_details.extra_args)
120 return cmd 123 return cmd
121 124
122 125
123 class TaskDetails(object): 126 class TaskDetails(object):
124 def __init__(self, data): 127 def __init__(self, data):
125 """Loads the raw data from a manifest file specified by --in-file.""" 128 """Loads the raw data from a manifest file specified by --in-file."""
126 logging.info('TaskDetails(%s)', data) 129 logging.info('TaskDetails(%s)', data)
127 if not isinstance(data, dict): 130 if not isinstance(data, dict):
128 raise ValueError('Expected dict, got %r' % data) 131 raise ValueError('Expected dict, got %r' % data)
(...skipping 163 matching lines...) Expand 10 before | Expand all | Expand 10 after
292 # TODO(maruel): This function is incomprehensible, split and refactor. 295 # TODO(maruel): This function is incomprehensible, split and refactor.
293 # Signal the command is about to be started. 296 # Signal the command is about to be started.
294 last_packet = start = now = monotonic_time() 297 last_packet = start = now = monotonic_time()
295 params = { 298 params = {
296 'cost_usd': cost_usd_hour * (now - task_start) / 60. / 60., 299 'cost_usd': cost_usd_hour * (now - task_start) / 60. / 60.,
297 'id': task_details.bot_id, 300 'id': task_details.bot_id,
298 'task_id': task_details.task_id, 301 'task_id': task_details.task_id,
299 } 302 }
300 post_update(swarming_server, params, None, '', 0) 303 post_update(swarming_server, params, None, '', 0)
301 304
302 if task_details.command: 305 isolated_result = os.path.join(work_dir, 'isolated_result.json')
303 # Raw command. 306 cmd = get_isolated_cmd(
304 cmd = task_details.command 307 work_dir, task_details, isolated_result, min_free_space)
305 isolated_result = None 308 # Hard timeout enforcement is deferred to run_isolated. Grace is doubled to
306 else: 309 # give one 'grace_period' slot to the child process and one slot to upload
307 # Isolated task. 310 # the results back.
308 isolated_result = os.path.join(work_dir, 'isolated_result.json') 311 task_details.hard_timeout = 0
309 cmd = get_isolated_cmd( 312 if task_details.grace_period:
310 work_dir, task_details, isolated_result, min_free_space) 313 task_details.grace_period *= 2
311 # Hard timeout enforcement is deferred to run_isolated. Grace is doubled to
312 # give one 'grace_period' slot to the child process and one slot to upload
313 # the results back.
314 task_details.hard_timeout = 0
315 if task_details.grace_period:
316 task_details.grace_period *= 2
317 314
318 try: 315 try:
319 # TODO(maruel): Support both channels independently and display stderr in 316 # TODO(maruel): Support both channels independently and display stderr in
320 # red. 317 # red.
321 env = None 318 env = None
322 if task_details.env: 319 if task_details.env:
323 env = os.environ.copy() 320 env = os.environ.copy()
324 for key, value in task_details.env.iteritems(): 321 for key, value in task_details.env.iteritems():
325 if not value: 322 if not value:
326 env.pop(key, None) 323 env.pop(key, None)
(...skipping 23 matching lines...) Expand all
350 u'exit_code': 1, 347 u'exit_code': 1,
351 u'hard_timeout': False, 348 u'hard_timeout': False,
352 u'io_timeout': False, 349 u'io_timeout': False,
353 u'must_signal_internal_failure': None, 350 u'must_signal_internal_failure': None,
354 u'version': OUT_VERSION, 351 u'version': OUT_VERSION,
355 } 352 }
356 353
357 output_chunk_start = 0 354 output_chunk_start = 0
358 stdout = '' 355 stdout = ''
359 exit_code = None 356 exit_code = None
360 had_hard_timeout = False
361 had_io_timeout = False 357 had_io_timeout = False
362 must_signal_internal_failure = None 358 must_signal_internal_failure = None
363 kill_sent = False 359 kill_sent = False
364 timed_out = None 360 timed_out = None
365 try: 361 try:
366 calc = lambda: calc_yield_wait( 362 calc = lambda: calc_yield_wait(
367 task_details, start, last_io, timed_out, stdout) 363 task_details, start, last_io, timed_out, stdout)
368 maxsize = lambda: MAX_CHUNK_SIZE - len(stdout) 364 maxsize = lambda: MAX_CHUNK_SIZE - len(stdout)
369 last_io = monotonic_time() 365 last_io = monotonic_time()
370 for _, new_data in proc.yield_any(maxsize=maxsize, timeout=calc): 366 for _, new_data in proc.yield_any(maxsize=maxsize, timeout=calc):
(...skipping 14 matching lines...) Expand all
385 # Send signal on timeout if necessary. Both are failures, not 381 # Send signal on timeout if necessary. Both are failures, not
386 # internal_failures. 382 # internal_failures.
387 # Eventually kill but return 0 so bot_main.py doesn't cancel the task. 383 # Eventually kill but return 0 so bot_main.py doesn't cancel the task.
388 if not timed_out: 384 if not timed_out:
389 if (task_details.io_timeout and 385 if (task_details.io_timeout and
390 now - last_io > task_details.io_timeout): 386 now - last_io > task_details.io_timeout):
391 had_io_timeout = True 387 had_io_timeout = True
392 logging.warning('I/O timeout; sending SIGTERM') 388 logging.warning('I/O timeout; sending SIGTERM')
393 proc.terminate() 389 proc.terminate()
394 timed_out = monotonic_time() 390 timed_out = monotonic_time()
395 elif (task_details.hard_timeout and
396 now - start > task_details.hard_timeout):
397 had_hard_timeout = True
398 logging.warning('Hard timeout; sending SIGTERM')
399 proc.terminate()
400 timed_out = monotonic_time()
401 else: 391 else:
402 # During grace period. 392 # During grace period.
403 if not kill_sent and now >= timed_out + task_details.grace_period: 393 if not kill_sent and now >= timed_out + task_details.grace_period:
404 # Now kill for real. The user can distinguish between the following 394 # Now kill for real. The user can distinguish between the following
405 # states: 395 # states:
406 # - signal but process exited within grace period, 396 # - signal but process exited within grace period,
407 # (hard_|io_)_timed_out will be set but the process exit code will 397 # (hard_|io_)_timed_out will be set but the process exit code will
408 # be script provided. 398 # be script provided.
409 # - process exited late, exit code will be -9 on posix. 399 # - process exited late, exit code will be -9 on posix.
410 logging.warning('Grace exhausted; sending SIGKILL') 400 logging.warning('Grace exhausted; sending SIGKILL')
411 proc.kill() 401 proc.kill()
412 kill_sent = True 402 kill_sent = True
413 logging.info('Waiting for proces exit') 403 logging.info('Waiting for proces exit')
414 exit_code = proc.wait() 404 exit_code = proc.wait()
415 except MustExit as e: 405 except MustExit as e:
416 # TODO(maruel): Send SIGTERM to the child process and give it 406 # TODO(maruel): Send SIGTERM to the child process and give it
417 # task_details.grace_period to terminate. 407 # task_details.grace_period to terminate.
418 must_signal_internal_failure = ( 408 must_signal_internal_failure = (
419 u'task_runner received signal %s' % e.signal) 409 u'task_runner received signal %s' % e.signal)
420 exit_code = kill_and_wait( 410 exit_code = kill_and_wait(
421 proc, task_details.grace_period, 'signal %d' % e.signal) 411 proc, task_details.grace_period, 'signal %d' % e.signal)
422 except (IOError, OSError): 412 except (IOError, OSError):
423 # Something wrong happened, try to kill the child process. 413 # Something wrong happened, try to kill the child process.
424 had_hard_timeout = True
425 exit_code = kill_and_wait( 414 exit_code = kill_and_wait(
426 proc, task_details.grace_period, 'exception %s' % e) 415 proc, task_details.grace_period, 'exception %s' % e)
427 416
428 # This is the very last packet for this command. If it was an isolated task, 417 # This is the very last packet for this command. If it was an isolated task,
429 # include the output reference to the archived .isolated file. 418 # include the output reference to the archived .isolated file.
430 now = monotonic_time() 419 now = monotonic_time()
431 params['cost_usd'] = cost_usd_hour * (now - task_start) / 60. / 60. 420 params['cost_usd'] = cost_usd_hour * (now - task_start) / 60. / 60.
432 params['duration'] = now - start 421 params['duration'] = now - start
433 params['io_timeout'] = had_io_timeout 422 params['io_timeout'] = had_io_timeout
434 params['hard_timeout'] = had_hard_timeout 423 had_hard_timeout = False
435 if isolated_result: 424 try:
436 try: 425 if not os.path.isfile(isolated_result):
437 if ((had_io_timeout or had_hard_timeout) and 426 # It's possible if
438 not os.path.isfile(isolated_result)): 427 # - run_isolated.py did not start
439 # It's possible that run_isolated failed to quit quickly enough; it 428 # - run_isolated.py started, but arguments were invalid
440 # could be because there was too much data to upload back or something 429 # - host in a situation unable to fork
441 # else. Do not create an internal error, just send back the (partial) 430 # - grand child process outliving the child process deleting everything
442 # view as task_runner saw it, for example the real exit_code is 431 # it can
443 # unknown. 432 # Do not create an internal error, just send back the (partial)
444 logging.warning('TIMED_OUT and there\'s no result file') 433 # view as task_runner saw it, for example the real exit_code is
434 # unknown.
435 logging.warning('there\'s no result file')
436 if exit_code is None:
445 exit_code = -1 437 exit_code = -1
446 else: 438 else:
447 # See run_isolated.py for the format. 439 # See run_isolated.py for the format.
448 with open(isolated_result, 'rb') as f: 440 with open(isolated_result, 'rb') as f:
449 run_isolated_result = json.load(f) 441 run_isolated_result = json.load(f)
450 logging.debug('run_isolated:\n%s', run_isolated_result) 442 logging.debug('run_isolated:\n%s', run_isolated_result)
451 # TODO(maruel): Grab statistics (cache hit rate, data downloaded, 443 # TODO(maruel): Grab statistics (cache hit rate, data downloaded,
452 # mapping time, etc) from run_isolated and push them to the server. 444 # mapping time, etc) from run_isolated and push them to the server.
453 if run_isolated_result['outputs_ref']: 445 if run_isolated_result['outputs_ref']:
454 params['outputs_ref'] = run_isolated_result['outputs_ref'] 446 params['outputs_ref'] = run_isolated_result['outputs_ref']
455 had_hard_timeout = ( 447 had_hard_timeout = run_isolated_result['had_hard_timeout']
456 had_hard_timeout or run_isolated_result['had_hard_timeout']) 448 if not had_io_timeout and not had_hard_timeout:
457 params['hard_timeout'] = had_hard_timeout 449 if run_isolated_result['internal_failure']:
458 if not had_io_timeout and not had_hard_timeout: 450 must_signal_internal_failure = (
459 if run_isolated_result['internal_failure']: 451 run_isolated_result['internal_failure'])
460 must_signal_internal_failure = ( 452 logging.error('%s', must_signal_internal_failure)
461 run_isolated_result['internal_failure']) 453 elif exit_code:
462 logging.error('%s', must_signal_internal_failure) 454 # TODO(maruel): Grab stdout from run_isolated.
463 elif exit_code: 455 must_signal_internal_failure = (
464 # TODO(maruel): Grab stdout from run_isolated. 456 'run_isolated internal failure %d' % exit_code)
465 must_signal_internal_failure = ( 457 logging.error('%s', must_signal_internal_failure)
466 'run_isolated internal failure %d' % exit_code) 458 exit_code = run_isolated_result['exit_code']
467 logging.error('%s', must_signal_internal_failure) 459 if run_isolated_result.get('duration') is not None:
468 exit_code = run_isolated_result['exit_code'] 460 # Calculate the real task duration as measured by run_isolated and
469 if run_isolated_result.get('duration') is not None: 461 # calculate the remaining overhead.
470 # Calculate the real task duration as measured by run_isolated and 462 params['bot_overhead'] = params['duration']
471 # calculate the remaining overhead. 463 params['duration'] = run_isolated_result['duration']
472 params['bot_overhead'] = params['duration'] 464 params['bot_overhead'] -= params['duration']
473 params['duration'] = run_isolated_result['duration'] 465 params['bot_overhead'] -= run_isolated_result.get(
474 params['bot_overhead'] -= params['duration'] 466 'download', {}).get('duration', 0)
475 params['bot_overhead'] -= run_isolated_result.get( 467 params['bot_overhead'] -= run_isolated_result.get(
476 'download', {}).get('duration', 0) 468 'upload', {}).get('duration', 0)
477 params['bot_overhead'] -= run_isolated_result.get( 469 if params['bot_overhead'] < 0:
478 'upload', {}).get('duration', 0) 470 params['bot_overhead'] = 0
479 if params['bot_overhead'] < 0: 471 stats = run_isolated_result.get('stats')
480 params['bot_overhead'] = 0 472 if stats:
481 stats = run_isolated_result.get('stats') 473 params['isolated_stats'] = stats
482 if stats: 474 except (IOError, OSError, ValueError) as e:
483 params['isolated_stats'] = stats 475 logging.error('Swallowing error: %s', e)
484 except (IOError, OSError, ValueError) as e: 476 if not must_signal_internal_failure:
485 logging.error('Swallowing error: %s', e) 477 must_signal_internal_failure = str(e)
486 if not must_signal_internal_failure:
487 must_signal_internal_failure = str(e)
488 # TODO(maruel): Send the internal failure here instead of sending it through 478 # TODO(maruel): Send the internal failure here instead of sending it through
489 # bot_main, this causes a race condition. 479 # bot_main, this causes a race condition.
490 if exit_code is None: 480 if exit_code is None:
491 exit_code = -1 481 exit_code = -1
482 params['hard_timeout'] = had_hard_timeout
492 post_update(swarming_server, params, exit_code, stdout, output_chunk_start) 483 post_update(swarming_server, params, exit_code, stdout, output_chunk_start)
493 return { 484 return {
494 u'exit_code': exit_code, 485 u'exit_code': exit_code,
495 u'hard_timeout': had_hard_timeout, 486 u'hard_timeout': had_hard_timeout,
496 u'io_timeout': had_io_timeout, 487 u'io_timeout': had_io_timeout,
497 u'must_signal_internal_failure': must_signal_internal_failure, 488 u'must_signal_internal_failure': must_signal_internal_failure,
498 u'version': OUT_VERSION, 489 u'version': OUT_VERSION,
499 } 490 }
500 finally: 491 finally:
501 if isolated_result: 492 try:
502 try: 493 os.remove(isolated_result)
503 os.remove(isolated_result) 494 except OSError:
504 except OSError: 495 pass
505 pass
506 496
507 497
508 def main(args): 498 def main(args):
509 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__) 499 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__)
510 parser.add_option('--in-file', help='Name of the request file') 500 parser.add_option('--in-file', help='Name of the request file')
511 parser.add_option( 501 parser.add_option(
512 '--out-file', help='Name of the JSON file to write a task summary to') 502 '--out-file', help='Name of the JSON file to write a task summary to')
513 parser.add_option( 503 parser.add_option(
514 '--swarming-server', help='Swarming server to send data back') 504 '--swarming-server', help='Swarming server to send data back')
515 parser.add_option( 505 parser.add_option(
(...skipping 14 matching lines...) Expand all
530 if options.start > now: 520 if options.start > now:
531 options.start = now 521 options.start = now
532 522
533 try: 523 try:
534 load_and_run( 524 load_and_run(
535 options.in_file, options.swarming_server, options.cost_usd_hour, 525 options.in_file, options.swarming_server, options.cost_usd_hour,
536 options.start, options.out_file, options.min_free_space) 526 options.start, options.out_file, options.min_free_space)
537 return 0 527 return 0
538 finally: 528 finally:
539 logging.info('quitting') 529 logging.info('quitting')
OLDNEW
« no previous file with comments | « appengine/swarming/local_smoke_test.py ('k') | appengine/swarming/swarming_bot/bot_code/task_runner_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698