Index: tools/telemetry/third_party/gsutilz/gslib/boto_translation.py |
diff --git a/tools/telemetry/third_party/gsutilz/gslib/boto_translation.py b/tools/telemetry/third_party/gsutilz/gslib/boto_translation.py |
index 92013904c47ffa66a82776e3d2f6da29ebcff54b..edb69ddcb2e1ddab89a9982a4102c4528b058344 100644 |
--- a/tools/telemetry/third_party/gsutilz/gslib/boto_translation.py |
+++ b/tools/telemetry/third_party/gsutilz/gslib/boto_translation.py |
@@ -30,6 +30,7 @@ import re |
import socket |
import tempfile |
import textwrap |
+import threading |
import time |
import xml |
from xml.dom.minidom import parseString as XmlParseString |
@@ -74,6 +75,7 @@ from gslib.translation_helper import AclTranslation |
from gslib.translation_helper import AddS3MarkerAclToObjectMetadata |
from gslib.translation_helper import CorsTranslation |
from gslib.translation_helper import CreateBucketNotFoundException |
+from gslib.translation_helper import CreateNotFoundExceptionForObjectWrite |
from gslib.translation_helper import CreateObjectNotFoundException |
from gslib.translation_helper import DEFAULT_CONTENT_TYPE |
from gslib.translation_helper import EncodeStringAsLong |
@@ -84,10 +86,8 @@ from gslib.translation_helper import REMOVE_CORS_CONFIG |
from gslib.translation_helper import S3MarkerAclFromObjectMetadata |
from gslib.util import ConfigureNoOpAuthIfNeeded |
from gslib.util import DEFAULT_FILE_BUFFER_SIZE |
-from gslib.util import GetFileSize |
from gslib.util import GetMaxRetryDelay |
from gslib.util import GetNumRetries |
-from gslib.util import MultiprocessingIsAvailable |
from gslib.util import S3_DELETE_MARKER_GUID |
from gslib.util import TWO_MIB |
from gslib.util import UnaryDictToXml |
@@ -101,8 +101,11 @@ TRANSLATABLE_BOTO_EXCEPTIONS = (boto.exception.BotoServerError, |
boto.exception.StorageCreateError, |
boto.exception.StorageResponseError) |
-# If multiprocessing is available, this will be overridden to a (thread-safe) |
-# multiprocessing.Value in a call to InitializeMultiprocessingVariables. |
+# pylint: disable=global-at-module-level |
+global boto_auth_initialized, boto_auth_initialized_lock |
+# If multiprocessing is available, these will be overridden to process-safe |
+# variables in InitializeMultiprocessingVariables. |
+boto_auth_initialized_lock = threading.Lock() |
boto_auth_initialized = False |
NON_EXISTENT_OBJECT_REGEX = re.compile(r'.*non-\s*existent\s*object', |
@@ -111,16 +114,36 @@ NON_EXISTENT_OBJECT_REGEX = re.compile(r'.*non-\s*existent\s*object', |
MD5_REGEX = re.compile(r'^"*[a-fA-F0-9]{32}"*$') |
-def InitializeMultiprocessingVariables(): |
+def InitializeMultiprocessingVariables(): # pylint: disable=invalid-name |
"""Perform necessary initialization for multiprocessing. |
See gslib.command.InitializeMultiprocessingVariables for an explanation |
of why this is necessary. |
""" |
- global boto_auth_initialized # pylint: disable=global-variable-undefined |
+ # pylint: disable=global-variable-undefined |
+ global boto_auth_initialized, boto_auth_initialized_lock |
+ boto_auth_initialized_lock = gslib.util.CreateLock() |
boto_auth_initialized = multiprocessing.Value('i', 0) |
+class DownloadProxyCallbackHandler(object): |
+ """Intermediary callback to keep track of the number of bytes downloaded.""" |
+ |
+ def __init__(self, start_byte, callback): |
+ self._start_byte = start_byte |
+ self._callback = callback |
+ |
+ def call(self, bytes_downloaded, total_size): |
+ """Saves necessary data and then calls the given Cloud API callback. |
+ |
+ Args: |
+ bytes_downloaded: Number of bytes processed so far. |
+ total_size: Total size of the ongoing operation. |
+ """ |
+ if self._callback: |
+ self._callback(self._start_byte + bytes_downloaded, total_size) |
+ |
+ |
class BotoTranslation(CloudApi): |
"""Boto-based XML translation implementation of gsutil Cloud API. |
@@ -130,7 +153,7 @@ class BotoTranslation(CloudApi): |
""" |
def __init__(self, bucket_storage_uri_class, logger, provider=None, |
- credentials=None, debug=0): |
+ credentials=None, debug=0, trace_token=None): |
"""Performs necessary setup for interacting with the cloud storage provider. |
Args: |
@@ -142,17 +165,19 @@ class BotoTranslation(CloudApi): |
the provider argument and use this one instead. |
credentials: Unused. |
debug: Debug level for the API implementation (0..3). |
+ trace_token: Unused in this subclass. |
""" |
super(BotoTranslation, self).__init__(bucket_storage_uri_class, logger, |
provider=provider, debug=debug) |
_ = credentials |
- global boto_auth_initialized # pylint: disable=global-variable-undefined |
- if MultiprocessingIsAvailable()[0] and not boto_auth_initialized.value: |
- ConfigureNoOpAuthIfNeeded() |
- boto_auth_initialized.value = 1 |
- elif not boto_auth_initialized: |
+ # pylint: disable=global-variable-undefined, global-variable-not-assigned |
+ global boto_auth_initialized, boto_auth_initialized_lock |
+ with boto_auth_initialized_lock: |
ConfigureNoOpAuthIfNeeded() |
- boto_auth_initialized = True |
+ if isinstance(boto_auth_initialized, bool): |
+ boto_auth_initialized = True |
+ else: |
+ boto_auth_initialized.value = 1 |
self.api_version = boto.config.get_value( |
'GSUtil', 'default_api_version', '1') |
@@ -318,8 +343,10 @@ class BotoTranslation(CloudApi): |
_ = provider |
get_fields = self._ListToGetFields(list_fields=fields) |
bucket_uri = self._StorageUriForBucket(bucket_name) |
- prefix_list = [] |
headers = {} |
+ yield_prefixes = fields is None or 'prefixes' in fields |
+ yield_objects = fields is None or any( |
+ field.startswith('items/') for field in fields) |
self._AddApiVersionToHeaders(headers) |
try: |
objects_iter = bucket_uri.list_bucket(prefix=prefix or '', |
@@ -331,11 +358,10 @@ class BotoTranslation(CloudApi): |
try: |
for key in objects_iter: |
- if isinstance(key, Prefix): |
- prefix_list.append(key.name) |
+ if yield_prefixes and isinstance(key, Prefix): |
yield CloudApi.CsObjectOrPrefix(key.name, |
CloudApi.CsObjectOrPrefixType.PREFIX) |
- else: |
+ elif yield_objects: |
key_to_convert = key |
# Listed keys are populated with these fields during bucket listing. |
@@ -407,11 +433,11 @@ class BotoTranslation(CloudApi): |
self._AddApiVersionToHeaders(headers) |
if 'accept-encoding' not in headers: |
headers['accept-encoding'] = 'gzip' |
- if end_byte: |
+ if end_byte is not None: |
headers['range'] = 'bytes=%s-%s' % (start_byte, end_byte) |
elif start_byte > 0: |
headers['range'] = 'bytes=%s-' % (start_byte) |
- else: |
+ elif start_byte < 0: |
headers['range'] = 'bytes=%s' % (start_byte) |
# Since in most cases we already made a call to get the object metadata, |
@@ -443,7 +469,8 @@ class BotoTranslation(CloudApi): |
try: |
if download_strategy is CloudApi.DownloadStrategy.RESUMABLE: |
self._PerformResumableDownload( |
- download_stream, key, headers=headers, callback=progress_callback, |
+ download_stream, start_byte, end_byte, key, |
+ headers=headers, callback=progress_callback, |
num_callbacks=num_progress_callbacks, hash_algs=hash_algs) |
elif download_strategy is CloudApi.DownloadStrategy.ONE_SHOT: |
self._PerformSimpleDownload( |
@@ -505,13 +532,16 @@ class BotoTranslation(CloudApi): |
key.get_contents_to_file(download_stream, cb=progress_callback, |
num_cb=num_progress_callbacks, headers=headers) |
- def _PerformResumableDownload(self, fp, key, headers=None, callback=None, |
+ def _PerformResumableDownload(self, fp, start_byte, end_byte, key, |
+ headers=None, callback=None, |
num_callbacks=XML_PROGRESS_CALLBACKS, |
hash_algs=None): |
"""Downloads bytes from key to fp, resuming as needed. |
Args: |
- fp: File pointer into which data should be downloaded |
+ fp: File pointer into which data should be downloaded. |
+ start_byte: Start byte of the download. |
+ end_byte: End byte of the download. |
key: Key object from which data is to be downloaded |
headers: Headers to send when retrieving the file |
callback: (optional) a callback function that will be called to report |
@@ -540,37 +570,22 @@ class BotoTranslation(CloudApi): |
num_retries = GetNumRetries() |
progress_less_iterations = 0 |
+ last_progress_byte = start_byte |
while True: # Retry as long as we're making progress. |
- had_file_bytes_before_attempt = GetFileSize(fp) |
try: |
- cur_file_size = GetFileSize(fp, position_to_eof=True) |
- |
- def DownloadProxyCallback(total_bytes_downloaded, total_size): |
- """Translates a boto callback into a gsutil Cloud API callback. |
- |
- Callbacks are originally made by boto.s3.Key.get_file(); here we take |
- into account that we're resuming a download. |
- |
- Args: |
- total_bytes_downloaded: Actual bytes downloaded so far, not |
- including the point we resumed from. |
- total_size: Total size of the download. |
- """ |
- if callback: |
- callback(cur_file_size + total_bytes_downloaded, total_size) |
- |
+ cb_handler = DownloadProxyCallbackHandler(start_byte, callback) |
headers = headers.copy() |
- headers['Range'] = 'bytes=%d-%d' % (cur_file_size, key.size - 1) |
- cb = DownloadProxyCallback |
+ headers['Range'] = 'bytes=%d-%d' % (start_byte, end_byte) |
# Disable AWSAuthConnection-level retry behavior, since that would |
# cause downloads to restart from scratch. |
try: |
- key.get_file(fp, headers, cb, num_callbacks, override_num_retries=0, |
- hash_algs=hash_algs) |
+ key.get_file(fp, headers, cb_handler.call, num_callbacks, |
+ override_num_retries=0, hash_algs=hash_algs) |
except TypeError: |
- key.get_file(fp, headers, cb, num_callbacks, override_num_retries=0) |
+ key.get_file(fp, headers, cb_handler.call, num_callbacks, |
+ override_num_retries=0) |
fp.flush() |
# Download succeeded. |
return |
@@ -583,9 +598,10 @@ class BotoTranslation(CloudApi): |
# so we need to close and reopen the key before resuming |
# the download. |
if self.provider == 's3': |
- key.get_file(fp, headers, cb, num_callbacks, override_num_retries=0) |
+ key.get_file(fp, headers, cb_handler.call, num_callbacks, |
+ override_num_retries=0) |
else: # self.provider == 'gs' |
- key.get_file(fp, headers, cb, num_callbacks, |
+ key.get_file(fp, headers, cb_handler.call, num_callbacks, |
override_num_retries=0, hash_algs=hash_algs) |
except BotoResumableDownloadException, e: |
if (e.disposition == |
@@ -597,7 +613,9 @@ class BotoTranslation(CloudApi): |
'retry', e.message) |
# At this point we had a re-tryable failure; see if made progress. |
- if GetFileSize(fp) > had_file_bytes_before_attempt: |
+ start_byte = fp.tell() |
+ if start_byte > last_progress_byte: |
+ last_progress_byte = start_byte |
progress_less_iterations = 0 |
else: |
progress_less_iterations += 1 |
@@ -819,8 +837,11 @@ class BotoTranslation(CloudApi): |
return self._HandleSuccessfulUpload(dst_uri, object_metadata, |
fields=fields) |
except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
+ not_found_exception = CreateNotFoundExceptionForObjectWrite( |
+ self.provider, object_metadata.bucket) |
self._TranslateExceptionAndRaise(e, bucket_name=object_metadata.bucket, |
- object_name=object_metadata.name) |
+ object_name=object_metadata.name, |
+ not_found_exception=not_found_exception) |
def UploadObjectStreaming(self, upload_stream, object_metadata, |
canned_acl=None, progress_callback=None, |
@@ -836,8 +857,11 @@ class BotoTranslation(CloudApi): |
return self._HandleSuccessfulUpload(dst_uri, object_metadata, |
fields=fields) |
except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
+ not_found_exception = CreateNotFoundExceptionForObjectWrite( |
+ self.provider, object_metadata.bucket) |
self._TranslateExceptionAndRaise(e, bucket_name=object_metadata.bucket, |
- object_name=object_metadata.name) |
+ object_name=object_metadata.name, |
+ not_found_exception=not_found_exception) |
def UploadObject(self, upload_stream, object_metadata, canned_acl=None, |
preconditions=None, size=None, progress_callback=None, |
@@ -860,8 +884,11 @@ class BotoTranslation(CloudApi): |
return self._HandleSuccessfulUpload(dst_uri, object_metadata, |
fields=fields) |
except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
+ not_found_exception = CreateNotFoundExceptionForObjectWrite( |
+ self.provider, object_metadata.bucket) |
self._TranslateExceptionAndRaise(e, bucket_name=object_metadata.bucket, |
- object_name=object_metadata.name) |
+ object_name=object_metadata.name, |
+ not_found_exception=not_found_exception) |
def DeleteObject(self, bucket_name, object_name, preconditions=None, |
generation=None, provider=None): |
@@ -920,8 +947,13 @@ class BotoTranslation(CloudApi): |
return self._BotoKeyToObject(new_key, fields=fields) |
except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
- self._TranslateExceptionAndRaise(e, dst_obj_metadata.bucket, |
- dst_obj_metadata.name) |
+ not_found_exception = CreateNotFoundExceptionForObjectWrite( |
+ self.provider, dst_obj_metadata.bucket, src_provider=self.provider, |
+ src_bucket_name=src_obj_metadata.bucket, |
+ src_object_name=src_obj_metadata.name, src_generation=src_generation) |
+ self._TranslateExceptionAndRaise(e, bucket_name=dst_obj_metadata.bucket, |
+ object_name=dst_obj_metadata.name, |
+ not_found_exception=not_found_exception) |
def ComposeObject(self, src_objs_metadata, dst_obj_metadata, |
preconditions=None, provider=None, fields=None): |
@@ -1382,7 +1414,7 @@ class BotoTranslation(CloudApi): |
raise |
def _TranslateExceptionAndRaise(self, e, bucket_name=None, object_name=None, |
- generation=None): |
+ generation=None, not_found_exception=None): |
"""Translates a Boto exception and raises the translated or original value. |
Args: |
@@ -1390,6 +1422,7 @@ class BotoTranslation(CloudApi): |
bucket_name: Optional bucket name in request that caused the exception. |
object_name: Optional object name in request that caused the exception. |
generation: Optional generation in request that caused the exception. |
+ not_found_exception: Optional exception to raise in the not-found case. |
Raises: |
Translated CloudApi exception, or the original exception if it was not |
@@ -1397,14 +1430,14 @@ class BotoTranslation(CloudApi): |
""" |
translated_exception = self._TranslateBotoException( |
e, bucket_name=bucket_name, object_name=object_name, |
- generation=generation) |
+ generation=generation, not_found_exception=not_found_exception) |
if translated_exception: |
raise translated_exception |
else: |
raise |
def _TranslateBotoException(self, e, bucket_name=None, object_name=None, |
- generation=None): |
+ generation=None, not_found_exception=None): |
"""Translates boto exceptions into their gsutil Cloud API equivalents. |
Args: |
@@ -1412,6 +1445,7 @@ class BotoTranslation(CloudApi): |
bucket_name: Optional bucket name in request that caused the exception. |
object_name: Optional object name in request that caused the exception. |
generation: Optional generation in request that caused the exception. |
+ not_found_exception: Optional exception to raise in the not-found case. |
Returns: |
CloudStorageApiServiceException for translatable exceptions, None |
@@ -1425,14 +1459,20 @@ class BotoTranslation(CloudApi): |
elif e.status == 401 or e.status == 403: |
return AccessDeniedException(e.code, status=e.status, body=e.body) |
elif e.status == 404: |
- if bucket_name: |
+ if not_found_exception: |
+ # The exception is pre-constructed prior to translation; the HTTP |
+ # status code isn't available at that time. |
+ setattr(not_found_exception, 'status', e.status) |
+ return not_found_exception |
+ elif bucket_name: |
if object_name: |
return CreateObjectNotFoundException(e.status, self.provider, |
bucket_name, object_name, |
generation=generation) |
return CreateBucketNotFoundException(e.status, self.provider, |
bucket_name) |
- return NotFoundException(e.code, status=e.status, body=e.body) |
+ return NotFoundException(e.message, status=e.status, body=e.body) |
+ |
elif e.status == 409 and e.code and 'BucketNotEmpty' in e.code: |
return NotEmptyException('BucketNotEmpty (%s)' % bucket_name, |
status=e.status, body=e.body) |