| OLD | NEW |
| 1 # Copyright 2014 The LUCI Authors. All rights reserved. | 1 # Copyright 2014 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 """High level tasks execution scheduling API. | 5 """High level tasks execution scheduling API. |
| 6 | 6 |
| 7 This is the interface closest to the HTTP handlers. | 7 This is the interface closest to the HTTP handlers. |
| 8 """ | 8 """ |
| 9 | 9 |
| 10 import contextlib | 10 import contextlib |
| (...skipping 537 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 548 user=request.user) | 548 user=request.user) |
| 549 return request, run_result | 549 return request, run_result |
| 550 if failures: | 550 if failures: |
| 551 logging.info( | 551 logging.info( |
| 552 'Chose nothing (failed %d, skipped %d)', failures, total_skipped) | 552 'Chose nothing (failed %d, skipped %d)', failures, total_skipped) |
| 553 return None, None | 553 return None, None |
| 554 | 554 |
| 555 | 555 |
| 556 def bot_update_task( | 556 def bot_update_task( |
| 557 run_result_key, bot_id, output, output_chunk_start, exit_code, duration, | 557 run_result_key, bot_id, output, output_chunk_start, exit_code, duration, |
| 558 hard_timeout, io_timeout, cost_usd, outputs_ref, performance_stats): | 558 hard_timeout, io_timeout, cost_usd, outputs_ref, cipd_pins, |
| 559 performance_stats): |
| 559 """Updates a TaskRunResult and TaskResultSummary, along TaskOutput. | 560 """Updates a TaskRunResult and TaskResultSummary, along TaskOutput. |
| 560 | 561 |
| 561 Arguments: | 562 Arguments: |
| 562 - run_result_key: ndb.Key to TaskRunResult. | 563 - run_result_key: ndb.Key to TaskRunResult. |
| 563 - bot_id: Self advertised bot id to ensure it's the one expected. | 564 - bot_id: Self advertised bot id to ensure it's the one expected. |
| 564 - output: Data to append to this command output. | 565 - output: Data to append to this command output. |
| 565 - output_chunk_start: Index of output in the stdout stream. | 566 - output_chunk_start: Index of output in the stdout stream. |
| 566 - exit_code: Mark that this task completed. | 567 - exit_code: Mark that this task completed. |
| 567 - duration: Time spent in seconds for this task, excluding overheads. | 568 - duration: Time spent in seconds for this task, excluding overheads. |
| 568 - hard_timeout: Bool set if an hard timeout occured. | 569 - hard_timeout: Bool set if an hard timeout occured. |
| 569 - io_timeout: Bool set if an I/O timeout occured. | 570 - io_timeout: Bool set if an I/O timeout occured. |
| 570 - cost_usd: Cost in $USD of this task up to now. | 571 - cost_usd: Cost in $USD of this task up to now. |
| 571 - outputs_ref: task_request.FilesRef instance or None. | 572 - outputs_ref: task_request.FilesRef instance or None. |
| 573 - cipd_pins: None or task_result.CipdPins |
| 572 - performance_stats: task_result.PerformanceStats instance or None. Can only | 574 - performance_stats: task_result.PerformanceStats instance or None. Can only |
| 573 be set when the task is completing. | 575 be set when the task is completing. |
| 574 | 576 |
| 575 Invalid states, these are flat out refused: | 577 Invalid states, these are flat out refused: |
| 576 - A command is updated after it had an exit code assigned to. | 578 - A command is updated after it had an exit code assigned to. |
| 577 | 579 |
| 578 Returns: | 580 Returns: |
| 579 TaskRunResult.state or None in case of failure. | 581 TaskRunResult.state or None in case of failure. |
| 580 """ | 582 """ |
| 581 assert output_chunk_start is None or isinstance(output_chunk_start, int) | 583 assert output_chunk_start is None or isinstance(output_chunk_start, int) |
| 582 assert output is None or isinstance(output, str) | 584 assert output is None or isinstance(output, str) |
| 583 if cost_usd is not None and cost_usd < 0.: | 585 if cost_usd is not None and cost_usd < 0.: |
| 584 raise ValueError('cost_usd must be None or greater or equal than 0') | 586 raise ValueError('cost_usd must be None or greater or equal than 0') |
| 585 if duration is not None and duration < 0.: | 587 if duration is not None and duration < 0.: |
| 586 raise ValueError('duration must be None or greater or equal than 0') | 588 raise ValueError('duration must be None or greater or equal than 0') |
| 587 if (duration is None) != (exit_code is None): | 589 if (duration is None) != (exit_code is None): |
| 588 raise ValueError( | 590 raise ValueError( |
| 589 'had unexpected duration; expected iff a command completes\n' | 591 'had unexpected duration; expected iff a command completes\n' |
| 590 'duration: %r; exit: %r' % (duration, exit_code)) | 592 'duration: %r; exit: %r' % (duration, exit_code)) |
| 591 if performance_stats and duration is None: | 593 if performance_stats and duration is None: |
| 592 raise ValueError( | 594 raise ValueError( |
| 593 'duration must be set when performance_stats is set\n' | 595 'duration must be set when performance_stats is set\n' |
| 594 'duration: %s; performance_stats: %s' % | 596 'duration: %s; performance_stats: %s' % |
| 595 (duration, performance_stats)) | 597 (duration, performance_stats)) |
| 596 | 598 |
| 597 packed = task_pack.pack_run_result_key(run_result_key) | 599 packed = task_pack.pack_run_result_key(run_result_key) |
| 598 logging.debug( | 600 logging.debug( |
| 599 'bot_update_task(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)', | 601 'bot_update_task(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)', |
| 600 packed, bot_id, len(output) if output else output, output_chunk_start, | 602 packed, bot_id, len(output) if output else output, output_chunk_start, |
| 601 exit_code, duration, hard_timeout, io_timeout, cost_usd, outputs_ref, | 603 exit_code, duration, hard_timeout, io_timeout, cost_usd, outputs_ref, |
| 602 performance_stats) | 604 cipd_pins, performance_stats) |
| 603 | 605 |
| 604 result_summary_key = task_pack.run_result_key_to_result_summary_key( | 606 result_summary_key = task_pack.run_result_key_to_result_summary_key( |
| 605 run_result_key) | 607 run_result_key) |
| 606 request_key = task_pack.result_summary_key_to_request_key(result_summary_key) | 608 request_key = task_pack.result_summary_key_to_request_key(result_summary_key) |
| 607 request_future = request_key.get_async() | 609 request_future = request_key.get_async() |
| 608 server_version = utils.get_app_version() | 610 server_version = utils.get_app_version() |
| 609 request = request_future.get_result() | 611 request = request_future.get_result() |
| 610 now = utils.utcnow() | 612 now = utils.utcnow() |
| 611 | 613 |
| 612 def run(): | 614 def run(): |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 648 result_summary_future.wait() | 650 result_summary_future.wait() |
| 649 return None, None, 'got 2 different durations; %s then %s' % ( | 651 return None, None, 'got 2 different durations; %s then %s' % ( |
| 650 run_result.duration, duration) | 652 run_result.duration, duration) |
| 651 else: | 653 else: |
| 652 run_result.duration = duration | 654 run_result.duration = duration |
| 653 run_result.exit_code = exit_code | 655 run_result.exit_code = exit_code |
| 654 | 656 |
| 655 if outputs_ref: | 657 if outputs_ref: |
| 656 run_result.outputs_ref = outputs_ref | 658 run_result.outputs_ref = outputs_ref |
| 657 | 659 |
| 660 if cipd_pins: |
| 661 run_result.cipd_pins = cipd_pins |
| 662 |
| 658 if run_result.state in task_result.State.STATES_RUNNING: | 663 if run_result.state in task_result.State.STATES_RUNNING: |
| 659 if hard_timeout or io_timeout: | 664 if hard_timeout or io_timeout: |
| 660 run_result.state = task_result.State.TIMED_OUT | 665 run_result.state = task_result.State.TIMED_OUT |
| 661 run_result.completed_ts = now | 666 run_result.completed_ts = now |
| 662 elif run_result.exit_code is not None: | 667 elif run_result.exit_code is not None: |
| 663 run_result.state = task_result.State.COMPLETED | 668 run_result.state = task_result.State.COMPLETED |
| 664 run_result.completed_ts = now | 669 run_result.completed_ts = now |
| 665 | 670 |
| 666 run_result.signal_server_version(server_version) | 671 run_result.signal_server_version(server_version) |
| 667 to_put = [run_result] | 672 to_put = [run_result] |
| (...skipping 225 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 893 ## Task queue tasks. | 898 ## Task queue tasks. |
| 894 | 899 |
| 895 | 900 |
| 896 def task_handle_pubsub_task(payload): | 901 def task_handle_pubsub_task(payload): |
| 897 """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" | 902 """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" |
| 898 # Do not catch errors to trigger task queue task retry. Errors should not | 903 # Do not catch errors to trigger task queue task retry. Errors should not |
| 899 # happen in normal case. | 904 # happen in normal case. |
| 900 _pubsub_notify( | 905 _pubsub_notify( |
| 901 payload['task_id'], payload['topic'], | 906 payload['task_id'], payload['topic'], |
| 902 payload['auth_token'], payload['userdata']) | 907 payload['auth_token'], payload['userdata']) |
| OLD | NEW |