| Index: tools/telemetry/third_party/gsutilz/gslib/boto_resumable_upload.py
|
| diff --git a/tools/telemetry/third_party/gsutilz/gslib/boto_resumable_upload.py b/tools/telemetry/third_party/gsutilz/gslib/boto_resumable_upload.py
|
| deleted file mode 100644
|
| index 0e3153b20ffd30199fd7be362db03b38729fcaef..0000000000000000000000000000000000000000
|
| --- a/tools/telemetry/third_party/gsutilz/gslib/boto_resumable_upload.py
|
| +++ /dev/null
|
| @@ -1,582 +0,0 @@
|
| -# -*- coding: utf-8 -*-
|
| -# Copyright 2010 Google Inc. All Rights Reserved.
|
| -#
|
| -# Permission is hereby granted, free of charge, to any person obtaining a
|
| -# copy of this software and associated documentation files (the
|
| -# "Software"), to deal in the Software without restriction, including
|
| -# without limitation the rights to use, copy, modify, merge, publish, dis-
|
| -# tribute, sublicense, and/or sell copies of the Software, and to permit
|
| -# persons to whom the Software is furnished to do so, subject to the fol-
|
| -# lowing conditions:
|
| -#
|
| -# The above copyright notice and this permission notice shall be included
|
| -# in all copies or substantial portions of the Software.
|
| -#
|
| -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
| -# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
|
| -# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
|
| -# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
| -# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
| -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
| -# IN THE SOFTWARE.
|
| -"""Boto translation layer for resumable uploads.
|
| -
|
| -See http://code.google.com/apis/storage/docs/developer-guide.html#resumable
|
| -for details.
|
| -
|
| -Resumable uploads will retry interrupted uploads, resuming at the byte
|
| -count completed by the last upload attempt. If too many retries happen with
|
| -no progress (per configurable num_retries param), the upload will be
|
| -aborted in the current process.
|
| -
|
| -Unlike the boto implementation of resumable upload handler, this class does
|
| -not directly interact with tracker files.
|
| -
|
| -Originally Google wrote and contributed this code to the boto project,
|
| -then copied that code back into gsutil on the release of gsutil 4.0 which
|
| -supports both boto and non-boto codepaths for resumable uploads. Any bug
|
| -fixes made to this file should also be integrated to resumable_upload_handler.py
|
| -in boto, where applicable.
|
| -
|
| -TODO: gsutil-beta: Add a similar comment to the boto code.
|
| -"""
|
| -
|
| -from __future__ import absolute_import
|
| -
|
| -import errno
|
| -import httplib
|
| -import random
|
| -import re
|
| -import socket
|
| -import time
|
| -import urlparse
|
| -from boto import UserAgent
|
| -from boto.connection import AWSAuthConnection
|
| -from boto.exception import ResumableTransferDisposition
|
| -from boto.exception import ResumableUploadException
|
| -from gslib.exception import InvalidUrlError
|
| -from gslib.util import GetMaxRetryDelay
|
| -from gslib.util import GetNumRetries
|
| -from gslib.util import XML_PROGRESS_CALLBACKS
|
| -
|
| -
|
| -class BotoResumableUpload(object):
|
| - """Upload helper class for resumable uploads via boto."""
|
| -
|
| - BUFFER_SIZE = 8192
|
| - RETRYABLE_EXCEPTIONS = (httplib.HTTPException, IOError, socket.error,
|
| - socket.gaierror)
|
| -
|
| - # (start, end) response indicating service has nothing (upload protocol uses
|
| - # inclusive numbering).
|
| - SERVICE_HAS_NOTHING = (0, -1)
|
| -
|
| - def __init__(self, tracker_callback, logger,
|
| - resume_url=None, num_retries=None):
|
| - """Constructor. Instantiate once for each uploaded file.
|
| -
|
| - Args:
|
| - tracker_callback: Callback function that takes a string argument. Used
|
| - by caller to track this upload across upload
|
| - interruption.
|
| - logger: logging.logger instance to use for debug messages.
|
| - resume_url: If present, attempt to resume the upload at this URL.
|
| - num_retries: Number of times to retry the upload making no progress.
|
| - This count resets every time we make progress, so the upload
|
| - can span many more than this number of retries.
|
| - """
|
| - if resume_url:
|
| - self._SetUploadUrl(resume_url)
|
| - else:
|
| - self.upload_url = None
|
| - self.num_retries = num_retries
|
| - self.service_has_bytes = 0 # Byte count at last service check.
|
| - # Save upload_start_point in instance state so caller can find how
|
| - # much was transferred by this ResumableUploadHandler (across retries).
|
| - self.upload_start_point = None
|
| - self.tracker_callback = tracker_callback
|
| - self.logger = logger
|
| -
|
| - def _SetUploadUrl(self, url):
|
| - """Saves URL and resets upload state.
|
| -
|
| - Called when we start a new resumable upload or get a new tracker
|
| - URL for the upload.
|
| -
|
| - Args:
|
| - url: URL string for the upload.
|
| -
|
| - Raises InvalidUrlError if URL is syntactically invalid.
|
| - """
|
| - parse_result = urlparse.urlparse(url)
|
| - if (parse_result.scheme.lower() not in ['http', 'https'] or
|
| - not parse_result.netloc):
|
| - raise InvalidUrlError('Invalid upload URL (%s)' % url)
|
| - self.upload_url = url
|
| - self.upload_url_host = parse_result.netloc
|
| - self.upload_url_path = '%s?%s' % (
|
| - parse_result.path, parse_result.query)
|
| - self.service_has_bytes = 0
|
| -
|
| - def _BuildContentRangeHeader(self, range_spec='*', length_spec='*'):
|
| - return 'bytes %s/%s' % (range_spec, length_spec)
|
| -
|
| - def _QueryServiceState(self, conn, file_length):
|
| - """Queries service to find out state of given upload.
|
| -
|
| - Note that this method really just makes special case use of the
|
| - fact that the upload service always returns the current start/end
|
| - state whenever a PUT doesn't complete.
|
| -
|
| - Args:
|
| - conn: HTTPConnection to use for the query.
|
| - file_length: Total length of the file.
|
| -
|
| - Returns:
|
| - HTTP response from sending request.
|
| -
|
| - Raises:
|
| - ResumableUploadException if problem querying service.
|
| - """
|
| - # Send an empty PUT so that service replies with this resumable
|
| - # transfer's state.
|
| - put_headers = {}
|
| - put_headers['Content-Range'] = (
|
| - self._BuildContentRangeHeader('*', file_length))
|
| - put_headers['Content-Length'] = '0'
|
| - return AWSAuthConnection.make_request(
|
| - conn, 'PUT', path=self.upload_url_path, auth_path=self.upload_url_path,
|
| - headers=put_headers, host=self.upload_url_host)
|
| -
|
| - def _QueryServicePos(self, conn, file_length):
|
| - """Queries service to find out what bytes it currently has.
|
| -
|
| - Args:
|
| - conn: HTTPConnection to use for the query.
|
| - file_length: Total length of the file.
|
| -
|
| - Returns:
|
| - (service_start, service_end), where the values are inclusive.
|
| - For example, (0, 2) would mean that the service has bytes 0, 1, *and* 2.
|
| -
|
| - Raises:
|
| - ResumableUploadException if problem querying service.
|
| - """
|
| - resp = self._QueryServiceState(conn, file_length)
|
| - if resp.status == 200:
|
| - # To handle the boundary condition where the service has the complete
|
| - # file, we return (service_start, file_length-1). That way the
|
| - # calling code can always simply read up through service_end. (If we
|
| - # didn't handle this boundary condition here, the caller would have
|
| - # to check whether service_end == file_length and read one fewer byte
|
| - # in that case.)
|
| - return (0, file_length - 1) # Completed upload.
|
| - if resp.status != 308:
|
| - # This means the service didn't have any state for the given
|
| - # upload ID, which can happen (for example) if the caller saved
|
| - # the upload URL to a file and then tried to restart the transfer
|
| - # after that upload ID has gone stale. In that case we need to
|
| - # start a new transfer (and the caller will then save the new
|
| - # upload URL to the tracker file).
|
| - raise ResumableUploadException(
|
| - 'Got non-308 response (%s) from service state query' %
|
| - resp.status, ResumableTransferDisposition.START_OVER)
|
| - got_valid_response = False
|
| - range_spec = resp.getheader('range')
|
| - if range_spec:
|
| - # Parse 'bytes=<from>-<to>' range_spec.
|
| - m = re.search(r'bytes=(\d+)-(\d+)', range_spec)
|
| - if m:
|
| - service_start = long(m.group(1))
|
| - service_end = long(m.group(2))
|
| - got_valid_response = True
|
| - else:
|
| - # No Range header, which means the service does not yet have
|
| - # any bytes. Note that the Range header uses inclusive 'from'
|
| - # and 'to' values. Since Range 0-0 would mean that the service
|
| - # has byte 0, omitting the Range header is used to indicate that
|
| - # the service doesn't have any bytes.
|
| - return self.SERVICE_HAS_NOTHING
|
| - if not got_valid_response:
|
| - raise ResumableUploadException(
|
| - 'Couldn\'t parse upload service state query response (%s)' %
|
| - str(resp.getheaders()), ResumableTransferDisposition.START_OVER)
|
| - if conn.debug >= 1:
|
| - self.logger.debug('Service has: Range: %d - %d.', service_start,
|
| - service_end)
|
| - return (service_start, service_end)
|
| -
|
| - def _StartNewResumableUpload(self, key, headers=None):
|
| - """Starts a new resumable upload.
|
| -
|
| - Args:
|
| - key: Boto Key representing the object to upload.
|
| - headers: Headers to use in the upload requests.
|
| -
|
| - Raises:
|
| - ResumableUploadException if any errors occur.
|
| - """
|
| - conn = key.bucket.connection
|
| - if conn.debug >= 1:
|
| - self.logger.debug('Starting new resumable upload.')
|
| - self.service_has_bytes = 0
|
| -
|
| - # Start a new resumable upload by sending a POST request with an
|
| - # empty body and the "X-Goog-Resumable: start" header. Include any
|
| - # caller-provided headers (e.g., Content-Type) EXCEPT Content-Length
|
| - # (and raise an exception if they tried to pass one, since it's
|
| - # a semantic error to specify it at this point, and if we were to
|
| - # include one now it would cause the service to expect that many
|
| - # bytes; the POST doesn't include the actual file bytes We set
|
| - # the Content-Length in the subsequent PUT, based on the uploaded
|
| - # file size.
|
| - post_headers = {}
|
| - for k in headers:
|
| - if k.lower() == 'content-length':
|
| - raise ResumableUploadException(
|
| - 'Attempt to specify Content-Length header (disallowed)',
|
| - ResumableTransferDisposition.ABORT)
|
| - post_headers[k] = headers[k]
|
| - post_headers[conn.provider.resumable_upload_header] = 'start'
|
| -
|
| - resp = conn.make_request(
|
| - 'POST', key.bucket.name, key.name, post_headers)
|
| - # Get upload URL from response 'Location' header.
|
| - body = resp.read()
|
| -
|
| - # Check for various status conditions.
|
| - if resp.status in [429, 500, 503]:
|
| - # Retry after a delay.
|
| - raise ResumableUploadException(
|
| - 'Got status %d from attempt to start resumable upload. '
|
| - 'Will wait/retry' % resp.status,
|
| - ResumableTransferDisposition.WAIT_BEFORE_RETRY)
|
| - elif resp.status != 200 and resp.status != 201:
|
| - raise ResumableUploadException(
|
| - 'Got status %d from attempt to start resumable upload. '
|
| - 'Aborting' % resp.status,
|
| - ResumableTransferDisposition.ABORT)
|
| -
|
| - # Else we got 200 or 201 response code, indicating the resumable
|
| - # upload was created.
|
| - upload_url = resp.getheader('Location')
|
| - if not upload_url:
|
| - raise ResumableUploadException(
|
| - 'No resumable upload URL found in resumable initiation '
|
| - 'POST response (%s)' % body,
|
| - ResumableTransferDisposition.WAIT_BEFORE_RETRY)
|
| - self._SetUploadUrl(upload_url)
|
| - self.tracker_callback(upload_url)
|
| -
|
| - def _UploadFileBytes(self, conn, http_conn, fp, file_length,
|
| - total_bytes_uploaded, cb, num_cb, headers):
|
| - """Attempts to upload file bytes.
|
| -
|
| - Makes a single attempt using an existing resumable upload connection.
|
| -
|
| - Args:
|
| - conn: HTTPConnection from the boto Key.
|
| - http_conn: Separate HTTPConnection for the transfer.
|
| - fp: File pointer containing bytes to upload.
|
| - file_length: Total length of the file.
|
| - total_bytes_uploaded: The total number of bytes uploaded.
|
| - cb: Progress callback function that takes (progress, total_size).
|
| - num_cb: Granularity of the callback (maximum number of times the
|
| - callback will be called during the file transfer). If negative,
|
| - perform callback with each buffer read.
|
| - headers: Headers to be used in the upload requests.
|
| -
|
| - Returns:
|
| - (etag, generation, metageneration) from service upon success.
|
| -
|
| - Raises:
|
| - ResumableUploadException if any problems occur.
|
| - """
|
| - buf = fp.read(self.BUFFER_SIZE)
|
| - if cb:
|
| - # The cb_count represents the number of full buffers to send between
|
| - # cb executions.
|
| - if num_cb > 2:
|
| - cb_count = file_length / self.BUFFER_SIZE / (num_cb-2)
|
| - elif num_cb < 0:
|
| - cb_count = -1
|
| - else:
|
| - cb_count = 0
|
| - i = 0
|
| - cb(total_bytes_uploaded, file_length)
|
| -
|
| - # Build resumable upload headers for the transfer. Don't send a
|
| - # Content-Range header if the file is 0 bytes long, because the
|
| - # resumable upload protocol uses an *inclusive* end-range (so, sending
|
| - # 'bytes 0-0/1' would actually mean you're sending a 1-byte file).
|
| - put_headers = headers.copy() if headers else {}
|
| - if file_length:
|
| - if total_bytes_uploaded == file_length:
|
| - range_header = self._BuildContentRangeHeader(
|
| - '*', file_length)
|
| - else:
|
| - range_header = self._BuildContentRangeHeader(
|
| - '%d-%d' % (total_bytes_uploaded, file_length - 1),
|
| - file_length)
|
| - put_headers['Content-Range'] = range_header
|
| - # Set Content-Length to the total bytes we'll send with this PUT.
|
| - put_headers['Content-Length'] = str(file_length - total_bytes_uploaded)
|
| - http_request = AWSAuthConnection.build_base_http_request(
|
| - conn, 'PUT', path=self.upload_url_path, auth_path=None,
|
| - headers=put_headers, host=self.upload_url_host)
|
| - http_conn.putrequest('PUT', http_request.path)
|
| - for k in put_headers:
|
| - http_conn.putheader(k, put_headers[k])
|
| - http_conn.endheaders()
|
| -
|
| - # Turn off debug on http connection so upload content isn't included
|
| - # in debug stream.
|
| - http_conn.set_debuglevel(0)
|
| - while buf:
|
| - http_conn.send(buf)
|
| - total_bytes_uploaded += len(buf)
|
| - if cb:
|
| - i += 1
|
| - if i == cb_count or cb_count == -1:
|
| - cb(total_bytes_uploaded, file_length)
|
| - i = 0
|
| - buf = fp.read(self.BUFFER_SIZE)
|
| - http_conn.set_debuglevel(conn.debug)
|
| - if cb:
|
| - cb(total_bytes_uploaded, file_length)
|
| - if total_bytes_uploaded != file_length:
|
| - # Abort (and delete the tracker file) so if the user retries
|
| - # they'll start a new resumable upload rather than potentially
|
| - # attempting to pick back up later where we left off.
|
| - raise ResumableUploadException(
|
| - 'File changed during upload: EOF at %d bytes of %d byte file.' %
|
| - (total_bytes_uploaded, file_length),
|
| - ResumableTransferDisposition.ABORT)
|
| - resp = http_conn.getresponse()
|
| - # Restore http connection debug level.
|
| - http_conn.set_debuglevel(conn.debug)
|
| -
|
| - if resp.status == 200:
|
| - # Success.
|
| - return (resp.getheader('etag'),
|
| - resp.getheader('x-goog-generation'),
|
| - resp.getheader('x-goog-metageneration'))
|
| - # Retry timeout (408) and status 429, 500 and 503 errors after a delay.
|
| - elif resp.status in [408, 429, 500, 503]:
|
| - disposition = ResumableTransferDisposition.WAIT_BEFORE_RETRY
|
| - else:
|
| - # Catch all for any other error codes.
|
| - disposition = ResumableTransferDisposition.ABORT
|
| - raise ResumableUploadException('Got response code %d while attempting '
|
| - 'upload (%s)' %
|
| - (resp.status, resp.reason), disposition)
|
| -
|
| - def _AttemptResumableUpload(self, key, fp, file_length, headers, cb,
|
| - num_cb):
|
| - """Attempts a resumable upload.
|
| -
|
| - Args:
|
| - key: Boto key representing object to upload.
|
| - fp: File pointer containing upload bytes.
|
| - file_length: Total length of the upload.
|
| - headers: Headers to be used in upload requests.
|
| - cb: Progress callback function that takes (progress, total_size).
|
| - num_cb: Granularity of the callback (maximum number of times the
|
| - callback will be called during the file transfer). If negative,
|
| - perform callback with each buffer read.
|
| -
|
| - Returns:
|
| - (etag, generation, metageneration) from service upon success.
|
| -
|
| - Raises:
|
| - ResumableUploadException if any problems occur.
|
| - """
|
| - (service_start, service_end) = self.SERVICE_HAS_NOTHING
|
| - conn = key.bucket.connection
|
| - if self.upload_url:
|
| - # Try to resume existing resumable upload.
|
| - try:
|
| - (service_start, service_end) = (
|
| - self._QueryServicePos(conn, file_length))
|
| - self.service_has_bytes = service_start
|
| - if conn.debug >= 1:
|
| - self.logger.debug('Resuming transfer.')
|
| - except ResumableUploadException, e:
|
| - if conn.debug >= 1:
|
| - self.logger.debug('Unable to resume transfer (%s).', e.message)
|
| - self._StartNewResumableUpload(key, headers)
|
| - else:
|
| - self._StartNewResumableUpload(key, headers)
|
| -
|
| - # upload_start_point allows the code that instantiated the
|
| - # ResumableUploadHandler to find out the point from which it started
|
| - # uploading (e.g., so it can correctly compute throughput).
|
| - if self.upload_start_point is None:
|
| - self.upload_start_point = service_end
|
| -
|
| - total_bytes_uploaded = service_end + 1
|
| -
|
| - # Start reading from the file based upon the number of bytes that the
|
| - # server has so far.
|
| - if total_bytes_uploaded < file_length:
|
| - fp.seek(total_bytes_uploaded)
|
| -
|
| - conn = key.bucket.connection
|
| -
|
| - # Get a new HTTP connection (vs conn.get_http_connection(), which reuses
|
| - # pool connections) because httplib requires a new HTTP connection per
|
| - # transaction. (Without this, calling http_conn.getresponse() would get
|
| - # "ResponseNotReady".)
|
| - http_conn = conn.new_http_connection(self.upload_url_host, conn.port,
|
| - conn.is_secure)
|
| - http_conn.set_debuglevel(conn.debug)
|
| -
|
| - # Make sure to close http_conn at end so if a local file read
|
| - # failure occurs partway through service will terminate current upload
|
| - # and can report that progress on next attempt.
|
| - try:
|
| - return self._UploadFileBytes(conn, http_conn, fp, file_length,
|
| - total_bytes_uploaded, cb, num_cb,
|
| - headers)
|
| - except (ResumableUploadException, socket.error):
|
| - resp = self._QueryServiceState(conn, file_length)
|
| - if resp.status == 400:
|
| - raise ResumableUploadException(
|
| - 'Got 400 response from service state query after failed resumable '
|
| - 'upload attempt. This can happen for various reasons, including '
|
| - 'specifying an invalid request (e.g., an invalid canned ACL) or '
|
| - 'if the file size changed between upload attempts',
|
| - ResumableTransferDisposition.ABORT)
|
| - else:
|
| - raise
|
| - finally:
|
| - http_conn.close()
|
| -
|
| - def HandleResumableUploadException(self, e, debug):
|
| - if e.disposition == ResumableTransferDisposition.ABORT_CUR_PROCESS:
|
| - if debug >= 1:
|
| - self.logger.debug('Caught non-retryable ResumableUploadException (%s); '
|
| - 'aborting but retaining tracker file', e.message)
|
| - raise
|
| - elif e.disposition == ResumableTransferDisposition.ABORT:
|
| - if debug >= 1:
|
| - self.logger.debug('Caught non-retryable ResumableUploadException (%s); '
|
| - 'aborting and removing tracker file', e.message)
|
| - raise
|
| - elif e.disposition == ResumableTransferDisposition.START_OVER:
|
| - raise
|
| - else:
|
| - if debug >= 1:
|
| - self.logger.debug(
|
| - 'Caught ResumableUploadException (%s) - will retry', e.message)
|
| -
|
| - def TrackProgressLessIterations(self, service_had_bytes_before_attempt,
|
| - debug=0):
|
| - """Tracks the number of iterations without progress.
|
| -
|
| - Performs randomized exponential backoff.
|
| -
|
| - Args:
|
| - service_had_bytes_before_attempt: Number of bytes the service had prior
|
| - to this upload attempt.
|
| - debug: debug level 0..3
|
| - """
|
| - # At this point we had a re-tryable failure; see if made progress.
|
| - if self.service_has_bytes > service_had_bytes_before_attempt:
|
| - self.progress_less_iterations = 0 # If progress, reset counter.
|
| - else:
|
| - self.progress_less_iterations += 1
|
| -
|
| - if self.progress_less_iterations > self.num_retries:
|
| - # Don't retry any longer in the current process.
|
| - raise ResumableUploadException(
|
| - 'Too many resumable upload attempts failed without '
|
| - 'progress. You might try this upload again later',
|
| - ResumableTransferDisposition.ABORT_CUR_PROCESS)
|
| -
|
| - # Use binary exponential backoff to desynchronize client requests.
|
| - sleep_time_secs = min(random.random() * (2**self.progress_less_iterations),
|
| - GetMaxRetryDelay())
|
| - if debug >= 1:
|
| - self.logger.debug('Got retryable failure (%d progress-less in a row).\n'
|
| - 'Sleeping %3.1f seconds before re-trying',
|
| - self.progress_less_iterations, sleep_time_secs)
|
| - time.sleep(sleep_time_secs)
|
| -
|
| - def SendFile(self, key, fp, size, headers, canned_acl=None, cb=None,
|
| - num_cb=XML_PROGRESS_CALLBACKS):
|
| - """Upload a file to a key into a bucket on GS, resumable upload protocol.
|
| -
|
| - Args:
|
| - key: `boto.s3.key.Key` or subclass representing the upload destination.
|
| - fp: File pointer to upload
|
| - size: Size of the file to upload.
|
| - headers: The headers to pass along with the PUT request
|
| - canned_acl: Optional canned ACL to apply to object.
|
| - cb: Callback function that will be called to report progress on
|
| - the upload. The callback should accept two integer parameters, the
|
| - first representing the number of bytes that have been successfully
|
| - transmitted to GS, and the second representing the total number of
|
| - bytes that need to be transmitted.
|
| - num_cb: (optional) If a callback is specified with the cb parameter, this
|
| - parameter determines the granularity of the callback by defining
|
| - the maximum number of times the callback will be called during the
|
| - file transfer. Providing a negative integer will cause your
|
| - callback to be called with each buffer read.
|
| -
|
| - Raises:
|
| - ResumableUploadException if a problem occurs during the transfer.
|
| - """
|
| -
|
| - if not headers:
|
| - headers = {}
|
| - # If Content-Type header is present and set to None, remove it.
|
| - # This is gsutil's way of asking boto to refrain from auto-generating
|
| - # that header.
|
| - content_type = 'Content-Type'
|
| - if content_type in headers and headers[content_type] is None:
|
| - del headers[content_type]
|
| -
|
| - if canned_acl:
|
| - headers[key.provider.acl_header] = canned_acl
|
| -
|
| - headers['User-Agent'] = UserAgent
|
| -
|
| - file_length = size
|
| - debug = key.bucket.connection.debug
|
| -
|
| - # Use num-retries from constructor if one was provided; else check
|
| - # for a value specified in the boto config file; else default to 5.
|
| - if self.num_retries is None:
|
| - self.num_retries = GetNumRetries()
|
| - self.progress_less_iterations = 0
|
| -
|
| - while True: # Retry as long as we're making progress.
|
| - service_had_bytes_before_attempt = self.service_has_bytes
|
| - try:
|
| - # Save generation and metageneration in class state so caller
|
| - # can find these values, for use in preconditions of future
|
| - # operations on the uploaded object.
|
| - (_, self.generation, self.metageneration) = (
|
| - self._AttemptResumableUpload(key, fp, file_length,
|
| - headers, cb, num_cb))
|
| -
|
| - key.generation = self.generation
|
| - if debug >= 1:
|
| - self.logger.debug('Resumable upload complete.')
|
| - return
|
| - except self.RETRYABLE_EXCEPTIONS, e:
|
| - if debug >= 1:
|
| - self.logger.debug('Caught exception (%s)', e.__repr__())
|
| - if isinstance(e, IOError) and e.errno == errno.EPIPE:
|
| - # Broken pipe error causes httplib to immediately
|
| - # close the socket (http://bugs.python.org/issue5542),
|
| - # so we need to close the connection before we resume
|
| - # the upload (which will cause a new connection to be
|
| - # opened the next time an HTTP request is sent).
|
| - key.bucket.connection.connection.close()
|
| - except ResumableUploadException, e:
|
| - self.HandleResumableUploadException(e, debug)
|
| -
|
| - self.TrackProgressLessIterations(service_had_bytes_before_attempt,
|
| - debug=debug)
|
|
|