Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(244)

Side by Side Diff: third_party/google-endpoints/apitools/base/py/transfer.py

Issue 2666783008: Add google-endpoints to third_party/. (Closed)
Patch Set: Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 #!/usr/bin/env python
2 #
3 # Copyright 2015 Google Inc.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 """Upload and download support for apitools."""
18 from __future__ import print_function
19
20 import email.generator as email_generator
21 import email.mime.multipart as mime_multipart
22 import email.mime.nonmultipart as mime_nonmultipart
23 import io
24 import json
25 import mimetypes
26 import os
27 import threading
28
29 import six
30 from six.moves import http_client
31
32 from apitools.base.py import buffered_stream
33 from apitools.base.py import exceptions
34 from apitools.base.py import http_wrapper
35 from apitools.base.py import stream_slice
36 from apitools.base.py import util
37
# Public names exported by a star-import of this module.
__all__ = [
    # Transfer classes.
    'Download',
    'Upload',
    # Upload strategy names.
    'RESUMABLE_UPLOAD',
    'SIMPLE_UPLOAD',
    # Ready-made progress / completion callbacks.
    'DownloadProgressPrinter',
    'DownloadCompletePrinter',
    'UploadProgressPrinter',
    'UploadCompletePrinter',
]
48
# Uploads whose known size is strictly greater than this many bytes (5 MiB)
# default to the resumable strategy.
_RESUMABLE_UPLOAD_THRESHOLD = 5 * 1024 * 1024
# Names of the two supported upload strategies.
SIMPLE_UPLOAD = 'simple'
RESUMABLE_UPLOAD = 'resumable'
52
53
def DownloadProgressPrinter(response, unused_download):
    """Print download progress based on response.

    Args:
      response: Response for the latest chunk. Its `info` mapping is checked
          for a 'content-range' entry; `length` is used when it is absent.
      unused_download: Ignored; present so this matches the callback
          signature (response, transfer).
    """
    headers = response.info
    if 'content-range' not in headers:
        print('Received %d bytes' % response.length)
        return
    print('Received %s' % headers['content-range'])
60
61
def DownloadCompletePrinter(unused_response, unused_download):
    """Print information about a completed download.

    Both arguments are ignored; they exist so this function matches the
    callback signature (response, transfer).
    """
    message = 'Download complete'
    print(message)
65
66
def UploadProgressPrinter(response, unused_upload):
    """Print upload progress based on response.

    Args:
      response: Response for the latest chunk; its `info` mapping must
          contain a 'range' entry, which is printed verbatim.
      unused_upload: Ignored; present so this matches the callback
          signature (response, transfer).
    """
    sent_range = response.info['range']
    print('Sent %s' % sent_range)
70
71
def UploadCompletePrinter(unused_response, unused_upload):
    """Print information about a completed upload.

    Both arguments are ignored; they exist so this function matches the
    callback signature (response, transfer).
    """
    message = 'Upload complete'
    print(message)
75
76
class _Transfer(object):

    """Generic bits common to Uploads and Downloads.

    Holds the stream being transferred, the http object(s) used to talk to
    the server, the transfer url, and retry/chunking configuration.

    Public attributes:
      auto_transfer: value given to the constructor (default True).
      chunksize: chunk size in bytes; defaults to 1048576 when not given.
      retry_func: function handed to http_wrapper.MakeRequest for retries.
    """

    def __init__(self, stream, close_stream=False, chunksize=None,
                 auto_transfer=True, http=None, num_retries=5):
        """Store the stream and transfer configuration.

        Args:
          stream: the stream this transfer operates on.
          close_stream: if True, the stream is closed when this object is
              finalized (see __del__).
          chunksize: chunk size in bytes; a falsy value selects 1048576.
          auto_transfer: stored on self.auto_transfer.
          http: optional http object; if given, it takes precedence over the
              one later passed to _Initialize.
          num_retries: non-negative integer, validated by the property
              setter.
        """
        self.__bytes_http = None
        self.__close_stream = close_stream
        self.__http = http
        self.__stream = stream
        self.__url = None

        self.__num_retries = 5
        # Let the @property do validation
        self.num_retries = num_retries

        self.retry_func = (
            http_wrapper.HandleExceptionsAndRebuildHttpConnections)
        self.auto_transfer = auto_transfer
        self.chunksize = chunksize or 1048576

    def __str__(self):
        # Subclasses provide richer descriptions. This base version exists
        # because __repr__ delegates to str(self): without any __str__ the
        # default object.__str__ would call __repr__ again and recurse.
        if not self.initialized:
            return '%s (uninitialized)' % self._type_name
        return '%s for url %s' % (self._type_name, self.url)

    def __repr__(self):
        return str(self)

    @property
    def close_stream(self):
        """Whether the stream is closed when this object is finalized."""
        return self.__close_stream

    @property
    def http(self):
        """The http object for this transfer (None until one is set)."""
        return self.__http

    @property
    def bytes_http(self):
        """Http object used for byte transfers; falls back to self.http."""
        return self.__bytes_http or self.http

    @bytes_http.setter
    def bytes_http(self, value):
        self.__bytes_http = value

    @property
    def num_retries(self):
        """Number of retries passed to http_wrapper.MakeRequest."""
        return self.__num_retries

    @num_retries.setter
    def num_retries(self, value):
        util.Typecheck(value, six.integer_types)
        if value < 0:
            raise exceptions.InvalidDataError(
                'Cannot have negative value for num_retries')
        self.__num_retries = value

    @property
    def stream(self):
        """The stream this transfer reads from or writes to."""
        return self.__stream

    @property
    def url(self):
        """The transfer url, or None before initialization."""
        return self.__url

    def _Initialize(self, http, url):
        """Initialize this download by setting self.http and self.url.

        We want the user to be able to override self.http by having set
        the value in the constructor; in that case, we ignore the provided
        http.

        Args:
          http: An httplib2.Http instance or None.
          url: The url for this transfer.

        Returns:
          None. Initializes self.

        Raises:
          exceptions.TransferInvalidError: if already initialized.
        """
        self.EnsureUninitialized()
        if self.http is None:
            self.__http = http or http_wrapper.GetHttp()
        self.__url = url

    @property
    def initialized(self):
        """True once both url and http have been set."""
        return self.url is not None and self.http is not None

    @property
    def _type_name(self):
        return type(self).__name__

    def EnsureInitialized(self):
        """Raise TransferInvalidError unless this transfer is initialized."""
        if not self.initialized:
            # Format the message here: passing the name as a second
            # exception argument would leave the '%s' unfilled.
            raise exceptions.TransferInvalidError(
                'Cannot use uninitialized %s' % self._type_name)

    def EnsureUninitialized(self):
        """Raise TransferInvalidError if this transfer is initialized."""
        if self.initialized:
            raise exceptions.TransferInvalidError(
                'Cannot re-initialize %s' % self._type_name)

    def __del__(self):
        # __init__ may have raised before these attributes were assigned
        # (e.g. on an unexpected keyword argument), so look them up
        # defensively instead of raising AttributeError during finalization.
        if getattr(self, '_Transfer__close_stream', False):
            stream = getattr(self, '_Transfer__stream', None)
            if stream is not None:
                stream.close()

    def _ExecuteCallback(self, callback, response):
        """Run callback(response, self) on a new thread, if callback is set."""
        # TODO(craigcitro): Push these into a queue.
        if callback is not None:
            threading.Thread(target=callback, args=(response, self)).start()
182
183
class Download(_Transfer):

    """Data for a single download.

    Public attributes:
      chunksize: default chunksize to use for transfers.
      progress_callback: default callback run after each chunk.
      finish_callback: default callback run when the download completes.
    """
    # Status codes treated as a usable response when fetching bytes.
    _ACCEPTABLE_STATUSES = set((
        http_client.OK,
        http_client.NO_CONTENT,
        http_client.PARTIAL_CONTENT,
        http_client.REQUESTED_RANGE_NOT_SATISFIABLE,
    ))
    # Keys that must be present in data passed to FromData.
    _REQUIRED_SERIALIZATION_KEYS = set((
        'auto_transfer', 'progress', 'total_size', 'url'))

    def __init__(self, stream, progress_callback=None, finish_callback=None,
                 **kwds):
        """Create a download that writes into stream.

        Args:
          stream: stream the downloaded bytes are written to.
          progress_callback: default per-chunk callback, called with
              (response, download).
          finish_callback: default completion callback, same signature.
          **kwds: may contain total_size; the rest is forwarded to
              _Transfer.__init__.
        """
        total_size = kwds.pop('total_size', None)
        super(Download, self).__init__(stream, **kwds)
        self.__initial_response = None
        self.__progress = 0
        self.__total_size = total_size
        self.__encoding = None

        self.progress_callback = progress_callback
        self.finish_callback = finish_callback

    @property
    def progress(self):
        """Number of bytes processed so far."""
        return self.__progress

    @property
    def encoding(self):
        """Last 'content-encoding' seen on a response, or None."""
        return self.__encoding

    @classmethod
    def FromFile(cls, filename, overwrite=False, auto_transfer=True, **kwds):
        """Create a new download object from a filename.

        Raises:
          exceptions.InvalidUserInputError: if the file exists and
              overwrite is False.
        """
        path = os.path.expanduser(filename)
        if os.path.exists(path) and not overwrite:
            raise exceptions.InvalidUserInputError(
                'File %s exists and overwrite not specified' % path)
        stream = open(path, 'wb')
        try:
            return cls(stream, close_stream=True,
                       auto_transfer=auto_transfer, **kwds)
        except Exception:
            # Do not leave the file handle open if construction fails.
            stream.close()
            raise

    @classmethod
    def FromStream(cls, stream, auto_transfer=True, total_size=None, **kwds):
        """Create a new Download object from a stream."""
        return cls(stream, auto_transfer=auto_transfer, total_size=total_size,
                   **kwds)

    @classmethod
    def FromData(cls, stream, json_data, http=None, auto_transfer=None,
                 **kwds):
        """Create a new Download object from a stream and serialized data.

        Args:
          stream: stream the downloaded bytes are written to.
          json_data: JSON text holding the keys listed in
              _REQUIRED_SERIALIZATION_KEYS.
          http: http object passed to _Initialize.
          auto_transfer: if not None, overrides the serialized value.
          **kwds: forwarded to FromStream.

        Raises:
          exceptions.InvalidDataError: if required keys are missing.
        """
        info = json.loads(json_data)
        missing_keys = cls._REQUIRED_SERIALIZATION_KEYS - set(info.keys())
        if missing_keys:
            raise exceptions.InvalidDataError(
                'Invalid serialization data, missing keys: %s' % (
                    ', '.join(missing_keys)))
        download = cls.FromStream(stream, **kwds)
        if auto_transfer is not None:
            download.auto_transfer = auto_transfer
        else:
            download.auto_transfer = info['auto_transfer']
        # progress and total_size are read-only properties, so restore the
        # name-mangled private attributes directly.
        setattr(download, '_Download__progress', info['progress'])
        setattr(download, '_Download__total_size', info['total_size'])
        download._Initialize(  # pylint: disable=protected-access
            http, info['url'])
        return download

    @property
    def serialization_data(self):
        """Dict with the keys FromData requires; needs an initialized download."""
        self.EnsureInitialized()
        return {
            'auto_transfer': self.auto_transfer,
            'progress': self.progress,
            'total_size': self.total_size,
            'url': self.url,
        }

    @property
    def total_size(self):
        """Total size in bytes, or None if not yet known."""
        return self.__total_size

    def __str__(self):
        if not self.initialized:
            return 'Download (uninitialized)'
        else:
            return 'Download with %d/%s bytes transferred from url %s' % (
                self.progress, self.total_size, self.url)

    def ConfigureRequest(self, http_request, url_builder):
        """Mark the request as a media download and add an initial Range."""
        url_builder.query_params['alt'] = 'media'
        # TODO(craigcitro): We need to send range requests because by
        # default httplib2 stores entire responses in memory. Override
        # httplib2's download method (as gsutil does) so that this is not
        # necessary.
        http_request.headers['Range'] = 'bytes=0-%d' % (self.chunksize - 1,)

    def __SetTotal(self, info):
        """Update total_size from the 'content-range' entry of info."""
        if 'content-range' in info:
            _, _, total = info['content-range'].rpartition('/')
            if total != '*':
                self.__total_size = int(total)
        # Note "total_size is None" means we don't know it; if no size
        # info was returned on our initial range request, that means we
        # have a 0-byte file. (That last statement has been verified
        # empirically, but is not clearly documented anywhere.)
        if self.total_size is None:
            self.__total_size = 0

    def InitializeDownload(self, http_request, http=None, client=None):
        """Initialize this download by making a request.

        Args:
          http_request: The HttpRequest to use to initialize this download.
          http: The httplib2.Http instance for this request.
          client: If provided, let this client process the final URL before
              sending any additional requests. If client is provided and
              http is not, client.http will be used instead.

        Raises:
          exceptions.UserError: if neither http nor client is given.
          exceptions.HttpError: if the initial request returns a status
              outside _ACCEPTABLE_STATUSES.
        """
        self.EnsureUninitialized()
        if http is None and client is None:
            raise exceptions.UserError('Must provide client or http.')
        http = http or client.http
        if client is not None:
            http_request.url = client.FinalizeTransferUrl(http_request.url)
        url = http_request.url
        if self.auto_transfer:
            end_byte = self.__ComputeEndByte(0)
            self.__SetRangeHeader(http_request, 0, end_byte)
            response = http_wrapper.MakeRequest(
                self.bytes_http or http, http_request)
            if response.status_code not in self._ACCEPTABLE_STATUSES:
                raise exceptions.HttpError.FromResponse(response)
            # Keep the first response so StreamMedia can consume it instead
            # of fetching the first chunk again.
            self.__initial_response = response
            self.__SetTotal(response.info)
            url = response.info.get('content-location', response.request_url)
        if client is not None:
            url = client.FinalizeTransferUrl(url)
        self._Initialize(http, url)
        # Unless the user has requested otherwise, we want to just
        # go ahead and pump the bytes now.
        if self.auto_transfer:
            self.StreamInChunks()

    def __NormalizeStartEnd(self, start, end=None):
        """Turn (start, end) into absolute indexes; needs a known total_size."""
        if end is not None:
            if start < 0:
                raise exceptions.TransferInvalidError(
                    'Cannot have end index with negative start index')
            elif start >= self.total_size:
                raise exceptions.TransferInvalidError(
                    'Cannot have start index greater than total size')
            end = min(end, self.total_size - 1)
            if end < start:
                raise exceptions.TransferInvalidError(
                    'Range requested with end[%s] < start[%s]' % (end, start))
            return start, end
        else:
            if start < 0:
                # A negative start counts back from the end of the file.
                start = max(0, start + self.total_size)
            return start, self.total_size - 1

    def __SetRangeHeader(self, request, start, end=None):
        """Set the 'range' header on request for the given byte span."""
        if start < 0:
            # Suffix form: a negative start yields e.g. 'bytes=-5'.
            request.headers['range'] = 'bytes=%d' % start
        elif end is None:
            request.headers['range'] = 'bytes=%d-' % start
        else:
            request.headers['range'] = 'bytes=%d-%d' % (start, end)

    def __ComputeEndByte(self, start, end=None, use_chunks=True):
        """Compute the last byte to fetch for this request.

        This is all based on the HTTP spec for Range and
        Content-Range.

        Note that this is potentially confusing in several ways:
          * the value for the last byte is 0-based, eg "fetch 10 bytes
            from the beginning" would return 9 here.
          * if we have no information about size, and don't want to
            use the chunksize, we'll return None.
        See the tests for more examples.

        Args:
          start: byte to start at.
          end: (int or None, default: None) Suggested last byte.
          use_chunks: (bool, default: True) If False, ignore self.chunksize.

        Returns:
          Last byte to use in a Range header, or None.

        """
        end_byte = end

        if start < 0 and not self.total_size:
            return end_byte

        if use_chunks:
            alternate = start + self.chunksize - 1
            if end_byte is not None:
                end_byte = min(end_byte, alternate)
            else:
                end_byte = alternate

        if self.total_size:
            alternate = self.total_size - 1
            if end_byte is not None:
                end_byte = min(end_byte, alternate)
            else:
                end_byte = alternate

        return end_byte

    def __GetChunk(self, start, end, additional_headers=None):
        """Retrieve a chunk, and return the full response."""
        self.EnsureInitialized()
        request = http_wrapper.Request(url=self.url)
        self.__SetRangeHeader(request, start, end=end)
        if additional_headers is not None:
            request.headers.update(additional_headers)
        return http_wrapper.MakeRequest(
            self.bytes_http, request, retry_func=self.retry_func,
            retries=self.num_retries)

    def __ProcessResponse(self, response):
        """Process response (by updating self and writing to self.stream).

        Raises:
          exceptions.HttpError: on a 403 or 404 response.
          exceptions.TransferRetryError: on any other unacceptable status.
        """
        if response.status_code not in self._ACCEPTABLE_STATUSES:
            # We distinguish errors that mean we made a mistake in setting
            # up the transfer versus something we should attempt again.
            if response.status_code in (http_client.FORBIDDEN,
                                        http_client.NOT_FOUND):
                raise exceptions.HttpError.FromResponse(response)
            else:
                raise exceptions.TransferRetryError(response.content)
        if response.status_code in (http_client.OK,
                                    http_client.PARTIAL_CONTENT):
            self.stream.write(response.content)
            self.__progress += response.length
            if response.info and 'content-encoding' in response.info:
                # TODO(craigcitro): Handle the case where this changes over a
                # download.
                self.__encoding = response.info['content-encoding']
        elif response.status_code == http_client.NO_CONTENT:
            # It's important to write something to the stream for the case
            # of a 0-byte download to a file, as otherwise python won't
            # create the file.
            # FromFile opens the stream in binary mode, where writing a text
            # '' raises TypeError on Python 3; write empty bytes and only
            # fall back to '' for streams that reject bytes.
            try:
                self.stream.write(b'')
            except TypeError:
                self.stream.write('')
        return response

    def GetRange(self, start, end=None, additional_headers=None,
                 use_chunks=True):
        """Retrieve a given byte range from this download, inclusive.

        Range must be of one of these three forms:
        * 0 <= start, end = None: Fetch from start to the end of the file.
        * 0 <= start <= end: Fetch the bytes from start to end.
        * start < 0, end = None: Fetch the last -start bytes of the file.

        (These variations correspond to those described in the HTTP 1.1
        protocol for range headers in RFC 2616, sec. 14.35.1.)

        Args:
          start: (int) Where to start fetching bytes. (See above.)
          end: (int, optional) Where to stop fetching bytes. (See above.)
          additional_headers: (dict, optional) Any additional headers to
              pass with the request.
          use_chunks: (bool, default: True) If False, ignore self.chunksize
              and fetch this range in a single request.

        Returns:
          None. Streams bytes into self.stream.

        Raises:
          exceptions.TransferRetryError: if a response carries zero bytes.
        """
        self.EnsureInitialized()
        progress_end_normalized = False
        if self.total_size is not None:
            progress, end_byte = self.__NormalizeStartEnd(start, end)
            progress_end_normalized = True
        else:
            progress = start
            end_byte = end
        while (not progress_end_normalized or end_byte is None or
               progress <= end_byte):
            end_byte = self.__ComputeEndByte(progress, end=end_byte,
                                             use_chunks=use_chunks)
            response = self.__GetChunk(progress, end_byte,
                                       additional_headers=additional_headers)
            if not progress_end_normalized:
                # The total size becomes known from the first response, so
                # the requested range can only be normalized now.
                self.__SetTotal(response.info)
                progress, end_byte = self.__NormalizeStartEnd(start, end)
                progress_end_normalized = True
            response = self.__ProcessResponse(response)
            progress += response.length
            if response.length == 0:
                raise exceptions.TransferRetryError(
                    'Zero bytes unexpectedly returned in download response')

    def StreamInChunks(self, callback=None, finish_callback=None,
                       additional_headers=None):
        """Stream the entire download in chunks."""
        self.StreamMedia(callback=callback, finish_callback=finish_callback,
                         additional_headers=additional_headers,
                         use_chunks=True)

    def StreamMedia(self, callback=None, finish_callback=None,
                    additional_headers=None, use_chunks=True):
        """Stream the entire download.

        Args:
          callback: (default: None) Callback to call as each chunk is
              completed.
          finish_callback: (default: None) Callback to call when the
              download is complete.
          additional_headers: (default: None) Additional headers to
              include in fetching bytes.
          use_chunks: (bool, default: True) If False, ignore self.chunksize
              and stream this download in a single request.

        Returns:
            None. Streams bytes into self.stream.
        """
        callback = callback or self.progress_callback
        finish_callback = finish_callback or self.finish_callback

        self.EnsureInitialized()
        while True:
            if self.__initial_response is not None:
                # Consume the response already fetched by
                # InitializeDownload exactly once.
                response = self.__initial_response
                self.__initial_response = None
            else:
                end_byte = self.__ComputeEndByte(self.progress,
                                                 use_chunks=use_chunks)
                response = self.__GetChunk(
                    self.progress, end_byte,
                    additional_headers=additional_headers)
            if self.total_size is None:
                self.__SetTotal(response.info)
            response = self.__ProcessResponse(response)
            self._ExecuteCallback(callback, response)
            if (response.status_code == http_client.OK or
                    self.progress >= self.total_size):
                break
        self._ExecuteCallback(finish_callback, response)
531
532
533 class Upload(_Transfer):
534
535 """Data for a single Upload.
536
537 Fields:
538 stream: The stream to upload.
539 mime_type: MIME type of the upload.
540 total_size: (optional) Total upload size for the stream.
541 close_stream: (default: False) Whether or not we should close the
542 stream when finished with the upload.
543 auto_transfer: (default: True) If True, stream all bytes as soon as
544 the upload is created.
545 """
546 _REQUIRED_SERIALIZATION_KEYS = set((
547 'auto_transfer', 'mime_type', 'total_size', 'url'))
548
def __init__(self, stream, mime_type, total_size=None, http=None,
             close_stream=False, chunksize=None, auto_transfer=True,
             progress_callback=None, finish_callback=None,
             **kwds):
    """Create an upload that reads from stream.

    Args:
      stream: the stream to upload.
      mime_type: MIME type of the upload.
      total_size: (optional) total upload size for the stream.
      http: (optional) http object, forwarded to the base class.
      close_stream: forwarded to the base class.
      chunksize: forwarded to the base class.
      auto_transfer: forwarded to the base class.
      progress_callback: default per-chunk callback.
      finish_callback: default completion callback.
      **kwds: forwarded to the base class.
    """
    base_kwargs = dict(kwds)
    base_kwargs.update(
        close_stream=close_stream, chunksize=chunksize,
        auto_transfer=auto_transfer, http=http)
    super(Upload, self).__init__(stream, **base_kwargs)

    # Private transfer state.
    self.__mime_type = mime_type
    self.__strategy = None
    self.__server_chunk_granularity = None
    self.__progress = 0
    self.__complete = False
    self.__final_response = None
    self.__total_size = None

    # Public configuration; total_size goes through its property setter.
    self.finish_callback = finish_callback
    self.progress_callback = progress_callback
    self.total_size = total_size
567
@property
def progress(self):
    """Current progress value tracked for this upload (starts at 0)."""
    return self.__progress
571
@classmethod
def FromFile(cls, filename, mime_type=None, auto_transfer=True, **kwds):
    """Create a new Upload object from a filename.

    Args:
      filename: path of the file to upload; '~' is expanded.
      mime_type: MIME type; guessed from the path when falsy.
      auto_transfer: forwarded to the constructor.
      **kwds: forwarded to the constructor.

    Returns:
      An Upload reading the file in binary mode, with total_size set from
      the file's size and close_stream=True.

    Raises:
      exceptions.NotFoundError: if the path does not exist.
      exceptions.InvalidUserInputError: if no MIME type was given and none
          could be guessed.
    """
    path = os.path.expanduser(filename)
    if not os.path.exists(path):
        raise exceptions.NotFoundError('Could not find file %s' % path)
    if not mime_type:
        mime_type, _ = mimetypes.guess_type(path)
        if mime_type is None:
            raise exceptions.InvalidUserInputError(
                'Could not guess mime type for %s' % path)
    size = os.stat(path).st_size
    stream = open(path, 'rb')
    try:
        return cls(stream, mime_type, total_size=size,
                   close_stream=True, auto_transfer=auto_transfer, **kwds)
    except Exception:
        # Do not leave the file handle open if construction fails.
        stream.close()
        raise
586
@classmethod
def FromStream(cls, stream, mime_type, total_size=None, auto_transfer=True,
               **kwds):
    """Create a new Upload object from a stream.

    The resulting upload never closes the stream itself
    (close_stream=False).

    Raises:
      exceptions.InvalidUserInputError: if mime_type is None.
    """
    if mime_type is not None:
        return cls(stream, mime_type, total_size=total_size,
                   close_stream=False, auto_transfer=auto_transfer, **kwds)
    raise exceptions.InvalidUserInputError(
        'No mime_type specified for stream')
596
@classmethod
def FromData(cls, stream, json_data, http, auto_transfer=None, **kwds):
    """Create a new Upload of stream from serialized json_data and http.

    Args:
      stream: the stream to upload.
      json_data: JSON text holding the keys listed in
          _REQUIRED_SERIALIZATION_KEYS.
      http: http object passed to _Initialize.
      auto_transfer: if not None, overrides the serialized value.
      **kwds: forwarded to FromStream; must not contain total_size.

    Returns:
      A resumable Upload whose state has been refreshed from the server;
      if auto_transfer ends up truthy, streaming has already been run.

    Raises:
      exceptions.InvalidDataError: if required keys are missing.
      exceptions.InvalidUserInputError: if total_size is passed in kwds,
          or the stream is an io.IOBase that is not seekable.
    """
    serialized = json.loads(json_data)
    absent = cls._REQUIRED_SERIALIZATION_KEYS - set(serialized.keys())
    if absent:
        raise exceptions.InvalidDataError(
            'Invalid serialization data, missing keys: %s' % (
                ', '.join(absent)))
    if 'total_size' in kwds:
        raise exceptions.InvalidUserInputError(
            'Cannot override total_size on serialized Upload')
    result = cls.FromStream(stream, serialized['mime_type'],
                            total_size=serialized.get('total_size'), **kwds)
    # Resuming has to reposition the stream, so a non-seekable one is
    # rejected.
    if isinstance(stream, io.IOBase) and not stream.seekable():
        raise exceptions.InvalidUserInputError(
            'Cannot restart resumable upload on non-seekable stream')
    if auto_transfer is None:
        result.auto_transfer = serialized['auto_transfer']
    else:
        result.auto_transfer = auto_transfer
    result.strategy = RESUMABLE_UPLOAD
    result._Initialize(  # pylint: disable=protected-access
        http, serialized['url'])
    result.RefreshResumableUploadState()
    result.EnsureInitialized()
    if result.auto_transfer:
        result.StreamInChunks()
    return result
626
@property
def serialization_data(self):
    """Dict with the keys FromData requires.

    Raises:
      exceptions.TransferInvalidError: if the upload is not initialized.
      exceptions.InvalidDataError: if the strategy is not resumable.
    """
    self.EnsureInitialized()
    if self.strategy != RESUMABLE_UPLOAD:
        raise exceptions.InvalidDataError(
            'Serialization only supported for resumable uploads')
    data = {}
    data['auto_transfer'] = self.auto_transfer
    data['mime_type'] = self.mime_type
    data['total_size'] = self.total_size
    data['url'] = self.url
    return data
639
@property
def complete(self):
    """True once the upload has been marked complete."""
    return self.__complete
643
@property
def mime_type(self):
    """MIME type given when this upload was created."""
    return self.__mime_type
647
def __str__(self):
    """Describe the upload; a falsy total size is shown as '???'."""
    if self.initialized:
        shown_total = self.total_size or '???'
        return 'Upload with %d/%s bytes transferred for url %s' % (
            self.progress, shown_total, self.url)
    return 'Upload (uninitialized)'
654
@property
def strategy(self):
    """Upload strategy: SIMPLE_UPLOAD, RESUMABLE_UPLOAD, or None if unset."""
    return self.__strategy

@strategy.setter
def strategy(self, value):
    # Only the two known strategy names are accepted.
    allowed = (SIMPLE_UPLOAD, RESUMABLE_UPLOAD)
    if value in allowed:
        self.__strategy = value
        return
    raise exceptions.UserError((
        'Invalid value "%s" for upload strategy, must be one of '
        '"simple" or "resumable".') % value)
666
@property
def total_size(self):
    """Total upload size, or None when it has not been set."""
    return self.__total_size

@total_size.setter
def total_size(self, value):
    # The size may only be changed before the upload is initialized;
    # EnsureUninitialized raises otherwise.
    self.EnsureUninitialized()
    self.__total_size = value
675
def __SetDefaultUploadStrategy(self, upload_config, http_request):
    """Determine and set the default upload strategy for this upload.

    We generally prefer simple or multipart, unless we're forced to
    use resumable. This happens when any of (1) the upload is too
    large, (2) the simple endpoint doesn't support multipart requests
    and we have metadata, or (3) there is no simple upload endpoint.

    A strategy that is already set is left untouched; when the config has
    no resumable path, simple is selected outright.

    Args:
      upload_config: Configuration for the upload endpoint.
      http_request: The associated http request.

    Returns:
      None.
    """
    if upload_config.resumable_path is None:
        self.strategy = SIMPLE_UPLOAD
    if self.strategy is not None:
        return
    too_large = (self.total_size is not None and
                 self.total_size > _RESUMABLE_UPLOAD_THRESHOLD)
    metadata_without_multipart = bool(
        http_request.body and not upload_config.simple_multipart)
    no_simple_endpoint = not upload_config.simple_path
    if too_large or metadata_without_multipart or no_simple_endpoint:
        self.strategy = RESUMABLE_UPLOAD
    else:
        self.strategy = SIMPLE_UPLOAD
704
def ConfigureRequest(self, upload_config, http_request, url_builder):
    """Configure the request and url for this upload.

    Validates size and MIME type against upload_config, picks a default
    strategy if none is set, then sets the url path, the 'uploadType'
    query parameter, and the request for that strategy.

    Raises:
      exceptions.InvalidUserInputError: if the upload exceeds the
          configured maximum size or the MIME type is not accepted.
    """
    # Validate total_size vs. max_size
    max_size = upload_config.max_size
    if self.total_size and max_size and self.total_size > max_size:
        raise exceptions.InvalidUserInputError(
            'Upload too big: %s larger than max size %s' % (
                self.total_size, upload_config.max_size))
    # Validate mime type
    if not util.AcceptableMimeType(upload_config.accept, self.mime_type):
        raise exceptions.InvalidUserInputError(
            'MIME type %s does not match any accepted MIME ranges %s' % (
                self.mime_type, upload_config.accept))

    self.__SetDefaultUploadStrategy(upload_config, http_request)
    if self.strategy != SIMPLE_UPLOAD:
        url_builder.relative_path = upload_config.resumable_path
        url_builder.query_params['uploadType'] = 'resumable'
        self.__ConfigureResumableRequest(http_request)
        return

    url_builder.relative_path = upload_config.simple_path
    if http_request.body:
        # A body already on the request is sent together with the media.
        url_builder.query_params['uploadType'] = 'multipart'
        self.__ConfigureMultipartRequest(http_request)
    else:
        url_builder.query_params['uploadType'] = 'media'
        self.__ConfigureMediaRequest(http_request)
732
def __ConfigureMediaRequest(self, http_request):
    """Configure http_request as a simple request for this upload.

    The whole stream is read into the request body; the loggable body is
    a placeholder instead of the media bytes.
    """
    http_request.headers['content-type'] = self.mime_type
    media_bytes = self.stream.read()
    http_request.body = media_bytes
    http_request.loggable_body = '<media body>'
738
def __ConfigureMultipartRequest(self, http_request):
    """Configure http_request as a multipart request for this upload.

    Builds a multipart/related message whose first part is the request's
    existing body and whose second part is the media read from the
    stream, then replaces the request body and content-type with it. A
    loggable body is derived with the media replaced by a placeholder.
    """
    # This is a multipart/related upload.
    related = mime_multipart.MIMEMultipart('related')
    # The root message should not write out its own headers.
    setattr(related, '_write_headers', lambda self: None)

    # First part: the body already present on the request.
    body_part = mime_nonmultipart.MIMENonMultipart(
        *http_request.headers['content-type'].split('/'))
    body_part.set_payload(http_request.body)
    related.attach(body_part)

    # Second part: the media itself.
    media_part = mime_nonmultipart.MIMENonMultipart(
        *self.mime_type.split('/'))
    media_part['Content-Transfer-Encoding'] = 'binary'
    media_part.set_payload(self.stream.read())
    related.attach(media_part)

    # NOTE: We encode the body, but can't use
    #       `email.message.Message.as_string` because it prepends
    #       `> ` to `From ` lines.
    # NOTE: We must use six.StringIO() instead of io.StringIO() since the
    #       `email` library uses cStringIO in Py2 and io.StringIO in Py3.
    buf = six.StringIO()
    writer = email_generator.Generator(buf, mangle_from_=False)
    writer.flatten(related, unixfrom=False)
    http_request.body = buf.getvalue()

    boundary = related.get_boundary()
    http_request.headers['content-type'] = (
        'multipart/related; boundary=%r' % boundary)

    # Second-to-last piece is the media part: keep its headers, drop the
    # payload from what gets logged.
    pieces = http_request.body.split(boundary)
    part_headers = pieces[-2].partition('\n\n')[0]
    pieces[-2] = '\n\n'.join([part_headers, '<media body>\n\n--'])
    http_request.loggable_body = boundary.join(pieces)
776
def __ConfigureResumableRequest(self, http_request):
    """Add the X-Upload-Content-* headers for a resumable upload.

    The content-length header is only added when total_size is known.
    """
    headers = http_request.headers
    headers['X-Upload-Content-Type'] = self.mime_type
    if self.total_size is None:
        return
    headers['X-Upload-Content-Length'] = str(self.total_size)
782
def RefreshResumableUploadState(self):
    """Talk to the server and refresh the state of this resumable upload.

    Does nothing for non-resumable uploads. Otherwise issues a PUT with
    'Content-Range: bytes */*' and updates progress, the stream position
    and the completion flag from the reply.

    Returns:
      None. When the upload turns out to be complete, the response is
      cached so the streaming methods can return it.

    Raises:
      exceptions.HttpError: if the server answers with a status other
          than OK, CREATED or RESUME_INCOMPLETE.
    """
    if self.strategy != RESUMABLE_UPLOAD:
        return
    self.EnsureInitialized()
    query = http_wrapper.Request(
        url=self.url, http_method='PUT',
        headers={'Content-Range': 'bytes */*'})
    reply = http_wrapper.MakeRequest(
        self.http, query, redirections=0,
        retries=self.num_retries)
    received_range = self._GetRangeHeaderFromResponse(reply)
    finished_codes = (http_client.OK, http_client.CREATED)
    if reply.status_code in finished_codes:
        self.__complete = True
        self.__progress = self.total_size
        self.stream.seek(self.progress)
        # If we're finished, the refresh response will contain the metadata
        # originally requested. Cache it so it can be returned in
        # StreamInChunks.
        self.__final_response = reply
    elif reply.status_code == http_wrapper.RESUME_INCOMPLETE:
        # No range header is treated as "nothing received yet".
        if received_range is not None:
            self.__progress = self.__GetLastByte(received_range) + 1
        else:
            self.__progress = 0
        self.stream.seek(self.progress)
    else:
        raise exceptions.HttpError.FromResponse(reply)
816
def _GetRangeHeaderFromResponse(self, response):
    """Return the 'Range' (or lowercase 'range') entry of response.info.

    Returns None when neither key is present.
    """
    lowercase_value = response.info.get('range')
    return response.info.get('Range', lowercase_value)
819
def InitializeUpload(self, http_request, http=None, client=None):
    """Initialize this upload from the given http_request.

    For non-resumable strategies this only validates its arguments and
    returns None. For resumable uploads it sends http_request, records the
    server's chunk granularity and upload url, and initializes self.

    Args:
      http_request: request that starts the resumable session.
      http: http object to use; falls back to client.http.
      client: if given, its FinalizeTransferUrl is applied to the request
          url and to the returned upload url.

    Returns:
      The result of StreamInChunks() when auto_transfer is set, otherwise
      the response to http_request (None for non-resumable uploads).

    Raises:
      exceptions.UserError: if no strategy is set, or neither http nor
          client is given.
      exceptions.HttpError: if the initial request does not return OK.
    """
    if self.strategy is None:
        raise exceptions.UserError(
            'No upload strategy set; did you call ConfigureRequest?')
    if http is None and client is None:
        raise exceptions.UserError('Must provide client or http.')
    if self.strategy != RESUMABLE_UPLOAD:
        return
    http = http or client.http
    if client is not None:
        http_request.url = client.FinalizeTransferUrl(http_request.url)
    self.EnsureUninitialized()
    start_response = http_wrapper.MakeRequest(http, http_request,
                                              retries=self.num_retries)
    if start_response.status_code != http_client.OK:
        raise exceptions.HttpError.FromResponse(start_response)

    response_headers = start_response.info
    self.__server_chunk_granularity = response_headers.get(
        'X-Goog-Upload-Chunk-Granularity')
    upload_url = response_headers['location']
    if client is not None:
        upload_url = client.FinalizeTransferUrl(upload_url)
    self._Initialize(http, upload_url)

    # Unless the user has requested otherwise, we want to just
    # go ahead and pump the bytes now.
    if not self.auto_transfer:
        return start_response
    return self.StreamInChunks()
851
def __GetLastByte(self, range_header):
    """Return the integer after the first '-' in range_header."""
    # TODO(craigcitro): Validate start == 0?
    last_byte_text = range_header.partition('-')[2]
    return int(last_byte_text)
856
def __ValidateChunksize(self, chunksize=None):
    """Check that the chunksize is a multiple of the server's granularity.

    Does nothing when the server did not report a granularity.

    Args:
      chunksize: chunk size to validate; a falsy value selects
          self.chunksize.

    Raises:
      exceptions.ConfigurationValueError: if the chunk size is not a
          multiple of the reported granularity.
    """
    if self.__server_chunk_granularity is None:
        return
    # The granularity is read from a response header, so it may be a
    # string; convert before using it in arithmetic.
    granularity = int(self.__server_chunk_granularity)
    chunksize = chunksize or self.chunksize
    if chunksize % granularity:
        # Format the message here: passing the value as a second exception
        # argument would leave the '%d' unfilled.
        raise exceptions.ConfigurationValueError(
            'Server requires chunksize to be a multiple of %d' %
            granularity)
865
def __StreamMedia(self, callback=None, finish_callback=None,
                  additional_headers=None, use_chunks=True):
    """Helper function for StreamMedia / StreamInChunks.

    Sends the stream until the server reports completion, running the
    progress callback after each incomplete send and the finish callback
    once at the end.

    Returns:
      The final response (the cached one if the upload was already
      complete).

    Raises:
      exceptions.InvalidUserInputError: if the strategy is not resumable.
      exceptions.CommunicationError: if the server acknowledged fewer
          bytes than were read from the stream.
      exceptions.TransferInvalidError: if bytes remain in a seekable
          stream after the server reported completion.
    """
    if self.strategy != RESUMABLE_UPLOAD:
        raise exceptions.InvalidUserInputError(
            'Cannot stream non-resumable upload')
    callback = callback or self.progress_callback
    finish_callback = finish_callback or self.finish_callback
    # final_response is set if we resumed an already-completed upload.
    response = self.__final_response
    if use_chunks:
        send_func = self.__SendChunk
    else:
        send_func = self.__SendMediaBody
    if use_chunks:
        self.__ValidateChunksize(self.chunksize)
    self.EnsureInitialized()
    done_codes = (http_client.OK, http_client.CREATED)
    while not self.complete:
        response = send_func(self.stream.tell(),
                             additional_headers=additional_headers)
        if response.status_code in done_codes:
            self.__complete = True
            break
        self.__progress = self.__GetLastByte(response.info['range'])
        if self.progress + 1 != self.stream.tell():
            # TODO(craigcitro): Add a better way to recover here.
            raise exceptions.CommunicationError(
                'Failed to transfer all bytes in chunk, upload paused at '
                'byte %d' % self.progress)
        self._ExecuteCallback(callback, response)
    if self.__complete and hasattr(self.stream, 'seek'):
        # Measure what is left in the stream, then restore the position.
        here = self.stream.tell()
        self.stream.seek(0, os.SEEK_END)
        stream_end = self.stream.tell()
        self.stream.seek(here)
        if here != stream_end:
            raise exceptions.TransferInvalidError(
                'Upload complete with %s additional bytes left in stream' %
                (int(stream_end) - int(here)))
    self._ExecuteCallback(finish_callback, response)
    return response
904
def StreamMedia(self, callback=None, finish_callback=None,
                additional_headers=None):
    """Send this resumable upload in a single request.

    Args:
      callback: Progress callback function with inputs
          (http_wrapper.Response, transfer.Upload)
      finish_callback: Final callback function with inputs
          (http_wrapper.Response, transfer.Upload)
      additional_headers: Dict of headers to include with the upload
          http_wrapper.Request.

    Returns:
      http_wrapper.Response of final response.
    """
    # use_chunks=False makes the shared helper skip chunked sending.
    final_response = self.__StreamMedia(
        callback=callback,
        finish_callback=finish_callback,
        additional_headers=additional_headers,
        use_chunks=False)
    return final_response
923
def StreamInChunks(self, callback=None, finish_callback=None,
                   additional_headers=None):
    """Send this (resumable) upload in chunks.

    Args:
      callback: Progress callback forwarded to the shared helper.
      finish_callback: Final callback forwarded to the shared helper.
      additional_headers: Headers forwarded to the shared helper.

    Returns:
      Whatever the shared __StreamMedia helper returns.
    """
    # use_chunks is left at the helper's default (True).
    final_response = self.__StreamMedia(
        callback=callback,
        finish_callback=finish_callback,
        additional_headers=additional_headers)
    return final_response
930
def __SendMediaRequest(self, request, end):
    """Request helper function for SendMediaBody & SendChunk.

    Args:
      request: The http_wrapper.Request to send.
      end: Stream offset one past the last byte carried by this request.

    Returns:
      The response from http_wrapper.MakeRequest, when its status is OK,
      CREATED or RESUME_INCOMPLETE.

    Raises:
      exceptions.HttpError: for any other status code; the resumable
          upload state is refreshed before raising.
    """
    response = http_wrapper.MakeRequest(
        self.bytes_http, request, retry_func=self.retry_func,
        retries=self.num_retries)
    if response.status_code not in (http_client.OK, http_client.CREATED,
                                    http_wrapper.RESUME_INCOMPLETE):
        # We want to reset our state to wherever the server left us
        # before this failed request, and then raise.
        self.RefreshResumableUploadState()
        raise exceptions.HttpError.FromResponse(response)
    if response.status_code == http_wrapper.RESUME_INCOMPLETE:
        last_byte = self.__GetLastByte(
            self._GetRangeHeaderFromResponse(response))
        if last_byte + 1 != end:
            # The server holds bytes up to and including last_byte, so
            # the next byte to send is last_byte + 1. Seeking to
            # last_byte itself would resend one byte and leave the
            # stream position out of step with the reported progress.
            self.stream.seek(last_byte + 1)
    return response
948
def __SendMediaBody(self, start, additional_headers=None):
    """Send the entire media stream in a single request.

    Args:
      start: Stream offset from which the remaining bytes are sent.
      additional_headers: Optional headers merged into the request after
          Content-Type and Content-Range have been set.

    Returns:
      The result of __SendMediaRequest for the constructed PUT request.

    Raises:
      exceptions.TransferInvalidError: if the total size is unknown.
    """
    self.EnsureInitialized()
    total = self.total_size
    if total is None:
        raise exceptions.TransferInvalidError(
            'Total size must be known for SendMediaBody')

    if start == total:
        # End of an upload with 0 bytes left to send; just finalize.
        content_range = 'bytes */%s' % total
    else:
        content_range = 'bytes %s-%s/%s' % (start, total - 1, total)

    remaining = stream_slice.StreamSlice(self.stream, total - start)
    request = http_wrapper.Request(url=self.url, http_method='PUT',
                                   body=remaining)
    request.headers['Content-Type'] = self.mime_type
    request.headers['Content-Range'] = content_range
    if additional_headers:
        request.headers.update(additional_headers)

    return self.__SendMediaRequest(request, total)
973
def __SendChunk(self, start, additional_headers=None):
    """Send the specified chunk.

    Args:
      start: Stream offset at which this chunk begins.
      additional_headers: Optional headers merged into the request after
          Content-Type and Content-Range have been set.

    Returns:
      The result of __SendMediaRequest for the constructed PUT request.
    """
    self.EnsureInitialized()
    # Captured before the branch below, which may record a total size.
    size_unknown_at_entry = self.total_size is None
    if size_unknown_at_entry:
        # For the streaming resumable case, we need to detect when
        # we're at the end of the stream.
        buffered = buffered_stream.BufferedStream(
            self.stream, start, self.chunksize)
        end = buffered.stream_end_position
        if buffered.stream_exhausted:
            self.__total_size = end
        # TODO: Here, change body_stream from a stream to a string object,
        # which means reading a chunk into memory.  This works around
        # https://code.google.com/p/httplib2/issues/detail?id=176 which can
        # cause httplib2 to skip bytes on 401's for file objects.
        # Rework this solution to be more general.
        # pylint: disable=redefined-variable-type
        body_stream = buffered.read(self.chunksize)
    else:
        end = min(start + self.chunksize, self.total_size)
        body_stream = stream_slice.StreamSlice(self.stream, end - start)

    # total_size is read again on purpose: the streaming branch may have
    # just assigned __total_size (presumably what the total_size property
    # reports -- TODO confirm against the property definition).
    total = self.total_size
    if total is None:
        # Streaming resumable upload case, unknown total size.
        content_range = 'bytes %s-%s/*' % (start, end - 1)
    elif start == end:
        # End of an upload with 0 bytes left to send; just finalize.
        content_range = 'bytes */%s' % total
    else:
        # Normal resumable upload case with known sizes.
        content_range = 'bytes %s-%s/%s' % (start, end - 1, total)

    # TODO(craigcitro): Think about clearer errors on "no data in
    # stream".
    request = http_wrapper.Request(url=self.url, http_method='PUT',
                                   body=body_stream)
    request.headers['Content-Type'] = self.mime_type
    if size_unknown_at_entry:
        # Disable logging of streaming body.
        # TODO: Remove no_log_body and rework as part of a larger logs
        # refactor.
        request.loggable_body = '<media body>'
    request.headers['Content-Range'] = content_range
    if additional_headers:
        request.headers.update(additional_headers)

    return self.__SendMediaRequest(request, end)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698