Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(368)

Unified Diff: appengine/swarming/server/lease_management.py

Issue 2386793002: Reimplement Machine Provider integration (Closed)
Patch Set: Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « appengine/swarming/queue.yaml ('k') | appengine/swarming/server/lease_management_test.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: appengine/swarming/server/lease_management.py
diff --git a/appengine/swarming/server/lease_management.py b/appengine/swarming/server/lease_management.py
index 9bd20413669fce4c62a68481280e28cd03f2a679..2c3e87d9b66e87d5d40f38f94c5de2459d4146a5 100644
--- a/appengine/swarming/server/lease_management.py
+++ b/appengine/swarming/server/lease_management.py
@@ -10,52 +10,25 @@ and the list of machines of each type currently leased.
Swarming integration with Machine Provider
==========================================
-handlers_backend.py contains a cron job which looks at each machine type and
-ensures the number of active lease requests is equal to the target size
-specified by the machine type.
-
-A lease request is considered active as soon as it is initiated. It remains
-active while the lease request is pending and even after it is fulfilled. A
-lease request is considered no longer active once the request is denied or
-the machine used to fulfill the request is reclaimed.
-
-This scheme ensures that we never have more machines of a given type than we
-want
-
-Each machine type is stored as a MachineType entity which describes the machine
-in terms of Machine Provider Dimensions (not to be confused with Swarming's own
-bot dimensions) as well as a target size which describes the number of machines
-of this type we want connected at once.
-
-handlers_backend.py calls generate_lease_requests (below) to generate a number
-of lease requests equal to the target size minus the current number of active
-leases, then performs the RPC to send these requests to the Machine Provider,
-and finally it calls update_leases (below) with the results of the lease
-request to associate the new active leases with the machine type.
-
-In principle, the entire operation on a single machine type needs to happen
-transactionally so we don't make duplicate requests and end up with more
-machines than the target size wants. However, in practice we need the RPC
-to be outside the transaction because it may be slow and it may need to be
-retried, which could cause the entire transaction to time out without logging
-the result of the call. Therefore the procedure is broken up into two
-transactions with an RPC in between.
-
-With the transaction broken into two, the choice of request ID is the only way
-to prevents duplicate lease requests (e.g. if generate_lease_requests runs twice
-before update_leases gets to run). Machine Provider supports idempotent RPCs
-so long as the client-generated request ID is the same.
-
-Therefore, we ensure generate_lease_requests always generates lease requests
-with the same request ID until update_lease is called. This is done by keeping a
-count of the number of requests generated so far. If the request count is n,
-generate_lease_requests will always generate requests with IDs n + 1, n + 2, ...
-up to n + target size - current size, where current size is the number of
-currently active lease requests.
-
-update_lease takes the RPC result and updates the pending request entries.
-
-TODO(smut): Consider request count overflow.
+handlers_backend.py contains a cron job which looks at each MachineType and
+ensures there are at least as many MachineLeases in the datastore which refer
+to that MachineType as the target_size in MachineType specifies by numbering
+them 0 through target_size - 1 If there are MachineType entities numbered
+target_size or greater which refer to that MachineType, those MachineLeases
+are marked as drained.
+
+Each MachineLease manages itself. A cron job in handlers_backend.py will trigger
+self-management jobs for each entity. If there is no associated lease and the
+MachineLease is not drained, issue a request to the Machine Provider for a
+matching machine. If there is an associated request, check the status of that
+request. If it is fulfilled, ensure the existence of a BotInfo entity (see
+server/bot_management.py) corresponding to the machine provided for the lease.
+Include the lease ID and lease_expiration_ts as fields in the BotInfo. If it
+is expired, clear the associated lease. If there is no associated lease and
+the MachineLease is drained, delete the MachineLease entity.
+
+TODO(smut): If there is an associated request and the MachineLease is drained,
+release the lease immediately (as long as the bot is not mid-task).
"""
import base64
@@ -63,6 +36,7 @@ import datetime
import json
import logging
+from google.appengine.api import app_identity
from google.appengine.ext import ndb
from google.appengine.ext.ndb import msgprop
@@ -83,16 +57,27 @@ PUBSUB_SUBSCRIPTION = 'machine-provider'
class MachineLease(ndb.Model):
"""A lease request for a machine from the Machine Provider.
- Standalone MachineLease entities should not exist in the datastore.
+ Key:
+ id: A string in the form <machine type id>-<number>.
+ kind: MachineLease. Is a root entity.
"""
# Request ID used to generate this request.
- client_request_id = ndb.StringProperty(required=True)
+ client_request_id = ndb.StringProperty(indexed=True)
+ # Whether or not this MachineLease should issue lease requests.
+ drained = ndb.BooleanProperty(indexed=True)
# Hostname of the machine currently allocated for this request.
hostname = ndb.StringProperty()
- # Request hash returned by the server for the request for this machine.
- request_hash = ndb.StringProperty()
+ # Duration to lease for.
+ lease_duration_secs = ndb.IntegerProperty(indexed=False)
# DateTime indicating lease expiration time.
lease_expiration_ts = ndb.DateTimeProperty()
+ # ndb.Key for the MachineType this MachineLease is created for.
+ machine_type = ndb.KeyProperty()
+ # machine_provider.Dimensions describing the machine.
+ mp_dimensions = msgprop.MessageProperty(
+ machine_provider.Dimensions, indexed=False)
+ # Last request number used.
+ request_count = ndb.IntegerProperty(default=0, required=True)
class MachineType(ndb.Model):
@@ -102,362 +87,357 @@ class MachineType(ndb.Model):
id: A human-readable name for this machine type.
kind: MachineType. Is a root entity.
"""
- # Current number of active leases.
- current_size = ndb.ComputedProperty(lambda self: len(self.leases))
# Description of this machine type for humans.
description = ndb.StringProperty(indexed=False)
# Whether or not to attempt to lease machines of this type.
enabled = ndb.BooleanProperty(default=True)
# Duration to lease each machine for.
lease_duration_secs = ndb.IntegerProperty(indexed=False)
- # List of active lease requests.
- leases = ndb.LocalStructuredProperty(MachineLease, repeated=True)
# machine_provider.Dimensions describing the machine.
mp_dimensions = msgprop.MessageProperty(
machine_provider.Dimensions, indexed=False)
- # Number of bots pending deletion.
- num_pending_deletion = ndb.ComputedProperty(
- lambda self: len(self.pending_deletion))
- # List of hostnames whose leases have expired and should be deleted.
- pending_deletion = ndb.StringProperty(indexed=False, repeated=True)
- # Last request number used.
- request_count = ndb.IntegerProperty(default=0, required=True)
# Target number of machines of this type to have leased at once.
target_size = ndb.IntegerProperty(indexed=False, required=True)
-def clean_up_bots():
- """Cleans up expired leases."""
- # Maximum number of in-flight ndb.Futures.
- MAX_IN_FLIGHT = 50
+@ndb.transactional_tasklet
+def ensure_entity_exists(machine_type, n):
+ """Ensures the nth MachineLease for the given MachineType exists.
+
+ Args:
+ machine_type: MachineType entity.
+ n: The MachineLease index.
+ """
+ key = ndb.Key(MachineLease, '%s-%s' % (machine_type.key.id(), n))
+ machine_lease = yield key.get_async()
+ if machine_lease:
+ return
+
+ yield MachineLease(
+ key=key,
+ lease_duration_secs=machine_type.lease_duration_secs,
+ machine_type=machine_type.key,
+ mp_dimensions=machine_type.mp_dimensions,
+ ).put_async()
+
- bot_ids = []
- deleted = {}
- for machine_type in MachineType.query(MachineType.num_pending_deletion > 0):
- logging.info(
- 'Deleting bots: %s', ', '.join(sorted(machine_type.pending_deletion)))
- bot_ids.extend(machine_type.pending_deletion)
- deleted[machine_type.key] = machine_type.pending_deletion
+def ensure_entities_exist(max_concurrent=50):
+ """Ensures MachineLeases exist for each MachineType.
- # Generate a few asynchronous requests at a time in order to
- # prevent having too many in-flight ndb.Futures at a time.
+ Args:
+ max_concurrent: Maximum number of concurrent asynchronous requests.
+ """
+ # Generate a few asynchronous requests at a time in order to prevent having
+ # too many in flight at a time.
futures = []
- while bot_ids:
- num_futures = len(futures)
- if num_futures < MAX_IN_FLIGHT:
- keys = [bot_management.get_info_key(bot_id)
- for bot_id in bot_ids[:MAX_IN_FLIGHT - num_futures]]
- bot_ids = bot_ids[MAX_IN_FLIGHT - num_futures:]
- futures.extend(ndb.delete_multi_async(keys))
- ndb.Future.wait_any(futures)
- futures = [future for future in futures if not future.done()]
+ for machine_type in MachineType.query(MachineType.enabled == True):
+ cursor = 0
+ while cursor < machine_type.target_size:
+ while len(futures) < max_concurrent and cursor < machine_type.target_size:
+ futures.append(ensure_entity_exists(machine_type, cursor))
+ cursor += 1
+ ndb.Future.wait_any(futures)
+ # We don't bother checking success or failure. If a transient error
+ # like TransactionFailed or DeadlineExceeded is raised and an entity
+ # is not created, we will just create it the next time this is called,
+ # converging to the desired state eventually.
+ futures = [future for future in futures if not future.done()]
if futures:
ndb.Future.wait_all(futures)
- # There should be relatively few MachineType entitites, so
- # just process them sequentially.
- # TODO(smut): Parallelize this.
- for machine_key, hostnames in deleted.iteritems():
- successfully_deleted = []
- for hostname in hostnames:
- if bot_management.get_info_key(hostname).get():
- logging.error('Failed to delete BotInfo: %s', hostname)
- else:
- successfully_deleted.append(hostname)
- logging.info('Deleted bots: %s', ', '.join(sorted(successfully_deleted)))
- _clear_bots_pending_deletion(machine_key, hostnames)
+
+def drain_excess(max_concurrent=50):
+ """Marks MachineLeases beyond what is needed by their MachineType as drained.
+
+ Args:
+ max_concurrent: Maximum number of concurrent asynchronous requests.
+ """
+ futures = []
+
+ for machine_type in MachineType.query():
+ for machine_lease in MachineLease.query(
+ MachineLease.machine_type == machine_type.key,
+ ):
+ try:
+ index = int(machine_lease.key.id().rsplit('-', 1)[-1])
+ except ValueError:
+ logging.error(
+ 'MachineLease index could not be deciphered\n Key: %s',
+ machine_lease.key,
+ )
+ continue
+ # Drain MachineLeases where the MachineType is not enabled or the index
+ # exceeds the target_size given by the MachineType. Since MachineLeases
+ # are created in contiguous blocks, only indices 0 through target_size - 1
+ # should exist.
+ if not machine_type.enabled or index >= machine_type.target_size:
+ if len(futures) == max_concurrent:
+ ndb.Future.wait_any(futures)
+ futures = [future for future in futures if not future.done()]
+ machine_lease.drained = True
+ futures.append(machine_lease.put_async())
+
+ if futures:
+ ndb.Future.wait_all(futures)
+
+
+def schedule_lease_management():
+ """Schedules task queues to process each MachineLease."""
+ for machine_lease in MachineLease.query():
+ # TODO(smut): Remove this check once migrated to the new format.
+ if machine_lease.machine_type:
+ if not utils.enqueue_task(
+ '/internal/taskqueue/machine-provider-manage',
+ 'machine-provider-manage',
+ params={
+ 'key': machine_lease.key.urlsafe(),
+ },
+ ):
+ logging.warning(
+ 'Failed to enqueue task for MachineLease: %s', machine_lease.key)
@ndb.transactional
-def _clear_bots_pending_deletion(machine_type_key, hostnames):
- """Clears the list of bots pending deletion.
+def clear_lease_request(key, request_id):
+ """Clears information about given lease request.
Args:
- machine_type_key: ndb.Key for a MachineType instance.
- hostnames: List of bots pending deletion.
+ request_id: ID of the request to clear.
"""
- machine_type = machine_type_key.get()
- if not machine_type:
- logging.error('MachineType no longer exists: %s', machine_type_key.id())
+ machine_lease = key.get()
+ if not machine_lease:
+ logging.error('MachineLease does not exist\nKey: %s', key)
return
- num_pending_deletion = len(machine_type.pending_deletion)
- machine_type.pending_deletion = [
- host for host in machine_type.pending_deletion if host not in hostnames]
- if len(machine_type.pending_deletion) != num_pending_deletion:
- machine_type.put()
+ if not machine_lease.client_request_id:
+ return
+ if request_id != machine_lease.client_request_id:
+ # Already cleared and incremented?
+ logging.warning(
+ 'Request ID mismatch for MachineLease: %s\nExpected: %s\nActual: %s',
+ key,
+ request_id,
+ machine_lease.client_request_id,
+ )
+ return
-@ndb.transactional
-def generate_lease_requests(machine_type_key, app_id, swarming_server):
- """Generates lease requests.
+ machine_lease.client_request_id = None
+ machine_lease.hostname = None
+ machine_lease.lease_expiration_ts = None
+ machine_lease.put()
- The list includes new requests to lease machines up to the targeted
- size for the given machine type as well as requests to get the status
- of pending lease requests.
- Args:
- machine_type_key: ndb.Key for a MachineType instance.
- app_id: ID of the application the requests originate from.
- swarming_server: URL for the Swarming server to connect to.
+@ndb.transactional
+def log_lease_fulfillment(key, request_id, hostname, lease_expiration_ts):
+ """Logs lease fulfillment.
- Returns:
- A list of lease requests.
+ Args:
+ request_id: ID of the request being fulfilled.
+ hostname: Hostname of the machine fulfilling the request.
+ lease_expiration_ts: UTC seconds since epoch when the lease expires.
"""
- machine_type = machine_type_key.get()
- if not machine_type:
- logging.warning('MachineType no longer exists: %s', machine_type_key.id())
- return []
-
- expired_requests = _clean_up_expired_leases(machine_type)
- lease_requests = _generate_lease_request_status_updates(
- machine_type, app_id, swarming_server)
-
- if not machine_type.enabled:
- logging.warning('MachineType is not enabled: %s\n', machine_type.key.id())
- return lease_requests
- if machine_type.current_size >= machine_type.target_size:
- logging.info(
- 'MachineType %s is at capacity: %d/%d',
- machine_type.key.id(),
- machine_type.current_size,
- machine_type.target_size,
- )
- return lease_requests
-
- new_requests = _generate_lease_requests_for_new_machines(
- machine_type, app_id, swarming_server)
+ machine_lease = key.get()
+ if not machine_lease:
+ logging.error('MachineLease does not exist\nKey: %s', key)
+ return
- if new_requests or expired_requests:
- machine_type.put()
- lease_requests.extend(new_requests)
+ if request_id != machine_lease.client_request_id:
+ logging.error(
+ 'Request ID mismatch\nKey: %s\nExpected: %s\nActual: %s',
+ key,
+ machine_lease.client_request_id,
+ request_id,
+ )
+ return
- return lease_requests
+ if (hostname == machine_lease.hostname
+ and lease_expiration_ts == machine_lease.lease_expiration_ts):
+ return
+ machine_lease.hostname = hostname
+ machine_lease.lease_expiration_ts = datetime.datetime.utcfromtimestamp(
+ lease_expiration_ts)
+ machine_lease.put()
-def _clean_up_expired_leases(machine_type):
- """Cleans up expired leases.
- Prunes expired leases from machine_type.leases,
- but does not write the result to the datastore.
+@ndb.transactional
+def update_client_request_id(key):
+ """Sets the client request ID used to lease a machine.
Args:
- machine_type: MachineType instance.
-
- Returns:
- A list of leases that were removed.
+ key: ndb.Key for a MachineLease entity.
"""
- active = []
- expired = []
-
- for request in machine_type.leases:
- if request.hostname and request.lease_expiration_ts <= utils.utcnow():
- logging.warning(
- 'Request ID %s expired:\nHostname: %s\nExpiration: %s',
- request.client_request_id,
- request.hostname,
- request.lease_expiration_ts,
- )
- expired.append(request.hostname)
- else:
- active.append(request)
+ machine_lease = key.get()
+ if not machine_lease:
+ logging.error('MachineLease does not exist\nKey: %s', key)
+ return
+
+ if machine_lease.drained:
+ logging.info('MachineLease is drained\nKey: %s', key)
+ return
- machine_type.leases = active
- machine_type.pending_deletion.extend(expired)
- return expired
+ if machine_lease.client_request_id:
+ return
+ machine_lease.request_count += 1
+ machine_lease.client_request_id = '%s-%s-%s' % (
+ machine_lease.machine_type.id(), key.id(), machine_lease.request_count)
+ machine_lease.put()
-def _generate_lease_request_status_updates(
- machine_type, app_id, swarming_server):
- """Generates status update requests for pending lease requests.
- Args:
- machine_type: MachineType instance.
- app_id: ID of the application the requests originate from.
- swarming_server: URL for the Swarming server to connect to.
+@ndb.transactional
+def delete_machine_lease(key):
+ """Deletes the given MachineLease if it is drained and has no active lease.
- Returns:
- A list of lease requests.
+ Args:
+ key: ndb.Key for a MachineLease entity.
"""
- lease_requests = []
- for request in machine_type.leases:
- if not request.hostname:
- # We don't know the hostname yet, meaning this request is still pending.
- lease_requests.append(machine_provider.LeaseRequest(
- dimensions=machine_type.mp_dimensions,
- duration=machine_type.lease_duration_secs,
- on_lease=machine_provider.Instruction(
- swarming_server=swarming_server),
- pubsub_project=app_id,
- pubsub_topic=PUBSUB_TOPIC,
- request_id=request.client_request_id,
- ))
- return lease_requests
+ machine_lease = key.get()
+ if not machine_lease:
+ return
+
+ if not machine_lease.drained:
+ logging.warning('MachineLease not drained: %s', key)
+ return
+ if machine_lease.client_request_id:
+ return
-def _generate_lease_requests_for_new_machines(
- machine_type, app_id, swarming_server):
- """Generates requests to lease machines up to the target.
+ key.delete()
- Extends machine_type.leases by the number of new lease requests generated,
- but does not write the result to the datastore.
- Args:
- machine_type: MachineType instance.
- app_id: ID of the application the requests originate from.
- swarming_server: URL for the Swarming server to connect to.
+def handle_lease_request_error(machine_lease, response):
+ """Handles an error in the lease request response from Machine Provider.
- Returns:
- A list of lease requests.
+ Args:
+ machine_lease: MachineLease instance.
+ response: Response returned by components.machine_provider.lease_machine.
"""
- lease_requests = []
- request_number = machine_type.request_count
- for _ in xrange(machine_type.target_size - machine_type.current_size):
- request_number += 1
- request_id = '%s-%d' % (machine_type.key.id(), request_number)
- lease_requests.append(machine_provider.LeaseRequest(
- dimensions=machine_type.mp_dimensions,
- duration=machine_type.lease_duration_secs,
- on_lease=machine_provider.Instruction(swarming_server=swarming_server),
- pubsub_project=app_id,
- pubsub_topic=PUBSUB_TOPIC,
- request_id=request_id,
- ))
- machine_type.leases.append(MachineLease(client_request_id=request_id))
- machine_type.request_count = request_number
- return lease_requests
+ error = machine_provider.LeaseRequestError.lookup_by_name(response['error'])
+ if error in (
+ machine_provider.LeaseRequestError.DEADLINE_EXCEEDED,
+ machine_provider.LeaseRequestError.TRANSIENT_ERROR,
+ ):
+ logging.warning(
+ 'Transient failure: %s\nRequest ID: %s\nError: %s',
+ machine_lease.key,
+ response['client_request_id'],
+ response['error'],
+ )
+ else:
+ logging.error(
+ 'Lease request failed\nKey: %s\nRequest ID: %s\nError: %s',
+ machine_lease.key,
+ response['client_request_id'],
+ response['error'],
+ )
+ clear_lease_request(machine_lease.key, machine_lease.client_request_id)
-@ndb.transactional
-def update_leases(machine_type_key, responses):
- """Updates the states of leases of the given machine types.
+def handle_lease_request_response(machine_lease, response):
+ """Handles a successful lease request response from Machine Provider.
Args:
- machine_type_key: ndb.Key for a MachineType instance.
- responses: machine_provider.BatchedLeaseResponse instance.
+ machine_lease: MachineLease instance.
+ response: Response returned by components.machine_provider.lease_machine.
"""
- machine_type = machine_type_key.get()
- if not machine_type:
- logging.warning('MachineType no longer exists: %s', machine_type_key)
- return
-
- lease_request_map = {
- request.client_request_id: request for request in machine_type.leases
- }
- for response in responses:
- request_id = response['client_request_id']
- request = lease_request_map.get(request_id)
- if not request:
- logging.error('Unknown request ID: %s', request_id)
- continue
-
- if response.get('error'):
- error = machine_provider.LeaseRequestError.lookup_by_name(
- response['error'])
- if error in (
- machine_provider.LeaseRequestError.DEADLINE_EXCEEDED,
- machine_provider.LeaseRequestError.TRANSIENT_ERROR,
- ):
- # Retryable errors. Just try again later.
- logging.warning(
- 'Request not processed, trying again later: %s', request_id)
- else:
- # TODO(smut): Handle specific errors.
- logging.warning(
- 'Error %s for request ID %s',
- response['error'],
- request_id,
- )
- lease_request_map.pop(request_id)
+ assert not response.get('error')
+ state = machine_provider.LeaseRequestState.lookup_by_name(response['state'])
+ if state == machine_provider.LeaseRequestState.FULFILLED:
+ if not response.get('hostname'):
+ # Lease has already expired. This shouldn't happen, but it indicates the
+ # lease expired faster than we could tell it even got fulfilled.
+ logging.error(
+ 'Request expired\nKey: %s\nRequest ID:%s\nExpired: %s',
+ machine_lease.key,
+ machine_lease.client_request_id,
+ response['lease_expiration_ts'],
+ )
+ clear_lease_request(machine_lease.key, machine_lease.client_request_id)
else:
- request.request_hash = response['request_hash']
- state = machine_provider.LeaseRequestState.lookup_by_name(
- response['state'])
- if state == machine_provider.LeaseRequestState.DENIED:
- logging.warning('Request ID denied: %s', request_id)
- lease_request_map.pop(request_id)
- elif state == machine_provider.LeaseRequestState.FULFILLED:
- if response.get('hostname'):
- logging.info(
- 'Request ID %s fulfilled:\nHostname: %s\nExpiration: %s',
- request_id,
- response['hostname'],
- response['lease_expiration_ts'],
- )
- request.hostname = response['hostname']
- request.lease_expiration_ts = datetime.datetime.utcfromtimestamp(
- int(response['lease_expiration_ts']))
- else:
- # Lease expired. This shouldn't happen, because it means we had a
- # pending request which was fulfilled and expired before we were
- # able to check its status.
- logging.warning('Request ID fulfilled and expired: %s', request_id)
- lease_request_map.pop(request_id)
- else:
- # Lease request isn't processed yet. Just try again later.
- logging.info(
- 'Request ID %s in state: %s', request_id, response['state'])
-
- machine_type.leases = sorted(
- lease_request_map.values(), key=lambda lease: lease.client_request_id)
- machine_type.put()
-
-
-@ndb.tasklet
-def process_message(message, subscription):
- """Processes a Pub/Sub message from the Machine Provider.
+ logging.info(
+ 'Request fulfilled: %s\nRequest ID: %s\nHostname: %s\nExpires: %s',
+ machine_lease.key,
+ machine_lease.client_request_id,
+ response['hostname'],
+ response['lease_expiration_ts'],
+ )
+ log_lease_fulfillment(
+ machine_lease.key,
+ machine_lease.client_request_id,
+ response['hostname'],
+ int(response['lease_expiration_ts']),
+ )
+ # TODO(smut): Create BotInfo, associate lease information.
+ elif state == machine_provider.LeaseRequestState.DENIED:
+ logging.warning(
+ 'Request denied: %s\nRequest ID: %s',
+ machine_lease.key,
+ machine_lease.client_request_id,
+ )
+ clear_lease_request(machine_lease.key, machine_lease.client_request_id)
+
+
+def manage_pending_lease_request(machine_lease):
+ """Manages a pending lease request.
Args:
- message: A message dict as returned by pubsub.pull.
- subscription: Full name of the subscription the message was received on.
+ machine_lease: MachineLease instance with client_request_id set.
"""
- now = utils.utcnow()
-
- try:
- data = base64.b64decode(message.get('message', {}).get('data', ''))
- lease_response = json.loads(data)
- # Machine Provider sends a response including lease_expiration_ts. If
- # lease_expiration_ts is not in the future and there is no more hostname
- # associated with the response then this is a reclamation message.
- lease_expiration_ts = datetime.datetime.utcfromtimestamp(
- int(lease_response['lease_expiration_ts']))
- if lease_expiration_ts <= now and not lease_response.get('hostname'):
- # We used request IDs of the form MachineType.key.id()-<integer>.
- # Extract the ID for the MachineType from the request ID.
- machine_type_id = lease_response['client_request_id'].rsplit('-', 1)[0]
- logging.info('Lease ID %s is expired\nMachineType: %s',
- lease_response['client_request_id'],
- MachineType.get_by_id(machine_type_id))
- except (TypeError, ValueError):
- logging.error('Received unexpected Pub/Sub message:\n%s',
- json.dumps(message, indent=2))
-
- yield pubsub.ack_async(subscription, message['ackId'])
-
-
-def process_pubsub(app_id):
- """Processes Pub/Sub messages from the Machine Provider, if there are any.
+ assert machine_lease.client_request_id, machine_lease.key
+
+ response = machine_provider.lease_machine(
+ machine_provider.LeaseRequest(
+ dimensions=machine_lease.mp_dimensions,
+ # TODO(smut): Vary duration so machines don't expire all at once.
+ duration=machine_lease.lease_duration_secs,
+ on_lease=machine_provider.Instruction(
+ swarming_server='https://%s' % (
+ app_identity.get_default_version_hostname())),
+ request_id=machine_lease.client_request_id,
+ ),
+ )
+
+ if response.get('error'):
+ handle_lease_request_error(machine_lease, response)
+ return
+
+ handle_lease_request_response(machine_lease, response)
+
+
+def manage_lease(key):
+ """Manages a MachineLease.
Args:
- app_id: ID of the application where the Pub/Sub subscription exists.
+ key: ndb.Key for a MachineLease entity.
"""
- MAX_IN_FLIGHT = 50
- subscription = pubsub.full_subscription_name(app_id, PUBSUB_SUBSCRIPTION)
- response = pubsub.pull(subscription)
- logging.info('%s', response)
+ machine_lease = key.get()
+ if not machine_lease:
+ return
- futures = []
- messages = response.get('receivedMessages', [])
- while messages:
- num_futures = len(futures)
- if num_futures < MAX_IN_FLIGHT:
- futures.extend(
- process_message(message, subscription)
- for message in messages[:MAX_IN_FLIGHT - num_futures])
- messages = messages[MAX_IN_FLIGHT - num_futures:]
- ndb.Future.wait_any(futures)
- futures = [future for future in futures if not future.done()]
- if futures:
- ndb.Future.wait_all(futures)
+ # Manage a leased machine.
+ if machine_lease.lease_expiration_ts:
+ if machine_lease.lease_expiration_ts <= utils.utcnow():
+ logging.info('MachineLease expired: %s', key)
+ assert machine_lease.hostname, key
+ clear_lease_request(key, machine_lease.client_request_id)
+ return
+
+ # Lease expiration time is unknown, so there must be no leased machine.
+ assert not machine_lease.hostname, key
+
+ # Manage a pending lease request.
+ if machine_lease.client_request_id:
+ manage_pending_lease_request(machine_lease)
+ return
+
+ # Manage an uninitiated lease request.
+ if not machine_lease.drained:
+ update_client_request_id(key)
+ return
+
+ # Manage an uninitiated, drained lease request.
+ delete_machine_lease(key)
« no previous file with comments | « appengine/swarming/queue.yaml ('k') | appengine/swarming/server/lease_management_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698