| OLD | NEW |
| (Empty) |
| 1 """Channel notifications support. | |
| 2 | |
| 3 Classes and functions to support channel subscriptions and notifications | |
| 4 on those channels. | |
| 5 | |
| 6 Notes: | |
| 7 - This code is based on experimental APIs and is subject to change. | |
| 8 - Notification does not do deduplication of notification ids, that's up to | |
| 9 the receiver. | |
| 10 - Storing the Channel between calls is up to the caller. | |
| 11 | |
| 12 | |
| 13 Example setting up a channel: | |
| 14 | |
| 15 # Create a new channel that gets notifications via webhook. | |
| 16 channel = new_webhook_channel("https://example.com/my_web_hook") | |
| 17 | |
| 18 # Store the channel, keyed by 'channel.id'. Store it before calling the | |
| 19 # watch method because notifications may start arriving before the watch | |
| 20 # method returns. | |
| 21 ... | |
| 22 | |
| 23 resp = service.objects().watchAll( | |
| 24 bucket="some_bucket_id", body=channel.body()).execute() | |
| 25 channel.update(resp) | |
| 26 | |
| 27 # Store the channel, keyed by 'channel.id'. Store it after being updated | |
| 28 # since the resource_id value will now be correct, and that's needed to | |
| 29 # stop a subscription. | |
| 30 ... | |
| 31 | |
| 32 | |
| 33 An example Webhook implementation using webapp2. Note that webapp2 puts | |
| 34 headers in a case insensitive dictionary, as headers aren't guaranteed to | |
| 35 always be upper case. | |
| 36 | |
| 37 id = self.request.headers[X_GOOG_CHANNEL_ID] | |
| 38 | |
| 39 # Retrieve the channel by id. | |
| 40 channel = ... | |
| 41 | |
| 42 # Parse notification from the headers, including validating the id. | |
| 43 n = notification_from_headers(channel, self.request.headers) | |
| 44 | |
| 45 # Do app specific stuff with the notification here. | |
| 46 if n.resource_state == 'sync': | |
| 47 # Code to handle sync state. | |
| 48 elif n.resource_state == 'exists': | |
| 49 # Code to handle the exists state. | |
| 50 elif n.resource_state == 'not_exists': | |
| 51 # Code to handle the not exists state. | |
| 52 | |
| 53 | |
| 54 Example of unsubscribing. | |
| 55 | |
| 56 service.channels().stop(channel.body()) | |
| 57 """ | |
| 58 | |
| 59 import datetime | |
| 60 import uuid | |
| 61 | |
| 62 from googleapiclient import errors | |
| 63 from ...oauth2client import util | |
| 64 | |
| 65 | |
| 66 # The unix time epoch starts at midnight 1970. | |
| 67 EPOCH = datetime.datetime.utcfromtimestamp(0) | |
| 68 | |
| 69 # Map the names of the parameters in the JSON channel description to | |
| 70 # the parameter names we use in the Channel class. | |
| 71 CHANNEL_PARAMS = { | |
| 72 'address': 'address', | |
| 73 'id': 'id', | |
| 74 'expiration': 'expiration', | |
| 75 'params': 'params', | |
| 76 'resourceId': 'resource_id', | |
| 77 'resourceUri': 'resource_uri', | |
| 78 'type': 'type', | |
| 79 'token': 'token', | |
| 80 } | |
| 81 | |
| 82 X_GOOG_CHANNEL_ID = 'X-GOOG-CHANNEL-ID' | |
| 83 X_GOOG_MESSAGE_NUMBER = 'X-GOOG-MESSAGE-NUMBER' | |
| 84 X_GOOG_RESOURCE_STATE = 'X-GOOG-RESOURCE-STATE' | |
| 85 X_GOOG_RESOURCE_URI = 'X-GOOG-RESOURCE-URI' | |
| 86 X_GOOG_RESOURCE_ID = 'X-GOOG-RESOURCE-ID' | |
| 87 | |
| 88 | |
| 89 def _upper_header_keys(headers): | |
| 90 new_headers = {} | |
| 91 for k, v in headers.iteritems(): | |
| 92 new_headers[k.upper()] = v | |
| 93 return new_headers | |
| 94 | |
| 95 | |
| 96 class Notification(object): | |
| 97 """A Notification from a Channel. | |
| 98 | |
| 99 Notifications are not usually constructed directly, but are returned | |
| 100 from functions like notification_from_headers(). | |
| 101 | |
| 102 Attributes: | |
| 103 message_number: int, The unique id number of this notification. | |
| 104 state: str, The state of the resource being monitored. | |
| 105 uri: str, The address of the resource being monitored. | |
| 106 resource_id: str, The unique identifier of the version of the resource at | |
| 107 this event. | |
| 108 """ | |
| 109 @util.positional(5) | |
| 110 def __init__(self, message_number, state, resource_uri, resource_id): | |
| 111 """Notification constructor. | |
| 112 | |
| 113 Args: | |
| 114 message_number: int, The unique id number of this notification. | |
| 115 state: str, The state of the resource being monitored. Can be one | |
| 116 of "exists", "not_exists", or "sync". | |
| 117 resource_uri: str, The address of the resource being monitored. | |
| 118 resource_id: str, The identifier of the watched resource. | |
| 119 """ | |
| 120 self.message_number = message_number | |
| 121 self.state = state | |
| 122 self.resource_uri = resource_uri | |
| 123 self.resource_id = resource_id | |
| 124 | |
| 125 | |
| 126 class Channel(object): | |
| 127 """A Channel for notifications. | |
| 128 | |
| 129 Usually not constructed directly, instead it is returned from helper | |
| 130 functions like new_webhook_channel(). | |
| 131 | |
| 132 Attributes: | |
| 133 type: str, The type of delivery mechanism used by this channel. For | |
| 134 example, 'web_hook'. | |
| 135 id: str, A UUID for the channel. | |
| 136 token: str, An arbitrary string associated with the channel that | |
| 137 is delivered to the target address with each event delivered | |
| 138 over this channel. | |
| 139 address: str, The address of the receiving entity where events are | |
| 140 delivered. Specific to the channel type. | |
| 141 expiration: int, The time, in milliseconds from the epoch, when this | |
| 142 channel will expire. | |
| 143 params: dict, A dictionary of string to string, with additional parameters | |
| 144 controlling delivery channel behavior. | |
| 145 resource_id: str, An opaque id that identifies the resource that is | |
| 146 being watched. Stable across different API versions. | |
| 147 resource_uri: str, The canonicalized ID of the watched resource. | |
| 148 """ | |
| 149 | |
| 150 @util.positional(5) | |
| 151 def __init__(self, type, id, token, address, expiration=None, | |
| 152 params=None, resource_id="", resource_uri=""): | |
| 153 """Create a new Channel. | |
| 154 | |
| 155 In user code, this Channel constructor will not typically be called | |
| 156 manually since there are functions for creating channels for each specific | |
| 157 type with a more customized set of arguments to pass. | |
| 158 | |
| 159 Args: | |
| 160 type: str, The type of delivery mechanism used by this channel. For | |
| 161 example, 'web_hook'. | |
| 162 id: str, A UUID for the channel. | |
| 163 token: str, An arbitrary string associated with the channel that | |
| 164 is delivered to the target address with each event delivered | |
| 165 over this channel. | |
| 166 address: str, The address of the receiving entity where events are | |
| 167 delivered. Specific to the channel type. | |
| 168 expiration: int, The time, in milliseconds from the epoch, when this | |
| 169 channel will expire. | |
| 170 params: dict, A dictionary of string to string, with additional parameters | |
| 171 controlling delivery channel behavior. | |
| 172 resource_id: str, An opaque id that identifies the resource that is | |
| 173 being watched. Stable across different API versions. | |
| 174 resource_uri: str, The canonicalized ID of the watched resource. | |
| 175 """ | |
| 176 self.type = type | |
| 177 self.id = id | |
| 178 self.token = token | |
| 179 self.address = address | |
| 180 self.expiration = expiration | |
| 181 self.params = params | |
| 182 self.resource_id = resource_id | |
| 183 self.resource_uri = resource_uri | |
| 184 | |
| 185 def body(self): | |
| 186 """Build a body from the Channel. | |
| 187 | |
| 188 Constructs a dictionary that's appropriate for passing into watch() | |
| 189 methods as the value of body argument. | |
| 190 | |
| 191 Returns: | |
| 192 A dictionary representation of the channel. | |
| 193 """ | |
| 194 result = { | |
| 195 'id': self.id, | |
| 196 'token': self.token, | |
| 197 'type': self.type, | |
| 198 'address': self.address | |
| 199 } | |
| 200 if self.params: | |
| 201 result['params'] = self.params | |
| 202 if self.resource_id: | |
| 203 result['resourceId'] = self.resource_id | |
| 204 if self.resource_uri: | |
| 205 result['resourceUri'] = self.resource_uri | |
| 206 if self.expiration: | |
| 207 result['expiration'] = self.expiration | |
| 208 | |
| 209 return result | |
| 210 | |
| 211 def update(self, resp): | |
| 212 """Update a channel with information from the response of watch(). | |
| 213 | |
| 214 When a request is sent to watch() a resource, the response returned | |
| 215 from the watch() request is a dictionary with updated channel information, | |
| 216 such as the resource_id, which is needed when stopping a subscription. | |
| 217 | |
| 218 Args: | |
| 219 resp: dict, The response from a watch() method. | |
| 220 """ | |
| 221 for json_name, param_name in CHANNEL_PARAMS.iteritems(): | |
| 222 value = resp.get(json_name) | |
| 223 if value is not None: | |
| 224 setattr(self, param_name, value) | |
| 225 | |
| 226 | |
| 227 def notification_from_headers(channel, headers): | |
| 228 """Parse a notification from the webhook request headers, validate | |
| 229 the notification, and return a Notification object. | |
| 230 | |
| 231 Args: | |
| 232 channel: Channel, The channel that the notification is associated with. | |
| 233 headers: dict, A dictionary like object that contains the request headers | |
| 234 from the webhook HTTP request. | |
| 235 | |
| 236 Returns: | |
| 237 A Notification object. | |
| 238 | |
| 239 Raises: | |
| 240 errors.InvalidNotificationError if the notification is invalid. | |
| 241 ValueError if the X-GOOG-MESSAGE-NUMBER can't be converted to an int. | |
| 242 """ | |
| 243 headers = _upper_header_keys(headers) | |
| 244 channel_id = headers[X_GOOG_CHANNEL_ID] | |
| 245 if channel.id != channel_id: | |
| 246 raise errors.InvalidNotificationError( | |
| 247 'Channel id mismatch: %s != %s' % (channel.id, channel_id)) | |
| 248 else: | |
| 249 message_number = int(headers[X_GOOG_MESSAGE_NUMBER]) | |
| 250 state = headers[X_GOOG_RESOURCE_STATE] | |
| 251 resource_uri = headers[X_GOOG_RESOURCE_URI] | |
| 252 resource_id = headers[X_GOOG_RESOURCE_ID] | |
| 253 return Notification(message_number, state, resource_uri, resource_id) | |
| 254 | |
| 255 | |
| 256 @util.positional(2) | |
| 257 def new_webhook_channel(url, token=None, expiration=None, params=None): | |
| 258 """Create a new webhook Channel. | |
| 259 | |
| 260 Args: | |
| 261 url: str, URL to post notifications to. | |
| 262 token: str, An arbitrary string associated with the channel that | |
| 263 is delivered to the target address with each notification delivered | |
| 264 over this channel. | |
| 265 expiration: datetime.datetime, A time in the future when the channel | |
| 266 should expire. Can also be None if the subscription should use the | |
| 267 default expiration. Note that different services may have different | |
| 268 limits on how long a subscription lasts. Check the response from the | |
| 269 watch() method to see the value the service has set for an expiration | |
| 270 time. | |
| 271 params: dict, Extra parameters to pass on channel creation. Currently | |
| 272 not used for webhook channels. | |
| 273 """ | |
| 274 expiration_ms = 0 | |
| 275 if expiration: | |
| 276 delta = expiration - EPOCH | |
| 277 expiration_ms = delta.microseconds/1000 + ( | |
| 278 delta.seconds + delta.days*24*3600)*1000 | |
| 279 if expiration_ms < 0: | |
| 280 expiration_ms = 0 | |
| 281 | |
| 282 return Channel('web_hook', str(uuid.uuid4()), | |
| 283 token, url, expiration=expiration_ms, | |
| 284 params=params) | |
| 285 | |
| OLD | NEW |