OLD | NEW |
| (Empty) |
1 # Copyright 2016 The LUCI Authors. All rights reserved. | |
2 # Use of this source code is governed under the Apache License, Version 2.0 | |
3 # that can be found in the LICENSE file. | |
4 | |
5 """Utilities for interacting with Pub/Sub.""" | |
6 | |
7 import base64 | |
8 import collections | |
9 import json | |
10 import logging | |
11 | |
12 from google.appengine.api import app_identity | |
13 from google.appengine.ext import ndb | |
14 | |
15 from components import gce | |
16 from components import machine_provider | |
17 from components import pubsub | |
18 from components import utils | |
19 | |
20 import instances | |
21 import models | |
22 import utilities | |
23 | |
24 | |
25 @utils.cache | |
26 def get_machine_provider_topic_project(): | |
27 """Returns the project the Machine Provider topic is contained in.""" | |
28 return app_identity.get_application_id() | |
29 | |
30 | |
31 def get_machine_provider_topic(): | |
32 """Returns the name of the topic Machine Provider communication occurs on.""" | |
33 return 'machine-provider' | |
34 | |
35 | |
36 def get_machine_provider_subscription(): | |
37 """Returns the subscription to the Machine Provider topic.""" | |
38 return 'machine-provider' | |
39 | |
40 | |
41 @ndb.tasklet | |
42 def _process_message(key, data, attributes): | |
43 """Processes a single Pub/Sub message. | |
44 | |
45 Args: | |
46 key: ndb.Key for the models.Instance this message refers to. | |
47 data: Text of the message. | |
48 attributes: Any attributes associated with the message. | |
49 """ | |
50 assert ndb.in_transaction() | |
51 | |
52 if key.kind() == 'Instance': | |
53 if data == 'LEASED': | |
54 logging.info('Instance leased: %s', key) | |
55 elif data == 'RECLAIMED': | |
56 yield instances.mark_for_deletion(key) | |
57 elif data == 'SUBSCRIBED': | |
58 yield instances.add_subscription_metadata( | |
59 key, | |
60 attributes['subscription_project'], | |
61 attributes['subscription'], | |
62 ) | |
63 else: | |
64 logging.error('Unexpected key: %s', key) | |
65 | |
66 | |
67 @ndb.transactional_tasklet | |
68 def _process(messages): | |
69 """Processes the given Pub/Sub messages. | |
70 | |
71 Args: | |
72 messages: A list of message dicts where each message refers to an Instance | |
73 with the same parent InstanceGroupManager. | |
74 """ | |
75 for message in messages: | |
76 attributes = message.get('message', {}).get('attributes', {}) | |
77 data = base64.b64decode(message.get('message', {}).get('data', '')) | |
78 logging.info( | |
79 'Received Pub/Sub message: %s\nAt: %s\nMessage: %s\nAttributes: %s\n', | |
80 message['ackId'], | |
81 message.get('message', {}).get('publishTime'), | |
82 data, | |
83 json.dumps(attributes, indent=2), | |
84 ) | |
85 key = ndb.Key(urlsafe=attributes['key']) | |
86 yield _process_message(key, data, attributes) | |
87 | |
88 | |
89 @ndb.tasklet | |
90 def process(messages): | |
91 """Processes the given Pub/Sub messages. | |
92 | |
93 Args: | |
94 messages: A list of message dicts where each message refers to an Instance | |
95 with the same parent InstanceGroupManager. | |
96 """ | |
97 logging.info('Processing messages: %s', len(messages)) | |
98 ack_ids = [message['ackId'] for message in messages] | |
99 # Since it can take some time to handle the Pub/Sub messages and commit the | |
100 # large transaction, extend the ack deadline to match the cron interval with | |
101 # which we pull Pub/Sub messages. We should not extend the deadline longer | |
102 # than that, otherwise if the transaction fails and the messages are not | |
103 # acknowledged then we won't receive those messages again during the next | |
104 # run of the cron job. | |
105 yield pubsub.modify_ack_deadline_async( | |
106 pubsub.full_subscription_name( | |
107 get_machine_provider_topic_project(), | |
108 get_machine_provider_subscription(), | |
109 ), | |
110 60, | |
111 *ack_ids | |
112 ) | |
113 yield _process(messages) | |
114 yield pubsub.ack_async( | |
115 pubsub.full_subscription_name( | |
116 get_machine_provider_topic_project(), | |
117 get_machine_provider_subscription(), | |
118 ), | |
119 *ack_ids | |
120 ) | |
121 | |
122 | |
123 def split_by_entity_group(messages): | |
124 """Returns a list of lists of Pub/Sub messages in the same entity group. | |
125 | |
126 Each list contains Pub/Sub messages that refer to Instance entities with | |
127 a common parent InstanceGroupManager. | |
128 | |
129 Args: | |
130 messages: A list of message dicts. | |
131 """ | |
132 filtered = collections.defaultdict(list) | |
133 | |
134 for message in messages: | |
135 attributes = message.get('message', {}).get('attributes', {}) | |
136 key = ndb.Key(urlsafe=attributes['key']) | |
137 if key.kind() != 'Instance': | |
138 filtered['erroneous'].append(message) | |
139 else: | |
140 filtered[key.parent()].append(message) | |
141 | |
142 return filtered.values() | |
143 | |
144 | |
145 def poll(): | |
146 """Polls and processes Pub/Sub messages.""" | |
147 response = pubsub.pull( | |
148 pubsub.full_subscription_name( | |
149 get_machine_provider_topic_project(), | |
150 get_machine_provider_subscription(), | |
151 ), | |
152 max_messages=400, | |
153 ) | |
154 | |
155 if response.get('receivedMessages', []): | |
156 logging.info('Messages received: %s', len(response['receivedMessages'])) | |
157 messages = split_by_entity_group(response['receivedMessages']) | |
158 utilities.batch_process_async(messages, process) | |
159 | |
160 | |
161 def schedule_poll(): | |
162 """Enqueues tasks to poll for Pub/Sub messages.""" | |
163 if not utils.enqueue_task( | |
164 '/internal/queues/process-pubsub-messages', | |
165 'process-pubsub-messages', | |
166 ): | |
167 logging.warning('Failed to enqueue task for Pub/Sub') | |
OLD | NEW |