Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(94)

Unified Diff: tools/telemetry/third_party/gsutil/third_party/apitools/apitools/base/py/transfer.py

Issue 1260493004: Revert "Add gsutil 4.13 to telemetry/third_party" (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: tools/telemetry/third_party/gsutil/third_party/apitools/apitools/base/py/transfer.py
diff --git a/tools/telemetry/third_party/gsutil/third_party/apitools/apitools/base/py/transfer.py b/tools/telemetry/third_party/gsutil/third_party/apitools/apitools/base/py/transfer.py
deleted file mode 100644
index 35a4774a6fb7cd49c40ad42bbfd89e5356147398..0000000000000000000000000000000000000000
--- a/tools/telemetry/third_party/gsutil/third_party/apitools/apitools/base/py/transfer.py
+++ /dev/null
@@ -1,986 +0,0 @@
-#!/usr/bin/env python
-"""Upload and download support for apitools."""
-from __future__ import print_function
-
-import email.generator as email_generator
-import email.mime.multipart as mime_multipart
-import email.mime.nonmultipart as mime_nonmultipart
-import io
-import json
-import mimetypes
-import os
-import threading
-
-import six
-from six.moves import http_client
-
-from apitools.base.py import buffered_stream
-from apitools.base.py import exceptions
-from apitools.base.py import http_wrapper
-from apitools.base.py import stream_slice
-from apitools.base.py import util
-
-__all__ = [
- 'Download',
- 'Upload',
- 'RESUMABLE_UPLOAD',
- 'SIMPLE_UPLOAD',
- 'DownloadProgressPrinter',
- 'DownloadCompletePrinter',
- 'UploadProgressPrinter',
- 'UploadCompletePrinter',
-]
-
-_RESUMABLE_UPLOAD_THRESHOLD = 5 << 20
-SIMPLE_UPLOAD = 'simple'
-RESUMABLE_UPLOAD = 'resumable'
-
-
-def DownloadProgressPrinter(response, unused_download):
- """Print download progress based on response."""
- if 'content-range' in response.info:
- print('Received %s' % response.info['content-range'])
- else:
- print('Received %d bytes' % response.length)
-
-
-def DownloadCompletePrinter(unused_response, unused_download):
- """Print information about a completed download."""
- print('Download complete')
-
-
-def UploadProgressPrinter(response, unused_upload):
- """Print upload progress based on response."""
- print('Sent %s' % response.info['range'])
-
-
-def UploadCompletePrinter(unused_response, unused_upload):
- """Print information about a completed upload."""
- print('Upload complete')
-
-
-class _Transfer(object):
-
- """Generic bits common to Uploads and Downloads."""
-
- def __init__(self, stream, close_stream=False, chunksize=None,
- auto_transfer=True, http=None, num_retries=5):
- self.__bytes_http = None
- self.__close_stream = close_stream
- self.__http = http
- self.__stream = stream
- self.__url = None
-
- self.__num_retries = 5
- # Let the @property do validation
- self.num_retries = num_retries
-
- self.retry_func = (
- http_wrapper.HandleExceptionsAndRebuildHttpConnections)
- self.auto_transfer = auto_transfer
- self.chunksize = chunksize or 1048576
-
- def __repr__(self):
- return str(self)
-
- @property
- def close_stream(self):
- return self.__close_stream
-
- @property
- def http(self):
- return self.__http
-
- @property
- def bytes_http(self):
- return self.__bytes_http or self.http
-
- @bytes_http.setter
- def bytes_http(self, value):
- self.__bytes_http = value
-
- @property
- def num_retries(self):
- return self.__num_retries
-
- @num_retries.setter
- def num_retries(self, value):
- util.Typecheck(value, six.integer_types)
- if value < 0:
- raise exceptions.InvalidDataError(
- 'Cannot have negative value for num_retries')
- self.__num_retries = value
-
- @property
- def stream(self):
- return self.__stream
-
- @property
- def url(self):
- return self.__url
-
- def _Initialize(self, http, url):
- """Initialize this download by setting self.http and self.url.
-
- We want the user to be able to override self.http by having set
- the value in the constructor; in that case, we ignore the provided
- http.
-
- Args:
- http: An httplib2.Http instance or None.
- url: The url for this transfer.
-
- Returns:
- None. Initializes self.
- """
- self.EnsureUninitialized()
- if self.http is None:
- self.__http = http or http_wrapper.GetHttp()
- self.__url = url
-
- @property
- def initialized(self):
- return self.url is not None and self.http is not None
-
- @property
- def _type_name(self):
- return type(self).__name__
-
- def EnsureInitialized(self):
- if not self.initialized:
- raise exceptions.TransferInvalidError(
- 'Cannot use uninitialized %s', self._type_name)
-
- def EnsureUninitialized(self):
- if self.initialized:
- raise exceptions.TransferInvalidError(
- 'Cannot re-initialize %s', self._type_name)
-
- def __del__(self):
- if self.__close_stream:
- self.__stream.close()
-
- def _ExecuteCallback(self, callback, response):
- # TODO(craigcitro): Push these into a queue.
- if callback is not None:
- threading.Thread(target=callback, args=(response, self)).start()
-
-
-class Download(_Transfer):
-
- """Data for a single download.
-
- Public attributes:
- chunksize: default chunksize to use for transfers.
- """
- _ACCEPTABLE_STATUSES = set((
- http_client.OK,
- http_client.NO_CONTENT,
- http_client.PARTIAL_CONTENT,
- http_client.REQUESTED_RANGE_NOT_SATISFIABLE,
- ))
- _REQUIRED_SERIALIZATION_KEYS = set((
- 'auto_transfer', 'progress', 'total_size', 'url'))
-
- def __init__(self, stream, progress_callback=None, finish_callback=None,
- **kwds):
- total_size = kwds.pop('total_size', None)
- super(Download, self).__init__(stream, **kwds)
- self.__initial_response = None
- self.__progress = 0
- self.__total_size = total_size
- self.__encoding = None
-
- self.progress_callback = progress_callback
- self.finish_callback = finish_callback
-
- @property
- def progress(self):
- return self.__progress
-
- @property
- def encoding(self):
- return self.__encoding
-
- @classmethod
- def FromFile(cls, filename, overwrite=False, auto_transfer=True, **kwds):
- """Create a new download object from a filename."""
- path = os.path.expanduser(filename)
- if os.path.exists(path) and not overwrite:
- raise exceptions.InvalidUserInputError(
- 'File %s exists and overwrite not specified' % path)
- return cls(open(path, 'wb'), close_stream=True,
- auto_transfer=auto_transfer, **kwds)
-
- @classmethod
- def FromStream(cls, stream, auto_transfer=True, total_size=None, **kwds):
- """Create a new Download object from a stream."""
- return cls(stream, auto_transfer=auto_transfer, total_size=total_size,
- **kwds)
-
- @classmethod
- def FromData(cls, stream, json_data, http=None, auto_transfer=None,
- **kwds):
- """Create a new Download object from a stream and serialized data."""
- info = json.loads(json_data)
- missing_keys = cls._REQUIRED_SERIALIZATION_KEYS - set(info.keys())
- if missing_keys:
- raise exceptions.InvalidDataError(
- 'Invalid serialization data, missing keys: %s' % (
- ', '.join(missing_keys)))
- download = cls.FromStream(stream, **kwds)
- if auto_transfer is not None:
- download.auto_transfer = auto_transfer
- else:
- download.auto_transfer = info['auto_transfer']
- setattr(download, '_Download__progress', info['progress'])
- setattr(download, '_Download__total_size', info['total_size'])
- download._Initialize( # pylint: disable=protected-access
- http, info['url'])
- return download
-
- @property
- def serialization_data(self):
- self.EnsureInitialized()
- return {
- 'auto_transfer': self.auto_transfer,
- 'progress': self.progress,
- 'total_size': self.total_size,
- 'url': self.url,
- }
-
- @property
- def total_size(self):
- return self.__total_size
-
- def __str__(self):
- if not self.initialized:
- return 'Download (uninitialized)'
- else:
- return 'Download with %d/%s bytes transferred from url %s' % (
- self.progress, self.total_size, self.url)
-
- def ConfigureRequest(self, http_request, url_builder):
- url_builder.query_params['alt'] = 'media'
- # TODO(craigcitro): We need to send range requests because by
- # default httplib2 stores entire reponses in memory. Override
- # httplib2's download method (as gsutil does) so that this is not
- # necessary.
- http_request.headers['Range'] = 'bytes=0-%d' % (self.chunksize - 1,)
-
- def __SetTotal(self, info):
- if 'content-range' in info:
- _, _, total = info['content-range'].rpartition('/')
- if total != '*':
- self.__total_size = int(total)
- # Note "total_size is None" means we don't know it; if no size
- # info was returned on our initial range request, that means we
- # have a 0-byte file. (That last statement has been verified
- # empirically, but is not clearly documented anywhere.)
- if self.total_size is None:
- self.__total_size = 0
-
- def InitializeDownload(self, http_request, http=None, client=None):
- """Initialize this download by making a request.
-
- Args:
- http_request: The HttpRequest to use to initialize this download.
- http: The httplib2.Http instance for this request.
- client: If provided, let this client process the final URL before
- sending any additional requests. If client is provided and
- http is not, client.http will be used instead.
- """
- self.EnsureUninitialized()
- if http is None and client is None:
- raise exceptions.UserError('Must provide client or http.')
- http = http or client.http
- if client is not None:
- http_request.url = client.FinalizeTransferUrl(http_request.url)
- url = http_request.url
- if self.auto_transfer:
- end_byte = self.__ComputeEndByte(0)
- self.__SetRangeHeader(http_request, 0, end_byte)
- response = http_wrapper.MakeRequest(
- self.bytes_http or http, http_request)
- if response.status_code not in self._ACCEPTABLE_STATUSES:
- raise exceptions.HttpError.FromResponse(response)
- self.__initial_response = response
- self.__SetTotal(response.info)
- url = response.info.get('content-location', response.request_url)
- if client is not None:
- url = client.FinalizeTransferUrl(url)
- self._Initialize(http, url)
- # Unless the user has requested otherwise, we want to just
- # go ahead and pump the bytes now.
- if self.auto_transfer:
- self.StreamInChunks()
-
- def __NormalizeStartEnd(self, start, end=None):
- if end is not None:
- if start < 0:
- raise exceptions.TransferInvalidError(
- 'Cannot have end index with negative start index')
- elif start >= self.total_size:
- raise exceptions.TransferInvalidError(
- 'Cannot have start index greater than total size')
- end = min(end, self.total_size - 1)
- if end < start:
- raise exceptions.TransferInvalidError(
- 'Range requested with end[%s] < start[%s]' % (end, start))
- return start, end
- else:
- if start < 0:
- start = max(0, start + self.total_size)
- return start, self.total_size
-
- def __SetRangeHeader(self, request, start, end=None):
- if start < 0:
- request.headers['range'] = 'bytes=%d' % start
- elif end is None:
- request.headers['range'] = 'bytes=%d-' % start
- else:
- request.headers['range'] = 'bytes=%d-%d' % (start, end)
-
- def __ComputeEndByte(self, start, end=None, use_chunks=True):
- """Compute the last byte to fetch for this request.
-
- This is all based on the HTTP spec for Range and
- Content-Range.
-
- Note that this is potentially confusing in several ways:
- * the value for the last byte is 0-based, eg "fetch 10 bytes
- from the beginning" would return 9 here.
- * if we have no information about size, and don't want to
- use the chunksize, we'll return None.
- See the tests for more examples.
-
- Args:
- start: byte to start at.
- end: (int or None, default: None) Suggested last byte.
- use_chunks: (bool, default: True) If False, ignore self.chunksize.
-
- Returns:
- Last byte to use in a Range header, or None.
-
- """
- end_byte = end
- if use_chunks:
- alternate = start + self.chunksize - 1
- end_byte = min(end_byte, alternate) if end_byte else alternate
- if self.total_size:
- alternate = self.total_size - 1
- end_byte = min(end_byte, alternate) if end_byte else alternate
- return end_byte
-
- def __GetChunk(self, start, end, additional_headers=None):
- """Retrieve a chunk, and return the full response."""
- self.EnsureInitialized()
- request = http_wrapper.Request(url=self.url)
- self.__SetRangeHeader(request, start, end=end)
- if additional_headers is not None:
- request.headers.update(additional_headers)
- return http_wrapper.MakeRequest(
- self.bytes_http, request, retry_func=self.retry_func,
- retries=self.num_retries)
-
- def __ProcessResponse(self, response):
- """Process response (by updating self and writing to self.stream)."""
- if response.status_code not in self._ACCEPTABLE_STATUSES:
- # We distinguish errors that mean we made a mistake in setting
- # up the transfer versus something we should attempt again.
- if response.status_code in (http_client.FORBIDDEN,
- http_client.NOT_FOUND):
- raise exceptions.HttpError.FromResponse(response)
- else:
- raise exceptions.TransferRetryError(response.content)
- if response.status_code in (http_client.OK,
- http_client.PARTIAL_CONTENT):
- self.stream.write(response.content)
- self.__progress += response.length
- if response.info and 'content-encoding' in response.info:
- # TODO(craigcitro): Handle the case where this changes over a
- # download.
- self.__encoding = response.info['content-encoding']
- elif response.status_code == http_client.NO_CONTENT:
- # It's important to write something to the stream for the case
- # of a 0-byte download to a file, as otherwise python won't
- # create the file.
- self.stream.write('')
- return response
-
- def GetRange(self, start, end=None, additional_headers=None,
- use_chunks=True):
- """Retrieve a given byte range from this download, inclusive.
-
- Range must be of one of these three forms:
- * 0 <= start, end = None: Fetch from start to the end of the file.
- * 0 <= start <= end: Fetch the bytes from start to end.
- * start < 0, end = None: Fetch the last -start bytes of the file.
-
- (These variations correspond to those described in the HTTP 1.1
- protocol for range headers in RFC 2616, sec. 14.35.1.)
-
- Args:
- start: (int) Where to start fetching bytes. (See above.)
- end: (int, optional) Where to stop fetching bytes. (See above.)
- additional_headers: (bool, optional) Any additional headers to
- pass with the request.
- use_chunks: (bool, default: True) If False, ignore self.chunksize
- and fetch this range in a single request.
-
- Returns:
- None. Streams bytes into self.stream.
- """
- self.EnsureInitialized()
- progress_end_normalized = False
- if self.total_size is not None:
- progress, end = self.__NormalizeStartEnd(start, end)
- progress_end_normalized = True
- else:
- progress = start
- while not progress_end_normalized or progress < end:
- end_byte = self.__ComputeEndByte(progress, end=end,
- use_chunks=use_chunks)
- response = self.__GetChunk(progress, end_byte,
- additional_headers=additional_headers)
- if not progress_end_normalized:
- self.__SetTotal(response.info)
- progress, end = self.__NormalizeStartEnd(start, end)
- progress_end_normalized = True
- response = self.__ProcessResponse(response)
- progress += response.length
- if not response:
- raise exceptions.TransferRetryError(
- 'Zero bytes unexpectedly returned in download response')
-
- def StreamInChunks(self, callback=None, finish_callback=None,
- additional_headers=None):
- """Stream the entire download in chunks."""
- self.StreamMedia(callback=callback, finish_callback=finish_callback,
- additional_headers=additional_headers,
- use_chunks=True)
-
- def StreamMedia(self, callback=None, finish_callback=None,
- additional_headers=None, use_chunks=True):
- """Stream the entire download.
-
- Args:
- callback: (default: None) Callback to call as each chunk is
- completed.
- finish_callback: (default: None) Callback to call when the
- download is complete.
- additional_headers: (default: None) Additional headers to
- include in fetching bytes.
- use_chunks: (bool, default: True) If False, ignore self.chunksize
- and stream this download in a single request.
-
- Returns:
- None. Streams bytes into self.stream.
- """
- callback = callback or self.progress_callback
- finish_callback = finish_callback or self.finish_callback
-
- self.EnsureInitialized()
- while True:
- if self.__initial_response is not None:
- response = self.__initial_response
- self.__initial_response = None
- else:
- end_byte = self.__ComputeEndByte(self.progress,
- use_chunks=use_chunks)
- response = self.__GetChunk(
- self.progress, end_byte,
- additional_headers=additional_headers)
- if self.total_size is None:
- self.__SetTotal(response.info)
- response = self.__ProcessResponse(response)
- self._ExecuteCallback(callback, response)
- if (response.status_code == http_client.OK or
- self.progress >= self.total_size):
- break
- self._ExecuteCallback(finish_callback, response)
-
-
-class Upload(_Transfer):
-
- """Data for a single Upload.
-
- Fields:
- stream: The stream to upload.
- mime_type: MIME type of the upload.
- total_size: (optional) Total upload size for the stream.
- close_stream: (default: False) Whether or not we should close the
- stream when finished with the upload.
- auto_transfer: (default: True) If True, stream all bytes as soon as
- the upload is created.
- """
- _REQUIRED_SERIALIZATION_KEYS = set((
- 'auto_transfer', 'mime_type', 'total_size', 'url'))
-
- def __init__(self, stream, mime_type, total_size=None, http=None,
- close_stream=False, chunksize=None, auto_transfer=True,
- progress_callback=None, finish_callback=None,
- **kwds):
- super(Upload, self).__init__(
- stream, close_stream=close_stream, chunksize=chunksize,
- auto_transfer=auto_transfer, http=http, **kwds)
- self.__complete = False
- self.__final_response = None
- self.__mime_type = mime_type
- self.__progress = 0
- self.__server_chunk_granularity = None
- self.__strategy = None
- self.__total_size = None
-
- self.progress_callback = progress_callback
- self.finish_callback = finish_callback
- self.total_size = total_size
-
- @property
- def progress(self):
- return self.__progress
-
- @classmethod
- def FromFile(cls, filename, mime_type=None, auto_transfer=True, **kwds):
- """Create a new Upload object from a filename."""
- path = os.path.expanduser(filename)
- if not os.path.exists(path):
- raise exceptions.NotFoundError('Could not find file %s' % path)
- if not mime_type:
- mime_type, _ = mimetypes.guess_type(path)
- if mime_type is None:
- raise exceptions.InvalidUserInputError(
- 'Could not guess mime type for %s' % path)
- size = os.stat(path).st_size
- return cls(open(path, 'rb'), mime_type, total_size=size,
- close_stream=True, auto_transfer=auto_transfer, **kwds)
-
- @classmethod
- def FromStream(cls, stream, mime_type, total_size=None, auto_transfer=True,
- **kwds):
- """Create a new Upload object from a stream."""
- if mime_type is None:
- raise exceptions.InvalidUserInputError(
- 'No mime_type specified for stream')
- return cls(stream, mime_type, total_size=total_size,
- close_stream=False, auto_transfer=auto_transfer, **kwds)
-
- @classmethod
- def FromData(cls, stream, json_data, http, auto_transfer=None, **kwds):
- """Create a new Upload of stream from serialized json_data and http."""
- info = json.loads(json_data)
- missing_keys = cls._REQUIRED_SERIALIZATION_KEYS - set(info.keys())
- if missing_keys:
- raise exceptions.InvalidDataError(
- 'Invalid serialization data, missing keys: %s' % (
- ', '.join(missing_keys)))
- if 'total_size' in kwds:
- raise exceptions.InvalidUserInputError(
- 'Cannot override total_size on serialized Upload')
- upload = cls.FromStream(stream, info['mime_type'],
- total_size=info.get('total_size'), **kwds)
- if isinstance(stream, io.IOBase) and not stream.seekable():
- raise exceptions.InvalidUserInputError(
- 'Cannot restart resumable upload on non-seekable stream')
- if auto_transfer is not None:
- upload.auto_transfer = auto_transfer
- else:
- upload.auto_transfer = info['auto_transfer']
- upload.strategy = RESUMABLE_UPLOAD
- upload._Initialize( # pylint: disable=protected-access
- http, info['url'])
- upload.RefreshResumableUploadState()
- upload.EnsureInitialized()
- if upload.auto_transfer:
- upload.StreamInChunks()
- return upload
-
- @property
- def serialization_data(self):
- self.EnsureInitialized()
- if self.strategy != RESUMABLE_UPLOAD:
- raise exceptions.InvalidDataError(
- 'Serialization only supported for resumable uploads')
- return {
- 'auto_transfer': self.auto_transfer,
- 'mime_type': self.mime_type,
- 'total_size': self.total_size,
- 'url': self.url,
- }
-
- @property
- def complete(self):
- return self.__complete
-
- @property
- def mime_type(self):
- return self.__mime_type
-
- def __str__(self):
- if not self.initialized:
- return 'Upload (uninitialized)'
- else:
- return 'Upload with %d/%s bytes transferred for url %s' % (
- self.progress, self.total_size or '???', self.url)
-
- @property
- def strategy(self):
- return self.__strategy
-
- @strategy.setter
- def strategy(self, value):
- if value not in (SIMPLE_UPLOAD, RESUMABLE_UPLOAD):
- raise exceptions.UserError((
- 'Invalid value "%s" for upload strategy, must be one of '
- '"simple" or "resumable".') % value)
- self.__strategy = value
-
- @property
- def total_size(self):
- return self.__total_size
-
- @total_size.setter
- def total_size(self, value):
- self.EnsureUninitialized()
- self.__total_size = value
-
- def __SetDefaultUploadStrategy(self, upload_config, http_request):
- """Determine and set the default upload strategy for this upload.
-
- We generally prefer simple or multipart, unless we're forced to
- use resumable. This happens when any of (1) the upload is too
- large, (2) the simple endpoint doesn't support multipart requests
- and we have metadata, or (3) there is no simple upload endpoint.
-
- Args:
- upload_config: Configuration for the upload endpoint.
- http_request: The associated http request.
-
- Returns:
- None.
- """
- if self.strategy is not None:
- return
- strategy = SIMPLE_UPLOAD
- if (self.total_size is not None and
- self.total_size > _RESUMABLE_UPLOAD_THRESHOLD):
- strategy = RESUMABLE_UPLOAD
- if http_request.body and not upload_config.simple_multipart:
- strategy = RESUMABLE_UPLOAD
- if not upload_config.simple_path:
- strategy = RESUMABLE_UPLOAD
- self.strategy = strategy
-
- def ConfigureRequest(self, upload_config, http_request, url_builder):
- """Configure the request and url for this upload."""
- # Validate total_size vs. max_size
- if (self.total_size and upload_config.max_size and
- self.total_size > upload_config.max_size):
- raise exceptions.InvalidUserInputError(
- 'Upload too big: %s larger than max size %s' % (
- self.total_size, upload_config.max_size))
- # Validate mime type
- if not util.AcceptableMimeType(upload_config.accept, self.mime_type):
- raise exceptions.InvalidUserInputError(
- 'MIME type %s does not match any accepted MIME ranges %s' % (
- self.mime_type, upload_config.accept))
-
- self.__SetDefaultUploadStrategy(upload_config, http_request)
- if self.strategy == SIMPLE_UPLOAD:
- url_builder.relative_path = upload_config.simple_path
- if http_request.body:
- url_builder.query_params['uploadType'] = 'multipart'
- self.__ConfigureMultipartRequest(http_request)
- else:
- url_builder.query_params['uploadType'] = 'media'
- self.__ConfigureMediaRequest(http_request)
- else:
- url_builder.relative_path = upload_config.resumable_path
- url_builder.query_params['uploadType'] = 'resumable'
- self.__ConfigureResumableRequest(http_request)
-
- def __ConfigureMediaRequest(self, http_request):
- """Configure http_request as a simple request for this upload."""
- http_request.headers['content-type'] = self.mime_type
- http_request.body = self.stream.read()
- http_request.loggable_body = '<media body>'
-
- def __ConfigureMultipartRequest(self, http_request):
- """Configure http_request as a multipart request for this upload."""
- # This is a multipart/related upload.
- msg_root = mime_multipart.MIMEMultipart('related')
- # msg_root should not write out its own headers
- setattr(msg_root, '_write_headers', lambda self: None)
-
- # attach the body as one part
- msg = mime_nonmultipart.MIMENonMultipart(
- *http_request.headers['content-type'].split('/'))
- msg.set_payload(http_request.body)
- msg_root.attach(msg)
-
- # attach the media as the second part
- msg = mime_nonmultipart.MIMENonMultipart(*self.mime_type.split('/'))
- msg['Content-Transfer-Encoding'] = 'binary'
- msg.set_payload(self.stream.read())
- msg_root.attach(msg)
-
- # NOTE: We encode the body, but can't use
- # `email.message.Message.as_string` because it prepends
- # `> ` to `From ` lines.
- # NOTE: We must use six.StringIO() instead of io.StringIO() since the
- # `email` library uses cStringIO in Py2 and io.StringIO in Py3.
- fp = six.StringIO()
- g = email_generator.Generator(fp, mangle_from_=False)
- g.flatten(msg_root, unixfrom=False)
- http_request.body = fp.getvalue()
-
- multipart_boundary = msg_root.get_boundary()
- http_request.headers['content-type'] = (
- 'multipart/related; boundary=%r' % multipart_boundary)
-
- body_components = http_request.body.split(multipart_boundary)
- headers, _, _ = body_components[-2].partition('\n\n')
- body_components[-2] = '\n\n'.join([headers, '<media body>\n\n--'])
- http_request.loggable_body = multipart_boundary.join(body_components)
-
- def __ConfigureResumableRequest(self, http_request):
- http_request.headers['X-Upload-Content-Type'] = self.mime_type
- if self.total_size is not None:
- http_request.headers[
- 'X-Upload-Content-Length'] = str(self.total_size)
-
- def RefreshResumableUploadState(self):
- """Talk to the server and refresh the state of this resumable upload.
-
- Returns:
- Response if the upload is complete.
- """
- if self.strategy != RESUMABLE_UPLOAD:
- return
- self.EnsureInitialized()
- refresh_request = http_wrapper.Request(
- url=self.url, http_method='PUT',
- headers={'Content-Range': 'bytes */*'})
- refresh_response = http_wrapper.MakeRequest(
- self.http, refresh_request, redirections=0,
- retries=self.num_retries)
- range_header = self._GetRangeHeaderFromResponse(refresh_response)
- if refresh_response.status_code in (http_client.OK,
- http_client.CREATED):
- self.__complete = True
- self.__progress = self.total_size
- self.stream.seek(self.progress)
- # If we're finished, the refresh response will contain the metadata
- # originally requested. Cache it so it can be returned in
- # StreamInChunks.
- self.__final_response = refresh_response
- elif refresh_response.status_code == http_wrapper.RESUME_INCOMPLETE:
- if range_header is None:
- self.__progress = 0
- else:
- self.__progress = self.__GetLastByte(range_header) + 1
- self.stream.seek(self.progress)
- else:
- raise exceptions.HttpError.FromResponse(refresh_response)
-
- def _GetRangeHeaderFromResponse(self, response):
- return response.info.get('Range', response.info.get('range'))
-
- def InitializeUpload(self, http_request, http=None, client=None):
- """Initialize this upload from the given http_request."""
- if self.strategy is None:
- raise exceptions.UserError(
- 'No upload strategy set; did you call ConfigureRequest?')
- if http is None and client is None:
- raise exceptions.UserError('Must provide client or http.')
- if self.strategy != RESUMABLE_UPLOAD:
- return
- http = http or client.http
- if client is not None:
- http_request.url = client.FinalizeTransferUrl(http_request.url)
- self.EnsureUninitialized()
- http_response = http_wrapper.MakeRequest(http, http_request,
- retries=self.num_retries)
- if http_response.status_code != http_client.OK:
- raise exceptions.HttpError.FromResponse(http_response)
-
- self.__server_chunk_granularity = http_response.info.get(
- 'X-Goog-Upload-Chunk-Granularity')
- url = http_response.info['location']
- if client is not None:
- url = client.FinalizeTransferUrl(url)
- self._Initialize(http, url)
-
- # Unless the user has requested otherwise, we want to just
- # go ahead and pump the bytes now.
- if self.auto_transfer:
- return self.StreamInChunks()
-
- def __GetLastByte(self, range_header):
- _, _, end = range_header.partition('-')
- # TODO(craigcitro): Validate start == 0?
- return int(end)
-
- def __ValidateChunksize(self, chunksize=None):
- if self.__server_chunk_granularity is None:
- return
- chunksize = chunksize or self.chunksize
- if chunksize % self.__server_chunk_granularity:
- raise exceptions.ConfigurationValueError(
- 'Server requires chunksize to be a multiple of %d',
- self.__server_chunk_granularity)
-
- def __StreamMedia(self, callback=None, finish_callback=None,
- additional_headers=None, use_chunks=True):
- """Helper function for StreamMedia / StreamInChunks."""
- if self.strategy != RESUMABLE_UPLOAD:
- raise exceptions.InvalidUserInputError(
- 'Cannot stream non-resumable upload')
- callback = callback or self.progress_callback
- finish_callback = finish_callback or self.finish_callback
- # final_response is set if we resumed an already-completed upload.
- response = self.__final_response
- send_func = self.__SendChunk if use_chunks else self.__SendMediaBody
- if use_chunks:
- self.__ValidateChunksize(self.chunksize)
- self.EnsureInitialized()
- while not self.complete:
- response = send_func(self.stream.tell(),
- additional_headers=additional_headers)
- if response.status_code in (http_client.OK, http_client.CREATED):
- self.__complete = True
- break
- self.__progress = self.__GetLastByte(response.info['range'])
- if self.progress + 1 != self.stream.tell():
- # TODO(craigcitro): Add a better way to recover here.
- raise exceptions.CommunicationError(
- 'Failed to transfer all bytes in chunk, upload paused at '
- 'byte %d' % self.progress)
- self._ExecuteCallback(callback, response)
- if self.__complete and hasattr(self.stream, 'seek'):
- current_pos = self.stream.tell()
- self.stream.seek(0, os.SEEK_END)
- end_pos = self.stream.tell()
- self.stream.seek(current_pos)
- if current_pos != end_pos:
- raise exceptions.TransferInvalidError(
- 'Upload complete with %s additional bytes left in stream' %
- (int(end_pos) - int(current_pos)))
- self._ExecuteCallback(finish_callback, response)
- return response
-
- def StreamMedia(self, callback=None, finish_callback=None,
- additional_headers=None):
- """Send this resumable upload in a single request.
-
- Args:
- callback: Progress callback function with inputs
- (http_wrapper.Response, transfer.Upload)
- finish_callback: Final callback function with inputs
- (http_wrapper.Response, transfer.Upload)
- additional_headers: Dict of headers to include with the upload
- http_wrapper.Request.
-
- Returns:
- http_wrapper.Response of final response.
- """
- return self.__StreamMedia(
- callback=callback, finish_callback=finish_callback,
- additional_headers=additional_headers, use_chunks=False)
-
- def StreamInChunks(self, callback=None, finish_callback=None,
- additional_headers=None):
- """Send this (resumable) upload in chunks."""
- return self.__StreamMedia(
- callback=callback, finish_callback=finish_callback,
- additional_headers=additional_headers)
-
- def __SendMediaRequest(self, request, end):
- """Request helper function for SendMediaBody & SendChunk."""
- response = http_wrapper.MakeRequest(
- self.bytes_http, request, retry_func=self.retry_func,
- retries=self.num_retries)
- if response.status_code not in (http_client.OK, http_client.CREATED,
- http_wrapper.RESUME_INCOMPLETE):
- # We want to reset our state to wherever the server left us
- # before this failed request, and then raise.
- self.RefreshResumableUploadState()
- raise exceptions.HttpError.FromResponse(response)
- if response.status_code == http_wrapper.RESUME_INCOMPLETE:
- last_byte = self.__GetLastByte(
- self._GetRangeHeaderFromResponse(response))
- if last_byte + 1 != end:
- self.stream.seek(last_byte)
- return response
-
- def __SendMediaBody(self, start, additional_headers=None):
- """Send the entire media stream in a single request."""
- self.EnsureInitialized()
- if self.total_size is None:
- raise exceptions.TransferInvalidError(
- 'Total size must be known for SendMediaBody')
- body_stream = stream_slice.StreamSlice(
- self.stream, self.total_size - start)
-
- request = http_wrapper.Request(url=self.url, http_method='PUT',
- body=body_stream)
- request.headers['Content-Type'] = self.mime_type
- if start == self.total_size:
- # End of an upload with 0 bytes left to send; just finalize.
- range_string = 'bytes */%s' % self.total_size
- else:
- range_string = 'bytes %s-%s/%s' % (start, self.total_size - 1,
- self.total_size)
-
- request.headers['Content-Range'] = range_string
- if additional_headers:
- request.headers.update(additional_headers)
-
- return self.__SendMediaRequest(request, self.total_size)
-
- def __SendChunk(self, start, additional_headers=None):
- """Send the specified chunk."""
- self.EnsureInitialized()
- no_log_body = self.total_size is None
- if self.total_size is None:
- # For the streaming resumable case, we need to detect when
- # we're at the end of the stream.
- body_stream = buffered_stream.BufferedStream(
- self.stream, start, self.chunksize)
- end = body_stream.stream_end_position
- if body_stream.stream_exhausted:
- self.__total_size = end
- # TODO: Here, change body_stream from a stream to a string object,
- # which means reading a chunk into memory. This works around
- # https://code.google.com/p/httplib2/issues/detail?id=176 which can
- # cause httplib2 to skip bytes on 401's for file objects.
- # Rework this solution to be more general.
- body_stream = body_stream.read(self.chunksize)
- else:
- end = min(start + self.chunksize, self.total_size)
- body_stream = stream_slice.StreamSlice(self.stream, end - start)
- # TODO(craigcitro): Think about clearer errors on "no data in
- # stream".
- request = http_wrapper.Request(url=self.url, http_method='PUT',
- body=body_stream)
- request.headers['Content-Type'] = self.mime_type
- if no_log_body:
- # Disable logging of streaming body.
- # TODO: Remove no_log_body and rework as part of a larger logs
- # refactor.
- request.loggable_body = '<media body>'
- if self.total_size is None:
- # Streaming resumable upload case, unknown total size.
- range_string = 'bytes %s-%s/*' % (start, end - 1)
- elif end == start:
- # End of an upload with 0 bytes left to send; just finalize.
- range_string = 'bytes */%s' % self.total_size
- else:
- # Normal resumable upload case with known sizes.
- range_string = 'bytes %s-%s/%s' % (start, end - 1, self.total_size)
-
- request.headers['Content-Range'] = range_string
- if additional_headers:
- request.headers.update(additional_headers)
-
- return self.__SendMediaRequest(request, end)

Powered by Google App Engine
This is Rietveld 408576698