OLD | NEW |
1 # Copyright 2014 The LUCI Authors. All rights reserved. | 1 # Copyright 2014 The LUCI Authors. All rights reserved. |
2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
4 | 4 |
5 """High level tasks execution scheduling API. | 5 """High level tasks execution scheduling API. |
6 | 6 |
7 This is the interface closest to the HTTP handlers. | 7 This is the interface closest to the HTTP handlers. |
8 """ | 8 """ |
9 | 9 |
10 import contextlib | 10 import contextlib |
(...skipping 537 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
548 user=request.user) | 548 user=request.user) |
549 return request, run_result | 549 return request, run_result |
550 if failures: | 550 if failures: |
551 logging.info( | 551 logging.info( |
552 'Chose nothing (failed %d, skipped %d)', failures, total_skipped) | 552 'Chose nothing (failed %d, skipped %d)', failures, total_skipped) |
553 return None, None | 553 return None, None |
554 | 554 |
555 | 555 |
556 def bot_update_task( | 556 def bot_update_task( |
557 run_result_key, bot_id, output, output_chunk_start, exit_code, duration, | 557 run_result_key, bot_id, output, output_chunk_start, exit_code, duration, |
558 hard_timeout, io_timeout, cost_usd, outputs_ref, performance_stats): | 558 hard_timeout, io_timeout, cost_usd, outputs_ref, cipd_pins, |
| 559 performance_stats): |
559 """Updates a TaskRunResult and TaskResultSummary, along TaskOutput. | 560 """Updates a TaskRunResult and TaskResultSummary, along TaskOutput. |
560 | 561 |
561 Arguments: | 562 Arguments: |
562 - run_result_key: ndb.Key to TaskRunResult. | 563 - run_result_key: ndb.Key to TaskRunResult. |
563 - bot_id: Self advertised bot id to ensure it's the one expected. | 564 - bot_id: Self advertised bot id to ensure it's the one expected. |
564 - output: Data to append to this command output. | 565 - output: Data to append to this command output. |
565 - output_chunk_start: Index of output in the stdout stream. | 566 - output_chunk_start: Index of output in the stdout stream. |
566 - exit_code: Mark that this task completed. | 567 - exit_code: Mark that this task completed. |
567 - duration: Time spent in seconds for this task, excluding overheads. | 568 - duration: Time spent in seconds for this task, excluding overheads. |
568 - hard_timeout: Bool set if an hard timeout occured. | 569 - hard_timeout: Bool set if an hard timeout occured. |
569 - io_timeout: Bool set if an I/O timeout occured. | 570 - io_timeout: Bool set if an I/O timeout occured. |
570 - cost_usd: Cost in $USD of this task up to now. | 571 - cost_usd: Cost in $USD of this task up to now. |
571 - outputs_ref: task_request.FilesRef instance or None. | 572 - outputs_ref: task_request.FilesRef instance or None. |
| 573 - cipd_pins: list(task_request.CipdPackage) or None. |
572 - performance_stats: task_result.PerformanceStats instance or None. Can only | 574 - performance_stats: task_result.PerformanceStats instance or None. Can only |
573 be set when the task is completing. | 575 be set when the task is completing. |
574 | 576 |
575 Invalid states, these are flat out refused: | 577 Invalid states, these are flat out refused: |
576 - A command is updated after it had an exit code assigned to. | 578 - A command is updated after it had an exit code assigned to. |
577 | 579 |
578 Returns: | 580 Returns: |
579 TaskRunResult.state or None in case of failure. | 581 TaskRunResult.state or None in case of failure. |
580 """ | 582 """ |
581 assert output_chunk_start is None or isinstance(output_chunk_start, int) | 583 assert output_chunk_start is None or isinstance(output_chunk_start, int) |
582 assert output is None or isinstance(output, str) | 584 assert output is None or isinstance(output, str) |
583 if cost_usd is not None and cost_usd < 0.: | 585 if cost_usd is not None and cost_usd < 0.: |
584 raise ValueError('cost_usd must be None or greater or equal than 0') | 586 raise ValueError('cost_usd must be None or greater or equal than 0') |
585 if duration is not None and duration < 0.: | 587 if duration is not None and duration < 0.: |
586 raise ValueError('duration must be None or greater or equal than 0') | 588 raise ValueError('duration must be None or greater or equal than 0') |
587 if (duration is None) != (exit_code is None): | 589 if (duration is None) != (exit_code is None): |
588 raise ValueError( | 590 raise ValueError( |
589 'had unexpected duration; expected iff a command completes\n' | 591 'had unexpected duration; expected iff a command completes\n' |
590 'duration: %r; exit: %r' % (duration, exit_code)) | 592 'duration: %r; exit: %r' % (duration, exit_code)) |
591 if performance_stats and duration is None: | 593 if performance_stats and duration is None: |
592 raise ValueError( | 594 raise ValueError( |
593 'duration must be set when performance_stats is set\n' | 595 'duration must be set when performance_stats is set\n' |
594 'duration: %s; performance_stats: %s' % | 596 'duration: %s; performance_stats: %s' % |
595 (duration, performance_stats)) | 597 (duration, performance_stats)) |
596 | 598 |
597 packed = task_pack.pack_run_result_key(run_result_key) | 599 packed = task_pack.pack_run_result_key(run_result_key) |
598 logging.debug( | 600 logging.debug( |
599 'bot_update_task(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)', | 601 'bot_update_task(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)', |
600 packed, bot_id, len(output) if output else output, output_chunk_start, | 602 packed, bot_id, len(output) if output else output, output_chunk_start, |
601 exit_code, duration, hard_timeout, io_timeout, cost_usd, outputs_ref, | 603 exit_code, duration, hard_timeout, io_timeout, cost_usd, outputs_ref, |
602 performance_stats) | 604 cipd_pins, performance_stats) |
603 | 605 |
604 result_summary_key = task_pack.run_result_key_to_result_summary_key( | 606 result_summary_key = task_pack.run_result_key_to_result_summary_key( |
605 run_result_key) | 607 run_result_key) |
606 request_key = task_pack.result_summary_key_to_request_key(result_summary_key) | 608 request_key = task_pack.result_summary_key_to_request_key(result_summary_key) |
607 request_future = request_key.get_async() | 609 request_future = request_key.get_async() |
608 server_version = utils.get_app_version() | 610 server_version = utils.get_app_version() |
609 request = request_future.get_result() | 611 request = request_future.get_result() |
610 now = utils.utcnow() | 612 now = utils.utcnow() |
611 | 613 |
612 def run(): | 614 def run(): |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
648 result_summary_future.wait() | 650 result_summary_future.wait() |
649 return None, None, 'got 2 different durations; %s then %s' % ( | 651 return None, None, 'got 2 different durations; %s then %s' % ( |
650 run_result.duration, duration) | 652 run_result.duration, duration) |
651 else: | 653 else: |
652 run_result.duration = duration | 654 run_result.duration = duration |
653 run_result.exit_code = exit_code | 655 run_result.exit_code = exit_code |
654 | 656 |
655 if outputs_ref: | 657 if outputs_ref: |
656 run_result.outputs_ref = outputs_ref | 658 run_result.outputs_ref = outputs_ref |
657 | 659 |
| 660 if cipd_pins: |
| 661 run_result.cipd_pins = cipd_pins |
| 662 |
658 if run_result.state in task_result.State.STATES_RUNNING: | 663 if run_result.state in task_result.State.STATES_RUNNING: |
659 if hard_timeout or io_timeout: | 664 if hard_timeout or io_timeout: |
660 run_result.state = task_result.State.TIMED_OUT | 665 run_result.state = task_result.State.TIMED_OUT |
661 run_result.completed_ts = now | 666 run_result.completed_ts = now |
662 elif run_result.exit_code is not None: | 667 elif run_result.exit_code is not None: |
663 run_result.state = task_result.State.COMPLETED | 668 run_result.state = task_result.State.COMPLETED |
664 run_result.completed_ts = now | 669 run_result.completed_ts = now |
665 | 670 |
666 run_result.signal_server_version(server_version) | 671 run_result.signal_server_version(server_version) |
667 to_put = [run_result] | 672 to_put = [run_result] |
(...skipping 225 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
893 ## Task queue tasks. | 898 ## Task queue tasks. |
894 | 899 |
895 | 900 |
896 def task_handle_pubsub_task(payload): | 901 def task_handle_pubsub_task(payload): |
897 """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" | 902 """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" |
898 # Do not catch errors to trigger task queue task retry. Errors should not | 903 # Do not catch errors to trigger task queue task retry. Errors should not |
899 # happen in normal case. | 904 # happen in normal case. |
900 _pubsub_notify( | 905 _pubsub_notify( |
901 payload['task_id'], payload['topic'], | 906 payload['task_id'], payload['topic'], |
902 payload['auth_token'], payload['userdata']) | 907 payload['auth_token'], payload['userdata']) |
OLD | NEW |