Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(66)

Side by Side Diff: boto/gs/resumable_upload_handler.py

Issue 8386013: Merging in latest boto. (Closed) Base URL: svn://svn.chromium.org/boto
Patch Set: Redoing vendor drop by deleting and then merging. Created 9 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « boto/gs/key.py ('k') | boto/https_connection.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2010 Google Inc. 1 # Copyright 2010 Google Inc.
2 # 2 #
3 # Permission is hereby granted, free of charge, to any person obtaining a 3 # Permission is hereby granted, free of charge, to any person obtaining a
4 # copy of this software and associated documentation files (the 4 # copy of this software and associated documentation files (the
5 # "Software"), to deal in the Software without restriction, including 5 # "Software"), to deal in the Software without restriction, including
6 # without limitation the rights to use, copy, modify, merge, publish, dis- 6 # without limitation the rights to use, copy, modify, merge, publish, dis-
7 # tribute, sublicense, and/or sell copies of the Software, and to permit 7 # tribute, sublicense, and/or sell copies of the Software, and to permit
8 # persons to whom the Software is furnished to do so, subject to the fol- 8 # persons to whom the Software is furnished to do so, subject to the fol-
9 # lowing conditions: 9 # lowing conditions:
10 # 10 #
11 # The above copyright notice and this permission notice shall be included 11 # The above copyright notice and this permission notice shall be included
12 # in all copies or substantial portions of the Software. 12 # in all copies or substantial portions of the Software.
13 # 13 #
14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- 15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT 16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20 # IN THE SOFTWARE. 20 # IN THE SOFTWARE.
21 21
22 import cgi 22 import cgi
23 import errno 23 import errno
24 import httplib 24 import httplib
25 import os 25 import os
26 import random
26 import re 27 import re
27 import socket 28 import socket
28 import time 29 import time
29 import urlparse 30 import urlparse
30 import boto 31 import boto
31 from boto import config 32 from boto import config
32 from boto.connection import AWSAuthConnection 33 from boto.connection import AWSAuthConnection
33 from boto.exception import InvalidUriError 34 from boto.exception import InvalidUriError
34 from boto.exception import ResumableTransferDisposition 35 from boto.exception import ResumableTransferDisposition
35 from boto.exception import ResumableUploadException 36 from boto.exception import ResumableUploadException
36 37
37 """ 38 """
38 Handler for Google Storage resumable uploads. See 39 Handler for Google Cloud Storage resumable uploads. See
39 http://code.google.com/apis/storage/docs/developer-guide.html#resumable 40 http://code.google.com/apis/storage/docs/developer-guide.html#resumable
40 for details. 41 for details.
41 42
42 Resumable uploads will retry failed uploads, resuming at the byte 43 Resumable uploads will retry failed uploads, resuming at the byte
43 count completed by the last upload attempt. If too many retries happen with 44 count completed by the last upload attempt. If too many retries happen with
44 no progress (per configurable num_retries param), the upload will be aborted. 45 no progress (per configurable num_retries param), the upload will be
46 aborted in the current process.
45 47
46 The caller can optionally specify a tracker_file_name param in the 48 The caller can optionally specify a tracker_file_name param in the
47 ResumableUploadHandler constructor. If you do this, that file will 49 ResumableUploadHandler constructor. If you do this, that file will
48 save the state needed to allow retrying later, in a separate process 50 save the state needed to allow retrying later, in a separate process
49 (e.g., in a later run of gsutil). 51 (e.g., in a later run of gsutil).
50 """ 52 """
51 53
52 54
53 class ResumableUploadHandler(object): 55 class ResumableUploadHandler(object):
54 56
(...skipping 107 matching lines...) Expand 10 before | Expand all | Expand 10 after
162 def _remove_tracker_file(self): 164 def _remove_tracker_file(self):
163 if (self.tracker_file_name and 165 if (self.tracker_file_name and
164 os.path.exists(self.tracker_file_name)): 166 os.path.exists(self.tracker_file_name)):
165 os.unlink(self.tracker_file_name) 167 os.unlink(self.tracker_file_name)
166 168
167 def _build_content_range_header(self, range_spec='*', length_spec='*'): 169 def _build_content_range_header(self, range_spec='*', length_spec='*'):
168 return 'bytes %s/%s' % (range_spec, length_spec) 170 return 'bytes %s/%s' % (range_spec, length_spec)
169 171
170 def _query_server_state(self, conn, file_length): 172 def _query_server_state(self, conn, file_length):
171 """ 173 """
172 Queries server to find out what bytes it currently has. 174 Queries server to find out state of given upload.
173 175
174 Note that this method really just makes special case use of the 176 Note that this method really just makes special case use of the
175 fact that the upload server always returns the current start/end 177 fact that the upload server always returns the current start/end
176 state whenever a PUT doesn't complete. 178 state whenever a PUT doesn't complete.
177 179
178 Returns (server_start, server_end), where the values are inclusive. 180 Returns HTTP response from sending request.
179 For example, (0, 2) would mean that the server has bytes 0, 1, *and* 2.
180 181
181 Raises ResumableUploadException if problem querying server. 182 Raises ResumableUploadException if problem querying server.
182 """ 183 """
183 # Send an empty PUT so that server replies with this resumable 184 # Send an empty PUT so that server replies with this resumable
184 # transfer's state. 185 # transfer's state.
185 put_headers = {} 186 put_headers = {}
186 put_headers['Content-Range'] = ( 187 put_headers['Content-Range'] = (
187 self._build_content_range_header('*', file_length)) 188 self._build_content_range_header('*', file_length))
188 put_headers['Content-Length'] = '0' 189 put_headers['Content-Length'] = '0'
189 resp = AWSAuthConnection.make_request(conn, 'PUT', 190 return AWSAuthConnection.make_request(conn, 'PUT',
190 path=self.tracker_uri_path, 191 path=self.tracker_uri_path,
191 auth_path=self.tracker_uri_path, 192 auth_path=self.tracker_uri_path,
192 headers=put_headers, 193 headers=put_headers,
193 host=self.tracker_uri_host) 194 host=self.tracker_uri_host)
195
196 def _query_server_pos(self, conn, file_length):
197 """
198 Queries server to find out what bytes it currently has.
199
200 Returns (server_start, server_end), where the values are inclusive.
201 For example, (0, 2) would mean that the server has bytes 0, 1, *and* 2.
202
203 Raises ResumableUploadException if problem querying server.
204 """
205 resp = self._query_server_state(conn, file_length)
194 if resp.status == 200: 206 if resp.status == 200:
195 return (0, file_length) # Completed upload. 207 return (0, file_length) # Completed upload.
196 if resp.status != 308: 208 if resp.status != 308:
197 # This means the server didn't have any state for the given 209 # This means the server didn't have any state for the given
198 # upload ID, which can happen (for example) if the caller saved 210 # upload ID, which can happen (for example) if the caller saved
199 # the tracker URI to a file and then tried to restart the transfer 211 # the tracker URI to a file and then tried to restart the transfer
200 # after that upload ID has gone stale. In that case we need to 212 # after that upload ID has gone stale. In that case we need to
201 # start a new transfer (and the caller will then save the new 213 # start a new transfer (and the caller will then save the new
202 # tracker URI to the tracker file). 214 # tracker URI to the tracker file).
203 raise ResumableUploadException( 215 raise ResumableUploadException(
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
253 raise ResumableUploadException( 265 raise ResumableUploadException(
254 'Attempt to specify Content-Length header (disallowed)', 266 'Attempt to specify Content-Length header (disallowed)',
255 ResumableTransferDisposition.ABORT) 267 ResumableTransferDisposition.ABORT)
256 post_headers[k] = headers[k] 268 post_headers[k] = headers[k]
257 post_headers[conn.provider.resumable_upload_header] = 'start' 269 post_headers[conn.provider.resumable_upload_header] = 'start'
258 270
259 resp = conn.make_request( 271 resp = conn.make_request(
260 'POST', key.bucket.name, key.name, post_headers) 272 'POST', key.bucket.name, key.name, post_headers)
261 # Get tracker URI from response 'Location' header. 273 # Get tracker URI from response 'Location' header.
262 body = resp.read() 274 body = resp.read()
263 # Check for '201 Created' response code. 275
264 if resp.status != 201: 276 # Check for various status conditions.
277 if resp.status in [500, 503]:
278 # Retry status 500 and 503 errors after a delay.
265 raise ResumableUploadException( 279 raise ResumableUploadException(
266 'Got status %d from attempt to start resumable upload' % 280 'Got status %d from attempt to start resumable upload. '
267 resp.status, ResumableTransferDisposition.WAIT_BEFORE_RETRY) 281 'Will wait/retry' % resp.status,
282 ResumableTransferDisposition.WAIT_BEFORE_RETRY)
283 elif resp.status != 200 and resp.status != 201:
284 raise ResumableUploadException(
285 'Got status %d from attempt to start resumable upload. '
286 'Aborting' % resp.status,
287 ResumableTransferDisposition.ABORT)
288
289 # Else we got 200 or 201 response code, indicating the resumable
290 # upload was created.
268 tracker_uri = resp.getheader('Location') 291 tracker_uri = resp.getheader('Location')
269 if not tracker_uri: 292 if not tracker_uri:
270 raise ResumableUploadException( 293 raise ResumableUploadException(
271 'No resumable tracker URI found in resumable initiation ' 294 'No resumable tracker URI found in resumable initiation '
272 'POST response (%s)' % body, 295 'POST response (%s)' % body,
273 ResumableTransferDisposition.WAIT_BEFORE_RETRY) 296 ResumableTransferDisposition.WAIT_BEFORE_RETRY)
274 self._set_tracker_uri(tracker_uri) 297 self._set_tracker_uri(tracker_uri)
275 self._save_tracker_uri_to_file() 298 self._save_tracker_uri_to_file()
276 299
277 def _upload_file_bytes(self, conn, http_conn, fp, file_length, 300 def _upload_file_bytes(self, conn, http_conn, fp, file_length,
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
323 total_bytes_uploaded += len(buf) 346 total_bytes_uploaded += len(buf)
324 if cb: 347 if cb:
325 i += 1 348 i += 1
326 if i == cb_count or cb_count == -1: 349 if i == cb_count or cb_count == -1:
327 cb(total_bytes_uploaded, file_length) 350 cb(total_bytes_uploaded, file_length)
328 i = 0 351 i = 0
329 buf = fp.read(self.BUFFER_SIZE) 352 buf = fp.read(self.BUFFER_SIZE)
330 if cb: 353 if cb:
331 cb(total_bytes_uploaded, file_length) 354 cb(total_bytes_uploaded, file_length)
332 if total_bytes_uploaded != file_length: 355 if total_bytes_uploaded != file_length:
333 raise ResumableUploadException('File changed during upload: EOF at ' 356 # Abort (and delete the tracker file) so if the user retries
334 '%d bytes of %d byte file.' % 357 # they'll start a new resumable upload rather than potentially
335 (total_bytes_uploaded, file_length), 358 # attempting to pick back up later where we left off.
336 ResumableTransferDisposition.ABORT) 359 raise ResumableUploadException(
360 'File changed during upload: EOF at %d bytes of %d byte file.' %
361 (total_bytes_uploaded, file_length),
362 ResumableTransferDisposition.ABORT)
337 resp = http_conn.getresponse() 363 resp = http_conn.getresponse()
338 body = resp.read() 364 body = resp.read()
339 # Restore http connection debug level. 365 # Restore http connection debug level.
340 http_conn.set_debuglevel(conn.debug) 366 http_conn.set_debuglevel(conn.debug)
341 367
342 additional_note = ''
343 if resp.status == 200: 368 if resp.status == 200:
344 return resp.getheader('etag') # Success 369 return resp.getheader('etag') # Success
345 # Retry status 503 errors after a delay. 370 # Retry timeout (408) and status 500 and 503 errors after a delay.
346 elif resp.status == 503: 371 elif resp.status in [408, 500, 503]:
347 disposition = ResumableTransferDisposition.WAIT_BEFORE_RETRY 372 disposition = ResumableTransferDisposition.WAIT_BEFORE_RETRY
348 elif resp.status == 500:
349 disposition = ResumableTransferDisposition.ABORT
350 additional_note = ('This can happen if you attempt to upload a '
351 'different size file on a already partially '
352 'uploaded resumable upload')
353 else: 373 else:
374 # Catch all for any other error codes.
354 disposition = ResumableTransferDisposition.ABORT 375 disposition = ResumableTransferDisposition.ABORT
355 raise ResumableUploadException('Got response code %d while attempting ' 376 raise ResumableUploadException('Got response code %d while attempting '
356 'upload (%s)%s' % 377 'upload (%s)' %
357 (resp.status, resp.reason, 378 (resp.status, resp.reason), disposition)
358 additional_note), disposition)
359 379
360 def _attempt_resumable_upload(self, key, fp, file_length, headers, cb, 380 def _attempt_resumable_upload(self, key, fp, file_length, headers, cb,
361 num_cb): 381 num_cb):
362 """ 382 """
363 Attempts a resumable upload. 383 Attempts a resumable upload.
364 384
365 Returns etag from server upon success. 385 Returns etag from server upon success.
366 386
367 Raises ResumableUploadException if any problems occur. 387 Raises ResumableUploadException if any problems occur.
368 """ 388 """
369 (server_start, server_end) = self.SERVER_HAS_NOTHING 389 (server_start, server_end) = self.SERVER_HAS_NOTHING
370 conn = key.bucket.connection 390 conn = key.bucket.connection
371 if self.tracker_uri: 391 if self.tracker_uri:
372 # Try to resume existing resumable upload. 392 # Try to resume existing resumable upload.
373 try: 393 try:
374 (server_start, server_end) = ( 394 (server_start, server_end) = (
375 self._query_server_state(conn, file_length)) 395 self._query_server_pos(conn, file_length))
376 self.server_has_bytes = server_start 396 self.server_has_bytes = server_start
397 key=key
377 if conn.debug >= 1: 398 if conn.debug >= 1:
378 print 'Resuming transfer.' 399 print 'Resuming transfer.'
379 except ResumableUploadException, e: 400 except ResumableUploadException, e:
380 if conn.debug >= 1: 401 if conn.debug >= 1:
381 print 'Unable to resume transfer (%s).' % e.message 402 print 'Unable to resume transfer (%s).' % e.message
382 self._start_new_resumable_upload(key, headers) 403 self._start_new_resumable_upload(key, headers)
383 else: 404 else:
384 self._start_new_resumable_upload(key, headers) 405 self._start_new_resumable_upload(key, headers)
385 406
386 # upload_start_point allows the code that instantiated the 407 # upload_start_point allows the code that instantiated the
387 # ResumableUploadHandler to find out the point from which it started 408 # ResumableUploadHandler to find out the point from which it started
388 # uploading (e.g., so it can correctly compute throughput). 409 # uploading (e.g., so it can correctly compute throughput).
389 if self.upload_start_point is None: 410 if self.upload_start_point is None:
390 self.upload_start_point = server_end 411 self.upload_start_point = server_end
391 412
392 if server_end == file_length: 413 if server_end == file_length:
393 return # Done. 414 # Boundary condition: complete file was already uploaded (e.g.,
394 total_bytes_uploaded = server_end + 1 415 # user interrupted a previous upload attempt after the upload
416 # completed but before the gsutil tracker file was deleted). Set
417 # total_bytes_uploaded to server_end so we'll attempt to upload
418 # no more bytes but will still make final HTTP request and get
419 # back the response (which contains the etag we need to compare
420 # at the end).
421 total_bytes_uploaded = server_end
422 else:
423 total_bytes_uploaded = server_end + 1
395 fp.seek(total_bytes_uploaded) 424 fp.seek(total_bytes_uploaded)
396 conn = key.bucket.connection 425 conn = key.bucket.connection
397 426
398 # Get a new HTTP connection (vs conn.get_http_connection(), which reuses 427 # Get a new HTTP connection (vs conn.get_http_connection(), which reuses
399 # pool connections) because httplib requires a new HTTP connection per 428 # pool connections) because httplib requires a new HTTP connection per
400 # transaction. (Without this, calling http_conn.getresponse() would get 429 # transaction. (Without this, calling http_conn.getresponse() would get
401 # "ResponseNotReady".) 430 # "ResponseNotReady".)
402 http_conn = conn.new_http_connection(self.tracker_uri_host, 431 http_conn = conn.new_http_connection(self.tracker_uri_host,
403 conn.is_secure) 432 conn.is_secure)
404 http_conn.set_debuglevel(conn.debug) 433 http_conn.set_debuglevel(conn.debug)
405 434
406 # Make sure to close http_conn at end so if a local file read 435 # Make sure to close http_conn at end so if a local file read
407 # failure occurs partway through server will terminate current upload 436 # failure occurs partway through server will terminate current upload
408 # and can report that progress on next attempt. 437 # and can report that progress on next attempt.
409 try: 438 try:
410 return self._upload_file_bytes(conn, http_conn, fp, file_length, 439 return self._upload_file_bytes(conn, http_conn, fp, file_length,
411 total_bytes_uploaded, cb, num_cb) 440 total_bytes_uploaded, cb, num_cb)
441 except (ResumableUploadException, socket.error):
442 resp = self._query_server_state(conn, file_length)
443 if resp.status == 400:
444 raise ResumableUploadException('Got 400 response from server '
445 'state query after failed resumable upload attempt. This '
446 'can happen if the file size changed between upload '
447 'attempts', ResumableTransferDisposition.ABORT)
448 else:
449 raise
412 finally: 450 finally:
413 http_conn.close() 451 http_conn.close()
414 452
415 def _check_final_md5(self, key, etag): 453 def _check_final_md5(self, key, etag):
416 """ 454 """
417 Checks that etag from server agrees with md5 computed before upload. 455 Checks that etag from server agrees with md5 computed before upload.
418 This is important, since the upload could have spanned a number of 456 This is important, since the upload could have spanned a number of
419 hours and multiple processes (e.g., gsutil runs), and the user could 457 hours and multiple processes (e.g., gsutil runs), and the user could
420 change some of the file and not realize they have inconsistent data. 458 change some of the file and not realize they have inconsistent data.
421 """ 459 """
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after
487 headers, cb, num_cb) 525 headers, cb, num_cb)
488 # Upload succeeded, so remove the tracker file (if have one). 526 # Upload succeeded, so remove the tracker file (if have one).
489 self._remove_tracker_file() 527 self._remove_tracker_file()
490 self._check_final_md5(key, etag) 528 self._check_final_md5(key, etag)
491 if debug >= 1: 529 if debug >= 1:
492 print 'Resumable upload complete.' 530 print 'Resumable upload complete.'
493 return 531 return
494 except self.RETRYABLE_EXCEPTIONS, e: 532 except self.RETRYABLE_EXCEPTIONS, e:
495 if debug >= 1: 533 if debug >= 1:
496 print('Caught exception (%s)' % e.__repr__()) 534 print('Caught exception (%s)' % e.__repr__())
535 if isinstance(e, IOError) and e.errno == errno.EPIPE:
536 # Broken pipe error causes httplib to immediately
537 # close the socket (http://bugs.python.org/issue5542),
538 # so we need to close the connection before we resume
539 # the upload (which will cause a new connection to be
540 # opened the next time an HTTP request is sent).
541 key.bucket.connection.connection.close()
497 except ResumableUploadException, e: 542 except ResumableUploadException, e:
498 if e.disposition == ResumableTransferDisposition.ABORT: 543 if (e.disposition ==
544 ResumableTransferDisposition.ABORT_CUR_PROCESS):
499 if debug >= 1: 545 if debug >= 1:
500 print('Caught non-retryable ResumableUploadException ' 546 print('Caught non-retryable ResumableUploadException '
501 '(%s)' % e.message) 547 '(%s); aborting but retaining tracker file' %
548 e.message)
549 raise
550 elif (e.disposition ==
551 ResumableTransferDisposition.ABORT):
552 if debug >= 1:
553 print('Caught non-retryable ResumableUploadException '
554 '(%s); aborting and removing tracker file' %
555 e.message)
556 self._remove_tracker_file()
502 raise 557 raise
503 else: 558 else:
504 if debug >= 1: 559 if debug >= 1:
505 print('Caught ResumableUploadException (%s) - will ' 560 print('Caught ResumableUploadException (%s) - will '
506 'retry' % e.message) 561 'retry' % e.message)
507 562
508 # At this point we had a re-tryable failure; see if made progress. 563 # At this point we had a re-tryable failure; see if made progress.
509 if self.server_has_bytes > server_had_bytes_before_attempt: 564 if self.server_has_bytes > server_had_bytes_before_attempt:
510 progress_less_iterations = 0 565 progress_less_iterations = 0
511 else: 566 else:
512 progress_less_iterations += 1 567 progress_less_iterations += 1
513 568
514 if progress_less_iterations > self.num_retries: 569 if progress_less_iterations > self.num_retries:
515 # Don't retry any longer in the current process. 570 # Don't retry any longer in the current process.
516 raise ResumableUploadException( 571 raise ResumableUploadException(
517 'Too many resumable upload attempts failed without ' 572 'Too many resumable upload attempts failed without '
518 'progress. You might try this upload again later', 573 'progress. You might try this upload again later',
519 ResumableTransferDisposition.ABORT) 574 ResumableTransferDisposition.ABORT_CUR_PROCESS)
520 575
521 sleep_time_secs = 2**progress_less_iterations 576 # Use binary exponential backoff to desynchronize client requests
577 sleep_time_secs = random.random() * (2**progress_less_iterations)
522 if debug >= 1: 578 if debug >= 1:
523 print ('Got retryable failure (%d progress-less in a row).\n' 579 print ('Got retryable failure (%d progress-less in a row).\n'
524 'Sleeping %d seconds before re-trying' % 580 'Sleeping %3.1f seconds before re-trying' %
525 (progress_less_iterations, sleep_time_secs)) 581 (progress_less_iterations, sleep_time_secs))
526 time.sleep(sleep_time_secs) 582 time.sleep(sleep_time_secs)
OLDNEW
« no previous file with comments | « boto/gs/key.py ('k') | boto/https_connection.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698