OLD | NEW |
(Empty) | |
| 1 """Channel notifications support. |
| 2 |
| 3 Classes and functions to support channel subscriptions and notifications |
| 4 on those channels. |
| 5 |
| 6 Notes: |
| 7 - This code is based on experimental APIs and is subject to change. |
| 8 - Notification does not do deduplication of notification ids, that's up to |
| 9 the receiver. |
| 10 - Storing the Channel between calls is up to the caller. |
| 11 |
| 12 |
| 13 Example setting up a channel: |
| 14 |
| 15 # Create a new channel that gets notifications via webhook. |
| 16 channel = new_webhook_channel("https://example.com/my_web_hook") |
| 17 |
| 18 # Store the channel, keyed by 'channel.id'. Store it before calling the |
| 19 # watch method because notifications may start arriving before the watch |
| 20 # method returns. |
| 21 ... |
| 22 |
| 23 resp = service.objects().watchAll( |
| 24 bucket="some_bucket_id", body=channel.body()).execute() |
| 25 channel.update(resp) |
| 26 |
| 27 # Store the channel, keyed by 'channel.id'. Store it after being updated |
| 28 # since the resource_id value will now be correct, and that's needed to |
| 29 # stop a subscription. |
| 30 ... |
| 31 |
| 32 |
| 33 An example Webhook implementation using webapp2. Note that webapp2 puts |
| 34 headers in a case insensitive dictionary, as headers aren't guaranteed to |
| 35 always be upper case. |
| 36 |
| 37 id = self.request.headers[X_GOOG_CHANNEL_ID] |
| 38 |
| 39 # Retrieve the channel by id. |
| 40 channel = ... |
| 41 |
| 42 # Parse notification from the headers, including validating the id. |
| 43 n = notification_from_headers(channel, self.request.headers) |
| 44 |
| 45 # Do app specific stuff with the notification here. |
| 46 if n.resource_state == 'sync': |
| 47 # Code to handle sync state. |
| 48 elif n.resource_state == 'exists': |
| 49 # Code to handle the exists state. |
| 50 elif n.resource_state == 'not_exists': |
| 51 # Code to handle the not exists state. |
| 52 |
| 53 |
| 54 Example of unsubscribing. |
| 55 |
| 56 service.channels().stop(channel.body()) |
| 57 """ |
| 58 |
| 59 import datetime |
| 60 import uuid |
| 61 |
| 62 from googleapiclient import errors |
| 63 from ...oauth2client import util |
| 64 |
| 65 |
| 66 # The unix time epoch starts at midnight 1970. |
| 67 EPOCH = datetime.datetime.utcfromtimestamp(0) |
| 68 |
| 69 # Map the names of the parameters in the JSON channel description to |
| 70 # the parameter names we use in the Channel class. |
| 71 CHANNEL_PARAMS = { |
| 72 'address': 'address', |
| 73 'id': 'id', |
| 74 'expiration': 'expiration', |
| 75 'params': 'params', |
| 76 'resourceId': 'resource_id', |
| 77 'resourceUri': 'resource_uri', |
| 78 'type': 'type', |
| 79 'token': 'token', |
| 80 } |
| 81 |
| 82 X_GOOG_CHANNEL_ID = 'X-GOOG-CHANNEL-ID' |
| 83 X_GOOG_MESSAGE_NUMBER = 'X-GOOG-MESSAGE-NUMBER' |
| 84 X_GOOG_RESOURCE_STATE = 'X-GOOG-RESOURCE-STATE' |
| 85 X_GOOG_RESOURCE_URI = 'X-GOOG-RESOURCE-URI' |
| 86 X_GOOG_RESOURCE_ID = 'X-GOOG-RESOURCE-ID' |
| 87 |
| 88 |
| 89 def _upper_header_keys(headers): |
| 90 new_headers = {} |
| 91 for k, v in headers.iteritems(): |
| 92 new_headers[k.upper()] = v |
| 93 return new_headers |
| 94 |
| 95 |
| 96 class Notification(object): |
| 97 """A Notification from a Channel. |
| 98 |
| 99 Notifications are not usually constructed directly, but are returned |
| 100 from functions like notification_from_headers(). |
| 101 |
| 102 Attributes: |
| 103 message_number: int, The unique id number of this notification. |
| 104 state: str, The state of the resource being monitored. |
| 105 uri: str, The address of the resource being monitored. |
| 106 resource_id: str, The unique identifier of the version of the resource at |
| 107 this event. |
| 108 """ |
| 109 @util.positional(5) |
| 110 def __init__(self, message_number, state, resource_uri, resource_id): |
| 111 """Notification constructor. |
| 112 |
| 113 Args: |
| 114 message_number: int, The unique id number of this notification. |
| 115 state: str, The state of the resource being monitored. Can be one |
| 116 of "exists", "not_exists", or "sync". |
| 117 resource_uri: str, The address of the resource being monitored. |
| 118 resource_id: str, The identifier of the watched resource. |
| 119 """ |
| 120 self.message_number = message_number |
| 121 self.state = state |
| 122 self.resource_uri = resource_uri |
| 123 self.resource_id = resource_id |
| 124 |
| 125 |
| 126 class Channel(object): |
| 127 """A Channel for notifications. |
| 128 |
| 129 Usually not constructed directly, instead it is returned from helper |
| 130 functions like new_webhook_channel(). |
| 131 |
| 132 Attributes: |
| 133 type: str, The type of delivery mechanism used by this channel. For |
| 134 example, 'web_hook'. |
| 135 id: str, A UUID for the channel. |
| 136 token: str, An arbitrary string associated with the channel that |
| 137 is delivered to the target address with each event delivered |
| 138 over this channel. |
| 139 address: str, The address of the receiving entity where events are |
| 140 delivered. Specific to the channel type. |
| 141 expiration: int, The time, in milliseconds from the epoch, when this |
| 142 channel will expire. |
| 143 params: dict, A dictionary of string to string, with additional parameters |
| 144 controlling delivery channel behavior. |
| 145 resource_id: str, An opaque id that identifies the resource that is |
| 146 being watched. Stable across different API versions. |
| 147 resource_uri: str, The canonicalized ID of the watched resource. |
| 148 """ |
| 149 |
| 150 @util.positional(5) |
| 151 def __init__(self, type, id, token, address, expiration=None, |
| 152 params=None, resource_id="", resource_uri=""): |
| 153 """Create a new Channel. |
| 154 |
| 155 In user code, this Channel constructor will not typically be called |
| 156 manually since there are functions for creating channels for each specific |
| 157 type with a more customized set of arguments to pass. |
| 158 |
| 159 Args: |
| 160 type: str, The type of delivery mechanism used by this channel. For |
| 161 example, 'web_hook'. |
| 162 id: str, A UUID for the channel. |
| 163 token: str, An arbitrary string associated with the channel that |
| 164 is delivered to the target address with each event delivered |
| 165 over this channel. |
| 166 address: str, The address of the receiving entity where events are |
| 167 delivered. Specific to the channel type. |
| 168 expiration: int, The time, in milliseconds from the epoch, when this |
| 169 channel will expire. |
| 170 params: dict, A dictionary of string to string, with additional parameters |
| 171 controlling delivery channel behavior. |
| 172 resource_id: str, An opaque id that identifies the resource that is |
| 173 being watched. Stable across different API versions. |
| 174 resource_uri: str, The canonicalized ID of the watched resource. |
| 175 """ |
| 176 self.type = type |
| 177 self.id = id |
| 178 self.token = token |
| 179 self.address = address |
| 180 self.expiration = expiration |
| 181 self.params = params |
| 182 self.resource_id = resource_id |
| 183 self.resource_uri = resource_uri |
| 184 |
| 185 def body(self): |
| 186 """Build a body from the Channel. |
| 187 |
| 188 Constructs a dictionary that's appropriate for passing into watch() |
| 189 methods as the value of body argument. |
| 190 |
| 191 Returns: |
| 192 A dictionary representation of the channel. |
| 193 """ |
| 194 result = { |
| 195 'id': self.id, |
| 196 'token': self.token, |
| 197 'type': self.type, |
| 198 'address': self.address |
| 199 } |
| 200 if self.params: |
| 201 result['params'] = self.params |
| 202 if self.resource_id: |
| 203 result['resourceId'] = self.resource_id |
| 204 if self.resource_uri: |
| 205 result['resourceUri'] = self.resource_uri |
| 206 if self.expiration: |
| 207 result['expiration'] = self.expiration |
| 208 |
| 209 return result |
| 210 |
| 211 def update(self, resp): |
| 212 """Update a channel with information from the response of watch(). |
| 213 |
| 214 When a request is sent to watch() a resource, the response returned |
| 215 from the watch() request is a dictionary with updated channel information, |
| 216 such as the resource_id, which is needed when stopping a subscription. |
| 217 |
| 218 Args: |
| 219 resp: dict, The response from a watch() method. |
| 220 """ |
| 221 for json_name, param_name in CHANNEL_PARAMS.iteritems(): |
| 222 value = resp.get(json_name) |
| 223 if value is not None: |
| 224 setattr(self, param_name, value) |
| 225 |
| 226 |
| 227 def notification_from_headers(channel, headers): |
| 228 """Parse a notification from the webhook request headers, validate |
| 229 the notification, and return a Notification object. |
| 230 |
| 231 Args: |
| 232 channel: Channel, The channel that the notification is associated with. |
| 233 headers: dict, A dictionary like object that contains the request headers |
| 234 from the webhook HTTP request. |
| 235 |
| 236 Returns: |
| 237 A Notification object. |
| 238 |
| 239 Raises: |
| 240 errors.InvalidNotificationError if the notification is invalid. |
| 241 ValueError if the X-GOOG-MESSAGE-NUMBER can't be converted to an int. |
| 242 """ |
| 243 headers = _upper_header_keys(headers) |
| 244 channel_id = headers[X_GOOG_CHANNEL_ID] |
| 245 if channel.id != channel_id: |
| 246 raise errors.InvalidNotificationError( |
| 247 'Channel id mismatch: %s != %s' % (channel.id, channel_id)) |
| 248 else: |
| 249 message_number = int(headers[X_GOOG_MESSAGE_NUMBER]) |
| 250 state = headers[X_GOOG_RESOURCE_STATE] |
| 251 resource_uri = headers[X_GOOG_RESOURCE_URI] |
| 252 resource_id = headers[X_GOOG_RESOURCE_ID] |
| 253 return Notification(message_number, state, resource_uri, resource_id) |
| 254 |
| 255 |
| 256 @util.positional(2) |
| 257 def new_webhook_channel(url, token=None, expiration=None, params=None): |
| 258 """Create a new webhook Channel. |
| 259 |
| 260 Args: |
| 261 url: str, URL to post notifications to. |
| 262 token: str, An arbitrary string associated with the channel that |
| 263 is delivered to the target address with each notification delivered |
| 264 over this channel. |
| 265 expiration: datetime.datetime, A time in the future when the channel |
| 266 should expire. Can also be None if the subscription should use the |
| 267 default expiration. Note that different services may have different |
| 268 limits on how long a subscription lasts. Check the response from the |
| 269 watch() method to see the value the service has set for an expiration |
| 270 time. |
| 271 params: dict, Extra parameters to pass on channel creation. Currently |
| 272 not used for webhook channels. |
| 273 """ |
| 274 expiration_ms = 0 |
| 275 if expiration: |
| 276 delta = expiration - EPOCH |
| 277 expiration_ms = delta.microseconds/1000 + ( |
| 278 delta.seconds + delta.days*24*3600)*1000 |
| 279 if expiration_ms < 0: |
| 280 expiration_ms = 0 |
| 281 |
| 282 return Channel('web_hook', str(uuid.uuid4()), |
| 283 token, url, expiration=expiration_ms, |
| 284 params=params) |
| 285 |
OLD | NEW |