OLD | NEW |
| (Empty) |
1 """Channel notifications support. | |
2 | |
3 Classes and functions to support channel subscriptions and notifications | |
4 on those channels. | |
5 | |
6 Notes: | |
7 - This code is based on experimental APIs and is subject to change. | |
8 - Notification does not do deduplication of notification ids, that's up to | |
9 the receiver. | |
10 - Storing the Channel between calls is up to the caller. | |
11 | |
12 | |
13 Example setting up a channel: | |
14 | |
15 # Create a new channel that gets notifications via webhook. | |
16 channel = new_webhook_channel("https://example.com/my_web_hook") | |
17 | |
18 # Store the channel, keyed by 'channel.id'. Store it before calling the | |
19 # watch method because notifications may start arriving before the watch | |
20 # method returns. | |
21 ... | |
22 | |
23 resp = service.objects().watchAll( | |
24 bucket="some_bucket_id", body=channel.body()).execute() | |
25 channel.update(resp) | |
26 | |
27 # Store the channel, keyed by 'channel.id'. Store it after being updated | |
28 # since the resource_id value will now be correct, and that's needed to | |
29 # stop a subscription. | |
30 ... | |
31 | |
32 | |
33 An example Webhook implementation using webapp2. Note that webapp2 puts | |
34 headers in a case insensitive dictionary, as headers aren't guaranteed to | |
35 always be upper case. | |
36 | |
37 id = self.request.headers[X_GOOG_CHANNEL_ID] | |
38 | |
39 # Retrieve the channel by id. | |
40 channel = ... | |
41 | |
42 # Parse notification from the headers, including validating the id. | |
43 n = notification_from_headers(channel, self.request.headers) | |
44 | |
45 # Do app specific stuff with the notification here. | |
46 if n.resource_state == 'sync': | |
47 # Code to handle sync state. | |
48 elif n.resource_state == 'exists': | |
49 # Code to handle the exists state. | |
50 elif n.resource_state == 'not_exists': | |
51 # Code to handle the not exists state. | |
52 | |
53 | |
54 Example of unsubscribing. | |
55 | |
56 service.channels().stop(channel.body()) | |
57 """ | |
58 | |
59 import datetime | |
60 import uuid | |
61 | |
62 from googleapiclient import errors | |
63 from ...oauth2client import util | |
64 | |
65 | |
66 # The unix time epoch starts at midnight 1970. | |
67 EPOCH = datetime.datetime.utcfromtimestamp(0) | |
68 | |
69 # Map the names of the parameters in the JSON channel description to | |
70 # the parameter names we use in the Channel class. | |
71 CHANNEL_PARAMS = { | |
72 'address': 'address', | |
73 'id': 'id', | |
74 'expiration': 'expiration', | |
75 'params': 'params', | |
76 'resourceId': 'resource_id', | |
77 'resourceUri': 'resource_uri', | |
78 'type': 'type', | |
79 'token': 'token', | |
80 } | |
81 | |
82 X_GOOG_CHANNEL_ID = 'X-GOOG-CHANNEL-ID' | |
83 X_GOOG_MESSAGE_NUMBER = 'X-GOOG-MESSAGE-NUMBER' | |
84 X_GOOG_RESOURCE_STATE = 'X-GOOG-RESOURCE-STATE' | |
85 X_GOOG_RESOURCE_URI = 'X-GOOG-RESOURCE-URI' | |
86 X_GOOG_RESOURCE_ID = 'X-GOOG-RESOURCE-ID' | |
87 | |
88 | |
89 def _upper_header_keys(headers): | |
90 new_headers = {} | |
91 for k, v in headers.iteritems(): | |
92 new_headers[k.upper()] = v | |
93 return new_headers | |
94 | |
95 | |
96 class Notification(object): | |
97 """A Notification from a Channel. | |
98 | |
99 Notifications are not usually constructed directly, but are returned | |
100 from functions like notification_from_headers(). | |
101 | |
102 Attributes: | |
103 message_number: int, The unique id number of this notification. | |
104 state: str, The state of the resource being monitored. | |
105 uri: str, The address of the resource being monitored. | |
106 resource_id: str, The unique identifier of the version of the resource at | |
107 this event. | |
108 """ | |
109 @util.positional(5) | |
110 def __init__(self, message_number, state, resource_uri, resource_id): | |
111 """Notification constructor. | |
112 | |
113 Args: | |
114 message_number: int, The unique id number of this notification. | |
115 state: str, The state of the resource being monitored. Can be one | |
116 of "exists", "not_exists", or "sync". | |
117 resource_uri: str, The address of the resource being monitored. | |
118 resource_id: str, The identifier of the watched resource. | |
119 """ | |
120 self.message_number = message_number | |
121 self.state = state | |
122 self.resource_uri = resource_uri | |
123 self.resource_id = resource_id | |
124 | |
125 | |
126 class Channel(object): | |
127 """A Channel for notifications. | |
128 | |
129 Usually not constructed directly, instead it is returned from helper | |
130 functions like new_webhook_channel(). | |
131 | |
132 Attributes: | |
133 type: str, The type of delivery mechanism used by this channel. For | |
134 example, 'web_hook'. | |
135 id: str, A UUID for the channel. | |
136 token: str, An arbitrary string associated with the channel that | |
137 is delivered to the target address with each event delivered | |
138 over this channel. | |
139 address: str, The address of the receiving entity where events are | |
140 delivered. Specific to the channel type. | |
141 expiration: int, The time, in milliseconds from the epoch, when this | |
142 channel will expire. | |
143 params: dict, A dictionary of string to string, with additional parameters | |
144 controlling delivery channel behavior. | |
145 resource_id: str, An opaque id that identifies the resource that is | |
146 being watched. Stable across different API versions. | |
147 resource_uri: str, The canonicalized ID of the watched resource. | |
148 """ | |
149 | |
150 @util.positional(5) | |
151 def __init__(self, type, id, token, address, expiration=None, | |
152 params=None, resource_id="", resource_uri=""): | |
153 """Create a new Channel. | |
154 | |
155 In user code, this Channel constructor will not typically be called | |
156 manually since there are functions for creating channels for each specific | |
157 type with a more customized set of arguments to pass. | |
158 | |
159 Args: | |
160 type: str, The type of delivery mechanism used by this channel. For | |
161 example, 'web_hook'. | |
162 id: str, A UUID for the channel. | |
163 token: str, An arbitrary string associated with the channel that | |
164 is delivered to the target address with each event delivered | |
165 over this channel. | |
166 address: str, The address of the receiving entity where events are | |
167 delivered. Specific to the channel type. | |
168 expiration: int, The time, in milliseconds from the epoch, when this | |
169 channel will expire. | |
170 params: dict, A dictionary of string to string, with additional parameters | |
171 controlling delivery channel behavior. | |
172 resource_id: str, An opaque id that identifies the resource that is | |
173 being watched. Stable across different API versions. | |
174 resource_uri: str, The canonicalized ID of the watched resource. | |
175 """ | |
176 self.type = type | |
177 self.id = id | |
178 self.token = token | |
179 self.address = address | |
180 self.expiration = expiration | |
181 self.params = params | |
182 self.resource_id = resource_id | |
183 self.resource_uri = resource_uri | |
184 | |
185 def body(self): | |
186 """Build a body from the Channel. | |
187 | |
188 Constructs a dictionary that's appropriate for passing into watch() | |
189 methods as the value of body argument. | |
190 | |
191 Returns: | |
192 A dictionary representation of the channel. | |
193 """ | |
194 result = { | |
195 'id': self.id, | |
196 'token': self.token, | |
197 'type': self.type, | |
198 'address': self.address | |
199 } | |
200 if self.params: | |
201 result['params'] = self.params | |
202 if self.resource_id: | |
203 result['resourceId'] = self.resource_id | |
204 if self.resource_uri: | |
205 result['resourceUri'] = self.resource_uri | |
206 if self.expiration: | |
207 result['expiration'] = self.expiration | |
208 | |
209 return result | |
210 | |
211 def update(self, resp): | |
212 """Update a channel with information from the response of watch(). | |
213 | |
214 When a request is sent to watch() a resource, the response returned | |
215 from the watch() request is a dictionary with updated channel information, | |
216 such as the resource_id, which is needed when stopping a subscription. | |
217 | |
218 Args: | |
219 resp: dict, The response from a watch() method. | |
220 """ | |
221 for json_name, param_name in CHANNEL_PARAMS.iteritems(): | |
222 value = resp.get(json_name) | |
223 if value is not None: | |
224 setattr(self, param_name, value) | |
225 | |
226 | |
227 def notification_from_headers(channel, headers): | |
228 """Parse a notification from the webhook request headers, validate | |
229 the notification, and return a Notification object. | |
230 | |
231 Args: | |
232 channel: Channel, The channel that the notification is associated with. | |
233 headers: dict, A dictionary like object that contains the request headers | |
234 from the webhook HTTP request. | |
235 | |
236 Returns: | |
237 A Notification object. | |
238 | |
239 Raises: | |
240 errors.InvalidNotificationError if the notification is invalid. | |
241 ValueError if the X-GOOG-MESSAGE-NUMBER can't be converted to an int. | |
242 """ | |
243 headers = _upper_header_keys(headers) | |
244 channel_id = headers[X_GOOG_CHANNEL_ID] | |
245 if channel.id != channel_id: | |
246 raise errors.InvalidNotificationError( | |
247 'Channel id mismatch: %s != %s' % (channel.id, channel_id)) | |
248 else: | |
249 message_number = int(headers[X_GOOG_MESSAGE_NUMBER]) | |
250 state = headers[X_GOOG_RESOURCE_STATE] | |
251 resource_uri = headers[X_GOOG_RESOURCE_URI] | |
252 resource_id = headers[X_GOOG_RESOURCE_ID] | |
253 return Notification(message_number, state, resource_uri, resource_id) | |
254 | |
255 | |
256 @util.positional(2) | |
257 def new_webhook_channel(url, token=None, expiration=None, params=None): | |
258 """Create a new webhook Channel. | |
259 | |
260 Args: | |
261 url: str, URL to post notifications to. | |
262 token: str, An arbitrary string associated with the channel that | |
263 is delivered to the target address with each notification delivered | |
264 over this channel. | |
265 expiration: datetime.datetime, A time in the future when the channel | |
266 should expire. Can also be None if the subscription should use the | |
267 default expiration. Note that different services may have different | |
268 limits on how long a subscription lasts. Check the response from the | |
269 watch() method to see the value the service has set for an expiration | |
270 time. | |
271 params: dict, Extra parameters to pass on channel creation. Currently | |
272 not used for webhook channels. | |
273 """ | |
274 expiration_ms = 0 | |
275 if expiration: | |
276 delta = expiration - EPOCH | |
277 expiration_ms = delta.microseconds/1000 + ( | |
278 delta.seconds + delta.days*24*3600)*1000 | |
279 if expiration_ms < 0: | |
280 expiration_ms = 0 | |
281 | |
282 return Channel('web_hook', str(uuid.uuid4()), | |
283 token, url, expiration=expiration_ms, | |
284 params=params) | |
285 | |
OLD | NEW |