OLD | NEW |
1 # Copyright 2017 The LUCI Authors. All rights reserved. | 1 # Copyright 2017 The LUCI Authors. All rights reserved. |
2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
4 | 4 |
5 """Ambient task queues generated from the actual load. | 5 """Ambient task queues generated from the actual load. |
6 | 6 |
7 This means that the task queues are deduced by the actual load, they are never | 7 This means that the task queues are deduced by the actual load, they are never |
8 explicitly defined. They 'disapear' once the load stops, that is, no task with | 8 explicitly defined. They are eventually deleted by a cron job once no incoming |
9 the exact set of dimensions is triggered anymore. | 9 task with the exact set of dimensions is triggered anymore. |
10 | 10 |
11 Used to optimize scheduling. | 11 Used to optimize scheduling. |
| 12 |
| 13 +---------+ |
| 14 |BotRoot | <bot_management.py> |
| 15 |id=bot_id| |
| 16 +---------+ |
| 17 | |
| 18 | |
| 19 +--------------------+ |
| 20 | | |
| 21 v v |
| 22 +-------------------+ +-------------------+ |
| 23 |BotTaskDimensions | ... |BotTaskDimensions | |
| 24 |id=<dimension_hash>| ... |id=<dimension_hash>| |
| 25 +-------------------+ +-------------------+ |
| 26 |
| 27 +-------Root------------+ |
| 28 |TaskDimensionsRoot | (not stored) |
| 29 |id=<pool:foo or id:foo>| |
| 30 +-----------------------+ |
| 31 | |
| 32 v |
| 33 +-------------------+ |
| 34 |TaskDimensions | |
| 35 |id=<dimension_hash>| |
| 36 +-------------------+ |
12 """ | 37 """ |
13 | 38 |
| 39 import datetime |
| 40 |
| 41 from google.appengine.ext import ndb |
| 42 |
| 43 |
| 44 # Frequency at which these entities must be refreshed. This value is a trade off |
| 45 # between constantly updating BotTaskDimensions and TaskDimensions vs keeping |
| 46 # them alive for longer than necessary, causing unnecessary queries in |
| 47 # get_queues() users. |
| 48 _ADVANCE = datetime.timedelta(hours=1) |
| 49 |
| 50 |
14 ### Models. | 51 ### Models. |
15 | 52 |
16 | 53 |
| 54 class BotTaskDimensions(ndb.Model): |
| 55 """Stores the precalculated hashes for this bot. |
| 56 |
| 57 Parent is BotRoot. |
| 58 key id is <dimensions_hash>. |
| 59 |
| 60 This hash may collide across different sets of properties, but this doesn't |
| 61 matter at this level because disambiguation is done in TaskDimensions |
| 62 entities. |
| 63 |
| 64 The number of stored entities is: |
| 65 <number of bots> x <TaskDimensions each bot supports> |
| 66 |
| 67 The actual number is a direct function of the variety of the TaskDimensions. |
| 68 """ |
| 69 # Validity time, at which this entity should be considered irrelevant. |
| 70 valid_until_ts = ndb.DateTimeProperty() |
| 71 |
| 72 |
| 73 class TaskDimensionsRoot(ndb.Model): |
| 74 """Ghost root entity to group kinds of tasks to a common root. |
| 75 |
| 76 This root entity is not stored in the DB. |
| 77 |
| 78 id is either 'id:<value>' or 'pool:<value>'. For a request dimensions set that |
| 79 specifies both keys, one TaskDimensions is listed in each root. |
| 80 """ |
| 81 pass |
| 82 |
| 83 |
| 84 class TaskDimensionsSet(ndb.Model): |
| 85 """Embedded struct to store a set of dimensions. |
| 86 |
| 87 This entity is not stored, it is contained inside TaskDimensions.sets. |
| 88 """ |
| 89 # 'key:value' strings. This is stored to enable match(). This is important as |
| 90 # the dimensions_hash can be colliding! In this case, a *set* of |
| 91 # dimensions_flat can exist. |
| 92 dimensions_flat = ndb.StringProperty(repeated=True, indexed=False) |
| 93 |
| 94 def match(self, bot_dimensions): |
| 95 """Returns True if this bot can run this request dimensions set.""" |
| 96 for d in self.dimensions_flat: |
| 97 key, value = d.split(':', 1) |
| 98 if value not in bot_dimensions.get(key, []): |
| 99 return False |
| 100 return True |
| 101 |
| 102 |
| 103 class TaskDimensions(ndb.Model): |
| 104 """List dimensions for each kind of task. |
| 105 |
| 106 Parent is TaskDimensionsRoot |
| 107 key id is <dimensions_hash> |
| 108 |
| 109 A single dimensions_hash may represent multiple independent queues in a single |
| 110 root. This is because the hash is very compressed (32 bits). This is handled |
| 111 specifically here by storing one TaskDimensionsSet per colliding dimensions set. |
| 112 |
| 113 The worst case of having hash collision is unneeded scanning for unrelated |
| 114 tasks in get_queues(). This is bad but not the end of the world. |
| 115 |
| 116 It is only a function of the number of different tasks, so it is not expected |
| 117 to be very large, only in the few hundreds. The exception is when one task per |
| 118 bot is triggered, which leads to having <number of bots> TaskDimensions |
| 119 entities. |
| 120 """ |
| 121 # Validity time, at which this entity should be considered irrelevant. |
| 122 # Entities with valid_until_ts in the past are considered inactive and are not |
| 123 # used. valid_until_ts is set in assert_task() to "TaskRequest.expiration_ts + |
| 124 # _ADVANCE". It is updated when an assert_task() call sees that valid_until_ts |
| 125 # becomes lower than TaskRequest.expiration_ts for a later task. This enables |
| 126 # not updating the entity too frequently, at the cost of keeping a dead queue |
| 127 # "alive" for a bit longer than strictly necessary. |
| 128 valid_until_ts = ndb.DateTimeProperty() |
| 129 |
| 130 # One or multiple sets of request dimensions this dimensions_hash represents. |
| 131 sets = ndb.LocalStructuredProperty(TaskDimensionsSet, repeated=True) |
| 132 |
| 133 def confirm(self, dimensions): |
| 134 """Confirms that this instance actually stores this set.""" |
| 135 return self.confirm_flat( |
| 136 '%s:%s' % (k, v) for k, v in dimensions.iteritems()) |
| 137 |
| 138 def confirm_flat(self, dimensions_flat): |
| 139 """Confirms that this instance actually stores this set.""" |
| 140 x = frozenset(dimensions_flat) |
| 141 return any(not x.difference(s.dimensions_flat) for s in self.sets) |
| 142 |
| 143 def match(self, bot_dimensions): |
| 144 """Returns True if this bot can run one of the request dimensions sets |
| 145 represented by this dimensions_hash. |
| 146 """ |
| 147 return any(s.match(bot_dimensions) for s in self.sets) |
| 148 |
| 149 |
17 ### Private APIs. | 150 ### Private APIs. |
18 | 151 |
19 | 152 |
20 ### Public APIs. | 153 ### Public APIs. |
21 | 154 |
22 | 155 |
| 156 def hash_dimensions(dimensions): |
| 157 """Returns a 32 bits int that is a hash of the request dimensions specified. |
| 158 |
| 159 Arguments: |
| 160 dimensions: dict(str, str) |
| 161 |
| 162 The return value is guaranteed to be non-zero so it can be used as a key id in |
| 163 a ndb.Key. |
| 164 """ |
| 165 del dimensions |
| 166 |
| 167 |
23 def assert_bot(bot_dimensions): | 168 def assert_bot(bot_dimensions): |
24 """Prepares the dimensions for the queues.""" | 169 """Prepares the dimensions for the queues.""" |
25 assert len(bot_dimensions[u'id']) == 1, bot_dimensions | 170 assert len(bot_dimensions[u'id']) == 1, bot_dimensions |
26 | 171 |
27 | 172 |
28 def assert_task(request): | 173 def assert_task(request): |
29 """Prepares the dimensions for the queues. | 174 """Makes sure the TaskRequest dimensions are listed as a known queue. |
30 | 175 |
31 The generated entities are root entities. | 176 This function must be called before storing the TaskRequest in the DB. |
32 """ | 177 |
33 del request | 178 When a cache miss occurs, a task queue is triggered. |
| 179 |
| 180 Warning: the task will not be run until the task queue has run, which causes |
| 181 a user-visible delay. This only occurs on new kinds of requests, which is not |
| 182 that often in practice. |
| 183 """ |
| 184 assert not request.key, request.key |
| 185 |
| 186 |
| 187 def get_queues(bot_id): |
| 188 Queries all the known task queues in parallel and yields the tasks in order |
| 189 of priority. |
| 190 |
| 191 The ordering is opportunistic, not strict. |
| 192 """ |
| 193 assert isinstance(bot_id, unicode), repr(bot_id) |
| 194 return [] |
| 195 |
| 196 |
| 197 def rebuild_task_cache(dimensions_hash, dimensions_flat): |
| 198 """Rebuilds the TaskDimensions cache. |
| 199 |
| 200 This code implicitly depends on bot_management.bot_event() being called for |
| 201 the bots. |
| 202 |
| 203 This function is called in two cases: |
| 204 - A new kind of dimensions never seen before |
| 205 - The TaskDimensions.valid_until_ts expired |
| 206 |
| 207 It is a cache miss, query all the bots and check for the ones which can run |
| 208 the task. |
| 209 |
| 210 Warning: There's a race condition, where the TaskDimensions query could be |
| 211 missing some instances due to eventual consistency in the BotInfo query. This |
| 212 only happens when there's a new request dimensions set AND a bot that can run |
| 213 this task recently showed up. |
| 214 |
| 215 Runtime expectation: it scales with the number of bots that can run the task, |
| 216 via BotInfo.dimensions_flat filtering. As there can be tens of thousands of |
| 217 bots that can run the task, this can take a long time to store all the |
| 218 entities on a new kind of request. As such, it must be called in the backend. |
| 219 """ |
| 220 del dimensions_hash |
| 221 del dimensions_flat |
| 222 |
| 223 |
| 224 def tidy_stale(): |
| 225 Searches for all stale BotTaskDimensions and TaskDimensions and deletes |
| 226 them. |
| 227 |
| 228 Their .valid_until_ts is compared to the current time and the entity is |
| 229 deleted if it's older. |
| 230 |
| 231 The number of entities processed is expected to be relatively low, in the few |
| 232 tens at most. |
| 233 """ |
| 234 pass |
OLD | NEW |