| Index: third_party/gsutil/gslib/boto_translation.py
|
| diff --git a/third_party/gsutil/gslib/boto_translation.py b/third_party/gsutil/gslib/boto_translation.py
|
| index 92013904c47ffa66a82776e3d2f6da29ebcff54b..edb69ddcb2e1ddab89a9982a4102c4528b058344 100644
|
| --- a/third_party/gsutil/gslib/boto_translation.py
|
| +++ b/third_party/gsutil/gslib/boto_translation.py
|
| @@ -30,6 +30,7 @@ import re
|
| import socket
|
| import tempfile
|
| import textwrap
|
| +import threading
|
| import time
|
| import xml
|
| from xml.dom.minidom import parseString as XmlParseString
|
| @@ -74,6 +75,7 @@ from gslib.translation_helper import AclTranslation
|
| from gslib.translation_helper import AddS3MarkerAclToObjectMetadata
|
| from gslib.translation_helper import CorsTranslation
|
| from gslib.translation_helper import CreateBucketNotFoundException
|
| +from gslib.translation_helper import CreateNotFoundExceptionForObjectWrite
|
| from gslib.translation_helper import CreateObjectNotFoundException
|
| from gslib.translation_helper import DEFAULT_CONTENT_TYPE
|
| from gslib.translation_helper import EncodeStringAsLong
|
| @@ -84,10 +86,8 @@ from gslib.translation_helper import REMOVE_CORS_CONFIG
|
| from gslib.translation_helper import S3MarkerAclFromObjectMetadata
|
| from gslib.util import ConfigureNoOpAuthIfNeeded
|
| from gslib.util import DEFAULT_FILE_BUFFER_SIZE
|
| -from gslib.util import GetFileSize
|
| from gslib.util import GetMaxRetryDelay
|
| from gslib.util import GetNumRetries
|
| -from gslib.util import MultiprocessingIsAvailable
|
| from gslib.util import S3_DELETE_MARKER_GUID
|
| from gslib.util import TWO_MIB
|
| from gslib.util import UnaryDictToXml
|
| @@ -101,8 +101,11 @@ TRANSLATABLE_BOTO_EXCEPTIONS = (boto.exception.BotoServerError,
|
| boto.exception.StorageCreateError,
|
| boto.exception.StorageResponseError)
|
|
|
| -# If multiprocessing is available, this will be overridden to a (thread-safe)
|
| -# multiprocessing.Value in a call to InitializeMultiprocessingVariables.
|
| +# pylint: disable=global-at-module-level
|
| +global boto_auth_initialized, boto_auth_initialized_lock
|
| +# If multiprocessing is available, these will be overridden to process-safe
|
| +# variables in InitializeMultiprocessingVariables.
|
| +boto_auth_initialized_lock = threading.Lock()
|
| boto_auth_initialized = False
|
|
|
| NON_EXISTENT_OBJECT_REGEX = re.compile(r'.*non-\s*existent\s*object',
|
| @@ -111,16 +114,36 @@ NON_EXISTENT_OBJECT_REGEX = re.compile(r'.*non-\s*existent\s*object',
|
| MD5_REGEX = re.compile(r'^"*[a-fA-F0-9]{32}"*$')
|
|
|
|
|
| -def InitializeMultiprocessingVariables():
|
| +def InitializeMultiprocessingVariables(): # pylint: disable=invalid-name
|
| """Perform necessary initialization for multiprocessing.
|
|
|
| See gslib.command.InitializeMultiprocessingVariables for an explanation
|
| of why this is necessary.
|
| """
|
| - global boto_auth_initialized # pylint: disable=global-variable-undefined
|
| + # pylint: disable=global-variable-undefined
|
| + global boto_auth_initialized, boto_auth_initialized_lock
|
| + boto_auth_initialized_lock = gslib.util.CreateLock()
|
| boto_auth_initialized = multiprocessing.Value('i', 0)
|
|
|
|
|
| +class DownloadProxyCallbackHandler(object):
|
| + """Intermediary callback to keep track of the number of bytes downloaded."""
|
| +
|
| + def __init__(self, start_byte, callback):
|
| + self._start_byte = start_byte
|
| + self._callback = callback
|
| +
|
| + def call(self, bytes_downloaded, total_size):
|
| + """Adds the starting byte offset and calls the given Cloud API callback.
|
| +
|
| + Args:
|
| + bytes_downloaded: Bytes downloaded so far, not counting start_byte.
|
| + total_size: Total size of the ongoing operation.
|
| + """
|
| + if self._callback:
|
| + self._callback(self._start_byte + bytes_downloaded, total_size)
|
| +
|
| +
|
| class BotoTranslation(CloudApi):
|
| """Boto-based XML translation implementation of gsutil Cloud API.
|
|
|
| @@ -130,7 +153,7 @@ class BotoTranslation(CloudApi):
|
| """
|
|
|
| def __init__(self, bucket_storage_uri_class, logger, provider=None,
|
| - credentials=None, debug=0):
|
| + credentials=None, debug=0, trace_token=None):
|
| """Performs necessary setup for interacting with the cloud storage provider.
|
|
|
| Args:
|
| @@ -142,17 +165,19 @@ class BotoTranslation(CloudApi):
|
| the provider argument and use this one instead.
|
| credentials: Unused.
|
| debug: Debug level for the API implementation (0..3).
|
| + trace_token: Unused in this subclass.
|
| """
|
| super(BotoTranslation, self).__init__(bucket_storage_uri_class, logger,
|
| provider=provider, debug=debug)
|
| _ = credentials
|
| - global boto_auth_initialized # pylint: disable=global-variable-undefined
|
| - if MultiprocessingIsAvailable()[0] and not boto_auth_initialized.value:
|
| - ConfigureNoOpAuthIfNeeded()
|
| - boto_auth_initialized.value = 1
|
| - elif not boto_auth_initialized:
|
| + # pylint: disable=global-variable-undefined, global-variable-not-assigned
|
| + global boto_auth_initialized, boto_auth_initialized_lock
|
| + with boto_auth_initialized_lock:
|
| ConfigureNoOpAuthIfNeeded()
|
| - boto_auth_initialized = True
|
| + if isinstance(boto_auth_initialized, bool):
|
| + boto_auth_initialized = True
|
| + else:
|
| + boto_auth_initialized.value = 1
|
| self.api_version = boto.config.get_value(
|
| 'GSUtil', 'default_api_version', '1')
|
|
|
| @@ -318,8 +343,10 @@ class BotoTranslation(CloudApi):
|
| _ = provider
|
| get_fields = self._ListToGetFields(list_fields=fields)
|
| bucket_uri = self._StorageUriForBucket(bucket_name)
|
| - prefix_list = []
|
| headers = {}
|
| + yield_prefixes = fields is None or 'prefixes' in fields
|
| + yield_objects = fields is None or any(
|
| + field.startswith('items/') for field in fields)
|
| self._AddApiVersionToHeaders(headers)
|
| try:
|
| objects_iter = bucket_uri.list_bucket(prefix=prefix or '',
|
| @@ -331,11 +358,10 @@ class BotoTranslation(CloudApi):
|
|
|
| try:
|
| for key in objects_iter:
|
| - if isinstance(key, Prefix):
|
| - prefix_list.append(key.name)
|
| + if yield_prefixes and isinstance(key, Prefix):
|
| yield CloudApi.CsObjectOrPrefix(key.name,
|
| CloudApi.CsObjectOrPrefixType.PREFIX)
|
| - else:
|
| + elif yield_objects:
|
| key_to_convert = key
|
|
|
| # Listed keys are populated with these fields during bucket listing.
|
| @@ -407,11 +433,11 @@ class BotoTranslation(CloudApi):
|
| self._AddApiVersionToHeaders(headers)
|
| if 'accept-encoding' not in headers:
|
| headers['accept-encoding'] = 'gzip'
|
| - if end_byte:
|
| + if end_byte is not None:
|
| headers['range'] = 'bytes=%s-%s' % (start_byte, end_byte)
|
| elif start_byte > 0:
|
| headers['range'] = 'bytes=%s-' % (start_byte)
|
| - else:
|
| + elif start_byte < 0:
|
| headers['range'] = 'bytes=%s' % (start_byte)
|
|
|
| # Since in most cases we already made a call to get the object metadata,
|
| @@ -443,7 +469,8 @@ class BotoTranslation(CloudApi):
|
| try:
|
| if download_strategy is CloudApi.DownloadStrategy.RESUMABLE:
|
| self._PerformResumableDownload(
|
| - download_stream, key, headers=headers, callback=progress_callback,
|
| + download_stream, start_byte, end_byte, key,
|
| + headers=headers, callback=progress_callback,
|
| num_callbacks=num_progress_callbacks, hash_algs=hash_algs)
|
| elif download_strategy is CloudApi.DownloadStrategy.ONE_SHOT:
|
| self._PerformSimpleDownload(
|
| @@ -505,13 +532,16 @@ class BotoTranslation(CloudApi):
|
| key.get_contents_to_file(download_stream, cb=progress_callback,
|
| num_cb=num_progress_callbacks, headers=headers)
|
|
|
| - def _PerformResumableDownload(self, fp, key, headers=None, callback=None,
|
| + def _PerformResumableDownload(self, fp, start_byte, end_byte, key,
|
| + headers=None, callback=None,
|
| num_callbacks=XML_PROGRESS_CALLBACKS,
|
| hash_algs=None):
|
| """Downloads bytes from key to fp, resuming as needed.
|
|
|
| Args:
|
| - fp: File pointer into which data should be downloaded
|
| + fp: File pointer into which data should be downloaded.
|
| + start_byte: Start byte of the download.
|
| + end_byte: End byte of the download.
|
| key: Key object from which data is to be downloaded
|
| headers: Headers to send when retrieving the file
|
| callback: (optional) a callback function that will be called to report
|
| @@ -540,37 +570,22 @@ class BotoTranslation(CloudApi):
|
|
|
| num_retries = GetNumRetries()
|
| progress_less_iterations = 0
|
| + last_progress_byte = start_byte
|
|
|
| while True: # Retry as long as we're making progress.
|
| - had_file_bytes_before_attempt = GetFileSize(fp)
|
| try:
|
| - cur_file_size = GetFileSize(fp, position_to_eof=True)
|
| -
|
| - def DownloadProxyCallback(total_bytes_downloaded, total_size):
|
| - """Translates a boto callback into a gsutil Cloud API callback.
|
| -
|
| - Callbacks are originally made by boto.s3.Key.get_file(); here we take
|
| - into account that we're resuming a download.
|
| -
|
| - Args:
|
| - total_bytes_downloaded: Actual bytes downloaded so far, not
|
| - including the point we resumed from.
|
| - total_size: Total size of the download.
|
| - """
|
| - if callback:
|
| - callback(cur_file_size + total_bytes_downloaded, total_size)
|
| -
|
| + cb_handler = DownloadProxyCallbackHandler(start_byte, callback)
|
| headers = headers.copy()
|
| - headers['Range'] = 'bytes=%d-%d' % (cur_file_size, key.size - 1)
|
| - cb = DownloadProxyCallback
|
| + headers['Range'] = 'bytes=%d-%d' % (start_byte, end_byte)
|
|
|
| # Disable AWSAuthConnection-level retry behavior, since that would
|
| # cause downloads to restart from scratch.
|
| try:
|
| - key.get_file(fp, headers, cb, num_callbacks, override_num_retries=0,
|
| - hash_algs=hash_algs)
|
| + key.get_file(fp, headers, cb_handler.call, num_callbacks,
|
| + override_num_retries=0, hash_algs=hash_algs)
|
| except TypeError:
|
| - key.get_file(fp, headers, cb, num_callbacks, override_num_retries=0)
|
| + key.get_file(fp, headers, cb_handler.call, num_callbacks,
|
| + override_num_retries=0)
|
| fp.flush()
|
| # Download succeeded.
|
| return
|
| @@ -583,9 +598,10 @@ class BotoTranslation(CloudApi):
|
| # so we need to close and reopen the key before resuming
|
| # the download.
|
| if self.provider == 's3':
|
| - key.get_file(fp, headers, cb, num_callbacks, override_num_retries=0)
|
| + key.get_file(fp, headers, cb_handler.call, num_callbacks,
|
| + override_num_retries=0)
|
| else: # self.provider == 'gs'
|
| - key.get_file(fp, headers, cb, num_callbacks,
|
| + key.get_file(fp, headers, cb_handler.call, num_callbacks,
|
| override_num_retries=0, hash_algs=hash_algs)
|
| except BotoResumableDownloadException, e:
|
| if (e.disposition ==
|
| @@ -597,7 +613,9 @@ class BotoTranslation(CloudApi):
|
| 'retry', e.message)
|
|
|
| # At this point we had a re-tryable failure; see if made progress.
|
| - if GetFileSize(fp) > had_file_bytes_before_attempt:
|
| + start_byte = fp.tell()
|
| + if start_byte > last_progress_byte:
|
| + last_progress_byte = start_byte
|
| progress_less_iterations = 0
|
| else:
|
| progress_less_iterations += 1
|
| @@ -819,8 +837,11 @@ class BotoTranslation(CloudApi):
|
| return self._HandleSuccessfulUpload(dst_uri, object_metadata,
|
| fields=fields)
|
| except TRANSLATABLE_BOTO_EXCEPTIONS, e:
|
| + not_found_exception = CreateNotFoundExceptionForObjectWrite(
|
| + self.provider, object_metadata.bucket)
|
| self._TranslateExceptionAndRaise(e, bucket_name=object_metadata.bucket,
|
| - object_name=object_metadata.name)
|
| + object_name=object_metadata.name,
|
| + not_found_exception=not_found_exception)
|
|
|
| def UploadObjectStreaming(self, upload_stream, object_metadata,
|
| canned_acl=None, progress_callback=None,
|
| @@ -836,8 +857,11 @@ class BotoTranslation(CloudApi):
|
| return self._HandleSuccessfulUpload(dst_uri, object_metadata,
|
| fields=fields)
|
| except TRANSLATABLE_BOTO_EXCEPTIONS, e:
|
| + not_found_exception = CreateNotFoundExceptionForObjectWrite(
|
| + self.provider, object_metadata.bucket)
|
| self._TranslateExceptionAndRaise(e, bucket_name=object_metadata.bucket,
|
| - object_name=object_metadata.name)
|
| + object_name=object_metadata.name,
|
| + not_found_exception=not_found_exception)
|
|
|
| def UploadObject(self, upload_stream, object_metadata, canned_acl=None,
|
| preconditions=None, size=None, progress_callback=None,
|
| @@ -860,8 +884,11 @@ class BotoTranslation(CloudApi):
|
| return self._HandleSuccessfulUpload(dst_uri, object_metadata,
|
| fields=fields)
|
| except TRANSLATABLE_BOTO_EXCEPTIONS, e:
|
| + not_found_exception = CreateNotFoundExceptionForObjectWrite(
|
| + self.provider, object_metadata.bucket)
|
| self._TranslateExceptionAndRaise(e, bucket_name=object_metadata.bucket,
|
| - object_name=object_metadata.name)
|
| + object_name=object_metadata.name,
|
| + not_found_exception=not_found_exception)
|
|
|
| def DeleteObject(self, bucket_name, object_name, preconditions=None,
|
| generation=None, provider=None):
|
| @@ -920,8 +947,13 @@ class BotoTranslation(CloudApi):
|
|
|
| return self._BotoKeyToObject(new_key, fields=fields)
|
| except TRANSLATABLE_BOTO_EXCEPTIONS, e:
|
| - self._TranslateExceptionAndRaise(e, dst_obj_metadata.bucket,
|
| - dst_obj_metadata.name)
|
| + not_found_exception = CreateNotFoundExceptionForObjectWrite(
|
| + self.provider, dst_obj_metadata.bucket, src_provider=self.provider,
|
| + src_bucket_name=src_obj_metadata.bucket,
|
| + src_object_name=src_obj_metadata.name, src_generation=src_generation)
|
| + self._TranslateExceptionAndRaise(e, bucket_name=dst_obj_metadata.bucket,
|
| + object_name=dst_obj_metadata.name,
|
| + not_found_exception=not_found_exception)
|
|
|
| def ComposeObject(self, src_objs_metadata, dst_obj_metadata,
|
| preconditions=None, provider=None, fields=None):
|
| @@ -1382,7 +1414,7 @@ class BotoTranslation(CloudApi):
|
| raise
|
|
|
| def _TranslateExceptionAndRaise(self, e, bucket_name=None, object_name=None,
|
| - generation=None):
|
| + generation=None, not_found_exception=None):
|
| """Translates a Boto exception and raises the translated or original value.
|
|
|
| Args:
|
| @@ -1390,6 +1422,7 @@ class BotoTranslation(CloudApi):
|
| bucket_name: Optional bucket name in request that caused the exception.
|
| object_name: Optional object name in request that caused the exception.
|
| generation: Optional generation in request that caused the exception.
|
| + not_found_exception: Optional exception to raise in the not-found case.
|
|
|
| Raises:
|
| Translated CloudApi exception, or the original exception if it was not
|
| @@ -1397,14 +1430,14 @@ class BotoTranslation(CloudApi):
|
| """
|
| translated_exception = self._TranslateBotoException(
|
| e, bucket_name=bucket_name, object_name=object_name,
|
| - generation=generation)
|
| + generation=generation, not_found_exception=not_found_exception)
|
| if translated_exception:
|
| raise translated_exception
|
| else:
|
| raise
|
|
|
| def _TranslateBotoException(self, e, bucket_name=None, object_name=None,
|
| - generation=None):
|
| + generation=None, not_found_exception=None):
|
| """Translates boto exceptions into their gsutil Cloud API equivalents.
|
|
|
| Args:
|
| @@ -1412,6 +1445,7 @@ class BotoTranslation(CloudApi):
|
| bucket_name: Optional bucket name in request that caused the exception.
|
| object_name: Optional object name in request that caused the exception.
|
| generation: Optional generation in request that caused the exception.
|
| + not_found_exception: Optional exception to return in the not-found case.
|
|
|
| Returns:
|
| CloudStorageApiServiceException for translatable exceptions, None
|
| @@ -1425,14 +1459,20 @@ class BotoTranslation(CloudApi):
|
| elif e.status == 401 or e.status == 403:
|
| return AccessDeniedException(e.code, status=e.status, body=e.body)
|
| elif e.status == 404:
|
| - if bucket_name:
|
| + if not_found_exception:
|
| + # The exception is pre-constructed prior to translation; the HTTP
|
| + # status code isn't available at that time.
|
| + setattr(not_found_exception, 'status', e.status)
|
| + return not_found_exception
|
| + elif bucket_name:
|
| if object_name:
|
| return CreateObjectNotFoundException(e.status, self.provider,
|
| bucket_name, object_name,
|
| generation=generation)
|
| return CreateBucketNotFoundException(e.status, self.provider,
|
| bucket_name)
|
| - return NotFoundException(e.code, status=e.status, body=e.body)
|
| + return NotFoundException(e.message, status=e.status, body=e.body)
|
| +
|
| elif e.status == 409 and e.code and 'BucketNotEmpty' in e.code:
|
| return NotEmptyException('BucketNotEmpty (%s)' % bucket_name,
|
| status=e.status, body=e.body)
|
|
|