Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(68)

Side by Side Diff: appengine/gce-backend/pubsub.py

Issue 2480653002: Remove unused pubsub stuff (Closed)
Patch Set: Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2016 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file.
4
5 """Utilities for interacting with Pub/Sub."""
6
7 import base64
8 import collections
9 import json
10 import logging
11
12 from google.appengine.api import app_identity
13 from google.appengine.ext import ndb
14
15 from components import gce
16 from components import machine_provider
17 from components import pubsub
18 from components import utils
19
20 import instances
21 import models
22 import utilities
23
24
25 @utils.cache
26 def get_machine_provider_topic_project():
27 """Returns the project the Machine Provider topic is contained in."""
28 return app_identity.get_application_id()
29
30
31 def get_machine_provider_topic():
32 """Returns the name of the topic Machine Provider communication occurs on."""
33 return 'machine-provider'
34
35
36 def get_machine_provider_subscription():
37 """Returns the subscription to the Machine Provider topic."""
38 return 'machine-provider'
39
40
41 @ndb.tasklet
42 def _process_message(key, data, attributes):
43 """Processes a single Pub/Sub message.
44
45 Args:
46 key: ndb.Key for the models.Instance this message refers to.
47 data: Text of the message.
48 attributes: Any attributes associated with the message.
49 """
50 assert ndb.in_transaction()
51
52 if key.kind() == 'Instance':
53 if data == 'LEASED':
54 logging.info('Instance leased: %s', key)
55 elif data == 'RECLAIMED':
56 yield instances.mark_for_deletion(key)
57 elif data == 'SUBSCRIBED':
58 yield instances.add_subscription_metadata(
59 key,
60 attributes['subscription_project'],
61 attributes['subscription'],
62 )
63 else:
64 logging.error('Unexpected key: %s', key)
65
66
67 @ndb.transactional_tasklet
68 def _process(messages):
69 """Processes the given Pub/Sub messages.
70
71 Args:
72 messages: A list of message dicts where each message refers to an Instance
73 with the same parent InstanceGroupManager.
74 """
75 for message in messages:
76 attributes = message.get('message', {}).get('attributes', {})
77 data = base64.b64decode(message.get('message', {}).get('data', ''))
78 logging.info(
79 'Received Pub/Sub message: %s\nAt: %s\nMessage: %s\nAttributes: %s\n',
80 message['ackId'],
81 message.get('message', {}).get('publishTime'),
82 data,
83 json.dumps(attributes, indent=2),
84 )
85 key = ndb.Key(urlsafe=attributes['key'])
86 yield _process_message(key, data, attributes)
87
88
89 @ndb.tasklet
90 def process(messages):
91 """Processes the given Pub/Sub messages.
92
93 Args:
94 messages: A list of message dicts where each message refers to an Instance
95 with the same parent InstanceGroupManager.
96 """
97 logging.info('Processing messages: %s', len(messages))
98 ack_ids = [message['ackId'] for message in messages]
99 # Since it can take some time to handle the Pub/Sub messages and commit the
100 # large transaction, extend the ack deadline to match the cron interval with
101 # which we pull Pub/Sub messages. We should not extend the deadline longer
102 # than that, otherwise if the transaction fails and the messages are not
103 # acknowledged then we won't receive those messages again during the next
104 # run of the cron job.
105 yield pubsub.modify_ack_deadline_async(
106 pubsub.full_subscription_name(
107 get_machine_provider_topic_project(),
108 get_machine_provider_subscription(),
109 ),
110 60,
111 *ack_ids
112 )
113 yield _process(messages)
114 yield pubsub.ack_async(
115 pubsub.full_subscription_name(
116 get_machine_provider_topic_project(),
117 get_machine_provider_subscription(),
118 ),
119 *ack_ids
120 )
121
122
123 def split_by_entity_group(messages):
124 """Returns a list of lists of Pub/Sub messages in the same entity group.
125
126 Each list contains Pub/Sub messages that refer to Instance entities with
127 a common parent InstanceGroupManager.
128
129 Args:
130 messages: A list of message dicts.
131 """
132 filtered = collections.defaultdict(list)
133
134 for message in messages:
135 attributes = message.get('message', {}).get('attributes', {})
136 key = ndb.Key(urlsafe=attributes['key'])
137 if key.kind() != 'Instance':
138 filtered['erroneous'].append(message)
139 else:
140 filtered[key.parent()].append(message)
141
142 return filtered.values()
143
144
145 def poll():
146 """Polls and processes Pub/Sub messages."""
147 response = pubsub.pull(
148 pubsub.full_subscription_name(
149 get_machine_provider_topic_project(),
150 get_machine_provider_subscription(),
151 ),
152 max_messages=400,
153 )
154
155 if response.get('receivedMessages', []):
156 logging.info('Messages received: %s', len(response['receivedMessages']))
157 messages = split_by_entity_group(response['receivedMessages'])
158 utilities.batch_process_async(messages, process)
159
160
161 def schedule_poll():
162 """Enqueues tasks to poll for Pub/Sub messages."""
163 if not utils.enqueue_task(
164 '/internal/queues/process-pubsub-messages',
165 'process-pubsub-messages',
166 ):
167 logging.warning('Failed to enqueue task for Pub/Sub')
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698