| OLD | NEW |
| 1 # Copyright 2016 The LUCI Authors. All rights reserved. | 1 # Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 """Lease management for machines leased from the Machine Provider. | 5 """Lease management for machines leased from the Machine Provider. |
| 6 | 6 |
| 7 Keeps a list of machine types which should be leased from the Machine Provider | 7 Keeps a list of machine types which should be leased from the Machine Provider |
| 8 and the list of machines of each type currently leased. | 8 and the list of machines of each type currently leased. |
| 9 | 9 |
| 10 Swarming integration with Machine Provider | 10 Swarming integration with Machine Provider |
| 11 ========================================== | 11 ========================================== |
| 12 | 12 |
| 13 handlers_backend.py contains a cron job which looks at each machine type and | 13 handlers_backend.py contains a cron job which looks at each MachineType and |
| 14 ensures the number of active lease requests is equal to the target size | 14 ensures there are at least as many MachineLeases in the datastore which refer |
| 15 specified by the machine type. | 15 to that MachineType as the target_size in MachineType specifies by numbering |
| 16 them 0 through target_size - 1. If there are MachineLease entities numbered |
| 17 target_size or greater which refer to that MachineType, those MachineLeases |
| 18 are marked as drained. |
| 16 | 19 |
| 17 A lease request is considered active as soon as it is initiated. It remains | 20 Each MachineLease manages itself. A cron job in handlers_backend.py will trigger |
| 18 active while the lease request is pending and even after it is fulfilled. A | 21 self-management jobs for each entity. If there is no associated lease and the |
| 19 lease request is considered no longer active once the request is denied or | 22 MachineLease is not drained, issue a request to the Machine Provider for a |
| 20 the machine used to fulfill the request is reclaimed. | 23 matching machine. If there is an associated request, check the status of that |
| 24 request. If it is fulfilled, ensure the existence of a BotInfo entity (see |
| 25 server/bot_management.py) corresponding to the machine provided for the lease. |
| 26 Include the lease ID and lease_expiration_ts as fields in the BotInfo. If it |
| 27 is expired, clear the associated lease. If there is no associated lease and |
| 28 the MachineLease is drained, delete the MachineLease entity. |
| 21 | 29 |
| 22 This scheme ensures that we never have more machines of a given type than we | 30 TODO(smut): If there is an associated request and the MachineLease is drained, |
| 23 want | 31 release the lease immediately (as long as the bot is not mid-task). |
| 24 | |
| 25 Each machine type is stored as a MachineType entity which describes the machine | |
| 26 in terms of Machine Provider Dimensions (not to be confused with Swarming's own | |
| 27 bot dimensions) as well as a target size which describes the number of machines | |
| 28 of this type we want connected at once. | |
| 29 | |
| 30 handlers_backend.py calls generate_lease_requests (below) to generate a number | |
| 31 of lease requests equal to the target size minus the current number of active | |
| 32 leases, then performs the RPC to send these requests to the Machine Provider, | |
| 33 and finally it calls update_leases (below) with the results of the lease | |
| 34 request to associate the new active leases with the machine type. | |
| 35 | |
| 36 In principle, the entire operation on a single machine type needs to happen | |
| 37 transactionally so we don't make duplicate requests and end up with more | |
| 38 machines than the target size wants. However, in practice we need the RPC | |
| 39 to be outside the transaction because it may be slow and it may need to be | |
| 40 retried, which could cause the entire transaction to time out without logging | |
| 41 the result of the call. Therefore the procedure is broken up into two | |
| 42 transactions with an RPC in between. | |
| 43 | |
| 44 With the transaction broken into two, the choice of request ID is the only way | |
| 45 to prevent duplicate lease requests (e.g. if generate_lease_requests runs twice | |
| 46 before update_leases gets to run). Machine Provider supports idempotent RPCs | |
| 47 so long as the client-generated request ID is the same. | |
| 48 | |
| 49 Therefore, we ensure generate_lease_requests always generates lease requests | |
| 50 with the same request ID until update_leases is called. This is done by keeping a | |
| 51 count of the number of requests generated so far. If the request count is n, | |
| 52 generate_lease_requests will always generate requests with IDs n + 1, n + 2, ... | |
| 53 up to n + target size - current size, where current size is the number of | |
| 54 currently active lease requests. | |
| 55 | |
| 56 update_leases takes the RPC result and updates the pending request entries. | |
| 57 | |
| 58 TODO(smut): Consider request count overflow. | |
| 59 """ | 32 """ |
| 60 | 33 |
| 61 import base64 | 34 import base64 |
| 62 import datetime | 35 import datetime |
| 63 import json | 36 import json |
| 64 import logging | 37 import logging |
| 65 | 38 |
| 39 from google.appengine.api import app_identity |
| 66 from google.appengine.ext import ndb | 40 from google.appengine.ext import ndb |
| 67 from google.appengine.ext.ndb import msgprop | 41 from google.appengine.ext.ndb import msgprop |
| 68 | 42 |
| 69 from components import machine_provider | 43 from components import machine_provider |
| 70 from components import pubsub | 44 from components import pubsub |
| 71 from components import utils | 45 from components import utils |
| 72 from server import bot_management | 46 from server import bot_management |
| 73 | 47 |
| 74 | 48 |
| 75 # Name of the topic the Machine Provider is authorized to publish | 49 # Name of the topic the Machine Provider is authorized to publish |
| 76 # lease information to. | 50 # lease information to. |
| 77 PUBSUB_TOPIC = 'machine-provider' | 51 PUBSUB_TOPIC = 'machine-provider' |
| 78 | 52 |
| 79 # Name of the pull subscription to the Machine Provider topic. | 53 # Name of the pull subscription to the Machine Provider topic. |
| 80 PUBSUB_SUBSCRIPTION = 'machine-provider' | 54 PUBSUB_SUBSCRIPTION = 'machine-provider' |
| 81 | 55 |
| 82 | 56 |
| 83 class MachineLease(ndb.Model): | 57 class MachineLease(ndb.Model): |
| 84 """A lease request for a machine from the Machine Provider. | 58 """A lease request for a machine from the Machine Provider. |
| 85 | 59 |
| 86 Standalone MachineLease entities should not exist in the datastore. | 60 Key: |
| 61 id: A string in the form <machine type id>-<number>. |
| 62 kind: MachineLease. Is a root entity. |
| 87 """ | 63 """ |
| 88 # Request ID used to generate this request. | 64 # Request ID used to generate this request. |
| 89 client_request_id = ndb.StringProperty(required=True) | 65 client_request_id = ndb.StringProperty(indexed=True) |
| 66 # Whether or not this MachineLease should issue lease requests. |
| 67 drained = ndb.BooleanProperty(indexed=True) |
| 90 # Hostname of the machine currently allocated for this request. | 68 # Hostname of the machine currently allocated for this request. |
| 91 hostname = ndb.StringProperty() | 69 hostname = ndb.StringProperty() |
| 92 # Request hash returned by the server for the request for this machine. | 70 # Duration to lease for. |
| 93 request_hash = ndb.StringProperty() | 71 lease_duration_secs = ndb.IntegerProperty(indexed=False) |
| 94 # DateTime indicating lease expiration time. | 72 # DateTime indicating lease expiration time. |
| 95 lease_expiration_ts = ndb.DateTimeProperty() | 73 lease_expiration_ts = ndb.DateTimeProperty() |
| 74 # ndb.Key for the MachineType this MachineLease is created for. |
| 75 machine_type = ndb.KeyProperty() |
| 76 # machine_provider.Dimensions describing the machine. |
| 77 mp_dimensions = msgprop.MessageProperty( |
| 78 machine_provider.Dimensions, indexed=False) |
| 79 # Last request number used. |
| 80 request_count = ndb.IntegerProperty(default=0, required=True) |
| 96 | 81 |
| 97 | 82 |
| 98 class MachineType(ndb.Model): | 83 class MachineType(ndb.Model): |
| 99 """A type of machine which should be leased from the Machine Provider. | 84 """A type of machine which should be leased from the Machine Provider. |
| 100 | 85 |
| 101 Key: | 86 Key: |
| 102 id: A human-readable name for this machine type. | 87 id: A human-readable name for this machine type. |
| 103 kind: MachineType. Is a root entity. | 88 kind: MachineType. Is a root entity. |
| 104 """ | 89 """ |
| 105 # Current number of active leases. | |
| 106 current_size = ndb.ComputedProperty(lambda self: len(self.leases)) | |
| 107 # Description of this machine type for humans. | 90 # Description of this machine type for humans. |
| 108 description = ndb.StringProperty(indexed=False) | 91 description = ndb.StringProperty(indexed=False) |
| 109 # Whether or not to attempt to lease machines of this type. | 92 # Whether or not to attempt to lease machines of this type. |
| 110 enabled = ndb.BooleanProperty(default=True) | 93 enabled = ndb.BooleanProperty(default=True) |
| 111 # Duration to lease each machine for. | 94 # Duration to lease each machine for. |
| 112 lease_duration_secs = ndb.IntegerProperty(indexed=False) | 95 lease_duration_secs = ndb.IntegerProperty(indexed=False) |
| 113 # List of active lease requests. | |
| 114 leases = ndb.LocalStructuredProperty(MachineLease, repeated=True) | |
| 115 # machine_provider.Dimensions describing the machine. | 96 # machine_provider.Dimensions describing the machine. |
| 116 mp_dimensions = msgprop.MessageProperty( | 97 mp_dimensions = msgprop.MessageProperty( |
| 117 machine_provider.Dimensions, indexed=False) | 98 machine_provider.Dimensions, indexed=False) |
| 118 # Number of bots pending deletion. | |
| 119 num_pending_deletion = ndb.ComputedProperty( | |
| 120 lambda self: len(self.pending_deletion)) | |
| 121 # List of hostnames whose leases have expired and should be deleted. | |
| 122 pending_deletion = ndb.StringProperty(indexed=False, repeated=True) | |
| 123 # Last request number used. | |
| 124 request_count = ndb.IntegerProperty(default=0, required=True) | |
| 125 # Target number of machines of this type to have leased at once. | 99 # Target number of machines of this type to have leased at once. |
| 126 target_size = ndb.IntegerProperty(indexed=False, required=True) | 100 target_size = ndb.IntegerProperty(indexed=False, required=True) |
| 127 | 101 |
| 128 | 102 |
| 129 def clean_up_bots(): | 103 @ndb.transactional_tasklet |
| 130 """Cleans up expired leases.""" | 104 def ensure_entity_exists(machine_type, n): |
| 131 # Maximum number of in-flight ndb.Futures. | 105 """Ensures the nth MachineLease for the given MachineType exists. |
| 132 MAX_IN_FLIGHT = 50 | 106 |
| 133 | 107 Args: |
| 134 bot_ids = [] | 108 machine_type: MachineType entity. |
| 135 deleted = {} | 109 n: The MachineLease index. |
| 136 for machine_type in MachineType.query(MachineType.num_pending_deletion > 0): | 110 """ |
| 137 logging.info( | 111 key = ndb.Key(MachineLease, '%s-%s' % (machine_type.key.id(), n)) |
| 138 'Deleting bots: %s', ', '.join(sorted(machine_type.pending_deletion))) | 112 machine_lease = yield key.get_async() |
| 139 bot_ids.extend(machine_type.pending_deletion) | 113 if machine_lease: |
| 140 deleted[machine_type.key] = machine_type.pending_deletion | 114 return |
| 141 | 115 |
| 142 # Generate a few asynchronous requests at a time in order to | 116 yield MachineLease( |
| 143 # prevent having too many in-flight ndb.Futures at a time. | 117 key=key, |
| 118 lease_duration_secs=machine_type.lease_duration_secs, |
| 119 machine_type=machine_type.key, |
| 120 mp_dimensions=machine_type.mp_dimensions, |
| 121 ).put_async() |
| 122 |
| 123 |
| 124 def ensure_entities_exist(max_concurrent=50): |
| 125 """Ensures MachineLeases exist for each MachineType. |
| 126 |
| 127 Args: |
| 128 max_concurrent: Maximum number of concurrent asynchronous requests. |
| 129 """ |
| 130 # Generate a few asynchronous requests at a time in order to prevent having |
| 131 # too many in flight at a time. |
| 144 futures = [] | 132 futures = [] |
| 145 while bot_ids: | 133 |
| 146 num_futures = len(futures) | 134 for machine_type in MachineType.query(MachineType.enabled == True): |
| 147 if num_futures < MAX_IN_FLIGHT: | 135 cursor = 0 |
| 148 keys = [bot_management.get_info_key(bot_id) | 136 while cursor < machine_type.target_size: |
| 149 for bot_id in bot_ids[:MAX_IN_FLIGHT - num_futures]] | 137 while len(futures) < max_concurrent and cursor < machine_type.target_size: |
| 150 bot_ids = bot_ids[MAX_IN_FLIGHT - num_futures:] | 138 futures.append(ensure_entity_exists(machine_type, cursor)) |
| 151 futures.extend(ndb.delete_multi_async(keys)) | 139 cursor += 1 |
| 152 | 140 ndb.Future.wait_any(futures) |
| 153 ndb.Future.wait_any(futures) | 141 # We don't bother checking success or failure. If a transient error |
| 154 futures = [future for future in futures if not future.done()] | 142 # like TransactionFailed or DeadlineExceeded is raised and an entity |
| 143 # is not created, we will just create it the next time this is called, |
| 144 # converging to the desired state eventually. |
| 145 futures = [future for future in futures if not future.done()] |
| 155 | 146 |
| 156 if futures: | 147 if futures: |
| 157 ndb.Future.wait_all(futures) | 148 ndb.Future.wait_all(futures) |
| 158 | 149 |
| 159 # There should be relatively few MachineType entities, so | 150 |
| 160 # just process them sequentially. | 151 def drain_excess(max_concurrent=50): |
| 161 # TODO(smut): Parallelize this. | 152 """Marks MachineLeases beyond what is needed by their MachineType as drained. |
| 162 for machine_key, hostnames in deleted.iteritems(): | 153 |
| 163 successfully_deleted = [] | 154 Args: |
| 164 for hostname in hostnames: | 155 max_concurrent: Maximum number of concurrent asynchronous requests. |
| 165 if bot_management.get_info_key(hostname).get(): | 156 """ |
| 166 logging.error('Failed to delete BotInfo: %s', hostname) | 157 futures = [] |
| 167 else: | 158 |
| 168 successfully_deleted.append(hostname) | 159 for machine_type in MachineType.query(): |
| 169 logging.info('Deleted bots: %s', ', '.join(sorted(successfully_deleted))) | 160 for machine_lease in MachineLease.query( |
| 170 _clear_bots_pending_deletion(machine_key, hostnames) | 161 MachineLease.machine_type == machine_type.key, |
| 171 | 162 ): |
| 172 | 163 try: |
| 173 @ndb.transactional | 164 index = int(machine_lease.key.id().rsplit('-', 1)[-1]) |
| 174 def _clear_bots_pending_deletion(machine_type_key, hostnames): | 165 except ValueError: |
| 175 """Clears the list of bots pending deletion. | 166 logging.error( |
| 176 | 167 'MachineLease index could not be deciphered\n Key: %s', |
| 177 Args: | 168 machine_lease.key, |
| 178 machine_type_key: ndb.Key for a MachineType instance. | |
| 179 hostnames: List of bots pending deletion. | |
| 180 """ | |
| 181 machine_type = machine_type_key.get() | |
| 182 if not machine_type: | |
| 183 logging.error('MachineType no longer exists: %s', machine_type_key.id()) | |
| 184 return | |
| 185 | |
| 186 num_pending_deletion = len(machine_type.pending_deletion) | |
| 187 machine_type.pending_deletion = [ | |
| 188 host for host in machine_type.pending_deletion if host not in hostnames] | |
| 189 if len(machine_type.pending_deletion) != num_pending_deletion: | |
| 190 machine_type.put() | |
| 191 | |
| 192 | |
| 193 @ndb.transactional | |
| 194 def generate_lease_requests(machine_type_key, app_id, swarming_server): | |
| 195 """Generates lease requests. | |
| 196 | |
| 197 The list includes new requests to lease machines up to the targeted | |
| 198 size for the given machine type as well as requests to get the status | |
| 199 of pending lease requests. | |
| 200 | |
| 201 Args: | |
| 202 machine_type_key: ndb.Key for a MachineType instance. | |
| 203 app_id: ID of the application the requests originate from. | |
| 204 swarming_server: URL for the Swarming server to connect to. | |
| 205 | |
| 206 Returns: | |
| 207 A list of lease requests. | |
| 208 """ | |
| 209 machine_type = machine_type_key.get() | |
| 210 if not machine_type: | |
| 211 logging.warning('MachineType no longer exists: %s', machine_type_key.id()) | |
| 212 return [] | |
| 213 | |
| 214 expired_requests = _clean_up_expired_leases(machine_type) | |
| 215 lease_requests = _generate_lease_request_status_updates( | |
| 216 machine_type, app_id, swarming_server) | |
| 217 | |
| 218 if not machine_type.enabled: | |
| 219 logging.warning('MachineType is not enabled: %s\n', machine_type.key.id()) | |
| 220 return lease_requests | |
| 221 if machine_type.current_size >= machine_type.target_size: | |
| 222 logging.info( | |
| 223 'MachineType %s is at capacity: %d/%d', | |
| 224 machine_type.key.id(), | |
| 225 machine_type.current_size, | |
| 226 machine_type.target_size, | |
| 227 ) | |
| 228 return lease_requests | |
| 229 | |
| 230 new_requests = _generate_lease_requests_for_new_machines( | |
| 231 machine_type, app_id, swarming_server) | |
| 232 | |
| 233 if new_requests or expired_requests: | |
| 234 machine_type.put() | |
| 235 lease_requests.extend(new_requests) | |
| 236 | |
| 237 return lease_requests | |
| 238 | |
| 239 | |
| 240 def _clean_up_expired_leases(machine_type): | |
| 241 """Cleans up expired leases. | |
| 242 | |
| 243 Prunes expired leases from machine_type.leases, | |
| 244 but does not write the result to the datastore. | |
| 245 | |
| 246 Args: | |
| 247 machine_type: MachineType instance. | |
| 248 | |
| 249 Returns: | |
| 250 A list of leases that were removed. | |
| 251 """ | |
| 252 active = [] | |
| 253 expired = [] | |
| 254 | |
| 255 for request in machine_type.leases: | |
| 256 if request.hostname and request.lease_expiration_ts <= utils.utcnow(): | |
| 257 logging.warning( | |
| 258 'Request ID %s expired:\nHostname: %s\nExpiration: %s', | |
| 259 request.client_request_id, | |
| 260 request.hostname, | |
| 261 request.lease_expiration_ts, | |
| 262 ) | |
| 263 expired.append(request.hostname) | |
| 264 else: | |
| 265 active.append(request) | |
| 266 | |
| 267 machine_type.leases = active | |
| 268 machine_type.pending_deletion.extend(expired) | |
| 269 return expired | |
| 270 | |
| 271 | |
| 272 def _generate_lease_request_status_updates( | |
| 273 machine_type, app_id, swarming_server): | |
| 274 """Generates status update requests for pending lease requests. | |
| 275 | |
| 276 Args: | |
| 277 machine_type: MachineType instance. | |
| 278 app_id: ID of the application the requests originate from. | |
| 279 swarming_server: URL for the Swarming server to connect to. | |
| 280 | |
| 281 Returns: | |
| 282 A list of lease requests. | |
| 283 """ | |
| 284 lease_requests = [] | |
| 285 for request in machine_type.leases: | |
| 286 if not request.hostname: | |
| 287 # We don't know the hostname yet, meaning this request is still pending. | |
| 288 lease_requests.append(machine_provider.LeaseRequest( | |
| 289 dimensions=machine_type.mp_dimensions, | |
| 290 duration=machine_type.lease_duration_secs, | |
| 291 on_lease=machine_provider.Instruction( | |
| 292 swarming_server=swarming_server), | |
| 293 pubsub_project=app_id, | |
| 294 pubsub_topic=PUBSUB_TOPIC, | |
| 295 request_id=request.client_request_id, | |
| 296 )) | |
| 297 return lease_requests | |
| 298 | |
| 299 | |
| 300 def _generate_lease_requests_for_new_machines( | |
| 301 machine_type, app_id, swarming_server): | |
| 302 """Generates requests to lease machines up to the target. | |
| 303 | |
| 304 Extends machine_type.leases by the number of new lease requests generated, | |
| 305 but does not write the result to the datastore. | |
| 306 | |
| 307 Args: | |
| 308 machine_type: MachineType instance. | |
| 309 app_id: ID of the application the requests originate from. | |
| 310 swarming_server: URL for the Swarming server to connect to. | |
| 311 | |
| 312 Returns: | |
| 313 A list of lease requests. | |
| 314 """ | |
| 315 lease_requests = [] | |
| 316 request_number = machine_type.request_count | |
| 317 for _ in xrange(machine_type.target_size - machine_type.current_size): | |
| 318 request_number += 1 | |
| 319 request_id = '%s-%d' % (machine_type.key.id(), request_number) | |
| 320 lease_requests.append(machine_provider.LeaseRequest( | |
| 321 dimensions=machine_type.mp_dimensions, | |
| 322 duration=machine_type.lease_duration_secs, | |
| 323 on_lease=machine_provider.Instruction(swarming_server=swarming_server), | |
| 324 pubsub_project=app_id, | |
| 325 pubsub_topic=PUBSUB_TOPIC, | |
| 326 request_id=request_id, | |
| 327 )) | |
| 328 machine_type.leases.append(MachineLease(client_request_id=request_id)) | |
| 329 machine_type.request_count = request_number | |
| 330 return lease_requests | |
| 331 | |
| 332 | |
| 333 @ndb.transactional | |
| 334 def update_leases(machine_type_key, responses): | |
| 335 """Updates the states of leases of the given machine types. | |
| 336 | |
| 337 Args: | |
| 338 machine_type_key: ndb.Key for a MachineType instance. | |
| 339 responses: machine_provider.BatchedLeaseResponse instance. | |
| 340 """ | |
| 341 machine_type = machine_type_key.get() | |
| 342 if not machine_type: | |
| 343 logging.warning('MachineType no longer exists: %s', machine_type_key) | |
| 344 return | |
| 345 | |
| 346 lease_request_map = { | |
| 347 request.client_request_id: request for request in machine_type.leases | |
| 348 } | |
| 349 for response in responses: | |
| 350 request_id = response['client_request_id'] | |
| 351 request = lease_request_map.get(request_id) | |
| 352 if not request: | |
| 353 logging.error('Unknown request ID: %s', request_id) | |
| 354 continue | |
| 355 | |
| 356 if response.get('error'): | |
| 357 error = machine_provider.LeaseRequestError.lookup_by_name( | |
| 358 response['error']) | |
| 359 if error in ( | |
| 360 machine_provider.LeaseRequestError.DEADLINE_EXCEEDED, | |
| 361 machine_provider.LeaseRequestError.TRANSIENT_ERROR, | |
| 362 ): | |
| 363 # Retryable errors. Just try again later. | |
| 364 logging.warning( | |
| 365 'Request not processed, trying again later: %s', request_id) | |
| 366 else: | |
| 367 # TODO(smut): Handle specific errors. | |
| 368 logging.warning( | |
| 369 'Error %s for request ID %s', | |
| 370 response['error'], | |
| 371 request_id, | |
| 372 ) | 169 ) |
| 373 lease_request_map.pop(request_id) | 170 continue |
| 374 else: | 171 # Drain MachineLeases where the MachineType is not enabled or the index |
| 375 request.request_hash = response['request_hash'] | 172 # exceeds the target_size given by the MachineType. Since MachineLeases |
| 376 state = machine_provider.LeaseRequestState.lookup_by_name( | 173 # are created in contiguous blocks, only indices 0 through target_size - 1 |
| 377 response['state']) | 174 # should exist. |
| 378 if state == machine_provider.LeaseRequestState.DENIED: | 175 if not machine_type.enabled or index >= machine_type.target_size: |
| 379 logging.warning('Request ID denied: %s', request_id) | 176 if len(futures) == max_concurrent: |
| 380 lease_request_map.pop(request_id) | 177 ndb.Future.wait_any(futures) |
| 381 elif state == machine_provider.LeaseRequestState.FULFILLED: | 178 futures = [future for future in futures if not future.done()] |
| 382 if response.get('hostname'): | 179 machine_lease.drained = True |
| 383 logging.info( | 180 futures.append(machine_lease.put_async()) |
| 384 'Request ID %s fulfilled:\nHostname: %s\nExpiration: %s', | 181 |
| 385 request_id, | |
| 386 response['hostname'], | |
| 387 response['lease_expiration_ts'], | |
| 388 ) | |
| 389 request.hostname = response['hostname'] | |
| 390 request.lease_expiration_ts = datetime.datetime.utcfromtimestamp( | |
| 391 int(response['lease_expiration_ts'])) | |
| 392 else: | |
| 393 # Lease expired. This shouldn't happen, because it means we had a | |
| 394 # pending request which was fulfilled and expired before we were | |
| 395 # able to check its status. | |
| 396 logging.warning('Request ID fulfilled and expired: %s', request_id) | |
| 397 lease_request_map.pop(request_id) | |
| 398 else: | |
| 399 # Lease request isn't processed yet. Just try again later. | |
| 400 logging.info( | |
| 401 'Request ID %s in state: %s', request_id, response['state']) | |
| 402 | |
| 403 machine_type.leases = sorted( | |
| 404 lease_request_map.values(), key=lambda lease: lease.client_request_id) | |
| 405 machine_type.put() | |
| 406 | |
| 407 | |
| 408 @ndb.tasklet | |
| 409 def process_message(message, subscription): | |
| 410 """Processes a Pub/Sub message from the Machine Provider. | |
| 411 | |
| 412 Args: | |
| 413 message: A message dict as returned by pubsub.pull. | |
| 414 subscription: Full name of the subscription the message was received on. | |
| 415 """ | |
| 416 now = utils.utcnow() | |
| 417 | |
| 418 try: | |
| 419 data = base64.b64decode(message.get('message', {}).get('data', '')) | |
| 420 lease_response = json.loads(data) | |
| 421 # Machine Provider sends a response including lease_expiration_ts. If | |
| 422 # lease_expiration_ts is not in the future and there is no more hostname | |
| 423 # associated with the response then this is a reclamation message. | |
| 424 lease_expiration_ts = datetime.datetime.utcfromtimestamp( | |
| 425 int(lease_response['lease_expiration_ts'])) | |
| 426 if lease_expiration_ts <= now and not lease_response.get('hostname'): | |
| 427 # We used request IDs of the form MachineType.key.id()-<integer>. | |
| 428 # Extract the ID for the MachineType from the request ID. | |
| 429 machine_type_id = lease_response['client_request_id'].rsplit('-', 1)[0] | |
| 430 logging.info('Lease ID %s is expired\nMachineType: %s', | |
| 431 lease_response['client_request_id'], | |
| 432 MachineType.get_by_id(machine_type_id)) | |
| 433 except (TypeError, ValueError): | |
| 434 logging.error('Received unexpected Pub/Sub message:\n%s', | |
| 435 json.dumps(message, indent=2)) | |
| 436 | |
| 437 yield pubsub.ack_async(subscription, message['ackId']) | |
| 438 | |
| 439 | |
| 440 def process_pubsub(app_id): | |
| 441 """Processes Pub/Sub messages from the Machine Provider, if there are any. | |
| 442 | |
| 443 Args: | |
| 444 app_id: ID of the application where the Pub/Sub subscription exists. | |
| 445 """ | |
| 446 MAX_IN_FLIGHT = 50 | |
| 447 subscription = pubsub.full_subscription_name(app_id, PUBSUB_SUBSCRIPTION) | |
| 448 response = pubsub.pull(subscription) | |
| 449 logging.info('%s', response) | |
| 450 | |
| 451 futures = [] | |
| 452 messages = response.get('receivedMessages', []) | |
| 453 while messages: | |
| 454 num_futures = len(futures) | |
| 455 if num_futures < MAX_IN_FLIGHT: | |
| 456 futures.extend( | |
| 457 process_message(message, subscription) | |
| 458 for message in messages[:MAX_IN_FLIGHT - num_futures]) | |
| 459 messages = messages[MAX_IN_FLIGHT - num_futures:] | |
| 460 ndb.Future.wait_any(futures) | |
| 461 futures = [future for future in futures if not future.done()] | |
| 462 if futures: | 182 if futures: |
| 463 ndb.Future.wait_all(futures) | 183 ndb.Future.wait_all(futures) |
| 184 |
| 185 |
| 186 def schedule_lease_management(): |
| 187 """Schedules task queues to process each MachineLease.""" |
| 188 for machine_lease in MachineLease.query(): |
| 189 # TODO(smut): Remove this check once migrated to the new format. |
| 190 if machine_lease.machine_type: |
| 191 if not utils.enqueue_task( |
| 192 '/internal/taskqueue/machine-provider-manage', |
| 193 'machine-provider-manage', |
| 194 params={ |
| 195 'key': machine_lease.key.urlsafe(), |
| 196 }, |
| 197 ): |
| 198 logging.warning( |
| 199 'Failed to enqueue task for MachineLease: %s', machine_lease.key) |
| 200 |
| 201 |
| 202 @ndb.transactional |
| 203 def clear_lease_request(key, request_id): |
| 204 """Clears information about given lease request. |
| 205 |
| 206 Args: |
| 207 request_id: ID of the request to clear. |
| 208 """ |
| 209 machine_lease = key.get() |
| 210 if not machine_lease: |
| 211 logging.error('MachineLease does not exist\nKey: %s', key) |
| 212 return |
| 213 |
| 214 if not machine_lease.client_request_id: |
| 215 return |
| 216 |
| 217 if request_id != machine_lease.client_request_id: |
| 218 # Already cleared and incremented? |
| 219 logging.warning( |
| 220 'Request ID mismatch for MachineLease: %s\nExpected: %s\nActual: %s', |
| 221 key, |
| 222 request_id, |
| 223 machine_lease.client_request_id, |
| 224 ) |
| 225 return |
| 226 |
| 227 machine_lease.client_request_id = None |
| 228 machine_lease.hostname = None |
| 229 machine_lease.lease_expiration_ts = None |
| 230 machine_lease.put() |
| 231 |
| 232 |
| 233 @ndb.transactional |
| 234 def log_lease_fulfillment(key, request_id, hostname, lease_expiration_ts): |
| 235 """Logs lease fulfillment. |
| 236 |
| 237 Args: |
| 238 request_id: ID of the request being fulfilled. |
| 239 hostname: Hostname of the machine fulfilling the request. |
| 240 lease_expiration_ts: UTC seconds since epoch when the lease expires. |
| 241 """ |
| 242 machine_lease = key.get() |
| 243 if not machine_lease: |
| 244 logging.error('MachineLease does not exist\nKey: %s', key) |
| 245 return |
| 246 |
| 247 if request_id != machine_lease.client_request_id: |
| 248 logging.error( |
| 249 'Request ID mismatch\nKey: %s\nExpected: %s\nActual: %s', |
| 250 key, |
| 251 machine_lease.client_request_id, |
| 252 request_id, |
| 253 ) |
| 254 return |
| 255 |
| 256 if (hostname == machine_lease.hostname |
| 257 and lease_expiration_ts == machine_lease.lease_expiration_ts): |
| 258 return |
| 259 |
| 260 machine_lease.hostname = hostname |
| 261 machine_lease.lease_expiration_ts = datetime.datetime.utcfromtimestamp( |
| 262 lease_expiration_ts) |
| 263 machine_lease.put() |
| 264 |
| 265 |
| 266 @ndb.transactional |
| 267 def update_client_request_id(key): |
| 268 """Sets the client request ID used to lease a machine. |
| 269 |
| 270 Args: |
| 271 key: ndb.Key for a MachineLease entity. |
| 272 """ |
| 273 machine_lease = key.get() |
| 274 if not machine_lease: |
| 275 logging.error('MachineLease does not exist\nKey: %s', key) |
| 276 return |
| 277 |
| 278 if machine_lease.drained: |
| 279 logging.info('MachineLease is drained\nKey: %s', key) |
| 280 return |
| 281 |
| 282 if machine_lease.client_request_id: |
| 283 return |
| 284 |
| 285 machine_lease.request_count += 1 |
| 286 machine_lease.client_request_id = '%s-%s-%s' % ( |
| 287 machine_lease.machine_type.id(), key.id(), machine_lease.request_count) |
| 288 machine_lease.put() |
| 289 |
| 290 |
| 291 @ndb.transactional |
| 292 def delete_machine_lease(key): |
| 293 """Deletes the given MachineLease if it is drained and has no active lease. |
| 294 |
| 295 Args: |
| 296 key: ndb.Key for a MachineLease entity. |
| 297 """ |
| 298 machine_lease = key.get() |
| 299 if not machine_lease: |
| 300 return |
| 301 |
| 302 if not machine_lease.drained: |
| 303 logging.warning('MachineLease not drained: %s', key) |
| 304 return |
| 305 |
| 306 if machine_lease.client_request_id: |
| 307 return |
| 308 |
| 309 key.delete() |
| 310 |
| 311 |
def handle_lease_request_error(machine_lease, response):
  """Logs a lease request error and clears the request unless transient.

  Args:
    machine_lease: MachineLease instance.
    response: Response returned by components.machine_provider.lease_machine.
  """
  error_name = response['error']
  error = machine_provider.LeaseRequestError.lookup_by_name(error_name)
  transient_errors = (
      machine_provider.LeaseRequestError.DEADLINE_EXCEEDED,
      machine_provider.LeaseRequestError.TRANSIENT_ERROR,
  )
  request_id = response['client_request_id']

  if error in transient_errors:
    # NOTE(review): the request is left in place here, presumably so it is
    # retried under the same request ID -- confirm.
    logging.warning(
        'Transient failure: %s\nRequest ID: %s\nError: %s',
        machine_lease.key,
        request_id,
        error_name,
    )
    return

  logging.error(
      'Lease request failed\nKey: %s\nRequest ID: %s\nError: %s',
      machine_lease.key,
      request_id,
      error_name,
  )
  clear_lease_request(machine_lease.key, machine_lease.client_request_id)
| 338 |
| 339 |
def handle_lease_request_response(machine_lease, response):
  """Handles a successful lease request response from Machine Provider.

  A DENIED request is logged and cleared. A FULFILLED request with a hostname
  is recorded via log_lease_fulfillment; a FULFILLED request without a
  hostname is treated as already expired and cleared. Any other state is
  ignored and the request is left untouched.

  Args:
    machine_lease: MachineLease instance.
    response: Response returned by components.machine_provider.lease_machine.
      Must not contain an error.
  """
  assert not response.get('error')
  state = machine_provider.LeaseRequestState.lookup_by_name(response['state'])
  key = machine_lease.key
  request_id = machine_lease.client_request_id

  if state == machine_provider.LeaseRequestState.DENIED:
    logging.warning('Request denied: %s\nRequest ID: %s', key, request_id)
    clear_lease_request(key, request_id)
    return

  if state != machine_provider.LeaseRequestState.FULFILLED:
    # Neither fulfilled nor denied: nothing to record or clear.
    return

  if not response.get('hostname'):
    # Lease has already expired. This shouldn't happen, but it indicates the
    # lease expired faster than we could tell it even got fulfilled.
    logging.error(
        'Request expired\nKey: %s\nRequest ID: %s\nExpired: %s',
        key,
        request_id,
        response['lease_expiration_ts'],
    )
    clear_lease_request(key, request_id)
    return

  logging.info(
      'Request fulfilled: %s\nRequest ID: %s\nHostname: %s\nExpires: %s',
      key,
      request_id,
      response['hostname'],
      response['lease_expiration_ts'],
  )
  # The expiration is converted to int before it is recorded.
  log_lease_fulfillment(
      key,
      request_id,
      response['hostname'],
      int(response['lease_expiration_ts']),
  )
  # TODO(smut): Create BotInfo, associate lease information.
| 382 |
| 383 |
def manage_pending_lease_request(machine_lease):
  """Sends the pending lease request to Machine Provider and handles the reply.

  The request is built from the entity's dimensions, lease duration and
  client request ID, and instructs the leased machine to use this app's
  default version hostname as its Swarming server.

  Args:
    machine_lease: MachineLease instance with client_request_id set.
  """
  assert machine_lease.client_request_id, machine_lease.key

  server_url = 'https://%s' % app_identity.get_default_version_hostname()
  lease_request = machine_provider.LeaseRequest(
      dimensions=machine_lease.mp_dimensions,
      # TODO(smut): Vary duration so machines don't expire all at once.
      duration=machine_lease.lease_duration_secs,
      on_lease=machine_provider.Instruction(swarming_server=server_url),
      request_id=machine_lease.client_request_id,
  )
  response = machine_provider.lease_machine(lease_request)

  # Dispatch on whether the response carries an error.
  if response.get('error'):
    handle_lease_request_error(machine_lease, response)
  else:
    handle_lease_request_response(machine_lease, response)
| 409 |
| 410 |
def manage_lease(key):
  """Performs the next management step for a MachineLease.

  Exactly one of the following happens, checked in this order:
    * The entity has a lease expiration time: the lease request is cleared if
      that time has passed, otherwise nothing is done.
    * The entity has a client request ID: the pending request is managed.
    * The entity is drained: deletion of the entity is attempted.
    * Otherwise: a new client request ID is generated.

  Args:
    key: ndb.Key for a MachineLease entity.
  """
  lease = key.get()
  if not lease:
    return

  # A known expiration time means a machine has been leased.
  expiration_ts = lease.lease_expiration_ts
  if expiration_ts:
    if expiration_ts <= utils.utcnow():
      logging.info('MachineLease expired: %s', key)
      assert lease.hostname, key
      clear_lease_request(key, lease.client_request_id)
    return

  # Lease expiration time is unknown, so there must be no leased machine.
  assert not lease.hostname, key

  if lease.client_request_id:
    # A lease request is pending.
    manage_pending_lease_request(lease)
  elif lease.drained:
    # No request was initiated and the entity is drained.
    delete_machine_lease(key)
  else:
    # No request was initiated yet, so start one.
    update_client_request_id(key)
| OLD | NEW |