Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 # Copyright 2016 The LUCI Authors. All rights reserved. | 1 # Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 """Lease management for machines leased from the Machine Provider. | 5 """Lease management for machines leased from the Machine Provider. |
| 6 | 6 |
| 7 Keeps a list of machine types which should be leased from the Machine Provider | 7 Keeps a list of machine types which should be leased from the Machine Provider |
| 8 and the list of machines of each type currently leased. | 8 and the list of machines of each type currently leased. |
| 9 | 9 |
| 10 Swarming integration with Machine Provider | 10 Swarming integration with Machine Provider |
| 11 ========================================== | 11 ========================================== |
| 12 | 12 |
| 13 handlers_backend.py contains a cron job which looks at each machine type and | 13 handlers_backend.py contains a cron job which looks at each MachineType and |
| 14 ensures the number of active lease requests is equal to the target size | 14 ensures there are at least as many MachineLeases in the datastore which refer |
| 15 specified by the machine type. | 15 to that MachineType as the target_size in MachineType specifies by numbering |
| 16 them 0 through target_size - 1 If there are MachineType entities numbered | |
| 17 target_size or greater which refer to that MachineType, those MachineLeases | |
| 18 are marked as drained. | |
| 16 | 19 |
| 17 A lease request is considered active as soon as it is initiated. It remains | 20 Each MachineLease manages itself. A cron job in handlers_backend.py will trigger |
| 18 active while the lease request is pending and even after it is fulfilled. A | 21 self-management jobs for each entity. If there is no associated lease and the |
| 19 lease request is considered no longer active once the request is denied or | 22 MachineLease is not drained, issue a request to the Machine Provider for a |
| 20 the machine used to fulfill the request is reclaimed. | 23 matching machine. If there is an associated request, check the status of that |
| 24 request. If it is fulfilled, ensure the existence of a BotInfo entity (see | |
| 25 server/bot_management.py) corresponding to the machine provided for the lease. | |
| 26 Include the lease ID and lease_expiration_ts as fields in the BotInfo. If it | |
| 27 is expired, clear the associated lease. If there is no associated lease and | |
| 28 the MachineLease is drained, delete the MachineLease entity. | |
| 21 | 29 |
| 22 This scheme ensures that we never have more machines of a given type than we | 30 TODO(smut): If there is an associated request and the MachineLease is drained, |
| 23 want | 31 release the lease immediately (as long as the bot is not mid-task). |
| 24 | |
| 25 Each machine type is stored as a MachineType entity which describes the machine | |
| 26 in terms of Machine Provider Dimensions (not to be confused with Swarming's own | |
| 27 bot dimensions) as well as a target size which describes the number of machines | |
| 28 of this type we want connected at once. | |
| 29 | |
| 30 handlers_backend.py calls generate_lease_requests (below) to generate a number | |
| 31 of lease requests equal to the target size minus the current number of active | |
| 32 leases, then performs the RPC to send these requests to the Machine Provider, | |
| 33 and finally it calls update_leases (below) with the results of the lease | |
| 34 request to associate the new active leases with the machine type. | |
| 35 | |
| 36 In principle, the entire operation on a single machine type needs to happen | |
| 37 transactionally so we don't make duplicate requests and end up with more | |
| 38 machines than the target size wants. However, in practice we need the RPC | |
| 39 to be outside the transaction because it may be slow and it may need to be | |
| 40 retried, which could cause the entire transaction to time out without logging | |
| 41 the result of the call. Therefore the procedure is broken up into two | |
| 42 transactions with an RPC in between. | |
| 43 | |
| 44 With the transaction broken into two, the choice of request ID is the only way | |
| 45 to prevents duplicate lease requests (e.g. if generate_lease_requests runs twice | |
| 46 before update_leases gets to run). Machine Provider supports idempotent RPCs | |
| 47 so long as the client-generated request ID is the same. | |
| 48 | |
| 49 Therefore, we ensure generate_lease_requests always generates lease requests | |
| 50 with the same request ID until update_lease is called. This is done by keeping a | |
| 51 count of the number of requests generated so far. If the request count is n, | |
| 52 generate_lease_requests will always generate requests with IDs n + 1, n + 2, ... | |
| 53 up to n + target size - current size, where current size is the number of | |
| 54 currently active lease requests. | |
| 55 | |
| 56 update_lease takes the RPC result and updates the pending request entries. | |
| 57 | |
| 58 TODO(smut): Consider request count overflow. | |
| 59 """ | 32 """ |
| 60 | 33 |
| 61 import base64 | 34 import base64 |
| 62 import datetime | 35 import datetime |
| 63 import json | 36 import json |
| 64 import logging | 37 import logging |
| 65 | 38 |
| 39 from google.appengine.api import app_identity | |
| 66 from google.appengine.ext import ndb | 40 from google.appengine.ext import ndb |
| 67 from google.appengine.ext.ndb import msgprop | 41 from google.appengine.ext.ndb import msgprop |
| 68 | 42 |
| 69 from components import machine_provider | 43 from components import machine_provider |
| 70 from components import pubsub | 44 from components import pubsub |
| 71 from components import utils | 45 from components import utils |
| 72 from server import bot_management | 46 from server import bot_management |
| 73 | 47 |
| 74 | 48 |
| 75 # Name of the topic the Machine Provider is authorized to publish | 49 # Name of the topic the Machine Provider is authorized to publish |
| 76 # lease information to. | 50 # lease information to. |
| 77 PUBSUB_TOPIC = 'machine-provider' | 51 PUBSUB_TOPIC = 'machine-provider' |
| 78 | 52 |
| 79 # Name of the pull subscription to the Machine Provider topic. | 53 # Name of the pull subscription to the Machine Provider topic. |
| 80 PUBSUB_SUBSCRIPTION = 'machine-provider' | 54 PUBSUB_SUBSCRIPTION = 'machine-provider' |
| 81 | 55 |
| 82 | 56 |
| 83 class MachineLease(ndb.Model): | 57 class MachineLease(ndb.Model): |
| 84 """A lease request for a machine from the Machine Provider. | 58 """A lease request for a machine from the Machine Provider. |
| 85 | 59 |
| 86 Standalone MachineLease entities should not exist in the datastore. | 60 Key: |
| 61 id: A string in the form <machine type id>-<number>. | |
| 62 kind: MachineLease. Is a root entity. | |
| 87 """ | 63 """ |
| 88 # Request ID used to generate this request. | 64 # Request ID used to generate this request. |
| 89 client_request_id = ndb.StringProperty(required=True) | 65 client_request_id = ndb.StringProperty(indexed=True) |
| 66 # Whether or not this MachineLease should issue lease requests. | |
| 67 drained = ndb.BooleanProperty(indexed=True) | |
| 90 # Hostname of the machine currently allocated for this request. | 68 # Hostname of the machine currently allocated for this request. |
| 91 hostname = ndb.StringProperty() | 69 hostname = ndb.StringProperty() |
| 92 # Request hash returned by the server for the request for this machine. | 70 # Duration to lease for. |
| 93 request_hash = ndb.StringProperty() | 71 lease_duration_secs = ndb.IntegerProperty(indexed=False) |
| 94 # DateTime indicating lease expiration time. | 72 # DateTime indicating lease expiration time. |
| 95 lease_expiration_ts = ndb.DateTimeProperty() | 73 lease_expiration_ts = ndb.DateTimeProperty() |
| 74 # ndb.Key for the MachineType this MachineLease is created for. | |
| 75 machine_type = ndb.KeyProperty() | |
| 76 # machine_provider.Dimensions describing the machine. | |
| 77 mp_dimensions = msgprop.MessageProperty( | |
| 78 machine_provider.Dimensions, indexed=False) | |
| 79 # Last request number used. | |
| 80 request_count = ndb.IntegerProperty(default=0, required=True) | |
| 96 | 81 |
| 97 | 82 |
| 98 class MachineType(ndb.Model): | 83 class MachineType(ndb.Model): |
| 99 """A type of machine which should be leased from the Machine Provider. | 84 """A type of machine which should be leased from the Machine Provider. |
| 100 | 85 |
| 101 Key: | 86 Key: |
| 102 id: A human-readable name for this machine type. | 87 id: A human-readable name for this machine type. |
| 103 kind: MachineType. Is a root entity. | 88 kind: MachineType. Is a root entity. |
| 104 """ | 89 """ |
| 105 # Current number of active leases. | |
| 106 current_size = ndb.ComputedProperty(lambda self: len(self.leases)) | |
| 107 # Description of this machine type for humans. | 90 # Description of this machine type for humans. |
| 108 description = ndb.StringProperty(indexed=False) | 91 description = ndb.StringProperty(indexed=False) |
| 109 # Whether or not to attempt to lease machines of this type. | 92 # Whether or not to attempt to lease machines of this type. |
| 110 enabled = ndb.BooleanProperty(default=True) | 93 enabled = ndb.BooleanProperty(default=True) |
| 111 # Duration to lease each machine for. | 94 # Duration to lease each machine for. |
| 112 lease_duration_secs = ndb.IntegerProperty(indexed=False) | 95 lease_duration_secs = ndb.IntegerProperty(indexed=False) |
| 113 # List of active lease requests. | |
| 114 leases = ndb.LocalStructuredProperty(MachineLease, repeated=True) | |
| 115 # machine_provider.Dimensions describing the machine. | 96 # machine_provider.Dimensions describing the machine. |
| 116 mp_dimensions = msgprop.MessageProperty( | 97 mp_dimensions = msgprop.MessageProperty( |
| 117 machine_provider.Dimensions, indexed=False) | 98 machine_provider.Dimensions, indexed=False) |
| 118 # Number of bots pending deletion. | |
| 119 num_pending_deletion = ndb.ComputedProperty( | |
| 120 lambda self: len(self.pending_deletion)) | |
| 121 # List of hostnames whose leases have expired and should be deleted. | |
| 122 pending_deletion = ndb.StringProperty(indexed=False, repeated=True) | |
| 123 # Last request number used. | |
| 124 request_count = ndb.IntegerProperty(default=0, required=True) | |
| 125 # Target number of machines of this type to have leased at once. | 99 # Target number of machines of this type to have leased at once. |
| 126 target_size = ndb.IntegerProperty(indexed=False, required=True) | 100 target_size = ndb.IntegerProperty(indexed=False, required=True) |
| 127 | 101 |
| 128 | 102 |
| 129 def clean_up_bots(): | 103 @ndb.transactional_tasklet |
| 130 """Cleans up expired leases.""" | 104 def ensure_entity_exists(machine_type, n): |
| 131 # Maximum number of in-flight ndb.Futures. | 105 """Ensures the nth MachineLease for the given MachineType exists. |
| 132 MAX_IN_FLIGHT = 50 | 106 |
| 133 | 107 Args: |
| 134 bot_ids = [] | 108 machine_type: MachineType entity. |
| 135 deleted = {} | 109 n: The MachineLease index. |
| 136 for machine_type in MachineType.query(MachineType.num_pending_deletion > 0): | 110 """ |
| 137 logging.info( | 111 key = ndb.Key(MachineLease, '%s-%s' % (machine_type.key.id(), n)) |
| 138 'Deleting bots: %s', ', '.join(sorted(machine_type.pending_deletion))) | 112 machine_lease = yield key.get_async() |
| 139 bot_ids.extend(machine_type.pending_deletion) | 113 if machine_lease: |
| 140 deleted[machine_type.key] = machine_type.pending_deletion | 114 return |
| 141 | 115 |
| 142 # Generate a few asynchronous requests at a time in order to | 116 yield MachineLease( |
| 143 # prevent having too many in-flight ndb.Futures at a time. | 117 key=key, |
| 118 lease_duration_secs=machine_type.lease_duration_secs, | |
| 119 machine_type=machine_type.key, | |
| 120 mp_dimensions=machine_type.mp_dimensions, | |
| 121 ).put_async() | |
| 122 | |
| 123 | |
| 124 def ensure_entities_exist(max_concurrent=50): | |
| 125 """Ensures MachineLeases exist for each MachineType. | |
| 126 | |
| 127 Args: | |
| 128 max_concurrent: Maximum number of concurrent asynchronous requests. | |
| 129 """ | |
| 130 # Generate a few asynchronous requests at a time in order to prevent having | |
| 131 # too many in flight at a time. | |
| 144 futures = [] | 132 futures = [] |
| 145 while bot_ids: | 133 |
| 146 num_futures = len(futures) | 134 for machine_type in MachineType.query(MachineType.enabled == True): |
| 147 if num_futures < MAX_IN_FLIGHT: | 135 cursor = 0 |
| 148 keys = [bot_management.get_info_key(bot_id) | 136 while cursor < machine_type.target_size: |
| 149 for bot_id in bot_ids[:MAX_IN_FLIGHT - num_futures]] | 137 while len(futures) < max_concurrent and cursor < machine_type.target_size: |
| 150 bot_ids = bot_ids[MAX_IN_FLIGHT - num_futures:] | 138 futures.append(ensure_entity_exists(machine_type, cursor)) |
| 151 futures.extend(ndb.delete_multi_async(keys)) | 139 cursor += 1 |
| 152 | 140 ndb.Future.wait_any(futures) |
|
Vadim Sh.
2016/10/03 20:26:31
this doesn't check status code (e.g. if transactio
smut
2016/10/03 21:18:07
Done.
| |
| 153 ndb.Future.wait_any(futures) | 141 futures = [future for future in futures if not future.done()] |
| 154 futures = [future for future in futures if not future.done()] | |
| 155 | 142 |
| 156 if futures: | 143 if futures: |
| 157 ndb.Future.wait_all(futures) | 144 ndb.Future.wait_all(futures) |
| 158 | 145 |
| 159 # There should be relatively few MachineType entitites, so | 146 |
| 160 # just process them sequentially. | 147 def drain_excess(max_concurrent=50): |
| 161 # TODO(smut): Parallelize this. | 148 """Marks MachineLeases beyond what is needed by their MachineType as drained. |
| 162 for machine_key, hostnames in deleted.iteritems(): | 149 |
| 163 successfully_deleted = [] | 150 Args: |
| 164 for hostname in hostnames: | 151 max_concurrent: Maximum number of concurrent asynchronous requests. |
| 165 if bot_management.get_info_key(hostname).get(): | 152 """ |
| 166 logging.error('Failed to delete BotInfo: %s', hostname) | 153 futures = [] |
| 167 else: | 154 |
| 168 successfully_deleted.append(hostname) | 155 for machine_type in MachineType.query(): |
| 169 logging.info('Deleted bots: %s', ', '.join(sorted(successfully_deleted))) | 156 for machine_lease in MachineLease.query( |
|
Vadim Sh.
2016/10/03 20:26:32
I think it will be more efficient to do a single M
smut
2016/10/03 21:18:08
There's only one MachineType right now. Didn't do
| |
| 170 _clear_bots_pending_deletion(machine_key, hostnames) | 157 MachineLease.machine_type == machine_type.key, |
| 171 | 158 ): |
| 172 | 159 try: |
| 173 @ndb.transactional | 160 index = int(machine_lease.key.id().rsplit('-', 1)[-1]) |
| 174 def _clear_bots_pending_deletion(machine_type_key, hostnames): | 161 except ValueError: |
| 175 """Clears the list of bots pending deletion. | 162 logging.error( |
| 176 | 163 'MachineLease index could not be deciphered: %s', machine_lease.key) |
| 177 Args: | 164 continue |
| 178 machine_type_key: ndb.Key for a MachineType instance. | 165 # Drain MachineLeases where the MachineType is not enabled or the index |
| 179 hostnames: List of bots pending deletion. | 166 # exceeds the target_size given by the MachineType. Since MachineLeases |
| 180 """ | 167 # are created in contiguous blocks, only indices 0 through target_size - 1 |
| 181 machine_type = machine_type_key.get() | 168 # should exist. |
| 182 if not machine_type: | 169 if not machine_type.enabled or index >= machine_type.target_size: |
| 183 logging.error('MachineType no longer exists: %s', machine_type_key.id()) | 170 if len(futures) == max_concurrent: |
| 184 return | 171 ndb.Future.wait_any(futures) |
| 185 | 172 futures = [future for future in futures if not future.done()] |
| 186 num_pending_deletion = len(machine_type.pending_deletion) | 173 machine_lease.drained = True |
| 187 machine_type.pending_deletion = [ | 174 futures.append(machine_lease.put_async()) |
| 188 host for host in machine_type.pending_deletion if host not in hostnames] | 175 |
| 189 if len(machine_type.pending_deletion) != num_pending_deletion: | 176 if futures: |
| 190 machine_type.put() | 177 ndb.Future.wait_all(futures) |
| 191 | 178 |
| 192 | 179 |
| 193 @ndb.transactional | 180 def schedule_lease_management(): |
| 194 def generate_lease_requests(machine_type_key, app_id, swarming_server): | 181 """Schedules task queues to process each MachineLease.""" |
| 195 """Generates lease requests. | 182 for machine_lease in MachineLease.query(): |
| 196 | 183 # TODO(smut): Remove this check once migrated to the new format. |
| 197 The list includes new requests to lease machines up to the targeted | 184 if machine_lease.machine_type: |
| 198 size for the given machine type as well as requests to get the status | 185 if not utils.enqueue_task( |
| 199 of pending lease requests. | 186 '/internal/taskqueue/machine-provider-manage', |
| 200 | 187 'machine-provider-manage', |
| 201 Args: | 188 params={ |
| 202 machine_type_key: ndb.Key for a MachineType instance. | 189 'key': machine_lease.key.urlsafe(), |
| 203 app_id: ID of the application the requests originate from. | 190 }, |
| 204 swarming_server: URL for the Swarming server to connect to. | 191 ): |
| 205 | 192 logging.warning( |
| 206 Returns: | 193 'Failed to enqueue task for MachineLease: %s', machine_lease.key) |
| 207 A list of lease requests. | 194 |
| 208 """ | 195 |
| 209 machine_type = machine_type_key.get() | 196 @ndb.transactional |
| 210 if not machine_type: | 197 def clear_lease_request(key, request_id): |
| 211 logging.warning('MachineType no longer exists: %s', machine_type_key.id()) | 198 """Clears information about given lease request. |
| 212 return [] | 199 |
| 213 | 200 Args: |
| 214 expired_requests = _clean_up_expired_leases(machine_type) | 201 request_id: ID of the request to clear. |
| 215 lease_requests = _generate_lease_request_status_updates( | 202 """ |
| 216 machine_type, app_id, swarming_server) | 203 machine_lease = key.get() |
| 217 | 204 if not machine_lease: |
| 218 if not machine_type.enabled: | 205 logging.error('MachineLease does not exist: %s', key) |
| 219 logging.warning('MachineType is not enabled: %s\n', machine_type.key.id()) | 206 return |
| 220 return lease_requests | 207 |
| 221 if machine_type.current_size >= machine_type.target_size: | 208 if not machine_lease.client_request_id: |
| 222 logging.info( | 209 return |
| 223 'MachineType %s is at capacity: %d/%d', | 210 |
| 224 machine_type.key.id(), | 211 if request_id != machine_lease.client_request_id: |
| 225 machine_type.current_size, | 212 # Already cleared and incremented? |
| 226 machine_type.target_size, | 213 logging.warning( |
| 214 'Request ID mismatch for MachineLease: %s\nExpected: %s\nActual: %s', | |
| 215 key, | |
| 216 request_id, | |
| 217 machine_lease.client_request_id, | |
| 227 ) | 218 ) |
| 228 return lease_requests | 219 return |
| 229 | 220 |
| 230 new_requests = _generate_lease_requests_for_new_machines( | 221 machine_lease.client_request_id = None |
| 231 machine_type, app_id, swarming_server) | 222 machine_lease.hostname = None |
| 232 | 223 machine_lease.lease_expiration_ts = None |
| 233 if new_requests or expired_requests: | 224 machine_lease.put() |
| 234 machine_type.put() | 225 |
| 235 lease_requests.extend(new_requests) | 226 |
| 236 | 227 @ndb.transactional |
| 237 return lease_requests | 228 def log_lease_fulfillment(key, request_id, hostname, lease_expiration_ts): |
| 238 | 229 """Logs lease fulfillment. |
| 239 | 230 |
| 240 def _clean_up_expired_leases(machine_type): | 231 Args: |
| 241 """Cleans up expired leases. | 232 request_id: ID of the request being fulfilled. |
| 242 | 233 hostname: Hostname of the machine fulfilling the request. |
| 243 Prunes expired leases from machine_type.leases, | 234 lease_expiration_ts: UTC seconds since epoch when the lease expires. |
| 244 but does not write the result to the datastore. | 235 """ |
| 245 | 236 machine_lease = key.get() |
| 246 Args: | 237 if not machine_lease: |
| 247 machine_type: MachineType instance. | 238 logging.error('MachineLease does not exist: %s', key) |
| 248 | 239 return |
| 249 Returns: | 240 |
| 250 A list of leases that were removed. | 241 if request_id != machine_lease.client_request_id: |
| 251 """ | 242 logging.error( |
| 252 active = [] | 243 'Request ID mismatch for MachineLease: %s\nExpected: %s\nActual: %s', |
| 253 expired = [] | 244 machine_lease.client_request_id, |
| 254 | 245 request_id, |
| 255 for request in machine_type.leases: | 246 ) |
| 256 if request.hostname and request.lease_expiration_ts <= utils.utcnow(): | 247 return |
| 257 logging.warning( | 248 |
| 258 'Request ID %s expired:\nHostname: %s\nExpiration: %s', | 249 if (hostname == machine_lease.hostname |
| 259 request.client_request_id, | 250 and lease_expiration_ts == machine_lease.lease_expiration_ts): |
| 260 request.hostname, | 251 return |
| 261 request.lease_expiration_ts, | 252 |
| 262 ) | 253 machine_lease.hostname = hostname |
| 263 expired.append(request.hostname) | 254 machine_lease.lease_expiration_ts = datetime.datetime.utcfromtimestamp( |
| 264 else: | 255 lease_expiration_ts) |
| 265 active.append(request) | 256 machine_lease.put() |
| 266 | 257 |
| 267 machine_type.leases = active | 258 |
| 268 machine_type.pending_deletion.extend(expired) | 259 @ndb.transactional |
| 269 return expired | 260 def increment_lease_request_id(key): |
|
Vadim Sh.
2016/10/03 20:26:32
nit: update_client_request_id
It takes an entity
smut
2016/10/03 21:18:08
Done.
| |
| 270 | 261 """Increments the client request ID used to lease a machine. |
| 271 | 262 |
| 272 def _generate_lease_request_status_updates( | 263 Args: |
| 273 machine_type, app_id, swarming_server): | 264 key: ndb.Key for a MachineLease entity. |
| 274 """Generates status update requests for pending lease requests. | 265 """ |
| 275 | 266 machine_lease = key.get() |
| 276 Args: | 267 if not machine_lease: |
| 277 machine_type: MachineType instance. | 268 logging.error('MachineLease does not exist: %s', key) |
| 278 app_id: ID of the application the requests originate from. | 269 return |
| 279 swarming_server: URL for the Swarming server to connect to. | 270 |
| 280 | 271 if machine_lease.drained: |
| 281 Returns: | 272 logging.info('MachineLease is drained: %s', key) |
| 282 A list of lease requests. | 273 return |
| 283 """ | 274 |
| 284 lease_requests = [] | 275 if machine_lease.client_request_id: |
| 285 for request in machine_type.leases: | 276 return |
| 286 if not request.hostname: | 277 |
| 287 # We don't know the hostname yet, meaning this request is still pending. | 278 machine_lease.request_count += 1 |
| 288 lease_requests.append(machine_provider.LeaseRequest( | 279 machine_lease.client_request_id = '%s-%s-%s' % ( |
| 289 dimensions=machine_type.mp_dimensions, | 280 machine_lease.machine_type.id(), key.id(), machine_lease.request_count) |
| 290 duration=machine_type.lease_duration_secs, | 281 machine_lease.put() |
| 291 on_lease=machine_provider.Instruction( | 282 |
| 292 swarming_server=swarming_server), | 283 |
| 293 pubsub_project=app_id, | 284 @ndb.transactional |
| 294 pubsub_topic=PUBSUB_TOPIC, | 285 def delete_machine_lease(key): |
| 295 request_id=request.client_request_id, | 286 """Deletes the given MachineLease if it is drained and has no active lease. |
| 296 )) | 287 |
| 297 return lease_requests | 288 Args: |
| 298 | 289 key: ndb.Key for a MachineLease entity. |
| 299 | 290 """ |
| 300 def _generate_lease_requests_for_new_machines( | 291 machine_lease = key.get() |
| 301 machine_type, app_id, swarming_server): | 292 if not machine_lease: |
| 302 """Generates requests to lease machines up to the target. | 293 return |
| 303 | 294 |
| 304 Extends machine_type.leases by the number of new lease requests generated, | 295 if not machine_lease.drained: |
| 305 but does not write the result to the datastore. | 296 logging.warning('MachineLease not drained: %s', key) |
| 306 | 297 return |
| 307 Args: | 298 |
| 308 machine_type: MachineType instance. | 299 if machine_lease.client_request_id: |
|
Vadim Sh.
2016/10/03 20:26:31
is this possible if drained == True?
smut
2016/10/03 21:18:07
Yes, there's the case where the MachineLease alrea
| |
| 309 app_id: ID of the application the requests originate from. | 300 return |
| 310 swarming_server: URL for the Swarming server to connect to. | 301 |
| 311 | 302 key.delete() |
| 312 Returns: | 303 |
| 313 A list of lease requests. | 304 |
| 314 """ | 305 def manage_lease(key): |
| 315 lease_requests = [] | 306 """Manages a MachineLease. |
| 316 request_number = machine_type.request_count | 307 |
| 317 for _ in xrange(machine_type.target_size - machine_type.current_size): | 308 Args: |
| 318 request_number += 1 | 309 key: ndb.Key for a MachineLease entity. |
| 319 request_id = '%s-%d' % (machine_type.key.id(), request_number) | 310 """ |
| 320 lease_requests.append(machine_provider.LeaseRequest( | 311 machine_lease = key.get() |
| 321 dimensions=machine_type.mp_dimensions, | 312 if not machine_lease: |
| 322 duration=machine_type.lease_duration_secs, | 313 return |
| 323 on_lease=machine_provider.Instruction(swarming_server=swarming_server), | 314 |
| 324 pubsub_project=app_id, | 315 # Manage a leased machine. |
| 325 pubsub_topic=PUBSUB_TOPIC, | 316 if machine_lease.lease_expiration_ts: |
| 326 request_id=request_id, | 317 if machine_lease.lease_expiration_ts <= utils.utcnow(): |
| 327 )) | 318 logging.info('MachineLease expired: %s', key) |
| 328 machine_type.leases.append(MachineLease(client_request_id=request_id)) | 319 assert machine_lease.hostname, key |
| 329 machine_type.request_count = request_number | 320 clear_lease_request(key, machine_lease.client_request_id) |
| 330 return lease_requests | 321 return |
| 331 | 322 |
| 332 | 323 # Lease expiration time is unknown, so there must be no leased machine. |
| 333 @ndb.transactional | 324 assert not machine_lease.hostname, key |
| 334 def update_leases(machine_type_key, responses): | 325 |
| 335 """Updates the states of leases of the given machine types. | 326 # Manage a pending lease request. |
| 336 | 327 if machine_lease.client_request_id: |
|
Vadim Sh.
2016/10/03 20:26:31
can the body of this 'if' be converted into a sepa
smut
2016/10/03 21:18:07
Done.
| |
| 337 Args: | 328 response = machine_provider.lease_machine( |
| 338 machine_type_key: ndb.Key for a MachineType instance. | 329 machine_provider.LeaseRequest( |
| 339 responses: machine_provider.BatchedLeaseResponse instance. | 330 dimensions=machine_lease.mp_dimensions, |
| 340 """ | 331 duration=machine_lease.lease_duration_secs, |
|
Vadim Sh.
2016/10/03 20:26:32
I really really wish we can randomize this a bit t
smut
2016/10/03 21:18:07
Your suggestion of varying with +/- 10% seems reas
Vadim Sh.
2016/10/03 21:34:18
Instead of random() you can use some hash-like fun
smut
2016/10/03 21:41:50
random.seed(machine_lease.client_request_id)
rando
Vadim Sh.
2016/10/03 21:46:43
Sort of, except random.seed will modify global see
| |
| 341 machine_type = machine_type_key.get() | 332 on_lease=machine_provider.Instruction( |
| 342 if not machine_type: | 333 swarming_server='https://%s' % ( |
| 343 logging.warning('MachineType no longer exists: %s', machine_type_key) | 334 app_identity.get_default_version_hostname())), |
| 344 return | 335 request_id=machine_lease.client_request_id, |
| 345 | 336 ), |
| 346 lease_request_map = { | 337 ) |
| 347 request.client_request_id: request for request in machine_type.leases | |
| 348 } | |
| 349 for response in responses: | |
| 350 request_id = response['client_request_id'] | |
| 351 request = lease_request_map.get(request_id) | |
| 352 if not request: | |
| 353 logging.error('Unknown request ID: %s', request_id) | |
| 354 continue | |
| 355 | 338 |
| 356 if response.get('error'): | 339 if response.get('error'): |
| 357 error = machine_provider.LeaseRequestError.lookup_by_name( | 340 error = machine_provider.LeaseRequestError.lookup_by_name( |
| 358 response['error']) | 341 response['error']) |
| 359 if error in ( | 342 if error in ( |
| 360 machine_provider.LeaseRequestError.DEADLINE_EXCEEDED, | 343 machine_provider.LeaseRequestError.DEADLINE_EXCEEDED, |
| 361 machine_provider.LeaseRequestError.TRANSIENT_ERROR, | 344 machine_provider.LeaseRequestError.TRANSIENT_ERROR, |
| 362 ): | 345 ): |
| 363 # Retryable errors. Just try again later. | |
| 364 logging.warning( | 346 logging.warning( |
| 365 'Request not processed, trying again later: %s', request_id) | 347 'Transient failure: %s\nRequest ID: %s\nError: %s', |
| 348 key, | |
| 349 request_id, | |
| 350 response['error'], | |
| 351 ) | |
| 366 else: | 352 else: |
| 367 # TODO(smut): Handle specific errors. | 353 logging.error( |
| 368 logging.warning( | 354 'Lease request failed: %s\nRequest ID: %s\nError: %s', |
|
Vadim Sh.
2016/10/03 20:26:32
nit: "Lease request failed\n%s...."
it will make
smut
2016/10/03 21:18:07
Done.
| |
| 369 'Error %s for request ID %s', | 355 key, |
| 356 request_id, | |
| 370 response['error'], | 357 response['error'], |
| 371 request_id, | |
| 372 ) | 358 ) |
| 373 lease_request_map.pop(request_id) | 359 clear_lease_request(key, machine_lease.client_request_id) |
| 374 else: | 360 return |
| 375 request.request_hash = response['request_hash'] | 361 |
| 376 state = machine_provider.LeaseRequestState.lookup_by_name( | 362 state = machine_provider.LeaseRequestState.lookup_by_name(response['state']) |
| 377 response['state']) | 363 if state == machine_provider.LeaseRequestState.FULFILLED: |
| 378 if state == machine_provider.LeaseRequestState.DENIED: | 364 if not response.get('hostname'): |
| 379 logging.warning('Request ID denied: %s', request_id) | 365 # Lease has already expired. This shouldn't happen, but it indicates the |
| 380 lease_request_map.pop(request_id) | 366 # lease expired faster than we could tell it even got fulfilled. |
| 381 elif state == machine_provider.LeaseRequestState.FULFILLED: | 367 logging.error( |
| 382 if response.get('hostname'): | 368 'Request expired: %s\nRequest ID:%s\nExpired: %s', |
|
Vadim Sh.
2016/10/03 20:26:31
same here
smut
2016/10/03 21:18:07
Done.
| |
| 383 logging.info( | 369 key, |
| 384 'Request ID %s fulfilled:\nHostname: %s\nExpiration: %s', | 370 machine_lease.client_request_id, |
| 385 request_id, | 371 response['lease_expiration_ts'], |
| 386 response['hostname'], | 372 ) |
| 387 response['lease_expiration_ts'], | 373 clear_lease_request(key, machine_lease.client_request_id) |
| 388 ) | |
| 389 request.hostname = response['hostname'] | |
| 390 request.lease_expiration_ts = datetime.datetime.utcfromtimestamp( | |
| 391 int(response['lease_expiration_ts'])) | |
| 392 else: | |
| 393 # Lease expired. This shouldn't happen, because it means we had a | |
| 394 # pending request which was fulfilled and expired before we were | |
| 395 # able to check its status. | |
| 396 logging.warning('Request ID fulfilled and expired: %s', request_id) | |
| 397 lease_request_map.pop(request_id) | |
| 398 else: | 374 else: |
| 399 # Lease request isn't processed yet. Just try again later. | |
| 400 logging.info( | 375 logging.info( |
| 401 'Request ID %s in state: %s', request_id, response['state']) | 376 'Request fulfilled: %s\nRequest ID: %s\nHostname: %s\nExpires: %s', |
| 377 key, | |
| 378 machine_lease.client_request_id, | |
| 379 response['hostname'], | |
| 380 response['lease_expiration_ts'], | |
| 381 ) | |
| 382 log_lease_fulfillment( | |
| 383 key, | |
| 384 machine_lease.client_request_id, | |
| 385 response['hostname'], | |
| 386 int(response['lease_expiration_ts']), | |
| 387 ) | |
| 388 # TODO(smut): Create BotInfo, associate lease information. | |
|
Vadim Sh.
2016/10/03 20:26:32
yep, in the same transaction
smut
2016/10/03 21:18:07
xg in log_lease_fulfillment?
Vadim Sh.
2016/10/03 21:34:18
Yes. Assuming 'manage_lease' is idempotent and wil
smut
2016/10/03 21:41:50
Yes should be idempotent.
| |
| 389 elif state == machine_provider.LeaseRequestState.DENIED: | |
| 390 logging.warning( | |
| 391 'Request denied: %s\nRequest ID: %s', | |
| 392 key, | |
| 393 machine_lease.client_request_id, | |
| 394 ) | |
| 395 clear_lease_request(key, machine_lease.client_request_id) | |
| 396 return | |
| 402 | 397 |
| 403 machine_type.leases = sorted( | 398 # Manage an uninitiated lease request. |
| 404 lease_request_map.values(), key=lambda lease: lease.client_request_id) | 399 if not machine_lease.drained: |
| 405 machine_type.put() | 400 increment_lease_request_id(key) |
| 406 | 401 |
| 407 | 402 # Manage an uninitiated, drained lease request. |
| 408 @ndb.tasklet | 403 delete_machine_lease(key) |
|
Vadim Sh.
2016/10/03 20:26:32
shouldn't it be in 'else' section of 'if not machi
smut
2016/10/03 21:18:07
Er, yes. I just added return above under the if so
| |
| 409 def process_message(message, subscription): | |
| 410 """Processes a Pub/Sub message from the Machine Provider. | |
| 411 | |
| 412 Args: | |
| 413 message: A message dict as returned by pubsub.pull. | |
| 414 subscription: Full name of the subscription the message was received on. | |
| 415 """ | |
| 416 now = utils.utcnow() | |
| 417 | |
| 418 try: | |
| 419 data = base64.b64decode(message.get('message', {}).get('data', '')) | |
| 420 lease_response = json.loads(data) | |
| 421 # Machine Provider sends a response including lease_expiration_ts. If | |
| 422 # lease_expiration_ts is not in the future and there is no more hostname | |
| 423 # associated with the response then this is a reclamation message. | |
| 424 lease_expiration_ts = datetime.datetime.utcfromtimestamp( | |
| 425 int(lease_response['lease_expiration_ts'])) | |
| 426 if lease_expiration_ts <= now and not lease_response.get('hostname'): | |
| 427 # We used request IDs of the form MachineType.key.id()-<integer>. | |
| 428 # Extract the ID for the MachineType from the request ID. | |
| 429 machine_type_id = lease_response['client_request_id'].rsplit('-', 1)[0] | |
| 430 logging.info('Lease ID %s is expired\nMachineType: %s', | |
| 431 lease_response['client_request_id'], | |
| 432 MachineType.get_by_id(machine_type_id)) | |
| 433 except (TypeError, ValueError): | |
| 434 logging.error('Received unexpected Pub/Sub message:\n%s', | |
| 435 json.dumps(message, indent=2)) | |
| 436 | |
| 437 yield pubsub.ack_async(subscription, message['ackId']) | |
| 438 | |
| 439 | |
| 440 def process_pubsub(app_id): | |
| 441 """Processes Pub/Sub messages from the Machine Provider, if there are any. | |
| 442 | |
| 443 Args: | |
| 444 app_id: ID of the application where the Pub/Sub subscription exists. | |
| 445 """ | |
| 446 MAX_IN_FLIGHT = 50 | |
| 447 subscription = pubsub.full_subscription_name(app_id, PUBSUB_SUBSCRIPTION) | |
| 448 response = pubsub.pull(subscription) | |
| 449 logging.info('%s', response) | |
| 450 | |
| 451 futures = [] | |
| 452 messages = response.get('receivedMessages', []) | |
| 453 while messages: | |
| 454 num_futures = len(futures) | |
| 455 if num_futures < MAX_IN_FLIGHT: | |
| 456 futures.extend( | |
| 457 process_message(message, subscription) | |
| 458 for message in messages[:MAX_IN_FLIGHT - num_futures]) | |
| 459 messages = messages[MAX_IN_FLIGHT - num_futures:] | |
| 460 ndb.Future.wait_any(futures) | |
| 461 futures = [future for future in futures if not future.done()] | |
| 462 if futures: | |
| 463 ndb.Future.wait_all(futures) | |
| OLD | NEW |