| OLD | NEW |
| 1 # Copyright 2013 The LUCI Authors. All rights reserved. | 1 # Copyright 2013 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 """Runs a Swarming task. | 5 """Runs a Swarming task. |
| 6 | 6 |
| 7 Downloads all the necessary files to run the task, executes the command and | 7 Downloads all the necessary files to run the task, executes the command and |
| 8 streams results back to the Swarming server. | 8 streams results back to the Swarming server. |
| 9 | 9 |
| 10 The process exit code is 0 when the task was executed, even if the task itself | 10 The process exit code is 0 when the task was executed, even if the task itself |
| (...skipping 66 matching lines...) | (...skipping 66 matching lines...) |
| 77 | 77 |
| 78 | 78 |
| 79 def get_run_isolated(): | 79 def get_run_isolated(): |
| 80 """Returns the path to itself to run run_isolated. | 80 """Returns the path to itself to run run_isolated. |
| 81 | 81 |
| 82 Mocked in test to point to the real run_isolated.py script. | 82 Mocked in test to point to the real run_isolated.py script. |
| 83 """ | 83 """ |
| 84 return [sys.executable, THIS_FILE, 'run_isolated'] | 84 return [sys.executable, THIS_FILE, 'run_isolated'] |
| 85 | 85 |
| 86 | 86 |
| 87 def get_isolated_cmd( | 87 def get_isolated_args( |
| 88 work_dir, task_details, isolated_result, bot_file, min_free_space): | 88 work_dir, task_details, isolated_result, bot_file, min_free_space): |
| 89 """Returns the command to call run_isolated. Mocked in tests.""" | 89 """Returns the command to call run_isolated. Mocked in tests.""" |
| 90 assert (bool(task_details.command) != | 90 assert (bool(task_details.command) != |
| 91 bool(task_details.isolated and task_details.isolated.get('input'))) | 91 bool(task_details.isolated and task_details.isolated.get('input'))) |
| 92 bot_dir = os.path.dirname(work_dir) | 92 bot_dir = os.path.dirname(work_dir) |
| 93 if os.path.isfile(isolated_result): | 93 if os.path.isfile(isolated_result): |
| 94 os.remove(isolated_result) | 94 os.remove(isolated_result) |
| 95 cmd = get_run_isolated() | 95 cmd = [] |
| 96 | 96 |
| 97 if task_details.isolated: | 97 if task_details.isolated: |
| 98 cmd.extend( | 98 cmd.extend( |
| 99 [ | 99 [ |
| 100 '-I', task_details.isolated['server'].encode('utf-8'), | 100 '-I', task_details.isolated['server'].encode('utf-8'), |
| 101 '--namespace', task_details.isolated['namespace'].encode('utf-8'), | 101 '--namespace', task_details.isolated['namespace'].encode('utf-8'), |
| 102 ]) | 102 ]) |
| 103 isolated_input = task_details.isolated.get('input') | 103 isolated_input = task_details.isolated.get('input') |
| 104 if isolated_input: | 104 if isolated_input: |
| 105 cmd.extend( | 105 cmd.extend( |
| (...skipping 227 matching lines...) | (...skipping 227 matching lines...) |
| 333 try: | 333 try: |
| 334 proc.wait(grace_period) | 334 proc.wait(grace_period) |
| 335 except subprocess42.TimeoutError: | 335 except subprocess42.TimeoutError: |
| 336 logging.warning('SIGKILL finally due to %s', reason) | 336 logging.warning('SIGKILL finally due to %s', reason) |
| 337 proc.kill() | 337 proc.kill() |
| 338 exit_code = proc.wait() | 338 exit_code = proc.wait() |
| 339 logging.info('Waiting for process exit in finally - done') | 339 logging.info('Waiting for process exit in finally - done') |
| 340 return exit_code | 340 return exit_code |
| 341 | 341 |
| 342 | 342 |
| | 343 def fail_without_command(remote, bot_id, task_id, params, cost_usd_hour, |
| | 344 task_start, exit_code, stdout): |
| | 345 now = monotonic_time() |
| | 346 params['cost_usd'] = cost_usd_hour * (now - task_start) / 60. / 60. |
| | 347 params['duration'] = now - task_start |
| | 348 params['io_timeout'] = False |
| | 349 params['hard_timeout'] = False |
| | 350 # Ignore server reply to stop. |
| | 351 remote.post_task_update(task_id, bot_id, params, (stdout, 0), 1) |
| | 352 return { |
| | 353 u'exit_code': exit_code, |
| | 354 u'hard_timeout': False, |
| | 355 u'io_timeout': False, |
| | 356 u'must_signal_internal_failure': None, |
| | 357 u'version': OUT_VERSION, |
| | 358 } |
| | 359 |
| | 360 |
| 343 def run_command(remote, task_details, work_dir, cost_usd_hour, | 361 def run_command(remote, task_details, work_dir, cost_usd_hour, |
| 344 task_start, min_free_space, bot_file): | 362 task_start, min_free_space, bot_file): |
| 345 """Runs a command and sends packets to the server to stream results back. | 363 """Runs a command and sends packets to the server to stream results back. |
| 346 | 364 |
| 347 Implements both I/O and hard timeouts. Sends the packets numbered, so the | 365 Implements both I/O and hard timeouts. Sends the packets numbered, so the |
| 348 server can ensure they are processed in order. | 366 server can ensure they are processed in order. |
| 349 | 367 |
| 350 Returns: | 368 Returns: |
| 351 Metadata dict with the execution result. | 369 Metadata dict with the execution result. |
| 352 | 370 |
| (...skipping 14 matching lines...) | (...skipping 14 matching lines...) |
| 367 # Don't even bother, the task was already canceled. | 385 # Don't even bother, the task was already canceled. |
| 368 return { | 386 return { |
| 369 u'exit_code': -1, | 387 u'exit_code': -1, |
| 370 u'hard_timeout': False, | 388 u'hard_timeout': False, |
| 371 u'io_timeout': False, | 389 u'io_timeout': False, |
| 372 u'must_signal_internal_failure': None, | 390 u'must_signal_internal_failure': None, |
| 373 u'version': OUT_VERSION, | 391 u'version': OUT_VERSION, |
| 374 } | 392 } |
| 375 | 393 |
| 376 isolated_result = os.path.join(work_dir, 'isolated_result.json') | 394 isolated_result = os.path.join(work_dir, 'isolated_result.json') |
| 377 cmd = get_isolated_cmd( | 395 args_path = os.path.join(work_dir, 'run_isolated_args.json') |
| | 396 cmd = get_run_isolated() |
| | 397 cmd.extend(['-a', args_path]) |
| | 398 args = get_isolated_args( |
| 378 work_dir, task_details, isolated_result, bot_file, min_free_space) | 399 work_dir, task_details, isolated_result, bot_file, min_free_space) |
| 379 # Hard timeout enforcement is deferred to run_isolated. Grace is doubled to | 400 # Hard timeout enforcement is deferred to run_isolated. Grace is doubled to |
| 380 # give one 'grace_period' slot to the child process and one slot to upload | 401 # give one 'grace_period' slot to the child process and one slot to upload |
| 381 # the results back. | 402 # the results back. |
| 382 task_details.hard_timeout = 0 | 403 task_details.hard_timeout = 0 |
| 383 if task_details.grace_period: | 404 if task_details.grace_period: |
| 384 task_details.grace_period *= 2 | 405 task_details.grace_period *= 2 |
| 385 | 406 |
| 386 try: | 407 try: |
| 387 # TODO(maruel): Support both channels independently and display stderr in | 408 # TODO(maruel): Support both channels independently and display stderr in |
| 388 # red. | 409 # red. |
| 389 env = os.environ.copy() | 410 env = os.environ.copy() |
| 390 for key, value in (task_details.env or {}).iteritems(): | 411 for key, value in (task_details.env or {}).iteritems(): |
| 391 if not value: | 412 if not value: |
| 392 env.pop(key, None) | 413 env.pop(key, None) |
| 393 else: | 414 else: |
| 394 env[key] = value | 415 env[key] = value |
| 395 logging.info('cmd=%s', cmd) | 416 logging.info('cmd=%s', cmd) |
| 396 logging.info('cwd=%s', work_dir) | 417 logging.info('cwd=%s', work_dir) |
| 397 logging.info('env=%s', env) | 418 logging.info('env=%s', env) |
| | 419 fail_on_start = lambda exit_code, stdout: fail_without_command( |
| | 420 remote, bot_id, task_id, params, cost_usd_hour, task_start, |
| | 421 exit_code, stdout) |
| | 422 |
| | 423 # We write args to a file since there may be more of them than the OS |
| | 424 # can handle. |
| | 425 try: |
| | 426 with open(args_path, 'w') as f: |
| | 427 json.dump(args, f) |
| | 428 except (IOError, OSError) as e: |
| | 429 return fail_on_start( |
| | 430 -1, |
| | 431 'Could not write args to %s: %s' % (args_path, e)) |
| | 432 |
| | 433 # Start the command |
| 398 try: | 434 try: |
| 399 assert cmd and all(isinstance(a, basestring) for a in cmd) | 435 assert cmd and all(isinstance(a, basestring) for a in cmd) |
| 400 proc = subprocess42.Popen( | 436 proc = subprocess42.Popen( |
| 401 cmd, | 437 cmd, |
| 402 env=env, | 438 env=env, |
| 403 cwd=work_dir, | 439 cwd=work_dir, |
| 404 detached=True, | 440 detached=True, |
| 405 stdout=subprocess42.PIPE, | 441 stdout=subprocess42.PIPE, |
| 406 stderr=subprocess42.STDOUT, | 442 stderr=subprocess42.STDOUT, |
| 407 stdin=subprocess42.PIPE) | 443 stdin=subprocess42.PIPE) |
| 408 except OSError as e: | 444 except OSError as e: |
| 409 stdout = 'Command "%s" failed to start.\nError: %s' % (' '.join(cmd), e) | 445 return fail_on_start( |
| 410 now = monotonic_time() | 446 1, |
| 411 params['cost_usd'] = cost_usd_hour * (now - task_start) / 60. / 60. | 447 'Command "%s" failed to start.\nError: %s' % (' '.join(cmd), e)) |
| 412 params['duration'] = now - start | |
| 413 params['io_timeout'] = False | |
| 414 params['hard_timeout'] = False | |
| 415 # Ignore server reply to stop. | |
| 416 remote.post_task_update(task_id, bot_id, params, (stdout, 0), 1) | |
| 417 return { | |
| 418 u'exit_code': 1, | |
| 419 u'hard_timeout': False, | |
| 420 u'io_timeout': False, | |
| 421 u'must_signal_internal_failure': None, | |
| 422 u'version': OUT_VERSION, | |
| 423 } | |
| 424 | 448 |
| | 449 # Monitor the task |
| 425 output_chunk_start = 0 | 450 output_chunk_start = 0 |
| 426 stdout = '' | 451 stdout = '' |
| 427 exit_code = None | 452 exit_code = None |
| 428 had_io_timeout = False | 453 had_io_timeout = False |
| 429 must_signal_internal_failure = None | 454 must_signal_internal_failure = None |
| 430 kill_sent = False | 455 kill_sent = False |
| 431 timed_out = None | 456 timed_out = None |
| 432 try: | 457 try: |
| 433 calc = lambda: calc_yield_wait( | 458 calc = lambda: calc_yield_wait( |
| 434 task_details, start, last_io, timed_out, stdout) | 459 task_details, start, last_io, timed_out, stdout) |
| (...skipping 186 matching lines...) | (...skipping 186 matching lines...) |
| 621 options.start = now | 646 options.start = now |
| 622 | 647 |
| 623 try: | 648 try: |
| 624 load_and_run( | 649 load_and_run( |
| 625 options.in_file, options.swarming_server, options.cost_usd_hour, | 650 options.in_file, options.swarming_server, options.cost_usd_hour, |
| 626 options.start, options.out_file, options.min_free_space, | 651 options.start, options.out_file, options.min_free_space, |
| 627 options.bot_file, options.auth_params_file) | 652 options.bot_file, options.auth_params_file) |
| 628 return 0 | 653 return 0 |
| 629 finally: | 654 finally: |
| 630 logging.info('quitting') | 655 logging.info('quitting') |
| OLD | NEW |