| OLD | NEW |
| 1 # Copyright 2013 The LUCI Authors. All rights reserved. | 1 # Copyright 2013 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed by the Apache v2.0 license that can be | 2 # Use of this source code is governed by the Apache v2.0 license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 """Runs a Swarming task. | 5 """Runs a Swarming task. |
| 6 | 6 |
| 7 Downloads all the necessary files to run the task, executes the command and | 7 Downloads all the necessary files to run the task, executes the command and |
| 8 streams results back to the Swarming server. | 8 streams results back to the Swarming server. |
| 9 | 9 |
| 10 The process exit code is 0 when the task was executed, even if the task itself | 10 The process exit code is 0 when the task was executed, even if the task itself |
| (...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 77 return [sys.executable, THIS_FILE, 'run_isolated'] | 77 return [sys.executable, THIS_FILE, 'run_isolated'] |
| 78 | 78 |
| 79 | 79 |
| 80 def get_isolated_cmd( | 80 def get_isolated_cmd( |
| 81 work_dir, task_details, isolated_result, min_free_space): | 81 work_dir, task_details, isolated_result, min_free_space): |
| 82 """Returns the command to call run_isolated. Mocked in tests.""" | 82 """Returns the command to call run_isolated. Mocked in tests.""" |
| 83 bot_dir = os.path.dirname(work_dir) | 83 bot_dir = os.path.dirname(work_dir) |
| 84 if os.path.isfile(isolated_result): | 84 if os.path.isfile(isolated_result): |
| 85 os.remove(isolated_result) | 85 os.remove(isolated_result) |
| 86 cmd = get_run_isolated() | 86 cmd = get_run_isolated() |
| 87 if task_details.inputs_ref: |
| 88 cmd.extend( |
| 89 [ |
| 90 '--isolated', task_details.inputs_ref['isolated'].encode('utf-8'), |
| 91 '--namespace', task_details.inputs_ref['namespace'].encode('utf-8'), |
| 92 '-I', task_details.inputs_ref['isolatedserver'].encode('utf-8'), |
| 93 ]) |
| 94 |
| 87 cmd.extend( | 95 cmd.extend( |
| 88 [ | 96 [ |
| 89 '--isolated', task_details.inputs_ref['isolated'].encode('utf-8'), | |
| 90 '--namespace', task_details.inputs_ref['namespace'].encode('utf-8'), | |
| 91 '-I', task_details.inputs_ref['isolatedserver'].encode('utf-8'), | |
| 92 '--json', isolated_result, | 97 '--json', isolated_result, |
| 93 '--log-file', os.path.join(bot_dir, 'logs', 'run_isolated.log'), | 98 '--log-file', os.path.join(bot_dir, 'logs', 'run_isolated.log'), |
| 94 '--cache', os.path.join(bot_dir, 'cache'), | 99 '--cache', os.path.join(bot_dir, 'cache'), |
| 95 '--root-dir', os.path.join(work_dir, 'isolated'), | 100 '--root-dir', os.path.join(work_dir, 'isolated'), |
| 96 ]) | 101 ]) |
| 97 if min_free_space: | 102 if min_free_space: |
| 98 cmd.extend(('--min-free-space', str(min_free_space))) | 103 cmd.extend(('--min-free-space', str(min_free_space))) |
| 99 | 104 |
| 100 if task_details.hard_timeout: | 105 if task_details.hard_timeout: |
| 101 cmd.extend(('--hard-timeout', str(task_details.hard_timeout))) | 106 cmd.extend(('--hard-timeout', str(task_details.hard_timeout))) |
| 102 if task_details.grace_period: | 107 if task_details.grace_period: |
| 103 cmd.extend(('--grace-period', str(task_details.grace_period))) | 108 cmd.extend(('--grace-period', str(task_details.grace_period))) |
| 104 if task_details.extra_args: | 109 |
| 105 cmd.append('--') | 110 cmd.append('--') |
| 111 if task_details.command: |
| 112 cmd.extend(task_details.command) |
| 113 elif task_details.extra_args: |
| 106 cmd.extend(task_details.extra_args) | 114 cmd.extend(task_details.extra_args) |
| 107 return cmd | 115 return cmd |
| 108 | 116 |
| 109 | 117 |
| 110 class TaskDetails(object): | 118 class TaskDetails(object): |
| 111 def __init__(self, data): | 119 def __init__(self, data): |
| 112 """Loads the raw data. | 120 """Loads the raw data. |
| 113 | 121 |
| 114 It is expected to have at least: | 122 It is expected to have at least: |
| 115 - bot_id | 123 - bot_id |
| (...skipping 173 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 289 # TODO(maruel): This function is incomprehensible, split and refactor. | 297 # TODO(maruel): This function is incomprehensible, split and refactor. |
| 290 # Signal the command is about to be started. | 298 # Signal the command is about to be started. |
| 291 last_packet = start = now = monotonic_time() | 299 last_packet = start = now = monotonic_time() |
| 292 params = { | 300 params = { |
| 293 'cost_usd': cost_usd_hour * (now - task_start) / 60. / 60., | 301 'cost_usd': cost_usd_hour * (now - task_start) / 60. / 60., |
| 294 'id': task_details.bot_id, | 302 'id': task_details.bot_id, |
| 295 'task_id': task_details.task_id, | 303 'task_id': task_details.task_id, |
| 296 } | 304 } |
| 297 post_update(swarming_server, params, None, '', 0) | 305 post_update(swarming_server, params, None, '', 0) |
| 298 | 306 |
| 299 if task_details.command: | 307 isolated_result = os.path.join(work_dir, 'isolated_result.json') |
| 300 # Raw command. | 308 cmd = get_isolated_cmd( |
| 301 cmd = task_details.command | 309 work_dir, task_details, isolated_result, min_free_space) |
| 302 isolated_result = None | 310 # Hard timeout enforcement is deferred to run_isolated. Grace is doubled to |
| 303 else: | 311 # give one 'grace_period' slot to the child process and one slot to upload |
| 304 # Isolated task. | 312 # the results back. |
| 305 isolated_result = os.path.join(work_dir, 'isolated_result.json') | 313 task_details.hard_timeout = 0 |
| 306 cmd = get_isolated_cmd( | 314 if task_details.grace_period: |
| 307 work_dir, task_details, isolated_result, min_free_space) | 315 task_details.grace_period *= 2 |
| 308 # Hard timeout enforcement is deferred to run_isolated. Grace is doubled to | |
| 309 # give one 'grace_period' slot to the child process and one slot to upload | |
| 310 # the results back. | |
| 311 task_details.hard_timeout = 0 | |
| 312 if task_details.grace_period: | |
| 313 task_details.grace_period *= 2 | |
| 314 | 316 |
| 315 try: | 317 try: |
| 316 # TODO(maruel): Support both channels independently and display stderr in | 318 # TODO(maruel): Support both channels independently and display stderr in |
| 317 # red. | 319 # red. |
| 318 env = None | 320 env = None |
| 319 if task_details.env: | 321 if task_details.env: |
| 320 env = os.environ.copy() | 322 env = os.environ.copy() |
| 321 for key, value in task_details.env.iteritems(): | 323 for key, value in task_details.env.iteritems(): |
| 322 if not value: | 324 if not value: |
| 323 env.pop(key, None) | 325 env.pop(key, None) |
| (...skipping 22 matching lines...) Expand all Loading... |
| 346 u'exit_code': 1, | 348 u'exit_code': 1, |
| 347 u'hard_timeout': False, | 349 u'hard_timeout': False, |
| 348 u'io_timeout': False, | 350 u'io_timeout': False, |
| 349 u'must_signal_internal_failure': None, | 351 u'must_signal_internal_failure': None, |
| 350 u'version': OUT_VERSION, | 352 u'version': OUT_VERSION, |
| 351 } | 353 } |
| 352 | 354 |
| 353 output_chunk_start = 0 | 355 output_chunk_start = 0 |
| 354 stdout = '' | 356 stdout = '' |
| 355 exit_code = None | 357 exit_code = None |
| 356 had_hard_timeout = False | |
| 357 had_io_timeout = False | 358 had_io_timeout = False |
| 358 must_signal_internal_failure = None | 359 must_signal_internal_failure = None |
| 359 kill_sent = False | 360 kill_sent = False |
| 360 timed_out = None | 361 timed_out = None |
| 361 try: | 362 try: |
| 362 calc = lambda: calc_yield_wait( | 363 calc = lambda: calc_yield_wait( |
| 363 task_details, start, last_io, timed_out, stdout) | 364 task_details, start, last_io, timed_out, stdout) |
| 364 maxsize = lambda: MAX_CHUNK_SIZE - len(stdout) | 365 maxsize = lambda: MAX_CHUNK_SIZE - len(stdout) |
| 365 last_io = monotonic_time() | 366 last_io = monotonic_time() |
| 366 for _, new_data in proc.yield_any(maxsize=maxsize, timeout=calc): | 367 for _, new_data in proc.yield_any(maxsize=maxsize, timeout=calc): |
| (...skipping 14 matching lines...) Expand all Loading... |
| 381 # Send signal on timeout if necessary. Both are failures, not | 382 # Send signal on timeout if necessary. Both are failures, not |
| 382 # internal_failures. | 383 # internal_failures. |
| 383 # Eventually kill but return 0 so bot_main.py doesn't cancel the task. | 384 # Eventually kill but return 0 so bot_main.py doesn't cancel the task. |
| 384 if not timed_out: | 385 if not timed_out: |
| 385 if (task_details.io_timeout and | 386 if (task_details.io_timeout and |
| 386 now - last_io > task_details.io_timeout): | 387 now - last_io > task_details.io_timeout): |
| 387 had_io_timeout = True | 388 had_io_timeout = True |
| 388 logging.warning('I/O timeout; sending SIGTERM') | 389 logging.warning('I/O timeout; sending SIGTERM') |
| 389 proc.terminate() | 390 proc.terminate() |
| 390 timed_out = monotonic_time() | 391 timed_out = monotonic_time() |
| 391 elif (task_details.hard_timeout and | |
| 392 now - start > task_details.hard_timeout): | |
| 393 had_hard_timeout = True | |
| 394 logging.warning('Hard timeout; sending SIGTERM') | |
| 395 proc.terminate() | |
| 396 timed_out = monotonic_time() | |
| 397 else: | 392 else: |
| 398 # During grace period. | 393 # During grace period. |
| 399 if not kill_sent and now >= timed_out + task_details.grace_period: | 394 if not kill_sent and now >= timed_out + task_details.grace_period: |
| 400 # Now kill for real. The user can distinguish between the following | 395 # Now kill for real. The user can distinguish between the following |
| 401 # states: | 396 # states: |
| 402 # - signal but process exited within grace period, | 397 # - signal but process exited within grace period, |
| 403 # (hard_|io_)_timed_out will be set but the process exit code will | 398 # (hard_|io_)_timed_out will be set but the process exit code will |
| 404 # be script provided. | 399 # be script provided. |
| 405 # - process exited late, exit code will be -9 on posix. | 400 # - process exited late, exit code will be -9 on posix. |
| 406 logging.warning('Grace exhausted; sending SIGKILL') | 401 logging.warning('Grace exhausted; sending SIGKILL') |
| 407 proc.kill() | 402 proc.kill() |
| 408 kill_sent = True | 403 kill_sent = True |
| 409 logging.info('Waiting for process exit') | 404 logging.info('Waiting for process exit') |
| 410 exit_code = proc.wait() | 405 exit_code = proc.wait() |
| 411 except MustExit as e: | 406 except MustExit as e: |
| 412 # TODO(maruel): Do the send SIGTERM to child process and give it | 407 # TODO(maruel): Do the send SIGTERM to child process and give it |
| 413 # task_details.grace_period to terminate. | 408 # task_details.grace_period to terminate. |
| 414 must_signal_internal_failure = ( | 409 must_signal_internal_failure = ( |
| 415 u'task_runner received signal %s' % e.signal) | 410 u'task_runner received signal %s' % e.signal) |
| 416 exit_code = kill_and_wait( | 411 exit_code = kill_and_wait( |
| 417 proc, task_details.grace_period, 'signal %d' % e.signal) | 412 proc, task_details.grace_period, 'signal %d' % e.signal) |
| 418 except (IOError, OSError): | 413 except (IOError, OSError): |
| 419 # Something wrong happened, try to kill the child process. | 414 # Something wrong happened, try to kill the child process. |
| 420 had_hard_timeout = True | |
| 421 exit_code = kill_and_wait( | 415 exit_code = kill_and_wait( |
| 422 proc, task_details.grace_period, 'exception %s' % e) | 416 proc, task_details.grace_period, 'exception %s' % e) |
| 423 | 417 |
| 424 # This is the very last packet for this command. If it was an isolated task, | 418 # This is the very last packet for this command. If it was an isolated task, |
| 425 # include the output reference to the archived .isolated file. | 419 # include the output reference to the archived .isolated file. |
| 426 now = monotonic_time() | 420 now = monotonic_time() |
| 427 params['cost_usd'] = cost_usd_hour * (now - task_start) / 60. / 60. | 421 params['cost_usd'] = cost_usd_hour * (now - task_start) / 60. / 60. |
| 428 params['duration'] = now - start | 422 params['duration'] = now - start |
| 429 params['io_timeout'] = had_io_timeout | 423 params['io_timeout'] = had_io_timeout |
| 430 params['hard_timeout'] = had_hard_timeout | 424 had_hard_timeout = False |
| 431 if isolated_result: | 425 try: |
| 432 try: | 426 if not os.path.isfile(isolated_result): |
| 433 if ((had_io_timeout or had_hard_timeout) and | 427 # It's possible if |
| 434 not os.path.isfile(isolated_result)): | 428 # - run_isolated.py did not start |
| 435 # It's possible that run_isolated failed to quit quickly enough; it | 429 # - run_isolated.py started, but arguments were invalid |
| 436 # could be because there was too much data to upload back or something | 430 # - host in a situation unable to fork |
| 437 # else. Do not create an internal error, just send back the (partial) | 431 # - grand child process outliving the child process deleting everything |
| 438 # view as task_runner saw it, for example the real exit_code is | 432 # it can |
| 439 # unknown. | 433 # Do not create an internal error, just send back the (partial) |
| 440 logging.warning('TIMED_OUT and there\'s no result file') | 434 # view as task_runner saw it, for example the real exit_code is |
| 435 # unknown. |
| 436 logging.warning('there\'s no result file') |
| 437 if exit_code is None: |
| 441 exit_code = -1 | 438 exit_code = -1 |
| 442 else: | 439 else: |
| 443 # See run_isolated.py for the format. | 440 # See run_isolated.py for the format. |
| 444 with open(isolated_result, 'rb') as f: | 441 with open(isolated_result, 'rb') as f: |
| 445 run_isolated_result = json.load(f) | 442 run_isolated_result = json.load(f) |
| 446 logging.debug('run_isolated:\n%s', run_isolated_result) | 443 logging.debug('run_isolated:\n%s', run_isolated_result) |
| 447 # TODO(maruel): Grab statistics (cache hit rate, data downloaded, | 444 # TODO(maruel): Grab statistics (cache hit rate, data downloaded, |
| 448 # mapping time, etc) from run_isolated and push them to the server. | 445 # mapping time, etc) from run_isolated and push them to the server. |
| 449 if run_isolated_result['outputs_ref']: | 446 if run_isolated_result['outputs_ref']: |
| 450 params['outputs_ref'] = run_isolated_result['outputs_ref'] | 447 params['outputs_ref'] = run_isolated_result['outputs_ref'] |
| 451 had_hard_timeout = ( | 448 had_hard_timeout = run_isolated_result['had_hard_timeout'] |
| 452 had_hard_timeout or run_isolated_result['had_hard_timeout']) | 449 if not had_io_timeout and not had_hard_timeout: |
| 453 params['hard_timeout'] = had_hard_timeout | 450 if run_isolated_result['internal_failure']: |
| 454 if not had_io_timeout and not had_hard_timeout: | 451 must_signal_internal_failure = ( |
| 455 if run_isolated_result['internal_failure']: | 452 run_isolated_result['internal_failure']) |
| 456 must_signal_internal_failure = ( | 453 logging.error('%s', must_signal_internal_failure) |
| 457 run_isolated_result['internal_failure']) | 454 elif exit_code: |
| 458 logging.error('%s', must_signal_internal_failure) | 455 # TODO(maruel): Grab stdout from run_isolated. |
| 459 elif exit_code: | 456 must_signal_internal_failure = ( |
| 460 # TODO(maruel): Grab stdout from run_isolated. | 457 'run_isolated internal failure %d' % exit_code) |
| 461 must_signal_internal_failure = ( | 458 logging.error('%s', must_signal_internal_failure) |
| 462 'run_isolated internal failure %d' % exit_code) | 459 exit_code = run_isolated_result['exit_code'] |
| 463 logging.error('%s', must_signal_internal_failure) | 460 if run_isolated_result.get('duration') is not None: |
| 464 exit_code = run_isolated_result['exit_code'] | 461 # Calculate the real task duration as measured by run_isolated and |
| 465 if run_isolated_result.get('duration') is not None: | 462 # calculate the remaining overhead. |
| 466 # Calculate the real task duration as measured by run_isolated and | 463 params['bot_overhead'] = params['duration'] |
| 467 # calculate the remaining overhead. | 464 params['duration'] = run_isolated_result['duration'] |
| 468 params['bot_overhead'] = params['duration'] | 465 params['bot_overhead'] -= params['duration'] |
| 469 params['duration'] = run_isolated_result['duration'] | 466 params['bot_overhead'] -= run_isolated_result.get( |
| 470 params['bot_overhead'] -= params['duration'] | 467 'download', {}).get('duration', 0) |
| 471 params['bot_overhead'] -= run_isolated_result.get( | 468 params['bot_overhead'] -= run_isolated_result.get( |
| 472 'download', {}).get('duration', 0) | 469 'upload', {}).get('duration', 0) |
| 473 params['bot_overhead'] -= run_isolated_result.get( | 470 if params['bot_overhead'] < 0: |
| 474 'upload', {}).get('duration', 0) | 471 params['bot_overhead'] = 0 |
| 475 if params['bot_overhead'] < 0: | 472 stats = run_isolated_result.get('stats') |
| 476 params['bot_overhead'] = 0 | 473 if stats: |
| 477 stats = run_isolated_result.get('stats') | 474 params['isolated_stats'] = stats |
| 478 if stats: | 475 except (IOError, OSError, ValueError) as e: |
| 479 params['isolated_stats'] = stats | 476 logging.error('Swallowing error: %s', e) |
| 480 except (IOError, OSError, ValueError) as e: | 477 if not must_signal_internal_failure: |
| 481 logging.error('Swallowing error: %s', e) | 478 must_signal_internal_failure = str(e) |
| 482 if not must_signal_internal_failure: | |
| 483 must_signal_internal_failure = str(e) | |
| 484 # TODO(maruel): Send the internal failure here instead of sending it through | 479 # TODO(maruel): Send the internal failure here instead of sending it through |
| 485 # bot_main, this causes a race condition. | 480 # bot_main, this causes a race condition. |
| 486 if exit_code is None: | 481 if exit_code is None: |
| 487 exit_code = -1 | 482 exit_code = -1 |
| 483 params['hard_timeout'] = had_hard_timeout |
| 488 post_update(swarming_server, params, exit_code, stdout, output_chunk_start) | 484 post_update(swarming_server, params, exit_code, stdout, output_chunk_start) |
| 489 return { | 485 return { |
| 490 u'exit_code': exit_code, | 486 u'exit_code': exit_code, |
| 491 u'hard_timeout': had_hard_timeout, | 487 u'hard_timeout': had_hard_timeout, |
| 492 u'io_timeout': had_io_timeout, | 488 u'io_timeout': had_io_timeout, |
| 493 u'must_signal_internal_failure': must_signal_internal_failure, | 489 u'must_signal_internal_failure': must_signal_internal_failure, |
| 494 u'version': OUT_VERSION, | 490 u'version': OUT_VERSION, |
| 495 } | 491 } |
| 496 finally: | 492 finally: |
| 497 if isolated_result: | 493 try: |
| 498 try: | 494 os.remove(isolated_result) |
| 499 os.remove(isolated_result) | 495 except OSError: |
| 500 except OSError: | 496 pass |
| 501 pass | |
| 502 | 497 |
| 503 | 498 |
| 504 def main(args): | 499 def main(args): |
| 505 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__) | 500 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__) |
| 506 parser.add_option('--in-file', help='Name of the request file') | 501 parser.add_option('--in-file', help='Name of the request file') |
| 507 parser.add_option( | 502 parser.add_option( |
| 508 '--out-file', help='Name of the JSON file to write a task summary to') | 503 '--out-file', help='Name of the JSON file to write a task summary to') |
| 509 parser.add_option( | 504 parser.add_option( |
| 510 '--swarming-server', help='Swarming server to send data back') | 505 '--swarming-server', help='Swarming server to send data back') |
| 511 parser.add_option( | 506 parser.add_option( |
| (...skipping 14 matching lines...) Expand all Loading... |
| 526 if options.start > now: | 521 if options.start > now: |
| 527 options.start = now | 522 options.start = now |
| 528 | 523 |
| 529 try: | 524 try: |
| 530 load_and_run( | 525 load_and_run( |
| 531 options.in_file, options.swarming_server, options.cost_usd_hour, | 526 options.in_file, options.swarming_server, options.cost_usd_hour, |
| 532 options.start, options.out_file, options.min_free_space) | 527 options.start, options.out_file, options.min_free_space) |
| 533 return 0 | 528 return 0 |
| 534 finally: | 529 finally: |
| 535 logging.info('quitting') | 530 logging.info('quitting') |
| OLD | NEW |