Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(4)

Side by Side Diff: gslib/gcs_json_media.py

Issue 698893003: Update checked in version of gsutil to version 4.6 (Closed) Base URL: http://dart.googlecode.com/svn/third_party/gsutil/
Patch Set: Created 6 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « gslib/gcs_json_api.py ('k') | gslib/hashing_helper.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Property Changes:
Added: svn:eol-style
+ LF
OLDNEW
(Empty)
1 # -*- coding: utf-8 -*-
2 # Copyright 2014 Google Inc. All Rights Reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 """Media helper functions and classes for Google Cloud Storage JSON API."""
16
17 from __future__ import absolute_import
18
19 import copy
20 import cStringIO
21 import httplib
22 import socket
23 import types
24 import urlparse
25
26 import httplib2
27 from httplib2 import parse_uri
28
29 from gslib.cloud_api import BadRequestException
30 from gslib.progress_callback import ProgressCallbackWithBackoff
31 from gslib.third_party.storage_apitools import exceptions as apitools_exceptions
32 from gslib.util import SSL_TIMEOUT
33 from gslib.util import TRANSFER_BUFFER_SIZE
34
35
class BytesTransferredContainer(object):
  """Mutable holder for a byte count shared across transfer layers.

  Connection classes must be constructed before the number of bytes already
  transferred is known (for resumable transfers the server has to be queried
  first, and connection rebuilds can happen mid-transfer). Sharing this
  container with Upload/DownloadCallbackConnection lets callers update the
  count after the connection class already holds a reference to it.
  """

  def __init__(self):
    self.__bytes_transferred = 0

  def _GetBytesTransferred(self):
    """Returns the number of bytes transferred so far."""
    return self.__bytes_transferred

  def _SetBytesTransferred(self, value):
    """Records the number of bytes transferred so far."""
    self.__bytes_transferred = value

  bytes_transferred = property(_GetBytesTransferred, _SetBytesTransferred)
57
58
class UploadCallbackConnectionClassFactory(object):
  """Builds httplib2 connection class overrides for uploads.

  The generated connection class reports upload progress through a callback
  and keeps debug statements from dumping the upload payload. It could later
  be extended to digest hashes on the fly during upload.
  """

  def __init__(self, bytes_uploaded_container,
               buffer_size=TRANSFER_BUFFER_SIZE,
               total_size=0, progress_callback=None):
    self.bytes_uploaded_container = bytes_uploaded_container
    self.buffer_size = buffer_size
    self.total_size = total_size
    self.progress_callback = progress_callback

  def GetConnectionClass(self):
    """Returns a connection class whose send() reports upload progress."""
    hooked_container = self.bytes_uploaded_container
    hooked_buffer_size = self.buffer_size
    hooked_total_size = self.total_size
    hooked_progress_callback = self.progress_callback

    class UploadCallbackConnection(httplib2.HTTPSConnectionWithTimeout):
      """Connection class override for uploads."""
      bytes_uploaded_container = hooked_container
      # apitools asks the server how many bytes remain for a resumable upload
      # right after this class is instantiated; the first send() uses that
      # count to report progress made before this connection existed.
      processed_initial_bytes = False
      GCS_JSON_BUFFER_SIZE = hooked_buffer_size
      callback_processor = None
      size = hooked_total_size

      def __init__(self, *args, **kwargs):
        # Enforce gsutil's SSL timeout on every connection.
        kwargs['timeout'] = SSL_TIMEOUT
        httplib2.HTTPSConnectionWithTimeout.__init__(self, *args, **kwargs)

      def send(self, data):
        """Overrides HTTPConnection.send, reporting progress per buffer."""
        if not self.processed_initial_bytes:
          self.processed_initial_bytes = True
          if hooked_progress_callback:
            self.callback_processor = ProgressCallbackWithBackoff(
                hooked_total_size, hooked_progress_callback)
            self.callback_processor.Progress(
                self.bytes_uploaded_container.bytes_transferred)
        # httplib.HTTPConnection.send accepts either a string or a file-like
        # object (anything that implements read()); normalize to a stream.
        if isinstance(data, basestring):
          send_stream = cStringIO.StringIO(data)
        else:
          send_stream = data
        while True:
          chunk = send_stream.read(self.GCS_JSON_BUFFER_SIZE)
          if not chunk:
            break
          httplib2.HTTPSConnectionWithTimeout.send(self, chunk)
          if self.callback_processor:
            # This is the only place where gsutil has control over making a
            # callback, but here we can't differentiate the metadata bytes
            # (such as headers and OAuth2 refreshes) sent during an upload
            # from the actual upload bytes, so we will actually report
            # slightly more bytes than desired to the callback handler.
            #
            # One considered/rejected alternative is to move the callbacks
            # into the HashingFileUploadWrapper which only processes reads on
            # the bytes. This has the disadvantages of being removed from
            # where we actually send the bytes and unnecessarily
            # multi-purposing that class.
            self.callback_processor.Progress(len(chunk))

    return UploadCallbackConnection
132
133
def WrapUploadHttpRequest(upload_http):
  """Wraps upload_http so our custom connection_type applies only to uploads.

  The custom (progress-reporting) connection class is used for PUT and POST
  requests, which are the methods that carry upload payload bytes. All other
  methods fall back to the default connection type so their data is not run
  through the upload callback machinery.

  NOTE: the previous docstring claimed POSTs were excluded, but the code has
  always applied the override to both PUT and POST; the documentation is
  corrected here to match the actual behavior.

  Args:
    upload_http: httplib2.Http instance to wrap.
  """
  request_orig = upload_http.request

  def NewRequest(uri, method='GET', body=None, headers=None,
                 redirections=httplib2.DEFAULT_MAX_REDIRECTS,
                 connection_type=None):
    # Only payload-bearing methods get the progress-reporting connection.
    if method in ('PUT', 'POST'):
      override_connection_type = connection_type
    else:
      override_connection_type = None
    return request_orig(uri, method=method, body=body,
                        headers=headers, redirections=redirections,
                        connection_type=override_connection_type)

  # Replace the request method with our own closure.
  upload_http.request = NewRequest
156
157
class DownloadCallbackConnectionClassFactory(object):
  """Creates a class that can override an httplib2 connection.

  This is used to provide progress callbacks, disable dumping the download
  payload during debug statements, and provide on-the-fly hash digestion during
  download. On-the-fly digestion is particularly important because httplib2
  will decompress gzipped content on-the-fly, thus this class provides our
  only opportunity to calculate the correct hash for an object that has a
  gzip hash in the cloud.
  """

  def __init__(self, bytes_downloaded_container,
               buffer_size=TRANSFER_BUFFER_SIZE, total_size=0,
               progress_callback=None, digesters=None):
    """Initializes the factory.

    Args:
      bytes_downloaded_container: BytesTransferredContainer holding the bytes
          already downloaded (e.g. by a previous, resumed attempt).
      buffer_size: Maximum number of bytes per read from the HTTP response.
      total_size: Total size of the download, for progress reporting.
      progress_callback: Optional callback to receive progress updates.
      digesters: Optional dict of {algorithm: hashlib-style digester} updated
          with every chunk of downloaded bytes.
    """
    self.buffer_size = buffer_size
    self.total_size = total_size
    self.progress_callback = progress_callback
    self.digesters = digesters
    self.bytes_downloaded_container = bytes_downloaded_container

  def GetConnectionClass(self):
    """Returns a connection class that overrides getresponse."""

    class DownloadCallbackConnection(httplib2.HTTPSConnectionWithTimeout):
      """Connection class override for downloads."""
      # NOTE: `self` in these class-attribute initializers is the enclosing
      # GetConnectionClass's factory instance; the values are captured when
      # the class body executes (unlike UploadCallbackConnectionClassFactory,
      # which copies them into locals first).
      outer_total_size = self.total_size
      outer_digesters = self.digesters
      outer_progress_callback = self.progress_callback
      outer_bytes_downloaded_container = self.bytes_downloaded_container
      # Set True once the first read has reported previously-downloaded bytes.
      processed_initial_bytes = False
      callback_processor = None

      def __init__(self, *args, **kwargs):
        # Enforce gsutil's SSL timeout on every connection.
        kwargs['timeout'] = SSL_TIMEOUT
        httplib2.HTTPSConnectionWithTimeout.__init__(self, *args, **kwargs)

      def getresponse(self, buffering=False):
        """Wraps an HTTPResponse to perform callbacks and hashing.

        In this function, self is a DownloadCallbackConnection.

        Args:
          buffering: Unused. This function uses a local buffer.

        Returns:
          HTTPResponse object with wrapped read function.
        """
        orig_response = httplib.HTTPConnection.getresponse(self)
        # Only hash and report progress for payload bytes of successful
        # (200/206) responses; redirects and errors pass through untouched.
        if orig_response.status not in (httplib.OK, httplib.PARTIAL_CONTENT):
          return orig_response
        orig_read_func = orig_response.read

        def read(amt=None):  # pylint: disable=invalid-name
          """Overrides HTTPConnection.getresponse.read.

          This function only supports reads of TRANSFER_BUFFER_SIZE or smaller.

          Args:
            amt: Integer n where 0 < n <= TRANSFER_BUFFER_SIZE. This is a
                 keyword argument to match the read function it overrides,
                 but it is required.

          Returns:
            Data read from HTTPConnection.
          """
          if not amt or amt > TRANSFER_BUFFER_SIZE:
            raise BadRequestException(
                'Invalid HTTP read size %s during download, expected %s.' %
                (amt, TRANSFER_BUFFER_SIZE))
          else:
            # Redundant (amt is always truthy here); kept as defensive code.
            amt = amt or TRANSFER_BUFFER_SIZE

          if not self.processed_initial_bytes:
            self.processed_initial_bytes = True
            if self.outer_progress_callback:
              self.callback_processor = ProgressCallbackWithBackoff(
                  self.outer_total_size, self.outer_progress_callback)
              # Report bytes transferred by earlier attempts so the progress
              # display starts from the resume point.
              self.callback_processor.Progress(
                  self.outer_bytes_downloaded_container.bytes_transferred)

          data = orig_read_func(amt)
          read_length = len(data)
          if self.callback_processor:
            self.callback_processor.Progress(read_length)
          if self.outer_digesters:
            # Digest the bytes at the connection level, i.e. before httplib2
            # decompresses them (see class docstring for why this matters for
            # objects stored with a gzip hash).
            for alg in self.outer_digesters:
              self.outer_digesters[alg].update(data)
          return data
        orig_response.read = read

        return orig_response
    return DownloadCallbackConnection
250
251
def WrapDownloadHttpRequest(download_http):
  """Overrides download request functions for an httplib2.Http object.

  Args:
    download_http: httplib2.Http.object to wrap / override.

  Returns:
    Wrapped / overridden httplib2.Http object.
  """

  # httplib2 has a bug https://code.google.com/p/httplib2/issues/detail?id=305
  # where custom connection_type is not respected after redirects. This
  # function is copied from httplib2 and overrides the request function so that
  # the connection_type is properly passed through.
  # pylint: disable=protected-access,g-inconsistent-quotes,unused-variable
  # pylint: disable=g-equals-none,g-doc-return-or-yield
  # pylint: disable=g-short-docstring-punctuation,g-doc-args
  # pylint: disable=too-many-statements
  def OverrideRequest(self, conn, host, absolute_uri, request_uri, method,
                      body, headers, redirections, cachekey):
    """Do the actual request using the connection object.

    Also follow one level of redirects if necessary.

    Copied nearly verbatim from httplib2.Http._request; the functional change
    is that redirected requests are re-issued with the original connection's
    class (see the self.request call below).
    """

    # Pick the deepest in-scope authorization handler, if any, and let it
    # modify the outgoing request.
    auths = ([(auth.depth(request_uri), auth) for auth in self.authorizations
              if auth.inscope(host, request_uri)])
    auth = auths and sorted(auths)[0][1] or None
    if auth:
      auth.request(method, request_uri, headers, body)

    (response, content) = self._conn_request(conn, request_uri, method, body,
                                             headers)

    if auth:
      # Retry once if the handler wants to answer a challenge from the
      # response (e.g. stale digest auth).
      if auth.response(response, body):
        auth.request(method, request_uri, headers, body)
        (response, content) = self._conn_request(conn, request_uri, method,
                                                 body, headers)
        response._stale_digest = 1

    if response.status == 401:
      # Derive credentials from the WWW-Authenticate challenge; keep the
      # first authorization that succeeds for use on future requests.
      for authorization in self._auth_from_challenge(
          host, request_uri, headers, response, content):
        authorization.request(method, request_uri, headers, body)
        (response, content) = self._conn_request(conn, request_uri, method,
                                                 body, headers)
        if response.status != 401:
          self.authorizations.append(authorization)
          authorization.response(response, body)
          break

    if (self.follow_all_redirects or (method in ["GET", "HEAD"])
        or response.status == 303):
      if self.follow_redirects and response.status in [300, 301, 302,
                                                       303, 307]:
        # Pick out the location header and basically start from the beginning
        # remembering first to strip the ETag header and decrement our 'depth'
        if redirections:
          if not response.has_key('location') and response.status != 300:
            raise httplib2.RedirectMissingLocation(
                "Redirected but the response is missing a Location: header.",
                response, content)
          # Fix-up relative redirects (which violate an RFC 2616 MUST)
          if response.has_key('location'):
            location = response['location']
            (scheme, authority, path, query, fragment) = parse_uri(location)
            if authority == None:
              response['location'] = urlparse.urljoin(absolute_uri, location)
          if response.status == 301 and method in ["GET", "HEAD"]:
            # Permanent redirects of safe methods are cacheable.
            response['-x-permanent-redirect-url'] = response['location']
            if not response.has_key('content-location'):
              response['content-location'] = absolute_uri
            httplib2._updateCache(headers, response, content, self.cache,
                                  cachekey)
          # Strip conditional headers so the redirected request is not
          # answered with a 304 meant for the original URI.
          if headers.has_key('if-none-match'):
            del headers['if-none-match']
          if headers.has_key('if-modified-since'):
            del headers['if-modified-since']
          if ('authorization' in headers and
              not self.forward_authorization_headers):
            del headers['authorization']
          if response.has_key('location'):
            location = response['location']
            old_response = copy.deepcopy(response)
            if not old_response.has_key('content-location'):
              old_response['content-location'] = absolute_uri
            redirect_method = method
            if response.status in [302, 303]:
              # Per browser convention, 302/303 redirects are refetched
              # with GET and no body.
              redirect_method = "GET"
              body = None
            # This is the change from stock httplib2: pass the original
            # connection's class through so custom connection types survive
            # the redirect (httplib2 issue 305).
            (response, content) = self.request(
                location, redirect_method, body=body, headers=headers,
                redirections=redirections-1,
                connection_type=conn.__class__)
            response.previous = old_response
        else:
          raise httplib2.RedirectLimit(
              "Redirected more times than redirection_limit allows.",
              response, content)
      elif response.status in [200, 203] and method in ["GET", "HEAD"]:
        # Don't cache 206's since we aren't going to handle byte range
        # requests
        if not response.has_key('content-location'):
          response['content-location'] = absolute_uri
        httplib2._updateCache(headers, response, content, self.cache,
                              cachekey)

    return (response, content)

  # Wrap download_http so we do not use our custom connection_type
  # on POSTS, which are used to refresh oauth tokens. We don't want to
  # process the data received in those requests.
  request_orig = download_http.request
  def NewRequest(uri, method='GET', body=None, headers=None,
                 redirections=httplib2.DEFAULT_MAX_REDIRECTS,
                 connection_type=None):
    if method == 'POST':
      return request_orig(uri, method=method, body=body,
                          headers=headers, redirections=redirections,
                          connection_type=None)
    else:
      return request_orig(uri, method=method, body=body,
                          headers=headers, redirections=redirections,
                          connection_type=connection_type)

  # Replace the request methods with our own closures.
  download_http._request = types.MethodType(OverrideRequest, download_http)
  download_http.request = NewRequest

  return download_http
383
384
class HttpWithDownloadStream(httplib2.Http):
  """httplib2.Http variant that only pushes bytes through a stream.

  httplib2 handles media by storing entire chunks of responses in memory, which
  is undesirable particularly when multiple instances are used during
  multi-threaded/multi-process copy. This class copies and then overrides some
  httplib2 functions to use a streaming copy approach that uses small memory
  buffers.
  """

  def __init__(self, stream=None, *args, **kwds):
    # A stream is mandatory: without one there is nowhere to put the
    # downloaded bytes.
    if stream is None:
      raise apitools_exceptions.InvalidUserInputError(
          'Cannot create HttpWithDownloadStream with no stream')
    self._stream = stream
    super(HttpWithDownloadStream, self).__init__(*args, **kwds)

  @property
  def stream(self):
    # File-like object (with a write() method) receiving the response payload.
    return self._stream

  # pylint: disable=too-many-statements
  def _conn_request(self, conn, request_uri, method, body, headers):
    # Copied from httplib2.Http._conn_request; the successful-response branch
    # below is modified to stream the payload to self.stream in
    # TRANSFER_BUFFER_SIZE chunks instead of buffering it all in memory.
    i = 0
    seen_bad_status_line = False
    # Retry loop: each iteration attempts to issue the request and read a
    # response, reconnecting on recoverable connection failures.
    while i < httplib2.RETRIES:
      i += 1
      try:
        if hasattr(conn, 'sock') and conn.sock is None:
          conn.connect()
        conn.request(method, request_uri, body, headers)
      except socket.timeout:
        raise
      except socket.gaierror:
        conn.close()
        raise httplib2.ServerNotFoundError(
            'Unable to find the server at %s' % conn.host)
      except httplib2.ssl_SSLError:
        conn.close()
        raise
      except socket.error, e:
        err = 0
        if hasattr(e, 'args'):
          err = getattr(e, 'args')[0]
        else:
          err = e.errno
        if err == httplib2.errno.ECONNREFUSED:  # Connection refused
          raise
        # NOTE(review): other socket errors fall through and still attempt
        # getresponse() below, mirroring the httplib2 code this was copied
        # from.
      except httplib.HTTPException:
        # Just because the server closed the connection doesn't apparently mean
        # that the server didn't send a response.
        if hasattr(conn, 'sock') and conn.sock is None:
          if i < httplib2.RETRIES-1:
            conn.close()
            conn.connect()
            continue
          else:
            conn.close()
            raise
        if i < httplib2.RETRIES-1:
          conn.close()
          conn.connect()
          continue
      try:
        response = conn.getresponse()
      except httplib.BadStatusLine:
        # If we get a BadStatusLine on the first try then that means
        # the connection just went stale, so retry regardless of the
        # number of RETRIES set.
        if not seen_bad_status_line and i == 1:
          i = 0
          seen_bad_status_line = True
          conn.close()
          conn.connect()
          continue
        else:
          conn.close()
          raise
      except (socket.error, httplib.HTTPException):
        if i < httplib2.RETRIES-1:
          conn.close()
          conn.connect()
          continue
        else:
          conn.close()
          raise
      else:
        # A response was obtained; no further retries.
        content = ''
        if method == 'HEAD':
          conn.close()
          response = httplib2.Response(response)
        else:
          if response.status in (httplib.OK, httplib.PARTIAL_CONTENT):
            # Streaming branch (the modification from stock httplib2): copy
            # the payload to self.stream in small chunks; `content` stays ''.
            http_stream = response
            # Start last_position and new_position at dummy values
            last_position = -1
            new_position = 0
            # A read that advances the position by 0 bytes signals EOF.
            while new_position != last_position:
              last_position = new_position
              new_data = http_stream.read(TRANSFER_BUFFER_SIZE)
              self.stream.write(new_data)
              new_position += len(new_data)
            response = httplib2.Response(response)
          else:
            # We fall back to the current httplib2 behavior if we're
            # not processing bytes (eg it's a redirect).
            content = response.read()
            response = httplib2.Response(response)
            # pylint: disable=protected-access
            content = httplib2._decompressContent(response, content)
        break
    return (response, content)
OLDNEW
« no previous file with comments | « gslib/gcs_json_api.py ('k') | gslib/hashing_helper.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698