OLD | NEW |
| (Empty) |
1 # -*- coding: utf-8 -*- | |
2 # Copyright 2014 Google Inc. All Rights Reserved. | |
3 # | |
4 # Licensed under the Apache License, Version 2.0 (the "License"); | |
5 # you may not use this file except in compliance with the License. | |
6 # You may obtain a copy of the License at | |
7 # | |
8 # http://www.apache.org/licenses/LICENSE-2.0 | |
9 # | |
10 # Unless required by applicable law or agreed to in writing, software | |
11 # distributed under the License is distributed on an "AS IS" BASIS, | |
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 # See the License for the specific language governing permissions and | |
14 # limitations under the License. | |
15 """Media helper functions and classes for Google Cloud Storage JSON API.""" | |
16 | |
17 from __future__ import absolute_import | |
18 | |
19 import copy | |
20 import cStringIO | |
21 import httplib | |
22 import logging | |
23 import socket | |
24 import types | |
25 import urlparse | |
26 | |
27 from apitools.base.py import exceptions as apitools_exceptions | |
28 import httplib2 | |
29 from httplib2 import parse_uri | |
30 | |
31 from gslib.cloud_api import BadRequestException | |
32 from gslib.progress_callback import ProgressCallbackWithBackoff | |
33 from gslib.util import SSL_TIMEOUT | |
34 from gslib.util import TRANSFER_BUFFER_SIZE | |
35 | |
36 | |
class BytesTransferredContainer(object):
  """Mutable holder for a byte count shared across transfer layers.

  When a transfer is resumed (or its connection is rebuilt mid-flight), the
  connection class must be constructed before the server has been queried for
  how many bytes were already accepted. Handing this container to
  Upload/DownloadCallbackConnection lets the byte count be filled in later,
  after construction, while the connection keeps a live reference to it.
  """

  def __init__(self):
    # Backing count; read and written only through the property below.
    self._num_bytes = 0

  @property
  def bytes_transferred(self):
    """Number of bytes transferred so far."""
    return self._num_bytes

  @bytes_transferred.setter
  def bytes_transferred(self, value):
    self._num_bytes = value
59 | |
class UploadCallbackConnectionClassFactory(object):
  """Creates a class that can override an httplib2 connection.

  This is used to provide progress callbacks and disable dumping the upload
  payload during debug statements. It can later be used to provide on-the-fly
  hash digestion during upload.
  """

  def __init__(self, bytes_uploaded_container,
               buffer_size=TRANSFER_BUFFER_SIZE,
               total_size=0, progress_callback=None):
    """Stores parameters to be baked into the generated connection class.

    Args:
      bytes_uploaded_container: BytesTransferredContainer shared with the
          caller; read once to seed progress for resumed uploads.
      buffer_size: Maximum number of bytes written per send() call.
      total_size: Total upload size, passed to the progress callback.
      progress_callback: Optional callable invoked with progress updates.
    """
    self.bytes_uploaded_container = bytes_uploaded_container
    self.buffer_size = buffer_size
    self.total_size = total_size
    self.progress_callback = progress_callback

  def GetConnectionClass(self):
    """Returns a connection class that overrides send."""
    # Capture factory state in locals so the class body below (which executes
    # now, in this scope) can copy them into class attributes.
    outer_bytes_uploaded_container = self.bytes_uploaded_container
    outer_buffer_size = self.buffer_size
    outer_total_size = self.total_size
    outer_progress_callback = self.progress_callback

    class UploadCallbackConnection(httplib2.HTTPSConnectionWithTimeout):
      """Connection class override for uploads."""
      bytes_uploaded_container = outer_bytes_uploaded_container
      # After we instantiate this class, apitools will check with the server
      # to find out how many bytes remain for a resumable upload.  This allows
      # us to update our progress once based on that number.
      processed_initial_bytes = False
      GCS_JSON_BUFFER_SIZE = outer_buffer_size
      callback_processor = None
      size = outer_total_size

      def __init__(self, *args, **kwargs):
        # Force our SSL timeout regardless of what the caller passed.
        kwargs['timeout'] = SSL_TIMEOUT
        httplib2.HTTPSConnectionWithTimeout.__init__(self, *args, **kwargs)

      def send(self, data):
        """Overrides HTTPConnection.send."""
        if not self.processed_initial_bytes:
          # First send on this connection: report bytes already accepted by
          # the server (nonzero for resumed uploads) exactly once.
          self.processed_initial_bytes = True
          if outer_progress_callback:
            self.callback_processor = ProgressCallbackWithBackoff(
                outer_total_size, outer_progress_callback)
            self.callback_processor.Progress(
                self.bytes_uploaded_container.bytes_transferred)
        # httplib.HTTPConnection.send accepts either a string or a file-like
        # object (anything that implements read()).
        if isinstance(data, basestring):
          full_buffer = cStringIO.StringIO(data)
        else:
          full_buffer = data
        # Send in GCS_JSON_BUFFER_SIZE chunks so progress callbacks fire at a
        # useful granularity instead of once per (possibly huge) payload.
        partial_buffer = full_buffer.read(self.GCS_JSON_BUFFER_SIZE)
        while partial_buffer:
          httplib2.HTTPSConnectionWithTimeout.send(self, partial_buffer)
          send_length = len(partial_buffer)
          if self.callback_processor:
            # This is the only place where gsutil has control over making a
            # callback, but here we can't differentiate the metadata bytes
            # (such as headers and OAuth2 refreshes) sent during an upload
            # from the actual upload bytes, so we will actually report
            # slightly more bytes than desired to the callback handler.
            #
            # One considered/rejected alternative is to move the callbacks
            # into the HashingFileUploadWrapper which only processes reads on
            # the bytes. This has the disadvantages of being removed from
            # where we actually send the bytes and unnecessarily
            # multi-purposing that class.
            self.callback_processor.Progress(send_length)
          partial_buffer = full_buffer.read(self.GCS_JSON_BUFFER_SIZE)

    return UploadCallbackConnection
133 | |
134 | |
def WrapUploadHttpRequest(upload_http):
  """Replaces upload_http.request with a connection_type-filtering closure.

  The wrapped request forwards the caller's custom connection_type only for
  PUT and POST requests; every other method is issued with
  connection_type=None so the default connection class is used.

  NOTE(review): the original docstring claimed the custom connection type is
  used "only on PUTs" (with POSTs being OAuth token refreshes whose payload
  should not be processed), but the code also forwards it for POSTs — confirm
  which is intended.

  Args:
    upload_http: httplib2.Http instance to wrap
  """
  original_request = upload_http.request

  def _FilteredRequest(uri, method='GET', body=None, headers=None,
                       redirections=httplib2.DEFAULT_MAX_REDIRECTS,
                       connection_type=None):
    # Only mutating methods keep the caller-supplied connection class.
    chosen_type = connection_type if method in ('PUT', 'POST') else None
    return original_request(uri, method=method, body=body, headers=headers,
                            redirections=redirections,
                            connection_type=chosen_type)

  # Replace the request method with our own closure.
  upload_http.request = _FilteredRequest
157 | |
158 | |
class DownloadCallbackConnectionClassFactory(object):
  """Creates a class that can override an httplib2 connection.

  This is used to provide progress callbacks, disable dumping the download
  payload during debug statements, and provide on-the-fly hash digestion during
  download. On-the-fly digestion is particularly important because httplib2
  will decompress gzipped content on-the-fly, thus this class provides our
  only opportunity to calculate the correct hash for an object that has a
  gzip hash in the cloud.
  """

  def __init__(self, bytes_downloaded_container,
               buffer_size=TRANSFER_BUFFER_SIZE, total_size=0,
               progress_callback=None, digesters=None):
    """Stores parameters to be baked into the generated connection class.

    Args:
      bytes_downloaded_container: BytesTransferredContainer shared with the
          caller; read once to seed progress for resumed downloads.
      buffer_size: Buffer size hint (stored; not referenced in this class).
      total_size: Total download size, passed to the progress callback.
      progress_callback: Optional callable invoked with progress updates.
      digesters: Optional dict of hash algorithm name -> digester; each
          digester's update() is fed every chunk read.
    """
    self.buffer_size = buffer_size
    self.total_size = total_size
    self.progress_callback = progress_callback
    self.digesters = digesters
    self.bytes_downloaded_container = bytes_downloaded_container

  def GetConnectionClass(self):
    """Returns a connection class that overrides getresponse."""

    class DownloadCallbackConnection(httplib2.HTTPSConnectionWithTimeout):
      """Connection class override for downloads."""
      # The class body executes in the enclosing method's scope, so `self`
      # here is the factory instance; its state is copied into class
      # attributes at class-creation time.
      outer_total_size = self.total_size
      outer_digesters = self.digesters
      outer_progress_callback = self.progress_callback
      outer_bytes_downloaded_container = self.bytes_downloaded_container
      processed_initial_bytes = False
      callback_processor = None

      def __init__(self, *args, **kwargs):
        # Force our SSL timeout regardless of what the caller passed.
        kwargs['timeout'] = SSL_TIMEOUT
        httplib2.HTTPSConnectionWithTimeout.__init__(self, *args, **kwargs)

      def getresponse(self, buffering=False):
        """Wraps an HTTPResponse to perform callbacks and hashing.

        In this function, self is a DownloadCallbackConnection.

        Args:
          buffering: Unused. This function uses a local buffer.

        Returns:
          HTTPResponse object with wrapped read function.
        """
        orig_response = httplib.HTTPConnection.getresponse(self)
        # Only wrap payload-bearing success responses; errors and redirects
        # pass through untouched.
        if orig_response.status not in (httplib.OK, httplib.PARTIAL_CONTENT):
          return orig_response
        orig_read_func = orig_response.read

        def read(amt=None):  # pylint: disable=invalid-name
          """Overrides HTTPConnection.getresponse.read.

          This function only supports reads of TRANSFER_BUFFER_SIZE or smaller.

          Args:
            amt: Integer n where 0 < n <= TRANSFER_BUFFER_SIZE. This is a
                 keyword argument to match the read function it overrides,
                 but it is required.

          Returns:
            Data read from HTTPConnection.
          """
          if not amt or amt > TRANSFER_BUFFER_SIZE:
            raise BadRequestException(
                'Invalid HTTP read size %s during download, expected %s.' %
                (amt, TRANSFER_BUFFER_SIZE))
          else:
            # NOTE(review): dead assignment — amt is always truthy here
            # because the falsy case raised above.
            amt = amt or TRANSFER_BUFFER_SIZE

          if not self.processed_initial_bytes:
            # First read on this connection: report bytes already downloaded
            # (nonzero for resumed downloads) exactly once.
            self.processed_initial_bytes = True
            if self.outer_progress_callback:
              self.callback_processor = ProgressCallbackWithBackoff(
                  self.outer_total_size, self.outer_progress_callback)
              self.callback_processor.Progress(
                  self.outer_bytes_downloaded_container.bytes_transferred)

          data = orig_read_func(amt)
          read_length = len(data)
          if self.callback_processor:
            self.callback_processor.Progress(read_length)
          if self.outer_digesters:
            # Feed every chunk to each registered hash digester.
            for alg in self.outer_digesters:
              self.outer_digesters[alg].update(data)
          return data
        orig_response.read = read

        return orig_response
    return DownloadCallbackConnection
251 | |
252 | |
def WrapDownloadHttpRequest(download_http):
  """Overrides download request functions for an httplib2.Http object.

  Args:
    download_http: httplib2.Http.object to wrap / override.

  Returns:
    Wrapped / overridden httplib2.Http object.
  """

  # httplib2 has a bug https://code.google.com/p/httplib2/issues/detail?id=305
  # where custom connection_type is not respected after redirects. This
  # function is copied from httplib2 and overrides the request function so that
  # the connection_type is properly passed through.
  # pylint: disable=protected-access,g-inconsistent-quotes,unused-variable
  # pylint: disable=g-equals-none,g-doc-return-or-yield
  # pylint: disable=g-short-docstring-punctuation,g-doc-args
  # pylint: disable=too-many-statements
  def OverrideRequest(self, conn, host, absolute_uri, request_uri, method,
                      body, headers, redirections, cachekey):
    """Do the actual request using the connection object.

    Also follow one level of redirects if necessary.

    Copied nearly verbatim from httplib2.Http._request; the substantive
    change is passing connection_type=conn.__class__ on the redirect
    re-request below, so redirects keep our custom connection class.
    """

    auths = ([(auth.depth(request_uri), auth) for auth in self.authorizations
              if auth.inscope(host, request_uri)])
    auth = auths and sorted(auths)[0][1] or None
    if auth:
      auth.request(method, request_uri, headers, body)

    (response, content) = self._conn_request(conn, request_uri, method, body,
                                             headers)

    if auth:
      # Retry once with refreshed credentials if the auth handler asks.
      if auth.response(response, body):
        auth.request(method, request_uri, headers, body)
        (response, content) = self._conn_request(conn, request_uri, method,
                                                 body, headers)
        response._stale_digest = 1

    if response.status == 401:
      # Try each authorization scheme offered in the 401 challenge.
      for authorization in self._auth_from_challenge(
          host, request_uri, headers, response, content):
        authorization.request(method, request_uri, headers, body)
        (response, content) = self._conn_request(conn, request_uri, method,
                                                 body, headers)
        if response.status != 401:
          self.authorizations.append(authorization)
          authorization.response(response, body)
          break

    if (self.follow_all_redirects or (method in ["GET", "HEAD"])
        or response.status == 303):
      if self.follow_redirects and response.status in [300, 301, 302,
                                                       303, 307]:
        # Pick out the location header and basically start from the beginning
        # remembering first to strip the ETag header and decrement our 'depth'
        if redirections:
          if not response.has_key('location') and response.status != 300:
            raise httplib2.RedirectMissingLocation(
                "Redirected but the response is missing a Location: header.",
                response, content)
          # Fix-up relative redirects (which violate an RFC 2616 MUST)
          if response.has_key('location'):
            location = response['location']
            (scheme, authority, path, query, fragment) = parse_uri(location)
            if authority == None:
              response['location'] = urlparse.urljoin(absolute_uri, location)
          if response.status == 301 and method in ["GET", "HEAD"]:
            response['-x-permanent-redirect-url'] = response['location']
            if not response.has_key('content-location'):
              response['content-location'] = absolute_uri
            httplib2._updateCache(headers, response, content, self.cache,
                                  cachekey)
          # Strip conditional headers so the redirected request is
          # unconditional.
          if headers.has_key('if-none-match'):
            del headers['if-none-match']
          if headers.has_key('if-modified-since'):
            del headers['if-modified-since']
          if ('authorization' in headers and
              not self.forward_authorization_headers):
            del headers['authorization']
          if response.has_key('location'):
            location = response['location']
            old_response = copy.deepcopy(response)
            if not old_response.has_key('content-location'):
              old_response['content-location'] = absolute_uri
            redirect_method = method
            if response.status in [302, 303]:
              redirect_method = "GET"
              body = None
            # Key change vs stock httplib2: propagate our custom connection
            # class to the redirected request.
            (response, content) = self.request(
                location, redirect_method, body=body, headers=headers,
                redirections=redirections-1,
                connection_type=conn.__class__)
            response.previous = old_response
        else:
          raise httplib2.RedirectLimit(
              "Redirected more times than redirection_limit allows.",
              response, content)
      elif response.status in [200, 203] and method in ["GET", "HEAD"]:
        # Don't cache 206's since we aren't going to handle byte range
        # requests
        if not response.has_key('content-location'):
          response['content-location'] = absolute_uri
        httplib2._updateCache(headers, response, content, self.cache,
                              cachekey)

    return (response, content)

  # Wrap download_http so we do not use our custom connection_type
  # on POSTS, which are used to refresh oauth tokens. We don't want to
  # process the data received in those requests.
  request_orig = download_http.request
  def NewRequest(uri, method='GET', body=None, headers=None,
                 redirections=httplib2.DEFAULT_MAX_REDIRECTS,
                 connection_type=None):
    if method == 'POST':
      return request_orig(uri, method=method, body=body,
                          headers=headers, redirections=redirections,
                          connection_type=None)
    else:
      return request_orig(uri, method=method, body=body,
                          headers=headers, redirections=redirections,
                          connection_type=connection_type)

  # Replace the request methods with our own closures.
  download_http._request = types.MethodType(OverrideRequest, download_http)
  download_http.request = NewRequest

  return download_http
384 | |
385 | |
class HttpWithNoRetries(httplib2.Http):
  """httplib2.Http variant that does not retry.

  httplib2 automatically retries requests according to httplib2.RETRIES, but
  in certain cases httplib2 ignores the RETRIES value and forces a retry.
  Because httplib2 does not handle the case where the underlying request body
  is a stream, a retry may cause a non-idempotent write as the stream is
  partially consumed and not reset before the retry occurs.

  Here we override _conn_request to disable retries unequivocally, so that
  uploads may be retried at higher layers that properly handle stream request
  bodies.
  """

  # Copied from httplib2.Http._conn_request with the retry loop removed;
  # exceptions propagate to the caller instead of triggering a retry.
  def _conn_request(self, conn, request_uri, method, body, headers):  # pylint: disable=too-many-statements

    try:
      if hasattr(conn, 'sock') and conn.sock is None:
        conn.connect()
      conn.request(method, request_uri, body, headers)
    except socket.timeout:
      raise
    except socket.gaierror:
      conn.close()
      raise httplib2.ServerNotFoundError(
          'Unable to find the server at %s' % conn.host)
    except httplib2.ssl_SSLError:
      conn.close()
      raise
    except socket.error, e:
      err = 0
      if hasattr(e, 'args'):
        err = getattr(e, 'args')[0]
      else:
        err = e.errno
      if err == httplib2.errno.ECONNREFUSED:  # Connection refused
        raise
      # NOTE(review): other socket errors fall through (as in httplib2) and
      # we still attempt getresponse() below — confirm this is intended.
    except httplib.HTTPException:
      conn.close()
      raise
    try:
      response = conn.getresponse()
    except (socket.error, httplib.HTTPException):
      conn.close()
      raise
    else:
      content = ''
      if method == 'HEAD':
        # HEAD responses carry no body; just close the connection.
        conn.close()
      else:
        content = response.read()
      response = httplib2.Response(response)
      if method != 'HEAD':
        # pylint: disable=protected-access
        content = httplib2._decompressContent(response, content)
    return (response, content)
442 | |
443 | |
class HttpWithDownloadStream(httplib2.Http):
  """httplib2.Http variant that only pushes bytes through a stream.

  httplib2 handles media by storing entire chunks of responses in memory, which
  is undesirable particularly when multiple instances are used during
  multi-threaded/multi-process copy. This class copies and then overrides some
  httplib2 functions to use a streaming copy approach that uses small memory
  buffers.

  Also disables httplib2 retries (for reasons stated in the HttpWithNoRetries
  class doc).
  """

  def __init__(self, stream=None, *args, **kwds):
    """Initializes with a required destination stream.

    Args:
      stream: Writable file-like object that response payload bytes are
          written to. Required despite the keyword-style default.

    Raises:
      apitools_exceptions.InvalidUserInputError: if stream is None.
    """
    if stream is None:
      raise apitools_exceptions.InvalidUserInputError(
          'Cannot create HttpWithDownloadStream with no stream')
    self._stream = stream
    self._logger = logging.getLogger()
    super(HttpWithDownloadStream, self).__init__(*args, **kwds)

  @property
  def stream(self):
    # The destination stream supplied at construction time (read-only).
    return self._stream

  # Copied from httplib2.Http._conn_request (retries removed, as in
  # HttpWithNoRetries) with the payload-reading path changed to stream
  # TRANSFER_BUFFER_SIZE chunks into self.stream instead of buffering the
  # whole body in memory.
  def _conn_request(self, conn, request_uri, method, body, headers):  # pylint: disable=too-many-statements
    try:
      if hasattr(conn, 'sock') and conn.sock is None:
        conn.connect()
      conn.request(method, request_uri, body, headers)
    except socket.timeout:
      raise
    except socket.gaierror:
      conn.close()
      raise httplib2.ServerNotFoundError(
          'Unable to find the server at %s' % conn.host)
    except httplib2.ssl_SSLError:
      conn.close()
      raise
    except socket.error, e:
      err = 0
      if hasattr(e, 'args'):
        err = getattr(e, 'args')[0]
      else:
        err = e.errno
      if err == httplib2.errno.ECONNREFUSED:  # Connection refused
        raise
    except httplib.HTTPException:
      # Just because the server closed the connection doesn't apparently mean
      # that the server didn't send a response.
      conn.close()
      raise
    try:
      response = conn.getresponse()
    except (socket.error, httplib.HTTPException):
      conn.close()
      raise
    else:
      content = ''
      if method == 'HEAD':
        conn.close()
        response = httplib2.Response(response)
      else:
        if response.status in (httplib.OK, httplib.PARTIAL_CONTENT):
          content_length = None
          if hasattr(response, 'msg'):
            content_length = response.getheader('content-length')
          http_stream = response
          bytes_read = 0
          # Stream the payload to self.stream in fixed-size chunks rather
          # than holding the whole body in memory.
          while True:
            new_data = http_stream.read(TRANSFER_BUFFER_SIZE)
            if new_data:
              self.stream.write(new_data)
              bytes_read += len(new_data)
            else:
              break

          if (content_length is not None and
              long(bytes_read) != long(content_length)):
            # The input stream terminated before we were able to read the
            # entire contents, possibly due to a network condition. Set
            # content-length to indicate how many bytes we actually read.
            self._logger.log(
                logging.DEBUG, 'Only got %s bytes out of content-length %s '
                'for request URI %s. Resetting content-length to match '
                'bytes read.', bytes_read, content_length, request_uri)
            response.msg['content-length'] = str(bytes_read)
          response = httplib2.Response(response)
        else:
          # We fall back to the current httplib2 behavior if we're
          # not processing bytes (eg it's a redirect).
          content = response.read()
          response = httplib2.Response(response)
          # pylint: disable=protected-access
          content = httplib2._decompressContent(response, content)
    return (response, content)
OLD | NEW |