OLD | NEW |
| (Empty) |
1 # -*- coding: utf-8 -*- | |
2 # Copyright 2010 Google Inc. All Rights Reserved. | |
3 # | |
4 # Permission is hereby granted, free of charge, to any person obtaining a | |
5 # copy of this software and associated documentation files (the | |
6 # "Software"), to deal in the Software without restriction, including | |
7 # without limitation the rights to use, copy, modify, merge, publish, dis- | |
8 # tribute, sublicense, and/or sell copies of the Software, and to permit | |
9 # persons to whom the Software is furnished to do so, subject to the fol- | |
10 # lowing conditions: | |
11 # | |
12 # The above copyright notice and this permission notice shall be included | |
13 # in all copies or substantial portions of the Software. | |
14 # | |
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | |
16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- | |
17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT | |
18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, | |
19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS | |
21 # IN THE SOFTWARE. | |
22 """Boto translation layer for resumable uploads. | |
23 | |
24 See http://code.google.com/apis/storage/docs/developer-guide.html#resumable | |
25 for details. | |
26 | |
27 Resumable uploads will retry interrupted uploads, resuming at the byte | |
28 count completed by the last upload attempt. If too many retries happen with | |
29 no progress (per configurable num_retries param), the upload will be | |
30 aborted in the current process. | |
31 | |
32 Unlike the boto implementation of resumable upload handler, this class does | |
33 not directly interact with tracker files. | |
34 | |
35 Originally Google wrote and contributed this code to the boto project, | |
36 then copied that code back into gsutil on the release of gsutil 4.0 which | |
37 supports both boto and non-boto codepaths for resumable uploads. Any bug | |
38 fixes made to this file should also be integrated to resumable_upload_handler.py | |
39 in boto, where applicable. | |
40 | |
41 TODO: gsutil-beta: Add a similar comment to the boto code. | |
42 """ | |
43 | |
44 from __future__ import absolute_import | |
45 | |
46 import errno | |
47 import httplib | |
48 import random | |
49 import re | |
50 import socket | |
51 import time | |
52 import urlparse | |
53 from boto import UserAgent | |
54 from boto.connection import AWSAuthConnection | |
55 from boto.exception import ResumableTransferDisposition | |
56 from boto.exception import ResumableUploadException | |
57 from gslib.exception import InvalidUrlError | |
58 from gslib.util import GetMaxRetryDelay | |
59 from gslib.util import GetNumRetries | |
60 from gslib.util import XML_PROGRESS_CALLBACKS | |
61 | |
62 | |
63 class BotoResumableUpload(object): | |
64 """Upload helper class for resumable uploads via boto.""" | |
65 | |
66 BUFFER_SIZE = 8192 | |
67 RETRYABLE_EXCEPTIONS = (httplib.HTTPException, IOError, socket.error, | |
68 socket.gaierror) | |
69 | |
70 # (start, end) response indicating service has nothing (upload protocol uses | |
71 # inclusive numbering). | |
72 SERVICE_HAS_NOTHING = (0, -1) | |
73 | |
74 def __init__(self, tracker_callback, logger, | |
75 resume_url=None, num_retries=None): | |
76 """Constructor. Instantiate once for each uploaded file. | |
77 | |
78 Args: | |
79 tracker_callback: Callback function that takes a string argument. Used | |
80 by caller to track this upload across upload | |
81 interruption. | |
82 logger: logging.logger instance to use for debug messages. | |
83 resume_url: If present, attempt to resume the upload at this URL. | |
84 num_retries: Number of times to retry the upload making no progress. | |
85 This count resets every time we make progress, so the upload | |
86 can span many more than this number of retries. | |
87 """ | |
88 if resume_url: | |
89 self._SetUploadUrl(resume_url) | |
90 else: | |
91 self.upload_url = None | |
92 self.num_retries = num_retries | |
93 self.service_has_bytes = 0 # Byte count at last service check. | |
94 # Save upload_start_point in instance state so caller can find how | |
95 # much was transferred by this ResumableUploadHandler (across retries). | |
96 self.upload_start_point = None | |
97 self.tracker_callback = tracker_callback | |
98 self.logger = logger | |
99 | |
100 def _SetUploadUrl(self, url): | |
101 """Saves URL and resets upload state. | |
102 | |
103 Called when we start a new resumable upload or get a new tracker | |
104 URL for the upload. | |
105 | |
106 Args: | |
107 url: URL string for the upload. | |
108 | |
109 Raises InvalidUrlError if URL is syntactically invalid. | |
110 """ | |
111 parse_result = urlparse.urlparse(url) | |
112 if (parse_result.scheme.lower() not in ['http', 'https'] or | |
113 not parse_result.netloc): | |
114 raise InvalidUrlError('Invalid upload URL (%s)' % url) | |
115 self.upload_url = url | |
116 self.upload_url_host = parse_result.netloc | |
117 self.upload_url_path = '%s?%s' % ( | |
118 parse_result.path, parse_result.query) | |
119 self.service_has_bytes = 0 | |
120 | |
121 def _BuildContentRangeHeader(self, range_spec='*', length_spec='*'): | |
122 return 'bytes %s/%s' % (range_spec, length_spec) | |
123 | |
124 def _QueryServiceState(self, conn, file_length): | |
125 """Queries service to find out state of given upload. | |
126 | |
127 Note that this method really just makes special case use of the | |
128 fact that the upload service always returns the current start/end | |
129 state whenever a PUT doesn't complete. | |
130 | |
131 Args: | |
132 conn: HTTPConnection to use for the query. | |
133 file_length: Total length of the file. | |
134 | |
135 Returns: | |
136 HTTP response from sending request. | |
137 | |
138 Raises: | |
139 ResumableUploadException if problem querying service. | |
140 """ | |
141 # Send an empty PUT so that service replies with this resumable | |
142 # transfer's state. | |
143 put_headers = {} | |
144 put_headers['Content-Range'] = ( | |
145 self._BuildContentRangeHeader('*', file_length)) | |
146 put_headers['Content-Length'] = '0' | |
147 return AWSAuthConnection.make_request( | |
148 conn, 'PUT', path=self.upload_url_path, auth_path=self.upload_url_path, | |
149 headers=put_headers, host=self.upload_url_host) | |
150 | |
151 def _QueryServicePos(self, conn, file_length): | |
152 """Queries service to find out what bytes it currently has. | |
153 | |
154 Args: | |
155 conn: HTTPConnection to use for the query. | |
156 file_length: Total length of the file. | |
157 | |
158 Returns: | |
159 (service_start, service_end), where the values are inclusive. | |
160 For example, (0, 2) would mean that the service has bytes 0, 1, *and* 2. | |
161 | |
162 Raises: | |
163 ResumableUploadException if problem querying service. | |
164 """ | |
165 resp = self._QueryServiceState(conn, file_length) | |
166 if resp.status == 200: | |
167 # To handle the boundary condition where the service has the complete | |
168 # file, we return (service_start, file_length-1). That way the | |
169 # calling code can always simply read up through service_end. (If we | |
170 # didn't handle this boundary condition here, the caller would have | |
171 # to check whether service_end == file_length and read one fewer byte | |
172 # in that case.) | |
173 return (0, file_length - 1) # Completed upload. | |
174 if resp.status != 308: | |
175 # This means the service didn't have any state for the given | |
176 # upload ID, which can happen (for example) if the caller saved | |
177 # the upload URL to a file and then tried to restart the transfer | |
178 # after that upload ID has gone stale. In that case we need to | |
179 # start a new transfer (and the caller will then save the new | |
180 # upload URL to the tracker file). | |
181 raise ResumableUploadException( | |
182 'Got non-308 response (%s) from service state query' % | |
183 resp.status, ResumableTransferDisposition.START_OVER) | |
184 got_valid_response = False | |
185 range_spec = resp.getheader('range') | |
186 if range_spec: | |
187 # Parse 'bytes=<from>-<to>' range_spec. | |
188 m = re.search(r'bytes=(\d+)-(\d+)', range_spec) | |
189 if m: | |
190 service_start = long(m.group(1)) | |
191 service_end = long(m.group(2)) | |
192 got_valid_response = True | |
193 else: | |
194 # No Range header, which means the service does not yet have | |
195 # any bytes. Note that the Range header uses inclusive 'from' | |
196 # and 'to' values. Since Range 0-0 would mean that the service | |
197 # has byte 0, omitting the Range header is used to indicate that | |
198 # the service doesn't have any bytes. | |
199 return self.SERVICE_HAS_NOTHING | |
200 if not got_valid_response: | |
201 raise ResumableUploadException( | |
202 'Couldn\'t parse upload service state query response (%s)' % | |
203 str(resp.getheaders()), ResumableTransferDisposition.START_OVER) | |
204 if conn.debug >= 1: | |
205 self.logger.debug('Service has: Range: %d - %d.', service_start, | |
206 service_end) | |
207 return (service_start, service_end) | |
208 | |
209 def _StartNewResumableUpload(self, key, headers=None): | |
210 """Starts a new resumable upload. | |
211 | |
212 Args: | |
213 key: Boto Key representing the object to upload. | |
214 headers: Headers to use in the upload requests. | |
215 | |
216 Raises: | |
217 ResumableUploadException if any errors occur. | |
218 """ | |
219 conn = key.bucket.connection | |
220 if conn.debug >= 1: | |
221 self.logger.debug('Starting new resumable upload.') | |
222 self.service_has_bytes = 0 | |
223 | |
224 # Start a new resumable upload by sending a POST request with an | |
225 # empty body and the "X-Goog-Resumable: start" header. Include any | |
226 # caller-provided headers (e.g., Content-Type) EXCEPT Content-Length | |
227 # (and raise an exception if they tried to pass one, since it's | |
228 # a semantic error to specify it at this point, and if we were to | |
229 # include one now it would cause the service to expect that many | |
230 # bytes; the POST doesn't include the actual file bytes We set | |
231 # the Content-Length in the subsequent PUT, based on the uploaded | |
232 # file size. | |
233 post_headers = {} | |
234 for k in headers: | |
235 if k.lower() == 'content-length': | |
236 raise ResumableUploadException( | |
237 'Attempt to specify Content-Length header (disallowed)', | |
238 ResumableTransferDisposition.ABORT) | |
239 post_headers[k] = headers[k] | |
240 post_headers[conn.provider.resumable_upload_header] = 'start' | |
241 | |
242 resp = conn.make_request( | |
243 'POST', key.bucket.name, key.name, post_headers) | |
244 # Get upload URL from response 'Location' header. | |
245 body = resp.read() | |
246 | |
247 # Check for various status conditions. | |
248 if resp.status in [429, 500, 503]: | |
249 # Retry after a delay. | |
250 raise ResumableUploadException( | |
251 'Got status %d from attempt to start resumable upload. ' | |
252 'Will wait/retry' % resp.status, | |
253 ResumableTransferDisposition.WAIT_BEFORE_RETRY) | |
254 elif resp.status != 200 and resp.status != 201: | |
255 raise ResumableUploadException( | |
256 'Got status %d from attempt to start resumable upload. ' | |
257 'Aborting' % resp.status, | |
258 ResumableTransferDisposition.ABORT) | |
259 | |
260 # Else we got 200 or 201 response code, indicating the resumable | |
261 # upload was created. | |
262 upload_url = resp.getheader('Location') | |
263 if not upload_url: | |
264 raise ResumableUploadException( | |
265 'No resumable upload URL found in resumable initiation ' | |
266 'POST response (%s)' % body, | |
267 ResumableTransferDisposition.WAIT_BEFORE_RETRY) | |
268 self._SetUploadUrl(upload_url) | |
269 self.tracker_callback(upload_url) | |
270 | |
271 def _UploadFileBytes(self, conn, http_conn, fp, file_length, | |
272 total_bytes_uploaded, cb, num_cb, headers): | |
273 """Attempts to upload file bytes. | |
274 | |
275 Makes a single attempt using an existing resumable upload connection. | |
276 | |
277 Args: | |
278 conn: HTTPConnection from the boto Key. | |
279 http_conn: Separate HTTPConnection for the transfer. | |
280 fp: File pointer containing bytes to upload. | |
281 file_length: Total length of the file. | |
282 total_bytes_uploaded: The total number of bytes uploaded. | |
283 cb: Progress callback function that takes (progress, total_size). | |
284 num_cb: Granularity of the callback (maximum number of times the | |
285 callback will be called during the file transfer). If negative, | |
286 perform callback with each buffer read. | |
287 headers: Headers to be used in the upload requests. | |
288 | |
289 Returns: | |
290 (etag, generation, metageneration) from service upon success. | |
291 | |
292 Raises: | |
293 ResumableUploadException if any problems occur. | |
294 """ | |
295 buf = fp.read(self.BUFFER_SIZE) | |
296 if cb: | |
297 # The cb_count represents the number of full buffers to send between | |
298 # cb executions. | |
299 if num_cb > 2: | |
300 cb_count = file_length / self.BUFFER_SIZE / (num_cb-2) | |
301 elif num_cb < 0: | |
302 cb_count = -1 | |
303 else: | |
304 cb_count = 0 | |
305 i = 0 | |
306 cb(total_bytes_uploaded, file_length) | |
307 | |
308 # Build resumable upload headers for the transfer. Don't send a | |
309 # Content-Range header if the file is 0 bytes long, because the | |
310 # resumable upload protocol uses an *inclusive* end-range (so, sending | |
311 # 'bytes 0-0/1' would actually mean you're sending a 1-byte file). | |
312 put_headers = headers.copy() if headers else {} | |
313 if file_length: | |
314 if total_bytes_uploaded == file_length: | |
315 range_header = self._BuildContentRangeHeader( | |
316 '*', file_length) | |
317 else: | |
318 range_header = self._BuildContentRangeHeader( | |
319 '%d-%d' % (total_bytes_uploaded, file_length - 1), | |
320 file_length) | |
321 put_headers['Content-Range'] = range_header | |
322 # Set Content-Length to the total bytes we'll send with this PUT. | |
323 put_headers['Content-Length'] = str(file_length - total_bytes_uploaded) | |
324 http_request = AWSAuthConnection.build_base_http_request( | |
325 conn, 'PUT', path=self.upload_url_path, auth_path=None, | |
326 headers=put_headers, host=self.upload_url_host) | |
327 http_conn.putrequest('PUT', http_request.path) | |
328 for k in put_headers: | |
329 http_conn.putheader(k, put_headers[k]) | |
330 http_conn.endheaders() | |
331 | |
332 # Turn off debug on http connection so upload content isn't included | |
333 # in debug stream. | |
334 http_conn.set_debuglevel(0) | |
335 while buf: | |
336 http_conn.send(buf) | |
337 total_bytes_uploaded += len(buf) | |
338 if cb: | |
339 i += 1 | |
340 if i == cb_count or cb_count == -1: | |
341 cb(total_bytes_uploaded, file_length) | |
342 i = 0 | |
343 buf = fp.read(self.BUFFER_SIZE) | |
344 http_conn.set_debuglevel(conn.debug) | |
345 if cb: | |
346 cb(total_bytes_uploaded, file_length) | |
347 if total_bytes_uploaded != file_length: | |
348 # Abort (and delete the tracker file) so if the user retries | |
349 # they'll start a new resumable upload rather than potentially | |
350 # attempting to pick back up later where we left off. | |
351 raise ResumableUploadException( | |
352 'File changed during upload: EOF at %d bytes of %d byte file.' % | |
353 (total_bytes_uploaded, file_length), | |
354 ResumableTransferDisposition.ABORT) | |
355 resp = http_conn.getresponse() | |
356 # Restore http connection debug level. | |
357 http_conn.set_debuglevel(conn.debug) | |
358 | |
359 if resp.status == 200: | |
360 # Success. | |
361 return (resp.getheader('etag'), | |
362 resp.getheader('x-goog-generation'), | |
363 resp.getheader('x-goog-metageneration')) | |
364 # Retry timeout (408) and status 429, 500 and 503 errors after a delay. | |
365 elif resp.status in [408, 429, 500, 503]: | |
366 disposition = ResumableTransferDisposition.WAIT_BEFORE_RETRY | |
367 else: | |
368 # Catch all for any other error codes. | |
369 disposition = ResumableTransferDisposition.ABORT | |
370 raise ResumableUploadException('Got response code %d while attempting ' | |
371 'upload (%s)' % | |
372 (resp.status, resp.reason), disposition) | |
373 | |
374 def _AttemptResumableUpload(self, key, fp, file_length, headers, cb, | |
375 num_cb): | |
376 """Attempts a resumable upload. | |
377 | |
378 Args: | |
379 key: Boto key representing object to upload. | |
380 fp: File pointer containing upload bytes. | |
381 file_length: Total length of the upload. | |
382 headers: Headers to be used in upload requests. | |
383 cb: Progress callback function that takes (progress, total_size). | |
384 num_cb: Granularity of the callback (maximum number of times the | |
385 callback will be called during the file transfer). If negative, | |
386 perform callback with each buffer read. | |
387 | |
388 Returns: | |
389 (etag, generation, metageneration) from service upon success. | |
390 | |
391 Raises: | |
392 ResumableUploadException if any problems occur. | |
393 """ | |
394 (service_start, service_end) = self.SERVICE_HAS_NOTHING | |
395 conn = key.bucket.connection | |
396 if self.upload_url: | |
397 # Try to resume existing resumable upload. | |
398 try: | |
399 (service_start, service_end) = ( | |
400 self._QueryServicePos(conn, file_length)) | |
401 self.service_has_bytes = service_start | |
402 if conn.debug >= 1: | |
403 self.logger.debug('Resuming transfer.') | |
404 except ResumableUploadException, e: | |
405 if conn.debug >= 1: | |
406 self.logger.debug('Unable to resume transfer (%s).', e.message) | |
407 self._StartNewResumableUpload(key, headers) | |
408 else: | |
409 self._StartNewResumableUpload(key, headers) | |
410 | |
411 # upload_start_point allows the code that instantiated the | |
412 # ResumableUploadHandler to find out the point from which it started | |
413 # uploading (e.g., so it can correctly compute throughput). | |
414 if self.upload_start_point is None: | |
415 self.upload_start_point = service_end | |
416 | |
417 total_bytes_uploaded = service_end + 1 | |
418 | |
419 # Start reading from the file based upon the number of bytes that the | |
420 # server has so far. | |
421 if total_bytes_uploaded < file_length: | |
422 fp.seek(total_bytes_uploaded) | |
423 | |
424 conn = key.bucket.connection | |
425 | |
426 # Get a new HTTP connection (vs conn.get_http_connection(), which reuses | |
427 # pool connections) because httplib requires a new HTTP connection per | |
428 # transaction. (Without this, calling http_conn.getresponse() would get | |
429 # "ResponseNotReady".) | |
430 http_conn = conn.new_http_connection(self.upload_url_host, conn.port, | |
431 conn.is_secure) | |
432 http_conn.set_debuglevel(conn.debug) | |
433 | |
434 # Make sure to close http_conn at end so if a local file read | |
435 # failure occurs partway through service will terminate current upload | |
436 # and can report that progress on next attempt. | |
437 try: | |
438 return self._UploadFileBytes(conn, http_conn, fp, file_length, | |
439 total_bytes_uploaded, cb, num_cb, | |
440 headers) | |
441 except (ResumableUploadException, socket.error): | |
442 resp = self._QueryServiceState(conn, file_length) | |
443 if resp.status == 400: | |
444 raise ResumableUploadException( | |
445 'Got 400 response from service state query after failed resumable ' | |
446 'upload attempt. This can happen for various reasons, including ' | |
447 'specifying an invalid request (e.g., an invalid canned ACL) or ' | |
448 'if the file size changed between upload attempts', | |
449 ResumableTransferDisposition.ABORT) | |
450 else: | |
451 raise | |
452 finally: | |
453 http_conn.close() | |
454 | |
455 def HandleResumableUploadException(self, e, debug): | |
456 if e.disposition == ResumableTransferDisposition.ABORT_CUR_PROCESS: | |
457 if debug >= 1: | |
458 self.logger.debug('Caught non-retryable ResumableUploadException (%s); ' | |
459 'aborting but retaining tracker file', e.message) | |
460 raise | |
461 elif e.disposition == ResumableTransferDisposition.ABORT: | |
462 if debug >= 1: | |
463 self.logger.debug('Caught non-retryable ResumableUploadException (%s); ' | |
464 'aborting and removing tracker file', e.message) | |
465 raise | |
466 elif e.disposition == ResumableTransferDisposition.START_OVER: | |
467 raise | |
468 else: | |
469 if debug >= 1: | |
470 self.logger.debug( | |
471 'Caught ResumableUploadException (%s) - will retry', e.message) | |
472 | |
473 def TrackProgressLessIterations(self, service_had_bytes_before_attempt, | |
474 debug=0): | |
475 """Tracks the number of iterations without progress. | |
476 | |
477 Performs randomized exponential backoff. | |
478 | |
479 Args: | |
480 service_had_bytes_before_attempt: Number of bytes the service had prior | |
481 to this upload attempt. | |
482 debug: debug level 0..3 | |
483 """ | |
484 # At this point we had a re-tryable failure; see if made progress. | |
485 if self.service_has_bytes > service_had_bytes_before_attempt: | |
486 self.progress_less_iterations = 0 # If progress, reset counter. | |
487 else: | |
488 self.progress_less_iterations += 1 | |
489 | |
490 if self.progress_less_iterations > self.num_retries: | |
491 # Don't retry any longer in the current process. | |
492 raise ResumableUploadException( | |
493 'Too many resumable upload attempts failed without ' | |
494 'progress. You might try this upload again later', | |
495 ResumableTransferDisposition.ABORT_CUR_PROCESS) | |
496 | |
497 # Use binary exponential backoff to desynchronize client requests. | |
498 sleep_time_secs = min(random.random() * (2**self.progress_less_iterations), | |
499 GetMaxRetryDelay()) | |
500 if debug >= 1: | |
501 self.logger.debug('Got retryable failure (%d progress-less in a row).\n' | |
502 'Sleeping %3.1f seconds before re-trying', | |
503 self.progress_less_iterations, sleep_time_secs) | |
504 time.sleep(sleep_time_secs) | |
505 | |
506 def SendFile(self, key, fp, size, headers, canned_acl=None, cb=None, | |
507 num_cb=XML_PROGRESS_CALLBACKS): | |
508 """Upload a file to a key into a bucket on GS, resumable upload protocol. | |
509 | |
510 Args: | |
511 key: `boto.s3.key.Key` or subclass representing the upload destination. | |
512 fp: File pointer to upload | |
513 size: Size of the file to upload. | |
514 headers: The headers to pass along with the PUT request | |
515 canned_acl: Optional canned ACL to apply to object. | |
516 cb: Callback function that will be called to report progress on | |
517 the upload. The callback should accept two integer parameters, the | |
518 first representing the number of bytes that have been successfully | |
519 transmitted to GS, and the second representing the total number of | |
520 bytes that need to be transmitted. | |
521 num_cb: (optional) If a callback is specified with the cb parameter, this | |
522 parameter determines the granularity of the callback by defining | |
523 the maximum number of times the callback will be called during the | |
524 file transfer. Providing a negative integer will cause your | |
525 callback to be called with each buffer read. | |
526 | |
527 Raises: | |
528 ResumableUploadException if a problem occurs during the transfer. | |
529 """ | |
530 | |
531 if not headers: | |
532 headers = {} | |
533 # If Content-Type header is present and set to None, remove it. | |
534 # This is gsutil's way of asking boto to refrain from auto-generating | |
535 # that header. | |
536 content_type = 'Content-Type' | |
537 if content_type in headers and headers[content_type] is None: | |
538 del headers[content_type] | |
539 | |
540 if canned_acl: | |
541 headers[key.provider.acl_header] = canned_acl | |
542 | |
543 headers['User-Agent'] = UserAgent | |
544 | |
545 file_length = size | |
546 debug = key.bucket.connection.debug | |
547 | |
548 # Use num-retries from constructor if one was provided; else check | |
549 # for a value specified in the boto config file; else default to 5. | |
550 if self.num_retries is None: | |
551 self.num_retries = GetNumRetries() | |
552 self.progress_less_iterations = 0 | |
553 | |
554 while True: # Retry as long as we're making progress. | |
555 service_had_bytes_before_attempt = self.service_has_bytes | |
556 try: | |
557 # Save generation and metageneration in class state so caller | |
558 # can find these values, for use in preconditions of future | |
559 # operations on the uploaded object. | |
560 (_, self.generation, self.metageneration) = ( | |
561 self._AttemptResumableUpload(key, fp, file_length, | |
562 headers, cb, num_cb)) | |
563 | |
564 key.generation = self.generation | |
565 if debug >= 1: | |
566 self.logger.debug('Resumable upload complete.') | |
567 return | |
568 except self.RETRYABLE_EXCEPTIONS, e: | |
569 if debug >= 1: | |
570 self.logger.debug('Caught exception (%s)', e.__repr__()) | |
571 if isinstance(e, IOError) and e.errno == errno.EPIPE: | |
572 # Broken pipe error causes httplib to immediately | |
573 # close the socket (http://bugs.python.org/issue5542), | |
574 # so we need to close the connection before we resume | |
575 # the upload (which will cause a new connection to be | |
576 # opened the next time an HTTP request is sent). | |
577 key.bucket.connection.connection.close() | |
578 except ResumableUploadException, e: | |
579 self.HandleResumableUploadException(e, debug) | |
580 | |
581 self.TrackProgressLessIterations(service_had_bytes_before_attempt, | |
582 debug=debug) | |
OLD | NEW |