Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(901)

Side by Side Diff: third_party/gsutil/gslib/boto_resumable_upload.py

Issue 1377933002: [catapult] - Copy Telemetry's gsutilz over to third_party. (Closed) Base URL: https://github.com/catapult-project/catapult.git@master
Patch Set: Rename to gsutil. Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # -*- coding: utf-8 -*-
2 # Copyright 2010 Google Inc. All Rights Reserved.
3 #
4 # Permission is hereby granted, free of charge, to any person obtaining a
5 # copy of this software and associated documentation files (the
6 # "Software"), to deal in the Software without restriction, including
7 # without limitation the rights to use, copy, modify, merge, publish, dis-
8 # tribute, sublicense, and/or sell copies of the Software, and to permit
9 # persons to whom the Software is furnished to do so, subject to the fol-
10 # lowing conditions:
11 #
12 # The above copyright notice and this permission notice shall be included
13 # in all copies or substantial portions of the Software.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21 # IN THE SOFTWARE.
22 """Boto translation layer for resumable uploads.
23
24 See http://code.google.com/apis/storage/docs/developer-guide.html#resumable
25 for details.
26
27 Resumable uploads will retry interrupted uploads, resuming at the byte
28 count completed by the last upload attempt. If too many retries happen with
29 no progress (per configurable num_retries param), the upload will be
30 aborted in the current process.
31
32 Unlike the boto implementation of resumable upload handler, this class does
33 not directly interact with tracker files.
34
35 Originally Google wrote and contributed this code to the boto project,
36 then copied that code back into gsutil on the release of gsutil 4.0 which
37 supports both boto and non-boto codepaths for resumable uploads. Any bug
38 fixes made to this file should also be integrated to resumable_upload_handler.py
39 in boto, where applicable.
40
41 TODO: gsutil-beta: Add a similar comment to the boto code.
42 """
43
44 from __future__ import absolute_import
45
46 import errno
47 import httplib
48 import random
49 import re
50 import socket
51 import time
52 import urlparse
53 from boto import UserAgent
54 from boto.connection import AWSAuthConnection
55 from boto.exception import ResumableTransferDisposition
56 from boto.exception import ResumableUploadException
57 from gslib.exception import InvalidUrlError
58 from gslib.util import GetMaxRetryDelay
59 from gslib.util import GetNumRetries
60 from gslib.util import XML_PROGRESS_CALLBACKS
61
62
class BotoResumableUpload(object):
  """Upload helper class for resumable uploads via boto."""

  # Size of each chunk read from the source file and sent on the wire.
  BUFFER_SIZE = 8192
  # Transient network-level failures that the caller's retry loop may retry.
  RETRYABLE_EXCEPTIONS = (httplib.HTTPException, IOError, socket.error,
                          socket.gaierror)

  # (start, end) response indicating service has nothing (upload protocol uses
  # inclusive numbering).
  SERVICE_HAS_NOTHING = (0, -1)

  def __init__(self, tracker_callback, logger,
               resume_url=None, num_retries=None):
    """Constructor. Instantiate once for each uploaded file.

    Args:
      tracker_callback: Callback function that takes a string argument. Used
                        by caller to track this upload across upload
                        interruption.
      logger: logging.logger instance to use for debug messages.
      resume_url: If present, attempt to resume the upload at this URL.
      num_retries: Number of times to retry the upload making no progress.
                   This count resets every time we make progress, so the upload
                   can span many more than this number of retries.
    """
    if resume_url:
      self._SetUploadUrl(resume_url)
    else:
      self.upload_url = None
    self.num_retries = num_retries
    self.service_has_bytes = 0  # Byte count at last service check.
    # Save upload_start_point in instance state so caller can find how
    # much was transferred by this ResumableUploadHandler (across retries).
    self.upload_start_point = None
    self.tracker_callback = tracker_callback
    self.logger = logger

  def _SetUploadUrl(self, url):
    """Saves URL and resets upload state.

    Called when we start a new resumable upload or get a new tracker
    URL for the upload.

    Args:
      url: URL string for the upload.

    Raises InvalidUrlError if URL is syntactically invalid.
    """
    parse_result = urlparse.urlparse(url)
    if (parse_result.scheme.lower() not in ['http', 'https'] or
        not parse_result.netloc):
      raise InvalidUrlError('Invalid upload URL (%s)' % url)
    self.upload_url = url
    self.upload_url_host = parse_result.netloc
    self.upload_url_path = '%s?%s' % (
        parse_result.path, parse_result.query)
    # A new upload URL means any byte count from a previous URL is stale.
    self.service_has_bytes = 0

  def _BuildContentRangeHeader(self, range_spec='*', length_spec='*'):
    """Returns a Content-Range header value, 'bytes <range_spec>/<length_spec>'."""
    return 'bytes %s/%s' % (range_spec, length_spec)

  def _QueryServiceState(self, conn, file_length):
    """Queries service to find out state of given upload.

    Note that this method really just makes special case use of the
    fact that the upload service always returns the current start/end
    state whenever a PUT doesn't complete.

    Args:
      conn: HTTPConnection to use for the query.
      file_length: Total length of the file.

    Returns:
      HTTP response from sending request.

    Raises:
      ResumableUploadException if problem querying service.
    """
    # Send an empty PUT so that service replies with this resumable
    # transfer's state.
    put_headers = {}
    put_headers['Content-Range'] = (
        self._BuildContentRangeHeader('*', file_length))
    put_headers['Content-Length'] = '0'
    return AWSAuthConnection.make_request(
        conn, 'PUT', path=self.upload_url_path, auth_path=self.upload_url_path,
        headers=put_headers, host=self.upload_url_host)

  def _QueryServicePos(self, conn, file_length):
    """Queries service to find out what bytes it currently has.

    Args:
      conn: HTTPConnection to use for the query.
      file_length: Total length of the file.

    Returns:
      (service_start, service_end), where the values are inclusive.
      For example, (0, 2) would mean that the service has bytes 0, 1, *and* 2.

    Raises:
      ResumableUploadException if problem querying service.
    """
    resp = self._QueryServiceState(conn, file_length)
    if resp.status == 200:
      # To handle the boundary condition where the service has the complete
      # file, we return (service_start, file_length-1). That way the
      # calling code can always simply read up through service_end. (If we
      # didn't handle this boundary condition here, the caller would have
      # to check whether service_end == file_length and read one fewer byte
      # in that case.)
      return (0, file_length - 1)  # Completed upload.
    if resp.status != 308:
      # This means the service didn't have any state for the given
      # upload ID, which can happen (for example) if the caller saved
      # the upload URL to a file and then tried to restart the transfer
      # after that upload ID has gone stale. In that case we need to
      # start a new transfer (and the caller will then save the new
      # upload URL to the tracker file).
      raise ResumableUploadException(
          'Got non-308 response (%s) from service state query' %
          resp.status, ResumableTransferDisposition.START_OVER)
    got_valid_response = False
    range_spec = resp.getheader('range')
    if range_spec:
      # Parse 'bytes=<from>-<to>' range_spec.
      m = re.search(r'bytes=(\d+)-(\d+)', range_spec)
      if m:
        service_start = long(m.group(1))
        service_end = long(m.group(2))
        got_valid_response = True
    else:
      # No Range header, which means the service does not yet have
      # any bytes. Note that the Range header uses inclusive 'from'
      # and 'to' values. Since Range 0-0 would mean that the service
      # has byte 0, omitting the Range header is used to indicate that
      # the service doesn't have any bytes.
      return self.SERVICE_HAS_NOTHING
    if not got_valid_response:
      # Range header was present but didn't match 'bytes=<from>-<to>'.
      raise ResumableUploadException(
          'Couldn\'t parse upload service state query response (%s)' %
          str(resp.getheaders()), ResumableTransferDisposition.START_OVER)
    if conn.debug >= 1:
      self.logger.debug('Service has: Range: %d - %d.', service_start,
                        service_end)
    return (service_start, service_end)

  def _StartNewResumableUpload(self, key, headers=None):
    """Starts a new resumable upload.

    Args:
      key: Boto Key representing the object to upload.
      headers: Headers to use in the upload requests.

    Raises:
      ResumableUploadException if any errors occur.
    """
    conn = key.bucket.connection
    if conn.debug >= 1:
      self.logger.debug('Starting new resumable upload.')
    self.service_has_bytes = 0

    # Start a new resumable upload by sending a POST request with an
    # empty body and the "X-Goog-Resumable: start" header. Include any
    # caller-provided headers (e.g., Content-Type) EXCEPT Content-Length
    # (and raise an exception if they tried to pass one, since it's
    # a semantic error to specify it at this point, and if we were to
    # include one now it would cause the service to expect that many
    # bytes; the POST doesn't include the actual file bytes). We set
    # the Content-Length in the subsequent PUT, based on the uploaded
    # file size.
    post_headers = {}
    for k in headers:
      if k.lower() == 'content-length':
        raise ResumableUploadException(
            'Attempt to specify Content-Length header (disallowed)',
            ResumableTransferDisposition.ABORT)
      post_headers[k] = headers[k]
    post_headers[conn.provider.resumable_upload_header] = 'start'

    resp = conn.make_request(
        'POST', key.bucket.name, key.name, post_headers)
    # Get upload URL from response 'Location' header.
    body = resp.read()

    # Check for various status conditions.
    if resp.status in [429, 500, 503]:
      # Retry after a delay.
      raise ResumableUploadException(
          'Got status %d from attempt to start resumable upload. '
          'Will wait/retry' % resp.status,
          ResumableTransferDisposition.WAIT_BEFORE_RETRY)
    elif resp.status != 200 and resp.status != 201:
      raise ResumableUploadException(
          'Got status %d from attempt to start resumable upload. '
          'Aborting' % resp.status,
          ResumableTransferDisposition.ABORT)

    # Else we got 200 or 201 response code, indicating the resumable
    # upload was created.
    upload_url = resp.getheader('Location')
    if not upload_url:
      raise ResumableUploadException(
          'No resumable upload URL found in resumable initiation '
          'POST response (%s)' % body,
          ResumableTransferDisposition.WAIT_BEFORE_RETRY)
    self._SetUploadUrl(upload_url)
    # Let the caller persist the new URL so a later process can resume.
    self.tracker_callback(upload_url)

  def _UploadFileBytes(self, conn, http_conn, fp, file_length,
                       total_bytes_uploaded, cb, num_cb, headers):
    """Attempts to upload file bytes.

    Makes a single attempt using an existing resumable upload connection.

    Args:
      conn: HTTPConnection from the boto Key.
      http_conn: Separate HTTPConnection for the transfer.
      fp: File pointer containing bytes to upload.
      file_length: Total length of the file.
      total_bytes_uploaded: The total number of bytes uploaded.
      cb: Progress callback function that takes (progress, total_size).
      num_cb: Granularity of the callback (maximum number of times the
              callback will be called during the file transfer). If negative,
              perform callback with each buffer read.
      headers: Headers to be used in the upload requests.

    Returns:
      (etag, generation, metageneration) from service upon success.

    Raises:
      ResumableUploadException if any problems occur.
    """
    buf = fp.read(self.BUFFER_SIZE)
    if cb:
      # The cb_count represents the number of full buffers to send between
      # cb executions.
      if num_cb > 2:
        cb_count = file_length / self.BUFFER_SIZE / (num_cb-2)
      elif num_cb < 0:
        cb_count = -1
      else:
        cb_count = 0
      i = 0
      cb(total_bytes_uploaded, file_length)

    # Build resumable upload headers for the transfer. Don't send a
    # Content-Range header if the file is 0 bytes long, because the
    # resumable upload protocol uses an *inclusive* end-range (so, sending
    # 'bytes 0-0/1' would actually mean you're sending a 1-byte file).
    put_headers = headers.copy() if headers else {}
    if file_length:
      if total_bytes_uploaded == file_length:
        range_header = self._BuildContentRangeHeader(
            '*', file_length)
      else:
        range_header = self._BuildContentRangeHeader(
            '%d-%d' % (total_bytes_uploaded, file_length - 1),
            file_length)
      put_headers['Content-Range'] = range_header
    # Set Content-Length to the total bytes we'll send with this PUT.
    put_headers['Content-Length'] = str(file_length - total_bytes_uploaded)
    http_request = AWSAuthConnection.build_base_http_request(
        conn, 'PUT', path=self.upload_url_path, auth_path=None,
        headers=put_headers, host=self.upload_url_host)
    http_conn.putrequest('PUT', http_request.path)
    for k in put_headers:
      http_conn.putheader(k, put_headers[k])
    http_conn.endheaders()

    # Turn off debug on http connection so upload content isn't included
    # in debug stream.
    http_conn.set_debuglevel(0)
    while buf:
      http_conn.send(buf)
      total_bytes_uploaded += len(buf)
      if cb:
        i += 1
        if i == cb_count or cb_count == -1:
          cb(total_bytes_uploaded, file_length)
          i = 0
      buf = fp.read(self.BUFFER_SIZE)
    http_conn.set_debuglevel(conn.debug)
    if cb:
      # Final callback so the caller sees 100% of what was sent.
      cb(total_bytes_uploaded, file_length)
    if total_bytes_uploaded != file_length:
      # Abort (and delete the tracker file) so if the user retries
      # they'll start a new resumable upload rather than potentially
      # attempting to pick back up later where we left off.
      raise ResumableUploadException(
          'File changed during upload: EOF at %d bytes of %d byte file.' %
          (total_bytes_uploaded, file_length),
          ResumableTransferDisposition.ABORT)
    resp = http_conn.getresponse()
    # Restore http connection debug level.
    http_conn.set_debuglevel(conn.debug)

    if resp.status == 200:
      # Success.
      return (resp.getheader('etag'),
              resp.getheader('x-goog-generation'),
              resp.getheader('x-goog-metageneration'))
    # Retry timeout (408) and status 429, 500 and 503 errors after a delay.
    elif resp.status in [408, 429, 500, 503]:
      disposition = ResumableTransferDisposition.WAIT_BEFORE_RETRY
    else:
      # Catch all for any other error codes.
      disposition = ResumableTransferDisposition.ABORT
    raise ResumableUploadException('Got response code %d while attempting '
                                   'upload (%s)' %
                                   (resp.status, resp.reason), disposition)

  def _AttemptResumableUpload(self, key, fp, file_length, headers, cb,
                              num_cb):
    """Attempts a resumable upload.

    Args:
      key: Boto key representing object to upload.
      fp: File pointer containing upload bytes.
      file_length: Total length of the upload.
      headers: Headers to be used in upload requests.
      cb: Progress callback function that takes (progress, total_size).
      num_cb: Granularity of the callback (maximum number of times the
              callback will be called during the file transfer). If negative,
              perform callback with each buffer read.

    Returns:
      (etag, generation, metageneration) from service upon success.

    Raises:
      ResumableUploadException if any problems occur.
    """
    (service_start, service_end) = self.SERVICE_HAS_NOTHING
    conn = key.bucket.connection
    if self.upload_url:
      # Try to resume existing resumable upload.
      try:
        (service_start, service_end) = (
            self._QueryServicePos(conn, file_length))
        self.service_has_bytes = service_start
        if conn.debug >= 1:
          self.logger.debug('Resuming transfer.')
      except ResumableUploadException, e:
        # Stale/invalid upload URL; fall back to starting a fresh upload.
        if conn.debug >= 1:
          self.logger.debug('Unable to resume transfer (%s).', e.message)
        self._StartNewResumableUpload(key, headers)
    else:
      self._StartNewResumableUpload(key, headers)

    # upload_start_point allows the code that instantiated the
    # ResumableUploadHandler to find out the point from which it started
    # uploading (e.g., so it can correctly compute throughput).
    if self.upload_start_point is None:
      self.upload_start_point = service_end

    total_bytes_uploaded = service_end + 1

    # Start reading from the file based upon the number of bytes that the
    # server has so far.
    if total_bytes_uploaded < file_length:
      fp.seek(total_bytes_uploaded)

    conn = key.bucket.connection

    # Get a new HTTP connection (vs conn.get_http_connection(), which reuses
    # pool connections) because httplib requires a new HTTP connection per
    # transaction. (Without this, calling http_conn.getresponse() would get
    # "ResponseNotReady".)
    http_conn = conn.new_http_connection(self.upload_url_host, conn.port,
                                         conn.is_secure)
    http_conn.set_debuglevel(conn.debug)

    # Make sure to close http_conn at end so if a local file read
    # failure occurs partway through service will terminate current upload
    # and can report that progress on next attempt.
    try:
      return self._UploadFileBytes(conn, http_conn, fp, file_length,
                                   total_bytes_uploaded, cb, num_cb,
                                   headers)
    except (ResumableUploadException, socket.error):
      # Probe the service: a 400 here means the upload itself was invalid
      # (not just interrupted), so retrying the same upload is pointless.
      resp = self._QueryServiceState(conn, file_length)
      if resp.status == 400:
        raise ResumableUploadException(
            'Got 400 response from service state query after failed resumable '
            'upload attempt. This can happen for various reasons, including '
            'specifying an invalid request (e.g., an invalid canned ACL) or '
            'if the file size changed between upload attempts',
            ResumableTransferDisposition.ABORT)
      else:
        raise
    finally:
      http_conn.close()

  def HandleResumableUploadException(self, e, debug):
    """Handles a ResumableUploadException according to its disposition.

    Args:
      e: The ResumableUploadException that was caught.
      debug: debug level 0..3; messages are logged when >= 1.

    Raises:
      ResumableUploadException: re-raised when the disposition is
          ABORT_CUR_PROCESS, ABORT, or START_OVER. For any other (retryable)
          disposition this returns normally so the caller can retry.
    """
    if e.disposition == ResumableTransferDisposition.ABORT_CUR_PROCESS:
      if debug >= 1:
        self.logger.debug('Caught non-retryable ResumableUploadException (%s); '
                          'aborting but retaining tracker file', e.message)
      raise
    elif e.disposition == ResumableTransferDisposition.ABORT:
      if debug >= 1:
        self.logger.debug('Caught non-retryable ResumableUploadException (%s); '
                          'aborting and removing tracker file', e.message)
      raise
    elif e.disposition == ResumableTransferDisposition.START_OVER:
      raise
    else:
      if debug >= 1:
        self.logger.debug(
            'Caught ResumableUploadException (%s) - will retry', e.message)

  def TrackProgressLessIterations(self, service_had_bytes_before_attempt,
                                  debug=0):
    """Tracks the number of iterations without progress.

    Performs randomized exponential backoff.

    Args:
      service_had_bytes_before_attempt: Number of bytes the service had prior
                                        to this upload attempt.
      debug: debug level 0..3

    Raises:
      ResumableUploadException (ABORT_CUR_PROCESS) when more than
          self.num_retries attempts in a row have made no progress.
    """
    # At this point we had a re-tryable failure; see if we made progress.
    if self.service_has_bytes > service_had_bytes_before_attempt:
      self.progress_less_iterations = 0  # If progress, reset counter.
    else:
      self.progress_less_iterations += 1

    if self.progress_less_iterations > self.num_retries:
      # Don't retry any longer in the current process.
      raise ResumableUploadException(
          'Too many resumable upload attempts failed without '
          'progress. You might try this upload again later',
          ResumableTransferDisposition.ABORT_CUR_PROCESS)

    # Use binary exponential backoff to desynchronize client requests.
    sleep_time_secs = min(random.random() * (2**self.progress_less_iterations),
                          GetMaxRetryDelay())
    if debug >= 1:
      self.logger.debug('Got retryable failure (%d progress-less in a row).\n'
                        'Sleeping %3.1f seconds before re-trying',
                        self.progress_less_iterations, sleep_time_secs)
    time.sleep(sleep_time_secs)

  def SendFile(self, key, fp, size, headers, canned_acl=None, cb=None,
               num_cb=XML_PROGRESS_CALLBACKS):
    """Upload a file to a key into a bucket on GS, resumable upload protocol.

    Args:
      key: `boto.s3.key.Key` or subclass representing the upload destination.
      fp: File pointer to upload
      size: Size of the file to upload.
      headers: The headers to pass along with the PUT request
      canned_acl: Optional canned ACL to apply to object.
      cb: Callback function that will be called to report progress on
          the upload. The callback should accept two integer parameters, the
          first representing the number of bytes that have been successfully
          transmitted to GS, and the second representing the total number of
          bytes that need to be transmitted.
      num_cb: (optional) If a callback is specified with the cb parameter, this
              parameter determines the granularity of the callback by defining
              the maximum number of times the callback will be called during the
              file transfer. Providing a negative integer will cause your
              callback to be called with each buffer read.

    Raises:
      ResumableUploadException if a problem occurs during the transfer.
    """

    if not headers:
      headers = {}
    # If Content-Type header is present and set to None, remove it.
    # This is gsutil's way of asking boto to refrain from auto-generating
    # that header.
    content_type = 'Content-Type'
    if content_type in headers and headers[content_type] is None:
      del headers[content_type]

    if canned_acl:
      headers[key.provider.acl_header] = canned_acl

    headers['User-Agent'] = UserAgent

    file_length = size
    debug = key.bucket.connection.debug

    # Use num-retries from constructor if one was provided; else check
    # for a value specified in the boto config file; else default to 5.
    if self.num_retries is None:
      self.num_retries = GetNumRetries()
    self.progress_less_iterations = 0

    while True:  # Retry as long as we're making progress.
      service_had_bytes_before_attempt = self.service_has_bytes
      try:
        # Save generation and metageneration in class state so caller
        # can find these values, for use in preconditions of future
        # operations on the uploaded object.
        (_, self.generation, self.metageneration) = (
            self._AttemptResumableUpload(key, fp, file_length,
                                         headers, cb, num_cb))

        key.generation = self.generation
        if debug >= 1:
          self.logger.debug('Resumable upload complete.')
        return
      except self.RETRYABLE_EXCEPTIONS, e:
        if debug >= 1:
          self.logger.debug('Caught exception (%s)', e.__repr__())
        if isinstance(e, IOError) and e.errno == errno.EPIPE:
          # Broken pipe error causes httplib to immediately
          # close the socket (http://bugs.python.org/issue5542),
          # so we need to close the connection before we resume
          # the upload (which will cause a new connection to be
          # opened the next time an HTTP request is sent).
          key.bucket.connection.connection.close()
      except ResumableUploadException, e:
        self.HandleResumableUploadException(e, debug)

      # Back off (and possibly give up) before the next attempt.
      self.TrackProgressLessIterations(service_had_bytes_before_attempt,
                                       debug=debug)
OLDNEW
« no previous file with comments | « third_party/gsutil/gslib/addlhelp/wildcards.py ('k') | third_party/gsutil/gslib/boto_translation.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698