Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(349)

Side by Side Diff: appengine/swarming/server/lease_management.py

Issue 2386793002: Reimplement Machine Provider integration (Closed)
Patch Set: Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/swarming/queue.yaml ('k') | appengine/swarming/server/lease_management_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2016 The LUCI Authors. All rights reserved. 1 # Copyright 2016 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """Lease management for machines leased from the Machine Provider. 5 """Lease management for machines leased from the Machine Provider.
6 6
7 Keeps a list of machine types which should be leased from the Machine Provider 7 Keeps a list of machine types which should be leased from the Machine Provider
8 and the list of machines of each type currently leased. 8 and the list of machines of each type currently leased.
9 9
10 Swarming integration with Machine Provider 10 Swarming integration with Machine Provider
11 ========================================== 11 ==========================================
12 12
13 handlers_backend.py contains a cron job which looks at each machine type and 13 handlers_backend.py contains a cron job which looks at each MachineType and
14 ensures the number of active lease requests is equal to the target size 14 ensures there are at least as many MachineLeases in the datastore which refer
15 specified by the machine type. 15 to that MachineType as the target_size in MachineType specifies by numbering
16 them 0 through target_size - 1 If there are MachineType entities numbered
17 target_size or greater which refer to that MachineType, those MachineLeases
18 are marked as drained.
16 19
17 A lease request is considered active as soon as it is initiated. It remains 20 Each MachineLease manages itself. A cron job in handlers_backend.py will trigger
18 active while the lease request is pending and even after it is fulfilled. A 21 self-management jobs for each entity. If there is no associated lease and the
19 lease request is considered no longer active once the request is denied or 22 MachineLease is not drained, issue a request to the Machine Provider for a
20 the machine used to fulfill the request is reclaimed. 23 matching machine. If there is an associated request, check the status of that
24 request. If it is fulfilled, ensure the existence of a BotInfo entity (see
25 server/bot_management.py) corresponding to the machine provided for the lease.
26 Include the lease ID and lease_expiration_ts as fields in the BotInfo. If it
27 is expired, clear the associated lease. If there is no associated lease and
28 the MachineLease is drained, delete the MachineLease entity.
21 29
22 This scheme ensures that we never have more machines of a given type than we 30 TODO(smut): If there is an associated request and the MachineLease is drained,
23 want 31 release the lease immediately (as long as the bot is not mid-task).
24
25 Each machine type is stored as a MachineType entity which describes the machine
26 in terms of Machine Provider Dimensions (not to be confused with Swarming's own
27 bot dimensions) as well as a target size which describes the number of machines
28 of this type we want connected at once.
29
30 handlers_backend.py calls generate_lease_requests (below) to generate a number
31 of lease requests equal to the target size minus the current number of active
32 leases, then performs the RPC to send these requests to the Machine Provider,
33 and finally it calls update_leases (below) with the results of the lease
34 request to associate the new active leases with the machine type.
35
36 In principle, the entire operation on a single machine type needs to happen
37 transactionally so we don't make duplicate requests and end up with more
38 machines than the target size wants. However, in practice we need the RPC
39 to be outside the transaction because it may be slow and it may need to be
40 retried, which could cause the entire transaction to time out without logging
41 the result of the call. Therefore the procedure is broken up into two
42 transactions with an RPC in between.
43
44 With the transaction broken into two, the choice of request ID is the only way
45 to prevents duplicate lease requests (e.g. if generate_lease_requests runs twice
46 before update_leases gets to run). Machine Provider supports idempotent RPCs
47 so long as the client-generated request ID is the same.
48
49 Therefore, we ensure generate_lease_requests always generates lease requests
50 with the same request ID until update_lease is called. This is done by keeping a
51 count of the number of requests generated so far. If the request count is n,
52 generate_lease_requests will always generate requests with IDs n + 1, n + 2, ...
53 up to n + target size - current size, where current size is the number of
54 currently active lease requests.
55
56 update_lease takes the RPC result and updates the pending request entries.
57
58 TODO(smut): Consider request count overflow.
59 """ 32 """
60 33
61 import base64 34 import base64
62 import datetime 35 import datetime
63 import json 36 import json
64 import logging 37 import logging
65 38
39 from google.appengine.api import app_identity
66 from google.appengine.ext import ndb 40 from google.appengine.ext import ndb
67 from google.appengine.ext.ndb import msgprop 41 from google.appengine.ext.ndb import msgprop
68 42
69 from components import machine_provider 43 from components import machine_provider
70 from components import pubsub 44 from components import pubsub
71 from components import utils 45 from components import utils
72 from server import bot_management 46 from server import bot_management
73 47
74 48
75 # Name of the topic the Machine Provider is authorized to publish 49 # Name of the topic the Machine Provider is authorized to publish
76 # lease information to. 50 # lease information to.
77 PUBSUB_TOPIC = 'machine-provider' 51 PUBSUB_TOPIC = 'machine-provider'
78 52
79 # Name of the pull subscription to the Machine Provider topic. 53 # Name of the pull subscription to the Machine Provider topic.
80 PUBSUB_SUBSCRIPTION = 'machine-provider' 54 PUBSUB_SUBSCRIPTION = 'machine-provider'
81 55
82 56
83 class MachineLease(ndb.Model): 57 class MachineLease(ndb.Model):
84 """A lease request for a machine from the Machine Provider. 58 """A lease request for a machine from the Machine Provider.
85 59
86 Standalone MachineLease entities should not exist in the datastore. 60 Key:
61 id: A string in the form <machine type id>-<number>.
62 kind: MachineLease. Is a root entity.
87 """ 63 """
88 # Request ID used to generate this request. 64 # Request ID used to generate this request.
89 client_request_id = ndb.StringProperty(required=True) 65 client_request_id = ndb.StringProperty(indexed=True)
66 # Whether or not this MachineLease should issue lease requests.
67 drained = ndb.BooleanProperty(indexed=True)
90 # Hostname of the machine currently allocated for this request. 68 # Hostname of the machine currently allocated for this request.
91 hostname = ndb.StringProperty() 69 hostname = ndb.StringProperty()
92 # Request hash returned by the server for the request for this machine. 70 # Duration to lease for.
93 request_hash = ndb.StringProperty() 71 lease_duration_secs = ndb.IntegerProperty(indexed=False)
94 # DateTime indicating lease expiration time. 72 # DateTime indicating lease expiration time.
95 lease_expiration_ts = ndb.DateTimeProperty() 73 lease_expiration_ts = ndb.DateTimeProperty()
74 # ndb.Key for the MachineType this MachineLease is created for.
75 machine_type = ndb.KeyProperty()
76 # machine_provider.Dimensions describing the machine.
77 mp_dimensions = msgprop.MessageProperty(
78 machine_provider.Dimensions, indexed=False)
79 # Last request number used.
80 request_count = ndb.IntegerProperty(default=0, required=True)
96 81
97 82
98 class MachineType(ndb.Model): 83 class MachineType(ndb.Model):
99 """A type of machine which should be leased from the Machine Provider. 84 """A type of machine which should be leased from the Machine Provider.
100 85
101 Key: 86 Key:
102 id: A human-readable name for this machine type. 87 id: A human-readable name for this machine type.
103 kind: MachineType. Is a root entity. 88 kind: MachineType. Is a root entity.
104 """ 89 """
105 # Current number of active leases.
106 current_size = ndb.ComputedProperty(lambda self: len(self.leases))
107 # Description of this machine type for humans. 90 # Description of this machine type for humans.
108 description = ndb.StringProperty(indexed=False) 91 description = ndb.StringProperty(indexed=False)
109 # Whether or not to attempt to lease machines of this type. 92 # Whether or not to attempt to lease machines of this type.
110 enabled = ndb.BooleanProperty(default=True) 93 enabled = ndb.BooleanProperty(default=True)
111 # Duration to lease each machine for. 94 # Duration to lease each machine for.
112 lease_duration_secs = ndb.IntegerProperty(indexed=False) 95 lease_duration_secs = ndb.IntegerProperty(indexed=False)
113 # List of active lease requests.
114 leases = ndb.LocalStructuredProperty(MachineLease, repeated=True)
115 # machine_provider.Dimensions describing the machine. 96 # machine_provider.Dimensions describing the machine.
116 mp_dimensions = msgprop.MessageProperty( 97 mp_dimensions = msgprop.MessageProperty(
117 machine_provider.Dimensions, indexed=False) 98 machine_provider.Dimensions, indexed=False)
118 # Number of bots pending deletion.
119 num_pending_deletion = ndb.ComputedProperty(
120 lambda self: len(self.pending_deletion))
121 # List of hostnames whose leases have expired and should be deleted.
122 pending_deletion = ndb.StringProperty(indexed=False, repeated=True)
123 # Last request number used.
124 request_count = ndb.IntegerProperty(default=0, required=True)
125 # Target number of machines of this type to have leased at once. 99 # Target number of machines of this type to have leased at once.
126 target_size = ndb.IntegerProperty(indexed=False, required=True) 100 target_size = ndb.IntegerProperty(indexed=False, required=True)
127 101
128 102
129 def clean_up_bots(): 103 @ndb.transactional_tasklet
130 """Cleans up expired leases.""" 104 def ensure_entity_exists(machine_type, n):
131 # Maximum number of in-flight ndb.Futures. 105 """Ensures the nth MachineLease for the given MachineType exists.
132 MAX_IN_FLIGHT = 50 106
133 107 Args:
134 bot_ids = [] 108 machine_type: MachineType entity.
135 deleted = {} 109 n: The MachineLease index.
136 for machine_type in MachineType.query(MachineType.num_pending_deletion > 0): 110 """
137 logging.info( 111 key = ndb.Key(MachineLease, '%s-%s' % (machine_type.key.id(), n))
138 'Deleting bots: %s', ', '.join(sorted(machine_type.pending_deletion))) 112 machine_lease = yield key.get_async()
139 bot_ids.extend(machine_type.pending_deletion) 113 if machine_lease:
140 deleted[machine_type.key] = machine_type.pending_deletion 114 return
141 115
142 # Generate a few asynchronous requests at a time in order to 116 yield MachineLease(
143 # prevent having too many in-flight ndb.Futures at a time. 117 key=key,
118 lease_duration_secs=machine_type.lease_duration_secs,
119 machine_type=machine_type.key,
120 mp_dimensions=machine_type.mp_dimensions,
121 ).put_async()
122
123
124 def ensure_entities_exist(max_concurrent=50):
125 """Ensures MachineLeases exist for each MachineType.
126
127 Args:
128 max_concurrent: Maximum number of concurrent asynchronous requests.
129 """
130 # Generate a few asynchronous requests at a time in order to prevent having
131 # too many in flight at a time.
144 futures = [] 132 futures = []
145 while bot_ids: 133
146 num_futures = len(futures) 134 for machine_type in MachineType.query(MachineType.enabled == True):
147 if num_futures < MAX_IN_FLIGHT: 135 cursor = 0
148 keys = [bot_management.get_info_key(bot_id) 136 while cursor < machine_type.target_size:
149 for bot_id in bot_ids[:MAX_IN_FLIGHT - num_futures]] 137 while len(futures) < max_concurrent and cursor < machine_type.target_size:
150 bot_ids = bot_ids[MAX_IN_FLIGHT - num_futures:] 138 futures.append(ensure_entity_exists(machine_type, cursor))
151 futures.extend(ndb.delete_multi_async(keys)) 139 cursor += 1
152 140 ndb.Future.wait_any(futures)
Vadim Sh. 2016/10/03 20:26:31 this doesn't check status code (e.g. if transactio
smut 2016/10/03 21:18:07 Done.
153 ndb.Future.wait_any(futures) 141 futures = [future for future in futures if not future.done()]
154 futures = [future for future in futures if not future.done()]
155 142
156 if futures: 143 if futures:
157 ndb.Future.wait_all(futures) 144 ndb.Future.wait_all(futures)
158 145
159 # There should be relatively few MachineType entitites, so 146
160 # just process them sequentially. 147 def drain_excess(max_concurrent=50):
161 # TODO(smut): Parallelize this. 148 """Marks MachineLeases beyond what is needed by their MachineType as drained.
162 for machine_key, hostnames in deleted.iteritems(): 149
163 successfully_deleted = [] 150 Args:
164 for hostname in hostnames: 151 max_concurrent: Maximum number of concurrent asynchronous requests.
165 if bot_management.get_info_key(hostname).get(): 152 """
166 logging.error('Failed to delete BotInfo: %s', hostname) 153 futures = []
167 else: 154
168 successfully_deleted.append(hostname) 155 for machine_type in MachineType.query():
169 logging.info('Deleted bots: %s', ', '.join(sorted(successfully_deleted))) 156 for machine_lease in MachineLease.query(
Vadim Sh. 2016/10/03 20:26:32 I think it will be more efficient to do a single M
smut 2016/10/03 21:18:08 There's only one MachineType right now. Didn't do
170 _clear_bots_pending_deletion(machine_key, hostnames) 157 MachineLease.machine_type == machine_type.key,
171 158 ):
172 159 try:
173 @ndb.transactional 160 index = int(machine_lease.key.id().rsplit('-', 1)[-1])
174 def _clear_bots_pending_deletion(machine_type_key, hostnames): 161 except ValueError:
175 """Clears the list of bots pending deletion. 162 logging.error(
176 163 'MachineLease index could not be deciphered: %s', machine_lease.key)
177 Args: 164 continue
178 machine_type_key: ndb.Key for a MachineType instance. 165 # Drain MachineLeases where the MachineType is not enabled or the index
179 hostnames: List of bots pending deletion. 166 # exceeds the target_size given by the MachineType. Since MachineLeases
180 """ 167 # are created in contiguous blocks, only indices 0 through target_size - 1
181 machine_type = machine_type_key.get() 168 # should exist.
182 if not machine_type: 169 if not machine_type.enabled or index >= machine_type.target_size:
183 logging.error('MachineType no longer exists: %s', machine_type_key.id()) 170 if len(futures) == max_concurrent:
184 return 171 ndb.Future.wait_any(futures)
185 172 futures = [future for future in futures if not future.done()]
186 num_pending_deletion = len(machine_type.pending_deletion) 173 machine_lease.drained = True
187 machine_type.pending_deletion = [ 174 futures.append(machine_lease.put_async())
188 host for host in machine_type.pending_deletion if host not in hostnames] 175
189 if len(machine_type.pending_deletion) != num_pending_deletion: 176 if futures:
190 machine_type.put() 177 ndb.Future.wait_all(futures)
191 178
192 179
193 @ndb.transactional 180 def schedule_lease_management():
194 def generate_lease_requests(machine_type_key, app_id, swarming_server): 181 """Schedules task queues to process each MachineLease."""
195 """Generates lease requests. 182 for machine_lease in MachineLease.query():
196 183 # TODO(smut): Remove this check once migrated to the new format.
197 The list includes new requests to lease machines up to the targeted 184 if machine_lease.machine_type:
198 size for the given machine type as well as requests to get the status 185 if not utils.enqueue_task(
199 of pending lease requests. 186 '/internal/taskqueue/machine-provider-manage',
200 187 'machine-provider-manage',
201 Args: 188 params={
202 machine_type_key: ndb.Key for a MachineType instance. 189 'key': machine_lease.key.urlsafe(),
203 app_id: ID of the application the requests originate from. 190 },
204 swarming_server: URL for the Swarming server to connect to. 191 ):
205 192 logging.warning(
206 Returns: 193 'Failed to enqueue task for MachineLease: %s', machine_lease.key)
207 A list of lease requests. 194
208 """ 195
209 machine_type = machine_type_key.get() 196 @ndb.transactional
210 if not machine_type: 197 def clear_lease_request(key, request_id):
211 logging.warning('MachineType no longer exists: %s', machine_type_key.id()) 198 """Clears information about given lease request.
212 return [] 199
213 200 Args:
214 expired_requests = _clean_up_expired_leases(machine_type) 201 request_id: ID of the request to clear.
215 lease_requests = _generate_lease_request_status_updates( 202 """
216 machine_type, app_id, swarming_server) 203 machine_lease = key.get()
217 204 if not machine_lease:
218 if not machine_type.enabled: 205 logging.error('MachineLease does not exist: %s', key)
219 logging.warning('MachineType is not enabled: %s\n', machine_type.key.id()) 206 return
220 return lease_requests 207
221 if machine_type.current_size >= machine_type.target_size: 208 if not machine_lease.client_request_id:
222 logging.info( 209 return
223 'MachineType %s is at capacity: %d/%d', 210
224 machine_type.key.id(), 211 if request_id != machine_lease.client_request_id:
225 machine_type.current_size, 212 # Already cleared and incremented?
226 machine_type.target_size, 213 logging.warning(
214 'Request ID mismatch for MachineLease: %s\nExpected: %s\nActual: %s',
215 key,
216 request_id,
217 machine_lease.client_request_id,
227 ) 218 )
228 return lease_requests 219 return
229 220
230 new_requests = _generate_lease_requests_for_new_machines( 221 machine_lease.client_request_id = None
231 machine_type, app_id, swarming_server) 222 machine_lease.hostname = None
232 223 machine_lease.lease_expiration_ts = None
233 if new_requests or expired_requests: 224 machine_lease.put()
234 machine_type.put() 225
235 lease_requests.extend(new_requests) 226
236 227 @ndb.transactional
237 return lease_requests 228 def log_lease_fulfillment(key, request_id, hostname, lease_expiration_ts):
238 229 """Logs lease fulfillment.
239 230
240 def _clean_up_expired_leases(machine_type): 231 Args:
241 """Cleans up expired leases. 232 request_id: ID of the request being fulfilled.
242 233 hostname: Hostname of the machine fulfilling the request.
243 Prunes expired leases from machine_type.leases, 234 lease_expiration_ts: UTC seconds since epoch when the lease expires.
244 but does not write the result to the datastore. 235 """
245 236 machine_lease = key.get()
246 Args: 237 if not machine_lease:
247 machine_type: MachineType instance. 238 logging.error('MachineLease does not exist: %s', key)
248 239 return
249 Returns: 240
250 A list of leases that were removed. 241 if request_id != machine_lease.client_request_id:
251 """ 242 logging.error(
252 active = [] 243 'Request ID mismatch for MachineLease: %s\nExpected: %s\nActual: %s',
253 expired = [] 244 machine_lease.client_request_id,
254 245 request_id,
255 for request in machine_type.leases: 246 )
256 if request.hostname and request.lease_expiration_ts <= utils.utcnow(): 247 return
257 logging.warning( 248
258 'Request ID %s expired:\nHostname: %s\nExpiration: %s', 249 if (hostname == machine_lease.hostname
259 request.client_request_id, 250 and lease_expiration_ts == machine_lease.lease_expiration_ts):
260 request.hostname, 251 return
261 request.lease_expiration_ts, 252
262 ) 253 machine_lease.hostname = hostname
263 expired.append(request.hostname) 254 machine_lease.lease_expiration_ts = datetime.datetime.utcfromtimestamp(
264 else: 255 lease_expiration_ts)
265 active.append(request) 256 machine_lease.put()
266 257
267 machine_type.leases = active 258
268 machine_type.pending_deletion.extend(expired) 259 @ndb.transactional
269 return expired 260 def increment_lease_request_id(key):
Vadim Sh. 2016/10/03 20:26:32 nit: update_client_request_id It takes an entity
smut 2016/10/03 21:18:08 Done.
270 261 """Increments the client request ID used to lease a machine.
271 262
272 def _generate_lease_request_status_updates( 263 Args:
273 machine_type, app_id, swarming_server): 264 key: ndb.Key for a MachineLease entity.
274 """Generates status update requests for pending lease requests. 265 """
275 266 machine_lease = key.get()
276 Args: 267 if not machine_lease:
277 machine_type: MachineType instance. 268 logging.error('MachineLease does not exist: %s', key)
278 app_id: ID of the application the requests originate from. 269 return
279 swarming_server: URL for the Swarming server to connect to. 270
280 271 if machine_lease.drained:
281 Returns: 272 logging.info('MachineLease is drained: %s', key)
282 A list of lease requests. 273 return
283 """ 274
284 lease_requests = [] 275 if machine_lease.client_request_id:
285 for request in machine_type.leases: 276 return
286 if not request.hostname: 277
287 # We don't know the hostname yet, meaning this request is still pending. 278 machine_lease.request_count += 1
288 lease_requests.append(machine_provider.LeaseRequest( 279 machine_lease.client_request_id = '%s-%s-%s' % (
289 dimensions=machine_type.mp_dimensions, 280 machine_lease.machine_type.id(), key.id(), machine_lease.request_count)
290 duration=machine_type.lease_duration_secs, 281 machine_lease.put()
291 on_lease=machine_provider.Instruction( 282
292 swarming_server=swarming_server), 283
293 pubsub_project=app_id, 284 @ndb.transactional
294 pubsub_topic=PUBSUB_TOPIC, 285 def delete_machine_lease(key):
295 request_id=request.client_request_id, 286 """Deletes the given MachineLease if it is drained and has no active lease.
296 )) 287
297 return lease_requests 288 Args:
298 289 key: ndb.Key for a MachineLease entity.
299 290 """
300 def _generate_lease_requests_for_new_machines( 291 machine_lease = key.get()
301 machine_type, app_id, swarming_server): 292 if not machine_lease:
302 """Generates requests to lease machines up to the target. 293 return
303 294
304 Extends machine_type.leases by the number of new lease requests generated, 295 if not machine_lease.drained:
305 but does not write the result to the datastore. 296 logging.warning('MachineLease not drained: %s', key)
306 297 return
307 Args: 298
308 machine_type: MachineType instance. 299 if machine_lease.client_request_id:
Vadim Sh. 2016/10/03 20:26:31 is this possible if drained == True?
smut 2016/10/03 21:18:07 Yes, there's the case where the MachineLease alrea
309 app_id: ID of the application the requests originate from. 300 return
310 swarming_server: URL for the Swarming server to connect to. 301
311 302 key.delete()
312 Returns: 303
313 A list of lease requests. 304
314 """ 305 def manage_lease(key):
315 lease_requests = [] 306 """Manages a MachineLease.
316 request_number = machine_type.request_count 307
317 for _ in xrange(machine_type.target_size - machine_type.current_size): 308 Args:
318 request_number += 1 309 key: ndb.Key for a MachineLease entity.
319 request_id = '%s-%d' % (machine_type.key.id(), request_number) 310 """
320 lease_requests.append(machine_provider.LeaseRequest( 311 machine_lease = key.get()
321 dimensions=machine_type.mp_dimensions, 312 if not machine_lease:
322 duration=machine_type.lease_duration_secs, 313 return
323 on_lease=machine_provider.Instruction(swarming_server=swarming_server), 314
324 pubsub_project=app_id, 315 # Manage a leased machine.
325 pubsub_topic=PUBSUB_TOPIC, 316 if machine_lease.lease_expiration_ts:
326 request_id=request_id, 317 if machine_lease.lease_expiration_ts <= utils.utcnow():
327 )) 318 logging.info('MachineLease expired: %s', key)
328 machine_type.leases.append(MachineLease(client_request_id=request_id)) 319 assert machine_lease.hostname, key
329 machine_type.request_count = request_number 320 clear_lease_request(key, machine_lease.client_request_id)
330 return lease_requests 321 return
331 322
332 323 # Lease expiration time is unknown, so there must be no leased machine.
333 @ndb.transactional 324 assert not machine_lease.hostname, key
334 def update_leases(machine_type_key, responses): 325
335 """Updates the states of leases of the given machine types. 326 # Manage a pending lease request.
336 327 if machine_lease.client_request_id:
Vadim Sh. 2016/10/03 20:26:31 can the body of this 'if' be converted into a sepa
smut 2016/10/03 21:18:07 Done.
337 Args: 328 response = machine_provider.lease_machine(
338 machine_type_key: ndb.Key for a MachineType instance. 329 machine_provider.LeaseRequest(
339 responses: machine_provider.BatchedLeaseResponse instance. 330 dimensions=machine_lease.mp_dimensions,
340 """ 331 duration=machine_lease.lease_duration_secs,
Vadim Sh. 2016/10/03 20:26:32 I really really wish we can randomize this a bit t
smut 2016/10/03 21:18:07 Your suggestion of varying with +/- 10% seems reas
Vadim Sh. 2016/10/03 21:34:18 Instead of random() you can use some hash-like fun
smut 2016/10/03 21:41:50 random.seed(machine_lease.client_request_id) rando
Vadim Sh. 2016/10/03 21:46:43 Sort of, except random.seed will modify global see
341 machine_type = machine_type_key.get() 332 on_lease=machine_provider.Instruction(
342 if not machine_type: 333 swarming_server='https://%s' % (
343 logging.warning('MachineType no longer exists: %s', machine_type_key) 334 app_identity.get_default_version_hostname())),
344 return 335 request_id=machine_lease.client_request_id,
345 336 ),
346 lease_request_map = { 337 )
347 request.client_request_id: request for request in machine_type.leases
348 }
349 for response in responses:
350 request_id = response['client_request_id']
351 request = lease_request_map.get(request_id)
352 if not request:
353 logging.error('Unknown request ID: %s', request_id)
354 continue
355 338
356 if response.get('error'): 339 if response.get('error'):
357 error = machine_provider.LeaseRequestError.lookup_by_name( 340 error = machine_provider.LeaseRequestError.lookup_by_name(
358 response['error']) 341 response['error'])
359 if error in ( 342 if error in (
360 machine_provider.LeaseRequestError.DEADLINE_EXCEEDED, 343 machine_provider.LeaseRequestError.DEADLINE_EXCEEDED,
361 machine_provider.LeaseRequestError.TRANSIENT_ERROR, 344 machine_provider.LeaseRequestError.TRANSIENT_ERROR,
362 ): 345 ):
363 # Retryable errors. Just try again later.
364 logging.warning( 346 logging.warning(
365 'Request not processed, trying again later: %s', request_id) 347 'Transient failure: %s\nRequest ID: %s\nError: %s',
348 key,
349 request_id,
350 response['error'],
351 )
366 else: 352 else:
367 # TODO(smut): Handle specific errors. 353 logging.error(
368 logging.warning( 354 'Lease request failed: %s\nRequest ID: %s\nError: %s',
Vadim Sh. 2016/10/03 20:26:32 nit: "Lease request failed\n%s...." it will make
smut 2016/10/03 21:18:07 Done.
369 'Error %s for request ID %s', 355 key,
356 request_id,
370 response['error'], 357 response['error'],
371 request_id,
372 ) 358 )
373 lease_request_map.pop(request_id) 359 clear_lease_request(key, machine_lease.client_request_id)
374 else: 360 return
375 request.request_hash = response['request_hash'] 361
376 state = machine_provider.LeaseRequestState.lookup_by_name( 362 state = machine_provider.LeaseRequestState.lookup_by_name(response['state'])
377 response['state']) 363 if state == machine_provider.LeaseRequestState.FULFILLED:
378 if state == machine_provider.LeaseRequestState.DENIED: 364 if not response.get('hostname'):
379 logging.warning('Request ID denied: %s', request_id) 365 # Lease has already expired. This shouldn't happen, but it indicates the
380 lease_request_map.pop(request_id) 366 # lease expired faster than we could tell it even got fulfilled.
381 elif state == machine_provider.LeaseRequestState.FULFILLED: 367 logging.error(
382 if response.get('hostname'): 368 'Request expired: %s\nRequest ID:%s\nExpired: %s',
Vadim Sh. 2016/10/03 20:26:31 same here
smut 2016/10/03 21:18:07 Done.
383 logging.info( 369 key,
384 'Request ID %s fulfilled:\nHostname: %s\nExpiration: %s', 370 machine_lease.client_request_id,
385 request_id, 371 response['lease_expiration_ts'],
386 response['hostname'], 372 )
387 response['lease_expiration_ts'], 373 clear_lease_request(key, machine_lease.client_request_id)
388 )
389 request.hostname = response['hostname']
390 request.lease_expiration_ts = datetime.datetime.utcfromtimestamp(
391 int(response['lease_expiration_ts']))
392 else:
393 # Lease expired. This shouldn't happen, because it means we had a
394 # pending request which was fulfilled and expired before we were
395 # able to check its status.
396 logging.warning('Request ID fulfilled and expired: %s', request_id)
397 lease_request_map.pop(request_id)
398 else: 374 else:
399 # Lease request isn't processed yet. Just try again later.
400 logging.info( 375 logging.info(
401 'Request ID %s in state: %s', request_id, response['state']) 376 'Request fulfilled: %s\nRequest ID: %s\nHostname: %s\nExpires: %s',
377 key,
378 machine_lease.client_request_id,
379 response['hostname'],
380 response['lease_expiration_ts'],
381 )
382 log_lease_fulfillment(
383 key,
384 machine_lease.client_request_id,
385 response['hostname'],
386 int(response['lease_expiration_ts']),
387 )
388 # TODO(smut): Create BotInfo, associate lease information.
Vadim Sh. 2016/10/03 20:26:32 yep, in the same transaction
smut 2016/10/03 21:18:07 xg in log_lease_fulfillment?
Vadim Sh. 2016/10/03 21:34:18 Yes. Assuming 'manage_lease' is idempotent and wil
smut 2016/10/03 21:41:50 Yes should be idempotent.
389 elif state == machine_provider.LeaseRequestState.DENIED:
390 logging.warning(
391 'Request denied: %s\nRequest ID: %s',
392 key,
393 machine_lease.client_request_id,
394 )
395 clear_lease_request(key, machine_lease.client_request_id)
396 return
402 397
403 machine_type.leases = sorted( 398 # Manage an uninitiated lease request.
404 lease_request_map.values(), key=lambda lease: lease.client_request_id) 399 if not machine_lease.drained:
405 machine_type.put() 400 increment_lease_request_id(key)
406 401
407 402 # Manage an uninitiated, drained lease request.
408 @ndb.tasklet 403 delete_machine_lease(key)
Vadim Sh. 2016/10/03 20:26:32 shouldn't it be in 'else' section of 'if not machi
smut 2016/10/03 21:18:07 Er, yes. I just added return above under the if so
409 def process_message(message, subscription):
410 """Processes a Pub/Sub message from the Machine Provider.
411
412 Args:
413 message: A message dict as returned by pubsub.pull.
414 subscription: Full name of the subscription the message was received on.
415 """
416 now = utils.utcnow()
417
418 try:
419 data = base64.b64decode(message.get('message', {}).get('data', ''))
420 lease_response = json.loads(data)
421 # Machine Provider sends a response including lease_expiration_ts. If
422 # lease_expiration_ts is not in the future and there is no more hostname
423 # associated with the response then this is a reclamation message.
424 lease_expiration_ts = datetime.datetime.utcfromtimestamp(
425 int(lease_response['lease_expiration_ts']))
426 if lease_expiration_ts <= now and not lease_response.get('hostname'):
427 # We used request IDs of the form MachineType.key.id()-<integer>.
428 # Extract the ID for the MachineType from the request ID.
429 machine_type_id = lease_response['client_request_id'].rsplit('-', 1)[0]
430 logging.info('Lease ID %s is expired\nMachineType: %s',
431 lease_response['client_request_id'],
432 MachineType.get_by_id(machine_type_id))
433 except (TypeError, ValueError):
434 logging.error('Received unexpected Pub/Sub message:\n%s',
435 json.dumps(message, indent=2))
436
437 yield pubsub.ack_async(subscription, message['ackId'])
438
439
440 def process_pubsub(app_id):
441 """Processes Pub/Sub messages from the Machine Provider, if there are any.
442
443 Args:
444 app_id: ID of the application where the Pub/Sub subscription exists.
445 """
446 MAX_IN_FLIGHT = 50
447 subscription = pubsub.full_subscription_name(app_id, PUBSUB_SUBSCRIPTION)
448 response = pubsub.pull(subscription)
449 logging.info('%s', response)
450
451 futures = []
452 messages = response.get('receivedMessages', [])
453 while messages:
454 num_futures = len(futures)
455 if num_futures < MAX_IN_FLIGHT:
456 futures.extend(
457 process_message(message, subscription)
458 for message in messages[:MAX_IN_FLIGHT - num_futures])
459 messages = messages[MAX_IN_FLIGHT - num_futures:]
460 ndb.Future.wait_any(futures)
461 futures = [future for future in futures if not future.done()]
462 if futures:
463 ndb.Future.wait_all(futures)
OLDNEW
« no previous file with comments | « appengine/swarming/queue.yaml ('k') | appengine/swarming/server/lease_management_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698