| OLD | NEW |
| (Empty) |
| 1 # Copyright 2016 The LUCI Authors. All rights reserved. | |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | |
| 3 # that can be found in the LICENSE file. | |
| 4 | |
| 5 """Utilities for interacting with Pub/Sub.""" | |
| 6 | |
| 7 import base64 | |
| 8 import collections | |
| 9 import json | |
| 10 import logging | |
| 11 | |
| 12 from google.appengine.api import app_identity | |
| 13 from google.appengine.ext import ndb | |
| 14 | |
| 15 from components import gce | |
| 16 from components import machine_provider | |
| 17 from components import pubsub | |
| 18 from components import utils | |
| 19 | |
| 20 import instances | |
| 21 import models | |
| 22 import utilities | |
| 23 | |
| 24 | |
| 25 @utils.cache | |
| 26 def get_machine_provider_topic_project(): | |
| 27 """Returns the project the Machine Provider topic is contained in.""" | |
| 28 return app_identity.get_application_id() | |
| 29 | |
| 30 | |
| 31 def get_machine_provider_topic(): | |
| 32 """Returns the name of the topic Machine Provider communication occurs on.""" | |
| 33 return 'machine-provider' | |
| 34 | |
| 35 | |
| 36 def get_machine_provider_subscription(): | |
| 37 """Returns the subscription to the Machine Provider topic.""" | |
| 38 return 'machine-provider' | |
| 39 | |
| 40 | |
| 41 @ndb.tasklet | |
| 42 def _process_message(key, data, attributes): | |
| 43 """Processes a single Pub/Sub message. | |
| 44 | |
| 45 Args: | |
| 46 key: ndb.Key for the models.Instance this message refers to. | |
| 47 data: Text of the message. | |
| 48 attributes: Any attributes associated with the message. | |
| 49 """ | |
| 50 assert ndb.in_transaction() | |
| 51 | |
| 52 if key.kind() == 'Instance': | |
| 53 if data == 'LEASED': | |
| 54 logging.info('Instance leased: %s', key) | |
| 55 elif data == 'RECLAIMED': | |
| 56 yield instances.mark_for_deletion(key) | |
| 57 elif data == 'SUBSCRIBED': | |
| 58 yield instances.add_subscription_metadata( | |
| 59 key, | |
| 60 attributes['subscription_project'], | |
| 61 attributes['subscription'], | |
| 62 ) | |
| 63 else: | |
| 64 logging.error('Unexpected key: %s', key) | |
| 65 | |
| 66 | |
| 67 @ndb.transactional_tasklet | |
| 68 def _process(messages): | |
| 69 """Processes the given Pub/Sub messages. | |
| 70 | |
| 71 Args: | |
| 72 messages: A list of message dicts where each message refers to an Instance | |
| 73 with the same parent InstanceGroupManager. | |
| 74 """ | |
| 75 for message in messages: | |
| 76 attributes = message.get('message', {}).get('attributes', {}) | |
| 77 data = base64.b64decode(message.get('message', {}).get('data', '')) | |
| 78 logging.info( | |
| 79 'Received Pub/Sub message: %s\nAt: %s\nMessage: %s\nAttributes: %s\n', | |
| 80 message['ackId'], | |
| 81 message.get('message', {}).get('publishTime'), | |
| 82 data, | |
| 83 json.dumps(attributes, indent=2), | |
| 84 ) | |
| 85 key = ndb.Key(urlsafe=attributes['key']) | |
| 86 yield _process_message(key, data, attributes) | |
| 87 | |
| 88 | |
| 89 @ndb.tasklet | |
| 90 def process(messages): | |
| 91 """Processes the given Pub/Sub messages. | |
| 92 | |
| 93 Args: | |
| 94 messages: A list of message dicts where each message refers to an Instance | |
| 95 with the same parent InstanceGroupManager. | |
| 96 """ | |
| 97 logging.info('Processing messages: %s', len(messages)) | |
| 98 ack_ids = [message['ackId'] for message in messages] | |
| 99 # Since it can take some time to handle the Pub/Sub messages and commit the | |
| 100 # large transaction, extend the ack deadline to match the cron interval with | |
| 101 # which we pull Pub/Sub messages. We should not extend the deadline longer | |
| 102 # than that, otherwise if the transaction fails and the messages are not | |
| 103 # acknowledged then we won't receive those messages again during the next | |
| 104 # run of the cron job. | |
| 105 yield pubsub.modify_ack_deadline_async( | |
| 106 pubsub.full_subscription_name( | |
| 107 get_machine_provider_topic_project(), | |
| 108 get_machine_provider_subscription(), | |
| 109 ), | |
| 110 60, | |
| 111 *ack_ids | |
| 112 ) | |
| 113 yield _process(messages) | |
| 114 yield pubsub.ack_async( | |
| 115 pubsub.full_subscription_name( | |
| 116 get_machine_provider_topic_project(), | |
| 117 get_machine_provider_subscription(), | |
| 118 ), | |
| 119 *ack_ids | |
| 120 ) | |
| 121 | |
| 122 | |
| 123 def split_by_entity_group(messages): | |
| 124 """Returns a list of lists of Pub/Sub messages in the same entity group. | |
| 125 | |
| 126 Each list contains Pub/Sub messages that refer to Instance entities with | |
| 127 a common parent InstanceGroupManager. | |
| 128 | |
| 129 Args: | |
| 130 messages: A list of message dicts. | |
| 131 """ | |
| 132 filtered = collections.defaultdict(list) | |
| 133 | |
| 134 for message in messages: | |
| 135 attributes = message.get('message', {}).get('attributes', {}) | |
| 136 key = ndb.Key(urlsafe=attributes['key']) | |
| 137 if key.kind() != 'Instance': | |
| 138 filtered['erroneous'].append(message) | |
| 139 else: | |
| 140 filtered[key.parent()].append(message) | |
| 141 | |
| 142 return filtered.values() | |
| 143 | |
| 144 | |
| 145 def poll(): | |
| 146 """Polls and processes Pub/Sub messages.""" | |
| 147 response = pubsub.pull( | |
| 148 pubsub.full_subscription_name( | |
| 149 get_machine_provider_topic_project(), | |
| 150 get_machine_provider_subscription(), | |
| 151 ), | |
| 152 max_messages=400, | |
| 153 ) | |
| 154 | |
| 155 if response.get('receivedMessages', []): | |
| 156 logging.info('Messages received: %s', len(response['receivedMessages'])) | |
| 157 messages = split_by_entity_group(response['receivedMessages']) | |
| 158 utilities.batch_process_async(messages, process) | |
| 159 | |
| 160 | |
| 161 def schedule_poll(): | |
| 162 """Enqueues tasks to poll for Pub/Sub messages.""" | |
| 163 if not utils.enqueue_task( | |
| 164 '/internal/queues/process-pubsub-messages', | |
| 165 'process-pubsub-messages', | |
| 166 ): | |
| 167 logging.warning('Failed to enqueue task for Pub/Sub') | |
| OLD | NEW |