| OLD | NEW |
| 1 # Copyright 2014 The LUCI Authors. All rights reserved. | 1 # Copyright 2014 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 """High level tasks execution scheduling API. | 5 """High level tasks execution scheduling API. |
| 6 | 6 |
| 7 This is the interface closest to the HTTP handlers. | 7 This is the interface closest to the HTTP handlers. |
| 8 """ | 8 """ |
| 9 | 9 |
| 10 import datetime | 10 import datetime |
| (...skipping 590 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 601 user=request.user) | 601 user=request.user) |
| 602 return request, run_result | 602 return request, run_result |
| 603 if failures: | 603 if failures: |
| 604 logging.info( | 604 logging.info( |
| 605 'Chose nothing (failed %d, skipped %d)', failures, total_skipped) | 605 'Chose nothing (failed %d, skipped %d)', failures, total_skipped) |
| 606 return None, None | 606 return None, None |
| 607 | 607 |
| 608 | 608 |
| 609 def bot_update_task( | 609 def bot_update_task( |
| 610 run_result_key, bot_id, output, output_chunk_start, exit_code, duration, | 610 run_result_key, bot_id, output, output_chunk_start, exit_code, duration, |
| 611 hard_timeout, io_timeout, cost_usd, outputs_ref, performance_stats): | 611 hard_timeout, io_timeout, cost_usd, outputs_ref, cipd_pins, |
| 612 performance_stats): |
| 612 """Updates a TaskRunResult and TaskResultSummary, along TaskOutput. | 613 """Updates a TaskRunResult and TaskResultSummary, along TaskOutput. |
| 613 | 614 |
| 614 Arguments: | 615 Arguments: |
| 615 - run_result_key: ndb.Key to TaskRunResult. | 616 - run_result_key: ndb.Key to TaskRunResult. |
| 616 - bot_id: Self advertised bot id to ensure it's the one expected. | 617 - bot_id: Self advertised bot id to ensure it's the one expected. |
| 617 - output: Data to append to this command output. | 618 - output: Data to append to this command output. |
| 618 - output_chunk_start: Index of output in the stdout stream. | 619 - output_chunk_start: Index of output in the stdout stream. |
| 619 - exit_code: Mark that this task completed. | 620 - exit_code: Mark that this task completed. |
| 620 - duration: Time spent in seconds for this task, excluding overheads. | 621 - duration: Time spent in seconds for this task, excluding overheads. |
| 621 - hard_timeout: Bool set if an hard timeout occured. | 622 - hard_timeout: Bool set if an hard timeout occured. |
| 622 - io_timeout: Bool set if an I/O timeout occured. | 623 - io_timeout: Bool set if an I/O timeout occured. |
| 623 - cost_usd: Cost in $USD of this task up to now. | 624 - cost_usd: Cost in $USD of this task up to now. |
| 624 - outputs_ref: task_request.FilesRef instance or None. | 625 - outputs_ref: task_request.FilesRef instance or None. |
| 626 - cipd_pins: None or task_result.CipdPins |
| 625 - performance_stats: task_result.PerformanceStats instance or None. Can only | 627 - performance_stats: task_result.PerformanceStats instance or None. Can only |
| 626 be set when the task is completing. | 628 be set when the task is completing. |
| 627 | 629 |
| 628 Invalid states, these are flat out refused: | 630 Invalid states, these are flat out refused: |
| 629 - A command is updated after it had an exit code assigned to. | 631 - A command is updated after it had an exit code assigned to. |
| 630 | 632 |
| 631 Returns: | 633 Returns: |
| 632 TaskRunResult.state or None in case of failure. | 634 TaskRunResult.state or None in case of failure. |
| 633 """ | 635 """ |
| 634 assert output_chunk_start is None or isinstance(output_chunk_start, int) | 636 assert output_chunk_start is None or isinstance(output_chunk_start, int) |
| 635 assert output is None or isinstance(output, str) | 637 assert output is None or isinstance(output, str) |
| 636 if cost_usd is not None and cost_usd < 0.: | 638 if cost_usd is not None and cost_usd < 0.: |
| 637 raise ValueError('cost_usd must be None or greater or equal than 0') | 639 raise ValueError('cost_usd must be None or greater or equal than 0') |
| 638 if duration is not None and duration < 0.: | 640 if duration is not None and duration < 0.: |
| 639 raise ValueError('duration must be None or greater or equal than 0') | 641 raise ValueError('duration must be None or greater or equal than 0') |
| 640 if (duration is None) != (exit_code is None): | 642 if (duration is None) != (exit_code is None): |
| 641 raise ValueError( | 643 raise ValueError( |
| 642 'had unexpected duration; expected iff a command completes\n' | 644 'had unexpected duration; expected iff a command completes\n' |
| 643 'duration: %r; exit: %r' % (duration, exit_code)) | 645 'duration: %r; exit: %r' % (duration, exit_code)) |
| 644 if performance_stats and duration is None: | 646 if performance_stats and duration is None: |
| 645 raise ValueError( | 647 raise ValueError( |
| 646 'duration must be set when performance_stats is set\n' | 648 'duration must be set when performance_stats is set\n' |
| 647 'duration: %s; performance_stats: %s' % | 649 'duration: %s; performance_stats: %s' % |
| 648 (duration, performance_stats)) | 650 (duration, performance_stats)) |
| 649 | 651 |
| 650 packed = task_pack.pack_run_result_key(run_result_key) | 652 packed = task_pack.pack_run_result_key(run_result_key) |
| 651 logging.debug( | 653 logging.debug( |
| 652 'bot_update_task(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)', | 654 'bot_update_task(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)', |
| 653 packed, bot_id, len(output) if output else output, output_chunk_start, | 655 packed, bot_id, len(output) if output else output, output_chunk_start, |
| 654 exit_code, duration, hard_timeout, io_timeout, cost_usd, outputs_ref, | 656 exit_code, duration, hard_timeout, io_timeout, cost_usd, outputs_ref, |
| 655 performance_stats) | 657 cipd_pins, performance_stats) |
| 656 | 658 |
| 657 result_summary_key = task_pack.run_result_key_to_result_summary_key( | 659 result_summary_key = task_pack.run_result_key_to_result_summary_key( |
| 658 run_result_key) | 660 run_result_key) |
| 659 request_key = task_pack.result_summary_key_to_request_key(result_summary_key) | 661 request_key = task_pack.result_summary_key_to_request_key(result_summary_key) |
| 660 request_future = request_key.get_async() | 662 request_future = request_key.get_async() |
| 661 server_version = utils.get_app_version() | 663 server_version = utils.get_app_version() |
| 662 request = request_future.get_result() | 664 request = request_future.get_result() |
| 663 now = utils.utcnow() | 665 now = utils.utcnow() |
| 664 | 666 |
| 665 def run(): | 667 def run(): |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 701 result_summary_future.wait() | 703 result_summary_future.wait() |
| 702 return None, None, 'got 2 different durations; %s then %s' % ( | 704 return None, None, 'got 2 different durations; %s then %s' % ( |
| 703 run_result.duration, duration) | 705 run_result.duration, duration) |
| 704 else: | 706 else: |
| 705 run_result.duration = duration | 707 run_result.duration = duration |
| 706 run_result.exit_code = exit_code | 708 run_result.exit_code = exit_code |
| 707 | 709 |
| 708 if outputs_ref: | 710 if outputs_ref: |
| 709 run_result.outputs_ref = outputs_ref | 711 run_result.outputs_ref = outputs_ref |
| 710 | 712 |
| 713 if cipd_pins: |
| 714 run_result.cipd_pins = cipd_pins |
| 715 |
| 711 if run_result.state in task_result.State.STATES_RUNNING: | 716 if run_result.state in task_result.State.STATES_RUNNING: |
| 712 if hard_timeout or io_timeout: | 717 if hard_timeout or io_timeout: |
| 713 run_result.state = task_result.State.TIMED_OUT | 718 run_result.state = task_result.State.TIMED_OUT |
| 714 run_result.completed_ts = now | 719 run_result.completed_ts = now |
| 715 elif run_result.exit_code is not None: | 720 elif run_result.exit_code is not None: |
| 716 run_result.state = task_result.State.COMPLETED | 721 run_result.state = task_result.State.COMPLETED |
| 717 run_result.completed_ts = now | 722 run_result.completed_ts = now |
| 718 | 723 |
| 719 run_result.signal_server_version(server_version) | 724 run_result.signal_server_version(server_version) |
| 725 run_result.validate(request) |
| 720 to_put = [run_result] | 726 to_put = [run_result] |
| 721 if output: | 727 if output: |
| 722 # This does 1 multi GETs. This also modifies run_result in place. | 728 # This does 1 multi GETs. This also modifies run_result in place. |
| 723 to_put.extend(run_result.append_output(output, output_chunk_start or 0)) | 729 to_put.extend(run_result.append_output(output, output_chunk_start or 0)) |
| 724 if performance_stats: | 730 if performance_stats: |
| 725 performance_stats.key = task_pack.run_result_key_to_performance_stats_key( | 731 performance_stats.key = task_pack.run_result_key_to_performance_stats_key( |
| 726 run_result.key) | 732 run_result.key) |
| 727 to_put.append(performance_stats) | 733 to_put.append(performance_stats) |
| 728 | 734 |
| 729 run_result.cost_usd = max(cost_usd, run_result.cost_usd or 0.) | 735 run_result.cost_usd = max(cost_usd, run_result.cost_usd or 0.) |
| 730 run_result.modified_ts = now | 736 run_result.modified_ts = now |
| 731 | 737 |
| 732 result_summary = result_summary_future.get_result() | 738 result_summary = result_summary_future.get_result() |
| 733 if (result_summary.try_number and | 739 if (result_summary.try_number and |
| 734 result_summary.try_number > run_result.try_number): | 740 result_summary.try_number > run_result.try_number): |
| 735 # The situation where a shard is retried but the bot running the previous | 741 # The situation where a shard is retried but the bot running the previous |
| 736 # try somehow reappears and reports success, the result must still show | 742 # try somehow reappears and reports success, the result must still show |
| 737 # the last try's result. We still need to update cost_usd manually. | 743 # the last try's result. We still need to update cost_usd manually. |
| 738 result_summary.costs_usd[run_result.try_number-1] = run_result.cost_usd | 744 result_summary.costs_usd[run_result.try_number-1] = run_result.cost_usd |
| 739 result_summary.modified_ts = now | 745 result_summary.modified_ts = now |
| 740 else: | 746 else: |
| 741 result_summary.set_from_run_result(run_result, request) | 747 result_summary.set_from_run_result(run_result, request) |
| 742 | 748 |
| 749 result_summary.validate(request) |
| 743 to_put.append(result_summary) | 750 to_put.append(result_summary) |
| 744 ndb.put_multi(to_put) | 751 ndb.put_multi(to_put) |
| 745 | 752 |
| 746 return result_summary, run_result, None | 753 return result_summary, run_result, None |
| 747 | 754 |
| 748 try: | 755 try: |
| 749 smry, run_result, error = datastore_utils.transaction(run) | 756 smry, run_result, error = datastore_utils.transaction(run) |
| 750 except datastore_utils.CommitError as e: | 757 except datastore_utils.CommitError as e: |
| 751 logging.info('Got commit error: %s', e) | 758 logging.info('Got commit error: %s', e) |
| 752 # It is important that the caller correctly surface this error. | 759 # It is important that the caller correctly surface this error. |
| (...skipping 193 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 946 ## Task queue tasks. | 953 ## Task queue tasks. |
| 947 | 954 |
| 948 | 955 |
| 949 def task_handle_pubsub_task(payload): | 956 def task_handle_pubsub_task(payload): |
| 950 """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" | 957 """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" |
| 951 # Do not catch errors to trigger task queue task retry. Errors should not | 958 # Do not catch errors to trigger task queue task retry. Errors should not |
| 952 # happen in normal case. | 959 # happen in normal case. |
| 953 _pubsub_notify( | 960 _pubsub_notify( |
| 954 payload['task_id'], payload['topic'], | 961 payload['task_id'], payload['topic'], |
| 955 payload['auth_token'], payload['userdata']) | 962 payload['auth_token'], payload['userdata']) |
| OLD | NEW |