Index: appengine/gce-backend/pubsub.py |
diff --git a/appengine/gce-backend/pubsub.py b/appengine/gce-backend/pubsub.py |
deleted file mode 100644 |
index 94ad632d0897d431bb341e7ad2725d89b3802343..0000000000000000000000000000000000000000 |
--- a/appengine/gce-backend/pubsub.py |
+++ /dev/null |
@@ -1,167 +0,0 @@ |
-# Copyright 2016 The LUCI Authors. All rights reserved. |
-# Use of this source code is governed under the Apache License, Version 2.0 |
-# that can be found in the LICENSE file. |
- |
-"""Utilities for interacting with Pub/Sub.""" |
- |
-import base64 |
-import collections |
-import json |
-import logging |
- |
-from google.appengine.api import app_identity |
-from google.appengine.ext import ndb |
- |
-from components import gce |
-from components import machine_provider |
-from components import pubsub |
-from components import utils |
- |
-import instances |
-import models |
-import utilities |
- |
- |
-@utils.cache |
-def get_machine_provider_topic_project(): |
- """Returns the project the Machine Provider topic is contained in.""" |
- return app_identity.get_application_id() |
- |
- |
-def get_machine_provider_topic(): |
- """Returns the name of the topic Machine Provider communication occurs on.""" |
- return 'machine-provider' |
- |
- |
-def get_machine_provider_subscription(): |
- """Returns the subscription to the Machine Provider topic.""" |
- return 'machine-provider' |
- |
- |
-@ndb.tasklet |
-def _process_message(key, data, attributes): |
- """Processes a single Pub/Sub message. |
- |
- Args: |
- key: ndb.Key for the models.Instance this message refers to. |
- data: Text of the message. |
- attributes: Any attributes associated with the message. |
- """ |
- assert ndb.in_transaction() |
- |
- if key.kind() == 'Instance': |
- if data == 'LEASED': |
- logging.info('Instance leased: %s', key) |
- elif data == 'RECLAIMED': |
- yield instances.mark_for_deletion(key) |
- elif data == 'SUBSCRIBED': |
- yield instances.add_subscription_metadata( |
- key, |
- attributes['subscription_project'], |
- attributes['subscription'], |
- ) |
- else: |
- logging.error('Unexpected key: %s', key) |
- |
- |
-@ndb.transactional_tasklet |
-def _process(messages): |
- """Processes the given Pub/Sub messages. |
- |
- Args: |
- messages: A list of message dicts where each message refers to an Instance |
- with the same parent InstanceGroupManager. |
- """ |
- for message in messages: |
- attributes = message.get('message', {}).get('attributes', {}) |
- data = base64.b64decode(message.get('message', {}).get('data', '')) |
- logging.info( |
- 'Received Pub/Sub message: %s\nAt: %s\nMessage: %s\nAttributes: %s\n', |
- message['ackId'], |
- message.get('message', {}).get('publishTime'), |
- data, |
- json.dumps(attributes, indent=2), |
- ) |
- key = ndb.Key(urlsafe=attributes['key']) |
- yield _process_message(key, data, attributes) |
- |
- |
-@ndb.tasklet |
-def process(messages): |
- """Processes the given Pub/Sub messages. |
- |
- Args: |
- messages: A list of message dicts where each message refers to an Instance |
- with the same parent InstanceGroupManager. |
- """ |
- logging.info('Processing messages: %s', len(messages)) |
- ack_ids = [message['ackId'] for message in messages] |
- # Since it can take some time to handle the Pub/Sub messages and commit the |
- # large transaction, extend the ack deadline to match the cron interval with |
- # which we pull Pub/Sub messages. We should not extend the deadline longer |
- # than that, otherwise if the transaction fails and the messages are not |
- # acknowledged then we won't receive those messages again during the next |
- # run of the cron job. |
- yield pubsub.modify_ack_deadline_async( |
- pubsub.full_subscription_name( |
- get_machine_provider_topic_project(), |
- get_machine_provider_subscription(), |
- ), |
- 60, |
- *ack_ids |
- ) |
- yield _process(messages) |
- yield pubsub.ack_async( |
- pubsub.full_subscription_name( |
- get_machine_provider_topic_project(), |
- get_machine_provider_subscription(), |
- ), |
- *ack_ids |
- ) |
- |
- |
-def split_by_entity_group(messages): |
- """Returns a list of lists of Pub/Sub messages in the same entity group. |
- |
- Each list contains Pub/Sub messages that refer to Instance entities with |
- a common parent InstanceGroupManager. |
- |
- Args: |
- messages: A list of message dicts. |
- """ |
- filtered = collections.defaultdict(list) |
- |
- for message in messages: |
- attributes = message.get('message', {}).get('attributes', {}) |
- key = ndb.Key(urlsafe=attributes['key']) |
- if key.kind() != 'Instance': |
- filtered['erroneous'].append(message) |
- else: |
- filtered[key.parent()].append(message) |
- |
- return filtered.values() |
- |
- |
-def poll(): |
- """Polls and processes Pub/Sub messages.""" |
- response = pubsub.pull( |
- pubsub.full_subscription_name( |
- get_machine_provider_topic_project(), |
- get_machine_provider_subscription(), |
- ), |
- max_messages=400, |
- ) |
- |
- if response.get('receivedMessages', []): |
- logging.info('Messages received: %s', len(response['receivedMessages'])) |
- messages = split_by_entity_group(response['receivedMessages']) |
- utilities.batch_process_async(messages, process) |
- |
- |
-def schedule_poll(): |
- """Enqueues tasks to poll for Pub/Sub messages.""" |
- if not utils.enqueue_task( |
- '/internal/queues/process-pubsub-messages', |
- 'process-pubsub-messages', |
- ): |
- logging.warning('Failed to enqueue task for Pub/Sub') |