Index: boto/gs/resumable_upload_handler.py |
diff --git a/boto/gs/resumable_upload_handler.py b/boto/gs/resumable_upload_handler.py |
index e8d5b0376d0eac925a1eb8c458867eb5143b0a20..a60d91d5da1626736fc3f437e4f1208e6d33da15 100644 |
--- a/boto/gs/resumable_upload_handler.py |
+++ b/boto/gs/resumable_upload_handler.py |
@@ -23,6 +23,7 @@ import cgi |
import errno |
import httplib |
import os |
+import random |
import re |
import socket |
import time |
@@ -35,13 +36,14 @@ from boto.exception import ResumableTransferDisposition |
from boto.exception import ResumableUploadException |
""" |
-Handler for Google Storage resumable uploads. See |
+Handler for Google Cloud Storage resumable uploads. See |
http://code.google.com/apis/storage/docs/developer-guide.html#resumable |
for details. |
Resumable uploads will retry failed uploads, resuming at the byte |
count completed by the last upload attempt. If too many retries happen with |
-no progress (per configurable num_retries param), the upload will be aborted. |
+no progress (per configurable num_retries param), the upload will be |
+aborted in the current process. |
The caller can optionally specify a tracker_file_name param in the |
ResumableUploadHandler constructor. If you do this, that file will |
@@ -169,14 +171,13 @@ class ResumableUploadHandler(object): |
def _query_server_state(self, conn, file_length): |
""" |
- Queries server to find out what bytes it currently has. |
+ Queries the server to find out the state of the given upload. |
Note that this method really just makes special case use of the |
fact that the upload server always returns the current start/end |
state whenever a PUT doesn't complete. |
- Returns (server_start, server_end), where the values are inclusive. |
- For example, (0, 2) would mean that the server has bytes 0, 1, *and* 2. |
+ Returns HTTP response from sending request. |
Raises ResumableUploadException if problem querying server. |
""" |
@@ -186,11 +187,22 @@ class ResumableUploadHandler(object): |
put_headers['Content-Range'] = ( |
self._build_content_range_header('*', file_length)) |
put_headers['Content-Length'] = '0' |
- resp = AWSAuthConnection.make_request(conn, 'PUT', |
+ return AWSAuthConnection.make_request(conn, 'PUT', |
path=self.tracker_uri_path, |
auth_path=self.tracker_uri_path, |
headers=put_headers, |
host=self.tracker_uri_host) |
+ |
+ def _query_server_pos(self, conn, file_length): |
+ """ |
+ Queries server to find out what bytes it currently has. |
+ |
+ Returns (server_start, server_end), where the values are inclusive. |
+ For example, (0, 2) would mean that the server has bytes 0, 1, *and* 2. |
+ |
+ Raises ResumableUploadException if problem querying server. |
+ """ |
+ resp = self._query_server_state(conn, file_length) |
if resp.status == 200: |
return (0, file_length) # Completed upload. |
if resp.status != 308: |
@@ -260,11 +272,22 @@ class ResumableUploadHandler(object): |
'POST', key.bucket.name, key.name, post_headers) |
# Get tracker URI from response 'Location' header. |
body = resp.read() |
- # Check for '201 Created' response code. |
- if resp.status != 201: |
+ |
+ # Check for various status conditions. |
+ if resp.status in [500, 503]: |
+ # Retry status 500 and 503 errors after a delay. |
raise ResumableUploadException( |
- 'Got status %d from attempt to start resumable upload' % |
- resp.status, ResumableTransferDisposition.WAIT_BEFORE_RETRY) |
+ 'Got status %d from attempt to start resumable upload. ' |
+ 'Will wait/retry' % resp.status, |
+ ResumableTransferDisposition.WAIT_BEFORE_RETRY) |
+ elif resp.status != 200 and resp.status != 201: |
+ raise ResumableUploadException( |
+ 'Got status %d from attempt to start resumable upload. ' |
+ 'Aborting' % resp.status, |
+ ResumableTransferDisposition.ABORT) |
+ |
+ # Else we got 200 or 201 response code, indicating the resumable |
+ # upload was created. |
tracker_uri = resp.getheader('Location') |
if not tracker_uri: |
raise ResumableUploadException( |
@@ -330,32 +353,29 @@ class ResumableUploadHandler(object): |
if cb: |
cb(total_bytes_uploaded, file_length) |
if total_bytes_uploaded != file_length: |
- raise ResumableUploadException('File changed during upload: EOF at ' |
- '%d bytes of %d byte file.' % |
- (total_bytes_uploaded, file_length), |
- ResumableTransferDisposition.ABORT) |
+ # Abort (and delete the tracker file) so if the user retries |
+ # they'll start a new resumable upload rather than potentially |
+ # attempting to pick back up later where we left off. |
+ raise ResumableUploadException( |
+ 'File changed during upload: EOF at %d bytes of %d byte file.' % |
+ (total_bytes_uploaded, file_length), |
+ ResumableTransferDisposition.ABORT) |
resp = http_conn.getresponse() |
body = resp.read() |
# Restore http connection debug level. |
http_conn.set_debuglevel(conn.debug) |
- additional_note = '' |
if resp.status == 200: |
return resp.getheader('etag') # Success |
- # Retry status 503 errors after a delay. |
- elif resp.status == 503: |
+ # Retry timeout (408) and status 500 and 503 errors after a delay. |
+ elif resp.status in [408, 500, 503]: |
disposition = ResumableTransferDisposition.WAIT_BEFORE_RETRY |
- elif resp.status == 500: |
- disposition = ResumableTransferDisposition.ABORT |
- additional_note = ('This can happen if you attempt to upload a ' |
- 'different size file on a already partially ' |
- 'uploaded resumable upload') |
else: |
+ # Catch all for any other error codes. |
disposition = ResumableTransferDisposition.ABORT |
raise ResumableUploadException('Got response code %d while attempting ' |
- 'upload (%s)%s' % |
- (resp.status, resp.reason, |
- additional_note), disposition) |
+ 'upload (%s)' % |
+ (resp.status, resp.reason), disposition) |
def _attempt_resumable_upload(self, key, fp, file_length, headers, cb, |
num_cb): |
@@ -372,8 +392,9 @@ class ResumableUploadHandler(object): |
# Try to resume existing resumable upload. |
try: |
(server_start, server_end) = ( |
- self._query_server_state(conn, file_length)) |
+ self._query_server_pos(conn, file_length)) |
self.server_has_bytes = server_start |
+ key=key |
if conn.debug >= 1: |
print 'Resuming transfer.' |
except ResumableUploadException, e: |
@@ -390,8 +411,16 @@ class ResumableUploadHandler(object): |
self.upload_start_point = server_end |
if server_end == file_length: |
- return # Done. |
- total_bytes_uploaded = server_end + 1 |
+ # Boundary condition: complete file was already uploaded (e.g., |
+ # user interrupted a previous upload attempt after the upload |
+ # completed but before the gsutil tracker file was deleted). Set |
+ # total_bytes_uploaded to server_end so we'll attempt to upload |
+ # no more bytes but will still make final HTTP request and get |
+ # back the response (which contains the etag we need to compare |
+ # at the end). |
+ total_bytes_uploaded = server_end |
+ else: |
+ total_bytes_uploaded = server_end + 1 |
fp.seek(total_bytes_uploaded) |
conn = key.bucket.connection |
@@ -409,6 +438,15 @@ class ResumableUploadHandler(object): |
try: |
return self._upload_file_bytes(conn, http_conn, fp, file_length, |
total_bytes_uploaded, cb, num_cb) |
+ except (ResumableUploadException, socket.error): |
+ resp = self._query_server_state(conn, file_length) |
+ if resp.status == 400: |
+ raise ResumableUploadException('Got 400 response from server ' |
+ 'state query after failed resumable upload attempt. This ' |
+ 'can happen if the file size changed between upload ' |
+ 'attempts', ResumableTransferDisposition.ABORT) |
+ else: |
+ raise |
finally: |
http_conn.close() |
@@ -494,11 +532,28 @@ class ResumableUploadHandler(object): |
except self.RETRYABLE_EXCEPTIONS, e: |
if debug >= 1: |
print('Caught exception (%s)' % e.__repr__()) |
+ if isinstance(e, IOError) and e.errno == errno.EPIPE: |
+ # Broken pipe error causes httplib to immediately |
+ # close the socket (http://bugs.python.org/issue5542), |
+ # so we need to close the connection before we resume |
+ # the upload (which will cause a new connection to be |
+ # opened the next time an HTTP request is sent). |
+ key.bucket.connection.connection.close() |
except ResumableUploadException, e: |
- if e.disposition == ResumableTransferDisposition.ABORT: |
+ if (e.disposition == |
+ ResumableTransferDisposition.ABORT_CUR_PROCESS): |
+ if debug >= 1: |
+ print('Caught non-retryable ResumableUploadException ' |
+ '(%s); aborting but retaining tracker file' % |
+ e.message) |
+ raise |
+ elif (e.disposition == |
+ ResumableTransferDisposition.ABORT): |
if debug >= 1: |
print('Caught non-retryable ResumableUploadException ' |
- '(%s)' % e.message) |
+ '(%s); aborting and removing tracker file' % |
+ e.message) |
+ self._remove_tracker_file() |
raise |
else: |
if debug >= 1: |
@@ -516,11 +571,12 @@ class ResumableUploadHandler(object): |
raise ResumableUploadException( |
'Too many resumable upload attempts failed without ' |
'progress. You might try this upload again later', |
- ResumableTransferDisposition.ABORT) |
+ ResumableTransferDisposition.ABORT_CUR_PROCESS) |
- sleep_time_secs = 2**progress_less_iterations |
+ # Use randomized binary exponential backoff (a random fraction of 2**n seconds) to desynchronize client requests. |
+ sleep_time_secs = random.random() * (2**progress_less_iterations) |
if debug >= 1: |
print ('Got retryable failure (%d progress-less in a row).\n' |
- 'Sleeping %d seconds before re-trying' % |
+ 'Sleeping %3.1f seconds before re-trying' % |
(progress_less_iterations, sleep_time_secs)) |
time.sleep(sleep_time_secs) |