| OLD | NEW |
| 1 # Copyright 2014 The LUCI Authors. All rights reserved. | 1 # Copyright 2014 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 """High level tasks execution scheduling API. | 5 """High level tasks execution scheduling API. |
| 6 | 6 |
| 7 This is the interface closest to the HTTP handlers. | 7 This is the interface closest to the HTTP handlers. |
| 8 """ | 8 """ |
| 9 | 9 |
| 10 import datetime | 10 import datetime |
| (...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 132 if not to_run: | 132 if not to_run: |
| 133 logging.error('Missing TaskToRun?\n%s', result_summary.task_id) | 133 logging.error('Missing TaskToRun?\n%s', result_summary.task_id) |
| 134 return None, None | 134 return None, None |
| 135 if not to_run.is_reapable: | 135 if not to_run.is_reapable: |
| 136 logging.info('%s is not reapable', result_summary.task_id) | 136 logging.info('%s is not reapable', result_summary.task_id) |
| 137 return None, None | 137 return None, None |
| 138 if result_summary.bot_id == bot_id: | 138 if result_summary.bot_id == bot_id: |
| 139 # This means two things, first it's a retry, second it's that the first | 139 # This means two things, first it's a retry, second it's that the first |
| 140 # try failed and the retry is being reaped by the same bot. Deny that, as | 140 # try failed and the retry is being reaped by the same bot. Deny that, as |
| 141 # the bot may be deeply broken and could be in a killing spree. | 141 # the bot may be deeply broken and could be in a killing spree. |
| 142 # TODO(maruel): Allow retry for bot locked task using 'id' dimension. |
| 142 logging.warning( | 143 logging.warning( |
| 143 '%s can\'t retry its own internal failure task', | 144 '%s can\'t retry its own internal failure task', |
| 144 result_summary.task_id) | 145 result_summary.task_id) |
| 145 return None, None | 146 return None, None |
| 146 to_run.queue_number = None | 147 to_run.queue_number = None |
| 147 run_result = task_result.new_run_result( | 148 run_result = task_result.new_run_result( |
| 148 request, (result_summary.try_number or 0) + 1, bot_id, bot_version, | 149 request, (result_summary.try_number or 0) + 1, bot_id, bot_version, |
| 149 bot_dimensions) | 150 bot_dimensions) |
| 151 # Upon bot reap, both .started_ts and .modified_ts matches. They differ on |
| 152 # the first ping. |
| 153 run_result.started_ts = now |
| 150 run_result.modified_ts = now | 154 run_result.modified_ts = now |
| 151 result_summary.set_from_run_result(run_result, request) | 155 result_summary.set_from_run_result(run_result, request) |
| 152 ndb.put_multi([to_run, run_result, result_summary]) | 156 ndb.put_multi([to_run, run_result, result_summary]) |
| 153 if result_summary.state != orig_summary_state: | 157 if result_summary.state != orig_summary_state: |
| 154 _maybe_pubsub_notify_via_tq(result_summary, request) | 158 _maybe_pubsub_notify_via_tq(result_summary, request) |
| 155 return run_result, secret_bytes | 159 return run_result, secret_bytes |
| 156 | 160 |
| 157 # The bot will reap the next available task in case of failure, no big deal. | 161 # The bot will reap the next available task in case of failure, no big deal. |
| 158 try: | 162 try: |
| 159 run_result, secret_bytes = datastore_utils.transaction(run, retries=0) | 163 run_result, secret_bytes = datastore_utils.transaction(run, retries=0) |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 192 run_result, result_summary, to_run = ndb.get_multi( | 196 run_result, result_summary, to_run = ndb.get_multi( |
| 193 (run_result_key, result_summary_key, to_run_key)) | 197 (run_result_key, result_summary_key, to_run_key)) |
| 194 if run_result.state != task_result.State.RUNNING: | 198 if run_result.state != task_result.State.RUNNING: |
| 195 # It was updated already or not updating last. Likely DB index was stale. | 199 # It was updated already or not updating last. Likely DB index was stale. |
| 196 return None, run_result.bot_id | 200 return None, run_result.bot_id |
| 197 if run_result.modified_ts > now - task_result.BOT_PING_TOLERANCE: | 201 if run_result.modified_ts > now - task_result.BOT_PING_TOLERANCE: |
| 198 # The query index IS stale. | 202 # The query index IS stale. |
| 199 return None, run_result.bot_id | 203 return None, run_result.bot_id |
| 200 | 204 |
| 201 run_result.signal_server_version(server_version) | 205 run_result.signal_server_version(server_version) |
| 206 old_modified = run_result.modified_ts |
| 202 run_result.modified_ts = now | 207 run_result.modified_ts = now |
| 203 | 208 |
| 204 orig_summary_state = result_summary.state | 209 orig_summary_state = result_summary.state |
| 205 if result_summary.try_number != run_result.try_number: | 210 if result_summary.try_number != run_result.try_number: |
| 206 # Not updating correct run_result, cancel it without touching | 211 # Not updating correct run_result, cancel it without touching |
| 207 # result_summary. | 212 # result_summary. |
| 208 to_put = (run_result,) | 213 to_put = (run_result,) |
| 209 run_result.state = task_result.State.BOT_DIED | 214 run_result.state = task_result.State.BOT_DIED |
| 210 run_result.internal_failure = True | 215 run_result.internal_failure = True |
| 211 run_result.abandoned_ts = now | 216 run_result.abandoned_ts = now |
| 212 task_is_retried = None | 217 task_is_retried = None |
| 213 elif (result_summary.try_number == 1 and now < request.expiration_ts and | 218 elif (result_summary.try_number == 1 and now < request.expiration_ts and |
| 214 request.properties.idempotent): | 219 (request.properties.idempotent or |
| 215 # Retry it. | 220 run_result.started_ts == old_modified)): |
| 221 # Retry it. It fits: |
| 222 # - first try |
| 223 # - not yet expired |
| 224 # - One of: |
| 225 # - idempotent |
| 226 # - task hadn't got any ping at all from task_runner.run_command() |
| 227 # TODO(maruel): Allow retry for bot locked task using 'id' dimension. |
| 216 to_put = (run_result, result_summary, to_run) | 228 to_put = (run_result, result_summary, to_run) |
| 217 to_run.queue_number = task_to_run.gen_queue_number(request) | 229 to_run.queue_number = task_to_run.gen_queue_number(request) |
| 218 run_result.state = task_result.State.BOT_DIED | 230 run_result.state = task_result.State.BOT_DIED |
| 219 run_result.internal_failure = True | 231 run_result.internal_failure = True |
| 220 run_result.abandoned_ts = now | 232 run_result.abandoned_ts = now |
| 221 # Do not sync data from run_result to result_summary, since the task is | 233 # Do not sync data from run_result to result_summary, since the task is |
| 222 # being retried. | 234 # being retried. |
| 223 result_summary.reset_to_pending() | 235 result_summary.reset_to_pending() |
| 224 result_summary.modified_ts = now | 236 result_summary.modified_ts = now |
| 225 task_is_retried = True | 237 task_is_retried = True |
| (...skipping 676 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 902 for i in killed)) | 914 for i in killed)) |
| 903 logging.info('Killed %d task, skipped %d', len(killed), skipped) | 915 logging.info('Killed %d task, skipped %d', len(killed), skipped) |
| 904 return [i.task_id for i in killed] | 916 return [i.task_id for i in killed] |
| 905 | 917 |
| 906 | 918 |
| 907 def cron_handle_bot_died(host): | 919 def cron_handle_bot_died(host): |
| 908 """Aborts or retry stale TaskRunResult where the bot stopped sending updates. | 920 """Aborts or retry stale TaskRunResult where the bot stopped sending updates. |
| 909 | 921 |
| 910 If the task was at its first try, it'll be retried. Otherwise the task will be | 922 If the task was at its first try, it'll be retried. Otherwise the task will be |
| 911 canceled. | 923 canceled. |
| 924 |
| 925 Returns: |
| 926 - task IDs killed |
| 927 - number of task retried |
| 928 - number of task ignored |
| 912 """ | 929 """ |
| 913 ignored = 0 | 930 ignored = 0 |
| 914 killed = [] | 931 killed = [] |
| 915 retried = 0 | 932 retried = 0 |
| 916 try: | 933 try: |
| 917 for run_result_key in task_result.yield_run_result_keys_with_dead_bot(): | 934 for run_result_key in task_result.yield_run_result_keys_with_dead_bot(): |
| 918 result = _handle_dead_bot(run_result_key) | 935 result = _handle_dead_bot(run_result_key) |
| 919 if result is True: | 936 if result is True: |
| 920 retried += 1 | 937 retried += 1 |
| 921 elif result is False: | 938 elif result is False: |
| (...skipping 14 matching lines...) Expand all Loading... |
| 936 ## Task queue tasks. | 953 ## Task queue tasks. |
| 937 | 954 |
| 938 | 955 |
| 939 def task_handle_pubsub_task(payload): | 956 def task_handle_pubsub_task(payload): |
| 940 """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" | 957 """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" |
| 941 # Do not catch errors to trigger task queue task retry. Errors should not | 958 # Do not catch errors to trigger task queue task retry. Errors should not |
| 942 # happen in normal case. | 959 # happen in normal case. |
| 943 _pubsub_notify( | 960 _pubsub_notify( |
| 944 payload['task_id'], payload['topic'], | 961 payload['task_id'], payload['topic'], |
| 945 payload['auth_token'], payload['userdata']) | 962 payload['auth_token'], payload['userdata']) |
| OLD | NEW |