| Index: tools/telemetry/third_party/gsutilz/gslib/gcs_json_media.py
| diff --git a/tools/telemetry/third_party/gsutilz/gslib/gcs_json_media.py b/tools/telemetry/third_party/gsutilz/gslib/gcs_json_media.py
| deleted file mode 100644
| index 45440f9e727e4ba9bf7d1403822013982ede52e4..0000000000000000000000000000000000000000
| --- a/tools/telemetry/third_party/gsutilz/gslib/gcs_json_media.py
| +++ /dev/null
| @@ -1,539 +0,0 @@
| -# -*- coding: utf-8 -*-
| -# Copyright 2014 Google Inc. All Rights Reserved.
| -#
| -# Licensed under the Apache License, Version 2.0 (the "License");
| -# you may not use this file except in compliance with the License.
| -# You may obtain a copy of the License at
| -#
| -#     http://www.apache.org/licenses/LICENSE-2.0
| -#
| -# Unless required by applicable law or agreed to in writing, software
| -# distributed under the License is distributed on an "AS IS" BASIS,
| -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
| -# See the License for the specific language governing permissions and
| -# limitations under the License.
| -"""Media helper functions and classes for Google Cloud Storage JSON API."""
| -
| -from __future__ import absolute_import
| -
| -import copy
| -import cStringIO
| -import httplib
| -import logging
| -import socket
| -import types
| -import urlparse
| -
| -from apitools.base.py import exceptions as apitools_exceptions
| -import httplib2
| -from httplib2 import parse_uri
| -
| -from gslib.cloud_api import BadRequestException
| -from gslib.progress_callback import ProgressCallbackWithBackoff
| -from gslib.util import SSL_TIMEOUT
| -from gslib.util import TRANSFER_BUFFER_SIZE
| -
| -
| -class BytesTransferredContainer(object):
| -  """Container class for passing number of bytes transferred to lower layers.
| -
| -  For resumed transfers or connection rebuilds in the middle of a transfer, we
| -  need to rebuild the connection class with how much we've transferred so far.
| -  For uploads, we don't know the total number of bytes uploaded until we've
| -  queried the server, but we need to create the connection class to pass to
| -  httplib2 before we can query the server. This container object allows us to
| -  pass a reference into Upload/DownloadCallbackConnection.
| -  """
| -
| -  def __init__(self):
| -    self.__bytes_transferred = 0
| -
| -  @property
| -  def bytes_transferred(self):
| -    return self.__bytes_transferred
| -
| -  @bytes_transferred.setter
| -  def bytes_transferred(self, value):
| -    self.__bytes_transferred = value
| -
| -
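The point of the container is that the factory-built connection classes below capture a reference to it when they are created, while the value it holds can keep changing afterwards (for example, once a resumability query returns). A minimal sketch of that reference-passing behavior, reusing the BytesTransferredContainer defined above:

    container = BytesTransferredContainer()

    def MakeProgressReporter(c):
      # Captures the container reference, not the value it currently holds.
      def Report():
        print 'bytes so far: %d' % c.bytes_transferred
      return Report

    report = MakeProgressReporter(container)
    container.bytes_transferred = 512  # e.g. set after querying the server
    report()  # Prints 512, not 0: the closure sees the updated container.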
| -class UploadCallbackConnectionClassFactory(object):
| -  """Creates a class that can override an httplib2 connection.
| -
| -  This is used to provide progress callbacks and disable dumping the upload
| -  payload during debug statements. It can later be used to provide on-the-fly
| -  hash digestion during upload.
| -  """
| -
| -  def __init__(self, bytes_uploaded_container,
| -               buffer_size=TRANSFER_BUFFER_SIZE,
| -               total_size=0, progress_callback=None):
| -    self.bytes_uploaded_container = bytes_uploaded_container
| -    self.buffer_size = buffer_size
| -    self.total_size = total_size
| -    self.progress_callback = progress_callback
| -
| -  def GetConnectionClass(self):
| -    """Returns a connection class that overrides send."""
| -    outer_bytes_uploaded_container = self.bytes_uploaded_container
| -    outer_buffer_size = self.buffer_size
| -    outer_total_size = self.total_size
| -    outer_progress_callback = self.progress_callback
| -
| -    class UploadCallbackConnection(httplib2.HTTPSConnectionWithTimeout):
| -      """Connection class override for uploads."""
| -      bytes_uploaded_container = outer_bytes_uploaded_container
| -      # After we instantiate this class, apitools will check with the server
| -      # to find out how many bytes remain for a resumable upload. This allows
| -      # us to update our progress once based on that number.
| -      processed_initial_bytes = False
| -      GCS_JSON_BUFFER_SIZE = outer_buffer_size
| -      callback_processor = None
| -      size = outer_total_size
| -
| -      def __init__(self, *args, **kwargs):
| -        kwargs['timeout'] = SSL_TIMEOUT
| -        httplib2.HTTPSConnectionWithTimeout.__init__(self, *args, **kwargs)
| -
| -      def send(self, data):
| -        """Overrides HTTPConnection.send."""
| -        if not self.processed_initial_bytes:
| -          self.processed_initial_bytes = True
| -          if outer_progress_callback:
| -            self.callback_processor = ProgressCallbackWithBackoff(
| -                outer_total_size, outer_progress_callback)
| -            self.callback_processor.Progress(
| -                self.bytes_uploaded_container.bytes_transferred)
| -        # httplib.HTTPConnection.send accepts either a string or a file-like
| -        # object (anything that implements read()).
| -        if isinstance(data, basestring):
| -          full_buffer = cStringIO.StringIO(data)
| -        else:
| -          full_buffer = data
| -        partial_buffer = full_buffer.read(self.GCS_JSON_BUFFER_SIZE)
| -        while partial_buffer:
| -          httplib2.HTTPSConnectionWithTimeout.send(self, partial_buffer)
| -          send_length = len(partial_buffer)
| -          if self.callback_processor:
| -            # This is the only place where gsutil has control over making a
| -            # callback, but here we can't differentiate the metadata bytes
| -            # (such as headers and OAuth2 refreshes) sent during an upload
| -            # from the actual upload bytes, so we will actually report
| -            # slightly more bytes than desired to the callback handler.
| -            #
| -            # One considered/rejected alternative is to move the callbacks
| -            # into the HashingFileUploadWrapper which only processes reads on
| -            # the bytes. This has the disadvantages of being removed from
| -            # where we actually send the bytes and unnecessarily
| -            # multi-purposing that class.
| -            self.callback_processor.Progress(send_length)
| -          partial_buffer = full_buffer.read(self.GCS_JSON_BUFFER_SIZE)
| -
| -    return UploadCallbackConnection
| -
| -
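To make the flow concrete, here is a hypothetical wiring of the factory into an httplib2 request. upload_url, file_obj, and file_size are placeholders, and the (bytes_transferred, total_size) callback signature is an assumption based on how ProgressCallbackWithBackoff is invoked above:

    def PrintProgress(transferred, total):
      # Assumed callback signature: (bytes transferred so far, total bytes).
      print '%d / %d bytes uploaded' % (transferred, total)

    bytes_uploaded = BytesTransferredContainer()
    factory = UploadCallbackConnectionClassFactory(
        bytes_uploaded, total_size=file_size, progress_callback=PrintProgress)

    upload_http = httplib2.Http()
    # httplib2 instantiates connection_type to build the connection for this
    # request, so the send() override above sees every buffer of the body.
    upload_http.request(upload_url, method='PUT', body=file_obj,
                        connection_type=factory.GetConnectionClass())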
| -def WrapUploadHttpRequest(upload_http):
| -  """Wraps upload_http so we only use our custom connection_type on PUTs.
| -
| -  POSTs are used to refresh oauth tokens, and we don't want to process the
| -  data sent in those requests.
| -
| -  Args:
| -    upload_http: httplib2.Http instance to wrap
| -  """
| -  request_orig = upload_http.request
| -  def NewRequest(uri, method='GET', body=None, headers=None,
| -                 redirections=httplib2.DEFAULT_MAX_REDIRECTS,
| -                 connection_type=None):
| -    if method == 'PUT' or method == 'POST':
| -      override_connection_type = connection_type
| -    else:
| -      override_connection_type = None
| -    return request_orig(uri, method=method, body=body,
| -                        headers=headers, redirections=redirections,
| -                        connection_type=override_connection_type)
| -  # Replace the request method with our own closure.
| -  upload_http.request = NewRequest
| -
| -
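The closure narrows where the custom class is applied: per the code above, PUT and POST requests keep the caller's connection_type, while every other verb falls back to httplib2's default connection selection. A sketch of the observable effect, reusing the factory from the previous sketch (session_uri and data are placeholders):

    upload_http = httplib2.Http()
    WrapUploadHttpRequest(upload_http)

    conn_cls = factory.GetConnectionClass()
    # PUT (and POST) requests are run through the callback connection...
    upload_http.request(session_uri, method='PUT', body=data,
                        connection_type=conn_cls)
    # ...but for other verbs the closure passes connection_type=None, so
    # httplib2 picks its stock connection class instead.
    upload_http.request(session_uri, method='GET',
                        connection_type=conn_cls)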
| -class DownloadCallbackConnectionClassFactory(object):
| -  """Creates a class that can override an httplib2 connection.
| -
| -  This is used to provide progress callbacks, disable dumping the download
| -  payload during debug statements, and provide on-the-fly hash digestion during
| -  download. On-the-fly digestion is particularly important because httplib2
| -  will decompress gzipped content on-the-fly, thus this class provides our
| -  only opportunity to calculate the correct hash for an object that has a
| -  gzip hash in the cloud.
| -  """
| -
| -  def __init__(self, bytes_downloaded_container,
| -               buffer_size=TRANSFER_BUFFER_SIZE, total_size=0,
| -               progress_callback=None, digesters=None):
| -    self.buffer_size = buffer_size
| -    self.total_size = total_size
| -    self.progress_callback = progress_callback
| -    self.digesters = digesters
| -    self.bytes_downloaded_container = bytes_downloaded_container
| -
| -  def GetConnectionClass(self):
| -    """Returns a connection class that overrides getresponse."""
| -
| -    class DownloadCallbackConnection(httplib2.HTTPSConnectionWithTimeout):
| -      """Connection class override for downloads."""
| -      outer_total_size = self.total_size
| -      outer_digesters = self.digesters
| -      outer_progress_callback = self.progress_callback
| -      outer_bytes_downloaded_container = self.bytes_downloaded_container
| -      processed_initial_bytes = False
| -      callback_processor = None
| -
| -      def __init__(self, *args, **kwargs):
| -        kwargs['timeout'] = SSL_TIMEOUT
| -        httplib2.HTTPSConnectionWithTimeout.__init__(self, *args, **kwargs)
| -
| -      def getresponse(self, buffering=False):
| -        """Wraps an HTTPResponse to perform callbacks and hashing.
| -
| -        In this function, self is a DownloadCallbackConnection.
| -
| -        Args:
| -          buffering: Unused. This function uses a local buffer.
| -
| -        Returns:
| -          HTTPResponse object with wrapped read function.
| -        """
| -        orig_response = httplib.HTTPConnection.getresponse(self)
| -        if orig_response.status not in (httplib.OK, httplib.PARTIAL_CONTENT):
| -          return orig_response
| -        orig_read_func = orig_response.read
| -
| -        def read(amt=None):  # pylint: disable=invalid-name
| -          """Overrides HTTPConnection.getresponse.read.
| -
| -          This function only supports reads of TRANSFER_BUFFER_SIZE or smaller.
| -
| -          Args:
| -            amt: Integer n where 0 < n <= TRANSFER_BUFFER_SIZE. This is a
| -                 keyword argument to match the read function it overrides,
| -                 but it is required.
| -
| -          Returns:
| -            Data read from HTTPConnection.
| -          """
| -          if not amt or amt > TRANSFER_BUFFER_SIZE:
| -            raise BadRequestException(
| -                'Invalid HTTP read size %s during download, expected %s.' %
| -                (amt, TRANSFER_BUFFER_SIZE))
| -          else:
| -            amt = amt or TRANSFER_BUFFER_SIZE
| -
| -          if not self.processed_initial_bytes:
| -            self.processed_initial_bytes = True
| -            if self.outer_progress_callback:
| -              self.callback_processor = ProgressCallbackWithBackoff(
| -                  self.outer_total_size, self.outer_progress_callback)
| -              self.callback_processor.Progress(
| -                  self.outer_bytes_downloaded_container.bytes_transferred)
| -
| -          data = orig_read_func(amt)
| -          read_length = len(data)
| -          if self.callback_processor:
| -            self.callback_processor.Progress(read_length)
| -          if self.outer_digesters:
| -            for alg in self.outer_digesters:
| -              self.outer_digesters[alg].update(data)
| -          return data
| -        orig_response.read = read
| -
| -        return orig_response
| -    return DownloadCallbackConnection
| -
| -
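As with uploads, the factory output is handed to httplib2 as connection_type; the digesters dict maps a hash name to any object with an update() method, e.g. from hashlib. A hypothetical end-to-end use (media_url and object_size are placeholders, PrintProgress and the callback signature are assumed as before):

    import hashlib

    bytes_downloaded = BytesTransferredContainer()
    digesters = {'md5': hashlib.md5()}
    factory = DownloadCallbackConnectionClassFactory(
        bytes_downloaded, total_size=object_size,
        progress_callback=PrintProgress, digesters=digesters)

    download_http = httplib2.Http()
    WrapDownloadHttpRequest(download_http)  # defined just below
    download_http.request(media_url, method='GET',
                          connection_type=factory.GetConnectionClass())

    # Every buffer passed through read() was fed to the digesters, so this
    # digest reflects the bytes exactly as httplib2 delivered them (i.e.
    # after any on-the-fly gzip decoding).
    print digesters['md5'].hexdigest()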
| -def WrapDownloadHttpRequest(download_http):
| -  """Overrides download request functions for an httplib2.Http object.
| -
| -  Args:
| -    download_http: httplib2.Http.object to wrap / override.
| -
| -  Returns:
| -    Wrapped / overridden httplib2.Http object.
| -  """
| -
| -  # httplib2 has a bug https://code.google.com/p/httplib2/issues/detail?id=305
| -  # where custom connection_type is not respected after redirects. This
| -  # function is copied from httplib2 and overrides the request function so that
| -  # the connection_type is properly passed through.
| -  # pylint: disable=protected-access,g-inconsistent-quotes,unused-variable
| -  # pylint: disable=g-equals-none,g-doc-return-or-yield
| -  # pylint: disable=g-short-docstring-punctuation,g-doc-args
| -  # pylint: disable=too-many-statements
| -  def OverrideRequest(self, conn, host, absolute_uri, request_uri, method,
| -                      body, headers, redirections, cachekey):
| -    """Do the actual request using the connection object.
| -
| -    Also follow one level of redirects if necessary.
| -    """
| -
| -    auths = ([(auth.depth(request_uri), auth) for auth in self.authorizations
| -              if auth.inscope(host, request_uri)])
| -    auth = auths and sorted(auths)[0][1] or None
| -    if auth:
| -      auth.request(method, request_uri, headers, body)
| -
| -    (response, content) = self._conn_request(conn, request_uri, method, body,
| -                                             headers)
| -
| -    if auth:
| -      if auth.response(response, body):
| -        auth.request(method, request_uri, headers, body)
| -        (response, content) = self._conn_request(conn, request_uri, method,
| -                                                 body, headers)
| -        response._stale_digest = 1
| -
| -    if response.status == 401:
| -      for authorization in self._auth_from_challenge(
| -          host, request_uri, headers, response, content):
| -        authorization.request(method, request_uri, headers, body)
| -        (response, content) = self._conn_request(conn, request_uri, method,
| -                                                 body, headers)
| -        if response.status != 401:
| -          self.authorizations.append(authorization)
| -          authorization.response(response, body)
| -          break
| -
| -    if (self.follow_all_redirects or (method in ["GET", "HEAD"])
| -        or response.status == 303):
| -      if self.follow_redirects and response.status in [300, 301, 302,
| -                                                       303, 307]:
| -        # Pick out the location header and basically start from the beginning
| -        # remembering first to strip the ETag header and decrement our 'depth'
| -        if redirections:
| -          if not response.has_key('location') and response.status != 300:
| -            raise httplib2.RedirectMissingLocation(
| -                "Redirected but the response is missing a Location: header.",
| -                response, content)
| -          # Fix-up relative redirects (which violate an RFC 2616 MUST)
| -          if response.has_key('location'):
| -            location = response['location']
| -            (scheme, authority, path, query, fragment) = parse_uri(location)
| -            if authority == None:
| -              response['location'] = urlparse.urljoin(absolute_uri, location)
| -          if response.status == 301 and method in ["GET", "HEAD"]:
| -            response['-x-permanent-redirect-url'] = response['location']
| -            if not response.has_key('content-location'):
| -              response['content-location'] = absolute_uri
| -            httplib2._updateCache(headers, response, content, self.cache,
| -                                  cachekey)
| -          if headers.has_key('if-none-match'):
| -            del headers['if-none-match']
| -          if headers.has_key('if-modified-since'):
| -            del headers['if-modified-since']
| -          if ('authorization' in headers and
| -              not self.forward_authorization_headers):
| -            del headers['authorization']
| -          if response.has_key('location'):
| -            location = response['location']
| -            old_response = copy.deepcopy(response)
| -            if not old_response.has_key('content-location'):
| -              old_response['content-location'] = absolute_uri
| -            redirect_method = method
| -            if response.status in [302, 303]:
| -              redirect_method = "GET"
| -              body = None
| -            (response, content) = self.request(
| -                location, redirect_method, body=body, headers=headers,
| -                redirections=redirections-1,
| -                connection_type=conn.__class__)
| -            response.previous = old_response
| -        else:
| -          raise httplib2.RedirectLimit(
| -              "Redirected more times than redirection_limit allows.",
| -              response, content)
| -      elif response.status in [200, 203] and method in ["GET", "HEAD"]:
| -        # Don't cache 206's since we aren't going to handle byte range
| -        # requests
| -        if not response.has_key('content-location'):
| -          response['content-location'] = absolute_uri
| -        httplib2._updateCache(headers, response, content, self.cache,
| -                              cachekey)
| -
| -    return (response, content)
| -
| -  # Wrap download_http so we do not use our custom connection_type
| -  # on POSTS, which are used to refresh oauth tokens. We don't want to
| -  # process the data received in those requests.
| -  request_orig = download_http.request
| -  def NewRequest(uri, method='GET', body=None, headers=None,
| -                 redirections=httplib2.DEFAULT_MAX_REDIRECTS,
| -                 connection_type=None):
| -    if method == 'POST':
| -      return request_orig(uri, method=method, body=body,
| -                          headers=headers, redirections=redirections,
| -                          connection_type=None)
| -    else:
| -      return request_orig(uri, method=method, body=body,
| -                          headers=headers, redirections=redirections,
| -                          connection_type=connection_type)
| -
| -  # Replace the request methods with our own closures.
| -  download_http._request = types.MethodType(OverrideRequest, download_http)
| -  download_http.request = NewRequest
| -
| -  return download_http
| -
| -
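The key mechanism is in the last few lines: types.MethodType rebinds a plain function as a bound method on one specific instance, which is how the copied request implementation replaces httplib2.Http._request without subclassing. A self-contained sketch of that pattern (Greeter, LoudGreet, etc. are hypothetical names):

    import types

    class Greeter(object):
      def Greet(self):
        return 'hello'

    def LoudGreet(self):
      # The replacement receives the instance as self, like a normal method.
      return self.GreetOrig().upper()

    g = Greeter()
    g.GreetOrig = g.Greet  # Keep a handle on the original bound method.
    g.Greet = types.MethodType(LoudGreet, g)  # Patch this instance only.
    print g.Greet()          # HELLO
    print Greeter().Greet()  # hello; other instances are unaffected.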
| -class HttpWithNoRetries(httplib2.Http):
| -  """httplib2.Http variant that does not retry.
| -
| -  httplib2 automatically retries requests according to httplib2.RETRIES, but
| -  in certain cases httplib2 ignores the RETRIES value and forces a retry.
| -  Because httplib2 does not handle the case where the underlying request body
| -  is a stream, a retry may cause a non-idempotent write as the stream is
| -  partially consumed and not reset before the retry occurs.
| -
| -  Here we override _conn_request to disable retries unequivocally, so that
| -  uploads may be retried at higher layers that properly handle stream request
| -  bodies.
| -  """
| -
| -  def _conn_request(self, conn, request_uri, method, body, headers):  # pylint: disable=too-many-statements
| -
| -    try:
| -      if hasattr(conn, 'sock') and conn.sock is None:
| -        conn.connect()
| -      conn.request(method, request_uri, body, headers)
| -    except socket.timeout:
| -      raise
| -    except socket.gaierror:
| -      conn.close()
| -      raise httplib2.ServerNotFoundError(
| -          'Unable to find the server at %s' % conn.host)
| -    except httplib2.ssl_SSLError:
| -      conn.close()
| -      raise
| -    except socket.error, e:
| -      err = 0
| -      if hasattr(e, 'args'):
| -        err = getattr(e, 'args')[0]
| -      else:
| -        err = e.errno
| -      if err == httplib2.errno.ECONNREFUSED:  # Connection refused
| -        raise
| -    except httplib.HTTPException:
| -      conn.close()
| -      raise
| -    try:
| -      response = conn.getresponse()
| -    except (socket.error, httplib.HTTPException):
| -      conn.close()
| -      raise
| -    else:
| -      content = ''
| -      if method == 'HEAD':
| -        conn.close()
| -      else:
| -        content = response.read()
| -      response = httplib2.Response(response)
| -      if method != 'HEAD':
| -        # pylint: disable=protected-access
| -        content = httplib2._decompressContent(response, content)
| -    return (response, content)
| -
| -
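Disabling retries pushes the responsibility up to the caller, who can reset a stream body before resending; httplib2's built-in retry cannot do that and would silently resend a partially consumed stream. A hedged sketch of the caller-side pattern (upload_uri is a placeholder; the explicit Content-Length header is assumed to be the caller's job when the body is a stream):

    import httplib
    import os
    import socket

    http = HttpWithNoRetries()
    body = open('payload.bin', 'rb')
    headers = {'Content-Length': str(os.path.getsize('payload.bin'))}
    try:
      response, _ = http.request(upload_uri, method='PUT', body=body,
                                 headers=headers)
    except (socket.error, httplib.HTTPException):
      # The retry is safe only because we rewind the stream first, so the
      # server does not receive a partially consumed body.
      body.seek(0)
      response, _ = http.request(upload_uri, method='PUT', body=body,
                                 headers=headers)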
| -class HttpWithDownloadStream(httplib2.Http):
| -  """httplib2.Http variant that only pushes bytes through a stream.
| -
| -  httplib2 handles media by storing entire chunks of responses in memory, which
| -  is undesirable particularly when multiple instances are used during
| -  multi-threaded/multi-process copy. This class copies and then overrides some
| -  httplib2 functions to use a streaming copy approach that uses small memory
| -  buffers.
| -
| -  Also disables httplib2 retries (for reasons stated in the HttpWithNoRetries
| -  class doc).
| -  """
| -
| -  def __init__(self, stream=None, *args, **kwds):
| -    if stream is None:
| -      raise apitools_exceptions.InvalidUserInputError(
| -          'Cannot create HttpWithDownloadStream with no stream')
| -    self._stream = stream
| -    self._logger = logging.getLogger()
| -    super(HttpWithDownloadStream, self).__init__(*args, **kwds)
| -
| -  @property
| -  def stream(self):
| -    return self._stream
| -
| -  def _conn_request(self, conn, request_uri, method, body, headers):  # pylint: disable=too-many-statements
| -    try:
| -      if hasattr(conn, 'sock') and conn.sock is None:
| -        conn.connect()
| -      conn.request(method, request_uri, body, headers)
| -    except socket.timeout:
| -      raise
| -    except socket.gaierror:
| -      conn.close()
| -      raise httplib2.ServerNotFoundError(
| -          'Unable to find the server at %s' % conn.host)
| -    except httplib2.ssl_SSLError:
| -      conn.close()
| -      raise
| -    except socket.error, e:
| -      err = 0
| -      if hasattr(e, 'args'):
| -        err = getattr(e, 'args')[0]
| -      else:
| -        err = e.errno
| -      if err == httplib2.errno.ECONNREFUSED:  # Connection refused
| -        raise
| -    except httplib.HTTPException:
| -      # Just because the server closed the connection doesn't apparently mean
| -      # that the server didn't send a response.
| -      conn.close()
| -      raise
| -    try:
| -      response = conn.getresponse()
| -    except (socket.error, httplib.HTTPException):
| -      conn.close()
| -      raise
| -    else:
| -      content = ''
| -      if method == 'HEAD':
| -        conn.close()
| -        response = httplib2.Response(response)
| -      else:
| -        if response.status in (httplib.OK, httplib.PARTIAL_CONTENT):
| -          content_length = None
| -          if hasattr(response, 'msg'):
| -            content_length = response.getheader('content-length')
| -          http_stream = response
| -          bytes_read = 0
| -          while True:
| -            new_data = http_stream.read(TRANSFER_BUFFER_SIZE)
| -            if new_data:
| -              self.stream.write(new_data)
| -              bytes_read += len(new_data)
| -            else:
| -              break
| -
| -          if (content_length is not None and
| -              long(bytes_read) != long(content_length)):
| -            # The input stream terminated before we were able to read the
| -            # entire contents, possibly due to a network condition. Set
| -            # content-length to indicate how many bytes we actually read.
| -            self._logger.log(
| -                logging.DEBUG, 'Only got %s bytes out of content-length %s '
| -                'for request URI %s. Resetting content-length to match '
| -                'bytes read.', bytes_read, content_length, request_uri)
| -            response.msg['content-length'] = str(bytes_read)
| -          response = httplib2.Response(response)
| -        else:
| -          # We fall back to the current httplib2 behavior if we're
| -          # not processing bytes (eg it's a redirect).
| -          content = response.read()
| -          response = httplib2.Response(response)
| -          # pylint: disable=protected-access
| -          content = httplib2._decompressContent(response, content)
| -    return (response, content)
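A short sketch of the intended use: stream a media response straight to a file so that only TRANSFER_BUFFER_SIZE-sized buffers are ever held in memory (media_url is a placeholder):

    with open('object.bin', 'wb') as fp:
      http = HttpWithDownloadStream(stream=fp)
      # For a streamed 200/206 response the payload is written to fp and the
      # returned content is ''; other statuses fall back to normal buffering.
      response, content = http.request(media_url, method='GET')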