Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(211)

Unified Diff: third_party/gsutil/gslib/boto_resumable_upload.py

Issue 1377933002: [catapult] - Copy Telemetry's gsutilz over to third_party. (Closed) Base URL: https://github.com/catapult-project/catapult.git@master
Patch Set: Rename to gsutil. Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « third_party/gsutil/gslib/addlhelp/wildcards.py ('k') | third_party/gsutil/gslib/boto_translation.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: third_party/gsutil/gslib/boto_resumable_upload.py
diff --git a/third_party/gsutil/gslib/boto_resumable_upload.py b/third_party/gsutil/gslib/boto_resumable_upload.py
new file mode 100644
index 0000000000000000000000000000000000000000..0e3153b20ffd30199fd7be362db03b38729fcaef
--- /dev/null
+++ b/third_party/gsutil/gslib/boto_resumable_upload.py
@@ -0,0 +1,582 @@
+# -*- coding: utf-8 -*-
+# Copyright 2010 Google Inc. All Rights Reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish, dis-
+# tribute, sublicense, and/or sell copies of the Software, and to permit
+# persons to whom the Software is furnished to do so, subject to the fol-
+# lowing conditions:
+#
+# The above copyright notice and this permission notice shall be included
+# in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
+# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+"""Boto translation layer for resumable uploads.
+
+See http://code.google.com/apis/storage/docs/developer-guide.html#resumable
+for details.
+
+Resumable uploads will retry interrupted uploads, resuming at the byte
+count completed by the last upload attempt. If too many retries happen with
+no progress (per configurable num_retries param), the upload will be
+aborted in the current process.
+
+Unlike the boto implementation of resumable upload handler, this class does
+not directly interact with tracker files.
+
+Originally Google wrote and contributed this code to the boto project,
+then copied that code back into gsutil on the release of gsutil 4.0 which
+supports both boto and non-boto codepaths for resumable uploads. Any bug
+fixes made to this file should also be integrated to resumable_upload_handler.py
+in boto, where applicable.
+
+TODO: gsutil-beta: Add a similar comment to the boto code.
+"""
+
+from __future__ import absolute_import
+
+import errno
+import httplib
+import random
+import re
+import socket
+import time
+import urlparse
+from boto import UserAgent
+from boto.connection import AWSAuthConnection
+from boto.exception import ResumableTransferDisposition
+from boto.exception import ResumableUploadException
+from gslib.exception import InvalidUrlError
+from gslib.util import GetMaxRetryDelay
+from gslib.util import GetNumRetries
+from gslib.util import XML_PROGRESS_CALLBACKS
+
+
+class BotoResumableUpload(object):
+ """Upload helper class for resumable uploads via boto.
+
+ Instantiate once per uploaded file and call SendFile() to run the upload.
+ SendFile() keeps retrying until the upload completes, a non-retryable
+ ResumableUploadException is raised, or too many consecutive attempts fail
+ without progress.
+ """
+
+ # Number of bytes requested from the source file per fp.read() call.
+ BUFFER_SIZE = 8192
+ # Exception types that SendFile() catches and retries after a backoff.
+ RETRYABLE_EXCEPTIONS = (httplib.HTTPException, IOError, socket.error,
+ socket.gaierror)
+
+ # (start, end) response indicating service has nothing (upload protocol uses
+ # inclusive numbering).
+ SERVICE_HAS_NOTHING = (0, -1)
+
+ def __init__(self, tracker_callback, logger,
+ resume_url=None, num_retries=None):
+ """Constructor. Instantiate once for each uploaded file.
+
+ Args:
+ tracker_callback: Callback function that takes a string argument. Used
+ by caller to track this upload across upload
+ interruption.
+ logger: logging.logger instance to use for debug messages.
+ resume_url: If present, attempt to resume the upload at this URL.
+ num_retries: Number of times to retry the upload making no progress.
+ This count resets every time we make progress, so the upload
+ can span many more than this number of retries.
+ """
+ if resume_url:
+ self._SetUploadUrl(resume_url)
+ else:
+ self.upload_url = None
+ self.num_retries = num_retries
+ self.service_has_bytes = 0 # Byte count at last service check.
+ # Save upload_start_point in instance state so caller can find how
+ # much was transferred by this ResumableUploadHandler (across retries).
+ self.upload_start_point = None
+ self.tracker_callback = tracker_callback
+ self.logger = logger
+
+ def _SetUploadUrl(self, url):
+ """Saves URL and resets upload state.
+
+ Called when we start a new resumable upload or get a new tracker
+ URL for the upload.
+
+ Args:
+ url: URL string for the upload.
+
+ Raises InvalidUrlError if URL is syntactically invalid.
+ """
+ parse_result = urlparse.urlparse(url)
+ if (parse_result.scheme.lower() not in ['http', 'https'] or
+ not parse_result.netloc):
+ raise InvalidUrlError('Invalid upload URL (%s)' % url)
+ self.upload_url = url
+ self.upload_url_host = parse_result.netloc
+ self.upload_url_path = '%s?%s' % (
+ parse_result.path, parse_result.query)
+ self.service_has_bytes = 0
+
+ def _BuildContentRangeHeader(self, range_spec='*', length_spec='*'):
+ return 'bytes %s/%s' % (range_spec, length_spec)
+
+ def _QueryServiceState(self, conn, file_length):
+ """Queries service to find out state of given upload.
+
+ Note that this method really just makes special case use of the
+ fact that the upload service always returns the current start/end
+ state whenever a PUT doesn't complete.
+
+ Args:
+ conn: HTTPConnection to use for the query.
+ file_length: Total length of the file.
+
+ Returns:
+ HTTP response from sending request.
+
+ Raises:
+ ResumableUploadException if problem querying service.
+ """
+ # Send an empty PUT so that service replies with this resumable
+ # transfer's state.
+ put_headers = {}
+ put_headers['Content-Range'] = (
+ self._BuildContentRangeHeader('*', file_length))
+ put_headers['Content-Length'] = '0'
+ return AWSAuthConnection.make_request(
+ conn, 'PUT', path=self.upload_url_path, auth_path=self.upload_url_path,
+ headers=put_headers, host=self.upload_url_host)
+
+ def _QueryServicePos(self, conn, file_length):
+ """Queries service to find out what bytes it currently has.
+
+ Args:
+ conn: HTTPConnection to use for the query.
+ file_length: Total length of the file.
+
+ Returns:
+ (service_start, service_end), where the values are inclusive.
+ For example, (0, 2) would mean that the service has bytes 0, 1, *and* 2.
+
+ Raises:
+ ResumableUploadException if problem querying service.
+ """
+ resp = self._QueryServiceState(conn, file_length)
+ if resp.status == 200:
+ # To handle the boundary condition where the service has the complete
+ # file, we return (service_start, file_length-1). That way the
+ # calling code can always simply read up through service_end. (If we
+ # didn't handle this boundary condition here, the caller would have
+ # to check whether service_end == file_length and read one fewer byte
+ # in that case.)
+ return (0, file_length - 1) # Completed upload.
+ if resp.status != 308:
+ # This means the service didn't have any state for the given
+ # upload ID, which can happen (for example) if the caller saved
+ # the upload URL to a file and then tried to restart the transfer
+ # after that upload ID has gone stale. In that case we need to
+ # start a new transfer (and the caller will then save the new
+ # upload URL to the tracker file).
+ raise ResumableUploadException(
+ 'Got non-308 response (%s) from service state query' %
+ resp.status, ResumableTransferDisposition.START_OVER)
+ got_valid_response = False
+ range_spec = resp.getheader('range')
+ if range_spec:
+ # Parse 'bytes=<from>-<to>' range_spec.
+ m = re.search(r'bytes=(\d+)-(\d+)', range_spec)
+ if m:
+ service_start = long(m.group(1))
+ service_end = long(m.group(2))
+ got_valid_response = True
+ else:
+ # No Range header, which means the service does not yet have
+ # any bytes. Note that the Range header uses inclusive 'from'
+ # and 'to' values. Since Range 0-0 would mean that the service
+ # has byte 0, omitting the Range header is used to indicate that
+ # the service doesn't have any bytes.
+ return self.SERVICE_HAS_NOTHING
+ if not got_valid_response:
+ raise ResumableUploadException(
+ 'Couldn\'t parse upload service state query response (%s)' %
+ str(resp.getheaders()), ResumableTransferDisposition.START_OVER)
+ if conn.debug >= 1:
+ self.logger.debug('Service has: Range: %d - %d.', service_start,
+ service_end)
+ return (service_start, service_end)
+
+ def _StartNewResumableUpload(self, key, headers=None):
+ """Starts a new resumable upload.
+
+ Args:
+ key: Boto Key representing the object to upload.
+ headers: Headers to use in the upload requests.
+
+ Raises:
+ ResumableUploadException if any errors occur.
+ """
+ conn = key.bucket.connection
+ if conn.debug >= 1:
+ self.logger.debug('Starting new resumable upload.')
+ self.service_has_bytes = 0
+
+ # Start a new resumable upload by sending a POST request with an
+ # empty body and the "X-Goog-Resumable: start" header. Include any
+ # caller-provided headers (e.g., Content-Type) EXCEPT Content-Length
+ # (and raise an exception if they tried to pass one, since it's
+ # a semantic error to specify it at this point, and if we were to
+ # include one now it would cause the service to expect that many
+ # bytes; the POST doesn't include the actual file bytes We set
+ # the Content-Length in the subsequent PUT, based on the uploaded
+ # file size.
+ post_headers = {}
+ for k in headers:
+ if k.lower() == 'content-length':
+ raise ResumableUploadException(
+ 'Attempt to specify Content-Length header (disallowed)',
+ ResumableTransferDisposition.ABORT)
+ post_headers[k] = headers[k]
+ post_headers[conn.provider.resumable_upload_header] = 'start'
+
+ resp = conn.make_request(
+ 'POST', key.bucket.name, key.name, post_headers)
+ # Get upload URL from response 'Location' header.
+ body = resp.read()
+
+ # Check for various status conditions.
+ if resp.status in [429, 500, 503]:
+ # Retry after a delay.
+ raise ResumableUploadException(
+ 'Got status %d from attempt to start resumable upload. '
+ 'Will wait/retry' % resp.status,
+ ResumableTransferDisposition.WAIT_BEFORE_RETRY)
+ elif resp.status != 200 and resp.status != 201:
+ raise ResumableUploadException(
+ 'Got status %d from attempt to start resumable upload. '
+ 'Aborting' % resp.status,
+ ResumableTransferDisposition.ABORT)
+
+ # Else we got 200 or 201 response code, indicating the resumable
+ # upload was created.
+ upload_url = resp.getheader('Location')
+ if not upload_url:
+ raise ResumableUploadException(
+ 'No resumable upload URL found in resumable initiation '
+ 'POST response (%s)' % body,
+ ResumableTransferDisposition.WAIT_BEFORE_RETRY)
+ self._SetUploadUrl(upload_url)
+ self.tracker_callback(upload_url)
+
+ def _UploadFileBytes(self, conn, http_conn, fp, file_length,
+ total_bytes_uploaded, cb, num_cb, headers):
+ """Attempts to upload file bytes.
+
+ Makes a single attempt using an existing resumable upload connection: sends
+ one PUT whose body is everything read from fp starting at its current
+ position, then interprets the response status.
+
+ Args:
+ conn: HTTPConnection from the boto Key.
+ http_conn: Separate HTTPConnection for the transfer.
+ fp: File pointer containing bytes to upload. It is read sequentially from
+ its current position; this method never seeks.
+ file_length: Total length of the file.
+ total_bytes_uploaded: The total number of bytes uploaded.
+ cb: Progress callback function that takes (progress, total_size).
+ num_cb: Granularity of the callback (maximum number of times the
+ callback will be called during the file transfer). If negative,
+ perform callback with each buffer read.
+ headers: Headers to be used in the upload requests. A copy is made; the
+ passed-in dict is not modified here.
+
+ Returns:
+ (etag, generation, metageneration) from service upon success.
+
+ Raises:
+ ResumableUploadException if any problems occur. The disposition is
+ WAIT_BEFORE_RETRY for response status 408, 429, 500 or 503, and ABORT for
+ any other non-200 status or when the bytes sent do not add up to
+ file_length.
+ """
+ # Read the first chunk up front; an empty read skips the send loop below.
+ buf = fp.read(self.BUFFER_SIZE)
+ if cb:
+ # The cb_count represents the number of full buffers to send between
+ # cb executions.
+ # NOTE(review): the division below relies on Python 2 integer semantics;
+ # with true division cb_count would be a float and the equality test in the
+ # send loop would rarely match.
+ if num_cb > 2:
+ cb_count = file_length / self.BUFFER_SIZE / (num_cb-2)
+ elif num_cb < 0:
+ cb_count = -1
+ else:
+ # i is incremented before it is compared with cb_count, so a cb_count of 0
+ # never matches: only the initial and final callbacks fire.
+ cb_count = 0
+ i = 0
+ cb(total_bytes_uploaded, file_length)
+
+ # Build resumable upload headers for the transfer. Don't send a
+ # Content-Range header if the file is 0 bytes long, because the
+ # resumable upload protocol uses an *inclusive* end-range (so, sending
+ # 'bytes 0-0/1' would actually mean you're sending a 1-byte file).
+ put_headers = headers.copy() if headers else {}
+ if file_length:
+ if total_bytes_uploaded == file_length:
+ # Nothing left to send: use the 'bytes */<length>' form with an empty body.
+ range_header = self._BuildContentRangeHeader(
+ '*', file_length)
+ else:
+ range_header = self._BuildContentRangeHeader(
+ '%d-%d' % (total_bytes_uploaded, file_length - 1),
+ file_length)
+ put_headers['Content-Range'] = range_header
+ # Set Content-Length to the total bytes we'll send with this PUT.
+ put_headers['Content-Length'] = str(file_length - total_bytes_uploaded)
+ # The request object is built only to obtain the path passed to putrequest;
+ # the headers and body are written to http_conn directly below.
+ http_request = AWSAuthConnection.build_base_http_request(
+ conn, 'PUT', path=self.upload_url_path, auth_path=None,
+ headers=put_headers, host=self.upload_url_host)
+ http_conn.putrequest('PUT', http_request.path)
+ for k in put_headers:
+ http_conn.putheader(k, put_headers[k])
+ http_conn.endheaders()
+
+ # Turn off debug on http connection so upload content isn't included
+ # in debug stream.
+ http_conn.set_debuglevel(0)
+ # Stream the file in BUFFER_SIZE chunks until fp.read() returns nothing.
+ while buf:
+ http_conn.send(buf)
+ total_bytes_uploaded += len(buf)
+ if cb:
+ i += 1
+ if i == cb_count or cb_count == -1:
+ cb(total_bytes_uploaded, file_length)
+ i = 0
+ buf = fp.read(self.BUFFER_SIZE)
+ http_conn.set_debuglevel(conn.debug)
+ if cb:
+ # Final progress report once all available bytes have been sent.
+ cb(total_bytes_uploaded, file_length)
+ if total_bytes_uploaded != file_length:
+ # Abort (and delete the tracker file) so if the user retries
+ # they'll start a new resumable upload rather than potentially
+ # attempting to pick back up later where we left off.
+ raise ResumableUploadException(
+ 'File changed during upload: EOF at %d bytes of %d byte file.' %
+ (total_bytes_uploaded, file_length),
+ ResumableTransferDisposition.ABORT)
+ resp = http_conn.getresponse()
+ # Restore http connection debug level.
+ # (The level was already restored right after the send loop; this second
+ # call sets the same value again.)
+ http_conn.set_debuglevel(conn.debug)
+
+ if resp.status == 200:
+ # Success.
+ return (resp.getheader('etag'),
+ resp.getheader('x-goog-generation'),
+ resp.getheader('x-goog-metageneration'))
+ # Retry timeout (408) and status 429, 500 and 503 errors after a delay.
+ elif resp.status in [408, 429, 500, 503]:
+ disposition = ResumableTransferDisposition.WAIT_BEFORE_RETRY
+ else:
+ # Catch all for any other error codes.
+ disposition = ResumableTransferDisposition.ABORT
+ raise ResumableUploadException('Got response code %d while attempting '
+ 'upload (%s)' %
+ (resp.status, resp.reason), disposition)
+
+ def _AttemptResumableUpload(self, key, fp, file_length, headers, cb,
+ num_cb):
+ """Attempts a resumable upload.
+
+ Args:
+ key: Boto key representing object to upload.
+ fp: File pointer containing upload bytes.
+ file_length: Total length of the upload.
+ headers: Headers to be used in upload requests.
+ cb: Progress callback function that takes (progress, total_size).
+ num_cb: Granularity of the callback (maximum number of times the
+ callback will be called during the file transfer). If negative,
+ perform callback with each buffer read.
+
+ Returns:
+ (etag, generation, metageneration) from service upon success.
+
+ Raises:
+ ResumableUploadException if any problems occur.
+ """
+ (service_start, service_end) = self.SERVICE_HAS_NOTHING
+ conn = key.bucket.connection
+ if self.upload_url:
+ # Try to resume existing resumable upload.
+ try:
+ (service_start, service_end) = (
+ self._QueryServicePos(conn, file_length))
+ self.service_has_bytes = service_start
+ if conn.debug >= 1:
+ self.logger.debug('Resuming transfer.')
+ except ResumableUploadException, e:
+ if conn.debug >= 1:
+ self.logger.debug('Unable to resume transfer (%s).', e.message)
+ self._StartNewResumableUpload(key, headers)
+ else:
+ self._StartNewResumableUpload(key, headers)
+
+ # upload_start_point allows the code that instantiated the
+ # ResumableUploadHandler to find out the point from which it started
+ # uploading (e.g., so it can correctly compute throughput).
+ if self.upload_start_point is None:
+ self.upload_start_point = service_end
+
+ total_bytes_uploaded = service_end + 1
+
+ # Start reading from the file based upon the number of bytes that the
+ # server has so far.
+ if total_bytes_uploaded < file_length:
+ fp.seek(total_bytes_uploaded)
+
+ conn = key.bucket.connection
+
+ # Get a new HTTP connection (vs conn.get_http_connection(), which reuses
+ # pool connections) because httplib requires a new HTTP connection per
+ # transaction. (Without this, calling http_conn.getresponse() would get
+ # "ResponseNotReady".)
+ http_conn = conn.new_http_connection(self.upload_url_host, conn.port,
+ conn.is_secure)
+ http_conn.set_debuglevel(conn.debug)
+
+ # Make sure to close http_conn at end so if a local file read
+ # failure occurs partway through service will terminate current upload
+ # and can report that progress on next attempt.
+ try:
+ return self._UploadFileBytes(conn, http_conn, fp, file_length,
+ total_bytes_uploaded, cb, num_cb,
+ headers)
+ except (ResumableUploadException, socket.error):
+ resp = self._QueryServiceState(conn, file_length)
+ if resp.status == 400:
+ raise ResumableUploadException(
+ 'Got 400 response from service state query after failed resumable '
+ 'upload attempt. This can happen for various reasons, including '
+ 'specifying an invalid request (e.g., an invalid canned ACL) or '
+ 'if the file size changed between upload attempts',
+ ResumableTransferDisposition.ABORT)
+ else:
+ raise
+ finally:
+ http_conn.close()
+
+ def HandleResumableUploadException(self, e, debug):
+ if e.disposition == ResumableTransferDisposition.ABORT_CUR_PROCESS:
+ if debug >= 1:
+ self.logger.debug('Caught non-retryable ResumableUploadException (%s); '
+ 'aborting but retaining tracker file', e.message)
+ raise
+ elif e.disposition == ResumableTransferDisposition.ABORT:
+ if debug >= 1:
+ self.logger.debug('Caught non-retryable ResumableUploadException (%s); '
+ 'aborting and removing tracker file', e.message)
+ raise
+ elif e.disposition == ResumableTransferDisposition.START_OVER:
+ raise
+ else:
+ if debug >= 1:
+ self.logger.debug(
+ 'Caught ResumableUploadException (%s) - will retry', e.message)
+
+ def TrackProgressLessIterations(self, service_had_bytes_before_attempt,
+ debug=0):
+ """Tracks the number of iterations without progress.
+
+ Performs randomized exponential backoff.
+
+ Args:
+ service_had_bytes_before_attempt: Number of bytes the service had prior
+ to this upload attempt.
+ debug: debug level 0..3
+ """
+ # At this point we had a re-tryable failure; see if made progress.
+ if self.service_has_bytes > service_had_bytes_before_attempt:
+ self.progress_less_iterations = 0 # If progress, reset counter.
+ else:
+ self.progress_less_iterations += 1
+
+ if self.progress_less_iterations > self.num_retries:
+ # Don't retry any longer in the current process.
+ raise ResumableUploadException(
+ 'Too many resumable upload attempts failed without '
+ 'progress. You might try this upload again later',
+ ResumableTransferDisposition.ABORT_CUR_PROCESS)
+
+ # Use binary exponential backoff to desynchronize client requests.
+ sleep_time_secs = min(random.random() * (2**self.progress_less_iterations),
+ GetMaxRetryDelay())
+ if debug >= 1:
+ self.logger.debug('Got retryable failure (%d progress-less in a row).\n'
+ 'Sleeping %3.1f seconds before re-trying',
+ self.progress_less_iterations, sleep_time_secs)
+ time.sleep(sleep_time_secs)
+
+ def SendFile(self, key, fp, size, headers, canned_acl=None, cb=None,
+ num_cb=XML_PROGRESS_CALLBACKS):
+ """Upload a file to a key into a bucket on GS, resumable upload protocol.
+
+ Args:
+ key: `boto.s3.key.Key` or subclass representing the upload destination.
+ fp: File pointer to upload
+ size: Size of the file to upload.
+ headers: The headers to pass along with the PUT request
+ canned_acl: Optional canned ACL to apply to object.
+ cb: Callback function that will be called to report progress on
+ the upload. The callback should accept two integer parameters, the
+ first representing the number of bytes that have been successfully
+ transmitted to GS, and the second representing the total number of
+ bytes that need to be transmitted.
+ num_cb: (optional) If a callback is specified with the cb parameter, this
+ parameter determines the granularity of the callback by defining
+ the maximum number of times the callback will be called during the
+ file transfer. Providing a negative integer will cause your
+ callback to be called with each buffer read.
+
+ Raises:
+ ResumableUploadException if a problem occurs during the transfer.
+ """
+
+ if not headers:
+ headers = {}
+ # If Content-Type header is present and set to None, remove it.
+ # This is gsutil's way of asking boto to refrain from auto-generating
+ # that header.
+ content_type = 'Content-Type'
+ if content_type in headers and headers[content_type] is None:
+ del headers[content_type]
+
+ if canned_acl:
+ headers[key.provider.acl_header] = canned_acl
+
+ headers['User-Agent'] = UserAgent
+
+ file_length = size
+ debug = key.bucket.connection.debug
+
+ # Use num-retries from constructor if one was provided; else check
+ # for a value specified in the boto config file; else default to 5.
+ if self.num_retries is None:
+ self.num_retries = GetNumRetries()
+ self.progress_less_iterations = 0
+
+ while True: # Retry as long as we're making progress.
+ service_had_bytes_before_attempt = self.service_has_bytes
+ try:
+ # Save generation and metageneration in class state so caller
+ # can find these values, for use in preconditions of future
+ # operations on the uploaded object.
+ (_, self.generation, self.metageneration) = (
+ self._AttemptResumableUpload(key, fp, file_length,
+ headers, cb, num_cb))
+
+ key.generation = self.generation
+ if debug >= 1:
+ self.logger.debug('Resumable upload complete.')
+ return
+ except self.RETRYABLE_EXCEPTIONS, e:
+ if debug >= 1:
+ self.logger.debug('Caught exception (%s)', e.__repr__())
+ if isinstance(e, IOError) and e.errno == errno.EPIPE:
+ # Broken pipe error causes httplib to immediately
+ # close the socket (http://bugs.python.org/issue5542),
+ # so we need to close the connection before we resume
+ # the upload (which will cause a new connection to be
+ # opened the next time an HTTP request is sent).
+ key.bucket.connection.connection.close()
+ except ResumableUploadException, e:
+ self.HandleResumableUploadException(e, debug)
+
+ self.TrackProgressLessIterations(service_had_bytes_before_attempt,
+ debug=debug)
« no previous file with comments | « third_party/gsutil/gslib/addlhelp/wildcards.py ('k') | third_party/gsutil/gslib/boto_translation.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698