OLD | NEW |
(Empty) | |
| 1 #!/usr/bin/env python |
| 2 # |
| 3 # Copyright 2015 Google Inc. |
| 4 # |
| 5 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 # you may not use this file except in compliance with the License. |
| 7 # You may obtain a copy of the License at |
| 8 # |
| 9 # http://www.apache.org/licenses/LICENSE-2.0 |
| 10 # |
| 11 # Unless required by applicable law or agreed to in writing, software |
| 12 # distributed under the License is distributed on an "AS IS" BASIS, |
| 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 # See the License for the specific language governing permissions and |
| 15 # limitations under the License. |
| 16 |
| 17 """Upload and download support for apitools.""" |
| 18 from __future__ import print_function |
| 19 |
| 20 import email.generator as email_generator |
| 21 import email.mime.multipart as mime_multipart |
| 22 import email.mime.nonmultipart as mime_nonmultipart |
| 23 import io |
| 24 import json |
| 25 import mimetypes |
| 26 import os |
| 27 import threading |
| 28 |
| 29 import six |
| 30 from six.moves import http_client |
| 31 |
| 32 from apitools.base.py import buffered_stream |
| 33 from apitools.base.py import exceptions |
| 34 from apitools.base.py import http_wrapper |
| 35 from apitools.base.py import stream_slice |
| 36 from apitools.base.py import util |
| 37 |
| 38 __all__ = [ |
| 39 'Download', |
| 40 'Upload', |
| 41 'RESUMABLE_UPLOAD', |
| 42 'SIMPLE_UPLOAD', |
| 43 'DownloadProgressPrinter', |
| 44 'DownloadCompletePrinter', |
| 45 'UploadProgressPrinter', |
| 46 'UploadCompletePrinter', |
| 47 ] |
| 48 |
| 49 _RESUMABLE_UPLOAD_THRESHOLD = 5 << 20 |
| 50 SIMPLE_UPLOAD = 'simple' |
| 51 RESUMABLE_UPLOAD = 'resumable' |
| 52 |
| 53 |
| 54 def DownloadProgressPrinter(response, unused_download): |
| 55 """Print download progress based on response.""" |
| 56 if 'content-range' in response.info: |
| 57 print('Received %s' % response.info['content-range']) |
| 58 else: |
| 59 print('Received %d bytes' % response.length) |
| 60 |
| 61 |
| 62 def DownloadCompletePrinter(unused_response, unused_download): |
| 63 """Print information about a completed download.""" |
| 64 print('Download complete') |
| 65 |
| 66 |
| 67 def UploadProgressPrinter(response, unused_upload): |
| 68 """Print upload progress based on response.""" |
| 69 print('Sent %s' % response.info['range']) |
| 70 |
| 71 |
| 72 def UploadCompletePrinter(unused_response, unused_upload): |
| 73 """Print information about a completed upload.""" |
| 74 print('Upload complete') |
| 75 |
| 76 |
| 77 class _Transfer(object): |
| 78 |
| 79 """Generic bits common to Uploads and Downloads.""" |
| 80 |
| 81 def __init__(self, stream, close_stream=False, chunksize=None, |
| 82 auto_transfer=True, http=None, num_retries=5): |
| 83 self.__bytes_http = None |
| 84 self.__close_stream = close_stream |
| 85 self.__http = http |
| 86 self.__stream = stream |
| 87 self.__url = None |
| 88 |
| 89 self.__num_retries = 5 |
| 90 # Let the @property do validation |
| 91 self.num_retries = num_retries |
| 92 |
| 93 self.retry_func = ( |
| 94 http_wrapper.HandleExceptionsAndRebuildHttpConnections) |
| 95 self.auto_transfer = auto_transfer |
| 96 self.chunksize = chunksize or 1048576 |
| 97 |
| 98 def __repr__(self): |
| 99 return str(self) |
| 100 |
| 101 @property |
| 102 def close_stream(self): |
| 103 return self.__close_stream |
| 104 |
| 105 @property |
| 106 def http(self): |
| 107 return self.__http |
| 108 |
| 109 @property |
| 110 def bytes_http(self): |
| 111 return self.__bytes_http or self.http |
| 112 |
| 113 @bytes_http.setter |
| 114 def bytes_http(self, value): |
| 115 self.__bytes_http = value |
| 116 |
| 117 @property |
| 118 def num_retries(self): |
| 119 return self.__num_retries |
| 120 |
| 121 @num_retries.setter |
| 122 def num_retries(self, value): |
| 123 util.Typecheck(value, six.integer_types) |
| 124 if value < 0: |
| 125 raise exceptions.InvalidDataError( |
| 126 'Cannot have negative value for num_retries') |
| 127 self.__num_retries = value |
| 128 |
| 129 @property |
| 130 def stream(self): |
| 131 return self.__stream |
| 132 |
| 133 @property |
| 134 def url(self): |
| 135 return self.__url |
| 136 |
| 137 def _Initialize(self, http, url): |
| 138 """Initialize this download by setting self.http and self.url. |
| 139 |
| 140 We want the user to be able to override self.http by having set |
| 141 the value in the constructor; in that case, we ignore the provided |
| 142 http. |
| 143 |
| 144 Args: |
| 145 http: An httplib2.Http instance or None. |
| 146 url: The url for this transfer. |
| 147 |
| 148 Returns: |
| 149 None. Initializes self. |
| 150 """ |
| 151 self.EnsureUninitialized() |
| 152 if self.http is None: |
| 153 self.__http = http or http_wrapper.GetHttp() |
| 154 self.__url = url |
| 155 |
| 156 @property |
| 157 def initialized(self): |
| 158 return self.url is not None and self.http is not None |
| 159 |
| 160 @property |
| 161 def _type_name(self): |
| 162 return type(self).__name__ |
| 163 |
| 164 def EnsureInitialized(self): |
| 165 if not self.initialized: |
| 166 raise exceptions.TransferInvalidError( |
| 167 'Cannot use uninitialized %s', self._type_name) |
| 168 |
| 169 def EnsureUninitialized(self): |
| 170 if self.initialized: |
| 171 raise exceptions.TransferInvalidError( |
| 172 'Cannot re-initialize %s', self._type_name) |
| 173 |
| 174 def __del__(self): |
| 175 if self.__close_stream: |
| 176 self.__stream.close() |
| 177 |
| 178 def _ExecuteCallback(self, callback, response): |
| 179 # TODO(craigcitro): Push these into a queue. |
| 180 if callback is not None: |
| 181 threading.Thread(target=callback, args=(response, self)).start() |
| 182 |
| 183 |
| 184 class Download(_Transfer): |
| 185 |
| 186 """Data for a single download. |
| 187 |
| 188 Public attributes: |
| 189 chunksize: default chunksize to use for transfers. |
| 190 """ |
| 191 _ACCEPTABLE_STATUSES = set(( |
| 192 http_client.OK, |
| 193 http_client.NO_CONTENT, |
| 194 http_client.PARTIAL_CONTENT, |
| 195 http_client.REQUESTED_RANGE_NOT_SATISFIABLE, |
| 196 )) |
| 197 _REQUIRED_SERIALIZATION_KEYS = set(( |
| 198 'auto_transfer', 'progress', 'total_size', 'url')) |
| 199 |
| 200 def __init__(self, stream, progress_callback=None, finish_callback=None, |
| 201 **kwds): |
| 202 total_size = kwds.pop('total_size', None) |
| 203 super(Download, self).__init__(stream, **kwds) |
| 204 self.__initial_response = None |
| 205 self.__progress = 0 |
| 206 self.__total_size = total_size |
| 207 self.__encoding = None |
| 208 |
| 209 self.progress_callback = progress_callback |
| 210 self.finish_callback = finish_callback |
| 211 |
| 212 @property |
| 213 def progress(self): |
| 214 return self.__progress |
| 215 |
| 216 @property |
| 217 def encoding(self): |
| 218 return self.__encoding |
| 219 |
| 220 @classmethod |
| 221 def FromFile(cls, filename, overwrite=False, auto_transfer=True, **kwds): |
| 222 """Create a new download object from a filename.""" |
| 223 path = os.path.expanduser(filename) |
| 224 if os.path.exists(path) and not overwrite: |
| 225 raise exceptions.InvalidUserInputError( |
| 226 'File %s exists and overwrite not specified' % path) |
| 227 return cls(open(path, 'wb'), close_stream=True, |
| 228 auto_transfer=auto_transfer, **kwds) |
| 229 |
| 230 @classmethod |
| 231 def FromStream(cls, stream, auto_transfer=True, total_size=None, **kwds): |
| 232 """Create a new Download object from a stream.""" |
| 233 return cls(stream, auto_transfer=auto_transfer, total_size=total_size, |
| 234 **kwds) |
| 235 |
| 236 @classmethod |
| 237 def FromData(cls, stream, json_data, http=None, auto_transfer=None, |
| 238 **kwds): |
| 239 """Create a new Download object from a stream and serialized data.""" |
| 240 info = json.loads(json_data) |
| 241 missing_keys = cls._REQUIRED_SERIALIZATION_KEYS - set(info.keys()) |
| 242 if missing_keys: |
| 243 raise exceptions.InvalidDataError( |
| 244 'Invalid serialization data, missing keys: %s' % ( |
| 245 ', '.join(missing_keys))) |
| 246 download = cls.FromStream(stream, **kwds) |
| 247 if auto_transfer is not None: |
| 248 download.auto_transfer = auto_transfer |
| 249 else: |
| 250 download.auto_transfer = info['auto_transfer'] |
| 251 setattr(download, '_Download__progress', info['progress']) |
| 252 setattr(download, '_Download__total_size', info['total_size']) |
| 253 download._Initialize( # pylint: disable=protected-access |
| 254 http, info['url']) |
| 255 return download |
| 256 |
| 257 @property |
| 258 def serialization_data(self): |
| 259 self.EnsureInitialized() |
| 260 return { |
| 261 'auto_transfer': self.auto_transfer, |
| 262 'progress': self.progress, |
| 263 'total_size': self.total_size, |
| 264 'url': self.url, |
| 265 } |
| 266 |
| 267 @property |
| 268 def total_size(self): |
| 269 return self.__total_size |
| 270 |
| 271 def __str__(self): |
| 272 if not self.initialized: |
| 273 return 'Download (uninitialized)' |
| 274 else: |
| 275 return 'Download with %d/%s bytes transferred from url %s' % ( |
| 276 self.progress, self.total_size, self.url) |
| 277 |
| 278 def ConfigureRequest(self, http_request, url_builder): |
| 279 url_builder.query_params['alt'] = 'media' |
| 280 # TODO(craigcitro): We need to send range requests because by |
| 281 # default httplib2 stores entire reponses in memory. Override |
| 282 # httplib2's download method (as gsutil does) so that this is not |
| 283 # necessary. |
| 284 http_request.headers['Range'] = 'bytes=0-%d' % (self.chunksize - 1,) |
| 285 |
| 286 def __SetTotal(self, info): |
| 287 if 'content-range' in info: |
| 288 _, _, total = info['content-range'].rpartition('/') |
| 289 if total != '*': |
| 290 self.__total_size = int(total) |
| 291 # Note "total_size is None" means we don't know it; if no size |
| 292 # info was returned on our initial range request, that means we |
| 293 # have a 0-byte file. (That last statement has been verified |
| 294 # empirically, but is not clearly documented anywhere.) |
| 295 if self.total_size is None: |
| 296 self.__total_size = 0 |
| 297 |
| 298 def InitializeDownload(self, http_request, http=None, client=None): |
| 299 """Initialize this download by making a request. |
| 300 |
| 301 Args: |
| 302 http_request: The HttpRequest to use to initialize this download. |
| 303 http: The httplib2.Http instance for this request. |
| 304 client: If provided, let this client process the final URL before |
| 305 sending any additional requests. If client is provided and |
| 306 http is not, client.http will be used instead. |
| 307 """ |
| 308 self.EnsureUninitialized() |
| 309 if http is None and client is None: |
| 310 raise exceptions.UserError('Must provide client or http.') |
| 311 http = http or client.http |
| 312 if client is not None: |
| 313 http_request.url = client.FinalizeTransferUrl(http_request.url) |
| 314 url = http_request.url |
| 315 if self.auto_transfer: |
| 316 end_byte = self.__ComputeEndByte(0) |
| 317 self.__SetRangeHeader(http_request, 0, end_byte) |
| 318 response = http_wrapper.MakeRequest( |
| 319 self.bytes_http or http, http_request) |
| 320 if response.status_code not in self._ACCEPTABLE_STATUSES: |
| 321 raise exceptions.HttpError.FromResponse(response) |
| 322 self.__initial_response = response |
| 323 self.__SetTotal(response.info) |
| 324 url = response.info.get('content-location', response.request_url) |
| 325 if client is not None: |
| 326 url = client.FinalizeTransferUrl(url) |
| 327 self._Initialize(http, url) |
| 328 # Unless the user has requested otherwise, we want to just |
| 329 # go ahead and pump the bytes now. |
| 330 if self.auto_transfer: |
| 331 self.StreamInChunks() |
| 332 |
| 333 def __NormalizeStartEnd(self, start, end=None): |
| 334 if end is not None: |
| 335 if start < 0: |
| 336 raise exceptions.TransferInvalidError( |
| 337 'Cannot have end index with negative start index') |
| 338 elif start >= self.total_size: |
| 339 raise exceptions.TransferInvalidError( |
| 340 'Cannot have start index greater than total size') |
| 341 end = min(end, self.total_size - 1) |
| 342 if end < start: |
| 343 raise exceptions.TransferInvalidError( |
| 344 'Range requested with end[%s] < start[%s]' % (end, start)) |
| 345 return start, end |
| 346 else: |
| 347 if start < 0: |
| 348 start = max(0, start + self.total_size) |
| 349 return start, self.total_size - 1 |
| 350 |
| 351 def __SetRangeHeader(self, request, start, end=None): |
| 352 if start < 0: |
| 353 request.headers['range'] = 'bytes=%d' % start |
| 354 elif end is None: |
| 355 request.headers['range'] = 'bytes=%d-' % start |
| 356 else: |
| 357 request.headers['range'] = 'bytes=%d-%d' % (start, end) |
| 358 |
| 359 def __ComputeEndByte(self, start, end=None, use_chunks=True): |
| 360 """Compute the last byte to fetch for this request. |
| 361 |
| 362 This is all based on the HTTP spec for Range and |
| 363 Content-Range. |
| 364 |
| 365 Note that this is potentially confusing in several ways: |
| 366 * the value for the last byte is 0-based, eg "fetch 10 bytes |
| 367 from the beginning" would return 9 here. |
| 368 * if we have no information about size, and don't want to |
| 369 use the chunksize, we'll return None. |
| 370 See the tests for more examples. |
| 371 |
| 372 Args: |
| 373 start: byte to start at. |
| 374 end: (int or None, default: None) Suggested last byte. |
| 375 use_chunks: (bool, default: True) If False, ignore self.chunksize. |
| 376 |
| 377 Returns: |
| 378 Last byte to use in a Range header, or None. |
| 379 |
| 380 """ |
| 381 end_byte = end |
| 382 |
| 383 if start < 0 and not self.total_size: |
| 384 return end_byte |
| 385 |
| 386 if use_chunks: |
| 387 alternate = start + self.chunksize - 1 |
| 388 if end_byte is not None: |
| 389 end_byte = min(end_byte, alternate) |
| 390 else: |
| 391 end_byte = alternate |
| 392 |
| 393 if self.total_size: |
| 394 alternate = self.total_size - 1 |
| 395 if end_byte is not None: |
| 396 end_byte = min(end_byte, alternate) |
| 397 else: |
| 398 end_byte = alternate |
| 399 |
| 400 return end_byte |
| 401 |
| 402 def __GetChunk(self, start, end, additional_headers=None): |
| 403 """Retrieve a chunk, and return the full response.""" |
| 404 self.EnsureInitialized() |
| 405 request = http_wrapper.Request(url=self.url) |
| 406 self.__SetRangeHeader(request, start, end=end) |
| 407 if additional_headers is not None: |
| 408 request.headers.update(additional_headers) |
| 409 return http_wrapper.MakeRequest( |
| 410 self.bytes_http, request, retry_func=self.retry_func, |
| 411 retries=self.num_retries) |
| 412 |
| 413 def __ProcessResponse(self, response): |
| 414 """Process response (by updating self and writing to self.stream).""" |
| 415 if response.status_code not in self._ACCEPTABLE_STATUSES: |
| 416 # We distinguish errors that mean we made a mistake in setting |
| 417 # up the transfer versus something we should attempt again. |
| 418 if response.status_code in (http_client.FORBIDDEN, |
| 419 http_client.NOT_FOUND): |
| 420 raise exceptions.HttpError.FromResponse(response) |
| 421 else: |
| 422 raise exceptions.TransferRetryError(response.content) |
| 423 if response.status_code in (http_client.OK, |
| 424 http_client.PARTIAL_CONTENT): |
| 425 self.stream.write(response.content) |
| 426 self.__progress += response.length |
| 427 if response.info and 'content-encoding' in response.info: |
| 428 # TODO(craigcitro): Handle the case where this changes over a |
| 429 # download. |
| 430 self.__encoding = response.info['content-encoding'] |
| 431 elif response.status_code == http_client.NO_CONTENT: |
| 432 # It's important to write something to the stream for the case |
| 433 # of a 0-byte download to a file, as otherwise python won't |
| 434 # create the file. |
| 435 self.stream.write('') |
| 436 return response |
| 437 |
| 438 def GetRange(self, start, end=None, additional_headers=None, |
| 439 use_chunks=True): |
| 440 """Retrieve a given byte range from this download, inclusive. |
| 441 |
| 442 Range must be of one of these three forms: |
| 443 * 0 <= start, end = None: Fetch from start to the end of the file. |
| 444 * 0 <= start <= end: Fetch the bytes from start to end. |
| 445 * start < 0, end = None: Fetch the last -start bytes of the file. |
| 446 |
| 447 (These variations correspond to those described in the HTTP 1.1 |
| 448 protocol for range headers in RFC 2616, sec. 14.35.1.) |
| 449 |
| 450 Args: |
| 451 start: (int) Where to start fetching bytes. (See above.) |
| 452 end: (int, optional) Where to stop fetching bytes. (See above.) |
| 453 additional_headers: (bool, optional) Any additional headers to |
| 454 pass with the request. |
| 455 use_chunks: (bool, default: True) If False, ignore self.chunksize |
| 456 and fetch this range in a single request. |
| 457 |
| 458 Returns: |
| 459 None. Streams bytes into self.stream. |
| 460 """ |
| 461 self.EnsureInitialized() |
| 462 progress_end_normalized = False |
| 463 if self.total_size is not None: |
| 464 progress, end_byte = self.__NormalizeStartEnd(start, end) |
| 465 progress_end_normalized = True |
| 466 else: |
| 467 progress = start |
| 468 end_byte = end |
| 469 while (not progress_end_normalized or end_byte is None or |
| 470 progress <= end_byte): |
| 471 end_byte = self.__ComputeEndByte(progress, end=end_byte, |
| 472 use_chunks=use_chunks) |
| 473 response = self.__GetChunk(progress, end_byte, |
| 474 additional_headers=additional_headers) |
| 475 if not progress_end_normalized: |
| 476 self.__SetTotal(response.info) |
| 477 progress, end_byte = self.__NormalizeStartEnd(start, end) |
| 478 progress_end_normalized = True |
| 479 response = self.__ProcessResponse(response) |
| 480 progress += response.length |
| 481 if response.length == 0: |
| 482 raise exceptions.TransferRetryError( |
| 483 'Zero bytes unexpectedly returned in download response') |
| 484 |
| 485 def StreamInChunks(self, callback=None, finish_callback=None, |
| 486 additional_headers=None): |
| 487 """Stream the entire download in chunks.""" |
| 488 self.StreamMedia(callback=callback, finish_callback=finish_callback, |
| 489 additional_headers=additional_headers, |
| 490 use_chunks=True) |
| 491 |
| 492 def StreamMedia(self, callback=None, finish_callback=None, |
| 493 additional_headers=None, use_chunks=True): |
| 494 """Stream the entire download. |
| 495 |
| 496 Args: |
| 497 callback: (default: None) Callback to call as each chunk is |
| 498 completed. |
| 499 finish_callback: (default: None) Callback to call when the |
| 500 download is complete. |
| 501 additional_headers: (default: None) Additional headers to |
| 502 include in fetching bytes. |
| 503 use_chunks: (bool, default: True) If False, ignore self.chunksize |
| 504 and stream this download in a single request. |
| 505 |
| 506 Returns: |
| 507 None. Streams bytes into self.stream. |
| 508 """ |
| 509 callback = callback or self.progress_callback |
| 510 finish_callback = finish_callback or self.finish_callback |
| 511 |
| 512 self.EnsureInitialized() |
| 513 while True: |
| 514 if self.__initial_response is not None: |
| 515 response = self.__initial_response |
| 516 self.__initial_response = None |
| 517 else: |
| 518 end_byte = self.__ComputeEndByte(self.progress, |
| 519 use_chunks=use_chunks) |
| 520 response = self.__GetChunk( |
| 521 self.progress, end_byte, |
| 522 additional_headers=additional_headers) |
| 523 if self.total_size is None: |
| 524 self.__SetTotal(response.info) |
| 525 response = self.__ProcessResponse(response) |
| 526 self._ExecuteCallback(callback, response) |
| 527 if (response.status_code == http_client.OK or |
| 528 self.progress >= self.total_size): |
| 529 break |
| 530 self._ExecuteCallback(finish_callback, response) |
| 531 |
| 532 |
| 533 class Upload(_Transfer): |
| 534 |
| 535 """Data for a single Upload. |
| 536 |
| 537 Fields: |
| 538 stream: The stream to upload. |
| 539 mime_type: MIME type of the upload. |
| 540 total_size: (optional) Total upload size for the stream. |
| 541 close_stream: (default: False) Whether or not we should close the |
| 542 stream when finished with the upload. |
| 543 auto_transfer: (default: True) If True, stream all bytes as soon as |
| 544 the upload is created. |
| 545 """ |
| 546 _REQUIRED_SERIALIZATION_KEYS = set(( |
| 547 'auto_transfer', 'mime_type', 'total_size', 'url')) |
| 548 |
| 549 def __init__(self, stream, mime_type, total_size=None, http=None, |
| 550 close_stream=False, chunksize=None, auto_transfer=True, |
| 551 progress_callback=None, finish_callback=None, |
| 552 **kwds): |
| 553 super(Upload, self).__init__( |
| 554 stream, close_stream=close_stream, chunksize=chunksize, |
| 555 auto_transfer=auto_transfer, http=http, **kwds) |
| 556 self.__complete = False |
| 557 self.__final_response = None |
| 558 self.__mime_type = mime_type |
| 559 self.__progress = 0 |
| 560 self.__server_chunk_granularity = None |
| 561 self.__strategy = None |
| 562 self.__total_size = None |
| 563 |
| 564 self.progress_callback = progress_callback |
| 565 self.finish_callback = finish_callback |
| 566 self.total_size = total_size |
| 567 |
| 568 @property |
| 569 def progress(self): |
| 570 return self.__progress |
| 571 |
| 572 @classmethod |
| 573 def FromFile(cls, filename, mime_type=None, auto_transfer=True, **kwds): |
| 574 """Create a new Upload object from a filename.""" |
| 575 path = os.path.expanduser(filename) |
| 576 if not os.path.exists(path): |
| 577 raise exceptions.NotFoundError('Could not find file %s' % path) |
| 578 if not mime_type: |
| 579 mime_type, _ = mimetypes.guess_type(path) |
| 580 if mime_type is None: |
| 581 raise exceptions.InvalidUserInputError( |
| 582 'Could not guess mime type for %s' % path) |
| 583 size = os.stat(path).st_size |
| 584 return cls(open(path, 'rb'), mime_type, total_size=size, |
| 585 close_stream=True, auto_transfer=auto_transfer, **kwds) |
| 586 |
| 587 @classmethod |
| 588 def FromStream(cls, stream, mime_type, total_size=None, auto_transfer=True, |
| 589 **kwds): |
| 590 """Create a new Upload object from a stream.""" |
| 591 if mime_type is None: |
| 592 raise exceptions.InvalidUserInputError( |
| 593 'No mime_type specified for stream') |
| 594 return cls(stream, mime_type, total_size=total_size, |
| 595 close_stream=False, auto_transfer=auto_transfer, **kwds) |
| 596 |
| 597 @classmethod |
| 598 def FromData(cls, stream, json_data, http, auto_transfer=None, **kwds): |
| 599 """Create a new Upload of stream from serialized json_data and http.""" |
| 600 info = json.loads(json_data) |
| 601 missing_keys = cls._REQUIRED_SERIALIZATION_KEYS - set(info.keys()) |
| 602 if missing_keys: |
| 603 raise exceptions.InvalidDataError( |
| 604 'Invalid serialization data, missing keys: %s' % ( |
| 605 ', '.join(missing_keys))) |
| 606 if 'total_size' in kwds: |
| 607 raise exceptions.InvalidUserInputError( |
| 608 'Cannot override total_size on serialized Upload') |
| 609 upload = cls.FromStream(stream, info['mime_type'], |
| 610 total_size=info.get('total_size'), **kwds) |
| 611 if isinstance(stream, io.IOBase) and not stream.seekable(): |
| 612 raise exceptions.InvalidUserInputError( |
| 613 'Cannot restart resumable upload on non-seekable stream') |
| 614 if auto_transfer is not None: |
| 615 upload.auto_transfer = auto_transfer |
| 616 else: |
| 617 upload.auto_transfer = info['auto_transfer'] |
| 618 upload.strategy = RESUMABLE_UPLOAD |
| 619 upload._Initialize( # pylint: disable=protected-access |
| 620 http, info['url']) |
| 621 upload.RefreshResumableUploadState() |
| 622 upload.EnsureInitialized() |
| 623 if upload.auto_transfer: |
| 624 upload.StreamInChunks() |
| 625 return upload |
| 626 |
| 627 @property |
| 628 def serialization_data(self): |
| 629 self.EnsureInitialized() |
| 630 if self.strategy != RESUMABLE_UPLOAD: |
| 631 raise exceptions.InvalidDataError( |
| 632 'Serialization only supported for resumable uploads') |
| 633 return { |
| 634 'auto_transfer': self.auto_transfer, |
| 635 'mime_type': self.mime_type, |
| 636 'total_size': self.total_size, |
| 637 'url': self.url, |
| 638 } |
| 639 |
| 640 @property |
| 641 def complete(self): |
| 642 return self.__complete |
| 643 |
| 644 @property |
| 645 def mime_type(self): |
| 646 return self.__mime_type |
| 647 |
| 648 def __str__(self): |
| 649 if not self.initialized: |
| 650 return 'Upload (uninitialized)' |
| 651 else: |
| 652 return 'Upload with %d/%s bytes transferred for url %s' % ( |
| 653 self.progress, self.total_size or '???', self.url) |
| 654 |
| 655 @property |
| 656 def strategy(self): |
| 657 return self.__strategy |
| 658 |
| 659 @strategy.setter |
| 660 def strategy(self, value): |
| 661 if value not in (SIMPLE_UPLOAD, RESUMABLE_UPLOAD): |
| 662 raise exceptions.UserError(( |
| 663 'Invalid value "%s" for upload strategy, must be one of ' |
| 664 '"simple" or "resumable".') % value) |
| 665 self.__strategy = value |
| 666 |
| 667 @property |
| 668 def total_size(self): |
| 669 return self.__total_size |
| 670 |
| 671 @total_size.setter |
| 672 def total_size(self, value): |
| 673 self.EnsureUninitialized() |
| 674 self.__total_size = value |
| 675 |
| 676 def __SetDefaultUploadStrategy(self, upload_config, http_request): |
| 677 """Determine and set the default upload strategy for this upload. |
| 678 |
| 679 We generally prefer simple or multipart, unless we're forced to |
| 680 use resumable. This happens when any of (1) the upload is too |
| 681 large, (2) the simple endpoint doesn't support multipart requests |
| 682 and we have metadata, or (3) there is no simple upload endpoint. |
| 683 |
| 684 Args: |
| 685 upload_config: Configuration for the upload endpoint. |
| 686 http_request: The associated http request. |
| 687 |
| 688 Returns: |
| 689 None. |
| 690 """ |
| 691 if upload_config.resumable_path is None: |
| 692 self.strategy = SIMPLE_UPLOAD |
| 693 if self.strategy is not None: |
| 694 return |
| 695 strategy = SIMPLE_UPLOAD |
| 696 if (self.total_size is not None and |
| 697 self.total_size > _RESUMABLE_UPLOAD_THRESHOLD): |
| 698 strategy = RESUMABLE_UPLOAD |
| 699 if http_request.body and not upload_config.simple_multipart: |
| 700 strategy = RESUMABLE_UPLOAD |
| 701 if not upload_config.simple_path: |
| 702 strategy = RESUMABLE_UPLOAD |
| 703 self.strategy = strategy |
| 704 |
| 705 def ConfigureRequest(self, upload_config, http_request, url_builder): |
| 706 """Configure the request and url for this upload.""" |
| 707 # Validate total_size vs. max_size |
| 708 if (self.total_size and upload_config.max_size and |
| 709 self.total_size > upload_config.max_size): |
| 710 raise exceptions.InvalidUserInputError( |
| 711 'Upload too big: %s larger than max size %s' % ( |
| 712 self.total_size, upload_config.max_size)) |
| 713 # Validate mime type |
| 714 if not util.AcceptableMimeType(upload_config.accept, self.mime_type): |
| 715 raise exceptions.InvalidUserInputError( |
| 716 'MIME type %s does not match any accepted MIME ranges %s' % ( |
| 717 self.mime_type, upload_config.accept)) |
| 718 |
| 719 self.__SetDefaultUploadStrategy(upload_config, http_request) |
| 720 if self.strategy == SIMPLE_UPLOAD: |
| 721 url_builder.relative_path = upload_config.simple_path |
| 722 if http_request.body: |
| 723 url_builder.query_params['uploadType'] = 'multipart' |
| 724 self.__ConfigureMultipartRequest(http_request) |
| 725 else: |
| 726 url_builder.query_params['uploadType'] = 'media' |
| 727 self.__ConfigureMediaRequest(http_request) |
| 728 else: |
| 729 url_builder.relative_path = upload_config.resumable_path |
| 730 url_builder.query_params['uploadType'] = 'resumable' |
| 731 self.__ConfigureResumableRequest(http_request) |
| 732 |
| 733 def __ConfigureMediaRequest(self, http_request): |
| 734 """Configure http_request as a simple request for this upload.""" |
| 735 http_request.headers['content-type'] = self.mime_type |
| 736 http_request.body = self.stream.read() |
| 737 http_request.loggable_body = '<media body>' |
| 738 |
| 739 def __ConfigureMultipartRequest(self, http_request): |
| 740 """Configure http_request as a multipart request for this upload.""" |
| 741 # This is a multipart/related upload. |
| 742 msg_root = mime_multipart.MIMEMultipart('related') |
| 743 # msg_root should not write out its own headers |
| 744 setattr(msg_root, '_write_headers', lambda self: None) |
| 745 |
| 746 # attach the body as one part |
| 747 msg = mime_nonmultipart.MIMENonMultipart( |
| 748 *http_request.headers['content-type'].split('/')) |
| 749 msg.set_payload(http_request.body) |
| 750 msg_root.attach(msg) |
| 751 |
| 752 # attach the media as the second part |
| 753 msg = mime_nonmultipart.MIMENonMultipart(*self.mime_type.split('/')) |
| 754 msg['Content-Transfer-Encoding'] = 'binary' |
| 755 msg.set_payload(self.stream.read()) |
| 756 msg_root.attach(msg) |
| 757 |
| 758 # NOTE: We encode the body, but can't use |
| 759 # `email.message.Message.as_string` because it prepends |
| 760 # `> ` to `From ` lines. |
| 761 # NOTE: We must use six.StringIO() instead of io.StringIO() since the |
| 762 # `email` library uses cStringIO in Py2 and io.StringIO in Py3. |
| 763 fp = six.StringIO() |
| 764 g = email_generator.Generator(fp, mangle_from_=False) |
| 765 g.flatten(msg_root, unixfrom=False) |
| 766 http_request.body = fp.getvalue() |
| 767 |
| 768 multipart_boundary = msg_root.get_boundary() |
| 769 http_request.headers['content-type'] = ( |
| 770 'multipart/related; boundary=%r' % multipart_boundary) |
| 771 |
| 772 body_components = http_request.body.split(multipart_boundary) |
| 773 headers, _, _ = body_components[-2].partition('\n\n') |
| 774 body_components[-2] = '\n\n'.join([headers, '<media body>\n\n--']) |
| 775 http_request.loggable_body = multipart_boundary.join(body_components) |
| 776 |
| 777 def __ConfigureResumableRequest(self, http_request): |
| 778 http_request.headers['X-Upload-Content-Type'] = self.mime_type |
| 779 if self.total_size is not None: |
| 780 http_request.headers[ |
| 781 'X-Upload-Content-Length'] = str(self.total_size) |
| 782 |
| 783 def RefreshResumableUploadState(self): |
| 784 """Talk to the server and refresh the state of this resumable upload. |
| 785 |
| 786 Returns: |
| 787 Response if the upload is complete. |
| 788 """ |
| 789 if self.strategy != RESUMABLE_UPLOAD: |
| 790 return |
| 791 self.EnsureInitialized() |
| 792 refresh_request = http_wrapper.Request( |
| 793 url=self.url, http_method='PUT', |
| 794 headers={'Content-Range': 'bytes */*'}) |
| 795 refresh_response = http_wrapper.MakeRequest( |
| 796 self.http, refresh_request, redirections=0, |
| 797 retries=self.num_retries) |
| 798 range_header = self._GetRangeHeaderFromResponse(refresh_response) |
| 799 if refresh_response.status_code in (http_client.OK, |
| 800 http_client.CREATED): |
| 801 self.__complete = True |
| 802 self.__progress = self.total_size |
| 803 self.stream.seek(self.progress) |
| 804 # If we're finished, the refresh response will contain the metadata |
| 805 # originally requested. Cache it so it can be returned in |
| 806 # StreamInChunks. |
| 807 self.__final_response = refresh_response |
| 808 elif refresh_response.status_code == http_wrapper.RESUME_INCOMPLETE: |
| 809 if range_header is None: |
| 810 self.__progress = 0 |
| 811 else: |
| 812 self.__progress = self.__GetLastByte(range_header) + 1 |
| 813 self.stream.seek(self.progress) |
| 814 else: |
| 815 raise exceptions.HttpError.FromResponse(refresh_response) |
| 816 |
| 817 def _GetRangeHeaderFromResponse(self, response): |
| 818 return response.info.get('Range', response.info.get('range')) |
| 819 |
| 820 def InitializeUpload(self, http_request, http=None, client=None): |
| 821 """Initialize this upload from the given http_request.""" |
| 822 if self.strategy is None: |
| 823 raise exceptions.UserError( |
| 824 'No upload strategy set; did you call ConfigureRequest?') |
| 825 if http is None and client is None: |
| 826 raise exceptions.UserError('Must provide client or http.') |
| 827 if self.strategy != RESUMABLE_UPLOAD: |
| 828 return |
| 829 http = http or client.http |
| 830 if client is not None: |
| 831 http_request.url = client.FinalizeTransferUrl(http_request.url) |
| 832 self.EnsureUninitialized() |
| 833 http_response = http_wrapper.MakeRequest(http, http_request, |
| 834 retries=self.num_retries) |
| 835 if http_response.status_code != http_client.OK: |
| 836 raise exceptions.HttpError.FromResponse(http_response) |
| 837 |
| 838 self.__server_chunk_granularity = http_response.info.get( |
| 839 'X-Goog-Upload-Chunk-Granularity') |
| 840 url = http_response.info['location'] |
| 841 if client is not None: |
| 842 url = client.FinalizeTransferUrl(url) |
| 843 self._Initialize(http, url) |
| 844 |
| 845 # Unless the user has requested otherwise, we want to just |
| 846 # go ahead and pump the bytes now. |
| 847 if self.auto_transfer: |
| 848 return self.StreamInChunks() |
| 849 else: |
| 850 return http_response |
| 851 |
| 852 def __GetLastByte(self, range_header): |
| 853 _, _, end = range_header.partition('-') |
| 854 # TODO(craigcitro): Validate start == 0? |
| 855 return int(end) |
| 856 |
| 857 def __ValidateChunksize(self, chunksize=None): |
| 858 if self.__server_chunk_granularity is None: |
| 859 return |
| 860 chunksize = chunksize or self.chunksize |
| 861 if chunksize % self.__server_chunk_granularity: |
| 862 raise exceptions.ConfigurationValueError( |
| 863 'Server requires chunksize to be a multiple of %d', |
| 864 self.__server_chunk_granularity) |
| 865 |
| 866 def __StreamMedia(self, callback=None, finish_callback=None, |
| 867 additional_headers=None, use_chunks=True): |
| 868 """Helper function for StreamMedia / StreamInChunks.""" |
| 869 if self.strategy != RESUMABLE_UPLOAD: |
| 870 raise exceptions.InvalidUserInputError( |
| 871 'Cannot stream non-resumable upload') |
| 872 callback = callback or self.progress_callback |
| 873 finish_callback = finish_callback or self.finish_callback |
| 874 # final_response is set if we resumed an already-completed upload. |
| 875 response = self.__final_response |
| 876 send_func = self.__SendChunk if use_chunks else self.__SendMediaBody |
| 877 if use_chunks: |
| 878 self.__ValidateChunksize(self.chunksize) |
| 879 self.EnsureInitialized() |
| 880 while not self.complete: |
| 881 response = send_func(self.stream.tell(), |
| 882 additional_headers=additional_headers) |
| 883 if response.status_code in (http_client.OK, http_client.CREATED): |
| 884 self.__complete = True |
| 885 break |
| 886 self.__progress = self.__GetLastByte(response.info['range']) |
| 887 if self.progress + 1 != self.stream.tell(): |
| 888 # TODO(craigcitro): Add a better way to recover here. |
| 889 raise exceptions.CommunicationError( |
| 890 'Failed to transfer all bytes in chunk, upload paused at ' |
| 891 'byte %d' % self.progress) |
| 892 self._ExecuteCallback(callback, response) |
| 893 if self.__complete and hasattr(self.stream, 'seek'): |
| 894 current_pos = self.stream.tell() |
| 895 self.stream.seek(0, os.SEEK_END) |
| 896 end_pos = self.stream.tell() |
| 897 self.stream.seek(current_pos) |
| 898 if current_pos != end_pos: |
| 899 raise exceptions.TransferInvalidError( |
| 900 'Upload complete with %s additional bytes left in stream' % |
| 901 (int(end_pos) - int(current_pos))) |
| 902 self._ExecuteCallback(finish_callback, response) |
| 903 return response |
| 904 |
| 905 def StreamMedia(self, callback=None, finish_callback=None, |
| 906 additional_headers=None): |
| 907 """Send this resumable upload in a single request. |
| 908 |
| 909 Args: |
| 910 callback: Progress callback function with inputs |
| 911 (http_wrapper.Response, transfer.Upload) |
| 912 finish_callback: Final callback function with inputs |
| 913 (http_wrapper.Response, transfer.Upload) |
| 914 additional_headers: Dict of headers to include with the upload |
| 915 http_wrapper.Request. |
| 916 |
| 917 Returns: |
| 918 http_wrapper.Response of final response. |
| 919 """ |
| 920 return self.__StreamMedia( |
| 921 callback=callback, finish_callback=finish_callback, |
| 922 additional_headers=additional_headers, use_chunks=False) |
| 923 |
| 924 def StreamInChunks(self, callback=None, finish_callback=None, |
| 925 additional_headers=None): |
| 926 """Send this (resumable) upload in chunks.""" |
| 927 return self.__StreamMedia( |
| 928 callback=callback, finish_callback=finish_callback, |
| 929 additional_headers=additional_headers) |
| 930 |
| 931 def __SendMediaRequest(self, request, end): |
| 932 """Request helper function for SendMediaBody & SendChunk.""" |
| 933 response = http_wrapper.MakeRequest( |
| 934 self.bytes_http, request, retry_func=self.retry_func, |
| 935 retries=self.num_retries) |
| 936 if response.status_code not in (http_client.OK, http_client.CREATED, |
| 937 http_wrapper.RESUME_INCOMPLETE): |
| 938 # We want to reset our state to wherever the server left us |
| 939 # before this failed request, and then raise. |
| 940 self.RefreshResumableUploadState() |
| 941 raise exceptions.HttpError.FromResponse(response) |
| 942 if response.status_code == http_wrapper.RESUME_INCOMPLETE: |
| 943 last_byte = self.__GetLastByte( |
| 944 self._GetRangeHeaderFromResponse(response)) |
| 945 if last_byte + 1 != end: |
| 946 self.stream.seek(last_byte) |
| 947 return response |
| 948 |
| 949 def __SendMediaBody(self, start, additional_headers=None): |
| 950 """Send the entire media stream in a single request.""" |
| 951 self.EnsureInitialized() |
| 952 if self.total_size is None: |
| 953 raise exceptions.TransferInvalidError( |
| 954 'Total size must be known for SendMediaBody') |
| 955 body_stream = stream_slice.StreamSlice( |
| 956 self.stream, self.total_size - start) |
| 957 |
| 958 request = http_wrapper.Request(url=self.url, http_method='PUT', |
| 959 body=body_stream) |
| 960 request.headers['Content-Type'] = self.mime_type |
| 961 if start == self.total_size: |
| 962 # End of an upload with 0 bytes left to send; just finalize. |
| 963 range_string = 'bytes */%s' % self.total_size |
| 964 else: |
| 965 range_string = 'bytes %s-%s/%s' % (start, self.total_size - 1, |
| 966 self.total_size) |
| 967 |
| 968 request.headers['Content-Range'] = range_string |
| 969 if additional_headers: |
| 970 request.headers.update(additional_headers) |
| 971 |
| 972 return self.__SendMediaRequest(request, self.total_size) |
| 973 |
| 974 def __SendChunk(self, start, additional_headers=None): |
| 975 """Send the specified chunk.""" |
| 976 self.EnsureInitialized() |
| 977 no_log_body = self.total_size is None |
| 978 if self.total_size is None: |
| 979 # For the streaming resumable case, we need to detect when |
| 980 # we're at the end of the stream. |
| 981 body_stream = buffered_stream.BufferedStream( |
| 982 self.stream, start, self.chunksize) |
| 983 end = body_stream.stream_end_position |
| 984 if body_stream.stream_exhausted: |
| 985 self.__total_size = end |
| 986 # TODO: Here, change body_stream from a stream to a string object, |
| 987 # which means reading a chunk into memory. This works around |
| 988 # https://code.google.com/p/httplib2/issues/detail?id=176 which can |
| 989 # cause httplib2 to skip bytes on 401's for file objects. |
| 990 # Rework this solution to be more general. |
| 991 # pylint: disable=redefined-variable-type |
| 992 body_stream = body_stream.read(self.chunksize) |
| 993 else: |
| 994 end = min(start + self.chunksize, self.total_size) |
| 995 body_stream = stream_slice.StreamSlice(self.stream, end - start) |
| 996 # TODO(craigcitro): Think about clearer errors on "no data in |
| 997 # stream". |
| 998 request = http_wrapper.Request(url=self.url, http_method='PUT', |
| 999 body=body_stream) |
| 1000 request.headers['Content-Type'] = self.mime_type |
| 1001 if no_log_body: |
| 1002 # Disable logging of streaming body. |
| 1003 # TODO: Remove no_log_body and rework as part of a larger logs |
| 1004 # refactor. |
| 1005 request.loggable_body = '<media body>' |
| 1006 if self.total_size is None: |
| 1007 # Streaming resumable upload case, unknown total size. |
| 1008 range_string = 'bytes %s-%s/*' % (start, end - 1) |
| 1009 elif end == start: |
| 1010 # End of an upload with 0 bytes left to send; just finalize. |
| 1011 range_string = 'bytes */%s' % self.total_size |
| 1012 else: |
| 1013 # Normal resumable upload case with known sizes. |
| 1014 range_string = 'bytes %s-%s/%s' % (start, end - 1, self.total_size) |
| 1015 |
| 1016 request.headers['Content-Range'] = range_string |
| 1017 if additional_headers: |
| 1018 request.headers.update(additional_headers) |
| 1019 |
| 1020 return self.__SendMediaRequest(request, end) |
OLD | NEW |