Index: tools/telemetry/third_party/gsutil/third_party/apitools/apitools/base/py/transfer.py |
diff --git a/tools/telemetry/third_party/gsutil/third_party/apitools/apitools/base/py/transfer.py b/tools/telemetry/third_party/gsutil/third_party/apitools/apitools/base/py/transfer.py |
deleted file mode 100644 |
index 35a4774a6fb7cd49c40ad42bbfd89e5356147398..0000000000000000000000000000000000000000 |
--- a/tools/telemetry/third_party/gsutil/third_party/apitools/apitools/base/py/transfer.py |
+++ /dev/null |
@@ -1,986 +0,0 @@ |
-#!/usr/bin/env python |
-"""Upload and download support for apitools.""" |
-from __future__ import print_function |
- |
-import email.generator as email_generator |
-import email.mime.multipart as mime_multipart |
-import email.mime.nonmultipart as mime_nonmultipart |
-import io |
-import json |
-import mimetypes |
-import os |
-import threading |
- |
-import six |
-from six.moves import http_client |
- |
-from apitools.base.py import buffered_stream |
-from apitools.base.py import exceptions |
-from apitools.base.py import http_wrapper |
-from apitools.base.py import stream_slice |
-from apitools.base.py import util |
- |
-__all__ = [ |
- 'Download', |
- 'Upload', |
- 'RESUMABLE_UPLOAD', |
- 'SIMPLE_UPLOAD', |
- 'DownloadProgressPrinter', |
- 'DownloadCompletePrinter', |
- 'UploadProgressPrinter', |
- 'UploadCompletePrinter', |
-] |
- |
-_RESUMABLE_UPLOAD_THRESHOLD = 5 << 20 |
-SIMPLE_UPLOAD = 'simple' |
-RESUMABLE_UPLOAD = 'resumable' |
- |
- |
-def DownloadProgressPrinter(response, unused_download): |
- """Print download progress based on response.""" |
- if 'content-range' in response.info: |
- print('Received %s' % response.info['content-range']) |
- else: |
- print('Received %d bytes' % response.length) |
- |
- |
-def DownloadCompletePrinter(unused_response, unused_download): |
- """Print information about a completed download.""" |
- print('Download complete') |
- |
- |
-def UploadProgressPrinter(response, unused_upload): |
- """Print upload progress based on response.""" |
- print('Sent %s' % response.info['range']) |
- |
- |
-def UploadCompletePrinter(unused_response, unused_upload): |
- """Print information about a completed upload.""" |
- print('Upload complete') |
- |
- |
-class _Transfer(object): |
- |
- """Generic bits common to Uploads and Downloads.""" |
- |
- def __init__(self, stream, close_stream=False, chunksize=None, |
- auto_transfer=True, http=None, num_retries=5): |
- self.__bytes_http = None |
- self.__close_stream = close_stream |
- self.__http = http |
- self.__stream = stream |
- self.__url = None |
- |
- self.__num_retries = 5 |
- # Let the @property do validation |
- self.num_retries = num_retries |
- |
- self.retry_func = ( |
- http_wrapper.HandleExceptionsAndRebuildHttpConnections) |
- self.auto_transfer = auto_transfer |
- self.chunksize = chunksize or 1048576 |
- |
- def __repr__(self): |
- return str(self) |
- |
- @property |
- def close_stream(self): |
- return self.__close_stream |
- |
- @property |
- def http(self): |
- return self.__http |
- |
- @property |
- def bytes_http(self): |
- return self.__bytes_http or self.http |
- |
- @bytes_http.setter |
- def bytes_http(self, value): |
- self.__bytes_http = value |
- |
- @property |
- def num_retries(self): |
- return self.__num_retries |
- |
- @num_retries.setter |
- def num_retries(self, value): |
- util.Typecheck(value, six.integer_types) |
- if value < 0: |
- raise exceptions.InvalidDataError( |
- 'Cannot have negative value for num_retries') |
- self.__num_retries = value |
- |
- @property |
- def stream(self): |
- return self.__stream |
- |
- @property |
- def url(self): |
- return self.__url |
- |
- def _Initialize(self, http, url): |
- """Initialize this download by setting self.http and self.url. |
- |
- We want the user to be able to override self.http by having set |
- the value in the constructor; in that case, we ignore the provided |
- http. |
- |
- Args: |
- http: An httplib2.Http instance or None. |
- url: The url for this transfer. |
- |
- Returns: |
- None. Initializes self. |
- """ |
- self.EnsureUninitialized() |
- if self.http is None: |
- self.__http = http or http_wrapper.GetHttp() |
- self.__url = url |
- |
- @property |
- def initialized(self): |
- return self.url is not None and self.http is not None |
- |
- @property |
- def _type_name(self): |
- return type(self).__name__ |
- |
- def EnsureInitialized(self): |
- if not self.initialized: |
- raise exceptions.TransferInvalidError( |
- 'Cannot use uninitialized %s', self._type_name) |
- |
- def EnsureUninitialized(self): |
- if self.initialized: |
- raise exceptions.TransferInvalidError( |
- 'Cannot re-initialize %s', self._type_name) |
- |
- def __del__(self): |
- if self.__close_stream: |
- self.__stream.close() |
- |
- def _ExecuteCallback(self, callback, response): |
- # TODO(craigcitro): Push these into a queue. |
- if callback is not None: |
- threading.Thread(target=callback, args=(response, self)).start() |
- |
- |
-class Download(_Transfer): |
- |
- """Data for a single download. |
- |
- Public attributes: |
- chunksize: default chunksize to use for transfers. |
- """ |
- _ACCEPTABLE_STATUSES = set(( |
- http_client.OK, |
- http_client.NO_CONTENT, |
- http_client.PARTIAL_CONTENT, |
- http_client.REQUESTED_RANGE_NOT_SATISFIABLE, |
- )) |
- _REQUIRED_SERIALIZATION_KEYS = set(( |
- 'auto_transfer', 'progress', 'total_size', 'url')) |
- |
- def __init__(self, stream, progress_callback=None, finish_callback=None, |
- **kwds): |
- total_size = kwds.pop('total_size', None) |
- super(Download, self).__init__(stream, **kwds) |
- self.__initial_response = None |
- self.__progress = 0 |
- self.__total_size = total_size |
- self.__encoding = None |
- |
- self.progress_callback = progress_callback |
- self.finish_callback = finish_callback |
- |
- @property |
- def progress(self): |
- return self.__progress |
- |
- @property |
- def encoding(self): |
- return self.__encoding |
- |
- @classmethod |
- def FromFile(cls, filename, overwrite=False, auto_transfer=True, **kwds): |
- """Create a new download object from a filename.""" |
- path = os.path.expanduser(filename) |
- if os.path.exists(path) and not overwrite: |
- raise exceptions.InvalidUserInputError( |
- 'File %s exists and overwrite not specified' % path) |
- return cls(open(path, 'wb'), close_stream=True, |
- auto_transfer=auto_transfer, **kwds) |
- |
- @classmethod |
- def FromStream(cls, stream, auto_transfer=True, total_size=None, **kwds): |
- """Create a new Download object from a stream.""" |
- return cls(stream, auto_transfer=auto_transfer, total_size=total_size, |
- **kwds) |
- |
- @classmethod |
- def FromData(cls, stream, json_data, http=None, auto_transfer=None, |
- **kwds): |
- """Create a new Download object from a stream and serialized data.""" |
- info = json.loads(json_data) |
- missing_keys = cls._REQUIRED_SERIALIZATION_KEYS - set(info.keys()) |
- if missing_keys: |
- raise exceptions.InvalidDataError( |
- 'Invalid serialization data, missing keys: %s' % ( |
- ', '.join(missing_keys))) |
- download = cls.FromStream(stream, **kwds) |
- if auto_transfer is not None: |
- download.auto_transfer = auto_transfer |
- else: |
- download.auto_transfer = info['auto_transfer'] |
- setattr(download, '_Download__progress', info['progress']) |
- setattr(download, '_Download__total_size', info['total_size']) |
- download._Initialize( # pylint: disable=protected-access |
- http, info['url']) |
- return download |
- |
- @property |
- def serialization_data(self): |
- self.EnsureInitialized() |
- return { |
- 'auto_transfer': self.auto_transfer, |
- 'progress': self.progress, |
- 'total_size': self.total_size, |
- 'url': self.url, |
- } |
- |
- @property |
- def total_size(self): |
- return self.__total_size |
- |
- def __str__(self): |
- if not self.initialized: |
- return 'Download (uninitialized)' |
- else: |
- return 'Download with %d/%s bytes transferred from url %s' % ( |
- self.progress, self.total_size, self.url) |
- |
- def ConfigureRequest(self, http_request, url_builder): |
- url_builder.query_params['alt'] = 'media' |
- # TODO(craigcitro): We need to send range requests because by |
- # default httplib2 stores entire reponses in memory. Override |
- # httplib2's download method (as gsutil does) so that this is not |
- # necessary. |
- http_request.headers['Range'] = 'bytes=0-%d' % (self.chunksize - 1,) |
- |
- def __SetTotal(self, info): |
- if 'content-range' in info: |
- _, _, total = info['content-range'].rpartition('/') |
- if total != '*': |
- self.__total_size = int(total) |
- # Note "total_size is None" means we don't know it; if no size |
- # info was returned on our initial range request, that means we |
- # have a 0-byte file. (That last statement has been verified |
- # empirically, but is not clearly documented anywhere.) |
- if self.total_size is None: |
- self.__total_size = 0 |
- |
- def InitializeDownload(self, http_request, http=None, client=None): |
- """Initialize this download by making a request. |
- |
- Args: |
- http_request: The HttpRequest to use to initialize this download. |
- http: The httplib2.Http instance for this request. |
- client: If provided, let this client process the final URL before |
- sending any additional requests. If client is provided and |
- http is not, client.http will be used instead. |
- """ |
- self.EnsureUninitialized() |
- if http is None and client is None: |
- raise exceptions.UserError('Must provide client or http.') |
- http = http or client.http |
- if client is not None: |
- http_request.url = client.FinalizeTransferUrl(http_request.url) |
- url = http_request.url |
- if self.auto_transfer: |
- end_byte = self.__ComputeEndByte(0) |
- self.__SetRangeHeader(http_request, 0, end_byte) |
- response = http_wrapper.MakeRequest( |
- self.bytes_http or http, http_request) |
- if response.status_code not in self._ACCEPTABLE_STATUSES: |
- raise exceptions.HttpError.FromResponse(response) |
- self.__initial_response = response |
- self.__SetTotal(response.info) |
- url = response.info.get('content-location', response.request_url) |
- if client is not None: |
- url = client.FinalizeTransferUrl(url) |
- self._Initialize(http, url) |
- # Unless the user has requested otherwise, we want to just |
- # go ahead and pump the bytes now. |
- if self.auto_transfer: |
- self.StreamInChunks() |
- |
- def __NormalizeStartEnd(self, start, end=None): |
- if end is not None: |
- if start < 0: |
- raise exceptions.TransferInvalidError( |
- 'Cannot have end index with negative start index') |
- elif start >= self.total_size: |
- raise exceptions.TransferInvalidError( |
- 'Cannot have start index greater than total size') |
- end = min(end, self.total_size - 1) |
- if end < start: |
- raise exceptions.TransferInvalidError( |
- 'Range requested with end[%s] < start[%s]' % (end, start)) |
- return start, end |
- else: |
- if start < 0: |
- start = max(0, start + self.total_size) |
- return start, self.total_size |
- |
- def __SetRangeHeader(self, request, start, end=None): |
- if start < 0: |
- request.headers['range'] = 'bytes=%d' % start |
- elif end is None: |
- request.headers['range'] = 'bytes=%d-' % start |
- else: |
- request.headers['range'] = 'bytes=%d-%d' % (start, end) |
- |
- def __ComputeEndByte(self, start, end=None, use_chunks=True): |
- """Compute the last byte to fetch for this request. |
- |
- This is all based on the HTTP spec for Range and |
- Content-Range. |
- |
- Note that this is potentially confusing in several ways: |
- * the value for the last byte is 0-based, eg "fetch 10 bytes |
- from the beginning" would return 9 here. |
- * if we have no information about size, and don't want to |
- use the chunksize, we'll return None. |
- See the tests for more examples. |
- |
- Args: |
- start: byte to start at. |
- end: (int or None, default: None) Suggested last byte. |
- use_chunks: (bool, default: True) If False, ignore self.chunksize. |
- |
- Returns: |
- Last byte to use in a Range header, or None. |
- |
- """ |
- end_byte = end |
- if use_chunks: |
- alternate = start + self.chunksize - 1 |
- end_byte = min(end_byte, alternate) if end_byte else alternate |
- if self.total_size: |
- alternate = self.total_size - 1 |
- end_byte = min(end_byte, alternate) if end_byte else alternate |
- return end_byte |
- |
- def __GetChunk(self, start, end, additional_headers=None): |
- """Retrieve a chunk, and return the full response.""" |
- self.EnsureInitialized() |
- request = http_wrapper.Request(url=self.url) |
- self.__SetRangeHeader(request, start, end=end) |
- if additional_headers is not None: |
- request.headers.update(additional_headers) |
- return http_wrapper.MakeRequest( |
- self.bytes_http, request, retry_func=self.retry_func, |
- retries=self.num_retries) |
- |
- def __ProcessResponse(self, response): |
- """Process response (by updating self and writing to self.stream).""" |
- if response.status_code not in self._ACCEPTABLE_STATUSES: |
- # We distinguish errors that mean we made a mistake in setting |
- # up the transfer versus something we should attempt again. |
- if response.status_code in (http_client.FORBIDDEN, |
- http_client.NOT_FOUND): |
- raise exceptions.HttpError.FromResponse(response) |
- else: |
- raise exceptions.TransferRetryError(response.content) |
- if response.status_code in (http_client.OK, |
- http_client.PARTIAL_CONTENT): |
- self.stream.write(response.content) |
- self.__progress += response.length |
- if response.info and 'content-encoding' in response.info: |
- # TODO(craigcitro): Handle the case where this changes over a |
- # download. |
- self.__encoding = response.info['content-encoding'] |
- elif response.status_code == http_client.NO_CONTENT: |
- # It's important to write something to the stream for the case |
- # of a 0-byte download to a file, as otherwise python won't |
- # create the file. |
- self.stream.write('') |
- return response |
- |
- def GetRange(self, start, end=None, additional_headers=None, |
- use_chunks=True): |
- """Retrieve a given byte range from this download, inclusive. |
- |
- Range must be of one of these three forms: |
- * 0 <= start, end = None: Fetch from start to the end of the file. |
- * 0 <= start <= end: Fetch the bytes from start to end. |
- * start < 0, end = None: Fetch the last -start bytes of the file. |
- |
- (These variations correspond to those described in the HTTP 1.1 |
- protocol for range headers in RFC 2616, sec. 14.35.1.) |
- |
- Args: |
- start: (int) Where to start fetching bytes. (See above.) |
- end: (int, optional) Where to stop fetching bytes. (See above.) |
- additional_headers: (bool, optional) Any additional headers to |
- pass with the request. |
- use_chunks: (bool, default: True) If False, ignore self.chunksize |
- and fetch this range in a single request. |
- |
- Returns: |
- None. Streams bytes into self.stream. |
- """ |
- self.EnsureInitialized() |
- progress_end_normalized = False |
- if self.total_size is not None: |
- progress, end = self.__NormalizeStartEnd(start, end) |
- progress_end_normalized = True |
- else: |
- progress = start |
- while not progress_end_normalized or progress < end: |
- end_byte = self.__ComputeEndByte(progress, end=end, |
- use_chunks=use_chunks) |
- response = self.__GetChunk(progress, end_byte, |
- additional_headers=additional_headers) |
- if not progress_end_normalized: |
- self.__SetTotal(response.info) |
- progress, end = self.__NormalizeStartEnd(start, end) |
- progress_end_normalized = True |
- response = self.__ProcessResponse(response) |
- progress += response.length |
- if not response: |
- raise exceptions.TransferRetryError( |
- 'Zero bytes unexpectedly returned in download response') |
- |
- def StreamInChunks(self, callback=None, finish_callback=None, |
- additional_headers=None): |
- """Stream the entire download in chunks.""" |
- self.StreamMedia(callback=callback, finish_callback=finish_callback, |
- additional_headers=additional_headers, |
- use_chunks=True) |
- |
- def StreamMedia(self, callback=None, finish_callback=None, |
- additional_headers=None, use_chunks=True): |
- """Stream the entire download. |
- |
- Args: |
- callback: (default: None) Callback to call as each chunk is |
- completed. |
- finish_callback: (default: None) Callback to call when the |
- download is complete. |
- additional_headers: (default: None) Additional headers to |
- include in fetching bytes. |
- use_chunks: (bool, default: True) If False, ignore self.chunksize |
- and stream this download in a single request. |
- |
- Returns: |
- None. Streams bytes into self.stream. |
- """ |
- callback = callback or self.progress_callback |
- finish_callback = finish_callback or self.finish_callback |
- |
- self.EnsureInitialized() |
- while True: |
- if self.__initial_response is not None: |
- response = self.__initial_response |
- self.__initial_response = None |
- else: |
- end_byte = self.__ComputeEndByte(self.progress, |
- use_chunks=use_chunks) |
- response = self.__GetChunk( |
- self.progress, end_byte, |
- additional_headers=additional_headers) |
- if self.total_size is None: |
- self.__SetTotal(response.info) |
- response = self.__ProcessResponse(response) |
- self._ExecuteCallback(callback, response) |
- if (response.status_code == http_client.OK or |
- self.progress >= self.total_size): |
- break |
- self._ExecuteCallback(finish_callback, response) |
- |
- |
-class Upload(_Transfer): |
- |
- """Data for a single Upload. |
- |
- Fields: |
- stream: The stream to upload. |
- mime_type: MIME type of the upload. |
- total_size: (optional) Total upload size for the stream. |
- close_stream: (default: False) Whether or not we should close the |
- stream when finished with the upload. |
- auto_transfer: (default: True) If True, stream all bytes as soon as |
- the upload is created. |
- """ |
- _REQUIRED_SERIALIZATION_KEYS = set(( |
- 'auto_transfer', 'mime_type', 'total_size', 'url')) |
- |
- def __init__(self, stream, mime_type, total_size=None, http=None, |
- close_stream=False, chunksize=None, auto_transfer=True, |
- progress_callback=None, finish_callback=None, |
- **kwds): |
- super(Upload, self).__init__( |
- stream, close_stream=close_stream, chunksize=chunksize, |
- auto_transfer=auto_transfer, http=http, **kwds) |
- self.__complete = False |
- self.__final_response = None |
- self.__mime_type = mime_type |
- self.__progress = 0 |
- self.__server_chunk_granularity = None |
- self.__strategy = None |
- self.__total_size = None |
- |
- self.progress_callback = progress_callback |
- self.finish_callback = finish_callback |
- self.total_size = total_size |
- |
- @property |
- def progress(self): |
- return self.__progress |
- |
- @classmethod |
- def FromFile(cls, filename, mime_type=None, auto_transfer=True, **kwds): |
- """Create a new Upload object from a filename.""" |
- path = os.path.expanduser(filename) |
- if not os.path.exists(path): |
- raise exceptions.NotFoundError('Could not find file %s' % path) |
- if not mime_type: |
- mime_type, _ = mimetypes.guess_type(path) |
- if mime_type is None: |
- raise exceptions.InvalidUserInputError( |
- 'Could not guess mime type for %s' % path) |
- size = os.stat(path).st_size |
- return cls(open(path, 'rb'), mime_type, total_size=size, |
- close_stream=True, auto_transfer=auto_transfer, **kwds) |
- |
- @classmethod |
- def FromStream(cls, stream, mime_type, total_size=None, auto_transfer=True, |
- **kwds): |
- """Create a new Upload object from a stream.""" |
- if mime_type is None: |
- raise exceptions.InvalidUserInputError( |
- 'No mime_type specified for stream') |
- return cls(stream, mime_type, total_size=total_size, |
- close_stream=False, auto_transfer=auto_transfer, **kwds) |
- |
- @classmethod |
- def FromData(cls, stream, json_data, http, auto_transfer=None, **kwds): |
- """Create a new Upload of stream from serialized json_data and http.""" |
- info = json.loads(json_data) |
- missing_keys = cls._REQUIRED_SERIALIZATION_KEYS - set(info.keys()) |
- if missing_keys: |
- raise exceptions.InvalidDataError( |
- 'Invalid serialization data, missing keys: %s' % ( |
- ', '.join(missing_keys))) |
- if 'total_size' in kwds: |
- raise exceptions.InvalidUserInputError( |
- 'Cannot override total_size on serialized Upload') |
- upload = cls.FromStream(stream, info['mime_type'], |
- total_size=info.get('total_size'), **kwds) |
- if isinstance(stream, io.IOBase) and not stream.seekable(): |
- raise exceptions.InvalidUserInputError( |
- 'Cannot restart resumable upload on non-seekable stream') |
- if auto_transfer is not None: |
- upload.auto_transfer = auto_transfer |
- else: |
- upload.auto_transfer = info['auto_transfer'] |
- upload.strategy = RESUMABLE_UPLOAD |
- upload._Initialize( # pylint: disable=protected-access |
- http, info['url']) |
- upload.RefreshResumableUploadState() |
- upload.EnsureInitialized() |
- if upload.auto_transfer: |
- upload.StreamInChunks() |
- return upload |
- |
- @property |
- def serialization_data(self): |
- self.EnsureInitialized() |
- if self.strategy != RESUMABLE_UPLOAD: |
- raise exceptions.InvalidDataError( |
- 'Serialization only supported for resumable uploads') |
- return { |
- 'auto_transfer': self.auto_transfer, |
- 'mime_type': self.mime_type, |
- 'total_size': self.total_size, |
- 'url': self.url, |
- } |
- |
- @property |
- def complete(self): |
- return self.__complete |
- |
- @property |
- def mime_type(self): |
- return self.__mime_type |
- |
- def __str__(self): |
- if not self.initialized: |
- return 'Upload (uninitialized)' |
- else: |
- return 'Upload with %d/%s bytes transferred for url %s' % ( |
- self.progress, self.total_size or '???', self.url) |
- |
- @property |
- def strategy(self): |
- return self.__strategy |
- |
- @strategy.setter |
- def strategy(self, value): |
- if value not in (SIMPLE_UPLOAD, RESUMABLE_UPLOAD): |
- raise exceptions.UserError(( |
- 'Invalid value "%s" for upload strategy, must be one of ' |
- '"simple" or "resumable".') % value) |
- self.__strategy = value |
- |
- @property |
- def total_size(self): |
- return self.__total_size |
- |
- @total_size.setter |
- def total_size(self, value): |
- self.EnsureUninitialized() |
- self.__total_size = value |
- |
- def __SetDefaultUploadStrategy(self, upload_config, http_request): |
- """Determine and set the default upload strategy for this upload. |
- |
- We generally prefer simple or multipart, unless we're forced to |
- use resumable. This happens when any of (1) the upload is too |
- large, (2) the simple endpoint doesn't support multipart requests |
- and we have metadata, or (3) there is no simple upload endpoint. |
- |
- Args: |
- upload_config: Configuration for the upload endpoint. |
- http_request: The associated http request. |
- |
- Returns: |
- None. |
- """ |
- if self.strategy is not None: |
- return |
- strategy = SIMPLE_UPLOAD |
- if (self.total_size is not None and |
- self.total_size > _RESUMABLE_UPLOAD_THRESHOLD): |
- strategy = RESUMABLE_UPLOAD |
- if http_request.body and not upload_config.simple_multipart: |
- strategy = RESUMABLE_UPLOAD |
- if not upload_config.simple_path: |
- strategy = RESUMABLE_UPLOAD |
- self.strategy = strategy |
- |
- def ConfigureRequest(self, upload_config, http_request, url_builder): |
- """Configure the request and url for this upload.""" |
- # Validate total_size vs. max_size |
- if (self.total_size and upload_config.max_size and |
- self.total_size > upload_config.max_size): |
- raise exceptions.InvalidUserInputError( |
- 'Upload too big: %s larger than max size %s' % ( |
- self.total_size, upload_config.max_size)) |
- # Validate mime type |
- if not util.AcceptableMimeType(upload_config.accept, self.mime_type): |
- raise exceptions.InvalidUserInputError( |
- 'MIME type %s does not match any accepted MIME ranges %s' % ( |
- self.mime_type, upload_config.accept)) |
- |
- self.__SetDefaultUploadStrategy(upload_config, http_request) |
- if self.strategy == SIMPLE_UPLOAD: |
- url_builder.relative_path = upload_config.simple_path |
- if http_request.body: |
- url_builder.query_params['uploadType'] = 'multipart' |
- self.__ConfigureMultipartRequest(http_request) |
- else: |
- url_builder.query_params['uploadType'] = 'media' |
- self.__ConfigureMediaRequest(http_request) |
- else: |
- url_builder.relative_path = upload_config.resumable_path |
- url_builder.query_params['uploadType'] = 'resumable' |
- self.__ConfigureResumableRequest(http_request) |
- |
- def __ConfigureMediaRequest(self, http_request): |
- """Configure http_request as a simple request for this upload.""" |
- http_request.headers['content-type'] = self.mime_type |
- http_request.body = self.stream.read() |
- http_request.loggable_body = '<media body>' |
- |
- def __ConfigureMultipartRequest(self, http_request): |
- """Configure http_request as a multipart request for this upload.""" |
- # This is a multipart/related upload. |
- msg_root = mime_multipart.MIMEMultipart('related') |
- # msg_root should not write out its own headers |
- setattr(msg_root, '_write_headers', lambda self: None) |
- |
- # attach the body as one part |
- msg = mime_nonmultipart.MIMENonMultipart( |
- *http_request.headers['content-type'].split('/')) |
- msg.set_payload(http_request.body) |
- msg_root.attach(msg) |
- |
- # attach the media as the second part |
- msg = mime_nonmultipart.MIMENonMultipart(*self.mime_type.split('/')) |
- msg['Content-Transfer-Encoding'] = 'binary' |
- msg.set_payload(self.stream.read()) |
- msg_root.attach(msg) |
- |
- # NOTE: We encode the body, but can't use |
- # `email.message.Message.as_string` because it prepends |
- # `> ` to `From ` lines. |
- # NOTE: We must use six.StringIO() instead of io.StringIO() since the |
- # `email` library uses cStringIO in Py2 and io.StringIO in Py3. |
- fp = six.StringIO() |
- g = email_generator.Generator(fp, mangle_from_=False) |
- g.flatten(msg_root, unixfrom=False) |
- http_request.body = fp.getvalue() |
- |
- multipart_boundary = msg_root.get_boundary() |
- http_request.headers['content-type'] = ( |
- 'multipart/related; boundary=%r' % multipart_boundary) |
- |
- body_components = http_request.body.split(multipart_boundary) |
- headers, _, _ = body_components[-2].partition('\n\n') |
- body_components[-2] = '\n\n'.join([headers, '<media body>\n\n--']) |
- http_request.loggable_body = multipart_boundary.join(body_components) |
- |
- def __ConfigureResumableRequest(self, http_request): |
- http_request.headers['X-Upload-Content-Type'] = self.mime_type |
- if self.total_size is not None: |
- http_request.headers[ |
- 'X-Upload-Content-Length'] = str(self.total_size) |
- |
- def RefreshResumableUploadState(self): |
- """Talk to the server and refresh the state of this resumable upload. |
- |
- Returns: |
- Response if the upload is complete. |
- """ |
- if self.strategy != RESUMABLE_UPLOAD: |
- return |
- self.EnsureInitialized() |
- refresh_request = http_wrapper.Request( |
- url=self.url, http_method='PUT', |
- headers={'Content-Range': 'bytes */*'}) |
- refresh_response = http_wrapper.MakeRequest( |
- self.http, refresh_request, redirections=0, |
- retries=self.num_retries) |
- range_header = self._GetRangeHeaderFromResponse(refresh_response) |
- if refresh_response.status_code in (http_client.OK, |
- http_client.CREATED): |
- self.__complete = True |
- self.__progress = self.total_size |
- self.stream.seek(self.progress) |
- # If we're finished, the refresh response will contain the metadata |
- # originally requested. Cache it so it can be returned in |
- # StreamInChunks. |
- self.__final_response = refresh_response |
- elif refresh_response.status_code == http_wrapper.RESUME_INCOMPLETE: |
- if range_header is None: |
- self.__progress = 0 |
- else: |
- self.__progress = self.__GetLastByte(range_header) + 1 |
- self.stream.seek(self.progress) |
- else: |
- raise exceptions.HttpError.FromResponse(refresh_response) |
- |
- def _GetRangeHeaderFromResponse(self, response): |
- return response.info.get('Range', response.info.get('range')) |
- |
- def InitializeUpload(self, http_request, http=None, client=None): |
- """Initialize this upload from the given http_request.""" |
- if self.strategy is None: |
- raise exceptions.UserError( |
- 'No upload strategy set; did you call ConfigureRequest?') |
- if http is None and client is None: |
- raise exceptions.UserError('Must provide client or http.') |
- if self.strategy != RESUMABLE_UPLOAD: |
- return |
- http = http or client.http |
- if client is not None: |
- http_request.url = client.FinalizeTransferUrl(http_request.url) |
- self.EnsureUninitialized() |
- http_response = http_wrapper.MakeRequest(http, http_request, |
- retries=self.num_retries) |
- if http_response.status_code != http_client.OK: |
- raise exceptions.HttpError.FromResponse(http_response) |
- |
- self.__server_chunk_granularity = http_response.info.get( |
- 'X-Goog-Upload-Chunk-Granularity') |
- url = http_response.info['location'] |
- if client is not None: |
- url = client.FinalizeTransferUrl(url) |
- self._Initialize(http, url) |
- |
- # Unless the user has requested otherwise, we want to just |
- # go ahead and pump the bytes now. |
- if self.auto_transfer: |
- return self.StreamInChunks() |
- |
- def __GetLastByte(self, range_header): |
- _, _, end = range_header.partition('-') |
- # TODO(craigcitro): Validate start == 0? |
- return int(end) |
- |
- def __ValidateChunksize(self, chunksize=None): |
- if self.__server_chunk_granularity is None: |
- return |
- chunksize = chunksize or self.chunksize |
- if chunksize % self.__server_chunk_granularity: |
- raise exceptions.ConfigurationValueError( |
- 'Server requires chunksize to be a multiple of %d', |
- self.__server_chunk_granularity) |
- |
- def __StreamMedia(self, callback=None, finish_callback=None, |
- additional_headers=None, use_chunks=True): |
- """Helper function for StreamMedia / StreamInChunks.""" |
- if self.strategy != RESUMABLE_UPLOAD: |
- raise exceptions.InvalidUserInputError( |
- 'Cannot stream non-resumable upload') |
- callback = callback or self.progress_callback |
- finish_callback = finish_callback or self.finish_callback |
- # final_response is set if we resumed an already-completed upload. |
- response = self.__final_response |
- send_func = self.__SendChunk if use_chunks else self.__SendMediaBody |
- if use_chunks: |
- self.__ValidateChunksize(self.chunksize) |
- self.EnsureInitialized() |
- while not self.complete: |
- response = send_func(self.stream.tell(), |
- additional_headers=additional_headers) |
- if response.status_code in (http_client.OK, http_client.CREATED): |
- self.__complete = True |
- break |
- self.__progress = self.__GetLastByte(response.info['range']) |
- if self.progress + 1 != self.stream.tell(): |
- # TODO(craigcitro): Add a better way to recover here. |
- raise exceptions.CommunicationError( |
- 'Failed to transfer all bytes in chunk, upload paused at ' |
- 'byte %d' % self.progress) |
- self._ExecuteCallback(callback, response) |
- if self.__complete and hasattr(self.stream, 'seek'): |
- current_pos = self.stream.tell() |
- self.stream.seek(0, os.SEEK_END) |
- end_pos = self.stream.tell() |
- self.stream.seek(current_pos) |
- if current_pos != end_pos: |
- raise exceptions.TransferInvalidError( |
- 'Upload complete with %s additional bytes left in stream' % |
- (int(end_pos) - int(current_pos))) |
- self._ExecuteCallback(finish_callback, response) |
- return response |
- |
- def StreamMedia(self, callback=None, finish_callback=None, |
- additional_headers=None): |
- """Send this resumable upload in a single request. |
- |
- Args: |
- callback: Progress callback function with inputs |
- (http_wrapper.Response, transfer.Upload) |
- finish_callback: Final callback function with inputs |
- (http_wrapper.Response, transfer.Upload) |
- additional_headers: Dict of headers to include with the upload |
- http_wrapper.Request. |
- |
- Returns: |
- http_wrapper.Response of final response. |
- """ |
- return self.__StreamMedia( |
- callback=callback, finish_callback=finish_callback, |
- additional_headers=additional_headers, use_chunks=False) |
- |
- def StreamInChunks(self, callback=None, finish_callback=None, |
- additional_headers=None): |
- """Send this (resumable) upload in chunks.""" |
- return self.__StreamMedia( |
- callback=callback, finish_callback=finish_callback, |
- additional_headers=additional_headers) |
- |
- def __SendMediaRequest(self, request, end): |
- """Request helper function for SendMediaBody & SendChunk.""" |
- response = http_wrapper.MakeRequest( |
- self.bytes_http, request, retry_func=self.retry_func, |
- retries=self.num_retries) |
- if response.status_code not in (http_client.OK, http_client.CREATED, |
- http_wrapper.RESUME_INCOMPLETE): |
- # We want to reset our state to wherever the server left us |
- # before this failed request, and then raise. |
- self.RefreshResumableUploadState() |
- raise exceptions.HttpError.FromResponse(response) |
- if response.status_code == http_wrapper.RESUME_INCOMPLETE: |
- last_byte = self.__GetLastByte( |
- self._GetRangeHeaderFromResponse(response)) |
- if last_byte + 1 != end: |
- self.stream.seek(last_byte) |
- return response |
- |
- def __SendMediaBody(self, start, additional_headers=None): |
- """Send the entire media stream in a single request.""" |
- self.EnsureInitialized() |
- if self.total_size is None: |
- raise exceptions.TransferInvalidError( |
- 'Total size must be known for SendMediaBody') |
- body_stream = stream_slice.StreamSlice( |
- self.stream, self.total_size - start) |
- |
- request = http_wrapper.Request(url=self.url, http_method='PUT', |
- body=body_stream) |
- request.headers['Content-Type'] = self.mime_type |
- if start == self.total_size: |
- # End of an upload with 0 bytes left to send; just finalize. |
- range_string = 'bytes */%s' % self.total_size |
- else: |
- range_string = 'bytes %s-%s/%s' % (start, self.total_size - 1, |
- self.total_size) |
- |
- request.headers['Content-Range'] = range_string |
- if additional_headers: |
- request.headers.update(additional_headers) |
- |
- return self.__SendMediaRequest(request, self.total_size) |
- |
- def __SendChunk(self, start, additional_headers=None): |
- """Send the specified chunk.""" |
- self.EnsureInitialized() |
- no_log_body = self.total_size is None |
- if self.total_size is None: |
- # For the streaming resumable case, we need to detect when |
- # we're at the end of the stream. |
- body_stream = buffered_stream.BufferedStream( |
- self.stream, start, self.chunksize) |
- end = body_stream.stream_end_position |
- if body_stream.stream_exhausted: |
- self.__total_size = end |
- # TODO: Here, change body_stream from a stream to a string object, |
- # which means reading a chunk into memory. This works around |
- # https://code.google.com/p/httplib2/issues/detail?id=176 which can |
- # cause httplib2 to skip bytes on 401's for file objects. |
- # Rework this solution to be more general. |
- body_stream = body_stream.read(self.chunksize) |
- else: |
- end = min(start + self.chunksize, self.total_size) |
- body_stream = stream_slice.StreamSlice(self.stream, end - start) |
- # TODO(craigcitro): Think about clearer errors on "no data in |
- # stream". |
- request = http_wrapper.Request(url=self.url, http_method='PUT', |
- body=body_stream) |
- request.headers['Content-Type'] = self.mime_type |
- if no_log_body: |
- # Disable logging of streaming body. |
- # TODO: Remove no_log_body and rework as part of a larger logs |
- # refactor. |
- request.loggable_body = '<media body>' |
- if self.total_size is None: |
- # Streaming resumable upload case, unknown total size. |
- range_string = 'bytes %s-%s/*' % (start, end - 1) |
- elif end == start: |
- # End of an upload with 0 bytes left to send; just finalize. |
- range_string = 'bytes */%s' % self.total_size |
- else: |
- # Normal resumable upload case with known sizes. |
- range_string = 'bytes %s-%s/%s' % (start, end - 1, self.total_size) |
- |
- request.headers['Content-Range'] = range_string |
- if additional_headers: |
- request.headers.update(additional_headers) |
- |
- return self.__SendMediaRequest(request, end) |