OLD | NEW |
1 # Copyright 2014 The LUCI Authors. All rights reserved. | 1 # Copyright 2014 The LUCI Authors. All rights reserved. |
2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
4 | 4 |
5 """High level tasks execution scheduling API. | 5 """High level tasks execution scheduling API. |
6 | 6 |
7 This is the interface closest to the HTTP handlers. | 7 This is the interface closest to the HTTP handlers. |
8 """ | 8 """ |
9 | 9 |
10 import datetime | 10 import datetime |
(...skipping 590 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
601 user=request.user) | 601 user=request.user) |
602 return request, run_result | 602 return request, run_result |
603 if failures: | 603 if failures: |
604 logging.info( | 604 logging.info( |
605 'Chose nothing (failed %d, skipped %d)', failures, total_skipped) | 605 'Chose nothing (failed %d, skipped %d)', failures, total_skipped) |
606 return None, None | 606 return None, None |
607 | 607 |
608 | 608 |
609 def bot_update_task( | 609 def bot_update_task( |
610 run_result_key, bot_id, output, output_chunk_start, exit_code, duration, | 610 run_result_key, bot_id, output, output_chunk_start, exit_code, duration, |
611 hard_timeout, io_timeout, cost_usd, outputs_ref, performance_stats): | 611 hard_timeout, io_timeout, cost_usd, outputs_ref, cipd_pins, |
| 612 performance_stats): |
612 """Updates a TaskRunResult and TaskResultSummary, along with TaskOutput. | 613 """Updates a TaskRunResult and TaskResultSummary, along with TaskOutput. |
613 | 614 |
614 Arguments: | 615 Arguments: |
615 - run_result_key: ndb.Key to TaskRunResult. | 616 - run_result_key: ndb.Key to TaskRunResult. |
616 - bot_id: Self advertised bot id to ensure it's the one expected. | 617 - bot_id: Self advertised bot id to ensure it's the one expected. |
617 - output: Data to append to this command output. | 618 - output: Data to append to this command output. |
618 - output_chunk_start: Index of output in the stdout stream. | 619 - output_chunk_start: Index of output in the stdout stream. |
619 - exit_code: Mark that this task completed. | 620 - exit_code: Mark that this task completed. |
620 - duration: Time spent in seconds for this task, excluding overheads. | 621 - duration: Time spent in seconds for this task, excluding overheads. |
621 - hard_timeout: Bool set if a hard timeout occurred. | 622 - hard_timeout: Bool set if a hard timeout occurred. |
622 - io_timeout: Bool set if an I/O timeout occurred. | 623 - io_timeout: Bool set if an I/O timeout occurred. |
623 - cost_usd: Cost in $USD of this task up to now. | 624 - cost_usd: Cost in $USD of this task up to now. |
624 - outputs_ref: task_request.FilesRef instance or None. | 625 - outputs_ref: task_request.FilesRef instance or None. |
| 626 - cipd_pins: task_result.CipdPins instance or None. |
625 - performance_stats: task_result.PerformanceStats instance or None. Can only | 627 - performance_stats: task_result.PerformanceStats instance or None. Can only |
626 be set when the task is completing. | 628 be set when the task is completing. |
627 | 629 |
628 Invalid states, these are flat out refused: | 630 Invalid states, these are flat out refused: |
629 - A command is updated after it already had an exit code assigned. | 631 - A command is updated after it already had an exit code assigned. |
630 | 632 |
631 Returns: | 633 Returns: |
632 TaskRunResult.state or None in case of failure. | 634 TaskRunResult.state or None in case of failure. |
633 """ | 635 """ |
634 assert output_chunk_start is None or isinstance(output_chunk_start, int) | 636 assert output_chunk_start is None or isinstance(output_chunk_start, int) |
635 assert output is None or isinstance(output, str) | 637 assert output is None or isinstance(output, str) |
636 if cost_usd is not None and cost_usd < 0.: | 638 if cost_usd is not None and cost_usd < 0.: |
637 raise ValueError('cost_usd must be None or greater or equal than 0') | 639 raise ValueError('cost_usd must be None or greater or equal than 0') |
638 if duration is not None and duration < 0.: | 640 if duration is not None and duration < 0.: |
639 raise ValueError('duration must be None or greater or equal than 0') | 641 raise ValueError('duration must be None or greater or equal than 0') |
640 if (duration is None) != (exit_code is None): | 642 if (duration is None) != (exit_code is None): |
641 raise ValueError( | 643 raise ValueError( |
642 'had unexpected duration; expected iff a command completes\n' | 644 'had unexpected duration; expected iff a command completes\n' |
643 'duration: %r; exit: %r' % (duration, exit_code)) | 645 'duration: %r; exit: %r' % (duration, exit_code)) |
644 if performance_stats and duration is None: | 646 if performance_stats and duration is None: |
645 raise ValueError( | 647 raise ValueError( |
646 'duration must be set when performance_stats is set\n' | 648 'duration must be set when performance_stats is set\n' |
647 'duration: %s; performance_stats: %s' % | 649 'duration: %s; performance_stats: %s' % |
648 (duration, performance_stats)) | 650 (duration, performance_stats)) |
649 | 651 |
650 packed = task_pack.pack_run_result_key(run_result_key) | 652 packed = task_pack.pack_run_result_key(run_result_key) |
651 logging.debug( | 653 logging.debug( |
652 'bot_update_task(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)', | 654 'bot_update_task(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)', |
653 packed, bot_id, len(output) if output else output, output_chunk_start, | 655 packed, bot_id, len(output) if output else output, output_chunk_start, |
654 exit_code, duration, hard_timeout, io_timeout, cost_usd, outputs_ref, | 656 exit_code, duration, hard_timeout, io_timeout, cost_usd, outputs_ref, |
655 performance_stats) | 657 cipd_pins, performance_stats) |
656 | 658 |
657 result_summary_key = task_pack.run_result_key_to_result_summary_key( | 659 result_summary_key = task_pack.run_result_key_to_result_summary_key( |
658 run_result_key) | 660 run_result_key) |
659 request_key = task_pack.result_summary_key_to_request_key(result_summary_key) | 661 request_key = task_pack.result_summary_key_to_request_key(result_summary_key) |
660 request_future = request_key.get_async() | 662 request_future = request_key.get_async() |
661 server_version = utils.get_app_version() | 663 server_version = utils.get_app_version() |
662 request = request_future.get_result() | 664 request = request_future.get_result() |
663 now = utils.utcnow() | 665 now = utils.utcnow() |
664 | 666 |
665 def run(): | 667 def run(): |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
701 result_summary_future.wait() | 703 result_summary_future.wait() |
702 return None, None, 'got 2 different durations; %s then %s' % ( | 704 return None, None, 'got 2 different durations; %s then %s' % ( |
703 run_result.duration, duration) | 705 run_result.duration, duration) |
704 else: | 706 else: |
705 run_result.duration = duration | 707 run_result.duration = duration |
706 run_result.exit_code = exit_code | 708 run_result.exit_code = exit_code |
707 | 709 |
708 if outputs_ref: | 710 if outputs_ref: |
709 run_result.outputs_ref = outputs_ref | 711 run_result.outputs_ref = outputs_ref |
710 | 712 |
| 713 if cipd_pins: |
| 714 run_result.cipd_pins = cipd_pins |
| 715 |
711 if run_result.state in task_result.State.STATES_RUNNING: | 716 if run_result.state in task_result.State.STATES_RUNNING: |
712 if hard_timeout or io_timeout: | 717 if hard_timeout or io_timeout: |
713 run_result.state = task_result.State.TIMED_OUT | 718 run_result.state = task_result.State.TIMED_OUT |
714 run_result.completed_ts = now | 719 run_result.completed_ts = now |
715 elif run_result.exit_code is not None: | 720 elif run_result.exit_code is not None: |
716 run_result.state = task_result.State.COMPLETED | 721 run_result.state = task_result.State.COMPLETED |
717 run_result.completed_ts = now | 722 run_result.completed_ts = now |
718 | 723 |
719 run_result.signal_server_version(server_version) | 724 run_result.signal_server_version(server_version) |
| 725 run_result.validate(request) |
720 to_put = [run_result] | 726 to_put = [run_result] |
721 if output: | 727 if output: |
722 # This does 1 multi GET. This also modifies run_result in place. | 728 # This does 1 multi GET. This also modifies run_result in place. |
723 to_put.extend(run_result.append_output(output, output_chunk_start or 0)) | 729 to_put.extend(run_result.append_output(output, output_chunk_start or 0)) |
724 if performance_stats: | 730 if performance_stats: |
725 performance_stats.key = task_pack.run_result_key_to_performance_stats_key( | 731 performance_stats.key = task_pack.run_result_key_to_performance_stats_key( |
726 run_result.key) | 732 run_result.key) |
727 to_put.append(performance_stats) | 733 to_put.append(performance_stats) |
728 | 734 |
729 run_result.cost_usd = max(cost_usd, run_result.cost_usd or 0.) | 735 run_result.cost_usd = max(cost_usd, run_result.cost_usd or 0.) |
730 run_result.modified_ts = now | 736 run_result.modified_ts = now |
731 | 737 |
732 result_summary = result_summary_future.get_result() | 738 result_summary = result_summary_future.get_result() |
733 if (result_summary.try_number and | 739 if (result_summary.try_number and |
734 result_summary.try_number > run_result.try_number): | 740 result_summary.try_number > run_result.try_number): |
735 # The situation where a shard is retried but the bot running the previous | 741 # The situation where a shard is retried but the bot running the previous |
736 # try somehow reappears and reports success, the result must still show | 742 # try somehow reappears and reports success, the result must still show |
737 # the last try's result. We still need to update cost_usd manually. | 743 # the last try's result. We still need to update cost_usd manually. |
738 result_summary.costs_usd[run_result.try_number-1] = run_result.cost_usd | 744 result_summary.costs_usd[run_result.try_number-1] = run_result.cost_usd |
739 result_summary.modified_ts = now | 745 result_summary.modified_ts = now |
740 else: | 746 else: |
741 result_summary.set_from_run_result(run_result, request) | 747 result_summary.set_from_run_result(run_result, request) |
742 | 748 |
| 749 result_summary.validate(request) |
743 to_put.append(result_summary) | 750 to_put.append(result_summary) |
744 ndb.put_multi(to_put) | 751 ndb.put_multi(to_put) |
745 | 752 |
746 return result_summary, run_result, None | 753 return result_summary, run_result, None |
747 | 754 |
748 try: | 755 try: |
749 smry, run_result, error = datastore_utils.transaction(run) | 756 smry, run_result, error = datastore_utils.transaction(run) |
750 except datastore_utils.CommitError as e: | 757 except datastore_utils.CommitError as e: |
751 logging.info('Got commit error: %s', e) | 758 logging.info('Got commit error: %s', e) |
752 # It is important that the caller correctly surface this error. | 759 # It is important that the caller correctly surface this error. |
(...skipping 193 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
946 ## Task queue tasks. | 953 ## Task queue tasks. |
947 | 954 |
948 | 955 |
949 def task_handle_pubsub_task(payload): | 956 def task_handle_pubsub_task(payload): |
950 """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" | 957 """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" |
951 # Do not catch errors to trigger task queue task retry. Errors should not | 958 # Do not catch errors to trigger task queue task retry. Errors should not |
952 # happen in normal case. | 959 # happen in normal case. |
953 _pubsub_notify( | 960 _pubsub_notify( |
954 payload['task_id'], payload['topic'], | 961 payload['task_id'], payload['topic'], |
955 payload['auth_token'], payload['userdata']) | 962 payload['auth_token'], payload['userdata']) |
OLD | NEW |