OLD | NEW |
(Empty) | |
| 1 # Copyright 2012 Google Inc. All Rights Reserved. |
| 2 # |
| 3 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 # you may not use this file except in compliance with the License. |
| 5 # You may obtain a copy of the License at |
| 6 # |
| 7 # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 # |
| 9 # Unless required by applicable law or agreed to in writing, |
| 10 # software distributed under the License is distributed on an |
| 11 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, |
| 12 # either express or implied. See the License for the specific |
| 13 # language governing permissions and limitations under the License. |
| 14 |
| 15 """Base and helper classes for Google RESTful APIs.""" |
| 16 |
| 17 |
| 18 |
| 19 |
| 20 |
| 21 __all__ = ['add_sync_methods'] |
| 22 |
| 23 import random |
| 24 import time |
| 25 |
| 26 from . import api_utils |
| 27 |
| 28 try: |
| 29 from google.appengine.api import app_identity |
| 30 from google.appengine.ext import ndb |
| 31 except ImportError: |
| 32 from google.appengine.api import app_identity |
| 33 from google.appengine.ext import ndb |
| 34 |
| 35 |
| 36 |
| 37 def _make_sync_method(name): |
| 38 """Helper to synthesize a synchronous method from an async method name. |
| 39 |
| 40 Used by the @add_sync_methods class decorator below. |
| 41 |
| 42 Args: |
| 43 name: The name of the synchronous method. |
| 44 |
| 45 Returns: |
| 46 A method (with first argument 'self') that retrieves and calls |
| 47 self.<name>, passing its own arguments, expects it to return a |
| 48 Future, and then waits for and returns that Future's result. |
| 49 """ |
| 50 |
| 51 def sync_wrapper(self, *args, **kwds): |
| 52 method = getattr(self, name) |
| 53 future = method(*args, **kwds) |
| 54 return future.get_result() |
| 55 |
| 56 return sync_wrapper |
| 57 |
| 58 |
| 59 def add_sync_methods(cls): |
| 60 """Class decorator to add synchronous methods corresponding to async methods. |
| 61 |
| 62 This modifies the class in place, adding additional methods to it. |
| 63 If a synchronous method of a given name already exists it is not |
| 64 replaced. |
| 65 |
| 66 Args: |
| 67 cls: A class. |
| 68 |
| 69 Returns: |
| 70 The same class, modified in place. |
| 71 """ |
| 72 for name in cls.__dict__.keys(): |
| 73 if name.endswith('_async'): |
| 74 sync_name = name[:-6] |
| 75 if not hasattr(cls, sync_name): |
| 76 setattr(cls, sync_name, _make_sync_method(name)) |
| 77 return cls |
| 78 |
| 79 |
| 80 class _AE_TokenStorage_(ndb.Model): |
| 81 """Entity to store app_identity tokens in memcache.""" |
| 82 |
| 83 token = ndb.StringProperty() |
| 84 expires = ndb.FloatProperty() |
| 85 |
| 86 |
| 87 @ndb.tasklet |
| 88 def _make_token_async(scopes, service_account_id): |
| 89 """Get a fresh authentication token. |
| 90 |
| 91 Args: |
| 92 scopes: A list of scopes. |
| 93 service_account_id: Internal-use only. |
| 94 |
| 95 Raises: |
| 96 An ndb.Return with a tuple (token, expiration_time) where expiration_time is |
| 97 seconds since the epoch. |
| 98 """ |
| 99 rpc = app_identity.create_rpc() |
| 100 app_identity.make_get_access_token_call(rpc, scopes, service_account_id) |
| 101 token, expires_at = yield rpc |
| 102 raise ndb.Return((token, expires_at)) |
| 103 |
| 104 |
| 105 class _RestApi(object): |
| 106 """Base class for REST-based API wrapper classes. |
| 107 |
| 108 This class manages authentication tokens and request retries. All |
| 109 APIs are available as synchronous and async methods; synchronous |
| 110 methods are synthesized from async ones by the add_sync_methods() |
| 111 function in this module. |
| 112 |
| 113 WARNING: Do NOT directly use this api. It's an implementation detail |
| 114 and is subject to change at any release. |
| 115 """ |
| 116 |
| 117 def __init__(self, scopes, service_account_id=None, token_maker=None, |
| 118 retry_params=None): |
| 119 """Constructor. |
| 120 |
| 121 Args: |
| 122 scopes: A scope or a list of scopes. |
| 123 service_account_id: Internal use only. |
| 124 token_maker: An asynchronous function of the form |
| 125 (scopes, service_account_id) -> (token, expires). |
| 126 retry_params: An instance of api_utils.RetryParams. If None, the |
| 127 default for current thread will be used. |
| 128 """ |
| 129 |
| 130 if isinstance(scopes, basestring): |
| 131 scopes = [scopes] |
| 132 self.scopes = scopes |
| 133 self.service_account_id = service_account_id |
| 134 self.make_token_async = token_maker or _make_token_async |
| 135 if not retry_params: |
| 136 retry_params = api_utils._get_default_retry_params() |
| 137 self.retry_params = retry_params |
| 138 self.user_agent = {'User-Agent': retry_params._user_agent} |
| 139 self.expiration_headroom = random.randint(60, 240) |
| 140 |
| 141 def __getstate__(self): |
| 142 """Store state as part of serialization/pickling.""" |
| 143 return {'scopes': self.scopes, |
| 144 'id': self.service_account_id, |
| 145 'a_maker': (None if self.make_token_async == _make_token_async |
| 146 else self.make_token_async), |
| 147 'retry_params': self.retry_params, |
| 148 'expiration_headroom': self.expiration_headroom} |
| 149 |
| 150 def __setstate__(self, state): |
| 151 """Restore state as part of deserialization/unpickling.""" |
| 152 self.__init__(state['scopes'], |
| 153 service_account_id=state['id'], |
| 154 token_maker=state['a_maker'], |
| 155 retry_params=state['retry_params']) |
| 156 self.expiration_headroom = state['expiration_headroom'] |
| 157 |
| 158 @ndb.tasklet |
| 159 def do_request_async(self, url, method='GET', headers=None, payload=None, |
| 160 deadline=None, callback=None): |
| 161 """Issue one HTTP request. |
| 162 |
| 163 It performs async retries using tasklets. |
| 164 |
| 165 Args: |
| 166 url: the url to fetch. |
| 167 method: the method in which to fetch. |
| 168 headers: the http headers. |
| 169 payload: the data to submit in the fetch. |
| 170 deadline: the deadline in which to make the call. |
| 171 callback: the call to make once completed. |
| 172 |
| 173 Yields: |
| 174 The async fetch of the url. |
| 175 """ |
| 176 retry_wrapper = api_utils._RetryWrapper( |
| 177 self.retry_params, |
| 178 retriable_exceptions=api_utils._RETRIABLE_EXCEPTIONS, |
| 179 should_retry=api_utils._should_retry) |
| 180 resp = yield retry_wrapper.run( |
| 181 self.urlfetch_async, |
| 182 url=url, |
| 183 method=method, |
| 184 headers=headers, |
| 185 payload=payload, |
| 186 deadline=deadline, |
| 187 callback=callback, |
| 188 follow_redirects=False) |
| 189 raise ndb.Return((resp.status_code, resp.headers, resp.content)) |
| 190 |
| 191 @ndb.tasklet |
| 192 def get_token_async(self, refresh=False): |
| 193 """Get an authentication token. |
| 194 |
| 195 The token is cached in memcache, keyed by the scopes argument. |
| 196 Uses a random token expiration headroom value generated in the constructor |
| 197 to eliminate a burst of GET_ACCESS_TOKEN API requests. |
| 198 |
| 199 Args: |
| 200 refresh: If True, ignore a cached token; default False. |
| 201 |
| 202 Yields: |
| 203 An authentication token. This token is guaranteed to be non-expired. |
| 204 """ |
| 205 key = '%s,%s' % (self.service_account_id, ','.join(self.scopes)) |
| 206 ts = yield _AE_TokenStorage_.get_by_id_async( |
| 207 key, use_cache=True, use_memcache=True, |
| 208 use_datastore=self.retry_params.save_access_token) |
| 209 if refresh or ts is None or ts.expires < ( |
| 210 time.time() + self.expiration_headroom): |
| 211 token, expires_at = yield self.make_token_async( |
| 212 self.scopes, self.service_account_id) |
| 213 timeout = int(expires_at - time.time()) |
| 214 ts = _AE_TokenStorage_(id=key, token=token, expires=expires_at) |
| 215 if timeout > 0: |
| 216 yield ts.put_async(memcache_timeout=timeout, |
| 217 use_datastore=self.retry_params.save_access_token, |
| 218 use_cache=True, use_memcache=True) |
| 219 raise ndb.Return(ts.token) |
| 220 |
| 221 @ndb.tasklet |
| 222 def urlfetch_async(self, url, method='GET', headers=None, |
| 223 payload=None, deadline=None, callback=None, |
| 224 follow_redirects=False): |
| 225 """Make an async urlfetch() call. |
| 226 |
| 227 This is an async wrapper around urlfetch(). It adds an authentication |
| 228 header. |
| 229 |
| 230 Args: |
| 231 url: the url to fetch. |
| 232 method: the method in which to fetch. |
| 233 headers: the http headers. |
| 234 payload: the data to submit in the fetch. |
| 235 deadline: the deadline in which to make the call. |
| 236 callback: the call to make once completed. |
| 237 follow_redirects: whether or not to follow redirects. |
| 238 |
| 239 Yields: |
| 240 This returns a Future despite not being decorated with @ndb.tasklet! |
| 241 """ |
| 242 headers = {} if headers is None else dict(headers) |
| 243 headers.update(self.user_agent) |
| 244 self.token = yield self.get_token_async() |
| 245 if self.token: |
| 246 headers['authorization'] = 'OAuth ' + self.token |
| 247 |
| 248 deadline = deadline or self.retry_params.urlfetch_timeout |
| 249 |
| 250 ctx = ndb.get_context() |
| 251 resp = yield ctx.urlfetch( |
| 252 url, payload=payload, method=method, |
| 253 headers=headers, follow_redirects=follow_redirects, |
| 254 deadline=deadline, callback=callback) |
| 255 raise ndb.Return(resp) |
| 256 |
| 257 |
| 258 _RestApi = add_sync_methods(_RestApi) |
OLD | NEW |