Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(156)

Side by Side Diff: appengine/swarming/server/lease_management.py

Issue 2386793002: Reimplement Machine Provider integration (Closed)
Patch Set: Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/swarming/queue.yaml ('k') | appengine/swarming/server/lease_management_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2016 The LUCI Authors. All rights reserved. 1 # Copyright 2016 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """Lease management for machines leased from the Machine Provider. 5 """Lease management for machines leased from the Machine Provider.
6 6
7 Keeps a list of machine types which should be leased from the Machine Provider 7 Keeps a list of machine types which should be leased from the Machine Provider
8 and the list of machines of each type currently leased. 8 and the list of machines of each type currently leased.
9 9
10 Swarming integration with Machine Provider 10 Swarming integration with Machine Provider
11 ========================================== 11 ==========================================
12 12
13 handlers_backend.py contains a cron job which looks at each machine type and 13 handlers_backend.py contains a cron job which looks at each MachineType and
14 ensures the number of active lease requests is equal to the target size 14 ensures there are at least as many MachineLeases in the datastore which refer
15 specified by the machine type. 15 to that MachineType as the target_size in MachineType specifies by numbering
16 them 0 through target_size - 1 If there are MachineType entities numbered
17 target_size or greater which refer to that MachineType, those MachineLeases
18 are marked as drained.
16 19
17 A lease request is considered active as soon as it is initiated. It remains 20 Each MachineLease manages itself. A cron job in handlers_backend.py will trigger
18 active while the lease request is pending and even after it is fulfilled. A 21 self-management jobs for each entity. If there is no associated lease and the
19 lease request is considered no longer active once the request is denied or 22 MachineLease is not drained, issue a request to the Machine Provider for a
20 the machine used to fulfill the request is reclaimed. 23 matching machine. If there is an associated request, check the status of that
24 request. If it is fulfilled, ensure the existence of a BotInfo entity (see
25 server/bot_management.py) corresponding to the machine provided for the lease.
26 Include the lease ID and lease_expiration_ts as fields in the BotInfo. If it
27 is expired, clear the associated lease. If there is no associated lease and
28 the MachineLease is drained, delete the MachineLease entity.
21 29
22 This scheme ensures that we never have more machines of a given type than we 30 TODO(smut): If there is an associated request and the MachineLease is drained,
23 want 31 release the lease immediately (as long as the bot is not mid-task).
24
25 Each machine type is stored as a MachineType entity which describes the machine
26 in terms of Machine Provider Dimensions (not to be confused with Swarming's own
27 bot dimensions) as well as a target size which describes the number of machines
28 of this type we want connected at once.
29
30 handlers_backend.py calls generate_lease_requests (below) to generate a number
31 of lease requests equal to the target size minus the current number of active
32 leases, then performs the RPC to send these requests to the Machine Provider,
33 and finally it calls update_leases (below) with the results of the lease
34 request to associate the new active leases with the machine type.
35
36 In principle, the entire operation on a single machine type needs to happen
37 transactionally so we don't make duplicate requests and end up with more
38 machines than the target size wants. However, in practice we need the RPC
39 to be outside the transaction because it may be slow and it may need to be
40 retried, which could cause the entire transaction to time out without logging
41 the result of the call. Therefore the procedure is broken up into two
42 transactions with an RPC in between.
43
44 With the transaction broken into two, the choice of request ID is the only way
45 to prevents duplicate lease requests (e.g. if generate_lease_requests runs twice
46 before update_leases gets to run). Machine Provider supports idempotent RPCs
47 so long as the client-generated request ID is the same.
48
49 Therefore, we ensure generate_lease_requests always generates lease requests
50 with the same request ID until update_lease is called. This is done by keeping a
51 count of the number of requests generated so far. If the request count is n,
52 generate_lease_requests will always generate requests with IDs n + 1, n + 2, ...
53 up to n + target size - current size, where current size is the number of
54 currently active lease requests.
55
56 update_lease takes the RPC result and updates the pending request entries.
57
58 TODO(smut): Consider request count overflow.
59 """ 32 """
60 33
61 import base64 34 import base64
62 import datetime 35 import datetime
63 import json 36 import json
64 import logging 37 import logging
65 38
39 from google.appengine.api import app_identity
66 from google.appengine.ext import ndb 40 from google.appengine.ext import ndb
67 from google.appengine.ext.ndb import msgprop 41 from google.appengine.ext.ndb import msgprop
68 42
69 from components import machine_provider 43 from components import machine_provider
70 from components import pubsub 44 from components import pubsub
71 from components import utils 45 from components import utils
72 from server import bot_management 46 from server import bot_management
73 47
74 48
75 # Name of the topic the Machine Provider is authorized to publish 49 # Name of the topic the Machine Provider is authorized to publish
76 # lease information to. 50 # lease information to.
77 PUBSUB_TOPIC = 'machine-provider' 51 PUBSUB_TOPIC = 'machine-provider'
78 52
79 # Name of the pull subscription to the Machine Provider topic. 53 # Name of the pull subscription to the Machine Provider topic.
80 PUBSUB_SUBSCRIPTION = 'machine-provider' 54 PUBSUB_SUBSCRIPTION = 'machine-provider'
81 55
82 56
83 class MachineLease(ndb.Model): 57 class MachineLease(ndb.Model):
84 """A lease request for a machine from the Machine Provider. 58 """A lease request for a machine from the Machine Provider.
85 59
86 Standalone MachineLease entities should not exist in the datastore. 60 Key:
61 id: A string in the form <machine type id>-<number>.
62 kind: MachineLease. Is a root entity.
87 """ 63 """
88 # Request ID used to generate this request. 64 # Request ID used to generate this request.
89 client_request_id = ndb.StringProperty(required=True) 65 client_request_id = ndb.StringProperty(indexed=True)
66 # Whether or not this MachineLease should issue lease requests.
67 drained = ndb.BooleanProperty(indexed=True)
90 # Hostname of the machine currently allocated for this request. 68 # Hostname of the machine currently allocated for this request.
91 hostname = ndb.StringProperty() 69 hostname = ndb.StringProperty()
92 # Request hash returned by the server for the request for this machine. 70 # Duration to lease for.
93 request_hash = ndb.StringProperty() 71 lease_duration_secs = ndb.IntegerProperty(indexed=False)
94 # DateTime indicating lease expiration time. 72 # DateTime indicating lease expiration time.
95 lease_expiration_ts = ndb.DateTimeProperty() 73 lease_expiration_ts = ndb.DateTimeProperty()
74 # ndb.Key for the MachineType this MachineLease is created for.
75 machine_type = ndb.KeyProperty()
76 # machine_provider.Dimensions describing the machine.
77 mp_dimensions = msgprop.MessageProperty(
78 machine_provider.Dimensions, indexed=False)
79 # Last request number used.
80 request_count = ndb.IntegerProperty(default=0, required=True)
96 81
97 82
98 class MachineType(ndb.Model): 83 class MachineType(ndb.Model):
99 """A type of machine which should be leased from the Machine Provider. 84 """A type of machine which should be leased from the Machine Provider.
100 85
101 Key: 86 Key:
102 id: A human-readable name for this machine type. 87 id: A human-readable name for this machine type.
103 kind: MachineType. Is a root entity. 88 kind: MachineType. Is a root entity.
104 """ 89 """
105 # Current number of active leases.
106 current_size = ndb.ComputedProperty(lambda self: len(self.leases))
107 # Description of this machine type for humans. 90 # Description of this machine type for humans.
108 description = ndb.StringProperty(indexed=False) 91 description = ndb.StringProperty(indexed=False)
109 # Whether or not to attempt to lease machines of this type. 92 # Whether or not to attempt to lease machines of this type.
110 enabled = ndb.BooleanProperty(default=True) 93 enabled = ndb.BooleanProperty(default=True)
111 # Duration to lease each machine for. 94 # Duration to lease each machine for.
112 lease_duration_secs = ndb.IntegerProperty(indexed=False) 95 lease_duration_secs = ndb.IntegerProperty(indexed=False)
113 # List of active lease requests.
114 leases = ndb.LocalStructuredProperty(MachineLease, repeated=True)
115 # machine_provider.Dimensions describing the machine. 96 # machine_provider.Dimensions describing the machine.
116 mp_dimensions = msgprop.MessageProperty( 97 mp_dimensions = msgprop.MessageProperty(
117 machine_provider.Dimensions, indexed=False) 98 machine_provider.Dimensions, indexed=False)
118 # Number of bots pending deletion.
119 num_pending_deletion = ndb.ComputedProperty(
120 lambda self: len(self.pending_deletion))
121 # List of hostnames whose leases have expired and should be deleted.
122 pending_deletion = ndb.StringProperty(indexed=False, repeated=True)
123 # Last request number used.
124 request_count = ndb.IntegerProperty(default=0, required=True)
125 # Target number of machines of this type to have leased at once. 99 # Target number of machines of this type to have leased at once.
126 target_size = ndb.IntegerProperty(indexed=False, required=True) 100 target_size = ndb.IntegerProperty(indexed=False, required=True)
127 101
128 102
129 def clean_up_bots(): 103 @ndb.transactional_tasklet
130 """Cleans up expired leases.""" 104 def ensure_entity_exists(machine_type, n):
131 # Maximum number of in-flight ndb.Futures. 105 """Ensures the nth MachineLease for the given MachineType exists.
132 MAX_IN_FLIGHT = 50 106
133 107 Args:
134 bot_ids = [] 108 machine_type: MachineType entity.
135 deleted = {} 109 n: The MachineLease index.
136 for machine_type in MachineType.query(MachineType.num_pending_deletion > 0): 110 """
137 logging.info( 111 key = ndb.Key(MachineLease, '%s-%s' % (machine_type.key.id(), n))
138 'Deleting bots: %s', ', '.join(sorted(machine_type.pending_deletion))) 112 machine_lease = yield key.get_async()
139 bot_ids.extend(machine_type.pending_deletion) 113 if machine_lease:
140 deleted[machine_type.key] = machine_type.pending_deletion 114 return
141 115
142 # Generate a few asynchronous requests at a time in order to 116 yield MachineLease(
143 # prevent having too many in-flight ndb.Futures at a time. 117 key=key,
118 lease_duration_secs=machine_type.lease_duration_secs,
119 machine_type=machine_type.key,
120 mp_dimensions=machine_type.mp_dimensions,
121 ).put_async()
122
123
124 def ensure_entities_exist(max_concurrent=50):
125 """Ensures MachineLeases exist for each MachineType.
126
127 Args:
128 max_concurrent: Maximum number of concurrent asynchronous requests.
129 """
130 # Generate a few asynchronous requests at a time in order to prevent having
131 # too many in flight at a time.
144 futures = [] 132 futures = []
145 while bot_ids: 133
146 num_futures = len(futures) 134 for machine_type in MachineType.query(MachineType.enabled == True):
147 if num_futures < MAX_IN_FLIGHT: 135 cursor = 0
148 keys = [bot_management.get_info_key(bot_id) 136 while cursor < machine_type.target_size:
149 for bot_id in bot_ids[:MAX_IN_FLIGHT - num_futures]] 137 while len(futures) < max_concurrent and cursor < machine_type.target_size:
150 bot_ids = bot_ids[MAX_IN_FLIGHT - num_futures:] 138 futures.append(ensure_entity_exists(machine_type, cursor))
151 futures.extend(ndb.delete_multi_async(keys)) 139 cursor += 1
152 140 ndb.Future.wait_any(futures)
153 ndb.Future.wait_any(futures) 141 # We don't bother checking success or failure. If a transient error
154 futures = [future for future in futures if not future.done()] 142 # like TransactionFailed or DeadlineExceeded is raised and an entity
143 # is not created, we will just create it the next time this is called,
144 # converging to the desired state eventually.
145 futures = [future for future in futures if not future.done()]
155 146
156 if futures: 147 if futures:
157 ndb.Future.wait_all(futures) 148 ndb.Future.wait_all(futures)
158 149
159 # There should be relatively few MachineType entitites, so 150
160 # just process them sequentially. 151 def drain_excess(max_concurrent=50):
161 # TODO(smut): Parallelize this. 152 """Marks MachineLeases beyond what is needed by their MachineType as drained.
162 for machine_key, hostnames in deleted.iteritems(): 153
163 successfully_deleted = [] 154 Args:
164 for hostname in hostnames: 155 max_concurrent: Maximum number of concurrent asynchronous requests.
165 if bot_management.get_info_key(hostname).get(): 156 """
166 logging.error('Failed to delete BotInfo: %s', hostname) 157 futures = []
167 else: 158
168 successfully_deleted.append(hostname) 159 for machine_type in MachineType.query():
169 logging.info('Deleted bots: %s', ', '.join(sorted(successfully_deleted))) 160 for machine_lease in MachineLease.query(
170 _clear_bots_pending_deletion(machine_key, hostnames) 161 MachineLease.machine_type == machine_type.key,
171 162 ):
172 163 try:
173 @ndb.transactional 164 index = int(machine_lease.key.id().rsplit('-', 1)[-1])
174 def _clear_bots_pending_deletion(machine_type_key, hostnames): 165 except ValueError:
175 """Clears the list of bots pending deletion. 166 logging.error(
176 167 'MachineLease index could not be deciphered\n Key: %s',
177 Args: 168 machine_lease.key,
178 machine_type_key: ndb.Key for a MachineType instance.
179 hostnames: List of bots pending deletion.
180 """
181 machine_type = machine_type_key.get()
182 if not machine_type:
183 logging.error('MachineType no longer exists: %s', machine_type_key.id())
184 return
185
186 num_pending_deletion = len(machine_type.pending_deletion)
187 machine_type.pending_deletion = [
188 host for host in machine_type.pending_deletion if host not in hostnames]
189 if len(machine_type.pending_deletion) != num_pending_deletion:
190 machine_type.put()
191
192
193 @ndb.transactional
194 def generate_lease_requests(machine_type_key, app_id, swarming_server):
195 """Generates lease requests.
196
197 The list includes new requests to lease machines up to the targeted
198 size for the given machine type as well as requests to get the status
199 of pending lease requests.
200
201 Args:
202 machine_type_key: ndb.Key for a MachineType instance.
203 app_id: ID of the application the requests originate from.
204 swarming_server: URL for the Swarming server to connect to.
205
206 Returns:
207 A list of lease requests.
208 """
209 machine_type = machine_type_key.get()
210 if not machine_type:
211 logging.warning('MachineType no longer exists: %s', machine_type_key.id())
212 return []
213
214 expired_requests = _clean_up_expired_leases(machine_type)
215 lease_requests = _generate_lease_request_status_updates(
216 machine_type, app_id, swarming_server)
217
218 if not machine_type.enabled:
219 logging.warning('MachineType is not enabled: %s\n', machine_type.key.id())
220 return lease_requests
221 if machine_type.current_size >= machine_type.target_size:
222 logging.info(
223 'MachineType %s is at capacity: %d/%d',
224 machine_type.key.id(),
225 machine_type.current_size,
226 machine_type.target_size,
227 )
228 return lease_requests
229
230 new_requests = _generate_lease_requests_for_new_machines(
231 machine_type, app_id, swarming_server)
232
233 if new_requests or expired_requests:
234 machine_type.put()
235 lease_requests.extend(new_requests)
236
237 return lease_requests
238
239
240 def _clean_up_expired_leases(machine_type):
241 """Cleans up expired leases.
242
243 Prunes expired leases from machine_type.leases,
244 but does not write the result to the datastore.
245
246 Args:
247 machine_type: MachineType instance.
248
249 Returns:
250 A list of leases that were removed.
251 """
252 active = []
253 expired = []
254
255 for request in machine_type.leases:
256 if request.hostname and request.lease_expiration_ts <= utils.utcnow():
257 logging.warning(
258 'Request ID %s expired:\nHostname: %s\nExpiration: %s',
259 request.client_request_id,
260 request.hostname,
261 request.lease_expiration_ts,
262 )
263 expired.append(request.hostname)
264 else:
265 active.append(request)
266
267 machine_type.leases = active
268 machine_type.pending_deletion.extend(expired)
269 return expired
270
271
272 def _generate_lease_request_status_updates(
273 machine_type, app_id, swarming_server):
274 """Generates status update requests for pending lease requests.
275
276 Args:
277 machine_type: MachineType instance.
278 app_id: ID of the application the requests originate from.
279 swarming_server: URL for the Swarming server to connect to.
280
281 Returns:
282 A list of lease requests.
283 """
284 lease_requests = []
285 for request in machine_type.leases:
286 if not request.hostname:
287 # We don't know the hostname yet, meaning this request is still pending.
288 lease_requests.append(machine_provider.LeaseRequest(
289 dimensions=machine_type.mp_dimensions,
290 duration=machine_type.lease_duration_secs,
291 on_lease=machine_provider.Instruction(
292 swarming_server=swarming_server),
293 pubsub_project=app_id,
294 pubsub_topic=PUBSUB_TOPIC,
295 request_id=request.client_request_id,
296 ))
297 return lease_requests
298
299
300 def _generate_lease_requests_for_new_machines(
301 machine_type, app_id, swarming_server):
302 """Generates requests to lease machines up to the target.
303
304 Extends machine_type.leases by the number of new lease requests generated,
305 but does not write the result to the datastore.
306
307 Args:
308 machine_type: MachineType instance.
309 app_id: ID of the application the requests originate from.
310 swarming_server: URL for the Swarming server to connect to.
311
312 Returns:
313 A list of lease requests.
314 """
315 lease_requests = []
316 request_number = machine_type.request_count
317 for _ in xrange(machine_type.target_size - machine_type.current_size):
318 request_number += 1
319 request_id = '%s-%d' % (machine_type.key.id(), request_number)
320 lease_requests.append(machine_provider.LeaseRequest(
321 dimensions=machine_type.mp_dimensions,
322 duration=machine_type.lease_duration_secs,
323 on_lease=machine_provider.Instruction(swarming_server=swarming_server),
324 pubsub_project=app_id,
325 pubsub_topic=PUBSUB_TOPIC,
326 request_id=request_id,
327 ))
328 machine_type.leases.append(MachineLease(client_request_id=request_id))
329 machine_type.request_count = request_number
330 return lease_requests
331
332
333 @ndb.transactional
334 def update_leases(machine_type_key, responses):
335 """Updates the states of leases of the given machine types.
336
337 Args:
338 machine_type_key: ndb.Key for a MachineType instance.
339 responses: machine_provider.BatchedLeaseResponse instance.
340 """
341 machine_type = machine_type_key.get()
342 if not machine_type:
343 logging.warning('MachineType no longer exists: %s', machine_type_key)
344 return
345
346 lease_request_map = {
347 request.client_request_id: request for request in machine_type.leases
348 }
349 for response in responses:
350 request_id = response['client_request_id']
351 request = lease_request_map.get(request_id)
352 if not request:
353 logging.error('Unknown request ID: %s', request_id)
354 continue
355
356 if response.get('error'):
357 error = machine_provider.LeaseRequestError.lookup_by_name(
358 response['error'])
359 if error in (
360 machine_provider.LeaseRequestError.DEADLINE_EXCEEDED,
361 machine_provider.LeaseRequestError.TRANSIENT_ERROR,
362 ):
363 # Retryable errors. Just try again later.
364 logging.warning(
365 'Request not processed, trying again later: %s', request_id)
366 else:
367 # TODO(smut): Handle specific errors.
368 logging.warning(
369 'Error %s for request ID %s',
370 response['error'],
371 request_id,
372 ) 169 )
373 lease_request_map.pop(request_id) 170 continue
374 else: 171 # Drain MachineLeases where the MachineType is not enabled or the index
375 request.request_hash = response['request_hash'] 172 # exceeds the target_size given by the MachineType. Since MachineLeases
376 state = machine_provider.LeaseRequestState.lookup_by_name( 173 # are created in contiguous blocks, only indices 0 through target_size - 1
377 response['state']) 174 # should exist.
378 if state == machine_provider.LeaseRequestState.DENIED: 175 if not machine_type.enabled or index >= machine_type.target_size:
379 logging.warning('Request ID denied: %s', request_id) 176 if len(futures) == max_concurrent:
380 lease_request_map.pop(request_id) 177 ndb.Future.wait_any(futures)
381 elif state == machine_provider.LeaseRequestState.FULFILLED: 178 futures = [future for future in futures if not future.done()]
382 if response.get('hostname'): 179 machine_lease.drained = True
383 logging.info( 180 futures.append(machine_lease.put_async())
384 'Request ID %s fulfilled:\nHostname: %s\nExpiration: %s', 181
385 request_id,
386 response['hostname'],
387 response['lease_expiration_ts'],
388 )
389 request.hostname = response['hostname']
390 request.lease_expiration_ts = datetime.datetime.utcfromtimestamp(
391 int(response['lease_expiration_ts']))
392 else:
393 # Lease expired. This shouldn't happen, because it means we had a
394 # pending request which was fulfilled and expired before we were
395 # able to check its status.
396 logging.warning('Request ID fulfilled and expired: %s', request_id)
397 lease_request_map.pop(request_id)
398 else:
399 # Lease request isn't processed yet. Just try again later.
400 logging.info(
401 'Request ID %s in state: %s', request_id, response['state'])
402
403 machine_type.leases = sorted(
404 lease_request_map.values(), key=lambda lease: lease.client_request_id)
405 machine_type.put()
406
407
408 @ndb.tasklet
409 def process_message(message, subscription):
410 """Processes a Pub/Sub message from the Machine Provider.
411
412 Args:
413 message: A message dict as returned by pubsub.pull.
414 subscription: Full name of the subscription the message was received on.
415 """
416 now = utils.utcnow()
417
418 try:
419 data = base64.b64decode(message.get('message', {}).get('data', ''))
420 lease_response = json.loads(data)
421 # Machine Provider sends a response including lease_expiration_ts. If
422 # lease_expiration_ts is not in the future and there is no more hostname
423 # associated with the response then this is a reclamation message.
424 lease_expiration_ts = datetime.datetime.utcfromtimestamp(
425 int(lease_response['lease_expiration_ts']))
426 if lease_expiration_ts <= now and not lease_response.get('hostname'):
427 # We used request IDs of the form MachineType.key.id()-<integer>.
428 # Extract the ID for the MachineType from the request ID.
429 machine_type_id = lease_response['client_request_id'].rsplit('-', 1)[0]
430 logging.info('Lease ID %s is expired\nMachineType: %s',
431 lease_response['client_request_id'],
432 MachineType.get_by_id(machine_type_id))
433 except (TypeError, ValueError):
434 logging.error('Received unexpected Pub/Sub message:\n%s',
435 json.dumps(message, indent=2))
436
437 yield pubsub.ack_async(subscription, message['ackId'])
438
439
440 def process_pubsub(app_id):
441 """Processes Pub/Sub messages from the Machine Provider, if there are any.
442
443 Args:
444 app_id: ID of the application where the Pub/Sub subscription exists.
445 """
446 MAX_IN_FLIGHT = 50
447 subscription = pubsub.full_subscription_name(app_id, PUBSUB_SUBSCRIPTION)
448 response = pubsub.pull(subscription)
449 logging.info('%s', response)
450
451 futures = []
452 messages = response.get('receivedMessages', [])
453 while messages:
454 num_futures = len(futures)
455 if num_futures < MAX_IN_FLIGHT:
456 futures.extend(
457 process_message(message, subscription)
458 for message in messages[:MAX_IN_FLIGHT - num_futures])
459 messages = messages[MAX_IN_FLIGHT - num_futures:]
460 ndb.Future.wait_any(futures)
461 futures = [future for future in futures if not future.done()]
462 if futures: 182 if futures:
463 ndb.Future.wait_all(futures) 183 ndb.Future.wait_all(futures)
184
185
186 def schedule_lease_management():
187 """Schedules task queues to process each MachineLease."""
188 for machine_lease in MachineLease.query():
189 # TODO(smut): Remove this check once migrated to the new format.
190 if machine_lease.machine_type:
191 if not utils.enqueue_task(
192 '/internal/taskqueue/machine-provider-manage',
193 'machine-provider-manage',
194 params={
195 'key': machine_lease.key.urlsafe(),
196 },
197 ):
198 logging.warning(
199 'Failed to enqueue task for MachineLease: %s', machine_lease.key)
200
201
202 @ndb.transactional
203 def clear_lease_request(key, request_id):
204 """Clears information about given lease request.
205
206 Args:
207 request_id: ID of the request to clear.
208 """
209 machine_lease = key.get()
210 if not machine_lease:
211 logging.error('MachineLease does not exist\nKey: %s', key)
212 return
213
214 if not machine_lease.client_request_id:
215 return
216
217 if request_id != machine_lease.client_request_id:
218 # Already cleared and incremented?
219 logging.warning(
220 'Request ID mismatch for MachineLease: %s\nExpected: %s\nActual: %s',
221 key,
222 request_id,
223 machine_lease.client_request_id,
224 )
225 return
226
227 machine_lease.client_request_id = None
228 machine_lease.hostname = None
229 machine_lease.lease_expiration_ts = None
230 machine_lease.put()
231
232
233 @ndb.transactional
234 def log_lease_fulfillment(key, request_id, hostname, lease_expiration_ts):
235 """Logs lease fulfillment.
236
237 Args:
238 request_id: ID of the request being fulfilled.
239 hostname: Hostname of the machine fulfilling the request.
240 lease_expiration_ts: UTC seconds since epoch when the lease expires.
241 """
242 machine_lease = key.get()
243 if not machine_lease:
244 logging.error('MachineLease does not exist\nKey: %s', key)
245 return
246
247 if request_id != machine_lease.client_request_id:
248 logging.error(
249 'Request ID mismatch\nKey: %s\nExpected: %s\nActual: %s',
250 key,
251 machine_lease.client_request_id,
252 request_id,
253 )
254 return
255
256 if (hostname == machine_lease.hostname
257 and lease_expiration_ts == machine_lease.lease_expiration_ts):
258 return
259
260 machine_lease.hostname = hostname
261 machine_lease.lease_expiration_ts = datetime.datetime.utcfromtimestamp(
262 lease_expiration_ts)
263 machine_lease.put()
264
265
266 @ndb.transactional
267 def update_client_request_id(key):
268 """Sets the client request ID used to lease a machine.
269
270 Args:
271 key: ndb.Key for a MachineLease entity.
272 """
273 machine_lease = key.get()
274 if not machine_lease:
275 logging.error('MachineLease does not exist\nKey: %s', key)
276 return
277
278 if machine_lease.drained:
279 logging.info('MachineLease is drained\nKey: %s', key)
280 return
281
282 if machine_lease.client_request_id:
283 return
284
285 machine_lease.request_count += 1
286 machine_lease.client_request_id = '%s-%s-%s' % (
287 machine_lease.machine_type.id(), key.id(), machine_lease.request_count)
288 machine_lease.put()
289
290
291 @ndb.transactional
292 def delete_machine_lease(key):
293 """Deletes the given MachineLease if it is drained and has no active lease.
294
295 Args:
296 key: ndb.Key for a MachineLease entity.
297 """
298 machine_lease = key.get()
299 if not machine_lease:
300 return
301
302 if not machine_lease.drained:
303 logging.warning('MachineLease not drained: %s', key)
304 return
305
306 if machine_lease.client_request_id:
307 return
308
309 key.delete()
310
311
312 def handle_lease_request_error(machine_lease, response):
313 """Handles an error in the lease request response from Machine Provider.
314
315 Args:
316 machine_lease: MachineLease instance.
317 response: Response returned by components.machine_provider.lease_machine.
318 """
319 error = machine_provider.LeaseRequestError.lookup_by_name(response['error'])
320 if error in (
321 machine_provider.LeaseRequestError.DEADLINE_EXCEEDED,
322 machine_provider.LeaseRequestError.TRANSIENT_ERROR,
323 ):
324 logging.warning(
325 'Transient failure: %s\nRequest ID: %s\nError: %s',
326 machine_lease.key,
327 response['client_request_id'],
328 response['error'],
329 )
330 else:
331 logging.error(
332 'Lease request failed\nKey: %s\nRequest ID: %s\nError: %s',
333 machine_lease.key,
334 response['client_request_id'],
335 response['error'],
336 )
337 clear_lease_request(machine_lease.key, machine_lease.client_request_id)
338
339
340 def handle_lease_request_response(machine_lease, response):
341 """Handles a successful lease request response from Machine Provider.
342
343 Args:
344 machine_lease: MachineLease instance.
345 response: Response returned by components.machine_provider.lease_machine.
346 """
347 assert not response.get('error')
348 state = machine_provider.LeaseRequestState.lookup_by_name(response['state'])
349 if state == machine_provider.LeaseRequestState.FULFILLED:
350 if not response.get('hostname'):
351 # Lease has already expired. This shouldn't happen, but it indicates the
352 # lease expired faster than we could tell it even got fulfilled.
353 logging.error(
354 'Request expired\nKey: %s\nRequest ID:%s\nExpired: %s',
355 machine_lease.key,
356 machine_lease.client_request_id,
357 response['lease_expiration_ts'],
358 )
359 clear_lease_request(machine_lease.key, machine_lease.client_request_id)
360 else:
361 logging.info(
362 'Request fulfilled: %s\nRequest ID: %s\nHostname: %s\nExpires: %s',
363 machine_lease.key,
364 machine_lease.client_request_id,
365 response['hostname'],
366 response['lease_expiration_ts'],
367 )
368 log_lease_fulfillment(
369 machine_lease.key,
370 machine_lease.client_request_id,
371 response['hostname'],
372 int(response['lease_expiration_ts']),
373 )
374 # TODO(smut): Create BotInfo, associate lease information.
375 elif state == machine_provider.LeaseRequestState.DENIED:
376 logging.warning(
377 'Request denied: %s\nRequest ID: %s',
378 machine_lease.key,
379 machine_lease.client_request_id,
380 )
381 clear_lease_request(machine_lease.key, machine_lease.client_request_id)
382
383
384 def manage_pending_lease_request(machine_lease):
385 """Manages a pending lease request.
386
387 Args:
388 machine_lease: MachineLease instance with client_request_id set.
389 """
390 assert machine_lease.client_request_id, machine_lease.key
391
392 response = machine_provider.lease_machine(
393 machine_provider.LeaseRequest(
394 dimensions=machine_lease.mp_dimensions,
395 # TODO(smut): Vary duration so machines don't expire all at once.
396 duration=machine_lease.lease_duration_secs,
397 on_lease=machine_provider.Instruction(
398 swarming_server='https://%s' % (
399 app_identity.get_default_version_hostname())),
400 request_id=machine_lease.client_request_id,
401 ),
402 )
403
404 if response.get('error'):
405 handle_lease_request_error(machine_lease, response)
406 return
407
408 handle_lease_request_response(machine_lease, response)
409
410
411 def manage_lease(key):
412 """Manages a MachineLease.
413
414 Args:
415 key: ndb.Key for a MachineLease entity.
416 """
417 machine_lease = key.get()
418 if not machine_lease:
419 return
420
421 # Manage a leased machine.
422 if machine_lease.lease_expiration_ts:
423 if machine_lease.lease_expiration_ts <= utils.utcnow():
424 logging.info('MachineLease expired: %s', key)
425 assert machine_lease.hostname, key
426 clear_lease_request(key, machine_lease.client_request_id)
427 return
428
429 # Lease expiration time is unknown, so there must be no leased machine.
430 assert not machine_lease.hostname, key
431
432 # Manage a pending lease request.
433 if machine_lease.client_request_id:
434 manage_pending_lease_request(machine_lease)
435 return
436
437 # Manage an uninitiated lease request.
438 if not machine_lease.drained:
439 update_client_request_id(key)
440 return
441
442 # Manage an uninitiated, drained lease request.
443 delete_machine_lease(key)
OLDNEW
« no previous file with comments | « appengine/swarming/queue.yaml ('k') | appengine/swarming/server/lease_management_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698