| OLD | NEW |
| 1 # Copyright 2014 The LUCI Authors. All rights reserved. | 1 # Copyright 2014 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 """Task execution result models. | 5 """Task execution result models. |
| 6 | 6 |
| 7 This module doesn't do the scheduling itself. It only describes the entities to | 7 This module doesn't do the scheduling itself. It only describes the entities to |
| 8 store tasks results. | 8 store tasks results. |
| 9 | 9 |
| 10 - TaskResultSummary represents the overall result for the TaskRequest taking in | 10 - TaskResultSummary represents the overall result for the TaskRequest taking in |
| (...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 56 |TaskOutputChunk| |TaskOutputChunk| ... | 56 |TaskOutputChunk| |TaskOutputChunk| ... |
| 57 |id=1 | |id=2 | | 57 |id=1 | |id=2 | |
| 58 +---------------+ +---------------+ | 58 +---------------+ +---------------+ |
| 59 """ | 59 """ |
| 60 | 60 |
| 61 import datetime | 61 import datetime |
| 62 import logging | 62 import logging |
| 63 import random | 63 import random |
| 64 | 64 |
| 65 from google.appengine.api import datastore_errors | 65 from google.appengine.api import datastore_errors |
| 66 from google.appengine.api import search | |
| 67 from google.appengine.datastore import datastore_query | 66 from google.appengine.datastore import datastore_query |
| 68 from google.appengine.ext import ndb | 67 from google.appengine.ext import ndb |
| 69 | 68 |
| 70 from components import datastore_utils | 69 from components import datastore_utils |
| 71 from components import utils | 70 from components import utils |
| 72 from server import large | 71 from server import large |
| 73 from server import task_pack | 72 from server import task_pack |
| 74 from server import task_request | 73 from server import task_request |
| 75 | 74 |
| 76 # Amount of time after which a bot is considered dead. In short, if a bot has | 75 # Amount of time after which a bot is considered dead. In short, if a bot has |
| (...skipping 963 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1040 | 1039 |
| 1041 if state == 'bot_died': | 1040 if state == 'bot_died': |
| 1042 return query.filter(cls.state == State.BOT_DIED) | 1041 return query.filter(cls.state == State.BOT_DIED) |
| 1043 | 1042 |
| 1044 if state == 'canceled': | 1043 if state == 'canceled': |
| 1045 return query.filter(cls.state == State.CANCELED) | 1044 return query.filter(cls.state == State.CANCELED) |
| 1046 | 1045 |
| 1047 raise ValueError('Invalid state') | 1046 raise ValueError('Invalid state') |
| 1048 | 1047 |
| 1049 | 1048 |
| 1050 def _search_by_name(word, cursor_str, limit): | |
| 1051 """Returns TaskResultSummary in -created_ts order containing the word.""" | |
| 1052 cursor = search.Cursor(web_safe_string=cursor_str, per_result=True) | |
| 1053 index = search.Index(name='requests') | |
| 1054 | |
| 1055 def item_to_id(item): | |
| 1056 for field in item.fields: | |
| 1057 if field.name == 'id': | |
| 1058 return field.value | |
| 1059 | |
| 1060 # The code is structured to handle incomplete entities but still return | |
| 1061 # 'limit' items. This is done by fetching a few more entities than necessary, | |
| 1062 # then keeping track of the cursor per item so the right cursor can be | |
| 1063 # returned. | |
| 1064 opts = search.QueryOptions(limit=limit + 5, cursor=cursor) | |
| 1065 results = index.search(search.Query('name:%s' % word, options=opts)) | |
| 1066 result_summary_keys = [] | |
| 1067 cursors = [] | |
| 1068 for item in results.results: | |
| 1069 value = item_to_id(item) | |
| 1070 if value: | |
| 1071 result_summary_keys.append(task_pack.unpack_result_summary_key(value)) | |
| 1072 cursors.append(item.cursor) | |
| 1073 | |
| 1074 # Handle None result value. See make_request() for details about how this can | |
| 1075 # happen. | |
| 1076 tasks = [] | |
| 1077 cursor = None | |
| 1078 for task, c in zip(ndb.get_multi(result_summary_keys), cursors): | |
| 1079 if task: | |
| 1080 cursor = c | |
| 1081 tasks.append(task) | |
| 1082 if len(tasks) == limit: | |
| 1083 # Drop the rest. | |
| 1084 break | |
| 1085 else: | |
| 1086 if len(cursors) == limit + 5: | |
| 1087 while len(tasks) < limit: | |
| 1088 # Go into the slow path, seems like we got a lot of corrupted items. | |
| 1089 opts = search.QueryOptions(limit=limit-len(tasks) + 5, cursor=cursor) | |
| 1090 results = index.search(search.Query('name:%s' % word, options=opts)) | |
| 1091 if not results.results: | |
| 1092 # Nothing else. | |
| 1093 cursor = None | |
| 1094 break | |
| 1095 for item in results.results: | |
| 1096 value = item_to_id(item) | |
| 1097 if value: | |
| 1098 cursor = item.cursor | |
| 1099 task = task_pack.unpack_result_summary_key(value).get() | |
| 1100 if task: | |
| 1101 tasks.append(task) | |
| 1102 if len(tasks) == limit: | |
| 1103 break | |
| 1104 | |
| 1105 cursor_str = cursor.web_safe_string if cursor else None | |
| 1106 return tasks, cursor_str | |
| 1107 | |
| 1108 | |
| 1109 ### Public API. | 1049 ### Public API. |
| 1110 | 1050 |
| 1111 | 1051 |
| 1112 def state_to_string(state_obj): | 1052 def state_to_string(state_obj): |
| 1113 """Returns a user-readable string representing a State.""" | 1053 """Returns a user-readable string representing a State.""" |
| 1114 if state_obj.deduped_from: | 1054 if state_obj.deduped_from: |
| 1115 return 'Deduped' | 1055 return 'Deduped' |
| 1116 out = State.to_string(state_obj.state) | 1056 out = State.to_string(state_obj.state) |
| 1117 if state_obj.failure: | 1057 if state_obj.failure: |
| 1118 out += ' (failed)' | 1058 out += ' (failed)' |
| (...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1155 """Yields all the TaskRunResult ndb.Key where the bot died recently. | 1095 """Yields all the TaskRunResult ndb.Key where the bot died recently. |
| 1156 | 1096 |
| 1157 In practice it is returning a ndb.QueryIterator but this is equivalent. | 1097 In practice it is returning a ndb.QueryIterator but this is equivalent. |
| 1158 """ | 1098 """ |
| 1159 # If a bot didn't ping recently, it is considered dead. | 1099 # If a bot didn't ping recently, it is considered dead. |
| 1160 deadline = utils.utcnow() - BOT_PING_TOLERANCE | 1100 deadline = utils.utcnow() - BOT_PING_TOLERANCE |
| 1161 q = TaskRunResult.query().filter(TaskRunResult.modified_ts < deadline) | 1101 q = TaskRunResult.query().filter(TaskRunResult.modified_ts < deadline) |
| 1162 return q.filter(TaskRunResult.state == State.RUNNING).iter(keys_only=True) | 1102 return q.filter(TaskRunResult.state == State.RUNNING).iter(keys_only=True) |
| 1163 | 1103 |
| 1164 | 1104 |
| 1165 def get_tasks(limit, cursor_str, sort, state, tags, task_name): | |
| 1166 """Returns TaskResultSummary entities for this query. | |
| 1167 | |
| 1168 This function is synchronous. | |
| 1169 | |
| 1170 Arguments: | |
| 1171 limit: Maximum number of items to return. | |
| 1172 cursor_str: query-dependent string encoded cursor to continue a previous | |
| 1173 search. | |
| 1174 sort: Order to use. Must default to 'created_ts' to use the default. | |
| 1175 state: State to filter on. | |
| 1176 tags: List of search for one or multiple task tags. | |
| 1177 task_name: search for task name whole word. | |
| 1178 | |
| 1179 Returns: | |
| 1180 tuple(list of tasks, str encoded cursor, updated sort, updated state) | |
| 1181 """ | |
| 1182 if task_name: | |
| 1183 # Task name based word based search. Override the flags. | |
| 1184 sort = 'created_ts' | |
| 1185 state = 'all' | |
| 1186 tasks, cursor_str = _search_by_name(task_name, cursor_str, limit) | |
| 1187 else: | |
| 1188 # Normal listing. | |
| 1189 if tags: | |
| 1190 # Add TaskResultSummary indexes if desired. | |
| 1191 sort = 'created_ts' | |
| 1192 query = get_result_summaries_query(None, None, sort, state, tags) | |
| 1193 tasks, cursor_str = datastore_utils.fetch_page( | |
| 1194 query, limit, cursor_str) | |
| 1195 | |
| 1196 return tasks, cursor_str, sort, state | |
| 1197 | |
| 1198 | |
| 1199 def get_run_results_query(start, end, sort, state, bot_id): | 1105 def get_run_results_query(start, end, sort, state, bot_id): |
| 1200 """Returns TaskRunResult.query() with these filters. | 1106 """Returns TaskRunResult.query() with these filters. |
| 1201 | 1107 |
| 1202 Arguments: | 1108 Arguments: |
| 1203 start: Earliest creation date of retrieved tasks. | 1109 start: Earliest creation date of retrieved tasks. |
| 1204 end: Most recent creation date of retrieved tasks, normally None. | 1110 end: Most recent creation date of retrieved tasks, normally None. |
| 1205 sort: Order to use. Must default to 'created_ts' to use the default. Cannot | 1111 sort: Order to use. Must default to 'created_ts' to use the default. Cannot |
| 1206 be used along start and end. | 1112 be used along start and end. |
| 1207 state: One of State enum value as str. Use 'all' to get all tasks. | 1113 state: One of State enum value as str. Use 'all' to get all tasks. |
| 1208 bot_id: (required) bot id to filter on. | 1114 bot_id: (required) bot id to filter on. |
| (...skipping 20 matching lines...) Expand all Loading... |
| 1229 if tags: | 1135 if tags: |
| 1230 # Add TaskResultSummary indexes if desired. | 1136 # Add TaskResultSummary indexes if desired. |
| 1231 if sort != 'created_ts': | 1137 if sort != 'created_ts': |
| 1232 raise ValueError( | 1138 raise ValueError( |
| 1233 'Add needed indexes for sort:%s and tags if desired' % sort) | 1139 'Add needed indexes for sort:%s and tags if desired' % sort) |
| 1234 tags_filter = TaskResultSummary.tags == tags[0] | 1140 tags_filter = TaskResultSummary.tags == tags[0] |
| 1235 for tag in tags[1:]: | 1141 for tag in tags[1:]: |
| 1236 tags_filter = ndb.AND(tags_filter, TaskResultSummary.tags == tag) | 1142 tags_filter = ndb.AND(tags_filter, TaskResultSummary.tags == tag) |
| 1237 query = query.filter(tags_filter) | 1143 query = query.filter(tags_filter) |
| 1238 return _filter_query(TaskResultSummary, query, start, end, sort, state) | 1144 return _filter_query(TaskResultSummary, query, start, end, sort, state) |
| OLD | NEW |