Chromium Code Reviews| Index: appengine/swarming/server/lease_management.py |
| diff --git a/appengine/swarming/server/lease_management.py b/appengine/swarming/server/lease_management.py |
| index 9bd20413669fce4c62a68481280e28cd03f2a679..4ead654d10346bdffc94a399793b18168127892f 100644 |
| --- a/appengine/swarming/server/lease_management.py |
| +++ b/appengine/swarming/server/lease_management.py |
| @@ -10,52 +10,25 @@ and the list of machines of each type currently leased. |
| Swarming integration with Machine Provider |
| ========================================== |
| -handlers_backend.py contains a cron job which looks at each machine type and |
| -ensures the number of active lease requests is equal to the target size |
| -specified by the machine type. |
| - |
| -A lease request is considered active as soon as it is initiated. It remains |
| -active while the lease request is pending and even after it is fulfilled. A |
| -lease request is considered no longer active once the request is denied or |
| -the machine used to fulfill the request is reclaimed. |
| - |
| -This scheme ensures that we never have more machines of a given type than we |
| -want |
| - |
| -Each machine type is stored as a MachineType entity which describes the machine |
| -in terms of Machine Provider Dimensions (not to be confused with Swarming's own |
| -bot dimensions) as well as a target size which describes the number of machines |
| -of this type we want connected at once. |
| - |
| -handlers_backend.py calls generate_lease_requests (below) to generate a number |
| -of lease requests equal to the target size minus the current number of active |
| -leases, then performs the RPC to send these requests to the Machine Provider, |
| -and finally it calls update_leases (below) with the results of the lease |
| -request to associate the new active leases with the machine type. |
| - |
| -In principle, the entire operation on a single machine type needs to happen |
| -transactionally so we don't make duplicate requests and end up with more |
| -machines than the target size wants. However, in practice we need the RPC |
| -to be outside the transaction because it may be slow and it may need to be |
| -retried, which could cause the entire transaction to time out without logging |
| -the result of the call. Therefore the procedure is broken up into two |
| -transactions with an RPC in between. |
| - |
| -With the transaction broken into two, the choice of request ID is the only way |
| -to prevent duplicate lease requests (e.g. if generate_lease_requests runs twice |
| -before update_leases gets to run). Machine Provider supports idempotent RPCs |
| -so long as the client-generated request ID is the same. |
| - |
| -Therefore, we ensure generate_lease_requests always generates lease requests |
| -with the same request ID until update_lease is called. This is done by keeping a |
| -count of the number of requests generated so far. If the request count is n, |
| -generate_lease_requests will always generate requests with IDs n + 1, n + 2, ... |
| -up to n + target size - current size, where current size is the number of |
| -currently active lease requests. |
| - |
| -update_lease takes the RPC result and updates the pending request entries. |
| - |
| -TODO(smut): Consider request count overflow. |
| +handlers_backend.py contains a cron job which looks at each MachineType and |
| +ensures there are at least as many MachineLeases in the datastore which refer |
| +to that MachineType as the target_size in MachineType specifies by numbering |
| +them 0 through target_size - 1. If there are MachineLease entities numbered |
| +target_size or greater which refer to that MachineType, those MachineLeases |
| +are marked as drained. |
| + |
| +Each MachineLease manages itself. A cron job in handlers_backend.py will trigger |
| +self-management jobs for each entity. If there is no associated lease and the |
| +MachineLease is not drained, issue a request to the Machine Provider for a |
| +matching machine. If there is an associated request, check the status of that |
| +request. If it is fulfilled, ensure the existence of a BotInfo entity (see |
| +server/bot_management.py) corresponding to the machine provided for the lease. |
| +Include the lease ID and lease_expiration_ts as fields in the BotInfo. If it |
| +is expired, clear the associated lease. If there is no associated lease and |
| +the MachineLease is drained, delete the MachineLease entity. |
| + |
| +TODO(smut): If there is an associated request and the MachineLease is drained, |
| +release the lease immediately (as long as the bot is not mid-task). |
| """ |
| import base64 |
| @@ -63,6 +36,7 @@ import datetime |
| import json |
| import logging |
| +from google.appengine.api import app_identity |
| from google.appengine.ext import ndb |
| from google.appengine.ext.ndb import msgprop |
| @@ -83,16 +57,27 @@ PUBSUB_SUBSCRIPTION = 'machine-provider' |
| class MachineLease(ndb.Model): |
| """A lease request for a machine from the Machine Provider. |
| - Standalone MachineLease entities should not exist in the datastore. |
| + Key: |
| + id: A string in the form <machine type id>-<number>. |
| + kind: MachineLease. Is a root entity. |
| """ |
| # Request ID used to generate this request. |
| - client_request_id = ndb.StringProperty(required=True) |
| + client_request_id = ndb.StringProperty(indexed=True) |
| + # Whether or not this MachineLease should issue lease requests. |
| + drained = ndb.BooleanProperty(indexed=True) |
| # Hostname of the machine currently allocated for this request. |
| hostname = ndb.StringProperty() |
| - # Request hash returned by the server for the request for this machine. |
| - request_hash = ndb.StringProperty() |
| + # Duration to lease for. |
| + lease_duration_secs = ndb.IntegerProperty(indexed=False) |
| # DateTime indicating lease expiration time. |
| lease_expiration_ts = ndb.DateTimeProperty() |
| + # ndb.Key for the MachineType this MachineLease is created for. |
| + machine_type = ndb.KeyProperty() |
| + # machine_provider.Dimensions describing the machine. |
| + mp_dimensions = msgprop.MessageProperty( |
| + machine_provider.Dimensions, indexed=False) |
| + # Last request number used. |
| + request_count = ndb.IntegerProperty(default=0, required=True) |
| class MachineType(ndb.Model): |
| @@ -102,256 +87,254 @@ class MachineType(ndb.Model): |
| id: A human-readable name for this machine type. |
| kind: MachineType. Is a root entity. |
| """ |
| - # Current number of active leases. |
| - current_size = ndb.ComputedProperty(lambda self: len(self.leases)) |
| # Description of this machine type for humans. |
| description = ndb.StringProperty(indexed=False) |
| # Whether or not to attempt to lease machines of this type. |
| enabled = ndb.BooleanProperty(default=True) |
| # Duration to lease each machine for. |
| lease_duration_secs = ndb.IntegerProperty(indexed=False) |
| - # List of active lease requests. |
| - leases = ndb.LocalStructuredProperty(MachineLease, repeated=True) |
| # machine_provider.Dimensions describing the machine. |
| mp_dimensions = msgprop.MessageProperty( |
| machine_provider.Dimensions, indexed=False) |
| - # Number of bots pending deletion. |
| - num_pending_deletion = ndb.ComputedProperty( |
| - lambda self: len(self.pending_deletion)) |
| - # List of hostnames whose leases have expired and should be deleted. |
| - pending_deletion = ndb.StringProperty(indexed=False, repeated=True) |
| - # Last request number used. |
| - request_count = ndb.IntegerProperty(default=0, required=True) |
| # Target number of machines of this type to have leased at once. |
| target_size = ndb.IntegerProperty(indexed=False, required=True) |
| -def clean_up_bots(): |
| - """Cleans up expired leases.""" |
| - # Maximum number of in-flight ndb.Futures. |
| - MAX_IN_FLIGHT = 50 |
| +@ndb.transactional_tasklet |
| +def ensure_entity_exists(machine_type, n): |
| + """Ensures the nth MachineLease for the given MachineType exists. |
| + |
| + Args: |
| + machine_type: MachineType entity. |
| + n: The MachineLease index. |
| + """ |
| + key = ndb.Key(MachineLease, '%s-%s' % (machine_type.key.id(), n)) |
| + machine_lease = yield key.get_async() |
| + if machine_lease: |
| + return |
| + |
| + yield MachineLease( |
| + key=key, |
| + lease_duration_secs=machine_type.lease_duration_secs, |
| + machine_type=machine_type.key, |
| + mp_dimensions=machine_type.mp_dimensions, |
| + ).put_async() |
| - bot_ids = [] |
| - deleted = {} |
| - for machine_type in MachineType.query(MachineType.num_pending_deletion > 0): |
| - logging.info( |
| - 'Deleting bots: %s', ', '.join(sorted(machine_type.pending_deletion))) |
| - bot_ids.extend(machine_type.pending_deletion) |
| - deleted[machine_type.key] = machine_type.pending_deletion |
| - # Generate a few asynchronous requests at a time in order to |
| - # prevent having too many in-flight ndb.Futures at a time. |
| +def ensure_entities_exist(max_concurrent=50): |
| + """Ensures MachineLeases exist for each MachineType. |
| + |
| + Args: |
| + max_concurrent: Maximum number of concurrent asynchronous requests. |
| + """ |
| + # Generate a few asynchronous requests at a time in order to prevent having |
| + # too many in flight at a time. |
| futures = [] |
| - while bot_ids: |
| - num_futures = len(futures) |
| - if num_futures < MAX_IN_FLIGHT: |
| - keys = [bot_management.get_info_key(bot_id) |
| - for bot_id in bot_ids[:MAX_IN_FLIGHT - num_futures]] |
| - bot_ids = bot_ids[MAX_IN_FLIGHT - num_futures:] |
| - futures.extend(ndb.delete_multi_async(keys)) |
| - ndb.Future.wait_any(futures) |
| - futures = [future for future in futures if not future.done()] |
| + for machine_type in MachineType.query(MachineType.enabled == True): |
| + cursor = 0 |
| + while cursor < machine_type.target_size: |
| + while len(futures) < max_concurrent and cursor < machine_type.target_size: |
| + futures.append(ensure_entity_exists(machine_type, cursor)) |
| + cursor += 1 |
| + ndb.Future.wait_any(futures) |
|
Vadim Sh.
2016/10/03 20:26:31
this doesn't check status code (e.g. if transactio
smut
2016/10/03 21:18:07
Done.
|
| + futures = [future for future in futures if not future.done()] |
| if futures: |
| ndb.Future.wait_all(futures) |
| - # There should be relatively few MachineType entitites, so |
| - # just process them sequentially. |
| - # TODO(smut): Parallelize this. |
| - for machine_key, hostnames in deleted.iteritems(): |
| - successfully_deleted = [] |
| - for hostname in hostnames: |
| - if bot_management.get_info_key(hostname).get(): |
| - logging.error('Failed to delete BotInfo: %s', hostname) |
| - else: |
| - successfully_deleted.append(hostname) |
| - logging.info('Deleted bots: %s', ', '.join(sorted(successfully_deleted))) |
| - _clear_bots_pending_deletion(machine_key, hostnames) |
| + |
| +def drain_excess(max_concurrent=50): |
| + """Marks MachineLeases beyond what is needed by their MachineType as drained. |
| + |
| + Args: |
| + max_concurrent: Maximum number of concurrent asynchronous requests. |
| + """ |
| + futures = [] |
| + |
| + for machine_type in MachineType.query(): |
| + for machine_lease in MachineLease.query( |
|
Vadim Sh.
2016/10/03 20:26:32
I think it will be more efficient to do a single M
smut
2016/10/03 21:18:08
There's only one MachineType right now. Didn't do
|
| + MachineLease.machine_type == machine_type.key, |
| + ): |
| + try: |
| + index = int(machine_lease.key.id().rsplit('-', 1)[-1]) |
| + except ValueError: |
| + logging.error( |
| + 'MachineLease index could not be deciphered: %s', machine_lease.key) |
| + continue |
| + # Drain MachineLeases where the MachineType is not enabled or the index |
| + # exceeds the target_size given by the MachineType. Since MachineLeases |
| + # are created in contiguous blocks, only indices 0 through target_size - 1 |
| + # should exist. |
| + if not machine_type.enabled or index >= machine_type.target_size: |
| + if len(futures) == max_concurrent: |
| + ndb.Future.wait_any(futures) |
| + futures = [future for future in futures if not future.done()] |
| + machine_lease.drained = True |
| + futures.append(machine_lease.put_async()) |
| + |
| + if futures: |
| + ndb.Future.wait_all(futures) |
| + |
| + |
| +def schedule_lease_management(): |
| + """Schedules task queues to process each MachineLease.""" |
| + for machine_lease in MachineLease.query(): |
| + # TODO(smut): Remove this check once migrated to the new format. |
| + if machine_lease.machine_type: |
| + if not utils.enqueue_task( |
| + '/internal/taskqueue/machine-provider-manage', |
| + 'machine-provider-manage', |
| + params={ |
| + 'key': machine_lease.key.urlsafe(), |
| + }, |
| + ): |
| + logging.warning( |
| + 'Failed to enqueue task for MachineLease: %s', machine_lease.key) |
| @ndb.transactional |
| -def _clear_bots_pending_deletion(machine_type_key, hostnames): |
| - """Clears the list of bots pending deletion. |
| +def clear_lease_request(key, request_id): |
| + """Clears information about given lease request. |
| Args: |
| - machine_type_key: ndb.Key for a MachineType instance. |
| - hostnames: List of bots pending deletion. |
| + key: ndb.Key for a MachineLease entity. |
| + request_id: ID of the request to clear. |
| """ |
| - machine_type = machine_type_key.get() |
| - if not machine_type: |
| - logging.error('MachineType no longer exists: %s', machine_type_key.id()) |
| + machine_lease = key.get() |
| + if not machine_lease: |
| + logging.error('MachineLease does not exist: %s', key) |
| + return |
| + |
| + if not machine_lease.client_request_id: |
| return |
| - num_pending_deletion = len(machine_type.pending_deletion) |
| - machine_type.pending_deletion = [ |
| - host for host in machine_type.pending_deletion if host not in hostnames] |
| - if len(machine_type.pending_deletion) != num_pending_deletion: |
| - machine_type.put() |
| + if request_id != machine_lease.client_request_id: |
| + # Already cleared and incremented? |
| + logging.warning( |
| + 'Request ID mismatch for MachineLease: %s\nExpected: %s\nActual: %s', |
| + key, |
| + request_id, |
| + machine_lease.client_request_id, |
| + ) |
| + return |
| + machine_lease.client_request_id = None |
| + machine_lease.hostname = None |
| + machine_lease.lease_expiration_ts = None |
| + machine_lease.put() |
| -@ndb.transactional |
| -def generate_lease_requests(machine_type_key, app_id, swarming_server): |
| - """Generates lease requests. |
| - The list includes new requests to lease machines up to the targeted |
| - size for the given machine type as well as requests to get the status |
| - of pending lease requests. |
| +@ndb.transactional |
| +def log_lease_fulfillment(key, request_id, hostname, lease_expiration_ts): |
| + """Logs lease fulfillment. |
| Args: |
| - machine_type_key: ndb.Key for a MachineType instance. |
| - app_id: ID of the application the requests originate from. |
| - swarming_server: URL for the Swarming server to connect to. |
| - |
| - Returns: |
| - A list of lease requests. |
| + key: ndb.Key for a MachineLease entity. |
| + request_id: ID of the request being fulfilled. |
| + hostname: Hostname of the machine fulfilling the request. |
| + lease_expiration_ts: UTC seconds since epoch when the lease expires. |
| """ |
| - machine_type = machine_type_key.get() |
| - if not machine_type: |
| - logging.warning('MachineType no longer exists: %s', machine_type_key.id()) |
| - return [] |
| - |
| - expired_requests = _clean_up_expired_leases(machine_type) |
| - lease_requests = _generate_lease_request_status_updates( |
| - machine_type, app_id, swarming_server) |
| - |
| - if not machine_type.enabled: |
| - logging.warning('MachineType is not enabled: %s\n', machine_type.key.id()) |
| - return lease_requests |
| - if machine_type.current_size >= machine_type.target_size: |
| - logging.info( |
| - 'MachineType %s is at capacity: %d/%d', |
| - machine_type.key.id(), |
| - machine_type.current_size, |
| - machine_type.target_size, |
| - ) |
| - return lease_requests |
| - |
| - new_requests = _generate_lease_requests_for_new_machines( |
| - machine_type, app_id, swarming_server) |
| + machine_lease = key.get() |
| + if not machine_lease: |
| + logging.error('MachineLease does not exist: %s', key) |
| + return |
| - if new_requests or expired_requests: |
| - machine_type.put() |
| - lease_requests.extend(new_requests) |
| + if request_id != machine_lease.client_request_id: |
| + logging.error( |
| + 'Request ID mismatch for MachineLease: %s\nExpected: %s\nActual: %s', |
| + machine_lease.client_request_id, |
| + request_id, |
| + ) |
| + return |
| - return lease_requests |
| + if (hostname == machine_lease.hostname |
| + and lease_expiration_ts == machine_lease.lease_expiration_ts): |
| + return |
| + machine_lease.hostname = hostname |
| + machine_lease.lease_expiration_ts = datetime.datetime.utcfromtimestamp( |
| + lease_expiration_ts) |
| + machine_lease.put() |
| -def _clean_up_expired_leases(machine_type): |
| - """Cleans up expired leases. |
| - Prunes expired leases from machine_type.leases, |
| - but does not write the result to the datastore. |
| +@ndb.transactional |
| +def increment_lease_request_id(key): |
|
Vadim Sh.
2016/10/03 20:26:32
nit: update_client_request_id
It takes an entity
smut
2016/10/03 21:18:08
Done.
|
| + """Increments the client request ID used to lease a machine. |
| Args: |
| - machine_type: MachineType instance. |
| - |
| - Returns: |
| - A list of leases that were removed. |
| + key: ndb.Key for a MachineLease entity. |
| """ |
| - active = [] |
| - expired = [] |
| + machine_lease = key.get() |
| + if not machine_lease: |
| + logging.error('MachineLease does not exist: %s', key) |
| + return |
| - for request in machine_type.leases: |
| - if request.hostname and request.lease_expiration_ts <= utils.utcnow(): |
| - logging.warning( |
| - 'Request ID %s expired:\nHostname: %s\nExpiration: %s', |
| - request.client_request_id, |
| - request.hostname, |
| - request.lease_expiration_ts, |
| - ) |
| - expired.append(request.hostname) |
| - else: |
| - active.append(request) |
| + if machine_lease.drained: |
| + logging.info('MachineLease is drained: %s', key) |
| + return |
| - machine_type.leases = active |
| - machine_type.pending_deletion.extend(expired) |
| - return expired |
| + if machine_lease.client_request_id: |
| + return |
| + machine_lease.request_count += 1 |
| + machine_lease.client_request_id = '%s-%s-%s' % ( |
| + machine_lease.machine_type.id(), key.id(), machine_lease.request_count) |
| + machine_lease.put() |
| -def _generate_lease_request_status_updates( |
| - machine_type, app_id, swarming_server): |
| - """Generates status update requests for pending lease requests. |
| - Args: |
| - machine_type: MachineType instance. |
| - app_id: ID of the application the requests originate from. |
| - swarming_server: URL for the Swarming server to connect to. |
| +@ndb.transactional |
| +def delete_machine_lease(key): |
| + """Deletes the given MachineLease if it is drained and has no active lease. |
| - Returns: |
| - A list of lease requests. |
| + Args: |
| + key: ndb.Key for a MachineLease entity. |
| """ |
| - lease_requests = [] |
| - for request in machine_type.leases: |
| - if not request.hostname: |
| - # We don't know the hostname yet, meaning this request is still pending. |
| - lease_requests.append(machine_provider.LeaseRequest( |
| - dimensions=machine_type.mp_dimensions, |
| - duration=machine_type.lease_duration_secs, |
| - on_lease=machine_provider.Instruction( |
| - swarming_server=swarming_server), |
| - pubsub_project=app_id, |
| - pubsub_topic=PUBSUB_TOPIC, |
| - request_id=request.client_request_id, |
| - )) |
| - return lease_requests |
| - |
| - |
| -def _generate_lease_requests_for_new_machines( |
| - machine_type, app_id, swarming_server): |
| - """Generates requests to lease machines up to the target. |
| - |
| - Extends machine_type.leases by the number of new lease requests generated, |
| - but does not write the result to the datastore. |
| + machine_lease = key.get() |
| + if not machine_lease: |
| + return |
| - Args: |
| - machine_type: MachineType instance. |
| - app_id: ID of the application the requests originate from. |
| - swarming_server: URL for the Swarming server to connect to. |
| + if not machine_lease.drained: |
| + logging.warning('MachineLease not drained: %s', key) |
| + return |
| - Returns: |
| - A list of lease requests. |
| - """ |
| - lease_requests = [] |
| - request_number = machine_type.request_count |
| - for _ in xrange(machine_type.target_size - machine_type.current_size): |
| - request_number += 1 |
| - request_id = '%s-%d' % (machine_type.key.id(), request_number) |
| - lease_requests.append(machine_provider.LeaseRequest( |
| - dimensions=machine_type.mp_dimensions, |
| - duration=machine_type.lease_duration_secs, |
| - on_lease=machine_provider.Instruction(swarming_server=swarming_server), |
| - pubsub_project=app_id, |
| - pubsub_topic=PUBSUB_TOPIC, |
| - request_id=request_id, |
| - )) |
| - machine_type.leases.append(MachineLease(client_request_id=request_id)) |
| - machine_type.request_count = request_number |
| - return lease_requests |
| + if machine_lease.client_request_id: |
|
Vadim Sh.
2016/10/03 20:26:31
is this possible if drained == True?
smut
2016/10/03 21:18:07
Yes, there's the case where the MachineLease alrea
|
| + return |
| + key.delete() |
| -@ndb.transactional |
| -def update_leases(machine_type_key, responses): |
| - """Updates the states of leases of the given machine types. |
| + |
| +def manage_lease(key): |
| + """Manages a MachineLease. |
| Args: |
| - machine_type_key: ndb.Key for a MachineType instance. |
| - responses: machine_provider.BatchedLeaseResponse instance. |
| + key: ndb.Key for a MachineLease entity. |
| """ |
| - machine_type = machine_type_key.get() |
| - if not machine_type: |
| - logging.warning('MachineType no longer exists: %s', machine_type_key) |
| + machine_lease = key.get() |
| + if not machine_lease: |
| return |
| - lease_request_map = { |
| - request.client_request_id: request for request in machine_type.leases |
| - } |
| - for response in responses: |
| - request_id = response['client_request_id'] |
| - request = lease_request_map.get(request_id) |
| - if not request: |
| - logging.error('Unknown request ID: %s', request_id) |
| - continue |
| + # Manage a leased machine. |
| + if machine_lease.lease_expiration_ts: |
| + if machine_lease.lease_expiration_ts <= utils.utcnow(): |
| + logging.info('MachineLease expired: %s', key) |
| + assert machine_lease.hostname, key |
| + clear_lease_request(key, machine_lease.client_request_id) |
| + return |
| + |
| + # Lease expiration time is unknown, so there must be no leased machine. |
| + assert not machine_lease.hostname, key |
| + |
| + # Manage a pending lease request. |
| + if machine_lease.client_request_id: |
|
Vadim Sh.
2016/10/03 20:26:31
can the body of this 'if' be converted into a sepa
smut
2016/10/03 21:18:07
Done.
|
| + response = machine_provider.lease_machine( |
| + machine_provider.LeaseRequest( |
| + dimensions=machine_lease.mp_dimensions, |
| + duration=machine_lease.lease_duration_secs, |
|
Vadim Sh.
2016/10/03 20:26:32
I really really wish we can randomize this a bit t
smut
2016/10/03 21:18:07
Your suggestion of varying with +/- 10% seems reas
Vadim Sh.
2016/10/03 21:34:18
Instead of random() you can use some hash-like fun
smut
2016/10/03 21:41:50
random.seed(machine_lease.client_request_id)
rando
Vadim Sh.
2016/10/03 21:46:43
Sort of, except random.seed will modify global see
|
| + on_lease=machine_provider.Instruction( |
| + swarming_server='https://%s' % ( |
| + app_identity.get_default_version_hostname())), |
| + request_id=machine_lease.client_request_id, |
| + ), |
| + ) |
| if response.get('error'): |
| error = machine_provider.LeaseRequestError.lookup_by_name( |
| @@ -360,104 +343,61 @@ def update_leases(machine_type_key, responses): |
| machine_provider.LeaseRequestError.DEADLINE_EXCEEDED, |
| machine_provider.LeaseRequestError.TRANSIENT_ERROR, |
| ): |
| - # Retryable errors. Just try again later. |
| - logging.warning( |
| - 'Request not processed, trying again later: %s', request_id) |
| - else: |
| - # TODO(smut): Handle specific errors. |
| logging.warning( |
| - 'Error %s for request ID %s', |
| + 'Transient failure: %s\nRequest ID: %s\nError: %s', |
| + key, |
| + request_id, |
| response['error'], |
| + ) |
| + else: |
| + logging.error( |
| + 'Lease request failed: %s\nRequest ID: %s\nError: %s', |
|
Vadim Sh.
2016/10/03 20:26:32
nit: "Lease request failed\n%s...."
it will make
smut
2016/10/03 21:18:07
Done.
|
| + key, |
| request_id, |
| + response['error'], |
| + ) |
| + clear_lease_request(key, machine_lease.client_request_id) |
| + return |
| + |
| + state = machine_provider.LeaseRequestState.lookup_by_name(response['state']) |
| + if state == machine_provider.LeaseRequestState.FULFILLED: |
| + if not response.get('hostname'): |
| + # Lease has already expired. This shouldn't happen, but it indicates the |
| + # lease expired faster than we could tell it even got fulfilled. |
| + logging.error( |
| + 'Request expired: %s\nRequest ID:%s\nExpired: %s', |
|
Vadim Sh.
2016/10/03 20:26:31
same here
smut
2016/10/03 21:18:07
Done.
|
| + key, |
| + machine_lease.client_request_id, |
| + response['lease_expiration_ts'], |
| ) |
| - lease_request_map.pop(request_id) |
| - else: |
| - request.request_hash = response['request_hash'] |
| - state = machine_provider.LeaseRequestState.lookup_by_name( |
| - response['state']) |
| - if state == machine_provider.LeaseRequestState.DENIED: |
| - logging.warning('Request ID denied: %s', request_id) |
| - lease_request_map.pop(request_id) |
| - elif state == machine_provider.LeaseRequestState.FULFILLED: |
| - if response.get('hostname'): |
| - logging.info( |
| - 'Request ID %s fulfilled:\nHostname: %s\nExpiration: %s', |
| - request_id, |
| - response['hostname'], |
| - response['lease_expiration_ts'], |
| - ) |
| - request.hostname = response['hostname'] |
| - request.lease_expiration_ts = datetime.datetime.utcfromtimestamp( |
| - int(response['lease_expiration_ts'])) |
| - else: |
| - # Lease expired. This shouldn't happen, because it means we had a |
| - # pending request which was fulfilled and expired before we were |
| - # able to check its status. |
| - logging.warning('Request ID fulfilled and expired: %s', request_id) |
| - lease_request_map.pop(request_id) |
| + clear_lease_request(key, machine_lease.client_request_id) |
| else: |
| - # Lease request isn't processed yet. Just try again later. |
| logging.info( |
| - 'Request ID %s in state: %s', request_id, response['state']) |
| - |
| - machine_type.leases = sorted( |
| - lease_request_map.values(), key=lambda lease: lease.client_request_id) |
| - machine_type.put() |
| - |
| - |
| -@ndb.tasklet |
| -def process_message(message, subscription): |
| - """Processes a Pub/Sub message from the Machine Provider. |
| - |
| - Args: |
| - message: A message dict as returned by pubsub.pull. |
| - subscription: Full name of the subscription the message was received on. |
| - """ |
| - now = utils.utcnow() |
| - |
| - try: |
| - data = base64.b64decode(message.get('message', {}).get('data', '')) |
| - lease_response = json.loads(data) |
| - # Machine Provider sends a response including lease_expiration_ts. If |
| - # lease_expiration_ts is not in the future and there is no more hostname |
| - # associated with the response then this is a reclamation message. |
| - lease_expiration_ts = datetime.datetime.utcfromtimestamp( |
| - int(lease_response['lease_expiration_ts'])) |
| - if lease_expiration_ts <= now and not lease_response.get('hostname'): |
| - # We used request IDs of the form MachineType.key.id()-<integer>. |
| - # Extract the ID for the MachineType from the request ID. |
| - machine_type_id = lease_response['client_request_id'].rsplit('-', 1)[0] |
| - logging.info('Lease ID %s is expired\nMachineType: %s', |
| - lease_response['client_request_id'], |
| - MachineType.get_by_id(machine_type_id)) |
| - except (TypeError, ValueError): |
| - logging.error('Received unexpected Pub/Sub message:\n%s', |
| - json.dumps(message, indent=2)) |
| - |
| - yield pubsub.ack_async(subscription, message['ackId']) |
| - |
| - |
| -def process_pubsub(app_id): |
| - """Processes Pub/Sub messages from the Machine Provider, if there are any. |
| + 'Request fulfilled: %s\nRequest ID: %s\nHostname: %s\nExpires: %s', |
| + key, |
| + machine_lease.client_request_id, |
| + response['hostname'], |
| + response['lease_expiration_ts'], |
| + ) |
| + log_lease_fulfillment( |
| + key, |
| + machine_lease.client_request_id, |
| + response['hostname'], |
| + int(response['lease_expiration_ts']), |
| + ) |
| + # TODO(smut): Create BotInfo, associate lease information. |
|
Vadim Sh.
2016/10/03 20:26:32
yep, in the same transaction
smut
2016/10/03 21:18:07
xg in log_lease_fulfillment?
Vadim Sh.
2016/10/03 21:34:18
Yes. Assuming 'manage_lease' is idempotent and wil
smut
2016/10/03 21:41:50
Yes should be idempotent.
|
| + elif state == machine_provider.LeaseRequestState.DENIED: |
| + logging.warning( |
| + 'Request denied: %s\nRequest ID: %s', |
| + key, |
| + machine_lease.client_request_id, |
| + ) |
| + clear_lease_request(key, machine_lease.client_request_id) |
| + return |
| - Args: |
| - app_id: ID of the application where the Pub/Sub subscription exists. |
| - """ |
| - MAX_IN_FLIGHT = 50 |
| - subscription = pubsub.full_subscription_name(app_id, PUBSUB_SUBSCRIPTION) |
| - response = pubsub.pull(subscription) |
| - logging.info('%s', response) |
| + # Manage an uninitiated lease request. |
| + if not machine_lease.drained: |
| + increment_lease_request_id(key) |
| - futures = [] |
| - messages = response.get('receivedMessages', []) |
| - while messages: |
| - num_futures = len(futures) |
| - if num_futures < MAX_IN_FLIGHT: |
| - futures.extend( |
| - process_message(message, subscription) |
| - for message in messages[:MAX_IN_FLIGHT - num_futures]) |
| - messages = messages[MAX_IN_FLIGHT - num_futures:] |
| - ndb.Future.wait_any(futures) |
| - futures = [future for future in futures if not future.done()] |
| - if futures: |
| - ndb.Future.wait_all(futures) |
| + # Manage an uninitiated, drained lease request. |
| + delete_machine_lease(key) |
|
Vadim Sh.
2016/10/03 20:26:32
shouldn't it be in 'else' section of 'if not machi
smut
2016/10/03 21:18:07
Er, yes. I just added return above under the if so
|