| Index: boto/gs/resumable_upload_handler.py
|
| diff --git a/boto/gs/resumable_upload_handler.py b/boto/gs/resumable_upload_handler.py
|
| index e8d5b0376d0eac925a1eb8c458867eb5143b0a20..a60d91d5da1626736fc3f437e4f1208e6d33da15 100644
|
| --- a/boto/gs/resumable_upload_handler.py
|
| +++ b/boto/gs/resumable_upload_handler.py
|
| @@ -23,6 +23,7 @@ import cgi
|
| import errno
|
| import httplib
|
| import os
|
| +import random
|
| import re
|
| import socket
|
| import time
|
| @@ -35,13 +36,14 @@ from boto.exception import ResumableTransferDisposition
|
| from boto.exception import ResumableUploadException
|
|
|
| """
|
| -Handler for Google Storage resumable uploads. See
|
| +Handler for Google Cloud Storage resumable uploads. See
|
| http://code.google.com/apis/storage/docs/developer-guide.html#resumable
|
| for details.
|
|
|
| Resumable uploads will retry failed uploads, resuming at the byte
|
| count completed by the last upload attempt. If too many retries happen with
|
| -no progress (per configurable num_retries param), the upload will be aborted.
|
| +no progress (per configurable num_retries param), the upload will be
|
| +aborted in the current process.
|
|
|
| The caller can optionally specify a tracker_file_name param in the
|
| ResumableUploadHandler constructor. If you do this, that file will
|
| @@ -169,14 +171,13 @@ class ResumableUploadHandler(object):
|
|
|
| def _query_server_state(self, conn, file_length):
|
| """
|
| - Queries server to find out what bytes it currently has.
|
| +        Queries server to find out the state of the given upload.
|
|
|
| Note that this method really just makes special case use of the
|
| fact that the upload server always returns the current start/end
|
| state whenever a PUT doesn't complete.
|
|
|
| - Returns (server_start, server_end), where the values are inclusive.
|
| - For example, (0, 2) would mean that the server has bytes 0, 1, *and* 2.
|
| + Returns HTTP response from sending request.
|
|
|
| Raises ResumableUploadException if problem querying server.
|
| """
|
| @@ -186,11 +187,22 @@ class ResumableUploadHandler(object):
|
| put_headers['Content-Range'] = (
|
| self._build_content_range_header('*', file_length))
|
| put_headers['Content-Length'] = '0'
|
| - resp = AWSAuthConnection.make_request(conn, 'PUT',
|
| + return AWSAuthConnection.make_request(conn, 'PUT',
|
| path=self.tracker_uri_path,
|
| auth_path=self.tracker_uri_path,
|
| headers=put_headers,
|
| host=self.tracker_uri_host)
|
| +
|
| + def _query_server_pos(self, conn, file_length):
|
| + """
|
| + Queries server to find out what bytes it currently has.
|
| +
|
| + Returns (server_start, server_end), where the values are inclusive.
|
| + For example, (0, 2) would mean that the server has bytes 0, 1, *and* 2.
|
| +
|
| + Raises ResumableUploadException if problem querying server.
|
| + """
|
| + resp = self._query_server_state(conn, file_length)
|
| if resp.status == 200:
|
| return (0, file_length) # Completed upload.
|
| if resp.status != 308:
|
| @@ -260,11 +272,22 @@ class ResumableUploadHandler(object):
|
| 'POST', key.bucket.name, key.name, post_headers)
|
| # Get tracker URI from response 'Location' header.
|
| body = resp.read()
|
| - # Check for '201 Created' response code.
|
| - if resp.status != 201:
|
| +
|
| + # Check for various status conditions.
|
| + if resp.status in [500, 503]:
|
| + # Retry status 500 and 503 errors after a delay.
|
| raise ResumableUploadException(
|
| - 'Got status %d from attempt to start resumable upload' %
|
| - resp.status, ResumableTransferDisposition.WAIT_BEFORE_RETRY)
|
| + 'Got status %d from attempt to start resumable upload. '
|
| + 'Will wait/retry' % resp.status,
|
| + ResumableTransferDisposition.WAIT_BEFORE_RETRY)
|
| + elif resp.status != 200 and resp.status != 201:
|
| + raise ResumableUploadException(
|
| + 'Got status %d from attempt to start resumable upload. '
|
| + 'Aborting' % resp.status,
|
| + ResumableTransferDisposition.ABORT)
|
| +
|
| + # Else we got 200 or 201 response code, indicating the resumable
|
| + # upload was created.
|
| tracker_uri = resp.getheader('Location')
|
| if not tracker_uri:
|
| raise ResumableUploadException(
|
| @@ -330,32 +353,29 @@ class ResumableUploadHandler(object):
|
| if cb:
|
| cb(total_bytes_uploaded, file_length)
|
| if total_bytes_uploaded != file_length:
|
| - raise ResumableUploadException('File changed during upload: EOF at '
|
| - '%d bytes of %d byte file.' %
|
| - (total_bytes_uploaded, file_length),
|
| - ResumableTransferDisposition.ABORT)
|
| + # Abort (and delete the tracker file) so if the user retries
|
| + # they'll start a new resumable upload rather than potentially
|
| + # attempting to pick back up later where we left off.
|
| + raise ResumableUploadException(
|
| + 'File changed during upload: EOF at %d bytes of %d byte file.' %
|
| + (total_bytes_uploaded, file_length),
|
| + ResumableTransferDisposition.ABORT)
|
| resp = http_conn.getresponse()
|
| body = resp.read()
|
| # Restore http connection debug level.
|
| http_conn.set_debuglevel(conn.debug)
|
|
|
| - additional_note = ''
|
| if resp.status == 200:
|
| return resp.getheader('etag') # Success
|
| - # Retry status 503 errors after a delay.
|
| - elif resp.status == 503:
|
| + # Retry timeout (408) and status 500 and 503 errors after a delay.
|
| + elif resp.status in [408, 500, 503]:
|
| disposition = ResumableTransferDisposition.WAIT_BEFORE_RETRY
|
| - elif resp.status == 500:
|
| - disposition = ResumableTransferDisposition.ABORT
|
| - additional_note = ('This can happen if you attempt to upload a '
|
| - 'different size file on a already partially '
|
| - 'uploaded resumable upload')
|
| else:
|
| + # Catch all for any other error codes.
|
| disposition = ResumableTransferDisposition.ABORT
|
| raise ResumableUploadException('Got response code %d while attempting '
|
| - 'upload (%s)%s' %
|
| - (resp.status, resp.reason,
|
| - additional_note), disposition)
|
| + 'upload (%s)' %
|
| + (resp.status, resp.reason), disposition)
|
|
|
| def _attempt_resumable_upload(self, key, fp, file_length, headers, cb,
|
| num_cb):
|
| @@ -372,8 +392,9 @@ class ResumableUploadHandler(object):
|
| # Try to resume existing resumable upload.
|
| try:
|
| (server_start, server_end) = (
|
| - self._query_server_state(conn, file_length))
|
| + self._query_server_pos(conn, file_length))
|
| self.server_has_bytes = server_start
|
| + key=key
|
| if conn.debug >= 1:
|
| print 'Resuming transfer.'
|
| except ResumableUploadException, e:
|
| @@ -390,8 +411,16 @@ class ResumableUploadHandler(object):
|
| self.upload_start_point = server_end
|
|
|
| if server_end == file_length:
|
| - return # Done.
|
| - total_bytes_uploaded = server_end + 1
|
| + # Boundary condition: complete file was already uploaded (e.g.,
|
| + # user interrupted a previous upload attempt after the upload
|
| + # completed but before the gsutil tracker file was deleted). Set
|
| + # total_bytes_uploaded to server_end so we'll attempt to upload
|
| + # no more bytes but will still make final HTTP request and get
|
| + # back the response (which contains the etag we need to compare
|
| + # at the end).
|
| + total_bytes_uploaded = server_end
|
| + else:
|
| + total_bytes_uploaded = server_end + 1
|
| fp.seek(total_bytes_uploaded)
|
| conn = key.bucket.connection
|
|
|
| @@ -409,6 +438,15 @@ class ResumableUploadHandler(object):
|
| try:
|
| return self._upload_file_bytes(conn, http_conn, fp, file_length,
|
| total_bytes_uploaded, cb, num_cb)
|
| + except (ResumableUploadException, socket.error):
|
| + resp = self._query_server_state(conn, file_length)
|
| + if resp.status == 400:
|
| + raise ResumableUploadException('Got 400 response from server '
|
| + 'state query after failed resumable upload attempt. This '
|
| + 'can happen if the file size changed between upload '
|
| + 'attempts', ResumableTransferDisposition.ABORT)
|
| + else:
|
| + raise
|
| finally:
|
| http_conn.close()
|
|
|
| @@ -494,11 +532,28 @@ class ResumableUploadHandler(object):
|
| except self.RETRYABLE_EXCEPTIONS, e:
|
| if debug >= 1:
|
| print('Caught exception (%s)' % e.__repr__())
|
| + if isinstance(e, IOError) and e.errno == errno.EPIPE:
|
| + # Broken pipe error causes httplib to immediately
|
| + # close the socket (http://bugs.python.org/issue5542),
|
| + # so we need to close the connection before we resume
|
| + # the upload (which will cause a new connection to be
|
| + # opened the next time an HTTP request is sent).
|
| + key.bucket.connection.connection.close()
|
| except ResumableUploadException, e:
|
| - if e.disposition == ResumableTransferDisposition.ABORT:
|
| + if (e.disposition ==
|
| + ResumableTransferDisposition.ABORT_CUR_PROCESS):
|
| + if debug >= 1:
|
| + print('Caught non-retryable ResumableUploadException '
|
| + '(%s); aborting but retaining tracker file' %
|
| + e.message)
|
| + raise
|
| + elif (e.disposition ==
|
| + ResumableTransferDisposition.ABORT):
|
| if debug >= 1:
|
| print('Caught non-retryable ResumableUploadException '
|
| - '(%s)' % e.message)
|
| + '(%s); aborting and removing tracker file' %
|
| + e.message)
|
| + self._remove_tracker_file()
|
| raise
|
| else:
|
| if debug >= 1:
|
| @@ -516,11 +571,12 @@ class ResumableUploadHandler(object):
|
| raise ResumableUploadException(
|
| 'Too many resumable upload attempts failed without '
|
| 'progress. You might try this upload again later',
|
| - ResumableTransferDisposition.ABORT)
|
| + ResumableTransferDisposition.ABORT_CUR_PROCESS)
|
|
|
| - sleep_time_secs = 2**progress_less_iterations
|
| +            # Randomized exponential backoff desynchronizes client requests.
|
| + sleep_time_secs = random.random() * (2**progress_less_iterations)
|
| if debug >= 1:
|
| print ('Got retryable failure (%d progress-less in a row).\n'
|
| - 'Sleeping %d seconds before re-trying' %
|
| + 'Sleeping %3.1f seconds before re-trying' %
|
| (progress_less_iterations, sleep_time_secs))
|
| time.sleep(sleep_time_secs)
|
|
|