Chromium Code Reviews

Side by Side Diff: appengine/swarming/server/task_queues.py

Issue 2832203002: task_queues: Add more scaffolding (Closed)
Patch Set: Addressed comments Created 3 years, 7 months ago
OLD | NEW
1 # Copyright 2017 The LUCI Authors. All rights reserved. 1 # Copyright 2017 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """Ambient task queues generated from the actual load. 5 """Ambient task queues generated from the actual load.
6 6
7 This means that the task queues are deduced by the actual load, they are never 7 This means that the task queues are deduced by the actual load, they are never
8 explicitly defined. They 'disapear' once the load stops, that is, no task with 8 explicitly defined. They are eventually deleted by a cron job once no incoming
9 the exact set of dimensions is triggered anymore. 9 task with the exact set of dimensions is triggered anymore.
10 10
11 Used to optimize scheduling. 11 Used to optimize scheduling.
12
13                +---------+
14                |BotRoot  |  <bot_management.py>
15                |id=bot_id|
16                +---------+
17                     |
18                     |
19           +--------------------+
20           |                    |
21           v                    v
22 +-------------------+     +-------------------+
23 |BotTaskDimensions  | ... |BotTaskDimensions  |
24 |id=<dimension_hash>| ... |id=<dimension_hash>|
25 +-------------------+     +-------------------+
26
27 +-------Root------------+
28 |TaskDimensionsRoot     |  (not stored)
29 |id=<pool:foo or id:foo>|
30 +-----------------------+
31           |
32           v
33 +-------------------+
34 |TaskDimensions     |
35 |id=<dimension_hash>|
36 +-------------------+
12 """ 37 """
13 38
39 import datetime
40
41 from google.appengine.ext import ndb
42
43
44 # Frequency at which these entities must be refreshed. This value is a trade off
45 # between constantly updating BotTaskDimensions and TaskDimensions vs keeping
46 # them alive for longer than necessary, causing unnecessary queries in
47 # get_queues() users.
48 _ADVANCE = datetime.timedelta(hours=1)
49
50
14 ### Models. 51 ### Models.
15 52
16 53
54 class BotTaskDimensions(ndb.Model):
55 """Stores the precalculated hashes for this bot.
56
57 Parent is BotRoot.
58 key id is <dimensions_hash>.
59
60 This hash may collide between different sets of properties, but this doesn't
61 matter at this level because disambiguation is done in TaskDimensions
62 entities.
63
64 The number of stored entities is:
65 <number of bots> x <TaskDimensions each bot supports>
66
67 The actual number is a direct function of the variety of the TaskDimensions.
68 """
69 # Validity time, at which this entity should be considered irrelevant.
70 valid_until_ts = ndb.DateTimeProperty()
71
72
73 class TaskDimensionsRoot(ndb.Model):
74 """Ghost root entity to group kinds of tasks to a common root.
75
76 This root entity is not stored in the DB.
77
78 id is either 'id:<value>' or 'pool:<value>'. For a request dimensions set that
79 specifies both keys, one TaskDimensions is listed in each root.
80 """
81 pass
82
83
84 class TaskDimensionsSet(ndb.Model):
85 """Embedded struct to store a set of dimensions.
86
87 This entity is not stored, it is contained inside TaskDimensions.sets.
88 """
89 # 'key:value' strings. This is stored to enable match(). This is important as
90 # the dimensions_hash can collide! In this case, a *set* of
91 # dimensions_flat can exist.
92 dimensions_flat = ndb.StringProperty(repeated=True, indexed=False)
93
94 def match(self, bot_dimensions):
95 """Returns True if this bot can run this request dimensions set."""
96 for d in self.dimensions_flat:
97 key, value = d.split(':', 1)
98 if value not in bot_dimensions.get(key, []):
99 return False
100 return True
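
The matching rule in TaskDimensionsSet.match() can be tried outside of ndb. The following is a minimal sketch, assuming bot dimensions are a dict mapping each key to a list of values (as the `bot_dimensions.get(key, [])` call suggests); the standalone function name and the sample bot are illustrative only and are not part of this patch.

```python
def match(dimensions_flat, bot_dimensions):
    """Returns True if every requested 'key:value' is offered by the bot."""
    for d in dimensions_flat:
        # Split on the first ':' only, so a value may itself contain ':'.
        key, value = d.split(':', 1)
        # A key the bot does not report is treated as an empty list of values.
        if value not in bot_dimensions.get(key, []):
            return False
    return True


# Hypothetical bot, for illustration only.
bot = {
    'id': ['bot1'],
    'os': ['Linux', 'Ubuntu-16.04'],
    'pool': ['default'],
}
linux_ok = match(['os:Linux', 'pool:default'], bot)
windows_ok = match(['os:Windows', 'pool:default'], bot)
```

Under these assumptions the first request matches the bot and the second does not. An empty request set matches any bot, since there is nothing to reject.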
101
102
103 class TaskDimensions(ndb.Model):
104 """List dimensions for each kind of task.
105
106 Parent is TaskDimensionsRoot
107 key id is <dimensions_hash>
108
109 A single dimensions_hash may represent multiple independent queues in a single
110 root. This is because the hash is very compressed (32 bits). This is handled
111 specifically here by having one TaskDimensionsSet per 'set'.
112
113 The worst case of a hash collision is unneeded scanning for unrelated
114 tasks in get_queues(). This is bad but not the end of the world.
115
116 The number of entities is only a function of the number of different tasks,
117 so it is not expected to be very large, only in the few hundreds. The
118 exception is when one task per bot is triggered, which leads to having
119 <number of bots> TaskDimensions entities.
120 """
121 # Validity time, at which this entity should be considered irrelevant.
122 # Entities with valid_until_ts in the past are considered inactive and are not
123 # used. valid_until_ts is set in assert_task() to "TaskRequest.expiration_ts +
124 # _ADVANCE". It is updated when an assert_task() call sees that valid_until_ts
125 # becomes lower than TaskRequest.expiration_ts for a later task. This enables
126 # not updating the entity too frequently, at the cost of keeping a dead queue
127 # "alive" for a bit longer than strictly necessary.
128 valid_until_ts = ndb.DateTimeProperty()
129
130 # One or multiple sets of request dimensions this dimensions_hash represents.
131 sets = ndb.LocalStructuredProperty(TaskDimensionsSet, repeated=True)
132
133 def confirm(self, dimensions):
134 """Confirms that this instance actually stores this set."""
135 return self.confirm_flat(
136 '%s:%s' % (k, v) for k, v in dimensions.iteritems())
137
138 def confirm_flat(self, dimensions_flat):
139 """Confirms that this instance actually stores this set."""
140 x = frozenset(dimensions_flat)
141 return any(not x.difference(s.dimensions_flat) for s in self.sets)
142
143 def match(self, bot_dimensions):
144 """Returns True if this bot can run one of the request dimensions sets
145 represented by this dimensions_hash.
146 """
147 return any(s.match(bot_dimensions) for s in self.sets)
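
One detail of confirm_flat() worth seeing in isolation: `not x.difference(s.dimensions_flat)` is true whenever the requested set is a subset of a stored set, not only when the two are equal. The sketch below is a plain-Python approximation under that reading; it replaces the ndb entities with lists of strings and uses `items()` instead of `iteritems()` so it runs on Python 3. The sample data is hypothetical.

```python
def confirm_flat(sets, dimensions_flat):
    """Returns True if one stored set contains every requested 'key:value'."""
    x = frozenset(dimensions_flat)
    # An empty difference means x is a subset of the stored set.
    return any(not x.difference(s) for s in sets)


def confirm(sets, dimensions):
    """Same check, starting from a dict(str, str) of request dimensions."""
    return confirm_flat(sets, ('%s:%s' % (k, v) for k, v in dimensions.items()))


# Two request dimension sets assumed to share one dimensions_hash.
stored = [
    ['os:Linux', 'pool:default'],
    ['gpu:none', 'pool:default'],
]
exact = confirm(stored, {'os': 'Linux', 'pool': 'default'})
subset = confirm(stored, {'pool': 'default'})
unknown = confirm(stored, {'os': 'Mac', 'pool': 'default'})
```

Here `exact` and `subset` are both true while `unknown` is false. If strict equality is the intent, comparing `x` against `frozenset(s)` would be one way to express it.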
148
149
17 ### Private APIs. 150 ### Private APIs.
18 151
19 152
20 ### Public APIs. 153 ### Public APIs.
21 154
22 155
156 def hash_dimensions(dimensions):
157 """Returns a 32-bit int that is a hash of the request dimensions specified.
158
159 Arguments:
160 dimensions: dict(str, str)
161
162 The return value is guaranteed to be non-zero so it can be used as a key id in
163 an ndb.Key.
164 """
165 del dimensions
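
hash_dimensions() is still a stub in this patch set. Purely as an illustration of the documented contract (a non-zero 32-bit int derived from a dict(str, str)), one possible approach is sketched below. The choice of JSON serialization, MD5 and a little-endian unpack are assumptions made for this example, not what the patch implements.

```python
import hashlib
import json
import struct


def hash_dimensions(dimensions):
    """Returns a non-zero 32-bit int hash of a dict(str, str)."""
    # Serialize deterministically so key insertion order does not matter.
    data = json.dumps(dimensions, sort_keys=True, separators=(',', ':'))
    digest = hashlib.md5(data.encode('utf-8')).digest()
    # '<L' is a little-endian unsigned 32-bit integer in standard size mode.
    value = struct.unpack('<L', digest[:4])[0]
    # Zero is not usable as an integer key id, so map it to 1.
    return value or 1


h1 = hash_dimensions({'os': 'Linux', 'pool': 'default'})
h2 = hash_dimensions({'pool': 'default', 'os': 'Linux'})
```

With this sketch `h1` and `h2` are equal, because the keys are sorted before hashing. Truncating to 32 bits is what makes collisions plausible, which is why TaskDimensions keeps the flattened dimensions around for disambiguation.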
166
167
23 def assert_bot(bot_dimensions): 168 def assert_bot(bot_dimensions):
24 """Prepares the dimensions for the queues.""" 169 """Prepares the dimensions for the queues."""
25 assert len(bot_dimensions[u'id']) == 1, bot_dimensions 170 assert len(bot_dimensions[u'id']) == 1, bot_dimensions
26 171
27 172
28 def assert_task(request): 173 def assert_task(request):
29 """Prepares the dimensions for the queues. 174 """Makes sure the TaskRequest dimensions are listed as a known queue.
30 175
31 The generated entities are root entities. 176 This function must be called before storing the TaskRequest in the DB.
32 """ 177
33 del request 178 When a cache miss occurs, a task queue is triggered.
179
180 Warning: the task will not be run until the task queue has run, which causes a
181 user-visible delay. This only occurs for new kinds of requests, which is not
182 that frequent in practice.
183 """
184 assert not request.key, request.key
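
The refresh rule described in the TaskDimensions.valid_until_ts comment can be written down as a small pure function. This is a sketch under the assumption that assert_task() only rewrites the entity when the stored valid_until_ts is missing or lower than the new task's expiration_ts; the helper name `refreshed_valid_until` is hypothetical.

```python
import datetime

# Same value as _ADVANCE in the patch.
_ADVANCE = datetime.timedelta(hours=1)


def refreshed_valid_until(valid_until_ts, expiration_ts):
    """Returns the new valid_until_ts, or None if no write is needed."""
    if valid_until_ts is not None and valid_until_ts >= expiration_ts:
        # The queue already outlives this task; skip the datastore write.
        return None
    return expiration_ts + _ADVANCE


base = datetime.datetime(2017, 5, 1, 12, 0, 0)
first = refreshed_valid_until(None, base)
skipped = refreshed_valid_until(first, base + datetime.timedelta(minutes=30))
bumped = refreshed_valid_until(first, base + datetime.timedelta(hours=2))
```

In this example the second call needs no write because the queue is already valid past the task's expiration, which is the trade-off the _ADVANCE comment describes: fewer updates in exchange for a queue that may stay alive somewhat longer than needed.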
185
186
187 def get_queues(bot_id):
188 """Queries all the known task queues in parallel and yields the tasks in
189 order of priority.
190
191 The ordering is opportunistic, not strict.
192 """
193 assert isinstance(bot_id, unicode), repr(bot_id)
194 return []
195
196
197 def rebuild_task_cache(dimensions_hash, dimensions_flat):
198 """Rebuilds the TaskDimensions cache.
199
200 This code implicitly depends on bot_management.bot_event() being called for
201 the bots.
202
203 This function is called in two cases:
204 - A new kind of dimensions never seen before
205 - The TaskDimensions.valid_until_ts expired
206
207 This is a cache miss: query all the bots and check for the ones which can run
208 the task.
209
210 Warning: There's a race condition, where the TaskDimensions query could be
211 missing some instances due to eventual consistency in the BotInfo query. This
212 only happens when there's a new request dimensions set AND a bot that can run
213 this task recently showed up.
214
215 Runtime expectation: it scales with the number of bots that can run the task,
216 via BotInfo.dimensions_flat filtering. As there can be tens of thousands of
217 bots that can run the task, this can take a long time to store all the
218 entities on a new kind of request. As such, it must be called in the backend.
219 """
220 del dimensions_hash
221 del dimensions_flat
222
223
224 def tidy_stale():
225 """Searches for all stale BotTaskDimensions and TaskDimensions and deletes
226 them.
227
228 Their .valid_until_ts is compared to the current time and the entity is
229 deleted if it's older.
230
231 The number of entities processed is expected to be relatively low, in the few
232 tens at most.
233 """
234 pass
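
tidy_stale() is also a stub here. The staleness test its docstring describes can be sketched with an in-memory dict standing in for the datastore. This is an assumption-laden illustration: the dict layout, the explicit `now` parameter and the returned count are all hypothetical, and the eventual implementation would query ndb instead.

```python
import datetime


def tidy_stale(entities, now):
    """Deletes entries whose valid_until_ts is older than now.

    entities: dict mapping a key to its valid_until_ts (datetime).
    Returns the number of entries deleted.
    """
    # Collect the keys first so the dict is not mutated while iterating.
    stale = [k for k, valid_until_ts in entities.items() if valid_until_ts < now]
    for k in stale:
        del entities[k]
    return len(stale)


now = datetime.datetime(2017, 5, 1, 12, 0, 0)
queues = {
    'pool:default/1234': now - datetime.timedelta(minutes=5),
    'pool:default/5678': now + datetime.timedelta(hours=1),
}
deleted = tidy_stale(queues, now)
```

After the call only the entry that is still valid remains. Passing `now` explicitly keeps the sketch deterministic and easy to test.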