Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(592)

Unified Diff: boto/gs/resumable_upload_handler.py

Issue 8386013: Merging in latest boto. (Closed) Base URL: svn://svn.chromium.org/boto
Patch Set: Redoing vendor drop by deleting and then merging. Created 9 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « boto/gs/key.py ('k') | boto/https_connection.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: boto/gs/resumable_upload_handler.py
diff --git a/boto/gs/resumable_upload_handler.py b/boto/gs/resumable_upload_handler.py
index e8d5b0376d0eac925a1eb8c458867eb5143b0a20..a60d91d5da1626736fc3f437e4f1208e6d33da15 100644
--- a/boto/gs/resumable_upload_handler.py
+++ b/boto/gs/resumable_upload_handler.py
@@ -23,6 +23,7 @@ import cgi
import errno
import httplib
import os
+import random
import re
import socket
import time
@@ -35,13 +36,14 @@ from boto.exception import ResumableTransferDisposition
from boto.exception import ResumableUploadException
"""
-Handler for Google Storage resumable uploads. See
+Handler for Google Cloud Storage resumable uploads. See
http://code.google.com/apis/storage/docs/developer-guide.html#resumable
for details.
Resumable uploads will retry failed uploads, resuming at the byte
count completed by the last upload attempt. If too many retries happen with
-no progress (per configurable num_retries param), the upload will be aborted.
+no progress (per configurable num_retries param), the upload will be
+aborted in the current process.
The caller can optionally specify a tracker_file_name param in the
ResumableUploadHandler constructor. If you do this, that file will
@@ -169,14 +171,13 @@ class ResumableUploadHandler(object):
def _query_server_state(self, conn, file_length):
"""
- Queries server to find out what bytes it currently has.
+ Queries server to find out state of given upload.
Note that this method really just makes special case use of the
fact that the upload server always returns the current start/end
state whenever a PUT doesn't complete.
- Returns (server_start, server_end), where the values are inclusive.
- For example, (0, 2) would mean that the server has bytes 0, 1, *and* 2.
+ Returns HTTP response from sending request.
Raises ResumableUploadException if problem querying server.
"""
@@ -186,11 +187,22 @@ class ResumableUploadHandler(object):
put_headers['Content-Range'] = (
self._build_content_range_header('*', file_length))
put_headers['Content-Length'] = '0'
- resp = AWSAuthConnection.make_request(conn, 'PUT',
+ return AWSAuthConnection.make_request(conn, 'PUT',
path=self.tracker_uri_path,
auth_path=self.tracker_uri_path,
headers=put_headers,
host=self.tracker_uri_host)
+
+ def _query_server_pos(self, conn, file_length):
+ """
+ Queries server to find out what bytes it currently has.
+
+ Returns (server_start, server_end), where the values are inclusive.
+ For example, (0, 2) would mean that the server has bytes 0, 1, *and* 2.
+
+ Raises ResumableUploadException if problem querying server.
+ """
+ resp = self._query_server_state(conn, file_length)
if resp.status == 200:
return (0, file_length) # Completed upload.
if resp.status != 308:
@@ -260,11 +272,22 @@ class ResumableUploadHandler(object):
'POST', key.bucket.name, key.name, post_headers)
# Get tracker URI from response 'Location' header.
body = resp.read()
- # Check for '201 Created' response code.
- if resp.status != 201:
+
+ # Check for various status conditions.
+ if resp.status in [500, 503]:
+ # Retry status 500 and 503 errors after a delay.
raise ResumableUploadException(
- 'Got status %d from attempt to start resumable upload' %
- resp.status, ResumableTransferDisposition.WAIT_BEFORE_RETRY)
+ 'Got status %d from attempt to start resumable upload. '
+ 'Will wait/retry' % resp.status,
+ ResumableTransferDisposition.WAIT_BEFORE_RETRY)
+ elif resp.status != 200 and resp.status != 201:
+ raise ResumableUploadException(
+ 'Got status %d from attempt to start resumable upload. '
+ 'Aborting' % resp.status,
+ ResumableTransferDisposition.ABORT)
+
+ # Else we got 200 or 201 response code, indicating the resumable
+ # upload was created.
tracker_uri = resp.getheader('Location')
if not tracker_uri:
raise ResumableUploadException(
@@ -330,32 +353,29 @@ class ResumableUploadHandler(object):
if cb:
cb(total_bytes_uploaded, file_length)
if total_bytes_uploaded != file_length:
- raise ResumableUploadException('File changed during upload: EOF at '
- '%d bytes of %d byte file.' %
- (total_bytes_uploaded, file_length),
- ResumableTransferDisposition.ABORT)
+ # Abort (and delete the tracker file) so if the user retries
+ # they'll start a new resumable upload rather than potentially
+ # attempting to pick back up later where we left off.
+ raise ResumableUploadException(
+ 'File changed during upload: EOF at %d bytes of %d byte file.' %
+ (total_bytes_uploaded, file_length),
+ ResumableTransferDisposition.ABORT)
resp = http_conn.getresponse()
body = resp.read()
# Restore http connection debug level.
http_conn.set_debuglevel(conn.debug)
- additional_note = ''
if resp.status == 200:
return resp.getheader('etag') # Success
- # Retry status 503 errors after a delay.
- elif resp.status == 503:
+ # Retry timeout (408) and status 500 and 503 errors after a delay.
+ elif resp.status in [408, 500, 503]:
disposition = ResumableTransferDisposition.WAIT_BEFORE_RETRY
- elif resp.status == 500:
- disposition = ResumableTransferDisposition.ABORT
- additional_note = ('This can happen if you attempt to upload a '
- 'different size file on a already partially '
- 'uploaded resumable upload')
else:
+ # Catch all for any other error codes.
disposition = ResumableTransferDisposition.ABORT
raise ResumableUploadException('Got response code %d while attempting '
- 'upload (%s)%s' %
- (resp.status, resp.reason,
- additional_note), disposition)
+ 'upload (%s)' %
+ (resp.status, resp.reason), disposition)
def _attempt_resumable_upload(self, key, fp, file_length, headers, cb,
num_cb):
@@ -372,8 +392,9 @@ class ResumableUploadHandler(object):
# Try to resume existing resumable upload.
try:
(server_start, server_end) = (
- self._query_server_state(conn, file_length))
+ self._query_server_pos(conn, file_length))
self.server_has_bytes = server_start
+ key=key
if conn.debug >= 1:
print 'Resuming transfer.'
except ResumableUploadException, e:
@@ -390,8 +411,16 @@ class ResumableUploadHandler(object):
self.upload_start_point = server_end
if server_end == file_length:
- return # Done.
- total_bytes_uploaded = server_end + 1
+ # Boundary condition: complete file was already uploaded (e.g.,
+ # user interrupted a previous upload attempt after the upload
+ # completed but before the gsutil tracker file was deleted). Set
+ # total_bytes_uploaded to server_end so we'll attempt to upload
+ # no more bytes but will still make final HTTP request and get
+ # back the response (which contains the etag we need to compare
+ # at the end).
+ total_bytes_uploaded = server_end
+ else:
+ total_bytes_uploaded = server_end + 1
fp.seek(total_bytes_uploaded)
conn = key.bucket.connection
@@ -409,6 +438,15 @@ class ResumableUploadHandler(object):
try:
return self._upload_file_bytes(conn, http_conn, fp, file_length,
total_bytes_uploaded, cb, num_cb)
+ except (ResumableUploadException, socket.error):
+ resp = self._query_server_state(conn, file_length)
+ if resp.status == 400:
+ raise ResumableUploadException('Got 400 response from server '
+ 'state query after failed resumable upload attempt. This '
+ 'can happen if the file size changed between upload '
+ 'attempts', ResumableTransferDisposition.ABORT)
+ else:
+ raise
finally:
http_conn.close()
@@ -494,11 +532,28 @@ class ResumableUploadHandler(object):
except self.RETRYABLE_EXCEPTIONS, e:
if debug >= 1:
print('Caught exception (%s)' % e.__repr__())
+ if isinstance(e, IOError) and e.errno == errno.EPIPE:
+ # Broken pipe error causes httplib to immediately
+ # close the socket (http://bugs.python.org/issue5542),
+ # so we need to close the connection before we resume
+ # the upload (which will cause a new connection to be
+ # opened the next time an HTTP request is sent).
+ key.bucket.connection.connection.close()
except ResumableUploadException, e:
- if e.disposition == ResumableTransferDisposition.ABORT:
+ if (e.disposition ==
+ ResumableTransferDisposition.ABORT_CUR_PROCESS):
+ if debug >= 1:
+ print('Caught non-retryable ResumableUploadException '
+ '(%s); aborting but retaining tracker file' %
+ e.message)
+ raise
+ elif (e.disposition ==
+ ResumableTransferDisposition.ABORT):
if debug >= 1:
print('Caught non-retryable ResumableUploadException '
- '(%s)' % e.message)
+ '(%s); aborting and removing tracker file' %
+ e.message)
+ self._remove_tracker_file()
raise
else:
if debug >= 1:
@@ -516,11 +571,12 @@ class ResumableUploadHandler(object):
raise ResumableUploadException(
'Too many resumable upload attempts failed without '
'progress. You might try this upload again later',
- ResumableTransferDisposition.ABORT)
+ ResumableTransferDisposition.ABORT_CUR_PROCESS)
- sleep_time_secs = 2**progress_less_iterations
+ # Use binary exponential backoff to desynchronize client requests
+ sleep_time_secs = random.random() * (2**progress_less_iterations)
if debug >= 1:
print ('Got retryable failure (%d progress-less in a row).\n'
- 'Sleeping %d seconds before re-trying' %
+ 'Sleeping %3.1f seconds before re-trying' %
(progress_less_iterations, sleep_time_secs))
time.sleep(sleep_time_secs)
« no previous file with comments | « boto/gs/key.py ('k') | boto/https_connection.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698