| OLD | NEW |
| (Empty) | |
| 1 # Copyright 2012 Google Inc. All Rights Reserved. |
| 2 |
| 3 """Base and helper classes for Google RESTful APIs.""" |
| 4 |
| 5 |
| 6 |
| 7 |
| 8 |
| 9 __all__ = ['add_sync_methods'] |
| 10 |
| 11 import httplib |
| 12 import time |
| 13 |
| 14 from . import api_utils |
| 15 |
| 16 try: |
| 17 from google.appengine.api import app_identity |
| 18 from google.appengine.ext import ndb |
| 19 except ImportError: |
| 20 from google.appengine.api import app_identity |
| 21 from google.appengine.ext import ndb |
| 22 |
| 23 |
| 24 def _make_sync_method(name): |
| 25 """Helper to synthesize a synchronous method from an async method name. |
| 26 |
| 27 Used by the @add_sync_methods class decorator below. |
| 28 |
| 29 Args: |
| 30 name: The name of the synchronous method. |
| 31 |
| 32 Returns: |
| 33 A method (with first argument 'self') that retrieves and calls |
| 34 self.<name>, passing its own arguments, expects it to return a |
| 35 Future, and then waits for and returns that Future's result. |
| 36 """ |
| 37 |
| 38 def sync_wrapper(self, *args, **kwds): |
| 39 method = getattr(self, name) |
| 40 future = method(*args, **kwds) |
| 41 return future.get_result() |
| 42 |
| 43 return sync_wrapper |
| 44 |
| 45 |
| 46 def add_sync_methods(cls): |
| 47 """Class decorator to add synchronous methods corresponding to async methods. |
| 48 |
| 49 This modifies the class in place, adding additional methods to it. |
| 50 If a synchronous method of a given name already exists it is not |
| 51 replaced. |
| 52 |
| 53 Args: |
| 54 cls: A class. |
| 55 |
| 56 Returns: |
| 57 The same class, modified in place. |
| 58 """ |
| 59 for name in cls.__dict__.keys(): |
| 60 if name.endswith('_async'): |
| 61 sync_name = name[:-6] |
| 62 if not hasattr(cls, sync_name): |
| 63 setattr(cls, sync_name, _make_sync_method(name)) |
| 64 return cls |
| 65 |
| 66 |
| 67 class _AE_TokenStorage_(ndb.Model): |
| 68 """Entity to store app_identity tokens in memcache.""" |
| 69 |
| 70 token = ndb.StringProperty() |
| 71 expires = ndb.FloatProperty() |
| 72 |
| 73 |
| 74 @ndb.tasklet |
| 75 def _make_token_async(scopes, service_account_id): |
| 76 """Get a fresh authentication token. |
| 77 |
| 78 Args: |
| 79 scopes: A list of scopes. |
| 80 service_account_id: Internal-use only. |
| 81 |
| 82 Returns: |
| 83 An tuple (token, expiration_time) where expiration_time is |
| 84 seconds since the epoch. |
| 85 """ |
| 86 rpc = app_identity.create_rpc() |
| 87 app_identity.make_get_access_token_call(rpc, scopes, service_account_id) |
| 88 token, expires_at = yield rpc |
| 89 raise ndb.Return((token, expires_at)) |
| 90 |
| 91 |
| 92 class _RestApi(object): |
| 93 """Base class for REST-based API wrapper classes. |
| 94 |
| 95 This class manages authentication tokens and request retries. All |
| 96 APIs are available as synchronous and async methods; synchronous |
| 97 methods are synthesized from async ones by the add_sync_methods() |
| 98 function in this module. |
| 99 |
| 100 WARNING: Do NOT directly use this api. It's an implementation detail |
| 101 and is subject to change at any release. |
| 102 """ |
| 103 |
| 104 def __init__(self, scopes, service_account_id=None, token_maker=None, |
| 105 retry_params=None): |
| 106 """Constructor. |
| 107 |
| 108 Args: |
| 109 scopes: A scope or a list of scopes. |
| 110 token_maker: An asynchronous function of the form |
| 111 (scopes, service_account_id) -> (token, expires). |
| 112 retry_params: An instance of api_utils.RetryParams. If None, the |
| 113 default for current thread will be used. |
| 114 service_account_id: Internal use only. |
| 115 """ |
| 116 |
| 117 if isinstance(scopes, basestring): |
| 118 scopes = [scopes] |
| 119 self.scopes = scopes |
| 120 self.service_account_id = service_account_id |
| 121 self.make_token_async = token_maker or _make_token_async |
| 122 self.token = None |
| 123 if not retry_params: |
| 124 retry_params = api_utils._get_default_retry_params() |
| 125 self.retry_params = retry_params |
| 126 |
| 127 def __getstate__(self): |
| 128 """Store state as part of serialization/pickling.""" |
| 129 return {'token': self.token, |
| 130 'scopes': self.scopes, |
| 131 'id': self.service_account_id, |
| 132 'a_maker': None if self.make_token_async == _make_token_async |
| 133 else self.make_token_async, |
| 134 'retry_params': self.retry_params} |
| 135 |
| 136 def __setstate__(self, state): |
| 137 """Restore state as part of deserialization/unpickling.""" |
| 138 self.__init__(state['scopes'], |
| 139 service_account_id=state['id'], |
| 140 token_maker=state['a_maker'], |
| 141 retry_params=state['retry_params']) |
| 142 self.token = state['token'] |
| 143 |
| 144 @ndb.tasklet |
| 145 def do_request_async(self, url, method='GET', headers=None, payload=None, |
| 146 deadline=None, callback=None): |
| 147 """Issue one HTTP request. |
| 148 |
| 149 This is an async wrapper around urlfetch(). It adds an authentication |
| 150 header and retries on a 401 status code. Upon other retriable errors, |
| 151 it performs blocking retries. |
| 152 """ |
| 153 headers = {} if headers is None else dict(headers) |
| 154 if self.token is None: |
| 155 self.token = yield self.get_token_async() |
| 156 headers['authorization'] = 'OAuth ' + self.token |
| 157 |
| 158 deadline = deadline or self.retry_params.urlfetch_timeout |
| 159 |
| 160 retry = False |
| 161 resp = None |
| 162 try: |
| 163 resp = yield self.urlfetch_async(url, payload=payload, method=method, |
| 164 headers=headers, follow_redirects=False, |
| 165 deadline=deadline, callback=callback) |
| 166 if resp.status_code == httplib.UNAUTHORIZED: |
| 167 self.token = yield self.get_token_async(refresh=True) |
| 168 headers['authorization'] = 'OAuth ' + self.token |
| 169 resp = yield self.urlfetch_async( |
| 170 url, payload=payload, method=method, headers=headers, |
| 171 follow_redirects=False, deadline=deadline, callback=callback) |
| 172 except api_utils._RETRIABLE_EXCEPTIONS: |
| 173 retry = True |
| 174 else: |
| 175 retry = api_utils._should_retry(resp) |
| 176 |
| 177 if retry: |
| 178 retry_resp = api_utils._retry_fetch( |
| 179 url, retry_params=self.retry_params, payload=payload, method=method, |
| 180 headers=headers, follow_redirects=False, deadline=deadline) |
| 181 if retry_resp: |
| 182 resp = retry_resp |
| 183 elif not resp: |
| 184 raise |
| 185 |
| 186 raise ndb.Return((resp.status_code, resp.headers, resp.content)) |
| 187 |
| 188 @ndb.tasklet |
| 189 def get_token_async(self, refresh=False): |
| 190 """Get an authentication token. |
| 191 |
| 192 The token is cached in memcache, keyed by the scopes argument. |
| 193 |
| 194 Args: |
| 195 refresh: If True, ignore a cached token; default False. |
| 196 |
| 197 Returns: |
| 198 An authentication token. |
| 199 """ |
| 200 if self.token is not None and not refresh: |
| 201 raise ndb.Return(self.token) |
| 202 key = '%s,%s' % (self.service_account_id, ','.join(self.scopes)) |
| 203 ts = None |
| 204 if not refresh: |
| 205 ts = yield _AE_TokenStorage_.get_by_id_async( |
| 206 key, use_cache=True, use_memcache=True, |
| 207 use_datastore=self.retry_params.save_access_token) |
| 208 if ts is None or ts.expires < (time.time() + 60): |
| 209 token, expires_at = yield self.make_token_async( |
| 210 self.scopes, self.service_account_id) |
| 211 timeout = int(expires_at - time.time()) |
| 212 ts = _AE_TokenStorage_(id=key, token=token, expires=expires_at) |
| 213 if timeout > 0: |
| 214 yield ts.put_async(memcache_timeout=timeout, |
| 215 use_datastore=self.retry_params.save_access_token, |
| 216 use_cache=True, use_memcache=True) |
| 217 self.token = ts.token |
| 218 raise ndb.Return(self.token) |
| 219 |
| 220 def urlfetch_async(self, url, **kwds): |
| 221 """Make an async urlfetch() call. |
| 222 |
| 223 This just passes the url and keyword arguments to NDB's async |
| 224 urlfetch() wrapper in the current context. |
| 225 |
| 226 This returns a Future despite not being decorated with @ndb.tasklet! |
| 227 """ |
| 228 ctx = ndb.get_context() |
| 229 return ctx.urlfetch(url, **kwds) |
| 230 |
| 231 |
| 232 _RestApi = add_sync_methods(_RestApi) |
| OLD | NEW |