Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(229)

Side by Side Diff: gslib/third_party/storage_apitools/transfer.py

Issue 698893003: Update checked in version of gsutil to version 4.6 (Closed) Base URL: http://dart.googlecode.com/svn/third_party/gsutil/
Patch Set: Created 6 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
Property Changes:
Added: svn:eol-style
+ LF
OLDNEW
(Empty)
1 # Copyright 2014 Google Inc. All Rights Reserved.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 #!/usr/bin/env python
15 """Upload and download support for apitools."""
16
17 import email.generator as email_generator
18 import email.mime.multipart as mime_multipart
19 import email.mime.nonmultipart as mime_nonmultipart
20 import httplib
21 import io
22 import json
23 import mimetypes
24 import os
25 import StringIO
26 import threading
27
28 from apiclient import mimeparse
29
30 from gslib.third_party.storage_apitools import exceptions
31 from gslib.third_party.storage_apitools import http_wrapper
32 from gslib.third_party.storage_apitools import stream_slice
33
34 __all__ = [
35 'Download',
36 'Upload',
37 ]
38
39 _RESUMABLE_UPLOAD_THRESHOLD = 5 << 20
40 _SIMPLE_UPLOAD = 'simple'
41 _RESUMABLE_UPLOAD = 'resumable'
42
43
44 class _Transfer(object):
45 """Generic bits common to Uploads and Downloads."""
46
47 def __init__(self, stream, close_stream=False, chunksize=None,
48 auto_transfer=True, total_size=None, http=None):
49 self.__bytes_http = None
50 self.__close_stream = close_stream
51 self.__http = http
52 self.__stream = stream
53 self.__url = None
54
55 self.retry_func = http_wrapper.HandleExceptionsAndRebuildHttpConnections
56 self.auto_transfer = auto_transfer
57 self.chunksize = chunksize or 1048576L
58
59 def __repr__(self):
60 return str(self)
61
62 @property
63 def close_stream(self):
64 return self.__close_stream
65
66 @property
67 def http(self):
68 return self.__http
69
70 @property
71 def bytes_http(self):
72 return self.__bytes_http or self.http
73
74 @bytes_http.setter
75 def bytes_http(self, value):
76 self.__bytes_http = value
77
78 @property
79 def stream(self):
80 return self.__stream
81
82 @property
83 def url(self):
84 return self.__url
85
86 def _Initialize(self, http, url):
87 """Initialize this download by setting self.http and self.url.
88
89 We want the user to be able to override self.http by having set
90 the value in the constructor; in that case, we ignore the provided
91 http.
92
93 Args:
94 http: An httplib2.Http instance or None.
95 url: The url for this transfer.
96
97 Returns:
98 None. Initializes self.
99 """
100 self.EnsureUninitialized()
101 if self.http is None:
102 self.__http = http or http_wrapper.GetHttp()
103 self.__url = url
104
105 @property
106 def initialized(self):
107 return self.url is not None and self.http is not None
108
109 @property
110 def _type_name(self):
111 return type(self).__name__
112
113 def EnsureInitialized(self):
114 if not self.initialized:
115 raise exceptions.TransferInvalidError(
116 'Cannot use uninitialized %s', self._type_name)
117
118 def EnsureUninitialized(self):
119 if self.initialized:
120 raise exceptions.TransferInvalidError(
121 'Cannot re-initialize %s', self._type_name)
122
123 def __del__(self):
124 if self.__close_stream:
125 self.__stream.close()
126
127 def _ExecuteCallback(self, callback, response):
128 # TODO: Push these into a queue.
129 if callback is not None:
130 threading.Thread(target=callback, args=(response, self)).start()
131
132
133 class Download(_Transfer):
134 """Data for a single download.
135
136 Public attributes:
137 chunksize: default chunksize to use for transfers.
138 """
139 _ACCEPTABLE_STATUSES = set((
140 httplib.OK,
141 httplib.NO_CONTENT,
142 httplib.PARTIAL_CONTENT,
143 httplib.REQUESTED_RANGE_NOT_SATISFIABLE,
144 ))
145 _REQUIRED_SERIALIZATION_KEYS = set((
146 'auto_transfer', 'progress', 'total_size', 'url'))
147
148 def __init__(self, *args, **kwds):
149 super(Download, self).__init__(*args, **kwds)
150 self.__initial_response = None
151 self.__progress = 0
152 self.__total_size = kwds['total_size'] if 'total_size' in kwds else None
153 self.__encoding = None
154
155 @property
156 def progress(self):
157 return self.__progress
158
159 @property
160 def encoding(self):
161 return self.__encoding
162
163 @classmethod
164 def FromFile(cls, filename, overwrite=False, auto_transfer=True, **kwds):
165 """Create a new download object from a filename."""
166 path = os.path.expanduser(filename)
167 if os.path.exists(path) and not overwrite:
168 raise exceptions.InvalidUserInputError(
169 'File %s exists and overwrite not specified' % path)
170 return cls(open(path, 'wb'), close_stream=True, auto_transfer=auto_transfer,
171 **kwds)
172
173 @classmethod
174 def FromStream(cls, stream, auto_transfer=True, total_size=None, **kwds):
175 """Create a new Download object from a stream."""
176 return cls(stream, auto_transfer=auto_transfer, total_size=total_size,
177 **kwds)
178
179 @classmethod
180 def FromData(cls, stream, json_data, http=None, auto_transfer=None):
181 """Create a new Download object from a stream and serialized data."""
182 info = json.loads(json_data)
183 missing_keys = cls._REQUIRED_SERIALIZATION_KEYS - set(info.keys())
184 if missing_keys:
185 raise exceptions.InvalidDataError(
186 'Invalid serialization data, missing keys: %s' % (
187 ', '.join(missing_keys)))
188 download = cls.FromStream(stream)
189 if auto_transfer is not None:
190 download.auto_transfer = auto_transfer
191 else:
192 download.auto_transfer = info['auto_transfer']
193 setattr(download, '_Download__progress', info['progress'])
194 setattr(download, '_Download__total_size', info['total_size'])
195 download._Initialize(http, info['url']) # pylint: disable=protected-access
196 return download
197
198 @property
199 def serialization_data(self):
200 self.EnsureInitialized()
201 return {
202 'auto_transfer': self.auto_transfer,
203 'progress': self.progress,
204 'total_size': self.total_size,
205 'url': self.url,
206 }
207
208 @property
209 def total_size(self):
210 return self.__total_size
211
212 def __str__(self):
213 if not self.initialized:
214 return 'Download (uninitialized)'
215 else:
216 return 'Download with %d/%s bytes transferred from url %s' % (
217 self.progress, self.total_size, self.url)
218
219 def ConfigureRequest(self, http_request, url_builder):
220 url_builder.query_params['alt'] = 'media'
221 http_request.headers['Range'] = 'bytes=0-%d' % (self.chunksize - 1,)
222
223 def __SetTotal(self, info):
224 if 'content-range' in info:
225 _, _, total = info['content-range'].rpartition('/')
226 if total != '*':
227 self.__total_size = int(total)
228 # Note "total_size is None" means we don't know it; if no size
229 # info was returned on our initial range request, that means we
230 # have a 0-byte file. (That last statement has been verified
231 # empirically, but is not clearly documented anywhere.)
232 if self.total_size is None:
233 self.__total_size = 0
234
235 def InitializeDownload(self, http_request, http=None, client=None):
236 """Initialize this download by making a request.
237
238 Args:
239 http_request: The HttpRequest to use to initialize this download.
240 http: The httplib2.Http instance for this request.
241 client: If provided, let this client process the final URL before
242 sending any additional requests. If client is provided and
243 http is not, client.http will be used instead.
244 """
245 self.EnsureUninitialized()
246 if http is None and client is None:
247 raise exceptions.UserError('Must provide client or http.')
248 http = http or client.http
249 if client is not None:
250 http_request.url = client.FinalizeTransferUrl(http_request.url)
251 url = http_request.url
252 if client is not None:
253 url = client.FinalizeTransferUrl(url)
254 self._Initialize(http, url)
255 # Unless the user has requested otherwise, we want to just
256 # go ahead and pump the bytes now.
257 if self.auto_transfer:
258 self.StreamInChunks()
259
260 @staticmethod
261 def _ArgPrinter(response, unused_download):
262 if 'content-range' in response.info:
263 print 'Received %s' % response.info['content-range']
264 else:
265 print 'Received %d bytes' % response.length
266
267 @staticmethod
268 def _CompletePrinter(*unused_args):
269 print 'Download complete'
270
271 def __NormalizeStartEnd(self, start, end=None):
272 if end is not None:
273 if start < 0:
274 raise exceptions.TransferInvalidError(
275 'Cannot have end index with negative start index')
276 elif start >= self.total_size:
277 raise exceptions.TransferInvalidError(
278 'Cannot have start index greater than total size')
279 end = min(end, self.total_size - 1)
280 if end < start:
281 raise exceptions.TransferInvalidError(
282 'Range requested with end[%s] < start[%s]' % (end, start))
283 return start, end
284 else:
285 if start < 0:
286 start = max(0, start + self.total_size)
287 return start, self.total_size
288
289 def __SetRangeHeader(self, request, start, end=None):
290 if start < 0:
291 request.headers['range'] = 'bytes=%d' % start
292 elif end is None:
293 request.headers['range'] = 'bytes=%d-' % start
294 else:
295 request.headers['range'] = 'bytes=%d-%d' % (start, end)
296
297 def __GetChunk(self, start, end=None, additional_headers=None):
298 """Retrieve a chunk, and return the full response."""
299 self.EnsureInitialized()
300 end_byte = end
301 if self.total_size and end:
302 end_byte = min(end, self.total_size)
303 request = http_wrapper.Request(url=self.url)
304 self.__SetRangeHeader(request, start, end=end_byte)
305 if additional_headers is not None:
306 request.headers.update(additional_headers)
307 return http_wrapper.MakeRequest(
308 self.bytes_http, request, retry_func=self.retry_func)
309
310 def __ProcessResponse(self, response):
311 """Process this response (by updating self and writing to self.stream)."""
312 if response.status_code not in self._ACCEPTABLE_STATUSES:
313 raise exceptions.TransferRetryError(response.content)
314 if response.status_code in (httplib.OK, httplib.PARTIAL_CONTENT):
315 self.stream.write(response.content)
316 self.__progress += response.length
317 if response.info and 'content-encoding' in response.info:
318 # TODO: Handle the case where this changes over a download.
319 self.__encoding = response.info['content-encoding']
320 elif response.status_code == httplib.NO_CONTENT:
321 # It's important to write something to the stream for the case
322 # of a 0-byte download to a file, as otherwise python won't
323 # create the file.
324 self.stream.write('')
325 return response
326
327 def GetRange(self, start, end=None, additional_headers=None):
328 """Retrieve a given byte range from this download, inclusive.
329
330 Range must be of one of these three forms:
331 * 0 <= start, end = None: Fetch from start to the end of the file.
332 * 0 <= start <= end: Fetch the bytes from start to end.
333 * start < 0, end = None: Fetch the last -start bytes of the file.
334
335 (These variations correspond to those described in the HTTP 1.1
336 protocol for range headers in RFC 2616, sec. 14.35.1.)
337
338 Args:
339 start: (int) Where to start fetching bytes. (See above.)
340 end: (int, optional) Where to stop fetching bytes. (See above.)
341 additional_headers: (bool, optional) Any additional headers to
342 pass with the request.
343
344 Returns:
345 None. Streams bytes into self.stream.
346 """
347 self.EnsureInitialized()
348 progress_end_normalized = False
349 if self.total_size is not None:
350 progress, end = self.__NormalizeStartEnd(start, end)
351 progress_end_normalized = True
352 else:
353 progress = start
354 while not progress_end_normalized or progress < end:
355 response = self.__GetChunk(progress, end=end,
356 additional_headers=additional_headers)
357 if not progress_end_normalized:
358 self.__SetTotal(response.info)
359 progress, end = self.__NormalizeStartEnd(start, end)
360 progress_end_normalized = True
361 response = self.__ProcessResponse(response)
362 progress += response.length
363 if not response:
364 raise exceptions.TransferRetryError(
365 'Zero bytes unexpectedly returned in download response')
366
367 def StreamInChunks(self, callback=None, finish_callback=None,
368 additional_headers=None):
369 """Stream the entire download."""
370 callback = callback or self._ArgPrinter
371 finish_callback = finish_callback or self._CompletePrinter
372
373 self.EnsureInitialized()
374 while True:
375 if self.__initial_response is not None:
376 response = self.__initial_response
377 self.__initial_response = None
378 else:
379 response = self.__GetChunk(self.progress,
380 additional_headers=additional_headers)
381 response = self.__ProcessResponse(response)
382 self._ExecuteCallback(callback, response)
383 if (response.status_code == httplib.OK or
384 self.progress >= self.total_size):
385 break
386 self._ExecuteCallback(finish_callback, response)
387
388
389 class Upload(_Transfer):
390 """Data for a single Upload.
391
392 Fields:
393 stream: The stream to upload.
394 mime_type: MIME type of the upload.
395 total_size: (optional) Total upload size for the stream.
396 close_stream: (default: False) Whether or not we should close the
397 stream when finished with the upload.
398 auto_transfer: (default: True) If True, stream all bytes as soon as
399 the upload is created.
400 """
401 _REQUIRED_SERIALIZATION_KEYS = set((
402 'auto_transfer', 'mime_type', 'total_size', 'url'))
403
404 def __init__(self, stream, mime_type, total_size=None, http=None,
405 close_stream=False, chunksize=None, auto_transfer=True):
406 super(Upload, self).__init__(
407 stream, close_stream=close_stream, chunksize=chunksize,
408 auto_transfer=auto_transfer, http=http)
409 self.__complete = False
410 self.__final_response = None
411 self.__mime_type = mime_type
412 self.__progress = 0
413 self.__server_chunk_granularity = None
414 self.__strategy = None
415
416 self.total_size = total_size
417
418 @property
419 def progress(self):
420 return self.__progress
421
422 @classmethod
423 def FromFile(cls, filename, mime_type=None, auto_transfer=True):
424 """Create a new Upload object from a filename."""
425 path = os.path.expanduser(filename)
426 if not os.path.exists(path):
427 raise exceptions.NotFoundError('Could not find file %s' % path)
428 if not mime_type:
429 mime_type, _ = mimetypes.guess_type(path)
430 if mime_type is None:
431 raise exceptions.InvalidUserInputError(
432 'Could not guess mime type for %s' % path)
433 size = os.stat(path).st_size
434 return cls(open(path, 'rb'), mime_type, total_size=size, close_stream=True,
435 auto_transfer=auto_transfer)
436
437 @classmethod
438 def FromStream(cls, stream, mime_type, total_size=None, auto_transfer=True):
439 """Create a new Upload object from a stream."""
440 if mime_type is None:
441 raise exceptions.InvalidUserInputError(
442 'No mime_type specified for stream')
443 return cls(stream, mime_type, total_size=total_size, close_stream=False,
444 auto_transfer=auto_transfer)
445
446 @classmethod
447 def FromData(cls, stream, json_data, http, auto_transfer=None):
448 """Create a new Upload of stream from serialized json_data using http."""
449 info = json.loads(json_data)
450 missing_keys = cls._REQUIRED_SERIALIZATION_KEYS - set(info.keys())
451 if missing_keys:
452 raise exceptions.InvalidDataError(
453 'Invalid serialization data, missing keys: %s' % (
454 ', '.join(missing_keys)))
455 upload = cls.FromStream(stream, info['mime_type'],
456 total_size=info.get('total_size'))
457 if isinstance(stream, io.IOBase) and not stream.seekable():
458 raise exceptions.InvalidUserInputError(
459 'Cannot restart resumable upload on non-seekable stream')
460 if auto_transfer is not None:
461 upload.auto_transfer = auto_transfer
462 else:
463 upload.auto_transfer = info['auto_transfer']
464 upload.strategy = _RESUMABLE_UPLOAD
465 upload._Initialize(http, info['url']) # pylint: disable=protected-access
466 upload.RefreshResumableUploadState()
467 upload.EnsureInitialized()
468 if upload.auto_transfer:
469 upload.StreamInChunks()
470 return upload
471
472 @property
473 def serialization_data(self):
474 self.EnsureInitialized()
475 if self.strategy != _RESUMABLE_UPLOAD:
476 raise exceptions.InvalidDataError(
477 'Serialization only supported for resumable uploads')
478 return {
479 'auto_transfer': self.auto_transfer,
480 'mime_type': self.mime_type,
481 'total_size': self.total_size,
482 'url': self.url,
483 }
484
485 @property
486 def complete(self):
487 return self.__complete
488
489 @property
490 def mime_type(self):
491 return self.__mime_type
492
493 def __str__(self):
494 if not self.initialized:
495 return 'Upload (uninitialized)'
496 else:
497 return 'Upload with %d/%s bytes transferred for url %s' % (
498 self.progress, self.total_size or '???', self.url)
499
500 @property
501 def strategy(self):
502 return self.__strategy
503
504 @strategy.setter
505 def strategy(self, value):
506 if value not in (_SIMPLE_UPLOAD, _RESUMABLE_UPLOAD):
507 raise exceptions.UserError((
508 'Invalid value "%s" for upload strategy, must be one of '
509 '"simple" or "resumable".') % value)
510 self.__strategy = value
511
512 @property
513 def total_size(self):
514 return self.__total_size
515
516 @total_size.setter
517 def total_size(self, value):
518 self.EnsureUninitialized()
519 self.__total_size = value
520
521 def __SetDefaultUploadStrategy(self, upload_config, http_request):
522 """Determine and set the default upload strategy for this upload.
523
524 We generally prefer simple or multipart, unless we're forced to
525 use resumable. This happens when any of (1) the upload is too
526 large, (2) the simple endpoint doesn't support multipart requests
527 and we have metadata, or (3) there is no simple upload endpoint.
528
529 Args:
530 upload_config: Configuration for the upload endpoint.
531 http_request: The associated http request.
532
533 Returns:
534 None.
535 """
536 if self.strategy is not None:
537 return
538 strategy = _SIMPLE_UPLOAD
539 if (self.total_size is not None and
540 self.total_size > _RESUMABLE_UPLOAD_THRESHOLD):
541 strategy = _RESUMABLE_UPLOAD
542 if http_request.body and not upload_config.simple_multipart:
543 strategy = _RESUMABLE_UPLOAD
544 if not upload_config.simple_path:
545 strategy = _RESUMABLE_UPLOAD
546 self.strategy = strategy
547
548 def ConfigureRequest(self, upload_config, http_request, url_builder):
549 """Configure the request and url for this upload."""
550 # Validate total_size vs. max_size
551 if (self.total_size and upload_config.max_size and
552 self.total_size > upload_config.max_size):
553 raise exceptions.InvalidUserInputError(
554 'Upload too big: %s larger than max size %s' % (
555 self.total_size, upload_config.max_size))
556 # Validate mime type
557 if not mimeparse.best_match(upload_config.accept, self.mime_type):
558 raise exceptions.InvalidUserInputError(
559 'MIME type %s does not match any accepted MIME ranges %s' % (
560 self.mime_type, upload_config.accept))
561
562 self.__SetDefaultUploadStrategy(upload_config, http_request)
563 if self.strategy == _SIMPLE_UPLOAD:
564 url_builder.relative_path = upload_config.simple_path
565 if http_request.body:
566 url_builder.query_params['uploadType'] = 'multipart'
567 self.__ConfigureMultipartRequest(http_request)
568 else:
569 url_builder.query_params['uploadType'] = 'media'
570 self.__ConfigureMediaRequest(http_request)
571 else:
572 url_builder.relative_path = upload_config.resumable_path
573 url_builder.query_params['uploadType'] = 'resumable'
574 self.__ConfigureResumableRequest(http_request)
575
576 def __ConfigureMediaRequest(self, http_request):
577 """Configure http_request as a simple request for this upload."""
578 http_request.headers['content-type'] = self.mime_type
579 http_request.body = self.stream.read()
580 http_request.loggable_body = '<media body>'
581
582 def __ConfigureMultipartRequest(self, http_request):
583 """Configure http_request as a multipart request for this upload."""
584 # This is a multipart/related upload.
585 msg_root = mime_multipart.MIMEMultipart('related')
586 # msg_root should not write out its own headers
587 setattr(msg_root, '_write_headers', lambda self: None)
588
589 # attach the body as one part
590 msg = mime_nonmultipart.MIMENonMultipart(
591 *http_request.headers['content-type'].split('/'))
592 msg.set_payload(http_request.body)
593 msg_root.attach(msg)
594
595 # attach the media as the second part
596 msg = mime_nonmultipart.MIMENonMultipart(*self.mime_type.split('/'))
597 msg['Content-Transfer-Encoding'] = 'binary'
598 msg.set_payload(self.stream.read())
599 msg_root.attach(msg)
600
601 # encode the body: note that we can't use `as_string`, because
602 # it plays games with `From ` lines.
603 fp = StringIO.StringIO()
604 g = email_generator.Generator(fp, mangle_from_=False)
605 g.flatten(msg_root, unixfrom=False)
606 http_request.body = fp.getvalue()
607
608 multipart_boundary = msg_root.get_boundary()
609 http_request.headers['content-type'] = (
610 'multipart/related; boundary=%r' % multipart_boundary)
611
612 body_components = http_request.body.split(multipart_boundary)
613 headers, _, _ = body_components[-2].partition('\n\n')
614 body_components[-2] = '\n\n'.join([headers, '<media body>\n\n--'])
615 http_request.loggable_body = multipart_boundary.join(body_components)
616
617 def __ConfigureResumableRequest(self, http_request):
618 http_request.headers['X-Upload-Content-Type'] = self.mime_type
619 if self.total_size is not None:
620 http_request.headers['X-Upload-Content-Length'] = str(self.total_size)
621
622 def RefreshResumableUploadState(self):
623 """Talk to the server and refresh the state of this resumable upload.
624
625 Returns:
626 Response if the upload is complete.
627 """
628 if self.strategy != _RESUMABLE_UPLOAD:
629 return
630 self.EnsureInitialized()
631 refresh_request = http_wrapper.Request(
632 url=self.url, http_method='PUT', headers={'Content-Range': 'bytes */*'})
633 refresh_response = http_wrapper.MakeRequest(
634 self.http, refresh_request, redirections=0)
635 range_header = self._GetRangeHeaderFromResponse(refresh_response)
636 if refresh_response.status_code in (httplib.OK, httplib.CREATED):
637 self.__complete = True
638 self.__progress = self.total_size
639 self.stream.seek(self.progress)
640 # If we're finished, the refresh response will contain the metadata
641 # originally requested. Cache it so it can be returned in StreamInChunks.
642 self.__final_response = refresh_response
643 elif refresh_response.status_code == http_wrapper.RESUME_INCOMPLETE:
644 if range_header is None:
645 self.__progress = 0
646 else:
647 self.__progress = self.__GetLastByte(range_header) + 1
648 self.stream.seek(self.progress)
649 else:
650 raise exceptions.HttpError.FromResponse(refresh_response)
651
652 def _GetRangeHeaderFromResponse(self, response):
653 return response.info.get('Range', response.info.get('range'))
654
655 def InitializeUpload(self, http_request, http=None, client=None):
656 """Initialize this upload from the given http_request."""
657 if self.strategy is None:
658 raise exceptions.UserError(
659 'No upload strategy set; did you call ConfigureRequest?')
660 if http is None and client is None:
661 raise exceptions.UserError('Must provide client or http.')
662 if self.strategy != _RESUMABLE_UPLOAD:
663 return
664 if self.total_size is None:
665 raise exceptions.InvalidUserInputError(
666 'Cannot stream upload without total size')
667 http = http or client.http
668 if client is not None:
669 http_request.url = client.FinalizeTransferUrl(http_request.url)
670 self.EnsureUninitialized()
671 http_response = http_wrapper.MakeRequest(http, http_request)
672 if http_response.status_code != httplib.OK:
673 raise exceptions.HttpError.FromResponse(http_response)
674
675 self.__server_chunk_granularity = http_response.info.get(
676 'X-Goog-Upload-Chunk-Granularity')
677 self.__ValidateChunksize()
678 url = http_response.info['location']
679 if client is not None:
680 url = client.FinalizeTransferUrl(url)
681 self._Initialize(http, url)
682
683 # Unless the user has requested otherwise, we want to just
684 # go ahead and pump the bytes now.
685 if self.auto_transfer:
686 return self.StreamInChunks()
687
688 def __GetLastByte(self, range_header):
689 _, _, end = range_header.partition('-')
690 # TODO: Validate start == 0?
691 return int(end)
692
693 def __ValidateChunksize(self, chunksize=None):
694 if self.__server_chunk_granularity is None:
695 return
696 chunksize = chunksize or self.chunksize
697 if chunksize % self.__server_chunk_granularity:
698 raise exceptions.ConfigurationValueError(
699 'Server requires chunksize to be a multiple of %d',
700 self.__server_chunk_granularity)
701
702 @staticmethod
703 def _ArgPrinter(response, unused_upload):
704 print 'Sent %s' % response.info['range']
705
706 @staticmethod
707 def _CompletePrinter(*unused_args):
708 print 'Upload complete'
709
710 def StreamInChunks(self, callback=None, finish_callback=None,
711 additional_headers=None):
712 """Send this (resumable) upload in chunks."""
713 if self.strategy != _RESUMABLE_UPLOAD:
714 raise exceptions.InvalidUserInputError(
715 'Cannot stream non-resumable upload')
716 if self.total_size is None:
717 raise exceptions.InvalidUserInputError(
718 'Cannot stream upload without total size')
719 callback = callback or self._ArgPrinter
720 finish_callback = finish_callback or self._CompletePrinter
721 # final_response is set if we resumed an already-completed upload.
722 response = self.__final_response
723 self.__ValidateChunksize(self.chunksize)
724 self.EnsureInitialized()
725 while not self.complete:
726 response = self.__SendChunk(self.stream.tell(),
727 additional_headers=additional_headers)
728 if response.status_code in (httplib.OK, httplib.CREATED):
729 self.__complete = True
730 break
731 self.__progress = self.__GetLastByte(response.info['range'])
732 if self.progress + 1 != self.stream.tell():
733 # TODO: Add a better way to recover here.
734 raise exceptions.CommunicationError(
735 'Failed to transfer all bytes in chunk, upload paused at byte '
736 '%d' % self.progress)
737 self._ExecuteCallback(callback, response)
738 if self.__complete:
739 # TODO: Decide how to handle errors in the non-seekable case.
740 current_pos = self.stream.tell()
741 self.stream.seek(0, os.SEEK_END)
742 end_pos = self.stream.tell()
743 self.stream.seek(current_pos)
744 if current_pos != end_pos:
745 raise exceptions.TransferInvalidError(
746 'Upload complete with %s additional bytes left in stream' %
747 (long(end_pos) - long(current_pos)))
748 self._ExecuteCallback(finish_callback, response)
749 return response
750
751 def __SendChunk(self, start, additional_headers=None):
752 """Send the specified chunk."""
753 self.EnsureInitialized()
754 end = min(start + self.chunksize, self.total_size)
755 body_stream = stream_slice.StreamSlice(self.stream, end - start)
756 # TODO: Think about clearer errors on "no data in stream".
757
758 request = http_wrapper.Request(url=self.url, http_method='PUT',
759 body=body_stream)
760 request.headers['Content-Type'] = self.mime_type
761 request.headers['Content-Range'] = 'bytes %s-%s/%s' % (
762 start, end - 1, self.total_size)
763 if additional_headers:
764 request.headers.update(additional_headers)
765
766 response = http_wrapper.MakeRequest(
767 self.bytes_http, request, retry_func=self.retry_func)
768 if response.status_code not in (httplib.OK, httplib.CREATED,
769 http_wrapper.RESUME_INCOMPLETE):
770 # We want to reset our state to wherever the server left us
771 # before this failed request, and then raise.
772 self.RefreshResumableUploadState()
773 raise exceptions.HttpError.FromResponse(response)
774 if response.status_code == http_wrapper.RESUME_INCOMPLETE:
775 last_byte = self.__GetLastByte(
776 self._GetRangeHeaderFromResponse(response))
777 if last_byte + 1 != end:
778 self.stream.seek(last_byte)
779 return response
OLDNEW
« no previous file with comments | « gslib/third_party/storage_apitools/stream_slice.py ('k') | gslib/third_party/storage_apitools/util.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698