| OLD | NEW |
| (Empty) |
| 1 # -*- coding: utf-8 -*- | |
| 2 # Copyright 2010 Google Inc. All Rights Reserved. | |
| 3 # | |
| 4 # Permission is hereby granted, free of charge, to any person obtaining a | |
| 5 # copy of this software and associated documentation files (the | |
| 6 # "Software"), to deal in the Software without restriction, including | |
| 7 # without limitation the rights to use, copy, modify, merge, publish, dis- | |
| 8 # tribute, sublicense, and/or sell copies of the Software, and to permit | |
| 9 # persons to whom the Software is furnished to do so, subject to the fol- | |
| 10 # lowing conditions: | |
| 11 # | |
| 12 # The above copyright notice and this permission notice shall be included | |
| 13 # in all copies or substantial portions of the Software. | |
| 14 # | |
| 15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | |
| 16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- | |
| 17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT | |
| 18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, | |
| 19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
| 20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS | |
| 21 # IN THE SOFTWARE. | |
| 22 """Boto translation layer for resumable uploads. | |
| 23 | |
| 24 See http://code.google.com/apis/storage/docs/developer-guide.html#resumable | |
| 25 for details. | |
| 26 | |
| 27 Resumable uploads will retry interrupted uploads, resuming at the byte | |
| 28 count completed by the last upload attempt. If too many retries happen with | |
| 29 no progress (per configurable num_retries param), the upload will be | |
| 30 aborted in the current process. | |
| 31 | |
| 32 Unlike the boto implementation of resumable upload handler, this class does | |
| 33 not directly interact with tracker files. | |
| 34 | |
| 35 Originally Google wrote and contributed this code to the boto project, | |
| 36 then copied that code back into gsutil on the release of gsutil 4.0 which | |
| 37 supports both boto and non-boto codepaths for resumable uploads. Any bug | |
| 38 fixes made to this file should also be integrated to resumable_upload_handler.py | |
| 39 in boto, where applicable. | |
| 40 | |
| 41 TODO: gsutil-beta: Add a similar comment to the boto code. | |
| 42 """ | |
| 43 | |
| 44 from __future__ import absolute_import | |
| 45 | |
| 46 import errno | |
| 47 import httplib | |
| 48 import random | |
| 49 import re | |
| 50 import socket | |
| 51 import time | |
| 52 import urlparse | |
| 53 from boto import UserAgent | |
| 54 from boto.connection import AWSAuthConnection | |
| 55 from boto.exception import ResumableTransferDisposition | |
| 56 from boto.exception import ResumableUploadException | |
| 57 from gslib.exception import InvalidUrlError | |
| 58 from gslib.util import GetMaxRetryDelay | |
| 59 from gslib.util import GetNumRetries | |
| 60 from gslib.util import XML_PROGRESS_CALLBACKS | |
| 61 | |
| 62 | |
| 63 class BotoResumableUpload(object): | |
| 64 """Upload helper class for resumable uploads via boto.""" | |
| 65 | |
| 66 BUFFER_SIZE = 8192 | |
| 67 RETRYABLE_EXCEPTIONS = (httplib.HTTPException, IOError, socket.error, | |
| 68 socket.gaierror) | |
| 69 | |
| 70 # (start, end) response indicating service has nothing (upload protocol uses | |
| 71 # inclusive numbering). | |
| 72 SERVICE_HAS_NOTHING = (0, -1) | |
| 73 | |
| 74 def __init__(self, tracker_callback, logger, | |
| 75 resume_url=None, num_retries=None): | |
| 76 """Constructor. Instantiate once for each uploaded file. | |
| 77 | |
| 78 Args: | |
| 79 tracker_callback: Callback function that takes a string argument. Used | |
| 80 by caller to track this upload across upload | |
| 81 interruption. | |
| 82 logger: logging.logger instance to use for debug messages. | |
| 83 resume_url: If present, attempt to resume the upload at this URL. | |
| 84 num_retries: Number of times to retry the upload making no progress. | |
| 85 This count resets every time we make progress, so the upload | |
| 86 can span many more than this number of retries. | |
| 87 """ | |
| 88 if resume_url: | |
| 89 self._SetUploadUrl(resume_url) | |
| 90 else: | |
| 91 self.upload_url = None | |
| 92 self.num_retries = num_retries | |
| 93 self.service_has_bytes = 0 # Byte count at last service check. | |
| 94 # Save upload_start_point in instance state so caller can find how | |
| 95 # much was transferred by this ResumableUploadHandler (across retries). | |
| 96 self.upload_start_point = None | |
| 97 self.tracker_callback = tracker_callback | |
| 98 self.logger = logger | |
| 99 | |
| 100 def _SetUploadUrl(self, url): | |
| 101 """Saves URL and resets upload state. | |
| 102 | |
| 103 Called when we start a new resumable upload or get a new tracker | |
| 104 URL for the upload. | |
| 105 | |
| 106 Args: | |
| 107 url: URL string for the upload. | |
| 108 | |
| 109 Raises InvalidUrlError if URL is syntactically invalid. | |
| 110 """ | |
| 111 parse_result = urlparse.urlparse(url) | |
| 112 if (parse_result.scheme.lower() not in ['http', 'https'] or | |
| 113 not parse_result.netloc): | |
| 114 raise InvalidUrlError('Invalid upload URL (%s)' % url) | |
| 115 self.upload_url = url | |
| 116 self.upload_url_host = parse_result.netloc | |
| 117 self.upload_url_path = '%s?%s' % ( | |
| 118 parse_result.path, parse_result.query) | |
| 119 self.service_has_bytes = 0 | |
| 120 | |
| 121 def _BuildContentRangeHeader(self, range_spec='*', length_spec='*'): | |
| 122 return 'bytes %s/%s' % (range_spec, length_spec) | |
| 123 | |
| 124 def _QueryServiceState(self, conn, file_length): | |
| 125 """Queries service to find out state of given upload. | |
| 126 | |
| 127 Note that this method really just makes special case use of the | |
| 128 fact that the upload service always returns the current start/end | |
| 129 state whenever a PUT doesn't complete. | |
| 130 | |
| 131 Args: | |
| 132 conn: HTTPConnection to use for the query. | |
| 133 file_length: Total length of the file. | |
| 134 | |
| 135 Returns: | |
| 136 HTTP response from sending request. | |
| 137 | |
| 138 Raises: | |
| 139 ResumableUploadException if problem querying service. | |
| 140 """ | |
| 141 # Send an empty PUT so that service replies with this resumable | |
| 142 # transfer's state. | |
| 143 put_headers = {} | |
| 144 put_headers['Content-Range'] = ( | |
| 145 self._BuildContentRangeHeader('*', file_length)) | |
| 146 put_headers['Content-Length'] = '0' | |
| 147 return AWSAuthConnection.make_request( | |
| 148 conn, 'PUT', path=self.upload_url_path, auth_path=self.upload_url_path, | |
| 149 headers=put_headers, host=self.upload_url_host) | |
| 150 | |
| 151 def _QueryServicePos(self, conn, file_length): | |
| 152 """Queries service to find out what bytes it currently has. | |
| 153 | |
| 154 Args: | |
| 155 conn: HTTPConnection to use for the query. | |
| 156 file_length: Total length of the file. | |
| 157 | |
| 158 Returns: | |
| 159 (service_start, service_end), where the values are inclusive. | |
| 160 For example, (0, 2) would mean that the service has bytes 0, 1, *and* 2. | |
| 161 | |
| 162 Raises: | |
| 163 ResumableUploadException if problem querying service. | |
| 164 """ | |
| 165 resp = self._QueryServiceState(conn, file_length) | |
| 166 if resp.status == 200: | |
| 167 # To handle the boundary condition where the service has the complete | |
| 168 # file, we return (service_start, file_length-1). That way the | |
| 169 # calling code can always simply read up through service_end. (If we | |
| 170 # didn't handle this boundary condition here, the caller would have | |
| 171 # to check whether service_end == file_length and read one fewer byte | |
| 172 # in that case.) | |
| 173 return (0, file_length - 1) # Completed upload. | |
| 174 if resp.status != 308: | |
| 175 # This means the service didn't have any state for the given | |
| 176 # upload ID, which can happen (for example) if the caller saved | |
| 177 # the upload URL to a file and then tried to restart the transfer | |
| 178 # after that upload ID has gone stale. In that case we need to | |
| 179 # start a new transfer (and the caller will then save the new | |
| 180 # upload URL to the tracker file). | |
| 181 raise ResumableUploadException( | |
| 182 'Got non-308 response (%s) from service state query' % | |
| 183 resp.status, ResumableTransferDisposition.START_OVER) | |
| 184 got_valid_response = False | |
| 185 range_spec = resp.getheader('range') | |
| 186 if range_spec: | |
| 187 # Parse 'bytes=<from>-<to>' range_spec. | |
| 188 m = re.search(r'bytes=(\d+)-(\d+)', range_spec) | |
| 189 if m: | |
| 190 service_start = long(m.group(1)) | |
| 191 service_end = long(m.group(2)) | |
| 192 got_valid_response = True | |
| 193 else: | |
| 194 # No Range header, which means the service does not yet have | |
| 195 # any bytes. Note that the Range header uses inclusive 'from' | |
| 196 # and 'to' values. Since Range 0-0 would mean that the service | |
| 197 # has byte 0, omitting the Range header is used to indicate that | |
| 198 # the service doesn't have any bytes. | |
| 199 return self.SERVICE_HAS_NOTHING | |
| 200 if not got_valid_response: | |
| 201 raise ResumableUploadException( | |
| 202 'Couldn\'t parse upload service state query response (%s)' % | |
| 203 str(resp.getheaders()), ResumableTransferDisposition.START_OVER) | |
| 204 if conn.debug >= 1: | |
| 205 self.logger.debug('Service has: Range: %d - %d.', service_start, | |
| 206 service_end) | |
| 207 return (service_start, service_end) | |
| 208 | |
| 209 def _StartNewResumableUpload(self, key, headers=None): | |
| 210 """Starts a new resumable upload. | |
| 211 | |
| 212 Args: | |
| 213 key: Boto Key representing the object to upload. | |
| 214 headers: Headers to use in the upload requests. | |
| 215 | |
| 216 Raises: | |
| 217 ResumableUploadException if any errors occur. | |
| 218 """ | |
| 219 conn = key.bucket.connection | |
| 220 if conn.debug >= 1: | |
| 221 self.logger.debug('Starting new resumable upload.') | |
| 222 self.service_has_bytes = 0 | |
| 223 | |
| 224 # Start a new resumable upload by sending a POST request with an | |
| 225 # empty body and the "X-Goog-Resumable: start" header. Include any | |
| 226 # caller-provided headers (e.g., Content-Type) EXCEPT Content-Length | |
| 227 # (and raise an exception if they tried to pass one, since it's | |
| 228 # a semantic error to specify it at this point, and if we were to | |
| 229 # include one now it would cause the service to expect that many | |
| 230 # bytes; the POST doesn't include the actual file bytes We set | |
| 231 # the Content-Length in the subsequent PUT, based on the uploaded | |
| 232 # file size. | |
| 233 post_headers = {} | |
| 234 for k in headers: | |
| 235 if k.lower() == 'content-length': | |
| 236 raise ResumableUploadException( | |
| 237 'Attempt to specify Content-Length header (disallowed)', | |
| 238 ResumableTransferDisposition.ABORT) | |
| 239 post_headers[k] = headers[k] | |
| 240 post_headers[conn.provider.resumable_upload_header] = 'start' | |
| 241 | |
| 242 resp = conn.make_request( | |
| 243 'POST', key.bucket.name, key.name, post_headers) | |
| 244 # Get upload URL from response 'Location' header. | |
| 245 body = resp.read() | |
| 246 | |
| 247 # Check for various status conditions. | |
| 248 if resp.status in [429, 500, 503]: | |
| 249 # Retry after a delay. | |
| 250 raise ResumableUploadException( | |
| 251 'Got status %d from attempt to start resumable upload. ' | |
| 252 'Will wait/retry' % resp.status, | |
| 253 ResumableTransferDisposition.WAIT_BEFORE_RETRY) | |
| 254 elif resp.status != 200 and resp.status != 201: | |
| 255 raise ResumableUploadException( | |
| 256 'Got status %d from attempt to start resumable upload. ' | |
| 257 'Aborting' % resp.status, | |
| 258 ResumableTransferDisposition.ABORT) | |
| 259 | |
| 260 # Else we got 200 or 201 response code, indicating the resumable | |
| 261 # upload was created. | |
| 262 upload_url = resp.getheader('Location') | |
| 263 if not upload_url: | |
| 264 raise ResumableUploadException( | |
| 265 'No resumable upload URL found in resumable initiation ' | |
| 266 'POST response (%s)' % body, | |
| 267 ResumableTransferDisposition.WAIT_BEFORE_RETRY) | |
| 268 self._SetUploadUrl(upload_url) | |
| 269 self.tracker_callback(upload_url) | |
| 270 | |
| 271 def _UploadFileBytes(self, conn, http_conn, fp, file_length, | |
| 272 total_bytes_uploaded, cb, num_cb, headers): | |
| 273 """Attempts to upload file bytes. | |
| 274 | |
| 275 Makes a single attempt using an existing resumable upload connection. | |
| 276 | |
| 277 Args: | |
| 278 conn: HTTPConnection from the boto Key. | |
| 279 http_conn: Separate HTTPConnection for the transfer. | |
| 280 fp: File pointer containing bytes to upload. | |
| 281 file_length: Total length of the file. | |
| 282 total_bytes_uploaded: The total number of bytes uploaded. | |
| 283 cb: Progress callback function that takes (progress, total_size). | |
| 284 num_cb: Granularity of the callback (maximum number of times the | |
| 285 callback will be called during the file transfer). If negative, | |
| 286 perform callback with each buffer read. | |
| 287 headers: Headers to be used in the upload requests. | |
| 288 | |
| 289 Returns: | |
| 290 (etag, generation, metageneration) from service upon success. | |
| 291 | |
| 292 Raises: | |
| 293 ResumableUploadException if any problems occur. | |
| 294 """ | |
| 295 buf = fp.read(self.BUFFER_SIZE) | |
| 296 if cb: | |
| 297 # The cb_count represents the number of full buffers to send between | |
| 298 # cb executions. | |
| 299 if num_cb > 2: | |
| 300 cb_count = file_length / self.BUFFER_SIZE / (num_cb-2) | |
| 301 elif num_cb < 0: | |
| 302 cb_count = -1 | |
| 303 else: | |
| 304 cb_count = 0 | |
| 305 i = 0 | |
| 306 cb(total_bytes_uploaded, file_length) | |
| 307 | |
| 308 # Build resumable upload headers for the transfer. Don't send a | |
| 309 # Content-Range header if the file is 0 bytes long, because the | |
| 310 # resumable upload protocol uses an *inclusive* end-range (so, sending | |
| 311 # 'bytes 0-0/1' would actually mean you're sending a 1-byte file). | |
| 312 put_headers = headers.copy() if headers else {} | |
| 313 if file_length: | |
| 314 if total_bytes_uploaded == file_length: | |
| 315 range_header = self._BuildContentRangeHeader( | |
| 316 '*', file_length) | |
| 317 else: | |
| 318 range_header = self._BuildContentRangeHeader( | |
| 319 '%d-%d' % (total_bytes_uploaded, file_length - 1), | |
| 320 file_length) | |
| 321 put_headers['Content-Range'] = range_header | |
| 322 # Set Content-Length to the total bytes we'll send with this PUT. | |
| 323 put_headers['Content-Length'] = str(file_length - total_bytes_uploaded) | |
| 324 http_request = AWSAuthConnection.build_base_http_request( | |
| 325 conn, 'PUT', path=self.upload_url_path, auth_path=None, | |
| 326 headers=put_headers, host=self.upload_url_host) | |
| 327 http_conn.putrequest('PUT', http_request.path) | |
| 328 for k in put_headers: | |
| 329 http_conn.putheader(k, put_headers[k]) | |
| 330 http_conn.endheaders() | |
| 331 | |
| 332 # Turn off debug on http connection so upload content isn't included | |
| 333 # in debug stream. | |
| 334 http_conn.set_debuglevel(0) | |
| 335 while buf: | |
| 336 http_conn.send(buf) | |
| 337 total_bytes_uploaded += len(buf) | |
| 338 if cb: | |
| 339 i += 1 | |
| 340 if i == cb_count or cb_count == -1: | |
| 341 cb(total_bytes_uploaded, file_length) | |
| 342 i = 0 | |
| 343 buf = fp.read(self.BUFFER_SIZE) | |
| 344 http_conn.set_debuglevel(conn.debug) | |
| 345 if cb: | |
| 346 cb(total_bytes_uploaded, file_length) | |
| 347 if total_bytes_uploaded != file_length: | |
| 348 # Abort (and delete the tracker file) so if the user retries | |
| 349 # they'll start a new resumable upload rather than potentially | |
| 350 # attempting to pick back up later where we left off. | |
| 351 raise ResumableUploadException( | |
| 352 'File changed during upload: EOF at %d bytes of %d byte file.' % | |
| 353 (total_bytes_uploaded, file_length), | |
| 354 ResumableTransferDisposition.ABORT) | |
| 355 resp = http_conn.getresponse() | |
| 356 # Restore http connection debug level. | |
| 357 http_conn.set_debuglevel(conn.debug) | |
| 358 | |
| 359 if resp.status == 200: | |
| 360 # Success. | |
| 361 return (resp.getheader('etag'), | |
| 362 resp.getheader('x-goog-generation'), | |
| 363 resp.getheader('x-goog-metageneration')) | |
| 364 # Retry timeout (408) and status 429, 500 and 503 errors after a delay. | |
| 365 elif resp.status in [408, 429, 500, 503]: | |
| 366 disposition = ResumableTransferDisposition.WAIT_BEFORE_RETRY | |
| 367 else: | |
| 368 # Catch all for any other error codes. | |
| 369 disposition = ResumableTransferDisposition.ABORT | |
| 370 raise ResumableUploadException('Got response code %d while attempting ' | |
| 371 'upload (%s)' % | |
| 372 (resp.status, resp.reason), disposition) | |
| 373 | |
| 374 def _AttemptResumableUpload(self, key, fp, file_length, headers, cb, | |
| 375 num_cb): | |
| 376 """Attempts a resumable upload. | |
| 377 | |
| 378 Args: | |
| 379 key: Boto key representing object to upload. | |
| 380 fp: File pointer containing upload bytes. | |
| 381 file_length: Total length of the upload. | |
| 382 headers: Headers to be used in upload requests. | |
| 383 cb: Progress callback function that takes (progress, total_size). | |
| 384 num_cb: Granularity of the callback (maximum number of times the | |
| 385 callback will be called during the file transfer). If negative, | |
| 386 perform callback with each buffer read. | |
| 387 | |
| 388 Returns: | |
| 389 (etag, generation, metageneration) from service upon success. | |
| 390 | |
| 391 Raises: | |
| 392 ResumableUploadException if any problems occur. | |
| 393 """ | |
| 394 (service_start, service_end) = self.SERVICE_HAS_NOTHING | |
| 395 conn = key.bucket.connection | |
| 396 if self.upload_url: | |
| 397 # Try to resume existing resumable upload. | |
| 398 try: | |
| 399 (service_start, service_end) = ( | |
| 400 self._QueryServicePos(conn, file_length)) | |
| 401 self.service_has_bytes = service_start | |
| 402 if conn.debug >= 1: | |
| 403 self.logger.debug('Resuming transfer.') | |
| 404 except ResumableUploadException, e: | |
| 405 if conn.debug >= 1: | |
| 406 self.logger.debug('Unable to resume transfer (%s).', e.message) | |
| 407 self._StartNewResumableUpload(key, headers) | |
| 408 else: | |
| 409 self._StartNewResumableUpload(key, headers) | |
| 410 | |
| 411 # upload_start_point allows the code that instantiated the | |
| 412 # ResumableUploadHandler to find out the point from which it started | |
| 413 # uploading (e.g., so it can correctly compute throughput). | |
| 414 if self.upload_start_point is None: | |
| 415 self.upload_start_point = service_end | |
| 416 | |
| 417 total_bytes_uploaded = service_end + 1 | |
| 418 | |
| 419 # Start reading from the file based upon the number of bytes that the | |
| 420 # server has so far. | |
| 421 if total_bytes_uploaded < file_length: | |
| 422 fp.seek(total_bytes_uploaded) | |
| 423 | |
| 424 conn = key.bucket.connection | |
| 425 | |
| 426 # Get a new HTTP connection (vs conn.get_http_connection(), which reuses | |
| 427 # pool connections) because httplib requires a new HTTP connection per | |
| 428 # transaction. (Without this, calling http_conn.getresponse() would get | |
| 429 # "ResponseNotReady".) | |
| 430 http_conn = conn.new_http_connection(self.upload_url_host, conn.port, | |
| 431 conn.is_secure) | |
| 432 http_conn.set_debuglevel(conn.debug) | |
| 433 | |
| 434 # Make sure to close http_conn at end so if a local file read | |
| 435 # failure occurs partway through service will terminate current upload | |
| 436 # and can report that progress on next attempt. | |
| 437 try: | |
| 438 return self._UploadFileBytes(conn, http_conn, fp, file_length, | |
| 439 total_bytes_uploaded, cb, num_cb, | |
| 440 headers) | |
| 441 except (ResumableUploadException, socket.error): | |
| 442 resp = self._QueryServiceState(conn, file_length) | |
| 443 if resp.status == 400: | |
| 444 raise ResumableUploadException( | |
| 445 'Got 400 response from service state query after failed resumable ' | |
| 446 'upload attempt. This can happen for various reasons, including ' | |
| 447 'specifying an invalid request (e.g., an invalid canned ACL) or ' | |
| 448 'if the file size changed between upload attempts', | |
| 449 ResumableTransferDisposition.ABORT) | |
| 450 else: | |
| 451 raise | |
| 452 finally: | |
| 453 http_conn.close() | |
| 454 | |
| 455 def HandleResumableUploadException(self, e, debug): | |
| 456 if e.disposition == ResumableTransferDisposition.ABORT_CUR_PROCESS: | |
| 457 if debug >= 1: | |
| 458 self.logger.debug('Caught non-retryable ResumableUploadException (%s); ' | |
| 459 'aborting but retaining tracker file', e.message) | |
| 460 raise | |
| 461 elif e.disposition == ResumableTransferDisposition.ABORT: | |
| 462 if debug >= 1: | |
| 463 self.logger.debug('Caught non-retryable ResumableUploadException (%s); ' | |
| 464 'aborting and removing tracker file', e.message) | |
| 465 raise | |
| 466 elif e.disposition == ResumableTransferDisposition.START_OVER: | |
| 467 raise | |
| 468 else: | |
| 469 if debug >= 1: | |
| 470 self.logger.debug( | |
| 471 'Caught ResumableUploadException (%s) - will retry', e.message) | |
| 472 | |
| 473 def TrackProgressLessIterations(self, service_had_bytes_before_attempt, | |
| 474 debug=0): | |
| 475 """Tracks the number of iterations without progress. | |
| 476 | |
| 477 Performs randomized exponential backoff. | |
| 478 | |
| 479 Args: | |
| 480 service_had_bytes_before_attempt: Number of bytes the service had prior | |
| 481 to this upload attempt. | |
| 482 debug: debug level 0..3 | |
| 483 """ | |
| 484 # At this point we had a re-tryable failure; see if made progress. | |
| 485 if self.service_has_bytes > service_had_bytes_before_attempt: | |
| 486 self.progress_less_iterations = 0 # If progress, reset counter. | |
| 487 else: | |
| 488 self.progress_less_iterations += 1 | |
| 489 | |
| 490 if self.progress_less_iterations > self.num_retries: | |
| 491 # Don't retry any longer in the current process. | |
| 492 raise ResumableUploadException( | |
| 493 'Too many resumable upload attempts failed without ' | |
| 494 'progress. You might try this upload again later', | |
| 495 ResumableTransferDisposition.ABORT_CUR_PROCESS) | |
| 496 | |
| 497 # Use binary exponential backoff to desynchronize client requests. | |
| 498 sleep_time_secs = min(random.random() * (2**self.progress_less_iterations), | |
| 499 GetMaxRetryDelay()) | |
| 500 if debug >= 1: | |
| 501 self.logger.debug('Got retryable failure (%d progress-less in a row).\n' | |
| 502 'Sleeping %3.1f seconds before re-trying', | |
| 503 self.progress_less_iterations, sleep_time_secs) | |
| 504 time.sleep(sleep_time_secs) | |
| 505 | |
| 506 def SendFile(self, key, fp, size, headers, canned_acl=None, cb=None, | |
| 507 num_cb=XML_PROGRESS_CALLBACKS): | |
| 508 """Upload a file to a key into a bucket on GS, resumable upload protocol. | |
| 509 | |
| 510 Args: | |
| 511 key: `boto.s3.key.Key` or subclass representing the upload destination. | |
| 512 fp: File pointer to upload | |
| 513 size: Size of the file to upload. | |
| 514 headers: The headers to pass along with the PUT request | |
| 515 canned_acl: Optional canned ACL to apply to object. | |
| 516 cb: Callback function that will be called to report progress on | |
| 517 the upload. The callback should accept two integer parameters, the | |
| 518 first representing the number of bytes that have been successfully | |
| 519 transmitted to GS, and the second representing the total number of | |
| 520 bytes that need to be transmitted. | |
| 521 num_cb: (optional) If a callback is specified with the cb parameter, this | |
| 522 parameter determines the granularity of the callback by defining | |
| 523 the maximum number of times the callback will be called during the | |
| 524 file transfer. Providing a negative integer will cause your | |
| 525 callback to be called with each buffer read. | |
| 526 | |
| 527 Raises: | |
| 528 ResumableUploadException if a problem occurs during the transfer. | |
| 529 """ | |
| 530 | |
| 531 if not headers: | |
| 532 headers = {} | |
| 533 # If Content-Type header is present and set to None, remove it. | |
| 534 # This is gsutil's way of asking boto to refrain from auto-generating | |
| 535 # that header. | |
| 536 content_type = 'Content-Type' | |
| 537 if content_type in headers and headers[content_type] is None: | |
| 538 del headers[content_type] | |
| 539 | |
| 540 if canned_acl: | |
| 541 headers[key.provider.acl_header] = canned_acl | |
| 542 | |
| 543 headers['User-Agent'] = UserAgent | |
| 544 | |
| 545 file_length = size | |
| 546 debug = key.bucket.connection.debug | |
| 547 | |
| 548 # Use num-retries from constructor if one was provided; else check | |
| 549 # for a value specified in the boto config file; else default to 5. | |
| 550 if self.num_retries is None: | |
| 551 self.num_retries = GetNumRetries() | |
| 552 self.progress_less_iterations = 0 | |
| 553 | |
| 554 while True: # Retry as long as we're making progress. | |
| 555 service_had_bytes_before_attempt = self.service_has_bytes | |
| 556 try: | |
| 557 # Save generation and metageneration in class state so caller | |
| 558 # can find these values, for use in preconditions of future | |
| 559 # operations on the uploaded object. | |
| 560 (_, self.generation, self.metageneration) = ( | |
| 561 self._AttemptResumableUpload(key, fp, file_length, | |
| 562 headers, cb, num_cb)) | |
| 563 | |
| 564 key.generation = self.generation | |
| 565 if debug >= 1: | |
| 566 self.logger.debug('Resumable upload complete.') | |
| 567 return | |
| 568 except self.RETRYABLE_EXCEPTIONS, e: | |
| 569 if debug >= 1: | |
| 570 self.logger.debug('Caught exception (%s)', e.__repr__()) | |
| 571 if isinstance(e, IOError) and e.errno == errno.EPIPE: | |
| 572 # Broken pipe error causes httplib to immediately | |
| 573 # close the socket (http://bugs.python.org/issue5542), | |
| 574 # so we need to close the connection before we resume | |
| 575 # the upload (which will cause a new connection to be | |
| 576 # opened the next time an HTTP request is sent). | |
| 577 key.bucket.connection.connection.close() | |
| 578 except ResumableUploadException, e: | |
| 579 self.HandleResumableUploadException(e, debug) | |
| 580 | |
| 581 self.TrackProgressLessIterations(service_had_bytes_before_attempt, | |
| 582 debug=debug) | |
| OLD | NEW |