| Index: appengine/swarming/server/lease_management.py
|
| diff --git a/appengine/swarming/server/lease_management.py b/appengine/swarming/server/lease_management.py
|
| index 9bd20413669fce4c62a68481280e28cd03f2a679..2c3e87d9b66e87d5d40f38f94c5de2459d4146a5 100644
|
| --- a/appengine/swarming/server/lease_management.py
|
| +++ b/appengine/swarming/server/lease_management.py
|
| @@ -10,52 +10,25 @@ and the list of machines of each type currently leased.
|
| Swarming integration with Machine Provider
|
| ==========================================
|
|
|
| -handlers_backend.py contains a cron job which looks at each machine type and
|
| -ensures the number of active lease requests is equal to the target size
|
| -specified by the machine type.
|
| -
|
| -A lease request is considered active as soon as it is initiated. It remains
|
| -active while the lease request is pending and even after it is fulfilled. A
|
| -lease request is considered no longer active once the request is denied or
|
| -the machine used to fulfill the request is reclaimed.
|
| -
|
| -This scheme ensures that we never have more machines of a given type than we
|
| -want
|
| -
|
| -Each machine type is stored as a MachineType entity which describes the machine
|
| -in terms of Machine Provider Dimensions (not to be confused with Swarming's own
|
| -bot dimensions) as well as a target size which describes the number of machines
|
| -of this type we want connected at once.
|
| -
|
| -handlers_backend.py calls generate_lease_requests (below) to generate a number
|
| -of lease requests equal to the target size minus the current number of active
|
| -leases, then performs the RPC to send these requests to the Machine Provider,
|
| -and finally it calls update_leases (below) with the results of the lease
|
| -request to associate the new active leases with the machine type.
|
| -
|
| -In principle, the entire operation on a single machine type needs to happen
|
| -transactionally so we don't make duplicate requests and end up with more
|
| -machines than the target size wants. However, in practice we need the RPC
|
| -to be outside the transaction because it may be slow and it may need to be
|
| -retried, which could cause the entire transaction to time out without logging
|
| -the result of the call. Therefore the procedure is broken up into two
|
| -transactions with an RPC in between.
|
| -
|
| -With the transaction broken into two, the choice of request ID is the only way
|
| -to prevents duplicate lease requests (e.g. if generate_lease_requests runs twice
|
| -before update_leases gets to run). Machine Provider supports idempotent RPCs
|
| -so long as the client-generated request ID is the same.
|
| -
|
| -Therefore, we ensure generate_lease_requests always generates lease requests
|
| -with the same request ID until update_lease is called. This is done by keeping a
|
| -count of the number of requests generated so far. If the request count is n,
|
| -generate_lease_requests will always generate requests with IDs n + 1, n + 2, ...
|
| -up to n + target size - current size, where current size is the number of
|
| -currently active lease requests.
|
| -
|
| -update_lease takes the RPC result and updates the pending request entries.
|
| -
|
| -TODO(smut): Consider request count overflow.
|
| +handlers_backend.py contains a cron job which looks at each MachineType and
|
| +ensures there are at least as many MachineLeases in the datastore which refer
|
| +to that MachineType as the target_size in MachineType specifies by numbering
|
| +them 0 through target_size - 1. If there are MachineLease entities numbered
|
| +target_size or greater which refer to that MachineType, those MachineLeases
|
| +are marked as drained.
|
| +
|
| +Each MachineLease manages itself. A cron job in handlers_backend.py will trigger
|
| +self-management jobs for each entity. If there is no associated lease and the
|
| +MachineLease is not drained, issue a request to the Machine Provider for a
|
| +matching machine. If there is an associated request, check the status of that
|
| +request. If it is fulfilled, ensure the existence of a BotInfo entity (see
|
| +server/bot_management.py) corresponding to the machine provided for the lease.
|
| +Include the lease ID and lease_expiration_ts as fields in the BotInfo. If it
|
| +is expired, clear the associated lease. If there is no associated lease and
|
| +the MachineLease is drained, delete the MachineLease entity.
|
| +
|
| +TODO(smut): If there is an associated request and the MachineLease is drained,
|
| +release the lease immediately (as long as the bot is not mid-task).
|
| """
|
|
|
| import base64
|
| @@ -63,6 +36,7 @@ import datetime
|
| import json
|
| import logging
|
|
|
| +from google.appengine.api import app_identity
|
| from google.appengine.ext import ndb
|
| from google.appengine.ext.ndb import msgprop
|
|
|
| @@ -83,16 +57,27 @@ PUBSUB_SUBSCRIPTION = 'machine-provider'
|
| class MachineLease(ndb.Model):
|
| """A lease request for a machine from the Machine Provider.
|
|
|
| - Standalone MachineLease entities should not exist in the datastore.
|
| + Key:
|
| + id: A string in the form <machine type id>-<number>.
|
| + kind: MachineLease. Is a root entity.
|
| """
|
| # Request ID used to generate this request.
|
| - client_request_id = ndb.StringProperty(required=True)
|
| + client_request_id = ndb.StringProperty(indexed=True)
|
| + # Whether or not this MachineLease should issue lease requests.
|
| + drained = ndb.BooleanProperty(indexed=True)
|
| # Hostname of the machine currently allocated for this request.
|
| hostname = ndb.StringProperty()
|
| - # Request hash returned by the server for the request for this machine.
|
| - request_hash = ndb.StringProperty()
|
| + # Duration to lease for.
|
| + lease_duration_secs = ndb.IntegerProperty(indexed=False)
|
| # DateTime indicating lease expiration time.
|
| lease_expiration_ts = ndb.DateTimeProperty()
|
| + # ndb.Key for the MachineType this MachineLease is created for.
|
| + machine_type = ndb.KeyProperty()
|
| + # machine_provider.Dimensions describing the machine.
|
| + mp_dimensions = msgprop.MessageProperty(
|
| + machine_provider.Dimensions, indexed=False)
|
| + # Last request number used.
|
| + request_count = ndb.IntegerProperty(default=0, required=True)
|
|
|
|
|
| class MachineType(ndb.Model):
|
| @@ -102,362 +87,357 @@ class MachineType(ndb.Model):
|
| id: A human-readable name for this machine type.
|
| kind: MachineType. Is a root entity.
|
| """
|
| - # Current number of active leases.
|
| - current_size = ndb.ComputedProperty(lambda self: len(self.leases))
|
| # Description of this machine type for humans.
|
| description = ndb.StringProperty(indexed=False)
|
| # Whether or not to attempt to lease machines of this type.
|
| enabled = ndb.BooleanProperty(default=True)
|
| # Duration to lease each machine for.
|
| lease_duration_secs = ndb.IntegerProperty(indexed=False)
|
| - # List of active lease requests.
|
| - leases = ndb.LocalStructuredProperty(MachineLease, repeated=True)
|
| # machine_provider.Dimensions describing the machine.
|
| mp_dimensions = msgprop.MessageProperty(
|
| machine_provider.Dimensions, indexed=False)
|
| - # Number of bots pending deletion.
|
| - num_pending_deletion = ndb.ComputedProperty(
|
| - lambda self: len(self.pending_deletion))
|
| - # List of hostnames whose leases have expired and should be deleted.
|
| - pending_deletion = ndb.StringProperty(indexed=False, repeated=True)
|
| - # Last request number used.
|
| - request_count = ndb.IntegerProperty(default=0, required=True)
|
| # Target number of machines of this type to have leased at once.
|
| target_size = ndb.IntegerProperty(indexed=False, required=True)
|
|
|
|
|
| -def clean_up_bots():
|
| - """Cleans up expired leases."""
|
| - # Maximum number of in-flight ndb.Futures.
|
| - MAX_IN_FLIGHT = 50
|
| +@ndb.transactional_tasklet
|
| +def ensure_entity_exists(machine_type, n):
|
| + """Ensures the nth MachineLease for the given MachineType exists.
|
| +
|
| + Args:
|
| + machine_type: MachineType entity.
|
| + n: The MachineLease index.
|
| + """
|
| + key = ndb.Key(MachineLease, '%s-%s' % (machine_type.key.id(), n))
|
| + machine_lease = yield key.get_async()
|
| + if machine_lease:
|
| + return
|
| +
|
| + yield MachineLease(
|
| + key=key,
|
| + lease_duration_secs=machine_type.lease_duration_secs,
|
| + machine_type=machine_type.key,
|
| + mp_dimensions=machine_type.mp_dimensions,
|
| + ).put_async()
|
| +
|
|
|
| - bot_ids = []
|
| - deleted = {}
|
| - for machine_type in MachineType.query(MachineType.num_pending_deletion > 0):
|
| - logging.info(
|
| - 'Deleting bots: %s', ', '.join(sorted(machine_type.pending_deletion)))
|
| - bot_ids.extend(machine_type.pending_deletion)
|
| - deleted[machine_type.key] = machine_type.pending_deletion
|
| +def ensure_entities_exist(max_concurrent=50):
|
| + """Ensures MachineLeases exist for each MachineType.
|
|
|
| - # Generate a few asynchronous requests at a time in order to
|
| - # prevent having too many in-flight ndb.Futures at a time.
|
| + Args:
|
| + max_concurrent: Maximum number of concurrent asynchronous requests.
|
| + """
|
| + # Generate a few asynchronous requests at a time in order to prevent having
|
| + # too many in flight at a time.
|
| futures = []
|
| - while bot_ids:
|
| - num_futures = len(futures)
|
| - if num_futures < MAX_IN_FLIGHT:
|
| - keys = [bot_management.get_info_key(bot_id)
|
| - for bot_id in bot_ids[:MAX_IN_FLIGHT - num_futures]]
|
| - bot_ids = bot_ids[MAX_IN_FLIGHT - num_futures:]
|
| - futures.extend(ndb.delete_multi_async(keys))
|
|
|
| - ndb.Future.wait_any(futures)
|
| - futures = [future for future in futures if not future.done()]
|
| + for machine_type in MachineType.query(MachineType.enabled == True):
|
| + cursor = 0
|
| + while cursor < machine_type.target_size:
|
| + while len(futures) < max_concurrent and cursor < machine_type.target_size:
|
| + futures.append(ensure_entity_exists(machine_type, cursor))
|
| + cursor += 1
|
| + ndb.Future.wait_any(futures)
|
| + # We don't bother checking success or failure. If a transient error
|
| + # like TransactionFailed or DeadlineExceeded is raised and an entity
|
| + # is not created, we will just create it the next time this is called,
|
| + # converging to the desired state eventually.
|
| + futures = [future for future in futures if not future.done()]
|
|
|
| if futures:
|
| ndb.Future.wait_all(futures)
|
|
|
| - # There should be relatively few MachineType entitites, so
|
| - # just process them sequentially.
|
| - # TODO(smut): Parallelize this.
|
| - for machine_key, hostnames in deleted.iteritems():
|
| - successfully_deleted = []
|
| - for hostname in hostnames:
|
| - if bot_management.get_info_key(hostname).get():
|
| - logging.error('Failed to delete BotInfo: %s', hostname)
|
| - else:
|
| - successfully_deleted.append(hostname)
|
| - logging.info('Deleted bots: %s', ', '.join(sorted(successfully_deleted)))
|
| - _clear_bots_pending_deletion(machine_key, hostnames)
|
| +
|
| +def drain_excess(max_concurrent=50):
|
| + """Marks MachineLeases beyond what is needed by their MachineType as drained.
|
| +
|
| + Args:
|
| + max_concurrent: Maximum number of concurrent asynchronous requests.
|
| + """
|
| + futures = []
|
| +
|
| + for machine_type in MachineType.query():
|
| + for machine_lease in MachineLease.query(
|
| + MachineLease.machine_type == machine_type.key,
|
| + ):
|
| + try:
|
| + index = int(machine_lease.key.id().rsplit('-', 1)[-1])
|
| + except ValueError:
|
| + logging.error(
|
| + 'MachineLease index could not be deciphered\n Key: %s',
|
| + machine_lease.key,
|
| + )
|
| + continue
|
| + # Drain MachineLeases where the MachineType is not enabled or the index
|
| + # exceeds the target_size given by the MachineType. Since MachineLeases
|
| + # are created in contiguous blocks, only indices 0 through target_size - 1
|
| + # should exist.
|
| + if not machine_type.enabled or index >= machine_type.target_size:
|
| + if len(futures) == max_concurrent:
|
| + ndb.Future.wait_any(futures)
|
| + futures = [future for future in futures if not future.done()]
|
| + machine_lease.drained = True
|
| + futures.append(machine_lease.put_async())
|
| +
|
| + if futures:
|
| + ndb.Future.wait_all(futures)
|
| +
|
| +
|
| +def schedule_lease_management():
|
| +  """Enqueues a task to process each MachineLease."""
|
| + for machine_lease in MachineLease.query():
|
| + # TODO(smut): Remove this check once migrated to the new format.
|
| + if machine_lease.machine_type:
|
| + if not utils.enqueue_task(
|
| + '/internal/taskqueue/machine-provider-manage',
|
| + 'machine-provider-manage',
|
| + params={
|
| + 'key': machine_lease.key.urlsafe(),
|
| + },
|
| + ):
|
| + logging.warning(
|
| + 'Failed to enqueue task for MachineLease: %s', machine_lease.key)
|
|
|
|
|
| @ndb.transactional
|
| -def _clear_bots_pending_deletion(machine_type_key, hostnames):
|
| - """Clears the list of bots pending deletion.
|
| +def clear_lease_request(key, request_id):
|
| + """Clears information about given lease request.
|
|
|
| Args:
|
| - machine_type_key: ndb.Key for a MachineType instance.
|
| - hostnames: List of bots pending deletion.
|
| + request_id: ID of the request to clear.
|
| """
|
| - machine_type = machine_type_key.get()
|
| - if not machine_type:
|
| - logging.error('MachineType no longer exists: %s', machine_type_key.id())
|
| + machine_lease = key.get()
|
| + if not machine_lease:
|
| + logging.error('MachineLease does not exist\nKey: %s', key)
|
| return
|
|
|
| - num_pending_deletion = len(machine_type.pending_deletion)
|
| - machine_type.pending_deletion = [
|
| - host for host in machine_type.pending_deletion if host not in hostnames]
|
| - if len(machine_type.pending_deletion) != num_pending_deletion:
|
| - machine_type.put()
|
| + if not machine_lease.client_request_id:
|
| + return
|
|
|
| + if request_id != machine_lease.client_request_id:
|
| + # Already cleared and incremented?
|
| + logging.warning(
|
| + 'Request ID mismatch for MachineLease: %s\nExpected: %s\nActual: %s',
|
| + key,
|
| + request_id,
|
| + machine_lease.client_request_id,
|
| + )
|
| + return
|
|
|
| -@ndb.transactional
|
| -def generate_lease_requests(machine_type_key, app_id, swarming_server):
|
| - """Generates lease requests.
|
| + machine_lease.client_request_id = None
|
| + machine_lease.hostname = None
|
| + machine_lease.lease_expiration_ts = None
|
| + machine_lease.put()
|
|
|
| - The list includes new requests to lease machines up to the targeted
|
| - size for the given machine type as well as requests to get the status
|
| - of pending lease requests.
|
|
|
| - Args:
|
| - machine_type_key: ndb.Key for a MachineType instance.
|
| - app_id: ID of the application the requests originate from.
|
| - swarming_server: URL for the Swarming server to connect to.
|
| +@ndb.transactional
|
| +def log_lease_fulfillment(key, request_id, hostname, lease_expiration_ts):
|
| + """Logs lease fulfillment.
|
|
|
| - Returns:
|
| - A list of lease requests.
|
| + Args:
|
| + request_id: ID of the request being fulfilled.
|
| + hostname: Hostname of the machine fulfilling the request.
|
| + lease_expiration_ts: UTC seconds since epoch when the lease expires.
|
| """
|
| - machine_type = machine_type_key.get()
|
| - if not machine_type:
|
| - logging.warning('MachineType no longer exists: %s', machine_type_key.id())
|
| - return []
|
| -
|
| - expired_requests = _clean_up_expired_leases(machine_type)
|
| - lease_requests = _generate_lease_request_status_updates(
|
| - machine_type, app_id, swarming_server)
|
| -
|
| - if not machine_type.enabled:
|
| - logging.warning('MachineType is not enabled: %s\n', machine_type.key.id())
|
| - return lease_requests
|
| - if machine_type.current_size >= machine_type.target_size:
|
| - logging.info(
|
| - 'MachineType %s is at capacity: %d/%d',
|
| - machine_type.key.id(),
|
| - machine_type.current_size,
|
| - machine_type.target_size,
|
| - )
|
| - return lease_requests
|
| -
|
| - new_requests = _generate_lease_requests_for_new_machines(
|
| - machine_type, app_id, swarming_server)
|
| + machine_lease = key.get()
|
| + if not machine_lease:
|
| + logging.error('MachineLease does not exist\nKey: %s', key)
|
| + return
|
|
|
| - if new_requests or expired_requests:
|
| - machine_type.put()
|
| - lease_requests.extend(new_requests)
|
| + if request_id != machine_lease.client_request_id:
|
| + logging.error(
|
| + 'Request ID mismatch\nKey: %s\nExpected: %s\nActual: %s',
|
| + key,
|
| + machine_lease.client_request_id,
|
| + request_id,
|
| + )
|
| + return
|
|
|
| - return lease_requests
|
| + if (hostname == machine_lease.hostname
|
| + and lease_expiration_ts == machine_lease.lease_expiration_ts):
|
| + return
|
|
|
| + machine_lease.hostname = hostname
|
| + machine_lease.lease_expiration_ts = datetime.datetime.utcfromtimestamp(
|
| + lease_expiration_ts)
|
| + machine_lease.put()
|
|
|
| -def _clean_up_expired_leases(machine_type):
|
| - """Cleans up expired leases.
|
|
|
| - Prunes expired leases from machine_type.leases,
|
| - but does not write the result to the datastore.
|
| +@ndb.transactional
|
| +def update_client_request_id(key):
|
| + """Sets the client request ID used to lease a machine.
|
|
|
| Args:
|
| - machine_type: MachineType instance.
|
| -
|
| - Returns:
|
| - A list of leases that were removed.
|
| + key: ndb.Key for a MachineLease entity.
|
| """
|
| - active = []
|
| - expired = []
|
| -
|
| - for request in machine_type.leases:
|
| - if request.hostname and request.lease_expiration_ts <= utils.utcnow():
|
| - logging.warning(
|
| - 'Request ID %s expired:\nHostname: %s\nExpiration: %s',
|
| - request.client_request_id,
|
| - request.hostname,
|
| - request.lease_expiration_ts,
|
| - )
|
| - expired.append(request.hostname)
|
| - else:
|
| - active.append(request)
|
| + machine_lease = key.get()
|
| + if not machine_lease:
|
| + logging.error('MachineLease does not exist\nKey: %s', key)
|
| + return
|
| +
|
| + if machine_lease.drained:
|
| + logging.info('MachineLease is drained\nKey: %s', key)
|
| + return
|
|
|
| - machine_type.leases = active
|
| - machine_type.pending_deletion.extend(expired)
|
| - return expired
|
| + if machine_lease.client_request_id:
|
| + return
|
|
|
| + machine_lease.request_count += 1
|
| + machine_lease.client_request_id = '%s-%s-%s' % (
|
| + machine_lease.machine_type.id(), key.id(), machine_lease.request_count)
|
| + machine_lease.put()
|
|
|
| -def _generate_lease_request_status_updates(
|
| - machine_type, app_id, swarming_server):
|
| - """Generates status update requests for pending lease requests.
|
|
|
| - Args:
|
| - machine_type: MachineType instance.
|
| - app_id: ID of the application the requests originate from.
|
| - swarming_server: URL for the Swarming server to connect to.
|
| +@ndb.transactional
|
| +def delete_machine_lease(key):
|
| + """Deletes the given MachineLease if it is drained and has no active lease.
|
|
|
| - Returns:
|
| - A list of lease requests.
|
| + Args:
|
| + key: ndb.Key for a MachineLease entity.
|
| """
|
| - lease_requests = []
|
| - for request in machine_type.leases:
|
| - if not request.hostname:
|
| - # We don't know the hostname yet, meaning this request is still pending.
|
| - lease_requests.append(machine_provider.LeaseRequest(
|
| - dimensions=machine_type.mp_dimensions,
|
| - duration=machine_type.lease_duration_secs,
|
| - on_lease=machine_provider.Instruction(
|
| - swarming_server=swarming_server),
|
| - pubsub_project=app_id,
|
| - pubsub_topic=PUBSUB_TOPIC,
|
| - request_id=request.client_request_id,
|
| - ))
|
| - return lease_requests
|
| + machine_lease = key.get()
|
| + if not machine_lease:
|
| + return
|
| +
|
| + if not machine_lease.drained:
|
| + logging.warning('MachineLease not drained: %s', key)
|
| + return
|
|
|
| + if machine_lease.client_request_id:
|
| + return
|
|
|
| -def _generate_lease_requests_for_new_machines(
|
| - machine_type, app_id, swarming_server):
|
| - """Generates requests to lease machines up to the target.
|
| + key.delete()
|
|
|
| - Extends machine_type.leases by the number of new lease requests generated,
|
| - but does not write the result to the datastore.
|
|
|
| - Args:
|
| - machine_type: MachineType instance.
|
| - app_id: ID of the application the requests originate from.
|
| - swarming_server: URL for the Swarming server to connect to.
|
| +def handle_lease_request_error(machine_lease, response):
|
| + """Handles an error in the lease request response from Machine Provider.
|
|
|
| - Returns:
|
| - A list of lease requests.
|
| + Args:
|
| + machine_lease: MachineLease instance.
|
| + response: Response returned by components.machine_provider.lease_machine.
|
| """
|
| - lease_requests = []
|
| - request_number = machine_type.request_count
|
| - for _ in xrange(machine_type.target_size - machine_type.current_size):
|
| - request_number += 1
|
| - request_id = '%s-%d' % (machine_type.key.id(), request_number)
|
| - lease_requests.append(machine_provider.LeaseRequest(
|
| - dimensions=machine_type.mp_dimensions,
|
| - duration=machine_type.lease_duration_secs,
|
| - on_lease=machine_provider.Instruction(swarming_server=swarming_server),
|
| - pubsub_project=app_id,
|
| - pubsub_topic=PUBSUB_TOPIC,
|
| - request_id=request_id,
|
| - ))
|
| - machine_type.leases.append(MachineLease(client_request_id=request_id))
|
| - machine_type.request_count = request_number
|
| - return lease_requests
|
| + error = machine_provider.LeaseRequestError.lookup_by_name(response['error'])
|
| + if error in (
|
| + machine_provider.LeaseRequestError.DEADLINE_EXCEEDED,
|
| + machine_provider.LeaseRequestError.TRANSIENT_ERROR,
|
| + ):
|
| + logging.warning(
|
| + 'Transient failure: %s\nRequest ID: %s\nError: %s',
|
| + machine_lease.key,
|
| + response['client_request_id'],
|
| + response['error'],
|
| + )
|
| + else:
|
| + logging.error(
|
| + 'Lease request failed\nKey: %s\nRequest ID: %s\nError: %s',
|
| + machine_lease.key,
|
| + response['client_request_id'],
|
| + response['error'],
|
| + )
|
| + clear_lease_request(machine_lease.key, machine_lease.client_request_id)
|
|
|
|
|
| -@ndb.transactional
|
| -def update_leases(machine_type_key, responses):
|
| - """Updates the states of leases of the given machine types.
|
| +def handle_lease_request_response(machine_lease, response):
|
| + """Handles a successful lease request response from Machine Provider.
|
|
|
| Args:
|
| - machine_type_key: ndb.Key for a MachineType instance.
|
| - responses: machine_provider.BatchedLeaseResponse instance.
|
| + machine_lease: MachineLease instance.
|
| + response: Response returned by components.machine_provider.lease_machine.
|
| """
|
| - machine_type = machine_type_key.get()
|
| - if not machine_type:
|
| - logging.warning('MachineType no longer exists: %s', machine_type_key)
|
| - return
|
| -
|
| - lease_request_map = {
|
| - request.client_request_id: request for request in machine_type.leases
|
| - }
|
| - for response in responses:
|
| - request_id = response['client_request_id']
|
| - request = lease_request_map.get(request_id)
|
| - if not request:
|
| - logging.error('Unknown request ID: %s', request_id)
|
| - continue
|
| -
|
| - if response.get('error'):
|
| - error = machine_provider.LeaseRequestError.lookup_by_name(
|
| - response['error'])
|
| - if error in (
|
| - machine_provider.LeaseRequestError.DEADLINE_EXCEEDED,
|
| - machine_provider.LeaseRequestError.TRANSIENT_ERROR,
|
| - ):
|
| - # Retryable errors. Just try again later.
|
| - logging.warning(
|
| - 'Request not processed, trying again later: %s', request_id)
|
| - else:
|
| - # TODO(smut): Handle specific errors.
|
| - logging.warning(
|
| - 'Error %s for request ID %s',
|
| - response['error'],
|
| - request_id,
|
| - )
|
| - lease_request_map.pop(request_id)
|
| + assert not response.get('error')
|
| + state = machine_provider.LeaseRequestState.lookup_by_name(response['state'])
|
| + if state == machine_provider.LeaseRequestState.FULFILLED:
|
| + if not response.get('hostname'):
|
| + # Lease has already expired. This shouldn't happen, but it indicates the
|
| + # lease expired faster than we could tell it even got fulfilled.
|
| + logging.error(
|
| + 'Request expired\nKey: %s\nRequest ID:%s\nExpired: %s',
|
| + machine_lease.key,
|
| + machine_lease.client_request_id,
|
| + response['lease_expiration_ts'],
|
| + )
|
| + clear_lease_request(machine_lease.key, machine_lease.client_request_id)
|
| else:
|
| - request.request_hash = response['request_hash']
|
| - state = machine_provider.LeaseRequestState.lookup_by_name(
|
| - response['state'])
|
| - if state == machine_provider.LeaseRequestState.DENIED:
|
| - logging.warning('Request ID denied: %s', request_id)
|
| - lease_request_map.pop(request_id)
|
| - elif state == machine_provider.LeaseRequestState.FULFILLED:
|
| - if response.get('hostname'):
|
| - logging.info(
|
| - 'Request ID %s fulfilled:\nHostname: %s\nExpiration: %s',
|
| - request_id,
|
| - response['hostname'],
|
| - response['lease_expiration_ts'],
|
| - )
|
| - request.hostname = response['hostname']
|
| - request.lease_expiration_ts = datetime.datetime.utcfromtimestamp(
|
| - int(response['lease_expiration_ts']))
|
| - else:
|
| - # Lease expired. This shouldn't happen, because it means we had a
|
| - # pending request which was fulfilled and expired before we were
|
| - # able to check its status.
|
| - logging.warning('Request ID fulfilled and expired: %s', request_id)
|
| - lease_request_map.pop(request_id)
|
| - else:
|
| - # Lease request isn't processed yet. Just try again later.
|
| - logging.info(
|
| - 'Request ID %s in state: %s', request_id, response['state'])
|
| -
|
| - machine_type.leases = sorted(
|
| - lease_request_map.values(), key=lambda lease: lease.client_request_id)
|
| - machine_type.put()
|
| -
|
| -
|
| -@ndb.tasklet
|
| -def process_message(message, subscription):
|
| - """Processes a Pub/Sub message from the Machine Provider.
|
| + logging.info(
|
| + 'Request fulfilled: %s\nRequest ID: %s\nHostname: %s\nExpires: %s',
|
| + machine_lease.key,
|
| + machine_lease.client_request_id,
|
| + response['hostname'],
|
| + response['lease_expiration_ts'],
|
| + )
|
| + log_lease_fulfillment(
|
| + machine_lease.key,
|
| + machine_lease.client_request_id,
|
| + response['hostname'],
|
| + int(response['lease_expiration_ts']),
|
| + )
|
| + # TODO(smut): Create BotInfo, associate lease information.
|
| + elif state == machine_provider.LeaseRequestState.DENIED:
|
| + logging.warning(
|
| + 'Request denied: %s\nRequest ID: %s',
|
| + machine_lease.key,
|
| + machine_lease.client_request_id,
|
| + )
|
| + clear_lease_request(machine_lease.key, machine_lease.client_request_id)
|
| +
|
| +
|
| +def manage_pending_lease_request(machine_lease):
|
| + """Manages a pending lease request.
|
|
|
| Args:
|
| - message: A message dict as returned by pubsub.pull.
|
| - subscription: Full name of the subscription the message was received on.
|
| + machine_lease: MachineLease instance with client_request_id set.
|
| """
|
| - now = utils.utcnow()
|
| -
|
| - try:
|
| - data = base64.b64decode(message.get('message', {}).get('data', ''))
|
| - lease_response = json.loads(data)
|
| - # Machine Provider sends a response including lease_expiration_ts. If
|
| - # lease_expiration_ts is not in the future and there is no more hostname
|
| - # associated with the response then this is a reclamation message.
|
| - lease_expiration_ts = datetime.datetime.utcfromtimestamp(
|
| - int(lease_response['lease_expiration_ts']))
|
| - if lease_expiration_ts <= now and not lease_response.get('hostname'):
|
| - # We used request IDs of the form MachineType.key.id()-<integer>.
|
| - # Extract the ID for the MachineType from the request ID.
|
| - machine_type_id = lease_response['client_request_id'].rsplit('-', 1)[0]
|
| - logging.info('Lease ID %s is expired\nMachineType: %s',
|
| - lease_response['client_request_id'],
|
| - MachineType.get_by_id(machine_type_id))
|
| - except (TypeError, ValueError):
|
| - logging.error('Received unexpected Pub/Sub message:\n%s',
|
| - json.dumps(message, indent=2))
|
| -
|
| - yield pubsub.ack_async(subscription, message['ackId'])
|
| -
|
| -
|
| -def process_pubsub(app_id):
|
| - """Processes Pub/Sub messages from the Machine Provider, if there are any.
|
| + assert machine_lease.client_request_id, machine_lease.key
|
| +
|
| + response = machine_provider.lease_machine(
|
| + machine_provider.LeaseRequest(
|
| + dimensions=machine_lease.mp_dimensions,
|
| + # TODO(smut): Vary duration so machines don't expire all at once.
|
| + duration=machine_lease.lease_duration_secs,
|
| + on_lease=machine_provider.Instruction(
|
| + swarming_server='https://%s' % (
|
| + app_identity.get_default_version_hostname())),
|
| + request_id=machine_lease.client_request_id,
|
| + ),
|
| + )
|
| +
|
| + if response.get('error'):
|
| + handle_lease_request_error(machine_lease, response)
|
| + return
|
| +
|
| + handle_lease_request_response(machine_lease, response)
|
| +
|
| +
|
| +def manage_lease(key):
|
| + """Manages a MachineLease.
|
|
|
| Args:
|
| - app_id: ID of the application where the Pub/Sub subscription exists.
|
| + key: ndb.Key for a MachineLease entity.
|
| """
|
| - MAX_IN_FLIGHT = 50
|
| - subscription = pubsub.full_subscription_name(app_id, PUBSUB_SUBSCRIPTION)
|
| - response = pubsub.pull(subscription)
|
| - logging.info('%s', response)
|
| + machine_lease = key.get()
|
| + if not machine_lease:
|
| + return
|
|
|
| - futures = []
|
| - messages = response.get('receivedMessages', [])
|
| - while messages:
|
| - num_futures = len(futures)
|
| - if num_futures < MAX_IN_FLIGHT:
|
| - futures.extend(
|
| - process_message(message, subscription)
|
| - for message in messages[:MAX_IN_FLIGHT - num_futures])
|
| - messages = messages[MAX_IN_FLIGHT - num_futures:]
|
| - ndb.Future.wait_any(futures)
|
| - futures = [future for future in futures if not future.done()]
|
| - if futures:
|
| - ndb.Future.wait_all(futures)
|
| + # Manage a leased machine.
|
| + if machine_lease.lease_expiration_ts:
|
| + if machine_lease.lease_expiration_ts <= utils.utcnow():
|
| + logging.info('MachineLease expired: %s', key)
|
| + assert machine_lease.hostname, key
|
| + clear_lease_request(key, machine_lease.client_request_id)
|
| + return
|
| +
|
| + # Lease expiration time is unknown, so there must be no leased machine.
|
| + assert not machine_lease.hostname, key
|
| +
|
| + # Manage a pending lease request.
|
| + if machine_lease.client_request_id:
|
| + manage_pending_lease_request(machine_lease)
|
| + return
|
| +
|
| + # Manage an uninitiated lease request.
|
| + if not machine_lease.drained:
|
| + update_client_request_id(key)
|
| + return
|
| +
|
| + # Manage an uninitiated, drained lease request.
|
| + delete_machine_lease(key)
|
|
|