OLD | NEW |
(Empty) | |
| 1 # -*- coding: utf-8 -*- |
| 2 # Copyright 2010 Google Inc. All Rights Reserved. |
| 3 # |
| 4 # Permission is hereby granted, free of charge, to any person obtaining a |
| 5 # copy of this software and associated documentation files (the |
| 6 # "Software"), to deal in the Software without restriction, including |
| 7 # without limitation the rights to use, copy, modify, merge, publish, dis- |
| 8 # tribute, sublicense, and/or sell copies of the Software, and to permit |
| 9 # persons to whom the Software is furnished to do so, subject to the fol- |
| 10 # lowing conditions: |
| 11 # |
| 12 # The above copyright notice and this permission notice shall be included |
| 13 # in all copies or substantial portions of the Software. |
| 14 # |
| 15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS |
| 16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- |
| 17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT |
| 18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, |
| 19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| 20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
| 21 # IN THE SOFTWARE. |
| 22 """Boto translation layer for resumable uploads. |
| 23 |
| 24 See http://code.google.com/apis/storage/docs/developer-guide.html#resumable |
| 25 for details. |
| 26 |
| 27 Resumable uploads will retry interrupted uploads, resuming at the byte |
| 28 count completed by the last upload attempt. If too many retries happen with |
| 29 no progress (per configurable num_retries param), the upload will be |
| 30 aborted in the current process. |
| 31 |
| 32 Unlike boto's ResumableUploadHandler implementation, this class does not
| 33 directly interact with tracker files.
| 34 |
| 35 Originally Google wrote and contributed this code to the boto project,
| 36 then copied that code back into gsutil on the release of gsutil 4.0, which
| 37 supports both boto and non-boto codepaths for resumable uploads. Any bug
| 38 fixes made to this file should also be integrated into
| 39 resumable_upload_handler.py in boto, where applicable.
| 40 |
| 41 TODO: gsutil-beta: Add a similar comment to the boto code. |
| 42 """ |
| 43 |
| 44 from __future__ import absolute_import |
| 45 |
| 46 import errno |
| 47 import httplib |
| 48 import random |
| 49 import re |
| 50 import socket |
| 51 import time |
| 52 import urlparse |
| 53 from boto import UserAgent |
| 54 from boto.connection import AWSAuthConnection |
| 55 from boto.exception import ResumableTransferDisposition |
| 56 from boto.exception import ResumableUploadException |
| 57 from gslib.exception import InvalidUrlError |
| 58 from gslib.util import GetMaxRetryDelay |
| 59 from gslib.util import GetNumRetries |
| 60 from gslib.util import XML_PROGRESS_CALLBACKS |
| 61 |
| 62 |
| 63 class BotoResumableUpload(object): |
| 64 """Upload helper class for resumable uploads via boto.""" |
| 65 |
| 66 BUFFER_SIZE = 8192 |
| 67 RETRYABLE_EXCEPTIONS = (httplib.HTTPException, IOError, socket.error, |
| 68 socket.gaierror) |
| 69 |
| 70 # (start, end) response indicating service has nothing (upload protocol uses |
| 71 # inclusive numbering). |
| 72 SERVICE_HAS_NOTHING = (0, -1) |
| 73 |
| 74 def __init__(self, tracker_callback, logger, |
| 75 resume_url=None, num_retries=None): |
| 76 """Constructor. Instantiate once for each uploaded file. |
| 77 |
| 78 Args: |
| 79 tracker_callback: Callback function that takes a string argument. Used |
| 80 by caller to track this upload across upload |
| 81 interruption. |
| 82       logger: logging.Logger instance to use for debug messages.
| 83 resume_url: If present, attempt to resume the upload at this URL. |
| 84       num_retries: Number of times to retry the upload when it is making
| 85           no progress. This count resets every time we make progress, so
| 86           the upload can span many more than this number of retries.
| 87 """ |
| 88 if resume_url: |
| 89 self._SetUploadUrl(resume_url) |
| 90 else: |
| 91 self.upload_url = None |
| 92 self.num_retries = num_retries |
| 93 self.service_has_bytes = 0 # Byte count at last service check. |
| 94     # Save upload_start_point in instance state so the caller can find how
| 95     # much was transferred by this BotoResumableUpload (across retries).
| 96 self.upload_start_point = None |
| 97 self.tracker_callback = tracker_callback |
| 98 self.logger = logger |
| 99 |
| 100 def _SetUploadUrl(self, url): |
| 101 """Saves URL and resets upload state. |
| 102 |
| 103 Called when we start a new resumable upload or get a new tracker |
| 104 URL for the upload. |
| 105 |
| 106 Args: |
| 107 url: URL string for the upload. |
| 108 |
| 109 Raises InvalidUrlError if URL is syntactically invalid. |
| 110 """ |
| 111 parse_result = urlparse.urlparse(url) |
| 112 if (parse_result.scheme.lower() not in ['http', 'https'] or |
| 113 not parse_result.netloc): |
| 114 raise InvalidUrlError('Invalid upload URL (%s)' % url) |
| 115 self.upload_url = url |
| 116 self.upload_url_host = parse_result.netloc |
| 117 self.upload_url_path = '%s?%s' % ( |
| 118 parse_result.path, parse_result.query) |
| 119 self.service_has_bytes = 0 |
| 120 |
| 121 def _BuildContentRangeHeader(self, range_spec='*', length_spec='*'): |
| 122 return 'bytes %s/%s' % (range_spec, length_spec) |
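|     # For example, with a 1000-byte file this yields 'bytes 0-999/1000' when
|     # sending the whole file, 'bytes 100-999/1000' when resuming at offset
|     # 100, and 'bytes */1000' when only querying upload state.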
| 123 |
| 124 def _QueryServiceState(self, conn, file_length): |
| 125 """Queries service to find out state of given upload. |
| 126 |
| 127     Note that this method simply takes advantage of the fact that the
| 128     upload service always returns the current start/end state whenever
| 129     a PUT doesn't complete.
| 130 |
| 131 Args: |
| 132 conn: HTTPConnection to use for the query. |
| 133 file_length: Total length of the file. |
| 134 |
| 135 Returns: |
| 136 HTTP response from sending request. |
| 137 |
| 138 Raises: |
| 139 ResumableUploadException if problem querying service. |
| 140 """ |
| 141     # Send an empty PUT so that the service replies with this resumable
| 142     # transfer's state.
| 143 put_headers = {} |
| 144 put_headers['Content-Range'] = ( |
| 145 self._BuildContentRangeHeader('*', file_length)) |
| 146 put_headers['Content-Length'] = '0' |
| 147 return AWSAuthConnection.make_request( |
| 148 conn, 'PUT', path=self.upload_url_path, auth_path=self.upload_url_path, |
| 149 headers=put_headers, host=self.upload_url_host) |
| 150 |
| 151 def _QueryServicePos(self, conn, file_length): |
| 152 """Queries service to find out what bytes it currently has. |
| 153 |
| 154 Args: |
| 155 conn: HTTPConnection to use for the query. |
| 156 file_length: Total length of the file. |
| 157 |
| 158 Returns: |
| 159 (service_start, service_end), where the values are inclusive. |
| 160 For example, (0, 2) would mean that the service has bytes 0, 1, *and* 2. |
| 161 |
| 162 Raises: |
| 163 ResumableUploadException if problem querying service. |
| 164 """ |
| 165 resp = self._QueryServiceState(conn, file_length) |
| 166 if resp.status == 200: |
| 167 # To handle the boundary condition where the service has the complete |
| 168 # file, we return (service_start, file_length-1). That way the |
| 169 # calling code can always simply read up through service_end. (If we |
| 170 # didn't handle this boundary condition here, the caller would have |
| 171 # to check whether service_end == file_length and read one fewer byte |
| 172 # in that case.) |
| 173 return (0, file_length - 1) # Completed upload. |
| 174 if resp.status != 308: |
| 175 # This means the service didn't have any state for the given |
| 176 # upload ID, which can happen (for example) if the caller saved |
| 177 # the upload URL to a file and then tried to restart the transfer |
| 178 # after that upload ID has gone stale. In that case we need to |
| 179 # start a new transfer (and the caller will then save the new |
| 180 # upload URL to the tracker file). |
| 181 raise ResumableUploadException( |
| 182 'Got non-308 response (%s) from service state query' % |
| 183 resp.status, ResumableTransferDisposition.START_OVER) |
| 184 got_valid_response = False |
| 185 range_spec = resp.getheader('range') |
| 186 if range_spec: |
| 187 # Parse 'bytes=<from>-<to>' range_spec. |
| 188 m = re.search(r'bytes=(\d+)-(\d+)', range_spec) |
| 189 if m: |
| 190 service_start = long(m.group(1)) |
| 191 service_end = long(m.group(2)) |
| 192 got_valid_response = True |
| 193 else: |
| 194 # No Range header, which means the service does not yet have |
| 195 # any bytes. Note that the Range header uses inclusive 'from' |
| 196 # and 'to' values. Since Range 0-0 would mean that the service |
| 197 # has byte 0, omitting the Range header is used to indicate that |
| 198 # the service doesn't have any bytes. |
| 199 return self.SERVICE_HAS_NOTHING |
| 200 if not got_valid_response: |
| 201 raise ResumableUploadException( |
| 202 'Couldn\'t parse upload service state query response (%s)' % |
| 203 str(resp.getheaders()), ResumableTransferDisposition.START_OVER) |
| 204 if conn.debug >= 1: |
| 205 self.logger.debug('Service has: Range: %d - %d.', service_start, |
| 206 service_end) |
| 207 return (service_start, service_end) |
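|     # For example, a 308 response with 'Range: bytes=0-524287' means the
|     # service has the first 512 KiB, so this returns (0, 524287) and the
|     # caller resumes the upload at byte 524288; a 308 with no Range header
|     # returns SERVICE_HAS_NOTHING, i.e. (0, -1), and the upload restarts
|     # from byte 0.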
| 208 |
| 209 def _StartNewResumableUpload(self, key, headers=None): |
| 210 """Starts a new resumable upload. |
| 211 |
| 212 Args: |
| 213 key: Boto Key representing the object to upload. |
| 214 headers: Headers to use in the upload requests. |
| 215 |
| 216 Raises: |
| 217 ResumableUploadException if any errors occur. |
| 218 """ |
| 219 conn = key.bucket.connection |
| 220 if conn.debug >= 1: |
| 221 self.logger.debug('Starting new resumable upload.') |
| 222 self.service_has_bytes = 0 |
| 223 |
| 224 # Start a new resumable upload by sending a POST request with an |
| 225 # empty body and the "X-Goog-Resumable: start" header. Include any |
| 226 # caller-provided headers (e.g., Content-Type) EXCEPT Content-Length |
| 227 # (and raise an exception if they tried to pass one, since it's |
| 228 # a semantic error to specify it at this point, and if we were to |
| 229 # include one now it would cause the service to expect that many |
| 230     # bytes; the POST doesn't include the actual file bytes). We set
| 231 # the Content-Length in the subsequent PUT, based on the uploaded |
| 232 # file size. |
| 233 post_headers = {} |
| 234 for k in headers: |
| 235 if k.lower() == 'content-length': |
| 236 raise ResumableUploadException( |
| 237 'Attempt to specify Content-Length header (disallowed)', |
| 238 ResumableTransferDisposition.ABORT) |
| 239 post_headers[k] = headers[k] |
| 240 post_headers[conn.provider.resumable_upload_header] = 'start' |
| 241 |
| 242 resp = conn.make_request( |
| 243 'POST', key.bucket.name, key.name, post_headers) |
| 244     # Read the response body (it is used in the error message below).
| 245 body = resp.read() |
| 246 |
| 247 # Check for various status conditions. |
| 248 if resp.status in [429, 500, 503]: |
| 249 # Retry after a delay. |
| 250 raise ResumableUploadException( |
| 251 'Got status %d from attempt to start resumable upload. ' |
| 252 'Will wait/retry' % resp.status, |
| 253 ResumableTransferDisposition.WAIT_BEFORE_RETRY) |
| 254 elif resp.status != 200 and resp.status != 201: |
| 255 raise ResumableUploadException( |
| 256 'Got status %d from attempt to start resumable upload. ' |
| 257 'Aborting' % resp.status, |
| 258 ResumableTransferDisposition.ABORT) |
| 259 |
| 260 # Else we got 200 or 201 response code, indicating the resumable |
| 261 # upload was created. |
| 262 upload_url = resp.getheader('Location') |
| 263 if not upload_url: |
| 264 raise ResumableUploadException( |
| 265 'No resumable upload URL found in resumable initiation ' |
| 266 'POST response (%s)' % body, |
| 267 ResumableTransferDisposition.WAIT_BEFORE_RETRY) |
| 268 self._SetUploadUrl(upload_url) |
| 269 self.tracker_callback(upload_url) |
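|     # The exchange above looks roughly like the following (header values and
|     # the Location URL/upload_id are illustrative):
|     #
|     #   POST <bucket/object> HTTP/1.1
|     #   x-goog-resumable: start
|     #
|     #   HTTP/1.1 201 Created
|     #   Location: https://storage.googleapis.com/my-bucket/my-object?upload_id=...
|     #
|     # The returned Location URL is saved via tracker_callback so that a later
|     # process can resume the upload at that URL.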
| 270 |
| 271 def _UploadFileBytes(self, conn, http_conn, fp, file_length, |
| 272 total_bytes_uploaded, cb, num_cb, headers): |
| 273 """Attempts to upload file bytes. |
| 274 |
| 275 Makes a single attempt using an existing resumable upload connection. |
| 276 |
| 277 Args: |
| 278 conn: HTTPConnection from the boto Key. |
| 279 http_conn: Separate HTTPConnection for the transfer. |
| 280 fp: File pointer containing bytes to upload. |
| 281 file_length: Total length of the file. |
| 282       total_bytes_uploaded: Number of bytes already uploaded (resume offset).
| 283 cb: Progress callback function that takes (progress, total_size). |
| 284 num_cb: Granularity of the callback (maximum number of times the |
| 285 callback will be called during the file transfer). If negative, |
| 286 perform callback with each buffer read. |
| 287 headers: Headers to be used in the upload requests. |
| 288 |
| 289 Returns: |
| 290 (etag, generation, metageneration) from service upon success. |
| 291 |
| 292 Raises: |
| 293 ResumableUploadException if any problems occur. |
| 294 """ |
| 295 buf = fp.read(self.BUFFER_SIZE) |
| 296 if cb: |
| 297 # The cb_count represents the number of full buffers to send between |
| 298 # cb executions. |
| 299 if num_cb > 2: |
| 300 cb_count = file_length / self.BUFFER_SIZE / (num_cb-2) |
| 301 elif num_cb < 0: |
| 302 cb_count = -1 |
| 303 else: |
| 304 cb_count = 0 |
| 305 i = 0 |
| 306 cb(total_bytes_uploaded, file_length) |
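|       # E.g., with a 1 MiB file and num_cb=12: 1048576 / 8192 = 128 buffers,
|       # and 128 / (12 - 2) = 12 (integer division), so cb fires roughly every
|       # 12 buffers, i.e. about num_cb times over the whole transfer.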
| 307 |
| 308 # Build resumable upload headers for the transfer. Don't send a |
| 309 # Content-Range header if the file is 0 bytes long, because the |
| 310 # resumable upload protocol uses an *inclusive* end-range (so, sending |
| 311 # 'bytes 0-0/1' would actually mean you're sending a 1-byte file). |
| 312 put_headers = headers.copy() if headers else {} |
| 313 if file_length: |
| 314 if total_bytes_uploaded == file_length: |
| 315 range_header = self._BuildContentRangeHeader( |
| 316 '*', file_length) |
| 317 else: |
| 318 range_header = self._BuildContentRangeHeader( |
| 319 '%d-%d' % (total_bytes_uploaded, file_length - 1), |
| 320 file_length) |
| 321 put_headers['Content-Range'] = range_header |
| 322 # Set Content-Length to the total bytes we'll send with this PUT. |
| 323 put_headers['Content-Length'] = str(file_length - total_bytes_uploaded) |
| 324 http_request = AWSAuthConnection.build_base_http_request( |
| 325 conn, 'PUT', path=self.upload_url_path, auth_path=None, |
| 326 headers=put_headers, host=self.upload_url_host) |
| 327 http_conn.putrequest('PUT', http_request.path) |
| 328 for k in put_headers: |
| 329 http_conn.putheader(k, put_headers[k]) |
| 330 http_conn.endheaders() |
| 331 |
| 332 # Turn off debug on http connection so upload content isn't included |
| 333 # in debug stream. |
| 334 http_conn.set_debuglevel(0) |
| 335 while buf: |
| 336 http_conn.send(buf) |
| 337 total_bytes_uploaded += len(buf) |
| 338 if cb: |
| 339 i += 1 |
| 340 if i == cb_count or cb_count == -1: |
| 341 cb(total_bytes_uploaded, file_length) |
| 342 i = 0 |
| 343 buf = fp.read(self.BUFFER_SIZE) |
| 344 http_conn.set_debuglevel(conn.debug) |
| 345 if cb: |
| 346 cb(total_bytes_uploaded, file_length) |
| 347 if total_bytes_uploaded != file_length: |
| 348 # Abort (and delete the tracker file) so if the user retries |
| 349 # they'll start a new resumable upload rather than potentially |
| 350 # attempting to pick back up later where we left off. |
| 351 raise ResumableUploadException( |
| 352 'File changed during upload: EOF at %d bytes of %d byte file.' % |
| 353 (total_bytes_uploaded, file_length), |
| 354 ResumableTransferDisposition.ABORT) |
| 355 resp = http_conn.getresponse() |
| 356 # Restore http connection debug level. |
| 357 http_conn.set_debuglevel(conn.debug) |
| 358 |
| 359 if resp.status == 200: |
| 360 # Success. |
| 361 return (resp.getheader('etag'), |
| 362 resp.getheader('x-goog-generation'), |
| 363 resp.getheader('x-goog-metageneration')) |
| 364 # Retry timeout (408) and status 429, 500 and 503 errors after a delay. |
| 365 elif resp.status in [408, 429, 500, 503]: |
| 366 disposition = ResumableTransferDisposition.WAIT_BEFORE_RETRY |
| 367 else: |
| 368 # Catch all for any other error codes. |
| 369 disposition = ResumableTransferDisposition.ABORT |
| 370 raise ResumableUploadException('Got response code %d while attempting ' |
| 371 'upload (%s)' % |
| 372 (resp.status, resp.reason), disposition) |
| 373 |
| 374 def _AttemptResumableUpload(self, key, fp, file_length, headers, cb, |
| 375 num_cb): |
| 376 """Attempts a resumable upload. |
| 377 |
| 378 Args: |
| 379 key: Boto key representing object to upload. |
| 380 fp: File pointer containing upload bytes. |
| 381 file_length: Total length of the upload. |
| 382 headers: Headers to be used in upload requests. |
| 383 cb: Progress callback function that takes (progress, total_size). |
| 384 num_cb: Granularity of the callback (maximum number of times the |
| 385 callback will be called during the file transfer). If negative, |
| 386 perform callback with each buffer read. |
| 387 |
| 388 Returns: |
| 389 (etag, generation, metageneration) from service upon success. |
| 390 |
| 391 Raises: |
| 392 ResumableUploadException if any problems occur. |
| 393 """ |
| 394 (service_start, service_end) = self.SERVICE_HAS_NOTHING |
| 395 conn = key.bucket.connection |
| 396 if self.upload_url: |
| 397 # Try to resume existing resumable upload. |
| 398 try: |
| 399 (service_start, service_end) = ( |
| 400 self._QueryServicePos(conn, file_length)) |
| 401 self.service_has_bytes = service_start |
| 402 if conn.debug >= 1: |
| 403 self.logger.debug('Resuming transfer.') |
| 404 except ResumableUploadException, e: |
| 405 if conn.debug >= 1: |
| 406 self.logger.debug('Unable to resume transfer (%s).', e.message) |
| 407 self._StartNewResumableUpload(key, headers) |
| 408 else: |
| 409 self._StartNewResumableUpload(key, headers) |
| 410 |
| 411     # upload_start_point allows the code that instantiated this
| 412     # BotoResumableUpload to find out the point from which it started
| 413     # uploading (e.g., so it can correctly compute throughput).
| 414 if self.upload_start_point is None: |
| 415 self.upload_start_point = service_end |
| 416 |
| 417 total_bytes_uploaded = service_end + 1 |
| 418 |
| 419 # Start reading from the file based upon the number of bytes that the |
| 420     # service has so far.
| 421 if total_bytes_uploaded < file_length: |
| 422 fp.seek(total_bytes_uploaded) |
| 423 |
| 424 conn = key.bucket.connection |
| 425 |
| 426 # Get a new HTTP connection (vs conn.get_http_connection(), which reuses |
| 427 # pool connections) because httplib requires a new HTTP connection per |
| 428 # transaction. (Without this, calling http_conn.getresponse() would get |
| 429 # "ResponseNotReady".) |
| 430 http_conn = conn.new_http_connection(self.upload_url_host, conn.port, |
| 431 conn.is_secure) |
| 432 http_conn.set_debuglevel(conn.debug) |
| 433 |
| 434     # Make sure to close http_conn at the end so that, if a local file read
| 435     # failure occurs partway through, the service will terminate the current
| 436     # upload and can report that progress on the next attempt.
| 437 try: |
| 438 return self._UploadFileBytes(conn, http_conn, fp, file_length, |
| 439 total_bytes_uploaded, cb, num_cb, |
| 440 headers) |
| 441 except (ResumableUploadException, socket.error): |
| 442 resp = self._QueryServiceState(conn, file_length) |
| 443 if resp.status == 400: |
| 444 raise ResumableUploadException( |
| 445 'Got 400 response from service state query after failed resumable ' |
| 446 'upload attempt. This can happen for various reasons, including ' |
| 447 'specifying an invalid request (e.g., an invalid canned ACL) or ' |
| 448 'if the file size changed between upload attempts', |
| 449 ResumableTransferDisposition.ABORT) |
| 450 else: |
| 451 raise |
| 452 finally: |
| 453 http_conn.close() |
| 454 |
| 455 def HandleResumableUploadException(self, e, debug): |
| 456 if e.disposition == ResumableTransferDisposition.ABORT_CUR_PROCESS: |
| 457 if debug >= 1: |
| 458 self.logger.debug('Caught non-retryable ResumableUploadException (%s); ' |
| 459 'aborting but retaining tracker file', e.message) |
| 460 raise |
| 461 elif e.disposition == ResumableTransferDisposition.ABORT: |
| 462 if debug >= 1: |
| 463 self.logger.debug('Caught non-retryable ResumableUploadException (%s); ' |
| 464 'aborting and removing tracker file', e.message) |
| 465 raise |
| 466 elif e.disposition == ResumableTransferDisposition.START_OVER: |
| 467 raise |
| 468 else: |
| 469 if debug >= 1: |
| 470 self.logger.debug( |
| 471 'Caught ResumableUploadException (%s) - will retry', e.message) |
| 472 |
| 473 def TrackProgressLessIterations(self, service_had_bytes_before_attempt, |
| 474 debug=0): |
| 475 """Tracks the number of iterations without progress. |
| 476 |
| 477 Performs randomized exponential backoff. |
| 478 |
| 479 Args: |
| 480 service_had_bytes_before_attempt: Number of bytes the service had prior |
| 481 to this upload attempt. |
| 482 debug: debug level 0..3 |
| 483 """ |
| 484     # At this point we had a retryable failure; see if we made progress.
| 485 if self.service_has_bytes > service_had_bytes_before_attempt: |
| 486 self.progress_less_iterations = 0 # If progress, reset counter. |
| 487 else: |
| 488 self.progress_less_iterations += 1 |
| 489 |
| 490 if self.progress_less_iterations > self.num_retries: |
| 491 # Don't retry any longer in the current process. |
| 492 raise ResumableUploadException( |
| 493 'Too many resumable upload attempts failed without ' |
| 494 'progress. You might try this upload again later', |
| 495 ResumableTransferDisposition.ABORT_CUR_PROCESS) |
| 496 |
| 497 # Use binary exponential backoff to desynchronize client requests. |
| 498 sleep_time_secs = min(random.random() * (2**self.progress_less_iterations), |
| 499 GetMaxRetryDelay()) |
| 500 if debug >= 1: |
| 501 self.logger.debug('Got retryable failure (%d progress-less in a row).\n' |
| 502 'Sleeping %3.1f seconds before re-trying', |
| 503 self.progress_less_iterations, sleep_time_secs) |
| 504 time.sleep(sleep_time_secs) |
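|     # E.g., if GetMaxRetryDelay() returns 32, successive progress-less
|     # attempts sleep a random duration in [0, 2), [0, 4), [0, 8), ... seconds,
|     # capped at 32 seconds once 2**progress_less_iterations exceeds the cap.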
| 505 |
| 506 def SendFile(self, key, fp, size, headers, canned_acl=None, cb=None, |
| 507 num_cb=XML_PROGRESS_CALLBACKS): |
| 508 """Upload a file to a key into a bucket on GS, resumable upload protocol. |
| 509 |
| 510 Args: |
| 511 key: `boto.s3.key.Key` or subclass representing the upload destination. |
| 512       fp: File pointer containing the bytes to upload.
| 513 size: Size of the file to upload. |
| 514       headers: The headers to pass along with the PUT request.
| 515 canned_acl: Optional canned ACL to apply to object. |
| 516 cb: Callback function that will be called to report progress on |
| 517 the upload. The callback should accept two integer parameters, the |
| 518 first representing the number of bytes that have been successfully |
| 519 transmitted to GS, and the second representing the total number of |
| 520 bytes that need to be transmitted. |
| 521 num_cb: (optional) If a callback is specified with the cb parameter, this |
| 522 parameter determines the granularity of the callback by defining |
| 523 the maximum number of times the callback will be called during the |
| 524 file transfer. Providing a negative integer will cause your |
| 525 callback to be called with each buffer read. |
| 526 |
| 527 Raises: |
| 528 ResumableUploadException if a problem occurs during the transfer. |
| 529 """ |
| 530 |
| 531 if not headers: |
| 532 headers = {} |
| 533 # If Content-Type header is present and set to None, remove it. |
| 534 # This is gsutil's way of asking boto to refrain from auto-generating |
| 535 # that header. |
| 536 content_type = 'Content-Type' |
| 537 if content_type in headers and headers[content_type] is None: |
| 538 del headers[content_type] |
| 539 |
| 540 if canned_acl: |
| 541 headers[key.provider.acl_header] = canned_acl |
| 542 |
| 543 headers['User-Agent'] = UserAgent |
| 544 |
| 545 file_length = size |
| 546 debug = key.bucket.connection.debug |
| 547 |
| 548     # Use num_retries from the constructor if one was provided; else check
| 549     # for a value specified in the boto config file; else default to 5.
| 550 if self.num_retries is None: |
| 551 self.num_retries = GetNumRetries() |
| 552 self.progress_less_iterations = 0 |
| 553 |
| 554 while True: # Retry as long as we're making progress. |
| 555 service_had_bytes_before_attempt = self.service_has_bytes |
| 556 try: |
| 557 # Save generation and metageneration in class state so caller |
| 558 # can find these values, for use in preconditions of future |
| 559 # operations on the uploaded object. |
| 560 (_, self.generation, self.metageneration) = ( |
| 561 self._AttemptResumableUpload(key, fp, file_length, |
| 562 headers, cb, num_cb)) |
| 563 |
| 564 key.generation = self.generation |
| 565 if debug >= 1: |
| 566 self.logger.debug('Resumable upload complete.') |
| 567 return |
| 568 except self.RETRYABLE_EXCEPTIONS, e: |
| 569 if debug >= 1: |
| 570 self.logger.debug('Caught exception (%s)', e.__repr__()) |
| 571 if isinstance(e, IOError) and e.errno == errno.EPIPE: |
| 572 # Broken pipe error causes httplib to immediately |
| 573 # close the socket (http://bugs.python.org/issue5542), |
| 574 # so we need to close the connection before we resume |
| 575 # the upload (which will cause a new connection to be |
| 576 # opened the next time an HTTP request is sent). |
| 577 key.bucket.connection.connection.close() |
| 578 except ResumableUploadException, e: |
| 579 self.HandleResumableUploadException(e, debug) |
| 580 |
| 581 self.TrackProgressLessIterations(service_had_bytes_before_attempt, |
| 582 debug=debug) |
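|
| # A minimal usage sketch (illustrative only; bucket, object, and file names
| # are placeholders, and boto must already be configured with GS credentials):
| # the caller supplies a tracker callback that persists the upload URL, then
| # calls SendFile with a boto GS key and an open file.
| #
| #   import logging
| #   import os
| #   import boto
| #
| #   def save_tracker_url(url):
| #     with open('/tmp/upload.tracker', 'w') as f:
| #       f.write(url)
| #
| #   key = boto.storage_uri('gs://my-bucket/my-object').new_key()
| #   uploader = BotoResumableUpload(save_tracker_url, logging.getLogger())
| #   with open('/tmp/data.bin', 'rb') as fp:
| #     uploader.SendFile(key, fp, os.path.getsize('/tmp/data.bin'),
| #                       headers={'Content-Type': 'application/octet-stream'})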