OLD | NEW |
(Empty) | |
| 1 # -*- coding: utf-8 -*- |
| 2 # Copyright 2014 Google Inc. All Rights Reserved. |
| 3 # |
| 4 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 # you may not use this file except in compliance with the License. |
| 6 # You may obtain a copy of the License at |
| 7 # |
| 8 # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 # |
| 10 # Unless required by applicable law or agreed to in writing, software |
| 11 # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 # See the License for the specific language governing permissions and |
| 14 # limitations under the License. |
| 15 """Media helper functions and classes for Google Cloud Storage JSON API.""" |
| 16 |
| 17 from __future__ import absolute_import |
| 18 |
| 19 import copy |
| 20 import cStringIO |
| 21 import httplib |
| 22 import logging |
| 23 import socket |
| 24 import types |
| 25 import urlparse |
| 26 |
| 27 from apitools.base.py import exceptions as apitools_exceptions |
| 28 import httplib2 |
| 29 from httplib2 import parse_uri |
| 30 |
| 31 from gslib.cloud_api import BadRequestException |
| 32 from gslib.progress_callback import ProgressCallbackWithBackoff |
| 33 from gslib.util import SSL_TIMEOUT |
| 34 from gslib.util import TRANSFER_BUFFER_SIZE |
| 35 |
| 36 |
class BytesTransferredContainer(object):
  """Mutable holder for a byte count shared across transfer layers.

  When a transfer is resumed, or its connection is rebuilt mid-transfer, the
  connection class must be recreated knowing how many bytes have already been
  moved. For uploads that count is only known after querying the server, yet
  the connection class has to exist before the query can be issued. Handing
  this container by reference into Upload/DownloadCallbackConnection lets the
  count be filled in (and updated) after the class is built.
  """

  def __init__(self):
    # Internal counter; exposed read/write through the property below.
    self._bytes_transferred = 0

  @property
  def bytes_transferred(self):
    """Number of bytes transferred so far (starts at 0)."""
    return self._bytes_transferred

  @bytes_transferred.setter
  def bytes_transferred(self, value):
    self._bytes_transferred = value
| 59 |
class UploadCallbackConnectionClassFactory(object):
  """Creates a class that can override an httplib2 connection.

  This is used to provide progress callbacks and disable dumping the upload
  payload during debug statements. It can later be used to provide on-the-fly
  hash digestion during upload.
  """

  def __init__(self, bytes_uploaded_container,
               buffer_size=TRANSFER_BUFFER_SIZE,
               total_size=0, progress_callback=None):
    """Initializes the factory.

    Args:
      bytes_uploaded_container: BytesTransferredContainer whose
          bytes_transferred value is reported on the first send (covers bytes
          already uploaded by a previous, resumed attempt).
      buffer_size: Chunk size in bytes used when sending the request body.
      total_size: Total upload size in bytes, passed to the progress
          processor.
      progress_callback: Optional progress callback; if None, no progress is
          reported.
    """
    self.bytes_uploaded_container = bytes_uploaded_container
    self.buffer_size = buffer_size
    self.total_size = total_size
    self.progress_callback = progress_callback

  def GetConnectionClass(self):
    """Returns a connection class that overrides send."""
    # Capture factory state in locals so the nested class body (which has its
    # own scope and cannot see this method's `self`) can reference them.
    outer_bytes_uploaded_container = self.bytes_uploaded_container
    outer_buffer_size = self.buffer_size
    outer_total_size = self.total_size
    outer_progress_callback = self.progress_callback

    class UploadCallbackConnection(httplib2.HTTPSConnectionWithTimeout):
      """Connection class override for uploads."""
      bytes_uploaded_container = outer_bytes_uploaded_container
      # After we instantiate this class, apitools will check with the server
      # to find out how many bytes remain for a resumable upload. This allows
      # us to update our progress once based on that number.
      processed_initial_bytes = False
      GCS_JSON_BUFFER_SIZE = outer_buffer_size
      callback_processor = None
      size = outer_total_size

      def __init__(self, *args, **kwargs):
        # Force a bounded SSL timeout so a hung connection cannot block the
        # transfer indefinitely.
        kwargs['timeout'] = SSL_TIMEOUT
        httplib2.HTTPSConnectionWithTimeout.__init__(self, *args, **kwargs)

      def send(self, data):
        """Overrides HTTPConnection.send."""
        if not self.processed_initial_bytes:
          # First send on this connection: set up the progress processor and
          # report any bytes already transferred (e.g. a resumed upload).
          self.processed_initial_bytes = True
          if outer_progress_callback:
            self.callback_processor = ProgressCallbackWithBackoff(
                outer_total_size, outer_progress_callback)
            self.callback_processor.Progress(
                self.bytes_uploaded_container.bytes_transferred)
        # httplib.HTTPConnection.send accepts either a string or a file-like
        # object (anything that implements read()).
        if isinstance(data, basestring):
          full_buffer = cStringIO.StringIO(data)
        else:
          full_buffer = data
        # Send in GCS_JSON_BUFFER_SIZE chunks so progress can be reported
        # incrementally instead of once per request body.
        partial_buffer = full_buffer.read(self.GCS_JSON_BUFFER_SIZE)
        while partial_buffer:
          httplib2.HTTPSConnectionWithTimeout.send(self, partial_buffer)
          send_length = len(partial_buffer)
          if self.callback_processor:
            # This is the only place where gsutil has control over making a
            # callback, but here we can't differentiate the metadata bytes
            # (such as headers and OAuth2 refreshes) sent during an upload
            # from the actual upload bytes, so we will actually report
            # slightly more bytes than desired to the callback handler.
            #
            # One considered/rejected alternative is to move the callbacks
            # into the HashingFileUploadWrapper which only processes reads on
            # the bytes. This has the disadvantages of being removed from
            # where we actually send the bytes and unnecessarily
            # multi-purposing that class.
            self.callback_processor.Progress(send_length)
          partial_buffer = full_buffer.read(self.GCS_JSON_BUFFER_SIZE)

    return UploadCallbackConnection
| 133 |
| 134 |
def WrapUploadHttpRequest(upload_http):
  """Restricts upload_http's custom connection_type to PUT and POST requests.

  Requests issued with any other method fall back to the default connection
  class (connection_type=None), so their payloads are not routed through the
  upload callback connection.

  NOTE(review): the original docstring claimed the custom connection_type was
  used only for PUTs and that POSTs were excluded; the code has always kept
  it for both PUT and POST. Behavior is preserved here — confirm which
  methods are intended before tightening this.

  Args:
    upload_http: httplib2.Http instance to wrap
  """
  request_orig = upload_http.request

  def NewRequest(uri, method='GET', body=None, headers=None,
                 redirections=httplib2.DEFAULT_MAX_REDIRECTS,
                 connection_type=None):
    # Keep the caller-supplied connection class only for upload methods.
    if method in ('PUT', 'POST'):
      override_connection_type = connection_type
    else:
      override_connection_type = None
    return request_orig(uri, method=method, body=body,
                        headers=headers, redirections=redirections,
                        connection_type=override_connection_type)

  # Swap in the closure so all subsequent requests are filtered through it.
  upload_http.request = NewRequest
| 157 |
| 158 |
class DownloadCallbackConnectionClassFactory(object):
  """Creates a class that can override an httplib2 connection.

  This is used to provide progress callbacks, disable dumping the download
  payload during debug statements, and provide on-the-fly hash digestion during
  download. On-the-fly digestion is particularly important because httplib2
  will decompress gzipped content on-the-fly, thus this class provides our
  only opportunity to calculate the correct hash for an object that has a
  gzip hash in the cloud.
  """

  def __init__(self, bytes_downloaded_container,
               buffer_size=TRANSFER_BUFFER_SIZE, total_size=0,
               progress_callback=None, digesters=None):
    """Initializes the factory.

    Args:
      bytes_downloaded_container: BytesTransferredContainer whose
          bytes_transferred value is reported on the first read (covers bytes
          already downloaded by a previous, resumed attempt).
      buffer_size: Buffer size in bytes (stored; note reads are bounded by
          TRANSFER_BUFFER_SIZE in the connection class below).
      total_size: Total download size in bytes, passed to the progress
          processor.
      progress_callback: Optional progress callback; if None, no progress is
          reported.
      digesters: Optional dict of {algorithm: digester}; each digester's
          update() is called with every chunk read from the connection.
    """
    self.buffer_size = buffer_size
    self.total_size = total_size
    self.progress_callback = progress_callback
    self.digesters = digesters
    self.bytes_downloaded_container = bytes_downloaded_container

  def GetConnectionClass(self):
    """Returns a connection class that overrides getresponse."""

    class DownloadCallbackConnection(httplib2.HTTPSConnectionWithTimeout):
      """Connection class override for downloads."""
      # Factory state captured as class attributes. Note `self` here is the
      # factory instance, because this class body executes inside
      # GetConnectionClass.
      outer_total_size = self.total_size
      outer_digesters = self.digesters
      outer_progress_callback = self.progress_callback
      outer_bytes_downloaded_container = self.bytes_downloaded_container
      processed_initial_bytes = False
      callback_processor = None

      def __init__(self, *args, **kwargs):
        # Force a bounded SSL timeout so a hung connection cannot block the
        # transfer indefinitely.
        kwargs['timeout'] = SSL_TIMEOUT
        httplib2.HTTPSConnectionWithTimeout.__init__(self, *args, **kwargs)

      def getresponse(self, buffering=False):
        """Wraps an HTTPResponse to perform callbacks and hashing.

        In this function, self is a DownloadCallbackConnection.

        Args:
          buffering: Unused. This function uses a local buffer.

        Returns:
          HTTPResponse object with wrapped read function.
        """
        orig_response = httplib.HTTPConnection.getresponse(self)
        # Only 200/206 responses carry object payload bytes; anything else
        # (redirects, errors) is returned untouched.
        if orig_response.status not in (httplib.OK, httplib.PARTIAL_CONTENT):
          return orig_response
        orig_read_func = orig_response.read

        def read(amt=None):  # pylint: disable=invalid-name
          """Overrides HTTPConnection.getresponse.read.

          This function only supports reads of TRANSFER_BUFFER_SIZE or smaller.

          Args:
            amt: Integer n where 0 < n <= TRANSFER_BUFFER_SIZE. This is a
                 keyword argument to match the read function it overrides,
                 but it is required.

          Returns:
            Data read from HTTPConnection.
          """
          # amt=None (or 0, or oversized) is rejected: unbounded reads would
          # defeat the chunked progress/hashing below.
          if not amt or amt > TRANSFER_BUFFER_SIZE:
            raise BadRequestException(
                'Invalid HTTP read size %s during download, expected %s.' %
                (amt, TRANSFER_BUFFER_SIZE))
          else:
            # No-op: amt is truthy in this branch, so `amt or ...` is amt.
            amt = amt or TRANSFER_BUFFER_SIZE

          if not self.processed_initial_bytes:
            # First read on this connection: set up the progress processor
            # and report bytes already downloaded (e.g. a resumed download).
            self.processed_initial_bytes = True
            if self.outer_progress_callback:
              self.callback_processor = ProgressCallbackWithBackoff(
                  self.outer_total_size, self.outer_progress_callback)
              self.callback_processor.Progress(
                  self.outer_bytes_downloaded_container.bytes_transferred)

          data = orig_read_func(amt)
          read_length = len(data)
          if self.callback_processor:
            self.callback_processor.Progress(read_length)
          if self.outer_digesters:
            # Fold each chunk into every registered digester as it streams by.
            for alg in self.outer_digesters:
              self.outer_digesters[alg].update(data)
          return data
        orig_response.read = read

        return orig_response
    return DownloadCallbackConnection
| 251 |
| 252 |
def WrapDownloadHttpRequest(download_http):
  """Overrides download request functions for an httplib2.Http object.

  Args:
    download_http: httplib2.Http.object to wrap / override.

  Returns:
    Wrapped / overridden httplib2.Http object.
  """

  # httplib2 has a bug https://code.google.com/p/httplib2/issues/detail?id=305
  # where custom connection_type is not respected after redirects. This
  # function is copied from httplib2 and overrides the request function so that
  # the connection_type is properly passed through.
  # pylint: disable=protected-access,g-inconsistent-quotes,unused-variable
  # pylint: disable=g-equals-none,g-doc-return-or-yield
  # pylint: disable=g-short-docstring-punctuation,g-doc-args
  # pylint: disable=too-many-statements
  def OverrideRequest(self, conn, host, absolute_uri, request_uri, method,
                      body, headers, redirections, cachekey):
    """Do the actual request using the connection object.

    Also follow one level of redirects if necessary.

    NOTE: near-verbatim copy of httplib2's internal _request; keep edits
    minimal so it stays easy to diff against upstream. The intended delta is
    the connection_type=conn.__class__ argument on the redirect re-request
    below.
    """

    auths = ([(auth.depth(request_uri), auth) for auth in self.authorizations
              if auth.inscope(host, request_uri)])
    auth = auths and sorted(auths)[0][1] or None
    if auth:
      auth.request(method, request_uri, headers, body)

    (response, content) = self._conn_request(conn, request_uri, method, body,
                                             headers)

    if auth:
      if auth.response(response, body):
        auth.request(method, request_uri, headers, body)
        (response, content) = self._conn_request(conn, request_uri, method,
                                                 body, headers)
        response._stale_digest = 1

    if response.status == 401:
      # Retry once per authorization scheme that matches the challenge.
      for authorization in self._auth_from_challenge(
          host, request_uri, headers, response, content):
        authorization.request(method, request_uri, headers, body)
        (response, content) = self._conn_request(conn, request_uri, method,
                                                 body, headers)
        if response.status != 401:
          self.authorizations.append(authorization)
          authorization.response(response, body)
          break

    if (self.follow_all_redirects or (method in ["GET", "HEAD"])
        or response.status == 303):
      if self.follow_redirects and response.status in [300, 301, 302,
                                                       303, 307]:
        # Pick out the location header and basically start from the beginning
        # remembering first to strip the ETag header and decrement our 'depth'
        if redirections:
          if not response.has_key('location') and response.status != 300:
            raise httplib2.RedirectMissingLocation(
                "Redirected but the response is missing a Location: header.",
                response, content)
          # Fix-up relative redirects (which violate an RFC 2616 MUST)
          if response.has_key('location'):
            location = response['location']
            (scheme, authority, path, query, fragment) = parse_uri(location)
            if authority == None:
              response['location'] = urlparse.urljoin(absolute_uri, location)
          if response.status == 301 and method in ["GET", "HEAD"]:
            response['-x-permanent-redirect-url'] = response['location']
            if not response.has_key('content-location'):
              response['content-location'] = absolute_uri
            httplib2._updateCache(headers, response, content, self.cache,
                                  cachekey)
          # Drop conditional/auth headers before re-requesting the new
          # location.
          if headers.has_key('if-none-match'):
            del headers['if-none-match']
          if headers.has_key('if-modified-since'):
            del headers['if-modified-since']
          if ('authorization' in headers and
              not self.forward_authorization_headers):
            del headers['authorization']
          if response.has_key('location'):
            location = response['location']
            old_response = copy.deepcopy(response)
            if not old_response.has_key('content-location'):
              old_response['content-location'] = absolute_uri
            redirect_method = method
            if response.status in [302, 303]:
              redirect_method = "GET"
              body = None
            # The fix for httplib2 issue 305: propagate the connection class
            # through the redirected request.
            (response, content) = self.request(
                location, redirect_method, body=body, headers=headers,
                redirections=redirections-1,
                connection_type=conn.__class__)
            response.previous = old_response
        else:
          raise httplib2.RedirectLimit(
              "Redirected more times than redirection_limit allows.",
              response, content)
    elif response.status in [200, 203] and method in ["GET", "HEAD"]:
      # Don't cache 206's since we aren't going to handle byte range
      # requests
      if not response.has_key('content-location'):
        response['content-location'] = absolute_uri
      httplib2._updateCache(headers, response, content, self.cache,
                            cachekey)

    return (response, content)

  # Wrap download_http so we do not use our custom connection_type
  # on POSTS, which are used to refresh oauth tokens. We don't want to
  # process the data received in those requests.
  request_orig = download_http.request
  def NewRequest(uri, method='GET', body=None, headers=None,
                 redirections=httplib2.DEFAULT_MAX_REDIRECTS,
                 connection_type=None):
    if method == 'POST':
      return request_orig(uri, method=method, body=body,
                          headers=headers, redirections=redirections,
                          connection_type=None)
    else:
      return request_orig(uri, method=method, body=body,
                          headers=headers, redirections=redirections,
                          connection_type=connection_type)

  # Replace the request methods with our own closures.
  download_http._request = types.MethodType(OverrideRequest, download_http)
  download_http.request = NewRequest

  return download_http
| 384 |
| 385 |
class HttpWithNoRetries(httplib2.Http):
  """httplib2.Http variant that does not retry.

  httplib2 automatically retries requests according to httplib2.RETRIES, but
  in certain cases httplib2 ignores the RETRIES value and forces a retry.
  Because httplib2 does not handle the case where the underlying request body
  is a stream, a retry may cause a non-idempotent write as the stream is
  partially consumed and not reset before the retry occurs.

  Here we override _conn_request to disable retries unequivocally, so that
  uploads may be retried at higher layers that properly handle stream request
  bodies.
  """

  def _conn_request(self, conn, request_uri, method, body, headers):  # pylint: disable=too-many-statements
    """Makes a single request attempt on conn, with no retry logic.

    Copied from httplib2's _conn_request with the retry loop removed;
    exceptions propagate to the caller instead of triggering a resend.

    Args:
      conn: Connection object to issue the request on.
      request_uri: Path portion of the request URI.
      method: HTTP method string.
      body: Request body (string or stream), passed through unchanged.
      headers: Dict of request headers.

    Returns:
      (httplib2.Response, content) tuple.
    """

    try:
      if hasattr(conn, 'sock') and conn.sock is None:
        conn.connect()
      conn.request(method, request_uri, body, headers)
    except socket.timeout:
      raise
    except socket.gaierror:
      conn.close()
      raise httplib2.ServerNotFoundError(
          'Unable to find the server at %s' % conn.host)
    except httplib2.ssl_SSLError:
      conn.close()
      raise
    except socket.error, e:
      err = 0
      if hasattr(e, 'args'):
        err = getattr(e, 'args')[0]
      else:
        err = e.errno
      if err == httplib2.errno.ECONNREFUSED:  # Connection refused
        raise
      # NOTE(review): other socket errors fall through to getresponse below;
      # this mirrors the upstream structure with the retry loop removed.
    except httplib.HTTPException:
      conn.close()
      raise
    try:
      response = conn.getresponse()
    except (socket.error, httplib.HTTPException):
      conn.close()
      raise
    else:
      content = ''
      if method == 'HEAD':
        # HEAD responses have no body; just close the connection.
        conn.close()
      else:
        content = response.read()
      response = httplib2.Response(response)
      if method != 'HEAD':
        # pylint: disable=protected-access
        content = httplib2._decompressContent(response, content)
    return (response, content)
| 442 |
| 443 |
class HttpWithDownloadStream(httplib2.Http):
  """httplib2.Http variant that only pushes bytes through a stream.

  httplib2 handles media by storing entire chunks of responses in memory, which
  is undesirable particularly when multiple instances are used during
  multi-threaded/multi-process copy. This class copies and then overrides some
  httplib2 functions to use a streaming copy approach that uses small memory
  buffers.

  Also disables httplib2 retries (for reasons stated in the HttpWithNoRetries
  class doc).
  """

  def __init__(self, stream=None, *args, **kwds):
    """Initializes the Http object with a destination stream.

    Args:
      stream: Writable file-like object that response payload bytes are
          written to. Required.
      *args: Positional arguments passed through to httplib2.Http.
      **kwds: Keyword arguments passed through to httplib2.Http.

    Raises:
      apitools_exceptions.InvalidUserInputError: if stream is None.
    """
    if stream is None:
      raise apitools_exceptions.InvalidUserInputError(
          'Cannot create HttpWithDownloadStream with no stream')
    self._stream = stream
    self._logger = logging.getLogger()
    super(HttpWithDownloadStream, self).__init__(*args, **kwds)

  @property
  def stream(self):
    """Destination stream for downloaded payload bytes (read-only)."""
    return self._stream

  def _conn_request(self, conn, request_uri, method, body, headers):  # pylint: disable=too-many-statements
    """Makes one request attempt, streaming 200/206 payloads to self.stream.

    Copied from httplib2's _conn_request with retries removed; successful
    media responses are written to self.stream in TRANSFER_BUFFER_SIZE
    chunks instead of being returned as content (content is '' for them).

    Args:
      conn: Connection object to issue the request on.
      request_uri: Path portion of the request URI.
      method: HTTP method string.
      body: Request body (string or stream), passed through unchanged.
      headers: Dict of request headers.

    Returns:
      (httplib2.Response, content) tuple; content is empty for streamed
      media responses.
    """
    try:
      if hasattr(conn, 'sock') and conn.sock is None:
        conn.connect()
      conn.request(method, request_uri, body, headers)
    except socket.timeout:
      raise
    except socket.gaierror:
      conn.close()
      raise httplib2.ServerNotFoundError(
          'Unable to find the server at %s' % conn.host)
    except httplib2.ssl_SSLError:
      conn.close()
      raise
    except socket.error, e:
      err = 0
      if hasattr(e, 'args'):
        err = getattr(e, 'args')[0]
      else:
        err = e.errno
      if err == httplib2.errno.ECONNREFUSED:  # Connection refused
        raise
      # NOTE(review): other socket errors fall through to getresponse below;
      # this mirrors the upstream structure with the retry loop removed.
    except httplib.HTTPException:
      # Just because the server closed the connection doesn't apparently mean
      # that the server didn't send a response.
      conn.close()
      raise
    try:
      response = conn.getresponse()
    except (socket.error, httplib.HTTPException):
      conn.close()
      raise
    else:
      content = ''
      if method == 'HEAD':
        conn.close()
        response = httplib2.Response(response)
      else:
        if response.status in (httplib.OK, httplib.PARTIAL_CONTENT):
          content_length = None
          if hasattr(response, 'msg'):
            content_length = response.getheader('content-length')
          http_stream = response
          bytes_read = 0
          # Copy the payload to self.stream in small chunks so the whole
          # object is never held in memory.
          while True:
            new_data = http_stream.read(TRANSFER_BUFFER_SIZE)
            if new_data:
              self.stream.write(new_data)
              bytes_read += len(new_data)
            else:
              break

          if (content_length is not None and
              long(bytes_read) != long(content_length)):
            # The input stream terminated before we were able to read the
            # entire contents, possibly due to a network condition. Set
            # content-length to indicate how many bytes we actually read.
            self._logger.log(
                logging.DEBUG, 'Only got %s bytes out of content-length %s '
                'for request URI %s. Resetting content-length to match '
                'bytes read.', bytes_read, content_length, request_uri)
            response.msg['content-length'] = str(bytes_read)
          response = httplib2.Response(response)
        else:
          # We fall back to the current httplib2 behavior if we're
          # not processing bytes (eg it's a redirect).
          content = response.read()
          response = httplib2.Response(response)
          # pylint: disable=protected-access
          content = httplib2._decompressContent(response, content)
    return (response, content)
OLD | NEW |