Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(480)

Side by Side Diff: third_party/google_appengine_cloudstorage/cloudstorage/api_utils.py

Issue 139303023: add GCS support to docs server (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: bumped versions Created 6 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2013 Google Inc. All Rights Reserved.
2
3 """Util functions and classes for cloudstorage_api."""
4
5
6
# Names exported by ``from <module> import *``.
__all__ = [
    'set_default_retry_params',
    'RetryParams',
]
10
11 import copy
12 import httplib
13 import logging
14 import math
15 import os
16 import threading
17 import time
18 import urllib
19
20
21 try:
22 from google.appengine.api import urlfetch
23 from google.appengine.datastore import datastore_rpc
24 from google.appengine.ext.ndb import eventloop
25 from google.appengine.ext.ndb import utils
26 from google.appengine import runtime
27 from google.appengine.runtime import apiproxy_errors
28 except ImportError:
29 from google.appengine.api import urlfetch
30 from google.appengine.datastore import datastore_rpc
31 from google.appengine import runtime
32 from google.appengine.runtime import apiproxy_errors
33 from google.appengine.ext.ndb import eventloop
34 from google.appengine.ext.ndb import utils
35
36
# Exception types that _retry_fetch swallows and retries on.
_RETRIABLE_EXCEPTIONS = (
    urlfetch.DownloadError,
    apiproxy_errors.Error,
)

# Thread-local holder for the default RetryParams. The assignment below only
# affects the thread that executes this module body, which is why readers
# look the attribute up with a None fallback.
_thread_local_settings = threading.local()
_thread_local_settings.default_retry_params = None
42
43
def set_default_retry_params(retry_params):
  """Install the default RetryParams for the current thread and request.

  A shallow copy is stored, so rebinding attributes on the caller's object
  afterwards does not alter the stored default.

  Args:
    retry_params: the RetryParams instance to use as the default.
  """
  snapshot = copy.copy(retry_params)
  _thread_local_settings.default_retry_params = snapshot
47
48
def _get_default_retry_params():
  """Return the default RetryParams for the current request and thread.

  A stored default is only honored when it was created during the current
  request; otherwise a RetryParams with built-in defaults is produced.

  Returns:
    A new RetryParams instance (either fresh or a copy of the stored default).
  """
  stored = getattr(_thread_local_settings, 'default_retry_params', None)
  # Nothing was set on this thread: fall back to the library defaults.
  if stored is None:
    return RetryParams()
  # The stored value is left over from a different request: ignore it.
  if not stored.belong_to_current_request():
    return RetryParams()
  return copy.copy(stored)
60
61
62 def _quote_filename(filename):
63 """Quotes filename to use as a valid URI path.
64
65 Args:
66 filename: user provided filename. /bucket/filename.
67
68 Returns:
69 The filename properly quoted to use as URI's path component.
70 """
71 return urllib.quote(filename)
72
73
74 def _should_retry(resp):
75 """Given a urlfetch response, decide whether to retry that request."""
76 return (resp.status_code == httplib.REQUEST_TIMEOUT or
77 (resp.status_code >= 500 and
78 resp.status_code < 600))
79
80
class RetryParams(object):
  """Settings that control how failed requests are retried."""

  @datastore_rpc._positional(1)
  def __init__(self,
               backoff_factor=2.0,
               initial_delay=0.1,
               max_delay=10.0,
               min_retries=2,
               max_retries=5,
               max_retry_period=30.0,
               urlfetch_timeout=None,
               save_access_token=False):
    """Validate and store the retry settings.

    This object is unique per request per thread.

    The library retries according to these settings when the App Engine
    server can't call urlfetch, urlfetch timed out, or urlfetch got a 408 or
    500-600 response.

    Args:
      backoff_factor: exponential backoff multiplier.
      initial_delay: seconds to delay for the first retry.
      max_delay: max seconds to delay for every retry.
      min_retries: min number of times to retry. This value is automatically
        capped by max_retries.
      max_retries: max number of times to retry. Set this to 0 for no retry.
      max_retry_period: max total seconds spent on retry. Retry stops when
        this period passed AND min_retries has been attempted.
      urlfetch_timeout: timeout for urlfetch in seconds. Could be None,
        in which case the value will be chosen by urlfetch module.
      save_access_token: persist access token to datastore to avoid
        excessive usage of GetAccessToken API. Usually the token is cached
        in process and in memcache. In some cases, memcache isn't very
        reliable.

    Raises:
      ValueError: when an argument is negative, or zero where zero is not
        allowed.
      TypeError: when an argument has an unexpected type.
    """
    validate = self._check

    # Timing knobs: positive numbers (float or int).
    self.backoff_factor = validate('backoff_factor', backoff_factor)
    self.initial_delay = validate('initial_delay', initial_delay)
    self.max_delay = validate('max_delay', max_delay)
    self.max_retry_period = validate('max_retry_period', max_retry_period)

    # Retry counts: non-negative ints, with min_retries capped by max_retries.
    self.max_retries = validate('max_retries', max_retries, True, int)
    self.min_retries = validate('min_retries', min_retries, True, int)
    self.min_retries = min(self.min_retries, self.max_retries)

    # None means "let urlfetch pick its own timeout" and skips validation.
    if urlfetch_timeout is None:
      self.urlfetch_timeout = None
    else:
      self.urlfetch_timeout = validate('urlfetch_timeout', urlfetch_timeout)

    self.save_access_token = validate('save_access_token', save_access_token,
                                      True, bool)

    # Remember which request created this object; see
    # belong_to_current_request.
    self._request_id = os.environ.get('REQUEST_LOG_ID')

  def __eq__(self, other):
    """Two instances are equal when all of their attributes are equal."""
    same_kind = isinstance(other, self.__class__)
    return same_kind and self.__dict__ == other.__dict__

  def __ne__(self, other):
    """Logical negation of __eq__."""
    equal = self.__eq__(other)
    return not equal

  @classmethod
  def _check(cls, name, val, can_be_zero=False, val_type=float):
    """Validate a single constructor argument.

    Args:
      name: name of the argument, used in error messages.
      val: the value to validate. It has to be a non negative number.
      can_be_zero: whether zero is an acceptable value.
      val_type: the Python type expected for the value. When it is float,
        int values are accepted as well.

    Returns:
      The value, unchanged.

    Raises:
      ValueError: when the value is negative, or zero while zero is not
        allowed.
      TypeError: when the exact type of the value is not an accepted one.
    """
    if val_type is float:
      accepted_types = (val_type, int)
    else:
      accepted_types = (val_type,)

    # Exact type match on purpose: subclasses are rejected.
    if type(val) not in accepted_types:
      raise TypeError(
          'Expect type %s for parameter %s' % (val_type.__name__, name))
    if val < 0:
      raise ValueError(
          'Value for parameter %s has to be greater than 0' % name)
    if val == 0 and not can_be_zero:
      raise ValueError(
          'Value for parameter %s can not be 0' % name)
    return val

  def belong_to_current_request(self):
    """Tell whether this object was created during the current request.

    Returns:
      True when the REQUEST_LOG_ID environment variable still has the value
      it had when this object was constructed.
    """
    return self._request_id == os.environ.get('REQUEST_LOG_ID')

  def delay(self, n, start_time):
    """Compute how long to wait before the next retry.

    Args:
      n: the number of the current attempt. The first attempt should be 1.
      start_time: the time when retrying started, in unix time.

    Returns:
      Number of seconds to wait before the next retry, or -1 if retrying
      should stop.
    """
    # Hard cap on the number of attempts.
    if n > self.max_retries:
      return -1
    # The time budget only applies once min_retries attempts were made.
    if n > self.min_retries:
      elapsed = time.time() - start_time
      if elapsed > self.max_retry_period:
        return -1
    # Exponential backoff, bounded above by max_delay.
    backoff = math.pow(self.backoff_factor, n - 1) * self.initial_delay
    return min(backoff, self.max_delay)
195
196
def _retry_fetch(url, retry_params, **kwds):
  """A blocking fetch function similar to urlfetch.fetch.

  This function should be used when a urlfetch has timed out or the response
  shows http request timeout. It puts the current thread to sleep between
  retry backoffs.

  Args:
    url: url to fetch.
    retry_params: an instance of RetryParams.
    **kwds: keyword arguments for urlfetch. If deadline is specified in kwds,
      it precedes the one in RetryParams. If none is specified, it's up to
      urlfetch to use its own default.

  Returns:
    A urlfetch response from the last retry. This may still carry a
    retriable status when the retry budget ran out. None if no retry was
    attempted.

  Raises:
    runtime.DeadlineExceededError: when the request deadline is hit while
      sleeping or fetching.
    Whatever retriable exception was encountered during the last retry, when
      that retry failed with an exception and the retry budget ran out.
  """
  n = 1
  start_time = time.time()
  delay = retry_params.delay(n, start_time)
  if delay <= 0:
    # The retry settings do not allow even a single retry.
    return None

  logging.info('Will retry request to %s.', url)
  resp = None
  while delay > 0:
    try:
      logging.info('Retry in %s seconds.', delay)
      # The sleep stays inside the try so a deadline hit during the backoff
      # is logged in the same way as one hit during the fetch.
      time.sleep(delay)
      resp = urlfetch.fetch(url, **kwds)
    except runtime.DeadlineExceededError:
      logging.info(
          'Urlfetch retry %s will exceed request deadline '
          'after %s seconds total', n, time.time() - start_time)
      raise
    except _RETRIABLE_EXCEPTIONS as e:
      n += 1
      delay = retry_params.delay(n, start_time)
      logging.info(
          'Got exception "%r" while contacting GCS.', e)
      if delay <= 0:
        logging.info(
            'Urlfetch failed after %s retries and %s seconds in total.',
            n - 1, time.time() - start_time)
        # Re-raise from inside the handler: the bare raise is then always
        # valid and keeps the original traceback.
        raise
      continue

    n += 1
    delay = retry_params.delay(n, start_time)
    if resp is not None and not _should_retry(resp):
      break
    logging.info(
        'Got status %s from GCS.', resp.status_code)

  # Either a non-retriable response, or the last retriable response received
  # before the retry budget was exhausted.
  return resp
255
256
def _run_until_rpc():
  """Eagerly evaluate tasklets until they are blocked on some RPC.

  Usually the ndb event loop is not run until some code calls
  future.get_result().

  When an async tasklet is called, the tasklet wrapper evaluates the tasklet
  code into a generator, enqueues a callback _help_tasklet_along onto
  the event loop's current queue, and returns a future.

  _help_tasklet_along, when called by the event loop, gets one yielded value
  from the generator. If the value is another future, it sets up a callback
  _on_future_complete to invoke _help_tasklet_along when the dependent future
  fulfills. If the value is an RPC, it sets up a callback _on_rpc_complete to
  invoke _help_tasklet_along when the RPC fulfills. Thus _help_tasklet_along
  drills down the chain of futures until some future is blocked by an RPC.
  The event loop runs all callbacks and constantly checks pending RPC status.
  """
  loop = eventloop.get_event_loop()
  # Keep draining for as long as the loop's current queue is non-empty.
  while True:
    if not loop.current:
      break
    loop.run0()
278
279
def _eager_tasklet(tasklet):
  """Decorator that makes a tasklet start running eagerly.

  Args:
    tasklet: the tasklet function to wrap.

  Returns:
    A wrapper that calls the tasklet, runs the event loop via
    _run_until_rpc, and then returns the tasklet's future.
  """

  def eager_wrapper(*args, **kwds):
    pending_future = tasklet(*args, **kwds)
    # Push the tasklet along now rather than waiting for get_result().
    _run_until_rpc()
    return pending_future

  # Same effect as decorating eager_wrapper with @utils.wrapping(tasklet).
  return utils.wrapping(tasklet)(eager_wrapper)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698