| OLD | NEW |
| (Empty) | |
| 1 # Copyright 2014 Google Inc. All Rights Reserved. |
| 2 # |
| 3 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 # you may not use this file except in compliance with the License. |
| 5 # You may obtain a copy of the License at |
| 6 # |
| 7 # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 # |
| 9 # Unless required by applicable law or agreed to in writing, software |
| 10 # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 # See the License for the specific language governing permissions and |
| 13 # limitations under the License. |
| 14 #!/usr/bin/env python |
| 15 """Upload and download support for apitools.""" |
| 16 |
| 17 import email.generator as email_generator |
| 18 import email.mime.multipart as mime_multipart |
| 19 import email.mime.nonmultipart as mime_nonmultipart |
| 20 import httplib |
| 21 import io |
| 22 import json |
| 23 import mimetypes |
| 24 import os |
| 25 import StringIO |
| 26 import threading |
| 27 |
| 28 from apiclient import mimeparse |
| 29 |
| 30 from gslib.third_party.storage_apitools import exceptions |
| 31 from gslib.third_party.storage_apitools import http_wrapper |
| 32 from gslib.third_party.storage_apitools import stream_slice |
| 33 |
| 34 __all__ = [ |
| 35 'Download', |
| 36 'Upload', |
| 37 ] |
| 38 |
| 39 _RESUMABLE_UPLOAD_THRESHOLD = 5 << 20 |
| 40 _SIMPLE_UPLOAD = 'simple' |
| 41 _RESUMABLE_UPLOAD = 'resumable' |
| 42 |
| 43 |
| 44 class _Transfer(object): |
| 45 """Generic bits common to Uploads and Downloads.""" |
| 46 |
| 47 def __init__(self, stream, close_stream=False, chunksize=None, |
| 48 auto_transfer=True, total_size=None, http=None): |
| 49 self.__bytes_http = None |
| 50 self.__close_stream = close_stream |
| 51 self.__http = http |
| 52 self.__stream = stream |
| 53 self.__url = None |
| 54 |
| 55 self.retry_func = http_wrapper.HandleExceptionsAndRebuildHttpConnections |
| 56 self.auto_transfer = auto_transfer |
| 57 self.chunksize = chunksize or 1048576L |
| 58 |
| 59 def __repr__(self): |
| 60 return str(self) |
| 61 |
| 62 @property |
| 63 def close_stream(self): |
| 64 return self.__close_stream |
| 65 |
| 66 @property |
| 67 def http(self): |
| 68 return self.__http |
| 69 |
| 70 @property |
| 71 def bytes_http(self): |
| 72 return self.__bytes_http or self.http |
| 73 |
| 74 @bytes_http.setter |
| 75 def bytes_http(self, value): |
| 76 self.__bytes_http = value |
| 77 |
| 78 @property |
| 79 def stream(self): |
| 80 return self.__stream |
| 81 |
| 82 @property |
| 83 def url(self): |
| 84 return self.__url |
| 85 |
| 86 def _Initialize(self, http, url): |
| 87 """Initialize this download by setting self.http and self.url. |
| 88 |
| 89 We want the user to be able to override self.http by having set |
| 90 the value in the constructor; in that case, we ignore the provided |
| 91 http. |
| 92 |
| 93 Args: |
| 94 http: An httplib2.Http instance or None. |
| 95 url: The url for this transfer. |
| 96 |
| 97 Returns: |
| 98 None. Initializes self. |
| 99 """ |
| 100 self.EnsureUninitialized() |
| 101 if self.http is None: |
| 102 self.__http = http or http_wrapper.GetHttp() |
| 103 self.__url = url |
| 104 |
| 105 @property |
| 106 def initialized(self): |
| 107 return self.url is not None and self.http is not None |
| 108 |
| 109 @property |
| 110 def _type_name(self): |
| 111 return type(self).__name__ |
| 112 |
| 113 def EnsureInitialized(self): |
| 114 if not self.initialized: |
| 115 raise exceptions.TransferInvalidError( |
| 116 'Cannot use uninitialized %s', self._type_name) |
| 117 |
| 118 def EnsureUninitialized(self): |
| 119 if self.initialized: |
| 120 raise exceptions.TransferInvalidError( |
| 121 'Cannot re-initialize %s', self._type_name) |
| 122 |
| 123 def __del__(self): |
| 124 if self.__close_stream: |
| 125 self.__stream.close() |
| 126 |
| 127 def _ExecuteCallback(self, callback, response): |
| 128 # TODO: Push these into a queue. |
| 129 if callback is not None: |
| 130 threading.Thread(target=callback, args=(response, self)).start() |
| 131 |
| 132 |
| 133 class Download(_Transfer): |
| 134 """Data for a single download. |
| 135 |
| 136 Public attributes: |
| 137 chunksize: default chunksize to use for transfers. |
| 138 """ |
| 139 _ACCEPTABLE_STATUSES = set(( |
| 140 httplib.OK, |
| 141 httplib.NO_CONTENT, |
| 142 httplib.PARTIAL_CONTENT, |
| 143 httplib.REQUESTED_RANGE_NOT_SATISFIABLE, |
| 144 )) |
| 145 _REQUIRED_SERIALIZATION_KEYS = set(( |
| 146 'auto_transfer', 'progress', 'total_size', 'url')) |
| 147 |
| 148 def __init__(self, *args, **kwds): |
| 149 super(Download, self).__init__(*args, **kwds) |
| 150 self.__initial_response = None |
| 151 self.__progress = 0 |
| 152 self.__total_size = kwds['total_size'] if 'total_size' in kwds else None |
| 153 self.__encoding = None |
| 154 |
| 155 @property |
| 156 def progress(self): |
| 157 return self.__progress |
| 158 |
| 159 @property |
| 160 def encoding(self): |
| 161 return self.__encoding |
| 162 |
| 163 @classmethod |
| 164 def FromFile(cls, filename, overwrite=False, auto_transfer=True, **kwds): |
| 165 """Create a new download object from a filename.""" |
| 166 path = os.path.expanduser(filename) |
| 167 if os.path.exists(path) and not overwrite: |
| 168 raise exceptions.InvalidUserInputError( |
| 169 'File %s exists and overwrite not specified' % path) |
| 170 return cls(open(path, 'wb'), close_stream=True, auto_transfer=auto_transfer, |
| 171 **kwds) |
| 172 |
| 173 @classmethod |
| 174 def FromStream(cls, stream, auto_transfer=True, total_size=None, **kwds): |
| 175 """Create a new Download object from a stream.""" |
| 176 return cls(stream, auto_transfer=auto_transfer, total_size=total_size, |
| 177 **kwds) |
| 178 |
| 179 @classmethod |
| 180 def FromData(cls, stream, json_data, http=None, auto_transfer=None): |
| 181 """Create a new Download object from a stream and serialized data.""" |
| 182 info = json.loads(json_data) |
| 183 missing_keys = cls._REQUIRED_SERIALIZATION_KEYS - set(info.keys()) |
| 184 if missing_keys: |
| 185 raise exceptions.InvalidDataError( |
| 186 'Invalid serialization data, missing keys: %s' % ( |
| 187 ', '.join(missing_keys))) |
| 188 download = cls.FromStream(stream) |
| 189 if auto_transfer is not None: |
| 190 download.auto_transfer = auto_transfer |
| 191 else: |
| 192 download.auto_transfer = info['auto_transfer'] |
| 193 setattr(download, '_Download__progress', info['progress']) |
| 194 setattr(download, '_Download__total_size', info['total_size']) |
| 195 download._Initialize(http, info['url']) # pylint: disable=protected-access |
| 196 return download |
| 197 |
| 198 @property |
| 199 def serialization_data(self): |
| 200 self.EnsureInitialized() |
| 201 return { |
| 202 'auto_transfer': self.auto_transfer, |
| 203 'progress': self.progress, |
| 204 'total_size': self.total_size, |
| 205 'url': self.url, |
| 206 } |
| 207 |
| 208 @property |
| 209 def total_size(self): |
| 210 return self.__total_size |
| 211 |
| 212 def __str__(self): |
| 213 if not self.initialized: |
| 214 return 'Download (uninitialized)' |
| 215 else: |
| 216 return 'Download with %d/%s bytes transferred from url %s' % ( |
| 217 self.progress, self.total_size, self.url) |
| 218 |
| 219 def ConfigureRequest(self, http_request, url_builder): |
| 220 url_builder.query_params['alt'] = 'media' |
| 221 http_request.headers['Range'] = 'bytes=0-%d' % (self.chunksize - 1,) |
| 222 |
| 223 def __SetTotal(self, info): |
| 224 if 'content-range' in info: |
| 225 _, _, total = info['content-range'].rpartition('/') |
| 226 if total != '*': |
| 227 self.__total_size = int(total) |
| 228 # Note "total_size is None" means we don't know it; if no size |
| 229 # info was returned on our initial range request, that means we |
| 230 # have a 0-byte file. (That last statement has been verified |
| 231 # empirically, but is not clearly documented anywhere.) |
| 232 if self.total_size is None: |
| 233 self.__total_size = 0 |
| 234 |
| 235 def InitializeDownload(self, http_request, http=None, client=None): |
| 236 """Initialize this download by making a request. |
| 237 |
| 238 Args: |
| 239 http_request: The HttpRequest to use to initialize this download. |
| 240 http: The httplib2.Http instance for this request. |
| 241 client: If provided, let this client process the final URL before |
| 242 sending any additional requests. If client is provided and |
| 243 http is not, client.http will be used instead. |
| 244 """ |
| 245 self.EnsureUninitialized() |
| 246 if http is None and client is None: |
| 247 raise exceptions.UserError('Must provide client or http.') |
| 248 http = http or client.http |
| 249 if client is not None: |
| 250 http_request.url = client.FinalizeTransferUrl(http_request.url) |
| 251 url = http_request.url |
| 252 if client is not None: |
| 253 url = client.FinalizeTransferUrl(url) |
| 254 self._Initialize(http, url) |
| 255 # Unless the user has requested otherwise, we want to just |
| 256 # go ahead and pump the bytes now. |
| 257 if self.auto_transfer: |
| 258 self.StreamInChunks() |
| 259 |
| 260 @staticmethod |
| 261 def _ArgPrinter(response, unused_download): |
| 262 if 'content-range' in response.info: |
| 263 print 'Received %s' % response.info['content-range'] |
| 264 else: |
| 265 print 'Received %d bytes' % response.length |
| 266 |
| 267 @staticmethod |
| 268 def _CompletePrinter(*unused_args): |
| 269 print 'Download complete' |
| 270 |
| 271 def __NormalizeStartEnd(self, start, end=None): |
| 272 if end is not None: |
| 273 if start < 0: |
| 274 raise exceptions.TransferInvalidError( |
| 275 'Cannot have end index with negative start index') |
| 276 elif start >= self.total_size: |
| 277 raise exceptions.TransferInvalidError( |
| 278 'Cannot have start index greater than total size') |
| 279 end = min(end, self.total_size - 1) |
| 280 if end < start: |
| 281 raise exceptions.TransferInvalidError( |
| 282 'Range requested with end[%s] < start[%s]' % (end, start)) |
| 283 return start, end |
| 284 else: |
| 285 if start < 0: |
| 286 start = max(0, start + self.total_size) |
| 287 return start, self.total_size |
| 288 |
| 289 def __SetRangeHeader(self, request, start, end=None): |
| 290 if start < 0: |
| 291 request.headers['range'] = 'bytes=%d' % start |
| 292 elif end is None: |
| 293 request.headers['range'] = 'bytes=%d-' % start |
| 294 else: |
| 295 request.headers['range'] = 'bytes=%d-%d' % (start, end) |
| 296 |
| 297 def __GetChunk(self, start, end=None, additional_headers=None): |
| 298 """Retrieve a chunk, and return the full response.""" |
| 299 self.EnsureInitialized() |
| 300 end_byte = end |
| 301 if self.total_size and end: |
| 302 end_byte = min(end, self.total_size) |
| 303 request = http_wrapper.Request(url=self.url) |
| 304 self.__SetRangeHeader(request, start, end=end_byte) |
| 305 if additional_headers is not None: |
| 306 request.headers.update(additional_headers) |
| 307 return http_wrapper.MakeRequest( |
| 308 self.bytes_http, request, retry_func=self.retry_func) |
| 309 |
| 310 def __ProcessResponse(self, response): |
| 311 """Process this response (by updating self and writing to self.stream).""" |
| 312 if response.status_code not in self._ACCEPTABLE_STATUSES: |
| 313 raise exceptions.TransferRetryError(response.content) |
| 314 if response.status_code in (httplib.OK, httplib.PARTIAL_CONTENT): |
| 315 self.stream.write(response.content) |
| 316 self.__progress += response.length |
| 317 if response.info and 'content-encoding' in response.info: |
| 318 # TODO: Handle the case where this changes over a download. |
| 319 self.__encoding = response.info['content-encoding'] |
| 320 elif response.status_code == httplib.NO_CONTENT: |
| 321 # It's important to write something to the stream for the case |
| 322 # of a 0-byte download to a file, as otherwise python won't |
| 323 # create the file. |
| 324 self.stream.write('') |
| 325 return response |
| 326 |
| 327 def GetRange(self, start, end=None, additional_headers=None): |
| 328 """Retrieve a given byte range from this download, inclusive. |
| 329 |
| 330 Range must be of one of these three forms: |
| 331 * 0 <= start, end = None: Fetch from start to the end of the file. |
| 332 * 0 <= start <= end: Fetch the bytes from start to end. |
| 333 * start < 0, end = None: Fetch the last -start bytes of the file. |
| 334 |
| 335 (These variations correspond to those described in the HTTP 1.1 |
| 336 protocol for range headers in RFC 2616, sec. 14.35.1.) |
| 337 |
| 338 Args: |
| 339 start: (int) Where to start fetching bytes. (See above.) |
| 340 end: (int, optional) Where to stop fetching bytes. (See above.) |
| 341 additional_headers: (bool, optional) Any additional headers to |
| 342 pass with the request. |
| 343 |
| 344 Returns: |
| 345 None. Streams bytes into self.stream. |
| 346 """ |
| 347 self.EnsureInitialized() |
| 348 progress_end_normalized = False |
| 349 if self.total_size is not None: |
| 350 progress, end = self.__NormalizeStartEnd(start, end) |
| 351 progress_end_normalized = True |
| 352 else: |
| 353 progress = start |
| 354 while not progress_end_normalized or progress < end: |
| 355 response = self.__GetChunk(progress, end=end, |
| 356 additional_headers=additional_headers) |
| 357 if not progress_end_normalized: |
| 358 self.__SetTotal(response.info) |
| 359 progress, end = self.__NormalizeStartEnd(start, end) |
| 360 progress_end_normalized = True |
| 361 response = self.__ProcessResponse(response) |
| 362 progress += response.length |
| 363 if not response: |
| 364 raise exceptions.TransferRetryError( |
| 365 'Zero bytes unexpectedly returned in download response') |
| 366 |
| 367 def StreamInChunks(self, callback=None, finish_callback=None, |
| 368 additional_headers=None): |
| 369 """Stream the entire download.""" |
| 370 callback = callback or self._ArgPrinter |
| 371 finish_callback = finish_callback or self._CompletePrinter |
| 372 |
| 373 self.EnsureInitialized() |
| 374 while True: |
| 375 if self.__initial_response is not None: |
| 376 response = self.__initial_response |
| 377 self.__initial_response = None |
| 378 else: |
| 379 response = self.__GetChunk(self.progress, |
| 380 additional_headers=additional_headers) |
| 381 response = self.__ProcessResponse(response) |
| 382 self._ExecuteCallback(callback, response) |
| 383 if (response.status_code == httplib.OK or |
| 384 self.progress >= self.total_size): |
| 385 break |
| 386 self._ExecuteCallback(finish_callback, response) |
| 387 |
| 388 |
| 389 class Upload(_Transfer): |
| 390 """Data for a single Upload. |
| 391 |
| 392 Fields: |
| 393 stream: The stream to upload. |
| 394 mime_type: MIME type of the upload. |
| 395 total_size: (optional) Total upload size for the stream. |
| 396 close_stream: (default: False) Whether or not we should close the |
| 397 stream when finished with the upload. |
| 398 auto_transfer: (default: True) If True, stream all bytes as soon as |
| 399 the upload is created. |
| 400 """ |
| 401 _REQUIRED_SERIALIZATION_KEYS = set(( |
| 402 'auto_transfer', 'mime_type', 'total_size', 'url')) |
| 403 |
| 404 def __init__(self, stream, mime_type, total_size=None, http=None, |
| 405 close_stream=False, chunksize=None, auto_transfer=True): |
| 406 super(Upload, self).__init__( |
| 407 stream, close_stream=close_stream, chunksize=chunksize, |
| 408 auto_transfer=auto_transfer, http=http) |
| 409 self.__complete = False |
| 410 self.__final_response = None |
| 411 self.__mime_type = mime_type |
| 412 self.__progress = 0 |
| 413 self.__server_chunk_granularity = None |
| 414 self.__strategy = None |
| 415 |
| 416 self.total_size = total_size |
| 417 |
| 418 @property |
| 419 def progress(self): |
| 420 return self.__progress |
| 421 |
| 422 @classmethod |
| 423 def FromFile(cls, filename, mime_type=None, auto_transfer=True): |
| 424 """Create a new Upload object from a filename.""" |
| 425 path = os.path.expanduser(filename) |
| 426 if not os.path.exists(path): |
| 427 raise exceptions.NotFoundError('Could not find file %s' % path) |
| 428 if not mime_type: |
| 429 mime_type, _ = mimetypes.guess_type(path) |
| 430 if mime_type is None: |
| 431 raise exceptions.InvalidUserInputError( |
| 432 'Could not guess mime type for %s' % path) |
| 433 size = os.stat(path).st_size |
| 434 return cls(open(path, 'rb'), mime_type, total_size=size, close_stream=True, |
| 435 auto_transfer=auto_transfer) |
| 436 |
| 437 @classmethod |
| 438 def FromStream(cls, stream, mime_type, total_size=None, auto_transfer=True): |
| 439 """Create a new Upload object from a stream.""" |
| 440 if mime_type is None: |
| 441 raise exceptions.InvalidUserInputError( |
| 442 'No mime_type specified for stream') |
| 443 return cls(stream, mime_type, total_size=total_size, close_stream=False, |
| 444 auto_transfer=auto_transfer) |
| 445 |
| 446 @classmethod |
| 447 def FromData(cls, stream, json_data, http, auto_transfer=None): |
| 448 """Create a new Upload of stream from serialized json_data using http.""" |
| 449 info = json.loads(json_data) |
| 450 missing_keys = cls._REQUIRED_SERIALIZATION_KEYS - set(info.keys()) |
| 451 if missing_keys: |
| 452 raise exceptions.InvalidDataError( |
| 453 'Invalid serialization data, missing keys: %s' % ( |
| 454 ', '.join(missing_keys))) |
| 455 upload = cls.FromStream(stream, info['mime_type'], |
| 456 total_size=info.get('total_size')) |
| 457 if isinstance(stream, io.IOBase) and not stream.seekable(): |
| 458 raise exceptions.InvalidUserInputError( |
| 459 'Cannot restart resumable upload on non-seekable stream') |
| 460 if auto_transfer is not None: |
| 461 upload.auto_transfer = auto_transfer |
| 462 else: |
| 463 upload.auto_transfer = info['auto_transfer'] |
| 464 upload.strategy = _RESUMABLE_UPLOAD |
| 465 upload._Initialize(http, info['url']) # pylint: disable=protected-access |
| 466 upload.RefreshResumableUploadState() |
| 467 upload.EnsureInitialized() |
| 468 if upload.auto_transfer: |
| 469 upload.StreamInChunks() |
| 470 return upload |
| 471 |
| 472 @property |
| 473 def serialization_data(self): |
| 474 self.EnsureInitialized() |
| 475 if self.strategy != _RESUMABLE_UPLOAD: |
| 476 raise exceptions.InvalidDataError( |
| 477 'Serialization only supported for resumable uploads') |
| 478 return { |
| 479 'auto_transfer': self.auto_transfer, |
| 480 'mime_type': self.mime_type, |
| 481 'total_size': self.total_size, |
| 482 'url': self.url, |
| 483 } |
| 484 |
| 485 @property |
| 486 def complete(self): |
| 487 return self.__complete |
| 488 |
| 489 @property |
| 490 def mime_type(self): |
| 491 return self.__mime_type |
| 492 |
| 493 def __str__(self): |
| 494 if not self.initialized: |
| 495 return 'Upload (uninitialized)' |
| 496 else: |
| 497 return 'Upload with %d/%s bytes transferred for url %s' % ( |
| 498 self.progress, self.total_size or '???', self.url) |
| 499 |
| 500 @property |
| 501 def strategy(self): |
| 502 return self.__strategy |
| 503 |
| 504 @strategy.setter |
| 505 def strategy(self, value): |
| 506 if value not in (_SIMPLE_UPLOAD, _RESUMABLE_UPLOAD): |
| 507 raise exceptions.UserError(( |
| 508 'Invalid value "%s" for upload strategy, must be one of ' |
| 509 '"simple" or "resumable".') % value) |
| 510 self.__strategy = value |
| 511 |
| 512 @property |
| 513 def total_size(self): |
| 514 return self.__total_size |
| 515 |
| 516 @total_size.setter |
| 517 def total_size(self, value): |
| 518 self.EnsureUninitialized() |
| 519 self.__total_size = value |
| 520 |
| 521 def __SetDefaultUploadStrategy(self, upload_config, http_request): |
| 522 """Determine and set the default upload strategy for this upload. |
| 523 |
| 524 We generally prefer simple or multipart, unless we're forced to |
| 525 use resumable. This happens when any of (1) the upload is too |
| 526 large, (2) the simple endpoint doesn't support multipart requests |
| 527 and we have metadata, or (3) there is no simple upload endpoint. |
| 528 |
| 529 Args: |
| 530 upload_config: Configuration for the upload endpoint. |
| 531 http_request: The associated http request. |
| 532 |
| 533 Returns: |
| 534 None. |
| 535 """ |
| 536 if self.strategy is not None: |
| 537 return |
| 538 strategy = _SIMPLE_UPLOAD |
| 539 if (self.total_size is not None and |
| 540 self.total_size > _RESUMABLE_UPLOAD_THRESHOLD): |
| 541 strategy = _RESUMABLE_UPLOAD |
| 542 if http_request.body and not upload_config.simple_multipart: |
| 543 strategy = _RESUMABLE_UPLOAD |
| 544 if not upload_config.simple_path: |
| 545 strategy = _RESUMABLE_UPLOAD |
| 546 self.strategy = strategy |
| 547 |
| 548 def ConfigureRequest(self, upload_config, http_request, url_builder): |
| 549 """Configure the request and url for this upload.""" |
| 550 # Validate total_size vs. max_size |
| 551 if (self.total_size and upload_config.max_size and |
| 552 self.total_size > upload_config.max_size): |
| 553 raise exceptions.InvalidUserInputError( |
| 554 'Upload too big: %s larger than max size %s' % ( |
| 555 self.total_size, upload_config.max_size)) |
| 556 # Validate mime type |
| 557 if not mimeparse.best_match(upload_config.accept, self.mime_type): |
| 558 raise exceptions.InvalidUserInputError( |
| 559 'MIME type %s does not match any accepted MIME ranges %s' % ( |
| 560 self.mime_type, upload_config.accept)) |
| 561 |
| 562 self.__SetDefaultUploadStrategy(upload_config, http_request) |
| 563 if self.strategy == _SIMPLE_UPLOAD: |
| 564 url_builder.relative_path = upload_config.simple_path |
| 565 if http_request.body: |
| 566 url_builder.query_params['uploadType'] = 'multipart' |
| 567 self.__ConfigureMultipartRequest(http_request) |
| 568 else: |
| 569 url_builder.query_params['uploadType'] = 'media' |
| 570 self.__ConfigureMediaRequest(http_request) |
| 571 else: |
| 572 url_builder.relative_path = upload_config.resumable_path |
| 573 url_builder.query_params['uploadType'] = 'resumable' |
| 574 self.__ConfigureResumableRequest(http_request) |
| 575 |
| 576 def __ConfigureMediaRequest(self, http_request): |
| 577 """Configure http_request as a simple request for this upload.""" |
| 578 http_request.headers['content-type'] = self.mime_type |
| 579 http_request.body = self.stream.read() |
| 580 http_request.loggable_body = '<media body>' |
| 581 |
| 582 def __ConfigureMultipartRequest(self, http_request): |
| 583 """Configure http_request as a multipart request for this upload.""" |
| 584 # This is a multipart/related upload. |
| 585 msg_root = mime_multipart.MIMEMultipart('related') |
| 586 # msg_root should not write out its own headers |
| 587 setattr(msg_root, '_write_headers', lambda self: None) |
| 588 |
| 589 # attach the body as one part |
| 590 msg = mime_nonmultipart.MIMENonMultipart( |
| 591 *http_request.headers['content-type'].split('/')) |
| 592 msg.set_payload(http_request.body) |
| 593 msg_root.attach(msg) |
| 594 |
| 595 # attach the media as the second part |
| 596 msg = mime_nonmultipart.MIMENonMultipart(*self.mime_type.split('/')) |
| 597 msg['Content-Transfer-Encoding'] = 'binary' |
| 598 msg.set_payload(self.stream.read()) |
| 599 msg_root.attach(msg) |
| 600 |
| 601 # encode the body: note that we can't use `as_string`, because |
| 602 # it plays games with `From ` lines. |
| 603 fp = StringIO.StringIO() |
| 604 g = email_generator.Generator(fp, mangle_from_=False) |
| 605 g.flatten(msg_root, unixfrom=False) |
| 606 http_request.body = fp.getvalue() |
| 607 |
| 608 multipart_boundary = msg_root.get_boundary() |
| 609 http_request.headers['content-type'] = ( |
| 610 'multipart/related; boundary=%r' % multipart_boundary) |
| 611 |
| 612 body_components = http_request.body.split(multipart_boundary) |
| 613 headers, _, _ = body_components[-2].partition('\n\n') |
| 614 body_components[-2] = '\n\n'.join([headers, '<media body>\n\n--']) |
| 615 http_request.loggable_body = multipart_boundary.join(body_components) |
| 616 |
| 617 def __ConfigureResumableRequest(self, http_request): |
| 618 http_request.headers['X-Upload-Content-Type'] = self.mime_type |
| 619 if self.total_size is not None: |
| 620 http_request.headers['X-Upload-Content-Length'] = str(self.total_size) |
| 621 |
| 622 def RefreshResumableUploadState(self): |
| 623 """Talk to the server and refresh the state of this resumable upload. |
| 624 |
| 625 Returns: |
| 626 Response if the upload is complete. |
| 627 """ |
| 628 if self.strategy != _RESUMABLE_UPLOAD: |
| 629 return |
| 630 self.EnsureInitialized() |
| 631 refresh_request = http_wrapper.Request( |
| 632 url=self.url, http_method='PUT', headers={'Content-Range': 'bytes */*'}) |
| 633 refresh_response = http_wrapper.MakeRequest( |
| 634 self.http, refresh_request, redirections=0) |
| 635 range_header = self._GetRangeHeaderFromResponse(refresh_response) |
| 636 if refresh_response.status_code in (httplib.OK, httplib.CREATED): |
| 637 self.__complete = True |
| 638 self.__progress = self.total_size |
| 639 self.stream.seek(self.progress) |
| 640 # If we're finished, the refresh response will contain the metadata |
| 641 # originally requested. Cache it so it can be returned in StreamInChunks. |
| 642 self.__final_response = refresh_response |
| 643 elif refresh_response.status_code == http_wrapper.RESUME_INCOMPLETE: |
| 644 if range_header is None: |
| 645 self.__progress = 0 |
| 646 else: |
| 647 self.__progress = self.__GetLastByte(range_header) + 1 |
| 648 self.stream.seek(self.progress) |
| 649 else: |
| 650 raise exceptions.HttpError.FromResponse(refresh_response) |
| 651 |
| 652 def _GetRangeHeaderFromResponse(self, response): |
| 653 return response.info.get('Range', response.info.get('range')) |
| 654 |
| 655 def InitializeUpload(self, http_request, http=None, client=None): |
| 656 """Initialize this upload from the given http_request.""" |
| 657 if self.strategy is None: |
| 658 raise exceptions.UserError( |
| 659 'No upload strategy set; did you call ConfigureRequest?') |
| 660 if http is None and client is None: |
| 661 raise exceptions.UserError('Must provide client or http.') |
| 662 if self.strategy != _RESUMABLE_UPLOAD: |
| 663 return |
| 664 if self.total_size is None: |
| 665 raise exceptions.InvalidUserInputError( |
| 666 'Cannot stream upload without total size') |
| 667 http = http or client.http |
| 668 if client is not None: |
| 669 http_request.url = client.FinalizeTransferUrl(http_request.url) |
| 670 self.EnsureUninitialized() |
| 671 http_response = http_wrapper.MakeRequest(http, http_request) |
| 672 if http_response.status_code != httplib.OK: |
| 673 raise exceptions.HttpError.FromResponse(http_response) |
| 674 |
| 675 self.__server_chunk_granularity = http_response.info.get( |
| 676 'X-Goog-Upload-Chunk-Granularity') |
| 677 self.__ValidateChunksize() |
| 678 url = http_response.info['location'] |
| 679 if client is not None: |
| 680 url = client.FinalizeTransferUrl(url) |
| 681 self._Initialize(http, url) |
| 682 |
| 683 # Unless the user has requested otherwise, we want to just |
| 684 # go ahead and pump the bytes now. |
| 685 if self.auto_transfer: |
| 686 return self.StreamInChunks() |
| 687 |
| 688 def __GetLastByte(self, range_header): |
| 689 _, _, end = range_header.partition('-') |
| 690 # TODO: Validate start == 0? |
| 691 return int(end) |
| 692 |
| 693 def __ValidateChunksize(self, chunksize=None): |
| 694 if self.__server_chunk_granularity is None: |
| 695 return |
| 696 chunksize = chunksize or self.chunksize |
| 697 if chunksize % self.__server_chunk_granularity: |
| 698 raise exceptions.ConfigurationValueError( |
| 699 'Server requires chunksize to be a multiple of %d', |
| 700 self.__server_chunk_granularity) |
| 701 |
| 702 @staticmethod |
| 703 def _ArgPrinter(response, unused_upload): |
| 704 print 'Sent %s' % response.info['range'] |
| 705 |
| 706 @staticmethod |
| 707 def _CompletePrinter(*unused_args): |
| 708 print 'Upload complete' |
| 709 |
| 710 def StreamInChunks(self, callback=None, finish_callback=None, |
| 711 additional_headers=None): |
| 712 """Send this (resumable) upload in chunks.""" |
| 713 if self.strategy != _RESUMABLE_UPLOAD: |
| 714 raise exceptions.InvalidUserInputError( |
| 715 'Cannot stream non-resumable upload') |
| 716 if self.total_size is None: |
| 717 raise exceptions.InvalidUserInputError( |
| 718 'Cannot stream upload without total size') |
| 719 callback = callback or self._ArgPrinter |
| 720 finish_callback = finish_callback or self._CompletePrinter |
| 721 # final_response is set if we resumed an already-completed upload. |
| 722 response = self.__final_response |
| 723 self.__ValidateChunksize(self.chunksize) |
| 724 self.EnsureInitialized() |
| 725 while not self.complete: |
| 726 response = self.__SendChunk(self.stream.tell(), |
| 727 additional_headers=additional_headers) |
| 728 if response.status_code in (httplib.OK, httplib.CREATED): |
| 729 self.__complete = True |
| 730 break |
| 731 self.__progress = self.__GetLastByte(response.info['range']) |
| 732 if self.progress + 1 != self.stream.tell(): |
| 733 # TODO: Add a better way to recover here. |
| 734 raise exceptions.CommunicationError( |
| 735 'Failed to transfer all bytes in chunk, upload paused at byte ' |
| 736 '%d' % self.progress) |
| 737 self._ExecuteCallback(callback, response) |
| 738 if self.__complete: |
| 739 # TODO: Decide how to handle errors in the non-seekable case. |
| 740 current_pos = self.stream.tell() |
| 741 self.stream.seek(0, os.SEEK_END) |
| 742 end_pos = self.stream.tell() |
| 743 self.stream.seek(current_pos) |
| 744 if current_pos != end_pos: |
| 745 raise exceptions.TransferInvalidError( |
| 746 'Upload complete with %s additional bytes left in stream' % |
| 747 (long(end_pos) - long(current_pos))) |
| 748 self._ExecuteCallback(finish_callback, response) |
| 749 return response |
| 750 |
| 751 def __SendChunk(self, start, additional_headers=None): |
| 752 """Send the specified chunk.""" |
| 753 self.EnsureInitialized() |
| 754 end = min(start + self.chunksize, self.total_size) |
| 755 body_stream = stream_slice.StreamSlice(self.stream, end - start) |
| 756 # TODO: Think about clearer errors on "no data in stream". |
| 757 |
| 758 request = http_wrapper.Request(url=self.url, http_method='PUT', |
| 759 body=body_stream) |
| 760 request.headers['Content-Type'] = self.mime_type |
| 761 request.headers['Content-Range'] = 'bytes %s-%s/%s' % ( |
| 762 start, end - 1, self.total_size) |
| 763 if additional_headers: |
| 764 request.headers.update(additional_headers) |
| 765 |
| 766 response = http_wrapper.MakeRequest( |
| 767 self.bytes_http, request, retry_func=self.retry_func) |
| 768 if response.status_code not in (httplib.OK, httplib.CREATED, |
| 769 http_wrapper.RESUME_INCOMPLETE): |
| 770 # We want to reset our state to wherever the server left us |
| 771 # before this failed request, and then raise. |
| 772 self.RefreshResumableUploadState() |
| 773 raise exceptions.HttpError.FromResponse(response) |
| 774 if response.status_code == http_wrapper.RESUME_INCOMPLETE: |
| 775 last_byte = self.__GetLastByte( |
| 776 self._GetRangeHeaderFromResponse(response)) |
| 777 if last_byte + 1 != end: |
| 778 self.stream.seek(last_byte) |
| 779 return response |
| OLD | NEW |