OLD | NEW |
1 # Copyright 2014 The LUCI Authors. All rights reserved. | 1 # Copyright 2014 The LUCI Authors. All rights reserved. |
2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
4 | 4 |
5 """High level tasks execution scheduling API. | 5 """High level tasks execution scheduling API. |
6 | 6 |
7 This is the interface closest to the HTTP handlers. | 7 This is the interface closest to the HTTP handlers. |
8 """ | 8 """ |
9 | 9 |
10 import datetime | 10 import datetime |
(...skipping 772 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
783 smry, run_result, error = datastore_utils.transaction(run) | 783 smry, run_result, error = datastore_utils.transaction(run) |
784 except datastore_utils.CommitError as e: | 784 except datastore_utils.CommitError as e: |
785 logging.info('Got commit error: %s', e) | 785 logging.info('Got commit error: %s', e) |
786 # It is important that the caller correctly surface this error. | 786 # It is important that the caller correctly surface this error. |
787 return None | 787 return None |
788 assert bool(error) != bool(run_result), (error, run_result) | 788 assert bool(error) != bool(run_result), (error, run_result) |
789 if error: | 789 if error: |
790 logging.error('Task %s %s', packed, error) | 790 logging.error('Task %s %s', packed, error) |
791 return None | 791 return None |
792 # Caller must retry if PubSub enqueue fails. | 792 # Caller must retry if PubSub enqueue fails. |
793 task_completed = run_result.state != task_result.State.RUNNING | |
794 if not _maybe_pubsub_notify_now(smry, request): | 793 if not _maybe_pubsub_notify_now(smry, request): |
795 return None | 794 return None |
796 if task_completed: | 795 if smry.state not in task_result.State.STATES_RUNNING: |
797 event_mon_metrics.send_task_event(smry) | 796 event_mon_metrics.send_task_event(smry) |
798 ts_mon_metrics.update_jobs_completed_metrics(smry) | 797 ts_mon_metrics.update_jobs_completed_metrics(smry) |
799 return run_result.state | 798 return run_result.state |
800 | 799 |
801 | 800 |
802 def bot_kill_task(run_result_key, bot_id): | 801 def bot_kill_task(run_result_key, bot_id): |
803 """Terminates a task that is currently running as an internal failure. | 802 """Terminates a task that is currently running as an internal failure. |
804 | 803 |
805 Returns: | 804 Returns: |
806 str if an error message. | 805 str if an error message. |
(...skipping 168 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
975 ## Task queue tasks. | 974 ## Task queue tasks. |
976 | 975 |
977 | 976 |
978 def task_handle_pubsub_task(payload): | 977 def task_handle_pubsub_task(payload): |
979 """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" | 978 """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" |
980 # Do not catch errors to trigger task queue task retry. Errors should not | 979 # Do not catch errors to trigger task queue task retry. Errors should not |
981 # happen in normal case. | 980 # happen in normal case. |
982 _pubsub_notify( | 981 _pubsub_notify( |
983 payload['task_id'], payload['topic'], | 982 payload['task_id'], payload['topic'], |
984 payload['auth_token'], payload['userdata']) | 983 payload['auth_token'], payload['userdata']) |
OLD | NEW |